nixfleet_agent/runtime/workers/
outbound.rs1use std::sync::Arc;
12use std::time::Duration;
13
14use tokio::sync::{mpsc, watch};
15use tokio::task::JoinHandle;
16
17use super::super::outbound_queue::{OutboundQueue, QueuedEvent};
18use super::super::{AgentConfig, ReducerInput, ShutdownToken};
19
20const DRAIN_INTERVAL: Duration = Duration::from_secs(15);
25
26const HTTP_TIMEOUT: Duration = Duration::from_secs(10);
29
30pub type DrainKick = watch::Sender<()>;
31
32pub fn spawn(
33 cfg: AgentConfig,
34 queue: Arc<OutboundQueue>,
35 mut kick_rx: watch::Receiver<()>,
36 _input_tx: mpsc::Sender<ReducerInput>,
37 shutdown: ShutdownToken,
38) -> JoinHandle<()> {
39 tokio::spawn(async move {
40 let mut shutdown_rx = shutdown.into_inner();
41 let client = match crate::comms::build_client(
42 cfg.ca_cert.as_deref(),
43 cfg.client_cert.as_deref(),
44 cfg.client_key.as_deref(),
45 ) {
46 Ok(c) => c,
47 Err(err) => {
48 tracing::error!(
49 target: "agent_outbound",
50 error = %err,
51 "failed to build mTLS HTTP client; worker exits",
52 );
53 return;
54 }
55 };
56 let url = format!(
57 "{}/v1/agent/events",
58 cfg.control_plane_url.trim_end_matches('/'),
59 );
60 let mut ticker = tokio::time::interval(DRAIN_INTERVAL);
61 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
62
63 loop {
64 tokio::select! {
65 biased;
66 _ = &mut shutdown_rx => {
67 tracing::info!(
68 target: "shutdown",
69 task = "agent_outbound",
70 "task shut down",
71 );
72 return;
73 }
74 _ = ticker.tick() => {
75 drain_once(&client, &url, &queue).await;
76 }
77 changed = kick_rx.changed() => {
78 if changed.is_err() {
79 return;
81 }
82 drain_once(&client, &url, &queue).await;
83 }
84 }
85 }
86 })
87}
88
89async fn drain_once(client: &reqwest::Client, url: &str, queue: &OutboundQueue) {
90 let pending = match queue.scan_pending() {
91 Ok(p) => p,
92 Err(err) => {
93 tracing::warn!(
94 target: "agent_outbound",
95 error = %err,
96 "scan_pending failed; will retry next tick",
97 );
98 return;
99 }
100 };
101 for event in pending {
102 if let Err(err) = post_one(client, url, &event).await {
103 tracing::warn!(
104 target: "agent_outbound",
105 seq = event.seq,
106 kind = %event.event_kind,
107 error = %err,
108 "POST /v1/agent/events failed; leaving on disk for retry",
109 );
110 return;
113 }
114 if let Err(err) = queue.mark_sent(&event) {
115 tracing::warn!(
116 target: "agent_outbound",
117 seq = event.seq,
118 error = %err,
119 "mark_sent failed (file remove); will retry — POST is idempotent at CP via (host, rollout, seq) dedup",
120 );
121 }
122 }
123}
124
125async fn post_one(client: &reqwest::Client, url: &str, event: &QueuedEvent) -> anyhow::Result<()> {
126 let envelope = nixfleet_proto::AgentEventEnvelope {
132 hostname: event.hostname.clone(),
133 rollout_id: event.rollout_id.clone(),
134 event: event.payload.clone(),
135 signature: None,
136 };
137 let resp = client
140 .post(url)
141 .timeout(HTTP_TIMEOUT)
142 .json(&envelope)
143 .send()
144 .await?;
145 let status = resp.status();
146 if !status.is_success() {
147 anyhow::bail!("CP returned {status}");
148 }
149 Ok(())
150}