nixfleet_control_plane/runtime/
event_log_writer.rs

1//! Event-log writer task: dedicated consumer of [`EventLogEntry`] values
2//! the reducer-task applier emits, persisting them to SQLite outside the
3//! reducer's critical section.
4//!
5//! The applier hands entries to this task via [`EventLogTx`]. The reducer
6//! never touches `Db::event_log()` directly — that keeps the reducer task
7//! free of the SQLite Mutex during high-frequency `RemoteAppendEventLog` /
8//! `RecordTransition` effects, and isolates writer hiccups (disk fsync
9//! pauses, mutex contention) from the per-host `step()` path.
10//!
11//! Backpressure: the channel is bounded. When full, the applier's
12//! `send().await` waits — which surfaces the slowdown back into the
13//! reducer's input MPSC, preserving the no-fail-open contract for the
14//! audit log (RFC-0005 §6: every gate decision and state transition
15//! must reach the log; silently dropping is forbidden).
16
17use std::sync::Arc;
18
19use tokio::sync::mpsc;
20use tokio::task::JoinHandle;
21
22use super::ShutdownToken;
23use crate::db::Db;
24use crate::db::event_log::EventLogEntry;
25
/// Bounded channel depth for event-log entries between the reducer applier
/// and the writer task.
///
/// Sizing rationale (v0.2 homelab scale):
/// - ~256-host fleet, ~10 events per host during peak rollout activation
///   ⇒ peak burst ≈ 2 560 entries; the sustained rate is much lower.
/// - SQLite + WAL sustains roughly 10 k inserts/s on stock SSDs, so the whole
///   peak burst drains in < 300 ms, and a completely full channel
///   (1 024 entries) drains in roughly 100 ms.
/// - 1 024 is about 40 % of the peak burst. That is enough headroom to absorb
///   most of a burst without stalling the reducer, while keeping the queue
///   from becoming a memory sink during steady state. Worst-case buffered
///   memory is 1 024 entries × ~256 bytes/entry ≈ 256 KiB.
/// - Beyond that depth, backpressure surfaces at the reducer (its
///   `send().await` waits), which is the desired behaviour: the audit log
///   must not lose entries.
pub const EVENT_LOG_CHANNEL_CAPACITY: usize = 1024;
40
41pub type EventLogTx = mpsc::Sender<EventLogEntry>;
42pub type EventLogRx = mpsc::Receiver<EventLogEntry>;
43
44/// Spawn the writer task. Returns the `JoinHandle` for the runtime drain.
45pub fn spawn(db: Arc<Db>, mut rx: EventLogRx, shutdown: ShutdownToken) -> JoinHandle<()> {
46    tokio::spawn(async move {
47        let mut shutdown_rx = shutdown.into_inner();
48        loop {
49            tokio::select! {
50                biased;
51                _ = &mut shutdown_rx => {
52                    drain_pending(&db, &mut rx);
53                    tracing::info!(
54                        target: "shutdown",
55                        task = "cp_event_log_writer",
56                        "task shut down (drained pending entries)",
57                    );
58                    return;
59                }
60                maybe_entry = rx.recv() => {
61                    let Some(entry) = maybe_entry else {
62                        // All senders dropped (runtime tearing down).
63                        return;
64                    };
65                    if let Err(err) = db.event_log().append(&entry) {
66                        tracing::error!(
67                            target: "cp_runtime",
68                            kind = ?entry.kind,
69                            host_id = ?entry.host_id,
70                            rollout_id = ?entry.rollout_id,
71                            error = %err,
72                            "event_log writer: append failed",
73                        );
74                        // Continue — losing one row is preferable to
75                        // killing the writer and silently dropping every
76                        // subsequent entry.
77                    }
78                }
79            }
80        }
81    })
82}
83
84/// Best-effort drain of in-flight entries on shutdown. Skips `send().await`
85/// because we're past the cancellation point — no new entries can arrive
86/// (every applier holds a `Sender` that's already been dropped or is
87/// about to be).
88fn drain_pending(db: &Db, rx: &mut EventLogRx) {
89    while let Ok(entry) = rx.try_recv() {
90        if let Err(err) = db.event_log().append(&entry) {
91            tracing::warn!(
92                target: "cp_runtime",
93                kind = ?entry.kind,
94                error = %err,
95                "event_log writer drain: append failed",
96            );
97        }
98    }
99}