nixfleet_agent/runtime/workers/
heartbeat.rs

//! Heartbeat worker: every 60s posts to `/v1/agent/heartbeat` with the
//! agent's `hostname`, its observed `current_closure`, and a timestamp
//! (`rollout_id` is currently always sent as `None`).
//! Reads CP's `X-Nixfleet-Replay-From` response header; on drift the
//! intent is to walk the durable outbound queue from that seq and
//! re-POST the missed events.
//!
//! v0.2 scope: real POST loop only. The Replay-From walk-and-replay
//! is intentionally deferred — when CP signals drift today we log a
//! warning so operators see it. The durable queue (7d) and the
//! outbound drainer (7c) already give us crash-safe at-least-once
//! delivery for forward progress; Replay-From is a recovery
//! optimization for the case where CP lost state, which the
//! recovery handshake (7f) also covers on agent restart.
14
15use std::time::Duration;
16
17use nixfleet_proto::clock::ClockHandle;
18use tokio::sync::mpsc;
19use tokio::task::JoinHandle;
20
21use super::super::wire::{HeartbeatRequest, HeartbeatResponse};
22use super::super::{AgentConfig, ReducerInput, ShutdownToken};
23
/// Cadence of the heartbeat loop: one POST every 60 seconds.
///
/// Plan 06 + RFC-0005 §4.3 — deliberately the same window as the
/// long-poll's `wait` window, so that an agent which gets stuck stops
/// heartbeating within roughly one polling interval.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60);

/// Per-request timeout applied to each heartbeat POST (fail fast rather
/// than waiting on the HTTP client's own default).
const HTTP_TIMEOUT: Duration = Duration::from_secs(10);

/// Pause taken by the worker loop after a heartbeat attempt fails.
const ERROR_BACKOFF: Duration = Duration::from_secs(5);
32
33pub fn spawn(
34    cfg: AgentConfig,
35    clock: ClockHandle,
36    input_tx: mpsc::Sender<ReducerInput>,
37    shutdown: ShutdownToken,
38) -> JoinHandle<()> {
39    tokio::spawn(async move {
40        let mut shutdown_rx = shutdown.into_inner();
41        let client = match crate::comms::build_client(
42            cfg.ca_cert.as_deref(),
43            cfg.client_cert.as_deref(),
44            cfg.client_key.as_deref(),
45        ) {
46            Ok(c) => c,
47            Err(err) => {
48                tracing::error!(
49                    target: "agent_heartbeat",
50                    error = %err,
51                    "failed to build mTLS HTTP client; worker exits",
52                );
53                return;
54            }
55        };
56        let url = format!(
57            "{}/v1/agent/heartbeat",
58            cfg.control_plane_url.trim_end_matches('/'),
59        );
60        let mut ticker = tokio::time::interval(HEARTBEAT_INTERVAL);
61        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
62
63        loop {
64            tokio::select! {
65                biased;
66                _ = &mut shutdown_rx => {
67                    tracing::info!(
68                        target: "shutdown",
69                        task = "agent_heartbeat",
70                        "task shut down",
71                    );
72                    return;
73                }
74                _ = ticker.tick() => {
75                    if let Err(err) = heartbeat_once(&client, &url, &cfg, &clock, &input_tx).await {
76                        tracing::warn!(
77                            target: "agent_heartbeat",
78                            error = %err,
79                            "heartbeat POST failed; backing off",
80                        );
81                        tokio::time::sleep(ERROR_BACKOFF).await;
82                    }
83                }
84            }
85        }
86    })
87}
88
89async fn heartbeat_once(
90    client: &reqwest::Client,
91    url: &str,
92    cfg: &AgentConfig,
93    clock: &ClockHandle,
94    input_tx: &mpsc::Sender<ReducerInput>,
95) -> anyhow::Result<()> {
96    // LIFT #5: read /run/current-system on each heartbeat. The agent's
97    // observed closure is the load-bearing soft-state input the CP
98    // needs to rebuild `host_rollout_records` from agent traffic alone
99    // after a wipe-restart (architecture.md §305 acceptance gate 1).
100    // `rollout_id` stays None: that keeps the heartbeat in the
101    // boot-recovery shape so CP's LIFT #1 + LIFT #5 synthesis fires
102    // when it sees `current_closure == record.target_closure`.
103    let current_closure = crate::runtime::recovery::read_current_closure(&cfg.current_system_path);
104    let req = HeartbeatRequest {
105        hostname: cfg.machine_id.clone(),
106        rollout_id: None,
107        current_closure,
108        at: clock.now(),
109    };
110    // Per-request timeout override: comms::build_client uses a 30s
111    // default; heartbeat insists on a tighter 10s fail-fast.
112    let resp = client
113        .post(url)
114        .timeout(HTTP_TIMEOUT)
115        .json(&req)
116        .send()
117        .await?;
118    let status = resp.status();
119
120    let replay_from = resp
121        .headers()
122        .get("X-Nixfleet-Replay-From")
123        .and_then(|v| v.to_str().ok())
124        .and_then(|s| s.parse::<u64>().ok());
125
126    if !status.is_success() {
127        anyhow::bail!("CP returned {status}");
128    }
129    let body: HeartbeatResponse = resp.json().await?;
130
131    if let Some(seq) = replay_from {
132        // v0.2 deferred: walk-and-replay from durable queue. Logged so
133        // operators see drift; recovery handshake (7f) covers the
134        // forward path on agent restart.
135        tracing::warn!(
136            target: "agent_heartbeat",
137            replay_from = seq,
138            "CP signaled Replay-From (walk-and-replay deferred for v0.2)",
139        );
140    }
141
142    // LIFT #4: forward CP-supplied bootstrap snapshots into the reducer.
143    // CP fills `bootstrap_rollouts` on every heartbeat where the agent
144    // posted `rollout_id=None` (see CP reducer build_bootstrap_for_host).
145    // The steady-state heartbeat currently posts None unconditionally,
146    // so this path catches the case where the agent's in-memory cache
147    // got out of sync with CP between boot-recovery and now (e.g.
148    // LIFT #1 heartbeat synthesis completed post-recovery on CP). The
149    // reducer routes the resulting LocalResetProbeCache effects so
150    // probe runners re-spawn against the bootstrapped rollout_id
151    // instead of holding tickers from a prior agent process.
152    if !body.bootstrap_rollouts.is_empty() {
153        tracing::info!(
154            target: "agent_heartbeat",
155            count = body.bootstrap_rollouts.len(),
156            "heartbeat: CP returned bootstrap snapshots; forwarding to reducer",
157        );
158        for snapshot in body.bootstrap_rollouts {
159            if let Err(err) = input_tx
160                .send(ReducerInput::BootstrapHost(Box::new(snapshot)))
161                .await
162            {
163                tracing::warn!(
164                    target: "agent_heartbeat",
165                    error = %err,
166                    "bootstrap-snapshot forward failed (reducer not ready?); skipping",
167                );
168            }
169        }
170    }
171    Ok(())
172}