nixfleet_agent/runtime/workers/
outbound.rs

1//! Outbound queue drainer. Periodically scans
2//! `{state_dir}/outbound-queue/` and POSTs each pending event to
3//! `/v1/agent/events`, deleting on success. Acts as the network-side
4//! companion to the applier's `LocalEmitEvent` writes.
5//!
6//! On CP unreachable / network failure the file stays on disk; the
7//! next drain tick retries. The heartbeat worker's
8//! `X-Nixfleet-Replay-From` handler triggers an immediate drain pass
9//! by sending into the [`DrainKick`] channel.
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use tokio::sync::{mpsc, watch};
15use tokio::task::JoinHandle;
16
17use super::super::outbound_queue::{OutboundQueue, QueuedEvent};
18use super::super::{AgentConfig, ReducerInput, ShutdownToken};
19
/// Period of the background drain tick.
///
/// Per Plan 07's locked-in decision the drainer never hot-loops: it
/// wakes once per interval, or earlier when kicked through the
/// `DrainKick` channel (a new event landed, or the heartbeat worker
/// observed CP drift).
const DRAIN_INTERVAL: Duration = Duration::from_secs(15);
25
/// Upper bound on a single event POST.
///
/// The CP is expected to answer 204 quickly (the handler only pushes
/// onto an in-memory dispatch channel), so 10s leaves generous slack
/// while still failing fast when the CP is unreachable.
const HTTP_TIMEOUT: Duration = Duration::from_secs(10);
29
30pub type DrainKick = watch::Sender<()>;
31
32pub fn spawn(
33    cfg: AgentConfig,
34    queue: Arc<OutboundQueue>,
35    mut kick_rx: watch::Receiver<()>,
36    _input_tx: mpsc::Sender<ReducerInput>,
37    shutdown: ShutdownToken,
38) -> JoinHandle<()> {
39    tokio::spawn(async move {
40        let mut shutdown_rx = shutdown.into_inner();
41        let client = match crate::comms::build_client(
42            cfg.ca_cert.as_deref(),
43            cfg.client_cert.as_deref(),
44            cfg.client_key.as_deref(),
45        ) {
46            Ok(c) => c,
47            Err(err) => {
48                tracing::error!(
49                    target: "agent_outbound",
50                    error = %err,
51                    "failed to build mTLS HTTP client; worker exits",
52                );
53                return;
54            }
55        };
56        let url = format!(
57            "{}/v1/agent/events",
58            cfg.control_plane_url.trim_end_matches('/'),
59        );
60        let mut ticker = tokio::time::interval(DRAIN_INTERVAL);
61        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
62
63        loop {
64            tokio::select! {
65                biased;
66                _ = &mut shutdown_rx => {
67                    tracing::info!(
68                        target: "shutdown",
69                        task = "agent_outbound",
70                        "task shut down",
71                    );
72                    return;
73                }
74                _ = ticker.tick() => {
75                    drain_once(&client, &url, &queue).await;
76                }
77                changed = kick_rx.changed() => {
78                    if changed.is_err() {
79                        // All senders dropped — runtime tearing down.
80                        return;
81                    }
82                    drain_once(&client, &url, &queue).await;
83                }
84            }
85        }
86    })
87}
88
89async fn drain_once(client: &reqwest::Client, url: &str, queue: &OutboundQueue) {
90    let pending = match queue.scan_pending() {
91        Ok(p) => p,
92        Err(err) => {
93            tracing::warn!(
94                target: "agent_outbound",
95                error = %err,
96                "scan_pending failed; will retry next tick",
97            );
98            return;
99        }
100    };
101    for event in pending {
102        if let Err(err) = post_one(client, url, &event).await {
103            tracing::warn!(
104                target: "agent_outbound",
105                seq = event.seq,
106                kind = %event.event_kind,
107                error = %err,
108                "POST /v1/agent/events failed; leaving on disk for retry",
109            );
110            // Stop on first failure — CP is likely unreachable; later
111            // events in the batch will fail for the same reason.
112            return;
113        }
114        if let Err(err) = queue.mark_sent(&event) {
115            tracing::warn!(
116                target: "agent_outbound",
117                seq = event.seq,
118                error = %err,
119                "mark_sent failed (file remove); will retry — POST is idempotent at CP via (host, rollout, seq) dedup",
120            );
121        }
122    }
123}
124
125async fn post_one(client: &reqwest::Client, url: &str, event: &QueuedEvent) -> anyhow::Result<()> {
126    // Construct the typed wire envelope. Both sides import the same
127    // type from `nixfleet-proto`, so the camelCase / snake_case
128    // discriminator drift that previously bit `rollout_id` vs
129    // `rolloutId` is now resolved at the serde-derive layer rather
130    // than at a hand-built JSON map.
131    let envelope = nixfleet_proto::AgentEventEnvelope {
132        hostname: event.hostname.clone(),
133        rollout_id: event.rollout_id.clone(),
134        event: event.payload.clone(),
135        signature: None,
136    };
137    // Per-request timeout override: comms::build_client uses 30s default;
138    // outbound POSTs to /v1/agent/events insist on the 10s fail-fast cap.
139    let resp = client
140        .post(url)
141        .timeout(HTTP_TIMEOUT)
142        .json(&envelope)
143        .send()
144        .await?;
145    let status = resp.status();
146    if !status.is_success() {
147        anyhow::bail!("CP returned {status}");
148    }
149    Ok(())
150}