nixfleet_control_plane/db/
quarantined_closures.rs

//! Quarantined-closures derived view (RFC-0008 §6.4). Append-only: at most
//! one row per `(channel, closure_hash)`, written on the first
//! `RollbackComplete` event (RFC-0005 §4.2) for that pair; later events for
//! the same pair are no-ops. The applier is the sole writer; when set, the
//! `triggering_event_log_seq` FK ties each row back to `event_log`, which is
//! what makes the table re-derivable (walk RollbackComplete events, group by
//! `(channel, target_closure_hash)`, write one row per group).
6//!
7//! Trusted-input only: rows are written by the applier on
8//! `Effect::RemoteInsertQuarantine`. Agent-emitted `ClosureQuarantined`
9//! reports are NOT inserted here (they are unsigned and would let a
10//! compromised host DoS the fleet by quarantining arbitrary SHAs).
11//!
12//! `triggering_event_log_seq` is NULL-able under the v0.2.1 baseline
13//! (RFC-0008 §6.1 item 3 + v0.2.1-followups #1).
14//!
15//! Append-only under the v0.2 derived-view discipline: no `clear`,
16//! no `cleared_at`. Operator-driven clearance would land as an
17//! explicit event matching the `OperatorClearance` shape (RFC-0008 §4).
18
19use anyhow::{Context, Result};
20use chrono::{DateTime, Utc};
21use rusqlite::{Connection, params};
22use std::collections::{HashMap, HashSet};
23use std::sync::Mutex;
24
25pub struct QuarantinedClosures<'a> {
26    pub(super) conn: &'a Mutex<Connection>,
27}
28
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub struct QuarantineRow {
31    pub channel: String,
32    pub closure_hash: String,
33    pub quarantined_at: DateTime<Utc>,
34    pub triggering_event_log_seq: Option<i64>,
35}
36
37impl<'a> QuarantinedClosures<'a> {
38    /// Idempotent under `(channel, closure_hash)` PK (ON CONFLICT DO
39    /// NOTHING — quarantines are append-only; re-quarantining the same
40    /// closure is a no-op rather than a re-stamp).
41    pub fn insert(
42        &self,
43        channel: &str,
44        closure_hash: &str,
45        quarantined_at: DateTime<Utc>,
46        triggering_event_log_seq: Option<i64>,
47    ) -> Result<()> {
48        let conn = self.conn.lock().expect("poisoned");
49        conn.execute(
50            "INSERT INTO quarantined_closures(
51                 channel, closure_hash, quarantined_at, triggering_event_log_seq)
52             VALUES (?1, ?2, ?3, ?4)
53             ON CONFLICT(channel, closure_hash) DO NOTHING",
54            params![
55                channel,
56                closure_hash,
57                quarantined_at.to_rfc3339(),
58                triggering_event_log_seq
59            ],
60        )
61        .context("insert quarantine")?;
62        Ok(())
63    }
64
65    /// Active set keyed by channel -> {closure_hash}. The gate reads this
66    /// on every plan tick via `Observed.quarantined_closures` to refuse
67    /// dispatch of a known-bad SHA.
68    pub fn active_by_channel(&self) -> Result<HashMap<String, HashSet<String>>> {
69        let conn = self.conn.lock().expect("poisoned");
70        let mut stmt = conn
71            .prepare("SELECT channel, closure_hash FROM quarantined_closures")
72            .context("prepare active_by_channel")?;
73        let mut rows = stmt.query([]).context("query active_by_channel")?;
74        let mut out: HashMap<String, HashSet<String>> = HashMap::new();
75        while let Some(row) = rows.next().context("step active_by_channel")? {
76            let channel: String = row.get(0)?;
77            let closure_hash: String = row.get(1)?;
78            out.entry(channel).or_default().insert(closure_hash);
79        }
80        Ok(out)
81    }
82
83    /// Operator-surface listing (CLI `nixfleet quarantine list`).
84    pub fn list_active(&self) -> Result<Vec<QuarantineRow>> {
85        let conn = self.conn.lock().expect("poisoned");
86        let mut stmt = conn
87            .prepare(
88                "SELECT channel, closure_hash, quarantined_at, triggering_event_log_seq
89                 FROM quarantined_closures
90                 ORDER BY quarantined_at DESC",
91            )
92            .context("prepare list_active")?;
93        let mut rows = stmt.query([]).context("query list_active")?;
94        let mut out = Vec::new();
95        while let Some(row) = rows.next().context("step list_active")? {
96            let channel: String = row.get(0)?;
97            let closure_hash: String = row.get(1)?;
98            let qat: String = row.get(2)?;
99            let trig: Option<i64> = row.get(3)?;
100            out.push(QuarantineRow {
101                channel,
102                closure_hash,
103                quarantined_at: qat
104                    .parse::<DateTime<Utc>>()
105                    .with_context(|| format!("parse quarantined_closures.quarantined_at: {qat}"))?,
106                triggering_event_log_seq: trig,
107            });
108        }
109        Ok(out)
110    }
111}
112
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Db;

    /// In-memory database with the schema migrated.
    fn fresh_db() -> Db {
        let db = Db::open_in_memory().unwrap();
        db.migrate().unwrap();
        db
    }

    /// Fixed reference timestamp so assertions are deterministic.
    fn t0() -> DateTime<Utc> {
        use chrono::TimeZone;
        Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
    }

    #[test]
    fn insert_and_list_active() {
        let db = fresh_db();
        let q = db.quarantined_closures();
        q.insert("stable", "abc123", t0(), None).unwrap();
        let rows = q.list_active().unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].channel, "stable");
        assert_eq!(rows[0].closure_hash, "abc123");
        assert!(rows[0].triggering_event_log_seq.is_none());
    }

    #[test]
    fn idempotent_insert_is_no_op_after_conflict() {
        let db = fresh_db();
        let q = db.quarantined_closures();
        // FK is NULL-able under v0.2.1 baseline (RFC-0008 §6.1 item 3);
        // None is the legal "FK not yet known" marker.
        q.insert("stable", "abc", t0(), None).unwrap();
        // Second insert at a later timestamp is a no-op (append-only).
        q.insert("stable", "abc", t0() + chrono::Duration::seconds(1), None)
            .unwrap();
        let rows = q.list_active().unwrap();
        assert_eq!(rows.len(), 1);
        // First insert wins (ON CONFLICT DO NOTHING).
        assert_eq!(rows[0].quarantined_at, t0());
    }

    #[test]
    fn active_by_channel_groups_correctly() {
        let db = fresh_db();
        let q = db.quarantined_closures();
        q.insert("stable", "abc", t0(), None).unwrap();
        q.insert("stable", "def", t0(), None).unwrap();
        q.insert("edge", "ghi", t0(), None).unwrap();
        let by_chan = q.active_by_channel().unwrap();
        assert_eq!(by_chan["stable"].len(), 2);
        assert_eq!(by_chan["edge"].len(), 1);
    }

    #[test]
    fn list_active_is_newest_first() {
        let db = fresh_db();
        let q = db.quarantined_closures();
        // Insert oldest first so the result order cannot come from insertion order.
        q.insert("stable", "old", t0(), None).unwrap();
        q.insert("stable", "new", t0() + chrono::Duration::seconds(60), None)
            .unwrap();
        let hashes: Vec<String> = q
            .list_active()
            .unwrap()
            .into_iter()
            .map(|row| row.closure_hash)
            .collect();
        assert_eq!(hashes, ["new", "old"]);
    }

    #[test]
    fn same_hash_on_different_channels_is_tracked_separately() {
        let db = fresh_db();
        let q = db.quarantined_closures();
        // The PK is (channel, closure_hash), so these must not collide.
        q.insert("stable", "abc", t0(), None).unwrap();
        q.insert("edge", "abc", t0(), None).unwrap();
        assert_eq!(q.list_active().unwrap().len(), 2);
        let by_chan = q.active_by_channel().unwrap();
        assert!(by_chan["stable"].contains("abc"));
        assert!(by_chan["edge"].contains("abc"));
    }

    #[test]
    fn empty_table_yields_empty_views() {
        let db = fresh_db();
        let q = db.quarantined_closures();
        assert!(q.list_active().unwrap().is_empty());
        assert!(q.active_by_channel().unwrap().is_empty());
    }
}