nixfleet_control_plane/server/routes/
deferrals.rs1use std::collections::HashSet;
12use std::sync::Arc;
13
14use axum::extract::State;
15use axum::http::{HeaderMap, HeaderValue, StatusCode, header};
16use axum::response::IntoResponse;
17
18use super::super::state::AppState;
19use crate::db::event_log::EventLogKind;
20
21const SCAN_LIMIT: i64 = 1024;
25
26pub(in crate::server) async fn list(
27 State(state): State<Arc<AppState>>,
28) -> Result<impl IntoResponse, StatusCode> {
29 let payload = project_deferrals(&state)?;
30 let body = payload.to_string();
31 let mut headers = HeaderMap::new();
32 headers.insert(
33 header::CONTENT_TYPE,
34 HeaderValue::from_static("application/json"),
35 );
36 Ok((headers, body))
37}
38
39fn project_deferrals(state: &Arc<AppState>) -> Result<serde_json::Value, StatusCode> {
44 let Some(db) = state.db.as_ref() else {
45 return Ok(serde_json::json!({ "deferrals": [] }));
46 };
47 let rows = match db
48 .event_log()
49 .query_by_kind(EventLogKind::GateDecision, SCAN_LIMIT)
50 {
51 Ok(rs) => rs,
52 Err(err) => {
53 tracing::error!(target: "deferrals", error = %err, "event_log query failed");
54 return Err(StatusCode::INTERNAL_SERVER_ERROR);
55 }
56 };
57
58 let mut seen: HashSet<(String, String)> = HashSet::new();
59 let mut deferrals: Vec<serde_json::Value> = Vec::new();
60 for row in rows.into_iter().rev() {
61 let payload: serde_json::Value = match serde_json::from_str(&row.payload) {
62 Ok(v) => v,
63 Err(_) => continue,
64 };
65 let host = payload
66 .get("host")
67 .and_then(serde_json::Value::as_str)
68 .unwrap_or("");
69 let rollout = payload
70 .get("rollout")
71 .and_then(serde_json::Value::as_str)
72 .unwrap_or("");
73 if host.is_empty() || rollout.is_empty() {
74 continue;
75 }
76 if !seen.insert((host.to_string(), rollout.to_string())) {
77 continue;
78 }
79 let gate = payload
80 .get("gate")
81 .and_then(serde_json::Value::as_str)
82 .unwrap_or("unknown");
83 let reason = payload
84 .get("reason")
85 .and_then(serde_json::Value::as_str)
86 .unwrap_or("");
87 deferrals.push(serde_json::json!({
88 "host": host,
89 "rollout": rollout,
90 "blockedBy": gate,
91 "reason": reason,
92 "observedAt": row.ts.to_rfc3339(),
93 }));
94 }
95 Ok(serde_json::json!({ "deferrals": deferrals }))
96}
97
98#[cfg(test)]
99mod tests {
100 use super::*;
104 use crate::db::Db;
105 use crate::db::event_log::EventLogEntry;
106 use chrono::Utc;
107
108 fn fresh_state() -> Arc<AppState> {
109 let db = Db::open_in_memory().unwrap();
110 db.migrate().unwrap();
111 Arc::new(AppState {
112 db: Some(Arc::new(db)),
113 ..Default::default()
114 })
115 }
116
117 #[tokio::test]
118 async fn deferrals_empty_when_no_gate_decisions() {
119 let state = fresh_state();
120 let v = project_deferrals(&state).unwrap();
121 assert_eq!(v["deferrals"], serde_json::json!([]));
122 }
123
124 #[tokio::test]
125 async fn deferrals_dedups_to_latest_per_host_rollout_pair() {
126 let state = fresh_state();
127 let db = state.db.clone().unwrap();
128 let now = Utc::now();
129 db.event_log()
132 .append(&EventLogEntry {
133 kind: EventLogKind::GateDecision,
134 ts: now,
135 host_id: Some("h2".into()),
136 rollout_id: Some("stable".into()),
137 payload:
138 r#"{"host":"h2","rollout":"stable","gate":"wave-promotion","reason":"earlier"}"#
139 .into(),
140 })
141 .unwrap();
142 db.event_log()
143 .append(&EventLogEntry {
144 kind: EventLogKind::GateDecision,
145 ts: now + chrono::Duration::seconds(1),
146 host_id: Some("h2".into()),
147 rollout_id: Some("stable".into()),
148 payload:
149 r#"{"host":"h2","rollout":"stable","gate":"compliance-wave","reason":"latest"}"#
150 .into(),
151 })
152 .unwrap();
153
154 let v = project_deferrals(&state).unwrap();
155 let arr = v["deferrals"].as_array().unwrap();
156 assert_eq!(arr.len(), 1, "dedup keeps one entry per (host, rollout)");
157 assert_eq!(arr[0]["host"], "h2");
158 assert_eq!(arr[0]["blockedBy"], "compliance-wave");
159 assert_eq!(arr[0]["reason"], "latest");
160 }
161}