nixfleet_control_plane/runtime/workers/
manifest_poll.rs1use std::collections::HashMap;
25use std::path::PathBuf;
26use std::sync::Arc;
27use std::time::Duration;
28
29use nixfleet_proto::clock::ClockHandle;
30use nixfleet_reconciler::planner_types::SignedManifestSet;
31use nixfleet_reconciler::{
32 VerifiedFleet, VerifiedRolloutManifest, compute_rollout_id_for_channel, verify_artifact,
33 verify_rollout_manifest,
34};
35use tokio::sync::mpsc;
36use tokio::task::JoinHandle;
37
38use super::super::{ReducerInput, ShutdownToken};
39use crate::polling::signed_fetch;
40use crate::rollouts_source::RolloutsSource;
41use crate::server::{AppState, VerifiedFleetSnapshot};
42
43const POLL_INTERVAL: Duration = Duration::from_secs(30);
45
46#[derive(Debug, Clone)]
53pub struct ChannelRefsSource {
54 pub artifact_url: String,
55 pub signature_url: String,
56 pub token_file: Option<PathBuf>,
57 pub trust_path: PathBuf,
58 pub freshness_window: Duration,
59}
60
61pub fn spawn(
62 state: Arc<AppState>,
63 clock: ClockHandle,
64 input_tx: mpsc::Sender<ReducerInput>,
65 shutdown: ShutdownToken,
66) -> JoinHandle<()> {
67 tokio::spawn(async move {
68 let mut shutdown_rx = shutdown.into_inner();
69
70 if state.channel_refs_source.is_none() {
71 tracing::info!(
72 target: "cp_manifest_poll",
73 "manifest_poll: no channel_refs_source configured; idling",
74 );
75 let _ = (&mut shutdown_rx).await;
76 return;
77 }
78
79 let client = signed_fetch::build_client();
80 let mut ticker = tokio::time::interval(POLL_INTERVAL);
91 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
92
93 loop {
94 tokio::select! {
95 biased;
96 _ = &mut shutdown_rx => {
97 tracing::info!(
98 target: "shutdown",
99 task = "cp_manifest_poll",
100 "task shut down",
101 );
102 return;
103 }
104 _ = ticker.tick() => {}
105 }
106
107 match poll_once(&client, &state, &clock).await {
108 Ok(Some((set, fleet_snapshot))) => {
109 publish_verified_fleet(&state, fleet_snapshot).await;
110 if let Err(err) = input_tx
111 .send(ReducerInput::ManifestSetUpdated(Box::new(set)))
112 .await
113 {
114 tracing::warn!(
115 target: "cp_manifest_poll",
116 error = %err,
117 "manifest_poll: reducer input channel closed",
118 );
119 return;
120 }
121 }
122 Ok(None) => {
123 }
126 Err(err) => {
127 tracing::warn!(
128 target: "cp_manifest_poll",
129 error = %err,
130 "manifest_poll: tick failed; retaining previous cache",
131 );
132 }
133 }
134 }
135 })
136}
137
138pub async fn prime_blocking(state: &Arc<AppState>, clock: &ClockHandle) -> anyhow::Result<bool> {
146 if state.channel_refs_source.is_none() {
147 return Ok(false);
148 }
149 let client = signed_fetch::build_client();
150 match poll_once(&client, state, clock).await? {
151 Some((_set, snapshot)) => {
152 publish_verified_fleet(state, snapshot).await;
153 Ok(true)
154 }
155 None => Ok(false),
156 }
157}
158
159async fn publish_verified_fleet(state: &Arc<AppState>, snapshot: VerifiedFleetSnapshot) {
160 *state.verified_fleet.write().await = Some(snapshot);
161 state
162 .artifact_primed
163 .store(true, std::sync::atomic::Ordering::Release);
164}
165
166async fn poll_once(
167 client: &reqwest::Client,
168 state: &Arc<AppState>,
169 clock: &ClockHandle,
170) -> anyhow::Result<Option<(SignedManifestSet, VerifiedFleetSnapshot)>> {
171 let Some(channel_refs_source) = state.channel_refs_source.as_ref() else {
172 return Ok(None);
173 };
174
175 let now = clock.now();
176 let (trusted_keys, reject_before) =
177 signed_fetch::read_trust_roots(&channel_refs_source.trust_path, now)?;
178 let token = signed_fetch::read_token(channel_refs_source.token_file.as_deref())?;
179
180 let (artifact_bytes, signature_bytes) = signed_fetch::fetch_signed_pair(
182 client,
183 &channel_refs_source.artifact_url,
184 &channel_refs_source.signature_url,
185 token.as_deref(),
186 )
187 .await?;
188
189 let fleet: VerifiedFleet = verify_artifact(
190 &artifact_bytes,
191 &signature_bytes,
192 &trusted_keys,
193 now,
194 channel_refs_source.freshness_window,
195 reject_before,
196 )
197 .map_err(|e| anyhow::anyhow!("verify_artifact: {e:?}"))?;
198
199 let fleet_hash = nixfleet_reconciler::canonical_hash_from_bytes(&artifact_bytes)
204 .map_err(|e| anyhow::anyhow!("canonical_hash_from_bytes: {e:?}"))?;
205
206 let mut rollouts: HashMap<String, VerifiedRolloutManifest> = HashMap::new();
208 if let Some(rollouts_source) = state.rollouts_source.as_ref() {
209 for channel in fleet.inner().channels.keys() {
210 match fetch_and_verify_channel_manifest(
211 rollouts_source,
212 fleet.inner(),
213 &fleet_hash,
214 channel,
215 &trusted_keys,
216 now,
217 channel_refs_source.freshness_window,
218 reject_before,
219 )
220 .await
221 {
222 Ok(Some(verified)) => {
223 rollouts.insert(channel.clone(), verified);
224 }
225 Ok(None) => {}
226 Err(err) => {
227 tracing::warn!(
228 target: "cp_manifest_poll",
229 %channel,
230 error = %err,
231 "manifest_poll: rollout manifest fetch/verify failed; \
232 omitting channel from SignedManifestSet (next tick retries)",
233 );
234 }
235 }
236 }
237 } else {
238 tracing::debug!(
239 target: "cp_manifest_poll",
240 "manifest_poll: no rollouts_source configured; emitting SignedManifestSet with empty rollouts (plan_next emits no actions)",
241 );
242 }
243
244 let snapshot = VerifiedFleetSnapshot {
245 fleet: Arc::new(fleet.inner().clone()),
246 fleet_resolved_hash: fleet_hash,
247 artifact_bytes,
248 signature_bytes,
249 };
250 let set = SignedManifestSet::new(fleet, rollouts);
251 Ok(Some((set, snapshot)))
252}
253
254#[allow(clippy::too_many_arguments)]
255async fn fetch_and_verify_channel_manifest(
256 rollouts_source: &RolloutsSource,
257 fleet: &nixfleet_proto::FleetResolved,
258 fleet_hash: &str,
259 channel: &str,
260 trusted_keys: &[nixfleet_proto::TrustedPubkey],
261 now: chrono::DateTime<chrono::Utc>,
262 freshness_window: Duration,
263 reject_before: Option<chrono::DateTime<chrono::Utc>>,
264) -> anyhow::Result<Option<VerifiedRolloutManifest>> {
265 let rollout_id = match compute_rollout_id_for_channel(fleet, fleet_hash, channel)? {
266 Some(id) => id,
267 None => return Ok(None),
268 };
269 let (manifest_bytes, signature_bytes) = rollouts_source.fetch_pair(&rollout_id).await?;
270 let verified = verify_rollout_manifest(
271 &manifest_bytes,
272 &signature_bytes,
273 trusted_keys,
274 now,
275 freshness_window,
276 reject_before,
277 )
278 .map_err(|e| anyhow::anyhow!("verify_rollout_manifest({channel}): {e:?}"))?;
279 let m = verified.inner();
280 check_rollout_id_discriminator(&m.channel, &m.channel_ref, &rollout_id)
281 .map_err(|e| anyhow::anyhow!("{channel}: {e}"))?;
282 Ok(Some(verified))
283}
284
285fn check_rollout_id_discriminator(
293 manifest_channel: &str,
294 manifest_channel_ref: &str,
295 requested: &str,
296) -> anyhow::Result<()> {
297 let parsed = nixfleet_proto::RolloutId::new(manifest_channel, manifest_channel_ref);
298 if parsed.as_str() != requested {
299 return Err(anyhow::anyhow!(
300 "rollout_id discriminator failed: requested {requested} but signed manifest declares {parsed}"
301 ));
302 }
303 Ok(())
304}
305
306#[cfg(test)]
307mod tests {
308 use super::*;
309
310 #[test]
311 fn discriminator_accepts_canonical_match() {
312 check_rollout_id_discriminator("stable", "abc1234deadbeef", "stable@abc1234deadbeef")
313 .expect("canonical id matches");
314 }
315
316 #[test]
317 fn discriminator_rejects_channel_substitution() {
318 let err =
319 check_rollout_id_discriminator("edge", "abc1234deadbeef", "stable@abc1234deadbeef")
320 .expect_err("channel substitution rejected");
321 let msg = err.to_string();
322 assert!(msg.contains("discriminator failed"), "msg: {msg}");
323 assert!(msg.contains("stable@abc1234deadbeef"), "msg: {msg}");
324 assert!(msg.contains("edge@abc1234deadbeef"), "msg: {msg}");
325 }
326
327 #[test]
328 fn discriminator_rejects_ref_substitution() {
329 let err =
330 check_rollout_id_discriminator("stable", "def5678feedface", "stable@abc1234deadbeef")
331 .expect_err("ref substitution rejected");
332 let msg = err.to_string();
333 assert!(msg.contains("discriminator failed"), "msg: {msg}");
334 assert!(msg.contains("stable@abc1234deadbeef"), "msg: {msg}");
335 assert!(msg.contains("stable@def5678feedface"), "msg: {msg}");
336 }
337
338 #[test]
339 fn discriminator_rejects_legacy_hex_only_request() {
340 let legacy_hex = "a".repeat(64);
344 let err = check_rollout_id_discriminator("stable", "abc1234", &legacy_hex)
345 .expect_err("legacy hex-only request rejected");
346 assert!(err.to_string().contains("discriminator failed"));
347 }
348}