agentmux_cef/launcher_ipc.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase B.2: host-side client for the launcher's named-pipe IPC
5// server. Connects on host startup, sends `Register`, holds the
6// connection open for the host's lifetime.
7//
// Originally (B.2) the connection was fire-and-hold — we Registered
// and then left the read half idle. The file has since grown past
// that: the reader task consumes `Event`s / `HostFrame`s pushed by
// the launcher (B.3, CPD-5), the `report_*` helpers send `Command`s
// up (B.4), and received events are fanned out to the renderer
// processes (B.7).
12//
13// Activated only when `AGENTMUX_LAUNCHER_PIPE` is set (production
14// portable / installed paths after PR #571 + this PR). Absent →
15// `task dev` mode where launcher isn't in the loop; host runs as
16// before.
17
18use std::sync::Arc;
19use std::sync::OnceLock;
20
21use agentmux_common::ipc::{ClientKind, Command, Event, HostFrame};
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio::sync::{mpsc, Mutex};
24
25#[cfg(target_os = "windows")]
26use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeClient};
27
/// Phase B.4 — global outbound command channel. Set (at most once,
/// it is a `OnceLock`) by `connect_to_launcher` after the Register
/// message has been flushed; sync callers (CEF lifecycle callbacks
/// fire on the UI thread) post `Command`s without needing a
/// tokio runtime handle. Drained by a task spawned in
/// `connect_to_launcher` that writes to the pipe. The reader task
/// also clones this sender to push saga reply `Command`s.
///
/// Unset semantics (`get()` returns `None`): launcher IPC isn't
/// connected (e.g. `task dev` mode where launcher isn't in the
/// loop). `report_*` calls are silently no-ops; the host runs as
/// before.
static COMMAND_TX: OnceLock<mpsc::UnboundedSender<Command>> = OnceLock::new();
38
/// Handle returned by `connect_to_launcher`; the host is expected to
/// hold it for its whole lifetime.
///
/// NOTE(review): earlier docs said that dropping this handle closes
/// the connection. As the code is written that does not follow: the
/// outbound drain task owns its own `Arc` clone of the writer, and
/// dropping a tokio `JoinHandle` detaches the task instead of
/// cancelling it. The pipe therefore presumably stays open until the
/// tasks end or the process exits — confirm before relying on drop
/// for teardown.
///
/// Cross-platform shape: the Windows variant carries the actual
/// pipe writer + reader-task handles; the non-Windows variant is
/// a unit-struct stub so `connect_to_launcher` keeps a uniform
/// `Option<LauncherIpcHandle>` return type. Phase 7's cross-
/// platform IPC (Unix domain sockets) will fill the non-Windows
/// shape later. (reagent / codex P1 PR #573 round-1.)
#[cfg(target_os = "windows")]
pub struct LauncherIpcHandle {
    /// Write half of the launcher pipe, shared with the outbound
    /// drain task. Nothing reads this field through the handle; it
    /// is kept only so the handle co-owns the writer.
    #[allow(dead_code)]
    writer: Arc<Mutex<tokio::io::WriteHalf<NamedPipeClient>>>,
    /// Join handle of the read-loop task, which parses each line as
    /// a `HostFrame` (or legacy bare `Event`) and dispatches it.
    /// Never awaited or aborted through the handle.
    #[allow(dead_code)]
    reader_task: tokio::task::JoinHandle<()>,
}

