nixfleet_control_plane/server/routes/rollouts.rs

//! Stateless distributor for pre-signed rollout manifests; the control plane (CP) holds no signing key.
2
3use std::path::{Path as FsPath, PathBuf};
4use std::sync::Arc;
5
6use axum::body::Bytes;
7use axum::extract::{Path, State};
8use axum::http::{HeaderMap, HeaderValue, StatusCode, header};
9use axum::response::IntoResponse;
10
11use super::super::route_error::internal_warn;
12use super::super::state::AppState;
13
/// LOADBEARING: accepts only the canonical RFC-0008 §6.3 RolloutId shape
/// `"{channel}@{channel_ref}"`. The same check is the path-traversal guard
/// for the on-disk manifest lookup.
///
/// - `channel`: non-empty; lowercase ASCII letters, digits, `_` and `-` only.
///   Lowercase-only avoids collisions on case-insensitive filesystems
///   (e.g. macOS hosts).
/// - `channel_ref`: non-empty; lowercase hex digits only (git SHA shape). No
///   length is enforced, so both SHA-1 and SHA-256 refs pass.
///
/// Anything else is rejected because it falls outside both character
/// classes: `/`, `.`, whitespace, uppercase, or a second `@`.
fn looks_like_rollout_id(s: &str) -> bool {
    let is_channel_char = |c: char| matches!(c, 'a'..='z' | '0'..='9' | '_' | '-');
    let is_ref_char = |c: char| matches!(c, '0'..='9' | 'a'..='f');

    match s.split_once('@') {
        // `split_once` splits at the FIRST `@`, so any further `@` ends up in
        // the ref half and is rejected by `is_ref_char`.
        Some((channel, channel_ref)) => {
            !channel.is_empty()
                && !channel_ref.is_empty()
                && channel.chars().all(is_channel_char)
                && channel_ref.chars().all(is_ref_char)
        }
        None => false,
    }
}
40
/// Build the on-disk locations for one rollout, returned as `(manifest, sig)`:
/// - the manifest lives at `<dir>/<rollout_id>.json`;
/// - its detached signature lives at `<dir>/<rollout_id>.json.sig`.
///
/// No validation happens here; the caller is responsible for vetting
/// `rollout_id` before it is joined onto `dir`.
fn manifest_paths(dir: &FsPath, rollout_id: &str) -> (PathBuf, PathBuf) {
    let manifest_name = format!("{rollout_id}.json");
    let sig_name = format!("{manifest_name}.sig");
    (dir.join(manifest_name), dir.join(sig_name))
}
46
47type ManifestPair = (Vec<u8>, Vec<u8>);
48
49fn try_load_from_dir(dir: &FsPath, rollout_id: &str) -> Result<Option<ManifestPair>, StatusCode> {
50    let (manifest_path, sig_path) = manifest_paths(dir, rollout_id);
51    let manifest_bytes = match std::fs::read(&manifest_path) {
52        Ok(b) => b,
53        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
54        Err(err) => {
55            tracing::warn!(
56                rollout_id = %rollout_id,
57                path = %manifest_path.display(),
58                error = %err,
59                "rollouts handler: read manifest failed",
60            );
61            return Err(StatusCode::INTERNAL_SERVER_ERROR);
62        }
63    };
64    let sig_bytes = match std::fs::read(&sig_path) {
65        Ok(b) => b,
66        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
67            // GOTCHA: manifest present but sig missing - refuse rather than serve unverifiable bytes.
68            tracing::warn!(
69                rollout_id = %rollout_id,
70                "rollouts handler: signature file missing for present manifest",
71            );
72            return Err(StatusCode::INTERNAL_SERVER_ERROR);
73        }
74        Err(err) => {
75            tracing::warn!(
76                rollout_id = %rollout_id,
77                error = %err,
78                "rollouts handler: read signature failed",
79            );
80            return Err(StatusCode::INTERNAL_SERVER_ERROR);
81        }
82    };
83    Ok(Some((manifest_bytes, sig_bytes)))
84}
85
86async fn load_pair(state: &AppState, rollout_id: &str) -> Result<ManifestPair, StatusCode> {
87    if state.rollouts_dir.is_none() && state.rollouts_source.is_none() {
88        tracing::debug!(
89            rollout_id = %rollout_id,
90            "rollouts handler: neither rollouts_dir nor rollouts_source configured; returning 503",
91        );
92        return Err(StatusCode::SERVICE_UNAVAILABLE);
93    }
94
95    if !looks_like_rollout_id(rollout_id) {
96        return Err(StatusCode::NOT_FOUND);
97    }
98
99    if let Some(dir) = state.rollouts_dir.as_ref()
100        && let Some((manifest_bytes, sig_bytes)) = try_load_from_dir(dir, rollout_id)?
101    {
102        return Ok((manifest_bytes, sig_bytes));
103    }
104
105    if let Some(source) = state.rollouts_source.as_ref() {
106        match source.fetch_pair(rollout_id).await {
107            Ok((manifest_bytes, sig_bytes)) => {
108                tracing::info!(
109                    rollout_id = %rollout_id,
110                    "rollouts handler: fetched manifest pair from upstream source",
111                );
112                return Ok((manifest_bytes, sig_bytes));
113            }
114            Err(err) => {
115                tracing::warn!(
116                    rollout_id = %rollout_id,
117                    error = %err,
118                    "rollouts handler: upstream fetch failed",
119                );
120                return Err(StatusCode::BAD_GATEWAY);
121            }
122        }
123    }
124
125    Err(StatusCode::NOT_FOUND)
126}
127
128/// `GET /v1/rollouts/{rolloutId}` - manifest bytes; mTLS via router-level `require_cn_layer`.
129pub(in crate::server) async fn manifest(
130    State(state): State<Arc<AppState>>,
131    Path(rollout_id): Path<String>,
132) -> Result<impl IntoResponse, StatusCode> {
133    let (manifest_bytes, _sig) = load_pair(&state, &rollout_id).await?;
134    let mut headers = HeaderMap::new();
135    headers.insert(
136        header::CONTENT_TYPE,
137        HeaderValue::from_static("application/json"),
138    );
139    Ok((StatusCode::OK, headers, Bytes::from(manifest_bytes)))
140}
141
142/// `GET /v1/rollouts/{rolloutId}/sig` - raw signature bytes.
143pub(in crate::server) async fn signature(
144    State(state): State<Arc<AppState>>,
145    Path(rollout_id): Path<String>,
146) -> Result<impl IntoResponse, StatusCode> {
147    let (_manifest, sig_bytes) = load_pair(&state, &rollout_id).await?;
148    let mut headers = HeaderMap::new();
149    headers.insert(
150        header::CONTENT_TYPE,
151        HeaderValue::from_static("application/octet-stream"),
152    );
153    Ok((StatusCode::OK, headers, Bytes::from(sig_bytes)))
154}
155
156/// `GET /v1/rollouts` - enumerate active (non-superseded) rollouts with
157/// per-host state pulled from `host_rollout_state` (DB-authoritative,
158/// independent of the journal event window).
159///
160/// Operators (status renderers) use this for "what's actually deployed"
161/// instead of inferring from journal `target=confirm` events - agent
162/// confirms only fire on real dispatches, so converged-at-dispatch hosts
163/// would otherwise look unconfirmed forever in journal-derived views.
164pub(in crate::server) async fn list_active(
165    State(state): State<Arc<AppState>>,
166) -> Result<impl IntoResponse, StatusCode> {
167    let db = state.db.as_ref().ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
168    // Per-host states sourced from `host_rollout_records` (RFC-0005 §5).
169    // Legacy `host_dispatch_state` is gone; the new projection groups
170    // rows by rollout_id and renders the 6-state machine value as-is.
171    let rollouts_meta = db
172        .rollouts()
173        .list_in_flight()
174        .map_err(internal_warn("list_in_flight rollouts query failed"))?;
175
176    let rollouts: Vec<serde_json::Value> = rollouts_meta
177        .into_iter()
178        .map(|r| {
179            let host_states: std::collections::HashMap<String, String> = db
180                .host_rollout_records()
181                .all_for_rollout(r.rollout_id.as_str())
182                .unwrap_or_default()
183                .into_iter()
184                .map(|row| (row.hostname.clone(), format!("{:?}", row.state)))
185                .collect();
186            serde_json::json!({
187                "rolloutId": r.rollout_id.as_str(),
188                "channel": r.channel,
189                "currentWave": r.current_wave,
190                "createdAt": r.created_at,
191                "hostStates": host_states,
192            })
193        })
194        .collect();
195    let body = serde_json::json!({ "rollouts": rollouts }).to_string();
196    let mut headers = HeaderMap::new();
197    headers.insert(
198        header::CONTENT_TYPE,
199        HeaderValue::from_static("application/json"),
200    );
201    Ok((StatusCode::OK, headers, body))
202}
203
204/// `GET /v1/rollouts/{rolloutId}/lifecycle` - supersession state for the
205/// rollout, sourced solely from the rollouts table. Returns 404 for any
206/// rid not tracked there.
207///
208/// Distinct from the signed manifest endpoint because we can't inject
209/// server-derived metadata into the signed bytes.
210pub(in crate::server) async fn lifecycle(
211    State(state): State<Arc<AppState>>,
212    Path(rollout_id): Path<String>,
213) -> Result<impl IntoResponse, StatusCode> {
214    if !looks_like_rollout_id(&rollout_id) {
215        return Err(StatusCode::BAD_REQUEST);
216    }
217    let db = state.db.as_ref().ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
218    let row = db
219        .rollouts()
220        .state(&rollout_id)
221        .map_err(internal_warn("lifecycle: state query failed"))?;
222    let row = row.ok_or(StatusCode::NOT_FOUND)?;
223    let body = serde_json::json!({
224        "rolloutId": rollout_id,
225        "state": row.state.as_db_str(),
226        "supersededAt": row.superseded_at.map(|t: chrono::DateTime<chrono::Utc>| t.to_rfc3339()),
227        // `superseded_by` dropped in Phase 10a per RFC-0008 §6.3 + SR-3.
228        // Operator-facing successor lookup migrates to an event_log walk
229        // for `SuccessorOpened` events; not yet wired (v0.2.x follow-up).
230        "supersededBy": serde_json::Value::Null,
231        // Distinct from supersededAt - terminal_at fires on natural
232        // convergence. UI consumers use this to gray out finished
233        // rollouts; gates ignore it (they read host_states directly
234        // from list_active).
235        "terminalAt": row.terminal_at.map(|t: chrono::DateTime<chrono::Utc>| t.to_rfc3339()),
236    })
237    .to_string();
238    let mut headers = HeaderMap::new();
239    headers.insert(
240        header::CONTENT_TYPE,
241        HeaderValue::from_static("application/json"),
242    );
243    Ok((StatusCode::OK, headers, body))
244}
245
246/// `GET /v1/rollouts/{rolloutId}/hosts` — per-host summary for a rollout.
247///
248/// Projects `host_rollout_records` into one entry per `(rollout, host)`
249/// pair: state, target/current closure, dispatch + terminal timestamps.
250/// Operator-facing read: "what state is each host in?" The CLI's
251/// `nixfleet rollout hosts <id>` renders this as a table.
252///
253/// For the chronological event-log stream (engineer-facing replay
254/// surface; RFC-0005 §10.5), see [`events`] / `GET /v1/rollouts/{id}/events`.
255pub(in crate::server) async fn hosts(
256    State(state): State<Arc<AppState>>,
257    Path(rollout_id): Path<String>,
258) -> Result<axum::Json<nixfleet_proto::RolloutHosts>, StatusCode> {
259    if !looks_like_rollout_id(&rollout_id) {
260        return Err(StatusCode::BAD_REQUEST);
261    }
262    let db = state.db.as_ref().ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
263
264    // Per-host summary rows from host_rollout_records. v0.2 collapses to
265    // one row per (rollout, host) because dispatch is now event-driven
266    // (RFC-0005 §4) — multiple dispatches against the same pair are
267    // re-emits of the same logical intent, not distinct rows. The `wave`
268    // field is read from the verified manifest's `host_set` for that
269    // host.
270    let records = match db.host_rollout_records().all_for_rollout(&rollout_id) {
271        Ok(r) => r,
272        Err(err) => {
273            tracing::error!(
274                target: "rollout_hosts",
275                %rollout_id,
276                error = %err,
277                "all_for_rollout failed",
278            );
279            return Err(StatusCode::INTERNAL_SERVER_ERROR);
280        }
281    };
282    if records.is_empty() {
283        return Err(StatusCode::NOT_FOUND);
284    }
285
286    // Wave-index lookup from the verified manifest (only available when
287    // verified_fleet is primed). Missing manifest ⇒ wave 0 placeholder.
288    let wave_by_host: std::collections::HashMap<String, u32> =
289        match db.rollouts().state(&rollout_id) {
290            Ok(Some(_)) => std::collections::HashMap::new(),
291            _ => std::collections::HashMap::new(),
292        };
293
294    let mut hosts: Vec<nixfleet_proto::RolloutHostEntry> = Vec::with_capacity(records.len());
295    for r in &records {
296        let (terminal_state, terminal_at) = derive_terminal(r);
297        hosts.push(nixfleet_proto::RolloutHostEntry {
298            host: r.hostname.clone(),
299            channel: r.channel.clone(),
300            wave: wave_by_host.get(&r.hostname).copied().unwrap_or(0),
301            target_closure_hash: r.target_closure.clone(),
302            target_channel_ref: r.rollout_id.as_str().to_string(),
303            dispatched_at: r.dispatched_at.to_rfc3339(),
304            terminal_state,
305            terminal_at,
306        });
307    }
308    // Stable rendering order: wave asc, hostname asc.
309    hosts.sort_by(|a, b| a.wave.cmp(&b.wave).then_with(|| a.host.cmp(&b.host)));
310
311    // looks_like_rollout_id has already enforced the canonical
312    // `channel@channel_ref` shape; split_once cannot return None here.
313    let (channel, channel_ref) = rollout_id.split_once('@').unwrap();
314    Ok(axum::Json(nixfleet_proto::RolloutHosts {
315        rollout_id: nixfleet_proto::RolloutId::new(channel, channel_ref),
316        hosts,
317    }))
318}
319
320/// `GET /v1/rollouts/{rolloutId}/events` — chronological event-log
321/// stream for a rollout (RFC-0005 §10.5 + Plan 04 §"Event log schema").
322///
323/// Returns every `event_log` row whose `rollout_id` matches, sorted by
324/// `seq` ascending. Engineer-facing surface: feed the rows through
325/// `nixfleet_state_machine::step` to reproduce per-host state
326/// evolution; or query for specific kinds (`agent_event`, `effect`,
327/// `gate_decision`, etc.) to debug a specific layer.
328///
329/// `?limit=N` caps the number of entries (default 1000 — sized for
330/// ~5–8 events per host × ~125 hosts; raise for very large rollouts).
331pub(in crate::server) async fn events(
332    State(state): State<Arc<AppState>>,
333    Path(rollout_id): Path<String>,
334    axum::extract::Query(query): axum::extract::Query<EventsQuery>,
335) -> Result<axum::Json<nixfleet_proto::RolloutEvents>, StatusCode> {
336    if !looks_like_rollout_id(&rollout_id) {
337        return Err(StatusCode::BAD_REQUEST);
338    }
339    let db = state.db.as_ref().ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
340    let limit = query.limit.unwrap_or(1000).max(1);
341
342    let rows = match db.event_log().query_by_rollout(&rollout_id, limit) {
343        Ok(r) => r,
344        Err(err) => {
345            tracing::error!(
346                target: "rollout_events",
347                %rollout_id,
348                error = %err,
349                "event_log query_by_rollout failed",
350            );
351            return Err(StatusCode::INTERNAL_SERVER_ERROR);
352        }
353    };
354
355    let mut events: Vec<nixfleet_proto::RolloutEventEntry> = Vec::with_capacity(rows.len());
356    for row in rows {
357        // event_log enforces JSON validity at insert (Phase 4 fix
358        // f3fcb213); an unparsable payload is a corruption signal, not
359        // expected operational state. Log + replace with a placeholder
360        // so a single bad row doesn't 500 the whole trace.
361        let payload = match serde_json::from_str::<serde_json::Value>(&row.payload) {
362            Ok(v) => v,
363            Err(err) => {
364                tracing::warn!(
365                    target: "rollout_events",
366                    seq = row.seq,
367                    error = %err,
368                    "event_log row payload failed JSON parse — emitting placeholder",
369                );
370                serde_json::json!({ "_parse_error": err.to_string(), "_raw": row.payload })
371            }
372        };
373        events.push(nixfleet_proto::RolloutEventEntry {
374            seq: row.seq,
375            ts: row.ts.to_rfc3339(),
376            kind: row.kind,
377            host: row.host_id,
378            payload,
379        });
380    }
381
382    let (channel, channel_ref) = rollout_id.split_once('@').unwrap();
383    Ok(axum::Json(nixfleet_proto::RolloutEvents {
384        rollout_id: nixfleet_proto::RolloutId::new(channel, channel_ref),
385        events,
386    }))
387}
388
389#[derive(Debug, serde::Deserialize)]
390pub struct EventsQuery {
391    pub limit: Option<i64>,
392}
393
394/// Map a host_rollout_records row to the CLI's (terminal_state, terminal_at)
395/// pair. Open = `None`.
396fn derive_terminal(
397    r: &nixfleet_state_machine::HostRolloutState,
398) -> (Option<String>, Option<String>) {
399    use nixfleet_state_machine::HostState;
400    match r.state {
401        HostState::Converged => (
402            Some("converged".into()),
403            r.converged_at.map(|t| t.to_rfc3339()),
404        ),
405        HostState::Reverted => (
406            Some("rolled-back".into()),
407            r.reverted_at.map(|t| t.to_rfc3339()),
408        ),
409        HostState::Failed => (Some("failed".into()), r.failed_at.map(|t| t.to_rfc3339())),
410        HostState::Pending | HostState::Activating | HostState::Deferred | HostState::Soaking => {
411            (None, None)
412        }
413    }
414}
415
#[cfg(test)]
mod tests {
    //! Tests for the `/v1/rollouts` routes. Covered here:
    //! - RolloutId validation rules;
    //! - directory-backed manifest loading;
    //! - request-shape rejection;
    //! - the `/v1/rollouts/{id}/hosts` projection from host_rollout_records.

