nixfleet_state_machine/rollout/transitions/
converging.rs

1//! `Converging` source state. All current-wave hosts have reached
2//! Soaking or beyond; later waves remain to dispatch (or, with no
3//! more waves, all hosts await Converged).
4
5use chrono::{DateTime, Utc};
6
7use super::{illegal, transition_effect};
8use crate::HostState;
9use crate::rollout::effect::RolloutEffect;
10use crate::rollout::error::RolloutTransitionError;
11use crate::rollout::event::RolloutEvent;
12use crate::rollout::state::{RolloutRecord, RolloutState};
13
14pub(super) fn step(
15    mut record: RolloutRecord,
16    event: RolloutEvent,
17    _now: DateTime<Utc>,
18) -> Result<(RolloutRecord, Vec<RolloutEffect>), RolloutTransitionError> {
19    match event {
20        // Next wave dispatched โ†’ back to Active.
21        RolloutEvent::WaveAdvanced {
22            from_wave,
23            to_wave,
24            at,
25            ..
26        } if to_wave > from_wave => {
27            record.state = RolloutState::Active;
28            let mut effects = Vec::new();
29            effects.push(transition_effect(
30                &record,
31                RolloutState::Converging,
32                RolloutState::Active,
33                at,
34            ));
35            // Monotonic โ€” never roll back current_wave.
36            if to_wave > record.current_wave {
37                record.current_wave = to_wave;
38                effects.push(RolloutEffect::UpdateCurrentWave {
39                    rollout_id: record.rollout_id.clone(),
40                    wave: to_wave,
41                });
42            }
43            Ok((record, effects))
44        }
45
46        // Idempotent: already-Converging WaveAdvanced for the same wave.
47        RolloutEvent::WaveAdvanced { .. } => Ok((record, Vec::new())),
48
49        // All hosts in all waves Converged.
50        RolloutEvent::RolloutTerminal { at, .. } => {
51            record.state = RolloutState::Terminal;
52            record.terminal_at = Some(at);
53            let effects = vec![transition_effect(
54                &record,
55                RolloutState::Converging,
56                RolloutState::Terminal,
57                at,
58            )];
59            Ok((record, effects))
60        }
61
62        // A host slipped back from Soaking to Failed/Reverted late.
63        RolloutEvent::HostStateChanged { to, at, .. } => match to {
64            HostState::Reverted => {
65                record.state = RolloutState::Reverted;
66                let effects = vec![transition_effect(
67                    &record,
68                    RolloutState::Converging,
69                    RolloutState::Reverted,
70                    at,
71                )];
72                Ok((record, effects))
73            }
74            HostState::Failed => {
75                record.state = RolloutState::Failed;
76                let effects = vec![transition_effect(
77                    &record,
78                    RolloutState::Converging,
79                    RolloutState::Failed,
80                    at,
81                )];
82                Ok((record, effects))
83            }
84            _ => Ok((record, Vec::new())),
85        },
86
87        RolloutEvent::SuccessorOpened { at, .. } => {
88            record.state = RolloutState::Superseded;
89            record.superseded_at = Some(at);
90            let effects = vec![transition_effect(
91                &record,
92                RolloutState::Converging,
93                RolloutState::Superseded,
94                at,
95            )];
96            Ok((record, effects))
97        }
98
99        // HostJoined is observed but does NOT mutate `current_wave` โ€”
100        // the wave-cursor progresses ONLY via `advance_current_waves`
101        // and the resulting `WaveAdvanced` event (RFC-0008 ยง6.3). See
102        // opening.rs's HostJoined arm for the full rationale.
103        RolloutEvent::HostJoined { .. } => Ok((record, Vec::new())),
104
105        RolloutEvent::RolloutOpened { .. }
106        | RolloutEvent::RetentionExpired { .. }
107        | RolloutEvent::OperatorClearance { .. } => Err(illegal(
108            RolloutState::Converging,
109            &event,
110            record.rollout_id.clone(),
111        )),
112    }
113}
114
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Fixed instant used for every fixture timestamp and event `at`.
    fn t0() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
    }

    /// A record sitting in `Converging` at wave 0 with no terminal or
    /// superseded timestamps set.
    fn converging_record() -> RolloutRecord {
        RolloutRecord {
            rollout_id: "r1".into(),
            channel: "stable".into(),
            target_ref: "ref-1".into(),
            state: RolloutState::Converging,
            current_wave: 0,
            opened_event_log_seq: None,
            last_transition_event_log_seq: None,
            opened_at: t0(),
            terminal_at: None,
            superseded_at: None,
        }
    }

    /// Feeds `event` to `step` on a fresh Converging record and returns the
    /// resulting record; the emitted effects are discarded.
    fn record_after(event: RolloutEvent) -> RolloutRecord {
        let (after, _effects) = step(converging_record(), event, t0()).unwrap();
        after
    }

    #[test]
    fn wave_advanced_to_higher_returns_to_active() {
        let after = record_after(RolloutEvent::WaveAdvanced {
            rollout_id: "r1".into(),
            from_wave: 0,
            to_wave: 1,
            at: t0(),
        });
        assert_eq!(after.state, RolloutState::Active);
        assert_eq!(after.current_wave, 1);
    }

    #[test]
    fn rollout_terminal_transitions() {
        let after = record_after(RolloutEvent::RolloutTerminal {
            rollout_id: "r1".into(),
            at: t0(),
        });
        assert_eq!(after.state, RolloutState::Terminal);
        assert_eq!(after.terminal_at, Some(t0()));
    }

    #[test]
    fn late_host_failed_transitions_to_failed() {
        let after = record_after(RolloutEvent::HostStateChanged {
            rollout_id: "r1".into(),
            host_id: "h1".into(),
            from: HostState::Soaking,
            to: HostState::Failed,
            at: t0(),
        });
        assert_eq!(after.state, RolloutState::Failed);
    }
}
175}