1use std::collections::HashMap;
18
19use nixfleet_proto::RolloutPolicy;
20use nixfleet_proto::clock::ClockHandle;
21use nixfleet_reconciler::planner_types::SignedManifestSet;
22use nixfleet_state_machine::{Event, HostRolloutState, HostState};
23use tokio::sync::{mpsc, oneshot};
24use tokio_util::sync::CancellationToken;
25
26use std::sync::Arc;
27
28use super::applier::apply_effect;
29use super::outbound_queue::OutboundQueue;
30use super::{
31 ActivationIntentTx, AgentConfig, ApplierCtx, OutboundKickTx, ProbeResetTx, ReducerInput,
32 ShutdownGuard,
33};
34
/// How long (in whole seconds) an enforce-mode probe failure must persist on a
/// `Soaking` host before `run_advance_tick` synthesizes a
/// `LocalSustainedFailureCrossed` event. Two minutes.
const SUSTAINED_FAILURE_THRESHOLD_SECS: i64 = 2 * 60;
57
58#[allow(clippy::too_many_arguments)]
59pub async fn run(
60 cancel: CancellationToken,
61 cfg: AgentConfig,
62 clock: ClockHandle,
63 mut input_rx: mpsc::Receiver<ReducerInput>,
64 input_tx: mpsc::Sender<ReducerInput>,
65 activation_tx: ActivationIntentTx,
66 probe_reset_tx: ProbeResetTx,
67 outbound_queue: Arc<OutboundQueue>,
68 outbound_kick: OutboundKickTx,
69 shutdown_senders: Vec<oneshot::Sender<()>>,
70) {
71 let _shutdown_guard = ShutdownGuard(shutdown_senders);
72
73 let mut host_states: HashMap<nixfleet_proto::RolloutId, HostRolloutState> = HashMap::new();
74 let mut manifests: Option<SignedManifestSet> = None;
81
82 loop {
83 tokio::select! {
84 _ = cancel.cancelled() => {
85 tracing::info!(
86 target: "shutdown",
87 task = "agent_reducer",
88 "task shut down",
89 );
90 return;
91 }
92 maybe_input = input_rx.recv() => {
93 let Some(input) = maybe_input else { return };
94 let ctx = ApplierCtx {
95 cfg: &cfg,
96 clock: &clock,
97 input_tx: &input_tx,
98 activation_tx: &activation_tx,
99 probe_reset_tx: &probe_reset_tx,
100 outbound_queue: &outbound_queue,
101 outbound_kick: &outbound_kick,
102 };
103 handle_input(&mut host_states, &mut manifests, &clock, &ctx, input).await;
104 }
105 }
106 }
107}
108
109async fn handle_input(
110 host_states: &mut HashMap<nixfleet_proto::RolloutId, HostRolloutState>,
111 manifests: &mut Option<SignedManifestSet>,
112 clock: &ClockHandle,
113 ctx: &ApplierCtx<'_>,
114 input: ReducerInput,
115) {
116 match input {
117 ReducerInput::HostEvent { rollout_id, event } => {
118 run_host_event(host_states, manifests, clock, ctx, rollout_id, event).await;
119 }
120 ReducerInput::AgentAdvanceTick => {
121 run_advance_tick(host_states, manifests, clock, ctx).await;
122 }
123 ReducerInput::ManifestSetUpdated(set) => {
124 *manifests = Some(*set);
125 tracing::info!(
126 target: "agent_reducer",
127 "manifest cache refreshed",
128 );
129 }
130 ReducerInput::BootstrapHost(snapshot) => {
131 apply_bootstrap_snapshot(host_states, ctx, *snapshot).await;
132 }
133 }
134}
135
136async fn apply_bootstrap_snapshot(
170 host_states: &mut HashMap<nixfleet_proto::RolloutId, HostRolloutState>,
171 ctx: &ApplierCtx<'_>,
172 snapshot: nixfleet_proto::agent_wire::HostRolloutSnapshot,
173) {
174 let warm = host_states.contains_key(&snapshot.rollout_id);
175 let rollout_id = snapshot.rollout_id.clone();
176 let record = merge_snapshot_into_state(host_states.get(&rollout_id), snapshot);
177 tracing::info!(
178 target: "agent_reducer",
179 rollout_id = %record.rollout_id,
180 state = ?record.state,
181 target_closure = %record.target_closure,
182 warm,
183 probe_failure_first_at = ?record.probe_failure_first_at,
184 "bootstrap: rehydrating in-memory HostRolloutState from CP snapshot (LIFT #3)",
185 );
186 let effects = nixfleet_state_machine::rehydration_effects(&record);
187 host_states.insert(rollout_id, record);
188 for effect in effects {
189 apply_effect(ctx, effect).await;
190 }
191}
192
193fn merge_snapshot_into_state(
199 existing: Option<&HostRolloutState>,
200 snapshot: nixfleet_proto::agent_wire::HostRolloutSnapshot,
201) -> HostRolloutState {
202 use nixfleet_proto::HostRolloutState as WireState;
203 use nixfleet_state_machine::HostState;
204 let internal_state = match snapshot.state {
205 WireState::Pending => HostState::Pending,
206 WireState::Activating => HostState::Activating,
207 WireState::Deferred => HostState::Deferred,
208 WireState::Soaking => HostState::Soaking,
209 WireState::Converged => HostState::Converged,
210 WireState::Failed => HostState::Failed,
211 WireState::Reverted => HostState::Reverted,
212 };
213 HostRolloutState {
214 rollout_id: snapshot.rollout_id,
216 hostname: snapshot.hostname,
217 channel: snapshot.channel,
218 state: internal_state,
219 target_closure: snapshot.target_closure,
220 current_closure_at_dispatch: snapshot.current_closure_at_dispatch,
221 current_closure: snapshot.current_closure,
222 dispatched_at: snapshot.dispatched_at,
223 dispatch_acked_at: snapshot.dispatch_acked_at,
224 activation_started_at: snapshot.activation_started_at,
225 activation_completed_at: snapshot.activation_completed_at,
226 soak_due_at: snapshot.soak_due_at,
227 last_event_seq: snapshot.last_event_seq,
228 probes: existing.map(|e| e.probes.clone()).unwrap_or_default(),
232 probe_observed_first_at: existing.and_then(|e| e.probe_observed_first_at),
233 probe_failure_first_at: existing.and_then(|e| e.probe_failure_first_at),
234 activation_failed_at: existing.and_then(|e| e.activation_failed_at),
235 failed_at: existing.and_then(|e| e.failed_at),
236 converged_at: existing.and_then(|e| e.converged_at),
237 reverted_to: existing.and_then(|e| e.reverted_to.clone()),
238 reverted_at: existing.and_then(|e| e.reverted_at),
239 policy_applied: existing.and_then(|e| e.policy_applied),
240 }
241}
242
243async fn run_host_event(
244 host_states: &mut HashMap<nixfleet_proto::RolloutId, HostRolloutState>,
245 manifests: &Option<SignedManifestSet>,
246 clock: &ClockHandle,
247 ctx: &ApplierCtx<'_>,
248 rollout_id: nixfleet_proto::RolloutId,
249 event: Event,
250) {
251 let prior = host_states.get(&rollout_id).cloned();
267 let (state, policy_channel) = match (prior, &event) {
268 (Some(s), _) => {
269 let ch = s.channel.clone();
270 (s, ch)
271 }
272 (
273 None,
274 Event::LocalActivate {
275 target_closure,
276 soak_due_at,
277 ..
278 },
279 ) => {
280 let now = clock.now();
281 let state = bootstrap_pending_state(&rollout_id, target_closure, *soak_due_at, now);
282 (state, "<bootstrap>".to_string())
283 }
284 (None, _) => {
285 tracing::warn!(
286 target: "agent_reducer",
287 %rollout_id,
288 ?event,
289 "HostEvent for unknown rollout (no LocalActivate seen yet); dropping",
290 );
291 return;
292 }
293 };
294
295 let Some(policy) = resolve_policy(manifests, &state.channel) else {
296 tracing::warn!(
297 target: "agent_reducer",
298 %rollout_id,
299 channel = %state.channel,
300 policy_channel,
301 "HostEvent: rollout policy not cached yet; dropping event (longpoll will refill)",
302 );
303 return;
304 };
305
306 let next_seq = state.last_event_seq + 1;
309 let event = with_seq(event, next_seq);
310
311 let now = clock.now();
312 let (next_state, effects) = match nixfleet_state_machine::step(state, event, now, &policy) {
313 Ok(out) => out,
314 Err(err) => {
315 tracing::warn!(
316 target: "agent_reducer",
317 %rollout_id,
318 error = %err,
319 "step() rejected event — illegal transition or invariant violation",
320 );
321 return;
322 }
323 };
324 host_states.insert(rollout_id, next_state);
325
326 for effect in effects {
329 apply_effect(ctx, effect).await;
330 }
331}
332
333async fn run_advance_tick(
334 host_states: &mut HashMap<nixfleet_proto::RolloutId, HostRolloutState>,
335 manifests: &Option<SignedManifestSet>,
336 clock: &ClockHandle,
337 ctx: &ApplierCtx<'_>,
338) {
339 let now = clock.now();
340 let mut synth: Vec<(nixfleet_proto::RolloutId, Event)> = Vec::new();
343
344 for (rollout_id, state) in host_states.iter() {
348 if state.state != HostState::Soaking {
349 continue;
350 }
351 let Some(first_failed) = state.probe_failure_first_at else {
352 continue;
353 };
354 if (now - first_failed).num_seconds() < SUSTAINED_FAILURE_THRESHOLD_SECS {
355 continue;
356 }
357 let failing_probes = collect_failing_enforce_probes(&state.probes);
358 if failing_probes.is_empty() {
359 continue;
360 }
361 let policy_applied = resolve_policy(manifests, &state.channel)
362 .map(|p| p.on_health_failure)
363 .unwrap_or(nixfleet_proto::OnHealthFailure::Halt);
364 synth.push((
365 rollout_id.clone(),
366 Event::LocalSustainedFailureCrossed {
367 failed_at: now,
368 sustained_duration_secs: (now - first_failed).num_seconds() as u64,
369 failing_probes,
370 policy_applied,
371 seq: 0, },
373 ));
374 }
375
376 for (rollout_id, state) in host_states.iter() {
384 if state.state != HostState::Soaking {
385 continue;
386 }
387 let Some(soak_due_at) = state.soak_due_at else {
388 continue;
389 };
390 if now < soak_due_at {
391 continue;
392 }
393 let Some(current) = state.current_closure.as_ref() else {
394 continue;
395 };
396 if *current != state.target_closure {
397 continue;
398 }
399 if !all_enforce_probes_pass(&state.probes) {
400 continue;
401 }
402 synth.push((
403 rollout_id.clone(),
404 Event::LocalConvergedReached {
405 converged_at: now,
406 current_closure: current.clone(),
407 seq: 0, },
409 ));
410 }
411
412 for (rollout_id, event) in synth {
413 run_host_event(host_states, manifests, clock, ctx, rollout_id, event).await;
414 }
415}
416
417fn bootstrap_pending_state(
427 rollout_id: &nixfleet_proto::RolloutId,
428 target_closure: &str,
429 soak_due_at: chrono::DateTime<chrono::Utc>,
430 now: chrono::DateTime<chrono::Utc>,
431) -> HostRolloutState {
432 let channel = rollout_id.channel().to_string();
433 HostRolloutState::new_pending(
434 rollout_id.clone(),
435 "self".to_string(),
436 channel,
437 target_closure.to_string(),
438 now,
439 soak_due_at,
440 )
441}
442
443fn collect_failing_enforce_probes(
451 probes: &HashMap<String, nixfleet_state_machine::ProbeRecord>,
452) -> Vec<String> {
453 probes
454 .iter()
455 .filter(|(_, r)| {
456 r.status == nixfleet_state_machine::ProbeStatus::Fail
457 && matches!(r.mode, nixfleet_state_machine::ProbeMode::Enforce)
458 })
459 .map(|(name, _)| name.clone())
460 .collect()
461}
462
463fn all_enforce_probes_pass(probes: &HashMap<String, nixfleet_state_machine::ProbeRecord>) -> bool {
470 probes
471 .values()
472 .filter(|r| matches!(r.mode, nixfleet_state_machine::ProbeMode::Enforce))
473 .all(|r| r.status == nixfleet_state_machine::ProbeStatus::Pass)
474}
475
476fn resolve_policy(manifests: &Option<SignedManifestSet>, channel: &str) -> Option<RolloutPolicy> {
477 let m = manifests.as_ref()?;
478 let fleet = m.fleet();
479 let channel_entry = fleet.channels.get(channel)?;
480 fleet
481 .rollout_policies
482 .get(&channel_entry.rollout_policy)
483 .cloned()
484}
485
486fn with_seq(event: Event, seq: u64) -> Event {
490 match event {
491 Event::LocalActivate {
492 current_closure_at_dispatch,
493 target_closure,
494 received_at,
495 soak_due_at,
496 ..
497 } => Event::LocalActivate {
498 current_closure_at_dispatch,
499 target_closure,
500 received_at,
501 soak_due_at,
502 seq,
503 },
504 Event::LocalActivationStarted {
505 started_at,
506 switch_method,
507 ..
508 } => Event::LocalActivationStarted {
509 started_at,
510 switch_method,
511 seq,
512 },
513 Event::LocalActivationCompleted {
514 observed_current_closure,
515 exit_code,
516 completed_at,
517 ..
518 } => Event::LocalActivationCompleted {
519 observed_current_closure,
520 exit_code,
521 completed_at,
522 seq,
523 },
524 Event::LocalActivationFailed {
525 exit_code,
526 stderr_tail,
527 failed_at,
528 ..
529 } => Event::LocalActivationFailed {
530 exit_code,
531 stderr_tail,
532 failed_at,
533 seq,
534 },
535 Event::LocalProbeObservedFirst {
536 probe_name,
537 mode,
538 observed_at,
539 ..
540 } => Event::LocalProbeObservedFirst {
541 probe_name,
542 mode,
543 observed_at,
544 seq,
545 },
546 Event::LocalProbeResult {
547 probe_name,
548 mode,
549 status,
550 observed_at,
551 failure_reason,
552 sub_results,
553 ..
554 } => Event::LocalProbeResult {
555 probe_name,
556 mode,
557 status,
558 observed_at,
559 failure_reason,
560 sub_results,
561 seq,
562 },
563 Event::LocalProbeFailureFirst {
564 probe_name,
565 mode,
566 first_failed_at,
567 ..
568 } => Event::LocalProbeFailureFirst {
569 probe_name,
570 mode,
571 first_failed_at,
572 seq,
573 },
574 Event::LocalSustainedFailureCrossed {
575 failed_at,
576 sustained_duration_secs,
577 failing_probes,
578 policy_applied,
579 ..
580 } => Event::LocalSustainedFailureCrossed {
581 failed_at,
582 sustained_duration_secs,
583 failing_probes,
584 policy_applied,
585 seq,
586 },
587 Event::LocalRollbackCompleted {
588 reverted_to_closure,
589 exit_code,
590 completed_at,
591 ..
592 } => Event::LocalRollbackCompleted {
593 reverted_to_closure,
594 exit_code,
595 completed_at,
596 seq,
597 },
598 Event::LocalConvergedReached {
599 converged_at,
600 current_closure,
601 ..
602 } => Event::LocalConvergedReached {
603 converged_at,
604 current_closure,
605 seq,
606 },
607 Event::LocalProbeTopologyDeclared {
608 probes,
609 declared_at,
610 ..
611 } => Event::LocalProbeTopologyDeclared {
612 probes,
613 declared_at,
614 seq,
615 },
616 other => other,
620 }
621}
622
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    // Fixed reference instant so fixture timestamps and assertions are deterministic.
    fn fixed_now() -> chrono::DateTime<chrono::Utc> {
        chrono::Utc.with_ymd_and_hms(2026, 5, 17, 12, 0, 0).unwrap()
    }

    // bootstrap_pending_state takes its channel from the RolloutId itself.
    #[test]
    fn bootstrap_extracts_channel_from_canonical_rollout_id() {
        let rid = nixfleet_proto::RolloutId::new("stable", "abc1234deadbeef");
        let soak = fixed_now() + chrono::Duration::minutes(5);
        let state = bootstrap_pending_state(&rid, "closure-X", soak, fixed_now());
        assert_eq!(
            state.channel, "stable",
            "channel derived from rollout_id.channel(), not from manifest scan",
        );
    }

    // target_closure is whatever the caller passes in, verbatim.
    #[test]
    fn bootstrap_uses_caller_provided_target_closure_not_host_set_first() {
        let rid = nixfleet_proto::RolloutId::new("stable", "abc1234deadbeef");
        let soak = fixed_now() + chrono::Duration::minutes(5);
        let state = bootstrap_pending_state(&rid, "RIGHT-closure", soak, fixed_now());
        assert_eq!(
            state.target_closure, "RIGHT-closure",
            "target_closure comes from the caller (manifest by-hostname lookup), not from inside the helper",
        );
    }

    // soak_due_at is passed through unchanged, both for a zero offset and a custom one;
    // the resulting record starts in Pending.
    #[test]
    fn bootstrap_uses_caller_provided_soak_due_at_not_hardcoded() {
        let rid = nixfleet_proto::RolloutId::new("stable", "abc1234deadbeef");
        let now = fixed_now();
        let dispatched_soak = now + chrono::Duration::seconds(0);
        let state = bootstrap_pending_state(&rid, "closure-X", dispatched_soak, now);
        assert_eq!(state.state, HostState::Pending);
        assert_eq!(
            state.soak_due_at,
            Some(dispatched_soak),
            "soak_due_at comes from the caller (CP-dispatched value), not a hardcoded default",
        );

        let custom_soak = now + chrono::Duration::minutes(17);
        let state2 = bootstrap_pending_state(&rid, "closure-X", custom_soak, now);
        assert_eq!(state2.soak_due_at, Some(custom_soak));
    }

    // Two different caller-supplied closures produce two different records:
    // the helper has no hidden source of target_closure.
    #[test]
    fn bootstrap_target_closure_independent_of_manifests_snapshot() {
        let rid = nixfleet_proto::RolloutId::new("edge", "f8c46e472deadbeef");
        let now = fixed_now();
        let soak = now;

        let stale_from_cache = "STALE-target-from-old-manifest".to_string();
        let fresh_from_dispatch = "FRESH-target-from-just-verified-dispatch".to_string();

        let state_stale = bootstrap_pending_state(&rid, &stale_from_cache, soak, now);
        assert_eq!(state_stale.target_closure, stale_from_cache);

        let state_fresh = bootstrap_pending_state(&rid, &fresh_from_dispatch, soak, now);
        assert_eq!(state_fresh.target_closure, fresh_from_dispatch);

        assert_ne!(state_stale.target_closure, state_fresh.target_closure);
    }

    // Builds a ProbeRecord with the given status/mode, observed at fixed_now(),
    // with no last pass and no failure reason.
    fn probe_record(
        status: nixfleet_state_machine::ProbeStatus,
        mode: nixfleet_state_machine::ProbeMode,
    ) -> nixfleet_state_machine::ProbeRecord {
        nixfleet_state_machine::ProbeRecord {
            status,
            mode,
            last_observed_at: fixed_now(),
            last_pass_at: None,
            failure_reason: None,
        }
    }

    #[test]
    fn collect_failing_enforce_probes_includes_failing_enforce() {
        let mut probes = HashMap::new();
        probes.insert(
            "enforce-fail".to_string(),
            probe_record(
                nixfleet_state_machine::ProbeStatus::Fail,
                nixfleet_state_machine::ProbeMode::Enforce,
            ),
        );
        let failing = collect_failing_enforce_probes(&probes);
        assert_eq!(
            failing,
            vec!["enforce-fail".to_string()],
            "failing enforce-mode probe MUST gate per RFC-0007 §3.4",
        );
    }

    // Only Enforce+Fail counts; Observe/Disabled failures and Enforce passes are ignored.
    #[test]
    fn collect_failing_enforce_probes_excludes_failing_observe_and_disabled() {
        let mut probes = HashMap::new();
        probes.insert(
            "observe-fail".to_string(),
            probe_record(
                nixfleet_state_machine::ProbeStatus::Fail,
                nixfleet_state_machine::ProbeMode::Observe,
            ),
        );
        probes.insert(
            "disabled-fail".to_string(),
            probe_record(
                nixfleet_state_machine::ProbeStatus::Fail,
                nixfleet_state_machine::ProbeMode::Disabled,
            ),
        );
        probes.insert(
            "enforce-pass".to_string(),
            probe_record(
                nixfleet_state_machine::ProbeStatus::Pass,
                nixfleet_state_machine::ProbeMode::Enforce,
            ),
        );
        let failing = collect_failing_enforce_probes(&probes);
        assert!(
            failing.is_empty(),
            "observe + disabled failures and passing enforce probes MUST NOT gate; got: {failing:?}",
        );
    }

    #[test]
    fn all_enforce_probes_pass_with_empty_map_is_true() {
        let probes: HashMap<String, nixfleet_state_machine::ProbeRecord> = HashMap::new();
        assert!(
            all_enforce_probes_pass(&probes),
            "empty probe map satisfies convergence vacuously — matches shared verifier semantic",
        );
    }

    #[test]
    fn all_enforce_probes_pass_with_passing_enforce_only_is_true() {
        let mut probes = HashMap::new();
        probes.insert(
            "nginx-version".to_string(),
            probe_record(
                nixfleet_state_machine::ProbeStatus::Pass,
                nixfleet_state_machine::ProbeMode::Enforce,
            ),
        );
        assert!(all_enforce_probes_pass(&probes));
    }

    // A passing Enforce probe alongside failing Observe/Disabled probes still converges.
    #[test]
    fn all_enforce_probes_pass_ignores_failing_observe_and_disabled() {
        let mut probes = HashMap::new();
        probes.insert(
            "nginx-version".to_string(),
            probe_record(
                nixfleet_state_machine::ProbeStatus::Pass,
                nixfleet_state_machine::ProbeMode::Enforce,
            ),
        );
        probes.insert(
            "evidence-nis2".to_string(),
            probe_record(
                nixfleet_state_machine::ProbeStatus::Fail,
                nixfleet_state_machine::ProbeMode::Observe,
            ),
        );
        probes.insert(
            "suppressed-probe".to_string(),
            probe_record(
                nixfleet_state_machine::ProbeStatus::Fail,
                nixfleet_state_machine::ProbeMode::Disabled,
            ),
        );
        assert!(
            all_enforce_probes_pass(&probes),
            "observe + disabled failures MUST NOT gate convergence per RFC-0007 §3.3",
        );
    }

    #[test]
    fn all_enforce_probes_pass_with_failing_enforce_is_false() {
        let mut probes = HashMap::new();
        probes.insert(
            "nginx-version".to_string(),
            probe_record(
                nixfleet_state_machine::ProbeStatus::Fail,
                nixfleet_state_machine::ProbeMode::Enforce,
            ),
        );
        assert!(!all_enforce_probes_pass(&probes));
    }

    // Wire snapshot fixture for host "h1" on channel "stable", already at its
    // target closure, with every dispatch/activation timestamp set to fixed_now()
    // and a soak deadline five minutes later.
    fn make_snapshot(
        rid: &nixfleet_proto::RolloutId,
        wire_state: nixfleet_proto::HostRolloutState,
        last_event_seq: u64,
    ) -> nixfleet_proto::agent_wire::HostRolloutSnapshot {
        let now = fixed_now();
        nixfleet_proto::agent_wire::HostRolloutSnapshot {
            rollout_id: rid.clone(),
            hostname: "h1".to_string(),
            channel: "stable".to_string(),
            state: wire_state,
            target_closure: "target-closure-X".to_string(),
            current_closure_at_dispatch: Some("prior-closure".to_string()),
            current_closure: Some("target-closure-X".to_string()),
            dispatched_at: now,
            dispatch_acked_at: Some(now),
            activation_started_at: Some(now),
            activation_completed_at: Some(now),
            soak_due_at: Some(now + chrono::Duration::minutes(5)),
            last_event_seq,
        }
    }

    // Cold rehydration (no existing record): wire fields copied, agent-local fields empty/None.
    #[test]
    fn merge_snapshot_cold_rehydration_defaults_agent_local_fields() {
        let rid = nixfleet_proto::RolloutId::new("stable", "abc1234deadbeef");
        let snapshot =
            make_snapshot(&rid, nixfleet_proto::HostRolloutState::Soaking, 7);
        let record = merge_snapshot_into_state(None, snapshot);
        assert_eq!(record.state, HostState::Soaking);
        assert_eq!(record.target_closure, "target-closure-X");
        assert_eq!(record.last_event_seq, 7);
        assert!(record.probes.is_empty(), "cold rehydration starts with empty probe map");
        assert_eq!(record.probe_failure_first_at, None);
        assert_eq!(record.probe_observed_first_at, None);
        assert_eq!(record.failed_at, None);
        assert_eq!(record.converged_at, None);
    }

    // Warm rehydration: wire fields come from the snapshot, while probe timers and
    // the probe map are carried over from the existing record.
    #[test]
    fn merge_snapshot_warm_rehydration_preserves_probe_failure_timer() {
        let rid = nixfleet_proto::RolloutId::new("stable", "abc1234deadbeef");
        let failure_stamp = fixed_now() - chrono::Duration::seconds(90);
        let mut existing = HostRolloutState::new_pending(
            rid.clone(),
            "h1".to_string(),
            "stable".to_string(),
            "target-closure-X".to_string(),
            fixed_now() - chrono::Duration::minutes(2),
            fixed_now() + chrono::Duration::minutes(5),
        );
        existing.state = HostState::Soaking;
        existing.probe_failure_first_at = Some(failure_stamp);
        existing.probe_observed_first_at = Some(failure_stamp);
        existing.probes.insert(
            "tcp-fail".to_string(),
            probe_record(
                nixfleet_state_machine::ProbeStatus::Fail,
                nixfleet_state_machine::ProbeMode::Enforce,
            ),
        );

        let snapshot =
            make_snapshot(&rid, nixfleet_proto::HostRolloutState::Soaking, 12);
        let record = merge_snapshot_into_state(Some(&existing), snapshot);

        assert_eq!(record.state, HostState::Soaking, "canonical state still comes from snapshot");
        assert_eq!(record.last_event_seq, 12, "last_event_seq still comes from snapshot");
        assert_eq!(
            record.probe_failure_first_at,
            Some(failure_stamp),
            "probe_failure_first_at MUST survive warm rehydration so sustained-failure timer can accumulate",
        );
        assert_eq!(
            record.probe_observed_first_at,
            Some(failure_stamp),
            "probe_observed_first_at also preserved (agent-local, not in wire snapshot)",
        );
        assert!(
            record.probes.contains_key("tcp-fail"),
            "existing probe map preserved so probe scheduler doesn't lose mid-flight tracking",
        );
    }

    // The snapshot may move the state forward (Pending -> Soaking) without
    // resetting the carried-over probe timer.
    #[test]
    fn merge_snapshot_warm_rehydration_advances_state_but_preserves_probes() {
        let rid = nixfleet_proto::RolloutId::new("stable", "abc1234deadbeef");
        let mut existing = HostRolloutState::new_pending(
            rid.clone(),
            "h1".to_string(),
            "stable".to_string(),
            "target-closure-X".to_string(),
            fixed_now(),
            fixed_now() + chrono::Duration::minutes(5),
        );
        existing.state = HostState::Pending;
        existing.probe_failure_first_at = Some(fixed_now() - chrono::Duration::seconds(30));

        let snapshot =
            make_snapshot(&rid, nixfleet_proto::HostRolloutState::Soaking, 5);
        let record = merge_snapshot_into_state(Some(&existing), snapshot);

        assert_eq!(record.state, HostState::Soaking);
        assert_eq!(
            record.probe_failure_first_at,
            existing.probe_failure_first_at,
            "probe timer survives state advance",
        );
    }
}