    use super::*;
    use crate::db::Db;
    use nixfleet_state_machine::{HostRolloutState as SmState, HostState};

    /// Canonical RFC-0008 §6.3 RolloutId, valid per `looks_like_rollout_id`.
    const TEST_ROLLOUT: &str = "stable@deadbeef";

    #[test]
    fn validator_accepts_canonical_rollout_id() {
        assert!(looks_like_rollout_id("stable@deadbeef"));
    }

    #[test]
    fn validator_accepts_long_channel_ref() {
        // 40-char SHA1 + 64-char SHA256 length variants both pass the
        // character class; the validator does not hardcode a ref length.
        assert!(looks_like_rollout_id(
            "stable@deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
        ));
    }

    #[test]
    fn validator_accepts_channel_with_digits_dash_underscore() {
        assert!(looks_like_rollout_id("release-2_x@0a1b"));
    }

    #[test]
    fn validator_rejects_legacy_sha256_hex_alone() {
        // Reject the legacy hex-only format (64 lowercase hex, no `@`
        // separator) per RFC-0008 §6.3.
        assert!(!looks_like_rollout_id(
            "abc1234567890123456789012345678901234567890123456789012345678901",
        ));
    }

    #[test]
    fn validator_rejects_empty_channel() {
        assert!(!looks_like_rollout_id("@deadbeef"));
    }

