// File: nixfleet_agent/runtime/workers/manifest_poll.rs

//! Periodic agent-side manifest poll. Mirrors CP's
//! `runtime::workers::manifest_poll`: every 30s the worker fetches
//! `/v1/fleet.resolved` plus each channel's `/v1/rollouts/{id}` from CP,
//! verifies their signatures, cross-checks each rollout's
//! `fleet_resolved_hash` against the canonical hash of the just-fetched
//! fleet, and then emits `ReducerInput::ManifestSetUpdated` so the reducer's
//! cached `SignedManifestSet` reflects the current fleet snapshot.
//!
//! The first tick fires at `Instant::now()` (no startup delay), so the
//! reducer's `manifests` cache is populated as early as possible, ideally
//! before the longpoll worker receives its first `Dispatch` (nothing in this
//! module enforces that ordering). Subsequent ticks fire every 30s; when a
//! tick fails the worker emits nothing, so the reducer keeps the previously
//! emitted set, and the worker retries on the next tick.
//!
//! Failure semantics (matching CP's manifest_poll):
//! - Fleet fetch/verify failed -> skip the entire tick; the reducer retains
//!   its prior cache.
//! - Per-rollout fetch/verify failed OR cross-check mismatch -> log and
//!   skip that channel only; emit the partial set.
//! - Reducer channel closed (cancel propagated) -> exit.
//! - Shutdown token fired -> exit.

