nixfleet_agent/runtime/workers/activation.rs
1//! Activation worker. Receives [`ActivationIntent`] from the applier,
2//! emits `LocalActivationStarted` to the reducer, drives the rich
3//! activation pipeline (`crate::activation`), and emits
4//! `LocalActivationCompleted` / `LocalActivationFailed` based on the
5//! pipeline's outcome.
6//!
7//! Test-mode (`NIXFLEET_AGENT_ACTIVATION_TEST_MODE`) short-circuits in
8//! this worker BEFORE entering the pipeline — smoke tests + integration
9//! tests rely on this to avoid spawning real subprocesses.
10
11use chrono::Utc;
12use nixfleet_proto::clock::ClockHandle;
13use nixfleet_state_machine::Event;
14use tokio::sync::mpsc;
15use tokio::task::JoinHandle;
16
17use crate::activation::{
18 ActivationOutcome, ActivationTarget, RollbackOutcome, activate as run_pipeline,
19 rollback as run_rollback_pipeline,
20};
21
22use super::super::wire::ActivationIntent;
23use super::super::{AgentConfig, ReducerInput, ShutdownToken};
24
25pub fn spawn(
26 _cfg: AgentConfig,
27 _clock: ClockHandle,
28 input_tx: mpsc::Sender<ReducerInput>,
29 mut intent_rx: mpsc::Receiver<ActivationIntent>,
30 shutdown: ShutdownToken,
31) -> JoinHandle<()> {
32 tokio::spawn(async move {
33 let mut shutdown_rx = shutdown.into_inner();
34 loop {
35 tokio::select! {
36 biased;
37 _ = &mut shutdown_rx => {
38 tracing::info!(
39 target: "shutdown",
40 task = "agent_activation",
41 "task shut down",
42 );
43 return;
44 }
45 maybe = intent_rx.recv() => {
46 let Some(intent) = maybe else { return };
47 handle_intent(&input_tx, intent).await;
48 }
49 }
50 }
51 })
52}
53
54async fn handle_intent(input_tx: &mpsc::Sender<ReducerInput>, intent: ActivationIntent) {
55 let started_at = Utc::now();
56 // LOADBEARING: Failed → Activating is not a legal state-machine
57 // transition; rollback drives Failed → Reverted directly via
58 // `LocalRollbackCompleted` (RFC-0005 §3 / `failed.rs`). The forward
59 // path emits `LocalActivationStarted` to stamp
60 // `activation_started_at` (RFC-0005 §4.2); the rollback path skips
61 // it — emitting an Activation* event from Failed state causes the
62 // reducer to silently reject both LocalActivationStarted and
63 // LocalActivationCompleted, blocking Failed → Reverted bookkeeping
64 // and the quarantine populate.
65 if !intent.rollback {
66 let switch_method = "switch-to-configuration".to_string();
67 if let Err(err) = input_tx
68 .send(ReducerInput::HostEvent {
69 rollout_id: intent.rollout_id.clone(),
70 event: Event::LocalActivationStarted {
71 started_at,
72 switch_method,
73 seq: 0,
74 },
75 })
76 .await
77 {
78 tracing::warn!(
79 target: "agent_activation",
80 error = %err,
81 "reducer input channel closed during ActivationStarted",
82 );
83 return;
84 }
85 }
86
87 let completed_event = if activation_test_mode_enabled() {
88 // Test-mode short-circuit: no real subprocess, no /run/current-system
89 // poll, no nix-store / nix-env invocation. Smoke + integration tests
90 // depend on this to exercise the runtime integration end-to-end
91 // through the durable queue without requiring a NixOS host.
92 // Production code paths MUST NEVER set
93 // `NIXFLEET_AGENT_ACTIVATION_TEST_MODE`; the gate is checked on
94 // every intent so a misconfigured test environment fails closed
95 // (no activation) rather than open (real subprocess).
96 let now = Utc::now();
97 tracing::info!(
98 target: "agent_activation",
99 target_closure = %intent.target_closure,
100 rollback = intent.rollback,
101 "activation: test-mode gate fired; skipping pipeline",
102 );
103 if intent.rollback {
104 // Rollback path's test-mode synthesis: emit LocalRollbackCompleted
105 // with the intent's target as the reverted-to closure (in test
106 // mode there's no real /run/current-system to read).
107 Event::LocalRollbackCompleted {
108 reverted_to_closure: intent.target_closure.clone(),
109 exit_code: 0,
110 completed_at: now,
111 seq: 0,
112 }
113 } else {
114 Event::LocalActivationCompleted {
115 observed_current_closure: intent.target_closure.clone(),
116 exit_code: 0,
117 completed_at: now,
118 seq: 0,
119 }
120 }
121 } else if intent.rollback {
122 let now = Utc::now();
123 match run_rollback_pipeline().await {
124 Ok(RollbackOutcome::FiredAndPolled {
125 reverted_to_closure,
126 }) => {
127 tracing::info!(
128 target: "agent_activation",
129 rollout_id = %intent.rollout_id,
130 %reverted_to_closure,
131 "activation: rollback pipeline completed; firing LocalRollbackCompleted",
132 );
133 // LOADBEARING: only `LocalRollbackCompleted` (not
134 // `LocalActivationCompleted`) is legal from Failed
135 // state per state-machine `failed.rs`. The
136 // reverted_to_closure is the post-rollback
137 // `/run/current-system` basename observed by
138 // `verify_poll` in
139 // `activation::rollback::rollback_with`.
140 Event::LocalRollbackCompleted {
141 reverted_to_closure,
142 exit_code: 0,
143 completed_at: now,
144 seq: 0,
145 }
146 }
147 Ok(RollbackOutcome::Failed { phase, exit_code }) => Event::LocalActivationFailed {
148 exit_code: exit_code.unwrap_or(-1),
149 stderr_tail: format!("rollback failed at phase {phase}"),
150 failed_at: now,
151 seq: 0,
152 },
153 Err(err) => Event::LocalActivationFailed {
154 exit_code: -1,
155 stderr_tail: format!("rollback pipeline error: {err}"),
156 failed_at: now,
157 seq: 0,
158 },
159 }
160 } else {
161 let target = ActivationTarget {
162 closure_hash: intent.target_closure.clone(),
163 channel_ref: intent.rollout_id.channel_ref().to_string(),
164 };
165 let now = Utc::now();
166 match run_pipeline(&target).await {
167 Ok(ActivationOutcome::FiredAndPolled) => {
168 tracing::info!(
169 target: "agent_activation",
170 rollout_id = %intent.rollout_id,
171 target_closure = %intent.target_closure,
172 "activation: pipeline completed (FiredAndPolled)",
173 );
174 Event::LocalActivationCompleted {
175 // FiredAndPolled means verify_poll observed
176 // /run/current-system match the expected target — so
177 // observed == target by construction at this point.
178 observed_current_closure: intent.target_closure.clone(),
179 exit_code: 0,
180 completed_at: now,
181 seq: 0,
182 }
183 }
184 Ok(ActivationOutcome::DeferredPendingReboot { component }) => {
185 // Profile set; bootloader updated; live switch deferred
186 // because `component` (dbus/systemd/kernel/init) cannot
187 // be live-swapped per nixos-rebuild's own rules. New
188 // generation activates on next reboot.
189 //
190 // LIFT #2: emit `LocalActivationDeferred` (state stays
191 // Activating, no fake exit_code=0, no fake
192 // observed_current_closure). On next reboot the agent's
193 // boot-recovery handshake observes current == target
194 // and CP's handle_heartbeat (LIFT #1) synthesizes the
195 // RemoteActivationCompleted that advances the cascade.
196 //
197 // Pre-LIFT-#2 this emitted LocalActivationCompleted with
198 // `exit_code: 0` (a lie — activation didn't take live)
199 // and `observed_current_closure: <pre-switch closure or
200 // fallback to target>` (also a lie — the observation
201 // pointed at the wrong thing). The state machine then
202 // transitioned Activating → Soaking with current_closure
203 // ≠ target_closure, producing a soft-converged state
204 // that misled operators about the reboot requirement.
205 tracing::warn!(
206 target: "agent_activation",
207 rollout_id = %intent.rollout_id,
208 target_closure = %intent.target_closure,
209 component = %component,
210 "activation: deferred pending reboot (critical component swap); bootloader updated, host stays at Activating until reboot triggers LIFT #1's retroactive confirmation",
211 );
212 Event::LocalActivationDeferred {
213 component,
214 deferred_at: now,
215 seq: 0,
216 }
217 }
218 Ok(ActivationOutcome::SignatureMismatch {
219 closure_hash,
220 stderr_tail,
221 }) => {
222 tracing::error!(
223 target: "agent_activation",
224 rollout_id = %intent.rollout_id,
225 %closure_hash,
226 "activation: signature mismatch — closure refused by substituter trust",
227 );
228 Event::LocalActivationFailed {
229 // Distinct exit_code -2 lets dashboards route trust
230 // violations separately from generic switch failures.
231 exit_code: -2,
232 stderr_tail: format!("signature mismatch on {closure_hash}: {stderr_tail}"),
233 failed_at: now,
234 seq: 0,
235 }
236 }
237 Ok(ActivationOutcome::RealiseFailed { reason }) => Event::LocalActivationFailed {
238 exit_code: -1,
239 stderr_tail: format!("realise failed: {reason}"),
240 failed_at: now,
241 seq: 0,
242 },
243 Ok(ActivationOutcome::SwitchFailed { phase, exit_code }) => {
244 Event::LocalActivationFailed {
245 exit_code: exit_code.unwrap_or(-1),
246 stderr_tail: format!("switch failed at phase {phase}"),
247 failed_at: now,
248 seq: 0,
249 }
250 }
251 Ok(ActivationOutcome::VerifyMismatch { expected, actual }) => {
252 tracing::error!(
253 target: "agent_activation",
254 rollout_id = %intent.rollout_id,
255 %expected,
256 %actual,
257 "activation: /run/current-system flipped to unexpected basename",
258 );
259 Event::LocalActivationFailed {
260 // Distinct exit_code -3 surfaces verify-mismatch as a
261 // rollback-required class (third closure observed).
262 exit_code: -3,
263 stderr_tail: format!("verify mismatch: expected={expected} actual={actual}",),
264 failed_at: now,
265 seq: 0,
266 }
267 }
268 Err(err) => Event::LocalActivationFailed {
269 exit_code: -1,
270 stderr_tail: format!("activation pipeline error: {err}"),
271 failed_at: now,
272 seq: 0,
273 },
274 }
275 };
276
277 if let Err(err) = input_tx
278 .send(ReducerInput::HostEvent {
279 rollout_id: intent.rollout_id,
280 event: completed_event,
281 })
282 .await
283 {
284 tracing::warn!(
285 target: "agent_activation",
286 error = %err,
287 "reducer input channel closed during ActivationCompleted/Failed",
288 );
289 }
290}
291
/// Reports whether the activation test-mode gate is active.
///
/// Returns `true` when the environment variable
/// `NIXFLEET_AGENT_ACTIVATION_TEST_MODE` is present with ANY value
/// (including an empty one). While the gate is active, `handle_intent`
/// bypasses the activation and rollback pipelines and synthesizes a success
/// event from the intent's target closure: `LocalActivationCompleted` for
/// forward intents, `LocalRollbackCompleted` for rollback intents.
///
/// Smoke tests (`tests/runtime_smoke.rs`) MUST set the variable — they
/// exercise the runtime integration end-to-end through the durable queue,
/// not the actual activation subprocess. Production code paths must NEVER
/// set it. The environment is consulted on every call rather than cached,
/// so a test setup mistake fails closed (no activation) rather than open
/// (real subprocess).
fn activation_test_mode_enabled() -> bool {
    const GATE_VAR: &str = "NIXFLEET_AGENT_ACTIVATION_TEST_MODE";
    std::env::var_os(GATE_VAR).is_some()
}
304
#[cfg(test)]
mod tests {
    use super::*;

    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Name of the env var read by `activation_test_mode_enabled`.
    const TEST_MODE_VAR: &str = "NIXFLEET_AGENT_ACTIVATION_TEST_MODE";

