agentmux_srv\srv_ipc/
server.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase E.1b — srv pipe IPC server. Adapted from
5// agentmux-launcher::ipc::server with srv-specific tweaks:
6//
7//   * No "host-only" enforcement gate — srv accepts commands from
8//     any registered client (per Phase E §5: workspace / tab / block
9//     commands eventually originate from renderer-via-host or from
10//     Tools).
11//   * No WRR drift logging — that's launcher-domain.
12//   * Identity patching is sentinel-aware: the srv reducer emits
13//     `Event::Registered { launcher_pid: 0, launcher_version: "" }`
14//     and the server fills both fields with srv's identity (process
15//     id + crate version) before broadcast — same convention the
16//     launcher uses for its own pipe replies.
17//
18// Future refactor: lift the shared parts (broadcast bus, fanout
19// task, GetEvents intercept) into agentmux-common. Phase E.7
20// cleanup PR. For E.1b copy/adapt to keep the diff scoped.
21
22use std::sync::Arc;
23
24use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
25use tokio::sync::Mutex;
26
27#[cfg(target_os = "windows")]
28use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions};
29
30use agentmux_common::ipc::{ClientKind, Command, ErrorCode, Event};
31
32use crate::reducer;
33use crate::state::State;
34
35#[derive(Debug)]
36pub struct ServerCtx {
37    pub srv_pid: u32,
38    pub srv_version: String,
39    pub state: Arc<Mutex<State>>,
40    pub events_tx: tokio::sync::broadcast::Sender<Event>,
41    pub event_log: Arc<crate::event_log::EventLog>,
42}
43
/// Creates the first server instance of `pipe_name`.
///
/// The instance is created with `first_pipe_instance(true)`, so creation
/// fails if another instance with this name already exists. That lets the
/// caller detect a second srv, or a squatter that already owns the pipe.
#[cfg(target_os = "windows")]
pub fn bind_first_pipe_instance(pipe_name: &str) -> std::io::Result<NamedPipeServer> {
    let mut options = ServerOptions::new();
    options.first_pipe_instance(true);
    options.create(pipe_name)
}
50
/// Spawns srv's named-pipe accept loop and returns its task handle.
///
/// `first` is the already-bound first instance (see
/// `bind_first_pipe_instance`). Every later instance uses default
/// `ServerOptions`. Each accepted client is served by `handle_connection`
/// on its own task.
///
/// A replacement instance is always created before the previous handle is
/// released. If a replacement cannot be created, the loop logs a FATAL
/// error and the task ends; a client that was just accepted is still
/// handed to `handle_connection` first.
#[cfg(target_os = "windows")]
pub fn run_srv_ipc_server(
    pipe_name: String,
    first: NamedPipeServer,
    ctx: ServerCtx,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let shared = Arc::new(ctx);
        tracing::info!(target: "srv-ipc", "[srv-ipc] server starting on {}", pipe_name);

        // The instance currently waiting for a client.
        let mut listening = first;

        loop {
            let connected = listening.connect().await;

            match connected {
                Ok(()) => {
                    let next = match ServerOptions::new().create(&pipe_name) {
                        Ok(instance) => instance,
                        Err(e) => {
                            tracing::error!(target: "srv-ipc", "[srv-ipc] FATAL: failed to create next pipe instance: {}", e);
                            tokio::spawn(handle_connection(listening, Arc::clone(&shared)));
                            return;
                        }
                    };
                    // Swap the fresh instance in; the connected one goes to its handler.
                    let client = std::mem::replace(&mut listening, next);
                    tokio::spawn(handle_connection(client, Arc::clone(&shared)));
                }
                Err(e) => {
                    tracing::warn!(target: "srv-ipc", "[srv-ipc] connect failed: {} — recreating instance", e);
                    match ServerOptions::new().create(&pipe_name) {
                        // Assignment drops the failed instance only after its
                        // replacement exists.
                        Ok(instance) => listening = instance,
                        Err(create_err) => {
                            tracing::error!(target: "srv-ipc", "[srv-ipc] FATAL: failed to recreate pipe after connect error: {}", create_err);
                            return;
                        }
                    }
                }
            }
        }
    })
}
90
91#[cfg(not(target_os = "windows"))]
92pub fn run_srv_ipc_server(
93    _pipe_name: String,
94    _ctx: ServerCtx,
95) -> tokio::task::JoinHandle<()> {
96    // Phase 7 will add Unix domain socket support.
97    tokio::spawn(async {})
98}
99
/// Serves one accepted pipe client until it disconnects or sends Goodbye.
///
/// Wire format is newline-delimited JSON: each non-blank inbound line is a
/// `Command`, and each outbound `Event` is written as one JSON line through
/// a mutex-guarded writer shared with this connection's fanout task.
///
/// Per command:
///   1. parse failure → connection-private `InvalidCommand` error (version 0);
///   2. Register-first gate (`enforce_register_first`) and duplicate-Register check;
///   3. `GetEvents` is answered directly from the replay log, never broadcast;
///   4. everything else runs through `reducer::update`, and the resulting
///      events are published by `publish_events` under the state lock.
///
/// On EOF or a read error, a synthetic Goodbye is dispatched for the
/// registered PID (if any) before the handler exits.
#[cfg(target_os = "windows")]
async fn handle_connection(stream: NamedPipeServer, ctx: Arc<ServerCtx>) {
    let (read_half, write_half) = tokio::io::split(stream);
    let writer = Arc::new(Mutex::new(write_half));
    let mut lines = BufReader::new(read_half).lines();

    let mut registered_kind: Option<ClientKind> = None;
    let mut registered_pid: Option<u32> = None;
    let conn_id = NEXT_CONN_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    // Per-connection fanout task — subscribed BEFORE we start reading
    // commands so concurrent broadcasts aren't lost in the pre-recv
    // window. Same pattern as launcher's server.
    let fanout_handle = {
        let writer = Arc::clone(&writer);
        let mut events_rx = ctx.events_tx.subscribe();
        tokio::spawn(async move {
            loop {
                match events_rx.recv().await {
                    Ok(event) => {
                        // Identity was already patched by `publish_events`
                        // before the event reached the bus; no re-patch here.
                        if send_event(&writer, event).await.is_err() {
                            return;
                        }
                    }
                    Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                        tracing::warn!(target: "srv-ipc", "[srv-ipc] conn_id={} lagged event bus, missed {} events", conn_id, n);
                    }
                    Err(tokio::sync::broadcast::error::RecvError::Closed) => return,
                }
            }
        })
    };

    loop {
        let line = match lines.next_line().await {
            Ok(Some(l)) => l,
            Ok(None) => {
                tracing::info!(target: "srv-ipc", "[srv-ipc] connection conn_id={} closed (kind={:?}, pid={:?})", conn_id, registered_kind, registered_pid);
                // Phase E.1b — synthetic Goodbye on ungraceful
                // disconnect so the reducer marks the PID Exited;
                // otherwise reconnect-from-same-PID hits
                // AlreadyRegistered. (codex P1 #610.)
                dispatch_synthetic_goodbye(&ctx, conn_id, registered_pid).await;
                fanout_handle.abort();
                return;
            }
            Err(e) => {
                tracing::warn!(target: "srv-ipc", "[srv-ipc] read error conn_id={}: {}", conn_id, e);
                dispatch_synthetic_goodbye(&ctx, conn_id, registered_pid).await;
                fanout_handle.abort();
                return;
            }
        };

        if line.trim().is_empty() {
            continue;
        }

        let cmd = match serde_json::from_str::<Command>(&line) {
            Ok(c) => c,
            Err(e) => {
                // Phase E.1b — parse errors are connection-private
                // (sent only to the offender, not broadcast, not
                // appended to the event log). Don't bump the global
                // event_version: other subscribers would see version
                // gaps and treat them as missed events. Use 0 as a
                // sentinel for "not part of the ordered stream."
                // (codex P2 #610.)
                let _ = send_event(
                    &writer,
                    Event::Error {
                        code: ErrorCode::InvalidCommand,
                        message: format!("parse failed: {}", e),
                        fatal: false,
                        version: 0,
                    },
                )
                .await;
                continue;
            }
        };

        // Enforce Register-first at the connection level.
        if let Some(reply) = enforce_register_first(&cmd, &registered_kind, &ctx).await {
            let close = matches!(&reply, Event::Error { fatal: true, .. });
            let _ = send_event(&writer, reply).await;
            if close {
                fanout_handle.abort();
                return;
            }
            continue;
        }
        if matches!(cmd, Command::Register { .. }) && registered_kind.is_some() {
            // Phase E.1b — connection-private error; sentinel
            // version=0 (codex P2 #610).
            let _ = send_event(
                &writer,
                Event::Error {
                    code: ErrorCode::AlreadyRegistered,
                    message: "Register sent twice on the same connection".into(),
                    fatal: false,
                    version: 0,
                },
            )
            .await;
            continue;
        }

        let pre_register = match &cmd {
            Command::Register { kind, pid, .. } => Some((*kind, *pid)),
            _ => None,
        };
        // Decided up front so `cmd` can be moved into the reducer (no clone).
        let goodbye = matches!(cmd, Command::Goodbye);

        // Phase D.3 — `GetEvents` is intercepted before the reducer
        // (log query is I/O-adjacent; reducer stays pure). The reply
        // (`Event::EventList`) goes DIRECTLY to the requester, never on the
        // shared bus: it is request/response, not a state transition, and
        // broadcasting it would make other subscribers apply foreign replay
        // payloads. (codex P1 #610.)
        if let Command::GetEvents { since } = &cmd {
            let since = *since;
            // Read the current version WITHOUT bumping it (codex P2 #610);
            // it is the as-of point for the requester's next resync.
            // Version and replay slice are read under the state lock, the
            // same lock publishers hold while appending (see
            // `publish_events`). So no event counted in `v` can still be in
            // flight and missing from `replay` — assuming the reducer bumps
            // `event_version` inside `reducer::update` (TODO confirm).
            let (v, replay, truncated) = {
                let state = ctx.state.lock().await;
                (
                    state.event_version,
                    ctx.event_log.events_since(since),
                    ctx.event_log.replay_truncated(since),
                )
            };
            if truncated {
                tracing::warn!(target: "srv-ipc", "[srv-ipc] conn_id={} GetEvents since={} truncated", conn_id, since);
            }
            let _ = send_event(
                &writer,
                Event::EventList {
                    events: replay,
                    version: v,
                },
            )
            .await;
            continue;
        }

        let now_rfc3339 = chrono::Utc::now().to_rfc3339();
        let register_rejected = {
            let mut state = ctx.state.lock().await;
            let rctx = reducer::Ctx {
                now_rfc3339,
                conn_id,
                registered_pid,
            };
            let events = reducer::update(&mut state, cmd, &rctx);
            let rejected = events
                .iter()
                .any(|e| matches!(e, Event::Error { code: ErrorCode::AlreadyRegistered, .. }));
            // Publish while still holding the state lock so version bumps,
            // log appends and broadcasts stay in one global order.
            // NOTE(review): reducer output — including `Event::Error` and
            // snapshot replies — goes to EVERY subscriber, unlike the
            // connection-private replies above. Confirm that is intended.
            publish_events(&ctx, events);
            rejected
        };

        if let Some((kind, pid)) = pre_register {
            if !register_rejected {
                registered_kind = Some(kind);
                registered_pid = Some(pid);
            }
        }

        if goodbye {
            tracing::info!(target: "srv-ipc", "[srv-ipc] goodbye from conn_id={} kind={:?} pid={:?}", conn_id, registered_kind, registered_pid);
            fanout_handle.abort();
            return;
        }
    }
}

