nixfleet_control_plane/server/routes/status.rs

1//! Read-only status endpoints and closure proxy fallback.
2
3use std::sync::Arc;
4
5use axum::Json;
6use axum::body::Body;
7use axum::extract::{Extension, Path, State};
8use axum::http::StatusCode;
9use axum::response::Response;
10use chrono::Utc;
11use nixfleet_proto::{HostRolloutState, HostStatusEntry, HostsResponse};
12use nixfleet_state_machine::HostState;
13use serde::Serialize;
14
15use super::super::middleware::AuthenticatedCn;
16use super::super::state::AppState;
17
18/// Map the reducer's internal `HostState` to the wire-side `HostRolloutState`.
19/// 1:1 — proto's wire variants match the state-machine's enum exactly.
20fn host_state_to_wire(s: HostState) -> HostRolloutState {
21    match s {
22        HostState::Pending => HostRolloutState::Pending,
23        HostState::Activating => HostRolloutState::Activating,
24        HostState::Deferred => HostRolloutState::Deferred,
25        HostState::Soaking => HostRolloutState::Soaking,
26        HostState::Converged => HostRolloutState::Converged,
27        HostState::Failed => HostRolloutState::Failed,
28        HostState::Reverted => HostRolloutState::Reverted,
29    }
30}
31
32#[derive(Debug, Serialize)]
33pub(in crate::server) struct WhoamiResponse {
34    cn: String,
35    /// RFC3339; moment we observed the verified identity, not the cert's notBefore.
36    #[serde(rename = "issuedAt")]
37    issued_at: String,
38}
39
40/// `GET /v1/whoami` - verified mTLS CN of the caller.
41pub(in crate::server) async fn whoami(
42    Extension(cn): Extension<AuthenticatedCn>,
43) -> Json<WhoamiResponse> {
44    Json(WhoamiResponse {
45        cn: cn.into_string(),
46        issued_at: Utc::now().to_rfc3339(),
47    })
48}
49
50#[derive(Debug, Serialize)]
51pub(in crate::server) struct ChannelStatusResponse {
52    name: String,
53    /// `None` when offline / file-backed deploys leave `meta.ciCommit` unset.
54    declared_ci_commit: Option<String>,
55    signed_at: Option<String>,
56    freshness_window_minutes: u32,
57}
58
59/// `GET /v1/channels/{name}` - 503 until verified snapshot primed; 404 if channel undeclared.
60pub(in crate::server) async fn channel_status(
61    State(state): State<Arc<AppState>>,
62    Path(name): Path<String>,
63) -> Result<Json<ChannelStatusResponse>, StatusCode> {
64    let snapshot = state.verified_fleet.read().await.clone();
65    let snap = snapshot.ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
66    let fleet = snap.fleet;
67    let channel = fleet.channels.get(&name).ok_or(StatusCode::NOT_FOUND)?;
68
69    Ok(Json(ChannelStatusResponse {
70        name,
71        declared_ci_commit: fleet.meta.ci_commit.clone(),
72        signed_at: fleet.meta.signed_at.map(|t| t.to_rfc3339()),
73        freshness_window_minutes: channel.freshness_window,
74    }))
75}
76
77/// `GET /v1/hosts` - per-host status overview, projected from
78/// `host_rollout_records` (RFC-0005 §5).
79///
80/// One entry per (rollout, host) pair across all non-superseded rollouts.
81/// Fields the v0.1 schema carried but the new schema doesn't (per-host
82/// uptime, pending_reboot, quarantined_closure, last_checkin_at) stay at
83/// their defaults — the agent runtime rewrite (Plan 07 / Phase 7-agent)
84/// is where those re-attach to wire reports.
85pub(in crate::server) async fn hosts_status(
86    State(state): State<Arc<AppState>>,
87) -> Result<Json<HostsResponse>, StatusCode> {
88    let Some(db) = state.db.as_ref() else {
89        return Ok(Json(HostsResponse { hosts: Vec::new() }));
90    };
91
92    // Enforce-mode probe-failure counts, keyed by (rollout, host).
93    // Source: probe_failures projection (RFC-0007 §7.2). **Phase 9a**:
94    // unwritten until 9b — values flow once the applier co-write lands.
95    let outstanding = db
96        .probe_failures()
97        .outstanding_failing_enforce_probes_by_rollout()
98        .unwrap_or_default();
99
100    let mut hosts: Vec<HostStatusEntry> = Vec::new();
101    let rollouts = match db.rollouts().list_active() {
102        Ok(r) => r,
103        Err(err) => {
104            tracing::error!(target: "hosts_status", error = %err, "rollouts.list_active failed");
105            return Err(StatusCode::INTERNAL_SERVER_ERROR);
106        }
107    };
108
109    // Declared closure per host: best-effort lookup from the verified
110    // fleet snapshot. None when verified_fleet isn't primed (early boot).
111    let verified = state.verified_fleet.read().await.clone();
112
113    for r in rollouts.iter() {
114        let records = match db
115            .host_rollout_records()
116            .all_for_rollout(r.rollout_id.as_str())
117        {
118            Ok(rs) => rs,
119            Err(err) => {
120                tracing::error!(
121                    target: "hosts_status",
122                    rollout_id = %r.rollout_id,
123                    error = %err,
124                    "all_for_rollout failed; skipping rollout",
125                );
126                continue;
127            }
128        };
129        for row in records {
130            let declared_closure_hash = verified
131                .as_ref()
132                .and_then(|s| s.fleet.hosts.get(&row.hostname))
133                .and_then(|h| h.closure_hash.clone());
134            let compliance_count = outstanding
135                .get(&r.rollout_id)
136                .and_then(|per_host| per_host.get(&row.hostname))
137                .copied()
138                .unwrap_or(0);
139            hosts.push(HostStatusEntry {
140                hostname: row.hostname.clone(),
141                channel: row.channel.clone(),
142                declared_closure_hash,
143                current_closure_hash: row.current_closure.clone(),
144                pending_closure_hash: row.current_closure_at_dispatch.clone(),
145                last_checkin_at: None,
146                last_rollout_id: Some(row.rollout_id.as_str().to_string()),
147                converged: row.state == HostState::Converged,
148                outstanding_compliance_failures: compliance_count,
149                outstanding_runtime_gate_errors: 0,
150                verified_event_count: row.last_event_seq as usize,
151                last_uptime_secs: None,
152                rollout_state: Some(host_state_to_wire(row.state)),
153                pending_reboot: false,
154                quarantined_closure: None,
155                pin: verified
156                    .as_ref()
157                    .and_then(|s| s.fleet.hosts.get(&row.hostname))
158                    .and_then(|h| h.pin.clone()),
159                outstanding_health_failures: 0,
160            });
161        }
162    }
163    Ok(Json(HostsResponse { hosts }))
164}
165
166/// `GET /v1/agent/closure/{hash}` - narinfo proxy fallback; 501 when no upstream configured.
167pub(in crate::server) async fn closure_proxy(
168    State(state): State<Arc<AppState>>,
169    Extension(cn): Extension<AuthenticatedCn>,
170    Path(closure_hash): Path<String>,
171) -> Result<Response, StatusCode> {
172    let cn = cn.as_str();
173
174    let upstream = match &state.closure_upstream {
175        Some(u) => u,
176        None => {
177            tracing::info!(
178                target: "closure_proxy",
179                cn = %cn,
180                closure = %closure_hash,
181                "closure proxy hit but no --closure-upstream configured (501)"
182            );
183            let body = serde_json::json!({
184                "error": "closure proxy not configured",
185                "closure": closure_hash,
186                "tracking": "set services.nixfleet-control-plane.closureUpstream",
187            });
188            return Ok(Response::builder()
189                .status(StatusCode::NOT_IMPLEMENTED)
190                .header("content-type", "application/json")
191                .body(Body::from(body.to_string()))
192                .expect("Response::builder with valid status + body is infallible"));
193        }
194    };
195
196    let url = format!(
197        "{}/{}.narinfo",
198        upstream.base_url.trim_end_matches('/'),
199        closure_hash
200    );
201    tracing::debug!(target: "closure_proxy", cn = %cn, url = %url, "forwarding");
202
203    let resp = match upstream.client.get(&url).send().await {
204        Ok(r) => r,
205        Err(err) => {
206            tracing::warn!(error = %err, "closure proxy: upstream unreachable");
207            return Ok(Response::builder()
208                .status(StatusCode::BAD_GATEWAY)
209                .body(Body::from(format!("upstream error: {err}")))
210                .expect("Response::builder with valid status + body is infallible"));
211        }
212    };
213    let status = resp.status().as_u16();
214    let body = resp.bytes().await.map_err(|err| {
215        tracing::warn!(error = %err, "closure proxy: upstream body read failed");
216        StatusCode::BAD_GATEWAY
217    })?;
218    Ok(Response::builder()
219        .status(status)
220        .header("content-type", "text/x-nix-narinfo")
221        .body(Body::from(body))
222        .expect("Response::builder with valid status + body is infallible"))
223}
224
#[cfg(test)]
mod tests {
    //! Projection tests for `/v1/hosts`. The route reads from
    //! host_rollout_records + the rollouts table. We set up both directly
    //! and call the handler in-process (no router/middleware).

