nixfleet_control_plane/db/
dispatch_queue.rs

1//! Pending Dispatch payloads awaiting agent long-poll (RFC-0005 §4.1 +
2//! plan 06).
3//!
4//! The runtime applier UPSERTs into this table on every
5//! `PlanAction::QueueDispatch` / `Effect::RemoteQueueDispatch`. The
6//! `GET /v1/agent/dispatch` long-poll handler `take_for_host()`-s a single
7//! row, deleting it atomically; failing to deliver doesn't lose the
8//! intent because the reducer will re-emit on the next plan tick (the
9//! planner skips hosts with `dispatch_acked_at` set, so re-emission only
10//! happens when the prior Dispatch was never acked).
11
12use std::sync::Mutex;
13
14use anyhow::{Context, Result};
15use chrono::{DateTime, Utc};
16use rusqlite::{Connection, OptionalExtension, params};
17
/// Accessor for the `dispatch_queue` table.
///
/// Borrows the mutex-guarded SQLite connection for the lifetime `'a`; every
/// method acquires the connection through that mutex before issuing SQL.
pub struct DispatchQueue<'a> {
    /// Shared connection, visible to the parent `db` module so it can
    /// construct this accessor.
    pub(super) conn: &'a Mutex<Connection>,
}
21
/// One row of the `dispatch_queue` table: a Dispatch waiting to be picked up
/// by an agent's long-poll.
///
/// Rows are keyed by `(hostname, rollout_id)`; both timestamps are persisted
/// as RFC 3339 text.
#[derive(Debug, Clone)]
pub struct QueuedDispatch {
    /// Host the Dispatch is queued for; first half of the row key.
    pub hostname: String,
    /// Rollout the Dispatch belongs to; second half of the row key.
    pub rollout_id: nixfleet_proto::RolloutId,
    /// Payload column replaced on re-upsert.
    /// NOTE(review): presumably the closure the agent should activate —
    /// confirm against RFC-0005.
    pub target_closure: String,
    /// Payload column replaced on re-upsert.
    /// NOTE(review): presumably the instant the soak window ends — confirm
    /// against the planner.
    pub soak_due_at: DateTime<Utc>,
    /// Enqueue timestamp; `take_for_host` hands out a host's rows in
    /// ascending order of this value.
    pub enqueued_at: DateTime<Utc>,
}
30
31impl<'a> DispatchQueue<'a> {
32    /// Upsert a pending Dispatch for `(hostname, rollout_id)`. Idempotent:
33    /// re-emission of `QueueDispatch` for the same pair overwrites
34    /// `target_closure` / `soak_due_at` (the reducer's view always wins).
35    pub fn upsert(&self, q: &QueuedDispatch) -> Result<()> {
36        let conn = super::lock_conn(self.conn)?;
37        conn.execute(
38            "INSERT INTO dispatch_queue
39                (hostname, rollout_id, target_closure, soak_due_at, enqueued_at)
40             VALUES (?1, ?2, ?3, ?4, ?5)
41             ON CONFLICT (hostname, rollout_id) DO UPDATE SET
42                target_closure = excluded.target_closure,
43                soak_due_at    = excluded.soak_due_at,
44                enqueued_at    = excluded.enqueued_at",
45            params![
46                q.hostname,
47                q.rollout_id,
48                q.target_closure,
49                q.soak_due_at.to_rfc3339(),
50                q.enqueued_at.to_rfc3339(),
51            ],
52        )
53        .context("upsert dispatch_queue")?;
54        Ok(())
55    }
56
57    /// Atomically claim and delete the next queued Dispatch for `hostname`.
58    /// Returns `Ok(None)` if no dispatch is pending.
59    ///
60    /// Atomicity: SELECT + DELETE in one txn under the connection mutex.
61    /// SQLite's WAL mode + the single-writer Mutex make this race-free
62    /// even with multiple long-poll handlers (no two see the same row).
63    pub fn take_for_host(&self, hostname: &str) -> Result<Option<QueuedDispatch>> {
64        super::txn(self.conn, "dispatch_queue.take_for_host", |tx| {
65            let row: Option<QueuedDispatch> = tx
66                .query_row(
67                    "SELECT hostname, rollout_id, target_closure,
68                            soak_due_at, enqueued_at
69                     FROM dispatch_queue
70                     WHERE hostname = ?1
71                     ORDER BY enqueued_at ASC
72                     LIMIT 1",
73                    params![hostname],
74                    row_to_queued,
75                )
76                .optional()
77                .context("select dispatch_queue row")?;
78            if let Some(ref q) = row {
79                tx.execute(
80                    "DELETE FROM dispatch_queue
81                     WHERE hostname = ?1 AND rollout_id = ?2",
82                    params![q.hostname, q.rollout_id],
83                )
84                .context("delete dispatch_queue row")?;
85            }
86            Ok(row)
87        })
88    }
89
90    /// Peek without claiming. Used by the long-poll worker after a wake
91    /// event to decide whether to actually fire `take_for_host` (avoids a
92    /// txn on every wake when the queue is empty for this host).
93    pub fn peek_for_host(&self, hostname: &str) -> Result<bool> {
94        let conn = super::lock_conn(self.conn)?;
95        let n: i64 = conn
96            .query_row(
97                "SELECT COUNT(*) FROM dispatch_queue WHERE hostname = ?1",
98                params![hostname],
99                |r| r.get(0),
100            )
101            .context("count dispatch_queue rows for host")?;
102        Ok(n > 0)
103    }
104}
105
106fn row_to_queued(row: &rusqlite::Row<'_>) -> rusqlite::Result<QueuedDispatch> {
107    let soak: String = row.get(3)?;
108    let enq: String = row.get(4)?;
109    Ok(QueuedDispatch {
110        hostname: row.get(0)?,
111        rollout_id: row.get(1)?,
112        target_closure: row.get(2)?,
113        soak_due_at: parse_rfc3339(&soak, "soak_due_at")?,
114        enqueued_at: parse_rfc3339(&enq, "enqueued_at")?,
115    })
116}
117
118fn parse_rfc3339(s: &str, field: &'static str) -> rusqlite::Result<DateTime<Utc>> {
119    DateTime::parse_from_rfc3339(s)
120        .map(|dt| dt.with_timezone(&Utc))
121        .map_err(|e| {
122            rusqlite::Error::FromSqlConversionFailure(
123                0,
124                rusqlite::types::Type::Text,
125                format!("parse {field}: {e}").into(),
126            )
127        })
128}
129
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Db;
    use chrono::TimeZone;

    /// Fixed reference instant used by every fixture.
    fn t0() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
    }

    /// In-memory database with all migrations applied.
    fn fresh_db() -> Db {
        let database = Db::open_in_memory().unwrap();
        database.migrate().unwrap();
        database
    }

    /// Build a dispatch enqueued at `t0()` whose soak is due five minutes later.
    fn sample(host: &str, rollout: &str, closure: &str) -> QueuedDispatch {
        let enqueued_at = t0();
        let soak_due_at = t0() + chrono::Duration::minutes(5);
        QueuedDispatch {
            hostname: host.into(),
            rollout_id: rollout.into(),
            target_closure: closure.into(),
            soak_due_at,
            enqueued_at,
        }
    }

    #[test]
    fn upsert_then_take_round_trip() {
        let db = fresh_db();
        let queue = DispatchQueue { conn: &db.conn };
        queue.upsert(&sample("h1", "r1", "abc")).unwrap();

        let claimed = queue.take_for_host("h1").unwrap().unwrap();
        assert_eq!(claimed.target_closure, "abc");

        // The claim deleted the row, so a second take finds nothing.
        let leftover = queue.take_for_host("h1").unwrap();
        assert!(leftover.is_none());
    }

    #[test]
    fn upsert_overwrites_existing_pair() {
        let db = fresh_db();
        let queue = DispatchQueue { conn: &db.conn };
        for closure in ["abc", "def"] {
            queue.upsert(&sample("h1", "r1", closure)).unwrap();
        }

        // Only the most recent payload for the pair survives.
        let claimed = queue.take_for_host("h1").unwrap().unwrap();
        assert_eq!(claimed.target_closure, "def");
        let leftover = queue.take_for_host("h1").unwrap();
        assert!(leftover.is_none());
    }

    #[test]
    fn take_returns_oldest_first() {
        let db = fresh_db();
        let queue = DispatchQueue { conn: &db.conn };
        let older = QueuedDispatch {
            enqueued_at: t0(),
            ..sample("h1", "r1", "first")
        };
        let newer = QueuedDispatch {
            enqueued_at: t0() + chrono::Duration::seconds(10),
            ..sample("h1", "r2", "second")
        };
        queue.upsert(&older).unwrap();
        queue.upsert(&newer).unwrap();

        // Successive takes must come back in enqueue order.
        for expected in ["first", "second"] {
            let claimed = queue.take_for_host("h1").unwrap().unwrap();
            assert_eq!(claimed.target_closure, expected);
        }
    }

    #[test]
    fn peek_reports_pending() {
        let db = fresh_db();
        let queue = DispatchQueue { conn: &db.conn };

        let before = queue.peek_for_host("h1").unwrap();
        assert!(!before);

        queue.upsert(&sample("h1", "r1", "abc")).unwrap();
        let after = queue.peek_for_host("h1").unwrap();
        assert!(after);
    }

    #[test]
    fn take_for_other_host_does_not_affect_this_one() {
        let db = fresh_db();
        let queue = DispatchQueue { conn: &db.conn };
        queue.upsert(&sample("h1", "r1", "abc")).unwrap();
        queue.upsert(&sample("h2", "r1", "xyz")).unwrap();

        let claimed = queue.take_for_host("h2").unwrap().unwrap();
        assert_eq!(claimed.hostname, "h2");

        // Claiming h2's row must leave h1's row in place.
        let h1_pending = queue.peek_for_host("h1").unwrap();
        assert!(h1_pending);
    }
}