// nixfleet_agent/runtime/workers/activation.rs

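//! Activation worker. Receives [`ActivationIntent`]s from the applier and
//! reports their progress to the reducer as host events:
//!
//! - forward intents emit `LocalActivationStarted`, drive the activation
//!   pipeline (`crate::activation::activate`), and finish with
//!   `LocalActivationCompleted`, `LocalActivationDeferred`, or
//!   `LocalActivationFailed` depending on the pipeline's outcome;
//! - rollback intents skip `LocalActivationStarted`, drive the rollback
//!   pipeline (`crate::activation::rollback`), and finish with
//!   `LocalRollbackCompleted` (or `LocalActivationFailed` on error).
//!
//! Test mode (`NIXFLEET_AGENT_ACTIVATION_TEST_MODE`) short-circuits in
//! this worker BEFORE entering either pipeline — smoke and integration
//! tests rely on this to avoid spawning real subprocesses.
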
use chrono::Utc;
use nixfleet_proto::clock::ClockHandle;
use nixfleet_state_machine::Event;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use crate::activation::{
    ActivationOutcome, ActivationTarget, RollbackOutcome, activate as run_pipeline,
    rollback as run_rollback_pipeline,
};

use super::super::wire::ActivationIntent;
use super::super::{AgentConfig, ReducerInput, ShutdownToken};

25pub fn spawn(
26    _cfg: AgentConfig,
27    _clock: ClockHandle,
28    input_tx: mpsc::Sender<ReducerInput>,
29    mut intent_rx: mpsc::Receiver<ActivationIntent>,
30    shutdown: ShutdownToken,
31) -> JoinHandle<()> {
32    tokio::spawn(async move {
33        let mut shutdown_rx = shutdown.into_inner();
34        loop {
35            tokio::select! {
36                biased;
37                _ = &mut shutdown_rx => {
38                    tracing::info!(
39                        target: "shutdown",
40                        task = "agent_activation",
41                        "task shut down",
42                    );
43                    return;
44                }
45                maybe = intent_rx.recv() => {
46                    let Some(intent) = maybe else { return };
47                    handle_intent(&input_tx, intent).await;
48                }
49            }
50        }
51    })
52}
53
54async fn handle_intent(input_tx: &mpsc::Sender<ReducerInput>, intent: ActivationIntent) {
55    let started_at = Utc::now();
56    // LOADBEARING: Failed → Activating is not a legal state-machine
57    // transition; rollback drives Failed → Reverted directly via
58    // `LocalRollbackCompleted` (RFC-0005 §3 / `failed.rs`). The forward
59    // path emits `LocalActivationStarted` to stamp
60    // `activation_started_at` (RFC-0005 §4.2); the rollback path skips
61    // it — emitting an Activation* event from Failed state causes the
62    // reducer to silently reject both LocalActivationStarted and
63    // LocalActivationCompleted, blocking Failed → Reverted bookkeeping
64    // and the quarantine populate.
65    if !intent.rollback {
66        let switch_method = "switch-to-configuration".to_string();
67        if let Err(err) = input_tx
68            .send(ReducerInput::HostEvent {
69                rollout_id: intent.rollout_id.clone(),
70                event: Event::LocalActivationStarted {
71                    started_at,
72                    switch_method,
73                    seq: 0,
74                },
75            })
76            .await
77        {
78            tracing::warn!(
79                target: "agent_activation",
80                error = %err,
81                "reducer input channel closed during ActivationStarted",
82            );
83            return;
84        }
85    }
86
87    let completed_event = if activation_test_mode_enabled() {
88        // Test-mode short-circuit: no real subprocess, no /run/current-system
89        // poll, no nix-store / nix-env invocation. Smoke + integration tests
90        // depend on this to exercise the runtime integration end-to-end
91        // through the durable queue without requiring a NixOS host.
92        // Production code paths MUST NEVER set
93        // `NIXFLEET_AGENT_ACTIVATION_TEST_MODE`; the gate is checked on
94        // every intent so a misconfigured test environment fails closed
95        // (no activation) rather than open (real subprocess).
96        let now = Utc::now();
97        tracing::info!(
98            target: "agent_activation",
99            target_closure = %intent.target_closure,
100            rollback = intent.rollback,
101            "activation: test-mode gate fired; skipping pipeline",
102        );
103        if intent.rollback {
104            // Rollback path's test-mode synthesis: emit LocalRollbackCompleted
105            // with the intent's target as the reverted-to closure (in test
106            // mode there's no real /run/current-system to read).
107            Event::LocalRollbackCompleted {
108                reverted_to_closure: intent.target_closure.clone(),
109                exit_code: 0,
110                completed_at: now,
111                seq: 0,
112            }
113        } else {
114            Event::LocalActivationCompleted {
115                observed_current_closure: intent.target_closure.clone(),
116                exit_code: 0,
117                completed_at: now,
118                seq: 0,
119            }
120        }
121    } else if intent.rollback {
122        let now = Utc::now();
123        match run_rollback_pipeline().await {
124            Ok(RollbackOutcome::FiredAndPolled {
125                reverted_to_closure,
126            }) => {
127                tracing::info!(
128                    target: "agent_activation",
129                    rollout_id = %intent.rollout_id,
130                    %reverted_to_closure,
131                    "activation: rollback pipeline completed; firing LocalRollbackCompleted",
132                );
133                // LOADBEARING: only `LocalRollbackCompleted` (not
134                // `LocalActivationCompleted`) is legal from Failed
135                // state per state-machine `failed.rs`. The
136                // reverted_to_closure is the post-rollback
137                // `/run/current-system` basename observed by
138                // `verify_poll` in
139                // `activation::rollback::rollback_with`.
140                Event::LocalRollbackCompleted {
141                    reverted_to_closure,
142                    exit_code: 0,
143                    completed_at: now,
144                    seq: 0,
145                }
146            }
147            Ok(RollbackOutcome::Failed { phase, exit_code }) => Event::LocalActivationFailed {
148                exit_code: exit_code.unwrap_or(-1),
149                stderr_tail: format!("rollback failed at phase {phase}"),
150                failed_at: now,
151                seq: 0,
152            },
153            Err(err) => Event::LocalActivationFailed {
154                exit_code: -1,
155                stderr_tail: format!("rollback pipeline error: {err}"),
156                failed_at: now,
157                seq: 0,
158            },
159        }
160    } else {
161        let target = ActivationTarget {
162            closure_hash: intent.target_closure.clone(),
163            channel_ref: intent.rollout_id.channel_ref().to_string(),
164        };
165        let now = Utc::now();
166        match run_pipeline(&target).await {
167            Ok(ActivationOutcome::FiredAndPolled) => {
168                tracing::info!(
169                    target: "agent_activation",
170                    rollout_id = %intent.rollout_id,
171                    target_closure = %intent.target_closure,
172                    "activation: pipeline completed (FiredAndPolled)",
173                );
174                Event::LocalActivationCompleted {
175                    // FiredAndPolled means verify_poll observed
176                    // /run/current-system match the expected target — so
177                    // observed == target by construction at this point.
178                    observed_current_closure: intent.target_closure.clone(),
179                    exit_code: 0,
180                    completed_at: now,
181                    seq: 0,
182                }
183            }
184            Ok(ActivationOutcome::DeferredPendingReboot { component }) => {
185                // Profile set; bootloader updated; live switch deferred
186                // because `component` (dbus/systemd/kernel/init) cannot
187                // be live-swapped per nixos-rebuild's own rules. New
188                // generation activates on next reboot.
189                //
190                // LIFT #2: emit `LocalActivationDeferred` (state stays
191                // Activating, no fake exit_code=0, no fake
192                // observed_current_closure). On next reboot the agent's
193                // boot-recovery handshake observes current == target
194                // and CP's handle_heartbeat (LIFT #1) synthesizes the
195                // RemoteActivationCompleted that advances the cascade.
196                //
197                // Pre-LIFT-#2 this emitted LocalActivationCompleted with
198                // `exit_code: 0` (a lie — activation didn't take live)
199                // and `observed_current_closure: <pre-switch closure or
200                // fallback to target>` (also a lie — the observation
201                // pointed at the wrong thing). The state machine then
202                // transitioned Activating → Soaking with current_closure
203                // ≠ target_closure, producing a soft-converged state
204                // that misled operators about the reboot requirement.
205                tracing::warn!(
206                    target: "agent_activation",
207                    rollout_id = %intent.rollout_id,
208                    target_closure = %intent.target_closure,
209                    component = %component,
210                    "activation: deferred pending reboot (critical component swap); bootloader updated, host stays at Activating until reboot triggers LIFT #1's retroactive confirmation",
211                );
212                Event::LocalActivationDeferred {
213                    component,
214                    deferred_at: now,
215                    seq: 0,
216                }
217            }
218            Ok(ActivationOutcome::SignatureMismatch {
219                closure_hash,
220                stderr_tail,
221            }) => {
222                tracing::error!(
223                    target: "agent_activation",
224                    rollout_id = %intent.rollout_id,
225                    %closure_hash,
226                    "activation: signature mismatch — closure refused by substituter trust",
227                );
228                Event::LocalActivationFailed {
229                    // Distinct exit_code -2 lets dashboards route trust
230                    // violations separately from generic switch failures.
231                    exit_code: -2,
232                    stderr_tail: format!("signature mismatch on {closure_hash}: {stderr_tail}"),
233                    failed_at: now,
234                    seq: 0,
235                }
236            }
237            Ok(ActivationOutcome::RealiseFailed { reason }) => Event::LocalActivationFailed {
238                exit_code: -1,
239                stderr_tail: format!("realise failed: {reason}"),
240                failed_at: now,
241                seq: 0,
242            },
243            Ok(ActivationOutcome::SwitchFailed { phase, exit_code }) => {
244                Event::LocalActivationFailed {
245                    exit_code: exit_code.unwrap_or(-1),
246                    stderr_tail: format!("switch failed at phase {phase}"),
247                    failed_at: now,
248                    seq: 0,
249                }
250            }
251            Ok(ActivationOutcome::VerifyMismatch { expected, actual }) => {
252                tracing::error!(
253                    target: "agent_activation",
254                    rollout_id = %intent.rollout_id,
255                    %expected,
256                    %actual,
257                    "activation: /run/current-system flipped to unexpected basename",
258                );
259                Event::LocalActivationFailed {
260                    // Distinct exit_code -3 surfaces verify-mismatch as a
261                    // rollback-required class (third closure observed).
262                    exit_code: -3,
263                    stderr_tail: format!("verify mismatch: expected={expected} actual={actual}",),
264                    failed_at: now,
265                    seq: 0,
266                }
267            }
268            Err(err) => Event::LocalActivationFailed {
269                exit_code: -1,
270                stderr_tail: format!("activation pipeline error: {err}"),
271                failed_at: now,
272                seq: 0,
273            },
274        }
275    };
276
277    if let Err(err) = input_tx
278        .send(ReducerInput::HostEvent {
279            rollout_id: intent.rollout_id,
280            event: completed_event,
281        })
282        .await
283    {
284        tracing::warn!(
285            target: "agent_activation",
286            error = %err,
287            "reducer input channel closed during ActivationCompleted/Failed",
288        );
289    }
290}
291
/// Returns `true` when the activation test-mode gate is on.
///
/// The gate is on whenever `NIXFLEET_AGENT_ACTIVATION_TEST_MODE` is present
/// in the process environment, with ANY value (including `"0"` or an empty
/// string). It is re-read on every call, so tests can toggle it at runtime.
///
/// While the gate is on, `handle_intent` skips both the activation and the
/// rollback pipelines:
/// - forward intents still emit `LocalActivationStarted`, then
///   `LocalActivationCompleted`;
/// - rollback intents emit `LocalRollbackCompleted`.
///
/// Each terminal event carries the intent's `target_closure` as the
/// observed / reverted-to closure and `exit_code: 0`. Smoke tests
/// (`tests/runtime_smoke.rs`) MUST set it. They exercise the runtime
/// integration end to end through the durable queue, not the real
/// activation subprocess.
///
/// Production code paths must NEVER set it. A host with the variable
/// exported reports every intent as successful without switching anything.
fn activation_test_mode_enabled() -> bool {
    std::env::var_os("NIXFLEET_AGENT_ACTIVATION_TEST_MODE").is_some()
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    use nixfleet_proto::RolloutId;

    const TEST_MODE_VAR: &str = "NIXFLEET_AGENT_ACTIVATION_TEST_MODE";

    /// Serializes the tests in this module that mutate `TEST_MODE_VAR`.
    ///
    /// The harness runs tests on parallel threads. Without this lock, one
    /// test can remove the variable while another has just set it. That
    /// makes assertions flaky and can let `handle_intent` fall through to
    /// the real rollback/activation pipeline.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Exclusive handle on `TEST_MODE_VAR` for the duration of one test.
    ///
    /// Holds `ENV_LOCK` and restores the variable's prior value on drop,
    /// including when the test panics, so no state leaks between tests.
    struct TestModeEnv {
        prior: Option<OsString>,
        _lock: MutexGuard<'static, ()>,
    }

    impl TestModeEnv {
        fn acquire() -> Self {
            // A poisoned lock only means an earlier test panicked. Its guard
            // already restored the variable, so the lock is still usable.
            let lock = ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner);
            Self {
                prior: std::env::var_os(TEST_MODE_VAR),
                _lock: lock,
            }
        }

        /// Set (`Some`) or remove (`None`) `TEST_MODE_VAR`.
        fn set(&self, value: Option<&str>) {
            // SAFETY: `set_var`/`remove_var` are unsound if another thread
            // touches the environment concurrently. `ENV_LOCK`, held through
            // `self`, serializes every mutation made by this module's tests.
            // It cannot guard environment access elsewhere in the process.
            unsafe {
                match value {
                    Some(v) => std::env::set_var(TEST_MODE_VAR, v),
                    None => std::env::remove_var(TEST_MODE_VAR),
                }
            }
        }
    }

    impl Drop for TestModeEnv {
        fn drop(&mut self) {
            // SAFETY: as in `set`. `_lock` is only released after this body
            // has run, so the restore happens under the lock.
            unsafe {
                match self.prior.take() {
                    Some(v) => std::env::set_var(TEST_MODE_VAR, v),
                    None => std::env::remove_var(TEST_MODE_VAR),
                }
            }
        }
    }

    /// Drive `handle_intent` to completion and return the host events it
    /// sent, in order.
    fn run_intent(intent: ActivationIntent) -> Vec<Event> {
        let (tx, mut rx) = mpsc::channel::<ReducerInput>(8);
        // A private current-thread runtime (instead of `#[tokio::test]`)
        // keeps the `ENV_LOCK` guard from being held across an `.await`.
        tokio::runtime::Builder::new_current_thread()
            .build()
            .expect("build current-thread tokio runtime")
            .block_on(handle_intent(&tx, intent));
        let mut events = Vec::new();
        while let Ok(input) = rx.try_recv() {
            let ReducerInput::HostEvent { event, .. } = input else {
                panic!("expected HostEvent variant");
            };
            events.push(event);
        }
        events
    }

    /// Coverage of the test-mode gate at the worker layer. The activation
    /// pipeline's unit tests live in `crate::activation::*::tests`;
    /// integration tests in `tests/runtime_smoke.rs` exercise the
    /// short-circuited path end-to-end through the reducer + outbound
    /// queue. This minimal worker-layer test pins the env-var contract.
    #[test]
    fn test_mode_gate_responds_to_env_var() {
        let env = TestModeEnv::acquire();
        env.set(None);
        assert!(!activation_test_mode_enabled());
        env.set(Some("1"));
        assert!(activation_test_mode_enabled());
    }

    /// Regression guard for the rollback path's event-type contract.
    /// The state-machine handler at `failed.rs:29-62` legalises only
    /// `LocalRollbackCompleted` and `RemoteRollbackComplete` from
    /// Failed state; emitting `LocalActivationStarted` or
    /// `LocalActivationCompleted` instead would be rejected by the
    /// reducer as "illegal transition", silently blocking Failed →
    /// Reverted and the quarantine populate.
    ///
    /// The rollback path:
    ///   (a) skips `LocalActivationStarted` (rollback doesn't start
    ///       activation)
    ///   (b) emits `LocalRollbackCompleted { reverted_to_closure, ... }`
    ///       on success — handler sets state.state = Reverted, populates
    ///       `reverted_at` + `reverted_to`, and emits
    ///       `OutboundAgentEvent::RollbackComplete` for CP propagation
    ///       (driving CP's `Effect::RemoteInsertQuarantine` on the bad
    ///       SHA).
    ///
    /// This test covers only the worker's test-mode synthesis for rollback
    /// intents. The production branch emits the same event type from the
    /// real `run_rollback_pipeline()` outcome.
    #[test]
    fn rollback_intent_emits_local_rollback_completed_not_activation_completed() {
        let env = TestModeEnv::acquire();
        env.set(Some("1"));

        let events = run_intent(ActivationIntent {
            rollout_id: RolloutId::new("edge", "abc1234deadbeef"),
            target_closure: "rollback-target-closure".to_string(),
            rollback: true,
        });

        // Pre-fix this would have emitted [LocalActivationStarted,
        // LocalActivationCompleted]. Post-fix it must emit exactly
        // [LocalRollbackCompleted] — no Activation* events.
        assert_eq!(
            events.len(),
            1,
            "rollback path must emit exactly one event (LocalRollbackCompleted); got {} events",
            events.len(),
        );
        match events.into_iter().next() {
            Some(Event::LocalRollbackCompleted {
                reverted_to_closure,
                ..
            }) => {
                assert_eq!(
                    reverted_to_closure, "rollback-target-closure",
                    "reverted_to_closure should carry the post-rollback closure (in test-mode short-circuit, the intent's target is used)",
                );
            }
            other => {
                panic!("rollback path must emit LocalRollbackCompleted; got: {other:?}")
            }
        }
    }

    /// Pins the forward path's test-mode contract: `LocalActivationStarted`
    /// first, then `LocalActivationCompleted` whose observed closure is the
    /// intent's target and whose exit code is 0.
    #[test]
    fn forward_intent_emits_started_then_completed_in_test_mode() {
        let env = TestModeEnv::acquire();
        env.set(Some("1"));

        let events = run_intent(ActivationIntent {
            rollout_id: RolloutId::new("edge", "abc1234deadbeef"),
            target_closure: "forward-target-closure".to_string(),
            rollback: false,
        });

        assert_eq!(
            events.len(),
            2,
            "forward path must emit Started then Completed; got {events:?}",
        );
        assert!(
            matches!(
                &events[0],
                Event::LocalActivationStarted { switch_method, .. }
                    if switch_method == "switch-to-configuration"
            ),
            "first event must be LocalActivationStarted; got {:?}",
            events[0],
        );
        match &events[1] {
            Event::LocalActivationCompleted {
                observed_current_closure,
                exit_code,
                ..
            } => {
                assert_eq!(observed_current_closure, "forward-target-closure");
                assert_eq!(*exit_code, 0);
            }
            other => panic!("second event must be LocalActivationCompleted; got: {other:?}"),
        }
    }
}