nixfleet_control_plane/runtime/workers/
manifest_poll.rs

//! Manifest poll worker: verifies channel-refs (`fleet.resolved`) and the
//! per-channel rollout manifests on a 30s tick. A successful tick has two
//! side effects:
//!
//!   1. Emits [`super::super::ReducerInput::ManifestSetUpdated`] so the
//!      reducer task refreshes its cached `SignedManifestSet` and re-runs
//!      `plan_next`.
//!   2. Writes the verified [`crate::server::VerifiedFleetSnapshot`] into
//!      `state.verified_fleet`. The fleet manifest lives in two places
//!      because the legacy operator-API routes (`channel_status`,
//!      `whoami`, the rollouts proxy) still read `state.verified_fleet`
//!      synchronously; consolidating onto the runtime cache is a separate
//!      effort. Both writes happen in this worker so the two views stay
//!      in lock-step.
//!
//! Failure semantics:
//! - Trust-root/token read, fleet fetch, fleet verify, or fleet hashing
//!   fails → skip this tick entirely; the reducer keeps using its prior
//!   cache and `state.verified_fleet` retains the prior snapshot.
//! - Per-channel rollout-manifest fetch/verify fails → log + skip that
//!   channel only. The emitted `SignedManifestSet` is partial; `plan_next`
//!   simply doesn't act on missing channels until the next poll.
//! - Reducer channel full → the send waits for capacity (unusual at a 30s
//!   cadence). Reducer channel closed → log and stop the worker; a closed
//!   channel means the CP is shutting down.
23
24use std::collections::HashMap;
25use std::path::PathBuf;
26use std::sync::Arc;
27use std::time::Duration;
28
29use nixfleet_proto::clock::ClockHandle;
30use nixfleet_reconciler::planner_types::SignedManifestSet;
31use nixfleet_reconciler::{
32    VerifiedFleet, VerifiedRolloutManifest, compute_rollout_id_for_channel, verify_artifact,
33    verify_rollout_manifest,
34};
35use tokio::sync::mpsc;
36use tokio::task::JoinHandle;
37
38use super::super::{ReducerInput, ShutdownToken};
39use crate::polling::signed_fetch;
40use crate::rollouts_source::RolloutsSource;
41use crate::server::{AppState, VerifiedFleetSnapshot};
42
/// Time between consecutive manifest polls (the worker's 30-second tick).
const POLL_INTERVAL: Duration = Duration::from_secs(30);
45
/// Everything the manifest-poll worker needs to fetch and verify
/// `fleet.resolved` on each tick.
///
/// Built by the CLI from `--channel-refs-artifact-url`,
/// `--channel-refs-signature-url`, `--channel-refs-token-file`,
/// `--trust-file` and `--freshness-window-secs`.
///
/// The token file and the trust file are read again on every poll, so
/// rotating either one takes effect without a restart.
#[derive(Debug, Clone)]
pub struct ChannelRefsSource {
    /// URL of the signed `fleet.resolved` artifact.
    pub artifact_url: String,
    /// URL of the signature fetched alongside `artifact_url`.
    pub signature_url: String,
    /// Optional token file handed to `signed_fetch::read_token` each poll.
    pub token_file: Option<PathBuf>,
    /// Trust file handed to `signed_fetch::read_trust_roots` each poll.
    pub trust_path: PathBuf,
    /// Freshness window applied when verifying the fleet artifact and the
    /// per-channel rollout manifests.
    pub freshness_window: Duration,
}
60
61pub fn spawn(
62    state: Arc<AppState>,
63    clock: ClockHandle,
64    input_tx: mpsc::Sender<ReducerInput>,
65    shutdown: ShutdownToken,
66) -> JoinHandle<()> {
67    tokio::spawn(async move {
68        let mut shutdown_rx = shutdown.into_inner();
69
70        if state.channel_refs_source.is_none() {
71            tracing::info!(
72                target: "cp_manifest_poll",
73                "manifest_poll: no channel_refs_source configured; idling",
74            );
75            let _ = (&mut shutdown_rx).await;
76            return;
77        }
78
79        let client = signed_fetch::build_client();
80        // First tick fires immediately so the reducer's `manifests` cache
81        // is populated before agents can POST HostEvents into the
82        // listener. `prime_blocking` (above) populates
83        // `state.verified_fleet` for the /v1/fleet.resolved route +
84        // dispatch planning, but it does NOT emit `ManifestSetUpdated`
85        // to the reducer's input channel: the reducer holds a separate
86        // in-memory cache fed only via this worker's emits per
87        // RFC-0004 §1 invariant #4 (one MPSC, one mutator per side).
88        // Without an immediate first tick, every HostEvent arriving in
89        // the warm-up window is dropped at the reducer.
90        let mut ticker = tokio::time::interval(POLL_INTERVAL);
91        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
92
93        loop {
94            tokio::select! {
95                biased;
96                _ = &mut shutdown_rx => {
97                    tracing::info!(
98                        target: "shutdown",
99                        task = "cp_manifest_poll",
100                        "task shut down",
101                    );
102                    return;
103                }
104                _ = ticker.tick() => {}
105            }
106
107            match poll_once(&client, &state, &clock).await {
108                Ok(Some((set, fleet_snapshot))) => {
109                    publish_verified_fleet(&state, fleet_snapshot).await;
110                    if let Err(err) = input_tx
111                        .send(ReducerInput::ManifestSetUpdated(Box::new(set)))
112                        .await
113                    {
114                        tracing::warn!(
115                            target: "cp_manifest_poll",
116                            error = %err,
117                            "manifest_poll: reducer input channel closed",
118                        );
119                        return;
120                    }
121                }
122                Ok(None) => {
123                    // Sources not yet configured (transient race during
124                    // startup tests).
125                }
126                Err(err) => {
127                    tracing::warn!(
128                        target: "cp_manifest_poll",
129                        error = %err,
130                        "manifest_poll: tick failed; retaining previous cache",
131                    );
132                }
133            }
134        }
135    })
136}
137
138/// Synchronous startup prime. Called by `server::serve` before the listener
139/// opens so routes that read `state.verified_fleet` (e.g. `channel_status`,
140/// the enrollment route's freshness check) see a populated snapshot on
141/// the first request.
142///
143/// Returns `Ok(false)` when no `channel_refs_source` is configured (test +
144/// offline boot paths fall back to `prime_from_artifact_files`).
145pub async fn prime_blocking(state: &Arc<AppState>, clock: &ClockHandle) -> anyhow::Result<bool> {
146    if state.channel_refs_source.is_none() {
147        return Ok(false);
148    }
149    let client = signed_fetch::build_client();
150    match poll_once(&client, state, clock).await? {
151        Some((_set, snapshot)) => {
152            publish_verified_fleet(state, snapshot).await;
153            Ok(true)
154        }
155        None => Ok(false),
156    }
157}
158
159async fn publish_verified_fleet(state: &Arc<AppState>, snapshot: VerifiedFleetSnapshot) {
160    *state.verified_fleet.write().await = Some(snapshot);
161    state
162        .artifact_primed
163        .store(true, std::sync::atomic::Ordering::Release);
164}
165
166async fn poll_once(
167    client: &reqwest::Client,
168    state: &Arc<AppState>,
169    clock: &ClockHandle,
170) -> anyhow::Result<Option<(SignedManifestSet, VerifiedFleetSnapshot)>> {
171    let Some(channel_refs_source) = state.channel_refs_source.as_ref() else {
172        return Ok(None);
173    };
174
175    let now = clock.now();
176    let (trusted_keys, reject_before) =
177        signed_fetch::read_trust_roots(&channel_refs_source.trust_path, now)?;
178    let token = signed_fetch::read_token(channel_refs_source.token_file.as_deref())?;
179
180    // 1. Fetch + verify fleet.resolved.
181    let (artifact_bytes, signature_bytes) = signed_fetch::fetch_signed_pair(
182        client,
183        &channel_refs_source.artifact_url,
184        &channel_refs_source.signature_url,
185        token.as_deref(),
186    )
187    .await?;
188
189    let fleet: VerifiedFleet = verify_artifact(
190        &artifact_bytes,
191        &signature_bytes,
192        &trusted_keys,
193        now,
194        channel_refs_source.freshness_window,
195        reject_before,
196    )
197    .map_err(|e| anyhow::anyhow!("verify_artifact: {e:?}"))?;
198
199    // Content-address anchor for downstream rolloutId derivation. MUST be
200    // computed against the received bytes, not a re-serialised parse —
201    // additive schema changes the CP's proto doesn't yet know about would
202    // shift the anchor relative to what CI signed.
203    let fleet_hash = nixfleet_reconciler::canonical_hash_from_bytes(&artifact_bytes)
204        .map_err(|e| anyhow::anyhow!("canonical_hash_from_bytes: {e:?}"))?;
205
206    // 2. Per-channel rollout manifest fetch + verify.
207    let mut rollouts: HashMap<String, VerifiedRolloutManifest> = HashMap::new();
208    if let Some(rollouts_source) = state.rollouts_source.as_ref() {
209        for channel in fleet.inner().channels.keys() {
210            match fetch_and_verify_channel_manifest(
211                rollouts_source,
212                fleet.inner(),
213                &fleet_hash,
214                channel,
215                &trusted_keys,
216                now,
217                channel_refs_source.freshness_window,
218                reject_before,
219            )
220            .await
221            {
222                Ok(Some(verified)) => {
223                    rollouts.insert(channel.clone(), verified);
224                }
225                Ok(None) => {}
226                Err(err) => {
227                    tracing::warn!(
228                        target: "cp_manifest_poll",
229                        %channel,
230                        error = %err,
231                        "manifest_poll: rollout manifest fetch/verify failed; \
232                         omitting channel from SignedManifestSet (next tick retries)",
233                    );
234                }
235            }
236        }
237    } else {
238        tracing::debug!(
239            target: "cp_manifest_poll",
240            "manifest_poll: no rollouts_source configured; emitting SignedManifestSet with empty rollouts (plan_next emits no actions)",
241        );
242    }
243
244    let snapshot = VerifiedFleetSnapshot {
245        fleet: Arc::new(fleet.inner().clone()),
246        fleet_resolved_hash: fleet_hash,
247        artifact_bytes,
248        signature_bytes,
249    };
250    let set = SignedManifestSet::new(fleet, rollouts);
251    Ok(Some((set, snapshot)))
252}
253
254#[allow(clippy::too_many_arguments)]
255async fn fetch_and_verify_channel_manifest(
256    rollouts_source: &RolloutsSource,
257    fleet: &nixfleet_proto::FleetResolved,
258    fleet_hash: &str,
259    channel: &str,
260    trusted_keys: &[nixfleet_proto::TrustedPubkey],
261    now: chrono::DateTime<chrono::Utc>,
262    freshness_window: Duration,
263    reject_before: Option<chrono::DateTime<chrono::Utc>>,
264) -> anyhow::Result<Option<VerifiedRolloutManifest>> {
265    let rollout_id = match compute_rollout_id_for_channel(fleet, fleet_hash, channel)? {
266        Some(id) => id,
267        None => return Ok(None),
268    };
269    let (manifest_bytes, signature_bytes) = rollouts_source.fetch_pair(&rollout_id).await?;
270    let verified = verify_rollout_manifest(
271        &manifest_bytes,
272        &signature_bytes,
273        trusted_keys,
274        now,
275        freshness_window,
276        reject_before,
277    )
278    .map_err(|e| anyhow::anyhow!("verify_rollout_manifest({channel}): {e:?}"))?;
279    let m = verified.inner();
280    check_rollout_id_discriminator(&m.channel, &m.channel_ref, &rollout_id)
281        .map_err(|e| anyhow::anyhow!("{channel}: {e}"))?;
282    Ok(Some(verified))
283}
284
285/// CP-side storage invariant: a signed rollout manifest may only be stored
286/// under the canonical RolloutId (RFC-0008 §6.3 `{channel}@{channel_ref}`)
287/// it claims via its own `(channel, channel_ref)` fields. Defends against a
288/// bytes-vs-url-claim substitution where the CP receives a manifest for
289/// channel B while the request is for channel A's rollout id. Mandated by
290/// the `verify_rollout_manifest` docstring; mirrors the agent's
291/// `assert_rollout_id_matches` on the consumer side.
292fn check_rollout_id_discriminator(
293    manifest_channel: &str,
294    manifest_channel_ref: &str,
295    requested: &str,
296) -> anyhow::Result<()> {
297    let parsed = nixfleet_proto::RolloutId::new(manifest_channel, manifest_channel_ref);
298    if parsed.as_str() != requested {
299        return Err(anyhow::anyhow!(
300            "rollout_id discriminator failed: requested {requested} but signed manifest declares {parsed}"
301        ));
302    }
303    Ok(())
304}
305
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the discriminator, expects a rejection, and returns its message.
    fn rejection_message(channel: &str, channel_ref: &str, requested: &str) -> String {
        check_rollout_id_discriminator(channel, channel_ref, requested)
            .expect_err("mismatched rollout id must be rejected")
            .to_string()
    }

    #[test]
    fn discriminator_accepts_canonical_match() {
        let outcome =
            check_rollout_id_discriminator("stable", "abc1234deadbeef", "stable@abc1234deadbeef");
        assert!(outcome.is_ok(), "canonical id matches: {outcome:?}");
    }

    #[test]
    fn discriminator_rejects_channel_substitution() {
        let msg = rejection_message("edge", "abc1234deadbeef", "stable@abc1234deadbeef");
        for needle in [
            "discriminator failed",
            "stable@abc1234deadbeef",
            "edge@abc1234deadbeef",
        ] {
            assert!(msg.contains(needle), "missing {needle:?} in: {msg}");
        }
    }

    #[test]
    fn discriminator_rejects_ref_substitution() {
        let msg = rejection_message("stable", "def5678feedface", "stable@abc1234deadbeef");
        for needle in [
            "discriminator failed",
            "stable@abc1234deadbeef",
            "stable@def5678feedface",
        ] {
            assert!(msg.contains(needle), "missing {needle:?} in: {msg}");
        }
    }

    #[test]
    fn discriminator_rejects_legacy_hex_only_request() {
        // Regression guard: requests must stay in the RFC-0008 §6.3
        // `{channel}@{channel_ref}` shape. Drifting back to a bare 64-char
        // hex sha256 comparator has to fail here.
        let legacy_hex = "a".repeat(64);
        let msg = rejection_message("stable", "abc1234", &legacy_hex);
        assert!(msg.contains("discriminator failed"), "msg: {msg}");
    }
}