nixfleet_control_plane/db/
quarantined_closures.rs1use anyhow::{Context, Result};
20use chrono::{DateTime, Utc};
21use rusqlite::{Connection, params};
22use std::collections::{HashMap, HashSet};
23use std::sync::Mutex;
24
25pub struct QuarantinedClosures<'a> {
26 pub(super) conn: &'a Mutex<Connection>,
27}
28
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub struct QuarantineRow {
31 pub channel: String,
32 pub closure_hash: String,
33 pub quarantined_at: DateTime<Utc>,
34 pub triggering_event_log_seq: Option<i64>,
35}
36
37impl<'a> QuarantinedClosures<'a> {
38 pub fn insert(
42 &self,
43 channel: &str,
44 closure_hash: &str,
45 quarantined_at: DateTime<Utc>,
46 triggering_event_log_seq: Option<i64>,
47 ) -> Result<()> {
48 let conn = self.conn.lock().expect("poisoned");
49 conn.execute(
50 "INSERT INTO quarantined_closures(
51 channel, closure_hash, quarantined_at, triggering_event_log_seq)
52 VALUES (?1, ?2, ?3, ?4)
53 ON CONFLICT(channel, closure_hash) DO NOTHING",
54 params![
55 channel,
56 closure_hash,
57 quarantined_at.to_rfc3339(),
58 triggering_event_log_seq
59 ],
60 )
61 .context("insert quarantine")?;
62 Ok(())
63 }
64
65 pub fn active_by_channel(&self) -> Result<HashMap<String, HashSet<String>>> {
69 let conn = self.conn.lock().expect("poisoned");
70 let mut stmt = conn
71 .prepare("SELECT channel, closure_hash FROM quarantined_closures")
72 .context("prepare active_by_channel")?;
73 let mut rows = stmt.query([]).context("query active_by_channel")?;
74 let mut out: HashMap<String, HashSet<String>> = HashMap::new();
75 while let Some(row) = rows.next().context("step active_by_channel")? {
76 let channel: String = row.get(0)?;
77 let closure_hash: String = row.get(1)?;
78 out.entry(channel).or_default().insert(closure_hash);
79 }
80 Ok(out)
81 }
82
83 pub fn list_active(&self) -> Result<Vec<QuarantineRow>> {
85 let conn = self.conn.lock().expect("poisoned");
86 let mut stmt = conn
87 .prepare(
88 "SELECT channel, closure_hash, quarantined_at, triggering_event_log_seq
89 FROM quarantined_closures
90 ORDER BY quarantined_at DESC",
91 )
92 .context("prepare list_active")?;
93 let mut rows = stmt.query([]).context("query list_active")?;
94 let mut out = Vec::new();
95 while let Some(row) = rows.next().context("step list_active")? {
96 let channel: String = row.get(0)?;
97 let closure_hash: String = row.get(1)?;
98 let qat: String = row.get(2)?;
99 let trig: Option<i64> = row.get(3)?;
100 out.push(QuarantineRow {
101 channel,
102 closure_hash,
103 quarantined_at: qat
104 .parse::<DateTime<Utc>>()
105 .with_context(|| format!("parse quarantined_closures.quarantined_at: {qat}"))?,
106 triggering_event_log_seq: trig,
107 });
108 }
109 Ok(out)
110 }
111}
112
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Db;

    /// Opens an in-memory database and runs its migrations.
    fn fresh_db() -> Db {
        let db = Db::open_in_memory().unwrap();
        db.migrate().unwrap();
        db
    }

    /// Fixed reference instant shared by the tests.
    fn t0() -> DateTime<Utc> {
        use chrono::TimeZone;
        Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
    }

    #[test]
    fn insert_and_list_active() {
        let db = fresh_db();
        let store = db.quarantined_closures();
        store.insert("stable", "abc123", t0(), None).unwrap();

        let listed = store.list_active().unwrap();
        assert_eq!(listed.len(), 1);

        let only = &listed[0];
        assert_eq!(only.channel, "stable");
        assert_eq!(only.closure_hash, "abc123");
        assert!(only.triggering_event_log_seq.is_none());
    }

    #[test]
    fn idempotent_insert_is_no_op_after_conflict() {
        let db = fresh_db();
        let store = db.quarantined_closures();

        let first_seen = t0();
        let retried_at = first_seen + chrono::Duration::seconds(1);
        store.insert("stable", "abc", first_seen, None).unwrap();
        // Same (channel, closure_hash) with a later timestamp: must not overwrite.
        store.insert("stable", "abc", retried_at, None).unwrap();

        let listed = store.list_active().unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].quarantined_at, first_seen);
    }

    #[test]
    fn active_by_channel_groups_correctly() {
        let db = fresh_db();
        let store = db.quarantined_closures();
        for (channel, hash) in [("stable", "abc"), ("stable", "def"), ("edge", "ghi")] {
            store.insert(channel, hash, t0(), None).unwrap();
        }

        let grouped = store.active_by_channel().unwrap();
        assert_eq!(grouped["stable"].len(), 2);
        assert_eq!(grouped["edge"].len(), 1);
    }
}