nixfleet_control_plane/db/
event_log.rs

1//! Append-only canonical event log (RFC-0005 §4.3 + the broader log
2//! pattern: PlanActions, Effects, gate decisions, verifications, manifest
3//! polls all land here too).
4//!
5//! ## Relationship to `probe_failures`
6//!
7//! `event_log` is the **sole canonical store** — every PlanAction,
8//! Effect, GateDecision, and inbound agent event lands here exactly
//! once. Indexed by `seq`, `(host_id, ts)`, `(rollout_id, seq)`,
//! `(kind, ts)` for chronological / per-host / per-rollout /
//! per-kind queries.
12//!
13//! `probe_failures` (RFC-0007 §7.2) is a **derived view** carrying the
14//! typed denormalization the compliance-wave gate needs cheaply
15//! (`probe_name`, `control_id`, `framework`, `observed_at` indexed on
16//! `(rollout_id, host_id, control_id)`). Single writer: the applier's
17//! `RemoteAppendEventLog` handler, on enforce-mode
18//! `ProbeResult { status = Fail }`, writes the event_log row AND the
19//! per-sub_result probe_failures rows in one transaction. Each
20//! probe_failures row carries an `event_log_seq` FK back to its
21//! source event_log entry — the table is provably re-derivable from
22//! canonical state and can be rebuilt by walking event_log from the
23//! beginning.
24//!
25//! Phase 9a deleted the prior `host_reports` denormalization (its
26//! query patterns + dedup invariant moved to probe_failures + the
27//! event_log writer respectively).
28
29use std::sync::Mutex;
30
31use anyhow::{Context, Result, anyhow};
32use chrono::{DateTime, Utc};
33use rusqlite::{Connection, params};
34
35pub struct EventLog<'a> {
36    pub(super) conn: &'a Mutex<Connection>,
37}
38
/// Category of a log entry. Tells readers which JSON shape to expect in
/// `payload` and is the value the operator-API filters select on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventLogKind {
    /// Event received via POST on `/v1/agent/events` (RFC-0005 §4.2
    /// outbound).
    AgentEvent,
    /// Result of the CP's `plan_next()` (RFC-0006 §4.1).
    PlanAction,
    /// An `Effect` produced by the reducer on either side (RFC-0006 §9).
    Effect,
    /// A single gate evaluation (channel-edges, disruption-budget, ...).
    GateDecision,
    /// Outcome of a signature verification (manifest, revocations,
    /// bootstrap-nonces).
    VerifyOutcome,
    /// Outcome of a `channel_refs` poll.
    ManifestPoll,
    /// Rollout-level state transition internal to the CP (RFC-0008 §4).
    /// The applier synthesizes it from per-host events and writes it to
    /// event_log together with the `rollouts` derived-view update
    /// (RFC-0008 §6.3 + §7).
    RolloutEvent,
}

