nixfleet_control_plane/polling/poller.rs
1//! Shared spawn/tick/log scaffolding for signed-artifact poll tasks.
2
3use std::time::Duration;
4
5use anyhow::Result;
6use tokio_util::sync::CancellationToken;
7
8use super::signed_fetch;
9
/// Configuration for a background task that periodically fetches a signed artifact.
///
/// Build it with the desired cadence and a log label, then consume it with
/// [`SignedArtifactPoller::spawn`] or [`SignedArtifactPoller::spawn_with_kick`].
#[derive(Debug, Clone)]
pub struct SignedArtifactPoller {
    /// Period of the cadence timer driving the poll loop (a `tokio::time::interval`
    /// with `MissedTickBehavior::Skip`).
    pub interval: Duration,
    /// Static label attached to the log events emitted by the poll task.
    pub label: &'static str,
}
14
15impl SignedArtifactPoller {
16 /// Closure must not mutate shared state on its error path; poller logs a warn and retries.
17 pub fn spawn<F, Fut>(self, cancel: CancellationToken, tick: F) -> tokio::task::JoinHandle<()>
18 where
19 F: FnMut(reqwest::Client) -> Fut + Send + 'static,
20 Fut: std::future::Future<Output = Result<()>> + Send,
21 {
22 self.spawn_with_kick(cancel, None, tick)
23 }
24
25 /// Wakes on cadence OR the external `kick` (channel-refs uses this for
26 /// reconciler-side `ConvergeRollout`/`SoakHost` triggers; cadence is the
27 /// safety net). `watch::Receiver` semantics -> kick bursts coalesce to one wake.
28 pub fn spawn_with_kick<F, Fut>(
29 self,
30 cancel: CancellationToken,
31 kick: Option<tokio::sync::watch::Receiver<()>>,
32 mut tick: F,
33 ) -> tokio::task::JoinHandle<()>
34 where
35 F: FnMut(reqwest::Client) -> Fut + Send + 'static,
36 Fut: std::future::Future<Output = Result<()>> + Send,
37 {
38 tokio::spawn(async move {
39 let client = signed_fetch::build_client();
40
41 let mut ticker = tokio::time::interval(self.interval);
42 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
43
44 // `changed()` borrows mutably; keep the receiver in an Option
45 // so the "no kick configured" branch can park forever via
46 // `pending()` without holding a permanent borrow.
47 let mut kick = kick;
48
49 loop {
50 let kicked = tokio::select! {
51 _ = cancel.cancelled() => {
52 tracing::info!(
53 target: "shutdown",
54 label = self.label,
55 "poll task shut down",
56 );
57 return;
58 }
59 _ = ticker.tick() => false,
60 res = async {
61 match kick.as_mut() {
62 Some(rx) => rx.changed().await,
63 // No kick channel configured: park forever
64 // so the select arm never fires.
65 None => std::future::pending().await,
66 }
67 } => {
68 // Sender dropped -> fall back to cadence-only;
69 // log once so it's visible if this happens.
70 if res.is_err() {
71 tracing::warn!(
72 target: "polling",
73 label = self.label,
74 "kick channel closed; running on cadence only",
75 );
76 kick = None;
77 continue;
78 }
79 true
80 }
81 };
82 if let Err(err) = tick(client.clone()).await {
83 tracing::warn!(
84 target: "polling",
85 label = self.label,
86 kicked,
87 error = %err,
88 "poll failed; retaining previous state",
89 );
90 } else if kicked {
91 tracing::debug!(
92 target: "polling",
93 label = self.label,
94 "poll fired on kick (event-driven)",
95 );
96 }
97 }
98 })
99 }
100}
101
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;
    use tokio::sync::watch;
    use tokio_util::sync::CancellationToken;

    /// Upper bound on how long any single expected poll may take to show up.
    /// Generous so slow CI machines don't flake, yet far below the 1 h cadence
    /// used where a poll must be attributable to a kick.
    const WAIT: Duration = Duration::from_secs(2);

    /// Sleeps in small steps until `counter` reaches `target` or `within` elapses,
    /// returning the last observed value. Lets assertions wait for an expected
    /// poll without a fixed sleep that is either flaky (too short) or slow.
    async fn wait_for_count(counter: &AtomicUsize, target: usize, within: Duration) -> usize {
        let deadline = tokio::time::Instant::now() + within;
        loop {
            let seen = counter.load(Ordering::SeqCst);
            if seen >= target || tokio::time::Instant::now() >= deadline {
                return seen;
            }
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    /// **Regression guard for event-driven polling**: a `kick` MUST
    /// fire the poll closure promptly, well below the cadence
    /// interval. If the select arm regresses (e.g. the kick path gets
    /// removed or the wakeup is mis-wired), this test fails because
    /// the poll count stays at the single startup poll instead of
    /// rising on each kick.
    ///
    /// The kick is what closes the channelEdges -> rollouts-table
    /// timing gap structurally: when a predecessor goes terminal,
    /// the reconciler kicks and the new successor rollout gets
    /// recorded the same tick. Without this wakeup the gap reopens
    /// on every cadence-period (60 s), and first checkins on a
    /// freshly-released channel can slip past gates.
    #[tokio::test(start_paused = false)]
    async fn kick_fires_poll_well_before_cadence() {
        let cancel = CancellationToken::new();
        let (kick_tx, kick_rx) = watch::channel::<()>(());
        let counter = Arc::new(AtomicUsize::new(0));

        // Long cadence so any poll after the startup one MUST come
        // from the kick path, not the timer.
        let poller = SignedArtifactPoller {
            interval: Duration::from_secs(3600),
            label: "test-kick",
        };
        let counter_for_tick = Arc::clone(&counter);
        let _handle = poller.spawn_with_kick(cancel.clone(), Some(kick_rx), move |_client| {
            let counter = Arc::clone(&counter_for_tick);
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        });

        // `tokio::time::interval` completes its first tick immediately,
        // so exactly one poll happens at startup whether or not the kick
        // path works. Wait for it and pin it, so it cannot be mistaken
        // for a kick-driven poll below.
        let startup = wait_for_count(&counter, 1, WAIT).await;
        assert_eq!(startup, 1, "expected exactly one startup poll before any kick");

        // Each kick is sent only after the previous one was observed, so
        // the watch channel never coalesces them and every kick must yield
        // its own poll. With a 1 h cadence, every extra poll is kick-driven.
        for kicks_sent in 1..=3 {
            kick_tx.send(()).unwrap();
            let expected = startup + kicks_sent;
            let count = wait_for_count(&counter, expected, WAIT).await;
            assert!(
                count >= expected,
                "kick #{kicks_sent} must wake the poll well before the 1h cadence; \
                 got {count} polls, expected at least {expected}",
            );
        }

        cancel.cancel();
    }

    /// Without a kick channel, the poller falls back to pure cadence.
    /// Verifies the no-kick path keeps running the closure on the timer
    /// (the startup poll plus at least two cadence polls), so the
    /// spawn-with-kick path can't accidentally swallow cadence ticks.
    #[tokio::test(start_paused = false)]
    async fn cadence_only_fires_without_kick() {
        let cancel = CancellationToken::new();
        let counter = Arc::new(AtomicUsize::new(0));

        let poller = SignedArtifactPoller {
            interval: Duration::from_millis(50),
            label: "test-cadence",
        };
        let counter_for_tick = Arc::clone(&counter);
        let _handle = poller.spawn(cancel.clone(), move |_client| {
            let counter = Arc::clone(&counter_for_tick);
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        });

        // 1 immediate startup poll + 2 cadence polls at ~50ms and ~100ms.
        let count = wait_for_count(&counter, 3, WAIT).await;
        assert!(
            count >= 3,
            "cadence-only must fire on the timer; got {count} ticks with 50ms cadence",
        );

        cancel.cancel();
    }

    /// Belt-and-suspenders: when both kick and cadence are in play,
    /// dropping the kick sender doesn't crash or stall the poller; it
    /// logs a warning and continues on cadence only. Ensures a
    /// reconciler that exits (or panics) and drops its sender can't
    /// seize the polling loop.
    #[tokio::test(start_paused = false)]
    async fn dropped_kick_sender_falls_back_to_cadence() {
        let cancel = CancellationToken::new();
        let (kick_tx, kick_rx) = watch::channel::<()>(());
        let counter = Arc::new(AtomicUsize::new(0));

        let poller = SignedArtifactPoller {
            interval: Duration::from_millis(50),
            label: "test-drop",
        };
        let counter_for_tick = Arc::clone(&counter);
        let _handle = poller.spawn_with_kick(cancel.clone(), Some(kick_rx), move |_client| {
            let counter = Arc::clone(&counter_for_tick);
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        });

        // Drop the sender: the receiver's `changed()` now returns an
        // error, and the poller should log and switch to cadence-only.
        drop(kick_tx);

        // 1 immediate startup poll + 2 cadence polls after the fallback.
        let count = wait_for_count(&counter, 3, WAIT).await;
        assert!(
            count >= 3,
            "dropped kick must fall back to cadence; got {count} ticks with 50ms cadence",
        );

        cancel.cancel();
    }
}