nixfleet_control_plane/server/routes/
dispatch.rs

1//! `GET /v1/agent/dispatch?wait=60` — long-poll dispatch delivery.
2//!
3//! Replaces the pre-v0.2 dispatch-on-checkin contract. The agent calls
4//! this every time it wants new work (typically right after a Converged
5//! or every minute on idle, per `nixfleet-agent` policy). The contract
6//! is RFC-0003 §1 pull-only + RFC-0005 §2.1 + plan 06's locked-in
7//! "long-poll, 60s wait window" decision.
8//!
9//! Implementation:
10//!
11//!   1. mTLS + CN-vs-?hostname=… check (cert CN authoritative).
//!   2. Atomically `take_for_host` from `dispatch_queue` for the host.
//!      Row taken ⇒ return it. No row ⇒ park on the
14//!      `state.dispatch_kick` watch channel for up to `wait` seconds
15//!      (capped at 60). Wake on:
16//!         - applier upsert (any host) — re-peek for this host;
17//!         - timeout — return empty.
18//!   3. Response shape:
19//!         - 200 + Dispatch JSON ⇒ work to do, agent processes;
20//!         - 204                ⇒ no work, agent re-polls.
21//!
22//! Backpressure: long-polls are cheap (one row peek per wake), and the
23//! watch channel collapses bursts to one wake. The 60s cap is the
24//! protocol-defined ceiling.
25
26use std::sync::Arc;
27use std::time::Duration;
28
29use axum::Json;
30use axum::extract::{Extension, Query, State};
31use axum::http::StatusCode;
32use chrono::{DateTime, Utc};
33use serde::{Deserialize, Serialize};
34
35use super::super::middleware::AuthenticatedCn;
36use super::super::state::AppState;
37
/// Upper bound, in seconds, on how long one dispatch long-poll may park.
/// Fixed at 60 by plan 06; larger `?wait=` values are clamped down to it.
const MAX_WAIT_SECS: u64 = 60;
40
41#[derive(Debug, Clone, Deserialize)]
42pub struct DispatchQuery {
43    /// Long-poll wait in seconds. Clamped to [0, 60].
44    #[serde(default)]
45    pub wait: Option<u64>,
46}
47
48#[derive(Debug, Clone, Serialize)]
49pub struct DispatchResponse {
50    pub hostname: String,
51    pub rollout_id: nixfleet_proto::RolloutId,
52    pub target_closure: String,
53    pub soak_due_at: DateTime<Utc>,
54    pub enqueued_at: DateTime<Utc>,
55}
56
57pub(in crate::server) async fn dispatch(
58    State(state): State<Arc<AppState>>,
59    Extension(cn): Extension<AuthenticatedCn>,
60    Query(q): Query<DispatchQuery>,
61) -> Result<(StatusCode, Json<Option<DispatchResponse>>), StatusCode> {
62    let cn_str = cn.into_string();
63    let hostname = crate::auth::issuance::extract_machine_id(&cn_str, &state.agent_cn_suffix);
64
65    let Some(db) = state.db.as_ref() else {
66        tracing::warn!(
67            target: "dispatch",
68            %hostname,
69            "dispatch: no DB attached; returning 204 (no queue exists in in-memory mode)",
70        );
71        return Ok((StatusCode::NO_CONTENT, Json(None)));
72    };
73
74    // Fast path: a row is already waiting.
75    match db.dispatch_queue().take_for_host(&hostname) {
76        Ok(Some(q)) => return Ok(deliver(q)),
77        Ok(None) => {} // park below
78        Err(err) => {
79            tracing::error!(
80                target: "dispatch",
81                %hostname,
82                error = %err,
83                "dispatch: take_for_host failed",
84            );
85            return Err(StatusCode::INTERNAL_SERVER_ERROR);
86        }
87    }
88
89    let wait = Duration::from_secs(q.wait.unwrap_or(MAX_WAIT_SECS).min(MAX_WAIT_SECS));
90    let deadline = tokio::time::Instant::now() + wait;
91    let mut kick_rx = state.dispatch_kick.subscribe();
92
93    loop {
94        let now = tokio::time::Instant::now();
95        if now >= deadline {
96            return Ok((StatusCode::NO_CONTENT, Json(None)));
97        }
98        let remaining = deadline - now;
99
100        tokio::select! {
101            // Watch yields on any new dispatch_queue upsert. We don't
102            // care about the value — re-peek the table for THIS host
103            // and decide.
104            res = kick_rx.changed() => {
105                if res.is_err() {
106                    // Sender dropped — CP is shutting down.
107                    return Err(StatusCode::SERVICE_UNAVAILABLE);
108                }
109                // Fall through and try take_for_host below.
110            }
111            _ = tokio::time::sleep(remaining) => {
112                return Ok((StatusCode::NO_CONTENT, Json(None)));
113            }
114        }
115
116        match db.dispatch_queue().take_for_host(&hostname) {
117            Ok(Some(q)) => return Ok(deliver(q)),
118            Ok(None) => continue, // false wake — re-park
119            Err(err) => {
120                tracing::error!(
121                    target: "dispatch",
122                    %hostname,
123                    error = %err,
124                    "dispatch: take_for_host failed after kick",
125                );
126                return Err(StatusCode::INTERNAL_SERVER_ERROR);
127            }
128        }
129    }
130}
131
132fn deliver(
133    q: crate::db::dispatch_queue::QueuedDispatch,
134) -> (StatusCode, Json<Option<DispatchResponse>>) {
135    let resp = DispatchResponse {
136        hostname: q.hostname,
137        rollout_id: q.rollout_id,
138        target_closure: q.target_closure,
139        soak_due_at: q.soak_due_at,
140        enqueued_at: q.enqueued_at,
141    };
142    (StatusCode::OK, Json(Some(resp)))
143}