nixfleet_reconciler/
planner.rs

1//! Pure planner (RFC-0006 §4.1).
2//!
3//! Emits `OpenRollout` actions for channels with a verified manifest but
4//! no rollout row for the current target_ref yet, and walks each active
5//! rollout's `Pending` hosts through the gate stack to produce
6//! `QueueDispatch` (gates pass) or `DeferDispatch` (gates block) actions.
7//!
8//! Properties enforced by the signature:
9//!
10//! - **Pure.** No `chrono::Utc::now()`, no DB reads, no HTTP. `now` is
11//!   a parameter so tests advance time deterministically.
12//! - **Verified-only inputs.** `SignedManifestSet` carries `Verified<T>`;
13//!   the planner cannot accidentally consume an unverified manifest.
14
15use chrono::{DateTime, Duration, Utc};
16use nixfleet_state_machine::HostState;
17
18use crate::planner_gates;
19use crate::planner_types::{FleetState, PlanAction, QuarantineSet, RolloutId, SignedManifestSet};
20
21/// Pure reducer: given the trust-verified manifests, the current
22/// per-host state aggregate, and the active quarantine table, produce a
23/// list of `PlanAction`s for the applier to execute.
24///
25/// Determinism: `(manifests, fleet_state, quarantines, now)` →
26/// `Vec<PlanAction>`. No hidden state, no I/O. Same inputs → same output.
27pub fn plan_next(
28    manifests: &SignedManifestSet,
29    fleet_state: &FleetState,
30    quarantines: &QuarantineSet,
31    now: DateTime<Utc>,
32) -> Vec<PlanAction> {
33    let mut actions = Vec::new();
34
35    // 1. Open rollouts for channels whose verified manifest advertises
36    //    a target_ref that has no rollout row yet.
37    //
38    // LOADBEARING: predicate is `rollouts.contains_key(&rollout_id)` —
39    // rollout-id-keyed, not channel-keyed. A channel-keyed predicate
40    // would (a) re-fire OpenRollout against a Terminal rollout for the
41    // same target_ref on every tick (clobbering Converged
42    // host_rollout_records back to Pending and blocking the
43    // channel-edges gate), and (b) fail to open a successor rollout
44    // when a new target_ref arrives while a predecessor is still
45    // Active (the channel-keyed check stays true under the
46    // predecessor, supersession never triggers).
47    //
48    // The rollout-id-keyed predicate splits the two intents: this site
49    // asks "has this specific target_ref been opened?"; channel-edges
50    // asks "is anything in flight on this predecessor channel?".
51    //
52    // `rollout_id` is the canonical `"{channel}@{channel_ref}"`
53    // composite (RFC-0008 §6.3). `target_ref` stays as the raw
54    // channel_ref since it identifies the channel pointer, not the
55    // rollout.
56    for (channel, rollout_manifest) in &manifests.rollouts {
57        let channel_ref = rollout_manifest.inner().channel_ref.clone();
58        let rollout_id = RolloutId::new(channel, &channel_ref);
59        if !fleet_state.rollouts.contains_key(&rollout_id) {
60            actions.push(PlanAction::OpenRollout {
61                rollout_id,
62                channel: channel.clone(),
63                target_ref: channel_ref,
64            });
65        }
66    }
67
68    // 2. Per active rollout, walk hosts in `Pending` state and consult
69    //    the gate stack. Pass → QueueDispatch. Block → DeferDispatch
70    //    (telemetry). No state change either way — applier acts.
71    //
72    // LOADBEARING: within-tick budget accumulator. With `Pending`
73    // excluded from `is_in_flight` (see
74    // `planner_gates/disruption_budget.rs`), the gate would otherwise
75    // wave through N Pending hosts on a `max_in_flight = 1` budget
76    // because none have transitioned to Activating yet within the same
77    // tick. `tick_dispatched` carries per-budget dispatch counts
78    // emitted earlier in this loop; the gate adds them to the live
79    // in-flight count before checking against `max`.
80    let mut tick_dispatched: std::collections::HashMap<
81        planner_gates::disruption_budget::BudgetId,
82        u32,
83    > = std::collections::HashMap::new();
84    for (rollout_id, summary) in &fleet_state.rollouts {
85        if summary.terminal_at.is_some() {
86            continue; // closed rollout — no more dispatches
87        }
88        let channel = &summary.channel;
89
90        for ((rid, host), state) in &fleet_state.host_states {
91            if rid != rollout_id {
92                continue;
93            }
94            if state.state != HostState::Pending {
95                continue;
96            }
97            // dispatch_acked_at != None means the agent has already
98            // ack'd; that should have advanced the state past Pending
99            // via the reducer. If we're still Pending, the dispatch is
100            // queued and the agent hasn't pulled it yet — applier
101            // skips queueing a duplicate.
102            if state.dispatch_acked_at.is_some() {
103                continue;
104            }
105
106            let target_closure = &state.target_closure;
107            let block = planner_gates::evaluate_for_dispatch(
108                fleet_state,
109                manifests,
110                quarantines,
111                rollout_id,
112                host,
113                target_closure,
114                channel,
115                &tick_dispatched,
116            );
117            match block {
118                Some(b) => {
119                    actions.push(PlanAction::DeferDispatch {
120                        host: host.clone(),
121                        rollout: rollout_id.clone(),
122                        gate: b.discriminator(),
123                        reason: b.reason(),
124                    });
125                }
126                None => {
127                    // Increment the within-tick counter for every
128                    // budget this host belongs to BEFORE pushing the
129                    // QueueDispatch so the next host's gate-check sees
130                    // the updated count.
131                    for budget in &summary.budgets {
132                        if budget.hosts.iter().any(|h| h == host) {
133                            *tick_dispatched.entry(budget.selector.clone()).or_insert(0) += 1;
134                        }
135                    }
136                    let soak_due_at = state.soak_due_at.unwrap_or(now);
137                    actions.push(PlanAction::QueueDispatch {
138                        host: host.clone(),
139                        rollout: rollout_id.clone(),
140                        target_closure: target_closure.clone(),
141                        soak_due_at,
142                    });
143                }
144            }
145        }
146    }
147
148    // Terminal-transition emission lives on the rollout reducer (per
149    // RFC-0008 §3 + §7). The planner does not emit
150    // `MarkChannelTerminal`; the rollout reducer's
151    // `RolloutEffect::RecordRolloutTransition` drives the transition
152    // when it consumes the last per-host
153    // `HostStateChanged → Converged`.
154
155    actions
156}
157
158/// Compute `soak_due_at` for a freshly-dispatched host. Pure: takes
159/// `dispatched_at` + the policy's wave soak duration; returns the time
160/// at which the soak window elapses.
161pub fn compute_soak_due_at(dispatched_at: DateTime<Utc>, soak_minutes: u32) -> DateTime<Utc> {
162    dispatched_at + Duration::minutes(soak_minutes as i64)
163}
164
165/// Lookup helper: given a host_id, find its active rollout id (if any).
166pub fn active_rollout_for_host<'a>(
167    fleet_state: &'a FleetState,
168    host_id: &str,
169) -> Option<&'a RolloutId> {
170    // We don't track host -> rollout directly; reverse-scan host_states.
171    // The active set is small (≤fleet size); fine for v0.2 scale.
172    fleet_state
173        .host_states
174        .iter()
175        .find(|((_, h), _)| h == host_id)
176        .map(|((rid, _), _)| rid)
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::planner_types::SignedManifestSet;
    use crate::planner_types::*;
    use crate::verify::Verified;
    use chrono::TimeZone;
    use nixfleet_proto::testing::FleetBuilder;
    use nixfleet_state_machine::HostRolloutState;
    use std::collections::{HashMap, HashSet};

    // ------------------------------------------------------------------
    // Fixtures
    // ------------------------------------------------------------------

    /// Fixed reference instant shared by every fixture.
    fn t0() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 5, 16, 1, 0, 0).unwrap()
    }

    /// A `FleetState` with no hosts, no rollouts and no failing probes.
    fn empty_fleet_state() -> FleetState {
        FleetState {
            host_states: HashMap::new(),
            rollouts: HashMap::new(),
            outstanding_failing_enforce_probes: HashMap::new(),
        }
    }

    /// An empty quarantine table.
    fn no_quarantines() -> QuarantineSet {
        HashMap::new()
    }

    /// Rollout summary row opened at `t0()` on wave 0 with no budgets.
    fn rollout_row(
        rollout_id: RolloutId,
        channel: &str,
        target_ref: &str,
        terminal_at: Option<DateTime<Utc>>,
    ) -> RolloutSummary {
        RolloutSummary {
            rollout_id,
            channel: channel.into(),
            target_ref: target_ref.into(),
            opened_at: t0(),
            terminal_at,
            current_wave: 0,
            budgets: Vec::new(),
        }
    }

    /// Fleet state holding exactly one rollout row and no host rows.
    fn fleet_with_rollout(
        rollout_id: RolloutId,
        channel: &str,
        target_ref: &str,
        terminal_at: Option<DateTime<Utc>>,
    ) -> FleetState {
        let mut fs = empty_fleet_state();
        fs.rollouts.insert(
            rollout_id.clone(),
            rollout_row(rollout_id, channel, target_ref, terminal_at),
        );
        fs
    }

    /// Fleet state with the open rollout `r1` on channel `stable` that
    /// the dispatch tests share.
    fn fleet_with_open_r1() -> FleetState {
        fleet_with_rollout("r1".into(), "stable", "r1", None)
    }

    // The former `rollout_summary` / `maybe_mark_terminal` helpers and
    // their tests were removed in Phase 10a; the rollout reducer
    // (Phase 10b) drives terminal transitions via
    // `RolloutEffect::RecordRolloutTransition`.

    /// Host row created via `new_pending` and then forced into `state`.
    fn host_in(rollout: &str, host: &str, state: HostState) -> HostRolloutState {
        let mut record = HostRolloutState::new_pending(
            rollout.into(),
            host.into(),
            "stable".into(),
            "target".into(),
            t0(),
            t0() + chrono::Duration::minutes(5),
        );
        record.state = state;
        record
    }

    // plan_next integration tests
    //
    // These run plan_next() end-to-end against the new gate stack. The
    // SignedManifestSet is built via `Verified::unverified_for_tests`
    // (test-only constructor in verify.rs).

    fn signed_manifest_set(fleet: nixfleet_proto::FleetResolved) -> SignedManifestSet {
        SignedManifestSet {
            fleet: Verified::unverified_for_tests(fleet, t0()),
            rollouts: HashMap::new(),
        }
    }

    /// Manifest set for a fleet with the single host `h1` on `stable`
    /// and no rollout manifests.
    fn single_host_manifests() -> SignedManifestSet {
        signed_manifest_set(FleetBuilder::new().host("h1", "stable").build())
    }

    /// `single_host_manifests()` plus a synthesised rollout manifest
    /// for `channel` that advertises `channel_ref`.
    fn manifests_advertising(channel: &str, channel_ref: &str) -> SignedManifestSet {
        let rollout_manifest = nixfleet_proto::RolloutManifest {
            schema_version: 1,
            display_name: format!("{channel}@{channel_ref}").as_str().into(),
            channel: channel.into(),
            channel_ref: channel_ref.into(),
            fleet_resolved_hash: String::new(),
            host_set: Vec::new(),
            health_gate: nixfleet_proto::HealthGate::default(),
            disruption_budgets: Vec::new(),
            meta: nixfleet_proto::Meta {
                schema_version: 1,
                signed_at: Some(t0()),
                ci_commit: None,
                signature_algorithm: Some("ed25519".into()),
            },
        };

        let mut manifests = single_host_manifests();
        manifests.rollouts.insert(
            channel.to_string(),
            Verified::unverified_for_tests(rollout_manifest, t0()),
        );
        manifests
    }

    // ------------------------------------------------------------------
    // compute_soak_due_at
    // ------------------------------------------------------------------

    #[test]
    fn compute_soak_due_at_is_pure_addition() {
        assert_eq!(
            compute_soak_due_at(t0(), 5),
            t0() + chrono::Duration::minutes(5)
        );
        assert_eq!(compute_soak_due_at(t0(), 0), t0());
    }

    // ------------------------------------------------------------------
    // plan_next: dispatch pass
    // ------------------------------------------------------------------

    #[test]
    fn plan_next_queues_dispatch_for_pending_host_with_no_gates_blocking() {
        let manifests = single_host_manifests();

        let mut fs = fleet_with_open_r1();
        fs.host_states.insert(
            ("r1".into(), "h1".into()),
            host_in("r1", "h1", HostState::Pending),
        );

        let actions = plan_next(&manifests, &fs, &no_quarantines(), t0());

        let queued = actions.iter().any(|a| {
            matches!(
                a,
                PlanAction::QueueDispatch { host, rollout, .. }
                    if host == "h1" && rollout.as_str() == "r1"
            )
        });
        assert!(queued);
    }

    #[test]
    fn plan_next_defers_dispatch_when_quarantined() {
        let manifests = single_host_manifests();

        let mut fs = fleet_with_open_r1();
        let mut h1 = host_in("r1", "h1", HostState::Pending);
        h1.target_closure = "bad-hash".into();
        fs.host_states.insert(("r1".into(), "h1".into()), h1);

        let quarantined: HashSet<String> = std::iter::once("bad-hash".to_string()).collect();
        let mut quarantines = HashMap::new();
        quarantines.insert("stable".to_string(), quarantined);

        let actions = plan_next(&manifests, &fs, &quarantines, t0());

        let deferred = actions.iter().any(|a| {
            matches!(
                a,
                PlanAction::DeferDispatch { host, gate, .. }
                    if host == "h1" && *gate == "quarantine"
            )
        });
        assert!(deferred);

        // No QueueDispatch for this host.
        let queued = actions.iter().any(|a| {
            matches!(
                a,
                PlanAction::QueueDispatch { host, .. } if host == "h1"
            )
        });
        assert!(!queued);
    }

    #[test]
    fn plan_next_skips_acked_hosts() {
        // Host in Pending but with dispatch_acked_at set — applier has
        // already queued the Dispatch; reducer just hasn't seen the ack
        // yet. plan_next must not re-emit QueueDispatch.
        let manifests = single_host_manifests();

        let mut fs = fleet_with_open_r1();
        let mut h1 = host_in("r1", "h1", HostState::Pending);
        h1.dispatch_acked_at = Some(t0());
        fs.host_states.insert(("r1".into(), "h1".into()), h1);

        let actions = plan_next(&manifests, &fs, &no_quarantines(), t0());

        let queued = actions.iter().any(|a| {
            matches!(
                a,
                PlanAction::QueueDispatch { host, .. } if host == "h1"
            )
        });
        assert!(!queued);
    }

    // `plan_next_emits_mark_terminal_when_rollout_converges` deleted
    // alongside `MarkChannelTerminal`. The rollout reducer's terminal
    // transition is covered by `tests/rollout_rederivability.rs` in 10b.

    // ------------------------------------------------------------------
    // plan_next: OpenRollout pass
    // ------------------------------------------------------------------

    #[test]
    fn plan_next_emits_open_rollout_for_unopened_channel() {
        // A channel with a verified rollout manifest but no
        // host_rollout_records yet — planner emits OpenRollout for
        // the applier to create the per-host records.
        //
        // LOADBEARING: rollout_id is the canonical
        // `RolloutId::new(channel, channel_ref)` composite. The
        // applier-side `build_fleet_state` reads `manifest.channel_ref`
        // to look up host_rollout_records; a mismatched rollout_id
        // shape leaves `fleet_state.host_states` empty for the
        // rollout and Pending → QueueDispatch iterates zero hosts.
        // Fixture uses `channel != channel_ref` to surface the
        // mismatch.
        let manifests = manifests_advertising("stable", "r1");
        let fs = empty_fleet_state();

        let actions = plan_next(&manifests, &fs, &no_quarantines(), t0());

        let (opened_id, opened_ref) = actions
            .iter()
            .find_map(|a| match a {
                PlanAction::OpenRollout {
                    rollout_id,
                    channel,
                    target_ref,
                } if channel == "stable" => Some((rollout_id, target_ref)),
                _ => None,
            })
            .expect("OpenRollout for stable must be emitted");

        // LOADBEARING: rollout_id is the canonical
        // `"{channel}@{channel_ref}"` composite (RFC-0008 §6.3), not
        // channel_ref alone — multiple channels can share a ref, so a
        // ref-only identity would collide.
        assert_eq!(
            opened_id.as_str(),
            "stable@r1",
            "rollout_id MUST equal RolloutId::new(channel, channel_ref) per RFC-0008 §6.3"
        );
        assert_eq!(
            opened_ref, "r1",
            "target_ref stays as raw channel_ref (the channel pointer)"
        );
    }

    #[test]
    fn plan_next_does_not_re_emit_open_rollout_for_terminal_rollout() {
        // LOADBEARING: Terminal rollouts stay in the `rollouts` table
        // (RFC-0008 §6.3) so the channel-edges gate can read
        // `terminal_at`. The OpenRollout predicate MUST be
        // rollout-id-keyed; a channel-keyed predicate that filtered
        // by `terminal_at.is_none()` would re-fire OpenRollout for a
        // Terminal rollout's target_ref every tick (the applier's
        // host_rollout_records upsert would clobber Converged back to
        // Pending, freezing the channel-edges gate closed).
        let manifests = manifests_advertising("stable", "r1");

        // Terminal rollout for the SAME target_ref: present in
        // `rollouts` (channel-edges still needs to see terminal_at).
        let fs = fleet_with_rollout(
            nixfleet_proto::RolloutId::new("stable", "r1"),
            "stable",
            "r1",
            Some(t0() + chrono::Duration::minutes(10)),
        );

        let actions = plan_next(&manifests, &fs, &no_quarantines(), t0());

        let reopened = actions.iter().any(|a| {
            matches!(
                a,
                PlanAction::OpenRollout { rollout_id: rid, .. }
                    if rid.as_str() == "stable@r1"
            )
        });
        assert!(
            !reopened,
            "Terminal rollout for same target_ref MUST NOT re-fire OpenRollout; actions: {actions:?}",
        );
    }

    #[test]
    fn plan_next_emits_open_rollout_for_new_target_ref_while_predecessor_active() {
        // LOADBEARING: a new target_ref on a channel with an already-
        // Active rollout MUST trigger OpenRollout for the new
        // rollout_id. The rollout-id-keyed predicate ("new target_ref
        // → new rollout_id → not in `rollouts` table → OpenRollout
        // fires") is what reaches the applier's supersession path
        // (predecessor rollout transitions to Superseded when the
        // successor opens).

        // Manifest now advertises the NEW target_ref `r2`.
        let manifests = manifests_advertising("stable", "r2");

        // Predecessor rollout `stable@r1` is Active.
        let fs = fleet_with_rollout(
            nixfleet_proto::RolloutId::new("stable", "r1"),
            "stable",
            "r1",
            None,
        );

        let actions = plan_next(&manifests, &fs, &no_quarantines(), t0());

        let successor_opened = actions.iter().any(|a| {
            matches!(
                a,
                PlanAction::OpenRollout { rollout_id: rid, target_ref, .. }
                    if rid.as_str() == "stable@r2" && target_ref == "r2"
            )
        });
        assert!(
            successor_opened,
            "New target_ref MUST trigger OpenRollout for the new rollout_id even while predecessor is Active; actions: {actions:?}",
        );
    }
}