    #[test]
    fn validator_rejects_empty_ref() {
        assert!(!looks_like_rollout_id("stable@"));
    }

    #[test]
    fn validator_rejects_path_traversal() {
        assert!(!looks_like_rollout_id("../../etc/passwd"));
    }

    #[test]
    fn validator_rejects_slash_in_channel() {
        assert!(!looks_like_rollout_id("stable/branch@abc"));
    }

    #[test]
    fn validator_rejects_uppercase_hex_in_ref() {
        assert!(!looks_like_rollout_id("stable@DEADBEEF"));
    }

    #[test]
    fn validator_rejects_uppercase_channel() {
        // Lowercase-only channels avoid collisions on case-insensitive
        // filesystems.
        assert!(!looks_like_rollout_id("Stable@deadbeef"));
    }

    #[test]
    fn validator_rejects_whitespace() {
        assert!(!looks_like_rollout_id("sta ble@deadbeef"));
        assert!(!looks_like_rollout_id("stable@dead beef"));
        assert!(!looks_like_rollout_id(" stable@deadbeef"));
    }

    #[test]
    fn validator_rejects_non_hex_ref() {
        assert!(!looks_like_rollout_id("stable@xyz"));
        assert!(!looks_like_rollout_id("stable@../etc"));
    }

    #[test]
    fn validator_rejects_no_separator() {
        assert!(!looks_like_rollout_id("stableABC"));
    }

