agentmux_launcher\ipc/server.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Named-pipe IPC server — accept loop + per-connection handler.
5//
6// Phase B.3: every Command goes through the pure reducer
7// (`crate::reducer::update`) which mutates the shared State and
8// returns a Vec<Event>. The handler then writes those events back
9// over the connection. State is held inside Arc<Mutex<State>> and
10// the mutex is acquired only for the duration of the reducer call —
11// never across an await.
12//
13// What this commit does NOT do:
14// * Per-subscriber broadcast routing (B.4 splits replies vs broadcasts;
15// today every event goes back over the originating connection).
16// * Server-initiated events (no spontaneous emissions yet — only
17// reducer outputs).
18// * Persisted client_id (still per-launcher-run).
19//
20// Connection lifecycle: each accepted pipe instance handles one
21// client connection end-to-end. When the client drops, the per-
22// connection task ends and the accept loop continues with a fresh
23// instance.
24
25use std::sync::Arc;
26
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
28use tokio::sync::Mutex;
29
30#[cfg(target_os = "windows")]
31use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions};
32
33use agentmux_common::ipc::{ClientKind, Command, ErrorCode, Event};
34
35use crate::host_pipe::HostPipe;
36use crate::reducer;
37use crate::state::State;
38
39/// State the IPC server shares across connections. Carries the
40/// launcher's identity (for patching into Registered events the
41/// reducer emits with sentinel values) plus the canonical State.
42#[derive(Debug)]
43pub struct ServerCtx {
44 pub launcher_pid: u32,
45 pub launcher_version: String,
46 /// Canonical state owned by the server. Mutex held only during
47 /// reducer dispatch — sub-millisecond.
48 ///
49 /// Phase E.1a — moved from `Mutex<State>` to `Arc<Mutex<State>>`
50 /// so the saga coordinator can share access. The coordinator
51 /// needs `bump_version` when emitting saga lifecycle events and
52 /// will (in E.5) inspect state during saga decisions. Sharing
53 /// via `Arc` keeps the existing single-writer-mutex discipline.
54 pub state: std::sync::Arc<Mutex<State>>,
55 /// Phase B.8 — broadcast bus for reducer-emitted events. Every
56 /// reducer event from `reducer::update` is published here; each
57 /// connection subscribes and writes received events to its own
58 /// pipe. Lets observability clients (`--diag wrr`, future Tools)
59 /// see cross-process activity, not just replies to their own
60 /// commands. Per-connection direct sends (Error replies for
61 /// parse failures, register-first violations) bypass the bus —
62 /// they're response-to-this-client-only by intent. (codex P1
63 /// PR #605.)
64 pub events_tx: tokio::sync::broadcast::Sender<Event>,
65 /// Phase D.2 — event log: in-memory ring of recent reducer
66 /// events (replay source for D.3's `GetEvents`) + disk
67 /// persistence stream for crash forensics. Server appends to
68 /// the in-memory ring synchronously after each reducer
69 /// dispatch; the disk writer is a separate task spawned in
70 /// main.rs that subscribes to the broadcast bus.
71 pub event_log: std::sync::Arc<crate::event_log::EventLog>,
72 /// CPD-2 — launcher → host pipe wrapper. The per-connection
73 /// handler hands the host's writer half to `HostPipe::set_writer`
74 /// once the connecting client registers as `ClientKind::Host`,
75 /// and clears it on disconnect. The host's per-connection event
76 /// fanout task routes events through `HostPipe::send_event`
77 /// instead of `send_event` direct (so frames carry the
78 /// `HostFrame` envelope and traverse the pending-buffer path
79 /// when the host reconnects).
80 pub host_pipe: std::sync::Arc<HostPipe>,
81}
82
83/// Bind the first named-pipe instance synchronously.
84///
85/// Phase B.6: the bind is the single-instance signal. Splitting it
86/// out of `run_ipc_server` lets the caller (main.rs) detect a
87/// collision BEFORE spawning srv/host and surface a user-visible
88/// error ("AgentMux is already running"). `ServerOptions::create`
89/// requires a Tokio runtime context for IOCP registration, so this
90/// must be called from inside `#[tokio::main]` (or any task on the
91/// runtime) — not from a plain sync entrypoint.
92///
93/// On Windows, a second launcher hitting the same pipe gets
94/// `ERROR_ACCESS_DENIED` (raw OS error 5); other errors mean the
95/// pipe namespace itself is misconfigured.
96#[cfg(target_os = "windows")]
97pub fn bind_first_pipe_instance(pipe_name: &str) -> std::io::Result<NamedPipeServer> {
98 ServerOptions::new()
99 .first_pipe_instance(true)
100 .create(pipe_name)
101}
102
103/// Run the named-pipe IPC server until cancelled (or task panics).
104///
105/// Returns a JoinHandle the caller (main.rs) holds for the life of
106/// the launcher. The server keeps accepting until the launcher's
107/// Tokio runtime shuts down.
108///
109/// The first pipe instance is passed in pre-bound by the caller (see
110/// `bind_first_pipe_instance`) so a collision can be surfaced
111/// synchronously before any children are spawned (Phase B.6).
112///
113/// Each accepted connection becomes a new tokio task running
114/// `handle_connection`. The accept loop creates a fresh
115/// `NamedPipeServer` instance for the next client BEFORE spawning
116/// the handler — without this, a slow handler could starve the next
117/// connect. Standard Win32 named-pipe pattern.
118#[cfg(target_os = "windows")]
119pub fn run_ipc_server(
120 pipe_name: String,
121 first: NamedPipeServer,
122 ctx: ServerCtx,
123) -> tokio::task::JoinHandle<()> {
124 tokio::spawn(async move {
125 let ctx = Arc::new(ctx);
126 crate::log(&format!("[ipc] server starting on {}", pipe_name));
127
128 let mut current = first;
129
130 loop {
131 // Wait for a client to connect to the existing instance.
132 // On error: log + recreate the instance + retry. Without
133 // the explicit `continue`, the failed (un-connected) pipe
134 // instance below would be moved into `accepted` and
135 // spawned in a handler that immediately fails to read,
136 // wasting a per-connection task slot. (reagent P1 + codex
137 // P1 PR #573 round-1.)
138 if let Err(e) = current.connect().await {
139 crate::log(&format!("[ipc] connect failed: {} — recreating instance", e));
140 current = match ServerOptions::new().create(&pipe_name) {
141 Ok(s) => s,
142 Err(create_err) => {
143 crate::log(&format!(
144 "[ipc] FATAL: failed to recreate pipe after connect error: {} (server stopping)",
145 create_err
146 ));
147 return;
148 }
149 };
150 continue;
151 }
152
153 // Hand the accepted instance to a handler task, then
154 // create the NEXT server instance so the next client
155 // doesn't have to wait for the handler to finish.
156 let accepted = current;
157 current = match ServerOptions::new().create(&pipe_name) {
158 Ok(s) => s,
159 Err(e) => {
160 crate::log(&format!(
161 "[ipc] FATAL: failed to create next pipe instance: {} (server stopping)",
162 e
163 ));
164 // Drain the accepted client, then bail.
165 tokio::spawn(handle_connection(accepted, Arc::clone(&ctx)));
166 return;
167 }
168 };
169
170 tokio::spawn(handle_connection(accepted, Arc::clone(&ctx)));
171 }
172 })
173}
174
175#[cfg(not(target_os = "windows"))]
176pub fn run_ipc_server(
177 _pipe_name: String,
178 _ctx: ServerCtx,
179) -> tokio::task::JoinHandle<()> {
180 // Non-Windows: pipe IPC isn't built yet. Phase 7 of the broader
181 // tear-off cross-platform work (separate spec) will add Unix
182 // domain socket support. For now return an immediately-finished
183 // task so the caller can hold a handle uniformly.
184 tokio::spawn(async {})
185}
186
187/// Drive one connection: read newline-delimited JSON Commands,
188/// write back JSON Events. First message MUST be `Register`.
189///
190/// Phase B.3: every Command goes through `reducer::update`. The
191/// reducer is sync; we hold the state mutex only while it runs.
192/// Events come back from the reducer as Vec<Event>; we patch sentinel
193/// fields (Registered.launcher_pid / launcher_version — the reducer
194/// can't read those) and publish them on the broadcast bus.
195///
196/// Phase B.8 — events from the reducer flow through the broadcast
197/// bus (`ctx.events_tx`); a per-connection fanout task subscribes
198/// and writes events to this connection's pipe. Per-connection
199/// direct writes are reserved for "response-to-this-client-only"
200/// errors (parse failure, register-first violation). (codex P1
201/// PR #605.)
202#[cfg(target_os = "windows")]
203async fn handle_connection(stream: NamedPipeServer, ctx: Arc<ServerCtx>) {
204 let (read_half, write_half) = tokio::io::split(stream);
205 // CPD-2 — wrap the writer in the HostPipe-compatible
206 // `Arc<Mutex<Box<dyn AsyncWrite + Unpin + Send>>>` shape so the
207 // SAME writer is reachable by:
208 // 1. The per-connection main loop (connection-private error
209 // replies via `send_event_shared`).
210 // 2. The per-connection fanout task (broadcast-bus events).
211 // 3. (For the host connection only) `HostPipe::send_command` /
212 // `HostPipe::send_event` after the host registers.
213 // The Mutex serializes writes, so frames can't interleave.
214 let boxed: crate::host_pipe::BoxedWriter = Box::new(write_half);
215 let writer: crate::host_pipe::SharedWriter = crate::host_pipe::make_shared_writer(boxed);
216 let reader = BufReader::new(read_half);
217 let mut lines = reader.lines();
218
219 // Per-connection state the server (not reducer) tracks: have we
220 // seen a Register yet? Reducer-level dedup is keyed by PID across
221 // all connections; this is the per-connection enforcement so a
222 // single connection can't send Ping before Register.
223 let mut registered_kind: Option<ClientKind> = None;
224 let mut registered_pid: Option<u32> = None;
225 // Connection ID is server-allocated (not state-allocated) and
226 // exists only for log correlation; the reducer-allocated
227 // client_id (returned in Registered) is the wire-visible one.
228 let conn_id = NEXT_CONN_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
229
230 // Phase B.8 — fanout task: subscribe to the server-wide event
231 // bus and write each event to THIS connection's pipe. Started
232 // before any commands are processed so a registered client can
233 // start receiving events from concurrent activity immediately.
234 // Aborted when the connection's read loop returns (below).
235 //
236 // CPD-2 design (round 4): events are written DIRECTLY to this
237 // connection's per-connection writer — NOT routed through
238 // `HostPipe`. HostPipe exists for *commands* (saga-issued
239 // launcher → host actions, which CPD-3 will wire). Events stay
240 // on the existing direct-write path because:
241 // 1. Wire format compat — host's parser expects raw `Event`
242 // JSON, not `HostFrame::Event` envelopes (codex P1 round 3).
243 // CPD-3 will update host's parser AND swap the fanout to
244 // HostFrame envelopes together.
245 // 2. No need for HostPipe's pending-buffer / 30s-timeout
246 // semantics on events: events are broadcast-driven, every
247 // subscriber gets them, and stale events post-reconnect
248 // would be wrong anyway.
249 // 3. Each per-connection fanout writes to its OWN writer —
250 // no global-writer race / stale-fanout issue (which is
251 // what `host_session_id` would have guarded against, but
252 // isn't needed when fanouts are connection-local).
253 let fanout_handle = {
254 let writer = Arc::clone(&writer);
255 let ctx = Arc::clone(&ctx);
256 let mut events_rx = ctx.events_tx.subscribe();
257 tokio::spawn(async move {
258 loop {
259 match events_rx.recv().await {
260 Ok(event) => {
261 // Identity is patched at the publisher (before
262 // log + bus). No need to re-patch here.
263 // Errors here mean the pipe is closed; the
264 // read loop will detect EOF on the next
265 // iteration. Swallow + continue to drain
266 // any remaining buffered events so we don't
267 // accidentally hold onto channel slots.
268 if send_event_shared(&writer, event).await.is_err() {
269 return;
270 }
271 }
272 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
273 // Slow client missed `n` events. Phase D's
274 // GetSnapshot resync covers this case
275 // properly; for now the client has to
276 // reconnect to recover. Log so operators
277 // can see when this happens.
278 crate::log(&format!(
279 "[ipc] conn_id={} lagged event bus, missed {} events",
280 conn_id, n
281 ));
282 }
283 Err(tokio::sync::broadcast::error::RecvError::Closed) => return,
284 }
285 }
286 })
287 };
288
289 loop {
290 let line = match lines.next_line().await {
291 Ok(Some(l)) => l,
292 Ok(None) => {
293 crate::log(&format!(
294 "[ipc] connection conn_id={} closed (kind={:?}, pid={:?})",
295 conn_id, registered_kind, registered_pid
296 ));
297 // CPD-2 — clear HostPipe writer if this was the host
298 // connection so subsequent saga commands buffer
299 // (up to 64) until the host reconnects (or fail at
300 // the 30s timeout). Idempotent if not the host.
301 if matches!(registered_kind, Some(ClientKind::Host)) {
302 ctx.host_pipe.clear_writer().await;
303 }
304 // Phase E.1b — synthetic Goodbye on ungraceful
305 // disconnect so the reducer marks the PID Exited;
306 // otherwise reconnect-from-same-PID hits
307 // AlreadyRegistered. (codex P1 #610.)
308 dispatch_synthetic_goodbye(&ctx, conn_id, registered_pid).await;
309 fanout_handle.abort();
310 return;
311 }
312 Err(e) => {
313 crate::log(&format!(
314 "[ipc] read error conn_id={}: {}",
315 conn_id, e
316 ));
317 if matches!(registered_kind, Some(ClientKind::Host)) {
318 ctx.host_pipe.clear_writer().await;
319 }
320 dispatch_synthetic_goodbye(&ctx, conn_id, registered_pid).await;
321 fanout_handle.abort();
322 return;
323 }
324 };
325
326 if line.trim().is_empty() {
327 continue;
328 }
329
330 let cmd = match serde_json::from_str::<Command>(&line) {
331 Ok(c) => c,
332 Err(e) => {
333 // Phase E.1b — parse errors are connection-private
334 // (sent only to the offender, not broadcast, not
335 // appended to the event log). Don't bump the global
336 // event_version: other subscribers would see version
337 // gaps and treat them as missed events. Use 0 as a
338 // sentinel for "not part of the ordered stream."
339 // (codex P2 #610.)
340 let _ = send_event_shared(
341 &writer,
342 Event::Error {
343 code: ErrorCode::InvalidCommand,
344 message: format!("parse failed: {}", e),
345 fatal: false,
346 version: 0,
347 },
348 )
349 .await;
350 continue;
351 }
352 };
353
354 // Per-connection invariants: enforce here so reducer doesn't
355 // need to know about connection identity. Honor the `fatal`
356 // bit — Ping-before-Register is non-fatal (clients can
357 // recover by sending Register next), Goodbye-before-Register
358 // is fatal (can't recover from a closed-by-them connection).
359 // (reagent P1 + codex P1 PR #574 round-1.)
360 if let Some(reply) = enforce_register_first(&cmd, ®istered_kind).await {
361 let close = matches!(&reply, Event::Error { fatal: true, .. });
362 let _ = send_event_shared(&writer, reply).await;
363 if close {
364 fanout_handle.abort();
365 return;
366 }
367 continue;
368 }
369 if let Command::Register { .. } = &cmd {
370 if registered_kind.is_some() {
371 // Phase E.1b — connection-private error; same
372 // version-sentinel rationale as parse-error path
373 // above. (codex P2 #610.)
374 let _ = send_event_shared(
375 &writer,
376 Event::Error {
377 code: ErrorCode::AlreadyRegistered,
378 message: "Register sent twice on the same connection".into(),
379 fatal: false,
380 version: 0,
381 },
382 )
383 .await;
384 continue;
385 }
386 }
387
388 // Track local registration before dispatch so we can update
389 // our per-connection state if the reducer accepts it. The
390 // reducer's PID-uniqueness check might reject the Register;
391 // we re-check the events for that case below.
392 let pre_register = if let Command::Register { kind, pid, .. } = &cmd {
393 Some((*kind, *pid))
394 } else {
395 None
396 };
397
398 // Phase D.3 — `GetEvents { since }` is handled here, not in
399 // the reducer. The reducer is pure (no I/O); querying the
400 // event log is a non-mutating read against the in-memory
401 // ring + disk fallback.
402 //
403 // Phase E.1b — the reply (`Event::EventList`) is sent
404 // DIRECTLY to the requesting connection, NOT broadcast on
405 // the shared bus. EventList is request/response, not a
406 // state transition; broadcasting it would force every
407 // subscriber to process foreign replay payloads
408 // (potentially treating them as their own catch-up data,
409 // duplicating state application). (codex P1 #610.)
410 if let Command::GetEvents { since } = &cmd {
411 // Phase E.1b — read the current version WITHOUT bumping
412 // (codex P2 #610). EventList is connection-private; the
413 // version it carries is the "as-of" point for the
414 // requester's next resync, not a new state-transition
415 // marker. Bumping would create a global gap that other
416 // subscribers see as missed events.
417 let v = {
418 let state = ctx.state.lock().await;
419 state.event_version
420 };
421 let replay = ctx.event_log.events_since(*since);
422 // Don't log truncation as an error — it's a valid
423 // "subscriber missed events that have already been
424 // evicted" signal; the subscriber's resync logic
425 // handles it (re-fetch a fresh snapshot).
426 if ctx.event_log.replay_truncated(*since) {
427 crate::log(&format!(
428 "[ipc] conn_id={} GetEvents since={} truncated (oldest retained event > since+1)",
429 conn_id, since
430 ));
431 }
432 let _ = send_event_shared(
433 &writer,
434 Event::EventList {
435 events: replay,
436 version: v,
437 },
438 )
439 .await;
440 continue;
441 }
442
443 // Dispatch through the reducer. Mutex held briefly — compute
444 // the timestamp BEFORE acquiring so syscalls + string
445 // formatting don't show up in lock-hold time. (gemini
446 // MEDIUM @ server.rs:259, PR #574 round-1.)
447 let now_rfc3339 = chrono::Utc::now().to_rfc3339();
448 // Phase B.9.1 — monotonic ms since launcher start. Used by
449 // the WRR arm for per-window observability ages.
450 // `LAUNCHER_START_INSTANT` is a once-init `Instant`; the
451 // first request seeds it, subsequent ones read its delta.
452 let now_ms = launcher_start_ms();
453 let events = {
454 let mut state = ctx.state.lock().await;
455 let rctx = reducer::Ctx {
456 now_rfc3339,
457 conn_id,
458 registered_pid,
459 now_ms,
460 };
461 reducer::update(&mut state, cmd.clone(), &rctx)
462 };
463
464 // If the reducer accepted the Register (no AlreadyRegistered
465 // error in the output), commit the local connection state.
466 if let Some((kind, pid)) = pre_register {
467 let rejected = events
468 .iter()
469 .any(|e| matches!(e, Event::Error { code: ErrorCode::AlreadyRegistered, .. }));
470 if !rejected {
471 registered_kind = Some(kind);
472 registered_pid = Some(pid);
473 // CPD-2 — host registration: install this connection's
474 // writer into the launcher's HostPipe wrapper and flip
475 // the fanout task to route through it. Subsequent
476 // events for this connection traverse
477 // `HostPipe::send_event` (HostFrame::Event envelope +
478 // pending-buffer-on-disconnect semantics) instead of
479 // the legacy direct write. Drains any frames buffered
480 // since the prior host disconnect (FIFO).
481 if kind == ClientKind::Host {
482 // Install the host's writer half into HostPipe so
483 // saga-issued commands (CPD-3+) can be transmitted
484 // and so any pending Command frames buffered during
485 // a prior host disconnect drain in FIFO order.
486 // Returns a session_id we don't need today (events
487 // bypass HostPipe — see fanout task above), but the
488 // counter is in place for future code that does.
489 let session = ctx
490 .host_pipe
491 .set_writer(std::sync::Arc::clone(&writer))
492 .await;
493 crate::log(&format!(
494 "[ipc] conn_id={} host registered (session={}) — HostPipe writer installed",
495 conn_id, session
496 ));
497 }
498 }
499 }
500
501 // Phase B.8 — publish reducer events on the broadcast bus
502 // instead of writing them directly to this connection. The
503 // per-connection fanout task (spawned above) subscribes and
504 // writes them to its own pipe — including this connection,
505 // which sees its own events back. Drift events still log at
506 // the launcher level so operators see them regardless of
507 // subscriber wiring. (codex P1 PR #605.)
508 let goodbye = matches!(cmd, Command::Goodbye);
509 for event in events {
510 if let Event::DriftDetected {
511 kind,
512 host_count,
513 mirror_count,
514 ..
515 } = &event
516 {
517 crate::log(&format!(
518 "[ipc] DRIFT {:?}: host={} mirror={} (conn_id={})",
519 kind, host_count, mirror_count, conn_id
520 ));
521 }
522 if let Event::HwndDriftDetected {
523 kind,
524 label,
525 hwnd,
526 detail,
527 severity,
528 ..
529 } = &event
530 {
531 crate::log(&format!(
532 "[ipc] WRR-DRIFT [{:?}] {:?} label={:?} hwnd={:?}: {} (conn_id={})",
533 severity, kind, label, hwnd, detail, conn_id
534 ));
535 }
536 // Phase E.1a (codex P2 #608) — patch sentinel identity
537 // BEFORE appending to the log. Reducer emits
538 // `Event::Registered { launcher_pid: 0, launcher_version: "" }`
539 // because it doesn't know the launcher's identity; the
540 // server fills it in. Pre-fix, the patch happened only at
541 // per-connection write, so `GetEvents` replay returned
542 // stored sentinels, inconsistent with live broadcast.
543 let event = patch_launcher_identity(event, &ctx);
544
545 // Phase D.2 — append to the in-memory ring BEFORE
546 // broadcasting so a connection's GetEvents query that
547 // races a just-published event sees consistent
548 // results. Disk persistence (separate task) is best-
549 // effort and may lag. Snapshot / EventList variants
550 // are NOT appended — they're meta-events about the
551 // event stream itself; including them would create
552 // recursive replay (an EventList containing EventLists
553 // is meaningless). Errors are also skipped — they're
554 // per-client diagnostics, not state transitions.
555 if !matches!(event, Event::Snapshot { .. } | Event::EventList { .. } | Event::Error { .. }) {
556 ctx.event_log.append(event.clone());
557 }
558 // Send may fail when no receivers exist (e.g., during
559 // shutdown). That's fine — events are advisory in that
560 // window and the per-connection fanout tasks own retry
561 // semantics via subscribe().
562 let _ = ctx.events_tx.send(event);
563 }
564 if goodbye {
565 crate::log(&format!(
566 "[ipc] goodbye from conn_id={} kind={:?} pid={:?}",
567 conn_id, registered_kind, registered_pid
568 ));
569 // CPD-2 — clear HostPipe writer on graceful goodbye too
570 // so a host that re-registers via a fresh connection can
571 // re-install cleanly. Idempotent if not host.
572 if matches!(registered_kind, Some(ClientKind::Host)) {
573 ctx.host_pipe.clear_writer().await;
574 }
575 fanout_handle.abort();
576 return;
577 }
578 }
579}
580
581/// Per-connection counter for log-correlation IDs (NOT the wire
582/// client_id — that comes from the reducer). Allocated even for
583/// pre-Register failures so log lines can be correlated.
584static NEXT_CONN_ID: std::sync::atomic::AtomicU64 =
585 std::sync::atomic::AtomicU64::new(1);
586
587/// Phase E.1b — synthetic Goodbye dispatch for ungraceful disconnects
588/// (EOF / read error before the client sent an explicit Goodbye).
589/// Without this, the reducer's process record stays Running and a
590/// reconnect from the same live PID hits AlreadyRegistered.
591/// (codex P1 #610.)
592#[cfg(target_os = "windows")]
593async fn dispatch_synthetic_goodbye(
594 ctx: &Arc<ServerCtx>,
595 conn_id: u64,
596 registered_pid: Option<u32>,
597) {
598 let Some(pid) = registered_pid else {
599 return;
600 };
601 let now_rfc3339 = chrono::Utc::now().to_rfc3339();
602 let now_ms = launcher_start_ms();
603 let events = {
604 let mut state = ctx.state.lock().await;
605 let rctx = reducer::Ctx {
606 now_rfc3339,
607 conn_id,
608 registered_pid: Some(pid),
609 now_ms,
610 };
611 reducer::update(&mut state, Command::Goodbye, &rctx)
612 };
613 for event in events {
614 let event = patch_launcher_identity(event, ctx);
615 if !matches!(
616 event,
617 Event::Snapshot { .. } | Event::EventList { .. } | Event::Error { .. }
618 ) {
619 ctx.event_log.append(event.clone());
620 }
621 let _ = ctx.events_tx.send(event);
622 }
623}
624
625/// Phase B.9.1 — milliseconds since the launcher's IPC server
626/// started (first call seeds the epoch). Used as the monotonic
627/// clock for WRR observability timestamps in `reducer::Ctx::now_ms`.
628/// Distinct from `chrono::Utc::now()` because the WRR arm wants
629/// elapsed time, not wall clock — and we don't want clock-skew
630/// jitter (NTP adjustment, DST) showing up as drift.
631fn launcher_start_ms() -> u64 {
632 static START: std::sync::OnceLock<std::time::Instant> = std::sync::OnceLock::new();
633 let start = START.get_or_init(std::time::Instant::now);
634 start.elapsed().as_millis() as u64
635}
636
637/// Enforce the "first message must be Register" invariant. Returns
638/// `Some(Event::Error)` if the command violates the contract; the
639/// caller sends it and closes the connection.
640#[cfg(target_os = "windows")]
641async fn enforce_register_first(
642 cmd: &Command,
643 registered_kind: &Option<ClientKind>,
644) -> Option<Event> {
645 // F.7 cleanup audit: prior signature accepted `ctx: &Arc<ServerCtx>`
646 // for symmetry with neighboring helpers, but no body site read it.
647 // Dropped to silence the unused-variable warning without an allow.
648 if registered_kind.is_some() {
649 return None;
650 }
651 let (msg, fatal) = match cmd {
652 Command::Register { .. } => return None,
653 Command::Ping { .. } => ("Ping before Register".to_string(), false),
654 Command::Goodbye => ("Goodbye before Register".to_string(), true),
655 Command::ReportWindowOpened { .. } => {
656 ("ReportWindowOpened before Register".to_string(), true)
657 }
658 Command::ReportWindowClosed { .. } => {
659 ("ReportWindowClosed before Register".to_string(), true)
660 }
661 Command::ReportPoolWindowAdded { .. } => {
662 ("ReportPoolWindowAdded before Register".to_string(), true)
663 }
664 Command::ReportPoolWindowRemoved { .. } => {
665 ("ReportPoolWindowRemoved before Register".to_string(), true)
666 }
667 // Phase F.5 — host-only report; gate matches the other pool-
668 // mirror reports above.
669 Command::ReportPoolWindowPromoted { .. } => {
670 ("ReportPoolWindowPromoted before Register".to_string(), true)
671 }
672 // Phase F.5 — `SpawnPoolWindow` is a launcher→host direction
673 // command. Sent to the launcher pipe before Register: same
674 // soft-error treatment as the srv-pipe misroutes (the client
675 // can recover by registering or routing correctly).
676 Command::SpawnPoolWindow { .. } => {
677 ("SpawnPoolWindow is a launcher→host command; sent to launcher pipe by mistake".to_string(), false)
678 }
679 // Phase F.6 — host-only reports; same fatal-before-Register
680 // treatment as the other window-mirror reports above.
681 Command::ReportPanesReaped { .. } => {
682 ("ReportPanesReaped before Register".to_string(), true)
683 }
684 Command::ReportPoolDrainDecision { .. } => {
685 ("ReportPoolDrainDecision before Register".to_string(), true)
686 }
687 // Phase F.6 — launcher→host direction commands. Same
688 // soft-error treatment as `SpawnPoolWindow` above.
689 Command::ReapPanes { .. } => {
690 ("ReapPanes is a launcher→host command; sent to launcher pipe by mistake".to_string(), false)
691 }
692 Command::DrainPoolIfLast { .. } => {
693 ("DrainPoolIfLast is a launcher→host command; sent to launcher pipe by mistake".to_string(), false)
694 }
695 // Phase CPD-1 — host-only saga-action-failed report. Same
696 // fatal-before-Register treatment as the other Report*
697 // commands above.
698 Command::ReportSagaActionFailed { .. } => {
699 ("ReportSagaActionFailed before Register".to_string(), true)
700 }
701 Command::ReportHostCounts { .. } => {
702 ("ReportHostCounts before Register".to_string(), true)
703 }
704 Command::ReportHostPoolCount { .. } => {
705 ("ReportHostPoolCount before Register".to_string(), true)
706 }
707 Command::ReportBackendWindowIdRegistered { .. } => {
708 (
709 "ReportBackendWindowIdRegistered before Register".to_string(),
710 true,
711 )
712 }
713 Command::ReportBackendWindowIdUnregistered { .. } => {
714 (
715 "ReportBackendWindowIdUnregistered before Register".to_string(),
716 true,
717 )
718 }
719 // Phase B.9.1 (WRR) — host-only Win32 reality reports.
720 Command::ReportHwndOpened { .. } => {
721 ("ReportHwndOpened before Register".to_string(), true)
722 }
723 Command::ReportHwndDestroyed { .. } => {
724 ("ReportHwndDestroyed before Register".to_string(), true)
725 }
726 Command::ReportHwndVisibilityChanged { .. } => {
727 (
728 "ReportHwndVisibilityChanged before Register".to_string(),
729 true,
730 )
731 }
732 Command::ReportHwndForegroundChanged { .. } => {
733 (
734 "ReportHwndForegroundChanged before Register".to_string(),
735 true,
736 )
737 }
738 Command::ReportHwndIconicChanged { .. } => {
739 ("ReportHwndIconicChanged before Register".to_string(), true)
740 }
741 Command::ReportHwndPositionChanged { .. } => {
742 (
743 "ReportHwndPositionChanged before Register".to_string(),
744 true,
745 )
746 }
747 Command::ReportMonitorTopologyChanged { .. } => {
748 (
749 "ReportMonitorTopologyChanged before Register".to_string(),
750 true,
751 )
752 }
753 // Phase D.1 — GetSnapshot before Register is non-fatal: any
754 // sane diagnostic client can fix it by retrying with Register
755 // first. (Same Ping-before-Register precedent — soft error.)
756 Command::GetSnapshot => ("GetSnapshot before Register".to_string(), false),
757 // Phase D.3 — GetEvents before Register: same non-fatal
758 // semantics as GetSnapshot.
759 Command::GetEvents { .. } => ("GetEvents before Register".to_string(), false),
760 // Phase E.1b — GetSrvSnapshot is a srv-pipe command; if a
761 // client sends it to the launcher pipe by mistake, soft
762 // error: not the launcher's command.
763 Command::GetSrvSnapshot => (
764 "GetSrvSnapshot is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
765 false,
766 ),
767 // Phase E.2 — srv-pipe commands sent to launcher pipe by
768 // mistake. Soft error — clients can recover by routing to
769 // the right pipe.
770 Command::CreateWorkspace { .. } => (
771 "CreateWorkspace is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
772 false,
773 ),
774 Command::DeleteWorkspace { .. } => (
775 "DeleteWorkspace is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
776 false,
777 ),
778 // Phase E.2b — Tab arms are also srv-pipe commands.
779 Command::CreateTab { .. } => (
780 "CreateTab is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
781 false,
782 ),
783 Command::DeleteTab { .. } => (
784 "DeleteTab is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
785 false,
786 ),
787 Command::SetActiveTab { .. } => (
788 "SetActiveTab is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
789 false,
790 ),
791 Command::ReorderTab { .. } => (
792 "ReorderTab is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
793 false,
794 ),
795 // Phase E.5 — window↔workspace mapping commands are srv-pipe.
796 Command::CreateWindow { .. } => (
797 "CreateWindow is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
798 false,
799 ),
800 Command::CloseWindowInternal { .. } => (
801 "CloseWindowInternal is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
802 false,
803 ),
804 Command::SwitchWorkspace { .. } => (
805 "SwitchWorkspace is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
806 false,
807 ),
808 // Phase E.5.3 — atomic single-step domain commands are srv-pipe.
809 Command::ReorderTabsBulk { .. } => (
810 "ReorderTabsBulk is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
811 false,
812 ),
813 Command::RenameWorkspace { .. } => (
814 "RenameWorkspace is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
815 false,
816 ),
817 Command::RenameTab { .. } => (
818 "RenameTab is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
819 false,
820 ),
821 Command::UpdateWorkspaceMeta { .. } => (
822 "UpdateWorkspaceMeta is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
823 false,
824 ),
825 Command::UpdateTabMeta { .. } => (
826 "UpdateTabMeta is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
827 false,
828 ),
829 Command::UpdateBlockMeta { .. } => (
830 "UpdateBlockMeta is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
831 false,
832 ),
833 // Phase E.3 — Block arms are also srv-pipe commands.
834 Command::CreateBlock { .. } => (
835 "CreateBlock is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
836 false,
837 ),
838 Command::DeleteBlock { .. } => (
839 "DeleteBlock is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
840 false,
841 ),
842 // Phase E.5.5 — saga-driven move commands are srv-pipe.
843 Command::MoveTab { .. } => (
844 "MoveTab is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
845 false,
846 ),
847 Command::MoveBlock { .. } => (
848 "MoveBlock is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
849 false,
850 ),
851 // Phase E.4 (Option A) — layout focused/magnified setters are srv-pipe.
852 Command::SetFocusedNode { .. } => (
853 "SetFocusedNode is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
854 false,
855 ),
856 Command::SetMagnifiedNode { .. } => (
857 "SetMagnifiedNode is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
858 false,
859 ),
860 // Phase E.4.B — all layout-tree commands are srv-pipe only.
861 Command::LayoutInsertNode { .. }
862 | Command::LayoutInsertNodeAtIndex { .. }
863 | Command::LayoutDeleteNode { .. }
864 | Command::LayoutMoveNode { .. }
865 | Command::LayoutSwapNodes { .. }
866 | Command::LayoutResizeNodes { .. }
867 | Command::LayoutReplaceNode { .. }
868 | Command::LayoutSplitHorizontal { .. }
869 | Command::LayoutSplitVertical { .. }
870 | Command::LayoutClear { .. }
871 | Command::LayoutSetTree { .. } => (
872 "Layout command is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
873 false,
874 ),
875 Command::UpdateWindowMeta { .. } => (
876 "UpdateWindowMeta is a srv-pipe command; sent to launcher pipe by mistake".to_string(),
877 false,
878 ),
879 };
880 // Phase E.1b — connection-private error; sentinel version=0
881 // (codex P2 #610). See parse-error path for rationale.
882 let v = 0;
883 Some(Event::Error {
884 code: ErrorCode::NotRegistered,
885 message: msg,
886 fatal,
887 version: v,
888 })
889}
890
891/// Patch `launcher_pid` + `launcher_version` into `Event::Registered`.
892/// The reducer leaves these as sentinels (it can't read env without
893/// breaking determinism) — the server fills them in here, just
894/// before serializing to the wire.
895fn patch_launcher_identity(event: Event, ctx: &Arc<ServerCtx>) -> Event {
896 if let Event::Registered {
897 client_id, version, ..
898 } = event
899 {
900 Event::Registered {
901 client_id,
902 launcher_pid: ctx.launcher_pid,
903 launcher_version: ctx.launcher_version.clone(),
904 version,
905 }
906 } else {
907 event
908 }
909}
910
/// Serialize an `Event` as one JSON line (terminated by `\n`) and write
/// it while holding the per-connection writer lock. Writers that share
/// this lock therefore cannot interleave partial lines. Serialization
/// happens before the lock is acquired, so the lock is held only for the
/// write and flush.
///
/// CPD-2 — generalized from `Arc<Mutex<WriteHalf<NamedPipeServer>>>`
/// to `crate::host_pipe::SharedWriter` so the launcher's IPC server and
/// the host_pipe wrapper share one writer-handle representation.
/// The wire shape is unchanged. A non-host client sees a raw `Event` JSON
/// line (legacy schema). A host client sees a `HostFrame::Event` envelope
/// only when the frame goes through `HostPipe::send_event`.
/// Connection-private error replies in this file still emit raw `Event`
/// JSON to preserve backwards compatibility with existing host versions
/// that haven't adopted the envelope yet — CPD-1 lands the host-side
/// schema migration.
///
/// # Errors
///
/// * `ErrorKind::InvalidData` if `event` fails to serialize. The
///   underlying `serde_json::Error` is preserved as the error's source.
/// * Any I/O error from `write_all` / `flush`, e.g. the connection died
///   mid-write.
#[cfg(target_os = "windows")]
async fn send_event_shared(
    writer: &crate::host_pipe::SharedWriter,
    event: Event,
) -> std::io::Result<()> {
    // Pass the serde error itself (not its string form) so callers can
    // still inspect `source()` or downcast it; the Display text is the same.
    let mut line = serde_json::to_vec(&event)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
    line.push(b'\n');

    // Hold the writer lock across both the write and the flush so the
    // whole line goes out before another sender on this connection writes.
    let mut w = writer.lock().await;
    w.write_all(&line).await?;
    w.flush().await?;
    Ok(())
}
938}