/// Non-Windows stub: there is no launcher IPC transport on these
/// platforms yet, so the handle carries no state.
#[cfg(not(target_os = "windows"))]
pub struct LauncherIpcHandle;
63
64/// If `AGENTMUX_LAUNCHER_PIPE` is set, connect, Register as Host,
65/// and return a handle the caller (host main.rs) holds for the
66/// host's lifetime. If unset → return None and the host runs in
67/// pre-Phase-B mode (no launcher connection).
68///
69/// Errors are logged but non-fatal: a launcher-IPC failure should
70/// NOT prevent the host from running, since the launcher's
71/// authoritative state is still in environment / files for B.2.
72/// Phase B.5+ will tighten this when the host actually depends on
73/// IPC for state.
74#[cfg(target_os = "windows")]
75pub async fn connect_to_launcher(
76 state: std::sync::Arc<crate::state::AppState>,
77) -> Option<LauncherIpcHandle> {
78 let pipe_path = match std::env::var("AGENTMUX_LAUNCHER_PIPE") {
79 Ok(p) if !p.is_empty() => p,
80 _ => {
81 tracing::info!(
82 "AGENTMUX_LAUNCHER_PIPE unset — running without launcher IPC (dev mode)"
83 );
84 return None;
85 }
86 };
87
88 // Open the named pipe (client side). The launcher created it
89 // with `first_pipe_instance(true)` and is accept-looping; this
90 // call blocks briefly until accept lands. tokio retries
91 // ERROR_PIPE_BUSY internally.
92 let client = match ClientOptions::new().open(&pipe_path) {
93 Ok(c) => c,
94 Err(e) => {
95 tracing::error!(
96 "[launcher-ipc] failed to open {}: {} — continuing without launcher IPC",
97 pipe_path,
98 e
99 );
100 return None;
101 }
102 };
103 tracing::info!("[launcher-ipc] connected to {}", pipe_path);
104
105 let (read_half, write_half) = tokio::io::split(client);
106 let writer = Arc::new(Mutex::new(write_half));
107
108 // Send Register FIRST. Server's `enforce_register_first` requires
109 // it as the very first message on the wire, and a fatal-close
110 // on violation would permanently disable the mirror (no
111 // reconnect path in B.4). The drain task that processes
112 // outbound commands gets spawned + the global sender published
113 // only AFTER this succeeds. (reagent P1 + codex P2 PR #576
114 // round-1 — race where a `report_window_*` call between
115 // `COMMAND_TX.set` and the Register flush could land first on
116 // the wire.)
117 let register = Command::Register {
118 kind: ClientKind::Host,
119 pid: std::process::id(),
120 version: env!("CARGO_PKG_VERSION").to_string(),
121 };
122 let mut buf = match serde_json::to_vec(®ister) {
123 Ok(b) => b,
124 Err(e) => {
125 tracing::error!(
126 "[launcher-ipc] failed to serialize Register: {} — continuing without IPC",
127 e
128 );
129 return None;
130 }
131 };
132 buf.push(b'\n');
133 {
134 let mut w = writer.lock().await;
135 if let Err(e) = w.write_all(&buf).await {
136 tracing::error!(
137 "[launcher-ipc] failed to send Register: {} — continuing without IPC",
138 e
139 );
140 return None;
141 }
142 if let Err(e) = w.flush().await {
143 tracing::error!(
144 "[launcher-ipc] failed to flush Register: {} — continuing without IPC",
145 e
146 );
147 return None;
148 }
149 }
150
151 // Phase B.4 — Register confirmed on the wire; safe to expose the
152 // outbound command channel. Sync callers push Commands; a
153 // dedicated drain task writes them as newline-delimited JSON.
154 // Ordered (preserves report order from the UI thread). Bounded?
155 // No — UnboundedSender so `try_send` can stay non-blocking on
156 // the UI thread; the drain task is fast (single async write)
157 // and the volume is one event per window create/close, not
158 // high-frequency.
159 let (tx, mut rx) = mpsc::unbounded_channel::<Command>();
160 if COMMAND_TX.set(tx).is_err() {
161 tracing::warn!("[launcher-ipc] COMMAND_TX already set — connect_to_launcher called twice");
162 }
163 let writer_for_drain = Arc::clone(&writer);
164 tokio::spawn(async move {
165 while let Some(cmd) = rx.recv().await {
166 let mut buf = match serde_json::to_vec(&cmd) {
167 Ok(b) => b,
168 Err(e) => {
169 tracing::warn!("[launcher-ipc] failed to serialize {:?}: {}", cmd, e);
170 continue;
171 }
172 };
173 buf.push(b'\n');
174 let mut w = writer_for_drain.lock().await;
175 if let Err(e) = w.write_all(&buf).await {
176 tracing::warn!(
177 "[launcher-ipc] write failed for {:?}: {} — dropping further commands",
178 cmd, e
179 );
180 return;
181 }
182 if let Err(e) = w.flush().await {
183 tracing::warn!("[launcher-ipc] flush failed: {}", e);
184 return;
185 }
186 }
187 });
188
189 // Spawn a read-loop task. Logs every event and feeds the host's
190 // shadow projections (`shadow_instance_registry` for now;
191 // `shadow_*` for other migrated maps in subsequent B.5 sub-PRs)
192 // from the launcher's authoritative event stream. Host has no
193 // local `WindowInstanceRegistry` post-B.5e — the shadow IS the
194 // read path.
195 //
196 // Phase CPD-5 — the reader now accepts `HostFrame` envelopes
197 // (CPD-1) so the launcher can push saga-issued `Command`s down
198 // alongside `Event`s. Each line is parsed as `HostFrame` first;
199 // on parse failure we fall back to raw `Event` JSON for
200 // backward-compatibility with launchers that haven't yet
201 // adopted the envelope shape (the legacy fanout path emits
202 // bare `Event` JSON; CPD-2's `HostPipe::send_event` wraps in
203 // `HostFrame::Event`).
204 //
205 // Saga-issued `Command`s are dispatched via the host-side
206 // idempotency LRU (`saga_dispatch::dispatch_host_command`):
207 // duplicate `(saga_id, kind)` pairs re-emit the same Report
208 // without re-running the action. The reply Commands are pushed
209 // through the existing `COMMAND_TX` channel, which the drain
210 // task spawned above already serializes to the writer.
211 let state_for_reader = std::sync::Arc::clone(&state);
212 let saga_lru = std::sync::Arc::new(parking_lot::Mutex::new(
213 crate::saga_dispatch::SagaIdempotencyLru::with_default_cap(),
214 ));
215 // The reply path uses the same `COMMAND_TX` published above.
216 // Snapshot a sender clone NOW so the reader doesn't have to
217 // probe the OnceLock on every command (which would also race
218 // with the unset-OnceLock window during the brief gap between
219 // Register flush and `COMMAND_TX.set`).
220 let reply_tx = COMMAND_TX
221 .get()
222 .expect("COMMAND_TX set before reader task spawn")
223 .clone();
224 let saga_runner = std::sync::Arc::new(crate::saga_dispatch::LiveActionRunner {
225 state: std::sync::Arc::clone(&state),
226 });
227 let reader_task = tokio::spawn(async move {
228 let reader = BufReader::new(read_half);
229 let mut lines = reader.lines();
230 loop {
231 match lines.next_line().await {
232 Ok(Some(line)) if line.trim().is_empty() => continue,
233 Ok(Some(line)) => {
234 // Try the envelope first.
235 match serde_json::from_str::<HostFrame>(&line) {
236 Ok(HostFrame::Event(event)) => {
237 tracing::info!("[launcher-ipc] received event: {:?}", event);
238 apply_event_to_shadow(&state_for_reader, &event);
239 }
240 Ok(HostFrame::Command(cmd)) => {
241 tracing::info!(
242 "[launcher-ipc] received saga command: {:?}",
243 cmd
244 );
245 let outcome = crate::saga_dispatch::dispatch_host_command(
246 &cmd,
247 saga_runner.as_ref(),
248 &saga_lru,
249 &reply_tx,
250 );
251 tracing::debug!(
252 "[launcher-ipc] saga command outcome: {:?}",
253 outcome
254 );
255 }
256 Err(_envelope_err) => {
257 // Backward-compat fallback: pre-CPD-2 launchers
258 // emit bare `Event` JSON without the `HostFrame`
259 // wrapper. Parse as raw `Event`; on failure
260 // log and skip.
261 match serde_json::from_str::<Event>(&line) {
262 Ok(event) => {
263 tracing::info!(
264 "[launcher-ipc] received event (legacy bare-Event): {:?}",
265 event
266 );
267 apply_event_to_shadow(&state_for_reader, &event);
268 }
269 Err(e) => {
270 tracing::warn!(
271 "[launcher-ipc] could not parse line as HostFrame or Event ({}): {}",
272 e,
273 line
274 );
275 }
276 }
277 }
278 }
279 }
280 Ok(None) => {
281 tracing::info!("[launcher-ipc] launcher pipe EOF — connection closed");
282 return;
283 }
284 Err(e) => {
285 tracing::warn!("[launcher-ipc] read error: {}", e);
286 return;
287 }
288 }
289 }
290 });
291
292 Some(LauncherIpcHandle {
293 writer,
294 reader_task,
295 })
296}
297
298#[cfg(not(target_os = "windows"))]
299pub async fn connect_to_launcher(
300 _state: std::sync::Arc<crate::state::AppState>,
301) -> Option<LauncherIpcHandle> {
302 None
303}
304
305/// Apply a launcher event to host's shadow projections, then fan
306/// the event out to every top-level renderer via the JS bridge.
307///
308/// Shadows are updated FIRST so any renderer-side code that reads
309/// host state via the IPC HTTP path (e.g. `listWindowInstances`)
310/// sees a consistent view at the moment the typed event lands.
311///
312/// Phase B.7.3.3 — the bespoke `window-instances-changed` re-emit
313/// is gone; renderers now consume typed events directly via the
314/// CEF JS bridge dispatcher (`launcher_event_bridge`).
315/// Pure shadow-state projection — extracted from `apply_event_to_shadow`
316/// for property testing per master spec §8.14 (subscriber idempotency
317/// contract). Mutates ONLY the shadow Mutex<HashMap> fields:
318/// `shadow_instance_registry`, `shadow_window_meta`,
319/// `shadow_backend_window_ids`. No UI tasks, no renderer dispatch, no
320/// CEF calls — those live in the wrapper `apply_event_to_shadow`.
321///
322/// Idempotent by construction: every arm is `HashMap::insert` /
323/// `HashMap::remove` keyed by `label`, both naturally idempotent past
324/// the first call. The drift-compare branch in `WindowOpened` is
325/// read-only (logs a warning, doesn't mutate). Locked in by the
326/// `shadow_projection_idempotent_under_replay` proptest.
327pub(crate) fn apply_shadow_projection(state: &std::sync::Arc<crate::state::AppState>, event: &Event) {
328 match event {
329 Event::WindowInstanceAssigned { label, num, .. } => {
330 // Phase B.5e — host's `WindowInstanceRegistry` was
331 // deleted. The drift compare that used to live here
332 // (B.5b/c/d) is gone; the launcher is the sole
333 // authority and the shadow is the host's projection.
334 state
335 .shadow_instance_registry
336 .lock()
337 .insert(label.clone(), *num);
338 }
339 Event::WindowOpened { label, kind, parent_label, .. } => {
340 // Phase B.5 (window_meta step b) — shadow projection
341 // + drift compare. Map the wire `WindowKind` back to
342 // host's `crate::state::WindowKind` (one-to-one).
343 let host_kind = match kind {
344 agentmux_common::ipc::WindowKind::FullInstance => crate::state::WindowKind::FullInstance,
345 agentmux_common::ipc::WindowKind::Subwindow => crate::state::WindowKind::Subwindow,
346 };
347 let shadow_meta = crate::state::WindowMeta {
348 label: label.clone(),
349 kind: host_kind,
350 parent_instance_id: parent_label.clone(),
351 };
352 // Drift compare to host's authoritative `window_meta`.
353 // Race-tolerant: host's local insert (in
354 // `on_after_created`) may not have run yet if the
355 // launcher event raced ahead. Skip in that case.
356 let host_meta = state.window_meta.lock().get(label).cloned();
357 if let Some(host_meta) = host_meta {
358 if host_meta.kind != shadow_meta.kind
359 || host_meta.parent_instance_id != shadow_meta.parent_instance_id
360 {
361 tracing::warn!(
362 target: "launcher-ipc:drift",
363 label = %label,
364 launcher_kind = ?shadow_meta.kind,
365 launcher_parent = ?shadow_meta.parent_instance_id,
366 host_kind = ?host_meta.kind,
367 host_parent = ?host_meta.parent_instance_id,
368 "[launcher-ipc] window_meta drift: host and launcher disagree"
369 );
370 }
371 }
372 state
373 .shadow_window_meta
374 .lock()
375 .insert(label.clone(), shadow_meta);
376 }
377 Event::WindowClosed { label, .. } => {
378 // Phase B.5 (window_meta step b) — symmetric drop.
379 // Drift compare on close is omitted: the launcher's
380 // strict-pairing semantic (PR #577 round-2) means we
381 // only see this event when the open was paired, so a
382 // host-side missing entry would have surfaced at
383 // open time.
384 state.shadow_window_meta.lock().remove(label);
385 }
386 Event::BackendWindowIdRegistered { label, window_id, .. } => {
387 // Phase B.5 (window_id_map step e) — host's
388 // `window_id_map` was deleted; the drift compare is
389 // gone with it. Shadow is sole source of truth.
390 state
391 .shadow_backend_window_ids
392 .lock()
393 .insert(label.clone(), window_id.clone());
394 }
395 Event::BackendWindowIdUnregistered { label, .. } => {
396 // Phase B.5 (window_id_map step e) — drift compare gone.
397 state.shadow_backend_window_ids.lock().remove(label);
398 }
399 Event::WindowInstanceReleased { label, .. } => {
400 // Phase B.5e — host's `WindowInstanceRegistry` was
401 // deleted; the drift compare is gone with it.
402 state.shadow_instance_registry.lock().remove(label);
403 }
404 // Side-effect arms (UI tasks, reconciler) handled in the
405 // `apply_event_to_shadow` wrapper. Excluded here so this
406 // function stays pure-projection and trivially idempotent.
407 _ => {}
408 }
409}
410
411fn apply_event_to_shadow(state: &std::sync::Arc<crate::state::AppState>, event: &Event) {
412 apply_shadow_projection(state, event);
413 match event {
414 Event::CorrectiveWindowMove { hwnd, target_rect, reason, .. } => {
415 // Phase B.9.2 — pure-reducer self-heal. Reducer detected
416 // an off-monitor / sentinel-parked window that the user
417 // has never foregrounded, and computed an on-monitor
418 // target rect. Apply `SetWindowPos` to move the window
419 // before the user notices the orphan taskbar entry.
420 // The CEF UI thread is the safe caller for SetWindowPos
421 // on a CEF Views-managed window — post a UI task rather
422 // than calling from this tokio task directly.
423 tracing::warn!(
424 target: "wrr",
425 "[wrr] CorrectiveWindowMove hwnd={:#x} reason={:?} target=({},{})-({},{})",
426 hwnd, reason,
427 target_rect.left, target_rect.top, target_rect.right, target_rect.bottom
428 );
429 crate::ui_tasks::post_corrective_window_move(
430 state,
431 *hwnd,
432 target_rect.left,
433 target_rect.top,
434 target_rect.right - target_rect.left,
435 target_rect.bottom - target_rect.top,
436 );
437 }
438 Event::HostShouldQuit { .. } => {
439 // The launcher emits this when it detects the
440 // last-user-visible-window-closed-but-host-alive state.
441 // The host's reconciler closes any orphan
442 // `window-pool-*` browsers so the existing on_before_close
443 // cascade can drain to `browser_list.is_empty()` and
444 // fire `quit_message_loop`.
445 //
446 // Spec: `docs/specs/SPEC_HOST_ORPHAN_RECONCILIATION_2026_05_05.md`.
447 //
448 // We're on the launcher IPC reader thread; CEF
449 // Browser/BrowserHost calls aren't safe here. The
450 // reconciler does state-snapshot + classification only,
451 // then posts a UI-thread task that does the HWND probe
452 // and `host.close_browser(force=1)`. Earlier v0.33.491–
453 // v0.33.494 attempts at driving UI shutdown directly
454 // from this handler hung CEF; using the task scheduler
455 // is the difference.
456 tracing::warn!(
457 target: "wrr",
458 "[wrr] HostShouldQuit received — running orphan reconciler"
459 );
460 crate::commands::orphan_reconcile::reconcile_and_drain(state);
461 }
462 _ => {}
463 }
464
465 // Phase B.7.3.1 — broadcast the typed event to every top-level
466 // renderer. The renderer-side dispatcher (`window.__agentmux_launcher_event`,
467 // installed by `frontend/util/launcher-events.ts`) feeds the
468 // launcher-event-reducer signal that downstream UI subscribes to.
469 crate::launcher_event_bridge::dispatch_to_renderers(state, event);
470}
471
472/// Phase B.4 — sync API: report a window open to the launcher's
473/// state mirror. Called from CEF lifecycle callbacks on the UI
474/// thread. No-op if the launcher pipe isn't connected (`task dev`
475/// mode); failures to enqueue (channel closed, drain task died)
476/// are logged but don't propagate — the host's authoritative state
477/// is unaffected, the mirror just falls behind. B.5 tightens.
478pub fn report_window_opened(
479 label: String,
480 kind: agentmux_common::ipc::WindowKind,
481 parent_label: Option<String>,
482) {
483 let Some(tx) = COMMAND_TX.get() else {
484 return; // launcher not in the loop
485 };
486 let cmd = Command::ReportWindowOpened {
487 label,
488 kind,
489 parent_label,
490 };
491 if let Err(e) = tx.send(cmd) {
492 tracing::warn!("[launcher-ipc] report_window_opened: channel closed ({})", e);
493 }
494}
495
496/// Phase B.4 — sync API: report a window close to the launcher's
497/// state mirror. Same no-op-if-disconnected semantics as
498/// `report_window_opened`.
499pub fn report_window_closed(label: String) {
500 let Some(tx) = COMMAND_TX.get() else {
501 return;
502 };
503 let cmd = Command::ReportWindowClosed { label };
504 if let Err(e) = tx.send(cmd) {
505 tracing::warn!("[launcher-ipc] report_window_closed: channel closed ({})", e);
506 }
507}
508
509/// Phase B.4 follow-up — sync API: report a pool window being added
510/// to the warm pool inventory. Called from `spawn_pool_window` on
511/// the UI thread. No-op when launcher pipe is absent.
512pub fn report_pool_window_added(label: String) {
513 let Some(tx) = COMMAND_TX.get() else {
514 return;
515 };
516 // CPD-1 (schema-only): host's existing `report_pool_window_added`
517 // call sites are organic refills (not yet saga-driven); pass
518 // `saga_id: None` per spec §3.3. CPD-3 wires the saga-driven
519 // path that will pass `Some(N)` through here.
520 let cmd = Command::ReportPoolWindowAdded { label, saga_id: None };
521 if let Err(e) = tx.send(cmd) {
522 tracing::warn!("[launcher-ipc] report_pool_window_added: channel closed ({})", e);
523 }
524}
525
526/// Phase B.4 follow-up — sync API: report a pool window leaving the
527/// pool (promote, destroy). On promote callers should also call
528/// `report_window_opened` so the label transitions atomically (from
529/// the launcher's perspective) from `pool` to `windows`.
530pub fn report_pool_window_removed(label: String) {
531 let Some(tx) = COMMAND_TX.get() else {
532 return;
533 };
534 let cmd = Command::ReportPoolWindowRemoved { label };
535 if let Err(e) = tx.send(cmd) {
536 tracing::warn!("[launcher-ipc] report_pool_window_removed: channel closed ({})", e);
537 }
538}
539
540/// Phase F.5 — sync API: tell the launcher that a pool window is
541/// promoting to a user-visible top-level window. Sent BETWEEN
542/// `report_pool_window_removed` and `report_window_opened` so the
543/// launcher's pool-respawn saga (state-machine bracket around the
544/// implicit refill) can correlate the promote event with the
545/// subsequent `PoolWindowAdded` for the freshly-spawned replacement
546/// pool slot.
547///
548/// Same no-op-if-disconnected semantics as the other `report_*`
549/// helpers — `task dev` mode (no launcher in the loop) silently
550/// drops; the host's authoritative state and refill mechanism are
551/// unaffected.
552pub fn report_pool_window_promoted(label: String) {
553 let Some(tx) = COMMAND_TX.get() else {
554 return;
555 };
556 let cmd = Command::ReportPoolWindowPromoted { label };
557 if let Err(e) = tx.send(cmd) {
558 tracing::warn!("[launcher-ipc] report_pool_window_promoted: channel closed ({})", e);
559 }
560}
561
562/// Phase F.6 — sync API: tell the launcher that all browser-pane
563/// HWNDs belonging to a closing top-level window have been reaped
564/// (lifecycle entries drained, pane HWND map cleared, subwindow
565/// cascade closes initiated). Sent from `client.rs::on_before_close`
566/// AFTER the pane drain step, BEFORE the post-close pool-drain
567/// decision is reported.
568///
569/// The launcher's window-cleanup-cascade saga uses this as the
570/// Step 1 → Step 2 transition signal: it marks the implicit pane
571/// reap as observed and lets the saga issue its `DrainPoolIfLast`
572/// IssueCmd (currently log-only — see `saga/window_cleanup.rs`
573/// module docstring for the saga-as-narrator scope decision).
574///
575/// Same no-op-if-disconnected semantics as the other `report_*`
576/// helpers; `task dev` mode silently drops.
577pub fn report_panes_reaped(label: String) {
578 let Some(tx) = COMMAND_TX.get() else {
579 return;
580 };
581 // CPD-1 (schema-only): existing call sites are organic; CPD-3
582 // adds the saga-driven path that fills `saga_id`.
583 let cmd = Command::ReportPanesReaped { label, saga_id: None };
584 if let Err(e) = tx.send(cmd) {
585 tracing::warn!("[launcher-ipc] report_panes_reaped: channel closed ({})", e);
586 }
587}
588
589/// Phase F.6 — sync API: tell the launcher the result of the
590/// post-close drain-pool-if-last decision. `was_last == true` when
591/// the closing window was the last user-visible window (Stage 1 of
592/// the wrr two-stage close cascade just kicked off in
593/// `client.rs::on_before_close`); `false` when other windows
594/// remain and the warm pool stays warm.
595///
596/// Step 2 terminal signal for the launcher's
597/// window-cleanup-cascade saga. Both branches close the
598/// `SagaStarted` bracket successfully — the saga's job is to
599/// narrate the decision, not enforce a particular outcome.
600///
601/// Same no-op-if-disconnected semantics as the other `report_*`
602/// helpers.
603pub fn report_pool_drain_decision(label: String, was_last: bool) {
604 let Some(tx) = COMMAND_TX.get() else {
605 return;
606 };
607 // CPD-1 (schema-only): existing call sites are organic; CPD-3
608 // adds the saga-driven path that fills `saga_id`.
609 let cmd = Command::ReportPoolDrainDecision {
610 label,
611 was_last,
612 saga_id: None,
613 };
614 if let Err(e) = tx.send(cmd) {
615 tracing::warn!(
616 "[launcher-ipc] report_pool_drain_decision: channel closed ({})",
617 e
618 );
619 }
620}
621
622/// Phase B.4 follow-up — sync API: report the host's current
623/// authoritative counts so the launcher reducer can compare against
624/// its mirror and emit `Event::DriftDetected` on mismatch. Callers
625/// invoke this AFTER each window-level transition so the launcher
626/// gets a fresh snapshot to compare against its just-applied
627/// transition.
628pub fn report_host_counts(windows: u32, pool: u32) {
629 let Some(tx) = COMMAND_TX.get() else {
630 return;
631 };
632 let cmd = Command::ReportHostCounts { windows, pool };
633 if let Err(e) = tx.send(cmd) {
634 tracing::warn!("[launcher-ipc] report_host_counts: channel closed ({})", e);
635 }
636}
637
638/// Phase B.5 (window_id_map step b) — sync API: report the
639/// frontend's `register_backend_window` IPC to the launcher.
640/// Called from `commands/window.rs::register_backend_window`
641/// after the host's local `window_id_map` insert. No-op if the
642/// launcher pipe isn't connected.
643pub fn report_backend_window_id_registered(label: String, window_id: String) {
644 let Some(tx) = COMMAND_TX.get() else {
645 return;
646 };
647 let cmd = Command::ReportBackendWindowIdRegistered { label, window_id };
648 if let Err(e) = tx.send(cmd) {
649 tracing::warn!(
650 "[launcher-ipc] report_backend_window_id_registered: channel closed ({})",
651 e
652 );
653 }
654}
655
656/// Phase B.5 (window_id_map step b) — sync API: report a window's
657/// backend ID being dropped (close path). Called from
658/// `client.rs::on_before_close` after the host's local
659/// `window_id_map.remove`. No-op if launcher pipe absent.
660pub fn report_backend_window_id_unregistered(label: String) {
661 let Some(tx) = COMMAND_TX.get() else {
662 return;
663 };
664 let cmd = Command::ReportBackendWindowIdUnregistered { label };
665 if let Err(e) = tx.send(cmd) {
666 tracing::warn!(
667 "[launcher-ipc] report_backend_window_id_unregistered: channel closed ({})",
668 e
669 );
670 }
671}
672
673/// Phase B.4 follow-up — sync API: report the host's pool count
674/// only. Used by `spawn_pool_window` where the windows dimension
675/// is mid-flight relative to the launcher mirror (refill happens
676/// during a close path that hasn't sent `ReportWindowClosed` yet);
677/// snapshotting only the pool dimension preserves the
678/// check-every-transition guarantee without producing false
679/// windows-drift. (codex P2 PR #578 round-3.)
680pub fn report_host_pool_count(count: u32) {
681 let Some(tx) = COMMAND_TX.get() else {
682 return;
683 };
684 let cmd = Command::ReportHostPoolCount { count };
685 if let Err(e) = tx.send(cmd) {
686 tracing::warn!("[launcher-ipc] report_host_pool_count: channel closed ({})", e);
687 }
688}
689
690/// Phase B.9.1 (WRR) — sync API: report a Win32 HWND created.
691/// Called from the WRR `SetWinEventHook` callback. No-op if the
692/// launcher pipe isn't connected (`task dev` mode); reducer arm
693/// stashes pending-without-label until reconciliation.
694pub fn report_hwnd_opened(
695 hwnd: u64,
696 class_name: String,
697 title: String,
698 label_hint: Option<String>,
699) {
700 let Some(tx) = COMMAND_TX.get() else {
701 return;
702 };
703 let cmd = Command::ReportHwndOpened {
704 hwnd,
705 class_name,
706 title,
707 label_hint,
708 };
709 if let Err(e) = tx.send(cmd) {
710 tracing::warn!("[launcher-ipc] report_hwnd_opened: channel closed ({})", e);
711 }
712}
713
714/// Phase B.9.1 — sync API: report a Win32 HWND destroyed.
715pub fn report_hwnd_destroyed(hwnd: u64) {
716 let Some(tx) = COMMAND_TX.get() else {
717 return;
718 };
719 let cmd = Command::ReportHwndDestroyed { hwnd };
720 if let Err(e) = tx.send(cmd) {
721 tracing::warn!("[launcher-ipc] report_hwnd_destroyed: channel closed ({})", e);
722 }
723}
724
725/// Phase B.9.1 — sync API: report visibility change.
726pub fn report_hwnd_visibility_changed(hwnd: u64, visible: bool) {
727 let Some(tx) = COMMAND_TX.get() else {
728 return;
729 };
730 let cmd = Command::ReportHwndVisibilityChanged { hwnd, visible };
731 if let Err(e) = tx.send(cmd) {
732 tracing::warn!("[launcher-ipc] report_hwnd_visibility_changed: channel closed ({})", e);
733 }
734}
735
736/// Phase B.9.1 — sync API: report foreground change.
737pub fn report_hwnd_foreground_changed(hwnd: u64) {
738 let Some(tx) = COMMAND_TX.get() else {
739 return;
740 };
741 let cmd = Command::ReportHwndForegroundChanged { hwnd };
742 if let Err(e) = tx.send(cmd) {
743 tracing::warn!("[launcher-ipc] report_hwnd_foreground_changed: channel closed ({})", e);
744 }
745}
746
747/// Phase B.9.1 — sync API: report iconic (minimized) change.
748pub fn report_hwnd_iconic_changed(hwnd: u64, iconic: bool) {
749 let Some(tx) = COMMAND_TX.get() else {
750 return;
751 };
752 let cmd = Command::ReportHwndIconicChanged { hwnd, iconic };
753 if let Err(e) = tx.send(cmd) {
754 tracing::warn!("[launcher-ipc] report_hwnd_iconic_changed: channel closed ({})", e);
755 }
756}
757
758/// Phase B.9.1 — sync API: report position change. Caller is
759/// responsible for debouncing — see `wrr/position_debounce.rs`.
760pub fn report_hwnd_position_changed(hwnd: u64, rect: agentmux_common::ipc::Rect) {
761 let Some(tx) = COMMAND_TX.get() else {
762 return;
763 };
764 let cmd = Command::ReportHwndPositionChanged { hwnd, rect };
765 if let Err(e) = tx.send(cmd) {
766 tracing::warn!("[launcher-ipc] report_hwnd_position_changed: channel closed ({})", e);
767 }
768}
769
770/// Phase B.9.1 — sync API: report current monitor topology. Sent
771/// once at install time; mid-session topology changes are a B.9.2
772/// follow-up.
773pub fn report_monitor_topology_changed(rects: Vec<agentmux_common::ipc::Rect>) {
774 let Some(tx) = COMMAND_TX.get() else {
775 return;
776 };
777 let cmd = Command::ReportMonitorTopologyChanged { rects };
778 if let Err(e) = tx.send(cmd) {
779 tracing::warn!("[launcher-ipc] report_monitor_topology_changed: channel closed ({})", e);
780 }
781}
782
783/// Phase B.4 follow-up — compute the host's authoritative counts
784/// from `AppState` and report them. Callers invoke this AFTER
785/// each window/pool transition.
786///
787/// Atomic snapshot: holds both `unpromoted_pool_labels` and
788/// `browsers` simultaneously so the reported `(windows, pool)`
789/// pair is from one consistent state. Without this, a concurrent
790/// mutation between the two lock acquisitions (CEF lifecycle on
791/// the UI thread vs. IPC handler in `commands/drag.rs`) could
792/// produce a mismatched snapshot and trigger a spurious
793/// `Event::DriftDetected`. (codex P2 PR #578 round-1.)
794///
795/// Lock order: `unpromoted_pool_labels` first, then `browsers`.
796/// Matches the existing snapshot pattern in
797/// `client.rs::on_before_close` (line ~418) and is the only place
798/// in the codebase that holds both locks simultaneously, so no
799/// other path can race in the reverse order.
800///
801/// Counts (matching the launcher's mirror semantics):
802/// * `windows` — top-level user-visible windows in `browsers`,
803/// excluding `browser-pane-*` child HWNDs and any label still
804/// in `unpromoted_pool_labels`.
805/// * `pool` — pre-promote pool labels (`unpromoted_pool_labels.len()`).
806///
807/// **Why this reads host's `browsers` and `unpromoted_pool_labels`
808/// directly (not the shadow):** this fn IS the source for the
809/// launcher's mirror — its output is what gets compared against
810/// `state.windows.len()` / `state.pool.len()` in the drift-detection
811/// path. Reading from the shadow would compare the shadow against
812/// itself (always agrees) and defeat the entire B.4 drift-detection
813/// design. Once the host reducer arrives in Phase F (see
814/// `docs/retro/multi-reducer-proposal-2026-04-28.md`), this becomes
815/// "report host's authoritative reducer-state to the launcher."
816pub fn compute_and_report_host_counts(state: &std::sync::Arc<crate::state::AppState>) {
817 // Atomic snapshot — pool inventory + browsers under ONE
818 // `host_state` lock. Two-lock variants race against
819 // `promote_pool_window` between reads and let queued pool
820 // windows leak into the user-window count, triggering spurious
821 // launcher drift-detection.
822 let (windows, pool) = state.host_counts_snapshot();
823 report_host_counts(windows, pool);
824}
825
#[cfg(test)]
mod shadow_projection_tests {
    //! Subscriber-idempotency tests for the host's shadow projection
    //! (master spec §8.14). The launcher event channel is allowed to
    //! deliver the same event more than once (re-dispatch, resync,
    //! replay), so a subscriber must treat every delivery after the
    //! first as a no-op.
    //!
    //! The system under test is `apply_shadow_projection`, described
    //! as the pure map-mutation slice of `apply_event_to_shadow`. The
    //! side-effect arms (`CorrectiveWindowMove`, `HostShouldQuit`) are
    //! documented as living outside that function and are left to
    //! integration-level tests.

