nixfleet_agent/runtime/workers/
longpoll.rs

1//! Long-poll worker: holds an open `GET /v1/agent/dispatch?wait=60`
2//! against the CP. On a 200 with a Dispatch body, fetches + verifies
3//! the rollout manifest via the disk-backed [`ManifestCache`], asserts
4//! the dispatched `target_closure` matches the manifest's declaration
5//! for this host (RFC-0005 §4.1 advisory-payload contract), then emits
6//! `LocalActivate` into the reducer.
7//!
8//! First dispatch per rollout fetches `/v1/rollouts/{id}` from CP,
9//! verifies, writes through to disk. Repeat dispatches hit disk +
10//! re-verify (defense in depth). Disk cache + on-demand fetch is the
11//! model; no in-memory pre-priming step, no periodic-refresh worker.
12
13use std::time::Duration;
14
15use nixfleet_proto::clock::ClockHandle;
16use tokio::sync::mpsc;
17use tokio::task::JoinHandle;
18
19use super::super::wire::DispatchResponse;
20use super::super::{AgentConfig, ReducerInput, ShutdownToken};
21use crate::manifest_cache::ManifestCache;
22
/// Server-side hold time requested through `?wait=`, in seconds. This is
/// Plan 06's locked-in value and must stay equal to the CP's
/// `dispatch::MAX_WAIT_SECS`.
const WAIT_SECS: u64 = 60;

/// Headroom, in seconds, added on top of [`WAIT_SECS`] so that request setup
/// and response transfer still fit inside the client-side deadline.
const HTTP_TIMEOUT_SLACK_SECS: u64 = 5;

/// Client-side deadline for one long-poll request: the full server-side wait
/// plus [`HTTP_TIMEOUT_SLACK_SECS`] of slack.
const HTTP_TIMEOUT: Duration = Duration::from_secs(WAIT_SECS + HTTP_TIMEOUT_SLACK_SECS);

