nixfleet_state_machine/transitions/
pending.rs

1//! Transitions from `Pending`. Only legal events: `LocalActivate` (agent
2//! side, after long-poll receives Dispatch + cross-check vs manifest) and
3//! `RemoteDispatchAck` (CP mirror, after agent posts to `/v1/agent/events`).
4//! Both drive `Pending → Activating`.
5
6use chrono::{DateTime, Utc};
7use nixfleet_proto::RolloutPolicy;
8
9use crate::effect::{Effect, OutboundAgentEvent};
10use crate::error::TransitionError;
11use crate::event::Event;
12use crate::state::{HostRolloutState, HostState};
13
14use super::illegal;
15
16pub(super) fn handle(
17    mut state: HostRolloutState,
18    event: Event,
19    _now: DateTime<Utc>,
20    _policy: &RolloutPolicy,
21) -> Result<(HostRolloutState, Vec<Effect>), TransitionError> {
22    match event {
23        Event::LocalActivate {
24            current_closure_at_dispatch,
25            // target_closure + soak_due_at are consumed by the agent
26            // reducer bootstrap (sets state.target_closure /
27            // state.soak_due_at before this handler runs); the
28            // state-machine transition only flips state + stamps the
29            // rollback target and dispatch_acked_at timestamps.
30            target_closure: _,
31            received_at,
32            soak_due_at: _,
33            seq,
34        } => {
35            let from = state.state;
36            state.state = HostState::Activating;
37            state.current_closure_at_dispatch = Some(current_closure_at_dispatch.clone());
38            state.dispatch_acked_at = Some(received_at);
39            state.last_event_seq = seq;
40
41            let target = state.target_closure.clone();
42            let rollout_id = state.rollout_id.clone();
43            let effects = vec![
44                Effect::RecordTransition {
45                    host: state.hostname.clone(),
46                    rollout_id: rollout_id.clone(),
47                    from,
48                    to: HostState::Activating,
49                    at: received_at,
50                },
51                Effect::LocalEmitEvent {
52                    rollout_id: rollout_id.clone(),
53                    payload: OutboundAgentEvent::DispatchAck {
54                        current_closure_at_dispatch,
55                        received_at,
56                        seq,
57                    },
58                    durable: true,
59                },
60                Effect::LocalFireSwitch { rollout_id, target },
61            ];
62            Ok((state, effects))
63        }
64
65        Event::RemoteDispatchAck {
66            current_closure_at_dispatch,
67            received_at,
68            seq,
69        } => {
70            let from = state.state;
71            state.state = HostState::Activating;
72            state.current_closure_at_dispatch = Some(current_closure_at_dispatch);
73            state.dispatch_acked_at = Some(received_at);
74            state.last_event_seq = seq;
75
76            let effects = vec![
77                Effect::RemoteAppendEventLog {
78                    host: state.hostname.clone(),
79                    rollout_id: state.rollout_id.clone(),
80                    payload: OutboundAgentEvent::DispatchAck {
81                        current_closure_at_dispatch: state
82                            .current_closure_at_dispatch
83                            .clone()
84                            .unwrap_or_default(),
85                        received_at,
86                        seq,
87                    },
88                },
89                Effect::RecordTransition {
90                    host: state.hostname.clone(),
91                    rollout_id: state.rollout_id.clone(),
92                    from,
93                    to: HostState::Activating,
94                    at: received_at,
95                },
96            ];
97            Ok((state, effects))
98        }
99
100        other => Err(illegal(&state, &other)),
101    }
102}
103
104#[cfg(test)]
105mod tests {
106    use super::*;
107    use crate::transitions::dispatch;
108    use chrono::{TimeZone, Utc};
109    use nixfleet_proto::{HealthGate, OnHealthFailure, RolloutPolicy};
110
111    fn t0() -> chrono::DateTime<Utc> {
112        Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
113    }
114
115    fn fake_policy() -> RolloutPolicy {
116        RolloutPolicy {
117            strategy: "all-at-once".into(),
118            waves: vec![],
119            health_gate: HealthGate::default(),
120            on_health_failure: OnHealthFailure::Halt,
121        }
122    }
123
124    fn pending() -> HostRolloutState {
125        HostRolloutState::new_pending(
126            "r1".into(),
127            "h1".into(),
128            "stable".into(),
129            "abc123".into(),
130            t0(),
131            t0() + chrono::Duration::minutes(5),
132        )
133    }
134
135    #[test]
136    fn local_activate_moves_pending_to_activating() {
137        let state = pending();
138        let event = Event::LocalActivate {
139            current_closure_at_dispatch: "prior-closure".into(),
140            target_closure: "target".into(),
141            received_at: t0() + chrono::Duration::seconds(1),
142            soak_due_at: t0() + chrono::Duration::minutes(5),
143            seq: 1,
144        };
145        let (next, effects) = dispatch(state, event, t0(), &fake_policy()).unwrap();
146        assert_eq!(next.state, HostState::Activating);
147        assert_eq!(
148            next.current_closure_at_dispatch.as_deref(),
149            Some("prior-closure")
150        );
151        assert!(next.dispatch_acked_at.is_some());
152        assert_eq!(next.last_event_seq, 1);
153        assert!(matches!(effects[0], Effect::RecordTransition { .. }));
154        assert!(matches!(effects[1], Effect::LocalEmitEvent { .. }));
155        assert!(matches!(effects[2], Effect::LocalFireSwitch { .. }));
156    }
157
158    #[test]
159    fn remote_dispatch_ack_mirrors_pending_to_activating() {
160        let state = pending();
161        let event = Event::RemoteDispatchAck {
162            current_closure_at_dispatch: "prior-closure".into(),
163            received_at: t0() + chrono::Duration::seconds(1),
164            seq: 1,
165        };
166        let (next, effects) = dispatch(state, event, t0(), &fake_policy()).unwrap();
167        assert_eq!(next.state, HostState::Activating);
168        assert!(matches!(effects[0], Effect::RemoteAppendEventLog { .. }));
169        assert!(matches!(effects[1], Effect::RecordTransition { .. }));
170    }
171
172    #[test]
173    fn illegal_event_in_pending_returns_illegal_for_state() {
174        let state = pending();
175        let event = Event::LocalActivationCompleted {
176            observed_current_closure: "abc123".into(),
177            exit_code: 0,
178            completed_at: t0(),
179            seq: 1,
180        };
181        let err = dispatch(state, event, t0(), &fake_policy()).unwrap_err();
182        assert!(matches!(err, TransitionError::IllegalForState { .. }));
183    }
184
185    #[test]
186    fn seq_regression_rejected() {
187        let mut state = pending();
188        state.last_event_seq = 5;
189        let event = Event::LocalActivate {
190            current_closure_at_dispatch: "prior".into(),
191            target_closure: "target".into(),
192            received_at: t0(),
193            soak_due_at: t0() + chrono::Duration::minutes(5),
194            seq: 5, // <= last_event_seq
195        };
196        let err = dispatch(state, event, t0(), &fake_policy()).unwrap_err();
197        assert!(matches!(err, TransitionError::SeqRegression { .. }));
198    }
199}