1use chrono::{DateTime, Duration, Utc};
16use nixfleet_state_machine::HostState;
17
18use crate::planner_gates;
19use crate::planner_types::{FleetState, PlanAction, QuarantineSet, RolloutId, SignedManifestSet};
20
21pub fn plan_next(
28 manifests: &SignedManifestSet,
29 fleet_state: &FleetState,
30 quarantines: &QuarantineSet,
31 now: DateTime<Utc>,
32) -> Vec<PlanAction> {
33 let mut actions = Vec::new();
34
35 for (channel, rollout_manifest) in &manifests.rollouts {
57 let channel_ref = rollout_manifest.inner().channel_ref.clone();
58 let rollout_id = RolloutId::new(channel, &channel_ref);
59 if !fleet_state.rollouts.contains_key(&rollout_id) {
60 actions.push(PlanAction::OpenRollout {
61 rollout_id,
62 channel: channel.clone(),
63 target_ref: channel_ref,
64 });
65 }
66 }
67
68 let mut tick_dispatched: std::collections::HashMap<
81 planner_gates::disruption_budget::BudgetId,
82 u32,
83 > = std::collections::HashMap::new();
84 for (rollout_id, summary) in &fleet_state.rollouts {
85 if summary.terminal_at.is_some() {
86 continue; }
88 let channel = &summary.channel;
89
90 for ((rid, host), state) in &fleet_state.host_states {
91 if rid != rollout_id {
92 continue;
93 }
94 if state.state != HostState::Pending {
95 continue;
96 }
97 if state.dispatch_acked_at.is_some() {
103 continue;
104 }
105
106 let target_closure = &state.target_closure;
107 let block = planner_gates::evaluate_for_dispatch(
108 fleet_state,
109 manifests,
110 quarantines,
111 rollout_id,
112 host,
113 target_closure,
114 channel,
115 &tick_dispatched,
116 );
117 match block {
118 Some(b) => {
119 actions.push(PlanAction::DeferDispatch {
120 host: host.clone(),
121 rollout: rollout_id.clone(),
122 gate: b.discriminator(),
123 reason: b.reason(),
124 });
125 }
126 None => {
127 for budget in &summary.budgets {
132 if budget.hosts.iter().any(|h| h == host) {
133 *tick_dispatched.entry(budget.selector.clone()).or_insert(0) += 1;
134 }
135 }
136 let soak_due_at = state.soak_due_at.unwrap_or(now);
137 actions.push(PlanAction::QueueDispatch {
138 host: host.clone(),
139 rollout: rollout_id.clone(),
140 target_closure: target_closure.clone(),
141 soak_due_at,
142 });
143 }
144 }
145 }
146 }
147
148 actions
156}
157
158pub fn compute_soak_due_at(dispatched_at: DateTime<Utc>, soak_minutes: u32) -> DateTime<Utc> {
162 dispatched_at + Duration::minutes(soak_minutes as i64)
163}
164
165pub fn active_rollout_for_host<'a>(
167 fleet_state: &'a FleetState,
168 host_id: &str,
169) -> Option<&'a RolloutId> {
170 fleet_state
173 .host_states
174 .iter()
175 .find(|((_, h), _)| h == host_id)
176 .map(|((rid, _), _)| rid)
177}
178
179#[cfg(test)]
180mod tests {
181 use super::*;
182 use crate::planner_types::*;
183 use chrono::TimeZone;
184 use nixfleet_state_machine::HostRolloutState;
185 use std::collections::HashMap;
186
187 fn t0() -> DateTime<Utc> {
188 Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
189 }
190
191 fn empty_fleet_state() -> FleetState {
192 FleetState {
193 host_states: HashMap::new(),
194 rollouts: HashMap::new(),
195 outstanding_failing_enforce_probes: HashMap::new(),
196 }
197 }
198
199 fn host_in(rollout: &str, host: &str, state: HostState) -> HostRolloutState {
203 let mut s = HostRolloutState::new_pending(
204 rollout.into(),
205 host.into(),
206 "stable".into(),
207 "target".into(),
208 t0(),
209 t0() + chrono::Duration::minutes(5),
210 );
211 s.state = state;
212 s
213 }
214
215 #[test]
220 fn compute_soak_due_at_is_pure_addition() {
221 assert_eq!(
222 compute_soak_due_at(t0(), 5),
223 t0() + chrono::Duration::minutes(5)
224 );
225 assert_eq!(compute_soak_due_at(t0(), 0), t0());
226 }
227
228 use crate::planner_types::SignedManifestSet;
235 use crate::verify::Verified;
236 use nixfleet_proto::testing::FleetBuilder;
237
238 fn signed_manifest_set(fleet: nixfleet_proto::FleetResolved) -> SignedManifestSet {
239 SignedManifestSet {
240 fleet: Verified::unverified_for_tests(fleet, t0()),
241 rollouts: HashMap::new(),
242 }
243 }
244
245 #[test]
246 fn plan_next_queues_dispatch_for_pending_host_with_no_gates_blocking() {
247 let fleet = FleetBuilder::new().host("h1", "stable").build();
248 let manifests = signed_manifest_set(fleet);
249
250 let mut fs = empty_fleet_state();
251 fs.rollouts.insert(
252 "r1".into(),
253 RolloutSummary {
254 rollout_id: "r1".into(),
255 channel: "stable".into(),
256 target_ref: "r1".into(),
257 opened_at: t0(),
258 terminal_at: None,
259 current_wave: 0,
260 budgets: Vec::new(),
261 },
262 );
263 fs.host_states.insert(
264 ("r1".into(), "h1".into()),
265 host_in("r1", "h1", HostState::Pending),
266 );
267
268 let quarantines = std::collections::HashMap::new();
269 let actions = plan_next(&manifests, &fs, &quarantines, t0());
270
271 assert!(actions.iter().any(|a| matches!(
272 a,
273 PlanAction::QueueDispatch { host, rollout, .. } if host == "h1" && rollout.as_str() == "r1"
274 )));
275 }
276
277 #[test]
278 fn plan_next_defers_dispatch_when_quarantined() {
279 let fleet = FleetBuilder::new().host("h1", "stable").build();
280 let manifests = signed_manifest_set(fleet);
281
282 let mut fs = empty_fleet_state();
283 fs.rollouts.insert(
284 "r1".into(),
285 RolloutSummary {
286 rollout_id: "r1".into(),
287 channel: "stable".into(),
288 target_ref: "r1".into(),
289 opened_at: t0(),
290 terminal_at: None,
291 current_wave: 0,
292 budgets: Vec::new(),
293 },
294 );
295 let mut h1 = host_in("r1", "h1", HostState::Pending);
296 h1.target_closure = "bad-hash".into();
297 fs.host_states.insert(("r1".into(), "h1".into()), h1);
298
299 let mut quarantines = std::collections::HashMap::new();
300 let mut set = std::collections::HashSet::new();
301 set.insert("bad-hash".to_string());
302 quarantines.insert("stable".to_string(), set);
303
304 let actions = plan_next(&manifests, &fs, &quarantines, t0());
305
306 assert!(actions.iter().any(|a| matches!(
307 a,
308 PlanAction::DeferDispatch { host, gate, .. } if host == "h1" && *gate == "quarantine"
309 )));
310 assert!(!actions.iter().any(|a| matches!(
312 a,
313 PlanAction::QueueDispatch { host, .. } if host == "h1"
314 )));
315 }
316
317 #[test]
318 fn plan_next_skips_acked_hosts() {
319 let fleet = FleetBuilder::new().host("h1", "stable").build();
323 let manifests = signed_manifest_set(fleet);
324
325 let mut fs = empty_fleet_state();
326 fs.rollouts.insert(
327 "r1".into(),
328 RolloutSummary {
329 rollout_id: "r1".into(),
330 channel: "stable".into(),
331 target_ref: "r1".into(),
332 opened_at: t0(),
333 terminal_at: None,
334 current_wave: 0,
335 budgets: Vec::new(),
336 },
337 );
338 let mut h1 = host_in("r1", "h1", HostState::Pending);
339 h1.dispatch_acked_at = Some(t0());
340 fs.host_states.insert(("r1".into(), "h1".into()), h1);
341
342 let quarantines = std::collections::HashMap::new();
343 let actions = plan_next(&manifests, &fs, &quarantines, t0());
344
345 assert!(!actions.iter().any(|a| matches!(
346 a,
347 PlanAction::QueueDispatch { host, .. } if host == "h1"
348 )));
349 }
350
351 #[test]
356 fn plan_next_emits_open_rollout_for_unopened_channel() {
357 let fleet = FleetBuilder::new().host("h1", "stable").build();
370 let mut manifests = signed_manifest_set(fleet);
371
372 let rollout_manifest = nixfleet_proto::RolloutManifest {
374 schema_version: 1,
375 display_name: "stable@r1".into(),
376 channel: "stable".into(),
377 channel_ref: "r1".into(),
378 fleet_resolved_hash: String::new(),
379 host_set: Vec::new(),
380 health_gate: nixfleet_proto::HealthGate::default(),
381 disruption_budgets: Vec::new(),
382 meta: nixfleet_proto::Meta {
383 schema_version: 1,
384 signed_at: Some(t0()),
385 ci_commit: None,
386 signature_algorithm: Some("ed25519".into()),
387 },
388 };
389 manifests.rollouts.insert(
390 "stable".to_string(),
391 Verified::unverified_for_tests(rollout_manifest, t0()),
392 );
393
394 let fs = empty_fleet_state();
395 let quarantines = std::collections::HashMap::new();
396 let actions = plan_next(&manifests, &fs, &quarantines, t0());
397
398 let open = actions
399 .iter()
400 .find_map(|a| match a {
401 PlanAction::OpenRollout {
402 rollout_id,
403 channel,
404 target_ref,
405 } if channel == "stable" => Some((rollout_id, target_ref)),
406 _ => None,
407 })
408 .expect("OpenRollout for stable must be emitted");
409 assert_eq!(
414 open.0.as_str(),
415 "stable@r1",
416 "rollout_id MUST equal RolloutId::new(channel, channel_ref) per RFC-0008 §6.3"
417 );
418 assert_eq!(
419 open.1, "r1",
420 "target_ref stays as raw channel_ref (the channel pointer)"
421 );
422 }
423
424 #[test]
425 fn plan_next_does_not_re_emit_open_rollout_for_terminal_rollout() {
426 let fleet = FleetBuilder::new().host("h1", "stable").build();
435 let mut manifests = signed_manifest_set(fleet);
436
437 let rollout_manifest = nixfleet_proto::RolloutManifest {
438 schema_version: 1,
439 display_name: "stable@r1".into(),
440 channel: "stable".into(),
441 channel_ref: "r1".into(),
442 fleet_resolved_hash: String::new(),
443 host_set: Vec::new(),
444 health_gate: nixfleet_proto::HealthGate::default(),
445 disruption_budgets: Vec::new(),
446 meta: nixfleet_proto::Meta {
447 schema_version: 1,
448 signed_at: Some(t0()),
449 ci_commit: None,
450 signature_algorithm: Some("ed25519".into()),
451 },
452 };
453 manifests.rollouts.insert(
454 "stable".to_string(),
455 Verified::unverified_for_tests(rollout_manifest, t0()),
456 );
457
458 let rollout_id = nixfleet_proto::RolloutId::new("stable", "r1");
461 let mut fs = empty_fleet_state();
462 fs.rollouts.insert(
463 rollout_id.clone(),
464 RolloutSummary {
465 rollout_id: rollout_id.clone(),
466 channel: "stable".into(),
467 target_ref: "r1".into(),
468 opened_at: t0(),
469 terminal_at: Some(t0() + chrono::Duration::minutes(10)),
470 current_wave: 0,
471 budgets: Vec::new(),
472 },
473 );
474
475 let quarantines = std::collections::HashMap::new();
476 let actions = plan_next(&manifests, &fs, &quarantines, t0());
477
478 assert!(
479 !actions.iter().any(|a| matches!(
480 a,
481 PlanAction::OpenRollout { rollout_id: rid, .. }
482 if rid.as_str() == "stable@r1"
483 )),
484 "Terminal rollout for same target_ref MUST NOT re-fire OpenRollout; actions: {actions:?}",
485 );
486 }
487
488 #[test]
489 fn plan_next_emits_open_rollout_for_new_target_ref_while_predecessor_active() {
490 let fleet = FleetBuilder::new().host("h1", "stable").build();
498 let mut manifests = signed_manifest_set(fleet);
499
500 let rollout_manifest = nixfleet_proto::RolloutManifest {
502 schema_version: 1,
503 display_name: "stable@r2".into(),
504 channel: "stable".into(),
505 channel_ref: "r2".into(),
506 fleet_resolved_hash: String::new(),
507 host_set: Vec::new(),
508 health_gate: nixfleet_proto::HealthGate::default(),
509 disruption_budgets: Vec::new(),
510 meta: nixfleet_proto::Meta {
511 schema_version: 1,
512 signed_at: Some(t0()),
513 ci_commit: None,
514 signature_algorithm: Some("ed25519".into()),
515 },
516 };
517 manifests.rollouts.insert(
518 "stable".to_string(),
519 Verified::unverified_for_tests(rollout_manifest, t0()),
520 );
521
522 let pred_id = nixfleet_proto::RolloutId::new("stable", "r1");
524 let mut fs = empty_fleet_state();
525 fs.rollouts.insert(
526 pred_id.clone(),
527 RolloutSummary {
528 rollout_id: pred_id.clone(),
529 channel: "stable".into(),
530 target_ref: "r1".into(),
531 opened_at: t0(),
532 terminal_at: None,
533 current_wave: 0,
534 budgets: Vec::new(),
535 },
536 );
537 let quarantines = std::collections::HashMap::new();
538 let actions = plan_next(&manifests, &fs, &quarantines, t0());
539
540 assert!(
541 actions.iter().any(|a| matches!(
542 a,
543 PlanAction::OpenRollout { rollout_id: rid, target_ref, .. }
544 if rid.as_str() == "stable@r2" && target_ref == "r2"
545 )),
546 "New target_ref MUST trigger OpenRollout for the new rollout_id even while predecessor is Active; actions: {actions:?}",
547 );
548 }
549}