nixfleet_control_plane/runtime/
mod.rs

1//! CP runtime: MPSC reducer loop + applier + workers (RFC-0006 §7.2).
2//!
3//! Architecture
4//! ============
5//!
6//! ```text
7//!   manifest_poll ─┐
8//!   event_ingest  ─┼──▶  mpsc::Sender<ReducerInput>  ───▶  reducer task ───▶ applier
9//!   heartbeat_rx  ─┤                                          │
10//!   dispatch lp   ─┘                                          ▼
11//!                                                       (step + plan_next)
12//! ```
13//!
14//! Invariants (do not violate without an RFC amendment):
15//!
16//! 1. **One MPSC, one mutator.** The reducer task is the only thing that
17//!    calls [`nixfleet_state_machine::step`] or
18//!    [`nixfleet_reconciler::planner::plan_next`]. Workers emit
19//!    [`ReducerInput`] values; the applier executes the resulting effects.
20//!    A second writer "for performance" is the defect class that the v0.2
21//!    fold is folding away — don't reintroduce it.
22//!
23//! 2. **CP mirror runs the same `step()` as the agent.** Identical
24//!    transitions for `Local*` and `Remote*` event pairs by construction
25//!    (RFC-0006 §2 principle 4). No "leaner" CP-side reimplementation.
26//!
27//! 3. **Shutdown via oneshot drop.** The reducer task owns a vector of
28//!    [`tokio::sync::oneshot::Sender<()>`]; each worker holds the matching
29//!    [`ShutdownToken`] (wrapping a `Receiver`). Reducer exit drops every
30//!    sender, every worker's `select!` shutdown arm fires, workers exit
31//!    cleanly. No leaks if a worker hangs on its own — the `JoinHandle`
32//!    drain in `server::mod` aborts past the deadline.
33//!
34//! 4. **Bounded channels.** The input MPSC and any internal applier queues
35//!    have explicit `bounded` capacities with a sizing rationale in a
36//!    comment — never `unbounded`, never "let's start at 4096 and see."
37
38use std::sync::Arc;
39
40use chrono::{DateTime, Utc};
41use nixfleet_proto::clock::ClockHandle;
42use nixfleet_state_machine::Event;
43use tokio::sync::{mpsc, oneshot};
44use tokio::task::JoinHandle;
45use tokio_util::sync::CancellationToken;
46
47use crate::server::AppState;
48
49pub mod applier;
50pub mod event_log_writer;
51mod reducer;
52pub mod workers;
53
54pub use event_log_writer::{EVENT_LOG_CHANNEL_CAPACITY, EventLogTx};
55
/// Capacity of the bounded reducer input channel.
///
/// Sizing rationale: a 256-host fleet emitting roughly 2 events per host
/// during peak rollout activation gives ~512 in-flight events (already a
/// power of two); that figure is doubled to leave headroom for bursts.
/// When the channel is full, backpressure surfaces at the HTTP handlers
/// via `try_send` → 503 and the caller retries, which preserves the
/// pull-only agent contract (RFC-0005 §2.1 — CP never pushes, agents
/// always retry).
const REDUCER_INPUT_CAPACITY: usize = {
    const FLEET_HOSTS: usize = 256;
    const PEAK_EVENTS_PER_HOST: usize = 2;
    const BURST_HEADROOM_FACTOR: usize = 2;
    FLEET_HOSTS * PEAK_EVENTS_PER_HOST * BURST_HEADROOM_FACTOR
};
62
63/// Reducer-task inputs. Workers and HTTP route handlers obtain a
64/// `mpsc::Sender<ReducerInput>` from the [`RuntimeHandle`] and emit values
65/// of this type. Variants partition by the reducer action they drive:
66///
67/// - [`ReducerInput::HostEvent`] — call `step()` on the CP-mirror per-host
68///   state for `(host, rollout_id)`, run applier on resulting effects.
69/// - [`ReducerInput::ManifestSetUpdated`] — refresh the cached
70///   `SignedManifestSet` and run `plan_next()`.
71/// - [`ReducerInput::HeartbeatReceived`] — touch `last_heartbeat_at`; if
72///   the agent's `current_closure` disagrees with the CP mirror, the reply
73///   contains the last-known-seq for `Replay-From` (RFC-0005 §4.3).
74/// - [`ReducerInput::PlanTick`] — safety-net periodic re-run of
75///   `plan_next()` in case a manifest update was missed by a poller crash.
76pub enum ReducerInput {
77    HostEvent {
78        host: String,
79        rollout_id: nixfleet_proto::RolloutId,
80        event: Event,
81    },
82    ManifestSetUpdated(Box<nixfleet_reconciler::planner_types::SignedManifestSet>),
83    HeartbeatReceived {
84        host: String,
85        rollout_id: Option<nixfleet_proto::RolloutId>,
86        current_closure: Option<String>,
87        at: DateTime<Utc>,
88        reply: oneshot::Sender<HeartbeatReply>,
89    },
90    PlanTick,
91}
92
93/// Heartbeat reply: drift detected ⇒ `replay_from` is `Some(last_known_seq)`;
94/// agent should re-POST events from that seq onward (RFC-0005 §4.3).
95/// `bootstrap_rollouts` (LIFT #3) carries CP's view of active rollouts the
96/// agent should rehydrate when its reducer was lost (boot-recovery shape;
97/// heartbeat carried `rollout_id = None` but CP holds non-terminal records
98/// for the host).
99#[derive(Debug, Clone)]
100pub struct HeartbeatReply {
101    pub replay_from: Option<u64>,
102    pub bootstrap_rollouts: Vec<nixfleet_proto::agent_wire::HostRolloutSnapshot>,
103}
104
105/// Shutdown signal handed to a worker at spawn time. Workers select! between
106/// their input source and `&mut self.0`; reducer-task exit drops the matched
107/// `Sender<()>` and the receiver resolves with `Err(RecvError)`. Workers
108/// treat any resolution as "shutdown" and exit cleanly.
109pub struct ShutdownToken(oneshot::Receiver<()>);
110
111impl ShutdownToken {
112    /// Unwrap to the inner `Receiver<()>` for direct use in `select!`. The
113    /// idiomatic pattern is:
114    ///
115    /// ```ignore
116    /// let mut shutdown = token.into_inner();
117    /// loop {
118    ///     tokio::select! {
119    ///         _ = &mut shutdown => return,
120    ///         msg = input.recv() => { ... }
121    ///     }
122    /// }
123    /// ```
124    pub fn into_inner(self) -> oneshot::Receiver<()> {
125        self.0
126    }
127}
128
129/// Handle returned by [`spawn`]. Holds the input channel sender (cloneable
130/// for HTTP route handlers) and the reducer-task `JoinHandle`. The handle
131/// must outlive every cloned sender, otherwise the reducer task exits
132/// prematurely.
133pub struct RuntimeHandle {
134    pub input_tx: mpsc::Sender<ReducerInput>,
135    /// Cloneable handle to the event_log writer task. Route handlers and
136    /// the reducer-task applier send `EventLogEntry` values here; the
137    /// dedicated writer task drains and persists. See `event_log_writer`
138    /// for the backpressure rationale.
139    pub event_log_tx: EventLogTx,
140    pub reducer_handle: JoinHandle<()>,
141    pub event_log_writer_handle: JoinHandle<()>,
142    pub worker_handles: Vec<JoinHandle<()>>,
143}
144
145impl RuntimeHandle {
146    /// All background tasks owned by this runtime. The CP `server::serve`
147    /// drains these as part of the shutdown protocol (`drain_background_tasks`).
148    pub fn into_join_handles(self) -> Vec<JoinHandle<()>> {
149        let mut handles = self.worker_handles;
150        handles.push(self.reducer_handle);
151        handles.push(self.event_log_writer_handle);
152        handles
153    }
154}
155
156/// Spawn the reducer + worker constellation. Returns a [`RuntimeHandle`]
157/// the caller wires into the server shutdown drain.
158///
159/// `clock` and `state` are the only external dependencies. Workers
160/// obtain everything else (DB handles, manifest cache, etc.) from `state`.
161pub fn spawn(cancel: CancellationToken, state: Arc<AppState>, clock: ClockHandle) -> RuntimeHandle {
162    let (input_tx, input_rx) = mpsc::channel::<ReducerInput>(REDUCER_INPUT_CAPACITY);
163    let (event_log_tx, event_log_rx) =
164        mpsc::channel::<crate::db::event_log::EventLogEntry>(EVENT_LOG_CHANNEL_CAPACITY);
165
166    // One oneshot per child task. Reducer task takes ownership of the
167    // senders in a `ShutdownGuard`; on reducer exit the guard drops, every
168    // sender drops, every child task's `select!` shutdown arm fires.
169    // Two tokens: 1 manifest-poll worker + 1 event-log writer. The HTTP
170    // route handlers (events / heartbeat / dispatch long-poll) write
171    // `ReducerInput` directly to `input_tx`; they coordinate shutdown
172    // through axum-server's graceful-drain protocol, not through this
173    // constellation (Phase 8f deleted the stub workers).
174    const SHUTDOWN_TOKEN_COUNT: usize = 2;
175    let mut shutdown_senders: Vec<oneshot::Sender<()>> = Vec::new();
176    let mut shutdown_tokens: Vec<ShutdownToken> = Vec::new();
177    for _ in 0..SHUTDOWN_TOKEN_COUNT {
178        let (tx, rx) = oneshot::channel::<()>();
179        shutdown_senders.push(tx);
180        shutdown_tokens.push(ShutdownToken(rx));
181    }
182    // Pop in reverse so each consumer gets a distinct token.
183    let event_log_writer_shutdown = shutdown_tokens
184        .pop()
185        .expect("SHUTDOWN_TOKEN_COUNT tokens allocated");
186    let manifest_poll_shutdown = shutdown_tokens
187        .pop()
188        .expect("SHUTDOWN_TOKEN_COUNT tokens allocated");
189
190    let worker_handles = vec![workers::manifest_poll::spawn(
191        state.clone(),
192        clock.clone(),
193        input_tx.clone(),
194        manifest_poll_shutdown,
195    )];
196
197    let event_log_writer_handle = match state.db.clone() {
198        Some(db) => event_log_writer::spawn(db, event_log_rx, event_log_writer_shutdown),
199        None => spawn_drain_only_writer(event_log_rx, event_log_writer_shutdown),
200    };
201
202    let reducer_handle = tokio::spawn({
203        let event_log_tx = event_log_tx.clone();
204        async move {
205            reducer::run(
206                cancel,
207                state,
208                clock,
209                input_rx,
210                event_log_tx,
211                shutdown_senders,
212            )
213            .await
214        }
215    });
216
217    RuntimeHandle {
218        input_tx,
219        event_log_tx,
220        reducer_handle,
221        event_log_writer_handle,
222        worker_handles,
223    }
224}
225
226/// Fallback writer when CP runs in in-memory mode (no `db_path` flag). Just
227/// drains entries silently so the channel never fills up. Production paths
228/// always configure a DB.
229fn spawn_drain_only_writer(
230    mut rx: mpsc::Receiver<crate::db::event_log::EventLogEntry>,
231    shutdown: ShutdownToken,
232) -> JoinHandle<()> {
233    tokio::spawn(async move {
234        let mut shutdown_rx = shutdown.into_inner();
235        loop {
236            tokio::select! {
237                _ = &mut shutdown_rx => return,
238                maybe = rx.recv() => {
239                    if maybe.is_none() { return; }
240                }
241            }
242        }
243    })
244}