agentmux_launcher/event_log.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase D.2 — event log: in-memory ring buffer of recent reducer
5// events, used by D.3's replay protocol; plus an optional disk
6// persistence stream at `<data-dir>/launcher-events.log` for
7// crash forensics.
8//
9// Two roles, kept clean:
10//
11// 1. **In-memory ring** is the source of truth for replay during
12// the launcher's lifetime. `GetEvents { since: u64 }` reads
13// from it. Bounded by `MAX_RING_EVENTS` to stop unbounded
14// memory growth in long sessions; oldest events evict first.
//    A subscriber that fell behind farther than the ring's coverage
//    gets an `EventList` of whatever is still in the ring, plus a
//    flag indicating it may have missed events; it is the
//    subscriber's job to recover (e.g. by re-fetching a snapshot) and
//    to treat subsequent events as authoritative.
19//
20// 2. **Disk file** is purely forensic. Append-only JSON-lines.
21// Survives launcher crash so an operator can post-mortem
22// "what was the launcher doing right before it died."
23// Fire-and-forget: write failures log a warning but never
24// block the in-memory path. Rotated when it exceeds
25// `MAX_DISK_BYTES` (renames current to `.old`, starts fresh).
26//
27// Why both: in-memory satisfies D.3's resync needs at zero I/O
28// cost in the happy path. Disk satisfies the "what happened
29// before the crash" debugging story without complicating the
30// replay reader (which would otherwise need fallback-to-disk
31// logic when the ring's coverage doesn't reach back far enough).
32//
33// Phase D.3 adds the wire protocol that consumes this module.
34
35use std::collections::VecDeque;
36use std::path::PathBuf;
37use std::sync::Mutex;
38
39use agentmux_common::ipc::Event;
40use tokio::io::AsyncWriteExt;
41
/// Maximum number of events retained in the in-memory replay ring.
///
/// Once full, each new event evicts the oldest one. The value is sized
/// for realistic resync windows (roughly minutes of activity at 10–50
/// reducer events per user action). Raise it if forensics from long
/// sessions show truncation; lowering it is only worthwhile if memory
/// pressure from the ring is ever measured.
const MAX_RING_EVENTS: usize = 4_096;

