nixfleet_state_machine/transitions/
pending.rs1use chrono::{DateTime, Utc};
7use nixfleet_proto::RolloutPolicy;
8
9use crate::effect::{Effect, OutboundAgentEvent};
10use crate::error::TransitionError;
11use crate::event::Event;
12use crate::state::{HostRolloutState, HostState};
13
14use super::illegal;
15
16pub(super) fn handle(
17 mut state: HostRolloutState,
18 event: Event,
19 _now: DateTime<Utc>,
20 _policy: &RolloutPolicy,
21) -> Result<(HostRolloutState, Vec<Effect>), TransitionError> {
22 match event {
23 Event::LocalActivate {
24 current_closure_at_dispatch,
25 target_closure: _,
31 received_at,
32 soak_due_at: _,
33 seq,
34 } => {
35 let from = state.state;
36 state.state = HostState::Activating;
37 state.current_closure_at_dispatch = Some(current_closure_at_dispatch.clone());
38 state.dispatch_acked_at = Some(received_at);
39 state.last_event_seq = seq;
40
41 let target = state.target_closure.clone();
42 let rollout_id = state.rollout_id.clone();
43 let effects = vec![
44 Effect::RecordTransition {
45 host: state.hostname.clone(),
46 rollout_id: rollout_id.clone(),
47 from,
48 to: HostState::Activating,
49 at: received_at,
50 },
51 Effect::LocalEmitEvent {
52 rollout_id: rollout_id.clone(),
53 payload: OutboundAgentEvent::DispatchAck {
54 current_closure_at_dispatch,
55 received_at,
56 seq,
57 },
58 durable: true,
59 },
60 Effect::LocalFireSwitch { rollout_id, target },
61 ];
62 Ok((state, effects))
63 }
64
65 Event::RemoteDispatchAck {
66 current_closure_at_dispatch,
67 received_at,
68 seq,
69 } => {
70 let from = state.state;
71 state.state = HostState::Activating;
72 state.current_closure_at_dispatch = Some(current_closure_at_dispatch);
73 state.dispatch_acked_at = Some(received_at);
74 state.last_event_seq = seq;
75
76 let effects = vec![
77 Effect::RemoteAppendEventLog {
78 host: state.hostname.clone(),
79 rollout_id: state.rollout_id.clone(),
80 payload: OutboundAgentEvent::DispatchAck {
81 current_closure_at_dispatch: state
82 .current_closure_at_dispatch
83 .clone()
84 .unwrap_or_default(),
85 received_at,
86 seq,
87 },
88 },
89 Effect::RecordTransition {
90 host: state.hostname.clone(),
91 rollout_id: state.rollout_id.clone(),
92 from,
93 to: HostState::Activating,
94 at: received_at,
95 },
96 ];
97 Ok((state, effects))
98 }
99
100 other => Err(illegal(&state, &other)),
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107 use crate::transitions::dispatch;
108 use chrono::{TimeZone, Utc};
109 use nixfleet_proto::{HealthGate, OnHealthFailure, RolloutPolicy};
110
111 fn t0() -> chrono::DateTime<Utc> {
112 Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
113 }
114
115 fn fake_policy() -> RolloutPolicy {
116 RolloutPolicy {
117 strategy: "all-at-once".into(),
118 waves: vec![],
119 health_gate: HealthGate::default(),
120 on_health_failure: OnHealthFailure::Halt,
121 }
122 }
123
124 fn pending() -> HostRolloutState {
125 HostRolloutState::new_pending(
126 "r1".into(),
127 "h1".into(),
128 "stable".into(),
129 "abc123".into(),
130 t0(),
131 t0() + chrono::Duration::minutes(5),
132 )
133 }
134
135 #[test]
136 fn local_activate_moves_pending_to_activating() {
137 let state = pending();
138 let event = Event::LocalActivate {
139 current_closure_at_dispatch: "prior-closure".into(),
140 target_closure: "target".into(),
141 received_at: t0() + chrono::Duration::seconds(1),
142 soak_due_at: t0() + chrono::Duration::minutes(5),
143 seq: 1,
144 };
145 let (next, effects) = dispatch(state, event, t0(), &fake_policy()).unwrap();
146 assert_eq!(next.state, HostState::Activating);
147 assert_eq!(
148 next.current_closure_at_dispatch.as_deref(),
149 Some("prior-closure")
150 );
151 assert!(next.dispatch_acked_at.is_some());
152 assert_eq!(next.last_event_seq, 1);
153 assert!(matches!(effects[0], Effect::RecordTransition { .. }));
154 assert!(matches!(effects[1], Effect::LocalEmitEvent { .. }));
155 assert!(matches!(effects[2], Effect::LocalFireSwitch { .. }));
156 }
157
158 #[test]
159 fn remote_dispatch_ack_mirrors_pending_to_activating() {
160 let state = pending();
161 let event = Event::RemoteDispatchAck {
162 current_closure_at_dispatch: "prior-closure".into(),
163 received_at: t0() + chrono::Duration::seconds(1),
164 seq: 1,
165 };
166 let (next, effects) = dispatch(state, event, t0(), &fake_policy()).unwrap();
167 assert_eq!(next.state, HostState::Activating);
168 assert!(matches!(effects[0], Effect::RemoteAppendEventLog { .. }));
169 assert!(matches!(effects[1], Effect::RecordTransition { .. }));
170 }
171
172 #[test]
173 fn illegal_event_in_pending_returns_illegal_for_state() {
174 let state = pending();
175 let event = Event::LocalActivationCompleted {
176 observed_current_closure: "abc123".into(),
177 exit_code: 0,
178 completed_at: t0(),
179 seq: 1,
180 };
181 let err = dispatch(state, event, t0(), &fake_policy()).unwrap_err();
182 assert!(matches!(err, TransitionError::IllegalForState { .. }));
183 }
184
185 #[test]
186 fn seq_regression_rejected() {
187 let mut state = pending();
188 state.last_event_seq = 5;
189 let event = Event::LocalActivate {
190 current_closure_at_dispatch: "prior".into(),
191 target_closure: "target".into(),
192 received_at: t0(),
193 soak_due_at: t0() + chrono::Duration::minutes(5),
194 seq: 5, };
196 let err = dispatch(state, event, t0(), &fake_policy()).unwrap_err();
197 assert!(matches!(err, TransitionError::SeqRegression { .. }));
198 }
199}