1use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use serde_with::skip_serializing_none;
7use std::collections::HashMap;
8
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
10#[serde(rename_all = "camelCase")]
11pub struct FleetResolved {
12 pub schema_version: u32,
13 pub hosts: HashMap<String, Host>,
14 pub channels: HashMap<String, Channel>,
15 #[serde(default)]
16 pub rollout_policies: HashMap<String, RolloutPolicy>,
17 pub waves: HashMap<String, Vec<Wave>>,
18 #[serde(default)]
19 pub edges: Vec<Edge>,
20 #[serde(default)]
24 pub channel_edges: Vec<ChannelEdge>,
25 #[serde(default)]
26 pub disruption_budgets: Vec<DisruptionBudget>,
27 pub meta: Meta,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
31#[serde(rename_all = "camelCase")]
32pub struct Host {
33 pub system: String,
34 pub tags: Vec<String>,
35 pub channel: String,
36 #[serde(default)]
37 pub closure_hash: Option<String>,
38 #[serde(default)]
39 pub pubkey: Option<String>,
40 #[serde(default, skip_serializing_if = "Option::is_none")]
45 pub pin: Option<Pin>,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
49#[serde(rename_all = "camelCase")]
50pub struct Pin {
51 pub commit: String,
54 pub reason: String,
57 #[serde(default, skip_serializing_if = "Option::is_none")]
61 pub expires_at: Option<DateTime<Utc>>,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
65#[serde(rename_all = "camelCase")]
66pub struct Channel {
67 pub rollout_policy: String,
68 pub reconcile_interval_minutes: u32,
69 pub freshness_window: u32,
72 pub signing_interval_minutes: u32,
73}
74
75impl Channel {
76 pub fn freshness_window_duration(&self) -> std::time::Duration {
79 std::time::Duration::from_secs(self.freshness_window as u64 * 60)
80 }
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
84#[serde(rename_all = "camelCase")]
85pub struct RolloutPolicy {
86 pub strategy: String,
87 pub waves: Vec<PolicyWave>,
88 #[serde(default)]
89 pub health_gate: HealthGate,
90 pub on_health_failure: OnHealthFailure,
91}
92
93#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
95#[serde(rename_all = "kebab-case")]
96pub enum OnHealthFailure {
97 Halt,
99 RollbackAndHalt,
101}
102
103impl std::fmt::Display for OnHealthFailure {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 let s = match self {
106 OnHealthFailure::Halt => "halt",
107 OnHealthFailure::RollbackAndHalt => "rollback-and-halt",
108 };
109 f.write_str(s)
110 }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
114#[serde(rename_all = "camelCase")]
115pub struct PolicyWave {
116 pub selector: Selector,
117 pub soak_minutes: u32,
118}
119
120#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq, Hash)]
121#[serde(rename_all = "camelCase")]
122pub struct Selector {
123 #[serde(default)]
124 pub tags: Vec<String>,
125 #[serde(default)]
126 pub tags_any: Vec<String>,
127 #[serde(default)]
128 pub hosts: Vec<String>,
129 #[serde(default)]
130 pub channel: Option<String>,
131 #[serde(default)]
132 pub all: bool,
133}
134
135impl Selector {
136 pub fn matches(&self, host_name: &str, host: &Host) -> bool {
141 if self.all {
142 return true;
143 }
144 if !self.hosts.is_empty() && self.hosts.iter().any(|h| h == host_name) {
145 return true;
146 }
147 if let Some(ch) = &self.channel
148 && &host.channel == ch
149 {
150 return true;
151 }
152 if !self.tags.is_empty() && self.tags.iter().all(|t| host.tags.contains(t)) {
153 return true;
154 }
155 if !self.tags_any.is_empty() && self.tags_any.iter().any(|t| host.tags.contains(t)) {
156 return true;
157 }
158 false
159 }
160
161 pub fn resolve<'a, I: IntoIterator<Item = (&'a String, &'a Host)>>(
164 &self,
165 hosts: I,
166 ) -> Vec<String> {
167 hosts
168 .into_iter()
169 .filter(|(n, h)| self.matches(n, h))
170 .map(|(n, _)| n.clone())
171 .collect()
172 }
173
174 pub fn summary(&self) -> String {
178 if self.all {
179 return "all".to_string();
180 }
181 if let Some(channel) = &self.channel {
182 return format!("channel:{channel}");
183 }
184 if !self.tags.is_empty() {
185 let mut t = self.tags.clone();
186 t.sort();
187 return format!("tags:{}", t.join(","));
188 }
189 if !self.tags_any.is_empty() {
190 let mut t = self.tags_any.clone();
191 t.sort();
192 return format!("tags_any:{}", t.join(","));
193 }
194 if !self.hosts.is_empty() {
195 let mut h = self.hosts.clone();
196 h.sort();
197 return format!("hosts:{}", h.join(","));
198 }
199 "unknown".to_string()
200 }
201}
202
203#[skip_serializing_none]
206#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
207#[serde(rename_all = "camelCase")]
208pub struct HealthGate {
209 #[serde(default)]
210 pub systemd_failed_units: Option<SystemdFailedUnits>,
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
214#[serde(rename_all = "camelCase")]
215pub struct SystemdFailedUnits {
216 pub max: u32,
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
220#[serde(rename_all = "camelCase")]
221pub struct Wave {
222 pub hosts: Vec<String>,
223 pub soak_minutes: u32,
224}
225
226#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
232#[serde(rename_all = "camelCase")]
233pub struct Edge {
234 pub gated: String,
236 pub gates: String,
238 #[serde(default)]
239 pub reason: Option<String>,
240}
241
242#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
249#[serde(rename_all = "camelCase")]
250pub struct ChannelEdge {
251 #[serde(alias = "before")]
253 pub gates: String,
254 #[serde(alias = "after")]
256 pub gated: String,
257 #[serde(default)]
258 pub reason: Option<String>,
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
262#[serde(rename_all = "camelCase")]
263pub struct DisruptionBudget {
264 pub selector: Selector,
267 #[serde(default)]
268 pub max_in_flight: Option<u32>,
269 #[serde(default)]
270 pub max_in_flight_pct: Option<u32>,
271}
272
273#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
277#[serde(rename_all = "camelCase")]
278pub struct Meta {
279 pub schema_version: u32,
280 #[serde(default)]
281 pub signed_at: Option<DateTime<Utc>>,
282 #[serde(default)]
283 pub ci_commit: Option<String>,
284 #[serde(default, skip_serializing_if = "Option::is_none")]
287 pub signature_algorithm: Option<String>,
288}
289
290impl Meta {
291 pub fn signature_algorithm_or_default(&self) -> &str {
293 self.signature_algorithm.as_deref().unwrap_or("ed25519")
294 }
295}
296
297pub const STRATEGY_ALL_AT_ONCE: &str = "all-at-once";
306
307pub fn normalize_rollout_policies(fleet: &mut FleetResolved) {
332 for policy in fleet.rollout_policies.values_mut() {
333 if policy.strategy == STRATEGY_ALL_AT_ONCE && policy.waves.is_empty() {
334 policy.waves.push(PolicyWave {
335 selector: Selector {
336 all: true,
337 ..Default::default()
338 },
339 soak_minutes: 0,
340 });
341 }
342 }
343}
344
345#[cfg(test)]
346mod tests {
347 use super::*;
348
349 #[test]
353 fn channel_edge_accepts_legacy_before_after_wire_format() {
354 let legacy = r#"{"before":"edge","after":"stable","reason":"test canary"}"#;
355 let parsed: ChannelEdge = serde_json::from_str(legacy).unwrap();
356 assert_eq!(parsed.gates, "edge");
357 assert_eq!(parsed.gated, "stable");
358 assert_eq!(parsed.reason.as_deref(), Some("test canary"));
359 }
360
361 #[test]
363 fn channel_edge_canonical_wire_format_round_trips() {
364 let edge = ChannelEdge {
365 gates: "edge".into(),
366 gated: "stable".into(),
367 reason: Some("canary".into()),
368 };
369 let bytes = serde_json::to_string(&edge).unwrap();
370 assert!(
371 bytes.contains("\"gates\":\"edge\""),
372 "wire must use canonical 'gates' field; got {bytes}"
373 );
374 assert!(
375 bytes.contains("\"gated\":\"stable\""),
376 "wire must use canonical 'gated' field; got {bytes}"
377 );
378 let back: ChannelEdge = serde_json::from_str(&bytes).unwrap();
379 assert_eq!(back, edge);
380 }
381
382 #[test]
383 fn selector_summary_priority_and_sorted_lists() {
384 let s = Selector {
385 all: true,
386 ..Default::default()
387 };
388 assert_eq!(s.summary(), "all");
389
390 let s = Selector {
391 channel: Some("stable".into()),
392 ..Default::default()
393 };
394 assert_eq!(s.summary(), "channel:stable");
395
396 let s = Selector {
397 tags: vec!["server".into(), "prod".into()],
398 ..Default::default()
399 };
400 assert_eq!(s.summary(), "tags:prod,server");
401
402 let s = Selector {
403 tags_any: vec!["b".into(), "a".into()],
404 ..Default::default()
405 };
406 assert_eq!(s.summary(), "tags_any:a,b");
407
408 let s = Selector {
409 hosts: vec!["zzz".into(), "aaa".into()],
410 ..Default::default()
411 };
412 assert_eq!(s.summary(), "hosts:aaa,zzz");
413
414 assert_eq!(Selector::default().summary(), "unknown");
416 }
417
418 fn fleet_with_policy(policy_name: &str, policy: RolloutPolicy) -> FleetResolved {
419 let mut rollout_policies = HashMap::new();
420 rollout_policies.insert(policy_name.to_string(), policy);
421 FleetResolved {
422 schema_version: 1,
423 hosts: HashMap::new(),
424 channels: HashMap::new(),
425 rollout_policies,
426 waves: HashMap::new(),
427 edges: Vec::new(),
428 channel_edges: Vec::new(),
429 disruption_budgets: Vec::new(),
430 meta: Meta {
431 schema_version: 1,
432 signed_at: None,
433 ci_commit: None,
434 signature_algorithm: None,
435 },
436 }
437 }
438
439 #[test]
440 fn normalize_synthesizes_implicit_wave_for_all_at_once_without_waves() {
441 let mut fleet = fleet_with_policy(
450 "all-at-once",
451 RolloutPolicy {
452 strategy: STRATEGY_ALL_AT_ONCE.into(),
453 waves: Vec::new(),
454 health_gate: HealthGate::default(),
455 on_health_failure: OnHealthFailure::Halt,
456 },
457 );
458
459 normalize_rollout_policies(&mut fleet);
460
461 let policy = fleet
462 .rollout_policies
463 .get("all-at-once")
464 .expect("policy present");
465 assert_eq!(policy.waves.len(), 1, "implicit wave synthesized");
466 assert!(policy.waves[0].selector.all, "match-all selector");
467 assert_eq!(
468 policy.waves[0].soak_minutes, 0,
469 "zero soak — all-at-once means no staging hold",
470 );
471 }
472
473 #[test]
474 fn normalize_preserves_explicit_waves_on_all_at_once() {
475 let explicit_wave = PolicyWave {
480 selector: Selector {
481 hosts: vec!["web-01".into()],
482 ..Default::default()
483 },
484 soak_minutes: 15,
485 };
486 let mut fleet = fleet_with_policy(
487 "all-at-once",
488 RolloutPolicy {
489 strategy: STRATEGY_ALL_AT_ONCE.into(),
490 waves: vec![explicit_wave.clone()],
491 health_gate: HealthGate::default(),
492 on_health_failure: OnHealthFailure::Halt,
493 },
494 );
495
496 normalize_rollout_policies(&mut fleet);
497
498 let policy = fleet
499 .rollout_policies
500 .get("all-at-once")
501 .expect("policy present");
502 assert_eq!(policy.waves.len(), 1);
503 assert_eq!(policy.waves[0], explicit_wave);
504 }
505
506 #[test]
507 fn normalize_does_not_touch_canary_without_waves() {
508 let mut fleet = fleet_with_policy(
513 "canary",
514 RolloutPolicy {
515 strategy: "canary".into(),
516 waves: Vec::new(),
517 health_gate: HealthGate::default(),
518 on_health_failure: OnHealthFailure::Halt,
519 },
520 );
521
522 normalize_rollout_policies(&mut fleet);
523
524 let policy = fleet
525 .rollout_policies
526 .get("canary")
527 .expect("policy present");
528 assert!(
529 policy.waves.is_empty(),
530 "normalization is all-at-once-only; canary stays as declared",
531 );
532 }
533
534 #[test]
535 fn normalize_handles_empty_rollout_policies_map() {
536 let mut fleet = FleetResolved {
537 schema_version: 1,
538 hosts: HashMap::new(),
539 channels: HashMap::new(),
540 rollout_policies: HashMap::new(),
541 waves: HashMap::new(),
542 edges: Vec::new(),
543 channel_edges: Vec::new(),
544 disruption_budgets: Vec::new(),
545 meta: Meta {
546 schema_version: 1,
547 signed_at: None,
548 ci_commit: None,
549 signature_algorithm: None,
550 },
551 };
552 normalize_rollout_policies(&mut fleet);
553 assert!(fleet.rollout_policies.is_empty());
554 }
555}