agentmux_launcher\reducer/connection.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Connection-lifecycle reducer handlers. Extracted from
5//! reducer/mod.rs in task #182 PR-E for navigability.
6//!
7//! Covers: register, goodbye, get_snapshot, plus the
8//! enforce_host_only gate and host_is_running predicate that
9//! window/saga handlers also call (via super::).
10
11use agentmux_common::ipc::{ClientKind, ErrorCode, Event};
12
13use crate::reducer::Ctx;
14use crate::state::{LifecyclePhase, ProcessRecord, ProcessState, State};
15
16/// Phase B.4 — gate window-mirror reports to Host clients only. The
17/// host is the only source of truth about its own window lifecycle;
18/// allowing Renderer/Srv/Tool clients to mutate the mirror would let
19/// any registered process spoof open/close traffic and break the
20/// host-authoritative model. (codex P1 PR #576 round-1.)
21///
22/// Returns `Some(Error)` if the calling connection is not a Host;
23/// `None` if the call is allowed to proceed. Looks up the kind by
24/// PID rather than threading it through `Ctx` because `processes`
25/// is already the canonical source — single source of truth, no
26/// extra plumbing.
27pub(super) fn enforce_host_only(state: &mut State, ctx: &Ctx, op: &'static str) -> Option<Event> {
28 let kind = ctx
29 .registered_pid
30 .and_then(|pid| state.processes.get(&pid).map(|r| r.kind));
31 if kind == Some(ClientKind::Host) {
32 return None;
33 }
34 let v = state.bump_version();
35 Some(Event::Error {
36 code: ErrorCode::NotRegistered,
37 message: format!(
38 "{} is Host-only; caller kind={:?}",
39 op, kind
40 ),
41 fatal: false,
42 version: v,
43 })
44}
45
46
47/// Phase B.9.3 — does state.processes contain a Host in the
48/// `Running` lifecycle? Used by the OrphanInstance transition
49/// check; without this guard, a stale Exited record would fire
50/// HostShouldQuit on every benign close. Tool-only sessions
51/// (`agentmux.exe --diag`) also correctly skip the saga because
52/// they never register a Host.
53pub(super) fn host_is_running(state: &State) -> bool {
54 use crate::state::ProcessState;
55 state.processes.values().any(|r| {
56 r.kind == ClientKind::Host && matches!(r.state, ProcessState::Running)
57 })
58}
59
60/// Phase D.1 — clone the reducer's canonical state into a `Snapshot`
61/// event. Read-only; doesn't mutate state except for bumping
62/// `event_version` so the snapshot's version is monotonically
63/// distinct from prior events (subscribers applying snapshot + delta
64/// events know the snapshot is "as-of" this version).
65///
66/// Sorted-vec serialization (rather than HashMap-as-JSON-object) for:
67/// 1. Deterministic ordering across snapshots (idempotent diffs in
68/// operator output, easier test assertions).
69/// 2. Wire compatibility with `Vec<(K, V)>` decoders that don't
70/// require canonical-string-keyed JSON objects.
71pub(super) fn handle_get_snapshot(state: &mut State) -> Vec<Event> {
72 let v = state.bump_version();
73
74 let mut windows: Vec<agentmux_common::ipc::WindowSnapshot> = state
75 .windows
76 .values()
77 .map(|w| agentmux_common::ipc::WindowSnapshot {
78 label: w.label.clone(),
79 kind: w.kind,
80 parent_label: w.parent_label.clone(),
81 hwnd: w.hwnd,
82 visible: w.visible,
83 iconic: w.iconic,
84 last_rect: w.last_rect,
85 foregrounded_since_open: w.foregrounded_since_open,
86 })
87 .collect();
88 windows.sort_by(|a, b| a.label.cmp(&b.label));
89
90 let mut pool: Vec<String> = state.pool.iter().cloned().collect();
91 pool.sort();
92
93 let mut instance_registry: Vec<(String, u32)> =
94 state.instance_registry.iter().map(|(k, v)| (k.clone(), *v)).collect();
95 instance_registry.sort_by(|a, b| a.0.cmp(&b.0));
96
97 let mut backend_window_ids: Vec<(String, String)> = state
98 .backend_window_ids
99 .iter()
100 .map(|(k, v)| (k.clone(), v.clone()))
101 .collect();
102 backend_window_ids.sort_by(|a, b| a.0.cmp(&b.0));
103
104 vec![Event::Snapshot {
105 version: v,
106 lifecycle: state.lifecycle,
107 windows,
108 pool,
109 instance_registry,
110 backend_window_ids,
111 monitors: state.monitors.clone(),
112 }]
113}
114
115pub(super) fn handle_register(
116 state: &mut State,
117 ctx: &Ctx,
118 kind: ClientKind,
119 pid: u32,
120 version: String,
121) -> Vec<Event> {
122 let mut out = Vec::with_capacity(3);
123
124 // Cross-connection invariant: only ONE live ProcessRecord per
125 // PID. We DO allow re-registration if the existing record is
126 // Exited — the OS recycles PIDs over a long-running launcher,
127 // so a new process can legitimately end up with a PID that was
128 // previously held by a process that has cleanly Goodbye'd.
129 // Without this carve-out, the process map would accumulate dead
130 // records and the launcher would reject increasingly many real
131 // registrations. (gemini MEDIUM PR #574 round-1.)
132 let existing_state = state.processes.get(&pid).map(|r| r.state);
133 if let Some(existing_state) = existing_state {
134 if !matches!(existing_state, ProcessState::Exited { .. }) {
135 let v = state.bump_version();
136 out.push(Event::Error {
137 code: ErrorCode::AlreadyRegistered,
138 message: format!(
139 "pid {} already in process registry (state={:?})",
140 pid, existing_state
141 ),
142 fatal: true,
143 version: v,
144 });
145 return out;
146 }
147 // Else: fall through. The insert below replaces the Exited
148 // record with the new live one — same entry, fresh state.
149 }
150
151 let record = ProcessRecord {
152 pid,
153 kind,
154 state: ProcessState::Running,
155 spawned_at: ctx.now_rfc3339.clone(),
156 version: version.clone(),
157 };
158 state.processes.insert(pid, record);
159
160 let spawned_v = state.bump_version();
161 out.push(Event::ProcessSpawned {
162 pid,
163 kind,
164 client_version: version,
165 version: spawned_v,
166 });
167
168 // Lifecycle: Starting → Running when the first Host registers.
169 // Subsequent Host re-registers (after a host crash + restart in
170 // some future world) won't double-fire because we'd already be
171 // in Running. Other client kinds (Renderer, Srv, Tool) don't
172 // drive the transition.
173 if state.lifecycle == LifecyclePhase::Starting && kind == ClientKind::Host {
174 let from = state.lifecycle;
175 state.lifecycle = LifecyclePhase::Running;
176 let v = state.bump_version();
177 out.push(Event::LifecyclePhaseChanged {
178 from,
179 to: LifecyclePhase::Running,
180 version: v,
181 });
182 }
183
184 let registered_v = state.bump_version();
185 let client_id = state.alloc_client_id();
186 out.push(Event::Registered {
187 client_id,
188 // launcher_pid + launcher_version are filled in by the
189 // server before broadcast — they don't belong in the pure
190 // reducer (env reads). We use a sentinel here; the server
191 // patches these before sending.
192 launcher_pid: 0,
193 launcher_version: String::new(),
194 version: registered_v,
195 });
196
197 out
198}
199
200pub(super) fn handle_goodbye(state: &mut State, pid: u32) -> Vec<Event> {
201 if pid == 0 {
202 // No pid known for this connection (B.3 limitation). Just
203 // emit a synthetic ProcessExited with pid=0 to signal the
204 // graceful close; the server will log + close.
205 let v = state.bump_version();
206 return vec![Event::ProcessExited {
207 pid: 0,
208 code: 0,
209 version: v,
210 }];
211 }
212 if let Some(record) = state.processes.get_mut(&pid) {
213 record.state = ProcessState::Exited { code: 0 };
214 }
215 let v = state.bump_version();
216 vec![Event::ProcessExited {
217 pid,
218 code: 0,
219 version: v,
220 }]
221}