1use anyhow::{Context, Result};
17use chrono::{DateTime, Utc};
18use rusqlite::{Connection, OptionalExtension, params};
19use std::sync::Mutex;
20
21use nixfleet_proto::RolloutId;
22use nixfleet_state_machine::rollout::RolloutState;
23
/// Raw column values of one `rollouts` row, in the order `Rollouts::state`
/// selects them, before they are decoded into a `RolloutRow`.
type RolloutRowTuple = (
    // rollout_id
    RolloutId,
    // channel
    String,
    // target_ref
    String,
    // state, still in its stored string form
    String,
    // current_wave
    i64,
    // opened_event_log_seq
    Option<i64>,
    // last_transition_event_log_seq
    Option<i64>,
    // opened_at, still as text
    String,
    // terminal_at, still as text (NULL when never stamped)
    Option<String>,
    // superseded_at, still as text (NULL when never stamped)
    Option<String>,
);
41
/// Accessor for the `rollouts` table that borrows a shared connection.
pub struct Rollouts<'a> {
    /// Mutex-guarded rusqlite connection that every query of this accessor
    /// runs against.
    pub(super) conn: &'a Mutex<Connection>,
}
45
/// One fully decoded row of the `rollouts` table, as returned by
/// `Rollouts::state`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RolloutRow {
    /// Identifier of the rollout (`rollout_id` column).
    pub rollout_id: RolloutId,
    /// Channel the rollout belongs to.
    pub channel: String,
    /// Target reference recorded when the rollout was opened.
    pub target_ref: String,
    /// Lifecycle state, decoded from the stored string form.
    pub state: RolloutState,
    /// Current wave; rows are inserted at wave 0.
    pub current_wave: u32,
    /// Event-log sequence supplied when the rollout was opened, if any.
    pub opened_event_log_seq: Option<i64>,
    /// Event-log sequence of the latest recorded transition or wave change;
    /// starts out equal to `opened_event_log_seq`.
    pub last_transition_event_log_seq: Option<i64>,
    /// When the rollout was opened.
    pub opened_at: DateTime<Utc>,
    /// Stamped when the rollout transitions to `Terminal`.
    pub terminal_at: Option<DateTime<Utc>>,
    /// Stamped when the rollout transitions to `Superseded`.
    pub superseded_at: Option<DateTime<Utc>>,
}
63
64impl Rollouts<'_> {
65 pub fn record_rollout_opened(
79 &self,
80 rollout_id: &str,
81 channel: &str,
82 target_ref: &str,
83 opened_at: DateTime<Utc>,
84 opened_event_log_seq: Option<i64>,
85 ) -> Result<()> {
86 let opened_rfc = opened_at.to_rfc3339();
87 super::read(self.conn, |c| {
88 c.execute(
89 "INSERT OR IGNORE INTO rollouts(
90 rollout_id, channel, target_ref, state, current_wave,
91 opened_event_log_seq, last_transition_event_log_seq,
92 opened_at)
93 VALUES (?1, ?2, ?3, 'Opening', 0, ?4, ?4, ?5)",
94 params![
95 rollout_id,
96 channel,
97 target_ref,
98 opened_event_log_seq,
99 opened_rfc
100 ],
101 )
102 .context("INSERT OR IGNORE rollouts")
103 .map(|_| ())
104 })
105 }
106
107 pub fn record_rollout_transition(
115 &self,
116 rollout_id: &str,
117 to: RolloutState,
118 at: DateTime<Utc>,
119 event_log_seq: Option<i64>,
120 ) -> Result<usize> {
121 let at_rfc = at.to_rfc3339();
122 let to_str = to.as_db_str();
123 let (sql, bind_terminal, bind_superseded): (&str, bool, bool) = match to {
126 RolloutState::Terminal => (
127 "UPDATE rollouts
128 SET state = ?2,
129 last_transition_event_log_seq = ?3,
130 terminal_at = ?4
131 WHERE rollout_id = ?1 AND state != ?2",
132 true,
133 false,
134 ),
135 RolloutState::Superseded => (
136 "UPDATE rollouts
137 SET state = ?2,
138 last_transition_event_log_seq = ?3,
139 superseded_at = ?4
140 WHERE rollout_id = ?1 AND state != ?2",
141 false,
142 true,
143 ),
144 _ => (
145 "UPDATE rollouts
146 SET state = ?2,
147 last_transition_event_log_seq = ?3
148 WHERE rollout_id = ?1 AND state != ?2",
149 false,
150 false,
151 ),
152 };
153 super::read(self.conn, |c| {
154 if bind_terminal || bind_superseded {
155 c.execute(sql, params![rollout_id, to_str, event_log_seq, at_rfc])
156 .context("UPDATE rollouts state (with timestamp side-effect)")
157 } else {
158 c.execute(sql, params![rollout_id, to_str, event_log_seq])
159 .context("UPDATE rollouts state")
160 }
161 })
162 }
163
164 pub fn set_current_wave(
168 &self,
169 rollout_id: &str,
170 wave: u32,
171 event_log_seq: Option<i64>,
172 ) -> Result<usize> {
173 super::read(self.conn, |c| {
174 c.execute(
175 "UPDATE rollouts
176 SET current_wave = ?2,
177 last_transition_event_log_seq = COALESCE(?3, last_transition_event_log_seq)
178 WHERE rollout_id = ?1 AND current_wave < ?2",
179 params![rollout_id, wave as i64, event_log_seq],
180 )
181 .context("set_current_wave")
182 })
183 }
184
185 pub fn current_wave(&self, rollout_id: &str) -> Result<Option<u32>> {
186 super::read(self.conn, |c| {
187 c.query_row(
188 "SELECT current_wave FROM rollouts WHERE rollout_id = ?1",
189 params![rollout_id],
190 |row| row.get::<_, i64>(0).map(|w| w as u32),
191 )
192 .optional()
193 .context("query rollouts.current_wave")
194 })
195 }
196
197 pub fn state(&self, rollout_id: &str) -> Result<Option<RolloutRow>> {
202 super::read(self.conn, |c| {
203 let row = c
204 .query_row(
205 "SELECT rollout_id, channel, target_ref, state, current_wave,
206 opened_event_log_seq, last_transition_event_log_seq,
207 opened_at, terminal_at, superseded_at
208 FROM rollouts
209 WHERE rollout_id = ?1",
210 params![rollout_id],
211 |row| -> rusqlite::Result<RolloutRowTuple> {
212 Ok((
213 row.get(0)?,
214 row.get(1)?,
215 row.get(2)?,
216 row.get(3)?,
217 row.get(4)?,
218 row.get(5)?,
219 row.get(6)?,
220 row.get(7)?,
221 row.get(8)?,
222 row.get(9)?,
223 ))
224 },
225 )
226 .optional()
227 .context("query rollouts.state")?;
228 row.map(|t| -> Result<RolloutRow> {
229 let parse_ts =
230 |raw: Option<String>, field: &str| -> Result<Option<DateTime<Utc>>> {
231 match raw {
232 Some(s) => Ok(Some(
233 s.parse::<DateTime<Utc>>()
234 .with_context(|| format!("parse rollouts.{field}: {s}"))?,
235 )),
236 None => Ok(None),
237 }
238 };
239 let state = RolloutState::from_db_str(&t.3).ok_or_else(|| {
240 anyhow::anyhow!("unknown rollouts.state value: {} (CHECK violation?)", t.3)
241 })?;
242 Ok(RolloutRow {
243 rollout_id: t.0,
244 channel: t.1,
245 target_ref: t.2,
246 state,
247 current_wave: t.4 as u32,
248 opened_event_log_seq: t.5,
249 last_transition_event_log_seq: t.6,
250 opened_at: t
251 .7
252 .parse::<DateTime<Utc>>()
253 .with_context(|| format!("parse rollouts.opened_at: {}", t.7))?,
254 terminal_at: parse_ts(t.8, "terminal_at")?,
255 superseded_at: parse_ts(t.9, "superseded_at")?,
256 })
257 })
258 .transpose()
259 })
260 }
261
262 pub fn list_active(&self) -> Result<GateRollouts> {
266 Ok(GateRollouts(self.list_filtered(false)?))
267 }
268
269 pub fn list_in_flight(&self) -> Result<UiRollouts> {
272 Ok(UiRollouts(self.list_filtered(true)?))
273 }
274
275 fn list_filtered(&self, exclude_terminal: bool) -> Result<Vec<ActiveRollout>> {
276 let sql = if exclude_terminal {
277 "SELECT rollout_id, channel, current_wave, opened_at, terminal_at
278 FROM rollouts
279 WHERE state NOT IN ('Superseded', 'Pruned', 'Terminal')
280 ORDER BY opened_at DESC, rollout_id"
281 } else {
282 "SELECT rollout_id, channel, current_wave, opened_at, terminal_at
283 FROM rollouts
284 WHERE state NOT IN ('Superseded', 'Pruned')
285 ORDER BY opened_at DESC, rollout_id"
286 };
287 let rows: Vec<(ActiveRollout, Option<String>)> = super::read(self.conn, |c| {
288 let mut stmt = c.prepare(sql)?;
289 let v = stmt
290 .query_map([], |row| {
291 let terminal_at_raw: Option<String> = row.get(4)?;
292 Ok((
293 ActiveRollout {
294 rollout_id: row.get(0)?,
295 channel: row.get(1)?,
296 current_wave: row.get::<_, i64>(2)? as u32,
297 created_at: row.get::<_, String>(3)?,
298 terminal_at: None,
299 },
300 terminal_at_raw,
301 ))
302 })?
303 .collect::<std::result::Result<Vec<_>, _>>()?;
304 Ok(v)
305 })?;
306 rows.into_iter()
307 .map(|(mut row, raw)| -> Result<ActiveRollout> {
308 row.terminal_at = match raw {
309 Some(s) => Some(
310 s.parse::<DateTime<Utc>>()
311 .with_context(|| format!("parse rollouts.terminal_at: {s}"))?,
312 ),
313 None => None,
314 };
315 Ok(row)
316 })
317 .collect()
318 }
319
320 pub fn prune_finished_rollouts(&self, max_age_hours: i64) -> Result<(usize, usize)> {
332 let cutoff_str = (Utc::now() - chrono::Duration::hours(max_age_hours)).to_rfc3339();
333 super::txn(self.conn, "prune_finished_rollouts", |t| {
334 let records_pruned = t
335 .execute(
336 "DELETE FROM host_rollout_records
337 WHERE rollout_id IN (
338 SELECT rollout_id FROM rollouts
339 WHERE state IN ('Superseded', 'Terminal', 'Failed', 'Reverted')
340 AND (
341 (superseded_at IS NOT NULL AND superseded_at < ?1)
342 OR (terminal_at IS NOT NULL AND terminal_at < ?1)
343 )
344 )",
345 params![&cutoff_str],
346 )
347 .context("DELETE host_rollout_records for finished rollouts")?;
348 let rollouts_pruned = t
349 .execute(
350 "DELETE FROM rollouts
351 WHERE state IN ('Superseded', 'Terminal', 'Failed', 'Reverted')
352 AND (
353 (superseded_at IS NOT NULL AND superseded_at < ?1)
354 OR (terminal_at IS NOT NULL AND terminal_at < ?1)
355 )",
356 params![&cutoff_str],
357 )
358 .context("DELETE rollouts (finished + past retention)")?;
359 Ok((records_pruned, rollouts_pruned))
360 })
361 }
362}
363
/// Summary of one rollout as produced by the listing queries.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ActiveRollout {
    /// Identifier of the rollout.
    pub rollout_id: RolloutId,
    /// Channel the rollout belongs to.
    pub channel: String,
    /// Current wave of the rollout.
    pub current_wave: u32,
    /// The stored `opened_at` text, passed through unparsed.
    pub created_at: String,
    /// Parsed `terminal_at` column; `None` when the column is NULL.
    pub terminal_at: Option<DateTime<Utc>>,
}
375
/// Rollout list returned by `Rollouts::list_active`: everything except
/// `Superseded` and `Pruned` rollouts, so `Terminal` ones are included.
/// A distinct type from `UiRollouts` so the two views cannot be mixed up.
#[derive(Debug, Clone, Default)]
pub struct GateRollouts(Vec<ActiveRollout>);
380
/// Rollout list returned by `Rollouts::list_in_flight` (which also excludes
/// `Terminal` rollouts) or derived from a gate view via
/// `GateRollouts::into_ui`.
#[derive(Debug, Clone, Default)]
pub struct UiRollouts(Vec<ActiveRollout>);
384
385macro_rules! rollout_view_api {
386 ($t:ident) => {
387 impl $t {
388 pub fn iter(&self) -> std::slice::Iter<'_, ActiveRollout> {
389 self.0.iter()
390 }
391 pub fn len(&self) -> usize {
392 self.0.len()
393 }
394 pub fn is_empty(&self) -> bool {
395 self.0.is_empty()
396 }
397 pub fn into_inner(self) -> Vec<ActiveRollout> {
398 self.0
399 }
400 }
401 impl IntoIterator for $t {
402 type Item = ActiveRollout;
403 type IntoIter = std::vec::IntoIter<ActiveRollout>;
404 fn into_iter(self) -> Self::IntoIter {
405 self.0.into_iter()
406 }
407 }
408 impl<'a> IntoIterator for &'a $t {
409 type Item = &'a ActiveRollout;
410 type IntoIter = std::slice::Iter<'a, ActiveRollout>;
411 fn into_iter(self) -> Self::IntoIter {
412 self.0.iter()
413 }
414 }
415 };
416}
417rollout_view_api!(GateRollouts);
418rollout_view_api!(UiRollouts);
419
420impl GateRollouts {
421 pub fn into_ui(self) -> UiRollouts {
424 UiRollouts(
425 self.0
426 .into_iter()
427 .filter(|r| r.terminal_at.is_none())
428 .collect(),
429 )
430 }
431}
432
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Db;

    /// Opens an in-memory database and runs `migrate` on it.
    fn fresh_db() -> Db {
        let db = Db::open_in_memory().unwrap();
        db.migrate().unwrap();
        db
    }

    /// Fixed reference instant used by the deterministic tests.
    fn t0() -> DateTime<Utc> {
        use chrono::TimeZone;
        Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
    }

    /// Opens a rollout without an event-log sequence, panicking on failure.
    #[track_caller]
    fn open_rollout(db: &Db, id: &str, channel: &str, target_ref: &str, at: DateTime<Utc>) {
        db.rollouts()
            .record_rollout_opened(id, channel, target_ref, at, None)
            .unwrap();
    }

    /// Records a transition without an event-log sequence and returns the
    /// number of rows changed.
    #[track_caller]
    fn transition(db: &Db, id: &str, to: RolloutState, at: DateTime<Utc>) -> usize {
        db.rollouts()
            .record_rollout_transition(id, to, at, None)
            .unwrap()
    }

    /// State of a rollout that must exist.
    #[track_caller]
    fn state_of(db: &Db, id: &str) -> RolloutState {
        db.rollouts().state(id).unwrap().unwrap().state
    }

    /// A freshly opened rollout is stored in `Opening` with its inputs intact.
    #[test]
    fn record_rollout_opened_inserts_first_one_as_opening() {
        let db = fresh_db();
        open_rollout(&db, "r1", "stable", "ref-1", t0());

        let row = db.rollouts().state("r1").unwrap().expect("rollout present");
        assert_eq!(row.state, RolloutState::Opening);
        assert_eq!(row.target_ref, "ref-1");
        assert_eq!(row.channel, "stable");
        assert!(row.superseded_at.is_none());
    }

    /// Opening a second rollout on the same channel leaves the first one
    /// in `Opening`.
    #[test]
    fn record_rollout_opened_is_pure_insert() {
        let db = fresh_db();
        open_rollout(&db, "r1", "stable", "ref-1", t0());
        open_rollout(
            &db,
            "r2",
            "stable",
            "ref-2",
            t0() + chrono::Duration::seconds(1),
        );

        assert_eq!(state_of(&db, "r1"), RolloutState::Opening);
        assert_eq!(state_of(&db, "r2"), RolloutState::Opening);
    }

    /// Rollouts on different channels do not affect each other's state.
    #[test]
    fn record_rollout_opened_does_not_supersede_across_channels() {
        let db = fresh_db();
        open_rollout(&db, "r1", "stable", "ref-1", t0());
        open_rollout(&db, "r2", "edge-slow", "ref-2", t0());

        assert_eq!(state_of(&db, "r1"), RolloutState::Opening);
        assert_eq!(state_of(&db, "r2"), RolloutState::Opening);
    }

    /// Looking up an id that was never opened yields `None`, not an error.
    #[test]
    fn state_returns_none_for_unknown_rollout() {
        let db = fresh_db();
        assert!(db.rollouts().state("ghost").unwrap().is_none());
    }

    /// A `Terminal` transition stamps `terminal_at`; repeating it changes
    /// no rows.
    #[test]
    fn record_rollout_transition_stamps_terminal() {
        let db = fresh_db();
        open_rollout(&db, "r1", "stable", "ref-1", t0());

        assert_eq!(transition(&db, "r1", RolloutState::Terminal, t0()), 1);

        let row = db.rollouts().state("r1").unwrap().unwrap();
        assert_eq!(row.state, RolloutState::Terminal);
        assert!(row.terminal_at.is_some());

        assert_eq!(transition(&db, "r1", RolloutState::Terminal, t0()), 0);
    }

    /// The wave only moves forward; a lower value changes nothing.
    #[test]
    fn set_current_wave_is_monotonic_no_op_on_backwards() {
        let db = fresh_db();
        open_rollout(&db, "r1", "stable", "ref-1", t0());
        assert_eq!(db.rollouts().current_wave("r1").unwrap(), Some(0));

        assert_eq!(db.rollouts().set_current_wave("r1", 1, None).unwrap(), 1);
        assert_eq!(db.rollouts().current_wave("r1").unwrap(), Some(1));

        assert_eq!(db.rollouts().set_current_wave("r1", 0, None).unwrap(), 0);
        assert_eq!(db.rollouts().current_wave("r1").unwrap(), Some(1));
    }

    /// A terminal rollout stays in the gate view but leaves the in-flight
    /// view.
    #[test]
    fn terminal_stays_in_list_active_but_drops_from_list_in_flight() {
        let db = fresh_db();
        open_rollout(&db, "r1", "stable", "ref-1", t0());
        open_rollout(&db, "r2", "edge", "ref-2", t0());
        assert_eq!(db.rollouts().list_active().unwrap().len(), 2);
        assert_eq!(db.rollouts().list_in_flight().unwrap().len(), 2);

        transition(&db, "r1", RolloutState::Terminal, t0());

        let active = db.rollouts().list_active().unwrap();
        assert_eq!(
            active.len(),
            2,
            "list_active must include terminal rollouts so gates can see converged predecessors"
        );
        let r1_active = active
            .iter()
            .find(|r| r.rollout_id.as_str() == "r1")
            .unwrap();
        assert!(r1_active.terminal_at.is_some());

        let in_flight = db.rollouts().list_in_flight().unwrap().into_inner();
        assert_eq!(in_flight.len(), 1);
        assert_eq!(in_flight[0].rollout_id.as_str(), "r2");
    }

    /// A superseded rollout appears in neither listing.
    #[test]
    fn superseded_dropped_from_both_list_active_and_list_in_flight() {
        let db = fresh_db();
        open_rollout(&db, "r1", "stable", "ref-1", t0());
        transition(
            &db,
            "r1",
            RolloutState::Superseded,
            t0() + chrono::Duration::seconds(1),
        );

        for entry in &db.rollouts().list_active().unwrap() {
            assert_ne!(entry.rollout_id.as_str(), "r1");
        }
        for entry in &db.rollouts().list_in_flight().unwrap() {
            assert_ne!(entry.rollout_id.as_str(), "r1");
        }
    }

    /// `into_ui` drops the terminal rollout from a gate view.
    #[test]
    fn gate_rollouts_into_ui_filters_terminal() {
        let db = fresh_db();
        open_rollout(&db, "r-active", "stable", "ref-a", t0());
        open_rollout(&db, "r-converged", "edge", "ref-c", t0());
        transition(&db, "r-converged", RolloutState::Terminal, t0());

        let gate = db.rollouts().list_active().unwrap();
        assert_eq!(gate.len(), 2);

        let ui = gate.into_ui();
        assert_eq!(ui.len(), 1);
        assert_eq!(ui.into_inner()[0].rollout_id.as_str(), "r-active");
    }

    /// Compile-time check: the two listings return different wrapper types.
    #[test]
    fn gate_and_ui_rollouts_are_distinct_types() {
        let db = fresh_db();
        open_rollout(&db, "r1", "stable", "ref-1", t0());

        let _gate: GateRollouts = db.rollouts().list_active().unwrap();
        let _ui: UiRollouts = db.rollouts().list_in_flight().unwrap();
    }

    /// With a 90-day window, pruning removes rollouts finished 120 days ago
    /// (and their host records) and keeps a 30-day-old terminal rollout and
    /// an in-flight one.
    #[test]
    fn prune_finished_rollouts_drops_old_finished_keeps_recent_and_in_flight() {
        let db = fresh_db();
        let now = chrono::Utc::now();
        let old = now - chrono::Duration::days(120);
        let recent = now - chrono::Duration::days(30);

        // Still in flight: never finished.
        open_rollout(&db, "r-active", "stable", "ref-a", now);

        // Superseded long ago, alongside the rollout that replaced it.
        open_rollout(&db, "r-old-superseded", "edge", "ref-os", now);
        open_rollout(&db, "r-old-superseder", "edge", "ref-osr", now);
        transition(&db, "r-old-superseded", RolloutState::Superseded, old);

        // Terminal, but inside the retention window.
        open_rollout(&db, "r-recent-terminal", "preview", "ref-rt", now);
        transition(&db, "r-recent-terminal", RolloutState::Terminal, recent);

        // Terminal and past the retention window.
        open_rollout(&db, "r-old-terminal", "preview-old", "ref-ot", now);
        transition(&db, "r-old-terminal", RolloutState::Terminal, old);

        // One host record per rollout listed here.
        for rid in [
            "r-active",
            "r-old-superseded",
            "r-recent-terminal",
            "r-old-terminal",
        ] {
            let record = nixfleet_state_machine::HostRolloutState::new_pending(
                rid.into(),
                "host-x".to_string(),
                "stable".to_string(),
                format!("closure-{rid}"),
                now,
                now + chrono::Duration::minutes(5),
            );
            db.host_rollout_records().upsert(&record).unwrap();
        }

        let (records_pruned, rollouts_pruned) =
            db.rollouts().prune_finished_rollouts(24 * 90).unwrap();
        assert_eq!(rollouts_pruned, 2, "r-old-superseded + r-old-terminal");
        assert_eq!(records_pruned, 2);

        let active = db.rollouts().list_active().unwrap();
        assert!(active.iter().any(|r| r.rollout_id.as_str() == "r-active"));
        assert!(db.rollouts().state("r-recent-terminal").unwrap().is_some());
        assert!(db.rollouts().state("r-old-superseded").unwrap().is_none());
        assert!(db.rollouts().state("r-old-terminal").unwrap().is_none());
    }
}