nixfleet_state_machine/transitions/
soaking.rs

//! Transitions from `Soaking`.
//!
//! - `LocalProbeTopologyDeclared` / `RemoteProbeTopologyDeclared` — forwards
//!   the declared probe topology. Only `last_event_seq` changes.
//! - `LocalProbeObservedFirst` / `RemoteProbeObservedFirst` — stamps
//!   `probe_observed_first_at` (the first stamp wins; later events do not
//!   overwrite it). Soak gate may now consult probe state.
//! - `LocalProbeResult` / `RemoteProbeResult` — updates probe map.
//! - `LocalProbeFailureFirst` / `RemoteProbeFailureFirst` — stamps
//!   `probe_failure_first_at` (the first stamp wins). Sweep window is
//!   measured from this exact agent-supplied timestamp.
//! - `LocalSustainedFailureCrossed` / `RemoteFailed` — drives
//!   `Soaking → Failed`. Agent reads `onHealthFailure` and (if
//!   `rollback-and-halt`) fires rollback in the same handler.
//! - `LocalConvergedReached` / `RemoteConverged` — drives
//!   `Soaking → Converged` after re-verifying the three RFC-0005 §4.2
//!   invariants.
14
15use chrono::{DateTime, Utc};
16use nixfleet_proto::{OnHealthFailure, RolloutPolicy};
17
18use crate::effect::{Effect, OutboundAgentEvent};
19use crate::error::TransitionError;
20use crate::event::Event;
21use crate::state::{HostRolloutState, HostState, ProbeMode, ProbeRecord, ProbeStatus};
22
23use super::illegal;
24
25pub(super) fn handle(
26    mut state: HostRolloutState,
27    event: Event,
28    _now: DateTime<Utc>,
29    policy: &RolloutPolicy,
30) -> Result<(HostRolloutState, Vec<Effect>), TransitionError> {
31    match event {
32        Event::LocalProbeTopologyDeclared {
33            probes,
34            declared_at,
35            seq,
36        } => {
37            state.last_event_seq = seq;
38            let rollout_id = state.rollout_id.clone();
39            Ok((
40                state,
41                vec![Effect::LocalEmitEvent {
42                    rollout_id,
43                    payload: OutboundAgentEvent::ProbeTopologyDeclared {
44                        probes,
45                        declared_at,
46                        seq,
47                    },
48                    durable: true,
49                }],
50            ))
51        }
52        Event::RemoteProbeTopologyDeclared {
53            probes,
54            declared_at,
55            seq,
56        } => {
57            state.last_event_seq = seq;
58            let host = state.hostname.clone();
59            let rollout_id = state.rollout_id.clone();
60            Ok((
61                state,
62                vec![Effect::RemoteAppendEventLog {
63                    host,
64                    rollout_id,
65                    payload: OutboundAgentEvent::ProbeTopologyDeclared {
66                        probes,
67                        declared_at,
68                        seq,
69                    },
70                }],
71            ))
72        }
73
74        Event::LocalProbeObservedFirst {
75            probe_name,
76            mode,
77            observed_at,
78            seq,
79        } => {
80            if state.probe_observed_first_at.is_none() {
81                state.probe_observed_first_at = Some(observed_at);
82            }
83            state.last_event_seq = seq;
84            let rollout_id = state.rollout_id.clone();
85            Ok((
86                state,
87                vec![Effect::LocalEmitEvent {
88                    rollout_id,
89                    payload: OutboundAgentEvent::ProbeObservedFirst {
90                        probe_name,
91                        mode,
92                        observed_at,
93                        seq,
94                    },
95                    durable: false,
96                }],
97            ))
98        }
99        Event::RemoteProbeObservedFirst {
100            probe_name,
101            mode,
102            observed_at,
103            seq,
104        } => {
105            if state.probe_observed_first_at.is_none() {
106                state.probe_observed_first_at = Some(observed_at);
107            }
108            state.last_event_seq = seq;
109            Ok((
110                state.clone(),
111                vec![Effect::RemoteAppendEventLog {
112                    host: state.hostname,
113                    rollout_id: state.rollout_id,
114                    payload: OutboundAgentEvent::ProbeObservedFirst {
115                        probe_name,
116                        mode,
117                        observed_at,
118                        seq,
119                    },
120                }],
121            ))
122        }
123
124        Event::LocalProbeResult {
125            probe_name,
126            mode,
127            status,
128            observed_at,
129            failure_reason,
130            sub_results,
131            seq,
132        } => {
133            update_probe(
134                &mut state,
135                &probe_name,
136                mode,
137                status,
138                observed_at,
139                failure_reason.clone(),
140            );
141            state.last_event_seq = seq;
142            let rollout_id = state.rollout_id.clone();
143            Ok((
144                state,
145                vec![Effect::LocalEmitEvent {
146                    rollout_id,
147                    payload: OutboundAgentEvent::ProbeResult {
148                        probe_name,
149                        mode,
150                        status,
151                        observed_at,
152                        failure_reason,
153                        sub_results,
154                        seq,
155                    },
156                    durable: false,
157                }],
158            ))
159        }
160        Event::RemoteProbeResult {
161            probe_name,
162            mode,
163            status,
164            observed_at,
165            failure_reason,
166            sub_results,
167            seq,
168        } => {
169            update_probe(
170                &mut state,
171                &probe_name,
172                mode,
173                status,
174                observed_at,
175                failure_reason.clone(),
176            );
177            state.last_event_seq = seq;
178            let host = state.hostname.clone();
179            let rollout_id = state.rollout_id.clone();
180            Ok((
181                state,
182                vec![Effect::RemoteAppendEventLog {
183                    host,
184                    rollout_id,
185                    payload: OutboundAgentEvent::ProbeResult {
186                        probe_name,
187                        mode,
188                        status,
189                        observed_at,
190                        failure_reason,
191                        sub_results,
192                        seq,
193                    },
194                }],
195            ))
196        }
197
198        Event::LocalProbeFailureFirst {
199            probe_name,
200            mode,
201            first_failed_at,
202            seq,
203        } => {
204            if state.probe_failure_first_at.is_none() {
205                state.probe_failure_first_at = Some(first_failed_at);
206            }
207            state.last_event_seq = seq;
208            let rollout_id = state.rollout_id.clone();
209            Ok((
210                state,
211                vec![Effect::LocalEmitEvent {
212                    rollout_id,
213                    payload: OutboundAgentEvent::ProbeFailureFirst {
214                        probe_name,
215                        mode,
216                        first_failed_at,
217                        seq,
218                    },
219                    durable: false,
220                }],
221            ))
222        }
223        Event::RemoteProbeFailureFirst {
224            probe_name,
225            mode,
226            first_failed_at,
227            seq,
228        } => {
229            if state.probe_failure_first_at.is_none() {
230                state.probe_failure_first_at = Some(first_failed_at);
231            }
232            state.last_event_seq = seq;
233            let host = state.hostname.clone();
234            let rollout_id = state.rollout_id.clone();
235            Ok((
236                state,
237                vec![Effect::RemoteAppendEventLog {
238                    host,
239                    rollout_id,
240                    payload: OutboundAgentEvent::ProbeFailureFirst {
241                        probe_name,
242                        mode,
243                        first_failed_at,
244                        seq,
245                    },
246                }],
247            ))
248        }
249
250        // Soaking → Failed (agent self-detected sustained failure)
251        Event::LocalSustainedFailureCrossed {
252            failed_at,
253            sustained_duration_secs,
254            failing_probes,
255            policy_applied,
256            seq,
257        } => {
258            let from = state.state;
259            state.state = HostState::Failed;
260            state.failed_at = Some(failed_at);
261            state.policy_applied = Some(policy_applied);
262            state.last_event_seq = seq;
263
264            let mut effects = vec![
265                Effect::LocalEmitEvent {
266                    rollout_id: state.rollout_id.clone(),
267                    payload: OutboundAgentEvent::Failed {
268                        failed_at,
269                        sustained_duration_secs,
270                        failing_probes: failing_probes.clone(),
271                        policy_applied,
272                        seq,
273                    },
274                    durable: true,
275                },
276                Effect::RecordTransition {
277                    host: state.hostname.clone(),
278                    rollout_id: state.rollout_id.clone(),
279                    from,
280                    to: HostState::Failed,
281                    at: failed_at,
282                },
283            ];
284            if matches!(policy_applied, OnHealthFailure::RollbackAndHalt)
285                && let Some(prior) = state.current_closure_at_dispatch.clone()
286            {
287                effects.push(Effect::LocalFireRollbackTo {
288                    rollout_id: state.rollout_id.clone(),
289                    closure: prior,
290                });
291            }
292            Ok((state, effects))
293        }
294        Event::RemoteFailed {
295            failed_at,
296            sustained_duration_secs,
297            failing_probes,
298            policy_applied,
299            seq,
300        } => {
301            let from = state.state;
302            state.state = HostState::Failed;
303            state.failed_at = Some(failed_at);
304            state.policy_applied = Some(policy_applied);
305            state.last_event_seq = seq;
306
307            let effects = vec![
308                Effect::RemoteAppendEventLog {
309                    host: state.hostname.clone(),
310                    rollout_id: state.rollout_id.clone(),
311                    payload: OutboundAgentEvent::Failed {
312                        failed_at,
313                        sustained_duration_secs,
314                        failing_probes,
315                        policy_applied,
316                        seq,
317                    },
318                },
319                Effect::RecordTransition {
320                    host: state.hostname.clone(),
321                    rollout_id: state.rollout_id.clone(),
322                    from,
323                    to: HostState::Failed,
324                    at: failed_at,
325                },
326            ];
327            Ok((state, effects))
328        }
329
330        // Soaking → Converged (after re-verifying RFC-0005 §4.2 invariants)
331        Event::LocalConvergedReached {
332            converged_at,
333            current_closure,
334            seq,
335        } => {
336            verify_converged_invariants(&state, &current_closure, converged_at)?;
337            let from = state.state;
338            state.state = HostState::Converged;
339            state.converged_at = Some(converged_at);
340            state.current_closure = Some(current_closure.clone());
341            state.last_event_seq = seq;
342
343            let effects = vec![
344                Effect::LocalEmitEvent {
345                    rollout_id: state.rollout_id.clone(),
346                    payload: OutboundAgentEvent::Converged {
347                        converged_at,
348                        current_closure,
349                        seq,
350                    },
351                    durable: true,
352                },
353                Effect::RecordTransition {
354                    host: state.hostname.clone(),
355                    rollout_id: state.rollout_id.clone(),
356                    from,
357                    to: HostState::Converged,
358                    at: converged_at,
359                },
360            ];
361            // policy currently unused for Converged but available for future
362            // wave-promotion side effects.
363            let _ = policy;
364            Ok((state, effects))
365        }
366        Event::RemoteConverged {
367            converged_at,
368            current_closure,
369            seq,
370        } => {
371            verify_converged_invariants(&state, &current_closure, converged_at)?;
372            let from = state.state;
373            state.state = HostState::Converged;
374            state.converged_at = Some(converged_at);
375            state.current_closure = Some(current_closure.clone());
376            state.last_event_seq = seq;
377
378            let effects = vec![
379                Effect::RemoteAppendEventLog {
380                    host: state.hostname.clone(),
381                    rollout_id: state.rollout_id.clone(),
382                    payload: OutboundAgentEvent::Converged {
383                        converged_at,
384                        current_closure,
385                        seq,
386                    },
387                },
388                Effect::RecordTransition {
389                    host: state.hostname.clone(),
390                    rollout_id: state.rollout_id.clone(),
391                    from,
392                    to: HostState::Converged,
393                    at: converged_at,
394                },
395            ];
396            Ok((state, effects))
397        }
398
399        other => Err(illegal(&state, &other)),
400    }
401}
402
403fn update_probe(
404    state: &mut HostRolloutState,
405    name: &str,
406    mode: crate::state::ProbeMode,
407    status: ProbeStatus,
408    observed_at: chrono::DateTime<chrono::Utc>,
409    failure_reason: Option<String>,
410) {
411    let entry = state.probes.entry(name.to_string()).or_insert(ProbeRecord {
412        status,
413        mode,
414        last_observed_at: observed_at,
415        last_pass_at: None,
416        failure_reason: None,
417    });
418    entry.status = status;
419    entry.mode = mode;
420    entry.last_observed_at = observed_at;
421    if matches!(status, ProbeStatus::Pass) {
422        entry.last_pass_at = Some(observed_at);
423        entry.failure_reason = None;
424    } else {
425        entry.failure_reason = failure_reason;
426    }
427}
428
429/// RFC-0005 §4.2 `Converged` event: CP re-verifies the three invariants
430/// before transitioning. Same check runs agent-side too — same code, both
431/// sides (RFC-0006 §2 principle 4).
432fn verify_converged_invariants(
433    state: &HostRolloutState,
434    current_closure: &str,
435    converged_at: chrono::DateTime<chrono::Utc>,
436) -> Result<(), TransitionError> {
437    // Invariant 1: current_closure == target_closure
438    if current_closure != state.target_closure {
439        return Err(TransitionError::Invariant(
440            "Converged event: current_closure != target_closure",
441        ));
442    }
443    // Invariant 2: soak_due_at has elapsed
444    if let Some(soak_due) = state.soak_due_at
445        && converged_at < soak_due
446    {
447        return Err(TransitionError::Invariant(
448            "Converged event: converged_at before soak_due_at",
449        ));
450    }
451    // Invariant 3: all enforce-mode declared probes are Pass. Observe and
452    // Disabled probes do not gate per RFC-0007 §3.3 (ProbeMode docstring,
453    // state.rs); the wave-promotion gate already filters by mode, and the
454    // sustained-failure path (reducer's collect_failing_enforce_probes)
455    // is on the same side of the line — this brings convergence into
456    // parity. An empty enforce set is acceptable (no enforce probes
457    // declared = no probe constraint).
458    for probe in state.probes.values() {
459        if matches!(probe.mode, ProbeMode::Enforce) && !matches!(probe.status, ProbeStatus::Pass) {
460            return Err(TransitionError::Invariant(
461                "Converged event: at least one enforce-mode probe is not Pass",
462            ));
463        }
464    }
465    Ok(())
466}