nixfleet_agent/runtime/workers/longpoll.rs
1//! Long-poll worker: holds an open `GET /v1/agent/dispatch?wait=60`
2//! against the CP. On a 200 with a Dispatch body, fetches + verifies
3//! the rollout manifest via the disk-backed [`ManifestCache`], asserts
4//! the dispatched `target_closure` matches the manifest's declaration
5//! for this host (RFC-0005 §4.1 advisory-payload contract), then emits
6//! `LocalActivate` into the reducer.
7//!
8//! First dispatch per rollout fetches `/v1/rollouts/{id}` from CP,
9//! verifies, writes through to disk. Repeat dispatches hit disk +
10//! re-verify (defense in depth). Disk cache + on-demand fetch is the
11//! model; no in-memory pre-priming step, no periodic-refresh worker.
12
13use std::time::Duration;
14
15use nixfleet_proto::clock::ClockHandle;
16use tokio::sync::mpsc;
17use tokio::task::JoinHandle;
18
19use super::super::wire::DispatchResponse;
20use super::super::{AgentConfig, ReducerInput, ShutdownToken};
21use crate::manifest_cache::ManifestCache;
22
/// Extra seconds granted on top of `WAIT_SECS` for the client-side HTTP
/// timeout, covering request setup and response transfer.
const HTTP_SLACK_SECS: u64 = 5;

/// How long, in seconds, each long-poll asks the CP to hold the request
/// open. Plan 06's locked-in value; matches CP's `dispatch::MAX_WAIT_SECS`.
const WAIT_SECS: u64 = 60;

/// Client-side timeout for one long-poll request: the server-side wait
/// plus `HTTP_SLACK_SECS`.
const HTTP_TIMEOUT: Duration = Duration::from_secs(WAIT_SECS + HTTP_SLACK_SECS);