    use super::*;
    use agentmux_common::ipc::{Event, WindowKind};
    use proptest::prelude::*;
    use std::sync::Arc;

    /// Regex strategy for window labels. The alphabet is kept tiny so
    /// that independent events frequently land on the same label.
    const LABEL_PATTERN: &str = "[a-c]{1,3}";

    /// Regex strategy for backend window ids.
    const WINDOW_ID_PATTERN: &str = "[0-9a-f]{4,8}";

    /// Owned copies of the three shadow maps, in the order
    /// `(window_meta, backend_window_ids, instance_registry)`.
    type ShadowSnapshot = (
        std::collections::HashMap<String, crate::state::WindowMeta>,
        std::collections::HashMap<String, String>,
        std::collections::HashMap<String, u32>,
    );

    /// Clones the three shadow maps out of `state` so they can be
    /// compared after further events have been applied.
    fn snapshot_shadow_state(state: &Arc<crate::state::AppState>) -> ShadowSnapshot {
        (
            state.shadow_window_meta.lock().clone(),
            state.shadow_backend_window_ids.lock().clone(),
            state.shadow_instance_registry.lock().clone(),
        )
    }

    /// Feeds `events`, in order, through `apply_shadow_projection`.
    fn apply_all(state: &Arc<crate::state::AppState>, events: &[Event]) {
        for event in events {
            apply_shadow_projection(state, event);
        }
    }

