agentmux_launcher/
diag.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase B.8 — `agentmux.exe --diag wrr` Tool client.
5//
6// Connects to a running AgentMux instance's IPC pipe as a `Tool`
7// client, registers, captures events for a short observation
8// window, prints a human-readable summary to stdout, and exits.
9//
10// Operator visibility into the launcher's reducer state without
11// needing a debugger or log scraping. The output IS the diagnosis
12// surface for "what does the launcher think is happening right now."
13//
14// Why Tool kind: doesn't drive the reducer's lifecycle (Host
15// drives Starting → Running), doesn't trigger OrphanInstance saga,
16// gets all events broadcast on the pipe without affecting the
17// running instance.
18//
// Phase D.1 — `Command::GetSnapshot` is sent right after Register
// to capture the launcher's canonical state in one round-trip; the
// reply (`Event::Snapshot`) is printed prominently before the live
// event stream. Phase D.3 adds `Command::GetEvents { since }` for
// replaying retained events; this client sends `since: 0` and prints
// the resulting `Event::EventList` between the snapshot and the live
// stream. The same file also hosts `--diag srv` (Phase E.7) and
// `--diag sagas` (LSD-3).
25
26use std::time::Duration;
27
28use agentmux_common::ipc::{ClientKind, Command, Event};
29use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
30
31#[cfg(target_os = "windows")]
32use tokio::net::windows::named_pipe::ClientOptions;
33
/// How long a diag client keeps reading events after sending its
/// requests, before it says Goodbye and prints the summary. The
/// whole-second value is also echoed in the report header.
const OBSERVATION_WINDOW: Duration = Duration::from_millis(2_000);
35
/// Entry point for diag mode. Returns `Ok(())` on success (printed
/// summary, exit 0) or `Err(message)` for the caller to surface and
/// exit non-zero.
///
/// Flow: resolve the data dir and its pipe name, open the launcher's
/// IPC pipe, send `Register` (as a `Tool` client), `GetSnapshot` and
/// `GetEvents { since: 0 }`, collect every event that arrives within
/// [`OBSERVATION_WINDOW`], send a best-effort `Goodbye`, then print
/// the summary.
///
/// # Errors
///
/// Returns `Err` if path resolution fails, the pipe cannot be opened
/// (after a short bounded retry while it reports `ERROR_PIPE_BUSY`),
/// a command cannot be serialized/sent/flushed, or a read from the
/// pipe fails. On a read failure the `Goodbye` is still attempted
/// before the error is returned.
#[cfg(target_os = "windows")]
pub async fn run_wrr_diag(launcher_exe_dir: &std::path::Path) -> Result<(), String> {
    // Win32 `ERROR_PIPE_BUSY` (231): every server instance of the pipe
    // is currently connected. Per Win32 named-pipe semantics this is
    // usually transient, so the open is retried a bounded number of
    // times (~1 s total) instead of failing the diagnostic outright.
    const ERROR_PIPE_BUSY: i32 = 231;
    const OPEN_RETRY_DELAY: Duration = Duration::from_millis(50);
    const OPEN_MAX_ATTEMPTS: u32 = 20;

    // Serialize `cmd` as one newline-terminated JSON line, write it and
    // flush. `what` names the command in the error messages.
    async fn send_line<W>(writer: &mut W, cmd: &Command, what: &str) -> Result<(), String>
    where
        W: tokio::io::AsyncWrite + Unpin,
    {
        let mut buf =
            serde_json::to_vec(cmd).map_err(|e| format!("serialize {}: {}", what, e))?;
        buf.push(b'\n');
        writer
            .write_all(&buf)
            .await
            .map_err(|e| format!("send {}: {}", what, e))?;
        writer
            .flush()
            .await
            .map_err(|e| format!("flush {}: {}", what, e))
    }

    let version = env!("CARGO_PKG_VERSION");

    let paths = crate::data_dir::resolve_paths(launcher_exe_dir, version)
        .map_err(|e| format!("path resolution failed: {}", e))?;
    let dir_hash = crate::hash::data_dir_hash16(&paths.data_dir);
    let pipe_path = crate::ipc::pipe_name(&dir_hash);

    println!("AgentMux diagnostic — connecting to {}", pipe_path);
    println!("Data dir: {}", paths.data_dir.display());

    let mut attempt: u32 = 0;
    let client = loop {
        attempt += 1;
        match ClientOptions::new().open(&pipe_path) {
            Ok(client) => break client,
            Err(e)
                if e.raw_os_error() == Some(ERROR_PIPE_BUSY) && attempt < OPEN_MAX_ATTEMPTS =>
            {
                tokio::time::sleep(OPEN_RETRY_DELAY).await;
            }
            Err(e) => {
                return Err(format!(
                    "could not open pipe {}: {} (is AgentMux running for this data dir?)",
                    pipe_path, e
                ));
            }
        }
    };
    println!("Connected. Registering as Tool client...\n");

    let (read_half, mut write_half) = tokio::io::split(client);

    let register = Command::Register {
        kind: ClientKind::Tool,
        pid: std::process::id(),
        version: version.to_string(),
    };
    send_line(&mut write_half, &register, "Register").await?;

    // Phase D.1 — request a state snapshot. Reply arrives on the
    // broadcast bus alongside the Register reply; we filter for it
    // in the event collection below.
    send_line(&mut write_half, &Command::GetSnapshot, "GetSnapshot").await?;

    // Phase D.3 — request a replay of all retained events
    // (`since: 0` means everything in the in-memory ring). Useful
    // for operators to see the full event history of the running
    // launcher without having to wait for new activity. The reply
    // is an `Event::EventList`; rendered in the summary below.
    send_line(&mut write_half, &Command::GetEvents { since: 0 }, "GetEvents").await?;

    // Read events for OBSERVATION_WINDOW. The launcher's IPC server
    // (post-B.8) broadcasts every reducer event on a server-wide
    // bus; this connection's fanout writes them to us. We collect
    // everything that arrives in the window and print a summary.
    let mut lines = BufReader::new(read_half).lines();
    let mut events: Vec<Event> = Vec::new();
    let mut read_failure: Option<String> = None;
    let deadline = tokio::time::Instant::now() + OBSERVATION_WINDOW;

    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            break;
        }
        match tokio::time::timeout(remaining, lines.next_line()).await {
            Ok(Ok(Some(line))) if line.trim().is_empty() => continue,
            Ok(Ok(Some(line))) => match serde_json::from_str::<Event>(&line) {
                Ok(evt) => events.push(evt),
                Err(e) => eprintln!("[warn] could not parse event: {} ({})", e, line),
            },
            Ok(Ok(None)) => {
                eprintln!("[warn] pipe closed before observation window elapsed");
                break;
            }
            Ok(Err(e)) => {
                // Defer the error: the Goodbye below must still be
                // attempted so the server doesn't keep a stale record.
                read_failure = Some(format!("read error: {}", e));
                break;
            }
            Err(_) => break, // timeout → observation window closed
        }
    }

    // Phase B.8 — send Goodbye so the server emits ProcessExited and
    // marks our PID record as Exited. Without this, repeated
    // `--diag wrr` invocations accumulate stale `Running` records;
    // when Windows reuses a PID, the next Register fails with
    // `AlreadyRegistered`. Best-effort: errors are non-fatal because
    // we're about to exit anyway. (codex P2 PR #605.)
    let _ = send_line(&mut write_half, &Command::Goodbye, "Goodbye").await;

    if let Some(err) = read_failure {
        return Err(err);
    }

    print_summary(&events);
    Ok(())
}
149
/// Non-Windows stand-in for the `--diag wrr` client.
///
/// The real client talks to the launcher over a Windows named pipe, so
/// on other targets this always returns `Err` with an explanatory
/// message. Phase 7 cross-platform IPC is slated to add Unix domain
/// socket support.
#[cfg(not(target_os = "windows"))]
pub async fn run_wrr_diag(_launcher_exe_dir: &std::path::Path) -> Result<(), String> {
    let message = "--diag wrr is Windows-only (Phase 7 will add Unix domain socket parity)";
    Err(String::from(message))
}
157
/// Phase E.7 — `agentmux.exe --diag srv` Tool client.
///
/// Connects to the srv pipe (`\\.\pipe\agentmux-{hash}\srv-command`),
/// registers as a Tool client, sends `GetSrvSnapshot` + `GetEvents`,
/// captures events for the same observation window as `--diag wrr`
/// ([`OBSERVATION_WINDOW`]), sends a best-effort `Goodbye`, prints a
/// human-readable summary, and exits.
///
/// Provides operator visibility into the srv reducer's canonical
/// state (lifecycle phase, workspaces, tabs, active tabs, blocks) +
/// recent event activity, including saga lifecycle events. Mirrors
/// `run_wrr_diag` for the launcher; the differences are the pipe path,
/// the snapshot variant (`Event::SrvSnapshot`), and the formatter for
/// srv-specific event kinds.
///
/// # Errors
///
/// Returns `Err` if path resolution fails, the srv pipe cannot be
/// opened (after a short bounded retry while it reports
/// `ERROR_PIPE_BUSY`), a command cannot be serialized/sent/flushed, or
/// a read from the pipe fails. On a read failure the `Goodbye` is
/// still attempted before the error is returned.
#[cfg(target_os = "windows")]
pub async fn run_srv_diag(launcher_exe_dir: &std::path::Path) -> Result<(), String> {
    // Win32 `ERROR_PIPE_BUSY` (231): every server instance of the pipe
    // is currently connected. Per Win32 named-pipe semantics this is
    // usually transient, so the open is retried a bounded number of
    // times (~1 s total) instead of failing the diagnostic outright.
    const ERROR_PIPE_BUSY: i32 = 231;
    const OPEN_RETRY_DELAY: Duration = Duration::from_millis(50);
    const OPEN_MAX_ATTEMPTS: u32 = 20;

    // Serialize `cmd` as one newline-terminated JSON line, write it and
    // flush. `what` names the command in the error messages.
    async fn send_line<W>(writer: &mut W, cmd: &Command, what: &str) -> Result<(), String>
    where
        W: tokio::io::AsyncWrite + Unpin,
    {
        let mut buf =
            serde_json::to_vec(cmd).map_err(|e| format!("serialize {}: {}", what, e))?;
        buf.push(b'\n');
        writer
            .write_all(&buf)
            .await
            .map_err(|e| format!("send {}: {}", what, e))?;
        writer
            .flush()
            .await
            .map_err(|e| format!("flush {}: {}", what, e))
    }

    let version = env!("CARGO_PKG_VERSION");

    let paths = crate::data_dir::resolve_paths(launcher_exe_dir, version)
        .map_err(|e| format!("path resolution failed: {}", e))?;
    let dir_hash = crate::hash::data_dir_hash16(&paths.data_dir);
    let pipe_path = crate::ipc::srv_pipe_name(&dir_hash);

    println!("AgentMux srv diagnostic — connecting to {}", pipe_path);
    println!("Data dir: {}", paths.data_dir.display());

    let mut attempt: u32 = 0;
    let client = loop {
        attempt += 1;
        match ClientOptions::new().open(&pipe_path) {
            Ok(client) => break client,
            Err(e)
                if e.raw_os_error() == Some(ERROR_PIPE_BUSY) && attempt < OPEN_MAX_ATTEMPTS =>
            {
                tokio::time::sleep(OPEN_RETRY_DELAY).await;
            }
            Err(e) => {
                return Err(format!(
                    "could not open srv pipe {}: {} (is AgentMux running for this data dir? \
                     srv may not be bound in `task dev` mode)",
                    pipe_path, e
                ));
            }
        }
    };
    println!("Connected. Registering as Tool client...\n");

    let (read_half, mut write_half) = tokio::io::split(client);

    let register = Command::Register {
        kind: ClientKind::Tool,
        pid: std::process::id(),
        version: version.to_string(),
    };
    send_line(&mut write_half, &register, "Register").await?;

    // Phase E.1b — request the srv reducer's canonical state.
    send_line(&mut write_half, &Command::GetSrvSnapshot, "GetSrvSnapshot").await?;

    // Phase D.3 / E.7 — request a replay of all retained events from
    // the srv side's in-memory ring + on-disk event log. Useful for
    // operators wanting recent srv reducer activity (saga lifecycle
    // events, workspace/tab/block mutations) without having to wait
    // for new activity.
    send_line(&mut write_half, &Command::GetEvents { since: 0 }, "GetEvents").await?;

    let mut lines = BufReader::new(read_half).lines();
    let mut events: Vec<Event> = Vec::new();
    let mut read_failure: Option<String> = None;
    let deadline = tokio::time::Instant::now() + OBSERVATION_WINDOW;

    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            break;
        }
        match tokio::time::timeout(remaining, lines.next_line()).await {
            Ok(Ok(Some(line))) if line.trim().is_empty() => continue,
            Ok(Ok(Some(line))) => match serde_json::from_str::<Event>(&line) {
                Ok(evt) => events.push(evt),
                Err(e) => eprintln!("[warn] could not parse event: {} ({})", e, line),
            },
            Ok(Ok(None)) => {
                eprintln!("[warn] srv pipe closed before observation window elapsed");
                break;
            }
            Ok(Err(e)) => {
                // Defer the error: the Goodbye below must still be
                // attempted so the server doesn't keep a stale record.
                read_failure = Some(format!("read error: {}", e));
                break;
            }
            Err(_) => break, // timeout → observation window closed
        }
    }

    // Best-effort Goodbye so the srv server marks our PID Exited
    // (same rationale as run_wrr_diag). Errors are ignored because
    // we're about to exit anyway.
    let _ = send_line(&mut write_half, &Command::Goodbye, "Goodbye").await;

    if let Some(err) = read_failure {
        return Err(err);
    }

    print_srv_summary(&events);
    Ok(())
}
285
/// Non-Windows stand-in for the `--diag srv` client.
///
/// The srv pipe is a Windows named pipe, so on other targets this
/// always returns `Err` with an explanatory message until Phase 7 adds
/// Unix domain socket support.
#[cfg(not(target_os = "windows"))]
pub async fn run_srv_diag(_launcher_exe_dir: &std::path::Path) -> Result<(), String> {
    let message = "--diag srv is Windows-only (Phase 7 will add Unix domain socket parity)";
    Err(String::from(message))
}
290
/// Print the `--diag srv` report for the events captured by
/// `run_srv_diag`.
///
/// Sections, in order:
/// 1. the latest `Event::SrvSnapshot` (lifecycle, workspaces, tabs,
///    blocks), or a note that none arrived;
/// 2. the tail (at most `REPLAY_TAIL` entries) of the latest
///    `Event::EventList` replay, indexed by absolute position;
/// 3. saga lifecycle counts over the latest replay plus the live
///    stream, with any event seen in both counted once;
/// 4. the live stream: per-kind counts, then every event oldest-first.
///
/// Only the latest snapshot / replay is used. Replies arrive on the
/// broadcast bus, so other concurrent Tool clients' replies may also
/// have been captured.
#[cfg(target_os = "windows")]
fn print_srv_summary(events: &[Event]) {
    use std::collections::{BTreeMap, HashSet};

    // Maximum number of replayed events printed.
    const REPLAY_TAIL: usize = 20;

    let snapshot: Vec<&Event> = events
        .iter()
        .filter(|e| matches!(e, Event::SrvSnapshot { .. }))
        .collect();
    let replay: Vec<&Event> = events
        .iter()
        .filter(|e| matches!(e, Event::EventList { .. }))
        .collect();
    let stream: Vec<&Event> = events
        .iter()
        .filter(|e| !matches!(e, Event::SrvSnapshot { .. } | Event::EventList { .. }))
        .collect();

    if let Some(Event::SrvSnapshot {
        version,
        lifecycle,
        workspaces,
        tabs,
        active_tabs,
        blocks,
    }) = snapshot.last().copied()
    {
        println!("=== SrvSnapshot (event_version={}) ===", version);
        println!("Lifecycle: {:?}", lifecycle);
        println!();
        println!("Workspaces ({}):", workspaces.len());
        if workspaces.is_empty() {
            println!("  (none)");
        } else {
            for (id, name) in workspaces {
                let active = active_tabs
                    .iter()
                    .find(|(ws, _)| ws == id)
                    .map(|(_, t)| t.as_str())
                    .unwrap_or("—");
                let tab_count = tabs.iter().filter(|(_, ws, _)| ws == id).count();
                println!(
                    "  {:36} name={:<20} tabs={} active={}",
                    id, name, tab_count, active
                );
            }
        }
        println!();
        println!("Tabs ({}):", tabs.len());
        if tabs.is_empty() {
            println!("  (none)");
        } else {
            for (tab_id, ws_id, name) in tabs {
                let block_count = blocks.iter().filter(|(_, t)| t == tab_id).count();
                println!(
                    "  {:36} ws={:36} name={:<16} blocks={}",
                    tab_id, ws_id, name, block_count
                );
            }
        }
        println!();
        println!("Blocks ({}):", blocks.len());
        if blocks.is_empty() {
            println!("  (none)");
        } else {
            for (block_id, tab_id) in blocks {
                println!("  {:36} tab={}", block_id, tab_id);
            }
        }
        println!();
    } else {
        println!("(SrvSnapshot not received — srv pipe may not be bound, or older srv build)");
        println!();
    }

    if let Some(Event::EventList {
        events: replay_events,
        version,
    }) = replay.last().copied()
    {
        println!(
            "=== EventList replay (event_version={}, {} event(s)) ===",
            version,
            replay_events.len()
        );
        if replay_events.is_empty() {
            println!("(empty — srv ring + event log contained no events)");
        } else {
            // Show only the tail to keep output manageable; the printed
            // index is the event's absolute position in the replay.
            let n = replay_events.len();
            let skipped = n.saturating_sub(REPLAY_TAIL);
            if skipped > 0 {
                println!("(showing last {} of {})", REPLAY_TAIL, n);
            }
            for (i, evt) in replay_events.iter().enumerate().skip(skipped) {
                println!("  [{}] {}", i, format_srv_event(evt));
            }
        }
        println!();
    }

    // Saga activity is the most operator-relevant signal here. Count
    // over the latest replay plus the live stream only: older EventList
    // replies (e.g. another Tool client's) would otherwise be counted
    // again, and an event broadcast while our GetEvents was being
    // served can appear in both the replay and the stream. Dedup on the
    // serialized event (which carries its `version`) so each saga event
    // is counted once.
    let latest_replay: Vec<&Event> = match replay.last().copied() {
        Some(Event::EventList { events: inner, .. }) => inner.iter().collect(),
        _ => Vec::new(),
    };
    let mut seen: HashSet<String> = HashSet::new();
    let mut saga_counts: BTreeMap<&'static str, u32> = BTreeMap::new();
    for evt in latest_replay.into_iter().chain(stream.iter().copied()) {
        let kind = match evt {
            Event::SagaStarted { .. } => "SagaStarted",
            Event::SagaCompleted { .. } => "SagaCompleted",
            Event::SagaFailed { .. } => "SagaFailed",
            _ => continue,
        };
        // If serialization fails, count the event without dedup rather
        // than dropping it.
        if let Ok(key) = serde_json::to_string(evt) {
            if !seen.insert(key) {
                continue;
            }
        }
        *saga_counts.entry(kind).or_insert(0) += 1;
    }
    if !saga_counts.is_empty() {
        println!("=== Saga lifecycle (across replay + stream) ===");
        for (kind, count) in &saga_counts {
            println!("  {:>4}× {}", count, kind);
        }
        println!();
    }

    println!(
        "=== Live stream observed in {}s ===",
        OBSERVATION_WINDOW.as_secs()
    );
    if stream.is_empty() {
        println!("(no events — srv reducer is idle.)");
        return;
    }
    let mut counts: BTreeMap<&'static str, u32> = BTreeMap::new();
    for evt in &stream {
        *counts.entry(srv_event_kind_name(evt)).or_insert(0) += 1;
    }
    println!("By kind:");
    for (kind, count) in &counts {
        println!("  {:>4}× {}", count, kind);
    }
    println!();
    println!("Full stream (oldest first):");
    for (i, evt) in stream.iter().enumerate() {
        println!("  [{}] {}", i, format_srv_event(evt));
    }
}
440
/// Stable label for an event, used as the grouping key in the
/// `--diag srv` "By kind" counts. Any variant not listed below is
/// reported as `"Other"`.
#[cfg(target_os = "windows")]
fn srv_event_kind_name(e: &Event) -> &'static str {
    match e {
        // Workspace mutations.
        Event::WorkspaceCreated { .. } => "WorkspaceCreated",
        Event::WorkspaceDeleted { .. } => "WorkspaceDeleted",
        Event::WorkspaceRenamed { .. } => "WorkspaceRenamed",
        Event::WorkspaceMetaUpdated { .. } => "WorkspaceMetaUpdated",

        // Tab mutations.
        Event::TabCreated { .. } => "TabCreated",
        Event::TabDeleted { .. } => "TabDeleted",
        Event::TabRenamed { .. } => "TabRenamed",
        Event::TabReordered { .. } => "TabReordered",
        Event::TabsReorderedBulk { .. } => "TabsReorderedBulk",
        Event::TabMoved { .. } => "TabMoved",
        Event::TabMetaUpdated { .. } => "TabMetaUpdated",
        Event::ActiveTabChanged { .. } => "ActiveTabChanged",

        // Block mutations.
        Event::BlockCreated { .. } => "BlockCreated",
        Event::BlockDeleted { .. } => "BlockDeleted",
        Event::BlockMoved { .. } => "BlockMoved",
        Event::BlockMetaUpdated { .. } => "BlockMetaUpdated",

        // srv-side window tracking.
        Event::SrvWindowOpened { .. } => "SrvWindowOpened",
        Event::SrvWindowClosed { .. } => "SrvWindowClosed",
        Event::SrvWindowWorkspaceChanged { .. } => "SrvWindowWorkspaceChanged",

        // Saga lifecycle.
        Event::SagaStarted { .. } => "SagaStarted",
        Event::SagaCompleted { .. } => "SagaCompleted",
        Event::SagaFailed { .. } => "SagaFailed",

        // Client / process lifecycle.
        Event::Registered { .. } => "Registered",
        Event::ProcessSpawned { .. } => "ProcessSpawned",
        Event::ProcessExited { .. } => "ProcessExited",
        Event::LifecyclePhaseChanged { .. } => "LifecyclePhaseChanged",

        // Request replies and errors.
        Event::SrvSnapshot { .. } => "SrvSnapshot",
        Event::EventList { .. } => "EventList",
        Event::Error { .. } => "Error",

        _ => "Other",
    }
}
476
/// Render one srv event as a compact, column-aligned line
/// (`v=<version> <Kind> key=value ...`).
///
/// Saga, workspace-create/delete, tab-create/move, block-move and
/// error events get a hand-written layout. Every other variant is
/// rendered as JSON, or as `Debug` output if JSON serialization fails,
/// so the operator always sees the payload.
#[cfg(target_os = "windows")]
fn format_srv_event(e: &Event) -> String {
    match e {
        // Workspace / tab structure.
        Event::WorkspaceCreated { workspace_id, name, version } => format!(
            "v={:>3} WorkspaceCreated   id={} name={}",
            version, workspace_id, name
        ),
        Event::WorkspaceDeleted { workspace_id, version } => {
            format!("v={:>3} WorkspaceDeleted   id={}", version, workspace_id)
        }
        Event::TabCreated { workspace_id, tab_id, name, version } => format!(
            "v={:>3} TabCreated         tab={} ws={} name={}",
            version, tab_id, workspace_id, name
        ),
        Event::TabMoved {
            tab_id,
            src_workspace_id,
            dst_workspace_id,
            dst_index,
            version,
            ..
        } => format!(
            "v={:>3} TabMoved           tab={} {} → {} idx={}",
            version, tab_id, src_workspace_id, dst_workspace_id, dst_index
        ),
        Event::BlockMoved { block_id, src_tab_id, dst_tab_id, dst_index, version } => format!(
            "v={:>3} BlockMoved         blk={} {} → {} idx={}",
            version, block_id, src_tab_id, dst_tab_id, dst_index
        ),

        // Saga lifecycle.
        Event::SagaStarted { saga_id, name, version } => format!(
            "v={:>3} SagaStarted        id={} name={}",
            version, saga_id, name
        ),
        Event::SagaCompleted { saga_id, version } => {
            format!("v={:>3} SagaCompleted      id={}", version, saga_id)
        }
        Event::SagaFailed { saga_id, reason, version } => format!(
            "v={:>3} SagaFailed         id={} reason={}",
            version, saga_id, reason
        ),

        // Errors.
        Event::Error { code, message, version, .. } => format!(
            "v={:>3} Error              code={:?} msg={}",
            version, code, message
        ),

        // Fallback: raw JSON, or Debug when serialization fails.
        other => match serde_json::to_string(other) {
            Ok(json) => json,
            Err(_) => format!("{:?}", other),
        },
    }
}
519
/// Print the `--diag wrr` report for the events captured by
/// `run_wrr_diag`.
///
/// Output order:
/// 1. Phase D.1 — the latest `Event::Snapshot` (the canonical "state
///    now" view), or a note that none arrived;
/// 2. Phase D.3 — every entry of the latest `Event::EventList` replay;
/// 3. the live broadcast stream seen during the observation window:
///    per-kind counts, then each event oldest-first.
///
/// The LATEST snapshot/replay is used. The IPC server broadcasts
/// snapshots to all subscribers, so concurrent diag/Tool clients can
/// produce several in one window, and the last one captured is the
/// freshest. (codex P2 PR #607.)
#[cfg(target_os = "windows")]
fn print_summary(events: &[Event]) {
    use std::collections::BTreeMap;

    // One pass: keep the most recent Snapshot and EventList reply;
    // everything else belongs to the live stream, in arrival order.
    let mut latest_snapshot: Option<&Event> = None;
    let mut latest_replay: Option<&Event> = None;
    let mut stream: Vec<&Event> = Vec::new();
    for evt in events {
        match evt {
            Event::Snapshot { .. } => latest_snapshot = Some(evt),
            Event::EventList { .. } => latest_replay = Some(evt),
            _ => stream.push(evt),
        }
    }

    match latest_snapshot {
        Some(Event::Snapshot {
            version,
            lifecycle,
            windows,
            pool,
            instance_registry,
            backend_window_ids,
            monitors,
        }) => {
            println!("=== Snapshot (event_version={}) ===", version);
            println!("Lifecycle: {:?}", lifecycle);
            println!("Monitors:  {} ({:?})", monitors.len(), monitors);
            println!();
            println!("Windows ({}):", windows.len());
            if windows.is_empty() {
                println!("  (none)");
            }
            for w in windows {
                let inst = match instance_registry.iter().find(|(l, _)| l == &w.label) {
                    Some((_, n)) => format!("#{}", n),
                    None => "—".to_string(),
                };
                let backend = match backend_window_ids.iter().find(|(l, _)| l == &w.label) {
                    Some((_, b)) => b.as_str(),
                    None => "—",
                };
                println!(
                    "  {:>4} {:30} kind={:?} hwnd={:?} visible={} iconic={} fg_seen={} backend={}",
                    inst,
                    w.label,
                    w.kind,
                    w.hwnd,
                    w.visible,
                    w.iconic,
                    w.foregrounded_since_open,
                    backend
                );
            }
            println!();
            println!("Pool ({}): {:?}", pool.len(), pool);
            println!();
        }
        _ => {
            println!("(snapshot not received — server may be older than D.1)");
            println!();
        }
    }

    // Phase D.3 — replay slice from the launcher's in-memory event ring;
    // shows recent history without waiting for new activity.
    if let Some(Event::EventList { events: replay_events, version }) = latest_replay {
        println!(
            "=== EventList replay (event_version={}, {} event(s)) ===",
            version,
            replay_events.len()
        );
        if replay_events.is_empty() {
            println!("(empty — launcher's in-memory ring contained no events with version > 0)");
        }
        for (i, evt) in replay_events.iter().enumerate() {
            println!("  [{}] {}", i, format_event(evt));
        }
        println!();
    }

    println!("=== Live stream observed in {}s ===", OBSERVATION_WINDOW.as_secs());
    if stream.is_empty() {
        println!("(no events — instance is idle.)");
        return;
    }

    let mut tally: BTreeMap<&'static str, u32> = BTreeMap::new();
    stream
        .iter()
        .for_each(|evt| *tally.entry(event_kind_name(evt)).or_insert(0) += 1);

    println!("By kind:");
    for (kind, count) in &tally {
        println!("  {:>4}× {}", count, kind);
    }
    println!();
    println!("Full stream (oldest first):");
    for (i, evt) in stream.iter().enumerate() {
        println!("  [{}] {}", i, format_event(evt));
    }
}
631
/// Stable label for an event, used as the grouping key in the
/// `--diag wrr` "By kind" counts. Any variant not listed below is
/// reported as `"Other"`.
#[cfg(target_os = "windows")]
fn event_kind_name(e: &Event) -> &'static str {
    match e {
        // Window lifecycle and instance numbering.
        Event::WindowOpened { .. } => "WindowOpened",
        Event::WindowClosed { .. } => "WindowClosed",
        Event::WindowInstanceAssigned { .. } => "WindowInstanceAssigned",
        Event::WindowInstanceReleased { .. } => "WindowInstanceReleased",

        // Window pool.
        Event::PoolWindowAdded { .. } => "PoolWindowAdded",
        Event::PoolWindowRemoved { .. } => "PoolWindowRemoved",
        Event::PoolWindowPromoted { .. } => "PoolWindowPromoted",

        // Backend window-id mapping.
        Event::BackendWindowIdRegistered { .. } => "BackendWindowIdRegistered",
        Event::BackendWindowIdUnregistered { .. } => "BackendWindowIdUnregistered",

        // Drift detection and correction.
        Event::DriftDetected { .. } => "DriftDetected",
        Event::HwndDriftDetected { .. } => "HwndDriftDetected",
        Event::CorrectiveWindowMove { .. } => "CorrectiveWindowMove",

        // Saga lifecycle.
        Event::SagaStarted { .. } => "SagaStarted",
        Event::SagaCompleted { .. } => "SagaCompleted",
        Event::SagaFailed { .. } => "SagaFailed",

        // Client / process lifecycle.
        Event::Registered { .. } => "Registered",
        Event::Pong { .. } => "Pong",
        Event::ProcessSpawned { .. } => "ProcessSpawned",
        Event::ProcessExited { .. } => "ProcessExited",
        Event::LifecyclePhaseChanged { .. } => "LifecyclePhaseChanged",
        Event::HostShouldQuit { .. } => "HostShouldQuit",

        // Request replies.
        Event::Snapshot { .. } => "Snapshot",
        Event::EventList { .. } => "EventList",

        _ => "Other",
    }
}
661
/// Render one launcher event as a compact, column-aligned line
/// (`v=<version> <Kind> key=value ...`).
///
/// Window open/close, instance assignment, HWND drift and host-quit
/// events get a hand-written layout. Every other variant is rendered
/// as JSON, or as `Debug` output if JSON serialization fails, so the
/// operator still sees the data.
#[cfg(target_os = "windows")]
fn format_event(e: &Event) -> String {
    match e {
        Event::HostShouldQuit { version } => format!("v={:>3} HostShouldQuit", version),
        Event::WindowInstanceAssigned { label, num, version } => format!(
            "v={:>3} InstanceAssigned   label={} num={}",
            version, label, num
        ),
        Event::WindowOpened { label, kind, parent_label, version } => format!(
            "v={:>3} WindowOpened       label={} kind={:?} parent={:?}",
            version, label, kind, parent_label
        ),
        Event::WindowClosed { label, version, crash_detected } => format!(
            "v={:>3} WindowClosed       label={} crash_detected={}",
            version, label, crash_detected
        ),
        Event::HwndDriftDetected { kind, label, hwnd, severity, detail, version } => format!(
            "v={:>3} HwndDriftDetected  kind={:?} label={:?} hwnd={:?} severity={:?} — {}",
            version, kind, label, hwnd, severity, detail
        ),
        // Fallback: raw JSON, or Debug when serialization fails.
        other => match serde_json::to_string(other) {
            Ok(json) => json,
            Err(_) => format!("{:?}", other),
        },
    }
}
688
689// ---------------------------------------------------------------------------
690// LSD-3 — `agentmux.exe --diag sagas`
691//
692// Operator visibility into the launcher saga log
693// (`<data-dir>/db/launcher-sagas.db`). Unlike `--diag wrr` and
694// `--diag srv`, this command does NOT need a running launcher: the
695// SQLite log is a passive on-disk artefact. We open it read-only via
696// `LauncherSagaLog::open_read_only` and call `snapshot_recent(50)`. Useful when
697// the launcher won't start (operator wants to see what sagas were
698// in flight at the last crash) or post-mortem on a portable instance.
699//
700// Output mirrors the example in LSD spec §3.5 — saga header line per
701// saga, followed by step rows showing target + state + cmd snippet.
702// Sagas marked `failed_compensation` by the startup recovery walker
703// are flagged "(recovered on restart)" in the header so operators
704// can spot them at a glance.
705// ---------------------------------------------------------------------------
706
707#[cfg(target_os = "windows")]
708pub async fn run_sagas_diag(launcher_exe_dir: &std::path::Path) -> Result<(), String> {
709    run_sagas_diag_impl(launcher_exe_dir).await
710}
711
712#[cfg(not(target_os = "windows"))]
713pub async fn run_sagas_diag(launcher_exe_dir: &std::path::Path) -> Result<(), String> {
714    // The saga log is a SQLite file with no platform-specific bits;
715    // the cross-platform parity goal for `--diag sagas` is "works
716    // wherever the launcher writes the log." Phase 7 will wire the
717    // POSIX data_dir resolution, but the rest of the formatter is
718    // already platform-agnostic, so we reuse the impl on every
719    // target.
720    run_sagas_diag_impl(launcher_exe_dir).await
721}
722
723async fn run_sagas_diag_impl(launcher_exe_dir: &std::path::Path) -> Result<(), String> {
724    let version = env!("CARGO_PKG_VERSION");
725
726    let paths = crate::data_dir::resolve_paths(launcher_exe_dir, version)
727        .map_err(|e| format!("path resolution failed: {}", e))?;
728    // `--diag sagas` is a passive on-disk inspector (see the doc
729    // comment for this function); use the read-only resolver so we
730    // don't trigger the legacy-file rename here. Reagent P2 on PR #932.
731    let saga_log_path = crate::data_dir::launcher_saga_log_path_read_only(&paths.data_dir);
732
733    println!("AgentMux launcher saga diagnostic");
734    println!("Data dir: {}", paths.data_dir.display());
735    println!("Saga log: {}", saga_log_path.display());
736    println!();
737
738    if !saga_log_path.exists() {
739        println!(
740            "(no saga log at {} — launcher hasn't written one yet, or this isn't an AgentMux data dir)",
741            saga_log_path.display()
742        );
743        return Ok(());
744    }
745
746    // Read-only open: an operator's diagnostic invocation must not
747    // mutate the log a running launcher owns. (codex P2 PR #647 round 3.)
748    let log = crate::saga::LauncherSagaLog::open_read_only(&saga_log_path)
749        .map_err(|e| format!("open saga log {:?} (read-only): {}", saga_log_path, e))?;
750
751    let snapshot = log
752        .snapshot_recent(50)
753        .map_err(|e| format!("snapshot_recent: {}", e))?;
754    let unresolved = log
755        .unresolved_sagas()
756        .map_err(|e| format!("unresolved_sagas: {}", e))?;
757
758    if snapshot.is_empty() {
759        println!("(saga log is empty)");
760        return Ok(());
761    }
762
763    println!("Recent launcher sagas (last {}):", snapshot.len());
764    for s in &snapshot {
765        let recovered_marker = if s.state == "failed_compensation" {
766            " (recovered on restart)"
767        } else {
768            ""
769        };
770        let ended = s.ended_at.as_deref().unwrap_or("—");
771        println!(
772            "  saga_id={} name={} state={}{}",
773            s.saga_id, s.name, s.state, recovered_marker
774        );
775        println!(
776            "    started={} ended={} steps_progressed={}",
777            s.started_at, ended, s.step_count
778        );
779        if let Some(reason) = &s.failure_reason {
780            println!("    failure: {}", reason);
781        }
782        if !s.input_json.is_empty() && s.input_json != "null" {
783            println!("    input: {}", s.input_json);
784        }
785
786        // Step rows. Surface step detail for both `unresolved` sagas
787        // AND `failed_compensation` sagas (recovered crashes) — the
788        // latter is the operator triage flow per LSD spec §3.5.
789        // (codex P1 PR #647 round 1: unresolved_sagas() filters
790        // out failed_compensation, so we fall back to a direct
791        // get_saga_steps query for those.)
792        let steps: Vec<crate::saga::log::UnresolvedLauncherStep> = if let Some(u) =
793            unresolved.iter().find(|u| u.saga_id == s.saga_id)
794        {
795            u.steps.clone()
796        } else if s.state == "failed_compensation" {
797            // Surface step-query failures rather than silently
798            // returning empty — operators need visibility into "why
799            // are step rows missing for this recovered saga".
800            // (codex P2 PR #647 round 3.)
801            match log.get_saga_steps(s.saga_id) {
802                Ok(steps) => steps,
803                Err(e) => {
804                    println!("    [step query failed: {} — saga rows may exist but cannot be read]", e);
805                    Vec::new()
806                }
807            }
808        } else {
809            Vec::new()
810        };
811        if !steps.is_empty() {
812            println!("    steps:");
813            for step in &steps {
814                let target = step.target.as_deref().unwrap_or("—");
815                let cmd_snippet = step
816                    .cmd_json
817                    .as_deref()
818                    .map(|c| truncate_for_display(c, 120))
819                    .unwrap_or_else(|| "—".into());
820                println!(
821                    "      {:>3}  {:30} target={:<14} state={:<10} cmd={}",
822                    step.step_index, step.name, target, step.state, cmd_snippet
823                );
824                if let Some(reason) = &step.failure_reason {
825                    println!("           failure: {}", reason);
826                }
827            }
828            // Pinpoint the in-flight step at crash time, mirroring
829            // the example in spec §3.5: "[step 2 was in-flight when
830            // launcher exited]". The step in `pending` state at the
831            // highest index is the one the saga was waiting on.
832            if let Some(in_flight) = steps.iter().rev().find(|st| st.state == "pending") {
833                println!(
834                    "      [step {} was in-flight when launcher exited]",
835                    in_flight.step_index
836                );
837            }
838        }
839        println!();
840    }
841
842    let recovered_count = snapshot
843        .iter()
844        .filter(|s| s.state == "failed_compensation")
845        .count();
846    if recovered_count > 0 {
847        println!(
848            "Note: {} saga(s) marked `failed_compensation` by the startup recovery walker.",
849            recovered_count
850        );
851        println!("These were unresolved when the launcher last exited; their effects on host state");
852        println!("may be partially applied. Inspect step rows above to see what was attempted.");
853    }
854
855    Ok(())
856}
857
/// Shorten `s` to at most `max_chars` characters for single-line
/// display, marking any cut with a trailing `…`. Keeps `--diag sagas`
/// cmd columns readable when commands carry large payloads (e.g. block
/// meta). Lengths are counted in `char`s, so a multi-byte character is
/// never split.
fn truncate_for_display(s: &str, max_chars: usize) -> String {
    match s.char_indices().nth(max_chars) {
        // A character exists past the limit: cut at its byte offset.
        Some((cut_at, _)) => format!("{}…", &s[..cut_at]),
        // Everything already fits.
        None => s.to_owned(),
    }
}
869
#[cfg(test)]
mod sagas_diag_tests {
    use super::*;
    use crate::saga::{LauncherSagaLog, PipeTarget};
    use agentmux_common::ipc::{Command, Event};

