nixfleet_control_plane/runtime/mod.rs
1//! CP runtime: MPSC reducer loop + applier + workers (RFC-0006 §7.2).
2//!
3//! Architecture
4//! ============
5//!
6//! ```text
//! manifest_poll ─┐
//! event_ingest  ─┼──▶ mpsc::Sender<ReducerInput> ───▶ reducer task ───▶ applier
//! heartbeat_rx  ─┤                                         │
//! dispatch lp   ─┘                                         ▼
//!                                                  (step + plan_next)
12//! ```
13//!
14//! Invariants (do not violate without an RFC amendment):
15//!
16//! 1. **One MPSC, one mutator.** The reducer task is the only thing that
17//! calls [`nixfleet_state_machine::step`] or
18//! [`nixfleet_reconciler::planner::plan_next`]. Workers emit
19//! [`ReducerInput`] values; the applier executes the resulting effects.
20//! A second writer "for performance" is the defect class that the v0.2
21//! fold is folding away — don't reintroduce it.
22//!
23//! 2. **CP mirror runs the same `step()` as the agent.** Identical
24//! transitions for `Local*` and `Remote*` event pairs by construction
25//! (RFC-0006 §2 principle 4). No "leaner" CP-side reimplementation.
26//!
27//! 3. **Shutdown via oneshot drop.** The reducer task owns a vector of
28//! [`tokio::sync::oneshot::Sender<()>`]; each worker holds the matching
29//! [`ShutdownToken`] (wrapping a `Receiver`). Reducer exit drops every
30//! sender, every worker's `select!` shutdown arm fires, workers exit
31//! cleanly. No leaks if a worker hangs on its own — the `JoinHandle`
32//! drain in `server::mod` aborts past the deadline.
33//!
34//! 4. **Bounded channels.** The input MPSC and any internal applier queues
35//! have explicit `bounded` capacities with a sizing rationale in a
36//! comment — never `unbounded`, never "let's start at 4096 and see."
37
38use std::sync::Arc;
39
40use chrono::{DateTime, Utc};
41use nixfleet_proto::clock::ClockHandle;
42use nixfleet_state_machine::Event;
43use tokio::sync::{mpsc, oneshot};
44use tokio::task::JoinHandle;
45use tokio_util::sync::CancellationToken;
46
47use crate::server::AppState;
48
49pub mod applier;
50pub mod event_log_writer;
51mod reducer;
52pub mod workers;
53
54pub use event_log_writer::{EVENT_LOG_CHANNEL_CAPACITY, EventLogTx};
55
/// Capacity of the reducer's input MPSC channel.
///
/// Sizing rationale: a 256-host fleet emitting ~2 events/host during peak
/// rollout activation gives ~512 in-flight events (a power of two); that
/// figure is doubled for burst headroom. When the channel is full, HTTP
/// handlers surface backpressure via `try_send` → 503 and the caller
/// retries, preserving the pull-only agent contract (RFC-0005 §2.1 — CP
/// never pushes, agents always retry).
const REDUCER_INPUT_CAPACITY: usize = 2 * 512;
62
63/// Reducer-task inputs. Workers and HTTP route handlers obtain a
64/// `mpsc::Sender<ReducerInput>` from the [`RuntimeHandle`] and emit values
65/// of this type. Variants partition by the reducer action they drive:
66///
67/// - [`ReducerInput::HostEvent`] — call `step()` on the CP-mirror per-host
68/// state for `(host, rollout_id)`, run applier on resulting effects.
69/// - [`ReducerInput::ManifestSetUpdated`] — refresh the cached
70/// `SignedManifestSet` and run `plan_next()`.
71/// - [`ReducerInput::HeartbeatReceived`] — touch `last_heartbeat_at`; if
72/// the agent's `current_closure` disagrees with the CP mirror, the reply
73/// contains the last-known-seq for `Replay-From` (RFC-0005 §4.3).
74/// - [`ReducerInput::PlanTick`] — safety-net periodic re-run of
75/// `plan_next()` in case a manifest update was missed by a poller crash.
76pub enum ReducerInput {
77 HostEvent {
78 host: String,
79 rollout_id: nixfleet_proto::RolloutId,
80 event: Event,
81 },
82 ManifestSetUpdated(Box<nixfleet_reconciler::planner_types::SignedManifestSet>),
83 HeartbeatReceived {
84 host: String,
85 rollout_id: Option<nixfleet_proto::RolloutId>,
86 current_closure: Option<String>,
87 at: DateTime<Utc>,
88 reply: oneshot::Sender<HeartbeatReply>,
89 },
90 PlanTick,
91}
92
93/// Heartbeat reply: drift detected ⇒ `replay_from` is `Some(last_known_seq)`;
94/// agent should re-POST events from that seq onward (RFC-0005 §4.3).
95/// `bootstrap_rollouts` (LIFT #3) carries CP's view of active rollouts the
96/// agent should rehydrate when its reducer was lost (boot-recovery shape;
97/// heartbeat carried `rollout_id = None` but CP holds non-terminal records
98/// for the host).
99#[derive(Debug, Clone)]
100pub struct HeartbeatReply {
101 pub replay_from: Option<u64>,
102 pub bootstrap_rollouts: Vec<nixfleet_proto::agent_wire::HostRolloutSnapshot>,
103}
104
105/// Shutdown signal handed to a worker at spawn time. Workers select! between
106/// their input source and `&mut self.0`; reducer-task exit drops the matched
107/// `Sender<()>` and the receiver resolves with `Err(RecvError)`. Workers
108/// treat any resolution as "shutdown" and exit cleanly.
109pub struct ShutdownToken(oneshot::Receiver<()>);
110
111impl ShutdownToken {
112 /// Unwrap to the inner `Receiver<()>` for direct use in `select!`. The
113 /// idiomatic pattern is:
114 ///
115 /// ```ignore
116 /// let mut shutdown = token.into_inner();
117 /// loop {
118 /// tokio::select! {
119 /// _ = &mut shutdown => return,
120 /// msg = input.recv() => { ... }
121 /// }
122 /// }
123 /// ```
124 pub fn into_inner(self) -> oneshot::Receiver<()> {
125 self.0
126 }
127}
128
129/// Handle returned by [`spawn`]. Holds the input channel sender (cloneable
130/// for HTTP route handlers) and the reducer-task `JoinHandle`. The handle
131/// must outlive every cloned sender, otherwise the reducer task exits
132/// prematurely.
133pub struct RuntimeHandle {
134 pub input_tx: mpsc::Sender<ReducerInput>,
135 /// Cloneable handle to the event_log writer task. Route handlers and
136 /// the reducer-task applier send `EventLogEntry` values here; the
137 /// dedicated writer task drains and persists. See `event_log_writer`
138 /// for the backpressure rationale.
139 pub event_log_tx: EventLogTx,
140 pub reducer_handle: JoinHandle<()>,
141 pub event_log_writer_handle: JoinHandle<()>,
142 pub worker_handles: Vec<JoinHandle<()>>,
143}
144
145impl RuntimeHandle {
146 /// All background tasks owned by this runtime. The CP `server::serve`
147 /// drains these as part of the shutdown protocol (`drain_background_tasks`).
148 pub fn into_join_handles(self) -> Vec<JoinHandle<()>> {
149 let mut handles = self.worker_handles;
150 handles.push(self.reducer_handle);
151 handles.push(self.event_log_writer_handle);
152 handles
153 }
154}
155
156/// Spawn the reducer + worker constellation. Returns a [`RuntimeHandle`]
157/// the caller wires into the server shutdown drain.
158///
159/// `clock` and `state` are the only external dependencies. Workers
160/// obtain everything else (DB handles, manifest cache, etc.) from `state`.
161pub fn spawn(cancel: CancellationToken, state: Arc<AppState>, clock: ClockHandle) -> RuntimeHandle {
162 let (input_tx, input_rx) = mpsc::channel::<ReducerInput>(REDUCER_INPUT_CAPACITY);
163 let (event_log_tx, event_log_rx) =
164 mpsc::channel::<crate::db::event_log::EventLogEntry>(EVENT_LOG_CHANNEL_CAPACITY);
165
166 // One oneshot per child task. Reducer task takes ownership of the
167 // senders in a `ShutdownGuard`; on reducer exit the guard drops, every
168 // sender drops, every child task's `select!` shutdown arm fires.
169 // Two tokens: 1 manifest-poll worker + 1 event-log writer. The HTTP
170 // route handlers (events / heartbeat / dispatch long-poll) write
171 // `ReducerInput` directly to `input_tx`; they coordinate shutdown
172 // through axum-server's graceful-drain protocol, not through this
173 // constellation (Phase 8f deleted the stub workers).
174 const SHUTDOWN_TOKEN_COUNT: usize = 2;
175 let mut shutdown_senders: Vec<oneshot::Sender<()>> = Vec::new();
176 let mut shutdown_tokens: Vec<ShutdownToken> = Vec::new();
177 for _ in 0..SHUTDOWN_TOKEN_COUNT {
178 let (tx, rx) = oneshot::channel::<()>();
179 shutdown_senders.push(tx);
180 shutdown_tokens.push(ShutdownToken(rx));
181 }
182 // Pop in reverse so each consumer gets a distinct token.
183 let event_log_writer_shutdown = shutdown_tokens
184 .pop()
185 .expect("SHUTDOWN_TOKEN_COUNT tokens allocated");
186 let manifest_poll_shutdown = shutdown_tokens
187 .pop()
188 .expect("SHUTDOWN_TOKEN_COUNT tokens allocated");
189
190 let worker_handles = vec![workers::manifest_poll::spawn(
191 state.clone(),
192 clock.clone(),
193 input_tx.clone(),
194 manifest_poll_shutdown,
195 )];
196
197 let event_log_writer_handle = match state.db.clone() {
198 Some(db) => event_log_writer::spawn(db, event_log_rx, event_log_writer_shutdown),
199 None => spawn_drain_only_writer(event_log_rx, event_log_writer_shutdown),
200 };
201
202 let reducer_handle = tokio::spawn({
203 let event_log_tx = event_log_tx.clone();
204 async move {
205 reducer::run(
206 cancel,
207 state,
208 clock,
209 input_rx,
210 event_log_tx,
211 shutdown_senders,
212 )
213 .await
214 }
215 });
216
217 RuntimeHandle {
218 input_tx,
219 event_log_tx,
220 reducer_handle,
221 event_log_writer_handle,
222 worker_handles,
223 }
224}
225
226/// Fallback writer when CP runs in in-memory mode (no `db_path` flag). Just
227/// drains entries silently so the channel never fills up. Production paths
228/// always configure a DB.
229fn spawn_drain_only_writer(
230 mut rx: mpsc::Receiver<crate::db::event_log::EventLogEntry>,
231 shutdown: ShutdownToken,
232) -> JoinHandle<()> {
233 tokio::spawn(async move {
234 let mut shutdown_rx = shutdown.into_inner();
235 loop {
236 tokio::select! {
237 _ = &mut shutdown_rx => return,
238 maybe = rx.recv() => {
239 if maybe.is_none() { return; }
240 }
241 }
242 }
243 })
244}