nixfleet_control_plane/server/routes/
dispatch.rs1use std::sync::Arc;
27use std::time::Duration;
28
29use axum::Json;
30use axum::extract::{Extension, Query, State};
31use axum::http::StatusCode;
32use chrono::{DateTime, Utc};
33use serde::{Deserialize, Serialize};
34
35use super::super::middleware::AuthenticatedCn;
36use super::super::state::AppState;
37
38const MAX_WAIT_SECS: u64 = 60;
40
41#[derive(Debug, Clone, Deserialize)]
42pub struct DispatchQuery {
43 #[serde(default)]
45 pub wait: Option<u64>,
46}
47
48#[derive(Debug, Clone, Serialize)]
49pub struct DispatchResponse {
50 pub hostname: String,
51 pub rollout_id: nixfleet_proto::RolloutId,
52 pub target_closure: String,
53 pub soak_due_at: DateTime<Utc>,
54 pub enqueued_at: DateTime<Utc>,
55}
56
57pub(in crate::server) async fn dispatch(
58 State(state): State<Arc<AppState>>,
59 Extension(cn): Extension<AuthenticatedCn>,
60 Query(q): Query<DispatchQuery>,
61) -> Result<(StatusCode, Json<Option<DispatchResponse>>), StatusCode> {
62 let cn_str = cn.into_string();
63 let hostname = crate::auth::issuance::extract_machine_id(&cn_str, &state.agent_cn_suffix);
64
65 let Some(db) = state.db.as_ref() else {
66 tracing::warn!(
67 target: "dispatch",
68 %hostname,
69 "dispatch: no DB attached; returning 204 (no queue exists in in-memory mode)",
70 );
71 return Ok((StatusCode::NO_CONTENT, Json(None)));
72 };
73
74 match db.dispatch_queue().take_for_host(&hostname) {
76 Ok(Some(q)) => return Ok(deliver(q)),
77 Ok(None) => {} Err(err) => {
79 tracing::error!(
80 target: "dispatch",
81 %hostname,
82 error = %err,
83 "dispatch: take_for_host failed",
84 );
85 return Err(StatusCode::INTERNAL_SERVER_ERROR);
86 }
87 }
88
89 let wait = Duration::from_secs(q.wait.unwrap_or(MAX_WAIT_SECS).min(MAX_WAIT_SECS));
90 let deadline = tokio::time::Instant::now() + wait;
91 let mut kick_rx = state.dispatch_kick.subscribe();
92
93 loop {
94 let now = tokio::time::Instant::now();
95 if now >= deadline {
96 return Ok((StatusCode::NO_CONTENT, Json(None)));
97 }
98 let remaining = deadline - now;
99
100 tokio::select! {
101 res = kick_rx.changed() => {
105 if res.is_err() {
106 return Err(StatusCode::SERVICE_UNAVAILABLE);
108 }
109 }
111 _ = tokio::time::sleep(remaining) => {
112 return Ok((StatusCode::NO_CONTENT, Json(None)));
113 }
114 }
115
116 match db.dispatch_queue().take_for_host(&hostname) {
117 Ok(Some(q)) => return Ok(deliver(q)),
118 Ok(None) => continue, Err(err) => {
120 tracing::error!(
121 target: "dispatch",
122 %hostname,
123 error = %err,
124 "dispatch: take_for_host failed after kick",
125 );
126 return Err(StatusCode::INTERNAL_SERVER_ERROR);
127 }
128 }
129 }
130}
131
132fn deliver(
133 q: crate::db::dispatch_queue::QueuedDispatch,
134) -> (StatusCode, Json<Option<DispatchResponse>>) {
135 let resp = DispatchResponse {
136 hostname: q.hostname,
137 rollout_id: q.rollout_id,
138 target_closure: q.target_closure,
139 soak_due_at: q.soak_due_at,
140 enqueued_at: q.enqueued_at,
141 };
142 (StatusCode::OK, Json(Some(resp)))
143}