/// Minimum pause after a failed poll or dispatch. Real network failures must
/// not turn into a hot loop; five seconds gives the CP room to recover (for
/// example while it is itself being redeployed mid-rollout).
const ERROR_BACKOFF: Duration = Duration::from_millis(5_000);
33
34pub fn spawn(
35    cfg: AgentConfig,
36    _clock: ClockHandle,
37    input_tx: mpsc::Sender<ReducerInput>,
38    shutdown: ShutdownToken,
39) -> JoinHandle<()> {
40    tokio::spawn(async move {
41        let mut shutdown_rx = shutdown.into_inner();
42        let client = match crate::comms::build_client(
43            cfg.ca_cert.as_deref(),
44            cfg.client_cert.as_deref(),
45            cfg.client_key.as_deref(),
46        ) {
47            Ok(c) => c,
48            Err(err) => {
49                tracing::error!(
50                    target: "agent_longpoll",
51                    error = %err,
52                    "failed to build mTLS HTTP client; worker exits",
53                );
54                return;
55            }
56        };
57        let manifest_cache = ManifestCache::new_with_freshness(
58            &cfg.state_dir,
59            &cfg.trust_file,
60            std::time::Duration::from_secs(cfg.manifest_freshness_window_secs),
61        );
62        let url = format!(
63            "{}/v1/agent/dispatch?wait={}",
64            cfg.control_plane_url.trim_end_matches('/'),
65            WAIT_SECS,
66        );
67
68        loop {
69            tokio::select! {
70                biased;
71                _ = &mut shutdown_rx => {
72                    tracing::info!(
73                        target: "shutdown",
74                        task = "agent_longpoll",
75                        "task shut down",
76                    );
77                    return;
78                }
79                res = poll_once(&client, &url) => {
80                    match res {
81                        Ok(Some(dispatch)) => {
82                            if let Err(err) = handle_dispatch(&cfg, &client, &manifest_cache, &input_tx, dispatch).await {
83                                tracing::warn!(
84                                    target: "agent_longpoll",
85                                    error = %err,
86                                    "handle_dispatch failed; backing off",
87                                );
88                                tokio::time::sleep(ERROR_BACKOFF).await;
89                            }
90                        }
91                        Ok(None) => {
92                            // 204 / empty response: no work queued. Loop
93                            // immediately into the next poll; the CP's
94                            // `wait=60s` cap is the rate-limiter.
95                        }
96                        Err(err) => {
97                            tracing::warn!(
98                                target: "agent_longpoll",
99                                error = %err,
100                                "long-poll request failed; backing off",
101                            );
102                            tokio::time::sleep(ERROR_BACKOFF).await;
103                        }
104                    }
105                }
106            }
107        }
108    })
109}
110
111async fn poll_once(
112    client: &reqwest::Client,
113    url: &str,
114) -> anyhow::Result<Option<DispatchResponse>> {
115    // Per-request timeout override: long-poll waits up to 60s server-
116    // side; the request needs slack on top of that. `comms::build_client`'s
117    // 30s default would cut the long-poll off before CP could reply.
118    let resp = client.get(url).timeout(HTTP_TIMEOUT).send().await?;
119    let status = resp.status();
120    if status == reqwest::StatusCode::NO_CONTENT {
121        return Ok(None);
122    }
123    if !status.is_success() {
124        anyhow::bail!("CP returned {status}");
125    }
126    // Some endpoints return 200 with a null body when no work; treat
127    // any non-deserializable / null response as "no work pending"
128    // rather than an error.
129    let body = resp.text().await?;
130    let trimmed = body.trim();
131    if trimmed.is_empty() || trimmed == "null" {
132        return Ok(None);
133    }
134    let parsed: serde_json::Value = serde_json::from_str(trimmed)?;
135    if parsed.is_null() {
136        return Ok(None);
137    }
138    let dispatch: DispatchResponse = serde_json::from_value(parsed)?;
139    Ok(Some(dispatch))
140}
141
142async fn handle_dispatch(
143    cfg: &AgentConfig,
144    client: &reqwest::Client,
145    manifest_cache: &ManifestCache,
146    input_tx: &mpsc::Sender<ReducerInput>,
147    dispatch: DispatchResponse,
148) -> anyhow::Result<()> {
149    // Hostname guard: CP can only queue dispatches for this agent, but
150    // a misconfigured CP or a rogue intermediate could send the wrong
151    // payload. Refuse early.
152    if dispatch.hostname != cfg.machine_id {
153        anyhow::bail!(
154            "dispatch hostname={} != agent machine_id={}",
155            dispatch.hostname,
156            cfg.machine_id,
157        );
158    }
159
160    // RFC-0005 §4.1 advisory-payload contract enforced by the
161    // ManifestCache: fetches the rollout's signed manifest (cache hit
162    // re-verifies disk bytes; miss fetches from CP, verifies, writes
163    // through), then asserts the dispatched target_closure matches the
164    // manifest's declaration for this host. Any divergence aborts the
165    // dispatch before it reaches the reducer.
166    let verified = manifest_cache
167        .ensure_for_dispatch(
168            client,
169            &cfg.control_plane_url,
170            dispatch.rollout_id.as_str(),
171            &cfg.machine_id,
172            &dispatch.target_closure,
173        )
174        .await
175        .map_err(|err| {
176            anyhow::anyhow!(
177                "manifest verify (rollout={}): {}",
178                dispatch.rollout_id,
179                err.reason()
180            )
181        })?;
182
183    // D-028: defense-in-depth freshness recheck at dispatch reception time.
184    // ensure_for_dispatch's verify_artifact already enforces freshness at
185    // fetch time, but a cache hit close to the freshness edge can cross the
186    // boundary between fetch and dispatch arrival. Refuse stale dispatches
187    // here so the agent doesn't fire a switch against a manifest that has
188    // aged out — CP's planner will re-emit with a fresher manifest if CI
189    // has signed one. The freshness window matches the hardcoded 3600s the
190    // cache verify uses; channel-level operator-declared windows are a
191    // v0.2.1 followup (currently the cache layer hardcodes 3600s too, so
192    // matching values here keeps the two checks in lockstep).
193    let now = chrono::Utc::now();
194    match crate::freshness::check_signed_at(verified.signed_at(), 3600, now) {
195        crate::freshness::FreshnessCheck::Fresh => {}
196        crate::freshness::FreshnessCheck::Stale {
197            signed_at,
198            freshness_window_secs,
199            age_secs,
200        } => {
201            anyhow::bail!(
202                "dispatch refused: per-rollout manifest stale at dispatch time \
203                 (rollout={}, signed_at={signed_at}, window={freshness_window_secs}s, age={age_secs}s)",
204                dispatch.rollout_id,
205            );
206        }
207    }
208
209    // LOADBEARING: `current_closure_at_dispatch` is semantically "the
210    // closure the agent was running when CP issued this dispatch" —
211    // i.e. the rollback target if activation fails. Read it from
212    // `/run/current-system` here, NOT from `dispatch.target_closure`
213    // (which is the NEW target the dispatch is steering toward). The
214    // distinction is load-bearing for the CP-side Failed → Reverted
215    // synthesis path: on a post-rollback restart the heartbeat reports
216    // current_closure == prior closure, and the synth condition
217    // `current_closure_at_dispatch == agent_current` must match.
218    // Reading target_closure here would store the failed-target,
219    // never matching the rollback's post-restart heartbeat, and the
220    // host would stay Failed indefinitely.
221    //
222    // `None` (no `/run/current-system` symlink — fresh install, test
223    // fixture) falls back to the empty-string sentinel, same shape as
224    // CP's post-wipe `synthesize_pending_to_converged` placeholder.
225    // The rollback-target ambiguity is inert in that case: a host
226    // with no prior closure can't roll back to one.
227    let current_closure_at_dispatch =
228        crate::runtime::recovery::read_current_closure(&cfg.current_system_path)
229            .unwrap_or_default();
230
231    let event = nixfleet_state_machine::Event::LocalActivate {
232        current_closure_at_dispatch,
233        // LOADBEARING: validated dispatch target carried verbatim.
234        // `ensure_for_dispatch` above asserted this value matches the
235        // freshly-verified per-rollout manifest's declared
236        // target_closure for this host; the reducer's bootstrap
237        // consumes it without re-derivation (eliminates TOCTOU
238        // against the reducer's stale manifests snapshot).
239        target_closure: dispatch.target_closure.clone(),
240        received_at: dispatch.enqueued_at,
241        // CP-resolved soak deadline (RFC-0004 §1 invariant 1: CP is
242        // the single source of truth for the policy-resolved soak
243        // window).
244        soak_due_at: dispatch.soak_due_at,
245        seq: 0, // reducer assigns
246    };
247    input_tx
248        .send(ReducerInput::HostEvent {
249            rollout_id: dispatch.rollout_id,
250            event,
251        })
252        .await?;
253    Ok(())
254}