agentmux_launcher/
event_log.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase D.2 — event log: in-memory ring buffer of recent reducer
5// events, used by D.3's replay protocol; plus an optional disk
6// persistence stream at `<data-dir>/launcher-events.log` for
7// crash forensics.
8//
9// Two roles, kept clean:
10//
11// 1. **In-memory ring** is the source of truth for replay during
12//    the launcher's lifetime. `GetEvents { since: u64 }` reads
13//    from it. Bounded by `MAX_RING_EVENTS` to stop unbounded
14//    memory growth in long sessions; oldest events evict first.
15//    A subscriber that fell behind further than the ring's
16//    coverage gets an `EventList` of whatever's still in the ring
17//    + a flag indicating they may have missed some — it's their
18//    job to recover by treating subsequent events as authoritative.
19//
20// 2. **Disk file** is purely forensic. Append-only JSON-lines.
21//    Survives launcher crash so an operator can post-mortem
22//    "what was the launcher doing right before it died."
23//    Fire-and-forget: write failures log a warning but never
24//    block the in-memory path. Rotated when it exceeds
25//    `MAX_DISK_BYTES` (renames current to `.old`, starts fresh).
26//
27// Why both: in-memory satisfies D.3's resync needs at zero I/O
28// cost in the happy path. Disk satisfies the "what happened
29// before the crash" debugging story without complicating the
30// replay reader (which would otherwise need fallback-to-disk
31// logic when the ring's coverage doesn't reach back far enough).
32//
33// Phase D.3 adds the wire protocol that consumes this module.
34
35use std::collections::VecDeque;
36use std::path::PathBuf;
37use std::sync::Mutex;
38
39use agentmux_common::ipc::Event;
40use tokio::io::AsyncWriteExt;
41
/// Upper bound on the number of events kept in the in-memory ring.
///
/// 4096 covers realistic resync windows: roughly minutes of activity at
/// typical reducer rates of 10–50 events per user action. Raise it if
/// forensics in long sessions show truncation. Lowering it only makes
/// sense if this ring is ever measured to cause memory pressure.
const MAX_RING_EVENTS: usize = 4 * 1024;
49
/// Size (in bytes) at which the on-disk event log is rotated: 8 MiB.
///
/// Rotation keeps two generations. The current file is renamed to `.old`
/// (replacing any earlier `.old`) and a fresh file is started, so disk
/// usage tops out around 2 × MAX_DISK_BYTES. The author's estimate is
/// that 8 MiB holds roughly 4–8K events, depending on variant sizes.
const MAX_DISK_BYTES: u64 = 8 << 20;
55
56/// Append-only ring + optional disk persistence.
57///
58/// Cloneable via `Arc<EventLog>` from the IPC server context;
59/// `append` and `events_since` are cheap (Mutex held for
60/// microseconds — Vec<Event> push / scan, no I/O on the
61/// in-memory path).
62///
63/// Disk writes happen on a dedicated tokio task that subscribes
64/// to the broadcast bus separately from the in-memory append.
65/// This means the in-memory ring is updated synchronously from
66/// the IPC server's dispatch path; disk persistence runs at its
67/// own pace and may lag.
68#[derive(Debug)]
69pub struct EventLog {
70    ring: Mutex<VecDeque<Event>>,
71    disk_path: Option<PathBuf>,
72}
73
74impl EventLog {
75    /// Construct an event log. `disk_path = None` disables disk
76    /// persistence (used in tests where filesystem state is
77    /// inconvenient). The on-disk file is created on first append;
78    /// no upfront I/O.
79    pub fn new(disk_path: Option<PathBuf>) -> Self {
80        Self {
81            ring: Mutex::new(VecDeque::with_capacity(MAX_RING_EVENTS)),
82            disk_path,
83        }
84    }
85
86    /// Append an event to the in-memory ring. Evicts the oldest
87    /// entry when the ring is at capacity. Synchronous,
88    /// O(1)-amortized.
89    pub fn append(&self, event: Event) {
90        let mut ring = self.ring.lock().expect("event-log ring mutex poisoned");
91        if ring.len() == MAX_RING_EVENTS {
92            ring.pop_front();
93        }
94        ring.push_back(event);
95    }
96
97    /// Snapshot of all events currently in the ring with version >
98    /// `since`. Returned in insertion order (oldest first), so the
99    /// caller applies them sequentially.
100    ///
101    /// Phase D.3 — used by `Command::GetEvents { since }` to
102    /// produce the replay slice. The snapshot is taken at-call-time;
103    /// events arriving after this returns are NOT included (the
104    /// subscriber sees them on the live broadcast stream).
105    pub fn events_since(&self, since: u64) -> Vec<Event> {
106        let ring = self.ring.lock().expect("event-log ring mutex poisoned");
107        ring.iter()
108            .filter(|e| event_version(e) > since)
109            .cloned()
110            .collect()
111    }
112
113    /// True if the requested `since` version is older than the
114    /// oldest event in the ring (i.e. the subscriber missed events
115    /// that have already been evicted). Caller should treat the
116    /// resulting `events_since` slice as best-effort and may need
117    /// to re-fetch a snapshot to recover canonical state.
118    pub fn replay_truncated(&self, since: u64) -> bool {
119        let ring = self.ring.lock().expect("event-log ring mutex poisoned");
120        match ring.front() {
121            // Phase E.1a (codex P2 #608) — saturating_add guards
122            // against `since: u64::MAX` overflow. Wire input is
123            // externally reachable; debug builds would panic, release
124            // would wrap. Saturating means "since == u64::MAX" trivially
125            // returns false (oldest can't exceed u64::MAX), which is
126            // the correct semantic — there's no possible event newer
127            // than that, so there can't be a gap.
128            Some(oldest) => event_version(oldest) > since.saturating_add(1),
129            None => false,
130        }
131    }
132
133    /// Disk path for the writer task to flush to. None when disk
134    /// persistence is disabled.
135    pub fn disk_path(&self) -> Option<&PathBuf> {
136        self.disk_path.as_ref()
137    }
138}
139
140/// Background task: write events to the disk file as they arrive
141/// on the broadcast bus. Rotates when the file exceeds
142/// `MAX_DISK_BYTES`.
143///
144/// Spawned once per launcher run from main.rs. Holds a tokio
145/// broadcast receiver and the EventLog's disk path. Failures
146/// log a warning and drop the event from the disk stream
147/// (in-memory ring is unaffected — disk is forensics-only).
148pub async fn run_disk_writer(
149    log: std::sync::Arc<EventLog>,
150    mut events_rx: tokio::sync::broadcast::Receiver<Event>,
151) {
152    let path = match log.disk_path() {
153        Some(p) => p.clone(),
154        None => return, // disk persistence disabled — exit task
155    };
156    let rotated_path = path.with_extension("log.old");
157
158    let mut file = match open_for_append(&path).await {
159        Ok(f) => f,
160        Err(e) => {
161            crate::log(&format!(
162                "[event-log] cannot open {} for append: {} — disk persistence disabled",
163                path.display(),
164                e
165            ));
166            return;
167        }
168    };
169    let mut bytes_written = file.metadata().await.map(|m| m.len()).unwrap_or(0);
170
171    loop {
172        match events_rx.recv().await {
173            Ok(event) => {
174                let mut buf = match serde_json::to_vec(&event) {
175                    Ok(b) => b,
176                    Err(e) => {
177                        crate::log(&format!("[event-log] serialize failed: {}", e));
178                        continue;
179                    }
180                };
181                buf.push(b'\n');
182                if bytes_written + buf.len() as u64 > MAX_DISK_BYTES {
183                    // Rotate: drop the writer, rename current →
184                    // .old (overwriting), reopen fresh. Failures
185                    // here are non-fatal; we log and keep writing
186                    // to the existing file (it'll just exceed cap).
187                    drop(file);
188                    if let Err(e) = tokio::fs::rename(&path, &rotated_path).await {
189                        crate::log(&format!("[event-log] rotation rename failed: {} — continuing without rotation", e));
190                    }
191                    file = match open_for_append(&path).await {
192                        Ok(f) => f,
193                        Err(e) => {
194                            crate::log(&format!(
195                                "[event-log] post-rotation open failed: {} — disk persistence stopping",
196                                e
197                            ));
198                            return;
199                        }
200                    };
201                    bytes_written = 0;
202                }
203                if let Err(e) = file.write_all(&buf).await {
204                    crate::log(&format!("[event-log] write failed: {} — dropping event from disk stream", e));
205                    continue;
206                }
207                // Phase E.1a — durable: fsync per append so events
208                // written before a crash survive it. Required by
209                // Phase E §6.4 for srv's bootstrap-replay correctness.
210                // Latency cost: ~ms per event vs microseconds without
211                // sync. Acceptable for our event volume (~10–50 per
212                // user action). Trade-off considered: batched fsync
213                // (group commit). Skipping for E.1a — premature
214                // optimization; revisit if profiling shows a hot path.
215                if let Err(e) = file.sync_data().await {
216                    crate::log(&format!("[event-log] sync_data failed: {} — event written but not fsynced", e));
217                }
218                bytes_written += buf.len() as u64;
219            }
220            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
221                crate::log(&format!("[event-log] disk writer lagged, missed {} events", n));
222            }
223            Err(tokio::sync::broadcast::error::RecvError::Closed) => {
224                let _ = file.flush().await;
225                return;
226            }
227        }
228    }
229}
230
231async fn open_for_append(path: &std::path::Path) -> std::io::Result<tokio::fs::File> {
232    tokio::fs::OpenOptions::new()
233        .create(true)
234        .append(true)
235        .open(path)
236        .await
237}
238
239/// Extract the `version` field from any Event variant. Mirrors
240/// reducer.rs::extract_version (which is test-only; the prod code
241/// path needs its own copy here). When a new variant is added that
242/// carries a version, add it here too — the unreachable arm catches
243/// future-variant compile errors.
244fn event_version(e: &Event) -> u64 {
245    match e {
246        Event::ProcessSpawned { version, .. }
247        | Event::ProcessExited { version, .. }
248        | Event::LifecyclePhaseChanged { version, .. }
249        | Event::Registered { version, .. }
250        | Event::Pong { version, .. }
251        | Event::WindowOpened { version, .. }
252        | Event::WindowClosed { version, .. }
253        | Event::PoolWindowAdded { version, .. }
254        | Event::PoolWindowRemoved { version, .. }
255        | Event::PoolWindowPromoted { version, .. }
256        | Event::PanesReaped { version, .. }
257        | Event::PoolDrained { version, .. }
258        | Event::PoolNotLast { version, .. }
259        | Event::WindowInstanceAssigned { version, .. }
260        | Event::WindowInstanceReleased { version, .. }
261        | Event::BackendWindowIdRegistered { version, .. }
262        | Event::BackendWindowIdUnregistered { version, .. }
263        | Event::DriftDetected { version, .. }
264        | Event::HwndDriftDetected { version, .. }
265        | Event::CorrectiveWindowMove { version, .. }
266        | Event::HostShouldQuit { version, .. }
267        | Event::Snapshot { version, .. }
268        | Event::EventList { version, .. }
269        | Event::SagaStarted { version, .. }
270        | Event::SagaCompleted { version, .. }
271        | Event::SagaFailed { version, .. }
272        | Event::SrvSnapshot { version, .. }
273        | Event::WorkspaceCreated { version, .. }
274        | Event::WorkspaceDeleted { version, .. }
275        | Event::TabCreated { version, .. }
276        | Event::TabDeleted { version, .. }
277        | Event::ActiveTabChanged { version, .. }
278        | Event::TabReordered { version, .. }
279        | Event::SrvWindowOpened { version, .. }
280        | Event::SrvWindowClosed { version, .. }
281        | Event::SrvWindowWorkspaceChanged { version, .. }
282        | Event::TabsReorderedBulk { version, .. }
283        | Event::WorkspaceRenamed { version, .. }
284        | Event::TabRenamed { version, .. }
285        | Event::WorkspaceMetaUpdated { version, .. }
286        | Event::TabMetaUpdated { version, .. }
287        | Event::BlockMetaUpdated { version, .. }
288        | Event::TabMoved { version, .. }
289        | Event::BlockMoved { version, .. }
290        | Event::BlockCreated { version, .. }
291        | Event::BlockDeleted { version, .. }
292        | Event::FocusedNodeChanged { version, .. }
293        | Event::MagnifiedNodeChanged { version, .. }
294        | Event::SagaActionFailed { version, .. }
295        | Event::Error { version, .. }
296        // Phase E.4.B — layout tree events.
297        | Event::LayoutNodeInserted { version, .. }
298        | Event::LayoutNodeInsertedAtIndex { version, .. }
299        | Event::LayoutNodeDeleted { version, .. }
300        | Event::LayoutNodeMoved { version, .. }
301        | Event::LayoutNodesSwapped { version, .. }
302        | Event::LayoutNodesResized { version, .. }
303        | Event::LayoutNodeReplaced { version, .. }
304        | Event::LayoutSplitHorizontalApplied { version, .. }
305        | Event::LayoutSplitVerticalApplied { version, .. }
306        | Event::LayoutCleared { version, .. }
307        | Event::LayoutTreeReplaced { version, .. }
308        | Event::WindowMetaUpdated { version, .. } => *version,
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use agentmux_common::ipc::{ClientKind, LifecyclePhase};

    fn lifecycle_event(v: u64) -> Event {
        Event::LifecyclePhaseChanged {
            from: LifecyclePhase::Starting,
            to: LifecyclePhase::Running,
            version: v,
        }
    }

    fn registered_event(v: u64) -> Event {
        Event::Registered {
            client_id: 1,
            launcher_pid: 1,
            launcher_version: "test".into(),
            version: v,
        }
    }

    fn process_spawned_event(v: u64) -> Event {
        Event::ProcessSpawned {
            pid: 100,
            kind: ClientKind::Host,
            client_version: "0.0.0".into(),
            version: v,
        }
    }

    #[test]
    fn append_grows_ring_until_cap_then_evicts_oldest() {
        let log = EventLog::new(None);
        for v in 1..=(MAX_RING_EVENTS as u64 + 5) {
            log.append(lifecycle_event(v));
        }
        let ring = log.ring.lock().unwrap();
        assert_eq!(ring.len(), MAX_RING_EVENTS);
        // Oldest 5 should have been evicted; first should be v=6.
        assert_eq!(event_version(ring.front().unwrap()), 6);
        // Newest is v = MAX_RING_EVENTS + 5.
        assert_eq!(
            event_version(ring.back().unwrap()),
            MAX_RING_EVENTS as u64 + 5
        );
    }

    #[test]
    fn events_since_returns_only_versions_strictly_greater() {
        let log = EventLog::new(None);
        for v in [1u64, 2, 3, 4, 5] {
            log.append(lifecycle_event(v));
        }
        let replay = log.events_since(2);
        assert_eq!(replay.len(), 3, "expected v=3,4,5; got {:?}", replay);
        assert_eq!(event_version(&replay[0]), 3);
        assert_eq!(event_version(&replay[2]), 5);
    }

    #[test]
    fn events_since_zero_returns_all() {
        let log = EventLog::new(None);
        for v in [1u64, 2, 3] {
            log.append(registered_event(v));
        }
        let replay = log.events_since(0);
        assert_eq!(replay.len(), 3);
    }

    #[test]
    fn events_since_at_or_above_max_returns_empty() {
        let log = EventLog::new(None);
        log.append(process_spawned_event(5));
        log.append(process_spawned_event(6));
        let replay = log.events_since(10);
        assert!(replay.is_empty());
    }

    #[test]
    fn replay_truncated_detects_missed_events() {
        let log = EventLog::new(None);
        // Fill past capacity so v=1..=100 are evicted.
        let newest = MAX_RING_EVENTS as u64 + 100;
        for v in 1..=newest {
            log.append(lifecycle_event(v));
        }
        let oldest = newest - MAX_RING_EVENTS as u64 + 1; // v=101
        // Subscriber asks for events since v=5. The subscriber missed
        // v=6..=100; the replay slice starts at the ring's oldest entry,
        // and `replay_truncated(5)` reports the gap.
        assert!(log.replay_truncated(5));
        let replay = log.events_since(5);
        assert_eq!(replay.len(), MAX_RING_EVENTS);
        assert_eq!(event_version(&replay[0]), oldest);
        // Boundary: last saw v=99, needs v=100, which was evicted.
        assert!(log.replay_truncated(oldest - 2));
        // Boundary: last saw v=100, needs v=101, the ring's oldest — no gap.
        assert!(!log.replay_truncated(oldest - 1));
        // Caught up to or past the ring's oldest: nothing missed.
        assert!(!log.replay_truncated(oldest));
        assert!(!log.replay_truncated(newest));
    }

    #[test]
    fn replay_truncated_handles_u64_max_since_without_overflow() {
        let log = EventLog::new(None);
        log.append(lifecycle_event(1));
        // `since` comes off the wire; u64::MAX must neither panic nor wrap.
        assert!(!log.replay_truncated(u64::MAX));
        assert!(log.events_since(u64::MAX).is_empty());
    }

    #[test]
    fn replay_truncated_on_empty_log_is_false() {
        let log = EventLog::new(None);
        assert!(!log.replay_truncated(0));
        assert!(!log.replay_truncated(100));
    }
}
414}