nixfleet_agent/runtime/
outbound_queue.rs

1//! Disk-backed durable outbound event queue (Plan 07 locked-in
2//! decision; RFC-0005 §9.7).
3//!
//! Each event is one file on disk under `{state_dir}/outbound-queue/`,
//! named `{seq:020}-{hostname}-{rollout}-{event_kind}.json`. The seq is
//! zero-padded to 20 digits so that sorting filenames lexicographically
//! yields seq order (`read_dir` itself returns entries in no particular
//! order). Writes go to a `.tmp` file that is then renamed into place, so
//! a crash mid-write leaves no partially formed `.json` file visible to
//! the drainer. After a successful POST, the file is deleted.
9//!
10//! Properties:
11//!   - Survives agent process crashes: every outbound event hits disk
12//!     before the network call returns.
//!   - Data is fsynced before it is published: the `.tmp` file's bytes
//!     are flushed to disk before the rename makes the `.json` name
//!     visible, so a `.json` file that survives a crash always holds a
//!     complete event.
16//!   - Replay-from-seq friendly: a CP `X-Nixfleet-Replay-From: N`
17//!     response triggers a directory scan for files with seq ≥ N.
18//!   - Crash mid-write: a partial `.tmp` file is invisible to
19//!     [`OutboundQueue::scan_pending`] because the filename pattern
20//!     filters out non-`.json` paths. The next restart's drainer
21//!     picks up where it left off.
22
23use std::collections::BTreeMap;
24use std::path::{Path, PathBuf};
25
26use anyhow::{Context, Result};
27use chrono::{DateTime, Utc};
28use nixfleet_state_machine::OutboundAgentEvent;
29use serde::{Deserialize, Serialize};
30
/// One entry in the on-disk queue, persisted as pretty-printed JSON via
/// serde (one file per entry; see `OutboundQueue::enqueue`).
///
/// `payload` is the typed wire event (RFC-0004 §2 lift: the wire
/// envelope + AgentEvent live in `nixfleet-proto`, so both sides of the
/// agent <-> CP boundary import the same types). The outbound worker
/// wraps each QueuedEvent in an `AgentEventEnvelope` at POST time.
///
/// Field order determines the key order of the serialized JSON. `seq`,
/// `hostname`, `rollout_id` and `event_kind` also determine the on-disk
/// filename, so changing any of them after enqueue means `mark_sent`
/// will look for a different file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueuedEvent {
    /// Sequence number: the zero-padded filename prefix and the ordering
    /// key used by `scan_pending`. NOTE(review): presumably mirrors the
    /// `seq` inside `payload` (the tests set both) — confirm at call sites.
    pub seq: u64,
    /// Reporting host; sanitized into the filename.
    pub hostname: String,
    /// Rollout this event belongs to; sanitized into the filename.
    pub rollout_id: nixfleet_proto::RolloutId,
    /// Event discriminator string (e.g. from `outbound_event_kind`);
    /// sanitized into the filename.
    pub event_kind: String,
    /// Creation timestamp supplied by the caller; the queue only stores it.
    pub created_at: DateTime<Utc>,
    /// Typed wire event. Same shape both sides verify against.
    pub payload: nixfleet_proto::AgentEvent,
}
46
/// Handle to the disk-backed outbound queue rooted at
/// `{state_dir}/outbound-queue/`.
///
/// The handle holds only the directory path, so `clone()` copies a
/// `PathBuf` (no `Arc` is involved). All queue state lives on disk, so
/// every clone observes the same queue.
#[derive(Clone)]
pub struct OutboundQueue {
    /// Queue directory; each queued event is one `.json` file in here.
    dir: PathBuf,
}
52
53impl OutboundQueue {
54    /// Open / create the queue directory. Idempotent.
55    pub fn open(state_dir: &Path) -> Result<Self> {
56        let dir = state_dir.join("outbound-queue");
57        std::fs::create_dir_all(&dir)
58            .with_context(|| format!("create outbound-queue dir {}", dir.display()))?;
59        Ok(Self { dir })
60    }
61
62    /// Atomically persist an event. fsync before rename ensures the
63    /// payload bytes hit disk before the directory entry flips; a
64    /// crash between fsync and rename leaves a `.tmp` file that the
65    /// drainer ignores.
66    pub fn enqueue(&self, event: &QueuedEvent) -> Result<PathBuf> {
67        let final_name = filename_for(event);
68        let final_path = self.dir.join(&final_name);
69        let tmp_path = self.dir.join(format!("{final_name}.tmp"));
70
71        let bytes = serde_json::to_vec_pretty(event)
72            .with_context(|| format!("serialize QueuedEvent seq={}", event.seq))?;
73        write_atomic(&tmp_path, &final_path, &bytes)
74            .with_context(|| format!("atomic write {}", final_path.display()))?;
75        Ok(final_path)
76    }
77
78    /// All pending events, sorted ascending by seq. Filenames sort
79    /// lexicographically thanks to the 20-char zero-padded seq prefix;
80    /// the BTreeMap-by-seq pass below is belt-and-braces in case the
81    /// scan picks up files with unexpected sort order (rare on POSIX
82    /// but defensible).
83    pub fn scan_pending(&self) -> Result<Vec<QueuedEvent>> {
84        let entries = match std::fs::read_dir(&self.dir) {
85            Ok(it) => it,
86            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
87            Err(err) => return Err(anyhow::anyhow!("read_dir {}: {err}", self.dir.display())),
88        };
89        let mut by_seq: BTreeMap<u64, QueuedEvent> = BTreeMap::new();
90        for entry in entries {
91            let entry = entry.context("read_dir entry")?;
92            let name = entry.file_name();
93            let Some(name_str) = name.to_str() else {
94                continue;
95            };
96            // Filter to .json (rejects .tmp partials).
97            if !name_str.ends_with(".json") || name_str.ends_with(".json.tmp") {
98                continue;
99            }
100            let path = entry.path();
101            let bytes = match std::fs::read(&path) {
102                Ok(b) => b,
103                Err(err) => {
104                    tracing::warn!(
105                        target: "outbound_queue",
106                        path = %path.display(),
107                        error = %err,
108                        "scan_pending: read failed; skipping",
109                    );
110                    continue;
111                }
112            };
113            let event: QueuedEvent = match serde_json::from_slice(&bytes) {
114                Ok(e) => e,
115                Err(err) => {
116                    tracing::warn!(
117                        target: "outbound_queue",
118                        path = %path.display(),
119                        error = %err,
120                        "scan_pending: parse failed; skipping (operator should rm the bad file)",
121                    );
122                    continue;
123                }
124            };
125            by_seq.insert(event.seq, event);
126        }
127        Ok(by_seq.into_values().collect())
128    }
129
130    /// Delete the on-disk file for `event`. Called after a successful
131    /// POST to `/v1/agent/events`.
132    pub fn mark_sent(&self, event: &QueuedEvent) -> Result<()> {
133        let path = self.dir.join(filename_for(event));
134        match std::fs::remove_file(&path) {
135            Ok(()) => Ok(()),
136            // Already gone (concurrent drainer or operator hand-removed)
137            // is fine — the postcondition is "file is absent".
138            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
139            Err(err) => Err(anyhow::anyhow!("remove_file {}: {err}", path.display())),
140        }
141    }
142
143    /// Drop all queued events. Test-only entry point.
144    #[cfg(test)]
145    pub fn clear(&self) -> Result<()> {
146        let entries = std::fs::read_dir(&self.dir).context("read_dir for clear")?;
147        for entry in entries {
148            let entry = entry?;
149            let _ = std::fs::remove_file(entry.path());
150        }
151        Ok(())
152    }
153
154    pub fn dir(&self) -> &Path {
155        &self.dir
156    }
157}
158
159/// Map an `OutboundAgentEvent` to its kebab-case `event_kind`
160/// discriminator (used in the filename + the OutboundEventKind enum
161/// in `db/event_log` on the CP side).
162pub fn outbound_event_kind(payload: &OutboundAgentEvent) -> &'static str {
163    match payload {
164        OutboundAgentEvent::DispatchAck { .. } => "DispatchAck",
165        OutboundAgentEvent::ActivationStarted { .. } => "ActivationStarted",
166        OutboundAgentEvent::ActivationCompleted { .. } => "ActivationCompleted",
167        OutboundAgentEvent::ActivationDeferred { .. } => "ActivationDeferred",
168        OutboundAgentEvent::ActivationFailed { .. } => "ActivationFailed",
169        OutboundAgentEvent::ProbeTopologyDeclared { .. } => "ProbeTopologyDeclared",
170        OutboundAgentEvent::ProbeObservedFirst { .. } => "ProbeObservedFirst",
171        OutboundAgentEvent::ProbeResult { .. } => "ProbeResult",
172        OutboundAgentEvent::ProbeFailureFirst { .. } => "ProbeFailureFirst",
173        OutboundAgentEvent::Failed { .. } => "Failed",
174        OutboundAgentEvent::RollbackComplete { .. } => "RollbackComplete",
175        OutboundAgentEvent::Converged { .. } => "Converged",
176    }
177}
178
179/// Read the `seq` field off an `OutboundAgentEvent`.
180pub fn outbound_event_seq(payload: &OutboundAgentEvent) -> u64 {
181    match payload {
182        OutboundAgentEvent::DispatchAck { seq, .. }
183        | OutboundAgentEvent::ActivationStarted { seq, .. }
184        | OutboundAgentEvent::ActivationCompleted { seq, .. }
185        | OutboundAgentEvent::ActivationDeferred { seq, .. }
186        | OutboundAgentEvent::ActivationFailed { seq, .. }
187        | OutboundAgentEvent::ProbeTopologyDeclared { seq, .. }
188        | OutboundAgentEvent::ProbeObservedFirst { seq, .. }
189        | OutboundAgentEvent::ProbeResult { seq, .. }
190        | OutboundAgentEvent::ProbeFailureFirst { seq, .. }
191        | OutboundAgentEvent::Failed { seq, .. }
192        | OutboundAgentEvent::RollbackComplete { seq, .. }
193        | OutboundAgentEvent::Converged { seq, .. } => *seq,
194    }
195}
196
197/// `{seq:020}-{hostname}-{rollout}-{event_kind}.json`. The
198/// zero-padded seq gives lexicographic = chronological filename order;
199/// the `.json` suffix is what `scan_pending` filters on (vs `.tmp`).
200fn filename_for(event: &QueuedEvent) -> String {
201    let hostname = sanitize(&event.hostname);
202    let rollout = sanitize(event.rollout_id.as_str());
203    let kind = sanitize(&event.event_kind);
204    format!("{:020}-{hostname}-{rollout}-{kind}.json", event.seq)
205}
206
/// Make `s` safe to embed in a single filename component.
///
/// This is an allowlist. ASCII letters, ASCII digits, `-`, `@` and `.`
/// pass through unchanged. Every other `char` becomes one `_`; that
/// includes path separators, whitespace, `_` itself and any non-ASCII
/// character. This is belt-and-braces: the wire types should already
/// constrain these strings, but the input is not trusted.
fn sanitize(s: &str) -> String {
    let allowed = |c: char| c.is_ascii_alphanumeric() || matches!(c, '-' | '@' | '.');
    s.chars()
        .map(|c| if allowed(c) { c } else { '_' })
        .collect()
}
218
219fn write_atomic(tmp: &Path, final_path: &Path, bytes: &[u8]) -> Result<()> {
220    use std::io::Write;
221    {
222        let mut f = std::fs::OpenOptions::new()
223            .write(true)
224            .create(true)
225            .truncate(true)
226            .open(tmp)
227            .with_context(|| format!("open {}", tmp.display()))?;
228        f.write_all(bytes)
229            .with_context(|| format!("write {}", tmp.display()))?;
230        f.sync_all()
231            .with_context(|| format!("fsync {}", tmp.display()))?;
232    }
233    std::fs::rename(tmp, final_path)
234        .with_context(|| format!("rename {} -> {}", tmp.display(), final_path.display()))?;
235    Ok(())
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use tempfile::TempDir;

    /// Fixture: an `ActivationCompleted` event for `host-05` on rollout
    /// `stable@deadbeef`, carrying `seq` both on the queue entry and inside
    /// the wire payload.
    fn t(seq: u64) -> QueuedEvent {
        QueuedEvent {
            seq,
            hostname: "host-05".into(),
            rollout_id: "stable@deadbeef".into(),
            event_kind: "ActivationCompleted".into(),
            created_at: Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap(),
            payload: nixfleet_proto::AgentEvent::ActivationCompleted {
                observed_current_closure: "closure-a".into(),
                exit_code: 0,
                completed_at: Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap(),
                seq,
            },
        }
    }

    #[test]
    fn enqueue_then_scan_round_trip() {
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        q.enqueue(&t(1)).unwrap();
        q.enqueue(&t(2)).unwrap();
        q.enqueue(&t(3)).unwrap();
        let pending = q.scan_pending().unwrap();
        assert_eq!(pending.len(), 3);
        assert_eq!(pending[0].seq, 1);
        assert_eq!(pending[1].seq, 2);
        assert_eq!(pending[2].seq, 3);
    }

    #[test]
    fn mark_sent_removes_from_queue() {
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        q.enqueue(&t(1)).unwrap();
        q.enqueue(&t(2)).unwrap();
        q.mark_sent(&t(1)).unwrap();
        let pending = q.scan_pending().unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].seq, 2);
    }

    #[test]
    fn mark_sent_is_idempotent_when_file_gone() {
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        q.enqueue(&t(1)).unwrap();
        q.mark_sent(&t(1)).unwrap();
        // Second call: file is already gone, must succeed.
        q.mark_sent(&t(1)).unwrap();
    }

    #[test]
    fn crash_mid_write_leaves_no_visible_event() {
        // Simulate a crash between fsync and rename: drop a .tmp file
        // with arbitrary contents. scan_pending must ignore it; the
        // queue is otherwise empty so no event is delivered.
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        let tmp_path = q
            .dir()
            .join("00000000000000000007-host-x-rollout-y-Kind.json.tmp");
        std::fs::write(&tmp_path, b"partial garbage").unwrap();
        let pending = q.scan_pending().unwrap();
        assert!(
            pending.is_empty(),
            "partial .tmp must not surface as a queued event",
        );
        // And no `.json` file exists for that seq either.
        let listing: Vec<_> = std::fs::read_dir(q.dir())
            .unwrap()
            .filter_map(|e| e.ok())
            .map(|e| e.file_name())
            .collect();
        assert_eq!(listing.len(), 1);
        assert_eq!(
            listing[0].to_str().unwrap(),
            tmp_path.file_name().unwrap().to_str().unwrap()
        );
    }

    #[test]
    fn replay_from_seq_yields_correct_subset() {
        // Operator scenario: CP returns Replay-From=2; agent re-POSTs
        // events with seq ≥ 2. scan_pending is total; consumer filters.
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        for seq in 1..=5 {
            q.enqueue(&t(seq)).unwrap();
        }
        let pending = q.scan_pending().unwrap();
        let replay: Vec<_> = pending.into_iter().filter(|e| e.seq >= 2).collect();
        assert_eq!(replay.len(), 4);
        assert_eq!(replay[0].seq, 2);
    }

    #[test]
    fn enqueue_overwrites_same_seq() {
        // Re-enqueueing an event that maps to an existing filename replaces
        // that file via the atomic rename. This is what makes re-enqueueing
        // after a crash safe: the newer payload wins, and there is still
        // exactly one pending entry for the seq.
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        q.enqueue(&t(1)).unwrap();
        // Different payload, same seq.
        let mut second = t(1);
        let perturbed_payload = nixfleet_proto::AgentEvent::ActivationCompleted {
            observed_current_closure: "closure-b".into(),
            exit_code: 99,
            completed_at: Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap(),
            seq: 1,
        };
        second.payload = perturbed_payload.clone();
        q.enqueue(&second).unwrap();
        let pending = q.scan_pending().unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].payload, perturbed_payload);
    }

    #[test]
    fn enqueue_leaves_no_tmp_file() {
        // After a successful enqueue the `.tmp` staging file has been
        // renamed away; only the published `.json` remains.
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        let path = q.enqueue(&t(1)).unwrap();
        let names: Vec<String> = std::fs::read_dir(q.dir())
            .unwrap()
            .filter_map(|e| e.ok())
            .map(|e| e.file_name().to_string_lossy().into_owned())
            .collect();
        assert_eq!(names.len(), 1);
        assert_eq!(
            names[0],
            path.file_name().unwrap().to_string_lossy().into_owned()
        );
        assert!(names[0].ends_with(".json"));
    }

    #[test]
    fn scan_pending_skips_unparseable_json() {
        // A corrupt `.json` file is logged and skipped rather than being
        // fatal; valid neighbours are still returned, and the bad file is
        // left on disk for the operator.
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        q.enqueue(&t(2)).unwrap();
        let bad = q
            .dir()
            .join("00000000000000000001-host-x-rollout-y-Kind.json");
        std::fs::write(&bad, b"not json").unwrap();
        let pending = q.scan_pending().unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].seq, 2);
        assert!(bad.exists());
    }

    #[test]
    fn scan_pending_on_missing_dir_is_empty() {
        // If the queue directory disappears after `open`, scanning reports
        // an empty queue instead of an error.
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        std::fs::remove_dir(q.dir()).unwrap();
        assert!(q.scan_pending().unwrap().is_empty());
    }
}