nixfleet_control_plane/db/
mod.rs

1//! SQLite persistence: rusqlite + refinery, WAL + FK, single `Mutex<Connection>`.
2
3use anyhow::{Context, Result};
4use rusqlite::Connection;
5use std::path::Path;
6use std::sync::{Mutex, MutexGuard};
7
8pub mod allowed_nonces;
9pub mod dispatch_queue;
10pub mod event_log;
11pub mod host_rollout_records;
12pub mod probe_failures;
13pub mod quarantined_closures;
14pub mod revocations;
15pub mod rollouts;
16pub mod tokens;
17
18pub use tokens::RecordTokenOutcome;
19
20mod embedded {
21    use refinery::embed_migrations;
22    embed_migrations!("migrations");
23}
24
/// Handle to the SQLite database.
///
/// Wraps exactly one `rusqlite::Connection` behind a `Mutex`; the per-table
/// accessor methods on `Db` hand out views that borrow this same mutex, so
/// all database access is serialized through it.
pub struct Db {
    /// The single shared connection, guarded by the mutex.
    conn: Mutex<Connection>,
}
28
29impl std::fmt::Debug for Db {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        f.debug_struct("Db").field("conn", &"<sqlite>").finish()
32    }
33}
34
35impl Db {
36    /// Creates parent dirs; enables WAL + FK before migrations.
37    pub fn open(path: &Path) -> Result<Self> {
38        if let Some(parent) = path.parent()
39            && !parent.as_os_str().is_empty()
40        {
41            std::fs::create_dir_all(parent)
42                .with_context(|| format!("create dir {}", parent.display()))?;
43        }
44        let conn =
45            Connection::open(path).with_context(|| format!("open sqlite {}", path.display()))?;
46
47        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")
48            .context("set sqlite pragmas")?;
49
50        Ok(Self {
51            conn: Mutex::new(conn),
52        })
53    }
54
55    /// In-memory SQLite for integration tests; trivial wrapper around
56    /// `Connection::open_in_memory()` exposed as a public API so
57    /// integration tests under `tests/` can construct an isolated `Db`.
58    pub fn open_in_memory() -> Result<Self> {
59        let conn = Connection::open_in_memory().context("open sqlite :memory:")?;
60        conn.execute_batch("PRAGMA foreign_keys=ON;")?;
61        Ok(Self {
62            conn: Mutex::new(conn),
63        })
64    }
65
66    fn conn(&self) -> Result<MutexGuard<'_, Connection>> {
67        lock_conn(&self.conn)
68    }
69
70    /// Idempotent.
71    pub fn migrate(&self) -> Result<()> {
72        let mut guard = self.conn()?;
73        embedded::migrations::runner()
74            .run(&mut *guard)
75            .context("run sqlite migrations")?;
76        Ok(())
77    }
78
79    pub fn tokens(&self) -> tokens::Tokens<'_> {
80        tokens::Tokens { conn: &self.conn }
81    }
82
83    pub fn probe_failures(&self) -> probe_failures::ProbeFailures<'_> {
84        probe_failures::ProbeFailures { conn: &self.conn }
85    }
86
87    /// Hard state.
88    pub fn revocations(&self) -> revocations::Revocations<'_> {
89        revocations::Revocations { conn: &self.conn }
90    }
91
92    pub fn quarantined_closures(&self) -> quarantined_closures::QuarantinedClosures<'_> {
93        quarantined_closures::QuarantinedClosures { conn: &self.conn }
94    }
95
96    pub fn rollouts(&self) -> rollouts::Rollouts<'_> {
97        rollouts::Rollouts { conn: &self.conn }
98    }
99
100    pub fn dispatch_queue(&self) -> dispatch_queue::DispatchQueue<'_> {
101        dispatch_queue::DispatchQueue { conn: &self.conn }
102    }
103
104    pub fn event_log(&self) -> event_log::EventLog<'_> {
105        event_log::EventLog { conn: &self.conn }
106    }
107
108    pub fn host_rollout_records(&self) -> host_rollout_records::HostRolloutRecords<'_> {
109        host_rollout_records::HostRolloutRecords { conn: &self.conn }
110    }
111}
112
113/// Surfaces mutex poisoning as anyhow rather than panic.
114pub(crate) fn lock_conn(mu: &Mutex<Connection>) -> Result<MutexGuard<'_, Connection>> {
115    mu.lock()
116        .map_err(|e| anyhow::anyhow!("db lock poisoned: {e}"))
117}
118
119/// Lock + read. Closure receives a borrowed `Connection`; lock is held
120/// for the closure's duration.
121pub(crate) fn read<F, T>(mu: &Mutex<Connection>, f: F) -> Result<T>
122where
123    F: FnOnce(&Connection) -> Result<T>,
124{
125    let guard = lock_conn(mu)?;
126    f(&guard)
127}
128
129/// Lock + open txn + run closure + commit. `label` shapes the begin/commit
130/// error context. Closure errors abort the txn.
131pub(crate) fn txn<F, T>(mu: &Mutex<Connection>, label: &'static str, f: F) -> Result<T>
132where
133    F: FnOnce(&rusqlite::Transaction) -> Result<T>,
134{
135    let mut guard = lock_conn(mu)?;
136    let tx = guard
137        .transaction()
138        .with_context(|| format!("begin {label} txn"))?;
139    let v = f(&tx)?;
140    tx.commit().with_context(|| format!("commit {label} txn"))?;
141    Ok(v)
142}
143
144#[cfg(test)]
145pub(crate) mod test_helpers;
146
#[cfg(test)]
mod tests {
    use super::*;

    /// Tables the migrations are required to create.
    const EXPECTED_TABLES: [&str; 8] = [
        "cert_revocations",
        "dispatch_queue",
        "event_log",
        "host_rollout_records",
        "probe_failures",
        "quarantined_closures",
        "rollouts",
        "token_replay",
    ];

    /// v0.1 / 9a-deleted tables that must not resurface.
    const LEGACY_TABLES: [&str; 6] = [
        "host_dispatch_state",
        "host_rollout_state",
        "host_reports",
        "dispatch_history",
        "pending_confirms",
        "schema_placeholder",
    ];

    /// Migrates a fresh in-memory database and checks the resulting table
    /// set against the expected and legacy lists.
    #[test]
    fn migrations_produce_consolidated_schema() {
        let db = Db::open_in_memory().unwrap();
        db.migrate().unwrap();
        let conn = db.conn().unwrap();

        let mut stmt = conn
            .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
            .unwrap();
        let names: Vec<String> = stmt
            .query_map([], |row| row.get(0))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        for expected in EXPECTED_TABLES {
            let present = names.iter().any(|name| name.as_str() == expected);
            assert!(
                present,
                "v0.2 baseline must create {expected}; got {names:?}",
            );
        }

        for legacy in LEGACY_TABLES {
            let present = names.iter().any(|name| name.as_str() == legacy);
            assert!(
                !present,
                "v0.2 baseline must not carry legacy table {legacy}",
            );
        }
    }

    /// Column names of `table`, taken from field 1 of each
    /// `PRAGMA table_info` row.
    // NOTE(review): not called by any test in this module; presumably kept
    // for future schema assertions — confirm or remove.
    #[allow(dead_code)]
    fn columns_of(conn: &Connection, table: &str) -> Vec<String> {
        let pragma = format!("PRAGMA table_info({table})");
        conn.prepare(&pragma)
            .unwrap()
            .query_map([], |row| row.get::<_, String>(1))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    }

    /// Asserts that exactly one table named `table` is listed in
    /// `sqlite_master`.
    // NOTE(review): not called by any test in this module; presumably kept
    // for future schema assertions — confirm or remove.
    #[allow(dead_code)]
    fn assert_table_exists(conn: &Connection, table: &str) {
        let matching: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master
                 WHERE type = 'table' AND name = ?1",
                [table],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(matching, 1, "table {table} must exist after migration");
    }
}