1use std::collections::VecDeque;
36use std::path::PathBuf;
37use std::sync::Mutex;
38
39use agentmux_common::ipc::Event;
40use tokio::io::AsyncWriteExt;
41
/// Maximum number of events kept in the in-memory ring.
///
/// `EventLog::new` pre-allocates this many slots, and `EventLog::append`
/// drops the oldest entry once the ring holds exactly this many events.
const MAX_RING_EVENTS: usize = 4096;
49
/// Size threshold (8 MiB) for the on-disk event file.
///
/// `run_disk_writer` rotates the file when appending the next serialized
/// event would push its running byte count past this value.
const MAX_DISK_BYTES: u64 = 8 * 1024 * 1024;
55
/// In-memory log of recent events with an optional on-disk companion file.
///
/// The ring is guarded by a `std::sync::Mutex`; every method in this file
/// takes the lock only for the duration of a single synchronous call.
#[derive(Debug)]
pub struct EventLog {
    /// Most recent events, oldest at the front; capped at `MAX_RING_EVENTS`
    /// entries by `append`.
    ring: Mutex<VecDeque<Event>>,
    /// Location of the on-disk event file. `None` makes `run_disk_writer`
    /// return without doing anything.
    disk_path: Option<PathBuf>,
}
73
74impl EventLog {
75 pub fn new(disk_path: Option<PathBuf>) -> Self {
80 Self {
81 ring: Mutex::new(VecDeque::with_capacity(MAX_RING_EVENTS)),
82 disk_path,
83 }
84 }
85
86 pub fn append(&self, event: Event) {
90 let mut ring = self.ring.lock().expect("event-log ring mutex poisoned");
91 if ring.len() == MAX_RING_EVENTS {
92 ring.pop_front();
93 }
94 ring.push_back(event);
95 }
96
97 pub fn events_since(&self, since: u64) -> Vec<Event> {
106 let ring = self.ring.lock().expect("event-log ring mutex poisoned");
107 ring.iter()
108 .filter(|e| event_version(e) > since)
109 .cloned()
110 .collect()
111 }
112
113 pub fn replay_truncated(&self, since: u64) -> bool {
119 let ring = self.ring.lock().expect("event-log ring mutex poisoned");
120 match ring.front() {
121 Some(oldest) => event_version(oldest) > since.saturating_add(1),
129 None => false,
130 }
131 }
132
133 pub fn disk_path(&self) -> Option<&PathBuf> {
136 self.disk_path.as_ref()
137 }
138}
139
140pub async fn run_disk_writer(
149 log: std::sync::Arc<EventLog>,
150 mut events_rx: tokio::sync::broadcast::Receiver<Event>,
151) {
152 let path = match log.disk_path() {
153 Some(p) => p.clone(),
154 None => return, };
156 let rotated_path = path.with_extension("log.old");
157
158 let mut file = match open_for_append(&path).await {
159 Ok(f) => f,
160 Err(e) => {
161 tracing::warn!(target: "event-log", "{}", format!(
162 "[event-log] cannot open {} for append: {} — disk persistence disabled",
163 path.display(),
164 e
165 ));
166 return;
167 }
168 };
169 let mut bytes_written = file.metadata().await.map(|m| m.len()).unwrap_or(0);
170
171 loop {
172 match events_rx.recv().await {
173 Ok(event) => {
174 let mut buf = match serde_json::to_vec(&event) {
175 Ok(b) => b,
176 Err(e) => {
177 tracing::warn!(target: "event-log", "{}", format!("[event-log] serialize failed: {}", e));
178 continue;
179 }
180 };
181 buf.push(b'\n');
182 if bytes_written + buf.len() as u64 > MAX_DISK_BYTES {
183 drop(file);
188 if let Err(e) = tokio::fs::rename(&path, &rotated_path).await {
189 tracing::warn!(target: "event-log", "{}", format!("[event-log] rotation rename failed: {} — continuing without rotation", e));
190 }
191 file = match open_for_append(&path).await {
192 Ok(f) => f,
193 Err(e) => {
194 tracing::warn!(target: "event-log", "{}", format!(
195 "[event-log] post-rotation open failed: {} — disk persistence stopping",
196 e
197 ));
198 return;
199 }
200 };
201 bytes_written = 0;
202 }
203 if let Err(e) = file.write_all(&buf).await {
204 tracing::warn!(target: "event-log", "{}", format!("[event-log] write failed: {} — dropping event from disk stream", e));
205 continue;
206 }
207 if let Err(e) = file.sync_data().await {
216 tracing::warn!(target: "event-log", "{}", format!("[event-log] sync_data failed: {} — event written but not fsynced", e));
217 }
218 bytes_written += buf.len() as u64;
219 }
220 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
221 tracing::warn!(target: "event-log", "{}", format!("[event-log] disk writer lagged, missed {} events", n));
222 }
223 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
224 let _ = file.flush().await;
225 return;
226 }
227 }
228 }
229}
230
231async fn open_for_append(path: &std::path::Path) -> std::io::Result<tokio::fs::File> {
232 tokio::fs::OpenOptions::new()
233 .create(true)
234 .append(true)
235 .open(path)
236 .await
237}
238
239fn event_version(e: &Event) -> u64 {
245 match e {
246 Event::ProcessSpawned { version, .. }
247 | Event::ProcessExited { version, .. }
248 | Event::LifecyclePhaseChanged { version, .. }
249 | Event::Registered { version, .. }
250 | Event::Pong { version, .. }
251 | Event::WindowOpened { version, .. }
252 | Event::WindowClosed { version, .. }
253 | Event::PoolWindowAdded { version, .. }
254 | Event::PoolWindowRemoved { version, .. }
255 | Event::PoolWindowPromoted { version, .. }
256 | Event::PanesReaped { version, .. }
257 | Event::PoolDrained { version, .. }
258 | Event::PoolNotLast { version, .. }
259 | Event::WindowInstanceAssigned { version, .. }
260 | Event::WindowInstanceReleased { version, .. }
261 | Event::BackendWindowIdRegistered { version, .. }
262 | Event::BackendWindowIdUnregistered { version, .. }
263 | Event::DriftDetected { version, .. }
264 | Event::HwndDriftDetected { version, .. }
265 | Event::CorrectiveWindowMove { version, .. }
266 | Event::HostShouldQuit { version, .. }
267 | Event::Snapshot { version, .. }
268 | Event::EventList { version, .. }
269 | Event::SagaStarted { version, .. }
270 | Event::SagaCompleted { version, .. }
271 | Event::SagaFailed { version, .. }
272 | Event::SrvSnapshot { version, .. }
273 | Event::WorkspaceCreated { version, .. }
274 | Event::WorkspaceDeleted { version, .. }
275 | Event::TabCreated { version, .. }
276 | Event::TabDeleted { version, .. }
277 | Event::ActiveTabChanged { version, .. }
278 | Event::TabReordered { version, .. }
279 | Event::SrvWindowOpened { version, .. }
280 | Event::SrvWindowClosed { version, .. }
281 | Event::SrvWindowWorkspaceChanged { version, .. }
282 | Event::TabsReorderedBulk { version, .. }
283 | Event::WorkspaceRenamed { version, .. }
284 | Event::TabRenamed { version, .. }
285 | Event::WorkspaceMetaUpdated { version, .. }
286 | Event::WindowMetaUpdated { version, .. }
287 | Event::TabMetaUpdated { version, .. }
288 | Event::BlockMetaUpdated { version, .. }
289 | Event::TabMoved { version, .. }
290 | Event::BlockMoved { version, .. }
291 | Event::BlockCreated { version, .. }
292 | Event::BlockDeleted { version, .. }
293 | Event::FocusedNodeChanged { version, .. }
294 | Event::MagnifiedNodeChanged { version, .. }
295 | Event::SagaActionFailed { version, .. }
296 | Event::Error { version, .. }
297 | Event::LayoutNodeInserted { version, .. }
299 | Event::LayoutNodeInsertedAtIndex { version, .. }
300 | Event::LayoutNodeDeleted { version, .. }
301 | Event::LayoutNodeMoved { version, .. }
302 | Event::LayoutNodesSwapped { version, .. }
303 | Event::LayoutNodesResized { version, .. }
304 | Event::LayoutNodeReplaced { version, .. }
305 | Event::LayoutSplitHorizontalApplied { version, .. }
306 | Event::LayoutSplitVerticalApplied { version, .. }
307 | Event::LayoutCleared { version, .. }
308 | Event::LayoutTreeReplaced { version, .. } => *version,
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use agentmux_common::ipc::{ClientKind, LifecyclePhase};

    /// Builds a `LifecyclePhaseChanged` event carrying version `v`.
    fn lifecycle_event(v: u64) -> Event {
        Event::LifecyclePhaseChanged {
            from: LifecyclePhase::Starting,
            to: LifecyclePhase::Running,
            version: v,
        }
    }

    /// Builds a `Registered` event carrying version `v`.
    fn registered_event(v: u64) -> Event {
        Event::Registered {
            client_id: 1,
            launcher_pid: 1,
            launcher_version: "test".into(),
            version: v,
        }
    }

    /// Builds a `ProcessSpawned` event carrying version `v`.
    fn process_spawned_event(v: u64) -> Event {
        Event::ProcessSpawned {
            pid: 100,
            kind: ClientKind::Host,
            client_version: "0.0.0".into(),
            version: v,
        }
    }

    #[test]
    fn append_grows_ring_until_cap_then_evicts_oldest() {
        let log = EventLog::new(None);
        for v in 1..=(MAX_RING_EVENTS as u64 + 5) {
            log.append(lifecycle_event(v));
        }
        let ring = log.ring.lock().unwrap();
        assert_eq!(ring.len(), MAX_RING_EVENTS);
        // Five events over the cap were appended, so versions 1..=5 are gone.
        assert_eq!(event_version(ring.front().unwrap()), 6);
        assert_eq!(
            event_version(ring.back().unwrap()),
            MAX_RING_EVENTS as u64 + 5
        );
    }

    #[test]
    fn events_since_returns_only_versions_strictly_greater() {
        let log = EventLog::new(None);
        for v in [1u64, 2, 3, 4, 5] {
            log.append(lifecycle_event(v));
        }
        let replay = log.events_since(2);
        assert_eq!(replay.len(), 3, "expected v=3,4,5; got {:?}", replay);
        assert_eq!(event_version(&replay[0]), 3);
        assert_eq!(event_version(&replay[2]), 5);
    }

    #[test]
    fn events_since_zero_returns_all() {
        let log = EventLog::new(None);
        for v in [1u64, 2, 3] {
            log.append(registered_event(v));
        }
        let replay = log.events_since(0);
        assert_eq!(replay.len(), 3);
    }

    #[test]
    fn events_since_at_or_above_max_returns_empty() {
        let log = EventLog::new(None);
        log.append(process_spawned_event(5));
        log.append(process_spawned_event(6));
        // Above the newest version.
        let replay = log.events_since(10);
        assert!(replay.is_empty());
        // Exactly at the newest version: the comparison is strict, so this is
        // empty too.
        assert!(log.events_since(6).is_empty());
    }

    #[test]
    fn replay_truncated_detects_missed_events() {
        let log = EventLog::new(None);
        let total = MAX_RING_EVENTS as u64 + 100;
        for v in 1..=total {
            log.append(lifecycle_event(v));
        }
        assert!(log.replay_truncated(5));
        // Version of the oldest event still in the ring.
        let oldest = total - MAX_RING_EVENTS as u64 + 1;
        assert!(!log.replay_truncated(oldest));
        // Boundary: a client that saw `oldest - 1` needs `oldest` next, which
        // is still retained; one version earlier leaves a gap.
        assert!(!log.replay_truncated(oldest - 1));
        assert!(log.replay_truncated(oldest - 2));
    }

    #[test]
    fn replay_truncated_on_empty_log_is_false() {
        let log = EventLog::new(None);
        assert!(!log.replay_truncated(0));
        assert!(!log.replay_truncated(100));
    }
}