21use std::collections::HashMap;
22use std::time::Duration;
23
24use nixfleet_proto::clock::ClockHandle;
25use nixfleet_reconciler::compute_rollout_id_for_channel;
26use nixfleet_reconciler::planner_types::SignedManifestSet;
27use tokio::sync::mpsc;
28use tokio::task::JoinHandle;
29
30use super::super::{AgentConfig, ReducerInput, ShutdownToken};
31use crate::manifest_cache::ManifestCache;
32
/// Delay between consecutive manifest polls: 30 seconds.
///
/// Intended to match CP's `runtime::workers::manifest_poll::POLL_INTERVAL`;
/// keep the two values in sync.
const POLL_INTERVAL: Duration = Duration::from_secs(30);
35
36pub fn spawn(
37    cfg: AgentConfig,
38    clock: ClockHandle,
39    input_tx: mpsc::Sender<ReducerInput>,
40    shutdown: ShutdownToken,
41) -> JoinHandle<()> {
42    let trust_path = cfg.trust_file.clone();
43    spawn_with_trust_path(cfg, trust_path, clock, input_tx, shutdown)
44}
45
46/// Tunable-trust-path entry point. Production code calls [`spawn`] which
47/// threads `AgentConfig::trust_file` (the `--trust-file` CLI arg);
48/// integration tests under the `test-helpers` feature gate call this
49/// variant with a tempdir-rooted trust.json so the worker can run
50/// without touching
51/// `/etc/nixfleet/agent/`. Same convention as
52/// `nixfleet_agent::runtime::ShutdownToken::__test_only_from_rx`.
53pub fn spawn_with_trust_path(
54    cfg: AgentConfig,
55    trust_path: std::path::PathBuf,
56    _clock: ClockHandle,
57    input_tx: mpsc::Sender<ReducerInput>,
58    shutdown: ShutdownToken,
59) -> JoinHandle<()> {
60    tokio::spawn(async move {
61        let mut shutdown_rx = shutdown.into_inner();
62        let client = match crate::comms::build_client(
63            cfg.ca_cert.as_deref(),
64            cfg.client_cert.as_deref(),
65            cfg.client_key.as_deref(),
66        ) {
67            Ok(c) => c,
68            Err(err) => {
69                tracing::error!(
70                    target: "agent_manifest_poll",
71                    error = %err,
72                    "failed to build mTLS HTTP client; worker exits",
73                );
74                return;
75            }
76        };
77        let manifest_cache = ManifestCache::new_with_freshness(
78            &cfg.state_dir,
79            &trust_path,
80            std::time::Duration::from_secs(cfg.manifest_freshness_window_secs),
81        );
82        let cp_url = cfg.control_plane_url.clone();
83
84        // First tick at Instant::now() (no startup delay) so the reducer's
85        // manifest cache is populated before longpoll's first dispatch can
86        // arrive. Subsequent ticks every POLL_INTERVAL.
87        let mut ticker = tokio::time::interval(POLL_INTERVAL);
88        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
89
90        loop {
91            tokio::select! {
92                biased;
93                _ = &mut shutdown_rx => {
94                    tracing::info!(
95                        target: "shutdown",
96                        task = "agent_manifest_poll",
97                        "task shut down",
98                    );
99                    return;
100                }
101                _ = ticker.tick() => {}
102            }
103
104            match tick_once(&manifest_cache, &client, &cp_url).await {
105                Ok(set) => {
106                    if let Err(err) = input_tx
107                        .send(ReducerInput::ManifestSetUpdated(Box::new(set)))
108                        .await
109                    {
110                        tracing::warn!(
111                            target: "agent_manifest_poll",
112                            error = %err,
113                            "reducer input channel closed; worker exits",
114                        );
115                        return;
116                    }
117                }
118                Err(err) => {
119                    tracing::warn!(
120                        target: "agent_manifest_poll",
121                        error = %err,
122                        "tick failed; retaining previous reducer cache",
123                    );
124                }
125            }
126        }
127    })
128}
129
130async fn tick_once(
131    manifest_cache: &ManifestCache,
132    client: &reqwest::Client,
133    cp_url: &str,
134) -> anyhow::Result<SignedManifestSet> {
135    // 1. Fetch + verify fleet.resolved. `fetch_or_load_fleet` returns the
136    //    canonical hash of the artifact bytes paired with the Verified
137    //    struct so the per-rollout discriminator below has its anchor.
138    let (verified_fleet, fleet_hash) = manifest_cache
139        .fetch_or_load_fleet(client, cp_url)
140        .await
141        .map_err(|err| anyhow::anyhow!("fetch fleet: {}", err.reason()))?;
142
143    // 2. For each channel in the verified fleet, derive the expected
144    //    rollout_id, fetch + verify, and cross-check the discriminator.
145    let mut rollouts = HashMap::new();
146    for channel in verified_fleet.inner().channels.keys() {
147        let rollout_id =
148            match compute_rollout_id_for_channel(verified_fleet.inner(), &fleet_hash, channel) {
149                Ok(Some(id)) => id,
150                Ok(None) => continue, // channel has no host with a closure yet
151                Err(err) => {
152                    tracing::warn!(
153                        target: "agent_manifest_poll",
154                        %channel,
155                        error = %err,
156                        "compute_rollout_id_for_channel failed; skipping channel",
157                    );
158                    continue;
159                }
160            };
161
162        let verified_rollout = match manifest_cache
163            .fetch_or_load(client, cp_url, &rollout_id)
164            .await
165        {
166            Ok(v) => v,
167            Err(err) => {
168                tracing::warn!(
169                    target: "agent_manifest_poll",
170                    %channel,
171                    %rollout_id,
172                    error = %err.reason(),
173                    "fetch rollout manifest failed; skipping channel",
174                );
175                continue;
176            }
177        };
178
179        // Discriminator: the rollout's pinned fleet_resolved_hash MUST
180        // match the canonical hash of the fleet we just fetched. A
181        // mismatch means CP is serving inconsistent state across the
182        // two artifacts (e.g. fresh fleet snapshot but stale rollout
183        // pin, or vice versa). Defense-in-depth over signature verify;
184        // mirrors the architect's amendment on the d010-feed plan
185        // restated under Option C: discriminator lives at the worker
186        // boundary as a cross-consistency check.
187        if verified_rollout.inner().fleet_resolved_hash != fleet_hash {
188            tracing::warn!(
189                target: "agent_manifest_poll",
190                %channel,
191                %rollout_id,
192                rollout_pin = %verified_rollout.inner().fleet_resolved_hash,
193                fleet_hash = %fleet_hash,
194                "rollout manifest pins a different fleet snapshot than the one CP served; skipping",
195            );
196            continue;
197        }
198
199        rollouts.insert(channel.clone(), verified_rollout);
200    }
201
202    Ok(SignedManifestSet::new(verified_fleet, rollouts))
203}