nixfleet_control_plane/db/
dispatch_queue.rs1use std::sync::Mutex;
13
14use anyhow::{Context, Result};
15use chrono::{DateTime, Utc};
16use rusqlite::{Connection, OptionalExtension, params};
17
18pub struct DispatchQueue<'a> {
19 pub(super) conn: &'a Mutex<Connection>,
20}
21
22#[derive(Debug, Clone)]
23pub struct QueuedDispatch {
24 pub hostname: String,
25 pub rollout_id: nixfleet_proto::RolloutId,
26 pub target_closure: String,
27 pub soak_due_at: DateTime<Utc>,
28 pub enqueued_at: DateTime<Utc>,
29}
30
31impl<'a> DispatchQueue<'a> {
32 pub fn upsert(&self, q: &QueuedDispatch) -> Result<()> {
36 let conn = super::lock_conn(self.conn)?;
37 conn.execute(
38 "INSERT INTO dispatch_queue
39 (hostname, rollout_id, target_closure, soak_due_at, enqueued_at)
40 VALUES (?1, ?2, ?3, ?4, ?5)
41 ON CONFLICT (hostname, rollout_id) DO UPDATE SET
42 target_closure = excluded.target_closure,
43 soak_due_at = excluded.soak_due_at,
44 enqueued_at = excluded.enqueued_at",
45 params![
46 q.hostname,
47 q.rollout_id,
48 q.target_closure,
49 q.soak_due_at.to_rfc3339(),
50 q.enqueued_at.to_rfc3339(),
51 ],
52 )
53 .context("upsert dispatch_queue")?;
54 Ok(())
55 }
56
57 pub fn take_for_host(&self, hostname: &str) -> Result<Option<QueuedDispatch>> {
64 super::txn(self.conn, "dispatch_queue.take_for_host", |tx| {
65 let row: Option<QueuedDispatch> = tx
66 .query_row(
67 "SELECT hostname, rollout_id, target_closure,
68 soak_due_at, enqueued_at
69 FROM dispatch_queue
70 WHERE hostname = ?1
71 ORDER BY enqueued_at ASC
72 LIMIT 1",
73 params![hostname],
74 row_to_queued,
75 )
76 .optional()
77 .context("select dispatch_queue row")?;
78 if let Some(ref q) = row {
79 tx.execute(
80 "DELETE FROM dispatch_queue
81 WHERE hostname = ?1 AND rollout_id = ?2",
82 params![q.hostname, q.rollout_id],
83 )
84 .context("delete dispatch_queue row")?;
85 }
86 Ok(row)
87 })
88 }
89
90 pub fn peek_for_host(&self, hostname: &str) -> Result<bool> {
94 let conn = super::lock_conn(self.conn)?;
95 let n: i64 = conn
96 .query_row(
97 "SELECT COUNT(*) FROM dispatch_queue WHERE hostname = ?1",
98 params![hostname],
99 |r| r.get(0),
100 )
101 .context("count dispatch_queue rows for host")?;
102 Ok(n > 0)
103 }
104}
105
106fn row_to_queued(row: &rusqlite::Row<'_>) -> rusqlite::Result<QueuedDispatch> {
107 let soak: String = row.get(3)?;
108 let enq: String = row.get(4)?;
109 Ok(QueuedDispatch {
110 hostname: row.get(0)?,
111 rollout_id: row.get(1)?,
112 target_closure: row.get(2)?,
113 soak_due_at: parse_rfc3339(&soak, "soak_due_at")?,
114 enqueued_at: parse_rfc3339(&enq, "enqueued_at")?,
115 })
116}
117
118fn parse_rfc3339(s: &str, field: &'static str) -> rusqlite::Result<DateTime<Utc>> {
119 DateTime::parse_from_rfc3339(s)
120 .map(|dt| dt.with_timezone(&Utc))
121 .map_err(|e| {
122 rusqlite::Error::FromSqlConversionFailure(
123 0,
124 rusqlite::types::Type::Text,
125 format!("parse {field}: {e}").into(),
126 )
127 })
128}
129
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Db;
    use chrono::TimeZone;

    /// Fixed reference instant so every fixture is deterministic.
    fn t0() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
    }

    /// In-memory database with migrations applied.
    fn fresh_db() -> Db {
        let db = Db::open_in_memory().unwrap();
        db.migrate().unwrap();
        db
    }

    /// Queue handle borrowing `db`'s connection.
    fn queue_on(db: &Db) -> DispatchQueue<'_> {
        DispatchQueue { conn: &db.conn }
    }

    /// Entry enqueued at `t0()` with `soak_due_at` five minutes later.
    fn sample(host: &str, rollout: &str, closure: &str) -> QueuedDispatch {
        let enqueued_at = t0();
        QueuedDispatch {
            hostname: host.into(),
            rollout_id: rollout.into(),
            target_closure: closure.into(),
            soak_due_at: enqueued_at + chrono::Duration::minutes(5),
            enqueued_at,
        }
    }

    #[test]
    fn upsert_then_take_round_trip() {
        let db = fresh_db();
        let queue = queue_on(&db);

        queue.upsert(&sample("h1", "r1", "abc")).unwrap();

        let taken = queue.take_for_host("h1").unwrap().unwrap();
        assert_eq!(taken.target_closure, "abc");
        // Taking consumes the row, so a second take finds nothing.
        assert!(queue.take_for_host("h1").unwrap().is_none());
    }

    #[test]
    fn upsert_overwrites_existing_pair() {
        let db = fresh_db();
        let queue = queue_on(&db);

        for closure in ["abc", "def"] {
            queue.upsert(&sample("h1", "r1", closure)).unwrap();
        }

        // Same (hostname, rollout_id) twice leaves one row, the latest.
        let taken = queue.take_for_host("h1").unwrap().unwrap();
        assert_eq!(taken.target_closure, "def");
        assert!(queue.take_for_host("h1").unwrap().is_none());
    }

    #[test]
    fn take_returns_oldest_first() {
        let db = fresh_db();
        let queue = queue_on(&db);

        let older = sample("h1", "r1", "first");
        let newer = QueuedDispatch {
            enqueued_at: t0() + chrono::Duration::seconds(10),
            ..sample("h1", "r2", "second")
        };
        queue.upsert(&older).unwrap();
        queue.upsert(&newer).unwrap();

        for expected in ["first", "second"] {
            let taken = queue.take_for_host("h1").unwrap().unwrap();
            assert_eq!(taken.target_closure, expected);
        }
    }

    #[test]
    fn peek_reports_pending() {
        let db = fresh_db();
        let queue = queue_on(&db);

        assert!(!queue.peek_for_host("h1").unwrap());
        queue.upsert(&sample("h1", "r1", "abc")).unwrap();
        assert!(queue.peek_for_host("h1").unwrap());
    }

    #[test]
    fn take_for_other_host_does_not_affect_this_one() {
        let db = fresh_db();
        let queue = queue_on(&db);

        queue.upsert(&sample("h1", "r1", "abc")).unwrap();
        queue.upsert(&sample("h2", "r1", "xyz")).unwrap();

        let taken = queue.take_for_host("h2").unwrap().unwrap();
        assert_eq!(taken.hostname, "h2");
        // h1's entry must still be queued.
        assert!(queue.peek_for_host("h1").unwrap());
    }
}