nixfleet_agent/runtime/
mod.rs

1//! Agent runtime: MPSC reducer loop + applier + workers (RFC-0006 §7.1).
2//!
3//! Symmetric to the CP-side runtime in
//! [`nixfleet_control_plane::runtime`]. Workers (probe, activation,
//! longpoll, heartbeat, advance_ticker, outbound, manifest_poll) feed a
//! single MPSC channel (`cert_renewal` runs alongside but never feeds
//! it). The
6//! reducer task is the sole `nixfleet_state_machine::step` caller; the
7//! applier executes `Local*` effects (FireSwitch, FireRollbackTo,
8//! ResetProbeCache, EmitEvent).
9//!
10//! ```text
//!   probe ─────────┐
//!   activation ────┤
//!   longpoll ──────┤
//!   heartbeat ─────┼──▶  mpsc::Sender<ReducerInput>  ───▶  reducer task ───▶ applier
//!   advance_ticker ┤                                          │
//!   outbound ──────┤                                          ▼
//!   manifest_poll ─┘                                        (step)
17//! ```
18//!
19//! Invariants (mirror CP's `runtime::mod` — RFC-0006 §2 principle 4):
20//!
21//! 1. **One MPSC, one mutator.** The reducer task is the only thing that
22//!    calls [`nixfleet_state_machine::step`]. Workers emit
23//!    [`ReducerInput`] values; the applier executes the produced effects.
//!    A second writer "for performance" is exactly the defect class the
//!    v0.2 fold eliminated — don't reintroduce it.
26//!
27//! 2. **Agent runs the SAME `step()` as the CP mirror.** No "leaner"
28//!    agent-side reimplementation. Identical transitions across the
29//!    `Local*` / `Remote*` event pairs by construction.
30//!
31//! 3. **Shutdown via oneshot drop.** The reducer task owns a
32//!    `Vec<oneshot::Sender<()>>`; each worker holds the matching
33//!    [`ShutdownToken`] wrapping a `Receiver`. Reducer exit drops every
34//!    sender; every worker's `select!` shutdown arm fires; workers exit
35//!    cleanly.
36//!
37//! 4. **Bounded channels with sizing rationale.** Capacities are
38//!    documented inline. No `unbounded`.
39//!
40//! 5. **No `chrono::Utc::now()` in the runtime.** Use the `ClockHandle`
41//!    abstraction so tests can advance time deterministically and the
42//!    runtime can't drift from any caller's notion of "now".
43
44use std::sync::Arc;
45
46use nixfleet_proto::clock::ClockHandle;
47use nixfleet_state_machine::Event;
48use tokio::sync::{mpsc, oneshot};
49use tokio::task::JoinHandle;
50use tokio_util::sync::CancellationToken;
51
52pub mod applier;
53pub mod outbound_queue;
54pub mod recovery;
55mod reducer;
56pub mod wire;
57pub mod workers;
58
59pub use outbound_queue::{OutboundQueue, QueuedEvent};
60pub use recovery::{RecoveryOutcome, handshake as boot_recovery_handshake};
61
/// Depth of the bounded reducer input channel.
///
/// Worst-case burst for a single agent over one rollout cycle:
///   - one `LocalActivate` (from longpoll);
///   - one `LocalActivationStarted` and one `LocalActivationCompleted`;
///   - for each declared probe (~1–10): `LocalProbeObservedFirst`, then a
///     `LocalProbeResult` per interval until soak elapses;
///   - one `LocalConvergedReached`.
///
/// About 32 slots absorb that spike; the capacity doubles it for slack.
/// The CP runtime uses 1024 because one CP fans out to 256 hosts, whereas
/// an agent only ever fans out to itself.
const REDUCER_INPUT_CAPACITY: usize = 32 * 2;
75
/// Reducer-task inputs, delivered over the single bounded MPSC channel
/// (capacity [`REDUCER_INPUT_CAPACITY`]) that [`spawn`] creates. The
/// reducer task is the channel's only consumer.
///
/// - [`ReducerInput::HostEvent`] — workers synthesised a `Local*` event;
///   reducer calls `step()` on the agent's `HostRolloutState` for the
///   given `rollout_id`, then runs the applier on the produced effects.
/// - [`ReducerInput::AgentAdvanceTick`] — periodic timer (mirror of
///   CP's `PlanTick`). Drives sustained-failure detection: when a
///   Soaking host's `probe_failure_first_at` exceeds the rollout's
///   policy threshold the reducer synthesises
///   `LocalSustainedFailureCrossed` and feeds it through `step()`.
/// - [`ReducerInput::ManifestSetUpdated`] and
///   [`ReducerInput::BootstrapHost`] — see the per-variant docs.
pub enum ReducerInput {
    /// A worker-synthesised `Local*` state-machine event for one rollout.
    HostEvent {
        /// Rollout whose cached `HostRolloutState` the event applies to.
        rollout_id: nixfleet_proto::RolloutId,
        /// The event to feed through `nixfleet_state_machine::step`.
        event: Event,
    },
    /// Periodic advance tick (see the enum-level docs).
    AgentAdvanceTick,
    /// Refresh the reducer's cached `SignedManifestSet`. Emitted by
    /// the longpoll worker (after `verify_rollout_manifest` succeeds)
    /// and by the boot-recovery handshake (7f). The reducer needs a
    /// cached manifest to look up `RolloutPolicy` for `step()` and to
    /// resolve `channel`/`target_closure` when bootstrapping
    /// `LocalActivate`.
    ///
    /// NOTE(review): `spawn` also hands `manifest_poll` an `input_tx`
    /// clone — confirm whether that worker emits this variant too.
    ManifestSetUpdated(Box<nixfleet_reconciler::planner_types::SignedManifestSet>),
    /// LIFT #3: rehydrate the reducer's in-memory HostRolloutState for
    /// an active rollout from a CP-supplied snapshot. Directly assigns
    /// the state-machine cache from the snapshot fields, bypassing the
    /// transition machinery — the canonical source of truth lives on
    /// CP, the agent is just rebuilding its local view post-restart.
    ///
    /// NOTE(review): the original doc said this is "emitted only by
    /// `runtime::spawn` after the boot-recovery handshake returns
    /// snapshots, before any worker spawns". The `spawn` in this module
    /// never sends this variant and starts every worker before it
    /// returns. Presumably the caller sends it through
    /// `RuntimeHandle::input_tx` once `spawn` has returned — confirm the
    /// ordering guarantee against that caller.
    BootstrapHost(Box<nixfleet_proto::agent_wire::HostRolloutSnapshot>),
}
108
109/// Shutdown signal handed to a worker at spawn time. Reducer-task exit
110/// drops the matching `Sender<()>` and the receiver resolves with
111/// `Err(RecvError)`; the worker's `select!` shutdown arm fires and the
112/// task exits cleanly.
113pub struct ShutdownToken(oneshot::Receiver<()>);
114
115impl ShutdownToken {
116    /// Unwrap to the inner `Receiver<()>` for direct use in `select!`.
117    pub fn into_inner(self) -> oneshot::Receiver<()> {
118        self.0
119    }
120
121    /// Test-only constructor. Integration tests spawn a single worker
122    /// in isolation (e.g., `integration_mtls.rs::longpoll_worker_*`) and
123    /// own their shutdown channel directly; production code goes
124    /// through [`runtime::spawn`] which constructs the token internally.
125    ///
126    /// Available only when the crate is compiled with `#[cfg(test)]`
127    /// (the crate's own lib tests) or with the `test-helpers` feature
128    /// flag (integration tests via the test target's
129    /// `required-features`). Matches the
130    /// `nixfleet_reconciler::Verified::unverified_for_tests` escape-
131    /// hatch convention so production builds cannot construct a fake
132    /// `ShutdownToken`.
133    #[cfg(any(test, feature = "test-helpers"))]
134    pub fn __test_only_from_rx(rx: oneshot::Receiver<()>) -> Self {
135        Self(rx)
136    }
137}
138
139/// Handle returned by [`spawn`].
140pub struct RuntimeHandle {
141    pub input_tx: mpsc::Sender<ReducerInput>,
142    pub reducer_handle: JoinHandle<()>,
143    pub worker_handles: Vec<JoinHandle<()>>,
144}
145
146impl RuntimeHandle {
147    pub fn into_join_handles(self) -> Vec<JoinHandle<()>> {
148        let mut handles = self.worker_handles;
149        handles.push(self.reducer_handle);
150        handles
151    }
152}
153
/// Static runtime configuration, built once at startup from CLI args.
///
/// [`spawn`] gives every worker its own clone (`cfg.clone()`), so each
/// worker owns its copy rather than borrowing a shared one. A clone
/// copies the handful of `String`/`PathBuf` fields below, which is
/// negligible at startup. Use [`AgentConfigHandle`] when a caller wants
/// refcounted sharing instead.
///
/// `Debug` is derived so the config can be logged. Every field is a URL,
/// id, path or number; key files are referenced by path only, so no key
/// material is held here.
///
/// `ca_cert` / `client_cert` / `client_key` carry the same paths the CLI
/// receives (`Args::ca_cert` etc.) and that the first-boot enrollment path
/// already uses (`main.rs::maybe_run_first_boot_enrollment`). Workers
/// pass them straight through to `crate::comms::build_client`, so every
/// post-enroll `/v1/agent/*` request rides the agent's enrollment-cert
/// mTLS identity. DEFECT-003 regression-guard: any worker building a
/// bare `reqwest::Client` (no client cert) would talk to CP
/// unauthenticated; `integration_mtls.rs` pins the property end-to-end.
#[derive(Clone, Debug)]
pub struct AgentConfig {
    /// Base URL of the control plane.
    pub control_plane_url: String,
    /// This host's machine identifier.
    pub machine_id: String,
    /// Agent state directory; the durable outbound queue lives under it.
    pub state_dir: std::path::PathBuf,
    /// LOADBEARING: same `--trust-file` path the binary validated at
    /// startup. Workers that fetch + verify signed manifests
    /// (`manifest_poll`, `longpoll`) MUST read this — the startup check
    /// alone doesn't propagate into runtime trust loading.
    pub trust_file: std::path::PathBuf,
    /// Acceptance window for `meta.signedAt` on fleet.resolved.json and
    /// on per-rollout manifest verification (RFC-0010 §1.5 replay
    /// defense).
    ///
    /// Production uses 3600s, matching the channel-refs poll cadence, so
    /// any signed artifact older than one refresh cycle is rejected.
    /// Test harnesses with fixed-`signedAt` fixtures override this with a
    /// much larger value (e.g. 1 year), because their fixture bytes are
    /// deterministic and committed. Widening the window avoids
    /// regenerating signatures on every test run.
    pub manifest_freshness_window_secs: u64,
    /// CA bundle used to verify the control plane's TLS certificate.
    pub ca_cert: Option<std::path::PathBuf>,
    /// Agent mTLS client certificate.
    pub client_cert: Option<std::path::PathBuf>,
    /// Private key matching `client_cert`.
    pub client_key: Option<std::path::PathBuf>,
    /// Path to the NixOS `current-system` symlink.
    ///
    /// The agent reads it to populate `current_closure` on every
    /// steady-state heartbeat (LIFT #5). That lets CP rebuild
    /// `host_rollout_records` from agent inputs alone after a
    /// wipe-and-restart, per the architectural promise in
    /// `docs/design/architecture.md` §305. Production:
    /// `/run/current-system`. Test fixtures point at a non-existent path;
    /// `read_current_closure` then returns `None` and the heartbeat
    /// behaves as it did pre-LIFT-#5.
    pub current_system_path: std::path::PathBuf,
    /// Path to the host's SSH ed25519 key.
    ///
    /// Used as the CSR signing key on `enroll` and on cert `renew`, so the
    /// renewed cert's pubkey is the host's SSH host pubkey, the same as at
    /// first-boot enrollment (RFC-0003 §2). Production:
    /// `/etc/ssh/ssh_host_ed25519_key`.
    pub ssh_host_key_file: std::path::PathBuf,
    /// Fraction of mTLS cert validity remaining below which the
    /// `cert_renewal` worker calls `enrollment::renew`.
    ///
    /// `None` disables the renewal worker, and the cert stays in place
    /// until expiry. Operators MAY raise it (e.g. 0.8) for short-cycle
    /// hardware testing of renewal flows. When set, the value must be
    /// strictly between 0 and 1; the worker refuses to start otherwise.
    pub renewal_threshold_fraction: Option<f64>,
}
210
211/// Spawn the reducer + worker constellation.
212///
213/// `clock` and `cfg` are the only external dependencies. Workers receive
214/// what they need by parameter (no `state` god-object yet — the agent's
215/// reducer holds its own state and exposes operations via the input
216/// MPSC).
217/// Sender end of the applier → activation worker intent channel.
218pub type ActivationIntentTx = mpsc::Sender<wire::ActivationIntent>;
219/// Sender end of the applier → probe worker reset channel.
220pub type ProbeResetTx = mpsc::Sender<wire::ProbeResetCommand>;
221/// Watch channel the applier hits after enqueueing a fresh outbound
222/// event; wakes the outbound drainer worker immediately rather than
223/// waiting for its next periodic tick.
224pub type OutboundKickTx = tokio::sync::watch::Sender<()>;
225
226/// Context handed to `applier::apply_effect`. Bundles the channels +
227/// reducer input sender the applier needs to dispatch a `Local*` effect
228/// without inlining platform-specific code or HTTP. Mirrors the CP-side
229/// `ApplierCtx`.
230pub struct ApplierCtx<'a> {
231    pub cfg: &'a AgentConfig,
232    pub clock: &'a ClockHandle,
233    pub input_tx: &'a mpsc::Sender<ReducerInput>,
234    pub activation_tx: &'a ActivationIntentTx,
235    pub probe_reset_tx: &'a ProbeResetTx,
236    pub outbound_queue: &'a Arc<OutboundQueue>,
237    pub outbound_kick: &'a OutboundKickTx,
238}
239
240pub fn spawn(cancel: CancellationToken, cfg: AgentConfig, clock: ClockHandle) -> RuntimeHandle {
241    let (input_tx, input_rx) = mpsc::channel::<ReducerInput>(REDUCER_INPUT_CAPACITY);
242    // Applier → activation worker. Small bounded queue: the applier
243    // emits at most one ActivationIntent per LocalFireSwitch /
244    // LocalFireRollbackTo, and the agent can have at most one in-flight
245    // switch at a time. Cap 4 covers retry bursts without unbounded
246    // growth.
247    let (activation_tx, activation_rx) = mpsc::channel::<wire::ActivationIntent>(4);
248    // Applier → probe worker reset channel. Same shape; reset is a
249    // single signal per `LocalActivationCompleted`.
250    let (probe_reset_tx, probe_reset_rx) = mpsc::channel::<wire::ProbeResetCommand>(4);
251    // Applier → outbound drainer kick. Watch semantics: a burst of
252    // enqueues collapses to one wake, the drainer's own ticker stays
253    // as a safety net if the watch is ever missed.
254    let (outbound_kick_tx, outbound_kick_rx) = tokio::sync::watch::channel(());
255
256    // Open / create the durable outbound queue on disk. Per Plan 07,
257    // every outbound event must hit disk before the network POST so
258    // an agent crash between local state change + CP receipt is
259    // recoverable on restart. Failure to open the queue is fatal at
260    // startup; we'd otherwise lose events silently.
261    let outbound_queue = Arc::new(OutboundQueue::open(&cfg.state_dir).unwrap_or_else(|err| {
262        panic!(
263            "open durable outbound queue under {}: {err:#}",
264            cfg.state_dir.display(),
265        )
266    }));
267
268    // One oneshot per worker. Reducer task takes ownership of the
269    // senders; on reducer exit every sender drops, every worker's
270    // `select!` shutdown arm fires. Eight workers: probe, activation,
271    // longpoll, heartbeat, advance_ticker, outbound, manifest_poll,
272    // cert_renewal.
273    const SHUTDOWN_TOKEN_COUNT: usize = 8;
274    let mut shutdown_senders: Vec<oneshot::Sender<()>> = Vec::new();
275    let mut shutdown_tokens: Vec<ShutdownToken> = Vec::new();
276    for _ in 0..SHUTDOWN_TOKEN_COUNT {
277        let (tx, rx) = oneshot::channel::<()>();
278        shutdown_senders.push(tx);
279        shutdown_tokens.push(ShutdownToken(rx));
280    }
281    // Pop in reverse so each consumer gets a distinct token.
282    let cert_renewal_shutdown = shutdown_tokens
283        .pop()
284        .expect("SHUTDOWN_TOKEN_COUNT allocated");
285    let manifest_poll_shutdown = shutdown_tokens
286        .pop()
287        .expect("SHUTDOWN_TOKEN_COUNT allocated");
288    let outbound_shutdown = shutdown_tokens
289        .pop()
290        .expect("SHUTDOWN_TOKEN_COUNT allocated");
291    let advance_shutdown = shutdown_tokens
292        .pop()
293        .expect("SHUTDOWN_TOKEN_COUNT allocated");
294    let heartbeat_shutdown = shutdown_tokens
295        .pop()
296        .expect("SHUTDOWN_TOKEN_COUNT allocated");
297    let longpoll_shutdown = shutdown_tokens
298        .pop()
299        .expect("SHUTDOWN_TOKEN_COUNT allocated");
300    let activation_shutdown = shutdown_tokens
301        .pop()
302        .expect("SHUTDOWN_TOKEN_COUNT allocated");
303    let probe_shutdown = shutdown_tokens
304        .pop()
305        .expect("SHUTDOWN_TOKEN_COUNT allocated");
306
307    let worker_handles = vec![
308        workers::probe::spawn(
309            cfg.clone(),
310            clock.clone(),
311            input_tx.clone(),
312            probe_reset_rx,
313            probe_shutdown,
314        ),
315        workers::activation::spawn(
316            cfg.clone(),
317            clock.clone(),
318            input_tx.clone(),
319            activation_rx,
320            activation_shutdown,
321        ),
322        workers::longpoll::spawn(
323            cfg.clone(),
324            clock.clone(),
325            input_tx.clone(),
326            longpoll_shutdown,
327        ),
328        workers::heartbeat::spawn(
329            cfg.clone(),
330            clock.clone(),
331            input_tx.clone(),
332            heartbeat_shutdown,
333        ),
334        workers::advance_ticker::spawn(clock.clone(), input_tx.clone(), advance_shutdown),
335        workers::outbound::spawn(
336            cfg.clone(),
337            outbound_queue.clone(),
338            outbound_kick_rx,
339            input_tx.clone(),
340            outbound_shutdown,
341        ),
342        workers::manifest_poll::spawn(
343            cfg.clone(),
344            clock.clone(),
345            input_tx.clone(),
346            manifest_poll_shutdown,
347        ),
348        workers::cert_renewal::spawn(
349            cfg.clone(),
350            clock.clone(),
351            cert_renewal_shutdown,
352        ),
353    ];
354
355    let reducer_handle = tokio::spawn(reducer::run(
356        cancel,
357        cfg,
358        clock,
359        input_rx,
360        input_tx.clone(),
361        activation_tx,
362        probe_reset_tx,
363        outbound_queue,
364        outbound_kick_tx,
365        shutdown_senders,
366    ));
367
368    RuntimeHandle {
369        input_tx,
370        reducer_handle,
371        worker_handles,
372    }
373}
374
/// Bundle of worker shutdown senders, held for its drop effect only.
/// Dropping the guard drops every `oneshot::Sender<()>` at once, which
/// resolves each paired [`ShutdownToken`] receiver and signals all
/// workers in one go.
///
/// NOTE(review): the previous doc said this is "used internally by
/// `spawn`", but `spawn` in this module passes the raw `Vec` of senders
/// to `reducer::run` and never builds this type. Presumably the reducer
/// body wraps the senders in it — confirm, or remove the type if unused.
// `dead_code` is allowed because the field is never read: the guard exists
// only so that dropping it drops every sender together.
pub(crate) struct ShutdownGuard(#[allow(dead_code)] pub Vec<oneshot::Sender<()>>);
379
380/// Convenience: cloneable Arc<AgentConfig> for tests / external callers.
381pub type AgentConfigHandle = Arc<AgentConfig>;