/// Patches srv identity into reducer output, then logs and broadcasts it, in order.
///
/// Callers must hold `ctx.state`'s lock across both `reducer::update` and
/// this call. That makes "bump version → append to log → send on bus" one
/// step with respect to other connections, so two connections cannot
/// publish v+2 before v+1. It also means a `GetEvents` served under the
/// same lock never reports an as-of version whose event is still in flight.
/// (Assumes `reducer::update` is what bumps `event_version` — TODO confirm.)
///
/// Meta-events (`is_meta_event`) are broadcast but not appended to the log.
#[cfg(target_os = "windows")]
fn publish_events(ctx: &Arc<ServerCtx>, events: impl IntoIterator<Item = Event>) {
    for event in events {
        // Patch before log + bus so replay matches the live broadcast
        // (launcher's E.1a fix for codex P2 #608).
        let event = patch_srv_identity(event, ctx);
        if !is_meta_event(&event) {
            ctx.event_log.append(event.clone());
        }
        // A send error only means there is currently no subscriber.
        let _ = ctx.events_tx.send(event);
    }
}

/// Reply/diagnostic events that are not state transitions; these are never
/// appended to the replay log.
#[cfg(target_os = "windows")]
fn is_meta_event(event: &Event) -> bool {
    matches!(
        event,
        Event::Snapshot { .. }
            | Event::EventList { .. }
            | Event::SrvSnapshot { .. }
            | Event::Error { .. }
    )
}

