nixfleet_control_plane/db/
event_log.rs1use std::sync::Mutex;
30
31use anyhow::{Context, Result, anyhow};
32use chrono::{DateTime, Utc};
33use rusqlite::{Connection, params};
34
35pub struct EventLog<'a> {
36 pub(super) conn: &'a Mutex<Connection>,
37}
38
/// Category tag recorded in the `kind` column of every `event_log` row.
///
/// The textual form written to the database is produced by
/// [`EventLogKind::as_db_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventLogKind {
    /// Persisted as `agent_event`.
    AgentEvent,
    /// Persisted as `plan_action`.
    PlanAction,
    /// Persisted as `effect`.
    Effect,
    /// Persisted as `gate_decision`.
    GateDecision,
    /// Persisted as `verify_outcome`.
    VerifyOutcome,
    /// Persisted as `manifest_poll`.
    ManifestPoll,
    /// Persisted as `rollout_event`.
    RolloutEvent,
}
61
62impl EventLogKind {
63 pub fn as_db_str(&self) -> &'static str {
64 match self {
65 EventLogKind::AgentEvent => "agent_event",
66 EventLogKind::PlanAction => "plan_action",
67 EventLogKind::Effect => "effect",
68 EventLogKind::GateDecision => "gate_decision",
69 EventLogKind::VerifyOutcome => "verify_outcome",
70 EventLogKind::ManifestPoll => "manifest_poll",
71 EventLogKind::RolloutEvent => "rollout_event",
72 }
73 }
74}
75
76#[derive(Debug, Clone)]
77pub struct EventLogEntry {
78 pub kind: EventLogKind,
79 pub ts: DateTime<Utc>,
85 pub host_id: Option<String>,
86 pub rollout_id: Option<String>,
87 pub payload: String,
93}
94
95#[derive(Debug, Clone)]
96pub struct EventLogRow {
97 pub seq: i64,
98 pub ts: DateTime<Utc>,
99 pub kind: String,
100 pub host_id: Option<String>,
101 pub rollout_id: Option<String>,
102 pub payload: String,
103}
104
105impl<'a> EventLog<'a> {
106 pub fn append(&self, entry: &EventLogEntry) -> Result<i64> {
111 serde_json::from_str::<serde_json::Value>(&entry.payload)
112 .map_err(|e| anyhow!("event_log payload is not valid JSON: {e}"))?;
113 let conn = super::lock_conn(self.conn)?;
114 conn.execute(
115 "INSERT INTO event_log (ts, kind, host_id, rollout_id, payload)
116 VALUES (?1, ?2, ?3, ?4, ?5)",
117 params![
118 entry.ts.to_rfc3339(),
119 entry.kind.as_db_str(),
120 entry.host_id,
121 entry.rollout_id,
122 entry.payload,
123 ],
124 )
125 .context("append event_log")?;
126 Ok(conn.last_insert_rowid())
127 }
128
129 pub fn last_seq(&self) -> Result<i64> {
132 let conn = super::lock_conn(self.conn)?;
133 let n: Option<i64> = conn
134 .query_row("SELECT MAX(seq) FROM event_log", [], |r| r.get(0))
135 .context("last_seq event_log")?;
136 Ok(n.unwrap_or(0))
137 }
138
139 pub fn query_by_host(&self, host_id: &str, limit: i64) -> Result<Vec<EventLogRow>> {
142 let conn = super::lock_conn(self.conn)?;
143 let mut stmt = conn.prepare(
144 "SELECT seq, ts, kind, host_id, rollout_id, payload
145 FROM event_log
146 WHERE host_id = ?1
147 ORDER BY seq ASC
148 LIMIT ?2",
149 )?;
150 let rows = stmt.query_map(params![host_id, limit], row_to_entry)?;
151 let mut out = Vec::new();
152 for r in rows {
153 out.push(r?);
154 }
155 Ok(out)
156 }
157
158 pub fn query_by_rollout(&self, rollout_id: &str, limit: i64) -> Result<Vec<EventLogRow>> {
160 let conn = super::lock_conn(self.conn)?;
161 let mut stmt = conn.prepare(
162 "SELECT seq, ts, kind, host_id, rollout_id, payload
163 FROM event_log
164 WHERE rollout_id = ?1
165 ORDER BY seq ASC
166 LIMIT ?2",
167 )?;
168 let rows = stmt.query_map(params![rollout_id, limit], row_to_entry)?;
169 let mut out = Vec::new();
170 for r in rows {
171 out.push(r?);
172 }
173 Ok(out)
174 }
175
176 pub fn query_by_kind(&self, kind: EventLogKind, limit: i64) -> Result<Vec<EventLogRow>> {
178 let conn = super::lock_conn(self.conn)?;
179 let mut stmt = conn.prepare(
180 "SELECT seq, ts, kind, host_id, rollout_id, payload
181 FROM event_log
182 WHERE kind = ?1
183 ORDER BY seq ASC
184 LIMIT ?2",
185 )?;
186 let rows = stmt.query_map(params![kind.as_db_str(), limit], row_to_entry)?;
187 let mut out = Vec::new();
188 for r in rows {
189 out.push(r?);
190 }
191 Ok(out)
192 }
193}
194
195fn row_to_entry(row: &rusqlite::Row<'_>) -> rusqlite::Result<EventLogRow> {
196 let ts_str: String = row.get(1)?;
197 let ts = DateTime::parse_from_rfc3339(&ts_str)
198 .map(|dt| dt.with_timezone(&Utc))
199 .map_err(|e| {
200 rusqlite::Error::FromSqlConversionFailure(
201 1,
202 rusqlite::types::Type::Text,
203 format!("parse ts: {e}").into(),
204 )
205 })?;
206 Ok(EventLogRow {
207 seq: row.get(0)?,
208 ts,
209 kind: row.get(2)?,
210 host_id: row.get(3)?,
211 rollout_id: row.get(4)?,
212 payload: row.get(5)?,
213 })
214}
215
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Db;
    use chrono::TimeZone;

    /// Opens an in-memory database with migrations applied.
    fn fresh_db() -> Db {
        let db = Db::open_in_memory().unwrap();
        db.migrate().unwrap();
        db
    }

    /// Fixed reference timestamp shared by the tests.
    fn t0() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
    }

    /// Builds an entry stamped with `t0()` from borrowed parts.
    fn entry(
        kind: EventLogKind,
        host_id: Option<&str>,
        rollout_id: Option<&str>,
        payload: &str,
    ) -> EventLogEntry {
        EventLogEntry {
            kind,
            ts: t0(),
            host_id: host_id.map(str::to_owned),
            rollout_id: rollout_id.map(str::to_owned),
            payload: payload.to_owned(),
        }
    }

    #[test]
    fn append_assigns_monotonic_seqs() {
        let db = fresh_db();
        let log = EventLog { conn: &db.conn };
        let sample = entry(
            EventLogKind::AgentEvent,
            Some("h1"),
            Some("r1"),
            r#"{"kind":"DispatchAck"}"#,
        );
        let first = log.append(&sample).unwrap();
        let second = log.append(&sample).unwrap();
        let third = log.append(&sample).unwrap();
        assert!(second > first);
        assert!(third > second);
        assert_eq!(log.last_seq().unwrap(), third);
    }

    #[test]
    fn query_by_host_returns_only_matching_host() {
        let db = fresh_db();
        let log = EventLog { conn: &db.conn };
        let fixtures = [
            ("h1", r#"{"k":"a"}"#),
            ("h2", r#"{"k":"b"}"#),
            ("h1", r#"{"k":"c"}"#),
        ];
        for (host, payload) in fixtures {
            log.append(&entry(
                EventLogKind::AgentEvent,
                Some(host),
                Some("r1"),
                payload,
            ))
            .unwrap();
        }

        let got = log.query_by_host("h1", 100).unwrap();
        assert_eq!(got.len(), 2);
        assert!(got.iter().all(|r| r.host_id.as_deref() == Some("h1")));
    }

    #[test]
    fn query_by_kind_filters_correctly() {
        let db = fresh_db();
        let log = EventLog { conn: &db.conn };
        log.append(&entry(EventLogKind::AgentEvent, Some("h1"), None, r#"{}"#))
            .unwrap();
        log.append(&entry(EventLogKind::PlanAction, None, Some("r1"), r#"{}"#))
            .unwrap();
        let agent_only = log.query_by_kind(EventLogKind::AgentEvent, 100).unwrap();
        assert_eq!(agent_only.len(), 1);
        assert_eq!(agent_only[0].kind, "agent_event");
    }

    #[test]
    fn entries_with_null_host_and_rollout_round_trip() {
        let db = fresh_db();
        let log = EventLog { conn: &db.conn };
        let seq = log
            .append(&entry(
                EventLogKind::ManifestPoll,
                None,
                None,
                r#"{"channel":"stable","outcome":"304"}"#,
            ))
            .unwrap();
        assert!(seq > 0);
        let got = log.query_by_kind(EventLogKind::ManifestPoll, 10).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].host_id, None);
        assert_eq!(got[0].rollout_id, None);
    }

    #[test]
    fn malformed_payload_rejected_at_append() {
        let db = fresh_db();
        let log = EventLog { conn: &db.conn };
        let err = log
            .append(&entry(
                EventLogKind::AgentEvent,
                Some("h1"),
                None,
                "not-json{",
            ))
            .unwrap_err();
        let s = err.to_string();
        assert!(
            s.contains("not valid JSON"),
            "expected JSON validation error, got {s}"
        );
        // The rejected entry must not have been written.
        assert_eq!(log.last_seq().unwrap(), 0);
    }

    #[test]
    fn caller_supplied_ts_round_trips() {
        let db = fresh_db();
        let log = EventLog { conn: &db.conn };
        let supplied = t0() + chrono::Duration::seconds(42);
        log.append(&EventLogEntry {
            ts: supplied,
            ..entry(EventLogKind::AgentEvent, Some("h1"), None, r#"{}"#)
        })
        .unwrap();
        let got = log.query_by_host("h1", 10).unwrap();
        assert_eq!(got[0].ts, supplied);
    }
}