nixfleet_agent/runtime/applier.rs
1//! Imperative shell for the agent reducer (RFC-0006 §7.1).
2//!
3//! Effect dispatch — the applier itself contains NO platform-specific
4//! code. `LocalFireSwitch` / `LocalFireRollbackTo` send an
5//! [`ActivationIntent`] to the activation worker; the worker holds the
6//! per-OS branches (Linux: `systemd-run --unit=nixfleet-switch`;
7//! Darwin: `setsid()`-detached launchd). Same shape for
8//! `LocalResetProbeCache` (probe-worker channel).
9//!
10//! Keeping the platform branches off the applier preserves the
11//! architectural symmetry with the CP-side applier (which has no
12//! platform code either) and means the reducer task is portable across
13//! Linux + Darwin (and any future target) without changes.
14//!
15//! `Remote*` variants are CP-only; reaching the agent applier means a
16//! reducer state-machine defect. Log + drop, do not panic — a buggy
17//! peer must not crash the agent.
18
19use nixfleet_state_machine::{Effect, LogLevel};
20
21use super::ApplierCtx;
22use super::outbound_queue::{QueuedEvent, outbound_event_kind, outbound_event_seq};
23use super::wire::{ActivationIntent, ProbeResetCommand};
24
25/// Execute one `Effect` emitted by `nixfleet_state_machine::step`.
26pub async fn apply_effect(ctx: &ApplierCtx<'_>, effect: Effect) {
27 match effect {
28 Effect::LocalFireSwitch { rollout_id, target } => {
29 send_intent(
30 ctx,
31 ActivationIntent {
32 rollout_id,
33 target_closure: target,
34 rollback: false,
35 },
36 )
37 .await;
38 }
39 Effect::LocalFireRollbackTo {
40 rollout_id,
41 closure,
42 } => {
43 send_intent(
44 ctx,
45 ActivationIntent {
46 rollout_id,
47 target_closure: closure,
48 rollback: true,
49 },
50 )
51 .await;
52 }
53 Effect::LocalResetProbeCache { rollout_id } => {
54 if let Err(err) = ctx
55 .probe_reset_tx
56 .send(ProbeResetCommand { rollout_id })
57 .await
58 {
59 tracing::warn!(
60 target: "agent_applier",
61 error = %err,
62 "probe-reset channel closed; reset signal lost",
63 );
64 }
65 }
66 Effect::LocalEmitEvent {
67 rollout_id,
68 payload,
69 durable,
70 } => {
71 // Persist to the disk-backed queue BEFORE returning so a
72 // crash between this point and the network POST is
73 // recoverable on restart (Plan 07 + RFC-0005 §9.7). The
74 // outbound worker drains the queue and POSTs; on success
75 // it deletes the file.
76 //
77 // `durable = false` events are rare (today only telemetry
78 // metrics like ReducerError would set it); for v0.2 we
79 // route everything through the queue + drainer to keep
80 // the on-disk audit trail complete. Phase 8 may add a
81 // fast-path for non-durable events if profiling shows the
82 // fsync cost matters.
83 let _ = durable;
84 let event = QueuedEvent {
85 seq: outbound_event_seq(&payload),
86 hostname: ctx.cfg.machine_id.clone(),
87 // `rollout_id` is carried on the effect directly (Phase 8a —
88 // closes the v0.2 enrichment stopgap by lifting the field
89 // onto every Local* variant per RFC-0006 §9's
90 // "effects carry everything the applier needs" principle).
91 rollout_id,
92 event_kind: outbound_event_kind(&payload).to_string(),
93 created_at: ctx.clock.now(),
94 payload: nixfleet_proto::AgentEvent::from(payload),
95 };
96 match ctx.outbound_queue.enqueue(&event) {
97 Ok(_path) => {
98 // Wake the drainer immediately rather than waiting
99 // for its periodic tick.
100 let _ = ctx.outbound_kick.send(());
101 tracing::debug!(
102 target: "agent_applier",
103 seq = event.seq,
104 kind = %event.event_kind,
105 "LocalEmitEvent enqueued",
106 );
107 }
108 Err(err) => {
109 // fsync / rename failure on the queue dir is
110 // operator-actionable (disk full, FS read-only).
111 // We log + drop; the agent stays alive but the
112 // event is lost. Phase 8 metric:
113 // `agent_outbound_queue_enqueue_failures_total`.
114 tracing::error!(
115 target: "agent_applier",
116 seq = event.seq,
117 kind = %event.event_kind,
118 error = %err,
119 "LocalEmitEvent enqueue failed; event LOST — investigate state-dir disk health",
120 );
121 }
122 }
123 }
124
125 Effect::RecordTransition {
126 host,
127 rollout_id,
128 from,
129 to,
130 at,
131 } => {
132 tracing::info!(
133 target: "agent_applier",
134 %host,
135 %rollout_id,
136 ?from,
137 ?to,
138 %at,
139 "RecordTransition",
140 );
141 }
142 Effect::EmitMetric {
143 name,
144 labels,
145 value,
146 } => {
147 tracing::debug!(
148 target: "agent_applier",
149 metric = %name,
150 ?labels,
151 value,
152 "EmitMetric",
153 );
154 }
155 Effect::EmitLog {
156 level,
157 target,
158 message,
159 fields,
160 } => match level {
161 LogLevel::Trace => tracing::trace!(
162 target: "agent_runtime_emitted",
163 emitted_target = target,
164 ?fields,
165 "{message}",
166 ),
167 LogLevel::Debug => tracing::debug!(
168 target: "agent_runtime_emitted",
169 emitted_target = target,
170 ?fields,
171 "{message}",
172 ),
173 LogLevel::Info => tracing::info!(
174 target: "agent_runtime_emitted",
175 emitted_target = target,
176 ?fields,
177 "{message}",
178 ),
179 LogLevel::Warn => tracing::warn!(
180 target: "agent_runtime_emitted",
181 emitted_target = target,
182 ?fields,
183 "{message}",
184 ),
185 LogLevel::Error => tracing::error!(
186 target: "agent_runtime_emitted",
187 emitted_target = target,
188 ?fields,
189 "{message}",
190 ),
191 },
192
193 Effect::RemoteQueueDispatch { .. }
194 | Effect::RemoteInsertQuarantine { .. }
195 | Effect::RemoteOpenRolloutRecord { .. }
196 | Effect::RemoteAppendEventLog { .. } => {
197 tracing::error!(
198 target: "agent_applier",
199 effect = ?remote_kind(&effect),
200 "apply_effect: CP-only Remote* effect reached the agent applier — \
201 reducer state-machine defect. Dropping.",
202 );
203 }
204 }
205}
206
207async fn send_intent(ctx: &ApplierCtx<'_>, intent: ActivationIntent) {
208 if let Err(err) = ctx.activation_tx.send(intent).await {
209 tracing::error!(
210 target: "agent_applier",
211 error = %err,
212 "activation-intent channel closed; switch/rollback intent lost",
213 );
214 }
215}
216
217fn remote_kind(e: &Effect) -> &'static str {
218 match e {
219 Effect::RemoteQueueDispatch { .. } => "RemoteQueueDispatch",
220 Effect::RemoteInsertQuarantine { .. } => "RemoteInsertQuarantine",
221 Effect::RemoteOpenRolloutRecord { .. } => "RemoteOpenRolloutRecord",
222 Effect::RemoteAppendEventLog { .. } => "RemoteAppendEventLog",
223 _ => "<not-remote>",
224 }
225}