    /// Strategy producing only the event variants the projection is
    /// meant to fold into the shadow maps. `CorrectiveWindowMove` and
    /// `HostShouldQuit` are deliberately not generated.
    ///
    /// Because labels come from `[a-c]{1,3}`, sequences regularly
    /// contain an open and a close (or assign and release) for the
    /// same label. Production traffic is strictly paired by the
    /// launcher, but the generator does not enforce pairing or
    /// ordering: the properties below are expected to hold for
    /// close-before-open sequences as well.
    fn arb_projection_event() -> impl Strategy<Value = Event> {
        let opened = (
            LABEL_PATTERN,
            prop_oneof![Just(WindowKind::FullInstance), Just(WindowKind::Subwindow)],
            prop_oneof![Just(None::<String>), Just(Some("a".into()))],
        )
            .prop_map(|(label, kind, parent_label)| Event::WindowOpened {
                label,
                kind,
                parent_label,
                version: 0,
            });

        let closed = LABEL_PATTERN.prop_map(|label| Event::WindowClosed {
            label,
            version: 0,
            crash_detected: false,
        });

        let instance_assigned = (LABEL_PATTERN, 1u32..100u32)
            .prop_map(|(label, num)| Event::WindowInstanceAssigned { label, num, version: 0 });

        let instance_released = (LABEL_PATTERN, 1u32..100u32)
            .prop_map(|(label, num)| Event::WindowInstanceReleased { label, num, version: 0 });

        let backend_id_registered = (LABEL_PATTERN, WINDOW_ID_PATTERN).prop_map(
            |(label, window_id)| Event::BackendWindowIdRegistered { label, window_id, version: 0 },
        );

        let backend_id_unregistered = (LABEL_PATTERN, WINDOW_ID_PATTERN).prop_map(
            |(label, window_id)| Event::BackendWindowIdUnregistered {
                label,
                window_id,
                version: 0,
            },
        );

        // Weights bias generation toward open/close traffic.
        prop_oneof![
            3 => opened,
            3 => closed,
            2 => instance_assigned,
            1 => instance_released,
            2 => backend_id_registered,
            1 => backend_id_unregistered,
        ]
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(64))]

        /// Master spec §8.14 — for any event sequence `S`, applying
        /// `S` and then `S` again leaves the shadow in the state it
        /// had after the first pass: `apply(S) == apply(S; S)`.
        ///
        /// This pins the drift-storm regression class (PR #708): a
        /// launcher event that reaches `apply_event_to_shadow`
        /// hundreds of times instead of once must not move the
        /// shadow projection away from its final state.
        #[test]
        fn shadow_projection_idempotent_under_replay(
            events in prop::collection::vec(arb_projection_event(), 0..40)
        ) {
            let state = Arc::new(crate::state::AppState::default());

            // First pass over the sequence.
            apply_all(&state, &events);
            let (meta_first, ids_first, registry_first) = snapshot_shadow_state(&state);

            // Second pass over the very same sequence; every event is
            // now a repeat delivery.
            apply_all(&state, &events);
            let (meta_second, ids_second, registry_second) = snapshot_shadow_state(&state);

            prop_assert_eq!(meta_first, meta_second, "shadow_window_meta diverged on replay");
            prop_assert_eq!(ids_first, ids_second, "shadow_backend_window_ids diverged on replay");
            prop_assert_eq!(
                registry_first,
                registry_second,
                "shadow_instance_registry diverged on replay"
            );
        }

        /// Stronger form of the §8.14 contract: delivering each event
        /// between one and five times back-to-back ends in the same
        /// state as delivering each event exactly once.
        #[test]
        fn shadow_projection_collapses_arbitrary_duplicates(
            events in prop::collection::vec(arb_projection_event(), 1..30),
            dup_counts in prop::collection::vec(1u8..6, 1..30),
        ) {
            // The two vectors are generated independently; only the
            // common prefix length is used so each event has a count.
            let paired = events.len().min(dup_counts.len());
            let events = &events[..paired];
            let dup_counts = &dup_counts[..paired];

            // Baseline: every event delivered exactly once.
            let state_once = Arc::new(crate::state::AppState::default());
            apply_all(&state_once, events);
            let (meta_once, ids_once, registry_once) = snapshot_shadow_state(&state_once);

            // Burst run: every event delivered `repeats` times in a row.
            let state_dup = Arc::new(crate::state::AppState::default());
            for (event, &repeats) in events.iter().zip(dup_counts) {
                for _ in 0..repeats {
                    apply_shadow_projection(&state_dup, event);
                }
            }
            let (meta_dup, ids_dup, registry_dup) = snapshot_shadow_state(&state_dup);

            prop_assert_eq!(
                meta_once,
                meta_dup,
                "shadow_window_meta diverged under duplicate-bursting"
            );
            prop_assert_eq!(
                ids_once,
                ids_dup,
                "shadow_backend_window_ids diverged under duplicate-bursting"
            );
            prop_assert_eq!(
                registry_once,
                registry_dup,
                "shadow_instance_registry diverged under duplicate-bursting"
            );
        }
    }

    /// Anti-vacuity guard (per `feedback_property_test_input_must_match_sut_filter`).
    ///
    /// The properties above would pass trivially if the projection
    /// ignored every generated event (both sides would stay empty).
    /// This test applies one event per shadow map and checks that the
    /// map really changed, so a strategy that has drifted away from
    /// the SUT (e.g. renamed variants) fails loudly here.
    #[test]
    fn projection_actually_mutates_state_for_strategy_events() {
        let state = Arc::new(crate::state::AppState::default());

        // shadow_window_meta
        let opened = Event::WindowOpened {
            label: "a".to_string(),
            kind: WindowKind::FullInstance,
            parent_label: None,
            version: 0,
        };
        apply_shadow_projection(&state, &opened);
        let meta_has_label = state.shadow_window_meta.lock().contains_key("a");
        assert!(
            meta_has_label,
            "projection failed to mutate shadow_window_meta — strategy/SUT drift?"
        );

        // shadow_backend_window_ids
        let registered = Event::BackendWindowIdRegistered {
            label: "a".to_string(),
            window_id: "w-1".to_string(),
            version: 0,
        };
        apply_shadow_projection(&state, &registered);
        let ids_has_label = state.shadow_backend_window_ids.lock().contains_key("a");
        assert!(
            ids_has_label,
            "projection failed to mutate shadow_backend_window_ids"
        );

        // shadow_instance_registry
        let assigned = Event::WindowInstanceAssigned {
            label: "a".to_string(),
            num: 7,
            version: 0,
        };
        apply_shadow_projection(&state, &assigned);
        let assigned_num = state.shadow_instance_registry.lock().get("a").copied();
        assert_eq!(
            assigned_num,
            Some(7),
            "projection failed to mutate shadow_instance_registry"
        );
    }
}