nixfleet_control_plane/server/routes/
deferrals.rs

1//! `GET /v1/deferrals` - currently-blocked (rollout, host) pairs as observed
2//! by the latest gate decisions in `event_log`.
3//!
4//! Each row in `event_log` with `kind = 'gate_decision'` represents one
5//! `plan_next` pass blocking a host's dispatch (Phase 5b's
6//! `PlanAction::DeferDispatch`). We dedupe to one entry per (host, rollout)
7//! pair keeping the most recent decision; that gives operators "what's
8//! holding things up right now" without the historical stream — full
9//! history stays available via the raw event_log query.
10
11use std::collections::HashSet;
12use std::sync::Arc;
13
14use axum::extract::State;
15use axum::http::{HeaderMap, HeaderValue, StatusCode, header};
16use axum::response::IntoResponse;
17
18use super::super::state::AppState;
19use crate::db::event_log::EventLogKind;
20
/// Upper bound on the number of `gate_decision` rows requested from
/// `event_log` per call (passed as the limit to `query_by_kind`). v0.2
/// scale (~256 hosts × ~few gate decisions per dispatch attempt) keeps
/// practical totals well under this even mid-rollout. If you raise this,
/// also bump it in the CLI.
///
/// NOTE(review): rows beyond this limit are never seen by the projection,
/// so a (host, rollout) pair could go missing if the log outgrows it —
/// which rows get cut off depends on `query_by_kind`'s ordering; confirm.
const SCAN_LIMIT: i64 = 1024;
25
26pub(in crate::server) async fn list(
27    State(state): State<Arc<AppState>>,
28) -> Result<impl IntoResponse, StatusCode> {
29    let payload = project_deferrals(&state)?;
30    let body = payload.to_string();
31    let mut headers = HeaderMap::new();
32    headers.insert(
33        header::CONTENT_TYPE,
34        HeaderValue::from_static("application/json"),
35    );
36    Ok((headers, body))
37}
38
39/// Pure projection from `event_log` to the deferrals response shape.
40/// Split out from the route handler so unit tests can assert against
41/// `serde_json::Value` directly without exercising the axum response
42/// pipeline.
43fn project_deferrals(state: &Arc<AppState>) -> Result<serde_json::Value, StatusCode> {
44    let Some(db) = state.db.as_ref() else {
45        return Ok(serde_json::json!({ "deferrals": [] }));
46    };
47    let rows = match db
48        .event_log()
49        .query_by_kind(EventLogKind::GateDecision, SCAN_LIMIT)
50    {
51        Ok(rs) => rs,
52        Err(err) => {
53            tracing::error!(target: "deferrals", error = %err, "event_log query failed");
54            return Err(StatusCode::INTERNAL_SERVER_ERROR);
55        }
56    };
57
58    let mut seen: HashSet<(String, String)> = HashSet::new();
59    let mut deferrals: Vec<serde_json::Value> = Vec::new();
60    for row in rows.into_iter().rev() {
61        let payload: serde_json::Value = match serde_json::from_str(&row.payload) {
62            Ok(v) => v,
63            Err(_) => continue,
64        };
65        let host = payload
66            .get("host")
67            .and_then(serde_json::Value::as_str)
68            .unwrap_or("");
69        let rollout = payload
70            .get("rollout")
71            .and_then(serde_json::Value::as_str)
72            .unwrap_or("");
73        if host.is_empty() || rollout.is_empty() {
74            continue;
75        }
76        if !seen.insert((host.to_string(), rollout.to_string())) {
77            continue;
78        }
79        let gate = payload
80            .get("gate")
81            .and_then(serde_json::Value::as_str)
82            .unwrap_or("unknown");
83        let reason = payload
84            .get("reason")
85            .and_then(serde_json::Value::as_str)
86            .unwrap_or("");
87        deferrals.push(serde_json::json!({
88            "host": host,
89            "rollout": rollout,
90            "blockedBy": gate,
91            "reason": reason,
92            "observedAt": row.ts.to_rfc3339(),
93        }));
94    }
95    Ok(serde_json::json!({ "deferrals": deferrals }))
96}
97
#[cfg(test)]
mod tests {
    //! Unit tests for `project_deferrals`. They call the projection
    //! directly; the axum handler wrapping it is not exercised here.

    use super::*;
    use crate::db::Db;
    use crate::db::event_log::EventLogEntry;
    use chrono::Utc;

    /// Builds an `AppState` backed by a freshly migrated in-memory database.
    fn fresh_state() -> Arc<AppState> {
        let db = Db::open_in_memory().unwrap();
        db.migrate().unwrap();
        let state = AppState {
            db: Some(Arc::new(db)),
            ..Default::default()
        };
        Arc::new(state)
    }

    #[tokio::test]
    async fn deferrals_empty_when_no_gate_decisions() {
        let projected = project_deferrals(&fresh_state()).unwrap();
        assert_eq!(projected["deferrals"], serde_json::json!([]));
    }

    #[tokio::test]
    async fn deferrals_dedups_to_latest_per_host_rollout_pair() {
        let state = fresh_state();
        let db = state.db.clone().unwrap();
        let base = Utc::now();

        // Two decisions for (h2, stable), one second apart: the older one
        // says wave-promotion, the newer one compliance-wave. Only the
        // newer one should survive the projection.
        let decisions: [(i64, &str); 2] = [
            (
                0,
                r#"{"host":"h2","rollout":"stable","gate":"wave-promotion","reason":"earlier"}"#,
            ),
            (
                1,
                r#"{"host":"h2","rollout":"stable","gate":"compliance-wave","reason":"latest"}"#,
            ),
        ];
        for (offset_secs, payload) in decisions {
            let entry = EventLogEntry {
                kind: EventLogKind::GateDecision,
                ts: base + chrono::Duration::seconds(offset_secs),
                host_id: Some("h2".into()),
                rollout_id: Some("stable".into()),
                payload: payload.into(),
            };
            db.event_log().append(&entry).unwrap();
        }

        let projected = project_deferrals(&state).unwrap();
        let entries = projected["deferrals"].as_array().unwrap();
        assert_eq!(entries.len(), 1, "dedup keeps one entry per (host, rollout)");
        let survivor = &entries[0];
        assert_eq!(survivor["host"], "h2");
        assert_eq!(survivor["blockedBy"], "compliance-wave");
        assert_eq!(survivor["reason"], "latest");
    }
}