nixfleet_control_plane/server/routes/
heartbeat.rs1use std::sync::Arc;
25use std::time::Duration;
26
27use axum::Json;
28use axum::extract::{Extension, State};
29use axum::http::{HeaderMap, HeaderValue, StatusCode};
30use chrono::{DateTime, Utc};
31use serde::{Deserialize, Serialize};
32use tokio::sync::oneshot;
33
34use super::super::middleware::AuthenticatedCn;
35use super::super::state::AppState;
36use crate::runtime::{HeartbeatReply, ReducerInput};
37
/// How long the heartbeat handler waits on the reducer's reply oneshot
/// before abandoning the request with `503 Service Unavailable`.
const REDUCER_REPLY_TIMEOUT: Duration = Duration::from_millis(5_000);
39
40#[derive(Debug, Clone, Deserialize, Serialize)]
41pub struct HeartbeatRequest {
42 pub hostname: String,
43 #[serde(default, skip_serializing_if = "Option::is_none")]
48 pub rollout_id: Option<nixfleet_proto::RolloutId>,
49 #[serde(default, skip_serializing_if = "Option::is_none")]
52 pub current_closure: Option<String>,
53 pub at: DateTime<Utc>,
57}
58
59#[derive(Debug, Clone, Deserialize, Serialize)]
60pub struct HeartbeatResponse {
61 pub received_at: Option<DateTime<Utc>>,
65 #[serde(default, skip_serializing_if = "Vec::is_empty")]
72 pub bootstrap_rollouts: Vec<nixfleet_proto::agent_wire::HostRolloutSnapshot>,
73}
74
75pub(in crate::server) async fn heartbeat(
76 State(state): State<Arc<AppState>>,
77 Extension(cn): Extension<AuthenticatedCn>,
78 Json(req): Json<HeartbeatRequest>,
79) -> Result<(HeaderMap, Json<HeartbeatResponse>), StatusCode> {
80 let cn_str = cn.into_string();
81 let machine_id = crate::auth::issuance::extract_machine_id(&cn_str, &state.agent_cn_suffix);
82 if machine_id != req.hostname {
83 tracing::warn!(
84 target: "heartbeat",
85 cert_cn = %cn_str,
86 machine_id = %machine_id,
87 body_hostname = %req.hostname,
88 "heartbeat rejected: cert CN does not match body hostname",
89 );
90 return Err(StatusCode::FORBIDDEN);
91 }
92
93 let Some(tx) = state.runtime_input_tx.get() else {
94 tracing::warn!(
95 target: "heartbeat",
96 hostname = %req.hostname,
97 "heartbeat: runtime not yet spun up; returning 503",
98 );
99 return Err(StatusCode::SERVICE_UNAVAILABLE);
100 };
101
102 let (reply_tx, reply_rx) = oneshot::channel::<HeartbeatReply>();
103 let input = ReducerInput::HeartbeatReceived {
104 host: req.hostname.clone(),
105 rollout_id: req.rollout_id.clone(),
106 current_closure: req.current_closure.clone(),
107 at: req.at,
108 reply: reply_tx,
109 };
110 if let Err(err) = tx.try_send(input) {
111 match err {
112 tokio::sync::mpsc::error::TrySendError::Full(_) => {
113 tracing::warn!(
114 target: "heartbeat",
115 hostname = %req.hostname,
116 "heartbeat: reducer channel full; returning 503",
117 );
118 }
119 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
120 tracing::error!(
121 target: "heartbeat",
122 hostname = %req.hostname,
123 "heartbeat: reducer channel closed; CP shutting down",
124 );
125 }
126 }
127 return Err(StatusCode::SERVICE_UNAVAILABLE);
128 }
129
130 let reply = match tokio::time::timeout(REDUCER_REPLY_TIMEOUT, reply_rx).await {
134 Ok(Ok(r)) => r,
135 Ok(Err(_recv_err)) => {
136 tracing::error!(
137 target: "heartbeat",
138 hostname = %req.hostname,
139 "heartbeat: reducer dropped reply oneshot without responding",
140 );
141 return Err(StatusCode::SERVICE_UNAVAILABLE);
142 }
143 Err(_elapsed) => {
144 tracing::error!(
145 target: "heartbeat",
146 hostname = %req.hostname,
147 timeout_secs = REDUCER_REPLY_TIMEOUT.as_secs(),
148 "heartbeat: reducer did not respond within timeout (possible wedge)",
149 );
150 return Err(StatusCode::SERVICE_UNAVAILABLE);
151 }
152 };
153
154 tracing::info!(
155 target: "heartbeat",
156 hostname = %req.hostname,
157 rollout_id = ?req.rollout_id,
158 current_closure = ?req.current_closure,
159 replay_from = ?reply.replay_from,
160 "heartbeat received",
161 );
162
163 let mut headers = HeaderMap::new();
164 if let Some(seq) = reply.replay_from {
165 headers.insert(
169 "X-Nixfleet-Replay-From",
170 HeaderValue::from_str(&seq.to_string()).expect("u64 to_string is always ASCII"),
171 );
172 }
173 Ok((
174 headers,
175 Json(HeartbeatResponse {
176 received_at: Some(req.at),
177 bootstrap_rollouts: reply.bootstrap_rollouts,
178 }),
179 ))
180}