    /// Serializes every test in this module that mutates `TEST_MODE_VAR`.
    /// The default test runner executes tests on parallel threads within
    /// one process, and the environment is process-global: without this
    /// lock one test's `remove_var` can land while another test is relying
    /// on the gate being set.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// RAII handle over `TEST_MODE_VAR`: holds `ENV_LOCK` for its lifetime
    /// and restores the variable's prior value on drop, including when the
    /// owning test panics on a failed assertion.
    struct TestModeEnvGuard {
        prior: Option<OsString>,
        // Declared last so the lock is released only after `Drop::drop`
        // has restored the variable.
        _lock: MutexGuard<'static, ()>,
    }

    impl TestModeEnvGuard {
        /// Takes the module-wide env lock and snapshots the current value.
        fn acquire() -> Self {
            // A poisoned lock only means an earlier test panicked; the `()`
            // payload cannot be left inconsistent, so recover the guard.
            let lock = ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner);
            Self {
                prior: std::env::var_os(TEST_MODE_VAR),
                _lock: lock,
            }
        }

        /// Sets the gate variable to `value`.
        fn set(&self, value: &str) {
            // SAFETY: env mutation is process-global. Writers in this
            // module are serialized by `ENV_LOCK`, which `self` holds.
            // NOTE(review): tests elsewhere in the same test binary that
            // touch this variable are not covered by this lock.
            unsafe {
                std::env::set_var(TEST_MODE_VAR, value);
            }
        }