/// Pause taken after a failed poll or a failed dispatch. Real network
/// failures should NOT hot-loop; 5s gives the CP a chance to recover
/// (e.g. mid-rollout deploy).
const ERROR_BACKOFF: Duration = Duration::from_secs(5);
33
34pub fn spawn(
35 cfg: AgentConfig,
36 _clock: ClockHandle,
37 input_tx: mpsc::Sender<ReducerInput>,
38 shutdown: ShutdownToken,
39) -> JoinHandle<()> {
40 tokio::spawn(async move {
41 let mut shutdown_rx = shutdown.into_inner();
42 let client = match crate::comms::build_client(
43 cfg.ca_cert.as_deref(),
44 cfg.client_cert.as_deref(),
45 cfg.client_key.as_deref(),
46 ) {
47 Ok(c) => c,
48 Err(err) => {
49 tracing::error!(
50 target: "agent_longpoll",
51 error = %err,
52 "failed to build mTLS HTTP client; worker exits",
53 );
54 return;
55 }
56 };
57 let manifest_cache = ManifestCache::new_with_freshness(
58 &cfg.state_dir,
59 &cfg.trust_file,
60 std::time::Duration::from_secs(cfg.manifest_freshness_window_secs),
61 );
62 let url = format!(
63 "{}/v1/agent/dispatch?wait={}",
64 cfg.control_plane_url.trim_end_matches('/'),
65 WAIT_SECS,
66 );
67
68 loop {
69 tokio::select! {
70 biased;
71 _ = &mut shutdown_rx => {
72 tracing::info!(
73 target: "shutdown",
74 task = "agent_longpoll",
75 "task shut down",
76 );
77 return;
78 }
79 res = poll_once(&client, &url) => {
80 match res {
81 Ok(Some(dispatch)) => {
82 if let Err(err) = handle_dispatch(&cfg, &client, &manifest_cache, &input_tx, dispatch).await {
83 tracing::warn!(
84 target: "agent_longpoll",
85 error = %err,
86 "handle_dispatch failed; backing off",
87 );
88 tokio::time::sleep(ERROR_BACKOFF).await;
89 }
90 }
91 Ok(None) => {
92 // 204 / empty response: no work queued. Loop
93 // immediately into the next poll; the CP's
94 // `wait=60s` cap is the rate-limiter.
95 }
96 Err(err) => {
97 tracing::warn!(
98 target: "agent_longpoll",
99 error = %err,
100 "long-poll request failed; backing off",
101 );
102 tokio::time::sleep(ERROR_BACKOFF).await;
103 }
104 }
105 }
106 }
107 }
108 })
109}
110
111async fn poll_once(
112 client: &reqwest::Client,
113 url: &str,
114) -> anyhow::Result<Option<DispatchResponse>> {
115 // Per-request timeout override: long-poll waits up to 60s server-
116 // side; the request needs slack on top of that. `comms::build_client`'s
117 // 30s default would cut the long-poll off before CP could reply.
118 let resp = client.get(url).timeout(HTTP_TIMEOUT).send().await?;
119 let status = resp.status();
120 if status == reqwest::StatusCode::NO_CONTENT {
121 return Ok(None);
122 }
123 if !status.is_success() {
124 anyhow::bail!("CP returned {status}");
125 }
126 // Some endpoints return 200 with a null body when no work; treat
127 // any non-deserializable / null response as "no work pending"
128 // rather than an error.
129 let body = resp.text().await?;
130 let trimmed = body.trim();
131 if trimmed.is_empty() || trimmed == "null" {
132 return Ok(None);
133 }
134 let parsed: serde_json::Value = serde_json::from_str(trimmed)?;
135 if parsed.is_null() {
136 return Ok(None);
137 }
138 let dispatch: DispatchResponse = serde_json::from_value(parsed)?;
139 Ok(Some(dispatch))
140}
141
142async fn handle_dispatch(
143 cfg: &AgentConfig,
144 client: &reqwest::Client,
145 manifest_cache: &ManifestCache,
146 input_tx: &mpsc::Sender<ReducerInput>,
147 dispatch: DispatchResponse,
148) -> anyhow::Result<()> {
149 // Hostname guard: CP can only queue dispatches for this agent, but
150 // a misconfigured CP or a rogue intermediate could send the wrong
151 // payload. Refuse early.
152 if dispatch.hostname != cfg.machine_id {
153 anyhow::bail!(
154 "dispatch hostname={} != agent machine_id={}",
155 dispatch.hostname,
156 cfg.machine_id,
157 );
158 }
159
160 // RFC-0005 §4.1 advisory-payload contract enforced by the
161 // ManifestCache: fetches the rollout's signed manifest (cache hit
162 // re-verifies disk bytes; miss fetches from CP, verifies, writes
163 // through), then asserts the dispatched target_closure matches the
164 // manifest's declaration for this host. Any divergence aborts the
165 // dispatch before it reaches the reducer.
166 let verified = manifest_cache
167 .ensure_for_dispatch(
168 client,
169 &cfg.control_plane_url,
170 dispatch.rollout_id.as_str(),
171 &cfg.machine_id,
172 &dispatch.target_closure,
173 )
174 .await
175 .map_err(|err| {
176 anyhow::anyhow!(
177 "manifest verify (rollout={}): {}",
178 dispatch.rollout_id,
179 err.reason()
180 )
181 })?;
182
183 // D-028: defense-in-depth freshness recheck at dispatch reception time.
184 // ensure_for_dispatch's verify_artifact already enforces freshness at
185 // fetch time, but a cache hit close to the freshness edge can cross the
186 // boundary between fetch and dispatch arrival. Refuse stale dispatches
187 // here so the agent doesn't fire a switch against a manifest that has
188 // aged out — CP's planner will re-emit with a fresher manifest if CI
189 // has signed one. The freshness window matches the hardcoded 3600s the
190 // cache verify uses; channel-level operator-declared windows are a
191 // v0.2.1 followup (currently the cache layer hardcodes 3600s too, so
192 // matching values here keeps the two checks in lockstep).
193 let now = chrono::Utc::now();
194 match crate::freshness::check_signed_at(verified.signed_at(), 3600, now) {
195 crate::freshness::FreshnessCheck::Fresh => {}
196 crate::freshness::FreshnessCheck::Stale {
197 signed_at,
198 freshness_window_secs,
199 age_secs,
200 } => {
201 anyhow::bail!(
202 "dispatch refused: per-rollout manifest stale at dispatch time \
203 (rollout={}, signed_at={signed_at}, window={freshness_window_secs}s, age={age_secs}s)",
204 dispatch.rollout_id,
205 );
206 }
207 }
208
209 // LOADBEARING: `current_closure_at_dispatch` is semantically "the
210 // closure the agent was running when CP issued this dispatch" —
211 // i.e. the rollback target if activation fails. Read it from
212 // `/run/current-system` here, NOT from `dispatch.target_closure`
213 // (which is the NEW target the dispatch is steering toward). The
214 // distinction is load-bearing for the CP-side Failed → Reverted
215 // synthesis path: on a post-rollback restart the heartbeat reports
216 // current_closure == prior closure, and the synth condition
217 // `current_closure_at_dispatch == agent_current` must match.
218 // Reading target_closure here would store the failed-target,
219 // never matching the rollback's post-restart heartbeat, and the
220 // host would stay Failed indefinitely.
221 //
222 // `None` (no `/run/current-system` symlink — fresh install, test
223 // fixture) falls back to the empty-string sentinel, same shape as
224 // CP's post-wipe `synthesize_pending_to_converged` placeholder.
225 // The rollback-target ambiguity is inert in that case: a host
226 // with no prior closure can't roll back to one.
227 let current_closure_at_dispatch =
228 crate::runtime::recovery::read_current_closure(&cfg.current_system_path)
229 .unwrap_or_default();
230
231 let event = nixfleet_state_machine::Event::LocalActivate {
232 current_closure_at_dispatch,
233 // LOADBEARING: validated dispatch target carried verbatim.
234 // `ensure_for_dispatch` above asserted this value matches the
235 // freshly-verified per-rollout manifest's declared
236 // target_closure for this host; the reducer's bootstrap
237 // consumes it without re-derivation (eliminates TOCTOU
238 // against the reducer's stale manifests snapshot).
239 target_closure: dispatch.target_closure.clone(),
240 received_at: dispatch.enqueued_at,
241 // CP-resolved soak deadline (RFC-0004 §1 invariant 1: CP is
242 // the single source of truth for the policy-resolved soak
243 // window).
244 soak_due_at: dispatch.soak_due_at,
245 seq: 0, // reducer assigns
246 };
247 input_tx
248 .send(ReducerInput::HostEvent {
249 rollout_id: dispatch.rollout_id,
250 event,
251 })
252 .await?;
253 Ok(())
254}