1use std::sync::Arc;
33
34use nixfleet_proto::RolloutId;
35use nixfleet_proto::clock::ClockHandle;
36use nixfleet_reconciler::planner::compute_soak_due_at;
37use nixfleet_reconciler::planner_types::{PlanAction, SignedManifestSet};
38use nixfleet_state_machine::rollout::{
39 self as rollout_sm, RolloutEffect, RolloutEvent, RolloutRecord,
40};
41use nixfleet_state_machine::{
42 Effect, HostRolloutState, HostState, LogLevel, OutboundAgentEvent, ProbeStatus,
43};
44use serde_json::{Value, json};
45
46use super::EventLogTx;
47use crate::db::Db;
48use crate::db::dispatch_queue::QueuedDispatch;
49use crate::db::event_log::{EventLogEntry, EventLogKind};
50use crate::db::probe_failures;
51use crate::server::AppState;
52
/// Borrowed context shared by the plan-action and effect appliers in this module.
pub struct ApplierCtx<'a> {
    /// Shared control-plane state. Its optional `db` decides whether the
    /// database-mutating handlers run; they are skipped when it is `None`.
    pub state: &'a Arc<AppState>,
    /// Cached signed manifests, consulted for per-channel rollout host sets
    /// and fleet channel / rollout-policy lookups.
    pub manifests: &'a SignedManifestSet,
    /// Clock that supplies the `now` timestamp for each applied action/effect.
    pub clock: &'a ClockHandle,
    /// Sender half of the event-log writer channel (see `append_event_log`).
    pub event_log_tx: &'a EventLogTx,
}
61
62pub async fn apply_plan_action(ctx: &ApplierCtx<'_>, action: PlanAction) {
66 let Some(db) = ctx.state.db.as_ref() else {
67 tracing::warn!(
68 target: "cp_runtime",
69 action = ?action,
70 "apply_plan_action: no DB attached (in-memory mode); skipping",
71 );
72 return;
73 };
74 let now = ctx.clock.now();
75 match action {
76 PlanAction::OpenRollout {
77 rollout_id,
78 channel,
79 target_ref,
80 } => open_rollout(ctx, db, now, &rollout_id, &channel, &target_ref).await,
81 PlanAction::QueueDispatch {
82 host,
83 rollout,
84 target_closure,
85 soak_due_at,
86 } => queue_dispatch(ctx, db, now, &host, &rollout, &target_closure, soak_due_at).await,
87 PlanAction::RecordHaltLifted { channel } => record_halt_lifted(ctx, now, &channel).await,
88 PlanAction::DeferDispatch {
89 host,
90 rollout,
91 gate,
92 reason,
93 } => defer_dispatch(ctx, now, &host, &rollout, gate, &reason).await,
94 }
95}
96
97async fn open_rollout(
98 ctx: &ApplierCtx<'_>,
99 db: &Arc<Db>,
100 now: chrono::DateTime<chrono::Utc>,
101 rollout_id: &RolloutId,
102 channel: &str,
103 target_ref: &str,
104) {
105 let in_flight_predecessors: Vec<RolloutId> = db
115 .rollouts()
116 .list_active()
117 .map(|rs| {
118 rs.into_iter()
119 .filter(|r| r.channel == channel && &r.rollout_id != rollout_id)
120 .map(|r| r.rollout_id)
121 .collect()
122 })
123 .unwrap_or_default();
124
125 if let Err(err) = db.rollouts().record_rollout_opened(
126 rollout_id.as_str(),
127 channel,
128 target_ref,
129 now,
130 None,
133 ) {
134 tracing::error!(
135 target: "cp_runtime",
136 rollout_id = %rollout_id,
137 %channel,
138 error = %err,
139 "OpenRollout: rollouts insert failed",
140 );
141 return;
142 }
143
144 append_rollout_event(
149 ctx,
150 now,
151 rollout_id,
152 &RolloutEvent::RolloutOpened {
153 rollout_id: rollout_id.clone(),
154 channel: channel.to_string(),
155 target_ref: target_ref.to_string(),
156 at: now,
157 },
158 )
159 .await;
160
161 for predecessor in in_flight_predecessors {
169 process_rollout_event(
170 ctx,
171 db,
172 now,
173 RolloutEvent::SuccessorOpened {
174 superseded_rollout_id: predecessor,
175 successor_rollout_id: rollout_id.clone(),
176 at: now,
177 },
178 )
179 .await;
180 }
181
182 let Some(manifest) = ctx.manifests.rollouts.get(channel).map(|v| v.inner()) else {
183 tracing::warn!(
184 target: "cp_runtime",
185 %rollout_id,
186 %channel,
187 "OpenRollout: manifest absent from cached SignedManifestSet; skipping per-host record creation",
188 );
189 return;
190 };
191
192 const DEFAULT_SOAK_MINUTES: u32 = 60;
199 let fleet = ctx.manifests.fleet();
200 let policy = fleet
201 .channels
202 .get(channel)
203 .and_then(|c| fleet.rollout_policies.get(&c.rollout_policy));
204
205 let records = db.host_rollout_records();
206 for hw in &manifest.host_set {
207 let soak_minutes = policy
208 .and_then(|p| p.waves.get(hw.wave_index as usize))
209 .map(|w| w.soak_minutes)
210 .unwrap_or(DEFAULT_SOAK_MINUTES);
211 let soak_due_at = compute_soak_due_at(now, soak_minutes);
212 let pending = HostRolloutState::new_pending(
213 rollout_id.clone(),
214 hw.hostname.clone(),
215 channel.to_string(),
216 hw.target_closure.clone(),
217 now,
218 soak_due_at,
219 );
220 if let Err(err) = records.upsert(&pending) {
221 tracing::error!(
222 target: "cp_runtime",
223 rollout_id = %rollout_id,
224 hostname = %hw.hostname,
225 error = %err,
226 "OpenRollout: host_rollout_records upsert failed",
227 );
228 continue;
229 }
230 process_rollout_event(
233 ctx,
234 db,
235 now,
236 RolloutEvent::HostJoined {
237 rollout_id: rollout_id.clone(),
238 host_id: hw.hostname.clone(),
239 wave: hw.wave_index,
240 at: now,
241 },
242 )
243 .await;
244 }
245
246 append_event_log(
247 ctx,
248 now,
249 None,
250 Some(rollout_id.as_str()),
251 EventLogKind::PlanAction,
252 json!({
253 "action": "OpenRollout",
254 "rollout_id": rollout_id,
255 "channel": channel,
256 "target_ref": target_ref,
257 "hosts": manifest.host_set.iter().map(|h| &h.hostname).collect::<Vec<_>>(),
258 }),
259 )
260 .await;
261}
262
263async fn queue_dispatch(
264 ctx: &ApplierCtx<'_>,
265 db: &Arc<Db>,
266 now: chrono::DateTime<chrono::Utc>,
267 host: &str,
268 rollout: &RolloutId,
269 target_closure: &str,
270 soak_due_at: chrono::DateTime<chrono::Utc>,
271) {
272 let queued = QueuedDispatch {
273 hostname: host.to_string(),
274 rollout_id: rollout.clone(),
275 target_closure: target_closure.to_string(),
276 soak_due_at,
277 enqueued_at: now,
278 };
279 if let Err(err) = db.dispatch_queue().upsert(&queued) {
280 tracing::error!(
281 target: "cp_runtime",
282 %host,
283 %rollout,
284 error = %err,
285 "QueueDispatch: dispatch_queue upsert failed",
286 );
287 return;
288 }
289 let _ = ctx.state.dispatch_kick.send(());
294 append_event_log(
295 ctx,
296 now,
297 Some(host),
298 Some(rollout.as_str()),
299 EventLogKind::PlanAction,
300 json!({
301 "action": "QueueDispatch",
302 "host": host,
303 "rollout": rollout,
304 "target_closure": target_closure,
305 "soak_due_at": soak_due_at.to_rfc3339(),
306 }),
307 )
308 .await;
309}
310
311async fn record_halt_lifted(
316 ctx: &ApplierCtx<'_>,
317 now: chrono::DateTime<chrono::Utc>,
318 channel: &str,
319) {
320 append_event_log(
321 ctx,
322 now,
323 None,
324 None,
325 EventLogKind::PlanAction,
326 json!({
327 "action": "RecordHaltLifted",
328 "channel": channel,
329 }),
330 )
331 .await;
332}
333
334async fn defer_dispatch(
335 ctx: &ApplierCtx<'_>,
336 now: chrono::DateTime<chrono::Utc>,
337 host: &str,
338 rollout: &RolloutId,
339 gate: &'static str,
340 reason: &str,
341) {
342 append_event_log(
343 ctx,
344 now,
345 Some(host),
346 Some(rollout.as_str()),
347 EventLogKind::GateDecision,
348 json!({
349 "gate": gate,
350 "reason": reason,
351 "host": host,
352 "rollout": rollout,
353 }),
354 )
355 .await;
356}
357
/// Applies one state-machine [`Effect`] on the control-plane side.
///
/// Database writes in the `Remote*` arms run only when a DB is attached. In
/// in-memory mode a debug line is emitted up front and those writes are
/// skipped. Event-log appends, metric traces and log forwarding still happen
/// where their arm does not depend on the DB. Agent-only `Local*` effects are
/// not expected here; they are dropped with an error log.
pub async fn apply_effect(ctx: &ApplierCtx<'_>, effect: Effect) {
    if ctx.state.db.is_none() {
        tracing::debug!(
            target: "cp_runtime",
            effect = ?effect_kind(&effect),
            "apply_effect: in-memory mode; mutating arms are no-ops",
        );
    }
    let now = ctx.clock.now();
    match effect {
        Effect::RemoteQueueDispatch {
            host,
            rollout_id,
            target_closure,
            soak_due_at,
        } => {
            // Same path as PlanAction::QueueDispatch; skipped without a DB.
            if let Some(db) = ctx.state.db.as_ref() {
                queue_dispatch(
                    ctx,
                    db,
                    now,
                    &host,
                    &rollout_id,
                    &target_closure,
                    soak_due_at,
                )
                .await;
            }
        }
        Effect::RemoteInsertQuarantine { channel, closure } => {
            // On an insert failure, log and return before the event-log append.
            // NOTE(review): with no DB attached the insert is skipped but the
            // event-log entry below is still appended — confirm this is intended.
            if let Some(db) = ctx.state.db.as_ref()
                && let Err(err) = db.quarantined_closures().insert(
                    &channel, &closure, now,
                    None,
                )
            {
                tracing::error!(
                    target: "cp_runtime",
                    %channel,
                    %closure,
                    error = %err,
                    "RemoteInsertQuarantine: insert failed",
                );
                return;
            }
            append_event_log(
                ctx,
                now,
                None,
                None,
                EventLogKind::Effect,
                json!({
                    "effect": "RemoteInsertQuarantine",
                    "channel": channel,
                    "closure": closure,
                }),
            )
            .await;
        }
        Effect::RemoteOpenRolloutRecord {
            rollout_id,
            channel,
            host,
        } => {
            if let Some(db) = ctx.state.db.as_ref() {
                open_one_rollout_record(ctx, db, now, &rollout_id, &channel, &host).await;
            }
        }
        Effect::RemoteAppendEventLog {
            host,
            rollout_id,
            payload,
        } => {
            // Every agent event is appended to the event log, DB or not.
            append_event_log(
                ctx,
                now,
                Some(&host),
                Some(rollout_id.as_str()),
                EventLogKind::AgentEvent,
                outbound_event_to_json(&payload),
            )
            .await;

            // Probe results additionally yield `probe_failures` rows (DB only).
            if let nixfleet_state_machine::OutboundAgentEvent::ProbeResult {
                probe_name,
                mode,
                status,
                observed_at,
                sub_results,
                ..
            } = &payload
                && let Some(db) = ctx.state.db.as_ref()
            {
                // With non-empty sub-results: one row per sub-result whose status
                // is Fail AND whose effective mode is Enforce (the probe-level
                // mode/status are not consulted). Otherwise: a single row without
                // control/framework when the probe itself is Enforce + Fail.
                let rows: Vec<probe_failures::ProbeFailureInsert<'_>> = match sub_results {
                    Some(srs) if !srs.is_empty() => srs
                        .iter()
                        .filter(|sr| {
                            matches!(sr.status, nixfleet_state_machine::ProbeStatus::Fail)
                                && matches!(
                                    sr.effective_mode,
                                    nixfleet_state_machine::ProbeMode::Enforce
                                )
                        })
                        .map(|sr| probe_failures::ProbeFailureInsert {
                            rollout_id: rollout_id.as_str(),
                            host_id: &host,
                            probe_name,
                            control_id: Some(&sr.control_id),
                            framework: Some(&sr.framework),
                            observed_at: *observed_at,
                        })
                        .collect(),
                    _ => {
                        if matches!(mode, nixfleet_state_machine::ProbeMode::Enforce)
                            && matches!(status, nixfleet_state_machine::ProbeStatus::Fail)
                        {
                            vec![probe_failures::ProbeFailureInsert {
                                rollout_id: rollout_id.as_str(),
                                host_id: &host,
                                probe_name,
                                control_id: None,
                                framework: None,
                                observed_at: *observed_at,
                            }]
                        } else {
                            Vec::new()
                        }
                    }
                };
                // Insert failures are logged at warn and otherwise ignored.
                // NOTE(review): this uses target "cp_applier" while the rest of
                // this file logs under "cp_runtime" — confirm that is intended.
                if !rows.is_empty()
                    && let Err(err) = db.probe_failures().insert_many(&rows)
                {
                    tracing::warn!(
                        target: "cp_applier",
                        rollout_id = %rollout_id,
                        host = %host,
                        probe = %probe_name,
                        error = %err,
                        "probe_failures insert failed",
                    );
                }
            }
        }

        Effect::RecordTransition {
            host,
            rollout_id,
            from,
            to,
            at,
        } => {
            // Host-level transition: event log only, no DB write in this arm.
            append_event_log(
                ctx,
                now,
                Some(&host),
                Some(rollout_id.as_str()),
                EventLogKind::Effect,
                json!({
                    "effect": "RecordTransition",
                    "host": host,
                    "rollout_id": rollout_id.as_str(),
                    "from": host_state_str(from),
                    "to": host_state_str(to),
                    "at": at.to_rfc3339(),
                }),
            )
            .await;
        }
        Effect::EmitMetric {
            name,
            labels,
            value,
        } => {
            tracing::debug!(
                // Metrics are only traced at debug level here; this function
                // does not call any metrics backend.
                target: "cp_runtime",
                metric = %name,
                ?labels,
                value,
                "EmitMetric",
            );
        }
        Effect::EmitLog {
            level,
            target,
            message,
            fields,
        } => {
            match level {
                // tracing's event macros take the level as part of static callsite
                // metadata, so each LogLevel needs its own macro invocation. The
                // reducer's requested target is carried as a field.
                LogLevel::Trace => tracing::trace!(
                    target: "cp_runtime_emitted",
                    emitted_target = target,
                    ?fields,
                    "{message}",
                ),
                LogLevel::Debug => tracing::debug!(
                    target: "cp_runtime_emitted",
                    emitted_target = target,
                    ?fields,
                    "{message}",
                ),
                LogLevel::Info => tracing::info!(
                    target: "cp_runtime_emitted",
                    emitted_target = target,
                    ?fields,
                    "{message}",
                ),
                LogLevel::Warn => tracing::warn!(
                    target: "cp_runtime_emitted",
                    emitted_target = target,
                    ?fields,
                    "{message}",
                ),
                LogLevel::Error => tracing::error!(
                    target: "cp_runtime_emitted",
                    emitted_target = target,
                    ?fields,
                    "{message}",
                ),
            }
        }

        // Local* effects belong to the agent side; reaching here is a reducer
        // defect, so they are logged and dropped.
        Effect::LocalFireSwitch { .. }
        | Effect::LocalFireRollbackTo { .. }
        | Effect::LocalResetProbeCache { .. }
        | Effect::LocalEmitEvent { .. } => {
            tracing::error!(
                target: "cp_runtime",
                effect = ?effect_kind(&effect),
                "apply_effect: agent-only Local* effect reached the CP applier — \
                 reducer state-machine defect. Dropping.",
            );
        }
    }
}
648
649async fn open_one_rollout_record(
650 ctx: &ApplierCtx<'_>,
651 db: &Arc<Db>,
652 now: chrono::DateTime<chrono::Utc>,
653 rollout_id: &RolloutId,
654 channel: &str,
655 host: &str,
656) {
657 let Some(manifest) = ctx.manifests.rollouts.get(channel).map(|v| v.inner()) else {
661 tracing::warn!(
662 target: "cp_runtime",
663 %rollout_id,
664 %channel,
665 %host,
666 "RemoteOpenRolloutRecord: rollout manifest absent from cached set",
667 );
668 return;
669 };
670 let Some(hw) = manifest.host_set.iter().find(|h| h.hostname == host) else {
671 tracing::warn!(
672 target: "cp_runtime",
673 %rollout_id,
674 %channel,
675 %host,
676 "RemoteOpenRolloutRecord: host not in manifest host_set",
677 );
678 return;
679 };
680 const DEFAULT_SOAK_MINUTES: u32 = 60;
681 let fleet = ctx.manifests.fleet();
682 let soak_minutes = fleet
683 .channels
684 .get(channel)
685 .and_then(|c| fleet.rollout_policies.get(&c.rollout_policy))
686 .and_then(|p| p.waves.get(hw.wave_index as usize))
687 .map(|w| w.soak_minutes)
688 .unwrap_or(DEFAULT_SOAK_MINUTES);
689 let soak_due_at = compute_soak_due_at(now, soak_minutes);
690 let pending = HostRolloutState::new_pending(
691 rollout_id.clone(),
692 host.to_string(),
693 channel.to_string(),
694 hw.target_closure.clone(),
695 now,
696 soak_due_at,
697 );
698 if let Err(err) = db.host_rollout_records().upsert(&pending) {
699 tracing::error!(
700 target: "cp_runtime",
701 %rollout_id,
702 %host,
703 error = %err,
704 "RemoteOpenRolloutRecord: upsert failed",
705 );
706 return;
707 }
708 append_event_log(
709 ctx,
710 now,
711 Some(host),
712 Some(rollout_id.as_str()),
713 EventLogKind::Effect,
714 json!({
715 "effect": "RemoteOpenRolloutRecord",
716 "rollout_id": rollout_id.as_str(),
717 "channel": channel,
718 "host": host,
719 }),
720 )
721 .await;
722}
723
724async fn append_event_log(
733 ctx: &ApplierCtx<'_>,
734 ts: chrono::DateTime<chrono::Utc>,
735 host_id: Option<&str>,
736 rollout_id: Option<&str>,
737 kind: EventLogKind,
738 payload: Value,
739) {
740 let entry = EventLogEntry {
741 kind,
742 ts,
743 host_id: host_id.map(str::to_string),
744 rollout_id: rollout_id.map(str::to_string),
745 payload: payload.to_string(),
746 };
747 if let Err(err) = ctx.event_log_tx.send(entry).await {
748 tracing::error!(
749 target: "cp_runtime",
750 ?kind,
751 host_id,
752 rollout_id,
753 error = %err,
754 "append_event_log: writer channel closed",
755 );
756 }
757}
758
759pub(super) async fn process_rollout_event(
777 ctx: &ApplierCtx<'_>,
778 db: &Arc<Db>,
779 now: chrono::DateTime<chrono::Utc>,
780 event: RolloutEvent,
781) {
782 let rollout_id: RolloutId = rollout_event_rollout_id(&event).clone();
783
784 let row = match db.rollouts().state(rollout_id.as_str()) {
785 Ok(Some(row)) => row,
786 Ok(None) => {
787 tracing::debug!(
788 target: "cp_runtime",
789 rollout_id = %rollout_id,
790 event_kind = event.kind(),
791 "process_rollout_event: unknown rollout; dropping",
792 );
793 return;
794 }
795 Err(err) => {
796 tracing::error!(
797 target: "cp_runtime",
798 rollout_id = %rollout_id,
799 error = %err,
800 "process_rollout_event: state() query failed",
801 );
802 return;
803 }
804 };
805
806 let record = RolloutRecord {
807 rollout_id: row.rollout_id,
808 channel: row.channel,
809 target_ref: row.target_ref,
810 state: row.state,
811 current_wave: row.current_wave,
812 opened_event_log_seq: row.opened_event_log_seq,
813 last_transition_event_log_seq: row.last_transition_event_log_seq,
814 opened_at: row.opened_at,
815 terminal_at: row.terminal_at,
816 superseded_at: row.superseded_at,
817 };
818
819 append_event_log(
820 ctx,
821 now,
822 None,
823 Some(rollout_id.as_str()),
824 EventLogKind::RolloutEvent,
825 rollout_event_to_json(&event),
826 )
827 .await;
828
829 match rollout_sm::step(record, event.clone(), now) {
830 Ok((_new_record, effects)) => {
831 for effect in effects {
832 apply_rollout_effect(ctx, db, now, effect).await;
833 }
834 }
835 Err(err) => {
836 tracing::warn!(
837 target: "cp_runtime",
838 rollout_id = %rollout_id,
839 event_kind = event.kind(),
840 error = %err,
841 "process_rollout_event: rollout step() rejected",
842 );
843 }
844 }
845}
846
847async fn apply_rollout_effect(
848 ctx: &ApplierCtx<'_>,
849 db: &Arc<Db>,
850 now: chrono::DateTime<chrono::Utc>,
851 effect: RolloutEffect,
852) {
853 match effect {
854 RolloutEffect::RecordRolloutTransition {
855 rollout_id,
856 from,
857 to,
858 at,
859 } => {
860 if let Err(err) =
861 db.rollouts()
862 .record_rollout_transition(rollout_id.as_str(), to, at, None)
863 {
864 tracing::error!(
865 target: "cp_runtime",
866 rollout_id = %rollout_id,
867 from = from.as_db_str(),
868 to = to.as_db_str(),
869 error = %err,
870 "RolloutEffect::RecordRolloutTransition: db write failed",
871 );
872 }
873 append_event_log(
874 ctx,
875 now,
876 None,
877 Some(rollout_id.as_str()),
878 EventLogKind::Effect,
879 json!({
880 "effect": "RecordRolloutTransition",
881 "rolloutId": rollout_id.as_str(),
882 "from": from.as_db_str(),
883 "to": to.as_db_str(),
884 "at": at.to_rfc3339(),
885 }),
886 )
887 .await;
888 }
889 RolloutEffect::UpdateCurrentWave { rollout_id, wave } => {
890 if let Err(err) = db
891 .rollouts()
892 .set_current_wave(rollout_id.as_str(), wave, None)
893 {
894 tracing::error!(
895 target: "cp_runtime",
896 rollout_id = %rollout_id,
897 wave,
898 error = %err,
899 "RolloutEffect::UpdateCurrentWave: db write failed",
900 );
901 }
902 }
903 RolloutEffect::InsertQuarantineFromRollout {
904 channel,
905 closure_hash,
906 } => {
907 if let Err(err) = db
908 .quarantined_closures()
909 .insert(&channel, &closure_hash, now, None)
910 {
911 tracing::error!(
912 target: "cp_runtime",
913 channel,
914 closure_hash,
915 error = %err,
916 "RolloutEffect::InsertQuarantineFromRollout: db write failed",
917 );
918 }
919 }
920 RolloutEffect::SchedulePruning {
921 rollout_id,
922 delay_seconds,
923 } => {
924 tracing::debug!(
929 target: "cp_runtime",
930 rollout_id = %rollout_id,
931 delay_seconds,
932 "RolloutEffect::SchedulePruning: deferred to v0.2.x follow-up",
933 );
934 }
935 }
936}
937
938fn rollout_event_rollout_id(event: &RolloutEvent) -> &RolloutId {
939 match event {
940 RolloutEvent::RolloutOpened { rollout_id, .. }
941 | RolloutEvent::HostJoined { rollout_id, .. }
942 | RolloutEvent::HostStateChanged { rollout_id, .. }
943 | RolloutEvent::WaveAdvanced { rollout_id, .. }
944 | RolloutEvent::RolloutTerminal { rollout_id, .. }
945 | RolloutEvent::RetentionExpired { rollout_id, .. }
946 | RolloutEvent::OperatorClearance { rollout_id, .. } => rollout_id,
947 RolloutEvent::SuccessorOpened {
951 superseded_rollout_id,
952 ..
953 } => superseded_rollout_id,
954 }
955}
956
/// Renders a [`RolloutEvent`] as the JSON payload stored for
/// `EventLogKind::RolloutEvent` entries.
///
/// Every variant gets a `"kind"` discriminator equal to its variant name.
/// Other keys are camelCase, timestamps are RFC 3339 strings, and host states
/// use the names from `host_state_str`.
fn rollout_event_to_json(event: &RolloutEvent) -> Value {
    match event {
        RolloutEvent::RolloutOpened {
            rollout_id,
            channel,
            target_ref,
            at,
        } => json!({
            "kind": "RolloutOpened",
            "rolloutId": rollout_id,
            "channel": channel,
            "targetRef": target_ref,
            "at": at.to_rfc3339(),
        }),
        RolloutEvent::HostJoined {
            rollout_id,
            host_id,
            wave,
            at,
        } => json!({
            "kind": "HostJoined",
            "rolloutId": rollout_id,
            "hostId": host_id,
            "wave": wave,
            "at": at.to_rfc3339(),
        }),
        RolloutEvent::HostStateChanged {
            rollout_id,
            host_id,
            from,
            to,
            at,
        } => json!({
            "kind": "HostStateChanged",
            "rolloutId": rollout_id,
            "hostId": host_id,
            "from": host_state_str(*from),
            "to": host_state_str(*to),
            "at": at.to_rfc3339(),
        }),
        RolloutEvent::WaveAdvanced {
            rollout_id,
            from_wave,
            to_wave,
            at,
        } => json!({
            "kind": "WaveAdvanced",
            "rolloutId": rollout_id,
            "fromWave": from_wave,
            "toWave": to_wave,
            "at": at.to_rfc3339(),
        }),
        RolloutEvent::RolloutTerminal { rollout_id, at } => json!({
            "kind": "RolloutTerminal",
            "rolloutId": rollout_id,
            "at": at.to_rfc3339(),
        }),
        // Carries both ids; the event itself is routed to the superseded one.
        RolloutEvent::SuccessorOpened {
            superseded_rollout_id,
            successor_rollout_id,
            at,
        } => json!({
            "kind": "SuccessorOpened",
            "supersededRolloutId": superseded_rollout_id,
            "successorRolloutId": successor_rollout_id,
            "at": at.to_rfc3339(),
        }),
        RolloutEvent::RetentionExpired { rollout_id, at } => json!({
            "kind": "RetentionExpired",
            "rolloutId": rollout_id,
            "at": at.to_rfc3339(),
        }),
        RolloutEvent::OperatorClearance {
            rollout_id,
            operator,
            reason,
            at,
        } => json!({
            "kind": "OperatorClearance",
            "rolloutId": rollout_id,
            "operator": operator,
            "reason": reason,
            "at": at.to_rfc3339(),
        }),
    }
}
1043
1044pub(super) async fn append_rollout_event(
1049 ctx: &ApplierCtx<'_>,
1050 now: chrono::DateTime<chrono::Utc>,
1051 rollout_id: &RolloutId,
1052 event: &RolloutEvent,
1053) {
1054 append_event_log(
1055 ctx,
1056 now,
1057 None,
1058 Some(rollout_id.as_str()),
1059 EventLogKind::RolloutEvent,
1060 rollout_event_to_json(event),
1061 )
1062 .await;
1063}
1064
1065fn host_state_str(s: HostState) -> &'static str {
1066 match s {
1067 HostState::Pending => "Pending",
1068 HostState::Activating => "Activating",
1069 HostState::Deferred => "Deferred",
1070 HostState::Soaking => "Soaking",
1071 HostState::Converged => "Converged",
1072 HostState::Failed => "Failed",
1073 HostState::Reverted => "Reverted",
1074 }
1075}
1076
1077fn probe_status_str(s: ProbeStatus) -> &'static str {
1078 match s {
1079 ProbeStatus::Pass => "pass",
1080 ProbeStatus::Fail => "fail",
1081 }
1082}
1083
1084fn probe_mode_str(m: nixfleet_state_machine::ProbeMode) -> &'static str {
1085 use nixfleet_state_machine::ProbeMode;
1086 match m {
1087 ProbeMode::Enforce => "enforce",
1088 ProbeMode::Observe => "observe",
1089 ProbeMode::Disabled => "disabled",
1090 }
1091}
1092
1093fn effect_kind(e: &Effect) -> &'static str {
1094 match e {
1095 Effect::LocalFireSwitch { .. } => "LocalFireSwitch",
1096 Effect::LocalFireRollbackTo { .. } => "LocalFireRollbackTo",
1097 Effect::LocalResetProbeCache { .. } => "LocalResetProbeCache",
1098 Effect::LocalEmitEvent { .. } => "LocalEmitEvent",
1099 Effect::RemoteQueueDispatch { .. } => "RemoteQueueDispatch",
1100 Effect::RemoteInsertQuarantine { .. } => "RemoteInsertQuarantine",
1101 Effect::RemoteOpenRolloutRecord { .. } => "RemoteOpenRolloutRecord",
1102 Effect::RemoteAppendEventLog { .. } => "RemoteAppendEventLog",
1103 Effect::RecordTransition { .. } => "RecordTransition",
1104 Effect::EmitMetric { .. } => "EmitMetric",
1105 Effect::EmitLog { .. } => "EmitLog",
1106 }
1107}
1108
/// Renders an [`OutboundAgentEvent`] as the JSON payload stored for
/// `EventLogKind::AgentEvent` entries (used by the `RemoteAppendEventLog`
/// arm of `apply_effect` in this file).
///
/// Every variant gets a `"kind"` discriminator equal to its variant name.
/// Other keys are camelCase and timestamps are RFC 3339 strings. Probe modes
/// and statuses use the lower-case names from `probe_mode_str` /
/// `probe_status_str`. `subResults` is JSON `null` when the event carries no
/// sub-results. `policyApplied` uses the value's `Display` output.
fn outbound_event_to_json(payload: &OutboundAgentEvent) -> Value {
    match payload {
        OutboundAgentEvent::DispatchAck {
            current_closure_at_dispatch,
            received_at,
            seq,
        } => json!({
            "kind": "DispatchAck",
            "currentClosureAtDispatch": current_closure_at_dispatch,
            "receivedAt": received_at.to_rfc3339(),
            "seq": seq,
        }),
        OutboundAgentEvent::ActivationStarted {
            started_at,
            switch_method,
            seq,
        } => json!({
            "kind": "ActivationStarted",
            "startedAt": started_at.to_rfc3339(),
            "switchMethod": switch_method,
            "seq": seq,
        }),
        OutboundAgentEvent::ActivationCompleted {
            observed_current_closure,
            exit_code,
            completed_at,
            seq,
        } => json!({
            "kind": "ActivationCompleted",
            "observedCurrentClosure": observed_current_closure,
            "exitCode": exit_code,
            "completedAt": completed_at.to_rfc3339(),
            "seq": seq,
        }),
        OutboundAgentEvent::ActivationFailed {
            exit_code,
            stderr_tail,
            failed_at,
            seq,
        } => json!({
            "kind": "ActivationFailed",
            "exitCode": exit_code,
            "stderrTail": stderr_tail,
            "failedAt": failed_at.to_rfc3339(),
            "seq": seq,
        }),
        OutboundAgentEvent::ActivationDeferred {
            component,
            deferred_at,
            seq,
        } => json!({
            "kind": "ActivationDeferred",
            "component": component,
            "deferredAt": deferred_at.to_rfc3339(),
            "seq": seq,
        }),
        // One object per declared probe: name, kind, and mode.
        OutboundAgentEvent::ProbeTopologyDeclared {
            probes,
            declared_at,
            seq,
        } => json!({
            "kind": "ProbeTopologyDeclared",
            "probes": probes.iter().map(|e| json!({
                "probeName": e.probe_name,
                "kind": e.kind,
                "mode": probe_mode_str(e.mode),
            })).collect::<Vec<_>>(),
            "declaredAt": declared_at.to_rfc3339(),
            "seq": seq,
        }),
        OutboundAgentEvent::ProbeObservedFirst {
            probe_name,
            mode,
            observed_at,
            seq,
        } => json!({
            "kind": "ProbeObservedFirst",
            "probeName": probe_name,
            "mode": probe_mode_str(*mode),
            "observedAt": observed_at.to_rfc3339(),
            "seq": seq,
        }),
        // Per-control sub-results keep their audit fields (framework, article,
        // effective mode, override reason).
        OutboundAgentEvent::ProbeResult {
            probe_name,
            mode,
            status,
            observed_at,
            failure_reason,
            sub_results,
            seq,
        } => json!({
            "kind": "ProbeResult",
            "probeName": probe_name,
            "mode": probe_mode_str(*mode),
            "status": probe_status_str(*status),
            "observedAt": observed_at.to_rfc3339(),
            "failureReason": failure_reason,
            "subResults": sub_results.as_ref().map(|v| v.iter().map(|sr| json!({
                "controlId": sr.control_id,
                "status": probe_status_str(sr.status),
                "framework": sr.framework,
                "article": sr.article,
                "effectiveMode": probe_mode_str(sr.effective_mode),
                "overrideReason": sr.override_reason,
            })).collect::<Vec<_>>()),
            "seq": seq,
        }),
        OutboundAgentEvent::ProbeFailureFirst {
            probe_name,
            mode,
            first_failed_at,
            seq,
        } => json!({
            "kind": "ProbeFailureFirst",
            "probeName": probe_name,
            "mode": probe_mode_str(*mode),
            "firstFailedAt": first_failed_at.to_rfc3339(),
            "seq": seq,
        }),
        OutboundAgentEvent::Failed {
            failed_at,
            sustained_duration_secs,
            failing_probes,
            policy_applied,
            seq,
        } => json!({
            "kind": "Failed",
            "failedAt": failed_at.to_rfc3339(),
            "sustainedDurationSecs": sustained_duration_secs,
            "failingProbes": failing_probes,
            "policyApplied": policy_applied.to_string(),
            "seq": seq,
        }),
        OutboundAgentEvent::RollbackComplete {
            reverted_to_closure,
            exit_code,
            completed_at,
            seq,
        } => json!({
            "kind": "RollbackComplete",
            "revertedToClosure": reverted_to_closure,
            "exitCode": exit_code,
            "completedAt": completed_at.to_rfc3339(),
            "seq": seq,
        }),
        OutboundAgentEvent::Converged {
            converged_at,
            current_closure,
            seq,
        } => json!({
            "kind": "Converged",
            "convergedAt": converged_at.to_rfc3339(),
            "currentClosure": current_closure,
            "seq": seq,
        }),
    }
}
1270
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use nixfleet_state_machine::{ProbeMode, ProbeStatus, ProbeSubResult};

    /// A `ProbeResult` whose single control carries an observe-mode override
    /// must surface every per-control audit field in `subResults[0]`.
    #[test]
    fn probe_result_json_carries_per_control_audit_fields() {
        let observed_at = chrono::Utc
            .with_ymd_and_hms(2026, 5, 18, 18, 5, 27)
            .unwrap();
        let control = ProbeSubResult {
            control_id: "encryption-at-rest".into(),
            status: ProbeStatus::Fail,
            framework: "nis2-essential".into(),
            article: Some("21(h)".into()),
            effective_mode: ProbeMode::Observe,
            override_reason: Some("Q2 audit window: observe-mode default".into()),
        };
        let event = OutboundAgentEvent::ProbeResult {
            probe_name: "evidence-nis2-essential".into(),
            mode: ProbeMode::Enforce,
            status: ProbeStatus::Fail,
            observed_at,
            failure_reason: Some("encryption-at-rest failed".into()),
            sub_results: Some(vec![control]),
            seq: 7,
        };

        let rendered = outbound_event_to_json(&event);
        let first = &rendered["subResults"][0];
        let expectations = [
            ("controlId", "encryption-at-rest"),
            ("framework", "nis2-essential"),
            ("article", "21(h)"),
            ("effectiveMode", "observe"),
            ("overrideReason", "Q2 audit window: observe-mode default"),
        ];
        for (key, expected) in expectations {
            assert_eq!(first[key], expected, "subResults[0].{key}");
        }
    }
}