nixfleet_agent/runtime/workers/
probe.rs

//! Probe worker (RFC-0007 §8). On each `LocalResetProbeCache`:
//!
//! 1. Aborts every per-probe ticker spawned for the previous rollout
//!    (the `JoinHandle::abort` invariant: no probe ticker spawned for an
//!    earlier rollout runs after `LocalResetProbeCache { rollout_id: N }`
//!    is observed).
//! 2. Reads `/etc/nixfleet/agent/health-checks.json` (rendered by the
//!    host's NixOS module from the mkFleet-resolved effective set —
//!    closure-driven, transitively signed via the closure hash chain
//!    per RFC-0007 §4). The path is hardcoded; there is no
//!    `--health-checks-config` flag.
//! 3. Emits one `LocalProbeTopologyDeclared` event into the reducer
//!    input MPSC, then spawns one ticker per probe with
//!    `mode != "disabled"`.
//!
//! Each ticker fires every `intervalSeconds` (floored at
//! `MIN_INTERVAL_SECS`), or only once if `runOnce = true`. On each tick it
//! dispatches to the kind-specific `probe_runners::run` and emits
//! `LocalProbeResult`. The ticker also tracks whether it has reported a
//! result yet and what the previous tick's status was. It uses these to
//! drive the `LocalProbeObservedFirst` / `LocalProbeFailureFirst` events
//! that RFC-0005 §3.2 requires.
//!
//! Test-mode escape hatch: setting `NIXFLEET_AGENT_PROBE_TEST_MODE` (to any
//! value, e.g. `1`) skips reading the JSON file, the topology event and the
//! spawn loop. The smoke test sets it so the runtime can boot without a
//! /etc/ file or actual probes.
25
26use std::collections::HashMap;
27use std::path::PathBuf;
28
29use nixfleet_proto::clock::ClockHandle;
30use nixfleet_state_machine::{Event, ProbeMode, ProbeStatus, ProbeTopologyEntry};
31use tokio::sync::mpsc;
32use tokio::task::JoinHandle;
33
34use super::super::wire::ProbeResetCommand;
35use super::super::{AgentConfig, ReducerInput, ShutdownToken};
36use super::probe_runners::{self, ProbeDecl};
37
38const HEALTH_CHECKS_PATH: &str = "/etc/nixfleet/agent/health-checks.json";
39
40pub fn spawn(
41    _cfg: AgentConfig,
42    clock: ClockHandle,
43    input_tx: mpsc::Sender<ReducerInput>,
44    mut reset_rx: mpsc::Receiver<ProbeResetCommand>,
45    shutdown: ShutdownToken,
46) -> JoinHandle<()> {
47    tokio::spawn(async move {
48        let mut shutdown_rx = shutdown.into_inner();
49        // Per-probe ticker handles for the CURRENT rollout. On reset we
50        // .abort() all of them — RFC-0007 §6 invariant.
51        let mut tickers: HashMap<String, JoinHandle<()>> = HashMap::new();
52        loop {
53            tokio::select! {
54                biased;
55                _ = &mut shutdown_rx => {
56                    abort_all(&mut tickers);
57                    tracing::info!(
58                        target: "shutdown",
59                        task = "agent_probe",
60                        "task shut down",
61                    );
62                    return;
63                }
64                maybe = reset_rx.recv() => {
65                    let Some(cmd) = maybe else {
66                        abort_all(&mut tickers);
67                        return;
68                    };
69                    handle_reset(
70                        cmd,
71                        &mut tickers,
72                        &input_tx,
73                        &clock,
74                    ).await;
75                }
76            }
77        }
78    })
79}
80
81fn abort_all(tickers: &mut HashMap<String, JoinHandle<()>>) {
82    for (_, h) in tickers.drain() {
83        h.abort();
84    }
85}
86
87async fn handle_reset(
88    cmd: ProbeResetCommand,
89    tickers: &mut HashMap<String, JoinHandle<()>>,
90    input_tx: &mpsc::Sender<ReducerInput>,
91    clock: &ClockHandle,
92) {
93    abort_all(tickers);
94    let rollout_id = cmd.rollout_id;
95    tracing::info!(
96        target: "agent_probe",
97        %rollout_id,
98        "probe cache reset; reloading declarations",
99    );
100
101    if std::env::var("NIXFLEET_AGENT_PROBE_TEST_MODE").is_ok() {
102        tracing::info!(
103            target: "agent_probe",
104            "test mode: skipping probe declaration read + ticker spawn",
105        );
106        return;
107    }
108
109    let path = PathBuf::from(HEALTH_CHECKS_PATH);
110    let raw = match tokio::fs::read_to_string(&path).await {
111        Ok(s) => s,
112        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
113            tracing::info!(
114                target: "agent_probe",
115                path = %path.display(),
116                "no probe declarations file present; running with empty probe set",
117            );
118            return;
119        }
120        Err(err) => {
121            tracing::warn!(
122                target: "agent_probe",
123                path = %path.display(),
124                error = %err,
125                "failed to read probe declarations; running with empty probe set",
126            );
127            return;
128        }
129    };
130    let decls: HashMap<String, ProbeDecl> = match serde_json::from_str(&raw) {
131        Ok(d) => d,
132        Err(err) => {
133            tracing::warn!(
134                target: "agent_probe",
135                error = %err,
136                "failed to parse probe declarations; running with empty probe set",
137            );
138            return;
139        }
140    };
141
142    // Emit topology declaration BEFORE spawning tickers so the CP's
143    // event_log has the authoritative declared-probe set on record
144    // before any per-probe result lands (RFC-0007 §8).
145    let topology: Vec<ProbeTopologyEntry> = decls
146        .iter()
147        .map(|(name, d)| ProbeTopologyEntry {
148            probe_name: name.clone(),
149            kind: d.kind.clone(),
150            mode: parse_mode(&d.mode),
151        })
152        .collect();
153    if input_tx
154        .send(ReducerInput::HostEvent {
155            rollout_id: rollout_id.clone(),
156            event: Event::LocalProbeTopologyDeclared {
157                probes: topology,
158                declared_at: clock.now(),
159                seq: 0,
160            },
161        })
162        .await
163        .is_err()
164    {
165        tracing::warn!(
166            target: "agent_probe",
167            "reducer input channel closed; aborting reset",
168        );
169        return;
170    }
171
172    for (name, decl) in decls.into_iter() {
173        let mode = parse_mode(&decl.mode);
174        if matches!(mode, ProbeMode::Disabled) {
175            continue;
176        }
177        let handle = spawn_ticker(
178            name.clone(),
179            decl,
180            mode,
181            rollout_id.clone(),
182            input_tx.clone(),
183            clock.clone(),
184        );
185        tickers.insert(name, handle);
186    }
187}
188
189fn parse_mode(s: &str) -> ProbeMode {
190    match s {
191        "enforce" => ProbeMode::Enforce,
192        "observe" => ProbeMode::Observe,
193        "disabled" => ProbeMode::Disabled,
194        // Honest fail-closed: unknown mode treats the probe as gating
195        // (enforce). Operator typo is loud, not silent.
196        _ => ProbeMode::Enforce,
197    }
198}
199
200fn spawn_ticker(
201    name: String,
202    decl: ProbeDecl,
203    mode: ProbeMode,
204    rollout_id: nixfleet_proto::RolloutId,
205    input_tx: mpsc::Sender<ReducerInput>,
206    clock: ClockHandle,
207) -> JoinHandle<()> {
208    tokio::spawn(async move {
209        // LOADBEARING: floor probe interval at MIN_INTERVAL_SECS (5s)
210        // — guards against a misconfigured 0/1-second probe DOSing
211        // the host.
212        let interval = std::time::Duration::from_secs(
213            decl.interval_seconds
214                .max(super::probe_runners::MIN_INTERVAL_SECS),
215        );
216        let mut first_observed = false;
217        let mut last_status: Option<ProbeStatus> = None;
218        let mut ticker = tokio::time::interval(interval);
219        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
220        loop {
221            ticker.tick().await;
222            let outcome = probe_runners::run(&decl, clock.now()).await;
223            let status = outcome.status;
224            let observed_at = outcome.observed_at;
225
226            if !first_observed {
227                first_observed = true;
228                let _ = input_tx
229                    .send(ReducerInput::HostEvent {
230                        rollout_id: rollout_id.clone(),
231                        event: Event::LocalProbeObservedFirst {
232                            probe_name: name.clone(),
233                            mode,
234                            observed_at,
235                            seq: 0,
236                        },
237                    })
238                    .await;
239            }
240            let was_fail = matches!(last_status, Some(ProbeStatus::Fail));
241            let now_fail = matches!(status, ProbeStatus::Fail);
242            if now_fail && !was_fail {
243                let _ = input_tx
244                    .send(ReducerInput::HostEvent {
245                        rollout_id: rollout_id.clone(),
246                        event: Event::LocalProbeFailureFirst {
247                            probe_name: name.clone(),
248                            mode,
249                            first_failed_at: observed_at,
250                            seq: 0,
251                        },
252                    })
253                    .await;
254            }
255            last_status = Some(status);
256
257            // Per-tick ProbeResult. `sub_results` carries per-control
258            // accounting (effective_mode + override_reason) for
259            // evidence/custom-framework probes; the reducer threads
260            // it onto the OutboundAgentEvent so the audit trail
261            // ("why was control X downgraded?") survives in the
262            // signed event_log. `None` for non-evidence probes.
263            let _ = input_tx
264                .send(ReducerInput::HostEvent {
265                    rollout_id: rollout_id.clone(),
266                    event: Event::LocalProbeResult {
267                        probe_name: name.clone(),
268                        mode,
269                        status,
270                        observed_at,
271                        failure_reason: outcome.failure_reason,
272                        sub_results: outcome.sub_results,
273                        seq: 0,
274                    },
275                })
276                .await;
277
278            if decl.run_once {
279                return;
280            }
281        }
282    })
283}