nixfleet_control_plane/runtime/
reducer.rs

1//! Reducer task body: the single mutator of CP-mirror state.
2//!
3//! Owns the cached `SignedManifestSet`, the per-channel quarantine set,
4//! and (transiently) the per-rollout `HostRolloutState` map it loads from
5//! the DB. Calls into `nixfleet_state_machine::step` on every `HostEvent`
6//! input, `nixfleet_reconciler::plan_next` on `ManifestSetUpdated` /
7//! `PlanTick`, and the applier (`apply_plan_action`/`apply_effect`) for
8//! every output.
9//!
10//! Invariant 1 from `runtime::mod`: this is the only place that calls
11//! `step()` or `plan_next()`. Workers and HTTP route handlers emit
12//! `ReducerInput` values; the applier executes the produced effects.
13
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17
18use chrono::{DateTime, Utc};
19use nixfleet_proto::RolloutPolicy;
20use nixfleet_proto::clock::ClockHandle;
21use nixfleet_reconciler::planner_types::{
22    FleetState, HostId, QuarantineSet, RolloutId, RolloutSummary, SignedManifestSet,
23};
24use nixfleet_state_machine::HostRolloutState;
25use tokio::sync::{mpsc, oneshot};
26use tokio_util::sync::CancellationToken;
27
28use super::applier::{ApplierCtx, apply_effect, apply_plan_action};
29use super::event_log_writer::EventLogTx;
30use super::{HeartbeatReply, ReducerInput};
31use crate::db::Db;
32use crate::server::AppState;
33
/// How often the reducer replans when nothing else prompts it.
///
/// This is a safety net: `plan_next` still runs on this cadence even if no
/// input arrives (e.g. the manifest_poll worker crashed mid-cycle, or a kick
/// was missed). Fifteen seconds, expressed in milliseconds.
const PLAN_TICK_INTERVAL: Duration = Duration::from_millis(15_000);
37
38pub(super) async fn run(
39    cancel: CancellationToken,
40    state: Arc<AppState>,
41    clock: ClockHandle,
42    mut input_rx: mpsc::Receiver<ReducerInput>,
43    event_log_tx: EventLogTx,
44    shutdown_senders: Vec<oneshot::Sender<()>>,
45) {
46    let _shutdown_guard = ShutdownGuard(shutdown_senders);
47
48    let mut reducer_state = ReducerState {
49        manifests: None,
50        quarantines: QuarantineSet::new(),
51        last_heartbeat_at: HashMap::new(),
52    };
53
54    let mut plan_ticker = tokio::time::interval(PLAN_TICK_INTERVAL);
55    plan_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
56
57    loop {
58        tokio::select! {
59            _ = cancel.cancelled() => {
60                tracing::info!(target: "shutdown", task = "cp_reducer", "task shut down");
61                return;
62            }
63            maybe_input = input_rx.recv() => {
64                let Some(input) = maybe_input else { return };
65                handle_input(&state, &clock, &event_log_tx, &mut reducer_state, input).await;
66            }
67            _ = plan_ticker.tick() => {
68                run_plan(&state, &clock, &event_log_tx, &reducer_state).await;
69            }
70        }
71    }
72}
73
74/// Reducer-task-private state. The DB-backed CP-mirror lives in
75/// `host_rollout_records`; this struct holds only the data the
76/// reducer can't or shouldn't go back to SQLite for on every input.
77struct ReducerState {
78    manifests: Option<SignedManifestSet>,
79    quarantines: QuarantineSet,
80    /// `last_heartbeat_at[hostname]` — in-memory only. Lost on CP restart;
81    /// the agent's next heartbeat re-seeds within seconds. Operator-
82    /// observable freshness, not gate input.
83    last_heartbeat_at: HashMap<String, DateTime<Utc>>,
84}
85
86async fn handle_input(
87    state: &Arc<AppState>,
88    clock: &ClockHandle,
89    event_log_tx: &EventLogTx,
90    rs: &mut ReducerState,
91    input: ReducerInput,
92) {
93    match input {
94        ReducerInput::HostEvent {
95            host,
96            rollout_id,
97            event,
98        } => {
99            handle_host_event(state, clock, event_log_tx, rs, &host, &rollout_id, event).await;
100        }
101        ReducerInput::ManifestSetUpdated(set) => {
102            rs.manifests = Some(*set);
103            run_plan(state, clock, event_log_tx, rs).await;
104        }
105        ReducerInput::HeartbeatReceived {
106            host,
107            rollout_id,
108            current_closure,
109            at,
110            reply,
111        } => {
112            handle_heartbeat(
113                state,
114                clock,
115                event_log_tx,
116                rs,
117                &host,
118                rollout_id,
119                current_closure,
120                at,
121                reply,
122            )
123            .await;
124        }
125        ReducerInput::PlanTick => {
126            run_plan(state, clock, event_log_tx, rs).await;
127        }
128    }
129}
130
131async fn handle_host_event(
132    state: &Arc<AppState>,
133    clock: &ClockHandle,
134    event_log_tx: &EventLogTx,
135    rs: &mut ReducerState,
136    host: &str,
137    rollout_id: &nixfleet_proto::RolloutId,
138    event: nixfleet_state_machine::Event,
139) {
140    let Some(db) = state.db.as_ref() else {
141        tracing::warn!(
142            target: "cp_reducer",
143            host, rollout_id = %rollout_id,
144            "HostEvent: no DB attached; skipping",
145        );
146        return;
147    };
148
149    // Load current state. Absence is legal — first event from an agent on
150    // a fresh rollout. The state machine's transitions starting from
151    // Pending require an existing record though; if absent, we drop +
152    // log so the operator sees a "saw event for unknown host" alert.
153    let prior = match db.host_rollout_records().load(rollout_id.as_str(), host) {
154        Ok(Some(s)) => s,
155        Ok(None) => {
156            tracing::warn!(
157                target: "cp_reducer",
158                host, rollout_id = %rollout_id,
159                "HostEvent: no host_rollout_records row; dropping (planner-side OpenRollout creates Pending records — was this an out-of-order arrival?)",
160            );
161            return;
162        }
163        Err(err) => {
164            tracing::error!(
165                target: "cp_reducer",
166                host, rollout_id = %rollout_id,
167                error = %err,
168                "HostEvent: load failed",
169            );
170            return;
171        }
172    };
173
174    // Dedup: drop seq <= last_event_seq. The events route does the same
175    // check, but a race between two simultaneous POSTs can have both pass
176    // the route check before either updates the DB.
177    let incoming_seq = event.seq();
178    if incoming_seq <= prior.last_event_seq {
179        tracing::debug!(
180            target: "cp_reducer",
181            host, rollout_id = %rollout_id,
182            incoming_seq,
183            last_event_seq = prior.last_event_seq,
184            "HostEvent: duplicate seq; dropping",
185        );
186        return;
187    }
188
189    let Some(manifests) = rs.manifests.as_ref() else {
190        tracing::warn!(
191            target: "cp_reducer",
192            host, rollout_id = %rollout_id,
193            "HostEvent: no cached SignedManifestSet; cannot resolve RolloutPolicy. Dropping (manifest_poll will warm the cache and a retry will succeed).",
194        );
195        return;
196    };
197    let policy = match resolve_policy(manifests, &prior.channel) {
198        Some(p) => p.clone(),
199        None => {
200            tracing::warn!(
201                target: "cp_reducer",
202                host, rollout_id = %rollout_id,
203                channel = %prior.channel,
204                "HostEvent: rollout_policy not found in cached manifests; dropping",
205            );
206            return;
207        }
208    };
209
210    let now = clock.now();
211    let prior_host_state = prior.state;
212    let (mut next_state, effects) = match nixfleet_state_machine::step(prior, event, now, &policy) {
213        Ok(out) => out,
214        Err(err) => {
215            tracing::warn!(
216                target: "cp_reducer",
217                host, rollout_id = %rollout_id,
218                error = %err,
219                "HostEvent: step() rejected — illegal transition or out-of-order event",
220            );
221            return;
222        }
223    };
224    next_state.last_event_seq = incoming_seq;
225    let next_host_state = next_state.state;
226
227    if let Err(err) = db.host_rollout_records().upsert(&next_state) {
228        tracing::error!(
229            target: "cp_reducer",
230            host, rollout_id = %rollout_id,
231            error = %err,
232            "HostEvent: host_rollout_records upsert failed",
233        );
234        return;
235    }
236
237    let ctx = ApplierCtx {
238        state,
239        manifests,
240        clock,
241        event_log_tx,
242    };
243    for effect in effects {
244        apply_effect(&ctx, effect).await;
245    }
246
247    // RFC-0008 §7 reducer composition: per-host transitions feed the
248    // rollout reducer as `HostStateChanged`; aggregate signals (e.g.,
249    // "all hosts in this rollout reached Converged") are computed
250    // applier-side from `host_rollout_records` and emitted as
251    // `RolloutTerminal`.
252    if prior_host_state != next_host_state {
253        super::applier::process_rollout_event(
254            &ctx,
255            db,
256            now,
257            nixfleet_state_machine::rollout::RolloutEvent::HostStateChanged {
258                rollout_id: rollout_id.clone(),
259                host_id: host.to_string(),
260                from: prior_host_state,
261                to: next_host_state,
262                at: now,
263            },
264        )
265        .await;
266
267        // Terminal aggregate: if every host in this rollout has reached
268        // Converged, emit `RolloutTerminal` so the rollout reducer
269        // transitions to Terminal (RFC-0008 §3 invariant: "Terminal ⇒
270        // ∀ host ∈ rollout: state == Converged").
271        if next_host_state == nixfleet_state_machine::HostState::Converged
272            && let Ok(rows) = db
273                .host_rollout_records()
274                .all_for_rollout(rollout_id.as_str())
275            && !rows.is_empty()
276            && rows
277                .iter()
278                .all(|r| r.state == nixfleet_state_machine::HostState::Converged)
279        {
280            super::applier::process_rollout_event(
281                &ctx,
282                db,
283                now,
284                nixfleet_state_machine::rollout::RolloutEvent::RolloutTerminal {
285                    rollout_id: rollout_id.clone(),
286                    at: now,
287                },
288            )
289            .await;
290        }
291    }
292}
293
294// Threads the same reducer-task dependencies (state, clock,
295// event_log_tx, rs) as handle_host_event, plus the heartbeat envelope
296// (host, rollout_id, current_closure, at, reply). Refactoring to a
297// context struct would obscure the call site at handle_input where the
298// reducer dispatches inputs. The lint is fine to suppress here.
299#[allow(clippy::too_many_arguments)]
300async fn handle_heartbeat(
301    state: &Arc<AppState>,
302    clock: &ClockHandle,
303    event_log_tx: &EventLogTx,
304    rs: &mut ReducerState,
305    host: &str,
306    rollout_id: Option<nixfleet_proto::RolloutId>,
307    current_closure: Option<String>,
308    at: DateTime<Utc>,
309    reply: oneshot::Sender<HeartbeatReply>,
310) {
311    rs.last_heartbeat_at.insert(host.to_string(), at);
312
313    // Boot-recovery retroactive confirmation (RFC-0005 §9.5).
314    // Closes the "agent restart mid-Activating leaves CP forever stuck
315    // at Activating" defect. The flow: an agent's
316    // `nixfleet-agent.service` restart kills the in-flight verify_poll
317    // before it can emit LocalActivationCompleted. The new agent's
318    // boot-recovery handshake reports `current_closure` (read from
319    // /run/current-system) but no rollout_id, so the steady-state
320    // replay_from path can't match. Here we scan active
321    // host_rollout_records for this hostname; if any record's
322    // target_closure matches the agent's current_closure AND state is
323    // Activating or Deferred, we synthesize
324    // `Event::RemoteActivationCompleted` and feed it through
325    // `handle_host_event` — same path the wire-borne version takes. CP
326    // transitions Activating/Deferred → Soaking, populates
327    // activation_completed_at, the planner unblocks, the cascade
328    // continues. Recovery.rs:45-51 documented this design intent ("CP
329    // synthesises an ActivationCompleted-shaped Replay-From event").
330    //
331    // Synthesis runs BEFORE bootstrap + reply: the bootstrap reflects
332    // post-synthesis state (e.g. Soaking, not Activating). The reducer
333    // is single-threaded so the read-modify-read is race-free;
334    // synthesis is in-process and well under the route's
335    // REDUCER_REPLY_TIMEOUT.
336    if let Some(agent_current) = current_closure.as_deref() {
337        maybe_synthesize_recovery_completion(
338            state,
339            clock,
340            event_log_tx,
341            rs,
342            host,
343            agent_current,
344            at,
345        )
346        .await;
347    }
348
349    let replay_from = compute_replay_from(
350        state,
351        host,
352        rollout_id.as_ref().map(|r| r.as_str()),
353        current_closure.as_deref(),
354    );
355
356    // When the agent's heartbeat carried no rollout_id (the
357    // boot-recovery shape — agent's reducer is empty post-restart),
358    // but CP holds non-terminal records for the host, build a
359    // bootstrap snapshot per record. The agent's runtime applies each
360    // snapshot to its in-memory HostRolloutState before workers spawn,
361    // restoring the cache so probe runners + advance-ticker resume
362    // work post-restart. Steady-state heartbeats (rollout_id
363    // populated) skip this — the agent's reducer already knows.
364    let bootstrap_rollouts = if rollout_id.is_none() {
365        build_bootstrap_for_host(state, host)
366    } else {
367        Vec::new()
368    };
369
370    let _ = reply.send(HeartbeatReply {
371        replay_from,
372        bootstrap_rollouts,
373    });
374}
375
376/// Scan active records for `host` and produce a
377/// `HostRolloutSnapshot` per record. Called only on
378/// boot-recovery-shaped heartbeats (rollout_id=None). Order is
379/// deterministic by (rollout_id, hostname) PK in SQL; the agent
380/// applies them in arrival order.
381fn build_bootstrap_for_host(
382    state: &Arc<AppState>,
383    host: &str,
384) -> Vec<nixfleet_proto::agent_wire::HostRolloutSnapshot> {
385    let Some(db) = state.db.as_ref() else {
386        return Vec::new();
387    };
388    let records = match db.host_rollout_records().active_for_host(host) {
389        Ok(r) => r,
390        Err(err) => {
391            tracing::warn!(
392                target: "cp_reducer",
393                host,
394                error = %err,
395                "bootstrap build: active_for_host load failed; returning empty",
396            );
397            return Vec::new();
398        }
399    };
400    records
401        .into_iter()
402        .map(host_rollout_state_to_snapshot)
403        .collect()
404}
405
406fn host_rollout_state_to_snapshot(
407    record: nixfleet_state_machine::HostRolloutState,
408) -> nixfleet_proto::agent_wire::HostRolloutSnapshot {
409    use nixfleet_proto::HostRolloutState as WireState;
410    use nixfleet_state_machine::HostState;
411    let wire_state = match record.state {
412        HostState::Pending => WireState::Pending,
413        HostState::Activating => WireState::Activating,
414        HostState::Deferred => WireState::Deferred,
415        HostState::Soaking => WireState::Soaking,
416        HostState::Converged => WireState::Converged,
417        HostState::Failed => WireState::Failed,
418        HostState::Reverted => WireState::Reverted,
419    };
420    nixfleet_proto::agent_wire::HostRolloutSnapshot {
421        rollout_id: record.rollout_id,
422        hostname: record.hostname,
423        channel: record.channel,
424        state: wire_state,
425        target_closure: record.target_closure,
426        current_closure_at_dispatch: record.current_closure_at_dispatch,
427        current_closure: record.current_closure,
428        dispatched_at: record.dispatched_at,
429        dispatch_acked_at: record.dispatch_acked_at,
430        activation_started_at: record.activation_started_at,
431        activation_completed_at: record.activation_completed_at,
432        soak_due_at: record.soak_due_at,
433        last_event_seq: record.last_event_seq,
434    }
435}
436
437/// Scan active host_rollout_records for `host`; for each record whose
438/// observed closure on the wire identifies a missed transition,
439/// synthesize the event chain that advances the row to a state
440/// consistent with the agent's reality. Idempotent: if the state has
441/// already advanced (e.g. concurrent agent emit), the record won't
442/// match and the synthesis is a no-op.
443///
444/// LOADBEARING: this is the CP-side half of architecture.md §305
445/// acceptance gate 1 ("destroying the CP database and rebuilding
446/// from empty state results in full fleet visibility within one
447/// reconcile cycle, with zero operator intervention beyond restarting
448/// the service"). The agent's heartbeat carries `current_closure` on
449/// every tick; CP rebuilds soft-state HRR rows from those inputs.
450///
451/// Four reachable recovery cases:
452///   - `Activating` + `current == target`: agent restarted
453///     mid-rollout, post-boot observed the activation took.
454///     Synthesise `RemoteActivationCompleted`.
455///   - `Deferred`   + `current == target`: operator rebooted to
456///     finish a critical-component activation. Same synthesis.
457///   - `Pending`    + `current == target`: CP itself was wiped, the
458///     planner re-opened the rollout in `Pending`, but the agent has
459///     been running the target closure all along. Synthesise the
460///     full `RemoteDispatchAck → RemoteActivationCompleted →
461///     RemoteConverged` chain. `RemoteConverged`'s soak-elapsed
462///     invariant is satisfied by stamping `converged_at = max(at,
463///     record.soak_due_at)` — the soak window's purpose (give probes
464///     time to fail) was exercised pre-wipe. Gated by
465///     `evaluate_synth_gates` so a fleet bump that opens a Pending
466///     row for a host whose closure already matches doesn't bypass
467///     channel-edges or wave-promotion ordering.
468///   - `Failed`     + `current == current_closure_at_dispatch`: the
469///     rollback's switch-to-configuration restarted the agent
470///     mid-VerifyPoll, dropping `LocalRollbackCompleted`. Synthesise
471///     `RemoteRollbackComplete` on the heartbeat that observes the
472///     rolled-back closure; the canonical `Failed` reducer arm
473///     (`failed.rs::RemoteRollbackComplete`) produces the quarantine
474///     + event_log + transition effects.
475async fn maybe_synthesize_recovery_completion(
476    state: &Arc<AppState>,
477    clock: &ClockHandle,
478    event_log_tx: &EventLogTx,
479    rs: &mut ReducerState,
480    host: &str,
481    agent_current: &str,
482    at: DateTime<Utc>,
483) {
484    let Some(db) = state.db.as_ref() else {
485        return;
486    };
487    let records = match db.host_rollout_records().active_for_host(host) {
488        Ok(r) => r,
489        Err(err) => {
490            tracing::warn!(
491                target: "cp_reducer",
492                host,
493                error = %err,
494                "boot-recovery synthesis: active_for_host load failed; skipping",
495            );
496            return;
497        }
498    };
499    for record in records {
500        let rollout_id = record.rollout_id.clone();
501        match record.state {
502            nixfleet_state_machine::HostState::Activating
503            | nixfleet_state_machine::HostState::Deferred
504                if record.target_closure == agent_current =>
505            {
506                tracing::info!(
507                    target: "cp_reducer",
508                    host,
509                    rollout_id = %record.rollout_id,
510                    target = %record.target_closure,
511                    prior_state = ?record.state,
512                    "post-restart recovery: synthesizing RemoteActivationCompleted (RFC-0005 §9.5)",
513                );
514                let synth_event = nixfleet_state_machine::Event::RemoteActivationCompleted {
515                    observed_current_closure: agent_current.to_string(),
516                    exit_code: 0,
517                    completed_at: at,
518                    seq: record.last_event_seq + 1,
519                };
520                handle_host_event(
521                    state,
522                    clock,
523                    event_log_tx,
524                    rs,
525                    host,
526                    &rollout_id,
527                    synth_event,
528                )
529                .await;
530            }
531            nixfleet_state_machine::HostState::Pending
532                if record.target_closure == agent_current =>
533            {
534                // LOADBEARING: consult the dispatch gates before
535                // synthesising the chain. The agent's heartbeat
536                // reporting `current_closure == target_closure`
537                // legitimately means "I've been on this closure all
538                // along" — but only safe to fast-forward to Converged
539                // when the gates that would have applied at dispatch
540                // are satisfied. Channel-edges in particular: a fleet
541                // bump that leaves a host's closure unchanged still
542                // opens a new Pending HRR for that host; without this
543                // gate check the synth races past `stable` Converging
544                // while its predecessor `edge` is still Active,
545                // silently bypassing the RFC-0002 §4.3 ordering
546                // contract. If any gate blocks, skip the synth — the
547                // host stays Pending and the next regular plan tick
548                // dispatches it when the gate clears.
549                let gate_block = match rs.manifests.as_ref() {
550                    Some(manifests) => evaluate_synth_gates(
551                        db,
552                        manifests,
553                        &rs.quarantines,
554                        &record,
555                    ),
556                    None => None,
557                };
558                if let Some(reason) = gate_block {
559                    tracing::info!(
560                        target: "cp_reducer",
561                        host,
562                        rollout_id = %record.rollout_id,
563                        gate_block = %reason,
564                        "synth-converge held: dispatch gate would block; staying Pending until next plan tick",
565                    );
566                    continue;
567                }
568                synthesize_pending_to_converged(
569                    state,
570                    clock,
571                    event_log_tx,
572                    rs,
573                    &record,
574                    agent_current,
575                    at,
576                )
577                .await;
578            }
579            nixfleet_state_machine::HostState::Failed
580                if record.current_closure_at_dispatch.as_deref() == Some(agent_current) =>
581            {
582                // Agent rollback-ack is non-durable: the rollback's
583                // switch-to-configuration restarts the agent
584                // mid-VerifyPoll, SIGTERMing it before
585                // `LocalRollbackCompleted` can be emitted. The new
586                // agent PID comes up on the rolled-back closure with
587                // no memory of the pending event; the rollback
588                // completed operationally but CP keeps the HRR in
589                // `Failed` indefinitely. Detect this from the
590                // post-restart heartbeat (current_closure ==
591                // current_closure_at_dispatch, i.e. the closure the
592                // agent ran before the failed activation = the
593                // rollback target) and synthesise the missing
594                // `RemoteRollbackComplete`. The reducer's `Failed`
595                // arm in `failed.rs` produces the canonical effects:
596                // `RemoteInsertQuarantine` for the bad closure +
597                // `RemoteAppendEventLog` for the audit trail + the
598                // `Failed → Reverted` state transition.
599                //
600                // No gates apply: the agent's rollback decision was
601                // policy-driven from the signed manifest's
602                // `onHealthFailure`, not gate-driven, so there's
603                // nothing for CP to second-guess on synthesis.
604                tracing::info!(
605                    target: "cp_reducer",
606                    host,
607                    rollout_id = %record.rollout_id,
608                    reverted_to = %agent_current,
609                    "post-rollback-restart recovery: synthesizing RemoteRollbackComplete",
610                );
611                let synth_event = nixfleet_state_machine::Event::RemoteRollbackComplete {
612                    reverted_to_closure: agent_current.to_string(),
613                    exit_code: 0,
614                    completed_at: at,
615                    seq: record.last_event_seq + 1,
616                };
617                handle_host_event(
618                    state,
619                    clock,
620                    event_log_tx,
621                    rs,
622                    host,
623                    &rollout_id,
624                    synth_event,
625                )
626                .await;
627            }
628            _ => continue,
629        }
630    }
631}
632
633/// Run the dispatch-gate evaluation against a Pending HRR record so
634/// `maybe_synthesize_recovery_completion` can refuse to fast-forward
635/// Pending → Converged when an active gate (channel-edges,
636/// wave-promotion, quarantine, etc.) would still hold the host.
637///
638/// Returns `Some(reason)` when a gate blocks; the caller leaves the
639/// host in Pending and lets the next regular plan tick handle it.
640fn evaluate_synth_gates(
641    db: &Arc<Db>,
642    manifests: &SignedManifestSet,
643    quarantines: &QuarantineSet,
644    record: &nixfleet_state_machine::HostRolloutState,
645) -> Option<String> {
646    let fleet_state = build_fleet_state(db, manifests).ok()?;
647    let host_id: HostId = record.hostname.clone();
648    let channel: nixfleet_reconciler::planner_types::ChannelId =
649        record.channel.clone();
650    let target: nixfleet_reconciler::planner_types::ClosureHash =
651        record.target_closure.clone();
652    nixfleet_reconciler::planner_gates::evaluate_for_dispatch(
653        &fleet_state,
654        manifests,
655        quarantines,
656        &record.rollout_id,
657        &host_id,
658        &target,
659        &channel,
660        &std::collections::HashMap::new(),
661    )
662    .map(|b| format!("{b:?}"))
663}
664
665/// Drive a `Pending` HRR row through the full lifecycle to
666/// `Converged` when the agent reports `current_closure == target`.
667/// The chain preserves the event-log audit trail (RFC-0004 §1):
668/// every transition emits its usual `RemoteAppendEventLog` effect
669/// flagged with the synthesis context via the `seq` ordering relative
670/// to the pre-synthesis `last_event_seq`.
671///
672/// LOADBEARING: callers MUST first verify that the dispatch gates
673/// (channel-edges, wave-promotion, quarantine, …) wouldn't have
674/// blocked dispatch — see `evaluate_synth_gates`. Without the gate
675/// check, a fleet bump that leaves a host's closure unchanged
676/// silently fast-forwards the host to Converged on the very first
677/// post-bump heartbeat, bypassing the RFC-0002 §4.3 channel-edges
678/// ordering contract.
679async fn synthesize_pending_to_converged(
680    state: &Arc<AppState>,
681    clock: &ClockHandle,
682    event_log_tx: &EventLogTx,
683    rs: &mut ReducerState,
684    record: &nixfleet_state_machine::HostRolloutState,
685    agent_current: &str,
686    at: DateTime<Utc>,
687) {
688    let host = record.hostname.as_str();
689    let rollout_id = &record.rollout_id;
690    tracing::info!(
691        target: "cp_reducer",
692        host,
693        rollout_id = %rollout_id,
694        target = %record.target_closure,
695        "post-wipe recovery: synthesizing Pending → Converged chain (architecture.md §305)",
696    );
697
698    // 1. Pending → Activating. `current_closure_at_dispatch` is the
699    //    pre-dispatch closure; CP has no way to know it post-wipe.
700    //    Empty string is the documented placeholder — rollback never
701    //    fires from a synthesis chain that lands at Converged (terminal),
702    //    so the rollback-target ambiguity is inert.
703    let dispatch_ack = nixfleet_state_machine::Event::RemoteDispatchAck {
704        current_closure_at_dispatch: String::new(),
705        received_at: at,
706        seq: record.last_event_seq + 1,
707    };
708    handle_host_event(state, clock, event_log_tx, rs, host, rollout_id, dispatch_ack).await;
709
710    // 2. Activating → Soaking.
711    let activation_completed = nixfleet_state_machine::Event::RemoteActivationCompleted {
712        observed_current_closure: agent_current.to_string(),
713        exit_code: 0,
714        completed_at: at,
715        seq: record.last_event_seq + 2,
716    };
717    handle_host_event(
718        state,
719        clock,
720        event_log_tx,
721        rs,
722        host,
723        rollout_id,
724        activation_completed,
725    )
726    .await;
727
728    // 3. Soaking → Converged. `converged_at` is anchored to
729    //    `soak_due_at` when the heartbeat arrives before soak has
730    //    elapsed, so soaking.rs's `converged_at >= soak_due_at`
731    //    invariant passes. The actual agent-side convergence happened
732    //    pre-wipe; CP can't reconstruct that timestamp, so it stamps
733    //    the post-wipe-floor instead.
734    let Some(db) = state.db.as_ref() else {
735        return;
736    };
737    let post_activation = match db.host_rollout_records().load(rollout_id.as_str(), host) {
738        Ok(Some(r)) => r,
739        _ => return,
740    };
741    let synth_converged_at = match post_activation.soak_due_at {
742        Some(soak_due) if soak_due > at => soak_due,
743        _ => at,
744    };
745    let converged = nixfleet_state_machine::Event::RemoteConverged {
746        converged_at: synth_converged_at,
747        current_closure: agent_current.to_string(),
748        seq: record.last_event_seq + 3,
749    };
750    handle_host_event(state, clock, event_log_tx, rs, host, rollout_id, converged).await;
751}
752
753fn compute_replay_from(
754    state: &Arc<AppState>,
755    host: &str,
756    rollout_id: Option<&str>,
757    current_closure: Option<&str>,
758) -> Option<u64> {
759    let db = state.db.as_ref()?;
760    let rollout_id = rollout_id?;
761    let agent_closure = current_closure?;
762    let record = db
763        .host_rollout_records()
764        .load(rollout_id, host)
765        .ok()
766        .flatten()?;
767
768    let cp_closure = record.current_closure.as_deref();
769    match cp_closure {
770        Some(cp) if cp == agent_closure => None,
771        // Drift: agent reports a closure CP didn't see acknowledged.
772        // Replay-From = CP's last_event_seq + 1 (the next seq CP hasn't
773        // seen yet); agent re-POSTs everything from there.
774        _ => Some(record.last_event_seq),
775    }
776}
777
778async fn run_plan(
779    state: &Arc<AppState>,
780    clock: &ClockHandle,
781    event_log_tx: &EventLogTx,
782    rs: &ReducerState,
783) {
784    let Some(manifests) = rs.manifests.as_ref() else {
785        // Cold start: the manifest_poll worker hasn't primed yet.
786        // Periodic ticks land here harmlessly until it does.
787        return;
788    };
789    let Some(db) = state.db.as_ref() else {
790        return;
791    };
792
793    let now = clock.now();
794    let mut fleet_state = match build_fleet_state(db, manifests) {
795        Ok(fs) => fs,
796        Err(err) => {
797            tracing::error!(
798                target: "cp_reducer",
799                error = %err,
800                "run_plan: FleetState construction failed; skipping plan tick",
801            );
802            return;
803        }
804    };
805
806    // Advance `rollouts.current_wave` for any rollout whose current
807    // wave has fully Converged. Must happen BEFORE plan_next so the
808    // wave_promotion gate sees the new value on the same tick (next
809    // wave's hosts go through immediately rather than waiting on the
810    // periodic safety-net replan).
811    advance_current_waves(db, manifests, &mut fleet_state).await;
812
813    let actions =
814        nixfleet_reconciler::planner::plan_next(manifests, &fleet_state, &rs.quarantines, now);
815
816    let ctx = ApplierCtx {
817        state,
818        manifests,
819        clock,
820        event_log_tx,
821    };
822    for action in actions {
823        apply_plan_action(&ctx, action).await;
824    }
825}
826
827/// Promote each rollout's `current_wave` when every host in the current
828/// wave has reached `HostState::Converged`. The check reads the host
829/// list from the verified manifest's `fleet.waves[channel][current_wave]`
830/// — the same path the wave_promotion gate uses for `host_wave`, so the
831/// bump and the gate stay in lock-step.
832///
833/// Empty waves are NOT auto-promoted. Same invariant as
834/// `planner::maybe_mark_terminal`: an empty wave that produced no host
835/// records vacuously satisfies "all converged"; treating that as
836/// promotion-eligible would walk the wave pointer past content that
837/// later arrives. Pin via the `has_any_host` guard.
838async fn advance_current_waves(
839    db: &Arc<Db>,
840    manifests: &SignedManifestSet,
841    fleet_state: &mut FleetState,
842) {
843    let fleet = manifests.fleet();
844    let mut bumps: Vec<(RolloutId, u32)> = Vec::new();
845    for (rollout_id, summary) in fleet_state.rollouts.iter() {
846        if summary.terminal_at.is_some() {
847            continue;
848        }
849        let Some(channel_waves) = fleet.waves.get(&summary.channel) else {
850            continue;
851        };
852        let current = summary.current_wave as usize;
853        if current + 1 >= channel_waves.len() {
854            // No next wave to promote into.
855            continue;
856        }
857        let Some(current_wave) = channel_waves.get(current) else {
858            continue;
859        };
860        if current_wave.hosts.is_empty() {
861            continue;
862        }
863        // LOADBEARING: a wave is "done participating" when every host
864        // is ordering-eligible — Converged OR Deferred (per
865        // RFC-0005 §3 terminal-for-ordering). Deferred means
866        // activation is staged but live-switch was skipped
867        // (critical-component swap pending reboot); the host has done
868        // what it can within the rollout step, so successor waves
869        // should not stall waiting on it. Health verification (probes
870        // + soak) still happens after operator reboot via
871        // `handle_heartbeat`'s recovery synthesis (Deferred → Soaking).
872        let all_ordering_eligible = current_wave.hosts.iter().all(|host| {
873            fleet_state
874                .host_states
875                .get(&(rollout_id.clone(), host.clone()))
876                .map(|s| {
877                    matches!(
878                        s.state,
879                        nixfleet_state_machine::HostState::Converged
880                            | nixfleet_state_machine::HostState::Deferred
881                    )
882                })
883                .unwrap_or(false)
884        });
885        if all_ordering_eligible {
886            bumps.push((rollout_id.clone(), summary.current_wave + 1));
887        }
888    }
889
890    for (rollout_id, next_wave) in bumps {
891        // FK is populated by the rollout reducer's
892        // `RolloutEffect::UpdateCurrentWave`; the planner passes
893        // None per RFC-0008 §6.1 item 3.
894        match db
895            .rollouts()
896            .set_current_wave(rollout_id.as_str(), next_wave, None)
897        {
898            Ok(_) => {
899                if let Some(s) = fleet_state.rollouts.get_mut(&rollout_id) {
900                    s.current_wave = next_wave;
901                }
902                tracing::info!(
903                    target: "cp_reducer",
904                    %rollout_id,
905                    next_wave,
906                    "advance_current_waves: bumped current_wave (every host in prior wave Converged)",
907                );
908            }
909            Err(err) => {
910                tracing::error!(
911                    target: "cp_reducer",
912                    %rollout_id,
913                    next_wave,
914                    error = %err,
915                    "advance_current_waves: set_current_wave failed",
916                );
917            }
918        }
919    }
920}
921
922/// Build a fresh `FleetState` from the DB. Called on every plan tick;
923/// at v0.2 scale (≤256 hosts, ≤8 active rollouts) the SELECTs are
924/// negligible.
925fn build_fleet_state(db: &Arc<Db>, manifests: &SignedManifestSet) -> anyhow::Result<FleetState> {
926    let mut host_states: HashMap<(RolloutId, HostId), HostRolloutState> = HashMap::new();
927    let mut rollouts: HashMap<RolloutId, RolloutSummary> = HashMap::new();
928
929    // For each channel with a verified rollout manifest, load all
930    // host_rollout_records under the manifested rollout_id so the
931    // planner can walk Pending hosts and run gates. Channels whose
932    // rollout hasn't been opened yet (no DB row, no Pending records)
933    // surface through the OpenRollout emission path above.
934    for (channel, vm) in &manifests.rollouts {
935        let manifest = vm.inner();
936        // Canonical RolloutId construction (RFC-0008 §6.3): matches
937        // the planner's `RolloutId::new(channel, channel_ref)` so
938        // lookups by rollout_id succeed even when multiple channels
939        // share a channel_ref.
940        let rollout_id = nixfleet_proto::RolloutId::new(channel, &manifest.channel_ref);
941
942        let rows = db
943            .host_rollout_records()
944            .all_for_rollout(rollout_id.as_str())?;
945        for row in rows {
946            host_states.insert((rollout_id.clone(), row.hostname.clone()), row);
947        }
948
949        // RolloutSummary metadata. Full row from the `rollouts` table
950        // (RFC-0008 §6.3). Missing row ⇒ rollout not opened yet ⇒ omit
951        // from `rollouts` map (gates that require RolloutSummary for
952        // in-flight reasoning correctly see "not yet open" for this
953        // channel).
954        if let Ok(Some(row)) = db.rollouts().state(rollout_id.as_str()) {
955            rollouts.insert(
956                rollout_id.clone(),
957                RolloutSummary {
958                    rollout_id: rollout_id.clone(),
959                    channel: channel.clone(),
960                    target_ref: manifest.channel_ref.clone(),
961                    opened_at: row.opened_at,
962                    terminal_at: row.terminal_at,
963                    current_wave: row.current_wave,
964                    budgets: manifest.disruption_budgets.clone(),
965                },
966            );
967        }
968    }
969
970    // Distinct outstanding enforce-mode probe failures per
971    // (rollout, host). Feeds the compliance_wave gate
972    // (planner_gates::compliance_wave) per RFC-0007 §7.2.
973    let outstanding_failing_enforce_probes = db
974        .probe_failures()
975        .outstanding_failing_enforce_probes_by_rollout()
976        .unwrap_or_else(|err| {
977            tracing::warn!(
978                target: "cp_reducer",
979                error = %err,
980                "build_fleet_state: outstanding_failing_enforce_probes query failed; \
981                 falling back to empty map (compliance_wave gate inert this tick)",
982            );
983            HashMap::new()
984        });
985
986    Ok(FleetState {
987        host_states,
988        rollouts,
989        outstanding_failing_enforce_probes,
990    })
991}
992
993fn resolve_policy<'a>(
994    manifests: &'a SignedManifestSet,
995    channel: &str,
996) -> Option<&'a RolloutPolicy> {
997    let fleet = manifests.fleet();
998    let channel_entry = fleet.channels.get(channel)?;
999    fleet.rollout_policies.get(&channel_entry.rollout_policy)
1000}
1001
1002/// RAII container for per-worker shutdown senders. Holding it in scope
1003/// ensures workers receive shutdown signal exactly when this task exits.
1004struct ShutdownGuard(#[allow(dead_code)] Vec<oneshot::Sender<()>>);
1005
#[cfg(test)]
mod tests {
    //! FleetState builder regression tests.
    //!
    //! Gate behaviour over populated fields is exercised in
    //! `nixfleet_reconciler::planner_gates::tests`; this module only pins
    //! what `build_fleet_state` itself produces. The
    //! `outstanding_failing_enforce_probes` projection is slated for its
    //! own end-to-end test in 9b, once the writer side lands.

    use super::*;
    use crate::db::Db;
    use chrono::Utc;
    use nixfleet_proto::testing::FleetBuilder;
    use nixfleet_reconciler::verify::Verified;

    /// In-memory DB with every migration applied, shared behind an `Arc`
    /// the way `build_fleet_state` takes it.
    fn migrated_in_memory_db() -> Arc<Db> {
        let handle = Db::open_in_memory().expect("open in-memory db");
        handle.migrate().expect("apply migrations");
        Arc::new(handle)
    }

    /// Manifest set with a fleet built from a bare `FleetBuilder` and no
    /// rollout manifests at all.
    fn manifests_without_rollouts() -> SignedManifestSet {
        let fleet = Verified::unverified_for_tests(FleetBuilder::new().build(), Utc::now());
        SignedManifestSet {
            fleet,
            rollouts: HashMap::new(),
        }
    }

    #[test]
    fn outstanding_failing_enforce_probes_empty_in_9a() {
        let snapshot = build_fleet_state(&migrated_in_memory_db(), &manifests_without_rollouts())
            .expect("build_fleet_state");
        let projection = &snapshot.outstanding_failing_enforce_probes;
        assert!(
            projection.is_empty(),
            "9a: probe_failures is unwritten ⇒ projection must be empty (got {:?})",
            projection,
        );
    }
}