nixfleet_agent/runtime/workers/heartbeat.rs
//! Heartbeat worker: every 60s posts a `HeartbeatRequest` to
//! `/v1/agent/heartbeat` carrying `hostname`, `rollout_id` (always `None`
//! today), the observed `current_closure`, and an `at` timestamp.
3//! Reads CP's `X-Nixfleet-Replay-From` response header; on drift the
4//! intent is to walk the durable outbound queue from that seq and
5//! re-POST the missed events.
6//!
7//! v0.2 scope: real POST loop only. The Replay-From walk-and-replay
8//! is intentionally deferred — when CP signals drift today we log a
9//! warning so operators see it. The durable queue (7d) and the
10//! outbound drainer (7c) already give us crash-safe at-least-once
11//! delivery for forward progress; Replay-From is a recovery
12//! optimization for the case where CP lost state, which the
13//! recovery handshake (7f) also covers on agent restart.
14
15use std::time::Duration;
16
17use nixfleet_proto::clock::ClockHandle;
18use tokio::sync::mpsc;
19use tokio::task::JoinHandle;
20
21use super::super::wire::{HeartbeatRequest, HeartbeatResponse};
22use super::super::{AgentConfig, ReducerInput, ShutdownToken};
23
/// Cadence of the heartbeat POST: one request every 60 seconds.
///
/// Plan 06 + RFC-0005 §4.3 — this matches the long-poll's `wait` window, so a
/// stuck agent stops heartbeating within roughly one polling interval.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60);

/// Deadline applied to each heartbeat POST. It overrides the 30s default that
/// `comms::build_client` configures, so an unresponsive CP fails fast.
const HTTP_TIMEOUT: Duration = Duration::from_secs(10);

/// Extra pause taken after a failed heartbeat, before the worker resumes
/// waiting on its ticker.
const ERROR_BACKOFF: Duration = Duration::from_secs(5);
32
33pub fn spawn(
34 cfg: AgentConfig,
35 clock: ClockHandle,
36 input_tx: mpsc::Sender<ReducerInput>,
37 shutdown: ShutdownToken,
38) -> JoinHandle<()> {
39 tokio::spawn(async move {
40 let mut shutdown_rx = shutdown.into_inner();
41 let client = match crate::comms::build_client(
42 cfg.ca_cert.as_deref(),
43 cfg.client_cert.as_deref(),
44 cfg.client_key.as_deref(),
45 ) {
46 Ok(c) => c,
47 Err(err) => {
48 tracing::error!(
49 target: "agent_heartbeat",
50 error = %err,
51 "failed to build mTLS HTTP client; worker exits",
52 );
53 return;
54 }
55 };
56 let url = format!(
57 "{}/v1/agent/heartbeat",
58 cfg.control_plane_url.trim_end_matches('/'),
59 );
60 let mut ticker = tokio::time::interval(HEARTBEAT_INTERVAL);
61 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
62
63 loop {
64 tokio::select! {
65 biased;
66 _ = &mut shutdown_rx => {
67 tracing::info!(
68 target: "shutdown",
69 task = "agent_heartbeat",
70 "task shut down",
71 );
72 return;
73 }
74 _ = ticker.tick() => {
75 if let Err(err) = heartbeat_once(&client, &url, &cfg, &clock, &input_tx).await {
76 tracing::warn!(
77 target: "agent_heartbeat",
78 error = %err,
79 "heartbeat POST failed; backing off",
80 );
81 tokio::time::sleep(ERROR_BACKOFF).await;
82 }
83 }
84 }
85 }
86 })
87}
88
89async fn heartbeat_once(
90 client: &reqwest::Client,
91 url: &str,
92 cfg: &AgentConfig,
93 clock: &ClockHandle,
94 input_tx: &mpsc::Sender<ReducerInput>,
95) -> anyhow::Result<()> {
96 // LIFT #5: read /run/current-system on each heartbeat. The agent's
97 // observed closure is the load-bearing soft-state input the CP
98 // needs to rebuild `host_rollout_records` from agent traffic alone
99 // after a wipe-restart (architecture.md §305 acceptance gate 1).
100 // `rollout_id` stays None: that keeps the heartbeat in the
101 // boot-recovery shape so CP's LIFT #1 + LIFT #5 synthesis fires
102 // when it sees `current_closure == record.target_closure`.
103 let current_closure = crate::runtime::recovery::read_current_closure(&cfg.current_system_path);
104 let req = HeartbeatRequest {
105 hostname: cfg.machine_id.clone(),
106 rollout_id: None,
107 current_closure,
108 at: clock.now(),
109 };
110 // Per-request timeout override: comms::build_client uses a 30s
111 // default; heartbeat insists on a tighter 10s fail-fast.
112 let resp = client
113 .post(url)
114 .timeout(HTTP_TIMEOUT)
115 .json(&req)
116 .send()
117 .await?;
118 let status = resp.status();
119
120 let replay_from = resp
121 .headers()
122 .get("X-Nixfleet-Replay-From")
123 .and_then(|v| v.to_str().ok())
124 .and_then(|s| s.parse::<u64>().ok());
125
126 if !status.is_success() {
127 anyhow::bail!("CP returned {status}");
128 }
129 let body: HeartbeatResponse = resp.json().await?;
130
131 if let Some(seq) = replay_from {
132 // v0.2 deferred: walk-and-replay from durable queue. Logged so
133 // operators see drift; recovery handshake (7f) covers the
134 // forward path on agent restart.
135 tracing::warn!(
136 target: "agent_heartbeat",
137 replay_from = seq,
138 "CP signaled Replay-From (walk-and-replay deferred for v0.2)",
139 );
140 }
141
142 // LIFT #4: forward CP-supplied bootstrap snapshots into the reducer.
143 // CP fills `bootstrap_rollouts` on every heartbeat where the agent
144 // posted `rollout_id=None` (see CP reducer build_bootstrap_for_host).
145 // The steady-state heartbeat currently posts None unconditionally,
146 // so this path catches the case where the agent's in-memory cache
147 // got out of sync with CP between boot-recovery and now (e.g.
148 // LIFT #1 heartbeat synthesis completed post-recovery on CP). The
149 // reducer routes the resulting LocalResetProbeCache effects so
150 // probe runners re-spawn against the bootstrapped rollout_id
151 // instead of holding tickers from a prior agent process.
152 if !body.bootstrap_rollouts.is_empty() {
153 tracing::info!(
154 target: "agent_heartbeat",
155 count = body.bootstrap_rollouts.len(),
156 "heartbeat: CP returned bootstrap snapshots; forwarding to reducer",
157 );
158 for snapshot in body.bootstrap_rollouts {
159 if let Err(err) = input_tx
160 .send(ReducerInput::BootstrapHost(Box::new(snapshot)))
161 .await
162 {
163 tracing::warn!(
164 target: "agent_heartbeat",
165 error = %err,
166 "bootstrap-snapshot forward failed (reducer not ready?); skipping",
167 );
168 }
169 }
170 }
171 Ok(())
172}