nixfleet_agent/runtime/
applier.rs

//! Imperative shell for the agent reducer (RFC-0006 §7.1).
//!
//! Effect dispatch — the applier itself contains NO platform-specific
//! code. `LocalFireSwitch` / `LocalFireRollbackTo` send an
//! [`ActivationIntent`] to the activation worker; the worker holds the
//! per-OS branches (Linux: `systemd-run --unit=nixfleet-switch`;
//! Darwin: `setsid()`-detached launchd). `LocalResetProbeCache` follows
//! the same pattern over the probe-worker channel.
//!
//! Keeping the platform branches out of the applier preserves the
//! architectural symmetry with the CP-side applier (which has no
//! platform code either) and means the reducer task is portable across
//! Linux + Darwin (and any future target) without changes.
//!
//! `Remote*` variants are CP-only; reaching the agent applier means a
//! reducer state-machine defect. Log + drop, do not panic — a buggy
//! peer must not crash the agent.
18
19use nixfleet_state_machine::{Effect, LogLevel};
20
21use super::ApplierCtx;
22use super::outbound_queue::{QueuedEvent, outbound_event_kind, outbound_event_seq};
23use super::wire::{ActivationIntent, ProbeResetCommand};
24
25/// Execute one `Effect` emitted by `nixfleet_state_machine::step`.
26pub async fn apply_effect(ctx: &ApplierCtx<'_>, effect: Effect) {
27    match effect {
28        Effect::LocalFireSwitch { rollout_id, target } => {
29            send_intent(
30                ctx,
31                ActivationIntent {
32                    rollout_id,
33                    target_closure: target,
34                    rollback: false,
35                },
36            )
37            .await;
38        }
39        Effect::LocalFireRollbackTo {
40            rollout_id,
41            closure,
42        } => {
43            send_intent(
44                ctx,
45                ActivationIntent {
46                    rollout_id,
47                    target_closure: closure,
48                    rollback: true,
49                },
50            )
51            .await;
52        }
53        Effect::LocalResetProbeCache { rollout_id } => {
54            if let Err(err) = ctx
55                .probe_reset_tx
56                .send(ProbeResetCommand { rollout_id })
57                .await
58            {
59                tracing::warn!(
60                    target: "agent_applier",
61                    error = %err,
62                    "probe-reset channel closed; reset signal lost",
63                );
64            }
65        }
66        Effect::LocalEmitEvent {
67            rollout_id,
68            payload,
69            durable,
70        } => {
71            // Persist to the disk-backed queue BEFORE returning so a
72            // crash between this point and the network POST is
73            // recoverable on restart (Plan 07 + RFC-0005 §9.7). The
74            // outbound worker drains the queue and POSTs; on success
75            // it deletes the file.
76            //
77            // `durable = false` events are rare (today only telemetry
78            // metrics like ReducerError would set it); for v0.2 we
79            // route everything through the queue + drainer to keep
80            // the on-disk audit trail complete. Phase 8 may add a
81            // fast-path for non-durable events if profiling shows the
82            // fsync cost matters.
83            let _ = durable;
84            let event = QueuedEvent {
85                seq: outbound_event_seq(&payload),
86                hostname: ctx.cfg.machine_id.clone(),
87                // `rollout_id` is carried on the effect directly (Phase 8a —
88                // closes the v0.2 enrichment stopgap by lifting the field
89                // onto every Local* variant per RFC-0006 §9's
90                // "effects carry everything the applier needs" principle).
91                rollout_id,
92                event_kind: outbound_event_kind(&payload).to_string(),
93                created_at: ctx.clock.now(),
94                payload: nixfleet_proto::AgentEvent::from(payload),
95            };
96            match ctx.outbound_queue.enqueue(&event) {
97                Ok(_path) => {
98                    // Wake the drainer immediately rather than waiting
99                    // for its periodic tick.
100                    let _ = ctx.outbound_kick.send(());
101                    tracing::debug!(
102                        target: "agent_applier",
103                        seq = event.seq,
104                        kind = %event.event_kind,
105                        "LocalEmitEvent enqueued",
106                    );
107                }
108                Err(err) => {
109                    // fsync / rename failure on the queue dir is
110                    // operator-actionable (disk full, FS read-only).
111                    // We log + drop; the agent stays alive but the
112                    // event is lost. Phase 8 metric:
113                    // `agent_outbound_queue_enqueue_failures_total`.
114                    tracing::error!(
115                        target: "agent_applier",
116                        seq = event.seq,
117                        kind = %event.event_kind,
118                        error = %err,
119                        "LocalEmitEvent enqueue failed; event LOST — investigate state-dir disk health",
120                    );
121                }
122            }
123        }
124
125        Effect::RecordTransition {
126            host,
127            rollout_id,
128            from,
129            to,
130            at,
131        } => {
132            tracing::info!(
133                target: "agent_applier",
134                %host,
135                %rollout_id,
136                ?from,
137                ?to,
138                %at,
139                "RecordTransition",
140            );
141        }
142        Effect::EmitMetric {
143            name,
144            labels,
145            value,
146        } => {
147            tracing::debug!(
148                target: "agent_applier",
149                metric = %name,
150                ?labels,
151                value,
152                "EmitMetric",
153            );
154        }
155        Effect::EmitLog {
156            level,
157            target,
158            message,
159            fields,
160        } => match level {
161            LogLevel::Trace => tracing::trace!(
162                target: "agent_runtime_emitted",
163                emitted_target = target,
164                ?fields,
165                "{message}",
166            ),
167            LogLevel::Debug => tracing::debug!(
168                target: "agent_runtime_emitted",
169                emitted_target = target,
170                ?fields,
171                "{message}",
172            ),
173            LogLevel::Info => tracing::info!(
174                target: "agent_runtime_emitted",
175                emitted_target = target,
176                ?fields,
177                "{message}",
178            ),
179            LogLevel::Warn => tracing::warn!(
180                target: "agent_runtime_emitted",
181                emitted_target = target,
182                ?fields,
183                "{message}",
184            ),
185            LogLevel::Error => tracing::error!(
186                target: "agent_runtime_emitted",
187                emitted_target = target,
188                ?fields,
189                "{message}",
190            ),
191        },
192
193        Effect::RemoteQueueDispatch { .. }
194        | Effect::RemoteInsertQuarantine { .. }
195        | Effect::RemoteOpenRolloutRecord { .. }
196        | Effect::RemoteAppendEventLog { .. } => {
197            tracing::error!(
198                target: "agent_applier",
199                effect = ?remote_kind(&effect),
200                "apply_effect: CP-only Remote* effect reached the agent applier — \
201                 reducer state-machine defect. Dropping.",
202            );
203        }
204    }
205}
206
207async fn send_intent(ctx: &ApplierCtx<'_>, intent: ActivationIntent) {
208    if let Err(err) = ctx.activation_tx.send(intent).await {
209        tracing::error!(
210            target: "agent_applier",
211            error = %err,
212            "activation-intent channel closed; switch/rollback intent lost",
213        );
214    }
215}
216
217fn remote_kind(e: &Effect) -> &'static str {
218    match e {
219        Effect::RemoteQueueDispatch { .. } => "RemoteQueueDispatch",
220        Effect::RemoteInsertQuarantine { .. } => "RemoteInsertQuarantine",
221        Effect::RemoteOpenRolloutRecord { .. } => "RemoteOpenRolloutRecord",
222        Effect::RemoteAppendEventLog { .. } => "RemoteAppendEventLog",
223        _ => "<not-remote>",
224    }
225}