nixfleet_proto/
agent_event.rs

//! Wire-format types for `POST /v1/agent/events` (RFC-0005 §4.2).
//!
//! Lives in `nixfleet-proto` so the agent (producer) and CP (consumer)
//! share a single canonical definition. Prior to this lift the envelope
//! was hand-built as `serde_json::Map` on the agent and re-defined as a
//! Rust struct on the CP side; a casing mismatch on the outer
//! `rollout_id` field was the surface defect that exposed the
//! duplicated-definition shape. Lifted per RFC-0004 §2: any type that
//! crosses the agent <-> CP boundary lives in `nixfleet-proto`, not in
//! both sides simultaneously.
//!
//! Wire convention pinned here:
//!   - Envelope: outer fields are `camelCase` (`hostname`, `rolloutId`,
//!     `event`, optional `signature`).
//!   - Inner event: `tag = "kind"` (PascalCase variants) +
//!     `camelCase` field names for the variant payloads.
//!   - Probe-status / probe-mode / on-health-failure: keep their
//!     historic wire shapes (`lowercase`, `kebab-case`, and `kebab-case`
//!     respectively).
//!
//! Conversions to `nixfleet_state_machine` types live in that crate
//! (orphan rule); this module defines the wire surface only.
22
23use chrono::{DateTime, Utc};
24use serde::{Deserialize, Serialize};
25
26use crate::RolloutId;
27
28/// Outer envelope agents POST to `/v1/agent/events`.
29#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
30#[serde(rename_all = "camelCase")]
31pub struct AgentEventEnvelope {
32    pub hostname: String,
33    pub rollout_id: RolloutId,
34    pub event: AgentEvent,
35    /// Hex-encoded Ed25519 signature over canonicalised `event` bytes.
36    /// Optional in v0.2 (mTLS provides primary auth); enforced in
37    /// Phase 7+.
38    #[serde(default, skip_serializing_if = "Option::is_none")]
39    pub signature: Option<String>,
40}
41
42/// Inbound agent events. Mirrors the wire side of
43/// `nixfleet_state_machine::OutboundAgentEvent` (same variant names,
44/// `camelCase` fields per RFC-0005 §4.2).
45#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
46#[serde(tag = "kind", rename_all = "PascalCase")]
47pub enum AgentEvent {
48    #[serde(rename_all = "camelCase")]
49    DispatchAck {
50        current_closure_at_dispatch: String,
51        received_at: DateTime<Utc>,
52        seq: u64,
53    },
54    #[serde(rename_all = "camelCase")]
55    ActivationStarted {
56        started_at: DateTime<Utc>,
57        switch_method: String,
58        seq: u64,
59    },
60    #[serde(rename_all = "camelCase")]
61    ActivationCompleted {
62        observed_current_closure: String,
63        exit_code: i32,
64        completed_at: DateTime<Utc>,
65        seq: u64,
66    },
67    #[serde(rename_all = "camelCase")]
68    ActivationFailed {
69        exit_code: i32,
70        stderr_tail: String,
71        failed_at: DateTime<Utc>,
72        seq: u64,
73    },
74    /// LIFT #2 (RFC-0005 §4.2): live activation skipped because
75    /// `component` (dbus/systemd/kernel/init) cannot be live-swapped on
76    /// a running system. Profile + bootloader updated; next reboot
77    /// completes the activation. Replaces the pre-LIFT-#2 fake
78    /// `ActivationCompleted` with `exit_code: 0`.
79    #[serde(rename_all = "camelCase")]
80    ActivationDeferred {
81        component: String,
82        deferred_at: DateTime<Utc>,
83        seq: u64,
84    },
85    #[serde(rename_all = "camelCase")]
86    ProbeTopologyDeclared {
87        probes: Vec<ProbeTopologyEntryWire>,
88        declared_at: DateTime<Utc>,
89        seq: u64,
90    },
91    #[serde(rename_all = "camelCase")]
92    ProbeObservedFirst {
93        probe_name: String,
94        mode: ProbeModeWire,
95        observed_at: DateTime<Utc>,
96        seq: u64,
97    },
98    #[serde(rename_all = "camelCase")]
99    ProbeResult {
100        probe_name: String,
101        mode: ProbeModeWire,
102        status: ProbeStatusWire,
103        observed_at: DateTime<Utc>,
104        #[serde(default, skip_serializing_if = "Option::is_none")]
105        failure_reason: Option<String>,
106        #[serde(default, skip_serializing_if = "Option::is_none")]
107        sub_results: Option<Vec<ProbeSubResultWire>>,
108        seq: u64,
109    },
110    #[serde(rename_all = "camelCase")]
111    ProbeFailureFirst {
112        probe_name: String,
113        mode: ProbeModeWire,
114        first_failed_at: DateTime<Utc>,
115        seq: u64,
116    },
117    #[serde(rename_all = "camelCase")]
118    Failed {
119        failed_at: DateTime<Utc>,
120        sustained_duration_secs: u64,
121        failing_probes: Vec<String>,
122        policy_applied: OnHealthFailureWire,
123        seq: u64,
124    },
125    #[serde(rename_all = "camelCase")]
126    RollbackComplete {
127        reverted_to_closure: String,
128        exit_code: i32,
129        completed_at: DateTime<Utc>,
130        seq: u64,
131    },
132    #[serde(rename_all = "camelCase")]
133    Converged {
134        converged_at: DateTime<Utc>,
135        current_closure: String,
136        seq: u64,
137    },
138}
139
140impl AgentEvent {
141    pub fn seq(&self) -> u64 {
142        match self {
143            AgentEvent::DispatchAck { seq, .. }
144            | AgentEvent::ActivationStarted { seq, .. }
145            | AgentEvent::ActivationCompleted { seq, .. }
146            | AgentEvent::ActivationDeferred { seq, .. }
147            | AgentEvent::ActivationFailed { seq, .. }
148            | AgentEvent::ProbeTopologyDeclared { seq, .. }
149            | AgentEvent::ProbeObservedFirst { seq, .. }
150            | AgentEvent::ProbeResult { seq, .. }
151            | AgentEvent::ProbeFailureFirst { seq, .. }
152            | AgentEvent::Failed { seq, .. }
153            | AgentEvent::RollbackComplete { seq, .. }
154            | AgentEvent::Converged { seq, .. } => *seq,
155        }
156    }
157}
158
159#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
160#[serde(rename_all = "lowercase")]
161pub enum ProbeStatusWire {
162    Pass,
163    Fail,
164}
165
166#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)]
167#[serde(rename_all = "kebab-case")]
168pub enum ProbeModeWire {
169    #[default]
170    Enforce,
171    Observe,
172    Disabled,
173}
174
175#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
176#[serde(rename_all = "kebab-case")]
177pub enum OnHealthFailureWire {
178    Halt,
179    RollbackAndHalt,
180}
181
182impl From<OnHealthFailureWire> for crate::OnHealthFailure {
183    fn from(w: OnHealthFailureWire) -> Self {
184        match w {
185            OnHealthFailureWire::Halt => crate::OnHealthFailure::Halt,
186            OnHealthFailureWire::RollbackAndHalt => crate::OnHealthFailure::RollbackAndHalt,
187        }
188    }
189}
190
191impl From<crate::OnHealthFailure> for OnHealthFailureWire {
192    fn from(p: crate::OnHealthFailure) -> Self {
193        match p {
194            crate::OnHealthFailure::Halt => OnHealthFailureWire::Halt,
195            crate::OnHealthFailure::RollbackAndHalt => OnHealthFailureWire::RollbackAndHalt,
196        }
197    }
198}
199
200#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
201#[serde(rename_all = "camelCase")]
202pub struct ProbeTopologyEntryWire {
203    pub probe_name: String,
204    pub kind: String,
205    pub mode: ProbeModeWire,
206}
207
208#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
209#[serde(rename_all = "camelCase")]
210pub struct ProbeSubResultWire {
211    pub control_id: String,
212    pub status: ProbeStatusWire,
213    pub framework: String,
214    #[serde(default, skip_serializing_if = "Option::is_none")]
215    pub article: Option<String>,
216    /// Per-control effective mode resolved from the probe's
217    /// `controlOverrides` / `controls` declaration. The CP applier
218    /// inserts `probe_failures` rows only for `Enforce`-mode failures;
219    /// `Observe` rows are recorded in `event_log` for visibility but
220    /// do not gate the compliance_wave. `Disabled` controls are
221    /// filtered out by the agent before emission.
222    #[serde(default)]
223    pub effective_mode: ProbeModeWire,
224    /// Audit rationale from the operator's `controlOverrides[id].reason`
225    /// (or `controls[id].reason`). Surfaces in CP's event_log so
226    /// auditors can recover the "why was this control downgraded?"
227    /// answer from the signed event stream alone — no out-of-band
228    /// reference to fleet.nix needed.
229    #[serde(default, skip_serializing_if = "Option::is_none")]
230    pub override_reason: Option<String>,
231}
232
#[cfg(test)]
mod tests {
    //! Wire-shape tests: every `AgentEvent` variant must survive a JSON
    //! round trip inside an envelope, and the casing conventions pinned
    //! in the module docs are asserted against literal JSON.

