nixfleet_control_plane/runtime/event_log_writer.rs
1//! Event-log writer task: dedicated consumer of [`EventLogEntry`] values
2//! the reducer-task applier emits, persisting them to SQLite outside the
3//! reducer's critical section.
4//!
5//! The applier hands entries to this task via [`EventLogTx`]. The reducer
6//! never touches `Db::event_log()` directly — that keeps the reducer task
7//! free of the SQLite Mutex during high-frequency `RemoteAppendEventLog` /
8//! `RecordTransition` effects, and isolates writer hiccups (disk fsync
9//! pauses, mutex contention) from the per-host `step()` path.
10//!
11//! Backpressure: the channel is bounded. When full, the applier's
12//! `send().await` waits — which surfaces the slowdown back into the
13//! reducer's input MPSC, preserving the no-fail-open contract for the
14//! audit log (RFC-0005 §6: every gate decision and state transition
15//! must reach the log; silently dropping is forbidden).
16
17use std::sync::Arc;
18
19use tokio::sync::mpsc;
20use tokio::task::JoinHandle;
21
22use super::ShutdownToken;
23use crate::db::Db;
24use crate::db::event_log::EventLogEntry;
25
26/// Bounded channel depth for event-log entries between the reducer applier
27/// and the writer task.
28///
29/// Sizing rationale (v0.2 homelab scale):
30/// - ~256-host fleet, ~10 events per host during peak rollout activation,
31/// ⇒ peak burst ~2 560 entries; sustained rate is much lower.
32/// - SQLite + WAL sustained insert rate is ~10 k/s on stock SSDs, so
33/// draining 2 560 entries is < 300 ms.
34/// - Half of peak burst is comfortable headroom without making the writer
35/// queue itself a memory sink during steady state. 1 024 entries × ~256
36/// bytes/entry ≈ 256 KiB tail latency budget.
37/// - Backpressure surfaces at the reducer (its `send().await` waits),
38/// which is the desired behaviour: the audit log must not lose entries.
39pub const EVENT_LOG_CHANNEL_CAPACITY: usize = 1024;
40
41pub type EventLogTx = mpsc::Sender<EventLogEntry>;
42pub type EventLogRx = mpsc::Receiver<EventLogEntry>;
43
44/// Spawn the writer task. Returns the `JoinHandle` for the runtime drain.
45pub fn spawn(db: Arc<Db>, mut rx: EventLogRx, shutdown: ShutdownToken) -> JoinHandle<()> {
46 tokio::spawn(async move {
47 let mut shutdown_rx = shutdown.into_inner();
48 loop {
49 tokio::select! {
50 biased;
51 _ = &mut shutdown_rx => {
52 drain_pending(&db, &mut rx);
53 tracing::info!(
54 target: "shutdown",
55 task = "cp_event_log_writer",
56 "task shut down (drained pending entries)",
57 );
58 return;
59 }
60 maybe_entry = rx.recv() => {
61 let Some(entry) = maybe_entry else {
62 // All senders dropped (runtime tearing down).
63 return;
64 };
65 if let Err(err) = db.event_log().append(&entry) {
66 tracing::error!(
67 target: "cp_runtime",
68 kind = ?entry.kind,
69 host_id = ?entry.host_id,
70 rollout_id = ?entry.rollout_id,
71 error = %err,
72 "event_log writer: append failed",
73 );
74 // Continue — losing one row is preferable to
75 // killing the writer and silently dropping every
76 // subsequent entry.
77 }
78 }
79 }
80 }
81 })
82}
83
84/// Best-effort drain of in-flight entries on shutdown. Skips `send().await`
85/// because we're past the cancellation point — no new entries can arrive
86/// (every applier holds a `Sender` that's already been dropped or is
87/// about to be).
88fn drain_pending(db: &Db, rx: &mut EventLogRx) {
89 while let Ok(entry) = rx.try_recv() {
90 if let Err(err) = db.event_log().append(&entry) {
91 tracing::warn!(
92 target: "cp_runtime",
93 kind = ?entry.kind,
94 error = %err,
95 "event_log writer drain: append failed",
96 );
97 }
98 }
99}