    use super::*;
    use crate::db::Db;
    use nixfleet_state_machine::{HostRolloutState, HostState};

    /// App state backed by a freshly migrated in-memory database. Every other
    /// field is left at its `Default`.
    fn fresh_state() -> Arc<AppState> {
        let db = Db::open_in_memory().unwrap();
        db.migrate().unwrap();
        Arc::new(AppState {
            db: Some(Arc::new(db)),
            ..Default::default()
        })
    }

    #[tokio::test]
    async fn hosts_status_empty_without_db() {
        // The handler short-circuits to an empty list when no db is configured.
        let state = Arc::new(AppState {
            db: None,
            ..Default::default()
        });
        let resp = hosts_status(State(state)).await.unwrap();
        assert!(resp.0.hosts.is_empty(), "no db ⇒ empty hosts list");
    }

    #[tokio::test]
    async fn hosts_status_empty_with_no_rollouts() {
        let state = fresh_state();
        let resp = hosts_status(State(state)).await.unwrap();
        assert!(resp.0.hosts.is_empty(), "no rollouts ⇒ empty hosts list");
    }

    #[tokio::test]
    async fn hosts_status_projects_host_rollout_records() {
        let state = fresh_state();
        let db = state.db.clone().unwrap();
        let rollout = "stable@abc12345";
        let channel = "stable";
        db.rollouts()
            .record_rollout_opened(rollout, channel, rollout, chrono::Utc::now(), None)
            .unwrap();

        let now = chrono::Utc::now();
        let mut row = HostRolloutState::new_pending(
            rollout.into(),
            "h1".to_string(),
            channel.to_string(),
            "h1-closure".to_string(),
            now,
            now + chrono::Duration::minutes(5),
        );
        row.state = HostState::Converged;
        row.current_closure = Some("h1-closure".into());
        row.converged_at = Some(now + chrono::Duration::minutes(10));
        row.last_event_seq = 7;
        db.host_rollout_records().upsert(&row).unwrap();

        let resp = hosts_status(State(state.clone())).await.unwrap();
        assert_eq!(resp.0.hosts.len(), 1);
        let h = &resp.0.hosts[0];
        assert_eq!(h.hostname, "h1");
        assert_eq!(h.channel, "stable");
        assert_eq!(h.last_rollout_id.as_deref(), Some(rollout));
        assert!(h.converged, "state == Converged ⇒ converged = true");
        assert_eq!(
            h.rollout_state,
            Some(nixfleet_proto::HostRolloutState::Converged),
        );
        assert_eq!(h.current_closure_hash.as_deref(), Some("h1-closure"));
        assert_eq!(h.verified_event_count, 7);

        // verified_fleet is left at its default here (presumably unprimed),
        // and no probe failures were recorded. The snapshot-derived fields
        // should therefore be empty and the compliance count zero.
        assert!(h.declared_closure_hash.is_none());
        assert!(h.pin.is_none());
        assert_eq!(h.outstanding_compliance_failures, 0);
    }
}