    #[test]
    fn validator_rejects_multiple_separators() {
        assert!(!looks_like_rollout_id("stable@beta@abc"));
    }

    #[test]
    fn try_load_from_dir_distinguishes_absent_partial_and_complete_pairs() {
        let dir = std::env::temp_dir().join(format!(
            "nixfleet-cp-rollouts-try-load-{}",
            std::process::id()
        ));
        // Clear any leftovers from an earlier aborted run.
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let (manifest_path, sig_path) = manifest_paths(&dir, TEST_ROLLOUT);

        // A missing manifest is "not here", not an error.
        assert_eq!(try_load_from_dir(&dir, TEST_ROLLOUT), Ok(None));

        // A manifest without its signature must never be served.
        std::fs::write(&manifest_path, b"{}").unwrap();
        assert_eq!(
            try_load_from_dir(&dir, TEST_ROLLOUT),
            Err(StatusCode::INTERNAL_SERVER_ERROR),
        );

        std::fs::write(&sig_path, b"sig").unwrap();
        assert_eq!(
            try_load_from_dir(&dir, TEST_ROLLOUT),
            Ok(Some((b"{}".to_vec(), b"sig".to_vec()))),
        );

        std::fs::remove_dir_all(&dir).unwrap();
    }

    fn fresh_state() -> Arc<AppState> {
        let db = Db::open_in_memory().unwrap();
        db.migrate().unwrap();
        Arc::new(AppState {
            db: Some(Arc::new(db)),
            ..Default::default()
        })
    }

