nixfleet_control_plane/db/
probe_failures.rs1use anyhow::{Context, Result};
29use chrono::{DateTime, Utc};
30use rusqlite::{Connection, params};
31use std::collections::HashMap;
32use std::sync::Mutex;
33
34pub struct ProbeFailures<'a> {
35 pub(super) conn: &'a Mutex<Connection>,
36}
37
38#[derive(Debug, Clone)]
41pub struct ProbeFailureInsert<'a> {
42 pub rollout_id: &'a str,
43 pub host_id: &'a str,
44 pub probe_name: &'a str,
45 pub control_id: Option<&'a str>,
46 pub framework: Option<&'a str>,
47 pub observed_at: DateTime<Utc>,
48}
49
50impl ProbeFailures<'_> {
51 pub fn insert_many(&self, rows: &[ProbeFailureInsert<'_>]) -> Result<()> {
62 if rows.is_empty() {
63 return Ok(());
64 }
65 super::txn(self.conn, "probe_failures.insert_many", |tx| {
66 let mut stmt = tx.prepare(
67 "INSERT INTO probe_failures
68 (event_log_seq, rollout_id, host_id, probe_name,
69 control_id, framework, observed_at)
70 VALUES (NULL, ?1, ?2, ?3, ?4, ?5, ?6)",
71 )?;
72 for r in rows {
73 stmt.execute(params![
74 r.rollout_id,
75 r.host_id,
76 r.probe_name,
77 r.control_id,
78 r.framework,
79 r.observed_at.to_rfc3339(),
80 ])
81 .context("insert probe_failures row")?;
82 }
83 Ok(())
84 })
85 }
86
87 pub fn outstanding_failing_enforce_probes_by_rollout(
91 &self,
92 ) -> Result<HashMap<nixfleet_proto::RolloutId, HashMap<String, usize>>> {
93 super::read(self.conn, |c| {
94 let mut stmt = c.prepare(
95 "SELECT rollout_id, host_id,
96 COUNT(DISTINCT COALESCE(control_id, probe_name))
97 FROM probe_failures
98 GROUP BY rollout_id, host_id",
99 )?;
100 let rows = stmt
101 .query_map([], |row| {
102 Ok((
103 row.get::<_, nixfleet_proto::RolloutId>(0)?,
104 row.get::<_, String>(1)?,
105 row.get::<_, i64>(2)? as usize,
106 ))
107 })?
108 .collect::<rusqlite::Result<Vec<_>>>()
109 .context("query outstanding_failing_enforce_probes_by_rollout")?;
110 let mut out: HashMap<nixfleet_proto::RolloutId, HashMap<String, usize>> =
111 HashMap::new();
112 for (rollout, host, n) in rows {
113 out.entry(rollout).or_default().insert(host, n);
114 }
115 Ok(out)
116 })
117 }
118}
119
#[cfg(test)]
mod tests {
    use super::super::Db;
    use super::ProbeFailureInsert;
    use chrono::Utc;

    /// Opens an in-memory database and applies the migrations.
    fn migrated_db() -> Db {
        let db = Db::open_in_memory().expect("open in-memory db");
        db.migrate().expect("apply migrations");
        db
    }

    /// Runs the projection and returns the count stored for `(rollout, host)`.
    fn projected_count(db: &Db, rollout: &str, host: &str) -> Option<usize> {
        let projection = db
            .probe_failures()
            .outstanding_failing_enforce_probes_by_rollout()
            .expect("query");
        let count = projection
            .get(rollout)
            .and_then(|hosts| hosts.get(host))
            .copied();
        count
    }

    /// With nothing inserted, the projection is an empty map.
    #[test]
    fn empty_when_table_unwritten() {
        let db = migrated_db();
        let projection = db
            .probe_failures()
            .outstanding_failing_enforce_probes_by_rollout()
            .expect("query");
        assert!(
            projection.is_empty(),
            "baseline: projection must be empty"
        );
    }

    /// Three rows with three different control ids count as three failures.
    #[test]
    fn insert_many_and_project_counts_distinct_controls() {
        let db = migrated_db();
        let observed_at = Utc::now();
        let mut rows: Vec<ProbeFailureInsert> = Vec::new();
        for control in ["c1", "c2", "c3"].iter().copied() {
            rows.push(ProbeFailureInsert {
                rollout_id: "R1",
                host_id: "h1",
                probe_name: "evidence-nis2",
                control_id: Some(control),
                framework: Some("nis2-essential"),
                observed_at,
            });
        }
        db.probe_failures().insert_many(&rows).expect("insert");
        assert_eq!(projected_count(&db, "R1", "h1"), Some(3));
    }

    /// A row without a control id is counted by its probe name.
    #[test]
    fn insert_many_non_evidence_uses_probe_name_fallback() {
        let db = migrated_db();
        let heartbeat = ProbeFailureInsert {
            rollout_id: "R2",
            host_id: "h1",
            probe_name: "heartbeat",
            control_id: None,
            framework: None,
            observed_at: Utc::now(),
        };
        db.probe_failures()
            .insert_many(&[heartbeat])
            .expect("insert");
        assert_eq!(projected_count(&db, "R2", "h1"), Some(1));
    }

    /// Five rows sharing one control id count as a single failure.
    #[test]
    fn insert_many_duplicate_control_ids_collapse() {
        let db = migrated_db();
        let template = ProbeFailureInsert {
            rollout_id: "R1",
            host_id: "h1",
            probe_name: "evidence-nis2",
            control_id: Some("same-control"),
            framework: Some("nis2-essential"),
            observed_at: Utc::now(),
        };
        let rows: Vec<ProbeFailureInsert> = vec![template; 5];
        db.probe_failures().insert_many(&rows).expect("insert");
        assert_eq!(
            projected_count(&db, "R1", "h1"),
            Some(1),
            "DISTINCT control_id collapses 5 duplicates to 1",
        );
    }
}