nixfleet_control_plane/runtime/
applier.rs

1//! Imperative shell for the pure planner + reducer (RFC-0006 §7.2).
2//!
3//! Two entrypoints:
4//!
5//! - [`apply_plan_action`] executes one [`PlanAction`] emitted by
6//!   `nixfleet_reconciler::plan_next`. Side effects: DB writes (open
7//!   rollout, queue dispatch, mark terminal, record gate decision),
8//!   event_log appends.
9//!
10//! - [`apply_effect`] executes one [`Effect`] emitted by
11//!   `nixfleet_state_machine::step`. The CP applier handles the `Remote*`
12//!   variants + the three shared variants. `Local*` variants are agent-only
13//!   and reaching the CP applier indicates a code defect — the applier
14//!   logs and drops them rather than panicking, so a malformed event from
15//!   a broken peer cannot crash the runtime.
16//!
17//! Both entrypoints are async because they touch the DB pool. They must
18//! not call `step()` or `plan_next()` themselves — only the reducer task
19//! does that, per the one-MPSC-one-mutator invariant in `runtime::mod`.
20//!
21//! Error policy: per-action errors are logged and swallowed. A single bad
22//! DB write must not poison the reducer task; the next `plan_next()` tick
23//! re-emits the same action if its preconditions still hold (the planner
24//! is pure of the applier's failure history).
25//!
26//! Event-log routing: every event_log append goes through the bounded MPSC
27//! to the [`super::event_log_writer`] task. The applier never calls
28//! `Db::event_log().append()` directly — that keeps SQLite latency out of
29//! the reducer's critical section and surfaces writer hiccups as
30//! backpressure on the reducer's input.
31
32use std::sync::Arc;
33
34use nixfleet_proto::RolloutId;
35use nixfleet_proto::clock::ClockHandle;
36use nixfleet_reconciler::planner::compute_soak_due_at;
37use nixfleet_reconciler::planner_types::{PlanAction, SignedManifestSet};
38use nixfleet_state_machine::rollout::{
39    self as rollout_sm, RolloutEffect, RolloutEvent, RolloutRecord,
40};
41use nixfleet_state_machine::{
42    Effect, HostRolloutState, HostState, LogLevel, OutboundAgentEvent, ProbeStatus,
43};
44use serde_json::{Value, json};
45
46use super::EventLogTx;
47use crate::db::Db;
48use crate::db::dispatch_queue::QueuedDispatch;
49use crate::db::event_log::{EventLogEntry, EventLogKind};
50use crate::db::probe_failures;
51use crate::server::AppState;
52
53/// Per-call context. Bundles the four borrows every applier path needs so
54/// individual handlers stay readable without the parameter-pyramid.
55pub struct ApplierCtx<'a> {
56    pub state: &'a Arc<AppState>,
57    pub manifests: &'a SignedManifestSet,
58    pub clock: &'a ClockHandle,
59    pub event_log_tx: &'a EventLogTx,
60}
61
62/// Execute one `PlanAction`. Errors are logged + swallowed: a single bad
63/// DB write must not poison the reducer task. The next plan_next() tick
64/// will re-emit the same action if its preconditions still hold.
65pub async fn apply_plan_action(ctx: &ApplierCtx<'_>, action: PlanAction) {
66    let Some(db) = ctx.state.db.as_ref() else {
67        tracing::warn!(
68            target: "cp_runtime",
69            action = ?action,
70            "apply_plan_action: no DB attached (in-memory mode); skipping",
71        );
72        return;
73    };
74    let now = ctx.clock.now();
75    match action {
76        PlanAction::OpenRollout {
77            rollout_id,
78            channel,
79            target_ref,
80        } => open_rollout(ctx, db, now, &rollout_id, &channel, &target_ref).await,
81        PlanAction::QueueDispatch {
82            host,
83            rollout,
84            target_closure,
85            soak_due_at,
86        } => queue_dispatch(ctx, db, now, &host, &rollout, &target_closure, soak_due_at).await,
87        PlanAction::RecordHaltLifted { channel } => record_halt_lifted(ctx, now, &channel).await,
88        PlanAction::DeferDispatch {
89            host,
90            rollout,
91            gate,
92            reason,
93        } => defer_dispatch(ctx, now, &host, &rollout, gate, &reason).await,
94    }
95}
96
97async fn open_rollout(
98    ctx: &ApplierCtx<'_>,
99    db: &Arc<Db>,
100    now: chrono::DateTime<chrono::Utc>,
101    rollout_id: &RolloutId,
102    channel: &str,
103    target_ref: &str,
104) {
105    // Snapshot in-flight predecessors on this channel before the new
106    // rollout's INSERT. Each predecessor gets a `SuccessorOpened`
107    // routed through the rollout reducer below; the reducer's existing
108    // arms transition `Opening | Active | Converging | Terminal |
109    // Reverted | Failed → Superseded` and emit a
110    // `RolloutEffect::RecordRolloutTransition` that the applier writes
111    // via `record_rollout_transition`. This replaces the Phase-10b
112    // inline `UPDATE` supersession in `record_rollout_opened` (now a
113    // pure insert) — single concern per applier step (RFC-0004 §3).
114    let in_flight_predecessors: Vec<RolloutId> = db
115        .rollouts()
116        .list_active()
117        .map(|rs| {
118            rs.into_iter()
119                .filter(|r| r.channel == channel && &r.rollout_id != rollout_id)
120                .map(|r| r.rollout_id)
121                .collect()
122        })
123        .unwrap_or_default();
124
125    if let Err(err) = db.rollouts().record_rollout_opened(
126        rollout_id.as_str(),
127        channel,
128        target_ref,
129        now,
130        // FK NULL-able under v0.2.1 baseline (RFC-0008 §6.1 item 3 +
131        // v0.2.1-followups #1).
132        None,
133    ) {
134        tracing::error!(
135            target: "cp_runtime",
136            rollout_id = %rollout_id,
137            %channel,
138            error = %err,
139            "OpenRollout: rollouts insert failed",
140        );
141        return;
142    }
143
144    // RolloutOpened is the creation marker; the rollout row is already
145    // in state=Opening from the INSERT above. Reducer has nothing to
146    // validate (no source state), so we append the event_log row
147    // directly without going through process_rollout_event.
148    append_rollout_event(
149        ctx,
150        now,
151        rollout_id,
152        &RolloutEvent::RolloutOpened {
153            rollout_id: rollout_id.clone(),
154            channel: channel.to_string(),
155            target_ref: target_ref.to_string(),
156            at: now,
157        },
158    )
159    .await;
160
161    // Drive supersession through the reducer for each predecessor —
162    // process_rollout_event writes the `rollout_event` event_log entry
163    // AND interprets the resulting RecordRolloutTransition effect
164    // (UPDATE state='Superseded', superseded_at=now via
165    // `record_rollout_transition`). Pruned predecessors are absorbed
166    // by the reducer's `pruned.rs` arm → IllegalForState → logged +
167    // dropped; already-Superseded predecessors are idempotent no-ops.
168    for predecessor in in_flight_predecessors {
169        process_rollout_event(
170            ctx,
171            db,
172            now,
173            RolloutEvent::SuccessorOpened {
174                superseded_rollout_id: predecessor,
175                successor_rollout_id: rollout_id.clone(),
176                at: now,
177            },
178        )
179        .await;
180    }
181
182    let Some(manifest) = ctx.manifests.rollouts.get(channel).map(|v| v.inner()) else {
183        tracing::warn!(
184            target: "cp_runtime",
185            %rollout_id,
186            %channel,
187            "OpenRollout: manifest absent from cached SignedManifestSet; skipping per-host record creation",
188        );
189        return;
190    };
191
192    // Per-wave soak resolution. Path:
193    //   manifests.fleet → channels[channel].rollout_policy
194    //                  → rollout_policies[policy].waves[wave_index].soak_minutes
195    // A missing link falls back to `DEFAULT_SOAK_MINUTES`; this can only
196    // happen against a malformed manifest, which we'd rather flag with a
197    // warn + continue than crash the runtime.
198    const DEFAULT_SOAK_MINUTES: u32 = 60;
199    let fleet = ctx.manifests.fleet();
200    let policy = fleet
201        .channels
202        .get(channel)
203        .and_then(|c| fleet.rollout_policies.get(&c.rollout_policy));
204
205    let records = db.host_rollout_records();
206    for hw in &manifest.host_set {
207        let soak_minutes = policy
208            .and_then(|p| p.waves.get(hw.wave_index as usize))
209            .map(|w| w.soak_minutes)
210            .unwrap_or(DEFAULT_SOAK_MINUTES);
211        let soak_due_at = compute_soak_due_at(now, soak_minutes);
212        let pending = HostRolloutState::new_pending(
213            rollout_id.clone(),
214            hw.hostname.clone(),
215            channel.to_string(),
216            hw.target_closure.clone(),
217            now,
218            soak_due_at,
219        );
220        if let Err(err) = records.upsert(&pending) {
221            tracing::error!(
222                target: "cp_runtime",
223                rollout_id = %rollout_id,
224                hostname = %hw.hostname,
225                error = %err,
226                "OpenRollout: host_rollout_records upsert failed",
227            );
228            continue;
229        }
230        // HostJoined drives the rollout reducer's Opening → Active
231        // transition on the first host (RFC-0008 §3).
232        process_rollout_event(
233            ctx,
234            db,
235            now,
236            RolloutEvent::HostJoined {
237                rollout_id: rollout_id.clone(),
238                host_id: hw.hostname.clone(),
239                wave: hw.wave_index,
240                at: now,
241            },
242        )
243        .await;
244    }
245
246    append_event_log(
247        ctx,
248        now,
249        None,
250        Some(rollout_id.as_str()),
251        EventLogKind::PlanAction,
252        json!({
253            "action": "OpenRollout",
254            "rollout_id": rollout_id,
255            "channel": channel,
256            "target_ref": target_ref,
257            "hosts": manifest.host_set.iter().map(|h| &h.hostname).collect::<Vec<_>>(),
258        }),
259    )
260    .await;
261}
262
263async fn queue_dispatch(
264    ctx: &ApplierCtx<'_>,
265    db: &Arc<Db>,
266    now: chrono::DateTime<chrono::Utc>,
267    host: &str,
268    rollout: &RolloutId,
269    target_closure: &str,
270    soak_due_at: chrono::DateTime<chrono::Utc>,
271) {
272    let queued = QueuedDispatch {
273        hostname: host.to_string(),
274        rollout_id: rollout.clone(),
275        target_closure: target_closure.to_string(),
276        soak_due_at,
277        enqueued_at: now,
278    };
279    if let Err(err) = db.dispatch_queue().upsert(&queued) {
280        tracing::error!(
281            target: "cp_runtime",
282            %host,
283            %rollout,
284            error = %err,
285            "QueueDispatch: dispatch_queue upsert failed",
286        );
287        return;
288    }
289    // Wake any /v1/agent/dispatch long-pollers parked for this host. The
290    // watch channel collapses bursts to one wake; a `send_replace` here
291    // is preferable to `send` because subscribers may not have read the
292    // last value yet and we don't want to block the applier on that.
293    let _ = ctx.state.dispatch_kick.send(());
294    append_event_log(
295        ctx,
296        now,
297        Some(host),
298        Some(rollout.as_str()),
299        EventLogKind::PlanAction,
300        json!({
301            "action": "QueueDispatch",
302            "host": host,
303            "rollout": rollout,
304            "target_closure": target_closure,
305            "soak_due_at": soak_due_at.to_rfc3339(),
306        }),
307    )
308    .await;
309}
310
311// No `mark_channel_terminal` helper: terminal transitions are driven
312// by the rollout reducer via `RolloutEffect::RecordRolloutTransition`
313// (RFC-0008 §5 + §7).
314
315async fn record_halt_lifted(
316    ctx: &ApplierCtx<'_>,
317    now: chrono::DateTime<chrono::Utc>,
318    channel: &str,
319) {
320    append_event_log(
321        ctx,
322        now,
323        None,
324        None,
325        EventLogKind::PlanAction,
326        json!({
327            "action": "RecordHaltLifted",
328            "channel": channel,
329        }),
330    )
331    .await;
332}
333
334async fn defer_dispatch(
335    ctx: &ApplierCtx<'_>,
336    now: chrono::DateTime<chrono::Utc>,
337    host: &str,
338    rollout: &RolloutId,
339    gate: &'static str,
340    reason: &str,
341) {
342    append_event_log(
343        ctx,
344        now,
345        Some(host),
346        Some(rollout.as_str()),
347        EventLogKind::GateDecision,
348        json!({
349            "gate": gate,
350            "reason": reason,
351            "host": host,
352            "rollout": rollout,
353        }),
354    )
355    .await;
356}
357
358/// Execute one `Effect` emitted by [`nixfleet_state_machine::step`].
359///
360/// Variant routing (RFC-0006 §9):
361/// - `Local*` (4 variants) — agent-only. CP receiving one is a defect; log
362///   at `error` and return. Never panic — a buggy peer must not crash CP.
363/// - `Remote*` (5 variants) — CP-only. Handled here.
364/// - Shared (3 variants: `RecordTransition`, `EmitMetric`, `EmitLog`) —
365///   handled here too, identically to the agent's applier.
366pub async fn apply_effect(ctx: &ApplierCtx<'_>, effect: Effect) {
367    // In-memory mode (no `--db-path`): mutating arms become no-ops, but
368    // shared arms (metric/log/record_transition) still emit via the
369    // tracing layer and the event_log writer task (drains silently when
370    // there's no DB). Single warn at entry beats a noisy per-arm one.
371    if ctx.state.db.is_none() {
372        tracing::debug!(
373            target: "cp_runtime",
374            effect = ?effect_kind(&effect),
375            "apply_effect: in-memory mode; mutating arms are no-ops",
376        );
377    }
378    let now = ctx.clock.now();
379    match effect {
380        // ─────────────────────────────────────────────────────────────
381        // CP-only Remote* effects.
382        // ─────────────────────────────────────────────────────────────
383        Effect::RemoteQueueDispatch {
384            host,
385            rollout_id,
386            target_closure,
387            soak_due_at,
388        } => {
389            if let Some(db) = ctx.state.db.as_ref() {
390                queue_dispatch(
391                    ctx,
392                    db,
393                    now,
394                    &host,
395                    &rollout_id,
396                    &target_closure,
397                    soak_due_at,
398                )
399                .await;
400            }
401        }
402        Effect::RemoteInsertQuarantine { channel, closure } => {
403            if let Some(db) = ctx.state.db.as_ref()
404                && let Err(err) = db.quarantined_closures().insert(
405                    &channel, &closure, now,
406                    // FK populated by Phase 10b's rollout reducer co-write
407                    // (RFC-0008 §6.1 item 3 + §6.4).
408                    None,
409                )
410            {
411                tracing::error!(
412                    target: "cp_runtime",
413                    %channel,
414                    %closure,
415                    error = %err,
416                    "RemoteInsertQuarantine: insert failed",
417                );
418                return;
419            }
420            append_event_log(
421                ctx,
422                now,
423                None,
424                None,
425                EventLogKind::Effect,
426                json!({
427                    "effect": "RemoteInsertQuarantine",
428                    "channel": channel,
429                    "closure": closure,
430                }),
431            )
432            .await;
433        }
434        Effect::RemoteOpenRolloutRecord {
435            rollout_id,
436            channel,
437            host,
438        } => {
439            if let Some(db) = ctx.state.db.as_ref() {
440                open_one_rollout_record(ctx, db, now, &rollout_id, &channel, &host).await;
441            }
442        }
443        Effect::RemoteAppendEventLog {
444            host,
445            rollout_id,
446            payload,
447        } => {
448            append_event_log(
449                ctx,
450                now,
451                Some(&host),
452                Some(rollout_id.as_str()),
453                EventLogKind::AgentEvent,
454                outbound_event_to_json(&payload),
455            )
456            .await;
457
458            // Enforce-mode probe Fail → write a `probe_failures` row
459            // per sub_result (or one row with control_id=NULL for
460            // non-evidence enforce-fail). Single transaction with the
461            // event_log row above is the contract (RFC-0007 §7.2); for
462            // 9b we trail the event_log append and accept eventual
463            // consistency within milliseconds — the writer task is
464            // bounded-mpsc, so a crash between rows is a known small
465            // window operators monitor via the prune-timer's metric.
466            if let nixfleet_state_machine::OutboundAgentEvent::ProbeResult {
467                probe_name,
468                mode,
469                status,
470                observed_at,
471                sub_results,
472                ..
473            } = &payload
474                && let Some(db) = ctx.state.db.as_ref()
475            {
476                // Per-control gate (RFC-0007 §3.4 + §7.2): each
477                // sub_result carries its own `effective_mode` resolved
478                // by the agent's probe runner against the probe's
479                // `controlOverrides` / `controls` declaration. Only
480                // `Enforce`-mode failures land in `probe_failures`;
481                // `Observe`-mode failures stay in `event_log` for
482                // visibility but do not gate. `Disabled` controls
483                // are dropped at the agent before emission so they
484                // never reach here.
485                //
486                // For non-evidence probes (HTTP / TCP / exec — no
487                // sub_results) the probe-level `mode` is the
488                // effective mode. The existing pre-LIFT behaviour
489                // (probe-level Enforce + Fail → one row with
490                // control_id = NULL) holds.
491                let rows: Vec<probe_failures::ProbeFailureInsert<'_>> = match sub_results {
492                    Some(srs) if !srs.is_empty() => srs
493                        .iter()
494                        .filter(|sr| {
495                            matches!(sr.status, nixfleet_state_machine::ProbeStatus::Fail)
496                                && matches!(
497                                    sr.effective_mode,
498                                    nixfleet_state_machine::ProbeMode::Enforce
499                                )
500                        })
501                        .map(|sr| probe_failures::ProbeFailureInsert {
502                            rollout_id: rollout_id.as_str(),
503                            host_id: &host,
504                            probe_name,
505                            control_id: Some(&sr.control_id),
506                            framework: Some(&sr.framework),
507                            observed_at: *observed_at,
508                        })
509                        .collect(),
510                    _ => {
511                        if matches!(mode, nixfleet_state_machine::ProbeMode::Enforce)
512                            && matches!(status, nixfleet_state_machine::ProbeStatus::Fail)
513                        {
514                            vec![probe_failures::ProbeFailureInsert {
515                                rollout_id: rollout_id.as_str(),
516                                host_id: &host,
517                                probe_name,
518                                control_id: None,
519                                framework: None,
520                                observed_at: *observed_at,
521                            }]
522                        } else {
523                            Vec::new()
524                        }
525                    }
526                };
527                if !rows.is_empty()
528                    && let Err(err) = db.probe_failures().insert_many(&rows)
529                {
530                    tracing::warn!(
531                        target: "cp_applier",
532                        rollout_id = %rollout_id,
533                        host = %host,
534                        probe = %probe_name,
535                        error = %err,
536                        "probe_failures insert failed",
537                    );
538                }
539            }
540        }
541
542        // ─────────────────────────────────────────────────────────────
543        // Shared effects.
544        // ─────────────────────────────────────────────────────────────
545        Effect::RecordTransition {
546            host,
547            rollout_id,
548            from,
549            to,
550            at,
551        } => {
552            append_event_log(
553                ctx,
554                now,
555                Some(&host),
556                Some(rollout_id.as_str()),
557                EventLogKind::Effect,
558                json!({
559                    "effect": "RecordTransition",
560                    "host": host,
561                    "rollout_id": rollout_id.as_str(),
562                    "from": host_state_str(from),
563                    "to": host_state_str(to),
564                    "at": at.to_rfc3339(),
565                }),
566            )
567            .await;
568        }
569        Effect::EmitMetric {
570            name,
571            labels,
572            value,
573        } => {
574            // The CP-side metrics surface is feature-gated and uses typed
575            // helpers (`record_compliance_event`, `record_gate_block`).
576            // The reducer emits generic name+labels+value; we log at debug
577            // so the values are visible in JSON logs (Loki captures these),
578            // and Phase 8 routes specific names through the typed surface.
579            tracing::debug!(
580                target: "cp_runtime",
581                metric = %name,
582                ?labels,
583                value,
584                "EmitMetric",
585            );
586        }
587        Effect::EmitLog {
588            level,
589            target,
590            message,
591            fields,
592        } => {
593            // tracing's `target:` arg requires a string literal at the
594            // macro site; the reducer's `target` is a `&'static str` but
595            // not a literal. We log under "cp_runtime_emitted" and surface
596            // the reducer-emitted target as a field. Loki dashboards that
597            // care can filter on `emitted_target=...`.
598            match level {
599                LogLevel::Trace => tracing::trace!(
600                    target: "cp_runtime_emitted",
601                    emitted_target = target,
602                    ?fields,
603                    "{message}",
604                ),
605                LogLevel::Debug => tracing::debug!(
606                    target: "cp_runtime_emitted",
607                    emitted_target = target,
608                    ?fields,
609                    "{message}",
610                ),
611                LogLevel::Info => tracing::info!(
612                    target: "cp_runtime_emitted",
613                    emitted_target = target,
614                    ?fields,
615                    "{message}",
616                ),
617                LogLevel::Warn => tracing::warn!(
618                    target: "cp_runtime_emitted",
619                    emitted_target = target,
620                    ?fields,
621                    "{message}",
622                ),
623                LogLevel::Error => tracing::error!(
624                    target: "cp_runtime_emitted",
625                    emitted_target = target,
626                    ?fields,
627                    "{message}",
628                ),
629            }
630        }
631
632        // ─────────────────────────────────────────────────────────────
633        // Local* — agent-only. Reaching CP is a code defect; log + drop.
634        // ─────────────────────────────────────────────────────────────
635        Effect::LocalFireSwitch { .. }
636        | Effect::LocalFireRollbackTo { .. }
637        | Effect::LocalResetProbeCache { .. }
638        | Effect::LocalEmitEvent { .. } => {
639            tracing::error!(
640                target: "cp_runtime",
641                effect = ?effect_kind(&effect),
642                "apply_effect: agent-only Local* effect reached the CP applier — \
643                 reducer state-machine defect. Dropping.",
644            );
645        }
646    }
647}
648
649async fn open_one_rollout_record(
650    ctx: &ApplierCtx<'_>,
651    db: &Arc<Db>,
652    now: chrono::DateTime<chrono::Utc>,
653    rollout_id: &RolloutId,
654    channel: &str,
655    host: &str,
656) {
657    // Resolve the per-host target_closure + soak from the cached manifests.
658    // Absent manifest means the reducer's cache drifted past where this
659    // host's rollout still lives — log + skip.
660    let Some(manifest) = ctx.manifests.rollouts.get(channel).map(|v| v.inner()) else {
661        tracing::warn!(
662            target: "cp_runtime",
663            %rollout_id,
664            %channel,
665            %host,
666            "RemoteOpenRolloutRecord: rollout manifest absent from cached set",
667        );
668        return;
669    };
670    let Some(hw) = manifest.host_set.iter().find(|h| h.hostname == host) else {
671        tracing::warn!(
672            target: "cp_runtime",
673            %rollout_id,
674            %channel,
675            %host,
676            "RemoteOpenRolloutRecord: host not in manifest host_set",
677        );
678        return;
679    };
680    const DEFAULT_SOAK_MINUTES: u32 = 60;
681    let fleet = ctx.manifests.fleet();
682    let soak_minutes = fleet
683        .channels
684        .get(channel)
685        .and_then(|c| fleet.rollout_policies.get(&c.rollout_policy))
686        .and_then(|p| p.waves.get(hw.wave_index as usize))
687        .map(|w| w.soak_minutes)
688        .unwrap_or(DEFAULT_SOAK_MINUTES);
689    let soak_due_at = compute_soak_due_at(now, soak_minutes);
690    let pending = HostRolloutState::new_pending(
691        rollout_id.clone(),
692        host.to_string(),
693        channel.to_string(),
694        hw.target_closure.clone(),
695        now,
696        soak_due_at,
697    );
698    if let Err(err) = db.host_rollout_records().upsert(&pending) {
699        tracing::error!(
700            target: "cp_runtime",
701            %rollout_id,
702            %host,
703            error = %err,
704            "RemoteOpenRolloutRecord: upsert failed",
705        );
706        return;
707    }
708    append_event_log(
709        ctx,
710        now,
711        Some(host),
712        Some(rollout_id.as_str()),
713        EventLogKind::Effect,
714        json!({
715            "effect": "RemoteOpenRolloutRecord",
716            "rollout_id": rollout_id.as_str(),
717            "channel": channel,
718            "host": host,
719        }),
720    )
721    .await;
722}
723
724/// Send an entry to the bounded MPSC drained by
725/// [`super::event_log_writer`]. Backpressure (full channel) blocks the
726/// caller via `await` — that's the desired propagation per the audit-log
727/// no-fail-open contract.
728///
729/// If the writer task has died (closed channel), log + return rather than
730/// panic — the runtime is being torn down and losing a few tail entries
731/// is acceptable.
732async fn append_event_log(
733    ctx: &ApplierCtx<'_>,
734    ts: chrono::DateTime<chrono::Utc>,
735    host_id: Option<&str>,
736    rollout_id: Option<&str>,
737    kind: EventLogKind,
738    payload: Value,
739) {
740    let entry = EventLogEntry {
741        kind,
742        ts,
743        host_id: host_id.map(str::to_string),
744        rollout_id: rollout_id.map(str::to_string),
745        payload: payload.to_string(),
746    };
747    if let Err(err) = ctx.event_log_tx.send(entry).await {
748        tracing::error!(
749            target: "cp_runtime",
750            ?kind,
751            host_id,
752            rollout_id,
753            error = %err,
754            "append_event_log: writer channel closed",
755        );
756    }
757}
758
759/// Step the rollout reducer with the given event and apply its effects.
760///
761/// Reads the current `rollouts` row, builds a `RolloutRecord`, steps the
762/// pure reducer (RFC-0008 §3), then appends a `rollout_event` row to
763/// `event_log` and writes each emitted `RolloutEffect` against the
764/// derived-view tables. Matches Phase 9b's eventual-consistency pattern
765/// (SR-2): the event_log row and the derived-view rows are sequential
766/// writes within the applier task, not a single SQL transaction.
767///
768/// `event_log_seq` on derived-view rows is NULL under the v0.2.1
769/// baseline (RFC-0008 §6.1 item 3 + v0.2.1-followups #1); the writer
770/// task is fire-and-forget so the applier doesn't know `seq` at co-
771/// write time.
772///
773/// Unknown rollout IDs are logged and dropped — the rollout may have
774/// been pruned or the event is for a CP-mirror view that hasn't caught
775/// up.
776pub(super) async fn process_rollout_event(
777    ctx: &ApplierCtx<'_>,
778    db: &Arc<Db>,
779    now: chrono::DateTime<chrono::Utc>,
780    event: RolloutEvent,
781) {
782    let rollout_id: RolloutId = rollout_event_rollout_id(&event).clone();
783
784    let row = match db.rollouts().state(rollout_id.as_str()) {
785        Ok(Some(row)) => row,
786        Ok(None) => {
787            tracing::debug!(
788                target: "cp_runtime",
789                rollout_id = %rollout_id,
790                event_kind = event.kind(),
791                "process_rollout_event: unknown rollout; dropping",
792            );
793            return;
794        }
795        Err(err) => {
796            tracing::error!(
797                target: "cp_runtime",
798                rollout_id = %rollout_id,
799                error = %err,
800                "process_rollout_event: state() query failed",
801            );
802            return;
803        }
804    };
805
806    let record = RolloutRecord {
807        rollout_id: row.rollout_id,
808        channel: row.channel,
809        target_ref: row.target_ref,
810        state: row.state,
811        current_wave: row.current_wave,
812        opened_event_log_seq: row.opened_event_log_seq,
813        last_transition_event_log_seq: row.last_transition_event_log_seq,
814        opened_at: row.opened_at,
815        terminal_at: row.terminal_at,
816        superseded_at: row.superseded_at,
817    };
818
819    append_event_log(
820        ctx,
821        now,
822        None,
823        Some(rollout_id.as_str()),
824        EventLogKind::RolloutEvent,
825        rollout_event_to_json(&event),
826    )
827    .await;
828
829    match rollout_sm::step(record, event.clone(), now) {
830        Ok((_new_record, effects)) => {
831            for effect in effects {
832                apply_rollout_effect(ctx, db, now, effect).await;
833            }
834        }
835        Err(err) => {
836            tracing::warn!(
837                target: "cp_runtime",
838                rollout_id = %rollout_id,
839                event_kind = event.kind(),
840                error = %err,
841                "process_rollout_event: rollout step() rejected",
842            );
843        }
844    }
845}
846
847async fn apply_rollout_effect(
848    ctx: &ApplierCtx<'_>,
849    db: &Arc<Db>,
850    now: chrono::DateTime<chrono::Utc>,
851    effect: RolloutEffect,
852) {
853    match effect {
854        RolloutEffect::RecordRolloutTransition {
855            rollout_id,
856            from,
857            to,
858            at,
859        } => {
860            if let Err(err) =
861                db.rollouts()
862                    .record_rollout_transition(rollout_id.as_str(), to, at, None)
863            {
864                tracing::error!(
865                    target: "cp_runtime",
866                    rollout_id = %rollout_id,
867                    from = from.as_db_str(),
868                    to = to.as_db_str(),
869                    error = %err,
870                    "RolloutEffect::RecordRolloutTransition: db write failed",
871                );
872            }
873            append_event_log(
874                ctx,
875                now,
876                None,
877                Some(rollout_id.as_str()),
878                EventLogKind::Effect,
879                json!({
880                    "effect": "RecordRolloutTransition",
881                    "rolloutId": rollout_id.as_str(),
882                    "from": from.as_db_str(),
883                    "to": to.as_db_str(),
884                    "at": at.to_rfc3339(),
885                }),
886            )
887            .await;
888        }
889        RolloutEffect::UpdateCurrentWave { rollout_id, wave } => {
890            if let Err(err) = db
891                .rollouts()
892                .set_current_wave(rollout_id.as_str(), wave, None)
893            {
894                tracing::error!(
895                    target: "cp_runtime",
896                    rollout_id = %rollout_id,
897                    wave,
898                    error = %err,
899                    "RolloutEffect::UpdateCurrentWave: db write failed",
900                );
901            }
902        }
903        RolloutEffect::InsertQuarantineFromRollout {
904            channel,
905            closure_hash,
906        } => {
907            if let Err(err) = db
908                .quarantined_closures()
909                .insert(&channel, &closure_hash, now, None)
910            {
911                tracing::error!(
912                    target: "cp_runtime",
913                    channel,
914                    closure_hash,
915                    error = %err,
916                    "RolloutEffect::InsertQuarantineFromRollout: db write failed",
917                );
918            }
919        }
920        RolloutEffect::SchedulePruning {
921            rollout_id,
922            delay_seconds,
923        } => {
924            // Out of scope for Phase 10b. Retention-driven pruning is the
925            // existing `prune_finished_rollouts` timer path; the
926            // reducer-driven event-emission cycle is v0.2.x follow-up
927            // territory (RFC-0008 §3 + §13).
928            tracing::debug!(
929                target: "cp_runtime",
930                rollout_id = %rollout_id,
931                delay_seconds,
932                "RolloutEffect::SchedulePruning: deferred to v0.2.x follow-up",
933            );
934        }
935    }
936}
937
938fn rollout_event_rollout_id(event: &RolloutEvent) -> &RolloutId {
939    match event {
940        RolloutEvent::RolloutOpened { rollout_id, .. }
941        | RolloutEvent::HostJoined { rollout_id, .. }
942        | RolloutEvent::HostStateChanged { rollout_id, .. }
943        | RolloutEvent::WaveAdvanced { rollout_id, .. }
944        | RolloutEvent::RolloutTerminal { rollout_id, .. }
945        | RolloutEvent::RetentionExpired { rollout_id, .. }
946        | RolloutEvent::OperatorClearance { rollout_id, .. } => rollout_id,
947        // `SuccessorOpened` carries the predecessor's id as the
948        // "rollout this event targets" — the successor opening is the
949        // separate `RolloutOpened` event.
950        RolloutEvent::SuccessorOpened {
951            superseded_rollout_id,
952            ..
953        } => superseded_rollout_id,
954    }
955}
956
957fn rollout_event_to_json(event: &RolloutEvent) -> Value {
958    match event {
959        RolloutEvent::RolloutOpened {
960            rollout_id,
961            channel,
962            target_ref,
963            at,
964        } => json!({
965            "kind": "RolloutOpened",
966            "rolloutId": rollout_id,
967            "channel": channel,
968            "targetRef": target_ref,
969            "at": at.to_rfc3339(),
970        }),
971        RolloutEvent::HostJoined {
972            rollout_id,
973            host_id,
974            wave,
975            at,
976        } => json!({
977            "kind": "HostJoined",
978            "rolloutId": rollout_id,
979            "hostId": host_id,
980            "wave": wave,
981            "at": at.to_rfc3339(),
982        }),
983        RolloutEvent::HostStateChanged {
984            rollout_id,
985            host_id,
986            from,
987            to,
988            at,
989        } => json!({
990            "kind": "HostStateChanged",
991            "rolloutId": rollout_id,
992            "hostId": host_id,
993            "from": host_state_str(*from),
994            "to": host_state_str(*to),
995            "at": at.to_rfc3339(),
996        }),
997        RolloutEvent::WaveAdvanced {
998            rollout_id,
999            from_wave,
1000            to_wave,
1001            at,
1002        } => json!({
1003            "kind": "WaveAdvanced",
1004            "rolloutId": rollout_id,
1005            "fromWave": from_wave,
1006            "toWave": to_wave,
1007            "at": at.to_rfc3339(),
1008        }),
1009        RolloutEvent::RolloutTerminal { rollout_id, at } => json!({
1010            "kind": "RolloutTerminal",
1011            "rolloutId": rollout_id,
1012            "at": at.to_rfc3339(),
1013        }),
1014        RolloutEvent::SuccessorOpened {
1015            superseded_rollout_id,
1016            successor_rollout_id,
1017            at,
1018        } => json!({
1019            "kind": "SuccessorOpened",
1020            "supersededRolloutId": superseded_rollout_id,
1021            "successorRolloutId": successor_rollout_id,
1022            "at": at.to_rfc3339(),
1023        }),
1024        RolloutEvent::RetentionExpired { rollout_id, at } => json!({
1025            "kind": "RetentionExpired",
1026            "rolloutId": rollout_id,
1027            "at": at.to_rfc3339(),
1028        }),
1029        RolloutEvent::OperatorClearance {
1030            rollout_id,
1031            operator,
1032            reason,
1033            at,
1034        } => json!({
1035            "kind": "OperatorClearance",
1036            "rolloutId": rollout_id,
1037            "operator": operator,
1038            "reason": reason,
1039            "at": at.to_rfc3339(),
1040        }),
1041    }
1042}
1043
1044/// Write a bare `rollout_event` entry to event_log without going
1045/// through the reducer. Used for `RolloutOpened` (creation marker;
1046/// reducer has nothing to validate) and for any out-of-band
1047/// rollout-level signal.
1048pub(super) async fn append_rollout_event(
1049    ctx: &ApplierCtx<'_>,
1050    now: chrono::DateTime<chrono::Utc>,
1051    rollout_id: &RolloutId,
1052    event: &RolloutEvent,
1053) {
1054    append_event_log(
1055        ctx,
1056        now,
1057        None,
1058        Some(rollout_id.as_str()),
1059        EventLogKind::RolloutEvent,
1060        rollout_event_to_json(event),
1061    )
1062    .await;
1063}
1064
1065fn host_state_str(s: HostState) -> &'static str {
1066    match s {
1067        HostState::Pending => "Pending",
1068        HostState::Activating => "Activating",
1069        HostState::Deferred => "Deferred",
1070        HostState::Soaking => "Soaking",
1071        HostState::Converged => "Converged",
1072        HostState::Failed => "Failed",
1073        HostState::Reverted => "Reverted",
1074    }
1075}
1076
1077fn probe_status_str(s: ProbeStatus) -> &'static str {
1078    match s {
1079        ProbeStatus::Pass => "pass",
1080        ProbeStatus::Fail => "fail",
1081    }
1082}
1083
1084fn probe_mode_str(m: nixfleet_state_machine::ProbeMode) -> &'static str {
1085    use nixfleet_state_machine::ProbeMode;
1086    match m {
1087        ProbeMode::Enforce => "enforce",
1088        ProbeMode::Observe => "observe",
1089        ProbeMode::Disabled => "disabled",
1090    }
1091}
1092
1093fn effect_kind(e: &Effect) -> &'static str {
1094    match e {
1095        Effect::LocalFireSwitch { .. } => "LocalFireSwitch",
1096        Effect::LocalFireRollbackTo { .. } => "LocalFireRollbackTo",
1097        Effect::LocalResetProbeCache { .. } => "LocalResetProbeCache",
1098        Effect::LocalEmitEvent { .. } => "LocalEmitEvent",
1099        Effect::RemoteQueueDispatch { .. } => "RemoteQueueDispatch",
1100        Effect::RemoteInsertQuarantine { .. } => "RemoteInsertQuarantine",
1101        Effect::RemoteOpenRolloutRecord { .. } => "RemoteOpenRolloutRecord",
1102        Effect::RemoteAppendEventLog { .. } => "RemoteAppendEventLog",
1103        Effect::RecordTransition { .. } => "RecordTransition",
1104        Effect::EmitMetric { .. } => "EmitMetric",
1105        Effect::EmitLog { .. } => "EmitLog",
1106    }
1107}
1108
1109/// Convert an `OutboundAgentEvent` to its event_log JSON payload. Schema is
1110/// the wire-side RFC-0005 §4.2 shape (camelCase). Hand-written because the
1111/// state-machine crate keeps its types serde-derive-free for now; if Phase
1112/// 7/8 adds `Serialize` we collapse this into a single `serde_json::to_value`.
1113fn outbound_event_to_json(payload: &OutboundAgentEvent) -> Value {
1114    match payload {
1115        OutboundAgentEvent::DispatchAck {
1116            current_closure_at_dispatch,
1117            received_at,
1118            seq,
1119        } => json!({
1120            "kind": "DispatchAck",
1121            "currentClosureAtDispatch": current_closure_at_dispatch,
1122            "receivedAt": received_at.to_rfc3339(),
1123            "seq": seq,
1124        }),
1125        OutboundAgentEvent::ActivationStarted {
1126            started_at,
1127            switch_method,
1128            seq,
1129        } => json!({
1130            "kind": "ActivationStarted",
1131            "startedAt": started_at.to_rfc3339(),
1132            "switchMethod": switch_method,
1133            "seq": seq,
1134        }),
1135        OutboundAgentEvent::ActivationCompleted {
1136            observed_current_closure,
1137            exit_code,
1138            completed_at,
1139            seq,
1140        } => json!({
1141            "kind": "ActivationCompleted",
1142            "observedCurrentClosure": observed_current_closure,
1143            "exitCode": exit_code,
1144            "completedAt": completed_at.to_rfc3339(),
1145            "seq": seq,
1146        }),
1147        OutboundAgentEvent::ActivationFailed {
1148            exit_code,
1149            stderr_tail,
1150            failed_at,
1151            seq,
1152        } => json!({
1153            "kind": "ActivationFailed",
1154            "exitCode": exit_code,
1155            "stderrTail": stderr_tail,
1156            "failedAt": failed_at.to_rfc3339(),
1157            "seq": seq,
1158        }),
1159        OutboundAgentEvent::ActivationDeferred {
1160            component,
1161            deferred_at,
1162            seq,
1163        } => json!({
1164            "kind": "ActivationDeferred",
1165            "component": component,
1166            "deferredAt": deferred_at.to_rfc3339(),
1167            "seq": seq,
1168        }),
1169        OutboundAgentEvent::ProbeTopologyDeclared {
1170            probes,
1171            declared_at,
1172            seq,
1173        } => json!({
1174            "kind": "ProbeTopologyDeclared",
1175            "probes": probes.iter().map(|e| json!({
1176                "probeName": e.probe_name,
1177                "kind": e.kind,
1178                "mode": probe_mode_str(e.mode),
1179            })).collect::<Vec<_>>(),
1180            "declaredAt": declared_at.to_rfc3339(),
1181            "seq": seq,
1182        }),
1183        OutboundAgentEvent::ProbeObservedFirst {
1184            probe_name,
1185            mode,
1186            observed_at,
1187            seq,
1188        } => json!({
1189            "kind": "ProbeObservedFirst",
1190            "probeName": probe_name,
1191            "mode": probe_mode_str(*mode),
1192            "observedAt": observed_at.to_rfc3339(),
1193            "seq": seq,
1194        }),
1195        OutboundAgentEvent::ProbeResult {
1196            probe_name,
1197            mode,
1198            status,
1199            observed_at,
1200            failure_reason,
1201            sub_results,
1202            seq,
1203        } => json!({
1204            "kind": "ProbeResult",
1205            "probeName": probe_name,
1206            "mode": probe_mode_str(*mode),
1207            "status": probe_status_str(*status),
1208            "observedAt": observed_at.to_rfc3339(),
1209            "failureReason": failure_reason,
1210            "subResults": sub_results.as_ref().map(|v| v.iter().map(|sr| json!({
1211                "controlId": sr.control_id,
1212                "status": probe_status_str(sr.status),
1213                "framework": sr.framework,
1214                "article": sr.article,
1215                "effectiveMode": probe_mode_str(sr.effective_mode),
1216                "overrideReason": sr.override_reason,
1217            })).collect::<Vec<_>>()),
1218            "seq": seq,
1219        }),
1220        OutboundAgentEvent::ProbeFailureFirst {
1221            probe_name,
1222            mode,
1223            first_failed_at,
1224            seq,
1225        } => json!({
1226            "kind": "ProbeFailureFirst",
1227            "probeName": probe_name,
1228            "mode": probe_mode_str(*mode),
1229            "firstFailedAt": first_failed_at.to_rfc3339(),
1230            "seq": seq,
1231        }),
1232        OutboundAgentEvent::Failed {
1233            failed_at,
1234            sustained_duration_secs,
1235            failing_probes,
1236            policy_applied,
1237            seq,
1238        } => json!({
1239            "kind": "Failed",
1240            "failedAt": failed_at.to_rfc3339(),
1241            "sustainedDurationSecs": sustained_duration_secs,
1242            "failingProbes": failing_probes,
1243            "policyApplied": policy_applied.to_string(),
1244            "seq": seq,
1245        }),
1246        OutboundAgentEvent::RollbackComplete {
1247            reverted_to_closure,
1248            exit_code,
1249            completed_at,
1250            seq,
1251        } => json!({
1252            "kind": "RollbackComplete",
1253            "revertedToClosure": reverted_to_closure,
1254            "exitCode": exit_code,
1255            "completedAt": completed_at.to_rfc3339(),
1256            "seq": seq,
1257        }),
1258        OutboundAgentEvent::Converged {
1259            converged_at,
1260            current_closure,
1261            seq,
1262        } => json!({
1263            "kind": "Converged",
1264            "convergedAt": converged_at.to_rfc3339(),
1265            "currentClosure": current_closure,
1266            "seq": seq,
1267        }),
1268    }
1269}
1270
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use nixfleet_state_machine::{ProbeMode, ProbeStatus, ProbeSubResult};

    /// Fixed, second-precision UTC timestamp shared by the fixtures below.
    fn fixture_time() -> chrono::DateTime<chrono::Utc> {
        chrono::Utc
            .with_ymd_and_hms(2026, 5, 18, 18, 5, 27)
            .unwrap()
    }

    /// Pins the audit-trail field set on the wire shape of a
    /// `ProbeResult` row in `event_log`. Sub-results must surface
    /// `effectiveMode` + `overrideReason` end-to-end, so that an auditor
    /// inspecting signed payloads can answer "why was control X
    /// downgraded?" from the chain alone. Dropping these fields previously
    /// reduced the audit-trail-recovery story to nulls — see the
    /// architect's BT-B' blocker.
    #[test]
    fn probe_result_json_carries_per_control_audit_fields() {
        let payload = OutboundAgentEvent::ProbeResult {
            probe_name: "evidence-nis2-essential".into(),
            mode: ProbeMode::Enforce,
            status: ProbeStatus::Fail,
            observed_at: fixture_time(),
            failure_reason: Some("encryption-at-rest failed".into()),
            sub_results: Some(vec![ProbeSubResult {
                control_id: "encryption-at-rest".into(),
                status: ProbeStatus::Fail,
                framework: "nis2-essential".into(),
                article: Some("21(h)".into()),
                effective_mode: ProbeMode::Observe,
                override_reason: Some("Q2 audit window: observe-mode default".into()),
            }]),
            seq: 7,
        };
        let v = outbound_event_to_json(&payload);

        // Top-level envelope.
        assert_eq!(v["kind"], "ProbeResult");
        assert_eq!(v["probeName"], "evidence-nis2-essential");
        assert_eq!(v["mode"], "enforce");
        assert_eq!(v["status"], "fail");
        assert_eq!(v["observedAt"], fixture_time().to_rfc3339());
        assert_eq!(v["failureReason"], "encryption-at-rest failed");
        assert_eq!(v["seq"], 7);
        assert_eq!(v["subResults"].as_array().map(Vec::len), Some(1));

        // Per-control audit fields.
        let sub = &v["subResults"][0];
        assert_eq!(sub["controlId"], "encryption-at-rest");
        assert_eq!(sub["status"], "fail");
        assert_eq!(sub["framework"], "nis2-essential");
        assert_eq!(sub["article"], "21(h)");
        assert_eq!(sub["effectiveMode"], "observe");
        assert_eq!(sub["overrideReason"], "Q2 audit window: observe-mode default");
    }

    /// A `ProbeResult` with no failure reason and no sub-results must still
    /// emit both keys, as explicit JSON `null`, so the row shape is stable
    /// for consumers. `Value::get` is used because indexing a missing key
    /// also yields `Null` and would not catch a dropped key.
    #[test]
    fn probe_result_json_emits_null_for_absent_optionals() {
        let payload = OutboundAgentEvent::ProbeResult {
            probe_name: "disk-free".into(),
            mode: ProbeMode::Observe,
            status: ProbeStatus::Pass,
            observed_at: fixture_time(),
            failure_reason: None,
            sub_results: None,
            seq: 1,
        };
        let v = outbound_event_to_json(&payload);
        assert_eq!(v["kind"], "ProbeResult");
        assert_eq!(v["mode"], "observe");
        assert_eq!(v["status"], "pass");
        assert_eq!(v["seq"], 1);
        assert_eq!(v.get("failureReason"), Some(&Value::Null));
        assert_eq!(v.get("subResults"), Some(&Value::Null));
    }
}