nixfleet_agent/runtime/recovery.rs
1//! Boot-recovery handshake (RFC-0005 §9.5 / Plan 07 open-question
2//! resolution).
3//!
4//! Runs ONCE at runtime startup, before any worker is allowed to fire.
5//! Goal: synchronise the (zero-in-memory) agent reducer with whatever
6//! state CP holds, so a crash that drops in-memory state doesn't lead
7//! the agent to either re-fire a switch that already took or skip a
8//! switch that didn't.
9//!
10//! Five scenarios, all distinguished by what `/run/current-system`
11//! points to vs what CP's record says:
12//!
13//! 1. Fresh install. /run/current-system = X, CP has no rollout
14//! record. First heartbeat reports current_closure=X; CP just
15//! records "agent present"; next Dispatch starts normal flow.
16//!
17//! 2. Crashed mid-Pending (before LocalActivate fired).
18//! /run/current-system = prior closure (switch never started).
19//! First heartbeat reports prior. CP's Pending record's
20//! current_closure_at_dispatch matches; CP re-queues Dispatch.
21//! Long-poll picks up; normal flow.
22//!
23//! 3. Crashed mid-Activating. /run/current-system = either prior OR
24//! target depending on whether switch-to-configuration had
25//! committed the new generation yet.
26//! - If target: switch took, agent crashed before posting
27//! ActivationCompleted. CP synthesises an
28//! ActivationCompleted-shaped Replay-From event; agent's
29//! reducer applies it as if it had arrived normally; probes
30//! start fresh; soak proceeds.
31//! - If prior: switch didn't take (or had only just started).
32//! CP treats as scenario 2.
33//!
//! 4. Crashed mid-Soaking. /run/current-system = target. CP sees that
//!    the Soaking record's target matches and replies with a stream of
//!    synthesised LocalProbeResult events carrying the prior probe state
//!    the reducer needs (so the Fail→Pass transition detector doesn't
//!    double-count). Probes resume; soak completes; Converged fires.
40//!
41//! 5. Crashed post-Converged. /run/current-system = target. CP sees
42//! Converged record; nothing to do; wait for next Dispatch.
43//!
//! For 7f, the agent issues the first heartbeat, reads the CP's
//! `X-Nixfleet-Replay-From` header, and parses any per-rollout
//! `bootstrap_rollouts` snapshots from the response body (LIFT #3) so
//! `runtime::spawn` can rehydrate the reducer before workers start. Full
//! synthesised-event replay (scenario 3's ActivationCompleted synthesis,
//! scenario 4's probe-result stream) is documented here but lands in a
//! follow-up — the architecture is in place; the wiring needs CP-side
//! route changes that aren't in scope this commit. Until then, any
//! `Replay-From` value CP sends is logged at info level together with the
//! bootstrap snapshot count; operators can use `nixfleet trace` to inspect
//! the in-flight rollout.
52
53use std::path::{Path, PathBuf};
54use std::time::Duration;
55
56use chrono::{DateTime, Utc};
57use nixfleet_proto::clock::ClockHandle;
58use serde::{Deserialize, Serialize};
59
60use super::wire::{HeartbeatRequest, HeartbeatResponse};
61
62/// HTTP timeout for the boot-recovery heartbeat. Longer than the
63/// steady-state heartbeat because boot recovery may run while CP is
64/// itself starting up — 30s of slack covers normal CP cold-start.
65const RECOVERY_HTTP_TIMEOUT: Duration = Duration::from_secs(30);
66
67/// Outcome of the boot-recovery handshake. Returned to `runtime::spawn`
68/// so the caller can decide whether to feed synthesised events through
69/// the reducer before workers start.
70#[derive(Debug, Clone)]
71pub struct RecoveryOutcome {
72 pub current_closure: Option<String>,
73 pub replay_from: Option<u64>,
74 pub heartbeat_sent_at: DateTime<Utc>,
75 /// LIFT #3: per-rollout snapshots from CP, populated when CP
76 /// detected the agent's reducer empty (boot-recovery heartbeat with
77 /// rollout_id=None) AND CP holds non-terminal records for the host.
78 /// The runtime spawn path applies each snapshot to the agent's
79 /// in-memory reducer state before workers start, restoring the
80 /// cache so probe runners + advance-ticker resume work
81 /// post-restart.
82 pub bootstrap_rollouts: Vec<nixfleet_proto::agent_wire::HostRolloutSnapshot>,
83}
84
85/// Best-effort read of `/run/current-system`'s store-path basename.
86/// Returns `None` when the symlink doesn't exist (fresh install, or
87/// non-NixOS host running the agent for the first time).
88pub fn read_current_closure(path: &Path) -> Option<String> {
89 let target = std::fs::read_link(path).ok()?;
90 target
91 .file_name()
92 .and_then(|n| n.to_str())
93 .map(|s| s.to_string())
94}
95
96/// Issue the gated first-heartbeat. Returns the recovery outcome so
97/// `runtime::spawn` can act on `replay_from` BEFORE starting workers.
98///
99/// `cp_url` is the agent's `--control-plane-url`. `machine_id` matches
100/// the CN in the agent's mTLS cert. The three `ca_cert`/`client_cert`/
101/// `client_key` paths are threaded straight into
102/// `crate::comms::build_client` so the handshake rides the same mTLS
103/// identity post-Phase-7c workers use (DEFECT-003 + D-005). `None`
104/// for all three drops to TLS-only mode (no client cert) — acceptable
105/// for the wiremock-driven tests but never production.
106///
107/// Failure to reach CP is non-fatal: we return an outcome with no
108/// replay-from, and the steady-state heartbeat worker will keep
109/// retrying. Better to start the agent and have its long-poll +
110/// retries re-converge than to refuse to boot.
111pub async fn handshake(
112 cp_url: &str,
113 machine_id: &str,
114 clock: &ClockHandle,
115 current_system_path: &Path,
116 ca_cert: Option<&Path>,
117 client_cert: Option<&Path>,
118 client_key: Option<&Path>,
119) -> RecoveryOutcome {
120 let current_closure = read_current_closure(current_system_path);
121 let now = clock.now();
122
123 let url = format!("{}/v1/agent/heartbeat", cp_url.trim_end_matches('/'));
124 let client = match crate::comms::build_client(ca_cert, client_cert, client_key) {
125 Ok(c) => c,
126 Err(err) => {
127 tracing::warn!(
128 target: "agent_recovery",
129 error = %err,
130 "boot-recovery: mTLS HTTP client build failed; skipping handshake",
131 );
132 return RecoveryOutcome {
133 current_closure,
134 replay_from: None,
135 heartbeat_sent_at: now,
136 bootstrap_rollouts: Vec::new(),
137 };
138 }
139 };
140
141 let req = HeartbeatRequest {
142 hostname: machine_id.to_string(),
143 rollout_id: None,
144 current_closure: current_closure.clone(),
145 at: now,
146 };
147 // Per-request timeout override: comms::build_client uses a 30s
148 // default; boot-recovery wants the same 30s explicitly (matches
149 // pre-D-005 behaviour even though the values coincide today — the
150 // override makes the contract explicit if either constant drifts).
151 let resp = match client
152 .post(&url)
153 .timeout(RECOVERY_HTTP_TIMEOUT)
154 .json(&req)
155 .send()
156 .await
157 {
158 Ok(r) => r,
159 Err(err) => {
160 tracing::warn!(
161 target: "agent_recovery",
162 error = %err,
163 "boot-recovery: heartbeat POST failed; steady-state heartbeat worker will retry",
164 );
165 return RecoveryOutcome {
166 current_closure,
167 replay_from: None,
168 heartbeat_sent_at: now,
169 bootstrap_rollouts: Vec::new(),
170 };
171 }
172 };
173
174 let replay_from = resp
175 .headers()
176 .get("X-Nixfleet-Replay-From")
177 .and_then(|v| v.to_str().ok())
178 .and_then(|s| s.parse::<u64>().ok());
179
180 // LIFT #3: parse the response body for bootstrap snapshots that
181 // CP-side built when it saw our rollout_id=None + non-terminal
182 // records pattern. We MUST read the body (not just drain it) to
183 // get the snapshots. Old CP responses without the field
184 // deserialize cleanly (serde-default empty Vec).
185 let bootstrap_rollouts = match resp.json::<HeartbeatResponse>().await {
186 Ok(body) => {
187 if !body.bootstrap_rollouts.is_empty() {
188 tracing::info!(
189 target: "agent_recovery",
190 count = body.bootstrap_rollouts.len(),
191 "boot-recovery: CP returned bootstrap snapshots; will rehydrate agent reducer",
192 );
193 }
194 body.bootstrap_rollouts
195 }
196 Err(err) => {
197 tracing::warn!(
198 target: "agent_recovery",
199 error = %err,
200 "boot-recovery: response body parse failed; no bootstrap rehydration",
201 );
202 Vec::new()
203 }
204 };
205
206 if let Some(seq) = replay_from {
207 tracing::info!(
208 target: "agent_recovery",
209 replay_from = seq,
210 bootstrap_count = bootstrap_rollouts.len(),
211 "boot-recovery: CP signaled drift; bootstrap snapshots will rehydrate \
212 whatever state was lost (LIFT #3)",
213 );
214 }
215
216 RecoveryOutcome {
217 current_closure,
218 replay_from,
219 heartbeat_sent_at: now,
220 bootstrap_rollouts,
221 }
222}
223
224/// Default location of NixOS's current-system symlink. Test fixtures
225/// override via `handshake`'s `current_system_path` parameter.
226pub fn default_current_system_path() -> PathBuf {
227 PathBuf::from("/run/current-system")
228}
229
230/// Synthetic Replay-From payload shape. Documented for the
231/// follow-up that wires CP to emit per-event replay bodies; today the
232/// agent only sees the seq header.
233#[allow(dead_code)]
234#[derive(Debug, Clone, Serialize, Deserialize)]
235pub struct ReplayFromPayload {
236 pub rollout_id: String,
237 pub from_seq: u64,
238 /// `agent_event_kind` strings the CP plans to synthesise for the
239 /// agent. v0.2 emits seq-only; this field is forward-compat.
240 pub kinds: Vec<String>,
241}
242
243#[cfg(test)]
244mod tests {
245 use super::*;
246 use std::os::unix::fs::symlink;
247 use tempfile::TempDir;
248 use wiremock::matchers::{method, path};
249 use wiremock::{Mock, MockServer, ResponseTemplate};
250
251 fn fake_clock() -> ClockHandle {
252 std::sync::Arc::new(nixfleet_proto::clock::SystemClock::new())
253 }
254
255 #[test]
256 fn read_current_closure_returns_basename_when_link_exists() {
257 let dir = TempDir::new().unwrap();
258 let store_path = dir.path().join("nixos-system-host-26.05");
259 std::fs::create_dir(&store_path).unwrap();
260 let link = dir.path().join("current-system");
261 symlink(&store_path, &link).unwrap();
262 let got = read_current_closure(&link).unwrap();
263 assert_eq!(got, "nixos-system-host-26.05");
264 }
265
266 #[test]
267 fn read_current_closure_returns_none_when_missing() {
268 let dir = TempDir::new().unwrap();
269 let link = dir.path().join("does-not-exist");
270 assert!(read_current_closure(&link).is_none());
271 }
272
273 #[tokio::test]
274 async fn handshake_returns_replay_from_when_cp_signals_drift() {
275 // RFC-0005 §9.5 scenario 3: agent crashed mid-Activating;
276 // /run/current-system points at the target closure; CP signals
277 // Replay-From=42 so the agent knows it should rebuild from
278 // that seq. We assert the handshake surfaces the seq.
279 let cp = MockServer::start().await;
280 Mock::given(method("POST"))
281 .and(path("/v1/agent/heartbeat"))
282 .respond_with(
283 ResponseTemplate::new(200)
284 .insert_header("X-Nixfleet-Replay-From", "42")
285 .set_body_json(serde_json::json!({})),
286 )
287 .mount(&cp)
288 .await;
289
290 let dir = TempDir::new().unwrap();
291 let store_path = dir.path().join("target-closure-store-path");
292 std::fs::create_dir(&store_path).unwrap();
293 let link = dir.path().join("current-system");
294 symlink(&store_path, &link).unwrap();
295
296 // TLS-only mode (no mTLS cert) — wiremock listens on plain
297 // HTTP; reqwest's rustls config only activates for https://
298 // URLs, so the handshake succeeds against http:// wiremock.
299 let outcome = handshake(
300 &cp.uri(),
301 "host-smoke",
302 &fake_clock(),
303 &link,
304 None,
305 None,
306 None,
307 )
308 .await;
309 assert_eq!(
310 outcome.current_closure.as_deref(),
311 Some("target-closure-store-path")
312 );
313 assert_eq!(outcome.replay_from, Some(42));
314 }
315
316 #[tokio::test]
317 async fn handshake_handles_cp_unreachable_gracefully() {
318 let dir = TempDir::new().unwrap();
319 let outcome = handshake(
320 "http://localhost:1",
321 "host-smoke",
322 &fake_clock(),
323 &dir.path().join("missing-current-system"),
324 None,
325 None,
326 None,
327 )
328 .await;
329 assert!(outcome.current_closure.is_none());
330 assert!(outcome.replay_from.is_none());
331 }
332}