nixfleet_agent/runtime/
recovery.rs

1//! Boot-recovery handshake (RFC-0005 §9.5 / Plan 07 open-question
2//! resolution).
3//!
4//! Runs ONCE at runtime startup, before any worker is allowed to fire.
5//! Goal: synchronise the (zero-in-memory) agent reducer with whatever
6//! state CP holds, so a crash that drops in-memory state doesn't lead
7//! the agent to either re-fire a switch that already took or skip a
8//! switch that didn't.
9//!
10//! Five scenarios, all distinguished by what `/run/current-system`
11//! points to vs what CP's record says:
12//!
13//!   1. Fresh install. /run/current-system = X, CP has no rollout
14//!      record. First heartbeat reports current_closure=X; CP just
15//!      records "agent present"; next Dispatch starts normal flow.
16//!
17//!   2. Crashed mid-Pending (before LocalActivate fired).
18//!      /run/current-system = prior closure (switch never started).
19//!      First heartbeat reports prior. CP's Pending record's
20//!      current_closure_at_dispatch matches; CP re-queues Dispatch.
21//!      Long-poll picks up; normal flow.
22//!
23//!   3. Crashed mid-Activating. /run/current-system = either prior OR
24//!      target depending on whether switch-to-configuration had
25//!      committed the new generation yet.
26//!        - If target: switch took, agent crashed before posting
27//!          ActivationCompleted. CP synthesises an
28//!          ActivationCompleted-shaped Replay-From event; agent's
29//!          reducer applies it as if it had arrived normally; probes
30//!          start fresh; soak proceeds.
31//!        - If prior: switch didn't take (or had only just started).
32//!          CP treats as scenario 2.
33//!
//!   4. Crashed mid-Soaking. /run/current-system = target. CP sees
//!      that the Soaking record's target matches and replies with a
//!      stream of synthesised LocalProbeResult events, so the reducer
//!      learns the prior probe state (and the Fail→Pass transition
//!      detector doesn't double-count). Probes resume; soak completes;
//!      Converged fires.
40//!
41//!   5. Crashed post-Converged. /run/current-system = target. CP sees
42//!      Converged record; nothing to do; wait for next Dispatch.
43//!
//! For Phase 7f, the agent issues the first heartbeat and reads the CP's
//! `X-Nixfleet-Replay-From` header. Full synthesised-event
//! reconstruction (scenario 3's ActivationCompleted synthesis, scenario
//! 4's probe-result stream) is documented here, but the rich synthesis
//! lands in a follow-up — the architecture is in place; the wiring
//! needs CP-side route changes that are out of scope for this commit.
//! Until then, CP may instead return per-rollout bootstrap snapshots in
//! the heartbeat response body (LIFT #3), which the runtime applies to
//! the reducer before workers start. A `Replay-From` header is logged,
//! and the operator can use `nixfleet trace` to inspect the in-flight
//! rollout.
52
53use std::path::{Path, PathBuf};
54use std::time::Duration;
55
56use chrono::{DateTime, Utc};
57use nixfleet_proto::clock::ClockHandle;
58use serde::{Deserialize, Serialize};
59
60use super::wire::{HeartbeatRequest, HeartbeatResponse};
61
/// Per-request timeout applied to the boot-recovery heartbeat.
///
/// Deliberately longer than the steady-state heartbeat's: boot recovery can
/// race a control plane that is itself still starting, and thirty seconds
/// leaves room for a normal CP cold start.
const RECOVERY_HTTP_TIMEOUT: Duration = Duration::from_millis(30_000);
66
67/// Outcome of the boot-recovery handshake. Returned to `runtime::spawn`
68/// so the caller can decide whether to feed synthesised events through
69/// the reducer before workers start.
70#[derive(Debug, Clone)]
71pub struct RecoveryOutcome {
72    pub current_closure: Option<String>,
73    pub replay_from: Option<u64>,
74    pub heartbeat_sent_at: DateTime<Utc>,
75    /// LIFT #3: per-rollout snapshots from CP, populated when CP
76    /// detected the agent's reducer empty (boot-recovery heartbeat with
77    /// rollout_id=None) AND CP holds non-terminal records for the host.
78    /// The runtime spawn path applies each snapshot to the agent's
79    /// in-memory reducer state before workers start, restoring the
80    /// cache so probe runners + advance-ticker resume work
81    /// post-restart.
82    pub bootstrap_rollouts: Vec<nixfleet_proto::agent_wire::HostRolloutSnapshot>,
83}
84
/// Best-effort lookup of the closure the current-system symlink points at.
///
/// Reads ONE level of the link at `path` and returns the final component
/// of its target (the store-path basename). Yields `None` when:
/// - the link does not exist (fresh install, or a non-NixOS host running
///   the agent for the first time),
/// - `path` exists but is not a symlink,
/// - the target has no final component or it is not valid UTF-8.
pub fn read_current_closure(path: &Path) -> Option<String> {
    let resolved = match std::fs::read_link(path) {
        Ok(target) => target,
        Err(_) => return None,
    };
    resolved.file_name()?.to_str().map(ToOwned::to_owned)
}
95
96/// Issue the gated first-heartbeat. Returns the recovery outcome so
97/// `runtime::spawn` can act on `replay_from` BEFORE starting workers.
98///
99/// `cp_url` is the agent's `--control-plane-url`. `machine_id` matches
100/// the CN in the agent's mTLS cert. The three `ca_cert`/`client_cert`/
101/// `client_key` paths are threaded straight into
102/// `crate::comms::build_client` so the handshake rides the same mTLS
103/// identity post-Phase-7c workers use (DEFECT-003 + D-005). `None`
104/// for all three drops to TLS-only mode (no client cert) — acceptable
105/// for the wiremock-driven tests but never production.
106///
107/// Failure to reach CP is non-fatal: we return an outcome with no
108/// replay-from, and the steady-state heartbeat worker will keep
109/// retrying. Better to start the agent and have its long-poll +
110/// retries re-converge than to refuse to boot.
111pub async fn handshake(
112    cp_url: &str,
113    machine_id: &str,
114    clock: &ClockHandle,
115    current_system_path: &Path,
116    ca_cert: Option<&Path>,
117    client_cert: Option<&Path>,
118    client_key: Option<&Path>,
119) -> RecoveryOutcome {
120    let current_closure = read_current_closure(current_system_path);
121    let now = clock.now();
122
123    let url = format!("{}/v1/agent/heartbeat", cp_url.trim_end_matches('/'));
124    let client = match crate::comms::build_client(ca_cert, client_cert, client_key) {
125        Ok(c) => c,
126        Err(err) => {
127            tracing::warn!(
128                target: "agent_recovery",
129                error = %err,
130                "boot-recovery: mTLS HTTP client build failed; skipping handshake",
131            );
132            return RecoveryOutcome {
133                current_closure,
134                replay_from: None,
135                heartbeat_sent_at: now,
136                bootstrap_rollouts: Vec::new(),
137            };
138        }
139    };
140
141    let req = HeartbeatRequest {
142        hostname: machine_id.to_string(),
143        rollout_id: None,
144        current_closure: current_closure.clone(),
145        at: now,
146    };
147    // Per-request timeout override: comms::build_client uses a 30s
148    // default; boot-recovery wants the same 30s explicitly (matches
149    // pre-D-005 behaviour even though the values coincide today — the
150    // override makes the contract explicit if either constant drifts).
151    let resp = match client
152        .post(&url)
153        .timeout(RECOVERY_HTTP_TIMEOUT)
154        .json(&req)
155        .send()
156        .await
157    {
158        Ok(r) => r,
159        Err(err) => {
160            tracing::warn!(
161                target: "agent_recovery",
162                error = %err,
163                "boot-recovery: heartbeat POST failed; steady-state heartbeat worker will retry",
164            );
165            return RecoveryOutcome {
166                current_closure,
167                replay_from: None,
168                heartbeat_sent_at: now,
169                bootstrap_rollouts: Vec::new(),
170            };
171        }
172    };
173
174    let replay_from = resp
175        .headers()
176        .get("X-Nixfleet-Replay-From")
177        .and_then(|v| v.to_str().ok())
178        .and_then(|s| s.parse::<u64>().ok());
179
180    // LIFT #3: parse the response body for bootstrap snapshots that
181    // CP-side built when it saw our rollout_id=None + non-terminal
182    // records pattern. We MUST read the body (not just drain it) to
183    // get the snapshots. Old CP responses without the field
184    // deserialize cleanly (serde-default empty Vec).
185    let bootstrap_rollouts = match resp.json::<HeartbeatResponse>().await {
186        Ok(body) => {
187            if !body.bootstrap_rollouts.is_empty() {
188                tracing::info!(
189                    target: "agent_recovery",
190                    count = body.bootstrap_rollouts.len(),
191                    "boot-recovery: CP returned bootstrap snapshots; will rehydrate agent reducer",
192                );
193            }
194            body.bootstrap_rollouts
195        }
196        Err(err) => {
197            tracing::warn!(
198                target: "agent_recovery",
199                error = %err,
200                "boot-recovery: response body parse failed; no bootstrap rehydration",
201            );
202            Vec::new()
203        }
204    };
205
206    if let Some(seq) = replay_from {
207        tracing::info!(
208            target: "agent_recovery",
209            replay_from = seq,
210            bootstrap_count = bootstrap_rollouts.len(),
211            "boot-recovery: CP signaled drift; bootstrap snapshots will rehydrate \
212             whatever state was lost (LIFT #3)",
213        );
214    }
215
216    RecoveryOutcome {
217        current_closure,
218        replay_from,
219        heartbeat_sent_at: now,
220        bootstrap_rollouts,
221    }
222}
223
/// Where NixOS keeps its `current-system` symlink on a real host.
///
/// Tests don't rely on this: they hand `handshake` their own
/// `current_system_path` pointing into a temporary directory.
pub fn default_current_system_path() -> PathBuf {
    Path::new("/run/current-system").to_path_buf()
}
229
230/// Synthetic Replay-From payload shape. Documented for the
231/// follow-up that wires CP to emit per-event replay bodies; today the
232/// agent only sees the seq header.
233#[allow(dead_code)]
234#[derive(Debug, Clone, Serialize, Deserialize)]
235pub struct ReplayFromPayload {
236    pub rollout_id: String,
237    pub from_seq: u64,
238    /// `agent_event_kind` strings the CP plans to synthesise for the
239    /// agent. v0.2 emits seq-only; this field is forward-compat.
240    pub kinds: Vec<String>,
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use std::os::unix::fs::symlink;
    use tempfile::TempDir;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Real wall clock. The handshake only uses it to stamp
    /// `heartbeat_sent_at`, so no fake time source is needed here.
    fn wall_clock() -> ClockHandle {
        std::sync::Arc::new(nixfleet_proto::clock::SystemClock::new())
    }

    /// Build a NixOS-like layout inside `dir`: a directory named
    /// `closure_name` standing in for the store path, plus a
    /// `current-system` symlink pointing at it. Returns the symlink path.
    fn current_system_link(dir: &TempDir, closure_name: &str) -> PathBuf {
        let store_path = dir.path().join(closure_name);
        std::fs::create_dir(&store_path).unwrap();
        let link = dir.path().join("current-system");
        symlink(&store_path, &link).unwrap();
        link
    }

    #[test]
    fn read_current_closure_returns_basename_when_link_exists() {
        let dir = TempDir::new().unwrap();
        let link = current_system_link(&dir, "nixos-system-host-26.05");
        assert_eq!(
            read_current_closure(&link).unwrap(),
            "nixos-system-host-26.05"
        );
    }

    #[test]
    fn read_current_closure_returns_none_when_missing() {
        let dir = TempDir::new().unwrap();
        let absent = dir.path().join("does-not-exist");
        assert_eq!(read_current_closure(&absent), None);
    }

    #[tokio::test]
    async fn handshake_returns_replay_from_when_cp_signals_drift() {
        // RFC-0005 §9.5 scenario 3: the agent crashed mid-Activating,
        // /run/current-system already points at the target closure, and
        // CP answers with Replay-From=42 so the agent knows which seq to
        // rebuild from. The handshake must surface that seq.
        let dir = TempDir::new().unwrap();
        let link = current_system_link(&dir, "target-closure-store-path");

        let cp = MockServer::start().await;
        let drift_reply = ResponseTemplate::new(200)
            .insert_header("X-Nixfleet-Replay-From", "42")
            .set_body_json(serde_json::json!({}));
        Mock::given(method("POST"))
            .and(path("/v1/agent/heartbeat"))
            .respond_with(drift_reply)
            .mount(&cp)
            .await;

        // No client cert (TLS-only mode). wiremock serves plain HTTP and
        // reqwest's rustls config only kicks in for https:// URLs, so the
        // handshake still succeeds against it.
        let clock = wall_clock();
        let outcome = handshake(&cp.uri(), "host-smoke", &clock, &link, None, None, None).await;

        assert_eq!(
            outcome.current_closure.as_deref(),
            Some("target-closure-store-path")
        );
        assert_eq!(outcome.replay_from, Some(42));
    }

    #[tokio::test]
    async fn handshake_handles_cp_unreachable_gracefully() {
        let dir = TempDir::new().unwrap();
        let missing_link = dir.path().join("missing-current-system");
        let clock = wall_clock();

        // Port 1 has nothing listening, so the POST fails outright.
        let outcome = handshake(
            "http://localhost:1",
            "host-smoke",
            &clock,
            &missing_link,
            None,
            None,
            None,
        )
        .await;

        assert!(outcome.current_closure.is_none());
        assert!(outcome.replay_from.is_none());
    }
}