nixfleet_agent/runtime/mod.rs
1//! Agent runtime: MPSC reducer loop + applier + workers (RFC-0006 §7.1).
2//!
//! Symmetric to the CP-side runtime in
//! [`nixfleet_control_plane::runtime`]. Workers that hold a reducer
//! sender (probe, activation, longpoll, heartbeat, advance_ticker,
//! outbound, manifest_poll) feed a single MPSC channel; the reducer task
//! is the sole `nixfleet_state_machine::step` caller; the applier
//! executes `Local*` effects (FireSwitch, FireRollbackTo,
//! ResetProbeCache, EmitEvent). The `cert_renewal` worker is spawned
//! alongside them but is not handed a reducer sender.
9//!
//! ```text
//! probe ─────────┐
//! activation ────┤
//! longpoll ──────┼──▶ mpsc::Sender<ReducerInput> ───▶ reducer task ───▶ applier
//! heartbeat ─────┤                                         │
//! advance_ticker ┘                                         ▼
//!                                                        (step)
//! ```
18//!
19//! Invariants (mirror CP's `runtime::mod` — RFC-0006 §2 principle 4):
20//!
21//! 1. **One MPSC, one mutator.** The reducer task is the only thing that
22//! calls [`nixfleet_state_machine::step`]. Workers emit
23//! [`ReducerInput`] values; the applier executes the produced effects.
24//! A second writer "for performance" is the defect class the v0.2 fold
25//! folded away — don't reintroduce it.
26//!
27//! 2. **Agent runs the SAME `step()` as the CP mirror.** No "leaner"
28//! agent-side reimplementation. Identical transitions across the
29//! `Local*` / `Remote*` event pairs by construction.
30//!
31//! 3. **Shutdown via oneshot drop.** The reducer task owns a
32//! `Vec<oneshot::Sender<()>>`; each worker holds the matching
33//! [`ShutdownToken`] wrapping a `Receiver`. Reducer exit drops every
34//! sender; every worker's `select!` shutdown arm fires; workers exit
35//! cleanly.
36//!
37//! 4. **Bounded channels with sizing rationale.** Capacities are
38//! documented inline. No `unbounded`.
39//!
40//! 5. **No `chrono::Utc::now()` in the runtime.** Use the `ClockHandle`
41//! abstraction so tests can advance time deterministically and the
42//! runtime can't drift from any caller's notion of "now".
43
44use std::sync::Arc;
45
46use nixfleet_proto::clock::ClockHandle;
47use nixfleet_state_machine::Event;
48use tokio::sync::{mpsc, oneshot};
49use tokio::task::JoinHandle;
50use tokio_util::sync::CancellationToken;
51
52pub mod applier;
53pub mod outbound_queue;
54pub mod recovery;
55mod reducer;
56pub mod wire;
57pub mod workers;
58
59pub use outbound_queue::{OutboundQueue, QueuedEvent};
60pub use recovery::{RecoveryOutcome, handshake as boot_recovery_handshake};
61
/// Reducer input channel depth.
///
/// Sizing rationale: a single agent's peak burst during one rollout
/// cycle is roughly:
///
/// - 1 `LocalActivate` (from longpoll)
/// - 1 `LocalActivationStarted` + 1 `LocalActivationCompleted`
/// - per declared probe (~1–10): `LocalProbeObservedFirst` then
///   `LocalProbeResult` every interval until soak elapses
/// - 1 `LocalConvergedReached`
///
/// 32 covers the spike comfortably; 64 doubles for slack. The CP side
/// uses 1024 because one CP fans 256 hosts; one agent only fans
/// itself.
const REDUCER_INPUT_CAPACITY: usize = 64;
75
76/// Reducer-task inputs.
77///
78/// - [`ReducerInput::HostEvent`] — workers synthesised a `Local*` event;
79/// reducer calls `step()` on the agent's `HostRolloutState` for the
80/// given `rollout_id`, then runs the applier on the produced effects.
81/// - [`ReducerInput::AgentAdvanceTick`] — periodic timer (mirror of
82/// CP's `PlanTick`). Drives sustained-failure detection: when a
83/// Soaking host's `probe_failure_first_at` exceeds the rollout's
84/// policy threshold the reducer synthesises
85/// `LocalSustainedFailureCrossed` and feeds it through `step()`.
86pub enum ReducerInput {
87 HostEvent {
88 rollout_id: nixfleet_proto::RolloutId,
89 event: Event,
90 },
91 AgentAdvanceTick,
92 /// Refresh the reducer's cached `SignedManifestSet`. Emitted by
93 /// the longpoll worker (after `verify_rollout_manifest` succeeds)
94 /// + by the boot-recovery handshake (7f). The reducer needs a
95 /// cached manifest to look up `RolloutPolicy` for `step()` and to
96 /// resolve `channel`/`target_closure` when bootstrapping
97 /// `LocalActivate`.
98 ManifestSetUpdated(Box<nixfleet_reconciler::planner_types::SignedManifestSet>),
99 /// LIFT #3: rehydrate the reducer's in-memory HostRolloutState for
100 /// an active rollout from a CP-supplied snapshot. Emitted only by
101 /// `runtime::spawn` after the boot-recovery handshake returns
102 /// snapshots, before any worker spawns. Directly assigns the
103 /// state-machine cache from the snapshot fields, bypassing the
104 /// transition machinery — the canonical source of truth lives on
105 /// CP, the agent is just rebuilding its local view post-restart.
106 BootstrapHost(Box<nixfleet_proto::agent_wire::HostRolloutSnapshot>),
107}
108
109/// Shutdown signal handed to a worker at spawn time. Reducer-task exit
110/// drops the matching `Sender<()>` and the receiver resolves with
111/// `Err(RecvError)`; the worker's `select!` shutdown arm fires and the
112/// task exits cleanly.
113pub struct ShutdownToken(oneshot::Receiver<()>);
114
115impl ShutdownToken {
116 /// Unwrap to the inner `Receiver<()>` for direct use in `select!`.
117 pub fn into_inner(self) -> oneshot::Receiver<()> {
118 self.0
119 }
120
121 /// Test-only constructor. Integration tests spawn a single worker
122 /// in isolation (e.g., `integration_mtls.rs::longpoll_worker_*`) and
123 /// own their shutdown channel directly; production code goes
124 /// through [`runtime::spawn`] which constructs the token internally.
125 ///
126 /// Available only when the crate is compiled with `#[cfg(test)]`
127 /// (the crate's own lib tests) or with the `test-helpers` feature
128 /// flag (integration tests via the test target's
129 /// `required-features`). Matches the
130 /// `nixfleet_reconciler::Verified::unverified_for_tests` escape-
131 /// hatch convention so production builds cannot construct a fake
132 /// `ShutdownToken`.
133 #[cfg(any(test, feature = "test-helpers"))]
134 pub fn __test_only_from_rx(rx: oneshot::Receiver<()>) -> Self {
135 Self(rx)
136 }
137}
138
139/// Handle returned by [`spawn`].
140pub struct RuntimeHandle {
141 pub input_tx: mpsc::Sender<ReducerInput>,
142 pub reducer_handle: JoinHandle<()>,
143 pub worker_handles: Vec<JoinHandle<()>>,
144}
145
146impl RuntimeHandle {
147 pub fn into_join_handles(self) -> Vec<JoinHandle<()>> {
148 let mut handles = self.worker_handles;
149 handles.push(self.reducer_handle);
150 handles
151 }
152}
153
/// Static runtime configuration, built once at startup from CLI args.
/// Cheap to clone; each worker is handed its own clone.
///
/// `ca_cert` / `client_cert` / `client_key` are the same paths the CLI
/// receives (`Args::ca_cert` etc.) and that the first-boot enrollment
/// path already uses (`main.rs::maybe_run_first_boot_enrollment`).
/// Workers forward them unchanged to `crate::comms::build_client` so
/// every post-enroll `/v1/agent/*` request rides the agent's
/// enrollment-cert mTLS identity. DEFECT-003 regression-guard: a worker
/// that built a bare `reqwest::Client` (no client cert) would talk to CP
/// unauthenticated; `integration_mtls.rs` pins the property end-to-end.
#[derive(Clone)]
pub struct AgentConfig {
    /// Base URL of the control plane.
    pub control_plane_url: String,
    /// Identifier of this machine.
    pub machine_id: String,
    /// Directory for agent state; the durable outbound queue is opened
    /// under it.
    pub state_dir: std::path::PathBuf,
    /// LOADBEARING: the same `--trust-file` path the binary validated at
    /// startup. Workers that fetch + verify signed manifests
    /// (`manifest_poll`, `longpoll`) MUST read this — the startup check
    /// alone does not propagate into runtime trust loading.
    pub trust_file: std::path::PathBuf,
    /// Acceptance window for `meta.signedAt` on fleet.resolved.json and
    /// on per-rollout manifest verification (RFC-0010 §1.5 replay
    /// defense). Production: 3600s, matching the channel-refs poll
    /// cadence, so any signed artifact older than one refresh cycle is
    /// rejected. Test harnesses with fixed-`signedAt` fixtures override
    /// it with a much larger value (e.g. 1 year): their fixture bytes
    /// are deterministic and committed, and widening the window is the
    /// alternative to regenerating signatures on every test run.
    pub manifest_freshness_window_secs: u64,
    /// CA certificate path for the mTLS client, if any.
    pub ca_cert: Option<std::path::PathBuf>,
    /// Client certificate path for the mTLS client, if any.
    pub client_cert: Option<std::path::PathBuf>,
    /// Client private-key path for the mTLS client, if any.
    pub client_key: Option<std::path::PathBuf>,
    /// Path to the NixOS `current-system` symlink. The agent reads it to
    /// populate `current_closure` on every steady-state heartbeat
    /// (LIFT #5) so CP can rebuild `host_rollout_records` from agent
    /// inputs alone after a wipe-and-restart, per the architectural
    /// promise in `docs/design/architecture.md` §305. Production:
    /// `/run/current-system`. Test fixtures point at a non-existent
    /// path; `read_current_closure` then returns `None` and the
    /// heartbeat behaves as it did pre-LIFT-#5.
    pub current_system_path: std::path::PathBuf,
    /// Path to the host's SSH ed25519 key. It is the CSR signing key on
    /// `enroll` and on cert `renew` — the renewed cert's pubkey is the
    /// host's SSH host pubkey, same as at first-boot enrollment
    /// (RFC-0003 §2). Production: `/etc/ssh/ssh_host_ed25519_key`.
    pub ssh_host_key_file: std::path::PathBuf,
    /// Fraction of mTLS cert validity remaining below which the
    /// `cert_renewal` worker calls `enrollment::renew`. `None` disables
    /// the renewal worker (the cert stays in place until expiry).
    /// Operators MAY raise it (e.g. 0.8) for short-cycle hardware
    /// testing of renewal flows. Must be strictly between 0 and 1 when
    /// set; the worker refuses to start otherwise.
    pub renewal_threshold_fraction: Option<f64>,
}
210
211/// Spawn the reducer + worker constellation.
212///
213/// `clock` and `cfg` are the only external dependencies. Workers receive
214/// what they need by parameter (no `state` god-object yet — the agent's
215/// reducer holds its own state and exposes operations via the input
216/// MPSC).
217/// Sender end of the applier → activation worker intent channel.
218pub type ActivationIntentTx = mpsc::Sender<wire::ActivationIntent>;
219/// Sender end of the applier → probe worker reset channel.
220pub type ProbeResetTx = mpsc::Sender<wire::ProbeResetCommand>;
221/// Watch channel the applier hits after enqueueing a fresh outbound
222/// event; wakes the outbound drainer worker immediately rather than
223/// waiting for its next periodic tick.
224pub type OutboundKickTx = tokio::sync::watch::Sender<()>;
225
226/// Context handed to `applier::apply_effect`. Bundles the channels +
227/// reducer input sender the applier needs to dispatch a `Local*` effect
228/// without inlining platform-specific code or HTTP. Mirrors the CP-side
229/// `ApplierCtx`.
230pub struct ApplierCtx<'a> {
231 pub cfg: &'a AgentConfig,
232 pub clock: &'a ClockHandle,
233 pub input_tx: &'a mpsc::Sender<ReducerInput>,
234 pub activation_tx: &'a ActivationIntentTx,
235 pub probe_reset_tx: &'a ProbeResetTx,
236 pub outbound_queue: &'a Arc<OutboundQueue>,
237 pub outbound_kick: &'a OutboundKickTx,
238}
239
240pub fn spawn(cancel: CancellationToken, cfg: AgentConfig, clock: ClockHandle) -> RuntimeHandle {
241 let (input_tx, input_rx) = mpsc::channel::<ReducerInput>(REDUCER_INPUT_CAPACITY);
242 // Applier → activation worker. Small bounded queue: the applier
243 // emits at most one ActivationIntent per LocalFireSwitch /
244 // LocalFireRollbackTo, and the agent can have at most one in-flight
245 // switch at a time. Cap 4 covers retry bursts without unbounded
246 // growth.
247 let (activation_tx, activation_rx) = mpsc::channel::<wire::ActivationIntent>(4);
248 // Applier → probe worker reset channel. Same shape; reset is a
249 // single signal per `LocalActivationCompleted`.
250 let (probe_reset_tx, probe_reset_rx) = mpsc::channel::<wire::ProbeResetCommand>(4);
251 // Applier → outbound drainer kick. Watch semantics: a burst of
252 // enqueues collapses to one wake, the drainer's own ticker stays
253 // as a safety net if the watch is ever missed.
254 let (outbound_kick_tx, outbound_kick_rx) = tokio::sync::watch::channel(());
255
256 // Open / create the durable outbound queue on disk. Per Plan 07,
257 // every outbound event must hit disk before the network POST so
258 // an agent crash between local state change + CP receipt is
259 // recoverable on restart. Failure to open the queue is fatal at
260 // startup; we'd otherwise lose events silently.
261 let outbound_queue = Arc::new(OutboundQueue::open(&cfg.state_dir).unwrap_or_else(|err| {
262 panic!(
263 "open durable outbound queue under {}: {err:#}",
264 cfg.state_dir.display(),
265 )
266 }));
267
268 // One oneshot per worker. Reducer task takes ownership of the
269 // senders; on reducer exit every sender drops, every worker's
270 // `select!` shutdown arm fires. Eight workers: probe, activation,
271 // longpoll, heartbeat, advance_ticker, outbound, manifest_poll,
272 // cert_renewal.
273 const SHUTDOWN_TOKEN_COUNT: usize = 8;
274 let mut shutdown_senders: Vec<oneshot::Sender<()>> = Vec::new();
275 let mut shutdown_tokens: Vec<ShutdownToken> = Vec::new();
276 for _ in 0..SHUTDOWN_TOKEN_COUNT {
277 let (tx, rx) = oneshot::channel::<()>();
278 shutdown_senders.push(tx);
279 shutdown_tokens.push(ShutdownToken(rx));
280 }
281 // Pop in reverse so each consumer gets a distinct token.
282 let cert_renewal_shutdown = shutdown_tokens
283 .pop()
284 .expect("SHUTDOWN_TOKEN_COUNT allocated");
285 let manifest_poll_shutdown = shutdown_tokens
286 .pop()
287 .expect("SHUTDOWN_TOKEN_COUNT allocated");
288 let outbound_shutdown = shutdown_tokens
289 .pop()
290 .expect("SHUTDOWN_TOKEN_COUNT allocated");
291 let advance_shutdown = shutdown_tokens
292 .pop()
293 .expect("SHUTDOWN_TOKEN_COUNT allocated");
294 let heartbeat_shutdown = shutdown_tokens
295 .pop()
296 .expect("SHUTDOWN_TOKEN_COUNT allocated");
297 let longpoll_shutdown = shutdown_tokens
298 .pop()
299 .expect("SHUTDOWN_TOKEN_COUNT allocated");
300 let activation_shutdown = shutdown_tokens
301 .pop()
302 .expect("SHUTDOWN_TOKEN_COUNT allocated");
303 let probe_shutdown = shutdown_tokens
304 .pop()
305 .expect("SHUTDOWN_TOKEN_COUNT allocated");
306
307 let worker_handles = vec![
308 workers::probe::spawn(
309 cfg.clone(),
310 clock.clone(),
311 input_tx.clone(),
312 probe_reset_rx,
313 probe_shutdown,
314 ),
315 workers::activation::spawn(
316 cfg.clone(),
317 clock.clone(),
318 input_tx.clone(),
319 activation_rx,
320 activation_shutdown,
321 ),
322 workers::longpoll::spawn(
323 cfg.clone(),
324 clock.clone(),
325 input_tx.clone(),
326 longpoll_shutdown,
327 ),
328 workers::heartbeat::spawn(
329 cfg.clone(),
330 clock.clone(),
331 input_tx.clone(),
332 heartbeat_shutdown,
333 ),
334 workers::advance_ticker::spawn(clock.clone(), input_tx.clone(), advance_shutdown),
335 workers::outbound::spawn(
336 cfg.clone(),
337 outbound_queue.clone(),
338 outbound_kick_rx,
339 input_tx.clone(),
340 outbound_shutdown,
341 ),
342 workers::manifest_poll::spawn(
343 cfg.clone(),
344 clock.clone(),
345 input_tx.clone(),
346 manifest_poll_shutdown,
347 ),
348 workers::cert_renewal::spawn(
349 cfg.clone(),
350 clock.clone(),
351 cert_renewal_shutdown,
352 ),
353 ];
354
355 let reducer_handle = tokio::spawn(reducer::run(
356 cancel,
357 cfg,
358 clock,
359 input_rx,
360 input_tx.clone(),
361 activation_tx,
362 probe_reset_tx,
363 outbound_queue,
364 outbound_kick_tx,
365 shutdown_senders,
366 ));
367
368 RuntimeHandle {
369 input_tx,
370 reducer_handle,
371 worker_handles,
372 }
373}
374
375/// Used internally by `spawn` to release worker shutdown receivers when
376/// the reducer task exits. Held in scope by the reducer body so dropping
377/// it signals all workers in one go.
378pub(crate) struct ShutdownGuard(#[allow(dead_code)] pub Vec<oneshot::Sender<()>>);
379
380/// Convenience: cloneable Arc<AgentConfig> for tests / external callers.
381pub type AgentConfigHandle = Arc<AgentConfig>;