    use super::*;
    use chrono::TimeZone;

    /// Fixed timestamp so serialized output is deterministic.
    fn fixed_now() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 5, 17, 12, 0, 0).unwrap()
    }

    /// Wraps `event` in an unsigned envelope with fixed host / rollout.
    fn envelope_with(event: AgentEvent) -> AgentEventEnvelope {
        AgentEventEnvelope {
            hostname: "host-test".into(),
            rollout_id: RolloutId::new("stable", "abc1234"),
            event,
            signature: None,
        }
    }

    /// Serializes `env` to JSON, parses it back, and asserts equality.
    fn round_trip(env: AgentEventEnvelope) {
        let raw = serde_json::to_string(&env).expect("serialize envelope");
        let back: AgentEventEnvelope = serde_json::from_str(&raw).expect("deserialize envelope");
        assert_eq!(env, back, "envelope round-trip preserves equality");
    }

    #[test]
    fn envelope_outer_keys_are_camelcase() {
        let env = envelope_with(AgentEvent::DispatchAck {
            current_closure_at_dispatch: "prior".into(),
            received_at: fixed_now(),
            seq: 1,
        });
        let raw = serde_json::to_string(&env).unwrap();
        assert!(
            raw.contains("\"rolloutId\""),
            "envelope must use camelCase rolloutId: {raw}",
        );
        assert!(
            !raw.contains("\"rollout_id\""),
            "envelope must NOT use snake_case rollout_id: {raw}",
        );
    }

    #[test]
    fn inner_event_is_tagged_by_kind_with_camelcase_fields() {
        let raw = serde_json::to_string(&AgentEvent::ActivationDeferred {
            component: "dbus".into(),
            deferred_at: fixed_now(),
            seq: 12,
        })
        .unwrap();
        assert!(
            raw.contains("\"kind\":\"ActivationDeferred\""),
            "event must carry a PascalCase `kind` tag: {raw}",
        );
        assert!(
            raw.contains("\"deferredAt\""),
            "event payload must use camelCase keys: {raw}",
        );
        assert!(
            !raw.contains("\"deferred_at\""),
            "event payload must NOT use snake_case keys: {raw}",
        );
    }

    #[test]
    fn dispatch_ack_round_trip() {
        round_trip(envelope_with(AgentEvent::DispatchAck {
            current_closure_at_dispatch: "prior-closure".into(),
            received_at: fixed_now(),
            seq: 1,
        }));
    }

    #[test]
    fn activation_started_round_trip() {
        round_trip(envelope_with(AgentEvent::ActivationStarted {
            started_at: fixed_now(),
            switch_method: "systemd-run".into(),
            seq: 2,
        }));
    }

    #[test]
    fn activation_completed_round_trip() {
        round_trip(envelope_with(AgentEvent::ActivationCompleted {
            observed_current_closure: "closure-a".into(),
            exit_code: 0,
            completed_at: fixed_now(),
            seq: 3,
        }));
    }

    #[test]
    fn activation_failed_round_trip() {
        round_trip(envelope_with(AgentEvent::ActivationFailed {
            exit_code: 1,
            stderr_tail: "boom".into(),
            failed_at: fixed_now(),
            seq: 4,
        }));
    }

    // Previously the only variant without a round-trip test.
    #[test]
    fn activation_deferred_round_trip() {
        round_trip(envelope_with(AgentEvent::ActivationDeferred {
            component: "dbus".into(),
            deferred_at: fixed_now(),
            seq: 12,
        }));
    }

    #[test]
    fn probe_topology_declared_round_trip() {
        round_trip(envelope_with(AgentEvent::ProbeTopologyDeclared {
            probes: vec![ProbeTopologyEntryWire {
                probe_name: "nginx".into(),
                kind: "http".into(),
                mode: ProbeModeWire::Enforce,
            }],
            declared_at: fixed_now(),
            seq: 5,
        }));
    }

    #[test]
    fn probe_observed_first_round_trip() {
        round_trip(envelope_with(AgentEvent::ProbeObservedFirst {
            probe_name: "nginx".into(),
            mode: ProbeModeWire::Observe,
            observed_at: fixed_now(),
            seq: 6,
        }));
    }

    #[test]
    fn probe_result_round_trip_with_sub_results() {
        round_trip(envelope_with(AgentEvent::ProbeResult {
            probe_name: "evidence-nis2".into(),
            mode: ProbeModeWire::Enforce,
            status: ProbeStatusWire::Fail,
            observed_at: fixed_now(),
            failure_reason: Some("missing control".into()),
            sub_results: Some(vec![ProbeSubResultWire {
                control_id: "A.8.1".into(),
                status: ProbeStatusWire::Fail,
                framework: "nis2".into(),
                article: Some("21.2.h".into()),
                effective_mode: ProbeModeWire::Enforce,
                override_reason: None,
            }]),
            seq: 7,
        }));
    }

    #[test]
    fn probe_failure_first_round_trip() {
        round_trip(envelope_with(AgentEvent::ProbeFailureFirst {
            probe_name: "nginx".into(),
            mode: ProbeModeWire::Enforce,
            first_failed_at: fixed_now(),
            seq: 8,
        }));
    }

    #[test]
    fn failed_round_trip() {
        round_trip(envelope_with(AgentEvent::Failed {
            failed_at: fixed_now(),
            sustained_duration_secs: 120,
            failing_probes: vec!["nginx".into()],
            policy_applied: OnHealthFailureWire::RollbackAndHalt,
            seq: 9,
        }));
    }

    #[test]
    fn rollback_complete_round_trip() {
        round_trip(envelope_with(AgentEvent::RollbackComplete {
            reverted_to_closure: "prior-closure".into(),
            exit_code: 0,
            completed_at: fixed_now(),
            seq: 10,
        }));
    }

    #[test]
    fn converged_round_trip() {
        round_trip(envelope_with(AgentEvent::Converged {
            converged_at: fixed_now(),
            current_closure: "closure-a".into(),
            seq: 11,
        }));
    }

    #[test]
    fn probe_status_wire_format_is_lowercase() {
        assert_eq!(
            serde_json::to_string(&ProbeStatusWire::Pass).unwrap(),
            "\"pass\"",
        );
        assert_eq!(
            serde_json::to_string(&ProbeStatusWire::Fail).unwrap(),
            "\"fail\"",
        );
    }

    // The module docs pin probe-mode as kebab-case; assert it like the
    // other two historic wire shapes.
    #[test]
    fn probe_mode_wire_format_is_kebab_case() {
        assert_eq!(
            serde_json::to_string(&ProbeModeWire::Enforce).unwrap(),
            "\"enforce\"",
        );
        assert_eq!(
            serde_json::to_string(&ProbeModeWire::Observe).unwrap(),
            "\"observe\"",
        );
        assert_eq!(
            serde_json::to_string(&ProbeModeWire::Disabled).unwrap(),
            "\"disabled\"",
        );
    }

    #[test]
    fn on_health_failure_wire_format_is_kebab_case() {
        assert_eq!(
            serde_json::to_string(&OnHealthFailureWire::Halt).unwrap(),
            "\"halt\"",
        );
        assert_eq!(
            serde_json::to_string(&OnHealthFailureWire::RollbackAndHalt).unwrap(),
            "\"rollback-and-halt\"",
        );
    }

    // A sub-result without `effectiveMode` must fall back to the
    // `Enforce` default, and the optional keys must read back as `None`.
    #[test]
    fn sub_result_effective_mode_defaults_to_enforce() {
        let raw = r#"{"controlId":"A.8.1","status":"pass","framework":"nis2"}"#;
        let parsed: ProbeSubResultWire =
            serde_json::from_str(raw).expect("deserialize sub-result");
        assert_eq!(parsed.effective_mode, ProbeModeWire::Enforce);
        assert_eq!(parsed.article, None);
        assert_eq!(parsed.override_reason, None);
    }

    #[test]
    fn seq_accessor_matches_variant_field() {
        let e = AgentEvent::DispatchAck {
            current_closure_at_dispatch: "x".into(),
            received_at: fixed_now(),
            seq: 42,
        };
        assert_eq!(e.seq(), 42);
    }
}