/// Size threshold (8 MiB) at which the on-disk log is rotated.
///
/// Roughly 4–8K events, depending on the variant mix. Rotation keeps
/// two generations: the current file is renamed to `.old`, replacing
/// any earlier `.old`, and a fresh file is started. Worst-case disk
/// footprint is therefore about 2 × `MAX_DISK_BYTES`.
const MAX_DISK_BYTES: u64 = 8 << 20;
55
56/// Append-only ring + optional disk persistence.
57///
58/// Cloneable via `Arc<EventLog>` from the IPC server context;
59/// `append` and `events_since` are cheap (Mutex held for
60/// microseconds — Vec<Event> push / scan, no I/O on the
61/// in-memory path).
62///
63/// Disk writes happen on a dedicated tokio task that subscribes
64/// to the broadcast bus separately from the in-memory append.
65/// This means the in-memory ring is updated synchronously from
66/// the IPC server's dispatch path; disk persistence runs at its
67/// own pace and may lag.
68#[derive(Debug)]
69pub struct EventLog {
70 ring: Mutex<VecDeque<Event>>,
71 disk_path: Option<PathBuf>,
72}
73
74impl EventLog {
75 /// Construct an event log. `disk_path = None` disables disk
76 /// persistence (used in tests where filesystem state is
77 /// inconvenient). The on-disk file is created on first append;
78 /// no upfront I/O.
79 pub fn new(disk_path: Option<PathBuf>) -> Self {
80 Self {
81 ring: Mutex::new(VecDeque::with_capacity(MAX_RING_EVENTS)),
82 disk_path,
83 }
84 }
85
86 /// Append an event to the in-memory ring. Evicts the oldest
87 /// entry when the ring is at capacity. Synchronous,
88 /// O(1)-amortized.
89 pub fn append(&self, event: Event) {
90 let mut ring = self.ring.lock().expect("event-log ring mutex poisoned");
91 if ring.len() == MAX_RING_EVENTS {
92 ring.pop_front();
93 }
94 ring.push_back(event);
95 }
96
97 /// Snapshot of all events currently in the ring with version >
98 /// `since`. Returned in insertion order (oldest first), so the
99 /// caller applies them sequentially.
100 ///
101 /// Phase D.3 — used by `Command::GetEvents { since }` to
102 /// produce the replay slice. The snapshot is taken at-call-time;
103 /// events arriving after this returns are NOT included (the
104 /// subscriber sees them on the live broadcast stream).
105 pub fn events_since(&self, since: u64) -> Vec<Event> {
106 let ring = self.ring.lock().expect("event-log ring mutex poisoned");
107 ring.iter()
108 .filter(|e| event_version(e) > since)
109 .cloned()
110 .collect()
111 }
112
113 /// True if the requested `since` version is older than the
114 /// oldest event in the ring (i.e. the subscriber missed events
115 /// that have already been evicted). Caller should treat the
116 /// resulting `events_since` slice as best-effort and may need
117 /// to re-fetch a snapshot to recover canonical state.
118 pub fn replay_truncated(&self, since: u64) -> bool {
119 let ring = self.ring.lock().expect("event-log ring mutex poisoned");
120 match ring.front() {
121 // Phase E.1a (codex P2 #608) — saturating_add guards
122 // against `since: u64::MAX` overflow. Wire input is
123 // externally reachable; debug builds would panic, release
124 // would wrap. Saturating means "since == u64::MAX" trivially
125 // returns false (oldest can't exceed u64::MAX), which is
126 // the correct semantic — there's no possible event newer
127 // than that, so there can't be a gap.
128 Some(oldest) => event_version(oldest) > since.saturating_add(1),
129 None => false,
130 }
131 }
132
133 /// Disk path for the writer task to flush to. None when disk
134 /// persistence is disabled.
135 pub fn disk_path(&self) -> Option<&PathBuf> {
136 self.disk_path.as_ref()
137 }
138}
139
140/// Background task: write events to the disk file as they arrive
141/// on the broadcast bus. Rotates when the file exceeds
142/// `MAX_DISK_BYTES`.
143///
144/// Spawned once per launcher run from main.rs. Holds a tokio
145/// broadcast receiver and the EventLog's disk path. Failures
146/// log a warning and drop the event from the disk stream
147/// (in-memory ring is unaffected — disk is forensics-only).
148pub async fn run_disk_writer(
149 log: std::sync::Arc<EventLog>,
150 mut events_rx: tokio::sync::broadcast::Receiver<Event>,
151) {
152 let path = match log.disk_path() {
153 Some(p) => p.clone(),
154 None => return, // disk persistence disabled — exit task
155 };
156 let rotated_path = path.with_extension("log.old");
157
158 let mut file = match open_for_append(&path).await {
159 Ok(f) => f,
160 Err(e) => {
161 crate::log(&format!(
162 "[event-log] cannot open {} for append: {} — disk persistence disabled",
163 path.display(),
164 e
165 ));
166 return;
167 }
168 };
169 let mut bytes_written = file.metadata().await.map(|m| m.len()).unwrap_or(0);
170
171 loop {
172 match events_rx.recv().await {
173 Ok(event) => {
174 let mut buf = match serde_json::to_vec(&event) {
175 Ok(b) => b,
176 Err(e) => {
177 crate::log(&format!("[event-log] serialize failed: {}", e));
178 continue;
179 }
180 };
181 buf.push(b'\n');
182 if bytes_written + buf.len() as u64 > MAX_DISK_BYTES {
183 // Rotate: drop the writer, rename current →
184 // .old (overwriting), reopen fresh. Failures
185 // here are non-fatal; we log and keep writing
186 // to the existing file (it'll just exceed cap).
187 drop(file);
188 if let Err(e) = tokio::fs::rename(&path, &rotated_path).await {
189 crate::log(&format!("[event-log] rotation rename failed: {} — continuing without rotation", e));
190 }
191 file = match open_for_append(&path).await {
192 Ok(f) => f,
193 Err(e) => {
194 crate::log(&format!(
195 "[event-log] post-rotation open failed: {} — disk persistence stopping",
196 e
197 ));
198 return;
199 }
200 };
201 bytes_written = 0;
202 }
203 if let Err(e) = file.write_all(&buf).await {
204 crate::log(&format!("[event-log] write failed: {} — dropping event from disk stream", e));
205 continue;
206 }
207 // Phase E.1a — durable: fsync per append so events
208 // written before a crash survive it. Required by
209 // Phase E §6.4 for srv's bootstrap-replay correctness.
210 // Latency cost: ~ms per event vs microseconds without
211 // sync. Acceptable for our event volume (~10–50 per
212 // user action). Trade-off considered: batched fsync
213 // (group commit). Skipping for E.1a — premature
214 // optimization; revisit if profiling shows a hot path.
215 if let Err(e) = file.sync_data().await {
216 crate::log(&format!("[event-log] sync_data failed: {} — event written but not fsynced", e));
217 }
218 bytes_written += buf.len() as u64;
219 }
220 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
221 crate::log(&format!("[event-log] disk writer lagged, missed {} events", n));
222 }
223 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
224 let _ = file.flush().await;
225 return;
226 }
227 }
228 }
229}
230
231async fn open_for_append(path: &std::path::Path) -> std::io::Result<tokio::fs::File> {
232 tokio::fs::OpenOptions::new()
233 .create(true)
234 .append(true)
235 .open(path)
236 .await
237}
238
239/// Extract the `version` field from any Event variant. Mirrors
240/// reducer.rs::extract_version (which is test-only; the prod code
241/// path needs its own copy here). When a new variant is added that
242/// carries a version, add it here too — the unreachable arm catches
243/// future-variant compile errors.
244fn event_version(e: &Event) -> u64 {
245 match e {
246 Event::ProcessSpawned { version, .. }
247 | Event::ProcessExited { version, .. }
248 | Event::LifecyclePhaseChanged { version, .. }
249 | Event::Registered { version, .. }
250 | Event::Pong { version, .. }
251 | Event::WindowOpened { version, .. }
252 | Event::WindowClosed { version, .. }
253 | Event::PoolWindowAdded { version, .. }
254 | Event::PoolWindowRemoved { version, .. }
255 | Event::PoolWindowPromoted { version, .. }
256 | Event::PanesReaped { version, .. }
257 | Event::PoolDrained { version, .. }
258 | Event::PoolNotLast { version, .. }
259 | Event::WindowInstanceAssigned { version, .. }
260 | Event::WindowInstanceReleased { version, .. }
261 | Event::BackendWindowIdRegistered { version, .. }
262 | Event::BackendWindowIdUnregistered { version, .. }
263 | Event::DriftDetected { version, .. }
264 | Event::HwndDriftDetected { version, .. }
265 | Event::CorrectiveWindowMove { version, .. }
266 | Event::HostShouldQuit { version, .. }
267 | Event::Snapshot { version, .. }
268 | Event::EventList { version, .. }
269 | Event::SagaStarted { version, .. }
270 | Event::SagaCompleted { version, .. }
271 | Event::SagaFailed { version, .. }
272 | Event::SrvSnapshot { version, .. }
273 | Event::WorkspaceCreated { version, .. }
274 | Event::WorkspaceDeleted { version, .. }
275 | Event::TabCreated { version, .. }
276 | Event::TabDeleted { version, .. }
277 | Event::ActiveTabChanged { version, .. }
278 | Event::TabReordered { version, .. }
279 | Event::SrvWindowOpened { version, .. }
280 | Event::SrvWindowClosed { version, .. }
281 | Event::SrvWindowWorkspaceChanged { version, .. }
282 | Event::TabsReorderedBulk { version, .. }
283 | Event::WorkspaceRenamed { version, .. }
284 | Event::TabRenamed { version, .. }
285 | Event::WorkspaceMetaUpdated { version, .. }
286 | Event::TabMetaUpdated { version, .. }
287 | Event::BlockMetaUpdated { version, .. }
288 | Event::TabMoved { version, .. }
289 | Event::BlockMoved { version, .. }
290 | Event::BlockCreated { version, .. }
291 | Event::BlockDeleted { version, .. }
292 | Event::FocusedNodeChanged { version, .. }
293 | Event::MagnifiedNodeChanged { version, .. }
294 | Event::SagaActionFailed { version, .. }
295 | Event::Error { version, .. }
296 // Phase E.4.B — layout tree events.
297 | Event::LayoutNodeInserted { version, .. }
298 | Event::LayoutNodeInsertedAtIndex { version, .. }
299 | Event::LayoutNodeDeleted { version, .. }
300 | Event::LayoutNodeMoved { version, .. }
301 | Event::LayoutNodesSwapped { version, .. }
302 | Event::LayoutNodesResized { version, .. }
303 | Event::LayoutNodeReplaced { version, .. }
304 | Event::LayoutSplitHorizontalApplied { version, .. }
305 | Event::LayoutSplitVerticalApplied { version, .. }
306 | Event::LayoutCleared { version, .. }
307 | Event::LayoutTreeReplaced { version, .. }
308 | Event::WindowMetaUpdated { version, .. } => *version,
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use agentmux_common::ipc::{ClientKind, LifecyclePhase};

    fn lifecycle_event(v: u64) -> Event {
        Event::LifecyclePhaseChanged {
            from: LifecyclePhase::Starting,
            to: LifecyclePhase::Running,
            version: v,
        }
    }

    fn registered_event(v: u64) -> Event {
        Event::Registered {
            client_id: 1,
            launcher_pid: 1,
            launcher_version: "test".into(),
            version: v,
        }
    }

    fn process_spawned_event(v: u64) -> Event {
        Event::ProcessSpawned {
            pid: 100,
            kind: ClientKind::Host,
            client_version: "0.0.0".into(),
            version: v,
        }
    }

    /// Append versions `1..=MAX_RING_EVENTS + overflow`, so exactly
    /// `overflow` events are evicted. Returns the oldest retained version.
    fn fill_past_capacity(log: &EventLog, overflow: u64) -> u64 {
        for v in 1..=(MAX_RING_EVENTS as u64 + overflow) {
            log.append(lifecycle_event(v));
        }
        overflow + 1
    }

    #[test]
    fn append_grows_ring_until_cap_then_evicts_oldest() {
        let log = EventLog::new(None);
        let oldest = fill_past_capacity(&log, 5);
        let ring = log.ring.lock().unwrap();
        assert_eq!(ring.len(), MAX_RING_EVENTS);
        // Oldest 5 should have been evicted; first should be v=6.
        assert_eq!(oldest, 6);
        assert_eq!(event_version(ring.front().unwrap()), oldest);
        // Newest is v = MAX_RING_EVENTS + 5.
        assert_eq!(
            event_version(ring.back().unwrap()),
            MAX_RING_EVENTS as u64 + 5
        );
    }

    #[test]
    fn events_since_returns_only_versions_strictly_greater() {
        let log = EventLog::new(None);
        for v in [1u64, 2, 3, 4, 5] {
            log.append(lifecycle_event(v));
        }
        let replay = log.events_since(2);
        assert_eq!(replay.len(), 3, "expected v=3,4,5; got {:?}", replay);
        assert_eq!(event_version(&replay[0]), 3);
        assert_eq!(event_version(&replay[2]), 5);
    }

    #[test]
    fn events_since_zero_returns_all() {
        let log = EventLog::new(None);
        for v in [1u64, 2, 3] {
            log.append(registered_event(v));
        }
        let replay = log.events_since(0);
        assert_eq!(replay.len(), 3);
    }

    #[test]
    fn events_since_at_or_above_max_returns_empty() {
        let log = EventLog::new(None);
        log.append(process_spawned_event(5));
        log.append(process_spawned_event(6));
        let replay = log.events_since(10);
        assert!(replay.is_empty());
    }

    #[test]
    fn events_since_after_eviction_returns_only_retained_tail() {
        let log = EventLog::new(None);
        let oldest = fill_past_capacity(&log, 10);
        let replay = log.events_since(0);
        assert_eq!(replay.len(), MAX_RING_EVENTS);
        assert_eq!(event_version(&replay[0]), oldest);
    }

    #[test]
    fn replay_truncated_detects_missed_events() {
        let log = EventLog::new(None);
        // Fill past capacity to force eviction; ring's oldest is v=101.
        let oldest = fill_past_capacity(&log, 100);
        assert_eq!(oldest, 101);
        // Subscriber last saw v=5, so v=6..=100 were missed and
        // `replay_truncated(5)` reports the gap.
        assert!(log.replay_truncated(5));
        // Subscriber already at the oldest retained version: no gap.
        assert!(!log.replay_truncated(oldest));
    }

    #[test]
    fn replay_truncated_boundary_is_since_plus_one() {
        let log = EventLog::new(None);
        let oldest = fill_past_capacity(&log, 100);
        // Last seen = oldest - 1: the next needed event is exactly the
        // oldest retained one, so nothing is missing.
        assert!(!log.replay_truncated(oldest - 1));
        // Last seen = oldest - 2: version `oldest - 1` was evicted.
        assert!(log.replay_truncated(oldest - 2));
    }

    #[test]
    fn replay_truncated_saturates_at_u64_max() {
        let log = EventLog::new(None);
        log.append(lifecycle_event(1));
        assert!(!log.replay_truncated(u64::MAX));
    }

    #[test]
    fn replay_truncated_on_empty_log_is_false() {
        let log = EventLog::new(None);
        assert!(!log.replay_truncated(0));
        assert!(!log.replay_truncated(100));
    }
}