nixfleet_control_plane/db/
mod.rs1use anyhow::{Context, Result};
4use rusqlite::Connection;
5use std::path::Path;
6use std::sync::{Mutex, MutexGuard};
7
8pub mod allowed_nonces;
9pub mod dispatch_queue;
10pub mod event_log;
11pub mod host_rollout_records;
12pub mod probe_failures;
13pub mod quarantined_closures;
14pub mod revocations;
15pub mod rollouts;
16pub mod tokens;
17
18pub use tokens::RecordTokenOutcome;
19
20mod embedded {
21 use refinery::embed_migrations;
22 embed_migrations!("migrations");
23}
24
25pub struct Db {
26 conn: Mutex<Connection>,
27}
28
29impl std::fmt::Debug for Db {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 f.debug_struct("Db").field("conn", &"<sqlite>").finish()
32 }
33}
34
35impl Db {
36 pub fn open(path: &Path) -> Result<Self> {
38 if let Some(parent) = path.parent()
39 && !parent.as_os_str().is_empty()
40 {
41 std::fs::create_dir_all(parent)
42 .with_context(|| format!("create dir {}", parent.display()))?;
43 }
44 let conn =
45 Connection::open(path).with_context(|| format!("open sqlite {}", path.display()))?;
46
47 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")
48 .context("set sqlite pragmas")?;
49
50 Ok(Self {
51 conn: Mutex::new(conn),
52 })
53 }
54
55 pub fn open_in_memory() -> Result<Self> {
59 let conn = Connection::open_in_memory().context("open sqlite :memory:")?;
60 conn.execute_batch("PRAGMA foreign_keys=ON;")?;
61 Ok(Self {
62 conn: Mutex::new(conn),
63 })
64 }
65
66 fn conn(&self) -> Result<MutexGuard<'_, Connection>> {
67 lock_conn(&self.conn)
68 }
69
70 pub fn migrate(&self) -> Result<()> {
72 let mut guard = self.conn()?;
73 embedded::migrations::runner()
74 .run(&mut *guard)
75 .context("run sqlite migrations")?;
76 Ok(())
77 }
78
79 pub fn tokens(&self) -> tokens::Tokens<'_> {
80 tokens::Tokens { conn: &self.conn }
81 }
82
83 pub fn probe_failures(&self) -> probe_failures::ProbeFailures<'_> {
84 probe_failures::ProbeFailures { conn: &self.conn }
85 }
86
87 pub fn revocations(&self) -> revocations::Revocations<'_> {
89 revocations::Revocations { conn: &self.conn }
90 }
91
92 pub fn quarantined_closures(&self) -> quarantined_closures::QuarantinedClosures<'_> {
93 quarantined_closures::QuarantinedClosures { conn: &self.conn }
94 }
95
96 pub fn rollouts(&self) -> rollouts::Rollouts<'_> {
97 rollouts::Rollouts { conn: &self.conn }
98 }
99
100 pub fn dispatch_queue(&self) -> dispatch_queue::DispatchQueue<'_> {
101 dispatch_queue::DispatchQueue { conn: &self.conn }
102 }
103
104 pub fn event_log(&self) -> event_log::EventLog<'_> {
105 event_log::EventLog { conn: &self.conn }
106 }
107
108 pub fn host_rollout_records(&self) -> host_rollout_records::HostRolloutRecords<'_> {
109 host_rollout_records::HostRolloutRecords { conn: &self.conn }
110 }
111}
112
113pub(crate) fn lock_conn(mu: &Mutex<Connection>) -> Result<MutexGuard<'_, Connection>> {
115 mu.lock()
116 .map_err(|e| anyhow::anyhow!("db lock poisoned: {e}"))
117}
118
119pub(crate) fn read<F, T>(mu: &Mutex<Connection>, f: F) -> Result<T>
122where
123 F: FnOnce(&Connection) -> Result<T>,
124{
125 let guard = lock_conn(mu)?;
126 f(&guard)
127}
128
129pub(crate) fn txn<F, T>(mu: &Mutex<Connection>, label: &'static str, f: F) -> Result<T>
132where
133 F: FnOnce(&rusqlite::Transaction) -> Result<T>,
134{
135 let mut guard = lock_conn(mu)?;
136 let tx = guard
137 .transaction()
138 .with_context(|| format!("begin {label} txn"))?;
139 let v = f(&tx)?;
140 tx.commit().with_context(|| format!("commit {label} txn"))?;
141 Ok(v)
142}
143
144#[cfg(test)]
145pub(crate) mod test_helpers;
146
147#[cfg(test)]
148mod tests {
149 use super::*;
150
151 #[test]
152 fn migrations_produce_consolidated_schema() {
153 let db = Db::open_in_memory().unwrap();
154 db.migrate().unwrap();
155 let conn = db.conn().unwrap();
156 let names: Vec<String> = conn
157 .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
158 .unwrap()
159 .query_map([], |row| row.get(0))
160 .unwrap()
161 .collect::<Result<Vec<_>, _>>()
162 .unwrap();
163 for expected in &[
164 "cert_revocations",
165 "dispatch_queue",
166 "event_log",
167 "host_rollout_records",
168 "probe_failures",
169 "quarantined_closures",
170 "rollouts",
171 "token_replay",
172 ] {
173 assert!(
174 names.contains(&expected.to_string()),
175 "v0.2 baseline must create {expected}; got {names:?}",
176 );
177 }
178 for legacy in &[
180 "host_dispatch_state",
181 "host_rollout_state",
182 "host_reports",
183 "dispatch_history",
184 "pending_confirms",
185 "schema_placeholder",
186 ] {
187 assert!(
188 !names.contains(&legacy.to_string()),
189 "v0.2 baseline must not carry legacy table {legacy}",
190 );
191 }
192 }
193
194 #[allow(dead_code)]
195 fn columns_of(conn: &Connection, table: &str) -> Vec<String> {
196 conn.prepare(&format!("PRAGMA table_info({table})"))
197 .unwrap()
198 .query_map([], |row| row.get::<_, String>(1))
199 .unwrap()
200 .collect::<Result<Vec<_>, _>>()
201 .unwrap()
202 }
203
204 #[allow(dead_code)]
205 fn assert_table_exists(conn: &Connection, table: &str) {
206 let n: i64 = conn
207 .query_row(
208 "SELECT COUNT(*) FROM sqlite_master
209 WHERE type = 'table' AND name = ?1",
210 [table],
211 |r| r.get(0),
212 )
213 .unwrap();
214 assert_eq!(n, 1, "table {table} must exist after migration");
215 }
216}