agentmux_launcher\ipc/
server.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Named-pipe IPC server — accept loop + per-connection handler.
5//
6// Phase B.3: every Command goes through the pure reducer
7// (`crate::reducer::update`) which mutates the shared State and
8// returns a Vec<Event>. The handler then writes those events back
9// over the connection. State is held inside Arc<Mutex<State>> and
10// the mutex is acquired only for the duration of the reducer call —
11// never across an await.
12//
13// What this commit does NOT do:
14//   * Per-subscriber broadcast routing (B.4 splits replies vs broadcasts;
15//     today every event goes back over the originating connection).
16//   * Server-initiated events (no spontaneous emissions yet — only
17//     reducer outputs).
18//   * Persisted client_id (still per-launcher-run).
19//
20// Connection lifecycle: each accepted pipe instance handles one
21// client connection end-to-end. When the client drops, the per-
22// connection task ends and the accept loop continues with a fresh
23// instance.
24
25use std::sync::Arc;
26
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
28use tokio::sync::Mutex;
29
30#[cfg(target_os = "windows")]
31use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions};
32
33use agentmux_common::ipc::{ClientKind, Command, ErrorCode, Event};
34
35use crate::host_pipe::HostPipe;
36use crate::reducer;
37use crate::state::State;
38
39/// State the IPC server shares across connections. Carries the
40/// launcher's identity (for patching into Registered events the
41/// reducer emits with sentinel values) plus the canonical State.
42#[derive(Debug)]
43pub struct ServerCtx {
44    pub launcher_pid: u32,
45    pub launcher_version: String,
46    /// Canonical state owned by the server. Mutex held only during
47    /// reducer dispatch — sub-millisecond.
48    ///
49    /// Phase E.1a — moved from `Mutex<State>` to `Arc<Mutex<State>>`
50    /// so the saga coordinator can share access. The coordinator
51    /// needs `bump_version` when emitting saga lifecycle events and
52    /// will (in E.5) inspect state during saga decisions. Sharing
53    /// via `Arc` keeps the existing single-writer-mutex discipline.
54    pub state: std::sync::Arc<Mutex<State>>,
55    /// Phase B.8 — broadcast bus for reducer-emitted events. Every
56    /// reducer event from `reducer::update` is published here; each
57    /// connection subscribes and writes received events to its own
58    /// pipe. Lets observability clients (`--diag wrr`, future Tools)
59    /// see cross-process activity, not just replies to their own
60    /// commands. Per-connection direct sends (Error replies for
61    /// parse failures, register-first violations) bypass the bus —
62    /// they're response-to-this-client-only by intent. (codex P1
63    /// PR #605.)
64    pub events_tx: tokio::sync::broadcast::Sender<Event>,
65    /// Phase D.2 — event log: in-memory ring of recent reducer
66    /// events (replay source for D.3's `GetEvents`) + disk
67    /// persistence stream for crash forensics. Server appends to
68    /// the in-memory ring synchronously after each reducer
69    /// dispatch; the disk writer is a separate task spawned in
70    /// main.rs that subscribes to the broadcast bus.
71    pub event_log: std::sync::Arc<crate::event_log::EventLog>,
72    /// CPD-2 — launcher → host pipe wrapper. The per-connection
73    /// handler hands the host's writer half to `HostPipe::set_writer`
74    /// once the connecting client registers as `ClientKind::Host`,
75    /// and clears it on disconnect. The host's per-connection event
76    /// fanout task routes events through `HostPipe::send_event`
77    /// instead of `send_event` direct (so frames carry the
78    /// `HostFrame` envelope and traverse the pending-buffer path
79    /// when the host reconnects).
80    pub host_pipe: std::sync::Arc<HostPipe>,
81}
82
/// Synchronously create the first instance of the named pipe.
///
/// Phase B.6: a successful bind doubles as the single-instance
/// signal. Keeping it separate from `run_ipc_server` lets the caller
/// (main.rs) notice a collision BEFORE spawning srv/host and show a
/// user-visible error ("AgentMux is already running").
///
/// `ServerOptions::create` needs a Tokio runtime context for IOCP
/// registration, so call this from inside `#[tokio::main]` (or from
/// any task on the runtime), never from a plain sync entrypoint.
///
/// # Errors
///
/// On Windows a second launcher binding the same pipe name receives
/// `ERROR_ACCESS_DENIED` (raw OS error 5). Any other error indicates
/// the pipe namespace itself is misconfigured.
#[cfg(target_os = "windows")]
pub fn bind_first_pipe_instance(pipe_name: &str) -> std::io::Result<NamedPipeServer> {
    let mut options = ServerOptions::new();
    // Fail instead of silently joining an existing pipe of this name.
    options.first_pipe_instance(true);
    options.create(pipe_name)
}
102
/// Spawn the named-pipe accept loop and return its `JoinHandle`.
///
/// The caller (main.rs) keeps the handle for the life of the
/// launcher; the loop keeps accepting until the Tokio runtime shuts
/// down, the task panics, or a pipe instance can no longer be
/// created.
///
/// `first` is the pre-bound first pipe instance (see
/// `bind_first_pipe_instance`), so a name collision is reported
/// synchronously before any children are spawned (Phase B.6).
///
/// Every accepted connection runs in its own task via
/// `handle_connection`. The replacement `NamedPipeServer` instance is
/// created BEFORE the handler is spawned, so a slow handler cannot
/// starve the next connect — the standard Win32 named-pipe pattern.
#[cfg(target_os = "windows")]
pub fn run_ipc_server(
    pipe_name: String,
    first: NamedPipeServer,
    ctx: ServerCtx,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let shared = Arc::new(ctx);
        crate::log(&format!("[ipc] server starting on {}", pipe_name));

        let mut listener = first;

        loop {
            // Block until a client connects to the pending instance.
            let outcome = listener.connect().await;

            match outcome {
                Err(connect_err) => {
                    // A failed (never-connected) instance must not be
                    // handed to a handler: it would fail its first
                    // read and waste a task. Replace it and go back
                    // to waiting. (reagent P1 + codex P1 PR #573
                    // round-1.)
                    crate::log(&format!(
                        "[ipc] connect failed: {} — recreating instance",
                        connect_err
                    ));
                    match ServerOptions::new().create(&pipe_name) {
                        Ok(fresh) => listener = fresh,
                        Err(create_err) => {
                            crate::log(&format!(
                                "[ipc] FATAL: failed to recreate pipe after connect error: {} (server stopping)",
                                create_err
                            ));
                            return;
                        }
                    }
                }
                Ok(()) => match ServerOptions::new().create(&pipe_name) {
                    Ok(fresh) => {
                        // Swap in the next instance first, then hand
                        // the connected one to its own handler task.
                        let accepted = std::mem::replace(&mut listener, fresh);
                        tokio::spawn(handle_connection(accepted, Arc::clone(&shared)));
                    }
                    Err(create_err) => {
                        crate::log(&format!(
                            "[ipc] FATAL: failed to create next pipe instance: {} (server stopping)",
                            create_err
                        ));
                        // Still serve the client that already
                        // connected, then stop accepting.
                        tokio::spawn(handle_connection(listener, Arc::clone(&shared)));
                        return;
                    }
                },
            }
        }
    })
}
174
175#[cfg(not(target_os = "windows"))]
176pub fn run_ipc_server(
177    _pipe_name: String,
178    _ctx: ServerCtx,
179) -> tokio::task::JoinHandle<()> {
180    // Non-Windows: pipe IPC isn't built yet. Phase 7 of the broader
181    // tear-off cross-platform work (separate spec) will add Unix
182    // domain socket support. For now return an immediately-finished
183    // task so the caller can hold a handle uniformly.
184    tokio::spawn(async {})
185}
186
/// Drive one connection: read newline-delimited JSON `Command`s and
/// answer with JSON `Event`s. The first message MUST be `Register`.
///
/// Per command:
///   1. Parse the line. A parse failure is answered with a
///      connection-private `Event::Error` (version 0) and the loop
///      continues.
///   2. Enforce per-connection invariants (register-first, no second
///      `Register`). Violations are answered privately; an error with
///      `fatal: true` closes the connection.
///   3. `GetEvents` is answered directly from the event log and never
///      reaches the reducer or the broadcast bus.
///   4. Everything else goes through `reducer::update` (Phase B.3).
///      The reducer is sync; the state mutex is held only while it
///      runs. Returned events get the launcher identity patched in,
///      are appended to the event log (except Snapshot / EventList /
///      Error), and are published on the broadcast bus.
///
/// Phase B.8 — a per-connection fanout task subscribes to
/// `ctx.events_tx` and writes every bus event to this connection's
/// pipe. Direct writes from this function are reserved for
/// "response-to-this-client-only" replies. (codex P1 PR #605.)
///
/// Teardown: on EOF or read error the handler clears the `HostPipe`
/// writer (host connections only), dispatches a synthetic `Goodbye`
/// and aborts the fanout task. An explicit `Goodbye` runs through the
/// reducer like any other command and then clears the writer and
/// aborts the fanout.
#[cfg(target_os = "windows")]
async fn handle_connection(stream: NamedPipeServer, ctx: Arc<ServerCtx>) {
    let (read_half, write_half) = tokio::io::split(stream);
    // CPD-2 — wrap the writer in the HostPipe-compatible shared shape
    // so the SAME writer is reachable by:
    //   1. The per-connection main loop (connection-private replies
    //      via `send_event_shared`).
    //   2. The per-connection fanout task (broadcast-bus events).
    //   3. (For the host connection only) `HostPipe`, after the host
    //      registers and the writer is installed via `set_writer`.
    // The original notes state the shared writer is mutex-guarded so
    // frames cannot interleave.
    let boxed: crate::host_pipe::BoxedWriter = Box::new(write_half);
    let writer: crate::host_pipe::SharedWriter = crate::host_pipe::make_shared_writer(boxed);
    let reader = BufReader::new(read_half);
    let mut lines = reader.lines();

    // Per-connection state the server (not the reducer) tracks: have
    // we seen an accepted Register yet? Reducer-level dedup is keyed
    // by PID across all connections; this is the per-connection
    // enforcement so a single connection can't Ping before Register.
    let mut registered_kind: Option<ClientKind> = None;
    let mut registered_pid: Option<u32> = None;
    // Connection ID is server-allocated (not state-allocated) and
    // exists only for log correlation; the reducer-allocated
    // client_id (returned in Registered) is the wire-visible one.
    let conn_id = NEXT_CONN_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    // Phase B.8 — fanout task: subscribe to the server-wide event bus
    // and write each event to THIS connection's pipe. The
    // subscription is taken before any command is processed so
    // events published by concurrent activity are not missed.
    // Aborted when the read loop returns (below).
    //
    // CPD-2 design (round 4): events are written DIRECTLY to this
    // connection's writer — NOT routed through `HostPipe`. HostPipe
    // exists for *commands* (saga-issued launcher → host actions,
    // which CPD-3 will wire). Events stay on the direct path because:
    //   1. Wire format compat — host's parser expects raw `Event`
    //      JSON, not `HostFrame::Event` envelopes (codex P1 round 3).
    //      CPD-3 will update host's parser AND swap the fanout to
    //      HostFrame envelopes together.
    //   2. No need for HostPipe's pending-buffer / 30s-timeout
    //      semantics on events: they are broadcast-driven, and stale
    //      events post-reconnect would be wrong anyway.
    //   3. Each fanout writes to its OWN writer — no global-writer
    //      race / stale-fanout issue.
    let fanout_handle = {
        let writer = Arc::clone(&writer);
        // Subscribing only needs a borrow of the context; the spawned
        // task itself captures just the receiver and the writer.
        let mut events_rx = ctx.events_tx.subscribe();
        tokio::spawn(async move {
            loop {
                match events_rx.recv().await {
                    Ok(event) => {
                        // Identity is patched at the publisher (before
                        // log + bus); no need to re-patch here.
                        // A failed write ends the fanout. The original
                        // notes attribute this to the pipe being
                        // closed and expect the read loop to see EOF
                        // and run the disconnect teardown.
                        if send_event_shared(&writer, event).await.is_err() {
                            return;
                        }
                    }
                    Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                        // Slow client missed `n` events. Phase D's
                        // GetSnapshot resync covers this case
                        // properly; for now the client has to
                        // reconnect to recover. Log so operators can
                        // see when this happens, then keep receiving.
                        crate::log(&format!(
                            "[ipc] conn_id={} lagged event bus, missed {} events",
                            conn_id, n
                        ));
                    }
                    Err(tokio::sync::broadcast::error::RecvError::Closed) => return,
                }
            }
        })
    };

    loop {
        let line = match lines.next_line().await {
            Ok(Some(l)) => l,
            ended => {
                // EOF and read errors share one teardown path; only
                // the log line differs.
                match &ended {
                    Err(e) => {
                        crate::log(&format!(
                            "[ipc] read error conn_id={}: {}",
                            conn_id, e
                        ));
                    }
                    Ok(_) => {
                        crate::log(&format!(
                            "[ipc] connection conn_id={} closed (kind={:?}, pid={:?})",
                            conn_id, registered_kind, registered_pid
                        ));
                    }
                }
                // CPD-2 — clear HostPipe writer if this was the host
                // connection so subsequent saga commands buffer
                // (up to 64) until the host reconnects (or fail at
                // the 30s timeout), per the original design notes.
                if matches!(registered_kind, Some(ClientKind::Host)) {
                    ctx.host_pipe.clear_writer().await;
                }
                // Phase E.1b — synthetic Goodbye on ungraceful
                // disconnect so the reducer marks the PID Exited;
                // otherwise reconnect-from-same-PID hits
                // AlreadyRegistered. (codex P1 #610.)
                dispatch_synthetic_goodbye(&ctx, conn_id, registered_pid).await;
                fanout_handle.abort();
                return;
            }
        };

        if line.trim().is_empty() {
            continue;
        }

        let cmd = match serde_json::from_str::<Command>(&line) {
            Ok(c) => c,
            Err(e) => {
                // Phase E.1b — parse errors are connection-private
                // (sent only to the offender, not broadcast, not
                // appended to the event log). Don't bump the global
                // event_version: other subscribers would see version
                // gaps and treat them as missed events. Use 0 as a
                // sentinel for "not part of the ordered stream."
                // (codex P2 #610.)
                let _ = send_event_shared(
                    &writer,
                    Event::Error {
                        code: ErrorCode::InvalidCommand,
                        message: format!("parse failed: {}", e),
                        fatal: false,
                        version: 0,
                    },
                )
                .await;
                continue;
            }
        };

        // Per-connection invariants: enforce here so the reducer
        // doesn't need to know about connection identity. Honor the
        // `fatal` bit — Ping-before-Register is non-fatal (clients
        // can recover by sending Register next), Goodbye-before-
        // Register is fatal. (reagent P1 + codex P1 PR #574 round-1.)
        if let Some(reply) = enforce_register_first(&cmd, &registered_kind).await {
            let close = matches!(&reply, Event::Error { fatal: true, .. });
            let _ = send_event_shared(&writer, reply).await;
            if close {
                // Nothing was registered on this connection, so there
                // is no HostPipe writer or reducer record to clean up.
                fanout_handle.abort();
                return;
            }
            continue;
        }
        if matches!(cmd, Command::Register { .. }) && registered_kind.is_some() {
            // Phase E.1b — connection-private error; same
            // version-sentinel rationale as the parse-error path
            // above. (codex P2 #610.)
            let _ = send_event_shared(
                &writer,
                Event::Error {
                    code: ErrorCode::AlreadyRegistered,
                    message: "Register sent twice on the same connection".into(),
                    fatal: false,
                    version: 0,
                },
            )
            .await;
            continue;
        }

        // Remember the Register payload before dispatch so the local
        // connection state can be committed if the reducer accepts
        // it. The reducer's PID-uniqueness check might reject the
        // Register; the events are re-checked for that case below.
        let pre_register = if let Command::Register { kind, pid, .. } = &cmd {
            Some((*kind, *pid))
        } else {
            None
        };

        // Phase D.3 — `GetEvents { since }` is handled here, not in
        // the reducer. The reducer is pure (no I/O); querying the
        // event log is a non-mutating read.
        //
        // Phase E.1b — the reply (`Event::EventList`) is sent
        // DIRECTLY to the requesting connection, NOT broadcast on the
        // shared bus. EventList is request/response, not a state
        // transition; broadcasting it would force every subscriber to
        // process foreign replay payloads. (codex P1 #610.)
        if let Command::GetEvents { since } = &cmd {
            // Phase E.1b — read the current version WITHOUT bumping
            // (codex P2 #610). The version carried by EventList is
            // the "as-of" point for the requester's next resync, not
            // a new state-transition marker. Bumping would create a
            // global gap that other subscribers see as missed events.
            let v = {
                let state = ctx.state.lock().await;
                state.event_version
            };
            let replay = ctx.event_log.events_since(*since);
            // Truncation is logged for visibility but is not an
            // error — it's a valid "subscriber missed events that
            // have already been evicted" signal; the subscriber's
            // resync logic handles it (re-fetch a fresh snapshot).
            if ctx.event_log.replay_truncated(*since) {
                crate::log(&format!(
                    "[ipc] conn_id={} GetEvents since={} truncated (oldest retained event > since+1)",
                    conn_id, since
                ));
            }
            let _ = send_event_shared(
                &writer,
                Event::EventList {
                    events: replay,
                    version: v,
                },
            )
            .await;
            continue;
        }

        // Decide up front whether this command ends the connection,
        // so `cmd` can be moved into the reducer instead of cloned.
        let goodbye = matches!(cmd, Command::Goodbye);

        // Dispatch through the reducer. Mutex held briefly — compute
        // the timestamp BEFORE acquiring so syscalls + string
        // formatting don't show up in lock-hold time. (gemini
        // MEDIUM @ server.rs:259, PR #574 round-1.)
        let now_rfc3339 = chrono::Utc::now().to_rfc3339();
        // Phase B.9.1 — monotonic ms from `launcher_start_ms`, used
        // by the WRR arm for per-window observability ages.
        let now_ms = launcher_start_ms();
        let events = {
            let mut state = ctx.state.lock().await;
            let rctx = reducer::Ctx {
                now_rfc3339,
                conn_id,
                registered_pid,
                now_ms,
            };
            reducer::update(&mut state, cmd, &rctx)
        };

        // If the reducer accepted the Register (no AlreadyRegistered
        // error in the output), commit the local connection state.
        if let Some((kind, pid)) = pre_register {
            let rejected = events
                .iter()
                .any(|e| matches!(e, Event::Error { code: ErrorCode::AlreadyRegistered, .. }));
            if !rejected {
                registered_kind = Some(kind);
                registered_pid = Some(pid);
                if kind == ClientKind::Host {
                    // CPD-2 — install the host's writer into HostPipe
                    // so saga-issued commands (CPD-3+) can be
                    // transmitted and, per the original notes, so any
                    // pending Command frames buffered during a prior
                    // host disconnect drain in FIFO order. Events do
                    // NOT switch paths: they keep flowing through the
                    // direct fanout above. The returned session id is
                    // only logged today.
                    let session = ctx
                        .host_pipe
                        .set_writer(Arc::clone(&writer))
                        .await;
                    crate::log(&format!(
                        "[ipc] conn_id={} host registered (session={}) — HostPipe writer installed",
                        conn_id, session
                    ));
                }
            }
        }

        // Phase B.8 — publish reducer events on the broadcast bus
        // instead of writing them directly to this connection. The
        // per-connection fanout tasks write them to their own pipes —
        // including this connection, which sees its own events back.
        // Drift events still log at the launcher level so operators
        // see them regardless of subscriber wiring. (codex P1 PR #605.)
        for event in events {
            if let Event::DriftDetected {
                kind,
                host_count,
                mirror_count,
                ..
            } = &event
            {
                crate::log(&format!(
                    "[ipc] DRIFT {:?}: host={} mirror={} (conn_id={})",
                    kind, host_count, mirror_count, conn_id
                ));
            }
            if let Event::HwndDriftDetected {
                kind,
                label,
                hwnd,
                detail,
                severity,
                ..
            } = &event
            {
                crate::log(&format!(
                    "[ipc] WRR-DRIFT [{:?}] {:?} label={:?} hwnd={:?}: {} (conn_id={})",
                    severity, kind, label, hwnd, detail, conn_id
                ));
            }
            // Phase E.1a (codex P2 #608) — patch sentinel identity
            // BEFORE appending to the log. The reducer emits
            // `Event::Registered { launcher_pid: 0, launcher_version: "" }`
            // because it doesn't know the launcher's identity; the
            // server fills it in here so `GetEvents` replay matches
            // the live broadcast.
            let event = patch_launcher_identity(event, &ctx);

            // Phase D.2 — append to the in-memory ring BEFORE
            // broadcasting so a GetEvents query that races a
            // just-published event sees consistent results.
            // Snapshot / EventList are NOT appended — they're
            // meta-events about the stream itself. Errors are also
            // skipped — they're per-client diagnostics, not state
            // transitions.
            if !matches!(event, Event::Snapshot { .. } | Event::EventList { .. } | Event::Error { .. }) {
                ctx.event_log.append(event.clone());
            }
            // Send may fail when no receivers exist (e.g., during
            // shutdown). That's fine — events are advisory in that
            // window.
            let _ = ctx.events_tx.send(event);
        }
        if goodbye {
            crate::log(&format!(
                "[ipc] goodbye from conn_id={} kind={:?} pid={:?}",
                conn_id, registered_kind, registered_pid
            ));
            // CPD-2 — clear HostPipe writer on graceful goodbye too
            // so a host that re-registers via a fresh connection can
            // re-install cleanly. Skipped for non-host connections.
            if matches!(registered_kind, Some(ClientKind::Host)) {
                ctx.host_pipe.clear_writer().await;
            }
            fanout_handle.abort();
            return;
        }
    }
}
580
/// Source of per-connection ids used purely for log correlation.
///
/// This is NOT the wire-visible client_id (the reducer allocates
/// that one). Ids start at 1 and are handed out as soon as a
/// connection handler starts, so even pre-Register failures can be
/// correlated in the logs.
static NEXT_CONN_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
586
/// Phase E.1b — run a `Goodbye` through the reducer on behalf of a
/// client that disconnected ungracefully (EOF / read error without
/// an explicit Goodbye).
///
/// Without this the reducer's process record stays Running and a
/// reconnect from the same live PID hits AlreadyRegistered.
/// (codex P1 #610.)
///
/// Does nothing when `registered_pid` is `None`, i.e. the connection
/// never completed a Register. Resulting events get the launcher
/// identity patched in, are appended to the event log (except
/// Snapshot / EventList / Error) and are published on the broadcast
/// bus.
#[cfg(target_os = "windows")]
async fn dispatch_synthetic_goodbye(
    ctx: &Arc<ServerCtx>,
    conn_id: u64,
    registered_pid: Option<u32>,
) {
    let pid = match registered_pid {
        Some(pid) => pid,
        // Never registered: the reducer has nothing to retire.
        None => return,
    };

    // Timestamps are taken before the lock so they don't add to
    // lock-hold time.
    let rctx = reducer::Ctx {
        now_rfc3339: chrono::Utc::now().to_rfc3339(),
        conn_id,
        registered_pid: Some(pid),
        now_ms: launcher_start_ms(),
    };

    let produced = {
        let mut guard = ctx.state.lock().await;
        reducer::update(&mut guard, Command::Goodbye, &rctx)
    };

    for raw in produced {
        let event = patch_launcher_identity(raw, ctx);
        match &event {
            // Meta-events and per-client errors stay out of the log.
            Event::Snapshot { .. } | Event::EventList { .. } | Event::Error { .. } => {}
            _ => ctx.event_log.append(event.clone()),
        }
        // A send failure (no receivers) is deliberately ignored.
        let _ = ctx.events_tx.send(event);
    }
}
624
/// Phase B.9.1 — monotonic milliseconds elapsed since the first call
/// to this function (that first call seeds the epoch and therefore
/// returns a value at or near 0).
///
/// Feeds `reducer::Ctx::now_ms` for WRR observability timestamps. It
/// is built on `Instant` rather than `chrono::Utc::now()` because
/// the WRR arm wants elapsed time, not wall clock, and wall-clock
/// adjustments (NTP, DST) must not show up as drift.
fn launcher_start_ms() -> u64 {
    use std::sync::OnceLock;
    use std::time::Instant;

    // Process-wide epoch, initialised exactly once on first use.
    static EPOCH: OnceLock<Instant> = OnceLock::new();

    // `as_millis` yields u128; the narrowing cast is kept as-is.
    EPOCH.get_or_init(Instant::now).elapsed().as_millis() as u64
}
636
/// Enforce the "first message must be Register" invariant. Returns
/// `Some(Event::Error)` if the command violates the contract; the
/// caller sends it and closes the connection only when the error's
/// `fatal` bit is set (non-fatal violations leave the connection open).
640#[cfg(target_os = "windows")]
641async fn enforce_register_first(
642    cmd: &Command,
643    registered_kind: &Option<ClientKind>,
644) -> Option<Event> {
645    // F.7 cleanup audit: prior signature accepted `ctx: &Arc<ServerCtx>`
646    // for symmetry with neighboring helpers, but no body site read it.
647    // Dropped to silence the unused-variable warning without an allow.
648    if registered_kind.is_some() {
649        return None;
650    }
651    let (msg, fatal) = match cmd {
652        Command::Register { .. } => return None,
653        Command::Ping { .. } => ("Ping before Register".to_string(), false),
654        Command::Goodbye => ("Goodbye before Register".to_string(), true),
655        Command::ReportWindowOpened { .. } => {
656            ("ReportWindowOpened before Register".to_string(), true)
657        }
658        Command::ReportWindowClosed { .. } => {
659            ("ReportWindowClosed before Register".to_string(), true)
660        }
661        Command::ReportPoolWindowAdded { .. } => {
662            ("ReportPoolWindowAdded before Register".to_string(), true)
663        }
664        Command::ReportPoolWindowRemoved { .. } => {
665            ("ReportPoolWindowRemoved before Register".to_string(), true)
666        }
667        // Phase F.5 — host-only report; gate matches the other pool-
668        // mirror reports above.
669        Command::ReportPoolWindowPromoted { .. } => {
670            ("ReportPoolWindowPromoted before Register".to_string(), true)
671        }
672        // Phase F.5 — `SpawnPoolWindow` is a launcher→host direction
673        // command. Sent to the launcher pipe before Register: same
674        // soft-error treatment as the srv-pipe misroutes (the client
675        // can recover by registering or routing correctly).
676        Command::SpawnPoolWindow { .. } => {
677            ("SpawnPoolWindow is a launcher→host command; sent to launcher pipe by mistake".to_string(), false)
678        }
679        // Phase F.6 — host-only reports; same fatal-before-Register
680        // treatment as the other window-mirror reports above.
681        Command::ReportPanesReaped { .. } => {
682            ("ReportPanesReaped before Register".to_string(), true)
683        }
684        Command::ReportPoolDrainDecision { .. } => {
685            ("ReportPoolDrainDecision before Register".to_string(), true)
686        }
687        // Phase F.6 — launcher→host direction commands. Same
688        // soft-error treatment as `SpawnPoolWindow` above.
689        Command::ReapPanes { .. } => {
690            ("ReapPanes is a launcher→host command; sent to launcher pipe by mistake".to_string(), false)
691        }
692        Command::DrainPoolIfLast { .. } => {
693            ("DrainPoolIfLast is a launcher→host command; sent to launcher pipe by mistake".to_string(), false)
694        }
695        // Phase CPD-1 — host-only saga-action-failed report. Same
696        // fatal-before-Register treatment as the other Report*
697        // commands above.
698        Command::ReportSagaActionFailed { .. } => {
699            ("ReportSagaActionFailed before Register".to_string(), true)
700        }
701        Command::ReportHostCounts { .. } => {
702            ("ReportHostCounts before Register".to_string(), true)
703        }
704        Command::ReportHostPoolCount { .. } => {
705            ("ReportHostPoolCount before Register".to_string(), true)
706        }
707        Command::ReportBackendWindowIdRegistered { .. } => {
708            (
709                "ReportBackendWindowIdRegistered before Register".to_string(),
710                true,
711            )
712        }
713        Command::ReportBackendWindowIdUnregistered { .. } => {
714            (
715                "ReportBackendWindowIdUnregistered before Register".to_string(),
716                true,
717            )
718        }
719        // Phase B.9.1 (WRR) — host-only Win32 reality reports.
720        Command::ReportHwndOpened { .. } => {
721            ("ReportHwndOpened before Register".to_string(), true)
722        }
723        Command::ReportHwndDestroyed { .. } => {
724            ("ReportHwndDestroyed before Register".to_string(), true)
725        }
726        Command::ReportHwndVisibilityChanged { .. } => {
727            (
728                "ReportHwndVisibilityChanged before Register".to_string(),
729                true,
730            )
731        }
732        Command::ReportHwndForegroundChanged { .. } => {
733            (
734                "ReportHwndForegroundChanged before Register".to_string(),
735                true,
736            )
737        }
738        Command::ReportHwndIconicChanged { .. } => {
739            ("ReportHwndIconicChanged before Register".to_string(), true)
740        }
741        Command::ReportHwndPositionChanged { .. } => {
742            (
743                "ReportHwndPositionChanged before Register".to_string(),
744                true,
745            )
746        }
747        Command::ReportMonitorTopologyChanged { .. } => {
748            (
749                "ReportMonitorTopologyChanged before Register".to_string(),
750                true,
751            )
752        }
753        // Phase D.1 — GetSnapshot before Register is non-fatal: any
754        // sane diagnostic client can fix it by retrying with Register
755        // first. (Same Ping-before-Register precedent — soft error.)
756        Command::GetSnapshot => ("GetSnapshot before Register".to_string(), false),
757        // Phase D.3 — GetEvents before Register: same non-fatal
758        // semantics as GetSnapshot.
759        Command::GetEvents { .. } => ("GetEvents before Register".to_string(), false),
760        // Phase E.1b — GetSrvSnapshot is a srv-pipe command; if a
761        // client sends it to the launcher pipe by mistake, soft
762        // error: not the launcher's command.
763        Command::GetSrvSnapshot => (
764            "GetSrvSnapshot is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
765            false,
766        ),
767        // Phase E.2 — srv-pipe commands sent to launcher pipe by
768        // mistake. Soft error — clients can recover by routing to
769        // the right pipe.
770        Command::CreateWorkspace { .. } => (
771            "CreateWorkspace is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
772            false,
773        ),
774        Command::DeleteWorkspace { .. } => (
775            "DeleteWorkspace is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
776            false,
777        ),
778        // Phase E.2b — Tab arms are also srv-pipe commands.
779        Command::CreateTab { .. } => (
780            "CreateTab is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
781            false,
782        ),
783        Command::DeleteTab { .. } => (
784            "DeleteTab is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
785            false,
786        ),
787        Command::SetActiveTab { .. } => (
788            "SetActiveTab is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
789            false,
790        ),
791        Command::ReorderTab { .. } => (
792            "ReorderTab is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
793            false,
794        ),
795        // Phase E.5 — window↔workspace mapping commands are srv-pipe.
796        Command::CreateWindow { .. } => (
797            "CreateWindow is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
798            false,
799        ),
800        Command::CloseWindowInternal { .. } => (
801            "CloseWindowInternal is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
802            false,
803        ),
804        Command::SwitchWorkspace { .. } => (
805            "SwitchWorkspace is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
806            false,
807        ),
808        // Phase E.5.3 — atomic single-step domain commands are srv-pipe.
809        Command::ReorderTabsBulk { .. } => (
810            "ReorderTabsBulk is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
811            false,
812        ),
813        Command::RenameWorkspace { .. } => (
814            "RenameWorkspace is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
815            false,
816        ),
817        Command::RenameTab { .. } => (
818            "RenameTab is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
819            false,
820        ),
821        Command::UpdateWorkspaceMeta { .. } => (
822            "UpdateWorkspaceMeta is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
823            false,
824        ),
825        Command::UpdateTabMeta { .. } => (
826            "UpdateTabMeta is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
827            false,
828        ),
829        Command::UpdateBlockMeta { .. } => (
830            "UpdateBlockMeta is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
831            false,
832        ),
833        // Phase E.3 — Block arms are also srv-pipe commands.
834        Command::CreateBlock { .. } => (
835            "CreateBlock is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
836            false,
837        ),
838        Command::DeleteBlock { .. } => (
839            "DeleteBlock is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
840            false,
841        ),
842        // Phase E.5.5 — saga-driven move commands are srv-pipe.
843        Command::MoveTab { .. } => (
844            "MoveTab is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
845            false,
846        ),
847        Command::MoveBlock { .. } => (
848            "MoveBlock is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
849            false,
850        ),
851        // Phase E.4 (Option A) — layout focused/magnified setters are srv-pipe.
852        Command::SetFocusedNode { .. } => (
853            "SetFocusedNode is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
854            false,
855        ),
856        Command::SetMagnifiedNode { .. } => (
857            "SetMagnifiedNode is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
858            false,
859        ),
860        // Phase E.4.B — all layout-tree commands are srv-pipe only.
861        Command::LayoutInsertNode { .. }
862        | Command::LayoutInsertNodeAtIndex { .. }
863        | Command::LayoutDeleteNode { .. }
864        | Command::LayoutMoveNode { .. }
865        | Command::LayoutSwapNodes { .. }
866        | Command::LayoutResizeNodes { .. }
867        | Command::LayoutReplaceNode { .. }
868        | Command::LayoutSplitHorizontal { .. }
869        | Command::LayoutSplitVertical { .. }
870        | Command::LayoutClear { .. }
871        | Command::LayoutSetTree { .. } => (
872            "Layout command is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
873            false,
874        ),
875        Command::UpdateWindowMeta { .. } => (
876            "UpdateWindowMeta is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
877            false,
878        ),
879    };
880    // Phase E.1b — connection-private error; sentinel version=0
881    // (codex P2 #610). See parse-error path for rationale.
882    let v = 0;
883    Some(Event::Error {
884        code: ErrorCode::NotRegistered,
885        message: msg,
886        fatal,
887        version: v,
888    })
889}
890
891/// Patch `launcher_pid` + `launcher_version` into `Event::Registered`.
892/// The reducer leaves these as sentinels (it can't read env without
893/// breaking determinism) — the server fills them in here, just
894/// before serializing to the wire.
895fn patch_launcher_identity(event: Event, ctx: &Arc<ServerCtx>) -> Event {
896    if let Event::Registered {
897        client_id, version, ..
898    } = event
899    {
900        Event::Registered {
901            client_id,
902            launcher_pid: ctx.launcher_pid,
903            launcher_version: ctx.launcher_version.clone(),
904            version,
905        }
906    } else {
907        event
908    }
909}
910
/// Write one `Event` to the connection as a single JSON line
/// terminated by `\n`. The write and flush happen while holding the
/// per-connection writer mutex, so the line goes out atomically with
/// respect to other senders on the same writer.
///
/// Returns `Err` if the connection died mid-write; a serialization
/// failure is reported as `std::io::ErrorKind::InvalidData`.
///
/// CPD-2 — the writer parameter was generalized from
/// `Arc<Mutex<WriteHalf<NamedPipeServer>>>` to
/// `crate::host_pipe::SharedWriter` so the launcher's IPC server and
/// the host_pipe wrapper share one writer-handle representation.
/// The wire shape is unchanged: a non-host client sees a raw `Event`
/// JSON line (legacy schema); a host client sees a `HostFrame::Event`
/// envelope only when the frame goes through `HostPipe::send_event`.
/// Connection-private error replies in this file still emit raw
/// `Event` JSON to stay backwards compatible with host versions that
/// have not adopted the envelope yet — CPD-1 lands the host-side
/// schema migration.
#[cfg(target_os = "windows")]
async fn send_event_shared(
    writer: &crate::host_pipe::SharedWriter,
    event: Event,
) -> std::io::Result<()> {
    // Build the complete frame before taking the lock so the mutex
    // only covers the actual pipe I/O.
    let mut frame = match serde_json::to_vec(&event) {
        Ok(json) => json,
        Err(err) => {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                err.to_string(),
            ));
        }
    };
    frame.push(b'\n');

    let mut sink = writer.lock().await;
    sink.write_all(&frame).await?;
    sink.flush().await?;
    Ok(())
}