nixfleet_control_plane/server/routes/
events.rs

1//! `POST /v1/agent/events` — inbound event ingestion (RFC-0005 §4.2).
2//!
3//! The agent posts a single `AgentEvent` per call. The handler:
4//!
5//! 1. Authenticates the caller via mTLS (`require_cn_layer` middleware
6//!    has already verified the cert and stamped `AuthenticatedCn`).
7//! 2. Cross-checks the cert CN's machine_id against the body's
8//!    `hostname` — same pattern as `/v1/agent/report`. CN-vs-body
9//!    mismatch ⇒ 403.
//! 3. Deduplicates by `(hostname, rollout_id, seq)` against the
//!    `host_rollout_records.last_event_seq` column. A seq ≤ the stored
//!    value is a replay/duplicate and silently returns 204 (agent
//!    retries are idempotent by design).
14//! 4. Maps the wire `AgentEvent` onto the matching
15//!    `nixfleet_state_machine::Event::Remote*` variant and sends it into
16//!    the reducer's input MPSC.
//! 5. Returns 204 on success, 503 if the runtime channel is unavailable:
//!    not yet wired (a narrow startup window before `serve()` sets
//!    `state.runtime_input_tx`), full (backpressure), or closed (shutdown).
20//!
21//! Signature verification on the body is a forward-looking TODO. v0.2
22//! trusts the mTLS cert chain (RFC-0002 §3) — a Phase 7+ pass adds
23//! per-event signatures so an event_log replay can detect tampering
24//! against a stored cert change. The wire envelope already carries an
25//! optional `signature` field so adding enforcement is non-breaking.
26
27use std::sync::Arc;
28
29use axum::Json;
30use axum::extract::{Extension, State};
31use axum::http::StatusCode;
32use nixfleet_proto::AgentEventEnvelope;
33use nixfleet_state_machine::Event;
34
35use super::super::middleware::AuthenticatedCn;
36use super::super::state::AppState;
37use crate::runtime::ReducerInput;
38
39// Wire envelope + AgentEvent + the supporting Wire enums + the
40// AgentEvent -> Event projection all live in `nixfleet_proto` and
41// `nixfleet_state_machine` (RFC-0004 §2 lift: types crossing the
42// agent <-> CP boundary live in a single canonical place). The
43// duplicated definitions that lived here previously - and the
44// hand-built JSON envelope on the agent side - both shipped a
45// `rollout_id` vs `rolloutId` casing mismatch that this lift closes
46// at the type level.
47
48pub(in crate::server) async fn events(
49    State(state): State<Arc<AppState>>,
50    Extension(cn): Extension<AuthenticatedCn>,
51    Json(envelope): Json<AgentEventEnvelope>,
52) -> StatusCode {
53    // 1. Cert-CN vs body-hostname guard. Same shape as the existing
54    //    /v1/agent/report check (cf. routes/reports.rs).
55    let cn_str = cn.into_string();
56    let machine_id = crate::auth::issuance::extract_machine_id(&cn_str, &state.agent_cn_suffix);
57    if machine_id != envelope.hostname {
58        tracing::warn!(
59            target: "events",
60            cert_cn = %cn_str,
61            machine_id = %machine_id,
62            body_hostname = %envelope.hostname,
63            "events rejected: cert CN does not match body hostname",
64        );
65        return StatusCode::FORBIDDEN;
66    }
67
68    let seq = envelope.event.seq();
69
70    // 2. Dedup by (hostname, rollout_id, seq). Replay is silent 204.
71    //    Source of truth: host_rollout_records.last_event_seq, advanced
72    //    by the reducer when it actually applies the event. A duplicate
73    //    here means the agent retried before the reducer caught up — or
74    //    we already processed and the agent missed the response. Either
75    //    way: idempotent.
76    if let Some(db) = state.db.as_ref() {
77        match db
78            .host_rollout_records()
79            .load(envelope.rollout_id.as_str(), &envelope.hostname)
80        {
81            Ok(Some(record)) if seq <= record.last_event_seq => {
82                tracing::debug!(
83                    target: "events",
84                    hostname = %envelope.hostname,
85                    rollout_id = %envelope.rollout_id,
86                    seq,
87                    last_event_seq = record.last_event_seq,
88                    "events: duplicate seq, 204 idempotent",
89                );
90                return StatusCode::NO_CONTENT;
91            }
92            Ok(_) => {} // first event for this pair, or new seq — fall through
93            Err(err) => {
94                tracing::error!(
95                    target: "events",
96                    hostname = %envelope.hostname,
97                    rollout_id = %envelope.rollout_id,
98                    error = %err,
99                    "events: dedup lookup failed; dropping into reducer anyway",
100                );
101                // Continue — better to double-process than to silently drop.
102                // The reducer's state-machine itself rejects illegal
103                // transitions (e.g. duplicate DispatchAck on a Soaking host).
104            }
105        }
106    }
107
108    // 3. Push into reducer. 503 if runtime not yet spun up (narrow
109    //    startup window only).
110    let Some(tx) = state.runtime_input_tx.get() else {
111        tracing::warn!(
112            target: "events",
113            hostname = %envelope.hostname,
114            "events: runtime not yet spun up; returning 503",
115        );
116        return StatusCode::SERVICE_UNAVAILABLE;
117    };
118
119    let host = envelope.hostname.clone();
120    let rollout_id = envelope.rollout_id.clone();
121    let reducer_event: Event = envelope.event.into();
122    let input = ReducerInput::HostEvent {
123        host,
124        rollout_id,
125        event: reducer_event,
126    };
127
128    // Bounded MPSC backpressure surfaces as 503; the pull-only agent
129    // contract (RFC-0005 §2.1) means agents will retry, so the channel
130    // never builds an unbounded queue when CP is overloaded.
131    match tx.try_send(input) {
132        Ok(()) => StatusCode::NO_CONTENT,
133        Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
134            tracing::warn!(
135                target: "events",
136                hostname = %envelope.hostname,
137                rollout_id = %envelope.rollout_id,
138                seq,
139                "events: reducer input channel full; returning 503 (agent retries)",
140            );
141            StatusCode::SERVICE_UNAVAILABLE
142        }
143        Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
144            tracing::error!(
145                target: "events",
146                hostname = %envelope.hostname,
147                "events: reducer input channel closed; CP shutting down",
148            );
149            StatusCode::SERVICE_UNAVAILABLE
150        }
151    }
152}