/// Phase E.1b — synthetic Goodbye for ungraceful disconnects (EOF or read
/// error before the client sent an explicit Goodbye). Per the original
/// design note, without it the reducer's process record for the PID stays
/// Running and a reconnect from the same PID hits AlreadyRegistered.
/// (codex P1 #610.)
///
/// Does nothing when the connection never registered (`registered_pid` is
/// `None`). Otherwise it runs `Command::Goodbye` through the reducer and
/// publishes the output exactly like a client-sent Goodbye. Nothing is
/// logged via `tracing` here: any `Event::Error` the reducer emits is
/// published, not raised, since we are already on a disconnect path.
/// NOTE(review): behaviour for an already-Exited record depends on the
/// reducer's Goodbye handling — confirm there.
#[cfg(target_os = "windows")]
async fn dispatch_synthetic_goodbye(
    ctx: &Arc<ServerCtx>,
    conn_id: u64,
    registered_pid: Option<u32>,
) {
    let Some(pid) = registered_pid else {
        return;
    };
    let now_rfc3339 = chrono::Utc::now().to_rfc3339();
    let mut state = ctx.state.lock().await;
    let rctx = reducer::Ctx {
        now_rfc3339,
        conn_id,
        registered_pid: Some(pid),
    };
    let events = reducer::update(&mut state, Command::Goodbye, &rctx);
    // Publish under the same lock as the reducer step (see `publish_events`).
    publish_events(ctx, events);
}
348
/// Source of per-connection ids; the first id handed out is 1.
///
/// Ids only tag log lines and the reducer `Ctx`. `Relaxed` is sufficient
/// because `fetch_add` alone guarantees uniqueness; no other memory is
/// synchronised through this counter.
static NEXT_CONN_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
350
/// Same Register-first invariant the launcher enforces.
///
/// Returns `None` when `cmd` may proceed: the connection is already
/// registered, or `cmd` is itself a `Register`. Otherwise returns the reply
/// to send to this connection only.
///
/// The reply is an `Event::Error` with code `NotRegistered`, matching the
/// launcher pipe so clients can dispatch on the code consistently
/// (reagent + codex P2 #610). It uses the connection-private sentinel
/// `version: 0` (codex P2 #610). Only a pre-Register `Goodbye` is `fatal`;
/// the caller closes the connection on fatal replies.
///
/// `_ctx` is currently unused; it is kept for call-site compatibility.
#[cfg(target_os = "windows")]
async fn enforce_register_first(
    cmd: &Command,
    registered_kind: &Option<ClientKind>,
    _ctx: &Arc<ServerCtx>,
) -> Option<Event> {
    if registered_kind.is_some() {
        return None;
    }
    let (msg, fatal) = match cmd {
        Command::Register { .. } => return None,
        Command::Ping { .. } => ("Ping before Register", false),
        Command::GetSrvSnapshot => ("GetSrvSnapshot before Register", false),
        Command::GetEvents { .. } => ("GetEvents before Register", false),
        Command::Goodbye => ("Goodbye before Register", true),
        // Anything else hitting srv pre-Register is also a soft error
        // — same Ping precedent.
        _ => ("Command before Register", false),
    };
    Some(Event::Error {
        code: ErrorCode::NotRegistered,
        message: msg.to_string(),
        fatal,
        version: 0,
    })
}
388
389/// Patch the sentinel `launcher_pid: 0` / empty `launcher_version`
390/// the reducer emits in `Event::Registered` with srv's actual
391/// identity. Idempotent — applying twice is a no-op (after the
392/// first patch, fields are non-sentinel).
393fn patch_srv_identity(event: Event, ctx: &Arc<ServerCtx>) -> Event {
394    if let Event::Registered {
395        client_id,
396        launcher_pid,
397        launcher_version,
398        version,
399    } = event
400    {
401        if launcher_pid == 0 && launcher_version.is_empty() {
402            return Event::Registered {
403                client_id,
404                launcher_pid: ctx.srv_pid,
405                launcher_version: ctx.srv_version.clone(),
406                version,
407            };
408        }
409        return Event::Registered {
410            client_id,
411            launcher_pid,
412            launcher_version,
413            version,
414        };
415    }
416    event
417}
418
/// Writes `event` to the client as a single JSON line.
///
/// The event is serialised before the writer lock is taken, so the lock
/// covers only the write + flush. Every outbound line in this file goes
/// through here as one `write_all` under the lock, so fanout events and
/// direct replies cannot interleave mid-line.
///
/// # Errors
/// `InvalidData` if serialisation fails; otherwise any I/O error from
/// writing or flushing the pipe.
#[cfg(target_os = "windows")]
async fn send_event(
    writer: &Arc<Mutex<tokio::io::WriteHalf<NamedPipeServer>>>,
    event: Event,
) -> std::io::Result<()> {
    let mut line = serde_json::to_vec(&event)
        .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err.to_string()))?;
    line.push(b'\n');

    let mut pipe = writer.lock().await;
    pipe.write_all(&line).await?;
    pipe.flush().await
}