nixfleet_control_plane/runtime/reducer.rs
1//! Reducer task body: the single mutator of CP-mirror state.
2//!
3//! Owns the cached `SignedManifestSet`, the per-channel quarantine set,
4//! and (transiently) the per-rollout `HostRolloutState` map it loads from
5//! the DB. Calls into `nixfleet_state_machine::step` on every `HostEvent`
6//! input, `nixfleet_reconciler::plan_next` on `ManifestSetUpdated` /
7//! `PlanTick`, and the applier (`apply_plan_action`/`apply_effect`) for
8//! every output.
9//!
10//! Invariant 1 from `runtime::mod`: this is the only place that calls
11//! `step()` or `plan_next()`. Workers and HTTP route handlers emit
12//! `ReducerInput` values; the applier executes the produced effects.
13
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17
18use chrono::{DateTime, Utc};
19use nixfleet_proto::RolloutPolicy;
20use nixfleet_proto::clock::ClockHandle;
21use nixfleet_reconciler::planner_types::{
22 FleetState, HostId, QuarantineSet, RolloutId, RolloutSummary, SignedManifestSet,
23};
24use nixfleet_state_machine::HostRolloutState;
25use tokio::sync::{mpsc, oneshot};
26use tokio_util::sync::CancellationToken;
27
28use super::applier::{ApplierCtx, apply_effect, apply_plan_action};
29use super::event_log_writer::EventLogTx;
30use super::{HeartbeatReply, ReducerInput};
31use crate::db::Db;
32use crate::server::AppState;
33
/// Safety-net replan cadence. Triggers `plan_next` even when no event
/// arrives — covers manifest_poll crashes mid-cycle, missed kicks, etc.
///
/// Used as the period of the `plan_ticker` interval in `run`; each tick
/// calls `run_plan` directly.
const PLAN_TICK_INTERVAL: Duration = Duration::from_secs(15);
37
38pub(super) async fn run(
39 cancel: CancellationToken,
40 state: Arc<AppState>,
41 clock: ClockHandle,
42 mut input_rx: mpsc::Receiver<ReducerInput>,
43 event_log_tx: EventLogTx,
44 shutdown_senders: Vec<oneshot::Sender<()>>,
45) {
46 let _shutdown_guard = ShutdownGuard(shutdown_senders);
47
48 let mut reducer_state = ReducerState {
49 manifests: None,
50 quarantines: QuarantineSet::new(),
51 last_heartbeat_at: HashMap::new(),
52 };
53
54 let mut plan_ticker = tokio::time::interval(PLAN_TICK_INTERVAL);
55 plan_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
56
57 loop {
58 tokio::select! {
59 _ = cancel.cancelled() => {
60 tracing::info!(target: "shutdown", task = "cp_reducer", "task shut down");
61 return;
62 }
63 maybe_input = input_rx.recv() => {
64 let Some(input) = maybe_input else { return };
65 handle_input(&state, &clock, &event_log_tx, &mut reducer_state, input).await;
66 }
67 _ = plan_ticker.tick() => {
68 run_plan(&state, &clock, &event_log_tx, &reducer_state).await;
69 }
70 }
71 }
72}
73
/// Reducer-task-private state. The DB-backed CP-mirror lives in
/// `host_rollout_records`; this struct holds only the data the
/// reducer can't or shouldn't go back to SQLite for on every input.
struct ReducerState {
    /// Most recent manifest set delivered via
    /// `ReducerInput::ManifestSetUpdated`. `None` until the first one
    /// arrives; while `None`, `run_plan` is a no-op and host events are
    /// dropped because no `RolloutPolicy` can be resolved.
    manifests: Option<SignedManifestSet>,
    /// Quarantine set handed to `plan_next` and to the synth gate check.
    /// Starts empty in `run`; nothing in this file mutates it afterwards.
    quarantines: QuarantineSet,
    /// `last_heartbeat_at[hostname]` — in-memory only. Lost on CP restart;
    /// the agent's next heartbeat re-seeds within seconds. Operator-
    /// observable freshness, not gate input.
    last_heartbeat_at: HashMap<String, DateTime<Utc>>,
}
85
86async fn handle_input(
87 state: &Arc<AppState>,
88 clock: &ClockHandle,
89 event_log_tx: &EventLogTx,
90 rs: &mut ReducerState,
91 input: ReducerInput,
92) {
93 match input {
94 ReducerInput::HostEvent {
95 host,
96 rollout_id,
97 event,
98 } => {
99 handle_host_event(state, clock, event_log_tx, rs, &host, &rollout_id, event).await;
100 }
101 ReducerInput::ManifestSetUpdated(set) => {
102 rs.manifests = Some(*set);
103 run_plan(state, clock, event_log_tx, rs).await;
104 }
105 ReducerInput::HeartbeatReceived {
106 host,
107 rollout_id,
108 current_closure,
109 at,
110 reply,
111 } => {
112 handle_heartbeat(
113 state,
114 clock,
115 event_log_tx,
116 rs,
117 &host,
118 rollout_id,
119 current_closure,
120 at,
121 reply,
122 )
123 .await;
124 }
125 ReducerInput::PlanTick => {
126 run_plan(state, clock, event_log_tx, rs).await;
127 }
128 }
129}
130
131async fn handle_host_event(
132 state: &Arc<AppState>,
133 clock: &ClockHandle,
134 event_log_tx: &EventLogTx,
135 rs: &mut ReducerState,
136 host: &str,
137 rollout_id: &nixfleet_proto::RolloutId,
138 event: nixfleet_state_machine::Event,
139) {
140 let Some(db) = state.db.as_ref() else {
141 tracing::warn!(
142 target: "cp_reducer",
143 host, rollout_id = %rollout_id,
144 "HostEvent: no DB attached; skipping",
145 );
146 return;
147 };
148
149 // Load current state. Absence is legal — first event from an agent on
150 // a fresh rollout. The state machine's transitions starting from
151 // Pending require an existing record though; if absent, we drop +
152 // log so the operator sees a "saw event for unknown host" alert.
153 let prior = match db.host_rollout_records().load(rollout_id.as_str(), host) {
154 Ok(Some(s)) => s,
155 Ok(None) => {
156 tracing::warn!(
157 target: "cp_reducer",
158 host, rollout_id = %rollout_id,
159 "HostEvent: no host_rollout_records row; dropping (planner-side OpenRollout creates Pending records — was this an out-of-order arrival?)",
160 );
161 return;
162 }
163 Err(err) => {
164 tracing::error!(
165 target: "cp_reducer",
166 host, rollout_id = %rollout_id,
167 error = %err,
168 "HostEvent: load failed",
169 );
170 return;
171 }
172 };
173
174 // Dedup: drop seq <= last_event_seq. The events route does the same
175 // check, but a race between two simultaneous POSTs can have both pass
176 // the route check before either updates the DB.
177 let incoming_seq = event.seq();
178 if incoming_seq <= prior.last_event_seq {
179 tracing::debug!(
180 target: "cp_reducer",
181 host, rollout_id = %rollout_id,
182 incoming_seq,
183 last_event_seq = prior.last_event_seq,
184 "HostEvent: duplicate seq; dropping",
185 );
186 return;
187 }
188
189 let Some(manifests) = rs.manifests.as_ref() else {
190 tracing::warn!(
191 target: "cp_reducer",
192 host, rollout_id = %rollout_id,
193 "HostEvent: no cached SignedManifestSet; cannot resolve RolloutPolicy. Dropping (manifest_poll will warm the cache and a retry will succeed).",
194 );
195 return;
196 };
197 let policy = match resolve_policy(manifests, &prior.channel) {
198 Some(p) => p.clone(),
199 None => {
200 tracing::warn!(
201 target: "cp_reducer",
202 host, rollout_id = %rollout_id,
203 channel = %prior.channel,
204 "HostEvent: rollout_policy not found in cached manifests; dropping",
205 );
206 return;
207 }
208 };
209
210 let now = clock.now();
211 let prior_host_state = prior.state;
212 let (mut next_state, effects) = match nixfleet_state_machine::step(prior, event, now, &policy) {
213 Ok(out) => out,
214 Err(err) => {
215 tracing::warn!(
216 target: "cp_reducer",
217 host, rollout_id = %rollout_id,
218 error = %err,
219 "HostEvent: step() rejected — illegal transition or out-of-order event",
220 );
221 return;
222 }
223 };
224 next_state.last_event_seq = incoming_seq;
225 let next_host_state = next_state.state;
226
227 if let Err(err) = db.host_rollout_records().upsert(&next_state) {
228 tracing::error!(
229 target: "cp_reducer",
230 host, rollout_id = %rollout_id,
231 error = %err,
232 "HostEvent: host_rollout_records upsert failed",
233 );
234 return;
235 }
236
237 let ctx = ApplierCtx {
238 state,
239 manifests,
240 clock,
241 event_log_tx,
242 };
243 for effect in effects {
244 apply_effect(&ctx, effect).await;
245 }
246
247 // RFC-0008 §7 reducer composition: per-host transitions feed the
248 // rollout reducer as `HostStateChanged`; aggregate signals (e.g.,
249 // "all hosts in this rollout reached Converged") are computed
250 // applier-side from `host_rollout_records` and emitted as
251 // `RolloutTerminal`.
252 if prior_host_state != next_host_state {
253 super::applier::process_rollout_event(
254 &ctx,
255 db,
256 now,
257 nixfleet_state_machine::rollout::RolloutEvent::HostStateChanged {
258 rollout_id: rollout_id.clone(),
259 host_id: host.to_string(),
260 from: prior_host_state,
261 to: next_host_state,
262 at: now,
263 },
264 )
265 .await;
266
267 // Terminal aggregate: if every host in this rollout has reached
268 // Converged, emit `RolloutTerminal` so the rollout reducer
269 // transitions to Terminal (RFC-0008 §3 invariant: "Terminal ⇒
270 // ∀ host ∈ rollout: state == Converged").
271 if next_host_state == nixfleet_state_machine::HostState::Converged
272 && let Ok(rows) = db
273 .host_rollout_records()
274 .all_for_rollout(rollout_id.as_str())
275 && !rows.is_empty()
276 && rows
277 .iter()
278 .all(|r| r.state == nixfleet_state_machine::HostState::Converged)
279 {
280 super::applier::process_rollout_event(
281 &ctx,
282 db,
283 now,
284 nixfleet_state_machine::rollout::RolloutEvent::RolloutTerminal {
285 rollout_id: rollout_id.clone(),
286 at: now,
287 },
288 )
289 .await;
290 }
291 }
292}
293
294// Threads the same reducer-task dependencies (state, clock,
295// event_log_tx, rs) as handle_host_event, plus the heartbeat envelope
296// (host, rollout_id, current_closure, at, reply). Refactoring to a
297// context struct would obscure the call site at handle_input where the
298// reducer dispatches inputs. The lint is fine to suppress here.
299#[allow(clippy::too_many_arguments)]
300async fn handle_heartbeat(
301 state: &Arc<AppState>,
302 clock: &ClockHandle,
303 event_log_tx: &EventLogTx,
304 rs: &mut ReducerState,
305 host: &str,
306 rollout_id: Option<nixfleet_proto::RolloutId>,
307 current_closure: Option<String>,
308 at: DateTime<Utc>,
309 reply: oneshot::Sender<HeartbeatReply>,
310) {
311 rs.last_heartbeat_at.insert(host.to_string(), at);
312
313 // Boot-recovery retroactive confirmation (RFC-0005 §9.5).
314 // Closes the "agent restart mid-Activating leaves CP forever stuck
315 // at Activating" defect. The flow: an agent's
316 // `nixfleet-agent.service` restart kills the in-flight verify_poll
317 // before it can emit LocalActivationCompleted. The new agent's
318 // boot-recovery handshake reports `current_closure` (read from
319 // /run/current-system) but no rollout_id, so the steady-state
320 // replay_from path can't match. Here we scan active
321 // host_rollout_records for this hostname; if any record's
322 // target_closure matches the agent's current_closure AND state is
323 // Activating or Deferred, we synthesize
324 // `Event::RemoteActivationCompleted` and feed it through
325 // `handle_host_event` — same path the wire-borne version takes. CP
326 // transitions Activating/Deferred → Soaking, populates
327 // activation_completed_at, the planner unblocks, the cascade
328 // continues. Recovery.rs:45-51 documented this design intent ("CP
329 // synthesises an ActivationCompleted-shaped Replay-From event").
330 //
331 // Synthesis runs BEFORE bootstrap + reply: the bootstrap reflects
332 // post-synthesis state (e.g. Soaking, not Activating). The reducer
333 // is single-threaded so the read-modify-read is race-free;
334 // synthesis is in-process and well under the route's
335 // REDUCER_REPLY_TIMEOUT.
336 if let Some(agent_current) = current_closure.as_deref() {
337 maybe_synthesize_recovery_completion(
338 state,
339 clock,
340 event_log_tx,
341 rs,
342 host,
343 agent_current,
344 at,
345 )
346 .await;
347 }
348
349 let replay_from = compute_replay_from(
350 state,
351 host,
352 rollout_id.as_ref().map(|r| r.as_str()),
353 current_closure.as_deref(),
354 );
355
356 // When the agent's heartbeat carried no rollout_id (the
357 // boot-recovery shape — agent's reducer is empty post-restart),
358 // but CP holds non-terminal records for the host, build a
359 // bootstrap snapshot per record. The agent's runtime applies each
360 // snapshot to its in-memory HostRolloutState before workers spawn,
361 // restoring the cache so probe runners + advance-ticker resume
362 // work post-restart. Steady-state heartbeats (rollout_id
363 // populated) skip this — the agent's reducer already knows.
364 let bootstrap_rollouts = if rollout_id.is_none() {
365 build_bootstrap_for_host(state, host)
366 } else {
367 Vec::new()
368 };
369
370 let _ = reply.send(HeartbeatReply {
371 replay_from,
372 bootstrap_rollouts,
373 });
374}
375
376/// Scan active records for `host` and produce a
377/// `HostRolloutSnapshot` per record. Called only on
378/// boot-recovery-shaped heartbeats (rollout_id=None). Order is
379/// deterministic by (rollout_id, hostname) PK in SQL; the agent
380/// applies them in arrival order.
381fn build_bootstrap_for_host(
382 state: &Arc<AppState>,
383 host: &str,
384) -> Vec<nixfleet_proto::agent_wire::HostRolloutSnapshot> {
385 let Some(db) = state.db.as_ref() else {
386 return Vec::new();
387 };
388 let records = match db.host_rollout_records().active_for_host(host) {
389 Ok(r) => r,
390 Err(err) => {
391 tracing::warn!(
392 target: "cp_reducer",
393 host,
394 error = %err,
395 "bootstrap build: active_for_host load failed; returning empty",
396 );
397 return Vec::new();
398 }
399 };
400 records
401 .into_iter()
402 .map(host_rollout_state_to_snapshot)
403 .collect()
404}
405
406fn host_rollout_state_to_snapshot(
407 record: nixfleet_state_machine::HostRolloutState,
408) -> nixfleet_proto::agent_wire::HostRolloutSnapshot {
409 use nixfleet_proto::HostRolloutState as WireState;
410 use nixfleet_state_machine::HostState;
411 let wire_state = match record.state {
412 HostState::Pending => WireState::Pending,
413 HostState::Activating => WireState::Activating,
414 HostState::Deferred => WireState::Deferred,
415 HostState::Soaking => WireState::Soaking,
416 HostState::Converged => WireState::Converged,
417 HostState::Failed => WireState::Failed,
418 HostState::Reverted => WireState::Reverted,
419 };
420 nixfleet_proto::agent_wire::HostRolloutSnapshot {
421 rollout_id: record.rollout_id,
422 hostname: record.hostname,
423 channel: record.channel,
424 state: wire_state,
425 target_closure: record.target_closure,
426 current_closure_at_dispatch: record.current_closure_at_dispatch,
427 current_closure: record.current_closure,
428 dispatched_at: record.dispatched_at,
429 dispatch_acked_at: record.dispatch_acked_at,
430 activation_started_at: record.activation_started_at,
431 activation_completed_at: record.activation_completed_at,
432 soak_due_at: record.soak_due_at,
433 last_event_seq: record.last_event_seq,
434 }
435}
436
437/// Scan active host_rollout_records for `host`; for each record whose
438/// observed closure on the wire identifies a missed transition,
439/// synthesize the event chain that advances the row to a state
440/// consistent with the agent's reality. Idempotent: if the state has
441/// already advanced (e.g. concurrent agent emit), the record won't
442/// match and the synthesis is a no-op.
443///
444/// LOADBEARING: this is the CP-side half of architecture.md §305
445/// acceptance gate 1 ("destroying the CP database and rebuilding
446/// from empty state results in full fleet visibility within one
447/// reconcile cycle, with zero operator intervention beyond restarting
448/// the service"). The agent's heartbeat carries `current_closure` on
449/// every tick; CP rebuilds soft-state HRR rows from those inputs.
450///
451/// Four reachable recovery cases:
452/// - `Activating` + `current == target`: agent restarted
453/// mid-rollout, post-boot observed the activation took.
454/// Synthesise `RemoteActivationCompleted`.
455/// - `Deferred` + `current == target`: operator rebooted to
456/// finish a critical-component activation. Same synthesis.
457/// - `Pending` + `current == target`: CP itself was wiped, the
458/// planner re-opened the rollout in `Pending`, but the agent has
459/// been running the target closure all along. Synthesise the
460/// full `RemoteDispatchAck → RemoteActivationCompleted →
461/// RemoteConverged` chain. `RemoteConverged`'s soak-elapsed
462/// invariant is satisfied by stamping `converged_at = max(at,
463/// record.soak_due_at)` — the soak window's purpose (give probes
464/// time to fail) was exercised pre-wipe. Gated by
465/// `evaluate_synth_gates` so a fleet bump that opens a Pending
466/// row for a host whose closure already matches doesn't bypass
467/// channel-edges or wave-promotion ordering.
468/// - `Failed` + `current == current_closure_at_dispatch`: the
469/// rollback's switch-to-configuration restarted the agent
470/// mid-VerifyPoll, dropping `LocalRollbackCompleted`. Synthesise
471/// `RemoteRollbackComplete` on the heartbeat that observes the
472/// rolled-back closure; the canonical `Failed` reducer arm
473/// (`failed.rs::RemoteRollbackComplete`) produces the quarantine
474/// + event_log + transition effects.
475async fn maybe_synthesize_recovery_completion(
476 state: &Arc<AppState>,
477 clock: &ClockHandle,
478 event_log_tx: &EventLogTx,
479 rs: &mut ReducerState,
480 host: &str,
481 agent_current: &str,
482 at: DateTime<Utc>,
483) {
484 let Some(db) = state.db.as_ref() else {
485 return;
486 };
487 let records = match db.host_rollout_records().active_for_host(host) {
488 Ok(r) => r,
489 Err(err) => {
490 tracing::warn!(
491 target: "cp_reducer",
492 host,
493 error = %err,
494 "boot-recovery synthesis: active_for_host load failed; skipping",
495 );
496 return;
497 }
498 };
499 for record in records {
500 let rollout_id = record.rollout_id.clone();
501 match record.state {
502 nixfleet_state_machine::HostState::Activating
503 | nixfleet_state_machine::HostState::Deferred
504 if record.target_closure == agent_current =>
505 {
506 tracing::info!(
507 target: "cp_reducer",
508 host,
509 rollout_id = %record.rollout_id,
510 target = %record.target_closure,
511 prior_state = ?record.state,
512 "post-restart recovery: synthesizing RemoteActivationCompleted (RFC-0005 §9.5)",
513 );
514 let synth_event = nixfleet_state_machine::Event::RemoteActivationCompleted {
515 observed_current_closure: agent_current.to_string(),
516 exit_code: 0,
517 completed_at: at,
518 seq: record.last_event_seq + 1,
519 };
520 handle_host_event(
521 state,
522 clock,
523 event_log_tx,
524 rs,
525 host,
526 &rollout_id,
527 synth_event,
528 )
529 .await;
530 }
531 nixfleet_state_machine::HostState::Pending
532 if record.target_closure == agent_current =>
533 {
534 // LOADBEARING: consult the dispatch gates before
535 // synthesising the chain. The agent's heartbeat
536 // reporting `current_closure == target_closure`
537 // legitimately means "I've been on this closure all
538 // along" — but only safe to fast-forward to Converged
539 // when the gates that would have applied at dispatch
540 // are satisfied. Channel-edges in particular: a fleet
541 // bump that leaves a host's closure unchanged still
542 // opens a new Pending HRR for that host; without this
543 // gate check the synth races past `stable` Converging
544 // while its predecessor `edge` is still Active,
545 // silently bypassing the RFC-0002 §4.3 ordering
546 // contract. If any gate blocks, skip the synth — the
547 // host stays Pending and the next regular plan tick
548 // dispatches it when the gate clears.
549 let gate_block = match rs.manifests.as_ref() {
550 Some(manifests) => evaluate_synth_gates(
551 db,
552 manifests,
553 &rs.quarantines,
554 &record,
555 ),
556 None => None,
557 };
558 if let Some(reason) = gate_block {
559 tracing::info!(
560 target: "cp_reducer",
561 host,
562 rollout_id = %record.rollout_id,
563 gate_block = %reason,
564 "synth-converge held: dispatch gate would block; staying Pending until next plan tick",
565 );
566 continue;
567 }
568 synthesize_pending_to_converged(
569 state,
570 clock,
571 event_log_tx,
572 rs,
573 &record,
574 agent_current,
575 at,
576 )
577 .await;
578 }
579 nixfleet_state_machine::HostState::Failed
580 if record.current_closure_at_dispatch.as_deref() == Some(agent_current) =>
581 {
582 // Agent rollback-ack is non-durable: the rollback's
583 // switch-to-configuration restarts the agent
584 // mid-VerifyPoll, SIGTERMing it before
585 // `LocalRollbackCompleted` can be emitted. The new
586 // agent PID comes up on the rolled-back closure with
587 // no memory of the pending event; the rollback
588 // completed operationally but CP keeps the HRR in
589 // `Failed` indefinitely. Detect this from the
590 // post-restart heartbeat (current_closure ==
591 // current_closure_at_dispatch, i.e. the closure the
592 // agent ran before the failed activation = the
593 // rollback target) and synthesise the missing
594 // `RemoteRollbackComplete`. The reducer's `Failed`
595 // arm in `failed.rs` produces the canonical effects:
596 // `RemoteInsertQuarantine` for the bad closure +
597 // `RemoteAppendEventLog` for the audit trail + the
598 // `Failed → Reverted` state transition.
599 //
600 // No gates apply: the agent's rollback decision was
601 // policy-driven from the signed manifest's
602 // `onHealthFailure`, not gate-driven, so there's
603 // nothing for CP to second-guess on synthesis.
604 tracing::info!(
605 target: "cp_reducer",
606 host,
607 rollout_id = %record.rollout_id,
608 reverted_to = %agent_current,
609 "post-rollback-restart recovery: synthesizing RemoteRollbackComplete",
610 );
611 let synth_event = nixfleet_state_machine::Event::RemoteRollbackComplete {
612 reverted_to_closure: agent_current.to_string(),
613 exit_code: 0,
614 completed_at: at,
615 seq: record.last_event_seq + 1,
616 };
617 handle_host_event(
618 state,
619 clock,
620 event_log_tx,
621 rs,
622 host,
623 &rollout_id,
624 synth_event,
625 )
626 .await;
627 }
628 _ => continue,
629 }
630 }
631}
632
633/// Run the dispatch-gate evaluation against a Pending HRR record so
634/// `maybe_synthesize_recovery_completion` can refuse to fast-forward
635/// Pending → Converged when an active gate (channel-edges,
636/// wave-promotion, quarantine, etc.) would still hold the host.
637///
638/// Returns `Some(reason)` when a gate blocks; the caller leaves the
639/// host in Pending and lets the next regular plan tick handle it.
640fn evaluate_synth_gates(
641 db: &Arc<Db>,
642 manifests: &SignedManifestSet,
643 quarantines: &QuarantineSet,
644 record: &nixfleet_state_machine::HostRolloutState,
645) -> Option<String> {
646 let fleet_state = build_fleet_state(db, manifests).ok()?;
647 let host_id: HostId = record.hostname.clone();
648 let channel: nixfleet_reconciler::planner_types::ChannelId =
649 record.channel.clone();
650 let target: nixfleet_reconciler::planner_types::ClosureHash =
651 record.target_closure.clone();
652 nixfleet_reconciler::planner_gates::evaluate_for_dispatch(
653 &fleet_state,
654 manifests,
655 quarantines,
656 &record.rollout_id,
657 &host_id,
658 &target,
659 &channel,
660 &std::collections::HashMap::new(),
661 )
662 .map(|b| format!("{b:?}"))
663}
664
665/// Drive a `Pending` HRR row through the full lifecycle to
666/// `Converged` when the agent reports `current_closure == target`.
667/// The chain preserves the event-log audit trail (RFC-0004 §1):
668/// every transition emits its usual `RemoteAppendEventLog` effect
669/// flagged with the synthesis context via the `seq` ordering relative
670/// to the pre-synthesis `last_event_seq`.
671///
672/// LOADBEARING: callers MUST first verify that the dispatch gates
673/// (channel-edges, wave-promotion, quarantine, …) wouldn't have
674/// blocked dispatch — see `evaluate_synth_gates`. Without the gate
675/// check, a fleet bump that leaves a host's closure unchanged
676/// silently fast-forwards the host to Converged on the very first
677/// post-bump heartbeat, bypassing the RFC-0002 §4.3 channel-edges
678/// ordering contract.
679async fn synthesize_pending_to_converged(
680 state: &Arc<AppState>,
681 clock: &ClockHandle,
682 event_log_tx: &EventLogTx,
683 rs: &mut ReducerState,
684 record: &nixfleet_state_machine::HostRolloutState,
685 agent_current: &str,
686 at: DateTime<Utc>,
687) {
688 let host = record.hostname.as_str();
689 let rollout_id = &record.rollout_id;
690 tracing::info!(
691 target: "cp_reducer",
692 host,
693 rollout_id = %rollout_id,
694 target = %record.target_closure,
695 "post-wipe recovery: synthesizing Pending → Converged chain (architecture.md §305)",
696 );
697
698 // 1. Pending → Activating. `current_closure_at_dispatch` is the
699 // pre-dispatch closure; CP has no way to know it post-wipe.
700 // Empty string is the documented placeholder — rollback never
701 // fires from a synthesis chain that lands at Converged (terminal),
702 // so the rollback-target ambiguity is inert.
703 let dispatch_ack = nixfleet_state_machine::Event::RemoteDispatchAck {
704 current_closure_at_dispatch: String::new(),
705 received_at: at,
706 seq: record.last_event_seq + 1,
707 };
708 handle_host_event(state, clock, event_log_tx, rs, host, rollout_id, dispatch_ack).await;
709
710 // 2. Activating → Soaking.
711 let activation_completed = nixfleet_state_machine::Event::RemoteActivationCompleted {
712 observed_current_closure: agent_current.to_string(),
713 exit_code: 0,
714 completed_at: at,
715 seq: record.last_event_seq + 2,
716 };
717 handle_host_event(
718 state,
719 clock,
720 event_log_tx,
721 rs,
722 host,
723 rollout_id,
724 activation_completed,
725 )
726 .await;
727
728 // 3. Soaking → Converged. `converged_at` is anchored to
729 // `soak_due_at` when the heartbeat arrives before soak has
730 // elapsed, so soaking.rs's `converged_at >= soak_due_at`
731 // invariant passes. The actual agent-side convergence happened
732 // pre-wipe; CP can't reconstruct that timestamp, so it stamps
733 // the post-wipe-floor instead.
734 let Some(db) = state.db.as_ref() else {
735 return;
736 };
737 let post_activation = match db.host_rollout_records().load(rollout_id.as_str(), host) {
738 Ok(Some(r)) => r,
739 _ => return,
740 };
741 let synth_converged_at = match post_activation.soak_due_at {
742 Some(soak_due) if soak_due > at => soak_due,
743 _ => at,
744 };
745 let converged = nixfleet_state_machine::Event::RemoteConverged {
746 converged_at: synth_converged_at,
747 current_closure: agent_current.to_string(),
748 seq: record.last_event_seq + 3,
749 };
750 handle_host_event(state, clock, event_log_tx, rs, host, rollout_id, converged).await;
751}
752
753fn compute_replay_from(
754 state: &Arc<AppState>,
755 host: &str,
756 rollout_id: Option<&str>,
757 current_closure: Option<&str>,
758) -> Option<u64> {
759 let db = state.db.as_ref()?;
760 let rollout_id = rollout_id?;
761 let agent_closure = current_closure?;
762 let record = db
763 .host_rollout_records()
764 .load(rollout_id, host)
765 .ok()
766 .flatten()?;
767
768 let cp_closure = record.current_closure.as_deref();
769 match cp_closure {
770 Some(cp) if cp == agent_closure => None,
771 // Drift: agent reports a closure CP didn't see acknowledged.
772 // Replay-From = CP's last_event_seq + 1 (the next seq CP hasn't
773 // seen yet); agent re-POSTs everything from there.
774 _ => Some(record.last_event_seq),
775 }
776}
777
778async fn run_plan(
779 state: &Arc<AppState>,
780 clock: &ClockHandle,
781 event_log_tx: &EventLogTx,
782 rs: &ReducerState,
783) {
784 let Some(manifests) = rs.manifests.as_ref() else {
785 // Cold start: the manifest_poll worker hasn't primed yet.
786 // Periodic ticks land here harmlessly until it does.
787 return;
788 };
789 let Some(db) = state.db.as_ref() else {
790 return;
791 };
792
793 let now = clock.now();
794 let mut fleet_state = match build_fleet_state(db, manifests) {
795 Ok(fs) => fs,
796 Err(err) => {
797 tracing::error!(
798 target: "cp_reducer",
799 error = %err,
800 "run_plan: FleetState construction failed; skipping plan tick",
801 );
802 return;
803 }
804 };
805
806 // Advance `rollouts.current_wave` for any rollout whose current
807 // wave has fully Converged. Must happen BEFORE plan_next so the
808 // wave_promotion gate sees the new value on the same tick (next
809 // wave's hosts go through immediately rather than waiting on the
810 // periodic safety-net replan).
811 advance_current_waves(db, manifests, &mut fleet_state).await;
812
813 let actions =
814 nixfleet_reconciler::planner::plan_next(manifests, &fleet_state, &rs.quarantines, now);
815
816 let ctx = ApplierCtx {
817 state,
818 manifests,
819 clock,
820 event_log_tx,
821 };
822 for action in actions {
823 apply_plan_action(&ctx, action).await;
824 }
825}
826
827/// Promote each rollout's `current_wave` when every host in the current
828/// wave has reached `HostState::Converged`. The check reads the host
829/// list from the verified manifest's `fleet.waves[channel][current_wave]`
830/// — the same path the wave_promotion gate uses for `host_wave`, so the
831/// bump and the gate stay in lock-step.
832///
833/// Empty waves are NOT auto-promoted. Same invariant as
834/// `planner::maybe_mark_terminal`: an empty wave that produced no host
835/// records vacuously satisfies "all converged"; treating that as
836/// promotion-eligible would walk the wave pointer past content that
837/// later arrives. Pin via the `has_any_host` guard.
838async fn advance_current_waves(
839 db: &Arc<Db>,
840 manifests: &SignedManifestSet,
841 fleet_state: &mut FleetState,
842) {
843 let fleet = manifests.fleet();
844 let mut bumps: Vec<(RolloutId, u32)> = Vec::new();
845 for (rollout_id, summary) in fleet_state.rollouts.iter() {
846 if summary.terminal_at.is_some() {
847 continue;
848 }
849 let Some(channel_waves) = fleet.waves.get(&summary.channel) else {
850 continue;
851 };
852 let current = summary.current_wave as usize;
853 if current + 1 >= channel_waves.len() {
854 // No next wave to promote into.
855 continue;
856 }
857 let Some(current_wave) = channel_waves.get(current) else {
858 continue;
859 };
860 if current_wave.hosts.is_empty() {
861 continue;
862 }
863 // LOADBEARING: a wave is "done participating" when every host
864 // is ordering-eligible — Converged OR Deferred (per
865 // RFC-0005 §3 terminal-for-ordering). Deferred means
866 // activation is staged but live-switch was skipped
867 // (critical-component swap pending reboot); the host has done
868 // what it can within the rollout step, so successor waves
869 // should not stall waiting on it. Health verification (probes
870 // + soak) still happens after operator reboot via
871 // `handle_heartbeat`'s recovery synthesis (Deferred → Soaking).
872 let all_ordering_eligible = current_wave.hosts.iter().all(|host| {
873 fleet_state
874 .host_states
875 .get(&(rollout_id.clone(), host.clone()))
876 .map(|s| {
877 matches!(
878 s.state,
879 nixfleet_state_machine::HostState::Converged
880 | nixfleet_state_machine::HostState::Deferred
881 )
882 })
883 .unwrap_or(false)
884 });
885 if all_ordering_eligible {
886 bumps.push((rollout_id.clone(), summary.current_wave + 1));
887 }
888 }
889
890 for (rollout_id, next_wave) in bumps {
891 // FK is populated by the rollout reducer's
892 // `RolloutEffect::UpdateCurrentWave`; the planner passes
893 // None per RFC-0008 §6.1 item 3.
894 match db
895 .rollouts()
896 .set_current_wave(rollout_id.as_str(), next_wave, None)
897 {
898 Ok(_) => {
899 if let Some(s) = fleet_state.rollouts.get_mut(&rollout_id) {
900 s.current_wave = next_wave;
901 }
902 tracing::info!(
903 target: "cp_reducer",
904 %rollout_id,
905 next_wave,
906 "advance_current_waves: bumped current_wave (every host in prior wave Converged)",
907 );
908 }
909 Err(err) => {
910 tracing::error!(
911 target: "cp_reducer",
912 %rollout_id,
913 next_wave,
914 error = %err,
915 "advance_current_waves: set_current_wave failed",
916 );
917 }
918 }
919 }
920}
921
922/// Build a fresh `FleetState` from the DB. Called on every plan tick;
923/// at v0.2 scale (≤256 hosts, ≤8 active rollouts) the SELECTs are
924/// negligible.
925fn build_fleet_state(db: &Arc<Db>, manifests: &SignedManifestSet) -> anyhow::Result<FleetState> {
926 let mut host_states: HashMap<(RolloutId, HostId), HostRolloutState> = HashMap::new();
927 let mut rollouts: HashMap<RolloutId, RolloutSummary> = HashMap::new();
928
929 // For each channel with a verified rollout manifest, load all
930 // host_rollout_records under the manifested rollout_id so the
931 // planner can walk Pending hosts and run gates. Channels whose
932 // rollout hasn't been opened yet (no DB row, no Pending records)
933 // surface through the OpenRollout emission path above.
934 for (channel, vm) in &manifests.rollouts {
935 let manifest = vm.inner();
936 // Canonical RolloutId construction (RFC-0008 §6.3): matches
937 // the planner's `RolloutId::new(channel, channel_ref)` so
938 // lookups by rollout_id succeed even when multiple channels
939 // share a channel_ref.
940 let rollout_id = nixfleet_proto::RolloutId::new(channel, &manifest.channel_ref);
941
942 let rows = db
943 .host_rollout_records()
944 .all_for_rollout(rollout_id.as_str())?;
945 for row in rows {
946 host_states.insert((rollout_id.clone(), row.hostname.clone()), row);
947 }
948
949 // RolloutSummary metadata. Full row from the `rollouts` table
950 // (RFC-0008 §6.3). Missing row ⇒ rollout not opened yet ⇒ omit
951 // from `rollouts` map (gates that require RolloutSummary for
952 // in-flight reasoning correctly see "not yet open" for this
953 // channel).
954 if let Ok(Some(row)) = db.rollouts().state(rollout_id.as_str()) {
955 rollouts.insert(
956 rollout_id.clone(),
957 RolloutSummary {
958 rollout_id: rollout_id.clone(),
959 channel: channel.clone(),
960 target_ref: manifest.channel_ref.clone(),
961 opened_at: row.opened_at,
962 terminal_at: row.terminal_at,
963 current_wave: row.current_wave,
964 budgets: manifest.disruption_budgets.clone(),
965 },
966 );
967 }
968 }
969
970 // Distinct outstanding enforce-mode probe failures per
971 // (rollout, host). Feeds the compliance_wave gate
972 // (planner_gates::compliance_wave) per RFC-0007 §7.2.
973 let outstanding_failing_enforce_probes = db
974 .probe_failures()
975 .outstanding_failing_enforce_probes_by_rollout()
976 .unwrap_or_else(|err| {
977 tracing::warn!(
978 target: "cp_reducer",
979 error = %err,
980 "build_fleet_state: outstanding_failing_enforce_probes query failed; \
981 falling back to empty map (compliance_wave gate inert this tick)",
982 );
983 HashMap::new()
984 });
985
986 Ok(FleetState {
987 host_states,
988 rollouts,
989 outstanding_failing_enforce_probes,
990 })
991}
992
993fn resolve_policy<'a>(
994 manifests: &'a SignedManifestSet,
995 channel: &str,
996) -> Option<&'a RolloutPolicy> {
997 let fleet = manifests.fleet();
998 let channel_entry = fleet.channels.get(channel)?;
999 fleet.rollout_policies.get(&channel_entry.rollout_policy)
1000}
1001
/// RAII container for per-worker shutdown senders. Holding it in scope
/// ensures workers receive shutdown signal exactly when this task exits.
///
/// The wrapped vector is never read, hence the `dead_code` allowance: it
/// is held only so the senders are dropped together with the guard. `run`
/// binds the guard for its whole body.
struct ShutdownGuard(#[allow(dead_code)] Vec<oneshot::Sender<()>>);
1005
#[cfg(test)]
mod tests {
    //! Regression coverage for the FleetState builder.
    //!
    //! Per-gate behaviour on populated fields lives in
    //! `nixfleet_reconciler::planner_gates::tests`. The
    //! `outstanding_failing_enforce_probes` projection gets a dedicated
    //! end-to-end test in 9b, once the writer side lands.

    use super::*;
    use crate::db::Db;
    use chrono::Utc;
    use nixfleet_proto::testing::FleetBuilder;
    use nixfleet_reconciler::verify::Verified;

    /// In-memory database with all migrations applied.
    fn fresh_db() -> Arc<Db> {
        let store = Db::open_in_memory().expect("open in-memory db");
        store.migrate().expect("apply migrations");
        Arc::new(store)
    }

    /// Manifest set with a default-built fleet and no rollout manifests.
    fn empty_manifests() -> SignedManifestSet {
        SignedManifestSet {
            fleet: Verified::unverified_for_tests(FleetBuilder::new().build(), Utc::now()),
            rollouts: HashMap::new(),
        }
    }

    #[test]
    fn outstanding_failing_enforce_probes_empty_in_9a() {
        let fleet_state =
            build_fleet_state(&fresh_db(), &empty_manifests()).expect("build_fleet_state");
        let projection = &fleet_state.outstanding_failing_enforce_probes;
        assert!(
            projection.is_empty(),
            "9a: probe_failures is unwritten ⇒ projection must be empty (got {:?})",
            projection,
        );
    }
}
1046}