1use std::path::{Path as FsPath, PathBuf};
4use std::sync::Arc;
5
6use axum::body::Bytes;
7use axum::extract::{Path, State};
8use axum::http::{HeaderMap, HeaderValue, StatusCode, header};
9use axum::response::IntoResponse;
10
11use super::super::route_error::internal_warn;
12use super::super::state::AppState;
13
/// Returns `true` when `s` has the shape `<channel>@<channel-ref>`.
///
/// The string must contain exactly one `@`. The channel part must be
/// non-empty and consist only of ASCII lowercase letters, ASCII digits, `_`
/// or `-`. The ref part must be non-empty lowercase hexadecimal. Anything
/// else (including `/`, `.`, uppercase or non-ASCII characters) is rejected.
fn looks_like_rollout_id(s: &str) -> bool {
    let mut pieces = s.split('@');
    match (pieces.next(), pieces.next(), pieces.next()) {
        // Exactly two pieces means exactly one separator.
        (Some(channel), Some(channel_ref), None) => {
            let channel_ok = !channel.is_empty()
                && channel
                    .bytes()
                    .all(|b| matches!(b, b'a'..=b'z' | b'0'..=b'9' | b'_' | b'-'));
            let ref_ok = !channel_ref.is_empty()
                && channel_ref
                    .bytes()
                    .all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'));
            channel_ok && ref_ok
        }
        _ => false,
    }
}
40
/// Builds the on-disk locations of a rollout's manifest and signature.
///
/// Returns `(<dir>/<rollout_id>.json, <dir>/<rollout_id>.json.sig)`. No
/// filesystem access happens here; the paths are only computed.
fn manifest_paths(dir: &FsPath, rollout_id: &str) -> (PathBuf, PathBuf) {
    let manifest_name = format!("{rollout_id}.json");
    // The signature sits next to the manifest with a `.sig` suffix appended.
    let sig_name = format!("{manifest_name}.sig");
    (dir.join(&manifest_name), dir.join(&sig_name))
}
46
47type ManifestPair = (Vec<u8>, Vec<u8>);
48
49fn try_load_from_dir(dir: &FsPath, rollout_id: &str) -> Result<Option<ManifestPair>, StatusCode> {
50 let (manifest_path, sig_path) = manifest_paths(dir, rollout_id);
51 let manifest_bytes = match std::fs::read(&manifest_path) {
52 Ok(b) => b,
53 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
54 Err(err) => {
55 tracing::warn!(
56 rollout_id = %rollout_id,
57 path = %manifest_path.display(),
58 error = %err,
59 "rollouts handler: read manifest failed",
60 );
61 return Err(StatusCode::INTERNAL_SERVER_ERROR);
62 }
63 };
64 let sig_bytes = match std::fs::read(&sig_path) {
65 Ok(b) => b,
66 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
67 tracing::warn!(
69 rollout_id = %rollout_id,
70 "rollouts handler: signature file missing for present manifest",
71 );
72 return Err(StatusCode::INTERNAL_SERVER_ERROR);
73 }
74 Err(err) => {
75 tracing::warn!(
76 rollout_id = %rollout_id,
77 error = %err,
78 "rollouts handler: read signature failed",
79 );
80 return Err(StatusCode::INTERNAL_SERVER_ERROR);
81 }
82 };
83 Ok(Some((manifest_bytes, sig_bytes)))
84}
85
86async fn load_pair(state: &AppState, rollout_id: &str) -> Result<ManifestPair, StatusCode> {
87 if state.rollouts_dir.is_none() && state.rollouts_source.is_none() {
88 tracing::debug!(
89 rollout_id = %rollout_id,
90 "rollouts handler: neither rollouts_dir nor rollouts_source configured; returning 503",
91 );
92 return Err(StatusCode::SERVICE_UNAVAILABLE);
93 }
94
95 if !looks_like_rollout_id(rollout_id) {
96 return Err(StatusCode::NOT_FOUND);
97 }
98
99 if let Some(dir) = state.rollouts_dir.as_ref()
100 && let Some((manifest_bytes, sig_bytes)) = try_load_from_dir(dir, rollout_id)?
101 {
102 return Ok((manifest_bytes, sig_bytes));
103 }
104
105 if let Some(source) = state.rollouts_source.as_ref() {
106 match source.fetch_pair(rollout_id).await {
107 Ok((manifest_bytes, sig_bytes)) => {
108 tracing::info!(
109 rollout_id = %rollout_id,
110 "rollouts handler: fetched manifest pair from upstream source",
111 );
112 return Ok((manifest_bytes, sig_bytes));
113 }
114 Err(err) => {
115 tracing::warn!(
116 rollout_id = %rollout_id,
117 error = %err,
118 "rollouts handler: upstream fetch failed",
119 );
120 return Err(StatusCode::BAD_GATEWAY);
121 }
122 }
123 }
124
125 Err(StatusCode::NOT_FOUND)
126}
127
128pub(in crate::server) async fn manifest(
130 State(state): State<Arc<AppState>>,
131 Path(rollout_id): Path<String>,
132) -> Result<impl IntoResponse, StatusCode> {
133 let (manifest_bytes, _sig) = load_pair(&state, &rollout_id).await?;
134 let mut headers = HeaderMap::new();
135 headers.insert(
136 header::CONTENT_TYPE,
137 HeaderValue::from_static("application/json"),
138 );
139 Ok((StatusCode::OK, headers, Bytes::from(manifest_bytes)))
140}
141
142pub(in crate::server) async fn signature(
144 State(state): State<Arc<AppState>>,
145 Path(rollout_id): Path<String>,
146) -> Result<impl IntoResponse, StatusCode> {
147 let (_manifest, sig_bytes) = load_pair(&state, &rollout_id).await?;
148 let mut headers = HeaderMap::new();
149 headers.insert(
150 header::CONTENT_TYPE,
151 HeaderValue::from_static("application/octet-stream"),
152 );
153 Ok((StatusCode::OK, headers, Bytes::from(sig_bytes)))
154}
155
156pub(in crate::server) async fn list_active(
165 State(state): State<Arc<AppState>>,
166) -> Result<impl IntoResponse, StatusCode> {
167 let db = state.db.as_ref().ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
168 let rollouts_meta = db
172 .rollouts()
173 .list_in_flight()
174 .map_err(internal_warn("list_in_flight rollouts query failed"))?;
175
176 let rollouts: Vec<serde_json::Value> = rollouts_meta
177 .into_iter()
178 .map(|r| {
179 let host_states: std::collections::HashMap<String, String> = db
180 .host_rollout_records()
181 .all_for_rollout(r.rollout_id.as_str())
182 .unwrap_or_default()
183 .into_iter()
184 .map(|row| (row.hostname.clone(), format!("{:?}", row.state)))
185 .collect();
186 serde_json::json!({
187 "rolloutId": r.rollout_id.as_str(),
188 "channel": r.channel,
189 "currentWave": r.current_wave,
190 "createdAt": r.created_at,
191 "hostStates": host_states,
192 })
193 })
194 .collect();
195 let body = serde_json::json!({ "rollouts": rollouts }).to_string();
196 let mut headers = HeaderMap::new();
197 headers.insert(
198 header::CONTENT_TYPE,
199 HeaderValue::from_static("application/json"),
200 );
201 Ok((StatusCode::OK, headers, body))
202}
203
204pub(in crate::server) async fn lifecycle(
211 State(state): State<Arc<AppState>>,
212 Path(rollout_id): Path<String>,
213) -> Result<impl IntoResponse, StatusCode> {
214 if !looks_like_rollout_id(&rollout_id) {
215 return Err(StatusCode::BAD_REQUEST);
216 }
217 let db = state.db.as_ref().ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
218 let row = db
219 .rollouts()
220 .state(&rollout_id)
221 .map_err(internal_warn("lifecycle: state query failed"))?;
222 let row = row.ok_or(StatusCode::NOT_FOUND)?;
223 let body = serde_json::json!({
224 "rolloutId": rollout_id,
225 "state": row.state.as_db_str(),
226 "supersededAt": row.superseded_at.map(|t: chrono::DateTime<chrono::Utc>| t.to_rfc3339()),
227 "supersededBy": serde_json::Value::Null,
231 "terminalAt": row.terminal_at.map(|t: chrono::DateTime<chrono::Utc>| t.to_rfc3339()),
236 })
237 .to_string();
238 let mut headers = HeaderMap::new();
239 headers.insert(
240 header::CONTENT_TYPE,
241 HeaderValue::from_static("application/json"),
242 );
243 Ok((StatusCode::OK, headers, body))
244}
245
246pub(in crate::server) async fn hosts(
256 State(state): State<Arc<AppState>>,
257 Path(rollout_id): Path<String>,
258) -> Result<axum::Json<nixfleet_proto::RolloutHosts>, StatusCode> {
259 if !looks_like_rollout_id(&rollout_id) {
260 return Err(StatusCode::BAD_REQUEST);
261 }
262 let db = state.db.as_ref().ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
263
264 let records = match db.host_rollout_records().all_for_rollout(&rollout_id) {
271 Ok(r) => r,
272 Err(err) => {
273 tracing::error!(
274 target: "rollout_hosts",
275 %rollout_id,
276 error = %err,
277 "all_for_rollout failed",
278 );
279 return Err(StatusCode::INTERNAL_SERVER_ERROR);
280 }
281 };
282 if records.is_empty() {
283 return Err(StatusCode::NOT_FOUND);
284 }
285
286 let wave_by_host: std::collections::HashMap<String, u32> =
289 match db.rollouts().state(&rollout_id) {
290 Ok(Some(_)) => std::collections::HashMap::new(),
291 _ => std::collections::HashMap::new(),
292 };
293
294 let mut hosts: Vec<nixfleet_proto::RolloutHostEntry> = Vec::with_capacity(records.len());
295 for r in &records {
296 let (terminal_state, terminal_at) = derive_terminal(r);
297 hosts.push(nixfleet_proto::RolloutHostEntry {
298 host: r.hostname.clone(),
299 channel: r.channel.clone(),
300 wave: wave_by_host.get(&r.hostname).copied().unwrap_or(0),
301 target_closure_hash: r.target_closure.clone(),
302 target_channel_ref: r.rollout_id.as_str().to_string(),
303 dispatched_at: r.dispatched_at.to_rfc3339(),
304 terminal_state,
305 terminal_at,
306 });
307 }
308 hosts.sort_by(|a, b| a.wave.cmp(&b.wave).then_with(|| a.host.cmp(&b.host)));
310
311 let (channel, channel_ref) = rollout_id.split_once('@').unwrap();
314 Ok(axum::Json(nixfleet_proto::RolloutHosts {
315 rollout_id: nixfleet_proto::RolloutId::new(channel, channel_ref),
316 hosts,
317 }))
318}
319
320pub(in crate::server) async fn events(
332 State(state): State<Arc<AppState>>,
333 Path(rollout_id): Path<String>,
334 axum::extract::Query(query): axum::extract::Query<EventsQuery>,
335) -> Result<axum::Json<nixfleet_proto::RolloutEvents>, StatusCode> {
336 if !looks_like_rollout_id(&rollout_id) {
337 return Err(StatusCode::BAD_REQUEST);
338 }
339 let db = state.db.as_ref().ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
340 let limit = query.limit.unwrap_or(1000).max(1);
341
342 let rows = match db.event_log().query_by_rollout(&rollout_id, limit) {
343 Ok(r) => r,
344 Err(err) => {
345 tracing::error!(
346 target: "rollout_events",
347 %rollout_id,
348 error = %err,
349 "event_log query_by_rollout failed",
350 );
351 return Err(StatusCode::INTERNAL_SERVER_ERROR);
352 }
353 };
354
355 let mut events: Vec<nixfleet_proto::RolloutEventEntry> = Vec::with_capacity(rows.len());
356 for row in rows {
357 let payload = match serde_json::from_str::<serde_json::Value>(&row.payload) {
362 Ok(v) => v,
363 Err(err) => {
364 tracing::warn!(
365 target: "rollout_events",
366 seq = row.seq,
367 error = %err,
368 "event_log row payload failed JSON parse — emitting placeholder",
369 );
370 serde_json::json!({ "_parse_error": err.to_string(), "_raw": row.payload })
371 }
372 };
373 events.push(nixfleet_proto::RolloutEventEntry {
374 seq: row.seq,
375 ts: row.ts.to_rfc3339(),
376 kind: row.kind,
377 host: row.host_id,
378 payload,
379 });
380 }
381
382 let (channel, channel_ref) = rollout_id.split_once('@').unwrap();
383 Ok(axum::Json(nixfleet_proto::RolloutEvents {
384 rollout_id: nixfleet_proto::RolloutId::new(channel, channel_ref),
385 events,
386 }))
387}
388
389#[derive(Debug, serde::Deserialize)]
390pub struct EventsQuery {
391 pub limit: Option<i64>,
392}
393
394fn derive_terminal(
397 r: &nixfleet_state_machine::HostRolloutState,
398) -> (Option<String>, Option<String>) {
399 use nixfleet_state_machine::HostState;
400 match r.state {
401 HostState::Converged => (
402 Some("converged".into()),
403 r.converged_at.map(|t| t.to_rfc3339()),
404 ),
405 HostState::Reverted => (
406 Some("rolled-back".into()),
407 r.reverted_at.map(|t| t.to_rfc3339()),
408 ),
409 HostState::Failed => (Some("failed".into()), r.failed_at.map(|t| t.to_rfc3339())),
410 HostState::Pending | HostState::Activating | HostState::Deferred | HostState::Soaking => {
411 (None, None)
412 }
413 }
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Db;
    use nixfleet_state_machine::{HostRolloutState as SmState, HostState};

    /// Well-formed rollout id shared by the handler tests.
    const TEST_ROLLOUT: &str = "stable@deadbeef";

    // ---- `looks_like_rollout_id` ----

    #[test]
    fn validator_accepts_canonical_rollout_id() {
        assert!(looks_like_rollout_id("stable@deadbeef"));
    }

    #[test]
    fn validator_accepts_long_channel_ref() {
        let id = "stable@deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef";
        assert!(looks_like_rollout_id(id));
    }

    #[test]
    fn validator_rejects_legacy_sha256_hex_alone() {
        let id = "abc1234567890123456789012345678901234567890123456789012345678901";
        assert!(!looks_like_rollout_id(id));
    }

    #[test]
    fn validator_rejects_empty_channel() {
        assert!(!looks_like_rollout_id("@deadbeef"));
    }

    #[test]
    fn validator_rejects_empty_ref() {
        assert!(!looks_like_rollout_id("stable@"));
    }

    #[test]
    fn validator_rejects_path_traversal() {
        assert!(!looks_like_rollout_id("../../etc/passwd"));
    }

    #[test]
    fn validator_rejects_slash_in_channel() {
        assert!(!looks_like_rollout_id("stable/branch@abc"));
    }

    #[test]
    fn validator_rejects_uppercase_hex_in_ref() {
        assert!(!looks_like_rollout_id("stable@DEADBEEF"));
    }

    #[test]
    fn validator_rejects_no_separator() {
        assert!(!looks_like_rollout_id("stableABC"));
    }

    #[test]
    fn validator_rejects_multiple_separators() {
        assert!(!looks_like_rollout_id("stable@beta@abc"));
    }

    // ---- `hosts` handler ----

    /// App state backed by a freshly migrated in-memory database; every other
    /// field keeps its default.
    fn fresh_state() -> Arc<AppState> {
        let db = Db::open_in_memory().unwrap();
        db.migrate().unwrap();
        Arc::new(AppState {
            db: Some(Arc::new(db)),
            ..Default::default()
        })
    }

    /// Pending record for `host` in `TEST_ROLLOUT` on channel `stable`,
    /// created at `now` with the second timestamp five minutes later.
    fn pending_record(
        host: &'static str,
        closure: &'static str,
        now: chrono::DateTime<chrono::Utc>,
    ) -> SmState {
        SmState::new_pending(
            TEST_ROLLOUT.into(),
            host.into(),
            "stable".into(),
            closure.into(),
            now,
            now + chrono::Duration::minutes(5),
        )
    }

    #[tokio::test]
    async fn hosts_404_when_rollout_unknown() {
        let outcome = hosts(
            State(fresh_state()),
            axum::extract::Path(TEST_ROLLOUT.into()),
        )
        .await;
        assert_eq!(outcome.unwrap_err(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn hosts_projects_one_entry_per_host_with_terminal_state() {
        let state = fresh_state();
        let db = state.db.clone().unwrap();
        let now = chrono::Utc::now();

        // h1 has converged; h2 stays pending.
        let mut h1 = pending_record("h1", "h1-closure", now);
        h1.state = HostState::Converged;
        h1.current_closure = Some("h1-closure".into());
        h1.converged_at = Some(now + chrono::Duration::minutes(10));
        db.host_rollout_records().upsert(&h1).unwrap();

        let h2 = pending_record("h2", "h2-closure", now);
        db.host_rollout_records().upsert(&h2).unwrap();

        let axum::Json(payload) = hosts(State(state), axum::extract::Path(TEST_ROLLOUT.into()))
            .await
            .unwrap();
        assert_eq!(payload.rollout_id.as_str(), TEST_ROLLOUT);

        let [first, second] = payload.hosts.as_slice() else {
            panic!("expected exactly 2 hosts, got {}", payload.hosts.len());
        };
        assert_eq!(first.host, "h1");
        assert_eq!(first.terminal_state.as_deref(), Some("converged"));
        assert!(first.terminal_at.is_some());
        assert_eq!(second.host, "h2");
        assert_eq!(second.terminal_state, None, "h2 still pending");
        assert_eq!(second.terminal_at, None);
    }
}