nixfleet_state_machine/rollout/transitions/
active.rs

1//! `Active` source state. At least one host is in-flight
2//! (`Pending`/`Activating`/`Soaking` per RFC-0005).
3
4use chrono::{DateTime, Utc};
5
6use super::{illegal, transition_effect};
7use crate::HostState;
8use crate::rollout::effect::RolloutEffect;
9use crate::rollout::error::RolloutTransitionError;
10use crate::rollout::event::RolloutEvent;
11use crate::rollout::state::{RolloutRecord, RolloutState};
12
13pub(super) fn step(
14    mut record: RolloutRecord,
15    event: RolloutEvent,
16    _now: DateTime<Utc>,
17) -> Result<(RolloutRecord, Vec<RolloutEffect>), RolloutTransitionError> {
18    match event {
19        // Hosts can keep joining as later waves dispatch — stays Active.
20        //
21        // LOADBEARING: HostJoined is observed (the wave is logged in
22        // event_log via the event itself) but does NOT mutate
23        // `current_wave`. Wave-cursor progression happens via
24        // `advance_current_waves` → `WaveAdvanced`. See
25        // opening.rs::step's HostJoined arm for the full rationale.
26        RolloutEvent::HostJoined { .. } => Ok((record, Vec::new())),
27
28        // Per-host transitions drive aggregate state changes.
29        RolloutEvent::HostStateChanged { to, at, .. } => match to {
30            HostState::Reverted => {
31                record.state = RolloutState::Reverted;
32                let effects = vec![transition_effect(
33                    &record,
34                    RolloutState::Active,
35                    RolloutState::Reverted,
36                    at,
37                )];
38                Ok((record, effects))
39            }
40            HostState::Failed => {
41                record.state = RolloutState::Failed;
42                let effects = vec![transition_effect(
43                    &record,
44                    RolloutState::Active,
45                    RolloutState::Failed,
46                    at,
47                )];
48                Ok((record, effects))
49            }
50            // All other host transitions (→ Activating, → Soaking,
51            // → Converged) are observed but don't change the rollout
52            // state directly; the applier emits `RolloutTerminal` or
53            // `WaveAdvanced` separately when aggregates flip.
54            _ => Ok((record, Vec::new())),
55        },
56
57        // Wave aggregation. The applier emits this when the current
58        // wave's hosts have all reached Soaking (or beyond). If
59        // `to_wave > from_wave` the planner has dispatched the next
60        // wave → stays Active; if `to_wave == from_wave` the wave is
61        // complete and no successor is dispatching → Converging.
62        RolloutEvent::WaveAdvanced {
63            from_wave,
64            to_wave,
65            at,
66            ..
67        } => {
68            if to_wave > from_wave {
69                // Monotonic — never roll back. A stale `WaveAdvanced`
70                // for an earlier wave is observed but doesn't update.
71                let mut effects = Vec::new();
72                if to_wave > record.current_wave {
73                    effects.push(RolloutEffect::UpdateCurrentWave {
74                        rollout_id: record.rollout_id.clone(),
75                        wave: to_wave,
76                    });
77                    record.current_wave = to_wave;
78                }
79                Ok((record, effects))
80            } else {
81                record.state = RolloutState::Converging;
82                let effects = vec![transition_effect(
83                    &record,
84                    RolloutState::Active,
85                    RolloutState::Converging,
86                    at,
87                )];
88                Ok((record, effects))
89            }
90        }
91
92        // All hosts in all waves Converged — applier aggregated.
93        RolloutEvent::RolloutTerminal { at, .. } => {
94            record.state = RolloutState::Terminal;
95            record.terminal_at = Some(at);
96            let effects = vec![transition_effect(
97                &record,
98                RolloutState::Active,
99                RolloutState::Terminal,
100                at,
101            )];
102            Ok((record, effects))
103        }
104
105        RolloutEvent::SuccessorOpened { at, .. } => {
106            record.state = RolloutState::Superseded;
107            record.superseded_at = Some(at);
108            let effects = vec![transition_effect(
109                &record,
110                RolloutState::Active,
111                RolloutState::Superseded,
112                at,
113            )];
114            Ok((record, effects))
115        }
116
117        RolloutEvent::RolloutOpened { .. }
118        | RolloutEvent::RetentionExpired { .. }
119        | RolloutEvent::OperatorClearance { .. } => Err(illegal(
120            RolloutState::Active,
121            &event,
122            record.rollout_id.clone(),
123        )),
124    }
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Fixed instant used for the record fixture and every event's `at`.
    fn t0() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
    }

    /// Rollout `r1` in `Active` on wave 0 with no terminal or superseded
    /// timestamps recorded yet.
    fn active_record() -> RolloutRecord {
        let opened_at = t0();
        RolloutRecord {
            rollout_id: "r1".into(),
            channel: "stable".into(),
            target_ref: "ref-1".into(),
            state: RolloutState::Active,
            current_wave: 0,
            opened_event_log_seq: None,
            last_transition_event_log_seq: None,
            opened_at,
            terminal_at: None,
            superseded_at: None,
        }
    }

    /// Runs `step` on a fresh `Active` record and unwraps the success case.
    fn run(event: RolloutEvent) -> (RolloutRecord, Vec<RolloutEffect>) {
        step(active_record(), event, t0()).unwrap()
    }

    /// Builds a `HostStateChanged` event for host `h1` of rollout `r1`.
    fn host_moved(from: HostState, to: HostState) -> RolloutEvent {
        RolloutEvent::HostStateChanged {
            rollout_id: "r1".into(),
            host_id: "h1".into(),
            from,
            to,
            at: t0(),
        }
    }

    #[test]
    fn host_state_changed_to_reverted_transitions() {
        let (after, _effects) = run(host_moved(HostState::Soaking, HostState::Reverted));
        assert_eq!(after.state, RolloutState::Reverted);
    }

    #[test]
    fn host_state_changed_to_failed_transitions() {
        let (after, _effects) = run(host_moved(HostState::Activating, HostState::Failed));
        assert_eq!(after.state, RolloutState::Failed);
    }

    #[test]
    fn host_state_changed_to_soaking_is_observed_only() {
        let (after, effects) = run(host_moved(HostState::Activating, HostState::Soaking));
        assert_eq!(after.state, RolloutState::Active);
        assert!(effects.is_empty());
    }

    #[test]
    fn wave_advanced_to_higher_wave_stays_active() {
        let (after, effects) = run(RolloutEvent::WaveAdvanced {
            rollout_id: "r1".into(),
            from_wave: 0,
            to_wave: 1,
            at: t0(),
        });
        assert_eq!(after.state, RolloutState::Active);
        assert_eq!(after.current_wave, 1);

        let cursor_bumped = effects
            .iter()
            .any(|e| matches!(e, RolloutEffect::UpdateCurrentWave { wave: 1, .. }));
        assert!(cursor_bumped);
    }

    #[test]
    fn wave_advanced_to_same_wave_transitions_to_converging() {
        let (after, _effects) = run(RolloutEvent::WaveAdvanced {
            rollout_id: "r1".into(),
            from_wave: 0,
            to_wave: 0,
            at: t0(),
        });
        assert_eq!(after.state, RolloutState::Converging);
    }

    #[test]
    fn rollout_terminal_transitions_with_terminal_at() {
        let (after, _effects) = run(RolloutEvent::RolloutTerminal {
            rollout_id: "r1".into(),
            at: t0(),
        });
        assert_eq!(after.state, RolloutState::Terminal);
        assert_eq!(after.terminal_at, Some(t0()));
    }

    #[test]
    fn successor_opened_transitions_with_superseded_at() {
        let (after, _effects) = run(RolloutEvent::SuccessorOpened {
            superseded_rollout_id: "r1".into(),
            successor_rollout_id: "r2".into(),
            at: t0(),
        });
        assert_eq!(after.state, RolloutState::Superseded);
        assert_eq!(after.superseded_at, Some(t0()));
    }

    #[test]
    fn rollout_opened_from_active_is_illegal() {
        let reopened = RolloutEvent::RolloutOpened {
            rollout_id: "r1".into(),
            channel: "stable".into(),
            target_ref: "ref-1".into(),
            at: t0(),
        };
        let outcome = step(active_record(), reopened, t0());
        assert!(matches!(
            outcome,
            Err(RolloutTransitionError::IllegalForState { .. })
        ));
    }
}
258}