1use std::time::Duration;
27
28use agentmux_common::ipc::{ClientKind, Command, Event};
29use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
30
31#[cfg(target_os = "windows")]
32use tokio::net::windows::named_pipe::ClientOptions;
33
/// How long the pipe diagnostics keep reading live events after sending their
/// requests, before printing a summary.
const OBSERVATION_WINDOW: Duration = Duration::from_secs(2);
35
/// Connects to this data dir's launcher pipe as a `Tool` client and inspects its traffic.
///
/// Sends `Register`, `GetSnapshot` and `GetEvents { since: 0 }`. Reads newline-delimited
/// JSON events for [`OBSERVATION_WINDOW`], then sends a best-effort `Goodbye` and prints a
/// summary of everything received.
///
/// # Errors
/// Returns a human-readable message if path resolution fails, the pipe cannot be opened, a
/// request cannot be serialized/written/flushed, or the pipe reports a read error during the
/// observation window.
#[cfg(target_os = "windows")]
pub async fn run_wrr_diag(launcher_exe_dir: &std::path::Path) -> Result<(), String> {
    /// Writes `cmd` as one newline-terminated JSON line and flushes it; `what` names the
    /// command in error messages ("serialize X", "send X", "flush X").
    async fn send_command<W>(writer: &mut W, cmd: &Command, what: &str) -> Result<(), String>
    where
        W: tokio::io::AsyncWrite + Unpin,
    {
        let mut buf =
            serde_json::to_vec(cmd).map_err(|e| format!("serialize {}: {}", what, e))?;
        buf.push(b'\n');
        writer
            .write_all(&buf)
            .await
            .map_err(|e| format!("send {}: {}", what, e))?;
        writer
            .flush()
            .await
            .map_err(|e| format!("flush {}: {}", what, e))
    }

    let version = env!("CARGO_PKG_VERSION");

    let paths = crate::data_dir::resolve_paths(launcher_exe_dir, version)
        .map_err(|e| format!("path resolution failed: {}", e))?;
    let dir_hash = crate::hash::data_dir_hash16(&paths.data_dir);
    let pipe_path = crate::ipc::pipe_name(&dir_hash);

    println!("AgentMux diagnostic — connecting to {}", pipe_path);
    println!("Data dir: {}", paths.data_dir.display());

    let client = ClientOptions::new().open(&pipe_path).map_err(|e| {
        format!(
            "could not open pipe {}: {} (is AgentMux running for this data dir?)",
            pipe_path, e
        )
    })?;
    println!("Connected. Registering as Tool client...\n");

    let (read_half, mut write_half) = tokio::io::split(client);

    let register = Command::Register {
        kind: ClientKind::Tool,
        pid: std::process::id(),
        version: version.to_string(),
    };
    send_command(&mut write_half, &register, "Register").await?;
    // Ask for the current state and for the full replay (`since: 0`).
    send_command(&mut write_half, &Command::GetSnapshot, "GetSnapshot").await?;
    send_command(&mut write_half, &Command::GetEvents { since: 0 }, "GetEvents").await?;

    let mut lines = BufReader::new(read_half).lines();
    let mut events: Vec<Event> = Vec::new();
    let deadline = tokio::time::Instant::now() + OBSERVATION_WINDOW;

    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            break;
        }
        match tokio::time::timeout(remaining, lines.next_line()).await {
            Ok(Ok(Some(line))) if line.trim().is_empty() => continue,
            Ok(Ok(Some(line))) => match serde_json::from_str::<Event>(&line) {
                Ok(evt) => events.push(evt),
                Err(e) => eprintln!("[warn] could not parse event: {} ({})", e, line),
            },
            Ok(Ok(None)) => {
                eprintln!("[warn] pipe closed before observation window elapsed");
                break;
            }
            Ok(Err(e)) => {
                return Err(format!("read error: {}", e));
            }
            // Timed out waiting for the next line: the observation window is over.
            Err(_) => break,
        }
    }

    // Best-effort sign-off: write/flush failures are ignored and the summary is
    // printed regardless.
    if let Ok(mut goodbye) = serde_json::to_vec(&Command::Goodbye) {
        goodbye.push(b'\n');
        let _ = write_half.write_all(&goodbye).await;
        let _ = write_half.flush().await;
    }

    print_summary(&events);
    Ok(())
}
149
/// Non-Windows stand-in for the `--diag wrr` subcommand.
///
/// The Windows implementation talks to the launcher over a named pipe. On other platforms
/// this always fails with an explanatory message.
#[cfg(not(target_os = "windows"))]
pub async fn run_wrr_diag(_launcher_exe_dir: &std::path::Path) -> Result<(), String> {
    let reason = "--diag wrr is Windows-only (Phase 7 will add Unix domain socket parity)";
    Err(String::from(reason))
}
157
/// Connects to this data dir's srv pipe as a `Tool` client and inspects its traffic.
///
/// Sends `Register`, `GetSrvSnapshot` and `GetEvents { since: 0 }`. Reads newline-delimited
/// JSON events for [`OBSERVATION_WINDOW`], then sends a best-effort `Goodbye` and prints a
/// summary of everything received.
///
/// # Errors
/// Returns a human-readable message if path resolution fails, the srv pipe cannot be opened,
/// a request cannot be serialized/written/flushed, or the pipe reports a read error during
/// the observation window.
#[cfg(target_os = "windows")]
pub async fn run_srv_diag(launcher_exe_dir: &std::path::Path) -> Result<(), String> {
    /// Writes `cmd` as one newline-terminated JSON line and flushes it; `what` names the
    /// command in error messages ("serialize X", "send X", "flush X").
    async fn send_command<W>(writer: &mut W, cmd: &Command, what: &str) -> Result<(), String>
    where
        W: tokio::io::AsyncWrite + Unpin,
    {
        let mut buf =
            serde_json::to_vec(cmd).map_err(|e| format!("serialize {}: {}", what, e))?;
        buf.push(b'\n');
        writer
            .write_all(&buf)
            .await
            .map_err(|e| format!("send {}: {}", what, e))?;
        writer
            .flush()
            .await
            .map_err(|e| format!("flush {}: {}", what, e))
    }

    let version = env!("CARGO_PKG_VERSION");

    let paths = crate::data_dir::resolve_paths(launcher_exe_dir, version)
        .map_err(|e| format!("path resolution failed: {}", e))?;
    let dir_hash = crate::hash::data_dir_hash16(&paths.data_dir);
    let pipe_path = crate::ipc::srv_pipe_name(&dir_hash);

    println!("AgentMux srv diagnostic — connecting to {}", pipe_path);
    println!("Data dir: {}", paths.data_dir.display());

    let client = ClientOptions::new().open(&pipe_path).map_err(|e| {
        format!(
            "could not open srv pipe {}: {} (is AgentMux running for this data dir? \
             srv may not be bound in `task dev` mode)",
            pipe_path, e
        )
    })?;
    println!("Connected. Registering as Tool client...\n");

    let (read_half, mut write_half) = tokio::io::split(client);

    let register = Command::Register {
        kind: ClientKind::Tool,
        pid: std::process::id(),
        version: version.to_string(),
    };
    send_command(&mut write_half, &register, "Register").await?;
    // Ask for the current srv state and for the full replay (`since: 0`).
    send_command(&mut write_half, &Command::GetSrvSnapshot, "GetSrvSnapshot").await?;
    send_command(&mut write_half, &Command::GetEvents { since: 0 }, "GetEvents").await?;

    let mut lines = BufReader::new(read_half).lines();
    let mut events: Vec<Event> = Vec::new();
    let deadline = tokio::time::Instant::now() + OBSERVATION_WINDOW;

    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            break;
        }
        match tokio::time::timeout(remaining, lines.next_line()).await {
            Ok(Ok(Some(line))) if line.trim().is_empty() => continue,
            Ok(Ok(Some(line))) => match serde_json::from_str::<Event>(&line) {
                Ok(evt) => events.push(evt),
                Err(e) => eprintln!("[warn] could not parse event: {} ({})", e, line),
            },
            Ok(Ok(None)) => {
                eprintln!("[warn] srv pipe closed before observation window elapsed");
                break;
            }
            Ok(Err(e)) => {
                return Err(format!("read error: {}", e));
            }
            // Timed out waiting for the next line: the observation window is over.
            Err(_) => break,
        }
    }

    // Best-effort sign-off: write/flush failures are ignored and the summary is
    // printed regardless.
    if let Ok(mut goodbye) = serde_json::to_vec(&Command::Goodbye) {
        goodbye.push(b'\n');
        let _ = write_half.write_all(&goodbye).await;
        let _ = write_half.flush().await;
    }

    print_srv_summary(&events);
    Ok(())
}
285
/// Non-Windows stand-in for the `--diag srv` subcommand.
///
/// The Windows implementation talks to srv over a named pipe. On other platforms this
/// always fails with an explanatory message.
#[cfg(not(target_os = "windows"))]
pub async fn run_srv_diag(_launcher_exe_dir: &std::path::Path) -> Result<(), String> {
    let reason = "--diag srv is Windows-only (Phase 7 will add Unix domain socket parity)";
    Err(String::from(reason))
}
290
/// Prints a human-readable report of everything received from the srv pipe.
///
/// The report has four parts:
/// - the most recent `SrvSnapshot` (workspaces, tabs and blocks);
/// - the tail of the most recent `EventList` replay;
/// - counts of saga lifecycle events seen in the replay and the live stream;
/// - the live stream itself, as per-kind counts followed by every event in arrival order.
#[cfg(target_os = "windows")]
fn print_srv_summary(events: &[Event]) {
    use std::collections::BTreeMap;

    /// Maximum number of replayed events echoed individually (the newest ones).
    const REPLAY_TAIL_LEN: usize = 20;

    // One pass: remember the last SrvSnapshot and the last EventList, and keep every
    // other event (in arrival order) as the live stream.
    let mut snapshot: Option<&Event> = None;
    let mut replay: Option<&Event> = None;
    let mut stream: Vec<&Event> = Vec::new();
    for evt in events {
        match evt {
            Event::SrvSnapshot { .. } => snapshot = Some(evt),
            Event::EventList { .. } => replay = Some(evt),
            _ => stream.push(evt),
        }
    }

    if let Some(Event::SrvSnapshot {
        version,
        lifecycle,
        workspaces,
        tabs,
        active_tabs,
        blocks,
    }) = snapshot
    {
        println!("=== SrvSnapshot (event_version={}) ===", version);
        println!("Lifecycle: {:?}", lifecycle);
        println!();
        println!("Workspaces ({}):", workspaces.len());
        if workspaces.is_empty() {
            println!(" (none)");
        } else {
            for (id, name) in workspaces {
                let active = active_tabs
                    .iter()
                    .find(|(ws, _)| ws == id)
                    .map(|(_, t)| t.as_str())
                    .unwrap_or("—");
                let tab_count = tabs.iter().filter(|(_, ws, _)| ws == id).count();
                println!(
                    " {:36} name={:<20} tabs={} active={}",
                    id, name, tab_count, active
                );
            }
        }
        println!();
        println!("Tabs ({}):", tabs.len());
        if tabs.is_empty() {
            println!(" (none)");
        } else {
            for (tab_id, ws_id, name) in tabs {
                let block_count = blocks.iter().filter(|(_, t)| t == tab_id).count();
                println!(
                    " {:36} ws={:36} name={:<16} blocks={}",
                    tab_id, ws_id, name, block_count
                );
            }
        }
        println!();
        println!("Blocks ({}):", blocks.len());
        if blocks.is_empty() {
            println!(" (none)");
        } else {
            for (block_id, tab_id) in blocks {
                println!(" {:36} tab={}", block_id, tab_id);
            }
        }
        println!();
    } else {
        println!("(SrvSnapshot not received — srv pipe may not be bound, or older srv build)");
        println!();
    }

    if let Some(Event::EventList {
        events: replay_events,
        version,
    }) = replay
    {
        println!(
            "=== EventList replay (event_version={}, {} event(s)) ===",
            version,
            replay_events.len()
        );
        if replay_events.is_empty() {
            println!("(empty — srv ring + event log contained no events)");
        } else {
            // Echo only the newest REPLAY_TAIL_LEN events, keeping their original indices.
            let total = replay_events.len();
            let skipped = total.saturating_sub(REPLAY_TAIL_LEN);
            if skipped > 0 {
                println!("(showing last {} of {})", REPLAY_TAIL_LEN, total);
            }
            for (i, evt) in replay_events.iter().enumerate().skip(skipped) {
                println!(" [{}] {}", i, format_srv_event(evt));
            }
        }
        println!();
    }

    // Tally saga lifecycle events from the replay contents and from the live stream.
    // SrvSnapshot events are skipped, so the header below says "replay + stream".
    let mut saga_counts: BTreeMap<&'static str, u32> = BTreeMap::new();
    let mut tally_saga = |e: &Event| {
        let kind = match e {
            Event::SagaStarted { .. } => "SagaStarted",
            Event::SagaCompleted { .. } => "SagaCompleted",
            Event::SagaFailed { .. } => "SagaFailed",
            _ => return,
        };
        *saga_counts.entry(kind).or_insert(0) += 1;
    };
    for evt in events {
        match evt {
            Event::SrvSnapshot { .. } => {}
            Event::EventList { events: inner, .. } => {
                for e in inner.iter() {
                    tally_saga(e);
                }
            }
            other => tally_saga(other),
        }
    }
    if !saga_counts.is_empty() {
        println!("=== Saga lifecycle (across replay + stream) ===");
        for (kind, count) in &saga_counts {
            println!(" {:>4}× {}", count, kind);
        }
        println!();
    }

    println!(
        "=== Live stream observed in {}s ===",
        OBSERVATION_WINDOW.as_secs()
    );
    if stream.is_empty() {
        println!("(no events — srv reducer is idle.)");
        return;
    }
    let mut counts: BTreeMap<&'static str, u32> = BTreeMap::new();
    for evt in &stream {
        *counts.entry(srv_event_kind_name(evt)).or_insert(0) += 1;
    }
    println!("By kind:");
    for (kind, count) in &counts {
        println!(" {:>4}× {}", count, kind);
    }
    println!();
    println!("Full stream (oldest first):");
    for (i, evt) in stream.iter().enumerate() {
        println!(" [{}] {}", i, format_srv_event(evt));
    }
}
440
/// Returns a short, stable label naming the variant of a srv event.
///
/// Used as the grouping key for per-kind counts. Variants not listed here map to `"Other"`.
#[cfg(target_os = "windows")]
fn srv_event_kind_name(e: &Event) -> &'static str {
    match e {
        // Replies to this client's own requests.
        Event::Registered { .. } => "Registered",
        Event::SrvSnapshot { .. } => "SrvSnapshot",
        Event::EventList { .. } => "EventList",
        Event::Error { .. } => "Error",
        // Process / lifecycle.
        Event::ProcessSpawned { .. } => "ProcessSpawned",
        Event::ProcessExited { .. } => "ProcessExited",
        Event::LifecyclePhaseChanged { .. } => "LifecyclePhaseChanged",
        // Sagas.
        Event::SagaStarted { .. } => "SagaStarted",
        Event::SagaCompleted { .. } => "SagaCompleted",
        Event::SagaFailed { .. } => "SagaFailed",
        // Workspaces.
        Event::WorkspaceCreated { .. } => "WorkspaceCreated",
        Event::WorkspaceDeleted { .. } => "WorkspaceDeleted",
        Event::WorkspaceRenamed { .. } => "WorkspaceRenamed",
        Event::WorkspaceMetaUpdated { .. } => "WorkspaceMetaUpdated",
        // Tabs.
        Event::TabCreated { .. } => "TabCreated",
        Event::TabDeleted { .. } => "TabDeleted",
        Event::TabRenamed { .. } => "TabRenamed",
        Event::TabReordered { .. } => "TabReordered",
        Event::TabsReorderedBulk { .. } => "TabsReorderedBulk",
        Event::TabMoved { .. } => "TabMoved",
        Event::TabMetaUpdated { .. } => "TabMetaUpdated",
        Event::ActiveTabChanged { .. } => "ActiveTabChanged",
        // Blocks.
        Event::BlockCreated { .. } => "BlockCreated",
        Event::BlockDeleted { .. } => "BlockDeleted",
        Event::BlockMoved { .. } => "BlockMoved",
        Event::BlockMetaUpdated { .. } => "BlockMetaUpdated",
        // srv-side windows.
        Event::SrvWindowOpened { .. } => "SrvWindowOpened",
        Event::SrvWindowClosed { .. } => "SrvWindowClosed",
        Event::SrvWindowWorkspaceChanged { .. } => "SrvWindowWorkspaceChanged",
        _ => "Other",
    }
}
476
/// Renders one srv event as a single display line.
///
/// Saga, workspace, tab-create/move, block-move and error events get a compact
/// `v=<version> <Kind> key=value ...` layout. Any other event falls back to its JSON
/// serialization, or to its `Debug` form if serialization fails.
#[cfg(target_os = "windows")]
fn format_srv_event(e: &Event) -> String {
    match e {
        Event::SagaStarted { saga_id: id, name, version: v } => {
            format!("v={:>3} SagaStarted id={} name={}", v, id, name)
        }
        Event::SagaCompleted { saga_id: id, version: v } => {
            format!("v={:>3} SagaCompleted id={}", v, id)
        }
        Event::SagaFailed { saga_id: id, reason, version: v } => {
            format!("v={:>3} SagaFailed id={} reason={}", v, id, reason)
        }
        Event::WorkspaceCreated { workspace_id: ws, name, version: v } => {
            format!("v={:>3} WorkspaceCreated id={} name={}", v, ws, name)
        }
        Event::WorkspaceDeleted { workspace_id: ws, version: v } => {
            format!("v={:>3} WorkspaceDeleted id={}", v, ws)
        }
        Event::TabCreated { workspace_id: ws, tab_id: tab, name, version: v } => {
            format!("v={:>3} TabCreated tab={} ws={} name={}", v, tab, ws, name)
        }
        Event::TabMoved {
            tab_id: tab,
            src_workspace_id: from_ws,
            dst_workspace_id: to_ws,
            dst_index: idx,
            version: v,
            ..
        } => format!(
            "v={:>3} TabMoved tab={} {} → {} idx={}",
            v, tab, from_ws, to_ws, idx
        ),
        Event::BlockMoved {
            block_id: blk,
            src_tab_id: from_tab,
            dst_tab_id: to_tab,
            dst_index: idx,
            version: v,
        } => format!(
            "v={:>3} BlockMoved blk={} {} → {} idx={}",
            v, blk, from_tab, to_tab, idx
        ),
        Event::Error { code, message: msg, version: v, .. } => {
            format!("v={:>3} Error code={:?} msg={}", v, code, msg)
        }
        fallback => serde_json::to_string(fallback).unwrap_or_else(|_| format!("{:?}", fallback)),
    }
}
519
/// Prints a human-readable report of everything received from the launcher pipe.
///
/// The report has three parts:
/// - the most recent `Snapshot` (windows, pool, monitors);
/// - the most recent `EventList` replay in full;
/// - the live stream, as per-kind counts followed by every event in arrival order.
#[cfg(target_os = "windows")]
fn print_summary(events: &[Event]) {
    use std::collections::BTreeMap;

    // One pass: remember the last Snapshot and the last EventList, and keep every
    // other event (in arrival order) as the live stream.
    let mut snapshot: Option<&Event> = None;
    let mut replay: Option<&Event> = None;
    let mut stream: Vec<&Event> = Vec::new();
    for evt in events {
        match evt {
            Event::Snapshot { .. } => snapshot = Some(evt),
            Event::EventList { .. } => replay = Some(evt),
            _ => stream.push(evt),
        }
    }

    match snapshot {
        Some(Event::Snapshot {
            version,
            lifecycle,
            windows,
            pool,
            instance_registry,
            backend_window_ids,
            monitors,
            ..
        }) => {
            println!("=== Snapshot (event_version={}) ===", version);
            println!("Lifecycle: {:?}", lifecycle);
            println!("Monitors: {} ({:?})", monitors.len(), monitors);
            println!();
            println!("Windows ({}):", windows.len());
            if windows.is_empty() {
                println!(" (none)");
            }
            for w in windows {
                // Instance number and backend window id are looked up by label;
                // "—" marks a window with no entry.
                let inst = instance_registry
                    .iter()
                    .find(|(l, _)| l == &w.label)
                    .map(|(_, n)| format!("#{}", n))
                    .unwrap_or_else(|| "—".to_string());
                let backend = backend_window_ids
                    .iter()
                    .find(|(l, _)| l == &w.label)
                    .map(|(_, b)| b.as_str())
                    .unwrap_or("—");
                println!(
                    " {:>4} {:30} kind={:?} hwnd={:?} visible={} iconic={} fg_seen={} backend={}",
                    inst,
                    w.label,
                    w.kind,
                    w.hwnd,
                    w.visible,
                    w.iconic,
                    w.foregrounded_since_open,
                    backend
                );
            }
            println!();
            println!("Pool ({}): {:?}", pool.len(), pool);
            println!();
        }
        _ => {
            println!("(snapshot not received — server may be older than D.1)");
            println!();
        }
    }

    if let Some(Event::EventList { events: replay_events, version }) = replay {
        println!(
            "=== EventList replay (event_version={}, {} event(s)) ===",
            version,
            replay_events.len()
        );
        if replay_events.is_empty() {
            println!("(empty — launcher's in-memory ring contained no events with version > 0)");
        }
        for (i, evt) in replay_events.iter().enumerate() {
            println!(" [{}] {}", i, format_event(evt));
        }
        println!();
    }

    println!("=== Live stream observed in {}s ===", OBSERVATION_WINDOW.as_secs());
    if stream.is_empty() {
        println!("(no events — instance is idle.)");
        return;
    }

    let mut per_kind: BTreeMap<&'static str, u32> = BTreeMap::new();
    for evt in &stream {
        *per_kind.entry(event_kind_name(evt)).or_default() += 1;
    }
    println!("By kind:");
    for (kind, count) in &per_kind {
        println!(" {:>4}× {}", count, kind);
    }
    println!();
    println!("Full stream (oldest first):");
    for (i, evt) in stream.iter().enumerate() {
        println!(" [{}] {}", i, format_event(evt));
    }
}
631
/// Returns a short, stable label naming the variant of a launcher event.
///
/// Used as the grouping key for per-kind counts. Variants not listed here map to `"Other"`.
#[cfg(target_os = "windows")]
fn event_kind_name(e: &Event) -> &'static str {
    match e {
        // Replies to this client's own requests.
        Event::Registered { .. } => "Registered",
        Event::Pong { .. } => "Pong",
        Event::Snapshot { .. } => "Snapshot",
        Event::EventList { .. } => "EventList",
        // Process / lifecycle.
        Event::ProcessSpawned { .. } => "ProcessSpawned",
        Event::ProcessExited { .. } => "ProcessExited",
        Event::LifecyclePhaseChanged { .. } => "LifecyclePhaseChanged",
        Event::HostShouldQuit { .. } => "HostShouldQuit",
        // Windows and their instance numbers.
        Event::WindowOpened { .. } => "WindowOpened",
        Event::WindowClosed { .. } => "WindowClosed",
        Event::WindowInstanceAssigned { .. } => "WindowInstanceAssigned",
        Event::WindowInstanceReleased { .. } => "WindowInstanceReleased",
        // Window pool.
        Event::PoolWindowAdded { .. } => "PoolWindowAdded",
        Event::PoolWindowRemoved { .. } => "PoolWindowRemoved",
        Event::PoolWindowPromoted { .. } => "PoolWindowPromoted",
        // Backend window ids.
        Event::BackendWindowIdRegistered { .. } => "BackendWindowIdRegistered",
        Event::BackendWindowIdUnregistered { .. } => "BackendWindowIdUnregistered",
        // Drift detection / correction.
        Event::DriftDetected { .. } => "DriftDetected",
        Event::HwndDriftDetected { .. } => "HwndDriftDetected",
        Event::CorrectiveWindowMove { .. } => "CorrectiveWindowMove",
        // Sagas.
        Event::SagaStarted { .. } => "SagaStarted",
        Event::SagaCompleted { .. } => "SagaCompleted",
        Event::SagaFailed { .. } => "SagaFailed",
        _ => "Other",
    }
}
661
/// Renders one launcher event as a single display line.
///
/// Window open/close/instance, HWND drift and host-quit events get a compact
/// `v=<version> <Kind> key=value ...` layout. Any other event falls back to its JSON
/// serialization, or to its `Debug` form if serialization fails.
#[cfg(target_os = "windows")]
fn format_event(e: &Event) -> String {
    match e {
        Event::WindowOpened { label, kind, parent_label: parent, version: v } => format!(
            "v={:>3} WindowOpened label={} kind={:?} parent={:?}",
            v, label, kind, parent
        ),
        Event::WindowClosed { label, version: v, crash_detected: crashed } => format!(
            "v={:>3} WindowClosed label={} crash_detected={}",
            v, label, crashed
        ),
        Event::WindowInstanceAssigned { label, num, version: v } => {
            format!("v={:>3} InstanceAssigned label={} num={}", v, label, num)
        }
        Event::HwndDriftDetected { kind, label, hwnd, severity, detail, version: v } => format!(
            "v={:>3} HwndDriftDetected kind={:?} label={:?} hwnd={:?} severity={:?} — {}",
            v, kind, label, hwnd, severity, detail
        ),
        Event::HostShouldQuit { version: v } => format!("v={:>3} HostShouldQuit", v),
        fallback => serde_json::to_string(fallback).unwrap_or_else(|_| format!("{:?}", fallback)),
    }
}
688
689#[cfg(target_os = "windows")]
708pub async fn run_sagas_diag(launcher_exe_dir: &std::path::Path) -> Result<(), String> {
709 run_sagas_diag_impl(launcher_exe_dir).await
710}
711
712#[cfg(not(target_os = "windows"))]
713pub async fn run_sagas_diag(launcher_exe_dir: &std::path::Path) -> Result<(), String> {
714 run_sagas_diag_impl(launcher_exe_dir).await
721}
722
723async fn run_sagas_diag_impl(launcher_exe_dir: &std::path::Path) -> Result<(), String> {
724 let version = env!("CARGO_PKG_VERSION");
725
726 let paths = crate::data_dir::resolve_paths(launcher_exe_dir, version)
727 .map_err(|e| format!("path resolution failed: {}", e))?;
728 let saga_log_path = crate::data_dir::launcher_saga_log_path_read_only(&paths.data_dir);
732
733 println!("AgentMux launcher saga diagnostic");
734 println!("Data dir: {}", paths.data_dir.display());
735 println!("Saga log: {}", saga_log_path.display());
736 println!();
737
738 if !saga_log_path.exists() {
739 println!(
740 "(no saga log at {} — launcher hasn't written one yet, or this isn't an AgentMux data dir)",
741 saga_log_path.display()
742 );
743 return Ok(());
744 }
745
746 let log = crate::saga::LauncherSagaLog::open_read_only(&saga_log_path)
749 .map_err(|e| format!("open saga log {:?} (read-only): {}", saga_log_path, e))?;
750
751 let snapshot = log
752 .snapshot_recent(50)
753 .map_err(|e| format!("snapshot_recent: {}", e))?;
754 let unresolved = log
755 .unresolved_sagas()
756 .map_err(|e| format!("unresolved_sagas: {}", e))?;
757
758 if snapshot.is_empty() {
759 println!("(saga log is empty)");
760 return Ok(());
761 }
762
763 println!("Recent launcher sagas (last {}):", snapshot.len());
764 for s in &snapshot {
765 let recovered_marker = if s.state == "failed_compensation" {
766 " (recovered on restart)"
767 } else {
768 ""
769 };
770 let ended = s.ended_at.as_deref().unwrap_or("—");
771 println!(
772 " saga_id={} name={} state={}{}",
773 s.saga_id, s.name, s.state, recovered_marker
774 );
775 println!(
776 " started={} ended={} steps_progressed={}",
777 s.started_at, ended, s.step_count
778 );
779 if let Some(reason) = &s.failure_reason {
780 println!(" failure: {}", reason);
781 }
782 if !s.input_json.is_empty() && s.input_json != "null" {
783 println!(" input: {}", s.input_json);
784 }
785
786 let steps: Vec<crate::saga::log::UnresolvedLauncherStep> = if let Some(u) =
793 unresolved.iter().find(|u| u.saga_id == s.saga_id)
794 {
795 u.steps.clone()
796 } else if s.state == "failed_compensation" {
797 match log.get_saga_steps(s.saga_id) {
802 Ok(steps) => steps,
803 Err(e) => {
804 println!(" [step query failed: {} — saga rows may exist but cannot be read]", e);
805 Vec::new()
806 }
807 }
808 } else {
809 Vec::new()
810 };
811 if !steps.is_empty() {
812 println!(" steps:");
813 for step in &steps {
814 let target = step.target.as_deref().unwrap_or("—");
815 let cmd_snippet = step
816 .cmd_json
817 .as_deref()
818 .map(|c| truncate_for_display(c, 120))
819 .unwrap_or_else(|| "—".into());
820 println!(
821 " {:>3} {:30} target={:<14} state={:<10} cmd={}",
822 step.step_index, step.name, target, step.state, cmd_snippet
823 );
824 if let Some(reason) = &step.failure_reason {
825 println!(" failure: {}", reason);
826 }
827 }
828 if let Some(in_flight) = steps.iter().rev().find(|st| st.state == "pending") {
833 println!(
834 " [step {} was in-flight when launcher exited]",
835 in_flight.step_index
836 );
837 }
838 }
839 println!();
840 }
841
842 let recovered_count = snapshot
843 .iter()
844 .filter(|s| s.state == "failed_compensation")
845 .count();
846 if recovered_count > 0 {
847 println!(
848 "Note: {} saga(s) marked `failed_compensation` by the startup recovery walker.",
849 recovered_count
850 );
851 println!("These were unresolved when the launcher last exited; their effects on host state");
852 println!("may be partially applied. Inspect step rows above to see what was attempted.");
853 }
854
855 Ok(())
856}
857
/// Shortens `s` for single-line display.
///
/// Returns `s` unchanged when it has at most `max_chars` characters. Otherwise returns its
/// first `max_chars` characters followed by `…`. Length is measured in `char`s, so
/// multi-byte text is never split inside a character.
fn truncate_for_display(s: &str, max_chars: usize) -> String {
    match s.char_indices().nth(max_chars) {
        // There is no character past the limit: keep everything.
        None => s.to_owned(),
        // `cut` is the byte offset of the first character beyond the limit.
        Some((cut, _)) => format!("{}…", &s[..cut]),
    }
}
869
#[cfg(test)]
mod sagas_diag_tests {
    use super::*;
    use crate::saga::{LauncherSagaLog, PipeTarget};
    use agentmux_common::ipc::{Command, Event};

