nixfleet_control_plane/server/routes/
heartbeat.rs

//! `POST /v1/agent/heartbeat` — agent liveness + drift detection
//! (RFC-0005 §4.3). Replaces the v0.1 `POST /v1/agent/checkin` flow.
//!
//! The agent posts a minimal envelope (hostname, optional rollout_id,
//! optional current_closure, and its own timestamp `at`). The CP then:
//!
//! 1. Authenticates via mTLS (existing `require_cn_layer`).
//! 2. Checks the cert CN's machine_id against the body hostname
//!    (FORBIDDEN on mismatch — same shape as /v1/agent/events).
//! 3. Forwards a `HeartbeatReceived` input to the reducer, together with
//!    a oneshot reply channel.
//! 4. The reducer updates `last_heartbeat_at` (in-memory) and compares
//!    the agent's `current_closure` against the CP-mirror's
//!    `current_closure` field on the matching host_rollout_records row.
//!    On mismatch, the reply carries `last_event_seq` for Replay-From.
//! 5. Responds 200 with an optional `X-Nixfleet-Replay-From: <seq>`
//!    header.
//!
//! The 5-second reducer-reply timeout leaves a wide margin: a healthy
//! reducer processes a heartbeat input in microseconds. Hitting it signals
//! a reducer wedge (stuck applier? deadlock?) — log at error, return 503,
//! agent retries. The other transient failures (runtime not started yet,
//! reducer channel full or closed, reply dropped) also map to 503.
23
24use std::sync::Arc;
25use std::time::Duration;
26
27use axum::Json;
28use axum::extract::{Extension, State};
29use axum::http::{HeaderMap, HeaderValue, StatusCode};
30use chrono::{DateTime, Utc};
31use serde::{Deserialize, Serialize};
32use tokio::sync::oneshot;
33
34use super::super::middleware::AuthenticatedCn;
35use super::super::state::AppState;
36use crate::runtime::{HeartbeatReply, ReducerInput};
37
38const REDUCER_REPLY_TIMEOUT: Duration = Duration::from_secs(5);
39
40#[derive(Debug, Clone, Deserialize, Serialize)]
41pub struct HeartbeatRequest {
42    pub hostname: String,
43    /// Agent's view of the active rollout. `None` when the agent has no
44    /// outstanding rollout (post-Converged steady state). Serde-
45    /// transparent (RFC-0008 §6.3): the wire JSON shape is unchanged
46    /// from a plain `String`.
47    #[serde(default, skip_serializing_if = "Option::is_none")]
48    pub rollout_id: Option<nixfleet_proto::RolloutId>,
49    /// Agent's observed `current` closure hash (what the host is
50    /// actually running right now). CP cross-checks against its mirror.
51    #[serde(default, skip_serializing_if = "Option::is_none")]
52    pub current_closure: Option<String>,
53    /// Caller-supplied timestamp (from agent's ClockHandle). CP uses
54    /// this rather than wallclock-now so log lines line up with
55    /// agent-side timing.
56    pub at: DateTime<Utc>,
57}
58
59#[derive(Debug, Clone, Deserialize, Serialize)]
60pub struct HeartbeatResponse {
61    /// Mirrors `last_heartbeat_at` after the update. Useful for the
62    /// agent to confirm CP saw it; null if the reducer was unable to
63    /// process the heartbeat for some reason.
64    pub received_at: Option<DateTime<Utc>>,
65    /// LIFT #3: per active rollout on this host, a HostRolloutSnapshot
66    /// the agent should apply to its in-memory reducer. Populated only
67    /// when the heartbeat carried `rollout_id = None` (boot-recovery
68    /// shape — agent's reducer is empty) AND CP holds non-terminal
69    /// records for the host. Empty in steady-state heartbeats.
70    /// Serde-default empty so old agents ignore the field.
71    #[serde(default, skip_serializing_if = "Vec::is_empty")]
72    pub bootstrap_rollouts: Vec<nixfleet_proto::agent_wire::HostRolloutSnapshot>,
73}
74
75pub(in crate::server) async fn heartbeat(
76    State(state): State<Arc<AppState>>,
77    Extension(cn): Extension<AuthenticatedCn>,
78    Json(req): Json<HeartbeatRequest>,
79) -> Result<(HeaderMap, Json<HeartbeatResponse>), StatusCode> {
80    let cn_str = cn.into_string();
81    let machine_id = crate::auth::issuance::extract_machine_id(&cn_str, &state.agent_cn_suffix);
82    if machine_id != req.hostname {
83        tracing::warn!(
84            target: "heartbeat",
85            cert_cn = %cn_str,
86            machine_id = %machine_id,
87            body_hostname = %req.hostname,
88            "heartbeat rejected: cert CN does not match body hostname",
89        );
90        return Err(StatusCode::FORBIDDEN);
91    }
92
93    let Some(tx) = state.runtime_input_tx.get() else {
94        tracing::warn!(
95            target: "heartbeat",
96            hostname = %req.hostname,
97            "heartbeat: runtime not yet spun up; returning 503",
98        );
99        return Err(StatusCode::SERVICE_UNAVAILABLE);
100    };
101
102    let (reply_tx, reply_rx) = oneshot::channel::<HeartbeatReply>();
103    let input = ReducerInput::HeartbeatReceived {
104        host: req.hostname.clone(),
105        rollout_id: req.rollout_id.clone(),
106        current_closure: req.current_closure.clone(),
107        at: req.at,
108        reply: reply_tx,
109    };
110    if let Err(err) = tx.try_send(input) {
111        match err {
112            tokio::sync::mpsc::error::TrySendError::Full(_) => {
113                tracing::warn!(
114                    target: "heartbeat",
115                    hostname = %req.hostname,
116                    "heartbeat: reducer channel full; returning 503",
117                );
118            }
119            tokio::sync::mpsc::error::TrySendError::Closed(_) => {
120                tracing::error!(
121                    target: "heartbeat",
122                    hostname = %req.hostname,
123                    "heartbeat: reducer channel closed; CP shutting down",
124                );
125            }
126        }
127        return Err(StatusCode::SERVICE_UNAVAILABLE);
128    }
129
130    // Wait for the reducer's drift verdict. Short timeout because a
131    // healthy reducer answers in microseconds; a multi-second wedge is
132    // a runtime bug.
133    let reply = match tokio::time::timeout(REDUCER_REPLY_TIMEOUT, reply_rx).await {
134        Ok(Ok(r)) => r,
135        Ok(Err(_recv_err)) => {
136            tracing::error!(
137                target: "heartbeat",
138                hostname = %req.hostname,
139                "heartbeat: reducer dropped reply oneshot without responding",
140            );
141            return Err(StatusCode::SERVICE_UNAVAILABLE);
142        }
143        Err(_elapsed) => {
144            tracing::error!(
145                target: "heartbeat",
146                hostname = %req.hostname,
147                timeout_secs = REDUCER_REPLY_TIMEOUT.as_secs(),
148                "heartbeat: reducer did not respond within timeout (possible wedge)",
149            );
150            return Err(StatusCode::SERVICE_UNAVAILABLE);
151        }
152    };
153
154    tracing::info!(
155        target: "heartbeat",
156        hostname = %req.hostname,
157        rollout_id = ?req.rollout_id,
158        current_closure = ?req.current_closure,
159        replay_from = ?reply.replay_from,
160        "heartbeat received",
161    );
162
163    let mut headers = HeaderMap::new();
164    if let Some(seq) = reply.replay_from {
165        // The agent's MUST-honor header: on drift, it re-POSTs events
166        // from `seq` onward to /v1/agent/events. The exact handshake is
167        // spelled out in RFC-0005 §4.3.
168        headers.insert(
169            "X-Nixfleet-Replay-From",
170            HeaderValue::from_str(&seq.to_string()).expect("u64 to_string is always ASCII"),
171        );
172    }
173    Ok((
174        headers,
175        Json(HeartbeatResponse {
176            received_at: Some(req.at),
177            bootstrap_rollouts: reply.bootstrap_rollouts,
178        }),
179    ))
180}