nixfleet_proto/
fleet_resolved.rs

//! Types for `fleet.resolved.json`. The artifact is produced by CI's Nix eval and
//! consumed by the CP and (on the fallback path) by agents; its JCS bytes must
//! round-trip identically across Nix + Rust.
3
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use serde_with::skip_serializing_none;
7use std::collections::HashMap;
8
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
10#[serde(rename_all = "camelCase")]
11pub struct FleetResolved {
12    pub schema_version: u32,
13    pub hosts: HashMap<String, Host>,
14    pub channels: HashMap<String, Channel>,
15    #[serde(default)]
16    pub rollout_policies: HashMap<String, RolloutPolicy>,
17    pub waves: HashMap<String, Vec<Wave>>,
18    #[serde(default)]
19    pub edges: Vec<Edge>,
20    /// Cross-channel ordering: a `before` channel must reach Converged before
21    /// any new rollout opens on the `after` channel. Within-channel coordination
22    /// uses `edges`. Cycles rejected at mkFleet eval time.
23    #[serde(default)]
24    pub channel_edges: Vec<ChannelEdge>,
25    #[serde(default)]
26    pub disruption_budgets: Vec<DisruptionBudget>,
27    pub meta: Meta,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
31#[serde(rename_all = "camelCase")]
32pub struct Host {
33    pub system: String,
34    pub tags: Vec<String>,
35    pub channel: String,
36    #[serde(default)]
37    pub closure_hash: Option<String>,
38    #[serde(default)]
39    pub pubkey: Option<String>,
40    /// Operator-declared commit pin. Resolved at mkFleet eval time from the
41    /// most-specific declaration in the host > tag > channel chain; populated
42    /// only when the effective pin is non-empty AND unexpired. When present,
43    /// `nixfleet-release` builds from `pin.commit` instead of the release commit.
44    #[serde(default, skip_serializing_if = "Option::is_none")]
45    pub pin: Option<Pin>,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
49#[serde(rename_all = "camelCase")]
50pub struct Pin {
51    /// Source-control rev the host's closure should be built from. Opaque to
52    /// the framework; typically a 40-char SHA but short SHAs + tag names work.
53    pub commit: String,
54    /// Free-form operator note. Not parsed; surfaced verbatim in
55    /// `nixfleet status` + dashboards.
56    pub reason: String,
57    /// Hard expiry. Expired pins are filtered at mkFleet eval time, so when
58    /// present here the artifact already passed the filter at signing time -
59    /// informational for operators reading the JSON.
60    #[serde(default, skip_serializing_if = "Option::is_none")]
61    pub expires_at: Option<DateTime<Utc>>,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
65#[serde(rename_all = "camelCase")]
66pub struct Channel {
67    pub rollout_policy: String,
68    pub reconcile_interval_minutes: u32,
69    /// MINUTES despite missing `_minutes` suffix (kept for wire-compat).
70    /// Convert via [`Channel::freshness_window_duration`].
71    pub freshness_window: u32,
72    pub signing_interval_minutes: u32,
73}
74
75impl Channel {
76    /// Helper that converts minutes -> Duration. Avoids the
77    /// `Duration::from_secs(raw)` 60× landmine.
78    pub fn freshness_window_duration(&self) -> std::time::Duration {
79        std::time::Duration::from_secs(self.freshness_window as u64 * 60)
80    }
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
84#[serde(rename_all = "camelCase")]
85pub struct RolloutPolicy {
86    pub strategy: String,
87    pub waves: Vec<PolicyWave>,
88    #[serde(default)]
89    pub health_gate: HealthGate,
90    pub on_health_failure: OnHealthFailure,
91}
92
93/// Recovery action when a host fails its health gate.
94#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
95#[serde(rename_all = "kebab-case")]
96pub enum OnHealthFailure {
97    /// Stop advancing; failed host stays Failed pending operator action.
98    Halt,
99    /// Roll the failed host back to its previous closure, then halt.
100    RollbackAndHalt,
101}
102
103impl std::fmt::Display for OnHealthFailure {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        let s = match self {
106            OnHealthFailure::Halt => "halt",
107            OnHealthFailure::RollbackAndHalt => "rollback-and-halt",
108        };
109        f.write_str(s)
110    }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
114#[serde(rename_all = "camelCase")]
115pub struct PolicyWave {
116    pub selector: Selector,
117    pub soak_minutes: u32,
118}
119
120#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq, Hash)]
121#[serde(rename_all = "camelCase")]
122pub struct Selector {
123    #[serde(default)]
124    pub tags: Vec<String>,
125    #[serde(default)]
126    pub tags_any: Vec<String>,
127    #[serde(default)]
128    pub hosts: Vec<String>,
129    #[serde(default)]
130    pub channel: Option<String>,
131    #[serde(default)]
132    pub all: bool,
133}
134
135impl Selector {
136    /// Match a single host. Mirrors `lib/mk-fleet.nix:resolveSelector`: any
137    /// rule that fires (all / hosts / channel / tags-all / tags-any) matches.
138    /// Sub-selector composition (and / not) is mkFleet-only and not exposed
139    /// in the wire format.
140    pub fn matches(&self, host_name: &str, host: &Host) -> bool {
141        if self.all {
142            return true;
143        }
144        if !self.hosts.is_empty() && self.hosts.iter().any(|h| h == host_name) {
145            return true;
146        }
147        if let Some(ch) = &self.channel
148            && &host.channel == ch
149        {
150            return true;
151        }
152        if !self.tags.is_empty() && self.tags.iter().all(|t| host.tags.contains(t)) {
153            return true;
154        }
155        if !self.tags_any.is_empty() && self.tags_any.iter().any(|t| host.tags.contains(t)) {
156            return true;
157        }
158        false
159    }
160
161    /// Resolve to matching host names. Order follows the input iterator;
162    /// callers that need a stable ordering should sort.
163    pub fn resolve<'a, I: IntoIterator<Item = (&'a String, &'a Host)>>(
164        &self,
165        hosts: I,
166    ) -> Vec<String> {
167        hosts
168            .into_iter()
169            .filter(|(n, h)| self.matches(n, h))
170            .map(|(n, _)| n.clone())
171            .collect()
172    }
173
174    /// Canonical short string for log lines, metric labels, and any consumer
175    /// that needs to refer to a `Selector` by name. Sorted-list semantics keep
176    /// rendering stable across HashMap iteration orders.
177    pub fn summary(&self) -> String {
178        if self.all {
179            return "all".to_string();
180        }
181        if let Some(channel) = &self.channel {
182            return format!("channel:{channel}");
183        }
184        if !self.tags.is_empty() {
185            let mut t = self.tags.clone();
186            t.sort();
187            return format!("tags:{}", t.join(","));
188        }
189        if !self.tags_any.is_empty() {
190            let mut t = self.tags_any.clone();
191            t.sort();
192            return format!("tags_any:{}", t.join(","));
193        }
194        if !self.hosts.is_empty() {
195            let mut h = self.hosts.clone();
196            h.sort();
197            return format!("hosts:{}", h.join(","));
198        }
199        "unknown".to_string()
200    }
201}
202
203// Nix emits `"healthGate": {}` when no inner constraints set; the
204// skip_serializing_none below preserves that empty-object shape for JCS parity.
205#[skip_serializing_none]
206#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
207#[serde(rename_all = "camelCase")]
208pub struct HealthGate {
209    #[serde(default)]
210    pub systemd_failed_units: Option<SystemdFailedUnits>,
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
214#[serde(rename_all = "camelCase")]
215pub struct SystemdFailedUnits {
216    pub max: u32,
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
220#[serde(rename_all = "camelCase")]
221pub struct Wave {
222    pub hosts: Vec<String>,
223    pub soak_minutes: u32,
224}
225
226/// Per-host DAG edge: `gated` host dispatches only once `gates` host reaches
227/// terminal-for-ordering (Soaked / Converged) within the same rollout. Both
228/// hosts must be on the same channel; cross-channel ordering is `ChannelEdge`'s
229/// job. Hard cutover from pre-rename `before`/`after` field names - those
230/// bytes will not parse.
231#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
232#[serde(rename_all = "camelCase")]
233pub struct Edge {
234    /// Host whose dispatch is held until `gates` completes.
235    pub gated: String,
236    /// Host that must reach Soaked/Converged before `gated` can dispatch.
237    pub gates: String,
238    #[serde(default)]
239    pub reason: Option<String>,
240}
241
242/// Cross-channel ordering edge. The `gates` channel's most-recent rollout
243/// must reach terminal `converged` before any new rollout opens on `gated`.
244/// If `gates` has never had a rollout, the gate is open. Validated at
245/// mkFleet eval time: both channels must exist, no cycles. Pre-rename
246/// `before`/`after` wire keys accepted via serde alias so older signed
247/// bytes still verify on upgraded CPs.
248#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
249#[serde(rename_all = "camelCase")]
250pub struct ChannelEdge {
251    /// Predecessor channel. Was `before`; kept as serde alias.
252    #[serde(alias = "before")]
253    pub gates: String,
254    /// Dependent channel, held until `gates` converges. Was `after`.
255    #[serde(alias = "after")]
256    pub gated: String,
257    #[serde(default)]
258    pub reason: Option<String>,
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
262#[serde(rename_all = "camelCase")]
263pub struct DisruptionBudget {
264    /// Tag-driven selector resolved at reconcile time so tag membership can
265    /// change without re-signing fleet.resolved.
266    pub selector: Selector,
267    #[serde(default)]
268    pub max_in_flight: Option<u32>,
269    #[serde(default)]
270    pub max_in_flight_pct: Option<u32>,
271}
272
273// LOADBEARING: signed_at + ci_commit serialize as `null` (no skip) to match
274// the Nix evaluator's shape - JCS byte-identity depends on it. Only
275// `signature_algorithm` is genuinely optional in the wire format.
276#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
277#[serde(rename_all = "camelCase")]
278pub struct Meta {
279    pub schema_version: u32,
280    #[serde(default)]
281    pub signed_at: Option<DateTime<Utc>>,
282    #[serde(default)]
283    pub ci_commit: Option<String>,
284    /// Absent ≡ "ed25519" at schemaVersion=1 (CONTRACTS §V Pattern A). Pre-stamp
285    /// eval emits absent; `stamp_meta` populates at signing time.
286    #[serde(default, skip_serializing_if = "Option::is_none")]
287    pub signature_algorithm: Option<String>,
288}
289
290impl Meta {
291    /// `signature_algorithm` with the `absent ≡ "ed25519"` rule applied.
292    pub fn signature_algorithm_or_default(&self) -> &str {
293        self.signature_algorithm.as_deref().unwrap_or("ed25519")
294    }
295}
296
/// Strategy string that identifies the all-at-once rollout shape.
///
/// Operators usually write it tersely - `rolloutPolicies.all-at-once.strategy =
/// "all-at-once";` - and leave `waves` out, since the strategy means "ship
/// everywhere, no staging." Unless the policy is normalized after
/// deserialization, the applier's per-wave soak lookup finds no wave and
/// falls back to `DEFAULT_SOAK_MINUTES = 60`, which contradicts the
/// strategy.
pub const STRATEGY_ALL_AT_ONCE: &str = "all-at-once";
306
307/// Post-deserialization normalization for `FleetResolved.rollout_policies`.
308///
309/// Synthesizes an implicit single match-all wave (soak_minutes=0) for any
310/// `all-at-once` policy declared without explicit waves. Operator's
311/// most-common form is `{strategy = "all-at-once";}` with empty waves;
312/// without this synthesis, the applier's per-wave soak lookup
313/// (`crates/nixfleet-control-plane/src/runtime/applier.rs:198` +
314/// `:655`) returns None and falls back to `DEFAULT_SOAK_MINUTES = 60`,
315/// silently producing a 1-hour soak hold on a strategy whose semantic
316/// name is "ship everywhere with no staging."
317///
318/// Strategies other than `all-at-once` declared without explicit waves
319/// are left untouched — a canary or custom strategy without waves IS
320/// malformed, and the applier's `DEFAULT_SOAK_MINUTES` fallback (with
321/// its warn log) remains the right behaviour as a safety net.
322///
323/// Invariant preserved: policies that already declare waves are not
324/// modified; operator's explicit declaration always wins.
325///
326/// Called from `nixfleet_reconciler::verify::verify_artifact`
327/// post-deserialization so every consumer of `Verified<FleetResolved>`
328/// (CP reducer + agent manifest cache) sees the canonical shape. Signed
329/// bytes are not modified — the signature covers the wire form (empty
330/// waves), the in-memory form is post-normalization.
331pub fn normalize_rollout_policies(fleet: &mut FleetResolved) {
332    for policy in fleet.rollout_policies.values_mut() {
333        if policy.strategy == STRATEGY_ALL_AT_ONCE && policy.waves.is_empty() {
334            policy.waves.push(PolicyWave {
335                selector: Selector {
336                    all: true,
337                    ..Default::default()
338                },
339                soak_minutes: 0,
340            });
341        }
342    }
343}
344
#[cfg(test)]
mod tests {
    use super::*;

    // ---- fixtures ----------------------------------------------------

    /// Fleet whose collections are all empty and whose `meta` is unsigned.
    fn empty_fleet() -> FleetResolved {
        FleetResolved {
            schema_version: 1,
            hosts: HashMap::new(),
            channels: HashMap::new(),
            rollout_policies: HashMap::new(),
            waves: HashMap::new(),
            edges: Vec::new(),
            channel_edges: Vec::new(),
            disruption_budgets: Vec::new(),
            meta: Meta {
                schema_version: 1,
                signed_at: None,
                ci_commit: None,
                signature_algorithm: None,
            },
        }
    }

    /// Empty fleet holding exactly one rollout policy under `policy_name`.
    fn fleet_with_policy(policy_name: &str, policy: RolloutPolicy) -> FleetResolved {
        let mut fleet = empty_fleet();
        fleet
            .rollout_policies
            .insert(policy_name.to_string(), policy);
        fleet
    }

    /// Policy with a default health gate that halts on health failure.
    fn halting_policy(strategy: &str, waves: Vec<PolicyWave>) -> RolloutPolicy {
        RolloutPolicy {
            strategy: strategy.into(),
            waves,
            health_gate: HealthGate::default(),
            on_health_failure: OnHealthFailure::Halt,
        }
    }

    /// Runs normalization on a single-policy fleet and hands the policy back.
    fn normalized(policy_name: &str, policy: RolloutPolicy) -> RolloutPolicy {
        let mut fleet = fleet_with_policy(policy_name, policy);
        normalize_rollout_policies(&mut fleet);
        fleet
            .rollout_policies
            .remove(policy_name)
            .expect("policy present")
    }

    // ---- ChannelEdge wire format -------------------------------------

    /// fleet.resolved bytes from before the rename carry `before`/`after`
    /// keys. The serde aliases have to keep them parseable, or an upgraded
    /// CP would reject any signed artifact in the channel-refs window.
    #[test]
    fn channel_edge_accepts_legacy_before_after_wire_format() {
        let legacy = r#"{"before":"edge","after":"stable","reason":"test canary"}"#;
        let ChannelEdge {
            gates,
            gated,
            reason,
        } = serde_json::from_str::<ChannelEdge>(legacy).unwrap();
        assert_eq!(gates, "edge");
        assert_eq!(gated, "stable");
        assert_eq!(reason.as_deref(), Some("test canary"));
    }

    /// Current emitters write `gates`/`gated`; the round trip must lose
    /// nothing.
    #[test]
    fn channel_edge_canonical_wire_format_round_trips() {
        let edge = ChannelEdge {
            gates: "edge".into(),
            gated: "stable".into(),
            reason: Some("canary".into()),
        };
        let bytes = serde_json::to_string(&edge).unwrap();
        let expected_pairs = [
            ("gates", "\"gates\":\"edge\""),
            ("gated", "\"gated\":\"stable\""),
        ];
        for (field, needle) in expected_pairs {
            assert!(
                bytes.contains(needle),
                "wire must use canonical '{field}' field; got {bytes}"
            );
        }
        let back: ChannelEdge = serde_json::from_str(&bytes).unwrap();
        assert_eq!(back, edge);
    }

    // ---- Selector::summary -------------------------------------------

    #[test]
    fn selector_summary_priority_and_sorted_lists() {
        let cases = [
            (
                Selector {
                    all: true,
                    ..Default::default()
                },
                "all",
            ),
            (
                Selector {
                    channel: Some("stable".into()),
                    ..Default::default()
                },
                "channel:stable",
            ),
            (
                Selector {
                    tags: vec!["server".into(), "prod".into()],
                    ..Default::default()
                },
                "tags:prod,server",
            ),
            (
                Selector {
                    tags_any: vec!["b".into(), "a".into()],
                    ..Default::default()
                },
                "tags_any:a,b",
            ),
            (
                Selector {
                    hosts: vec!["zzz".into(), "aaa".into()],
                    ..Default::default()
                },
                "hosts:aaa,zzz",
            ),
            // Explicit "unknown" sentinel keeps a Prometheus label queryable.
            (Selector::default(), "unknown"),
        ];
        for (selector, expected) in cases {
            assert_eq!(selector.summary(), expected);
        }
    }

    // ---- normalize_rollout_policies ----------------------------------

    #[test]
    fn normalize_synthesizes_implicit_wave_for_all_at_once_without_waves() {
        // Terse form (`rolloutPolicies.all-at-once = {strategy =
        // "all-at-once";};`, no waves): normalization has to add one
        // match-all, zero-soak wave. Otherwise `waves.get(wave_index)` is
        // None and the applier falls through to
        // `DEFAULT_SOAK_MINUTES = 60`, a 1-hour hold on a strategy that
        // means "ship everywhere with no staging."
        let policy = normalized(
            "all-at-once",
            halting_policy(STRATEGY_ALL_AT_ONCE, Vec::new()),
        );

        assert_eq!(policy.waves.len(), 1, "implicit wave synthesized");
        assert!(policy.waves[0].selector.all, "match-all selector");
        assert_eq!(
            policy.waves[0].soak_minutes, 0,
            "zero soak — all-at-once means no staging hold",
        );
    }

    #[test]
    fn normalize_preserves_explicit_waves_on_all_at_once() {
        // Waves declared explicitly on an all-at-once policy MUST survive
        // normalization untouched: the declared shape wins, synthesis is
        // reserved for the empty case.
        let explicit_wave = PolicyWave {
            selector: Selector {
                hosts: vec!["web-01".into()],
                ..Default::default()
            },
            soak_minutes: 15,
        };

        let policy = normalized(
            "all-at-once",
            halting_policy(STRATEGY_ALL_AT_ONCE, vec![explicit_wave.clone()]),
        );

        assert_eq!(policy.waves.len(), 1);
        assert_eq!(policy.waves[0], explicit_wave);
    }

    #[test]
    fn normalize_does_not_touch_canary_without_waves() {
        // A canary with no waves IS malformed (canary requires staging).
        // Normalization must leave it alone so the applier's
        // DEFAULT_SOAK_MINUTES fallback fires together with its warn log.
        // This pins the safety-net semantic.
        let policy = normalized("canary", halting_policy("canary", Vec::new()));

        assert!(
            policy.waves.is_empty(),
            "normalization is all-at-once-only; canary stays as declared",
        );
    }

    #[test]
    fn normalize_handles_empty_rollout_policies_map() {
        let mut fleet = empty_fleet();
        normalize_rollout_policies(&mut fleet);
        assert!(fleet.rollout_policies.is_empty());
    }
}
555}