    #[test]
    fn truncate_for_display_shortens_long_strings_and_keeps_short_ones() {
        assert_eq!(truncate_for_display("hi", 10), "hi");
        let long = "a".repeat(200);
        let truncated = truncate_for_display(&long, 50);
        // 50 a's + ellipsis.
        assert_eq!(truncated.chars().count(), 51);
        assert!(truncated.ends_with('…'));
    }

    /// Smoke test: build a fixture saga log, run `snapshot_recent`,
    /// `unresolved_sagas` and the `get_saga_steps` fallback, and verify
    /// the data the formatter would print matches what the spec
    /// describes. We don't snapshot stdout (printing is fine; what
    /// matters is the data we'd print).
    #[test]
    fn sagas_diag_fixture_log_has_expected_summary_fields() {
        let log = LauncherSagaLog::open_in_memory().unwrap();

        // Saga 1 — completed cleanly.
        log.start_saga(
            1,
            "window_cleanup_cascade",
            &serde_json::json!({"label": "win-1"}),
        )
        .unwrap();
        log.start_step(
            1,
            0,
            "issue_cmd_host_reap_panes",
            PipeTarget::Host,
            &Command::Ping { nonce: 1 },
        )
        .unwrap();
        log.finish_step(1, 0, &Event::Pong { nonce: 1, version: 1 })
            .unwrap();
        log.terminate_saga(1, crate::saga::log::SagaOutcome::Completed)
            .unwrap();

        // Saga 2 — recovered (failed_compensation), with a pending
        // step row to demonstrate the "[step N was in-flight ..]" line.
        log.start_saga(
            2,
            "window_cleanup_cascade",
            &serde_json::json!({"label": "win-3"}),
        )
        .unwrap();
        log.start_step(
            2,
            0,
            "issue_cmd_host_reap_panes",
            PipeTarget::Host,
            &Command::Ping { nonce: 2 },
        )
        .unwrap();
        log.finish_step(2, 0, &Event::Pong { nonce: 2, version: 1 })
            .unwrap();
        log.start_step(
            2,
            1,
            "issue_cmd_host_drain_pool",
            PipeTarget::Host,
            &Command::Ping { nonce: 3 },
        )
        .unwrap();
        // Step 1 stays pending — saga 2 was in-flight at "crash" time.
        // Then run the recovery walker manually.
        log.mark_failed_compensation(2, "launcher restarted while saga in state 'running'")
            .unwrap();

        let snapshot = log.snapshot_recent(50).unwrap();
        assert_eq!(snapshot.len(), 2);

        // snapshot_recent returns most-recent-first.
        let s2 = &snapshot[0];
        let s1 = &snapshot[1];
        assert_eq!(s1.saga_id, 1);
        assert_eq!(s1.state, "completed");
        assert_eq!(s1.step_count, 1);

        assert_eq!(s2.saga_id, 2);
        assert_eq!(s2.state, "failed_compensation");
        // step_count counts succeeded+compensated, so 1 succeeded
        // (the reap-panes step). The pending drain-pool step is NOT
        // counted as progress.
        assert_eq!(s2.step_count, 1);
        assert!(s2
            .failure_reason
            .as_deref()
            .unwrap_or("")
            .contains("launcher restarted"));

        // Saga 2 is in failed_compensation, which is terminal, so
        // unresolved_sagas EXCLUDES it. The formatter therefore falls
        // back to a direct get_saga_steps query for recovered sagas, so
        // operators still see the step rows and the in-flight marker.
        let unresolved = log.unresolved_sagas().unwrap();
        assert!(unresolved.iter().all(|u| u.saga_id != 2));

        // The fallback query surfaces both step rows for saga 2,
        // including the drain-pool step left pending at "crash" time.
        let steps = log.get_saga_steps(2).unwrap();
        assert_eq!(steps.len(), 2);
        let in_flight = steps
            .iter()
            .find(|st| st.state == "pending")
            .expect("saga 2's drain-pool step should still be pending");
        assert_eq!(in_flight.step_index, 1);
        assert_eq!(in_flight.name, "issue_cmd_host_drain_pool");
    }
}
978}