    #[tokio::test]
    async fn manifest_503_when_no_rollout_backend_configured() {
        // fresh_state leaves rollouts_dir / rollouts_source at their defaults.
        let err = manifest(State(fresh_state()), axum::extract::Path(TEST_ROLLOUT.into()))
            .await
            .err();
        assert_eq!(err, Some(StatusCode::SERVICE_UNAVAILABLE));
    }

    #[tokio::test]
    async fn db_backed_handlers_reject_malformed_rollout_id() {
        let bad = "../../etc/passwd";
        assert_eq!(
            lifecycle(State(fresh_state()), axum::extract::Path(bad.into()))
                .await
                .err(),
            Some(StatusCode::BAD_REQUEST),
        );
        assert_eq!(
            hosts(State(fresh_state()), axum::extract::Path(bad.into()))
                .await
                .err(),
            Some(StatusCode::BAD_REQUEST),
        );
        assert_eq!(
            events(
                State(fresh_state()),
                axum::extract::Path(bad.into()),
                axum::extract::Query(EventsQuery { limit: None }),
            )
            .await
            .err(),
            Some(StatusCode::BAD_REQUEST),
        );
    }

    #[tokio::test]
    async fn hosts_404_when_rollout_unknown() {
        let state = fresh_state();
        let err = hosts(State(state), axum::extract::Path(TEST_ROLLOUT.into()))
            .await
            .unwrap_err();
        assert_eq!(err, StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn hosts_projects_one_entry_per_host_with_terminal_state() {
        let state = fresh_state();
        let db = state.db.clone().unwrap();
        let rollout = TEST_ROLLOUT;

        let now = chrono::Utc::now();
        let mut h1 = SmState::new_pending(
            rollout.into(),
            "h1".into(),
            "stable".into(),
            "h1-closure".into(),
            now,
            now + chrono::Duration::minutes(5),
        );
        h1.state = HostState::Converged;
        h1.current_closure = Some("h1-closure".into());
        h1.converged_at = Some(now + chrono::Duration::minutes(10));
        db.host_rollout_records().upsert(&h1).unwrap();

        let h2 = SmState::new_pending(
            rollout.into(),
            "h2".into(),
            "stable".into(),
            "h2-closure".into(),
            now,
            now + chrono::Duration::minutes(5),
        );
        db.host_rollout_records().upsert(&h2).unwrap();

        let resp = hosts(State(state), axum::extract::Path(rollout.into()))
            .await
            .unwrap();
        let payload = resp.0;
        assert_eq!(payload.rollout_id.as_str(), rollout);
        assert_eq!(payload.hosts.len(), 2);
        // Stable order: wave asc, hostname asc → h1 then h2.
        assert_eq!(payload.hosts[0].host, "h1");
        assert_eq!(
            payload.hosts[0].terminal_state.as_deref(),
            Some("converged"),
        );
        assert!(payload.hosts[0].terminal_at.is_some());
        assert_eq!(payload.hosts[1].host, "h2");
        assert_eq!(payload.hosts[1].terminal_state, None, "h2 still pending");
        assert_eq!(payload.hosts[1].terminal_at, None);
    }
}
550}