1use std::collections::BTreeMap;
24use std::path::{Path, PathBuf};
25
26use anyhow::{Context, Result};
27use chrono::{DateTime, Utc};
28use nixfleet_state_machine::OutboundAgentEvent;
29use serde::{Deserialize, Serialize};
30
/// One outbound agent event persisted in the [`OutboundQueue`].
///
/// Stored as pretty-printed JSON by `OutboundQueue::enqueue`. The `seq`,
/// `hostname`, `rollout_id` and `event_kind` fields together determine the
/// on-disk file name, and therefore also which file `mark_sent` removes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueuedEvent {
    /// Ordering key: `scan_pending` returns events in ascending `seq`.
    /// NOTE(review): presumably mirrors the `seq` inside `payload` (the tests
    /// build it that way) — confirm at the enqueue call sites.
    pub seq: u64,
    /// Reporting host; sanitized into the queue file name.
    pub hostname: String,
    /// Rollout this event belongs to; sanitized into the queue file name.
    pub rollout_id: nixfleet_proto::RolloutId,
    /// Event kind label (e.g. "ActivationCompleted"); sanitized into the file
    /// name. Likely produced by `outbound_event_kind` — verify against callers.
    pub event_kind: String,
    /// Timestamp recorded by the producer; not interpreted by this module.
    pub created_at: DateTime<Utc>,
    /// The agent event to be delivered; opaque to this module.
    pub payload: nixfleet_proto::AgentEvent,
}
46
/// On-disk queue of outbound agent events, one JSON file per event.
///
/// The struct holds only the queue directory path, so a clone refers to the
/// same on-disk queue as the original.
#[derive(Clone)]
pub struct OutboundQueue {
    // Queue directory; `open` sets it to `<state_dir>/outbound-queue`.
    dir: PathBuf,
}
52
53impl OutboundQueue {
54 pub fn open(state_dir: &Path) -> Result<Self> {
56 let dir = state_dir.join("outbound-queue");
57 std::fs::create_dir_all(&dir)
58 .with_context(|| format!("create outbound-queue dir {}", dir.display()))?;
59 Ok(Self { dir })
60 }
61
62 pub fn enqueue(&self, event: &QueuedEvent) -> Result<PathBuf> {
67 let final_name = filename_for(event);
68 let final_path = self.dir.join(&final_name);
69 let tmp_path = self.dir.join(format!("{final_name}.tmp"));
70
71 let bytes = serde_json::to_vec_pretty(event)
72 .with_context(|| format!("serialize QueuedEvent seq={}", event.seq))?;
73 write_atomic(&tmp_path, &final_path, &bytes)
74 .with_context(|| format!("atomic write {}", final_path.display()))?;
75 Ok(final_path)
76 }
77
78 pub fn scan_pending(&self) -> Result<Vec<QueuedEvent>> {
84 let entries = match std::fs::read_dir(&self.dir) {
85 Ok(it) => it,
86 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
87 Err(err) => return Err(anyhow::anyhow!("read_dir {}: {err}", self.dir.display())),
88 };
89 let mut by_seq: BTreeMap<u64, QueuedEvent> = BTreeMap::new();
90 for entry in entries {
91 let entry = entry.context("read_dir entry")?;
92 let name = entry.file_name();
93 let Some(name_str) = name.to_str() else {
94 continue;
95 };
96 if !name_str.ends_with(".json") || name_str.ends_with(".json.tmp") {
98 continue;
99 }
100 let path = entry.path();
101 let bytes = match std::fs::read(&path) {
102 Ok(b) => b,
103 Err(err) => {
104 tracing::warn!(
105 target: "outbound_queue",
106 path = %path.display(),
107 error = %err,
108 "scan_pending: read failed; skipping",
109 );
110 continue;
111 }
112 };
113 let event: QueuedEvent = match serde_json::from_slice(&bytes) {
114 Ok(e) => e,
115 Err(err) => {
116 tracing::warn!(
117 target: "outbound_queue",
118 path = %path.display(),
119 error = %err,
120 "scan_pending: parse failed; skipping (operator should rm the bad file)",
121 );
122 continue;
123 }
124 };
125 by_seq.insert(event.seq, event);
126 }
127 Ok(by_seq.into_values().collect())
128 }
129
130 pub fn mark_sent(&self, event: &QueuedEvent) -> Result<()> {
133 let path = self.dir.join(filename_for(event));
134 match std::fs::remove_file(&path) {
135 Ok(()) => Ok(()),
136 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
139 Err(err) => Err(anyhow::anyhow!("remove_file {}: {err}", path.display())),
140 }
141 }
142
143 #[cfg(test)]
145 pub fn clear(&self) -> Result<()> {
146 let entries = std::fs::read_dir(&self.dir).context("read_dir for clear")?;
147 for entry in entries {
148 let entry = entry?;
149 let _ = std::fs::remove_file(entry.path());
150 }
151 Ok(())
152 }
153
154 pub fn dir(&self) -> &Path {
155 &self.dir
156 }
157}
158
159pub fn outbound_event_kind(payload: &OutboundAgentEvent) -> &'static str {
163 match payload {
164 OutboundAgentEvent::DispatchAck { .. } => "DispatchAck",
165 OutboundAgentEvent::ActivationStarted { .. } => "ActivationStarted",
166 OutboundAgentEvent::ActivationCompleted { .. } => "ActivationCompleted",
167 OutboundAgentEvent::ActivationDeferred { .. } => "ActivationDeferred",
168 OutboundAgentEvent::ActivationFailed { .. } => "ActivationFailed",
169 OutboundAgentEvent::ProbeTopologyDeclared { .. } => "ProbeTopologyDeclared",
170 OutboundAgentEvent::ProbeObservedFirst { .. } => "ProbeObservedFirst",
171 OutboundAgentEvent::ProbeResult { .. } => "ProbeResult",
172 OutboundAgentEvent::ProbeFailureFirst { .. } => "ProbeFailureFirst",
173 OutboundAgentEvent::Failed { .. } => "Failed",
174 OutboundAgentEvent::RollbackComplete { .. } => "RollbackComplete",
175 OutboundAgentEvent::Converged { .. } => "Converged",
176 }
177}
178
179pub fn outbound_event_seq(payload: &OutboundAgentEvent) -> u64 {
181 match payload {
182 OutboundAgentEvent::DispatchAck { seq, .. }
183 | OutboundAgentEvent::ActivationStarted { seq, .. }
184 | OutboundAgentEvent::ActivationCompleted { seq, .. }
185 | OutboundAgentEvent::ActivationDeferred { seq, .. }
186 | OutboundAgentEvent::ActivationFailed { seq, .. }
187 | OutboundAgentEvent::ProbeTopologyDeclared { seq, .. }
188 | OutboundAgentEvent::ProbeObservedFirst { seq, .. }
189 | OutboundAgentEvent::ProbeResult { seq, .. }
190 | OutboundAgentEvent::ProbeFailureFirst { seq, .. }
191 | OutboundAgentEvent::Failed { seq, .. }
192 | OutboundAgentEvent::RollbackComplete { seq, .. }
193 | OutboundAgentEvent::Converged { seq, .. } => *seq,
194 }
195}
196
197fn filename_for(event: &QueuedEvent) -> String {
201 let hostname = sanitize(&event.hostname);
202 let rollout = sanitize(event.rollout_id.as_str());
203 let kind = sanitize(&event.event_kind);
204 format!("{:020}-{hostname}-{rollout}-{kind}.json", event.seq)
205}
206
/// Replace every character outside `[A-Za-z0-9@.-]` with `_`.
///
/// The mapping is per `char`, so a multi-byte Unicode character becomes a
/// single `_`. Path separators (`/`, `\`) are always replaced, which keeps the
/// result usable inside a single path component.
fn sanitize(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for ch in s.chars() {
        let allowed = ch.is_ascii_alphanumeric() || matches!(ch, '-' | '@' | '.');
        out.push(if allowed { ch } else { '_' });
    }
    out
}
218
219fn write_atomic(tmp: &Path, final_path: &Path, bytes: &[u8]) -> Result<()> {
220 use std::io::Write;
221 {
222 let mut f = std::fs::OpenOptions::new()
223 .write(true)
224 .create(true)
225 .truncate(true)
226 .open(tmp)
227 .with_context(|| format!("open {}", tmp.display()))?;
228 f.write_all(bytes)
229 .with_context(|| format!("write {}", tmp.display()))?;
230 f.sync_all()
231 .with_context(|| format!("fsync {}", tmp.display()))?;
232 }
233 std::fs::rename(tmp, final_path)
234 .with_context(|| format!("rename {} -> {}", tmp.display(), final_path.display()))?;
235 Ok(())
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use tempfile::TempDir;

    /// Fixture: an `ActivationCompleted` event for `host-05` / `stable@deadbeef`
    /// whose queue seq and payload seq are both `seq`.
    fn t(seq: u64) -> QueuedEvent {
        QueuedEvent {
            seq,
            hostname: "host-05".into(),
            rollout_id: "stable@deadbeef".into(),
            event_kind: "ActivationCompleted".into(),
            created_at: Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap(),
            payload: nixfleet_proto::AgentEvent::ActivationCompleted {
                observed_current_closure: "closure-a".into(),
                exit_code: 0,
                completed_at: Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap(),
                seq,
            },
        }
    }

    /// Sorted names of every entry currently in the queue directory.
    fn listing(q: &OutboundQueue) -> Vec<String> {
        let mut names: Vec<String> = std::fs::read_dir(q.dir())
            .unwrap()
            .map(|e| e.unwrap().file_name().into_string().unwrap())
            .collect();
        names.sort();
        names
    }

    #[test]
    fn enqueue_then_scan_round_trip() {
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        // Enqueue out of order: scan_pending must still return ascending seq.
        q.enqueue(&t(3)).unwrap();
        q.enqueue(&t(1)).unwrap();
        q.enqueue(&t(2)).unwrap();
        let pending = q.scan_pending().unwrap();
        assert_eq!(pending, vec![t(1), t(2), t(3)]);
    }

    #[test]
    fn enqueue_returns_final_json_path() {
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        let path = q.enqueue(&t(7)).unwrap();
        assert_eq!(
            path,
            q.dir()
                .join("00000000000000000007-host-05-stable@deadbeef-ActivationCompleted.json"),
        );
        assert!(path.is_file());
        // The temp file used for the atomic write must not be left behind.
        let expected = path.file_name().unwrap().to_str().unwrap().to_string();
        assert_eq!(listing(&q), vec![expected]);
    }

    #[test]
    fn mark_sent_removes_from_queue() {
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        q.enqueue(&t(1)).unwrap();
        q.enqueue(&t(2)).unwrap();
        q.mark_sent(&t(1)).unwrap();
        let pending = q.scan_pending().unwrap();
        assert_eq!(pending, vec![t(2)]);
        assert_eq!(listing(&q).len(), 1);
    }

    #[test]
    fn mark_sent_is_idempotent_when_file_gone() {
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        q.enqueue(&t(1)).unwrap();
        q.mark_sent(&t(1)).unwrap();
        q.mark_sent(&t(1)).unwrap();
        // An event that was never enqueued is also a no-op.
        q.mark_sent(&t(42)).unwrap();
        assert!(q.scan_pending().unwrap().is_empty());
        assert!(listing(&q).is_empty());
    }

    #[test]
    fn crash_mid_write_leaves_no_visible_event() {
        let dir = TempDir::new().unwrap();
        // Simulate a crash between creating the temp file and renaming it.
        let q = OutboundQueue::open(dir.path()).unwrap();
        let tmp_path = q
            .dir()
            .join("00000000000000000007-host-x-rollout-y-Kind.json.tmp");
        std::fs::write(&tmp_path, b"partial garbage").unwrap();
        let pending = q.scan_pending().unwrap();
        assert!(
            pending.is_empty(),
            "partial .tmp must not surface as a queued event",
        );
        let tmp_name = tmp_path.file_name().unwrap().to_str().unwrap().to_string();
        assert_eq!(listing(&q), vec![tmp_name]);
    }

    #[test]
    fn scan_skips_unparseable_and_foreign_files() {
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        q.enqueue(&t(1)).unwrap();
        std::fs::write(
            q.dir().join("00000000000000000002-corrupt.json"),
            b"{not json",
        )
        .unwrap();
        std::fs::write(q.dir().join("notes.txt"), b"ignore me").unwrap();
        assert_eq!(q.scan_pending().unwrap(), vec![t(1)]);
        // Skipped files are left in place for the operator, not deleted.
        assert_eq!(listing(&q).len(), 3);
    }

    #[test]
    fn scan_pending_on_missing_dir_is_empty() {
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        std::fs::remove_dir(q.dir()).unwrap();
        assert!(q.scan_pending().unwrap().is_empty());
    }

    #[test]
    fn open_is_idempotent_and_shares_state() {
        let dir = TempDir::new().unwrap();
        let first = OutboundQueue::open(dir.path()).unwrap();
        first.enqueue(&t(1)).unwrap();
        let second = OutboundQueue::open(dir.path()).unwrap();
        assert_eq!(second.dir(), first.dir());
        assert_eq!(second.scan_pending().unwrap(), vec![t(1)]);
    }

    #[test]
    fn replay_from_seq_yields_correct_subset() {
        let dir = TempDir::new().unwrap();
        // Replaying "from seq 2" means every pending event with seq >= 2.
        let q = OutboundQueue::open(dir.path()).unwrap();
        for seq in 1..=5 {
            q.enqueue(&t(seq)).unwrap();
        }
        let pending = q.scan_pending().unwrap();
        let replay: Vec<u64> = pending
            .into_iter()
            .map(|e| e.seq)
            .filter(|&s| s >= 2)
            .collect();
        assert_eq!(replay, vec![2, 3, 4, 5]);
    }

    #[test]
    fn enqueue_overwrites_same_seq() {
        let dir = TempDir::new().unwrap();
        // Re-enqueueing the same identity replaces the stored payload.
        let q = OutboundQueue::open(dir.path()).unwrap();
        q.enqueue(&t(1)).unwrap();
        let mut second = t(1);
        let perturbed_payload = nixfleet_proto::AgentEvent::ActivationCompleted {
            observed_current_closure: "closure-b".into(),
            exit_code: 99,
            completed_at: Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap(),
            seq: 1,
        };
        second.payload = perturbed_payload.clone();
        q.enqueue(&second).unwrap();
        let pending = q.scan_pending().unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].payload, perturbed_payload);
        assert_eq!(listing(&q).len(), 1);
    }

    #[test]
    fn clear_empties_queue() {
        let dir = TempDir::new().unwrap();
        let q = OutboundQueue::open(dir.path()).unwrap();
        q.enqueue(&t(1)).unwrap();
        q.enqueue(&t(2)).unwrap();
        q.clear().unwrap();
        assert!(listing(&q).is_empty());
        assert!(q.scan_pending().unwrap().is_empty());
    }

    #[test]
    fn sanitize_replaces_disallowed_chars() {
        assert_eq!(sanitize("stable@deadbeef"), "stable@deadbeef");
        assert_eq!(sanitize("a/b\\c d"), "a_b_c_d");
        assert_eq!(sanitize(""), "");
    }
}