agentmux_cef/
srv_ipc.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase E.2c.5a — host-side client for the srv reducer's named-pipe
5// IPC server. Connects on host startup, sends `Register`, and runs
6// a read loop that forwards every event to the renderer via
7// `srv_event_bridge::dispatch_to_renderers`.
8//
9// This is the host-bridge half of E.2c.5; the renderer dispatcher
10// half (the JS handler `window.__agentmux_srv_event`) lands as a
11// separate frontend PR (E.2c.5b).
12//
13// Activated only when `AGENTMUX_SRV_PIPE_PATH` is set — the env var
14// the launcher provides post-E.1b. Absent → host runs without the
15// srv bridge; renderer falls back to the bespoke
16// `waveobj:update` HTTP/WS path (still wired during the migration).
17//
18// Mirrors `launcher_ipc::connect_to_launcher` but slimmer:
19//   * No outbound command channel (host doesn't issue srv commands
20//     today; saga coordinator E.5+ adds the producer).
//   * No shadow state tracking (the host's existing state shadows
//     the launcher reducer; srv events go straight through to the
//     renderer).
24//
25// Reconnect / resync semantics: B.3 launcher pattern leaves them
26// for a follow-up; same here. If the srv pipe drops, we log and
27// stop forwarding. Renderer falls back to the legacy HTTP/WS path
28// until the host restarts. E.2c.5b/E.5 will tighten this once it
29// matters (saga consumers can't tolerate dropped events).
30
31use std::sync::Arc;
32
33use agentmux_common::ipc::{ClientKind, Command, Event};
34use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
35
36#[cfg(target_os = "windows")]
37use tokio::net::windows::named_pipe::ClientOptions;
38
/// Handle to the srv-pipe read loop, intended to be held by main.rs
/// for the host's lifetime.
///
/// Dropping the handle aborts the read-loop task, which stops event
/// forwarding. A bare tokio `JoinHandle` only *detaches* its task when
/// dropped, so the explicit `Drop` impl is what makes "drop the handle
/// to stop" actually hold.
#[cfg(target_os = "windows")]
#[derive(Debug)]
pub struct SrvIpcHandle {
    /// Task running the newline-delimited event read loop.
    reader_task: tokio::task::JoinHandle<()>,
}

#[cfg(target_os = "windows")]
impl Drop for SrvIpcHandle {
    fn drop(&mut self) {
        // Calling `abort` on a task that has already finished has no
        // effect, so this is safe after the loop exited on EOF/error.
        self.reader_task.abort();
    }
}
46
/// Placeholder handle for non-Windows targets, where the srv named-pipe
/// bridge is not available. Carries no state.
#[cfg(not(target_os = "windows"))]
#[derive(Debug)]
pub struct SrvIpcHandle;
49
/// Connects the host to the srv reducer's named pipe and starts
/// forwarding its events.
///
/// Reads the pipe path from `AGENTMUX_SRV_PIPE_PATH`, opens the pipe,
/// sends a `Register` command identifying this process as
/// `ClientKind::Host`, then spawns a task that parses newline-delimited
/// JSON `Event`s and passes each one to
/// `srv_event_bridge::dispatch_to_renderers`.
///
/// Returns `None` (after logging) when the variable is unset or empty,
/// when the pipe cannot be opened, or when `Register` cannot be
/// serialized, written, or flushed. A srv-IPC failure is deliberately
/// non-fatal: the host keeps running without the bridge.
///
/// On success returns the [`SrvIpcHandle`] that owns the read-loop task.
/// The task ends on pipe EOF or on a read error; there is no reconnect.
#[cfg(target_os = "windows")]
pub async fn connect_to_srv(
    state: std::sync::Arc<crate::state::AppState>,
) -> Option<SrvIpcHandle> {
    let pipe_path = match std::env::var("AGENTMUX_SRV_PIPE_PATH") {
        Ok(p) if !p.is_empty() => p,
        _ => {
            tracing::info!(
                "AGENTMUX_SRV_PIPE_PATH unset — running without srv IPC bridge (dev mode)"
            );
            return None;
        }
    };

    let client = match ClientOptions::new().open(&pipe_path) {
        Ok(c) => c,
        Err(e) => {
            tracing::error!(
                "[srv-ipc] failed to open {}: {} — continuing without srv bridge",
                pipe_path,
                e
            );
            return None;
        }
    };
    tracing::info!("[srv-ipc] connected to {}", pipe_path);

    let (read_half, mut write_half) = tokio::io::split(client);

    // Send Register FIRST (server enforces register-first; violation
    // is a fatal close).
    let register = Command::Register {
        kind: ClientKind::Host,
        pid: std::process::id(),
        version: env!("CARGO_PKG_VERSION").to_string(),
    };
    let mut buf = match serde_json::to_vec(&register) {
        Ok(b) => b,
        Err(e) => {
            tracing::error!("[srv-ipc] failed to serialize Register: {}", e);
            return None;
        }
    };
    // The wire format is one JSON document per line.
    buf.push(b'\n');
    if let Err(e) = write_half.write_all(&buf).await {
        tracing::error!("[srv-ipc] failed to send Register: {} — bailing", e);
        return None;
    }
    if let Err(e) = write_half.flush().await {
        tracing::error!("[srv-ipc] failed to flush Register: {} — bailing", e);
        return None;
    }

    // Read loop: parse newline-delimited Events and forward each to
    // every top-level renderer.
    let state_for_reader = Arc::clone(&state);
    let reader_task = tokio::spawn(async move {
        // Host doesn't send Commands today (saga coordinator E.5+ adds
        // the producer), but the write half must outlive this function.
        // Binding it to a *named* local moves it into this task (a bare
        // `let _ = write_half;` binds nothing), so both halves are
        // released together when the loop ends or the task is aborted,
        // instead of being parked in a separate never-ending task.
        let _write_half = write_half;

        let mut lines = BufReader::new(read_half).lines();
        loop {
            match lines.next_line().await {
                // Blank / whitespace-only lines carry no event.
                Ok(Some(line)) if line.trim().is_empty() => continue,
                Ok(Some(line)) => match serde_json::from_str::<Event>(&line) {
                    Ok(event) => {
                        crate::srv_event_bridge::dispatch_to_renderers(&state_for_reader, &event);
                    }
                    // A malformed line is skipped; the stream stays open.
                    Err(e) => {
                        tracing::warn!(
                            "[srv-ipc] could not parse event line ({}): {}",
                            e,
                            line
                        );
                    }
                },
                Ok(None) => {
                    tracing::info!("[srv-ipc] srv pipe EOF — connection closed");
                    return;
                }
                Err(e) => {
                    tracing::warn!("[srv-ipc] read error: {}", e);
                    return;
                }
            }
        }
    });

    Some(SrvIpcHandle { reader_task })
}
156
157#[cfg(not(target_os = "windows"))]
158pub async fn connect_to_srv(
159    _state: std::sync::Arc<crate::state::AppState>,
160) -> Option<SrvIpcHandle> {
161    None
162}