nixfleet_agent/runtime/workers/manifest_poll.rs
1//! Periodic agent-side manifest poll. Mirrors CP's
2//! `runtime::workers::manifest_poll`: every 30s the worker fetches
3//! `/v1/fleet.resolved` + each channel's `/v1/rollouts/{id}` from CP,
4//! verifies signatures + cross-checks each rollout's
5//! `fleet_resolved_hash` against the canonical hash of the just-fetched
6//! fleet, then emits `ReducerInput::ManifestSetUpdated` so the reducer's
7//! cached `SignedManifestSet` reflects the current fleet snapshot.
8//!
9//! First tick fires at `Instant::now()` (no startup delay), so the
10//! reducer's `manifests` cache is populated before the longpoll worker
//! receives its first `Dispatch`. Subsequent ticks every 30s; when a tick
//! fails nothing is emitted, so the reducer keeps the last set it received,
//! and the worker retries on the next tick.
13//!
14//! Failure semantics (matching CP's manifest_poll):
15//! - Fleet fetch/verify failed -> skip the entire tick; reducer retains
16//! its prior cache.
17//! - Per-rollout fetch/verify failed OR cross-check mismatch -> log +
18//! skip that channel only; emit the partial set.
19//! - Reducer channel closed (cancel propagated) -> exit.
20
21use std::collections::HashMap;
22use std::time::Duration;
23
24use nixfleet_proto::clock::ClockHandle;
25use nixfleet_reconciler::compute_rollout_id_for_channel;
26use nixfleet_reconciler::planner_types::SignedManifestSet;
27use tokio::sync::mpsc;
28use tokio::task::JoinHandle;
29
30use super::super::{AgentConfig, ReducerInput, ShutdownToken};
31use crate::manifest_cache::ManifestCache;
32
/// Interval between manifest polls (30 seconds).
///
/// Kept equal to CP's `runtime::workers::manifest_poll::POLL_INTERVAL`.
const POLL_INTERVAL: Duration = Duration::from_millis(30_000);
35
36pub fn spawn(
37 cfg: AgentConfig,
38 clock: ClockHandle,
39 input_tx: mpsc::Sender<ReducerInput>,
40 shutdown: ShutdownToken,
41) -> JoinHandle<()> {
42 let trust_path = cfg.trust_file.clone();
43 spawn_with_trust_path(cfg, trust_path, clock, input_tx, shutdown)
44}
45
46/// Tunable-trust-path entry point. Production code calls [`spawn`] which
47/// threads `AgentConfig::trust_file` (the `--trust-file` CLI arg);
48/// integration tests under the `test-helpers` feature gate call this
49/// variant with a tempdir-rooted trust.json so the worker can run
50/// without touching
51/// `/etc/nixfleet/agent/`. Same convention as
52/// `nixfleet_agent::runtime::ShutdownToken::__test_only_from_rx`.
53pub fn spawn_with_trust_path(
54 cfg: AgentConfig,
55 trust_path: std::path::PathBuf,
56 _clock: ClockHandle,
57 input_tx: mpsc::Sender<ReducerInput>,
58 shutdown: ShutdownToken,
59) -> JoinHandle<()> {
60 tokio::spawn(async move {
61 let mut shutdown_rx = shutdown.into_inner();
62 let client = match crate::comms::build_client(
63 cfg.ca_cert.as_deref(),
64 cfg.client_cert.as_deref(),
65 cfg.client_key.as_deref(),
66 ) {
67 Ok(c) => c,
68 Err(err) => {
69 tracing::error!(
70 target: "agent_manifest_poll",
71 error = %err,
72 "failed to build mTLS HTTP client; worker exits",
73 );
74 return;
75 }
76 };
77 let manifest_cache = ManifestCache::new_with_freshness(
78 &cfg.state_dir,
79 &trust_path,
80 std::time::Duration::from_secs(cfg.manifest_freshness_window_secs),
81 );
82 let cp_url = cfg.control_plane_url.clone();
83
84 // First tick at Instant::now() (no startup delay) so the reducer's
85 // manifest cache is populated before longpoll's first dispatch can
86 // arrive. Subsequent ticks every POLL_INTERVAL.
87 let mut ticker = tokio::time::interval(POLL_INTERVAL);
88 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
89
90 loop {
91 tokio::select! {
92 biased;
93 _ = &mut shutdown_rx => {
94 tracing::info!(
95 target: "shutdown",
96 task = "agent_manifest_poll",
97 "task shut down",
98 );
99 return;
100 }
101 _ = ticker.tick() => {}
102 }
103
104 match tick_once(&manifest_cache, &client, &cp_url).await {
105 Ok(set) => {
106 if let Err(err) = input_tx
107 .send(ReducerInput::ManifestSetUpdated(Box::new(set)))
108 .await
109 {
110 tracing::warn!(
111 target: "agent_manifest_poll",
112 error = %err,
113 "reducer input channel closed; worker exits",
114 );
115 return;
116 }
117 }
118 Err(err) => {
119 tracing::warn!(
120 target: "agent_manifest_poll",
121 error = %err,
122 "tick failed; retaining previous reducer cache",
123 );
124 }
125 }
126 }
127 })
128}
129
130async fn tick_once(
131 manifest_cache: &ManifestCache,
132 client: &reqwest::Client,
133 cp_url: &str,
134) -> anyhow::Result<SignedManifestSet> {
135 // 1. Fetch + verify fleet.resolved. `fetch_or_load_fleet` returns the
136 // canonical hash of the artifact bytes paired with the Verified
137 // struct so the per-rollout discriminator below has its anchor.
138 let (verified_fleet, fleet_hash) = manifest_cache
139 .fetch_or_load_fleet(client, cp_url)
140 .await
141 .map_err(|err| anyhow::anyhow!("fetch fleet: {}", err.reason()))?;
142
143 // 2. For each channel in the verified fleet, derive the expected
144 // rollout_id, fetch + verify, and cross-check the discriminator.
145 let mut rollouts = HashMap::new();
146 for channel in verified_fleet.inner().channels.keys() {
147 let rollout_id =
148 match compute_rollout_id_for_channel(verified_fleet.inner(), &fleet_hash, channel) {
149 Ok(Some(id)) => id,
150 Ok(None) => continue, // channel has no host with a closure yet
151 Err(err) => {
152 tracing::warn!(
153 target: "agent_manifest_poll",
154 %channel,
155 error = %err,
156 "compute_rollout_id_for_channel failed; skipping channel",
157 );
158 continue;
159 }
160 };
161
162 let verified_rollout = match manifest_cache
163 .fetch_or_load(client, cp_url, &rollout_id)
164 .await
165 {
166 Ok(v) => v,
167 Err(err) => {
168 tracing::warn!(
169 target: "agent_manifest_poll",
170 %channel,
171 %rollout_id,
172 error = %err.reason(),
173 "fetch rollout manifest failed; skipping channel",
174 );
175 continue;
176 }
177 };
178
179 // Discriminator: the rollout's pinned fleet_resolved_hash MUST
180 // match the canonical hash of the fleet we just fetched. A
181 // mismatch means CP is serving inconsistent state across the
182 // two artifacts (e.g. fresh fleet snapshot but stale rollout
183 // pin, or vice versa). Defense-in-depth over signature verify;
184 // mirrors the architect's amendment on the d010-feed plan
185 // restated under Option C: discriminator lives at the worker
186 // boundary as a cross-consistency check.
187 if verified_rollout.inner().fleet_resolved_hash != fleet_hash {
188 tracing::warn!(
189 target: "agent_manifest_poll",
190 %channel,
191 %rollout_id,
192 rollout_pin = %verified_rollout.inner().fleet_resolved_hash,
193 fleet_hash = %fleet_hash,
194 "rollout manifest pins a different fleet snapshot than the one CP served; skipping",
195 );
196 continue;
197 }
198
199 rollouts.insert(channel.clone(), verified_rollout);
200 }
201
202 Ok(SignedManifestSet::new(verified_fleet, rollouts))
203}