agentmux_launcher\reducer/
connection.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Connection-lifecycle reducer handlers. Extracted from
5//! reducer/mod.rs in task #182 PR-E for navigability.
6//!
7//! Covers: register, goodbye, get_snapshot, plus the
8//! enforce_host_only gate and host_is_running predicate that
9//! window/saga handlers also call (via super::).
10
11use agentmux_common::ipc::{ClientKind, ErrorCode, Event};
12
13use crate::reducer::Ctx;
14use crate::state::{LifecyclePhase, ProcessRecord, ProcessState, State};
15
16/// Phase B.4 — gate window-mirror reports to Host clients only. The
17/// host is the only source of truth about its own window lifecycle;
18/// allowing Renderer/Srv/Tool clients to mutate the mirror would let
19/// any registered process spoof open/close traffic and break the
20/// host-authoritative model. (codex P1 PR #576 round-1.)
21///
22/// Returns `Some(Error)` if the calling connection is not a Host;
23/// `None` if the call is allowed to proceed. Looks up the kind by
24/// PID rather than threading it through `Ctx` because `processes`
25/// is already the canonical source — single source of truth, no
26/// extra plumbing.
27pub(super) fn enforce_host_only(state: &mut State, ctx: &Ctx, op: &'static str) -> Option<Event> {
28    let kind = ctx
29        .registered_pid
30        .and_then(|pid| state.processes.get(&pid).map(|r| r.kind));
31    if kind == Some(ClientKind::Host) {
32        return None;
33    }
34    let v = state.bump_version();
35    Some(Event::Error {
36        code: ErrorCode::NotRegistered,
37        message: format!(
38            "{} is Host-only; caller kind={:?}",
39            op, kind
40        ),
41        fatal: false,
42        version: v,
43    })
44}
45
46
47/// Phase B.9.3 — does state.processes contain a Host in the
48/// `Running` lifecycle? Used by the OrphanInstance transition
49/// check; without this guard, a stale Exited record would fire
50/// HostShouldQuit on every benign close. Tool-only sessions
51/// (`agentmux.exe --diag`) also correctly skip the saga because
52/// they never register a Host.
53pub(super) fn host_is_running(state: &State) -> bool {
54    use crate::state::ProcessState;
55    state.processes.values().any(|r| {
56        r.kind == ClientKind::Host && matches!(r.state, ProcessState::Running)
57    })
58}
59
60/// Phase D.1 — clone the reducer's canonical state into a `Snapshot`
61/// event. Read-only; doesn't mutate state except for bumping
62/// `event_version` so the snapshot's version is monotonically
63/// distinct from prior events (subscribers applying snapshot + delta
64/// events know the snapshot is "as-of" this version).
65///
66/// Sorted-vec serialization (rather than HashMap-as-JSON-object) for:
67/// 1. Deterministic ordering across snapshots (idempotent diffs in
68///    operator output, easier test assertions).
69/// 2. Wire compatibility with `Vec<(K, V)>` decoders that don't
70///    require canonical-string-keyed JSON objects.
71pub(super) fn handle_get_snapshot(state: &mut State) -> Vec<Event> {
72    let v = state.bump_version();
73
74    let mut windows: Vec<agentmux_common::ipc::WindowSnapshot> = state
75        .windows
76        .values()
77        .map(|w| agentmux_common::ipc::WindowSnapshot {
78            label: w.label.clone(),
79            kind: w.kind,
80            parent_label: w.parent_label.clone(),
81            hwnd: w.hwnd,
82            visible: w.visible,
83            iconic: w.iconic,
84            last_rect: w.last_rect,
85            foregrounded_since_open: w.foregrounded_since_open,
86        })
87        .collect();
88    windows.sort_by(|a, b| a.label.cmp(&b.label));
89
90    let mut pool: Vec<String> = state.pool.iter().cloned().collect();
91    pool.sort();
92
93    let mut instance_registry: Vec<(String, u32)> =
94        state.instance_registry.iter().map(|(k, v)| (k.clone(), *v)).collect();
95    instance_registry.sort_by(|a, b| a.0.cmp(&b.0));
96
97    let mut backend_window_ids: Vec<(String, String)> = state
98        .backend_window_ids
99        .iter()
100        .map(|(k, v)| (k.clone(), v.clone()))
101        .collect();
102    backend_window_ids.sort_by(|a, b| a.0.cmp(&b.0));
103
104    vec![Event::Snapshot {
105        version: v,
106        lifecycle: state.lifecycle,
107        windows,
108        pool,
109        instance_registry,
110        backend_window_ids,
111        monitors: state.monitors.clone(),
112    }]
113}
114
115pub(super) fn handle_register(
116    state: &mut State,
117    ctx: &Ctx,
118    kind: ClientKind,
119    pid: u32,
120    version: String,
121) -> Vec<Event> {
122    let mut out = Vec::with_capacity(3);
123
124    // Cross-connection invariant: only ONE live ProcessRecord per
125    // PID. We DO allow re-registration if the existing record is
126    // Exited — the OS recycles PIDs over a long-running launcher,
127    // so a new process can legitimately end up with a PID that was
128    // previously held by a process that has cleanly Goodbye'd.
129    // Without this carve-out, the process map would accumulate dead
130    // records and the launcher would reject increasingly many real
131    // registrations. (gemini MEDIUM PR #574 round-1.)
132    let existing_state = state.processes.get(&pid).map(|r| r.state);
133    if let Some(existing_state) = existing_state {
134        if !matches!(existing_state, ProcessState::Exited { .. }) {
135            let v = state.bump_version();
136            out.push(Event::Error {
137                code: ErrorCode::AlreadyRegistered,
138                message: format!(
139                    "pid {} already in process registry (state={:?})",
140                    pid, existing_state
141                ),
142                fatal: true,
143                version: v,
144            });
145            return out;
146        }
147        // Else: fall through. The insert below replaces the Exited
148        // record with the new live one — same entry, fresh state.
149    }
150
151    let record = ProcessRecord {
152        pid,
153        kind,
154        state: ProcessState::Running,
155        spawned_at: ctx.now_rfc3339.clone(),
156        version: version.clone(),
157    };
158    state.processes.insert(pid, record);
159
160    let spawned_v = state.bump_version();
161    out.push(Event::ProcessSpawned {
162        pid,
163        kind,
164        client_version: version,
165        version: spawned_v,
166    });
167
168    // Lifecycle: Starting → Running when the first Host registers.
169    // Subsequent Host re-registers (after a host crash + restart in
170    // some future world) won't double-fire because we'd already be
171    // in Running. Other client kinds (Renderer, Srv, Tool) don't
172    // drive the transition.
173    if state.lifecycle == LifecyclePhase::Starting && kind == ClientKind::Host {
174        let from = state.lifecycle;
175        state.lifecycle = LifecyclePhase::Running;
176        let v = state.bump_version();
177        out.push(Event::LifecyclePhaseChanged {
178            from,
179            to: LifecyclePhase::Running,
180            version: v,
181        });
182    }
183
184    let registered_v = state.bump_version();
185    let client_id = state.alloc_client_id();
186    out.push(Event::Registered {
187        client_id,
188        // launcher_pid + launcher_version are filled in by the
189        // server before broadcast — they don't belong in the pure
190        // reducer (env reads). We use a sentinel here; the server
191        // patches these before sending.
192        launcher_pid: 0,
193        launcher_version: String::new(),
194        version: registered_v,
195    });
196
197    out
198}
199
200pub(super) fn handle_goodbye(state: &mut State, pid: u32) -> Vec<Event> {
201    if pid == 0 {
202        // No pid known for this connection (B.3 limitation). Just
203        // emit a synthetic ProcessExited with pid=0 to signal the
204        // graceful close; the server will log + close.
205        let v = state.bump_version();
206        return vec![Event::ProcessExited {
207            pid: 0,
208            code: 0,
209            version: v,
210        }];
211    }
212    if let Some(record) = state.processes.get_mut(&pid) {
213        record.state = ProcessState::Exited { code: 0 };
214    }
215    let v = state.bump_version();
216    vec![Event::ProcessExited {
217        pid,
218        code: 0,
219        version: v,
220    }]
221}