nixfleet_control_plane/db/
probe_failures.rs

1//! Per-(rollout, host) typed denormalization of enforce-mode probe
2//! failures. Source of truth is `event_log` (canonical, append-only);
3//! this table is a derived projection providing the indexed columns
4//! the compliance-wave gate needs cheaply.
5//!
6//! RFC-0007 §7.2 shape:
7//! - `event_log_seq`  — FK back to the source `event_log` row
8//! - `rollout_id`     — gate aggregates per rollout
9//! - `host_id`        — gate aggregates per host
10//! - `probe_name`     — operator-facing probe identifier
11//! - `control_id`     — set for evidence-kind sub-result rows; NULL for
12//!   non-evidence enforce-mode probe failures
13//! - `framework`      — set for evidence sub-result rows; NULL otherwise
14//! - `observed_at`    — agent-supplied observation timestamp
15//!
16//! Indexed on `(rollout_id, host_id, control_id)` so the compliance-wave
17//! gate's distinct-control count query is cheap.
18//!
//! ## Writer state
//!
//! The schema landed in phase 9a. The writer side (single-transaction
//! co-write from the applier's `RemoteAppendEventLog` handler on
//! enforce-mode `ProbeResult { status = Fail }` events) is exposed here
//! as `ProbeFailures::insert_many`; it still writes `event_log_seq` as
//! NULL (see that method's docs). While the table holds no rows,
//! `outstanding_failing_enforce_probes_by_rollout` returns the empty
//! map.
27
28use anyhow::{Context, Result};
29use chrono::{DateTime, Utc};
30use rusqlite::{Connection, params};
31use std::collections::HashMap;
32use std::sync::Mutex;
33
/// Accessor for the `probe_failures` table.
///
/// Holds a borrow of the mutex-guarded SQLite connection for `'a`. The
/// `conn` field is `pub(super)`, i.e. visible to the parent module; the
/// tests in this file obtain a value through `Db::probe_failures()`.
pub struct ProbeFailures<'a> {
    pub(super) conn: &'a Mutex<Connection>,
}
37
/// One `probe_failures` row insert. Borrowed for ergonomics — the
/// applier's hot path passes references off the inbound payload.
///
/// There is no `event_log_seq` field: `ProbeFailures::insert_many`
/// writes that column as NULL.
#[derive(Debug, Clone)]
pub struct ProbeFailureInsert<'a> {
    /// Rollout the failure belongs to (the gate aggregates per rollout).
    pub rollout_id: &'a str,
    /// Host that reported the failure (the gate aggregates per host).
    pub host_id: &'a str,
    /// Operator-facing probe identifier.
    pub probe_name: &'a str,
    /// Failing control for evidence-kind sub-result rows; `None` for
    /// non-evidence enforce-mode probe failures.
    pub control_id: Option<&'a str>,
    /// Framework for evidence sub-result rows; `None` otherwise.
    pub framework: Option<&'a str>,
    /// Agent-supplied observation timestamp; stored as an RFC 3339 string.
    pub observed_at: DateTime<Utc>,
}
49
50impl ProbeFailures<'_> {
51    /// Insert a batch of probe_failures rows. The applier calls this
52    /// from the `RemoteAppendEventLog` handler on enforce-mode
53    /// `ProbeResult { status = Fail }`; for evidence probes it walks
54    /// `sub_results` and inserts one row per failing control.
55    ///
56    /// 9b shape: one transaction wraps the whole batch. `event_log_seq`
57    /// is left NULL — the FK column in the schema permits NULL; a
58    /// follow-up tightening lands when the event_log_writer task can
59    /// hand back the row's `seq` synchronously (today's writer is
60    /// fire-and-forget on a bounded channel).
61    pub fn insert_many(&self, rows: &[ProbeFailureInsert<'_>]) -> Result<()> {
62        if rows.is_empty() {
63            return Ok(());
64        }
65        super::txn(self.conn, "probe_failures.insert_many", |tx| {
66            let mut stmt = tx.prepare(
67                "INSERT INTO probe_failures
68                   (event_log_seq, rollout_id, host_id, probe_name,
69                    control_id, framework, observed_at)
70                 VALUES (NULL, ?1, ?2, ?3, ?4, ?5, ?6)",
71            )?;
72            for r in rows {
73                stmt.execute(params![
74                    r.rollout_id,
75                    r.host_id,
76                    r.probe_name,
77                    r.control_id,
78                    r.framework,
79                    r.observed_at.to_rfc3339(),
80                ])
81                .context("insert probe_failures row")?;
82            }
83            Ok(())
84        })
85    }
86
87    /// Per-(rollout, host) distinct-control failure counts. Gate's
88    /// canonical input projection — empty in 9a, populated by 9b's
89    /// applier co-write.
90    pub fn outstanding_failing_enforce_probes_by_rollout(
91        &self,
92    ) -> Result<HashMap<nixfleet_proto::RolloutId, HashMap<String, usize>>> {
93        super::read(self.conn, |c| {
94            let mut stmt = c.prepare(
95                "SELECT rollout_id, host_id,
96                        COUNT(DISTINCT COALESCE(control_id, probe_name))
97                 FROM probe_failures
98                 GROUP BY rollout_id, host_id",
99            )?;
100            let rows = stmt
101                .query_map([], |row| {
102                    Ok((
103                        row.get::<_, nixfleet_proto::RolloutId>(0)?,
104                        row.get::<_, String>(1)?,
105                        row.get::<_, i64>(2)? as usize,
106                    ))
107                })?
108                .collect::<rusqlite::Result<Vec<_>>>()
109                .context("query outstanding_failing_enforce_probes_by_rollout")?;
110            let mut out: HashMap<nixfleet_proto::RolloutId, HashMap<String, usize>> =
111                HashMap::new();
112            for (rollout, host, n) in rows {
113                out.entry(rollout).or_default().insert(host, n);
114            }
115            Ok(out)
116        })
117    }
118}
119
#[cfg(test)]
mod tests {
    use super::super::Db;
    use super::ProbeFailureInsert;
    use chrono::Utc;

