nixfleet_control_plane/db/
rollouts.rs

1//! Rollouts derived-view table (RFC-0008 §6.3). The applier is the sole
2//! writer; every state-mutating method takes an
3//! `event_log_seq: Option<i64>` so the row's `last_transition_event_log_seq`
4//! FK can be populated.
5//!
6//! Phase 10a baseline: the rollout reducer (Phase 10b) is unimplemented
7//! and the applier still drives transitions via the legacy PlanAction
8//! path. The new method shape (event_log_seq arg, state enum, target_ref)
9//! is ready; Phase 10b lights up the reducer that drives them through the
10//! `RolloutEffect` interpretation in the applier.
11//!
12//! `event_log_seq` is NULL-able under the v0.2.1 baseline (RFC-0008 §6.1
13//! item 3 + `.claude/plans/v0.2.1-followups.md` #1); same as
14//! `probe_failures.event_log_seq` (RFC-0007 §7.2).
15
16use anyhow::{Context, Result};
17use chrono::{DateTime, Utc};
18use rusqlite::{Connection, OptionalExtension, params};
19use std::sync::Mutex;
20
21use nixfleet_proto::RolloutId;
22use nixfleet_state_machine::rollout::RolloutState;
23
/// Raw tuple shape of a `rollouts` row, as read by rusqlite. Fields:
/// `(rollout_id, channel, target_ref, state, current_wave,
///   opened_event_log_seq, last_transition_event_log_seq, opened_at,
///   terminal_at, superseded_at)`. Aliased so clippy doesn't flag the
/// type complexity on the inline closure.
///
/// Apart from `rollout_id`, values are still in their stored form here
/// (state and timestamps as text, wave as `i64`); `Rollouts::state`
/// converts the tuple into a typed [`RolloutRow`].
type RolloutRowTuple = (
    // rollout_id
    RolloutId,
    // channel
    String,
    // target_ref
    String,
    // state (text; decoded with `RolloutState::from_db_str`)
    String,
    // current_wave
    i64,
    // opened_event_log_seq
    Option<i64>,
    // last_transition_event_log_seq
    Option<i64>,
    // opened_at (text; parsed into `DateTime<Utc>`)
    String,
    // terminal_at (text, NULL-able)
    Option<String>,
    // superseded_at (text, NULL-able)
    Option<String>,
);
41
/// Accessor for the `rollouts` table. Borrows the mutex-guarded SQLite
/// connection; every method below hands that handle to `super::read` or
/// `super::txn` to run its SQL.
pub struct Rollouts<'a> {
    /// Shared connection handle, reachable from the parent `db` module.
    pub(super) conn: &'a Mutex<Connection>,
}
45
/// Typed projection of a single `rollouts` row. Replaces the v0.1-era
/// `SupersedeStatus` (which only carried `superseded_at`/`superseded_by`/
/// `terminal_at`) with the full row shape so callers can read `state`
/// directly without ad-hoc boolean derivation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RolloutRow {
    /// Primary key of the row.
    pub rollout_id: RolloutId,
    /// Channel the rollout was opened on.
    pub channel: String,
    /// Target reference recorded when the rollout was opened.
    pub target_ref: String,
    /// Lifecycle state, decoded from the text `state` column.
    pub state: RolloutState,
    /// Wave index; rows are inserted at 0 and only move forward through
    /// `set_current_wave`.
    pub current_wave: u32,
    /// Event-log sequence supplied when the row was inserted, if any.
    pub opened_event_log_seq: Option<i64>,
    /// Event-log sequence of the latest recorded transition or wave
    /// advance, if any.
    pub last_transition_event_log_seq: Option<i64>,
    /// Instant the rollout was opened.
    pub opened_at: DateTime<Utc>,
    /// Stamped when the row transitions to `Terminal`.
    pub terminal_at: Option<DateTime<Utc>>,
    /// Stamped when the row transitions to `Superseded`.
    pub superseded_at: Option<DateTime<Utc>>,
}
63
64impl Rollouts<'_> {
65    /// Pure-insert of a new `Opening`-state rollout row. Idempotent on
66    /// `rollout_id` PK (INSERT OR IGNORE) — no side effects on other
67    /// rows.
68    ///
69    /// Supersession of prior in-flight rollouts on the same channel is
70    /// driven through the rollout reducer (Phase 10c): the applier
71    /// snapshots in-flight predecessors, calls this method, then routes
72    /// a `RolloutEvent::SuccessorOpened` per predecessor through
73    /// `process_rollout_event`. The reducer transitions each predecessor
74    /// from its current state to `Superseded` and emits a
75    /// `RolloutEffect::RecordRolloutTransition` that the applier writes
76    /// via `record_rollout_transition`. Closes the last RFC-0004 §3
77    /// "implicit side effect" anti-pattern in Phase 10.
78    pub fn record_rollout_opened(
79        &self,
80        rollout_id: &str,
81        channel: &str,
82        target_ref: &str,
83        opened_at: DateTime<Utc>,
84        opened_event_log_seq: Option<i64>,
85    ) -> Result<()> {
86        let opened_rfc = opened_at.to_rfc3339();
87        super::read(self.conn, |c| {
88            c.execute(
89                "INSERT OR IGNORE INTO rollouts(
90                     rollout_id, channel, target_ref, state, current_wave,
91                     opened_event_log_seq, last_transition_event_log_seq,
92                     opened_at)
93                 VALUES (?1, ?2, ?3, 'Opening', 0, ?4, ?4, ?5)",
94                params![
95                    rollout_id,
96                    channel,
97                    target_ref,
98                    opened_event_log_seq,
99                    opened_rfc
100                ],
101            )
102            .context("INSERT OR IGNORE rollouts")
103            .map(|_| ())
104        })
105    }
106
107    /// Record a state transition on an existing rollout row. Stamps the
108    /// `state` column, the appropriate timestamp side-effect
109    /// (`terminal_at` / `superseded_at`), and the
110    /// `last_transition_event_log_seq` FK.
111    ///
112    /// Idempotent on `(rollout_id, target_state)`: if the row is already
113    /// at `to`, the UPDATE no-ops via the `WHERE state != ?` guard.
114    pub fn record_rollout_transition(
115        &self,
116        rollout_id: &str,
117        to: RolloutState,
118        at: DateTime<Utc>,
119        event_log_seq: Option<i64>,
120    ) -> Result<usize> {
121        let at_rfc = at.to_rfc3339();
122        let to_str = to.as_db_str();
123        // SQLite has no enum; choose the timestamp side-effect by
124        // matching on `to` in Rust and building the SQL accordingly.
125        let (sql, bind_terminal, bind_superseded): (&str, bool, bool) = match to {
126            RolloutState::Terminal => (
127                "UPDATE rollouts
128                 SET state = ?2,
129                     last_transition_event_log_seq = ?3,
130                     terminal_at = ?4
131                 WHERE rollout_id = ?1 AND state != ?2",
132                true,
133                false,
134            ),
135            RolloutState::Superseded => (
136                "UPDATE rollouts
137                 SET state = ?2,
138                     last_transition_event_log_seq = ?3,
139                     superseded_at = ?4
140                 WHERE rollout_id = ?1 AND state != ?2",
141                false,
142                true,
143            ),
144            _ => (
145                "UPDATE rollouts
146                 SET state = ?2,
147                     last_transition_event_log_seq = ?3
148                 WHERE rollout_id = ?1 AND state != ?2",
149                false,
150                false,
151            ),
152        };
153        super::read(self.conn, |c| {
154            if bind_terminal || bind_superseded {
155                c.execute(sql, params![rollout_id, to_str, event_log_seq, at_rfc])
156                    .context("UPDATE rollouts state (with timestamp side-effect)")
157            } else {
158                c.execute(sql, params![rollout_id, to_str, event_log_seq])
159                    .context("UPDATE rollouts state")
160            }
161        })
162    }
163
164    /// Monotonic wave-index advance; `WHERE current_wave < ?2` blocks
165    /// concurrent ticks from racing backwards. Stamps the
166    /// `last_transition_event_log_seq` FK alongside.
167    pub fn set_current_wave(
168        &self,
169        rollout_id: &str,
170        wave: u32,
171        event_log_seq: Option<i64>,
172    ) -> Result<usize> {
173        super::read(self.conn, |c| {
174            c.execute(
175                "UPDATE rollouts
176                 SET current_wave = ?2,
177                     last_transition_event_log_seq = COALESCE(?3, last_transition_event_log_seq)
178                 WHERE rollout_id = ?1 AND current_wave < ?2",
179                params![rollout_id, wave as i64, event_log_seq],
180            )
181            .context("set_current_wave")
182        })
183    }
184
185    pub fn current_wave(&self, rollout_id: &str) -> Result<Option<u32>> {
186        super::read(self.conn, |c| {
187            c.query_row(
188                "SELECT current_wave FROM rollouts WHERE rollout_id = ?1",
189                params![rollout_id],
190                |row| row.get::<_, i64>(0).map(|w| w as u32),
191            )
192            .optional()
193            .context("query rollouts.current_wave")
194        })
195    }
196
197    /// Full row projection, or `None` if the rollout is unknown. Callers
198    /// project `state` directly; the v0.1 `is_superseded`/`is_terminal`/
199    /// `is_finished` boolean derivations are gone (use `state ==
200    /// RolloutState::X` reads).
201    pub fn state(&self, rollout_id: &str) -> Result<Option<RolloutRow>> {
202        super::read(self.conn, |c| {
203            let row = c
204                .query_row(
205                    "SELECT rollout_id, channel, target_ref, state, current_wave,
206                            opened_event_log_seq, last_transition_event_log_seq,
207                            opened_at, terminal_at, superseded_at
208                     FROM rollouts
209                     WHERE rollout_id = ?1",
210                    params![rollout_id],
211                    |row| -> rusqlite::Result<RolloutRowTuple> {
212                        Ok((
213                            row.get(0)?,
214                            row.get(1)?,
215                            row.get(2)?,
216                            row.get(3)?,
217                            row.get(4)?,
218                            row.get(5)?,
219                            row.get(6)?,
220                            row.get(7)?,
221                            row.get(8)?,
222                            row.get(9)?,
223                        ))
224                    },
225                )
226                .optional()
227                .context("query rollouts.state")?;
228            row.map(|t| -> Result<RolloutRow> {
229                let parse_ts =
230                    |raw: Option<String>, field: &str| -> Result<Option<DateTime<Utc>>> {
231                        match raw {
232                            Some(s) => Ok(Some(
233                                s.parse::<DateTime<Utc>>()
234                                    .with_context(|| format!("parse rollouts.{field}: {s}"))?,
235                            )),
236                            None => Ok(None),
237                        }
238                    };
239                let state = RolloutState::from_db_str(&t.3).ok_or_else(|| {
240                    anyhow::anyhow!("unknown rollouts.state value: {} (CHECK violation?)", t.3)
241                })?;
242                Ok(RolloutRow {
243                    rollout_id: t.0,
244                    channel: t.1,
245                    target_ref: t.2,
246                    state,
247                    current_wave: t.4 as u32,
248                    opened_event_log_seq: t.5,
249                    last_transition_event_log_seq: t.6,
250                    opened_at: t
251                        .7
252                        .parse::<DateTime<Utc>>()
253                        .with_context(|| format!("parse rollouts.opened_at: {}", t.7))?,
254                    terminal_at: parse_ts(t.8, "terminal_at")?,
255                    superseded_at: parse_ts(t.9, "superseded_at")?,
256                })
257            })
258            .transpose()
259        })
260    }
261
262    /// Gate-observed source. Filters `Superseded` and `Pruned` only —
263    /// terminal rollouts stay visible so channel-edges can detect
264    /// "predecessor converged". UI consumers should use `list_in_flight`.
265    pub fn list_active(&self) -> Result<GateRollouts> {
266        Ok(GateRollouts(self.list_filtered(false)?))
267    }
268
269    /// UI source. Filters `Superseded`, `Pruned`, AND `Terminal`
270    /// (operator's "done" view).
271    pub fn list_in_flight(&self) -> Result<UiRollouts> {
272        Ok(UiRollouts(self.list_filtered(true)?))
273    }
274
275    fn list_filtered(&self, exclude_terminal: bool) -> Result<Vec<ActiveRollout>> {
276        let sql = if exclude_terminal {
277            "SELECT rollout_id, channel, current_wave, opened_at, terminal_at
278             FROM rollouts
279             WHERE state NOT IN ('Superseded', 'Pruned', 'Terminal')
280             ORDER BY opened_at DESC, rollout_id"
281        } else {
282            "SELECT rollout_id, channel, current_wave, opened_at, terminal_at
283             FROM rollouts
284             WHERE state NOT IN ('Superseded', 'Pruned')
285             ORDER BY opened_at DESC, rollout_id"
286        };
287        let rows: Vec<(ActiveRollout, Option<String>)> = super::read(self.conn, |c| {
288            let mut stmt = c.prepare(sql)?;
289            let v = stmt
290                .query_map([], |row| {
291                    let terminal_at_raw: Option<String> = row.get(4)?;
292                    Ok((
293                        ActiveRollout {
294                            rollout_id: row.get(0)?,
295                            channel: row.get(1)?,
296                            current_wave: row.get::<_, i64>(2)? as u32,
297                            created_at: row.get::<_, String>(3)?,
298                            terminal_at: None,
299                        },
300                        terminal_at_raw,
301                    ))
302                })?
303                .collect::<std::result::Result<Vec<_>, _>>()?;
304            Ok(v)
305        })?;
306        rows.into_iter()
307            .map(|(mut row, raw)| -> Result<ActiveRollout> {
308                row.terminal_at = match raw {
309                    Some(s) => Some(
310                        s.parse::<DateTime<Utc>>()
311                            .with_context(|| format!("parse rollouts.terminal_at: {s}"))?,
312                    ),
313                    None => None,
314                };
315                Ok(row)
316            })
317            .collect()
318    }
319
320    /// Prune finished (Superseded | Terminal | Failed | Reverted)
321    /// rollouts past `max_age_hours` AND their `host_rollout_records`
322    /// rows. Returns `(host_rollout_records_pruned, rollouts_pruned)`.
323    ///
324    /// Phase 10b: this physical-deletion pass becomes a
325    /// `RetentionExpired` event emission instead, transitioning the row
326    /// to `Pruned` (the row persists for audit; v0.3 retention-
327    /// compaction handles physical deletion per RFC-0008 §3 + §13).
328    /// For 10a we keep the physical prune so the existing operator
329    /// workflow stays unchanged while the rollout reducer is
330    /// unimplemented.
331    pub fn prune_finished_rollouts(&self, max_age_hours: i64) -> Result<(usize, usize)> {
332        let cutoff_str = (Utc::now() - chrono::Duration::hours(max_age_hours)).to_rfc3339();
333        super::txn(self.conn, "prune_finished_rollouts", |t| {
334            let records_pruned = t
335                .execute(
336                    "DELETE FROM host_rollout_records
337                     WHERE rollout_id IN (
338                         SELECT rollout_id FROM rollouts
339                         WHERE state IN ('Superseded', 'Terminal', 'Failed', 'Reverted')
340                           AND (
341                               (superseded_at IS NOT NULL AND superseded_at < ?1)
342                               OR (terminal_at IS NOT NULL AND terminal_at < ?1)
343                           )
344                     )",
345                    params![&cutoff_str],
346                )
347                .context("DELETE host_rollout_records for finished rollouts")?;
348            let rollouts_pruned = t
349                .execute(
350                    "DELETE FROM rollouts
351                     WHERE state IN ('Superseded', 'Terminal', 'Failed', 'Reverted')
352                       AND (
353                           (superseded_at IS NOT NULL AND superseded_at < ?1)
354                           OR (terminal_at IS NOT NULL AND terminal_at < ?1)
355                       )",
356                    params![&cutoff_str],
357                )
358                .context("DELETE rollouts (finished + past retention)")?;
359            Ok((records_pruned, rollouts_pruned))
360        })
361    }
362}
363
/// Slim per-rollout record returned by the list queries (`list_active` /
/// `list_in_flight`), wrapped in `GateRollouts` or `UiRollouts`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ActiveRollout {
    /// Primary key of the rollout.
    pub rollout_id: RolloutId,
    /// Channel the rollout belongs to.
    pub channel: String,
    /// Stored wave index.
    pub current_wave: u32,
    /// The row's `opened_at` column, kept as the stored text (not parsed).
    pub created_at: String,
    /// Set on terminal transition; threaded into the in-memory `Rollout`
    /// so `advance_rollout` short-circuits and `channel_edges` can
    /// distinguish "predecessor converged" from "predecessor unknown".
    pub terminal_at: Option<DateTime<Utc>>,
}
375
/// Gate-observed view (keeps terminal). Type-disjoint from `UiRollouts`
/// so a wrong query result can't leak into a gate consumer.
///
/// Produced by `Rollouts::list_active`; the inner `Vec` is private, so
/// other modules go through the API generated by `rollout_view_api!`.
#[derive(Debug, Clone, Default)]
pub struct GateRollouts(Vec<ActiveRollout>);
380
/// UI view (drops terminal). Drives `/v1/rollouts`, deferrals, metrics.
///
/// Produced by `Rollouts::list_in_flight`, or by demoting a gate view
/// with `GateRollouts::into_ui`.
#[derive(Debug, Clone, Default)]
pub struct UiRollouts(Vec<ActiveRollout>);
384
385macro_rules! rollout_view_api {
386    ($t:ident) => {
387        impl $t {
388            pub fn iter(&self) -> std::slice::Iter<'_, ActiveRollout> {
389                self.0.iter()
390            }
391            pub fn len(&self) -> usize {
392                self.0.len()
393            }
394            pub fn is_empty(&self) -> bool {
395                self.0.is_empty()
396            }
397            pub fn into_inner(self) -> Vec<ActiveRollout> {
398                self.0
399            }
400        }
401        impl IntoIterator for $t {
402            type Item = ActiveRollout;
403            type IntoIter = std::vec::IntoIter<ActiveRollout>;
404            fn into_iter(self) -> Self::IntoIter {
405                self.0.into_iter()
406            }
407        }
408        impl<'a> IntoIterator for &'a $t {
409            type Item = &'a ActiveRollout;
410            type IntoIter = std::slice::Iter<'a, ActiveRollout>;
411            fn into_iter(self) -> Self::IntoIter {
412                self.0.iter()
413            }
414        }
415    };
416}
417rollout_view_api!(GateRollouts);
418rollout_view_api!(UiRollouts);
419
420impl GateRollouts {
421    /// One-way demotion (UI is a strict subset; reverse direction is a
422    /// type error so missing terminals can't be silently fabricated).
423    pub fn into_ui(self) -> UiRollouts {
424        UiRollouts(
425            self.0
426                .into_iter()
427                .filter(|r| r.terminal_at.is_none())
428                .collect(),
429        )
430    }
431}
432
433#[cfg(test)]
434mod tests {
435    use super::*;
436    use crate::db::Db;
437
438    fn fresh_db() -> Db {
439        let db = Db::open_in_memory().unwrap();
440        db.migrate().unwrap();
441        db
442    }
443
444    fn t0() -> DateTime<Utc> {
445        use chrono::TimeZone;
446        Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
447    }
448
449    #[test]
450    fn record_rollout_opened_inserts_first_one_as_opening() {
451        let db = fresh_db();
452        db.rollouts()
453            .record_rollout_opened("r1", "stable", "ref-1", t0(), None)
454            .unwrap();
455        let row = db.rollouts().state("r1").unwrap().expect("rollout present");
456        assert_eq!(row.state, RolloutState::Opening);
457        assert_eq!(row.target_ref, "ref-1");
458        assert_eq!(row.channel, "stable");
459        assert!(row.superseded_at.is_none());
460    }
461
462    /// Pure-insert assertion. Phase 10c deleted the inline supersession
463    /// `UPDATE` from `record_rollout_opened`; the reducer-driven
464    /// `SuccessorOpened` path now owns the predecessor → Superseded
465    /// transition (re-derivability test covers it end-to-end). This test
466    /// pins the new contract: opening a second rollout on the same
467    /// channel leaves predecessors in their pre-existing state.
468    #[test]
469    fn record_rollout_opened_is_pure_insert() {
470        let db = fresh_db();
471        db.rollouts()
472            .record_rollout_opened("r1", "stable", "ref-1", t0(), None)
473            .unwrap();
474        db.rollouts()
475            .record_rollout_opened(
476                "r2",
477                "stable",
478                "ref-2",
479                t0() + chrono::Duration::seconds(1),
480                None,
481            )
482            .unwrap();
483        // r1 stays Opening — the applier (open_rollout) is responsible
484        // for routing SuccessorOpened through process_rollout_event,
485        // which drives the reducer transition. The DB method is now
486        // pure-insert and does not side-effect on prior rows.
487        assert_eq!(
488            db.rollouts().state("r1").unwrap().unwrap().state,
489            RolloutState::Opening
490        );
491        assert_eq!(
492            db.rollouts().state("r2").unwrap().unwrap().state,
493            RolloutState::Opening
494        );
495    }
496
497    #[test]
498    fn record_rollout_opened_does_not_supersede_across_channels() {
499        let db = fresh_db();
500        db.rollouts()
501            .record_rollout_opened("r1", "stable", "ref-1", t0(), None)
502            .unwrap();
503        db.rollouts()
504            .record_rollout_opened("r2", "edge-slow", "ref-2", t0(), None)
505            .unwrap();
506        assert_eq!(
507            db.rollouts().state("r1").unwrap().unwrap().state,
508            RolloutState::Opening
509        );
510        assert_eq!(
511            db.rollouts().state("r2").unwrap().unwrap().state,
512            RolloutState::Opening
513        );
514    }
515
516    #[test]
517    fn state_returns_none_for_unknown_rollout() {
518        let db = fresh_db();
519        assert!(db.rollouts().state("ghost").unwrap().is_none());
520    }
521
522    #[test]
523    fn record_rollout_transition_stamps_terminal() {
524        let db = fresh_db();
525        db.rollouts()
526            .record_rollout_opened("r1", "stable", "ref-1", t0(), None)
527            .unwrap();
528        let n = db
529            .rollouts()
530            .record_rollout_transition("r1", RolloutState::Terminal, t0(), None)
531            .unwrap();
532        assert_eq!(n, 1);
533        let row = db.rollouts().state("r1").unwrap().unwrap();
534        assert_eq!(row.state, RolloutState::Terminal);
535        assert!(row.terminal_at.is_some());
536        // Idempotent re-call no-ops.
537        let n2 = db
538            .rollouts()
539            .record_rollout_transition("r1", RolloutState::Terminal, t0(), None)
540            .unwrap();
541        assert_eq!(n2, 0);
542    }
543
544    #[test]
545    fn set_current_wave_is_monotonic_no_op_on_backwards() {
546        let db = fresh_db();
547        db.rollouts()
548            .record_rollout_opened("r1", "stable", "ref-1", t0(), None)
549            .unwrap();
550        assert_eq!(db.rollouts().current_wave("r1").unwrap(), Some(0));
551        let n = db.rollouts().set_current_wave("r1", 1, None).unwrap();
552        assert_eq!(n, 1);
553        assert_eq!(db.rollouts().current_wave("r1").unwrap(), Some(1));
554        // Backwards is no-op.
555        let n = db.rollouts().set_current_wave("r1", 0, None).unwrap();
556        assert_eq!(n, 0);
557        assert_eq!(db.rollouts().current_wave("r1").unwrap(), Some(1));
558    }
559
560    /// **Regression guard**: terminal rollouts STAY visible in
561    /// `list_active` (the gate-observed source) but are HIDDEN from
562    /// `list_in_flight` (the UI source). Same row, different views —
563    /// this is the load-bearing semantic the v0.1 lifecycle attempts
564    /// kept getting wrong.
565    #[test]
566    fn terminal_stays_in_list_active_but_drops_from_list_in_flight() {
567        let db = fresh_db();
568        db.rollouts()
569            .record_rollout_opened("r1", "stable", "ref-1", t0(), None)
570            .unwrap();
571        db.rollouts()
572            .record_rollout_opened("r2", "edge", "ref-2", t0(), None)
573            .unwrap();
574        assert_eq!(db.rollouts().list_active().unwrap().len(), 2);
575        assert_eq!(db.rollouts().list_in_flight().unwrap().len(), 2);
576
577        db.rollouts()
578            .record_rollout_transition("r1", RolloutState::Terminal, t0(), None)
579            .unwrap();
580
581        let active = db.rollouts().list_active().unwrap();
582        assert_eq!(
583            active.len(),
584            2,
585            "list_active must include terminal rollouts so gates can see converged predecessors"
586        );
587        let r1_active = active
588            .iter()
589            .find(|r| r.rollout_id.as_str() == "r1")
590            .unwrap();
591        assert!(r1_active.terminal_at.is_some());
592
593        let in_flight = db.rollouts().list_in_flight().unwrap().into_inner();
594        assert_eq!(in_flight.len(), 1);
595        assert_eq!(in_flight[0].rollout_id.as_str(), "r2");
596    }
597
598    /// Superseded rollouts are dropped from BOTH views — supersession is
599    /// the stronger signal (newer rollout for the same channel exists,
600    /// gates evaluate against it).
601    #[test]
602    fn superseded_dropped_from_both_list_active_and_list_in_flight() {
603        let db = fresh_db();
604        db.rollouts()
605            .record_rollout_opened("r1", "stable", "ref-1", t0(), None)
606            .unwrap();
607        // Phase 10c: supersession is reducer-driven via the applier; in
608        // a db-level unit test we synthesize the end-state directly via
609        // record_rollout_transition. Re-derivability through the reducer
610        // is exercised in `tests/rollout_rederivability.rs`.
611        db.rollouts()
612            .record_rollout_transition(
613                "r1",
614                RolloutState::Superseded,
615                t0() + chrono::Duration::seconds(1),
616                None,
617            )
618            .unwrap();
619        for rid in db.rollouts().list_active().unwrap().iter() {
620            assert_ne!(rid.rollout_id.as_str(), "r1");
621        }
622        for rid in db.rollouts().list_in_flight().unwrap().iter() {
623            assert_ne!(rid.rollout_id.as_str(), "r1");
624        }
625    }
626
627    /// `GateRollouts.into_ui()` filters out terminal rollouts.
628    #[test]
629    fn gate_rollouts_into_ui_filters_terminal() {
630        let db = fresh_db();
631        db.rollouts()
632            .record_rollout_opened("r-active", "stable", "ref-a", t0(), None)
633            .unwrap();
634        db.rollouts()
635            .record_rollout_opened("r-converged", "edge", "ref-c", t0(), None)
636            .unwrap();
637        db.rollouts()
638            .record_rollout_transition("r-converged", RolloutState::Terminal, t0(), None)
639            .unwrap();
640        let gate = db.rollouts().list_active().unwrap();
641        assert_eq!(gate.len(), 2);
642        let ui = gate.into_ui();
643        assert_eq!(ui.len(), 1);
644        assert_eq!(ui.into_inner()[0].rollout_id.as_str(), "r-active");
645    }
646
647    /// **Documentation test** — GateRollouts and UiRollouts must remain
648    /// distinct types so a future commit can't conflate them. If a `From<
649    /// UiRollouts> for GateRollouts` impl is added, the asymmetric
650    /// `into_ui` invariant breaks; keep this test as a tripwire.
651    #[test]
652    fn gate_and_ui_rollouts_are_distinct_types() {
653        let db = fresh_db();
654        db.rollouts()
655            .record_rollout_opened("r1", "stable", "ref-1", t0(), None)
656            .unwrap();
657        let _gate: super::GateRollouts = db.rollouts().list_active().unwrap();
658        let _ui: super::UiRollouts = db.rollouts().list_in_flight().unwrap();
659    }
660
661    /// **Regression guard**: prune drops finished rollouts past
662    /// retention AND their host_rollout_records rows; leaves in-flight
663    /// rollouts and recent finishes alone.
664    #[test]
665    fn prune_finished_rollouts_drops_old_finished_keeps_recent_and_in_flight() {
666        let db = fresh_db();
667        let now = chrono::Utc::now();
668        let old = now - chrono::Duration::days(120);
669        let recent = now - chrono::Duration::days(30);
670
671        // r-active: in-flight, never touched. Must survive prune.
672        db.rollouts()
673            .record_rollout_opened("r-active", "stable", "ref-a", now, None)
674            .unwrap();
675
676        // r-old-superseded: superseded long ago. Phase 10c made
677        // record_rollout_opened pure-insert; the prune scenario
678        // drives the supersession transition explicitly via
679        // record_rollout_transition (matches what the applier's
680        // reducer-driven SuccessorOpened path would do).
681        db.rollouts()
682            .record_rollout_opened("r-old-superseded", "edge", "ref-os", now, None)
683            .unwrap();
684        db.rollouts()
685            .record_rollout_opened("r-old-superseder", "edge", "ref-osr", now, None)
686            .unwrap();
687        db.rollouts()
688            .record_rollout_transition("r-old-superseded", RolloutState::Superseded, old, None)
689            .unwrap();
690
691        // r-recent-terminal: terminal recently (30d). Should NOT prune.
692        db.rollouts()
693            .record_rollout_opened("r-recent-terminal", "preview", "ref-rt", now, None)
694            .unwrap();
695        db.rollouts()
696            .record_rollout_transition("r-recent-terminal", RolloutState::Terminal, recent, None)
697            .unwrap();
698
699        // r-old-terminal: terminal long ago (120d). Should prune.
700        db.rollouts()
701            .record_rollout_opened("r-old-terminal", "preview-old", "ref-ot", now, None)
702            .unwrap();
703        db.rollouts()
704            .record_rollout_transition("r-old-terminal", RolloutState::Terminal, old, None)
705            .unwrap();
706
707        // host_rollout_records rows tied to each.
708        for rid in [
709            "r-active",
710            "r-old-superseded",
711            "r-recent-terminal",
712            "r-old-terminal",
713        ] {
714            let row = nixfleet_state_machine::HostRolloutState::new_pending(
715                rid.into(),
716                "host-x".to_string(),
717                "stable".to_string(),
718                format!("closure-{rid}"),
719                now,
720                now + chrono::Duration::minutes(5),
721            );
722            db.host_rollout_records().upsert(&row).unwrap();
723        }
724
725        let (records_pruned, rollouts_pruned) =
726            db.rollouts().prune_finished_rollouts(24 * 90).unwrap();
727        assert_eq!(rollouts_pruned, 2, "r-old-superseded + r-old-terminal");
728        assert_eq!(records_pruned, 2);
729
730        // r-active and r-recent-terminal retained.
731        let active = db.rollouts().list_active().unwrap();
732        let kept: Vec<&str> = active.iter().map(|r| r.rollout_id.as_str()).collect();
733        assert!(kept.contains(&"r-active"));
734        assert!(db.rollouts().state("r-recent-terminal").unwrap().is_some());
735        assert!(db.rollouts().state("r-old-superseded").unwrap().is_none());
736        assert!(db.rollouts().state("r-old-terminal").unwrap().is_none());
737    }
738}