nixfleet_state_machine/rehydration.rs
1//! Snapshot rehydration — effect emission rules for state restored from a
2//! CP-supplied `HostRolloutSnapshot` (RFC-0005 §9.5 / LIFT #3).
3//!
4//! LIFT #4: bootstrap-applied state must re-prime worker channels via the
5//! same `Effect` contract as ordinary transitions. The agent's reducer
6//! caches `HostRolloutState` directly from the snapshot (the canonical
7//! state lives on CP, not in an event ladder), but every worker that
8//! re-primes on a state change — probe runners, activation drainer,
9//! soak detector — must learn about the rehydration through an Effect.
10//!
11//! Adding a new worker re-priming need lands here (one match arm), not
12//! in every bootstrap entry point.
13
14use crate::effect::Effect;
15use crate::state::{HostRolloutState, HostState};
16
17/// Effects to emit immediately after applying a `HostRolloutSnapshot` to
18/// the agent's in-memory state. Workers consume these to refresh any
19/// cached per-rollout state seeded by prior process incarnations.
20///
21/// `Pending` emits nothing: no rollout is in flight, no probes were ever
22/// declared for it locally, no worker has any cached state to invalidate.
23///
24/// Every other state implies the rollout is (or was) live for this host —
25/// probe runners may be holding tickers tagged with a stale `rollout_id`
26/// from a prior agent process, so `LocalResetProbeCache` fires
27/// unconditionally to force the probe worker to drop those tickers and
28/// reload declarations from `health-checks.json` under the bootstrapped
29/// `rollout_id`.
30pub fn rehydration_effects(state: &HostRolloutState) -> Vec<Effect> {
31 match state.state {
32 HostState::Pending => Vec::new(),
33 HostState::Activating
34 | HostState::Deferred
35 | HostState::Soaking
36 | HostState::Converged
37 | HostState::Failed
38 | HostState::Reverted => vec![Effect::LocalResetProbeCache {
39 rollout_id: state.rollout_id.clone(),
40 }],
41 }
42}
43
44#[cfg(test)]
45mod tests {
46 use chrono::{TimeZone, Utc};
47
48 use super::*;
49 use crate::state::HostState;
50
51 fn state_in(s: HostState) -> HostRolloutState {
52 let mut st = HostRolloutState::new_pending(
53 "stable@r1".into(),
54 "h1".into(),
55 "stable".into(),
56 "target".into(),
57 Utc.with_ymd_and_hms(2026, 5, 18, 0, 0, 0).unwrap(),
58 Utc.with_ymd_and_hms(2026, 5, 18, 0, 5, 0).unwrap(),
59 );
60 st.state = s;
61 st
62 }
63
64 #[test]
65 fn pending_emits_nothing() {
66 assert!(rehydration_effects(&state_in(HostState::Pending)).is_empty());
67 }
68
69 #[test]
70 fn non_pending_states_emit_probe_cache_reset() {
71 for s in [
72 HostState::Activating,
73 HostState::Deferred,
74 HostState::Soaking,
75 HostState::Converged,
76 HostState::Failed,
77 HostState::Reverted,
78 ] {
79 let effects = rehydration_effects(&state_in(s));
80 assert!(
81 effects.iter().any(|e| matches!(
82 e,
83 Effect::LocalResetProbeCache { rollout_id } if rollout_id.as_str() == "stable@r1"
84 )),
85 "state {s:?} must emit LocalResetProbeCache; got {effects:?}",
86 );
87 }
88 }
89}