// nixfleet_agent/runtime/workers/advance_ticker.rs

use std::time::Duration;

use nixfleet_proto::clock::ClockHandle;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use super::super::{ReducerInput, ShutdownToken};

/// Period of the advance ticker.
///
/// `spawn` uses this value both as the delay before the first tick and as the
/// spacing between subsequent ticks.
const TICK_INTERVAL: Duration = Duration::from_secs(5);

20pub fn spawn(
21 _clock: ClockHandle,
22 input_tx: mpsc::Sender<ReducerInput>,
23 shutdown: ShutdownToken,
24) -> JoinHandle<()> {
25 tokio::spawn(async move {
26 let mut shutdown_rx = shutdown.into_inner();
27 let mut ticker =
28 tokio::time::interval_at(tokio::time::Instant::now() + TICK_INTERVAL, TICK_INTERVAL);
29 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
30
31 loop {
32 tokio::select! {
33 biased;
34 _ = &mut shutdown_rx => {
35 tracing::info!(
36 target: "shutdown",
37 task = "agent_advance_ticker",
38 "task shut down",
39 );
40 return;
41 }
42 _ = ticker.tick() => {
43 if let Err(err) = input_tx.send(ReducerInput::AgentAdvanceTick).await {
44 tracing::warn!(
45 target: "agent_advance_ticker",
46 error = %err,
47 "reducer input channel closed",
48 );
49 return;
50 }
51 }
52 }
53 }
54 })
55}