nixfleet_agent/runtime/workers/
probe.rs1use std::collections::HashMap;
27use std::path::PathBuf;
28
29use nixfleet_proto::clock::ClockHandle;
30use nixfleet_state_machine::{Event, ProbeMode, ProbeStatus, ProbeTopologyEntry};
31use tokio::sync::mpsc;
32use tokio::task::JoinHandle;
33
34use super::super::wire::ProbeResetCommand;
35use super::super::{AgentConfig, ReducerInput, ShutdownToken};
36use super::probe_runners::{self, ProbeDecl};
37
/// Filesystem location of the probe declarations file. `handle_reset` reads
/// it on every reset and parses it as a JSON object mapping probe name to
/// `ProbeDecl`.
const HEALTH_CHECKS_PATH: &str = "/etc/nixfleet/agent/health-checks.json";
39
40pub fn spawn(
41 _cfg: AgentConfig,
42 clock: ClockHandle,
43 input_tx: mpsc::Sender<ReducerInput>,
44 mut reset_rx: mpsc::Receiver<ProbeResetCommand>,
45 shutdown: ShutdownToken,
46) -> JoinHandle<()> {
47 tokio::spawn(async move {
48 let mut shutdown_rx = shutdown.into_inner();
49 let mut tickers: HashMap<String, JoinHandle<()>> = HashMap::new();
52 loop {
53 tokio::select! {
54 biased;
55 _ = &mut shutdown_rx => {
56 abort_all(&mut tickers);
57 tracing::info!(
58 target: "shutdown",
59 task = "agent_probe",
60 "task shut down",
61 );
62 return;
63 }
64 maybe = reset_rx.recv() => {
65 let Some(cmd) = maybe else {
66 abort_all(&mut tickers);
67 return;
68 };
69 handle_reset(
70 cmd,
71 &mut tickers,
72 &input_tx,
73 &clock,
74 ).await;
75 }
76 }
77 }
78 })
79}
80
81fn abort_all(tickers: &mut HashMap<String, JoinHandle<()>>) {
82 for (_, h) in tickers.drain() {
83 h.abort();
84 }
85}
86
87async fn handle_reset(
88 cmd: ProbeResetCommand,
89 tickers: &mut HashMap<String, JoinHandle<()>>,
90 input_tx: &mpsc::Sender<ReducerInput>,
91 clock: &ClockHandle,
92) {
93 abort_all(tickers);
94 let rollout_id = cmd.rollout_id;
95 tracing::info!(
96 target: "agent_probe",
97 %rollout_id,
98 "probe cache reset; reloading declarations",
99 );
100
101 if std::env::var("NIXFLEET_AGENT_PROBE_TEST_MODE").is_ok() {
102 tracing::info!(
103 target: "agent_probe",
104 "test mode: skipping probe declaration read + ticker spawn",
105 );
106 return;
107 }
108
109 let path = PathBuf::from(HEALTH_CHECKS_PATH);
110 let raw = match tokio::fs::read_to_string(&path).await {
111 Ok(s) => s,
112 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
113 tracing::info!(
114 target: "agent_probe",
115 path = %path.display(),
116 "no probe declarations file present; running with empty probe set",
117 );
118 return;
119 }
120 Err(err) => {
121 tracing::warn!(
122 target: "agent_probe",
123 path = %path.display(),
124 error = %err,
125 "failed to read probe declarations; running with empty probe set",
126 );
127 return;
128 }
129 };
130 let decls: HashMap<String, ProbeDecl> = match serde_json::from_str(&raw) {
131 Ok(d) => d,
132 Err(err) => {
133 tracing::warn!(
134 target: "agent_probe",
135 error = %err,
136 "failed to parse probe declarations; running with empty probe set",
137 );
138 return;
139 }
140 };
141
142 let topology: Vec<ProbeTopologyEntry> = decls
146 .iter()
147 .map(|(name, d)| ProbeTopologyEntry {
148 probe_name: name.clone(),
149 kind: d.kind.clone(),
150 mode: parse_mode(&d.mode),
151 })
152 .collect();
153 if input_tx
154 .send(ReducerInput::HostEvent {
155 rollout_id: rollout_id.clone(),
156 event: Event::LocalProbeTopologyDeclared {
157 probes: topology,
158 declared_at: clock.now(),
159 seq: 0,
160 },
161 })
162 .await
163 .is_err()
164 {
165 tracing::warn!(
166 target: "agent_probe",
167 "reducer input channel closed; aborting reset",
168 );
169 return;
170 }
171
172 for (name, decl) in decls.into_iter() {
173 let mode = parse_mode(&decl.mode);
174 if matches!(mode, ProbeMode::Disabled) {
175 continue;
176 }
177 let handle = spawn_ticker(
178 name.clone(),
179 decl,
180 mode,
181 rollout_id.clone(),
182 input_tx.clone(),
183 clock.clone(),
184 );
185 tickers.insert(name, handle);
186 }
187}
188
189fn parse_mode(s: &str) -> ProbeMode {
190 match s {
191 "enforce" => ProbeMode::Enforce,
192 "observe" => ProbeMode::Observe,
193 "disabled" => ProbeMode::Disabled,
194 _ => ProbeMode::Enforce,
197 }
198}
199
200fn spawn_ticker(
201 name: String,
202 decl: ProbeDecl,
203 mode: ProbeMode,
204 rollout_id: nixfleet_proto::RolloutId,
205 input_tx: mpsc::Sender<ReducerInput>,
206 clock: ClockHandle,
207) -> JoinHandle<()> {
208 tokio::spawn(async move {
209 let interval = std::time::Duration::from_secs(
213 decl.interval_seconds
214 .max(super::probe_runners::MIN_INTERVAL_SECS),
215 );
216 let mut first_observed = false;
217 let mut last_status: Option<ProbeStatus> = None;
218 let mut ticker = tokio::time::interval(interval);
219 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
220 loop {
221 ticker.tick().await;
222 let outcome = probe_runners::run(&decl, clock.now()).await;
223 let status = outcome.status;
224 let observed_at = outcome.observed_at;
225
226 if !first_observed {
227 first_observed = true;
228 let _ = input_tx
229 .send(ReducerInput::HostEvent {
230 rollout_id: rollout_id.clone(),
231 event: Event::LocalProbeObservedFirst {
232 probe_name: name.clone(),
233 mode,
234 observed_at,
235 seq: 0,
236 },
237 })
238 .await;
239 }
240 let was_fail = matches!(last_status, Some(ProbeStatus::Fail));
241 let now_fail = matches!(status, ProbeStatus::Fail);
242 if now_fail && !was_fail {
243 let _ = input_tx
244 .send(ReducerInput::HostEvent {
245 rollout_id: rollout_id.clone(),
246 event: Event::LocalProbeFailureFirst {
247 probe_name: name.clone(),
248 mode,
249 first_failed_at: observed_at,
250 seq: 0,
251 },
252 })
253 .await;
254 }
255 last_status = Some(status);
256
257 let _ = input_tx
264 .send(ReducerInput::HostEvent {
265 rollout_id: rollout_id.clone(),
266 event: Event::LocalProbeResult {
267 probe_name: name.clone(),
268 mode,
269 status,
270 observed_at,
271 failure_reason: outcome.failure_reason,
272 sub_results: outcome.sub_results,
273 seq: 0,
274 },
275 })
276 .await;
277
278 if decl.run_once {
279 return;
280 }
281 }
282 })
283}