agentmux_cef/
launcher_ipc.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase B.2: host-side client for the launcher's named-pipe IPC
5// server. Connects on host startup, sends `Register`, holds the
6// connection open for the host's lifetime.
7//
8// Today (B.2) the connection is fire-and-hold — we Register and
9// then leave the read half idle. B.3 will start receiving `Event`s
10// from the launcher (state changes), B.4 will send `Command`s up,
11// B.7 will bridge events to the renderer process.
12//
13// Activated only when `AGENTMUX_LAUNCHER_PIPE` is set (production
14// portable / installed paths after PR #571 + this PR). Absent →
15// `task dev` mode where launcher isn't in the loop; host runs as
16// before.
17
18use std::sync::Arc;
19use std::sync::OnceLock;
20
21use agentmux_common::ipc::{ClientKind, Command, Event, HostFrame};
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio::sync::{mpsc, Mutex};
24
25#[cfg(target_os = "windows")]
26use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeClient};
27
/// Phase B.4 — global outbound command channel.
///
/// Published at most once by `connect_to_launcher`, and only AFTER the
/// `Register` frame has been written and flushed, so nothing queued
/// through this sender can precede `Register` on the wire. Sync
/// callers (CEF lifecycle callbacks fire on the UI thread) post
/// `Command`s without needing a tokio runtime handle; a task spawned
/// in `connect_to_launcher` drains the channel and writes each
/// command to the pipe.
///
/// Unset semantics (`COMMAND_TX.get()` returns `None`): launcher IPC
/// isn't connected (e.g. `task dev` mode where launcher isn't in the
/// loop). All `report_*` helpers are then silent no-ops; the host
/// runs as before.
static COMMAND_TX: OnceLock<mpsc::UnboundedSender<Command>> = OnceLock::new();
38
/// Handle the host holds for its lifetime so the launcher's IPC
/// pipe stays open.
///
/// NOTE(review): earlier docs stated that dropping this handle closes
/// the connection (launcher logs as a disconnect). In the current
/// `connect_to_launcher` the drain task owns its own `Arc` clone of
/// the writer and the reader task owns the read half; dropping a
/// `JoinHandle` detaches the task rather than stopping it. Dropping
/// the handle alone therefore may not close the pipe — confirm before
/// relying on drop-to-disconnect.
///
/// Cross-platform shape: the Windows variant carries the actual
/// pipe writer + reader-task handles; the non-Windows variant is
/// a unit-struct stub so `connect_to_launcher` keeps a uniform
/// `Option<LauncherIpcHandle>` return type. Phase 7's cross-
/// platform IPC (Unix domain sockets) will fill the non-Windows
/// shape later. (reagent / codex P1 PR #573 round-1.)
#[cfg(target_os = "windows")]
pub struct LauncherIpcHandle {
    /// Write half of the pipe, shared with the drain task that
    /// serializes queued `Command`s. Never read through the handle
    /// itself (hence `dead_code`); kept here so the handle also
    /// holds a reference to the writer.
    #[allow(dead_code)]
    writer: Arc<Mutex<tokio::io::WriteHalf<NamedPipeClient>>>,
    /// Join handle of the read-loop task spawned in
    /// `connect_to_launcher`. That task parses each incoming line as
    /// a `HostFrame` (falling back to a bare `Event`) and dispatches
    /// it. The handle is never awaited here (hence `dead_code`).
    #[allow(dead_code)]
    reader_task: tokio::task::JoinHandle<()>,
}
60
/// Non-Windows placeholder for the launcher IPC handle. Carries no
/// state; it exists only so `connect_to_launcher` has the same
/// `Option<LauncherIpcHandle>` return type on every platform.
#[cfg(not(target_os = "windows"))]
pub struct LauncherIpcHandle;
63
/// If `AGENTMUX_LAUNCHER_PIPE` is set, connect, Register as Host,
/// and return a handle the caller (host main.rs) holds for the
/// host's lifetime. If unset (or empty) → return `None` and the host
/// runs in pre-Phase-B mode (no launcher connection).
///
/// Sequence on success:
/// 1. open the named pipe, retrying briefly while every server
///    instance is busy (`ERROR_PIPE_BUSY`);
/// 2. write + flush `Command::Register` so it is the very first
///    frame on the wire;
/// 3. publish the global outbound sender (`COMMAND_TX`) and spawn the
///    drain task that writes queued `Command`s as newline-delimited
///    JSON;
/// 4. spawn the reader task that parses each incoming line as a
///    `HostFrame` (falling back to a bare `Event`), applies events to
///    the shadow projections and dispatches saga commands.
///
/// Errors are logged but non-fatal: a launcher-IPC failure should
/// NOT prevent the host from running. Every failure path returns
/// `None` without publishing `COMMAND_TX`, so the `report_*` helpers
/// stay no-ops.
#[cfg(target_os = "windows")]
pub async fn connect_to_launcher(
    state: std::sync::Arc<crate::state::AppState>,
) -> Option<LauncherIpcHandle> {
    let pipe_path = match std::env::var("AGENTMUX_LAUNCHER_PIPE") {
        Ok(p) if !p.is_empty() => p,
        _ => {
            tracing::info!(
                "AGENTMUX_LAUNCHER_PIPE unset — running without launcher IPC (dev mode)"
            );
            return None;
        }
    };

    // Open the named pipe (client side). `ClientOptions::open` does
    // not wait for a free server instance and does not retry on its
    // own: it fails immediately with ERROR_PIPE_BUSY when all
    // instances are taken. The helper performs the bounded retry.
    let client = match open_launcher_pipe(&pipe_path).await {
        Ok(c) => c,
        Err(e) => {
            tracing::error!(
                "[launcher-ipc] failed to open {}: {} — continuing without launcher IPC",
                pipe_path,
                e
            );
            return None;
        }
    };
    tracing::info!("[launcher-ipc] connected to {}", pipe_path);

    let (read_half, write_half) = tokio::io::split(client);
    let writer = Arc::new(Mutex::new(write_half));

    // Send Register FIRST. Server's `enforce_register_first` requires
    // it as the very first message on the wire, and a fatal-close
    // on violation would permanently disable the mirror (no
    // reconnect path in B.4). The drain task that processes
    // outbound commands gets spawned + the global sender published
    // only AFTER this succeeds. (reagent P1 + codex P2 PR #576
    // round-1 — race where a `report_window_*` call between
    // `COMMAND_TX.set` and the Register flush could land first on
    // the wire.)
    let register = Command::Register {
        kind: ClientKind::Host,
        pid: std::process::id(),
        version: env!("CARGO_PKG_VERSION").to_string(),
    };
    let mut buf = match serde_json::to_vec(&register) {
        Ok(b) => b,
        Err(e) => {
            tracing::error!(
                "[launcher-ipc] failed to serialize Register: {} — continuing without IPC",
                e
            );
            return None;
        }
    };
    buf.push(b'\n');
    {
        // Scoped so the writer lock is released before the drain task
        // (which takes the same lock) is spawned.
        let mut w = writer.lock().await;
        if let Err(e) = w.write_all(&buf).await {
            tracing::error!(
                "[launcher-ipc] failed to send Register: {} — continuing without IPC",
                e
            );
            return None;
        }
        if let Err(e) = w.flush().await {
            tracing::error!(
                "[launcher-ipc] failed to flush Register: {} — continuing without IPC",
                e
            );
            return None;
        }
    }

    // Phase B.4 — Register confirmed on the wire; safe to expose the
    // outbound command channel. Sync callers push Commands; a
    // dedicated drain task writes them as newline-delimited JSON.
    // Ordered (preserves report order from the UI thread). Unbounded
    // so sending never blocks the UI thread; the volume is one event
    // per window-level transition, not high-frequency.
    let (tx, mut rx) = mpsc::unbounded_channel::<Command>();
    // Keep a sender for saga replies that is tied to THIS connection's
    // channel. Cloning before `set` means the reader never has to
    // probe the OnceLock, and — if `COMMAND_TX` was already set by an
    // earlier call — replies still reach this connection's drain task
    // instead of being routed to the stale first channel (previously
    // the fresh sender was dropped on a failed `set`, which made the
    // drain task below exit immediately).
    let reply_tx = tx.clone();
    if COMMAND_TX.set(tx).is_err() {
        tracing::warn!("[launcher-ipc] COMMAND_TX already set — connect_to_launcher called twice");
    }
    let writer_for_drain = Arc::clone(&writer);
    tokio::spawn(async move {
        while let Some(cmd) = rx.recv().await {
            let mut buf = match serde_json::to_vec(&cmd) {
                Ok(b) => b,
                Err(e) => {
                    // A single unserializable command is skipped; the
                    // channel keeps draining.
                    tracing::warn!("[launcher-ipc] failed to serialize {:?}: {}", cmd, e);
                    continue;
                }
            };
            buf.push(b'\n');
            let mut w = writer_for_drain.lock().await;
            if let Err(e) = w.write_all(&buf).await {
                // Ending the task drops `rx`; later `send` calls on the
                // channel then fail and are logged by the callers.
                tracing::warn!(
                    "[launcher-ipc] write failed for {:?}: {} — dropping further commands",
                    cmd, e
                );
                return;
            }
            if let Err(e) = w.flush().await {
                tracing::warn!("[launcher-ipc] flush failed: {}", e);
                return;
            }
        }
    });

    // Spawn a read-loop task. Logs every event and feeds the host's
    // shadow projections from the launcher's authoritative event
    // stream. Host has no local `WindowInstanceRegistry` post-B.5e —
    // the shadow IS the read path.
    //
    // Phase CPD-5 — the reader accepts `HostFrame` envelopes (CPD-1)
    // so the launcher can push saga-issued `Command`s down alongside
    // `Event`s. Each line is parsed as `HostFrame` first; on parse
    // failure we fall back to raw `Event` JSON for backward-
    // compatibility with launchers that haven't yet adopted the
    // envelope shape (the legacy fanout path emits bare `Event` JSON;
    // CPD-2's `HostPipe::send_event` wraps in `HostFrame::Event`).
    //
    // Saga-issued `Command`s are dispatched via the host-side
    // idempotency LRU (`saga_dispatch::dispatch_host_command`):
    // duplicate `(saga_id, kind)` pairs re-emit the same Report
    // without re-running the action. The reply Commands are pushed
    // through `reply_tx`, which the drain task spawned above
    // serializes to the writer.
    let state_for_reader = Arc::clone(&state);
    let saga_lru = Arc::new(parking_lot::Mutex::new(
        crate::saga_dispatch::SagaIdempotencyLru::with_default_cap(),
    ));
    // Last use of `state` in this function, so move it instead of
    // cloning once more.
    let saga_runner = Arc::new(crate::saga_dispatch::LiveActionRunner { state });
    let reader_task = tokio::spawn(async move {
        let reader = BufReader::new(read_half);
        let mut lines = reader.lines();
        loop {
            match lines.next_line().await {
                // Blank lines carry no frame; skip them silently.
                Ok(Some(line)) if line.trim().is_empty() => continue,
                Ok(Some(line)) => {
                    // Try the envelope first.
                    match serde_json::from_str::<HostFrame>(&line) {
                        Ok(HostFrame::Event(event)) => {
                            tracing::info!("[launcher-ipc] received event: {:?}", event);
                            apply_event_to_shadow(&state_for_reader, &event);
                        }
                        Ok(HostFrame::Command(cmd)) => {
                            tracing::info!(
                                "[launcher-ipc] received saga command: {:?}",
                                cmd
                            );
                            let outcome = crate::saga_dispatch::dispatch_host_command(
                                &cmd,
                                saga_runner.as_ref(),
                                &saga_lru,
                                &reply_tx,
                            );
                            tracing::debug!(
                                "[launcher-ipc] saga command outcome: {:?}",
                                outcome
                            );
                        }
                        Err(_envelope_err) => {
                            // Backward-compat fallback: pre-CPD-2 launchers
                            // emit bare `Event` JSON without the `HostFrame`
                            // wrapper. Parse as raw `Event`; on failure
                            // log and skip.
                            match serde_json::from_str::<Event>(&line) {
                                Ok(event) => {
                                    tracing::info!(
                                        "[launcher-ipc] received event (legacy bare-Event): {:?}",
                                        event
                                    );
                                    apply_event_to_shadow(&state_for_reader, &event);
                                }
                                Err(e) => {
                                    tracing::warn!(
                                        "[launcher-ipc] could not parse line as HostFrame or Event ({}): {}",
                                        e,
                                        line
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(None) => {
                    tracing::info!("[launcher-ipc] launcher pipe EOF — connection closed");
                    return;
                }
                Err(e) => {
                    tracing::warn!("[launcher-ipc] read error: {}", e);
                    return;
                }
            }
        }
    });

    Some(LauncherIpcHandle {
        writer,
        reader_task,
    })
}

/// Opens the client end of the launcher pipe, retrying for a bounded
/// time while the server reports `ERROR_PIPE_BUSY` (all pipe
/// instances currently taken). Any other error, or exhausting the
/// retry budget, is returned to the caller unchanged.
#[cfg(target_os = "windows")]
async fn open_launcher_pipe(pipe_path: &str) -> std::io::Result<NamedPipeClient> {
    /// Win32 `ERROR_PIPE_BUSY`.
    const ERROR_PIPE_BUSY: i32 = 231;
    /// Upper bound on open attempts (~1s total with `RETRY_DELAY`), so
    /// a wedged launcher cannot stall host startup indefinitely.
    const MAX_ATTEMPTS: u32 = 20;
    const RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(50);

    let mut attempt: u32 = 1;
    loop {
        match ClientOptions::new().open(pipe_path) {
            Ok(client) => return Ok(client),
            Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY) && attempt < MAX_ATTEMPTS => {
                tracing::debug!(
                    "[launcher-ipc] {} busy (attempt {}/{}) — retrying",
                    pipe_path,
                    attempt,
                    MAX_ATTEMPTS
                );
                attempt += 1;
                tokio::time::sleep(RETRY_DELAY).await;
            }
            Err(e) => return Err(e),
        }
    }
}
297
/// Non-Windows stub of `connect_to_launcher`: always returns `None`,
/// so the host runs without a launcher connection on these
/// platforms. Keeps the same signature as the Windows variant so
/// callers need no platform-specific code.
#[cfg(not(target_os = "windows"))]
pub async fn connect_to_launcher(
    // Unused here; present only to match the Windows signature.
    _state: std::sync::Arc<crate::state::AppState>,
) -> Option<LauncherIpcHandle> {
    None
}
304
305/// Apply a launcher event to host's shadow projections, then fan
306/// the event out to every top-level renderer via the JS bridge.
307///
308/// Shadows are updated FIRST so any renderer-side code that reads
309/// host state via the IPC HTTP path (e.g. `listWindowInstances`)
310/// sees a consistent view at the moment the typed event lands.
311///
312/// Phase B.7.3.3 — the bespoke `window-instances-changed` re-emit
313/// is gone; renderers now consume typed events directly via the
314/// CEF JS bridge dispatcher (`launcher_event_bridge`).
315/// Pure shadow-state projection — extracted from `apply_event_to_shadow`
316/// for property testing per master spec §8.14 (subscriber idempotency
317/// contract). Mutates ONLY the shadow Mutex<HashMap> fields:
318/// `shadow_instance_registry`, `shadow_window_meta`,
319/// `shadow_backend_window_ids`. No UI tasks, no renderer dispatch, no
320/// CEF calls — those live in the wrapper `apply_event_to_shadow`.
321///
322/// Idempotent by construction: every arm is `HashMap::insert` /
323/// `HashMap::remove` keyed by `label`, both naturally idempotent past
324/// the first call. The drift-compare branch in `WindowOpened` is
325/// read-only (logs a warning, doesn't mutate). Locked in by the
326/// `shadow_projection_idempotent_under_replay` proptest.
327pub(crate) fn apply_shadow_projection(state: &std::sync::Arc<crate::state::AppState>, event: &Event) {
328    match event {
329        Event::WindowInstanceAssigned { label, num, .. } => {
330            // Phase B.5e — host's `WindowInstanceRegistry` was
331            // deleted. The drift compare that used to live here
332            // (B.5b/c/d) is gone; the launcher is the sole
333            // authority and the shadow is the host's projection.
334            state
335                .shadow_instance_registry
336                .lock()
337                .insert(label.clone(), *num);
338        }
339        Event::WindowOpened { label, kind, parent_label, .. } => {
340            // Phase B.5 (window_meta step b) — shadow projection
341            // + drift compare. Map the wire `WindowKind` back to
342            // host's `crate::state::WindowKind` (one-to-one).
343            let host_kind = match kind {
344                agentmux_common::ipc::WindowKind::FullInstance => crate::state::WindowKind::FullInstance,
345                agentmux_common::ipc::WindowKind::Subwindow => crate::state::WindowKind::Subwindow,
346            };
347            let shadow_meta = crate::state::WindowMeta {
348                label: label.clone(),
349                kind: host_kind,
350                parent_instance_id: parent_label.clone(),
351            };
352            // Drift compare to host's authoritative `window_meta`.
353            // Race-tolerant: host's local insert (in
354            // `on_after_created`) may not have run yet if the
355            // launcher event raced ahead. Skip in that case.
356            let host_meta = state.window_meta.lock().get(label).cloned();
357            if let Some(host_meta) = host_meta {
358                if host_meta.kind != shadow_meta.kind
359                    || host_meta.parent_instance_id != shadow_meta.parent_instance_id
360                {
361                    tracing::warn!(
362                        target: "launcher-ipc:drift",
363                        label = %label,
364                        launcher_kind = ?shadow_meta.kind,
365                        launcher_parent = ?shadow_meta.parent_instance_id,
366                        host_kind = ?host_meta.kind,
367                        host_parent = ?host_meta.parent_instance_id,
368                        "[launcher-ipc] window_meta drift: host and launcher disagree"
369                    );
370                }
371            }
372            state
373                .shadow_window_meta
374                .lock()
375                .insert(label.clone(), shadow_meta);
376        }
377        Event::WindowClosed { label, .. } => {
378            // Phase B.5 (window_meta step b) — symmetric drop.
379            // Drift compare on close is omitted: the launcher's
380            // strict-pairing semantic (PR #577 round-2) means we
381            // only see this event when the open was paired, so a
382            // host-side missing entry would have surfaced at
383            // open time.
384            state.shadow_window_meta.lock().remove(label);
385        }
386        Event::BackendWindowIdRegistered { label, window_id, .. } => {
387            // Phase B.5 (window_id_map step e) — host's
388            // `window_id_map` was deleted; the drift compare is
389            // gone with it. Shadow is sole source of truth.
390            state
391                .shadow_backend_window_ids
392                .lock()
393                .insert(label.clone(), window_id.clone());
394        }
395        Event::BackendWindowIdUnregistered { label, .. } => {
396            // Phase B.5 (window_id_map step e) — drift compare gone.
397            state.shadow_backend_window_ids.lock().remove(label);
398        }
399        Event::WindowInstanceReleased { label, .. } => {
400            // Phase B.5e — host's `WindowInstanceRegistry` was
401            // deleted; the drift compare is gone with it.
402            state.shadow_instance_registry.lock().remove(label);
403        }
404        // Side-effect arms (UI tasks, reconciler) handled in the
405        // `apply_event_to_shadow` wrapper. Excluded here so this
406        // function stays pure-projection and trivially idempotent.
407        _ => {}
408    }
409}
410
411fn apply_event_to_shadow(state: &std::sync::Arc<crate::state::AppState>, event: &Event) {
412    apply_shadow_projection(state, event);
413    match event {
414        Event::CorrectiveWindowMove { hwnd, target_rect, reason, .. } => {
415            // Phase B.9.2 — pure-reducer self-heal. Reducer detected
416            // an off-monitor / sentinel-parked window that the user
417            // has never foregrounded, and computed an on-monitor
418            // target rect. Apply `SetWindowPos` to move the window
419            // before the user notices the orphan taskbar entry.
420            // The CEF UI thread is the safe caller for SetWindowPos
421            // on a CEF Views-managed window — post a UI task rather
422            // than calling from this tokio task directly.
423            tracing::warn!(
424                target: "wrr",
425                "[wrr] CorrectiveWindowMove hwnd={:#x} reason={:?} target=({},{})-({},{})",
426                hwnd, reason,
427                target_rect.left, target_rect.top, target_rect.right, target_rect.bottom
428            );
429            crate::ui_tasks::post_corrective_window_move(
430                state,
431                *hwnd,
432                target_rect.left,
433                target_rect.top,
434                target_rect.right - target_rect.left,
435                target_rect.bottom - target_rect.top,
436            );
437        }
438        Event::HostShouldQuit { .. } => {
439            // The launcher emits this when it detects the
440            // last-user-visible-window-closed-but-host-alive state.
441            // The host's reconciler closes any orphan
442            // `window-pool-*` browsers so the existing on_before_close
443            // cascade can drain to `browser_list.is_empty()` and
444            // fire `quit_message_loop`.
445            //
446            // Spec: `docs/specs/SPEC_HOST_ORPHAN_RECONCILIATION_2026_05_05.md`.
447            //
448            // We're on the launcher IPC reader thread; CEF
449            // Browser/BrowserHost calls aren't safe here. The
450            // reconciler does state-snapshot + classification only,
451            // then posts a UI-thread task that does the HWND probe
452            // and `host.close_browser(force=1)`. Earlier v0.33.491–
453            // v0.33.494 attempts at driving UI shutdown directly
454            // from this handler hung CEF; using the task scheduler
455            // is the difference.
456            tracing::warn!(
457                target: "wrr",
458                "[wrr] HostShouldQuit received — running orphan reconciler"
459            );
460            crate::commands::orphan_reconcile::reconcile_and_drain(state);
461        }
462        _ => {}
463    }
464
465    // Phase B.7.3.1 — broadcast the typed event to every top-level
466    // renderer. The renderer-side dispatcher (`window.__agentmux_launcher_event`,
467    // installed by `frontend/util/launcher-events.ts`) feeds the
468    // launcher-event-reducer signal that downstream UI subscribes to.
469    crate::launcher_event_bridge::dispatch_to_renderers(state, event);
470}
471
472/// Phase B.4 — sync API: report a window open to the launcher's
473/// state mirror. Called from CEF lifecycle callbacks on the UI
474/// thread. No-op if the launcher pipe isn't connected (`task dev`
475/// mode); failures to enqueue (channel closed, drain task died)
476/// are logged but don't propagate — the host's authoritative state
477/// is unaffected, the mirror just falls behind. B.5 tightens.
478pub fn report_window_opened(
479    label: String,
480    kind: agentmux_common::ipc::WindowKind,
481    parent_label: Option<String>,
482) {
483    let Some(tx) = COMMAND_TX.get() else {
484        return; // launcher not in the loop
485    };
486    let cmd = Command::ReportWindowOpened {
487        label,
488        kind,
489        parent_label,
490    };
491    if let Err(e) = tx.send(cmd) {
492        tracing::warn!("[launcher-ipc] report_window_opened: channel closed ({})", e);
493    }
494}
495
496/// Phase B.4 — sync API: report a window close to the launcher's
497/// state mirror. Same no-op-if-disconnected semantics as
498/// `report_window_opened`.
499pub fn report_window_closed(label: String) {
500    let Some(tx) = COMMAND_TX.get() else {
501        return;
502    };
503    let cmd = Command::ReportWindowClosed { label };
504    if let Err(e) = tx.send(cmd) {
505        tracing::warn!("[launcher-ipc] report_window_closed: channel closed ({})", e);
506    }
507}
508
509/// Phase B.4 follow-up — sync API: report a pool window being added
510/// to the warm pool inventory. Called from `spawn_pool_window` on
511/// the UI thread. No-op when launcher pipe is absent.
512pub fn report_pool_window_added(label: String) {
513    let Some(tx) = COMMAND_TX.get() else {
514        return;
515    };
516    // CPD-1 (schema-only): host's existing `report_pool_window_added`
517    // call sites are organic refills (not yet saga-driven); pass
518    // `saga_id: None` per spec §3.3. CPD-3 wires the saga-driven
519    // path that will pass `Some(N)` through here.
520    let cmd = Command::ReportPoolWindowAdded { label, saga_id: None };
521    if let Err(e) = tx.send(cmd) {
522        tracing::warn!("[launcher-ipc] report_pool_window_added: channel closed ({})", e);
523    }
524}
525
526/// Phase B.4 follow-up — sync API: report a pool window leaving the
527/// pool (promote, destroy). On promote callers should also call
528/// `report_window_opened` so the label transitions atomically (from
529/// the launcher's perspective) from `pool` to `windows`.
530pub fn report_pool_window_removed(label: String) {
531    let Some(tx) = COMMAND_TX.get() else {
532        return;
533    };
534    let cmd = Command::ReportPoolWindowRemoved { label };
535    if let Err(e) = tx.send(cmd) {
536        tracing::warn!("[launcher-ipc] report_pool_window_removed: channel closed ({})", e);
537    }
538}
539
540/// Phase F.5 — sync API: tell the launcher that a pool window is
541/// promoting to a user-visible top-level window. Sent BETWEEN
542/// `report_pool_window_removed` and `report_window_opened` so the
543/// launcher's pool-respawn saga (state-machine bracket around the
544/// implicit refill) can correlate the promote event with the
545/// subsequent `PoolWindowAdded` for the freshly-spawned replacement
546/// pool slot.
547///
548/// Same no-op-if-disconnected semantics as the other `report_*`
549/// helpers — `task dev` mode (no launcher in the loop) silently
550/// drops; the host's authoritative state and refill mechanism are
551/// unaffected.
552pub fn report_pool_window_promoted(label: String) {
553    let Some(tx) = COMMAND_TX.get() else {
554        return;
555    };
556    let cmd = Command::ReportPoolWindowPromoted { label };
557    if let Err(e) = tx.send(cmd) {
558        tracing::warn!("[launcher-ipc] report_pool_window_promoted: channel closed ({})", e);
559    }
560}
561
562/// Phase F.6 — sync API: tell the launcher that all browser-pane
563/// HWNDs belonging to a closing top-level window have been reaped
564/// (lifecycle entries drained, pane HWND map cleared, subwindow
565/// cascade closes initiated). Sent from `client.rs::on_before_close`
566/// AFTER the pane drain step, BEFORE the post-close pool-drain
567/// decision is reported.
568///
569/// The launcher's window-cleanup-cascade saga uses this as the
570/// Step 1 → Step 2 transition signal: it marks the implicit pane
571/// reap as observed and lets the saga issue its `DrainPoolIfLast`
572/// IssueCmd (currently log-only — see `saga/window_cleanup.rs`
573/// module docstring for the saga-as-narrator scope decision).
574///
575/// Same no-op-if-disconnected semantics as the other `report_*`
576/// helpers; `task dev` mode silently drops.
577pub fn report_panes_reaped(label: String) {
578    let Some(tx) = COMMAND_TX.get() else {
579        return;
580    };
581    // CPD-1 (schema-only): existing call sites are organic; CPD-3
582    // adds the saga-driven path that fills `saga_id`.
583    let cmd = Command::ReportPanesReaped { label, saga_id: None };
584    if let Err(e) = tx.send(cmd) {
585        tracing::warn!("[launcher-ipc] report_panes_reaped: channel closed ({})", e);
586    }
587}
588
589/// Phase F.6 — sync API: tell the launcher the result of the
590/// post-close drain-pool-if-last decision. `was_last == true` when
591/// the closing window was the last user-visible window (Stage 1 of
592/// the wrr two-stage close cascade just kicked off in
593/// `client.rs::on_before_close`); `false` when other windows
594/// remain and the warm pool stays warm.
595///
596/// Step 2 terminal signal for the launcher's
597/// window-cleanup-cascade saga. Both branches close the
598/// `SagaStarted` bracket successfully — the saga's job is to
599/// narrate the decision, not enforce a particular outcome.
600///
601/// Same no-op-if-disconnected semantics as the other `report_*`
602/// helpers.
603pub fn report_pool_drain_decision(label: String, was_last: bool) {
604    let Some(tx) = COMMAND_TX.get() else {
605        return;
606    };
607    // CPD-1 (schema-only): existing call sites are organic; CPD-3
608    // adds the saga-driven path that fills `saga_id`.
609    let cmd = Command::ReportPoolDrainDecision {
610        label,
611        was_last,
612        saga_id: None,
613    };
614    if let Err(e) = tx.send(cmd) {
615        tracing::warn!(
616            "[launcher-ipc] report_pool_drain_decision: channel closed ({})",
617            e
618        );
619    }
620}
621
622/// Phase B.4 follow-up — sync API: report the host's current
623/// authoritative counts so the launcher reducer can compare against
624/// its mirror and emit `Event::DriftDetected` on mismatch. Callers
625/// invoke this AFTER each window-level transition so the launcher
626/// gets a fresh snapshot to compare against its just-applied
627/// transition.
628pub fn report_host_counts(windows: u32, pool: u32) {
629    let Some(tx) = COMMAND_TX.get() else {
630        return;
631    };
632    let cmd = Command::ReportHostCounts { windows, pool };
633    if let Err(e) = tx.send(cmd) {
634        tracing::warn!("[launcher-ipc] report_host_counts: channel closed ({})", e);
635    }
636}
637
638/// Phase B.5 (window_id_map step b) — sync API: report the
639/// frontend's `register_backend_window` IPC to the launcher.
640/// Called from `commands/window.rs::register_backend_window`
641/// after the host's local `window_id_map` insert. No-op if the
642/// launcher pipe isn't connected.
643pub fn report_backend_window_id_registered(label: String, window_id: String) {
644    let Some(tx) = COMMAND_TX.get() else {
645        return;
646    };
647    let cmd = Command::ReportBackendWindowIdRegistered { label, window_id };
648    if let Err(e) = tx.send(cmd) {
649        tracing::warn!(
650            "[launcher-ipc] report_backend_window_id_registered: channel closed ({})",
651            e
652        );
653    }
654}
655
656/// Phase B.5 (window_id_map step b) — sync API: report a window's
657/// backend ID being dropped (close path). Called from
658/// `client.rs::on_before_close` after the host's local
659/// `window_id_map.remove`. No-op if launcher pipe absent.
660pub fn report_backend_window_id_unregistered(label: String) {
661    let Some(tx) = COMMAND_TX.get() else {
662        return;
663    };
664    let cmd = Command::ReportBackendWindowIdUnregistered { label };
665    if let Err(e) = tx.send(cmd) {
666        tracing::warn!(
667            "[launcher-ipc] report_backend_window_id_unregistered: channel closed ({})",
668            e
669        );
670    }
671}
672
673/// Phase B.4 follow-up — sync API: report the host's pool count
674/// only. Used by `spawn_pool_window` where the windows dimension
675/// is mid-flight relative to the launcher mirror (refill happens
676/// during a close path that hasn't sent `ReportWindowClosed` yet);
677/// snapshotting only the pool dimension preserves the
678/// check-every-transition guarantee without producing false
679/// windows-drift. (codex P2 PR #578 round-3.)
680pub fn report_host_pool_count(count: u32) {
681    let Some(tx) = COMMAND_TX.get() else {
682        return;
683    };
684    let cmd = Command::ReportHostPoolCount { count };
685    if let Err(e) = tx.send(cmd) {
686        tracing::warn!("[launcher-ipc] report_host_pool_count: channel closed ({})", e);
687    }
688}
689
690/// Phase B.9.1 (WRR) — sync API: report a Win32 HWND created.
691/// Called from the WRR `SetWinEventHook` callback. No-op if the
692/// launcher pipe isn't connected (`task dev` mode); reducer arm
693/// stashes pending-without-label until reconciliation.
694pub fn report_hwnd_opened(
695    hwnd: u64,
696    class_name: String,
697    title: String,
698    label_hint: Option<String>,
699) {
700    let Some(tx) = COMMAND_TX.get() else {
701        return;
702    };
703    let cmd = Command::ReportHwndOpened {
704        hwnd,
705        class_name,
706        title,
707        label_hint,
708    };
709    if let Err(e) = tx.send(cmd) {
710        tracing::warn!("[launcher-ipc] report_hwnd_opened: channel closed ({})", e);
711    }
712}
713
714/// Phase B.9.1 — sync API: report a Win32 HWND destroyed.
715pub fn report_hwnd_destroyed(hwnd: u64) {
716    let Some(tx) = COMMAND_TX.get() else {
717        return;
718    };
719    let cmd = Command::ReportHwndDestroyed { hwnd };
720    if let Err(e) = tx.send(cmd) {
721        tracing::warn!("[launcher-ipc] report_hwnd_destroyed: channel closed ({})", e);
722    }
723}
724
725/// Phase B.9.1 — sync API: report visibility change.
726pub fn report_hwnd_visibility_changed(hwnd: u64, visible: bool) {
727    let Some(tx) = COMMAND_TX.get() else {
728        return;
729    };
730    let cmd = Command::ReportHwndVisibilityChanged { hwnd, visible };
731    if let Err(e) = tx.send(cmd) {
732        tracing::warn!("[launcher-ipc] report_hwnd_visibility_changed: channel closed ({})", e);
733    }
734}
735
736/// Phase B.9.1 — sync API: report foreground change.
737pub fn report_hwnd_foreground_changed(hwnd: u64) {
738    let Some(tx) = COMMAND_TX.get() else {
739        return;
740    };
741    let cmd = Command::ReportHwndForegroundChanged { hwnd };
742    if let Err(e) = tx.send(cmd) {
743        tracing::warn!("[launcher-ipc] report_hwnd_foreground_changed: channel closed ({})", e);
744    }
745}
746
747/// Phase B.9.1 — sync API: report iconic (minimized) change.
748pub fn report_hwnd_iconic_changed(hwnd: u64, iconic: bool) {
749    let Some(tx) = COMMAND_TX.get() else {
750        return;
751    };
752    let cmd = Command::ReportHwndIconicChanged { hwnd, iconic };
753    if let Err(e) = tx.send(cmd) {
754        tracing::warn!("[launcher-ipc] report_hwnd_iconic_changed: channel closed ({})", e);
755    }
756}
757
758/// Phase B.9.1 — sync API: report position change. Caller is
759/// responsible for debouncing — see `wrr/position_debounce.rs`.
760pub fn report_hwnd_position_changed(hwnd: u64, rect: agentmux_common::ipc::Rect) {
761    let Some(tx) = COMMAND_TX.get() else {
762        return;
763    };
764    let cmd = Command::ReportHwndPositionChanged { hwnd, rect };
765    if let Err(e) = tx.send(cmd) {
766        tracing::warn!("[launcher-ipc] report_hwnd_position_changed: channel closed ({})", e);
767    }
768}
769
770/// Phase B.9.1 — sync API: report current monitor topology. Sent
771/// once at install time; mid-session topology changes are a B.9.2
772/// follow-up.
773pub fn report_monitor_topology_changed(rects: Vec<agentmux_common::ipc::Rect>) {
774    let Some(tx) = COMMAND_TX.get() else {
775        return;
776    };
777    let cmd = Command::ReportMonitorTopologyChanged { rects };
778    if let Err(e) = tx.send(cmd) {
779        tracing::warn!("[launcher-ipc] report_monitor_topology_changed: channel closed ({})", e);
780    }
781}
782
783/// Phase B.4 follow-up — compute the host's authoritative counts
784/// from `AppState` and report them. Callers invoke this AFTER
785/// each window/pool transition.
786///
787/// Atomic snapshot: holds both `unpromoted_pool_labels` and
788/// `browsers` simultaneously so the reported `(windows, pool)`
789/// pair is from one consistent state. Without this, a concurrent
790/// mutation between the two lock acquisitions (CEF lifecycle on
791/// the UI thread vs. IPC handler in `commands/drag.rs`) could
792/// produce a mismatched snapshot and trigger a spurious
793/// `Event::DriftDetected`. (codex P2 PR #578 round-1.)
794///
795/// Lock order: `unpromoted_pool_labels` first, then `browsers`.
796/// Matches the existing snapshot pattern in
797/// `client.rs::on_before_close` (line ~418) and is the only place
798/// in the codebase that holds both locks simultaneously, so no
799/// other path can race in the reverse order.
800///
801/// Counts (matching the launcher's mirror semantics):
802/// * `windows` — top-level user-visible windows in `browsers`,
803///   excluding `browser-pane-*` child HWNDs and any label still
804///   in `unpromoted_pool_labels`.
805/// * `pool` — pre-promote pool labels (`unpromoted_pool_labels.len()`).
806///
807/// **Why this reads host's `browsers` and `unpromoted_pool_labels`
808/// directly (not the shadow):** this fn IS the source for the
809/// launcher's mirror — its output is what gets compared against
810/// `state.windows.len()` / `state.pool.len()` in the drift-detection
811/// path. Reading from the shadow would compare the shadow against
812/// itself (always agrees) and defeat the entire B.4 drift-detection
813/// design. Once the host reducer arrives in Phase F (see
814/// `docs/retro/multi-reducer-proposal-2026-04-28.md`), this becomes
815/// "report host's authoritative reducer-state to the launcher."
816pub fn compute_and_report_host_counts(state: &std::sync::Arc<crate::state::AppState>) {
817    // Atomic snapshot — pool inventory + browsers under ONE
818    // `host_state` lock. Two-lock variants race against
819    // `promote_pool_window` between reads and let queued pool
820    // windows leak into the user-window count, triggering spurious
821    // launcher drift-detection.
822    let (windows, pool) = state.host_counts_snapshot();
823    report_host_counts(windows, pool);
824}
825
#[cfg(test)]
mod shadow_projection_tests {
    //! Master spec §8.14 — subscriber idempotency property tests for
    //! the host's shadow projection.
    //!
    //! The launcher event channel is allowed to deliver duplicates
    //! (re-dispatch, resync, replay). The contract is that a
    //! subscriber folds every duplicate into a no-op once the first
    //! application has landed.
    //!
    //! The system under test is `apply_shadow_projection`, the pure
    //! HashMap-mutation slice of `apply_event_to_shadow`. Side-effect
    //! arms (`CorrectiveWindowMove`, `HostShouldQuit`) are not part
    //! of the projection function and are covered separately at the
    //! integration level.

    use super::*;
    use agentmux_common::ipc::{Event, WindowKind};
    use proptest::prelude::*;
    use std::sync::Arc;

    /// Owned copies of the three shadow maps, in the order
    /// `(shadow_window_meta, shadow_backend_window_ids, shadow_instance_registry)`.
    type ShadowSnapshot = (
        std::collections::HashMap<String, crate::state::WindowMeta>,
        std::collections::HashMap<String, String>,
        std::collections::HashMap<String, u32>,
    );

    /// Clone the three shadow maps out of `state` so they can be
    /// compared after further mutation.
    fn snapshot_shadow_state(state: &Arc<crate::state::AppState>) -> ShadowSnapshot {
        let window_meta = state.shadow_window_meta.lock().clone();
        let backend_ids = state.shadow_backend_window_ids.lock().clone();
        let instances = state.shadow_instance_registry.lock().clone();
        (window_meta, backend_ids, instances)
    }

    /// Feed every event in `events`, in order, through
    /// `apply_shadow_projection`.
    fn replay(state: &Arc<crate::state::AppState>, events: &[Event]) {
        events
            .iter()
            .for_each(|event| apply_shadow_projection(state, event));
    }

    /// Strategy producing only the events the projection mutates on.
    /// Side-effect arms (CorrectiveWindowMove, HostShouldQuit) are
    /// left out — they never reach `apply_shadow_projection`'s match.
    ///
    /// Labels come from `[a-c]{1,3}` so collisions (open then close
    /// of the same label) are frequent. Production never sends
    /// close-without-open thanks to the launcher's strict-pairing
    /// semantic, but the projection does not enforce that itself;
    /// replaying in either order must still converge to the same
    /// shadow state.
    fn arb_projection_event() -> impl Strategy<Value = Event> {
        const LABEL: &str = "[a-c]{1,3}";
        const WINDOW_ID: &str = "[0-9a-f]{4,8}";

        let kind = prop_oneof![Just(WindowKind::FullInstance), Just(WindowKind::Subwindow)];
        let parent = prop_oneof![Just(None::<String>), Just(Some(String::from("a")))];

        let opened = (LABEL, kind, parent).prop_map(|(label, kind, parent_label)| {
            Event::WindowOpened {
                label,
                kind,
                parent_label,
                version: 0,
            }
        });
        let closed = LABEL.prop_map(|label| Event::WindowClosed {
            label,
            version: 0,
            crash_detected: false,
        });
        let assigned = (LABEL, 1u32..100u32)
            .prop_map(|(label, num)| Event::WindowInstanceAssigned { label, num, version: 0 });
        let released = (LABEL, 1u32..100u32)
            .prop_map(|(label, num)| Event::WindowInstanceReleased { label, num, version: 0 });
        let id_registered = (LABEL, WINDOW_ID).prop_map(|(label, window_id)| {
            Event::BackendWindowIdRegistered { label, window_id, version: 0 }
        });
        let id_unregistered = (LABEL, WINDOW_ID).prop_map(|(label, window_id)| {
            Event::BackendWindowIdUnregistered { label, window_id, version: 0 }
        });

        // Weights and arm order are kept as-is so generated cases
        // stay comparable with previously recorded runs.
        prop_oneof![
            3 => opened,
            3 => closed,
            2 => assigned,
            1 => released,
            2 => id_registered,
            1 => id_unregistered,
        ]
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(64))]

        /// Master spec §8.14 — running an event sequence a second
        /// time leaves the shadow state where the first run put it.
        /// Property: for any sequence S, `apply(S) == apply(S; S)`.
        ///
        /// Drift-storm regression class (PR #708): if one launcher
        /// event reaches the host's `apply_event_to_shadow` 600
        /// times instead of once, the projection MUST end in the
        /// same final state. This is expected to hold by
        /// construction (HashMap::insert/remove are idempotent for
        /// the same key); the proptest pins it down.
        #[test]
        fn shadow_projection_idempotent_under_replay(
            events in proptest::collection::vec(arb_projection_event(), 0..40)
        ) {
            let state = Arc::new(crate::state::AppState::default());

            // First pass.
            replay(&state, &events);
            let first_pass = snapshot_shadow_state(&state);

            // Second pass over the SAME sequence — all duplicates.
            replay(&state, &events);
            let second_pass = snapshot_shadow_state(&state);

            prop_assert_eq!(first_pass.0, second_pass.0, "shadow_window_meta diverged on replay");
            prop_assert_eq!(first_pass.1, second_pass.1, "shadow_backend_window_ids diverged on replay");
            prop_assert_eq!(first_pass.2, second_pass.2, "shadow_instance_registry diverged on replay");
        }

        /// Stronger property: repeating each event an arbitrary
        /// number of times (1..5x) back-to-back ends in the same
        /// state as applying each event exactly once. This is the
        /// §8.14 contract stated directly — duplicates fold to a
        /// no-op.
        #[test]
        fn shadow_projection_collapses_arbitrary_duplicates(
            events in proptest::collection::vec(arb_projection_event(), 1..30),
            dup_counts in proptest::collection::vec(1u8..6, 1..30),
        ) {
            // The two vectors are generated independently; trim both
            // to the shorter length so every event has a count.
            let paired = events.len().min(dup_counts.len());
            let events = &events[..paired];
            let dup_counts = &dup_counts[..paired];

            // Baseline: each event exactly once.
            let state_once = Arc::new(crate::state::AppState::default());
            replay(&state_once, events);
            let baseline = snapshot_shadow_state(&state_once);

            // Inflated: each event repeated per its count (1..5x).
            let state_dup = Arc::new(crate::state::AppState::default());
            for (event, &repeats) in events.iter().zip(dup_counts) {
                (0..repeats).for_each(|_| apply_shadow_projection(&state_dup, event));
            }
            let inflated = snapshot_shadow_state(&state_dup);

            prop_assert_eq!(baseline.0, inflated.0, "shadow_window_meta diverged under duplicate-bursting");
            prop_assert_eq!(baseline.1, inflated.1, "shadow_backend_window_ids diverged under duplicate-bursting");
            prop_assert_eq!(baseline.2, inflated.2, "shadow_instance_registry diverged under duplicate-bursting");
        }
    }

    /// Anti-vacuity guard (per `feedback_property_test_input_must_match_sut_filter`):
    /// checks that events of the kinds the strategy emits really do
    /// change the shadow state. Should the strategy and the SUT drift
    /// apart (e.g. event variants renamed), this fails loudly rather
    /// than letting the properties above pass vacuously.
    #[test]
    fn projection_actually_mutates_state_for_strategy_events() {
        let state = Arc::new(crate::state::AppState::default());
        let label = || "a".to_string();

        let opened = Event::WindowOpened {
            label: label(),
            kind: WindowKind::FullInstance,
            parent_label: None,
            version: 0,
        };
        apply_shadow_projection(&state, &opened);
        assert!(
            state.shadow_window_meta.lock().contains_key("a"),
            "projection failed to mutate shadow_window_meta — strategy/SUT drift?"
        );

        let id_registered = Event::BackendWindowIdRegistered {
            label: label(),
            window_id: "w-1".to_string(),
            version: 0,
        };
        apply_shadow_projection(&state, &id_registered);
        assert!(
            state.shadow_backend_window_ids.lock().contains_key("a"),
            "projection failed to mutate shadow_backend_window_ids"
        );

        let assigned = Event::WindowInstanceAssigned {
            label: label(),
            num: 7,
            version: 0,
        };
        apply_shadow_projection(&state, &assigned);
        assert_eq!(
            state.shadow_instance_registry.lock().get("a").copied(),
            Some(7),
            "projection failed to mutate shadow_instance_registry"
        );
    }
}