impl EventLogKind {
    /// Stable snake_case string stored in the `kind` column for this
    /// variant.
    ///
    /// The match is deliberately exhaustive (no `_` arm) so adding a
    /// variant fails to compile until its database string is chosen.
    pub fn as_db_str(&self) -> &'static str {
        match *self {
            Self::AgentEvent => "agent_event",
            Self::PlanAction => "plan_action",
            Self::Effect => "effect",
            Self::GateDecision => "gate_decision",
            Self::VerifyOutcome => "verify_outcome",
            Self::ManifestPoll => "manifest_poll",
            Self::RolloutEvent => "rollout_event",
        }
    }
}
75
76#[derive(Debug, Clone)]
77pub struct EventLogEntry {
78    pub kind: EventLogKind,
79    /// Caller-supplied timestamp. Must come from a `ClockHandle`, not
80    /// `Utc::now()` ad-hoc — the table has no SQL DEFAULT so test
81    /// fixtures using `FakeClock` cannot accidentally produce rows
82    /// timestamped with wallclock-now while the reducer's `now` is
83    /// frozen elsewhere.
84    pub ts: DateTime<Utc>,
85    pub host_id: Option<String>,
86    pub rollout_id: Option<String>,
87    /// JSON-encoded payload. Producer-side responsibility to canonicalise
88    /// per RFC-0003 §3 if cross-version-stable hashing matters; the table
89    /// itself just stores opaque text. `append()` validates that this
90    /// parses as JSON before inserting — a malformed row would poison the
91    /// replay tool permanently and silently.
92    pub payload: String,
93}
94
95#[derive(Debug, Clone)]
96pub struct EventLogRow {
97    pub seq: i64,
98    pub ts: DateTime<Utc>,
99    pub kind: String,
100    pub host_id: Option<String>,
101    pub rollout_id: Option<String>,
102    pub payload: String,
103}
104
105impl<'a> EventLog<'a> {
106    /// Append a single entry. Returns the assigned `seq`. Validates the
107    /// `payload` parses as JSON before insert (a malformed row would
108    /// silently poison the replay tool — see `EventLogEntry::payload`
109    /// docstring).
110    pub fn append(&self, entry: &EventLogEntry) -> Result<i64> {
111        serde_json::from_str::<serde_json::Value>(&entry.payload)
112            .map_err(|e| anyhow!("event_log payload is not valid JSON: {e}"))?;
113        let conn = super::lock_conn(self.conn)?;
114        conn.execute(
115            "INSERT INTO event_log (ts, kind, host_id, rollout_id, payload)
116             VALUES (?1, ?2, ?3, ?4, ?5)",
117            params![
118                entry.ts.to_rfc3339(),
119                entry.kind.as_db_str(),
120                entry.host_id,
121                entry.rollout_id,
122                entry.payload,
123            ],
124        )
125        .context("append event_log")?;
126        Ok(conn.last_insert_rowid())
127    }
128
129    /// Latest seq in the log. Useful for "Replay-From" handshake in
130    /// RFC-0005 §4.3 and as a sanity check.
131    pub fn last_seq(&self) -> Result<i64> {
132        let conn = super::lock_conn(self.conn)?;
133        let n: Option<i64> = conn
134            .query_row("SELECT MAX(seq) FROM event_log", [], |r| r.get(0))
135            .context("last_seq event_log")?;
136        Ok(n.unwrap_or(0))
137    }
138
139    /// Entries for a host, ordered by seq ascending. Used by the
140    /// operator API and the replay tool.
141    pub fn query_by_host(&self, host_id: &str, limit: i64) -> Result<Vec<EventLogRow>> {
142        let conn = super::lock_conn(self.conn)?;
143        let mut stmt = conn.prepare(
144            "SELECT seq, ts, kind, host_id, rollout_id, payload
145             FROM event_log
146             WHERE host_id = ?1
147             ORDER BY seq ASC
148             LIMIT ?2",
149        )?;
150        let rows = stmt.query_map(params![host_id, limit], row_to_entry)?;
151        let mut out = Vec::new();
152        for r in rows {
153            out.push(r?);
154        }
155        Ok(out)
156    }
157
158    /// Entries for a rollout, ordered by seq ascending.
159    pub fn query_by_rollout(&self, rollout_id: &str, limit: i64) -> Result<Vec<EventLogRow>> {
160        let conn = super::lock_conn(self.conn)?;
161        let mut stmt = conn.prepare(
162            "SELECT seq, ts, kind, host_id, rollout_id, payload
163             FROM event_log
164             WHERE rollout_id = ?1
165             ORDER BY seq ASC
166             LIMIT ?2",
167        )?;
168        let rows = stmt.query_map(params![rollout_id, limit], row_to_entry)?;
169        let mut out = Vec::new();
170        for r in rows {
171            out.push(r?);
172        }
173        Ok(out)
174    }
175
176    /// Entries of a particular kind, ordered by seq ascending.
177    pub fn query_by_kind(&self, kind: EventLogKind, limit: i64) -> Result<Vec<EventLogRow>> {
178        let conn = super::lock_conn(self.conn)?;
179        let mut stmt = conn.prepare(
180            "SELECT seq, ts, kind, host_id, rollout_id, payload
181             FROM event_log
182             WHERE kind = ?1
183             ORDER BY seq ASC
184             LIMIT ?2",
185        )?;
186        let rows = stmt.query_map(params![kind.as_db_str(), limit], row_to_entry)?;
187        let mut out = Vec::new();
188        for r in rows {
189            out.push(r?);
190        }
191        Ok(out)
192    }
193}
194
195fn row_to_entry(row: &rusqlite::Row<'_>) -> rusqlite::Result<EventLogRow> {
196    let ts_str: String = row.get(1)?;
197    let ts = DateTime::parse_from_rfc3339(&ts_str)
198        .map(|dt| dt.with_timezone(&Utc))
199        .map_err(|e| {
200            rusqlite::Error::FromSqlConversionFailure(
201                1,
202                rusqlite::types::Type::Text,
203                format!("parse ts: {e}").into(),
204            )
205        })?;
206    Ok(EventLogRow {
207        seq: row.get(0)?,
208        ts,
209        kind: row.get(2)?,
210        host_id: row.get(3)?,
211        rollout_id: row.get(4)?,
212        payload: row.get(5)?,
213    })
214}
215
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Db;
    use chrono::TimeZone;

    /// In-memory database with migrations applied, private to one test.
    fn fresh_db() -> Db {
        let db = Db::open_in_memory().unwrap();
        db.migrate().unwrap();
        db
    }

    /// Fixed base timestamp shared by every fixture.
    fn t0() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
    }

    /// Build an entry stamped at `t0()`; tests override `ts` via struct
    /// update syntax when they need a different instant.
    fn entry(
        kind: EventLogKind,
        host_id: Option<&str>,
        rollout_id: Option<&str>,
        payload: &str,
    ) -> EventLogEntry {
        EventLogEntry {
            kind,
            ts: t0(),
            host_id: host_id.map(str::to_owned),
            rollout_id: rollout_id.map(str::to_owned),
            payload: payload.to_owned(),
        }
    }

    #[test]
    fn append_assigns_monotonic_seqs() {
        let db = fresh_db();
        let log = EventLog { conn: &db.conn };
        let sample = entry(
            EventLogKind::AgentEvent,
            Some("h1"),
            Some("r1"),
            r#"{"kind":"DispatchAck"}"#,
        );

        let first = log.append(&sample).unwrap();
        let second = log.append(&sample).unwrap();
        let third = log.append(&sample).unwrap();

        assert!(second > first);
        assert!(third > second);
        assert_eq!(log.last_seq().unwrap(), third);
    }

    #[test]
    fn query_by_host_returns_only_matching_host() {
        let db = fresh_db();
        let log = EventLog { conn: &db.conn };
        let fixtures = [
            ("h1", r#"{"k":"a"}"#),
            ("h2", r#"{"k":"b"}"#),
            ("h1", r#"{"k":"c"}"#),
        ];
        for (host, payload) in fixtures {
            log.append(&entry(
                EventLogKind::AgentEvent,
                Some(host),
                Some("r1"),
                payload,
            ))
            .unwrap();
        }

        let got = log.query_by_host("h1", 100).unwrap();
        assert_eq!(got.len(), 2);
        assert!(got.iter().all(|r| r.host_id.as_deref() == Some("h1")));
    }

    #[test]
    fn query_by_kind_filters_correctly() {
        let db = fresh_db();
        let log = EventLog { conn: &db.conn };
        log.append(&entry(EventLogKind::AgentEvent, Some("h1"), None, r#"{}"#))
            .unwrap();
        log.append(&entry(EventLogKind::PlanAction, None, Some("r1"), r#"{}"#))
            .unwrap();

        let agent_only = log.query_by_kind(EventLogKind::AgentEvent, 100).unwrap();
        assert_eq!(agent_only.len(), 1);
        assert_eq!(agent_only[0].kind, "agent_event");
    }

    #[test]
    fn entries_with_null_host_and_rollout_round_trip() {
        // CP-side plan actions and manifest polls don't pin to a host.
        let db = fresh_db();
        let log = EventLog { conn: &db.conn };
        let poll = entry(
            EventLogKind::ManifestPoll,
            None,
            None,
            r#"{"channel":"stable","outcome":"304"}"#,
        );

        let seq = log.append(&poll).unwrap();
        assert!(seq > 0);

        let got = log.query_by_kind(EventLogKind::ManifestPoll, 10).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].host_id, None);
        assert_eq!(got[0].rollout_id, None);
    }

    #[test]
    fn malformed_payload_rejected_at_append() {
        let db = fresh_db();
        let log = EventLog { conn: &db.conn };
        let broken = entry(EventLogKind::AgentEvent, Some("h1"), None, "not-json{");

        let err = log.append(&broken).unwrap_err();
        let s = format!("{err}");
        assert!(
            s.contains("not valid JSON"),
            "expected JSON validation error, got {s}"
        );
        // Table must remain empty.
        assert_eq!(log.last_seq().unwrap(), 0);
    }

    #[test]
    fn caller_supplied_ts_round_trips() {
        let db = fresh_db();
        let log = EventLog { conn: &db.conn };
        let supplied = t0() + chrono::Duration::seconds(42);
        let stamped = EventLogEntry {
            ts: supplied,
            ..entry(EventLogKind::AgentEvent, Some("h1"), None, r#"{}"#)
        };
        log.append(&stamped).unwrap();

        let got = log.query_by_host("h1", 10).unwrap();
        assert_eq!(got[0].ts, supplied);
    }
}