    #[test]
    fn truncate_for_display_shortens_long_strings_and_keeps_short_ones() {
        assert_eq!(truncate_for_display("hi", 10), "hi");
        // Exactly at the limit: returned unchanged, no ellipsis.
        assert_eq!(truncate_for_display("abcde", 5), "abcde");
        let long = "a".repeat(200);
        let truncated = truncate_for_display(&long, 50);
        // 50 kept characters plus the trailing ellipsis.
        assert_eq!(truncated.chars().count(), 51);
        assert!(truncated.ends_with('…'));
    }

    #[test]
    fn truncate_for_display_counts_chars_not_bytes() {
        // Multi-byte characters must be counted (and kept) whole.
        assert_eq!(truncate_for_display("héllo wörld", 5), "héllo…");
        assert_eq!(truncate_for_display("日本語テキスト", 3), "日本語…");
    }

    #[test]
    fn sagas_diag_fixture_log_has_expected_summary_fields() {
        let log = LauncherSagaLog::open_in_memory().unwrap();

        // Saga 1: one step issued and finished, then terminated as completed.
        log.start_saga(
            1,
            "window_cleanup_cascade",
            &serde_json::json!({"label": "win-1"}),
        )
        .unwrap();
        log.start_step(
            1,
            0,
            "issue_cmd_host_reap_panes",
            PipeTarget::Host,
            &Command::Ping { nonce: 1 },
        )
        .unwrap();
        log.finish_step(1, 0, &Event::Pong { nonce: 1, version: 1 })
            .unwrap();
        log.terminate_saga(1, crate::saga::log::SagaOutcome::Completed)
            .unwrap();

        // Saga 2: step 0 finished, step 1 started but never finished, then marked
        // `failed_compensation` (the state the diag attributes to the startup recovery walker).
        log.start_saga(
            2,
            "window_cleanup_cascade",
            &serde_json::json!({"label": "win-3"}),
        )
        .unwrap();
        log.start_step(
            2,
            0,
            "issue_cmd_host_reap_panes",
            PipeTarget::Host,
            &Command::Ping { nonce: 2 },
        )
        .unwrap();
        log.finish_step(2, 0, &Event::Pong { nonce: 2, version: 1 })
            .unwrap();
        log.start_step(
            2,
            1,
            "issue_cmd_host_drain_pool",
            PipeTarget::Host,
            &Command::Ping { nonce: 3 },
        )
        .unwrap();
        log.mark_failed_compensation(2, "launcher restarted while saga in state 'running'")
            .unwrap();

        let snapshot = log.snapshot_recent(50).unwrap();
        assert_eq!(snapshot.len(), 2);

        // Newest saga first.
        let s2 = &snapshot[0];
        let s1 = &snapshot[1];
        assert_eq!(s1.saga_id, 1);
        assert_eq!(s1.state, "completed");
        assert_eq!(s1.step_count, 1);

        assert_eq!(s2.saga_id, 2);
        assert_eq!(s2.state, "failed_compensation");
        // Step 1 was started but never finished; it is expected not to count.
        assert_eq!(s2.step_count, 1);
        assert!(s2
            .failure_reason
            .as_deref()
            .unwrap_or("")
            .contains("launcher restarted"));

        // A saga marked `failed_compensation` is expected to be resolved.
        let unresolved = log.unresolved_sagas().unwrap();
        assert!(unresolved.iter().all(|u| u.saga_id != 2));
    }
}
978}