        /// Removes the gate variable.
        fn clear(&self) {
            // SAFETY: see `set`.
            unsafe {
                std::env::remove_var(TEST_MODE_VAR);
            }
        }
    }

    impl Drop for TestModeEnvGuard {
        fn drop(&mut self) {
            // SAFETY: see `TestModeEnvGuard::set`; the lock is still held
            // here because fields are dropped after this body runs.
            unsafe {
                match self.prior.take() {
                    Some(v) => std::env::set_var(TEST_MODE_VAR, v),
                    None => std::env::remove_var(TEST_MODE_VAR),
                }
            }
        }
    }

    /// Coverage of the test-mode gate at the worker layer. The activation
    /// pipeline's unit tests live in `crate::activation::*::tests`;
    /// integration tests in `tests/runtime_smoke.rs` exercise the
    /// short-circuited path end-to-end through the reducer + outbound
    /// queue. This minimal worker-layer test pins the env-var contract.
    #[test]
    fn test_mode_gate_responds_to_env_var() {
        let env = TestModeEnvGuard::acquire();

        env.clear();
        assert!(!activation_test_mode_enabled());

        env.set("1");
        assert!(activation_test_mode_enabled());
        // `env` drops here and restores the prior value so other tests in
        // the same process don't see a leaked env var.
    }

    /// Regression guard for the rollback path's event-type contract.
    /// The state-machine handler at `failed.rs:29-62` legalises only
    /// `LocalRollbackCompleted` and `RemoteRollbackComplete` from
    /// Failed state; emitting `LocalActivationStarted` or
    /// `LocalActivationCompleted` instead would be rejected by the
    /// reducer as "illegal transition", silently blocking Failed →
    /// Reverted and the quarantine populate.
    ///
    /// The rollback path:
    ///   (a) skips `LocalActivationStarted` (rollback doesn't start
    ///       activation)
    ///   (b) emits `LocalRollbackCompleted { reverted_to_closure, ... }`
    ///       on success — handler sets state.state = Reverted, populates
    ///       `reverted_at` + `reverted_to`, and emits
    ///       `OutboundAgentEvent::RollbackComplete` for CP propagation
    ///       (driving CP's `Effect::RemoteInsertQuarantine` on the bad
    ///       SHA).
    ///
    /// This test pins the worker's test-mode short-circuit path for
    /// rollback intents. The production pipeline path is the same code
    /// branch with real `run_rollback_pipeline()` instead of the test
    /// gate.
    #[tokio::test]
    // The std mutex guard is deliberately held across the await: the gate
    // must stay set for the whole `handle_intent` call, and the only other
    // contenders are tests on other OS threads, which simply block.
    #[allow(clippy::await_holding_lock)]
    async fn rollback_intent_emits_local_rollback_completed_not_activation_completed() {
        use crate::runtime::wire::ActivationIntent;
        use nixfleet_proto::RolloutId;
        use nixfleet_state_machine::Event;
        use tokio::sync::mpsc;

        let env = TestModeEnvGuard::acquire();
        env.set("1");

        let (tx, mut rx) = mpsc::channel::<ReducerInput>(8);
        let intent = ActivationIntent {
            rollout_id: RolloutId::new("edge", "abc1234deadbeef"),
            target_closure: "rollback-target-closure".to_string(),
            rollback: true,
        };

        handle_intent(&tx, intent).await;

        let mut events = Vec::new();
        while let Ok(ev) = rx.try_recv() {
            events.push(ev);
        }

        // Pre-fix this would have emitted [LocalActivationStarted,
        // LocalActivationCompleted]. Post-fix it must emit exactly
        // [LocalRollbackCompleted] — no Activation* events.
        assert_eq!(
            events.len(),
            1,
            "rollback path must emit exactly one event (LocalRollbackCompleted); got {} events",
            events.len(),
        );
        let ReducerInput::HostEvent { event, .. } = events.into_iter().next().unwrap() else {
            panic!("expected HostEvent variant");
        };
        match event {
            Event::LocalRollbackCompleted {
                reverted_to_closure,
                ..
            } => {
                assert_eq!(
                    reverted_to_closure, "rollback-target-closure",
                    "reverted_to_closure should carry the post-rollback closure (in test-mode short-circuit, the intent's target is used)",
                );
            }
            other => {
                panic!("rollback path must emit LocalRollbackCompleted; got: {other:?}")
            }
        }
        // `env` drops here (also on any panic above), restoring the prior
        // env state and releasing the lock.
    }
}