nixfleet_state_machine/rollout/transitions/
opening.rs

1//! `Opening` source state. The rollout has been opened but no hosts
2//! have joined yet (RFC-0008 §3).
3
4use chrono::{DateTime, Utc};
5
6use super::{illegal, transition_effect};
7use crate::rollout::effect::RolloutEffect;
8use crate::rollout::error::RolloutTransitionError;
9use crate::rollout::event::RolloutEvent;
10use crate::rollout::state::{RolloutRecord, RolloutState};
11
12pub(super) fn step(
13    mut record: RolloutRecord,
14    event: RolloutEvent,
15    _now: DateTime<Utc>,
16) -> Result<(RolloutRecord, Vec<RolloutEffect>), RolloutTransitionError> {
17    match event {
18        // First host dispatched into the rollout → Active.
19        //
20        // LOADBEARING: `wave` is recorded in the event for event_log
21        // audit / replay reconstruction (which wave was the first
22        // joiner) but does NOT mutate `record.current_wave`. The
23        // wave-promotion gate reads `current_wave` as the "wave
24        // cursor for which dispatches are currently allowed" per
25        // RFC-0008 §6.3; bumping the cursor on HostJoined would leak
26        // it forward — wave-N+1 hosts dispatching alongside wave-N
27        // hosts on the first plan tick would set the cursor to
28        // max-of-joiners' wave_index, passing wave-N+1 vacuously.
29        // The cursor advances ONLY via deliberate progression:
30        // `advance_current_waves` in the reducer when every host in
31        // `current_wave` reaches Converged → emits `WaveAdvanced` →
32        // the active.rs transition bumps the cursor.
33        RolloutEvent::HostJoined { at, .. } => {
34            record.state = RolloutState::Active;
35            Ok((
36                record.clone(),
37                vec![transition_effect(
38                    &record,
39                    RolloutState::Opening,
40                    RolloutState::Active,
41                    at,
42                )],
43            ))
44        }
45
46        // A successor was opened before any host joined — race condition
47        // operators may encounter on a rapid manifest republish.
48        RolloutEvent::SuccessorOpened { at, .. } => {
49            record.state = RolloutState::Superseded;
50            record.superseded_at = Some(at);
51            let effects = vec![transition_effect(
52                &record,
53                RolloutState::Opening,
54                RolloutState::Superseded,
55                at,
56            )];
57            Ok((record, effects))
58        }
59
60        // OperatorClearance from Opening would be a no-op (record an
61        // event_log entry but no state change). Wiring deferred per
62        // v0.2.1-followups; marked illegal here so a future wiring
63        // step explicitly addresses the semantic instead of silently
64        // accepting it.
65        RolloutEvent::OperatorClearance { .. }
66        | RolloutEvent::RolloutOpened { .. }
67        | RolloutEvent::HostStateChanged { .. }
68        | RolloutEvent::WaveAdvanced { .. }
69        | RolloutEvent::RolloutTerminal { .. }
70        | RolloutEvent::RetentionExpired { .. } => Err(illegal(
71            RolloutState::Opening,
72            &event,
73            record.rollout_id.clone(),
74        )),
75    }
76}
77
78#[cfg(test)]
79mod tests {
80    use super::*;
81    use crate::HostState;
82    use crate::rollout::state::RolloutRecord;
83    use chrono::TimeZone;
84
85    fn t0() -> DateTime<Utc> {
86        Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
87    }
88
89    fn opening_record() -> RolloutRecord {
90        RolloutRecord {
91            rollout_id: "r1".into(),
92            channel: "stable".into(),
93            target_ref: "ref-1".into(),
94            state: RolloutState::Opening,
95            current_wave: 0,
96            opened_event_log_seq: None,
97            last_transition_event_log_seq: None,
98            opened_at: t0(),
99            terminal_at: None,
100            superseded_at: None,
101        }
102    }
103
104    #[test]
105    fn host_joined_transitions_to_active() {
106        let event = RolloutEvent::HostJoined {
107            rollout_id: "r1".into(),
108            host_id: "h1".into(),
109            wave: 0,
110            at: t0(),
111        };
112        let (record, effects) = step(opening_record(), event, t0()).unwrap();
113        assert_eq!(record.state, RolloutState::Active);
114        assert_eq!(record.current_wave, 0);
115        assert_eq!(effects.len(), 1);
116        assert!(matches!(
117            effects[0],
118            RolloutEffect::RecordRolloutTransition {
119                from: RolloutState::Opening,
120                to: RolloutState::Active,
121                ..
122            }
123        ));
124    }
125
126    /// Regression pin: HostJoined MUST NOT mutate `current_wave`.
127    /// Assigning the joiner's `wave` to the cursor would leak the
128    /// wave-promotion cursor forward when multi-wave rollouts have
129    /// wave-N+1 hosts joining on the first plan tick — the cursor
130    /// must stay at the canonical initial value (0) so the
131    /// wave-promotion gate correctly blocks higher-wave dispatches.
132    #[test]
133    fn host_joined_does_not_mutate_current_wave_even_at_higher_wave_index() {
134        let event = RolloutEvent::HostJoined {
135            rollout_id: "r1".into(),
136            host_id: "h1".into(),
137            wave: 2,
138            at: t0(),
139        };
140        let (record, effects) = step(opening_record(), event, t0()).unwrap();
141        assert_eq!(
142            record.current_wave, 0,
143            "HostJoined MUST NOT bump current_wave — the wave cursor advances ONLY via advance_current_waves + WaveAdvanced"
144        );
145        assert!(
146            !effects
147                .iter()
148                .any(|e| matches!(e, RolloutEffect::UpdateCurrentWave { .. })),
149            "HostJoined MUST NOT emit UpdateCurrentWave; emitting it would persist the wrong cursor in the rollouts table"
150        );
151        // The transition itself still fires.
152        assert_eq!(record.state, RolloutState::Active);
153        assert!(matches!(
154            effects[0],
155            RolloutEffect::RecordRolloutTransition {
156                from: RolloutState::Opening,
157                to: RolloutState::Active,
158                ..
159            }
160        ));
161    }
162
163    #[test]
164    fn successor_opened_transitions_to_superseded() {
165        let event = RolloutEvent::SuccessorOpened {
166            superseded_rollout_id: "r1".into(),
167            successor_rollout_id: "r2".into(),
168            at: t0(),
169        };
170        let (record, _effects) = step(opening_record(), event, t0()).unwrap();
171        assert_eq!(record.state, RolloutState::Superseded);
172        assert_eq!(record.superseded_at, Some(t0()));
173    }
174
175    #[test]
176    fn rollout_terminal_from_opening_is_illegal() {
177        let event = RolloutEvent::RolloutTerminal {
178            rollout_id: "r1".into(),
179            at: t0(),
180        };
181        let err = step(opening_record(), event, t0()).unwrap_err();
182        assert!(matches!(
183            err,
184            RolloutTransitionError::IllegalForState {
185                from: RolloutState::Opening,
186                ..
187            }
188        ));
189    }
190
191    #[test]
192    fn host_state_changed_from_opening_is_illegal() {
193        // No hosts have joined → can't have state changes.
194        let event = RolloutEvent::HostStateChanged {
195            rollout_id: "r1".into(),
196            host_id: "h1".into(),
197            from: HostState::Pending,
198            to: HostState::Activating,
199            at: t0(),
200        };
201        let err = step(opening_record(), event, t0()).unwrap_err();
202        assert!(matches!(
203            err,
204            RolloutTransitionError::IllegalForState { .. }
205        ));
206    }
207}