nixfleet_control_plane/polling/
poller.rs

1//! Shared spawn/tick/log scaffolding for signed-artifact poll tasks.
2
3use std::time::Duration;
4
5use anyhow::Result;
6use tokio_util::sync::CancellationToken;
7
8use super::signed_fetch;
9
/// Configuration for one periodic signed-artifact poll task.
///
/// Consumed by [`SignedArtifactPoller::spawn`] or
/// [`SignedArtifactPoller::spawn_with_kick`], which move it into the spawned
/// task. `Debug` and `Clone` are derived so a poller description can be logged
/// or reused to start several tasks.
#[derive(Debug, Clone)]
pub struct SignedArtifactPoller {
    /// Period of the cadence timer that drives the poll loop.
    pub interval: Duration,
    /// Static name recorded as the `label` field on every log line the
    /// poll task emits.
    pub label: &'static str,
}
14
15impl SignedArtifactPoller {
16    /// Closure must not mutate shared state on its error path; poller logs a warn and retries.
17    pub fn spawn<F, Fut>(self, cancel: CancellationToken, tick: F) -> tokio::task::JoinHandle<()>
18    where
19        F: FnMut(reqwest::Client) -> Fut + Send + 'static,
20        Fut: std::future::Future<Output = Result<()>> + Send,
21    {
22        self.spawn_with_kick(cancel, None, tick)
23    }
24
25    /// Wakes on cadence OR the external `kick` (channel-refs uses this for
26    /// reconciler-side `ConvergeRollout`/`SoakHost` triggers; cadence is the
27    /// safety net). `watch::Receiver` semantics -> kick bursts coalesce to one wake.
28    pub fn spawn_with_kick<F, Fut>(
29        self,
30        cancel: CancellationToken,
31        kick: Option<tokio::sync::watch::Receiver<()>>,
32        mut tick: F,
33    ) -> tokio::task::JoinHandle<()>
34    where
35        F: FnMut(reqwest::Client) -> Fut + Send + 'static,
36        Fut: std::future::Future<Output = Result<()>> + Send,
37    {
38        tokio::spawn(async move {
39            let client = signed_fetch::build_client();
40
41            let mut ticker = tokio::time::interval(self.interval);
42            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
43
44            // `changed()` borrows mutably; keep the receiver in an Option
45            // so the "no kick configured" branch can park forever via
46            // `pending()` without holding a permanent borrow.
47            let mut kick = kick;
48
49            loop {
50                let kicked = tokio::select! {
51                    _ = cancel.cancelled() => {
52                        tracing::info!(
53                            target: "shutdown",
54                            label = self.label,
55                            "poll task shut down",
56                        );
57                        return;
58                    }
59                    _ = ticker.tick() => false,
60                    res = async {
61                        match kick.as_mut() {
62                            Some(rx) => rx.changed().await,
63                            // No kick channel configured: park forever
64                            // so the select arm never fires.
65                            None => std::future::pending().await,
66                        }
67                    } => {
68                        // Sender dropped -> fall back to cadence-only;
69                        // log once so it's visible if this happens.
70                        if res.is_err() {
71                            tracing::warn!(
72                                target: "polling",
73                                label = self.label,
74                                "kick channel closed; running on cadence only",
75                            );
76                            kick = None;
77                            continue;
78                        }
79                        true
80                    }
81                };
82                if let Err(err) = tick(client.clone()).await {
83                    tracing::warn!(
84                        target: "polling",
85                        label = self.label,
86                        kicked,
87                        error = %err,
88                        "poll failed; retaining previous state",
89                    );
90                } else if kicked {
91                    tracing::debug!(
92                        target: "polling",
93                        label = self.label,
94                        "poll fired on kick (event-driven)",
95                    );
96                }
97            }
98        })
99    }
100}
101
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;
    use tokio::sync::watch;
    use tokio_util::sync::CancellationToken;

    /// **Regression guard for event-driven polling**: a `kick` MUST
    /// fire the poll closure within milliseconds, well below the
    /// cadence interval. If the select arm regresses (e.g., the kick
    /// path gets removed, the wakeup is mis-wired) this test fails
    /// because the tick count stays at its startup baseline instead
    /// of rising after the kicks.
    ///
    /// The kick is what closes the channelEdges -> rollouts-table
    /// timing gap structurally: when a predecessor goes terminal,
    /// the reconciler kicks and the new successor rollout gets
    /// recorded the same tick. Without this wakeup the gap reopens
    /// on every cadence-period (60 s), and first checkins on a
    /// freshly-released channel can slip past gates.
    #[tokio::test(start_paused = false)]
    async fn kick_fires_poll_well_before_cadence() {
        let cancel = CancellationToken::new();
        let (kick_tx, kick_rx) = watch::channel::<()>(());
        let counter = Arc::new(AtomicUsize::new(0));

        // Long cadence so any wake after startup MUST be from the
        // kick path, not the timer.
        let poller = SignedArtifactPoller {
            interval: Duration::from_secs(3600),
            label: "test-kick",
        };
        let counter_for_tick = Arc::clone(&counter);
        let _handle = poller.spawn_with_kick(cancel.clone(), Some(kick_rx), move |_client| {
            let counter = Arc::clone(&counter_for_tick);
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        });

        // `tokio::time::interval` completes its first tick
        // immediately, so the closure runs once at startup no matter
        // what the kick arm does. Let that settle and record it as
        // the baseline; asserting `>= 1` on the raw count would pass
        // even with the kick path deleted.
        tokio::time::sleep(Duration::from_millis(100)).await;
        let baseline = counter.load(Ordering::SeqCst);

        // Fire 3 kicks, spaced by short sleeps. A watch channel
        // collapses bursts into a single wake, so only "at least one
        // additional poll" is asserted to keep the test robust
        // against scheduler jitter.
        for _ in 0..3 {
            kick_tx.send(()).unwrap();
            tokio::time::sleep(Duration::from_millis(50)).await;
        }

        // Allow the loop to drain; if the kick path is broken the
        // counter stays at the baseline (cadence is 1h, so the timer
        // won't fire again in this window).
        tokio::time::sleep(Duration::from_millis(100)).await;
        let count = counter.load(Ordering::SeqCst);
        assert!(
            count > baseline,
            "kick must wake the poll within milliseconds; got {count} ticks \
             (startup baseline {baseline}, cadence is 1h)",
        );

        cancel.cancel();
    }

    /// Without a kick channel, the poller falls back to pure cadence.
    /// Verify the no-kick path still runs the closure repeatedly on
    /// the timer. The observed count includes the interval's
    /// immediate first tick at startup, so `>= 2` proves at least
    /// one further timer-driven poll. Pinned so the spawn-with-kick
    /// path can't accidentally swallow cadence ticks.
    #[tokio::test(start_paused = false)]
    async fn cadence_only_fires_without_kick() {
        let cancel = CancellationToken::new();
        let counter = Arc::new(AtomicUsize::new(0));

        let poller = SignedArtifactPoller {
            interval: Duration::from_millis(50),
            label: "test-cadence",
        };
        let counter_for_tick = Arc::clone(&counter);
        let _handle = poller.spawn(cancel.clone(), move |_client| {
            let counter = Arc::clone(&counter_for_tick);
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        });

        // 250ms / 50ms cadence = ~5 ticks. Allow some slack for
        // scheduler jitter; assert we got at least 2 ticks.
        tokio::time::sleep(Duration::from_millis(250)).await;
        let count = counter.load(Ordering::SeqCst);
        assert!(
            count >= 2,
            "cadence-only must fire on the timer; got {count} ticks in 250ms with 50ms cadence",
        );

        cancel.cancel();
    }

    /// Belt-and-suspenders: when both kick and cadence are in play,
    /// dropping the kick sender doesn't crash the poller - it logs
    /// a warning and continues on cadence-only. Ensures a panicking
    /// reconciler can't seize the polling loop.
    #[tokio::test(start_paused = false)]
    async fn dropped_kick_sender_falls_back_to_cadence() {
        let cancel = CancellationToken::new();
        let (kick_tx, kick_rx) = watch::channel::<()>(());
        let counter = Arc::new(AtomicUsize::new(0));

        let poller = SignedArtifactPoller {
            interval: Duration::from_millis(50),
            label: "test-drop",
        };
        let counter_for_tick = Arc::clone(&counter);
        let _handle = poller.spawn_with_kick(cancel.clone(), Some(kick_rx), move |_client| {
            let counter = Arc::clone(&counter_for_tick);
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        });

        // Drop the sender - the receiver's `changed()` then returns
        // an error. The poller should log and switch to
        // cadence-only rather than exiting or spinning.
        drop(kick_tx);

        // Startup tick plus cadence ticks must keep arriving after
        // the kick channel has closed.
        tokio::time::sleep(Duration::from_millis(250)).await;
        let count = counter.load(Ordering::SeqCst);
        assert!(
            count >= 2,
            "dropped kick must fall back to cadence; got {count} ticks in 250ms",
        );

        cancel.cancel();
    }
}