1use chrono::{DateTime, Utc};
16use nixfleet_proto::{OnHealthFailure, RolloutPolicy};
17
18use crate::effect::{Effect, OutboundAgentEvent};
19use crate::error::TransitionError;
20use crate::event::Event;
21use crate::state::{HostRolloutState, HostState, ProbeMode, ProbeRecord, ProbeStatus};
22
23use super::illegal;
24
25pub(super) fn handle(
26 mut state: HostRolloutState,
27 event: Event,
28 _now: DateTime<Utc>,
29 policy: &RolloutPolicy,
30) -> Result<(HostRolloutState, Vec<Effect>), TransitionError> {
31 match event {
32 Event::LocalProbeTopologyDeclared {
33 probes,
34 declared_at,
35 seq,
36 } => {
37 state.last_event_seq = seq;
38 let rollout_id = state.rollout_id.clone();
39 Ok((
40 state,
41 vec![Effect::LocalEmitEvent {
42 rollout_id,
43 payload: OutboundAgentEvent::ProbeTopologyDeclared {
44 probes,
45 declared_at,
46 seq,
47 },
48 durable: true,
49 }],
50 ))
51 }
52 Event::RemoteProbeTopologyDeclared {
53 probes,
54 declared_at,
55 seq,
56 } => {
57 state.last_event_seq = seq;
58 let host = state.hostname.clone();
59 let rollout_id = state.rollout_id.clone();
60 Ok((
61 state,
62 vec![Effect::RemoteAppendEventLog {
63 host,
64 rollout_id,
65 payload: OutboundAgentEvent::ProbeTopologyDeclared {
66 probes,
67 declared_at,
68 seq,
69 },
70 }],
71 ))
72 }
73
74 Event::LocalProbeObservedFirst {
75 probe_name,
76 mode,
77 observed_at,
78 seq,
79 } => {
80 if state.probe_observed_first_at.is_none() {
81 state.probe_observed_first_at = Some(observed_at);
82 }
83 state.last_event_seq = seq;
84 let rollout_id = state.rollout_id.clone();
85 Ok((
86 state,
87 vec![Effect::LocalEmitEvent {
88 rollout_id,
89 payload: OutboundAgentEvent::ProbeObservedFirst {
90 probe_name,
91 mode,
92 observed_at,
93 seq,
94 },
95 durable: false,
96 }],
97 ))
98 }
99 Event::RemoteProbeObservedFirst {
100 probe_name,
101 mode,
102 observed_at,
103 seq,
104 } => {
105 if state.probe_observed_first_at.is_none() {
106 state.probe_observed_first_at = Some(observed_at);
107 }
108 state.last_event_seq = seq;
109 Ok((
110 state.clone(),
111 vec![Effect::RemoteAppendEventLog {
112 host: state.hostname,
113 rollout_id: state.rollout_id,
114 payload: OutboundAgentEvent::ProbeObservedFirst {
115 probe_name,
116 mode,
117 observed_at,
118 seq,
119 },
120 }],
121 ))
122 }
123
124 Event::LocalProbeResult {
125 probe_name,
126 mode,
127 status,
128 observed_at,
129 failure_reason,
130 sub_results,
131 seq,
132 } => {
133 update_probe(
134 &mut state,
135 &probe_name,
136 mode,
137 status,
138 observed_at,
139 failure_reason.clone(),
140 );
141 state.last_event_seq = seq;
142 let rollout_id = state.rollout_id.clone();
143 Ok((
144 state,
145 vec![Effect::LocalEmitEvent {
146 rollout_id,
147 payload: OutboundAgentEvent::ProbeResult {
148 probe_name,
149 mode,
150 status,
151 observed_at,
152 failure_reason,
153 sub_results,
154 seq,
155 },
156 durable: false,
157 }],
158 ))
159 }
160 Event::RemoteProbeResult {
161 probe_name,
162 mode,
163 status,
164 observed_at,
165 failure_reason,
166 sub_results,
167 seq,
168 } => {
169 update_probe(
170 &mut state,
171 &probe_name,
172 mode,
173 status,
174 observed_at,
175 failure_reason.clone(),
176 );
177 state.last_event_seq = seq;
178 let host = state.hostname.clone();
179 let rollout_id = state.rollout_id.clone();
180 Ok((
181 state,
182 vec![Effect::RemoteAppendEventLog {
183 host,
184 rollout_id,
185 payload: OutboundAgentEvent::ProbeResult {
186 probe_name,
187 mode,
188 status,
189 observed_at,
190 failure_reason,
191 sub_results,
192 seq,
193 },
194 }],
195 ))
196 }
197
198 Event::LocalProbeFailureFirst {
199 probe_name,
200 mode,
201 first_failed_at,
202 seq,
203 } => {
204 if state.probe_failure_first_at.is_none() {
205 state.probe_failure_first_at = Some(first_failed_at);
206 }
207 state.last_event_seq = seq;
208 let rollout_id = state.rollout_id.clone();
209 Ok((
210 state,
211 vec![Effect::LocalEmitEvent {
212 rollout_id,
213 payload: OutboundAgentEvent::ProbeFailureFirst {
214 probe_name,
215 mode,
216 first_failed_at,
217 seq,
218 },
219 durable: false,
220 }],
221 ))
222 }
223 Event::RemoteProbeFailureFirst {
224 probe_name,
225 mode,
226 first_failed_at,
227 seq,
228 } => {
229 if state.probe_failure_first_at.is_none() {
230 state.probe_failure_first_at = Some(first_failed_at);
231 }
232 state.last_event_seq = seq;
233 let host = state.hostname.clone();
234 let rollout_id = state.rollout_id.clone();
235 Ok((
236 state,
237 vec![Effect::RemoteAppendEventLog {
238 host,
239 rollout_id,
240 payload: OutboundAgentEvent::ProbeFailureFirst {
241 probe_name,
242 mode,
243 first_failed_at,
244 seq,
245 },
246 }],
247 ))
248 }
249
250 Event::LocalSustainedFailureCrossed {
252 failed_at,
253 sustained_duration_secs,
254 failing_probes,
255 policy_applied,
256 seq,
257 } => {
258 let from = state.state;
259 state.state = HostState::Failed;
260 state.failed_at = Some(failed_at);
261 state.policy_applied = Some(policy_applied);
262 state.last_event_seq = seq;
263
264 let mut effects = vec![
265 Effect::LocalEmitEvent {
266 rollout_id: state.rollout_id.clone(),
267 payload: OutboundAgentEvent::Failed {
268 failed_at,
269 sustained_duration_secs,
270 failing_probes: failing_probes.clone(),
271 policy_applied,
272 seq,
273 },
274 durable: true,
275 },
276 Effect::RecordTransition {
277 host: state.hostname.clone(),
278 rollout_id: state.rollout_id.clone(),
279 from,
280 to: HostState::Failed,
281 at: failed_at,
282 },
283 ];
284 if matches!(policy_applied, OnHealthFailure::RollbackAndHalt)
285 && let Some(prior) = state.current_closure_at_dispatch.clone()
286 {
287 effects.push(Effect::LocalFireRollbackTo {
288 rollout_id: state.rollout_id.clone(),
289 closure: prior,
290 });
291 }
292 Ok((state, effects))
293 }
294 Event::RemoteFailed {
295 failed_at,
296 sustained_duration_secs,
297 failing_probes,
298 policy_applied,
299 seq,
300 } => {
301 let from = state.state;
302 state.state = HostState::Failed;
303 state.failed_at = Some(failed_at);
304 state.policy_applied = Some(policy_applied);
305 state.last_event_seq = seq;
306
307 let effects = vec![
308 Effect::RemoteAppendEventLog {
309 host: state.hostname.clone(),
310 rollout_id: state.rollout_id.clone(),
311 payload: OutboundAgentEvent::Failed {
312 failed_at,
313 sustained_duration_secs,
314 failing_probes,
315 policy_applied,
316 seq,
317 },
318 },
319 Effect::RecordTransition {
320 host: state.hostname.clone(),
321 rollout_id: state.rollout_id.clone(),
322 from,
323 to: HostState::Failed,
324 at: failed_at,
325 },
326 ];
327 Ok((state, effects))
328 }
329
330 Event::LocalConvergedReached {
332 converged_at,
333 current_closure,
334 seq,
335 } => {
336 verify_converged_invariants(&state, ¤t_closure, converged_at)?;
337 let from = state.state;
338 state.state = HostState::Converged;
339 state.converged_at = Some(converged_at);
340 state.current_closure = Some(current_closure.clone());
341 state.last_event_seq = seq;
342
343 let effects = vec![
344 Effect::LocalEmitEvent {
345 rollout_id: state.rollout_id.clone(),
346 payload: OutboundAgentEvent::Converged {
347 converged_at,
348 current_closure,
349 seq,
350 },
351 durable: true,
352 },
353 Effect::RecordTransition {
354 host: state.hostname.clone(),
355 rollout_id: state.rollout_id.clone(),
356 from,
357 to: HostState::Converged,
358 at: converged_at,
359 },
360 ];
361 let _ = policy;
364 Ok((state, effects))
365 }
366 Event::RemoteConverged {
367 converged_at,
368 current_closure,
369 seq,
370 } => {
371 verify_converged_invariants(&state, ¤t_closure, converged_at)?;
372 let from = state.state;
373 state.state = HostState::Converged;
374 state.converged_at = Some(converged_at);
375 state.current_closure = Some(current_closure.clone());
376 state.last_event_seq = seq;
377
378 let effects = vec![
379 Effect::RemoteAppendEventLog {
380 host: state.hostname.clone(),
381 rollout_id: state.rollout_id.clone(),
382 payload: OutboundAgentEvent::Converged {
383 converged_at,
384 current_closure,
385 seq,
386 },
387 },
388 Effect::RecordTransition {
389 host: state.hostname.clone(),
390 rollout_id: state.rollout_id.clone(),
391 from,
392 to: HostState::Converged,
393 at: converged_at,
394 },
395 ];
396 Ok((state, effects))
397 }
398
399 other => Err(illegal(&state, &other)),
400 }
401}
402
403fn update_probe(
404 state: &mut HostRolloutState,
405 name: &str,
406 mode: crate::state::ProbeMode,
407 status: ProbeStatus,
408 observed_at: chrono::DateTime<chrono::Utc>,
409 failure_reason: Option<String>,
410) {
411 let entry = state.probes.entry(name.to_string()).or_insert(ProbeRecord {
412 status,
413 mode,
414 last_observed_at: observed_at,
415 last_pass_at: None,
416 failure_reason: None,
417 });
418 entry.status = status;
419 entry.mode = mode;
420 entry.last_observed_at = observed_at;
421 if matches!(status, ProbeStatus::Pass) {
422 entry.last_pass_at = Some(observed_at);
423 entry.failure_reason = None;
424 } else {
425 entry.failure_reason = failure_reason;
426 }
427}
428
429fn verify_converged_invariants(
433 state: &HostRolloutState,
434 current_closure: &str,
435 converged_at: chrono::DateTime<chrono::Utc>,
436) -> Result<(), TransitionError> {
437 if current_closure != state.target_closure {
439 return Err(TransitionError::Invariant(
440 "Converged event: current_closure != target_closure",
441 ));
442 }
443 if let Some(soak_due) = state.soak_due_at
445 && converged_at < soak_due
446 {
447 return Err(TransitionError::Invariant(
448 "Converged event: converged_at before soak_due_at",
449 ));
450 }
451 for probe in state.probes.values() {
459 if matches!(probe.mode, ProbeMode::Enforce) && !matches!(probe.status, ProbeStatus::Pass) {
460 return Err(TransitionError::Invariant(
461 "Converged event: at least one enforce-mode probe is not Pass",
462 ));
463 }
464 }
465 Ok(())
466}