nixfleet_control_plane/server/routes/events.rs
1//! `POST /v1/agent/events` — inbound event ingestion (RFC-0005 §4.2).
2//!
3//! The agent posts a single `AgentEvent` per call. The handler:
4//!
5//! 1. Authenticates the caller via mTLS (`require_cn_layer` middleware
6//! has already verified the cert and stamped `AuthenticatedCn`).
7//! 2. Cross-checks the cert CN's machine_id against the body's
8//! `hostname` — same pattern as `/v1/agent/report`. CN-vs-body
9//! mismatch ⇒ 403.
10//! 3. Deduplicates by `(hostname, rollout_id, seq)` against the
11//! `host_rollout_records.last_event_seq` column. A seq ≤ the stored
12//! value is a replay/duplicate and silently 204s (the agent retries
13//! are idempotent by design).
14//! 4. Maps the wire `AgentEvent` onto the matching
15//! `nixfleet_state_machine::Event::Remote*` variant and sends it into
16//! the reducer's input MPSC.
//! 5. Returns 204 on success, 503 if the reducer's input channel is
//!    unavailable: not yet wired (a narrow startup window before
//!    `serve()` wires `state.runtime_input_tx`), full (backpressure; the
//!    agent retries), or closed (CP shutting down).
20//!
21//! Signature verification on the body is a forward-looking TODO. v0.2
22//! trusts the mTLS cert chain (RFC-0002 §3) — a Phase 7+ pass adds
23//! per-event signatures so an event_log replay can detect tampering
24//! against a stored cert change. The wire envelope already carries an
25//! optional `signature` field so adding enforcement is non-breaking.
26
27use std::sync::Arc;
28
29use axum::Json;
30use axum::extract::{Extension, State};
31use axum::http::StatusCode;
32use nixfleet_proto::AgentEventEnvelope;
33use nixfleet_state_machine::Event;
34
35use super::super::middleware::AuthenticatedCn;
36use super::super::state::AppState;
37use crate::runtime::ReducerInput;
38
39// Wire envelope + AgentEvent + the supporting Wire enums + the
40// AgentEvent -> Event projection all live in `nixfleet_proto` and
41// `nixfleet_state_machine` (RFC-0004 §2 lift: types crossing the
42// agent <-> CP boundary live in a single canonical place). The
43// duplicated definitions that lived here previously - and the
44// hand-built JSON envelope on the agent side - both shipped a
45// `rollout_id` vs `rolloutId` casing mismatch that this lift closes
46// at the type level.
47
48pub(in crate::server) async fn events(
49 State(state): State<Arc<AppState>>,
50 Extension(cn): Extension<AuthenticatedCn>,
51 Json(envelope): Json<AgentEventEnvelope>,
52) -> StatusCode {
53 // 1. Cert-CN vs body-hostname guard. Same shape as the existing
54 // /v1/agent/report check (cf. routes/reports.rs).
55 let cn_str = cn.into_string();
56 let machine_id = crate::auth::issuance::extract_machine_id(&cn_str, &state.agent_cn_suffix);
57 if machine_id != envelope.hostname {
58 tracing::warn!(
59 target: "events",
60 cert_cn = %cn_str,
61 machine_id = %machine_id,
62 body_hostname = %envelope.hostname,
63 "events rejected: cert CN does not match body hostname",
64 );
65 return StatusCode::FORBIDDEN;
66 }
67
68 let seq = envelope.event.seq();
69
70 // 2. Dedup by (hostname, rollout_id, seq). Replay is silent 204.
71 // Source of truth: host_rollout_records.last_event_seq, advanced
72 // by the reducer when it actually applies the event. A duplicate
73 // here means the agent retried before the reducer caught up — or
74 // we already processed and the agent missed the response. Either
75 // way: idempotent.
76 if let Some(db) = state.db.as_ref() {
77 match db
78 .host_rollout_records()
79 .load(envelope.rollout_id.as_str(), &envelope.hostname)
80 {
81 Ok(Some(record)) if seq <= record.last_event_seq => {
82 tracing::debug!(
83 target: "events",
84 hostname = %envelope.hostname,
85 rollout_id = %envelope.rollout_id,
86 seq,
87 last_event_seq = record.last_event_seq,
88 "events: duplicate seq, 204 idempotent",
89 );
90 return StatusCode::NO_CONTENT;
91 }
92 Ok(_) => {} // first event for this pair, or new seq — fall through
93 Err(err) => {
94 tracing::error!(
95 target: "events",
96 hostname = %envelope.hostname,
97 rollout_id = %envelope.rollout_id,
98 error = %err,
99 "events: dedup lookup failed; dropping into reducer anyway",
100 );
101 // Continue — better to double-process than to silently drop.
102 // The reducer's state-machine itself rejects illegal
103 // transitions (e.g. duplicate DispatchAck on a Soaking host).
104 }
105 }
106 }
107
108 // 3. Push into reducer. 503 if runtime not yet spun up (narrow
109 // startup window only).
110 let Some(tx) = state.runtime_input_tx.get() else {
111 tracing::warn!(
112 target: "events",
113 hostname = %envelope.hostname,
114 "events: runtime not yet spun up; returning 503",
115 );
116 return StatusCode::SERVICE_UNAVAILABLE;
117 };
118
119 let host = envelope.hostname.clone();
120 let rollout_id = envelope.rollout_id.clone();
121 let reducer_event: Event = envelope.event.into();
122 let input = ReducerInput::HostEvent {
123 host,
124 rollout_id,
125 event: reducer_event,
126 };
127
128 // Bounded MPSC backpressure surfaces as 503; the pull-only agent
129 // contract (RFC-0005 §2.1) means agents will retry, so the channel
130 // never builds an unbounded queue when CP is overloaded.
131 match tx.try_send(input) {
132 Ok(()) => StatusCode::NO_CONTENT,
133 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
134 tracing::warn!(
135 target: "events",
136 hostname = %envelope.hostname,
137 rollout_id = %envelope.rollout_id,
138 seq,
139 "events: reducer input channel full; returning 503 (agent retries)",
140 );
141 StatusCode::SERVICE_UNAVAILABLE
142 }
143 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
144 tracing::error!(
145 target: "events",
146 hostname = %envelope.hostname,
147 "events: reducer input channel closed; CP shutting down",
148 );
149 StatusCode::SERVICE_UNAVAILABLE
150 }
151 }
152}