    /// Shape returned by `outstanding_failing_enforce_probes_by_rollout`.
    type Projection =
        std::collections::HashMap<nixfleet_proto::RolloutId, std::collections::HashMap<String, usize>>;

    /// Distinct-failure count recorded for one (rollout, host) pair, if any.
    fn count_at(projection: &Projection, rollout: &str, host: &str) -> Option<usize> {
        projection
            .get(rollout)
            .and_then(|hosts| hosts.get(host))
            .copied()
    }

    /// With no rows written, the projection is the empty map.
    #[test]
    fn empty_when_table_unwritten() {
        let db = Db::open_in_memory().expect("open in-memory db");
        db.migrate().expect("apply migrations");

        let projection = db
            .probe_failures()
            .outstanding_failing_enforce_probes_by_rollout()
            .expect("query");

        assert!(projection.is_empty(), "baseline: projection must be empty");
    }

    /// Three rows with different control ids on (R1, h1) project to a
    /// count of 3.
    #[test]
    fn insert_many_and_project_counts_distinct_controls() {
        let db = Db::open_in_memory().expect("open in-memory db");
        db.migrate().expect("apply migrations");

        let observed_at = Utc::now();
        let mut rows: Vec<ProbeFailureInsert> = Vec::with_capacity(3);
        for control in ["c1", "c2", "c3"] {
            rows.push(ProbeFailureInsert {
                rollout_id: "R1",
                host_id: "h1",
                probe_name: "evidence-nis2",
                control_id: Some(control),
                framework: Some("nis2-essential"),
                observed_at,
            });
        }
        db.probe_failures().insert_many(&rows).expect("insert");

        let projection = db
            .probe_failures()
            .outstanding_failing_enforce_probes_by_rollout()
            .expect("query");

        assert_eq!(count_at(&projection, "R1", "h1"), Some(3));
    }

    /// A row without a control id (non-evidence probe) is counted once,
    /// keyed by its probe name.
    #[test]
    fn insert_many_non_evidence_uses_probe_name_fallback() {
        let db = Db::open_in_memory().expect("open in-memory db");
        db.migrate().expect("apply migrations");

        let heartbeat = ProbeFailureInsert {
            rollout_id: "R2",
            host_id: "h1",
            probe_name: "heartbeat",
            control_id: None,
            framework: None,
            observed_at: Utc::now(),
        };
        db.probe_failures()
            .insert_many(&[heartbeat])
            .expect("insert");

        let projection = db
            .probe_failures()
            .outstanding_failing_enforce_probes_by_rollout()
            .expect("query");

        assert_eq!(count_at(&projection, "R2", "h1"), Some(1));
    }

    /// Five identical rows for the same control id count as one, since
    /// the projection uses COUNT(DISTINCT ...).
    #[test]
    fn insert_many_duplicate_control_ids_collapse() {
        let db = Db::open_in_memory().expect("open in-memory db");
        db.migrate().expect("apply migrations");

        let template = ProbeFailureInsert {
            rollout_id: "R1",
            host_id: "h1",
            probe_name: "evidence-nis2",
            control_id: Some("same-control"),
            framework: Some("nis2-essential"),
            observed_at: Utc::now(),
        };
        let rows: Vec<ProbeFailureInsert> = vec![template; 5];
        db.probe_failures().insert_many(&rows).expect("insert");

        let projection = db
            .probe_failures()
            .outstanding_failing_enforce_probes_by_rollout()
            .expect("query");

        assert_eq!(
            count_at(&projection, "R1", "h1"),
            Some(1),
            "DISTINCT control_id collapses 5 duplicates to 1",
        );
    }
}