nixfleet_control_plane/server/routes/
status.rs1use std::sync::Arc;
4
5use axum::Json;
6use axum::body::Body;
7use axum::extract::{Extension, Path, State};
8use axum::http::StatusCode;
9use axum::response::Response;
10use chrono::Utc;
11use nixfleet_proto::{HostRolloutState, HostStatusEntry, HostsResponse};
12use nixfleet_state_machine::HostState;
13use serde::Serialize;
14
15use super::super::middleware::AuthenticatedCn;
16use super::super::state::AppState;
17
18fn host_state_to_wire(s: HostState) -> HostRolloutState {
21 match s {
22 HostState::Pending => HostRolloutState::Pending,
23 HostState::Activating => HostRolloutState::Activating,
24 HostState::Deferred => HostRolloutState::Deferred,
25 HostState::Soaking => HostRolloutState::Soaking,
26 HostState::Converged => HostRolloutState::Converged,
27 HostState::Failed => HostRolloutState::Failed,
28 HostState::Reverted => HostRolloutState::Reverted,
29 }
30}
31
32#[derive(Debug, Serialize)]
33pub(in crate::server) struct WhoamiResponse {
34 cn: String,
35 #[serde(rename = "issuedAt")]
37 issued_at: String,
38}
39
40pub(in crate::server) async fn whoami(
42 Extension(cn): Extension<AuthenticatedCn>,
43) -> Json<WhoamiResponse> {
44 Json(WhoamiResponse {
45 cn: cn.into_string(),
46 issued_at: Utc::now().to_rfc3339(),
47 })
48}
49
50#[derive(Debug, Serialize)]
51pub(in crate::server) struct ChannelStatusResponse {
52 name: String,
53 declared_ci_commit: Option<String>,
55 signed_at: Option<String>,
56 freshness_window_minutes: u32,
57}
58
59pub(in crate::server) async fn channel_status(
61 State(state): State<Arc<AppState>>,
62 Path(name): Path<String>,
63) -> Result<Json<ChannelStatusResponse>, StatusCode> {
64 let snapshot = state.verified_fleet.read().await.clone();
65 let snap = snapshot.ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
66 let fleet = snap.fleet;
67 let channel = fleet.channels.get(&name).ok_or(StatusCode::NOT_FOUND)?;
68
69 Ok(Json(ChannelStatusResponse {
70 name,
71 declared_ci_commit: fleet.meta.ci_commit.clone(),
72 signed_at: fleet.meta.signed_at.map(|t| t.to_rfc3339()),
73 freshness_window_minutes: channel.freshness_window,
74 }))
75}
76
77pub(in crate::server) async fn hosts_status(
86 State(state): State<Arc<AppState>>,
87) -> Result<Json<HostsResponse>, StatusCode> {
88 let Some(db) = state.db.as_ref() else {
89 return Ok(Json(HostsResponse { hosts: Vec::new() }));
90 };
91
92 let outstanding = db
96 .probe_failures()
97 .outstanding_failing_enforce_probes_by_rollout()
98 .unwrap_or_default();
99
100 let mut hosts: Vec<HostStatusEntry> = Vec::new();
101 let rollouts = match db.rollouts().list_active() {
102 Ok(r) => r,
103 Err(err) => {
104 tracing::error!(target: "hosts_status", error = %err, "rollouts.list_active failed");
105 return Err(StatusCode::INTERNAL_SERVER_ERROR);
106 }
107 };
108
109 let verified = state.verified_fleet.read().await.clone();
112
113 for r in rollouts.iter() {
114 let records = match db
115 .host_rollout_records()
116 .all_for_rollout(r.rollout_id.as_str())
117 {
118 Ok(rs) => rs,
119 Err(err) => {
120 tracing::error!(
121 target: "hosts_status",
122 rollout_id = %r.rollout_id,
123 error = %err,
124 "all_for_rollout failed; skipping rollout",
125 );
126 continue;
127 }
128 };
129 for row in records {
130 let declared_closure_hash = verified
131 .as_ref()
132 .and_then(|s| s.fleet.hosts.get(&row.hostname))
133 .and_then(|h| h.closure_hash.clone());
134 let compliance_count = outstanding
135 .get(&r.rollout_id)
136 .and_then(|per_host| per_host.get(&row.hostname))
137 .copied()
138 .unwrap_or(0);
139 hosts.push(HostStatusEntry {
140 hostname: row.hostname.clone(),
141 channel: row.channel.clone(),
142 declared_closure_hash,
143 current_closure_hash: row.current_closure.clone(),
144 pending_closure_hash: row.current_closure_at_dispatch.clone(),
145 last_checkin_at: None,
146 last_rollout_id: Some(row.rollout_id.as_str().to_string()),
147 converged: row.state == HostState::Converged,
148 outstanding_compliance_failures: compliance_count,
149 outstanding_runtime_gate_errors: 0,
150 verified_event_count: row.last_event_seq as usize,
151 last_uptime_secs: None,
152 rollout_state: Some(host_state_to_wire(row.state)),
153 pending_reboot: false,
154 quarantined_closure: None,
155 pin: verified
156 .as_ref()
157 .and_then(|s| s.fleet.hosts.get(&row.hostname))
158 .and_then(|h| h.pin.clone()),
159 outstanding_health_failures: 0,
160 });
161 }
162 }
163 Ok(Json(HostsResponse { hosts }))
164}
165
166pub(in crate::server) async fn closure_proxy(
168 State(state): State<Arc<AppState>>,
169 Extension(cn): Extension<AuthenticatedCn>,
170 Path(closure_hash): Path<String>,
171) -> Result<Response, StatusCode> {
172 let cn = cn.as_str();
173
174 let upstream = match &state.closure_upstream {
175 Some(u) => u,
176 None => {
177 tracing::info!(
178 target: "closure_proxy",
179 cn = %cn,
180 closure = %closure_hash,
181 "closure proxy hit but no --closure-upstream configured (501)"
182 );
183 let body = serde_json::json!({
184 "error": "closure proxy not configured",
185 "closure": closure_hash,
186 "tracking": "set services.nixfleet-control-plane.closureUpstream",
187 });
188 return Ok(Response::builder()
189 .status(StatusCode::NOT_IMPLEMENTED)
190 .header("content-type", "application/json")
191 .body(Body::from(body.to_string()))
192 .expect("Response::builder with valid status + body is infallible"));
193 }
194 };
195
196 let url = format!(
197 "{}/{}.narinfo",
198 upstream.base_url.trim_end_matches('/'),
199 closure_hash
200 );
201 tracing::debug!(target: "closure_proxy", cn = %cn, url = %url, "forwarding");
202
203 let resp = match upstream.client.get(&url).send().await {
204 Ok(r) => r,
205 Err(err) => {
206 tracing::warn!(error = %err, "closure proxy: upstream unreachable");
207 return Ok(Response::builder()
208 .status(StatusCode::BAD_GATEWAY)
209 .body(Body::from(format!("upstream error: {err}")))
210 .expect("Response::builder with valid status + body is infallible"));
211 }
212 };
213 let status = resp.status().as_u16();
214 let body = resp.bytes().await.map_err(|err| {
215 tracing::warn!(error = %err, "closure proxy: upstream body read failed");
216 StatusCode::BAD_GATEWAY
217 })?;
218 Ok(Response::builder()
219 .status(status)
220 .header("content-type", "text/x-nix-narinfo")
221 .body(Body::from(body))
222 .expect("Response::builder with valid status + body is infallible"))
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Db;
    use nixfleet_state_machine::{HostRolloutState, HostState};

    /// Builds an `AppState` backed by a freshly migrated in-memory database;
    /// every other field keeps its `Default` value.
    fn fresh_state() -> Arc<AppState> {
        let db = Db::open_in_memory().unwrap();
        db.migrate().unwrap();
        let state = AppState {
            db: Some(Arc::new(db)),
            ..Default::default()
        };
        Arc::new(state)
    }

    /// With no rollouts recorded, the handler returns an empty host list.
    #[tokio::test]
    async fn hosts_status_empty_with_no_rollouts() {
        let Json(body) = hosts_status(State(fresh_state())).await.unwrap();
        assert!(body.hosts.is_empty(), "no rollouts ⇒ empty hosts list");
    }

    /// A converged host record on an open rollout is projected into exactly
    /// one `HostStatusEntry` carrying the record's fields.
    #[tokio::test]
    async fn hosts_status_projects_host_rollout_records() {
        let state = fresh_state();
        let db = state.db.clone().unwrap();
        let rollout_id = "stable@abc12345";
        let channel_name = "stable";
        db.rollouts()
            .record_rollout_opened(rollout_id, channel_name, rollout_id, chrono::Utc::now(), None)
            .unwrap();

        let now = chrono::Utc::now();
        let mut record = HostRolloutState::new_pending(
            rollout_id.into(),
            "h1".to_string(),
            channel_name.to_string(),
            "h1-closure".to_string(),
            now,
            now + chrono::Duration::minutes(5),
        );
        record.state = HostState::Converged;
        record.current_closure = Some("h1-closure".into());
        record.converged_at = Some(now + chrono::Duration::minutes(10));
        record.last_event_seq = 7;
        db.host_rollout_records().upsert(&record).unwrap();

        let Json(body) = hosts_status(State(state.clone())).await.unwrap();
        assert_eq!(body.hosts.len(), 1);

        let entry = &body.hosts[0];
        assert_eq!(entry.hostname, "h1");
        assert_eq!(entry.channel, "stable");
        assert_eq!(entry.last_rollout_id.as_deref(), Some(rollout_id));
        assert!(entry.converged, "state == Converged ⇒ converged = true");
        assert_eq!(
            entry.rollout_state,
            Some(nixfleet_proto::HostRolloutState::Converged),
        );
        assert_eq!(entry.current_closure_hash.as_deref(), Some("h1-closure"));
        assert_eq!(entry.verified_event_count, 7);
    }
}
290}