nixfleet_agent/runtime/
reducer.rs

1//! Reducer task body. Sole `nixfleet_state_machine::step` caller per
2//! invariant (1) in `runtime::mod`.
3//!
4//! State held in-task:
5//!   - per-rollout `HostRolloutState` keyed by `rollout_id` (the agent
6//!     owns its own host's state, so the key is just the rollout id)
7//!   - cached `SignedManifestSet` (refreshed by the `manifest_poll`
8//!     worker per RFC-0004 §1 invariant #1 — single signed source of
9//!     truth fetched + verified once per tick; the reducer reads
10//!     rollout policy from it for `step()` calls)
11//!
12//! Seq assignment: workers emit events with `seq = 0`. The reducer
13//! rewrites it to `state.last_event_seq + 1` before calling step()
14//! — single mutator owns the per-rollout monotonic counter, so
15//! cross-worker ordering can't race.
16
17use std::collections::HashMap;
18
19use nixfleet_proto::RolloutPolicy;
20use nixfleet_proto::clock::ClockHandle;
21use nixfleet_reconciler::planner_types::SignedManifestSet;
22use nixfleet_state_machine::{Event, HostRolloutState, HostState};
23use tokio::sync::{mpsc, oneshot};
24use tokio_util::sync::CancellationToken;
25
26use std::sync::Arc;
27
28use super::applier::apply_effect;
29use super::outbound_queue::OutboundQueue;
30use super::{
31    ActivationIntentTx, AgentConfig, ApplierCtx, OutboundKickTx, ProbeResetTx, ReducerInput,
32    ShutdownGuard,
33};
34
/// Sustained-failure window, in seconds (RFC-0005 §6).
///
/// Once a probe has been failing continuously for at least this long,
/// the advance tick synthesises the event that moves the host
/// Soaking → Failed.
///
/// Hardcoded for v0.2 at twice RFC-0005 §6's documented default (60s).
/// This errs on the late side: real probe-failure detection still
/// fires, just 60s later than a tuned deployment would. Acceptable for
/// v0.2 demo + lab work; not appropriate for production fleets with
/// tight SLOs.
///
/// TODO(v0.2.1): wire from `services.nixfleet-agent.healthChecks` via
/// the NixOS module → agent CLI arg → runtime config struct, per
/// RFC-0005 §6 + §9.1 ("each agent reads it from
/// services.nixfleet-agent.healthChecks"). Out of v0.2 scope because it
/// touches the NixOS module surface. Tracked in `v0.2.1-followups.md`
/// and as an issue on `abstracts33d/nixfleet` (Forgejo `origin` or
/// GitHub `upstream`, operator's call) titled "Wire
/// SUSTAINED_FAILURE_THRESHOLD_SECS from NixOS module config (v0.2.1)".
const SUSTAINED_FAILURE_THRESHOLD_SECS: i64 = 2 * 60;
57
58#[allow(clippy::too_many_arguments)]
59pub async fn run(
60    cancel: CancellationToken,
61    cfg: AgentConfig,
62    clock: ClockHandle,
63    mut input_rx: mpsc::Receiver<ReducerInput>,
64    input_tx: mpsc::Sender<ReducerInput>,
65    activation_tx: ActivationIntentTx,
66    probe_reset_tx: ProbeResetTx,
67    outbound_queue: Arc<OutboundQueue>,
68    outbound_kick: OutboundKickTx,
69    shutdown_senders: Vec<oneshot::Sender<()>>,
70) {
71    let _shutdown_guard = ShutdownGuard(shutdown_senders);
72
73    let mut host_states: HashMap<nixfleet_proto::RolloutId, HostRolloutState> = HashMap::new();
74    // Cached signed manifests. Populated by the longpoll worker after
75    // each `verify_rollout_manifest` succeeds (and the boot-recovery
76    // handshake in 7f); the reducer reads it to find `RolloutPolicy`
77    // for step() calls. None at startup; events that arrive before
78    // the cache is warm are dropped with a warn (RFC-0005 §9.5's
79    // "agent can't act on unverified state").
80    let mut manifests: Option<SignedManifestSet> = None;
81
82    loop {
83        tokio::select! {
84            _ = cancel.cancelled() => {
85                tracing::info!(
86                    target: "shutdown",
87                    task = "agent_reducer",
88                    "task shut down",
89                );
90                return;
91            }
92            maybe_input = input_rx.recv() => {
93                let Some(input) = maybe_input else { return };
94                let ctx = ApplierCtx {
95                    cfg: &cfg,
96                    clock: &clock,
97                    input_tx: &input_tx,
98                    activation_tx: &activation_tx,
99                    probe_reset_tx: &probe_reset_tx,
100                    outbound_queue: &outbound_queue,
101                    outbound_kick: &outbound_kick,
102                };
103                handle_input(&mut host_states, &mut manifests, &clock, &ctx, input).await;
104            }
105        }
106    }
107}
108
109async fn handle_input(
110    host_states: &mut HashMap<nixfleet_proto::RolloutId, HostRolloutState>,
111    manifests: &mut Option<SignedManifestSet>,
112    clock: &ClockHandle,
113    ctx: &ApplierCtx<'_>,
114    input: ReducerInput,
115) {
116    match input {
117        ReducerInput::HostEvent { rollout_id, event } => {
118            run_host_event(host_states, manifests, clock, ctx, rollout_id, event).await;
119        }
120        ReducerInput::AgentAdvanceTick => {
121            run_advance_tick(host_states, manifests, clock, ctx).await;
122        }
123        ReducerInput::ManifestSetUpdated(set) => {
124            *manifests = Some(*set);
125            tracing::info!(
126                target: "agent_reducer",
127                "manifest cache refreshed",
128            );
129        }
130        ReducerInput::BootstrapHost(snapshot) => {
131            apply_bootstrap_snapshot(host_states, ctx, *snapshot).await;
132        }
133    }
134}
135
136/// LIFT #3 + LIFT #4: apply a CP-supplied HostRolloutSnapshot to the
137/// agent's in-memory reducer cache, then emit the worker re-priming
138/// effects the rehydrated state demands.
139///
140/// Snapshot-shape, not event-replay — the canonical state lives on CP,
141/// the agent's HostRolloutState is a reconstructable cache. Called from
142/// two entry points: the boot-recovery handshake before workers spawn
143/// (`recovery.rs`), and the steady-state heartbeat worker after CP
144/// signals a fresh snapshot (`workers/heartbeat.rs`). Both paths share
145/// this function so worker re-priming is consistent.
146///
147/// LOADBEARING: the merge is asymmetric. Canonical fields (state,
148/// target_closure, dispatch/activation timestamps, last_event_seq)
149/// always come from the snapshot. Agent-local-only fields that the
150/// wire snapshot does NOT carry (probes, probe_observed_first_at,
151/// probe_failure_first_at, failed_at, converged_at, etc.) are
152/// preserved from the existing entry when one is present, defaulted
153/// when not.
154///
155/// `probe_failure_first_at` in particular MUST survive a warm
156/// heartbeat rehydration: LIFT #5 makes CP return `bootstrap_rollouts`
157/// on every steady-state heartbeat (~60s cadence), and clobbering the
158/// sustained-failure timer on each tick prevents `Soaking → Failed`
159/// from ever firing (HEALTH_FAILURE_THRESHOLD_SECS = 120s, so a
160/// 60s clobber starves the timer indefinitely).
161///
162/// LOADBEARING: every non-Pending rehydration emits effects via
163/// `nixfleet_state_machine::rehydration_effects` and routes them through
164/// `apply_effect` — the same channel workers consume during ordinary
165/// transitions. Without this, probe runners (and any future worker that
166/// caches per-rollout state) keep tickers tagged with stale rollout_ids
167/// from a prior process incarnation; the reducer rejects the resulting
168/// events with `LocalProbeResult not legal from state Converged`.
169async fn apply_bootstrap_snapshot(
170    host_states: &mut HashMap<nixfleet_proto::RolloutId, HostRolloutState>,
171    ctx: &ApplierCtx<'_>,
172    snapshot: nixfleet_proto::agent_wire::HostRolloutSnapshot,
173) {
174    let warm = host_states.contains_key(&snapshot.rollout_id);
175    let rollout_id = snapshot.rollout_id.clone();
176    let record = merge_snapshot_into_state(host_states.get(&rollout_id), snapshot);
177    tracing::info!(
178        target: "agent_reducer",
179        rollout_id = %record.rollout_id,
180        state = ?record.state,
181        target_closure = %record.target_closure,
182        warm,
183        probe_failure_first_at = ?record.probe_failure_first_at,
184        "bootstrap: rehydrating in-memory HostRolloutState from CP snapshot (LIFT #3)",
185    );
186    let effects = nixfleet_state_machine::rehydration_effects(&record);
187    host_states.insert(rollout_id, record);
188    for effect in effects {
189        apply_effect(ctx, effect).await;
190    }
191}
192
193/// Pure merge of a wire snapshot with the existing in-memory entry.
194/// Canonical fields (state, target_closure, dispatch/activation
195/// timestamps, last_event_seq) come from the snapshot. Agent-local
196/// fields not carried in the wire shape are preserved from `existing`
197/// when present, defaulted when not.
198fn merge_snapshot_into_state(
199    existing: Option<&HostRolloutState>,
200    snapshot: nixfleet_proto::agent_wire::HostRolloutSnapshot,
201) -> HostRolloutState {
202    use nixfleet_proto::HostRolloutState as WireState;
203    use nixfleet_state_machine::HostState;
204    let internal_state = match snapshot.state {
205        WireState::Pending => HostState::Pending,
206        WireState::Activating => HostState::Activating,
207        WireState::Deferred => HostState::Deferred,
208        WireState::Soaking => HostState::Soaking,
209        WireState::Converged => HostState::Converged,
210        WireState::Failed => HostState::Failed,
211        WireState::Reverted => HostState::Reverted,
212    };
213    HostRolloutState {
214        // Canonical fields — always from the snapshot.
215        rollout_id: snapshot.rollout_id,
216        hostname: snapshot.hostname,
217        channel: snapshot.channel,
218        state: internal_state,
219        target_closure: snapshot.target_closure,
220        current_closure_at_dispatch: snapshot.current_closure_at_dispatch,
221        current_closure: snapshot.current_closure,
222        dispatched_at: snapshot.dispatched_at,
223        dispatch_acked_at: snapshot.dispatch_acked_at,
224        activation_started_at: snapshot.activation_started_at,
225        activation_completed_at: snapshot.activation_completed_at,
226        soak_due_at: snapshot.soak_due_at,
227        last_event_seq: snapshot.last_event_seq,
228        // Agent-local-only fields — preserved from the existing entry
229        // on warm rehydration, defaulted on cold rehydration. Wire
230        // snapshot does not carry these (agent_wire.rs:55-78).
231        probes: existing.map(|e| e.probes.clone()).unwrap_or_default(),
232        probe_observed_first_at: existing.and_then(|e| e.probe_observed_first_at),
233        probe_failure_first_at: existing.and_then(|e| e.probe_failure_first_at),
234        activation_failed_at: existing.and_then(|e| e.activation_failed_at),
235        failed_at: existing.and_then(|e| e.failed_at),
236        converged_at: existing.and_then(|e| e.converged_at),
237        reverted_to: existing.and_then(|e| e.reverted_to.clone()),
238        reverted_at: existing.and_then(|e| e.reverted_at),
239        policy_applied: existing.and_then(|e| e.policy_applied),
240    }
241}
242
243async fn run_host_event(
244    host_states: &mut HashMap<nixfleet_proto::RolloutId, HostRolloutState>,
245    manifests: &Option<SignedManifestSet>,
246    clock: &ClockHandle,
247    ctx: &ApplierCtx<'_>,
248    rollout_id: nixfleet_proto::RolloutId,
249    event: Event,
250) {
251    // Bootstrap from LocalActivate: when no state exists yet, the
252    // event must be a fresh dispatch. `target_closure` is carried on
253    // the event payload, having been validated by the longpoll
254    // worker's `manifest_cache.ensure_for_dispatch` call against the
255    // freshly-fetched per-rollout manifest.
256    //
257    // LOADBEARING: the reducer's own `manifests` cache is NOT
258    // consulted at bootstrap — that cache is fed by
259    // `agent_manifest_poll` on a slower cadence and can be stale
260    // immediately after a new rollout's channel_ref is published,
261    // producing a TOCTOU between longpoll's verify and the reducer's
262    // snapshot read. Carrying the validated value with the event
263    // makes the trust chain explicit: longpoll verifies against the
264    // signed manifest, longpoll passes the value forward, reducer
265    // consumes it without re-derivation. RFC-0004 §1 invariant 1.
266    let prior = host_states.get(&rollout_id).cloned();
267    let (state, policy_channel) = match (prior, &event) {
268        (Some(s), _) => {
269            let ch = s.channel.clone();
270            (s, ch)
271        }
272        (
273            None,
274            Event::LocalActivate {
275                target_closure,
276                soak_due_at,
277                ..
278            },
279        ) => {
280            let now = clock.now();
281            let state = bootstrap_pending_state(&rollout_id, target_closure, *soak_due_at, now);
282            (state, "<bootstrap>".to_string())
283        }
284        (None, _) => {
285            tracing::warn!(
286                target: "agent_reducer",
287                %rollout_id,
288                ?event,
289                "HostEvent for unknown rollout (no LocalActivate seen yet); dropping",
290            );
291            return;
292        }
293    };
294
295    let Some(policy) = resolve_policy(manifests, &state.channel) else {
296        tracing::warn!(
297            target: "agent_reducer",
298            %rollout_id,
299            channel = %state.channel,
300            policy_channel,
301            "HostEvent: rollout policy not cached yet; dropping event (longpoll will refill)",
302        );
303        return;
304    };
305
306    // Rewrite seq so workers can emit with seq=0 and the reducer owns
307    // the per-rollout monotonic counter (single mutator → no race).
308    let next_seq = state.last_event_seq + 1;
309    let event = with_seq(event, next_seq);
310
311    let now = clock.now();
312    let (next_state, effects) = match nixfleet_state_machine::step(state, event, now, &policy) {
313        Ok(out) => out,
314        Err(err) => {
315            tracing::warn!(
316                target: "agent_reducer",
317                %rollout_id,
318                error = %err,
319                "step() rejected event — illegal transition or invariant violation",
320            );
321            return;
322        }
323    };
324    host_states.insert(rollout_id, next_state);
325
326    // Effects carry their own `rollout_id` per RFC-0006 §9. Applier
327    // reads directly from the effect variant.
328    for effect in effects {
329        apply_effect(ctx, effect).await;
330    }
331}
332
333async fn run_advance_tick(
334    host_states: &mut HashMap<nixfleet_proto::RolloutId, HostRolloutState>,
335    manifests: &Option<SignedManifestSet>,
336    clock: &ClockHandle,
337    ctx: &ApplierCtx<'_>,
338) {
339    let now = clock.now();
340    // Collect synthesised events first so we don't mutate the map
341    // while iterating.
342    let mut synth: Vec<(nixfleet_proto::RolloutId, Event)> = Vec::new();
343
344    // Fail-gate: Soaking → Failed via LocalSustainedFailureCrossed. Mode
345    // filter on the failing-probe set per RFC-0007 §3.3 (ProbeMode
346    // docstring): only Enforce-mode probes contribute to sustained-failure.
347    for (rollout_id, state) in host_states.iter() {
348        if state.state != HostState::Soaking {
349            continue;
350        }
351        let Some(first_failed) = state.probe_failure_first_at else {
352            continue;
353        };
354        if (now - first_failed).num_seconds() < SUSTAINED_FAILURE_THRESHOLD_SECS {
355            continue;
356        }
357        let failing_probes = collect_failing_enforce_probes(&state.probes);
358        if failing_probes.is_empty() {
359            continue;
360        }
361        let policy_applied = resolve_policy(manifests, &state.channel)
362            .map(|p| p.on_health_failure)
363            .unwrap_or(nixfleet_proto::OnHealthFailure::Halt);
364        synth.push((
365            rollout_id.clone(),
366            Event::LocalSustainedFailureCrossed {
367                failed_at: now,
368                sustained_duration_secs: (now - first_failed).num_seconds() as u64,
369                failing_probes,
370                policy_applied,
371                seq: 0, // run_host_event rewrites
372            },
373        ));
374    }
375
376    // Pass-gate: Soaking → Converged via LocalConvergedReached. Three
377    // RFC-0005 §4.2 invariants: current==target, soak_due_at elapsed, all
378    // enforce-mode probes Pass. Mode filter on the probe-pass check
379    // brings convergence into parity with the fail-gate above per
380    // RFC-0007 §3.3 (ProbeMode docstring). The shared verifier
381    // (state-machine soaking::verify_converged_invariants) re-checks
382    // these at step() time before transitioning.
383    for (rollout_id, state) in host_states.iter() {
384        if state.state != HostState::Soaking {
385            continue;
386        }
387        let Some(soak_due_at) = state.soak_due_at else {
388            continue;
389        };
390        if now < soak_due_at {
391            continue;
392        }
393        let Some(current) = state.current_closure.as_ref() else {
394            continue;
395        };
396        if *current != state.target_closure {
397            continue;
398        }
399        if !all_enforce_probes_pass(&state.probes) {
400            continue;
401        }
402        synth.push((
403            rollout_id.clone(),
404            Event::LocalConvergedReached {
405                converged_at: now,
406                current_closure: current.clone(),
407                seq: 0, // run_host_event rewrites
408            },
409        ));
410    }
411
412    for (rollout_id, event) in synth {
413        run_host_event(host_states, manifests, clock, ctx, rollout_id, event).await;
414    }
415}
416
417/// First-touch bootstrap for a fresh `LocalActivate` event. Pure: derives
418/// channel from the canonical `RolloutId` composite (RFC-0008 §6.3); the
419/// caller threads in the manifest-looked-up `target_closure` for this
420/// host (selecting by `hostname == cfg.machine_id`) and the
421/// CP-resolved `soak_due_at` carried by the `LocalActivate` event
422/// from `DispatchResponse.soak_due_at` (CP is the single source of
423/// truth for the policy-resolved soak window per RFC-0004 §1
424/// invariant 1). Caller also threads `now` so the helper stays
425/// clock-injection-free.
426fn bootstrap_pending_state(
427    rollout_id: &nixfleet_proto::RolloutId,
428    target_closure: &str,
429    soak_due_at: chrono::DateTime<chrono::Utc>,
430    now: chrono::DateTime<chrono::Utc>,
431) -> HostRolloutState {
432    let channel = rollout_id.channel().to_string();
433    HostRolloutState::new_pending(
434        rollout_id.clone(),
435        "self".to_string(),
436        channel,
437        target_closure.to_string(),
438        now,
439        soak_due_at,
440    )
441}
442
443/// Collect probe names that are currently failing AND declared with
444/// `mode = Enforce`. Per RFC-0007 §3.4, only `Enforce`-mode probes
445/// participate in the soak gate; `Observe` and `Disabled` records
446/// events but does not gate. The pre-fix builder filtered only by
447/// `status == Fail`, which silently included failing `Observe`-mode
448/// probes in `LocalSustainedFailureCrossed.failing_probes` and gated
449/// soak promotion against the documented contract.
450fn collect_failing_enforce_probes(
451    probes: &HashMap<String, nixfleet_state_machine::ProbeRecord>,
452) -> Vec<String> {
453    probes
454        .iter()
455        .filter(|(_, r)| {
456            r.status == nixfleet_state_machine::ProbeStatus::Fail
457                && matches!(r.mode, nixfleet_state_machine::ProbeMode::Enforce)
458        })
459        .map(|(name, _)| name.clone())
460        .collect()
461}
462
463/// All enforce-mode probes have status `Pass`. Observe and Disabled are
464/// ignored per RFC-0007 §3.3 (ProbeMode docstring, state.rs); they do not
465/// gate convergence. Mirror of `collect_failing_enforce_probes` on the
466/// Soaking → Converged exit path. Empty enforce set trivially satisfies
467/// — matches the shared verifier's "empty probe map acceptable" semantic
468/// in `verify_converged_invariants`.
469fn all_enforce_probes_pass(probes: &HashMap<String, nixfleet_state_machine::ProbeRecord>) -> bool {
470    probes
471        .values()
472        .filter(|r| matches!(r.mode, nixfleet_state_machine::ProbeMode::Enforce))
473        .all(|r| r.status == nixfleet_state_machine::ProbeStatus::Pass)
474}
475
476fn resolve_policy(manifests: &Option<SignedManifestSet>, channel: &str) -> Option<RolloutPolicy> {
477    let m = manifests.as_ref()?;
478    let fleet = m.fleet();
479    let channel_entry = fleet.channels.get(channel)?;
480    fleet
481        .rollout_policies
482        .get(&channel_entry.rollout_policy)
483        .cloned()
484}
485
486/// Rewrite the `seq` field on a `Local*` event. The reducer owns the
487/// monotonic counter (single mutator) so workers can emit with `seq = 0`
488/// and let this function fill it in.
489fn with_seq(event: Event, seq: u64) -> Event {
490    match event {
491        Event::LocalActivate {
492            current_closure_at_dispatch,
493            target_closure,
494            received_at,
495            soak_due_at,
496            ..
497        } => Event::LocalActivate {
498            current_closure_at_dispatch,
499            target_closure,
500            received_at,
501            soak_due_at,
502            seq,
503        },
504        Event::LocalActivationStarted {
505            started_at,
506            switch_method,
507            ..
508        } => Event::LocalActivationStarted {
509            started_at,
510            switch_method,
511            seq,
512        },
513        Event::LocalActivationCompleted {
514            observed_current_closure,
515            exit_code,
516            completed_at,
517            ..
518        } => Event::LocalActivationCompleted {
519            observed_current_closure,
520            exit_code,
521            completed_at,
522            seq,
523        },
524        Event::LocalActivationFailed {
525            exit_code,
526            stderr_tail,
527            failed_at,
528            ..
529        } => Event::LocalActivationFailed {
530            exit_code,
531            stderr_tail,
532            failed_at,
533            seq,
534        },
535        Event::LocalProbeObservedFirst {
536            probe_name,
537            mode,
538            observed_at,
539            ..
540        } => Event::LocalProbeObservedFirst {
541            probe_name,
542            mode,
543            observed_at,
544            seq,
545        },
546        Event::LocalProbeResult {
547            probe_name,
548            mode,
549            status,
550            observed_at,
551            failure_reason,
552            sub_results,
553            ..
554        } => Event::LocalProbeResult {
555            probe_name,
556            mode,
557            status,
558            observed_at,
559            failure_reason,
560            sub_results,
561            seq,
562        },
563        Event::LocalProbeFailureFirst {
564            probe_name,
565            mode,
566            first_failed_at,
567            ..
568        } => Event::LocalProbeFailureFirst {
569            probe_name,
570            mode,
571            first_failed_at,
572            seq,
573        },
574        Event::LocalSustainedFailureCrossed {
575            failed_at,
576            sustained_duration_secs,
577            failing_probes,
578            policy_applied,
579            ..
580        } => Event::LocalSustainedFailureCrossed {
581            failed_at,
582            sustained_duration_secs,
583            failing_probes,
584            policy_applied,
585            seq,
586        },
587        Event::LocalRollbackCompleted {
588            reverted_to_closure,
589            exit_code,
590            completed_at,
591            ..
592        } => Event::LocalRollbackCompleted {
593            reverted_to_closure,
594            exit_code,
595            completed_at,
596            seq,
597        },
598        Event::LocalConvergedReached {
599            converged_at,
600            current_closure,
601            ..
602        } => Event::LocalConvergedReached {
603            converged_at,
604            current_closure,
605            seq,
606        },
607        Event::LocalProbeTopologyDeclared {
608            probes,
609            declared_at,
610            ..
611        } => Event::LocalProbeTopologyDeclared {
612            probes,
613            declared_at,
614            seq,
615        },
616        // Remote* events should never reach the agent reducer's
617        // run_host_event path. Return as-is; the upstream layer will
618        // log + drop via the applier's Remote* arm.
619        other => other,
620    }
621}
622
623#[cfg(test)]
624mod tests {
625    use super::*;
626    use chrono::TimeZone;
627
628    fn fixed_now() -> chrono::DateTime<chrono::Utc> {
629        chrono::Utc.with_ymd_and_hms(2026, 5, 17, 12, 0, 0).unwrap()
630    }
631
632    #[test]
633    fn bootstrap_extracts_channel_from_canonical_rollout_id() {
634        // SR-1 regression guard: the bootstrap derives `channel` from
635        // the canonical RolloutId composite, not from a scan over
636        // manifests.rollouts. Verified directly against the pure helper.
637        let rid = nixfleet_proto::RolloutId::new("stable", "abc1234deadbeef");
638        let soak = fixed_now() + chrono::Duration::minutes(5);
639        let state = bootstrap_pending_state(&rid, "closure-X", soak, fixed_now());
640        assert_eq!(
641            state.channel, "stable",
642            "channel derived from rollout_id.channel(), not from manifest scan",
643        );
644    }
645
646    #[test]
647    fn bootstrap_uses_caller_provided_target_closure_not_host_set_first() {
648        // SR-2 regression guard: target_closure is resolved by the
649        // caller from a hostname-aware manifest lookup
650        // (`hw.hostname == cfg.machine_id`). The pre-fix shape used
651        // `host_set.first()` which silently produced the wrong closure
652        // on any host whose hostname did not sort first in host_set.
653        // The bootstrap helper is now a pure function over the
654        // caller-resolved target; this test pins the helper's
655        // pass-through behaviour against any future drift that would
656        // re-introduce a manifest-scan inside the helper.
657        let rid = nixfleet_proto::RolloutId::new("stable", "abc1234deadbeef");
658        let soak = fixed_now() + chrono::Duration::minutes(5);
659        let state = bootstrap_pending_state(&rid, "RIGHT-closure", soak, fixed_now());
660        assert_eq!(
661            state.target_closure, "RIGHT-closure",
662            "target_closure comes from the caller (manifest by-hostname lookup), not from inside the helper",
663        );
664    }
665
666    #[test]
667    fn bootstrap_uses_caller_provided_soak_due_at_not_hardcoded() {
668        // LOADBEARING: soak_due_at is the CP-resolved value carried
669        // by the LocalActivate event (from
670        // DispatchResponse.soak_due_at, computed by CP from the
671        // manifest's
672        // rollout_policies[policy].waves[wave_index].soak_minutes).
673        // A hardcoded agent-side default would ignore CP's
674        // resolution and force every host through the same soak
675        // window regardless of policy.
676        let rid = nixfleet_proto::RolloutId::new("stable", "abc1234deadbeef");
677        let now = fixed_now();
678        let dispatched_soak = now + chrono::Duration::seconds(0);
679        let state = bootstrap_pending_state(&rid, "closure-X", dispatched_soak, now);
680        assert_eq!(state.state, HostState::Pending);
681        assert_eq!(
682            state.soak_due_at,
683            Some(dispatched_soak),
684            "soak_due_at comes from the caller (CP-dispatched value), not a hardcoded default",
685        );
686
687        // And confirm a non-zero value passes through faithfully too —
688        // proves the helper is genuinely caller-driven, not coincidentally
689        // matching the old 5-minute default.
690        let custom_soak = now + chrono::Duration::minutes(17);
691        let state2 = bootstrap_pending_state(&rid, "closure-X", custom_soak, now);
692        assert_eq!(state2.soak_due_at, Some(custom_soak));
693    }
694
695    #[test]
696    fn bootstrap_target_closure_independent_of_manifests_snapshot() {
697        // LOADBEARING: bootstrap MUST read target_closure from the
698        // LocalActivate event (which longpoll filled with the
699        // freshly-validated dispatch target), NOT from the reducer's
700        // `manifests` snapshot. The snapshot is fed by
701        // `agent_manifest_poll` on a slower cadence than longpoll's
702        // dispatch arrival; when a new rollout's channel_ref is
703        // freshly published, the snapshot can still hold the OLD
704        // per-rollout manifest even after longpoll verified the NEW
705        // target — producing a TOCTOU that strands the agent in
706        // Soaking against an OLD target while CP has already moved on
707        // (RFC-0004 §1 invariant 1). The pure helper is
708        // caller-driven; the call site in run_host_event extracts the
709        // field from the event and passes it through.
710        let rid = nixfleet_proto::RolloutId::new("edge", "f8c46e472deadbeef");
711        let now = fixed_now();
712        let soak = now;
713
714        // Simulate "manifest snapshot has STALE-X cached but longpoll
715        // just validated FRESH-Y". Both values exercise the helper;
716        // neither comes from any manifest snapshot.
717        let stale_from_cache = "STALE-target-from-old-manifest".to_string();
718        let fresh_from_dispatch = "FRESH-target-from-just-verified-dispatch".to_string();
719
720        let state_stale = bootstrap_pending_state(&rid, &stale_from_cache, soak, now);
721        assert_eq!(state_stale.target_closure, stale_from_cache);
722
723        let state_fresh = bootstrap_pending_state(&rid, &fresh_from_dispatch, soak, now);
724        assert_eq!(state_fresh.target_closure, fresh_from_dispatch);
725
726        // The two values differ deliberately. The helper produces what
727        // the caller asks for; neither path consults any global state.
728        assert_ne!(state_stale.target_closure, state_fresh.target_closure);
729    }
730
731    fn probe_record(
732        status: nixfleet_state_machine::ProbeStatus,
733        mode: nixfleet_state_machine::ProbeMode,
734    ) -> nixfleet_state_machine::ProbeRecord {
735        nixfleet_state_machine::ProbeRecord {
736            status,
737            mode,
738            last_observed_at: fixed_now(),
739            last_pass_at: None,
740            failure_reason: None,
741        }
742    }
743
744    #[test]
745    fn collect_failing_enforce_probes_includes_failing_enforce() {
746        let mut probes = HashMap::new();
747        probes.insert(
748            "enforce-fail".to_string(),
749            probe_record(
750                nixfleet_state_machine::ProbeStatus::Fail,
751                nixfleet_state_machine::ProbeMode::Enforce,
752            ),
753        );
754        let failing = collect_failing_enforce_probes(&probes);
755        assert_eq!(
756            failing,
757            vec!["enforce-fail".to_string()],
758            "failing enforce-mode probe MUST gate per RFC-0007 §3.4",
759        );
760    }
761
762    #[test]
763    fn collect_failing_enforce_probes_excludes_failing_observe_and_disabled() {
764        // RFC-0007 §3.4 regression guard: a failing observe-mode probe
765        // (e.g. an evidence-kind compliance probe with mode = "observe"
766        // that triggers an audit failure) records the event but MUST
767        // NOT gate soak promotion. Same for disabled mode. The pre-fix
768        // builder filtered only on `status == Fail` and let observe +
769        // disabled failures gate, contradicting the documented contract
770        // and tripping lab's edge soak window on anssi-bp028.
771        let mut probes = HashMap::new();
772        probes.insert(
773            "observe-fail".to_string(),
774            probe_record(
775                nixfleet_state_machine::ProbeStatus::Fail,
776                nixfleet_state_machine::ProbeMode::Observe,
777            ),
778        );
779        probes.insert(
780            "disabled-fail".to_string(),
781            probe_record(
782                nixfleet_state_machine::ProbeStatus::Fail,
783                nixfleet_state_machine::ProbeMode::Disabled,
784            ),
785        );
786        probes.insert(
787            "enforce-pass".to_string(),
788            probe_record(
789                nixfleet_state_machine::ProbeStatus::Pass,
790                nixfleet_state_machine::ProbeMode::Enforce,
791            ),
792        );
793        let failing = collect_failing_enforce_probes(&probes);
794        assert!(
795            failing.is_empty(),
796            "observe + disabled failures and passing enforce probes MUST NOT gate; got: {failing:?}",
797        );
798    }
799
800    #[test]
801    fn all_enforce_probes_pass_with_empty_map_is_true() {
802        let probes: HashMap<String, nixfleet_state_machine::ProbeRecord> = HashMap::new();
803        assert!(
804            all_enforce_probes_pass(&probes),
805            "empty probe map satisfies convergence vacuously — matches shared verifier semantic",
806        );
807    }
808
809    #[test]
810    fn all_enforce_probes_pass_with_passing_enforce_only_is_true() {
811        let mut probes = HashMap::new();
812        probes.insert(
813            "nginx-version".to_string(),
814            probe_record(
815                nixfleet_state_machine::ProbeStatus::Pass,
816                nixfleet_state_machine::ProbeMode::Enforce,
817            ),
818        );
819        assert!(all_enforce_probes_pass(&probes));
820    }
821
822    #[test]
823    fn all_enforce_probes_pass_ignores_failing_observe_and_disabled() {
824        // RFC-0007 §3.3 regression guard: observe + disabled probe
825        // failures MUST NOT gate convergence. Mirror of the
826        // collect_failing_enforce_probes filter on the soak-fail side.
827        let mut probes = HashMap::new();
828        probes.insert(
829            "nginx-version".to_string(),
830            probe_record(
831                nixfleet_state_machine::ProbeStatus::Pass,
832                nixfleet_state_machine::ProbeMode::Enforce,
833            ),
834        );
835        probes.insert(
836            "evidence-nis2".to_string(),
837            probe_record(
838                nixfleet_state_machine::ProbeStatus::Fail,
839                nixfleet_state_machine::ProbeMode::Observe,
840            ),
841        );
842        probes.insert(
843            "suppressed-probe".to_string(),
844            probe_record(
845                nixfleet_state_machine::ProbeStatus::Fail,
846                nixfleet_state_machine::ProbeMode::Disabled,
847            ),
848        );
849        assert!(
850            all_enforce_probes_pass(&probes),
851            "observe + disabled failures MUST NOT gate convergence per RFC-0007 §3.3",
852        );
853    }
854
855    #[test]
856    fn all_enforce_probes_pass_with_failing_enforce_is_false() {
857        let mut probes = HashMap::new();
858        probes.insert(
859            "nginx-version".to_string(),
860            probe_record(
861                nixfleet_state_machine::ProbeStatus::Fail,
862                nixfleet_state_machine::ProbeMode::Enforce,
863            ),
864        );
865        assert!(!all_enforce_probes_pass(&probes));
866    }
867
868    fn make_snapshot(
869        rid: &nixfleet_proto::RolloutId,
870        wire_state: nixfleet_proto::HostRolloutState,
871        last_event_seq: u64,
872    ) -> nixfleet_proto::agent_wire::HostRolloutSnapshot {
873        let now = fixed_now();
874        nixfleet_proto::agent_wire::HostRolloutSnapshot {
875            rollout_id: rid.clone(),
876            hostname: "h1".to_string(),
877            channel: "stable".to_string(),
878            state: wire_state,
879            target_closure: "target-closure-X".to_string(),
880            current_closure_at_dispatch: Some("prior-closure".to_string()),
881            current_closure: Some("target-closure-X".to_string()),
882            dispatched_at: now,
883            dispatch_acked_at: Some(now),
884            activation_started_at: Some(now),
885            activation_completed_at: Some(now),
886            soak_due_at: Some(now + chrono::Duration::minutes(5)),
887            last_event_seq,
888        }
889    }
890
891    #[test]
892    fn merge_snapshot_cold_rehydration_defaults_agent_local_fields() {
893        // No existing entry — agent-local fields must default. Mirrors
894        // the boot-recovery handshake path.
895        let rid = nixfleet_proto::RolloutId::new("stable", "abc1234deadbeef");
896        let snapshot =
897            make_snapshot(&rid, nixfleet_proto::HostRolloutState::Soaking, 7);
898        let record = merge_snapshot_into_state(None, snapshot);
899        assert_eq!(record.state, HostState::Soaking);
900        assert_eq!(record.target_closure, "target-closure-X");
901        assert_eq!(record.last_event_seq, 7);
902        assert!(record.probes.is_empty(), "cold rehydration starts with empty probe map");
903        assert_eq!(record.probe_failure_first_at, None);
904        assert_eq!(record.probe_observed_first_at, None);
905        assert_eq!(record.failed_at, None);
906        assert_eq!(record.converged_at, None);
907    }
908
909    #[test]
910    fn merge_snapshot_warm_rehydration_preserves_probe_failure_timer() {
911        // LOADBEARING: Bug D regression guard. Under LIFT #5, CP returns
912        // bootstrap_rollouts on every steady-state heartbeat (~60s). If
913        // `probe_failure_first_at` got clobbered on each tick, the
914        // sustained-failure threshold (120s) would never cross and
915        // Soaking → Failed could never fire (BT-04). The merge MUST
916        // preserve the timer from the existing entry.
917        let rid = nixfleet_proto::RolloutId::new("stable", "abc1234deadbeef");
918        let failure_stamp = fixed_now() - chrono::Duration::seconds(90);
919        let mut existing = HostRolloutState::new_pending(
920            rid.clone(),
921            "h1".to_string(),
922            "stable".to_string(),
923            "target-closure-X".to_string(),
924            fixed_now() - chrono::Duration::minutes(2),
925            fixed_now() + chrono::Duration::minutes(5),
926        );
927        existing.state = HostState::Soaking;
928        existing.probe_failure_first_at = Some(failure_stamp);
929        existing.probe_observed_first_at = Some(failure_stamp);
930        existing.probes.insert(
931            "tcp-fail".to_string(),
932            probe_record(
933                nixfleet_state_machine::ProbeStatus::Fail,
934                nixfleet_state_machine::ProbeMode::Enforce,
935            ),
936        );
937
938        let snapshot =
939            make_snapshot(&rid, nixfleet_proto::HostRolloutState::Soaking, 12);
940        let record = merge_snapshot_into_state(Some(&existing), snapshot);
941
942        assert_eq!(record.state, HostState::Soaking, "canonical state still comes from snapshot");
943        assert_eq!(record.last_event_seq, 12, "last_event_seq still comes from snapshot");
944        assert_eq!(
945            record.probe_failure_first_at,
946            Some(failure_stamp),
947            "probe_failure_first_at MUST survive warm rehydration so sustained-failure timer can accumulate",
948        );
949        assert_eq!(
950            record.probe_observed_first_at,
951            Some(failure_stamp),
952            "probe_observed_first_at also preserved (agent-local, not in wire snapshot)",
953        );
954        assert!(
955            record.probes.contains_key("tcp-fail"),
956            "existing probe map preserved so probe scheduler doesn't lose mid-flight tracking",
957        );
958    }
959
960    #[test]
961    fn merge_snapshot_warm_rehydration_advances_state_but_preserves_probes() {
962        // Symmetric to the BT-04 case: when CP synthesises a state
963        // advance (e.g. LIFT #5 Pending → Soaking after wipe), the
964        // record's state must update to match CP's view AND probe
965        // state survives. Pinned so future drift can't trade one bug
966        // for the other.
967        let rid = nixfleet_proto::RolloutId::new("stable", "abc1234deadbeef");
968        let mut existing = HostRolloutState::new_pending(
969            rid.clone(),
970            "h1".to_string(),
971            "stable".to_string(),
972            "target-closure-X".to_string(),
973            fixed_now(),
974            fixed_now() + chrono::Duration::minutes(5),
975        );
976        existing.state = HostState::Pending;
977        existing.probe_failure_first_at = Some(fixed_now() - chrono::Duration::seconds(30));
978
979        let snapshot =
980            make_snapshot(&rid, nixfleet_proto::HostRolloutState::Soaking, 5);
981        let record = merge_snapshot_into_state(Some(&existing), snapshot);
982
983        assert_eq!(record.state, HostState::Soaking);
984        assert_eq!(
985            record.probe_failure_first_at,
986            existing.probe_failure_first_at,
987            "probe timer survives state advance",
988        );
989    }
990}