agentmux_cef/srv_ipc.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase E.2c.5a — host-side client for the srv reducer's named-pipe
5// IPC server. Connects on host startup, sends `Register`, and runs
6// a read loop that forwards every event to the renderer via
7// `srv_event_bridge::dispatch_to_renderers`.
8//
9// This is the host-bridge half of E.2c.5; the renderer dispatcher
10// half (the JS handler `window.__agentmux_srv_event`) lands as a
11// separate frontend PR (E.2c.5b).
12//
13// Activated only when `AGENTMUX_SRV_PIPE_PATH` is set — the env var
14// the launcher provides post-E.1b. Absent → host runs without the
15// srv bridge; renderer falls back to the bespoke
16// `waveobj:update` HTTP/WS path (still wired during the migration).
17//
18// Mirrors `launcher_ipc::connect_to_launcher` but slimmer:
19// * No outbound command channel (host doesn't issue srv commands
20// today; saga coordinator E.5+ adds the producer).
21// * No shadow state tracking (host's existing state shadow the
22// launcher reducer; srv events go straight through to the
23// renderer).
24//
25// Reconnect / resync semantics: B.3 launcher pattern leaves them
26// for a follow-up; same here. If the srv pipe drops, we log and
27// stop forwarding. Renderer falls back to the legacy HTTP/WS path
28// until the host restarts. E.2c.5b/E.5 will tighten this once it
29// matters (saga consumers can't tolerate dropped events).
30
31use std::sync::Arc;
32
33use agentmux_common::ipc::{ClientKind, Command, Event};
34use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
35
36#[cfg(target_os = "windows")]
37use tokio::net::windows::named_pipe::ClientOptions;
38
39/// Handle held by main.rs for the host's lifetime so the srv pipe
40/// connection stays open. Dropping it closes the pipe.
41#[cfg(target_os = "windows")]
42pub struct SrvIpcHandle {
43 #[allow(dead_code)]
44 reader_task: tokio::task::JoinHandle<()>,
45}
46
47#[cfg(not(target_os = "windows"))]
48pub struct SrvIpcHandle;
49
50/// If `AGENTMUX_SRV_PIPE_PATH` is set, connect, Register as Host,
51/// spawn a read loop that forwards srv events to all top-level
52/// renderers. Returns a handle to keep the connection alive.
53///
54/// Errors are logged but non-fatal: a srv-IPC failure should NOT
55/// prevent the host from running. The renderer continues using the
56/// legacy `waveobj:update` HTTP/WS path until the connection comes
57/// back at the next host start.
58#[cfg(target_os = "windows")]
59pub async fn connect_to_srv(
60 state: std::sync::Arc<crate::state::AppState>,
61) -> Option<SrvIpcHandle> {
62 let pipe_path = match std::env::var("AGENTMUX_SRV_PIPE_PATH") {
63 Ok(p) if !p.is_empty() => p,
64 _ => {
65 tracing::info!(
66 "AGENTMUX_SRV_PIPE_PATH unset — running without srv IPC bridge (dev mode)"
67 );
68 return None;
69 }
70 };
71
72 let client = match ClientOptions::new().open(&pipe_path) {
73 Ok(c) => c,
74 Err(e) => {
75 tracing::error!(
76 "[srv-ipc] failed to open {}: {} — continuing without srv bridge",
77 pipe_path,
78 e
79 );
80 return None;
81 }
82 };
83 tracing::info!("[srv-ipc] connected to {}", pipe_path);
84
85 let (read_half, mut write_half) = tokio::io::split(client);
86
87 // Send Register FIRST (server enforces register-first; violation
88 // is a fatal close).
89 let register = Command::Register {
90 kind: ClientKind::Host,
91 pid: std::process::id(),
92 version: env!("CARGO_PKG_VERSION").to_string(),
93 };
94 let mut buf = match serde_json::to_vec(®ister) {
95 Ok(b) => b,
96 Err(e) => {
97 tracing::error!("[srv-ipc] failed to serialize Register: {}", e);
98 return None;
99 }
100 };
101 buf.push(b'\n');
102 if let Err(e) = write_half.write_all(&buf).await {
103 tracing::error!("[srv-ipc] failed to send Register: {} — bailing", e);
104 return None;
105 }
106 if let Err(e) = write_half.flush().await {
107 tracing::error!("[srv-ipc] failed to flush Register: {} — bailing", e);
108 return None;
109 }
110
111 // Read loop: parse newline-delimited Events and forward each to
112 // every top-level renderer.
113 let state_for_reader = Arc::clone(&state);
114 let reader_task = tokio::spawn(async move {
115 let reader = BufReader::new(read_half);
116 let mut lines = reader.lines();
117 loop {
118 match lines.next_line().await {
119 Ok(Some(line)) if line.trim().is_empty() => continue,
120 Ok(Some(line)) => match serde_json::from_str::<Event>(&line) {
121 Ok(event) => {
122 crate::srv_event_bridge::dispatch_to_renderers(&state_for_reader, &event);
123 }
124 Err(e) => {
125 tracing::warn!(
126 "[srv-ipc] could not parse event line ({}): {}",
127 e,
128 line
129 );
130 }
131 },
132 Ok(None) => {
133 tracing::info!("[srv-ipc] srv pipe EOF — connection closed");
134 return;
135 }
136 Err(e) => {
137 tracing::warn!("[srv-ipc] read error: {}", e);
138 return;
139 }
140 }
141 }
142 });
143
144 // Keep the writer alive (Host doesn't send Commands today; saga
145 // coordinator E.5+ adds the producer). For now the writer is
146 // moved into a background task that just holds it open; dropping
147 // would close the pipe and trigger Goodbye on the server.
148 let _writer_keepalive = tokio::spawn(async move {
149 // No-op: just owns write_half so it isn't dropped.
150 let _ = write_half;
151 std::future::pending::<()>().await;
152 });
153
154 Some(SrvIpcHandle { reader_task })
155}
156
157#[cfg(not(target_os = "windows"))]
158pub async fn connect_to_srv(
159 _state: std::sync::Arc<crate::state::AppState>,
160) -> Option<SrvIpcHandle> {
161 None
162}