agentmux_launcher/
srv_spawner.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Spawn the agentmux-srv backend sidecar from the LAUNCHER (Phase B.1).
5//
6// Today's flow (pre-Phase-B): launcher spawns host; host spawns srv;
7// host owns a Job Object J1 wrapping srv; renderers inherit launcher's
8// J0 via host. Result: when host crashes, J1 closes and srv dies.
9//
10// Phase B.1 flow: launcher spawns BOTH srv and host as siblings,
11// assigns BOTH to launcher's J0 directly. Host's J1 on srv is
12// deleted (it would actively defeat "srv survives host crash" if
13// kept). Renderers continue to inherit J0 via host as before.
14//
15// The launcher passes srv's endpoints to the host via env vars
16// (AGENTMUX_BACKEND_WS, _WEB, _PID). Host detects them and skips
17// its own spawn_backend path (which is preserved for `task dev`
18// fallback where launcher isn't in the loop).
19//
20// Adapted from `agentmux-cef/src/sidecar.rs::spawn_backend` —
21// kept structurally similar so divergence is auditable. Key
22// differences:
23//   * Tokio process API (not std::process)
24//   * CREATE_SUSPENDED + assign-to-job + ResumeThread (PR #570 race
25//     pattern, applied to srv too)
26//   * No separate Job Object on srv; launcher's J0 covers it
27//   * Auth key generated here, not consumed from a shared AppState
28//   * stderr ESTART parsing returns the result via tokio mpsc
29
30use std::path::{Path, PathBuf};
31use std::process::Stdio;
32
33use tokio::io::{AsyncBufReadExt, BufReader};
34use tokio::process::{Child, Command};
35use tokio::sync::mpsc;
36
37use crate::data_dir::DataPaths;
38
/// Everything the launcher knows about srv once srv has signalled
/// ready. The launcher keeps this around and uses it to populate the
/// env vars the host reads.
#[derive(Clone, Debug)]
pub struct SrvSpawnResult {
    /// Process id of the spawned srv child.
    pub pid: u32,
    /// Value of the `ws:` token on the ESTART line (empty if absent).
    pub ws_endpoint: String,
    /// Value of the `web:` token on the ESTART line (empty if absent).
    pub web_endpoint: String,
    /// Value of the `instance:` token on the ESTART line (empty if
    /// absent).
    pub instance_id: String,
    /// Per-run auth key generated by `spawn_srv` and handed to srv via
    /// `AGENTMUX_AUTH_KEY`.
    pub auth_key: String,
    /// RFC3339 UTC timestamp taken by `spawn_srv` right after the
    /// process has been spawned (and, on Windows, resumed) — i.e.
    /// before the ESTART line is read, not at the moment it arrives.
    /// Carried for `--diag` / debug observability; not currently
    /// propagated into env. F.7 cleanup audit: kept with `allow` plus
    /// this note rather than deleted — a future `--diag srv` printer
    /// is the natural reader.
    #[allow(dead_code)]
    pub started_at: String,
}
56
/// Ways `spawn_srv` can fail. Kept fine-grained so the launcher can
/// log a diagnostic that points at the step that actually broke.
#[derive(Debug)]
pub enum SrvSpawnError {
    /// No srv binary exists at any candidate path; the payload lists
    /// the paths that were searched.
    BinaryNotFound(String),
    /// The process could not be spawned, or the spawned child was
    /// missing its PID or stderr handle.
    SpawnFailed(String),
    /// Assigning srv to the launcher's Job Object failed (Windows).
    JobAssignFailed(String),
    /// Resuming the suspended srv main thread failed (Windows).
    ResumeFailed(String),
    /// srv did not print `AGENTMUXSRV-ESTART` before the timeout.
    EstartTimeout,
    /// The stderr reader finished before an ESTART line was seen.
    EstartChannelClosed,
}
68
69impl std::fmt::Display for SrvSpawnError {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        match self {
72            Self::BinaryNotFound(s) => write!(f, "srv binary not found: {}", s),
73            Self::SpawnFailed(s) => write!(f, "spawn failed: {}", s),
74            Self::JobAssignFailed(s) => write!(f, "AssignProcessToJobObject failed: {}", s),
75            Self::ResumeFailed(s) => write!(f, "ResumeThread failed: {}", s),
76            Self::EstartTimeout => write!(f, "timeout waiting for AGENTMUXSRV-ESTART (30s)"),
77            Self::EstartChannelClosed => {
78                write!(f, "ESTART channel closed before srv signalled ready")
79            }
80        }
81    }
82}
83
84/// Spawn srv as a child of the launcher, assigned to launcher's
85/// Job Object J0 so it dies cleanly with the launcher tree.
86///
87/// `launcher_exe_dir` is used to locate the srv binary alongside the
88/// launcher (or in `runtime/` for portable). `paths` carries the
89/// data + config dirs and is propagated to srv via env vars.
90/// `job_handle` is the launcher's Job Object so srv joins the same
91/// kill-on-job-close contract as the host. Returns once srv prints
92/// `AGENTMUXSRV-ESTART` (or the 30s timeout fires).
93///
94/// Caller keeps the returned `Child` alive — drop closes srv's
95/// stdin and srv's existing PPID death-watcher takes over (already
96/// part of agentmux-srv per `SPEC_BACKEND_LIFECYCLE.md`).
97// Phase E.1b — `srv_pipe_path` is the launcher-computed pipe path
98// (same data-dir hash as launcher's own pipe, different leaf name).
99// Passed via `AGENTMUX_SRV_PIPE_PATH` so srv doesn't have to
100// recompute the hash; launcher is the single source of truth.
101pub async fn spawn_srv(
102    launcher_exe_dir: &Path,
103    paths: &DataPaths,
104    srv_pipe_path: &str,
105    #[cfg(target_os = "windows")] job_handle: windows_sys::Win32::Foundation::HANDLE,
106) -> Result<(SrvSpawnResult, Child), SrvSpawnError> {
107    let backend_path = resolve_srv_binary(launcher_exe_dir)?;
108
109    // Generate a fresh auth_key per run (UUID v4 — same as host did).
110    // This is the launcher's responsibility now; host receives it via
111    // AGENTMUX_AUTH_KEY env so srv + host + frontend agree on the key.
112    let auth_key = uuid::Uuid::new_v4().to_string();
113    let version = env!("CARGO_PKG_VERSION");
114    let instance_id = format!("v{}", version);
115
116    // app_path = launcher's exe dir (used by srv for finding bundled
117    // tooling like jq.exe / rg.exe). In portable mode this is the
118    // top of the portable folder; the runtime/tools/bin/ subdir
119    // lives under runtime/, but srv's app_path lookup is currently
120    // exe_dir-based per the host's code.
121    //
122    // For B.1 we keep parity: pass exe_dir of the LAUNCHER. If srv
123    // tooling lookup breaks, follow up — log it loudly.
124    let app_path_str = launcher_exe_dir.to_string_lossy().to_string();
125
126    let mut cmd = Command::new(&backend_path);
127    cmd.args([
128        "--wavedata",
129        &paths.data_dir.to_string_lossy(),
130        "--instance",
131        &instance_id,
132    ])
133    .env("AGENTMUX_AUTH_KEY", &auth_key)
134    // Canonical AGENTMUX_* env vars (INSTANCE_DIR / DATA_DIR /
135    // CONFIG_DIR / LOG_DIR / CEF_CACHE_DIR / AGENTS_DIR / INSTANCE_
136    // RUNTIME_DIR / SHARED_DIR / RUNTIME_MODE). Replaces the old
137    // AGENTMUX_DATA_HOME / AGENTMUX_DEV / AGENTMUX_CONFIG_HOME /
138    // AGENTMUX_SETTINGS_DIR pre-unification names. srv reads them
139    // via `DataPaths::from_env()` (or the raw var names directly).
140    .envs(paths.common.to_env_vars())
141    .env("AGENTMUX_APP_PATH", &app_path_str)
142    .env("AGENTMUX_SRV_PIPE_PATH", srv_pipe_path)
143    .stdin(Stdio::piped())
144    .stdout(Stdio::piped())
145    .stderr(Stdio::piped())
146    .kill_on_drop(false); // Job Object handles cleanup; tokio's kill-on-drop would force-kill.
147
148    // Windows: spawn suspended so we can assign-to-job before any
149    // child code runs (PR #570 race pattern, now applied to srv).
150    // Without this, srv could open files / sockets before joining
151    // the job — those resources would survive launcher death.
152    #[cfg(target_os = "windows")]
153    {
154        const CREATE_SUSPENDED: u32 = 0x00000004;
155        const CREATE_NO_WINDOW: u32 = 0x08000000;
156        cmd.creation_flags(CREATE_SUSPENDED | CREATE_NO_WINDOW);
157    }
158
159    let mut child = cmd
160        .spawn()
161        .map_err(|e| SrvSpawnError::SpawnFailed(e.to_string()))?;
162    let pid = child
163        .id()
164        .ok_or_else(|| SrvSpawnError::SpawnFailed("child has no PID".to_string()))?;
165
166    // Windows: assign srv to launcher's job, then resume.
167    // Skip job assignment if launcher's job creation failed (J0 is
168    // null) — that's the degraded mode logged by main.rs. Srv still
169    // runs but won't be reaped on launcher death.
170    //
171    // Both error paths must explicitly start_kill the suspended
172    // child before returning. We set kill_on_drop(false) (J0 normally
173    // handles cleanup), so dropping the Child wouldn't terminate the
174    // suspended srv — it would orphan as a permanent zombie holding
175    // resources and the data dir lockfile, blocking subsequent
176    // launches. (codex P1 @ srv_spawner.rs:161, PR #571 round-3.)
177    #[cfg(target_os = "windows")]
178    {
179        if !job_handle.is_null() {
180            if let Err(e) = assign_pid_to_job(pid, job_handle) {
181                let _ = child.start_kill();
182                return Err(SrvSpawnError::JobAssignFailed(e));
183            }
184        }
185        if let Err(e) = crate::resume_main_thread(pid) {
186            let _ = child.start_kill();
187            return Err(SrvSpawnError::ResumeFailed(e));
188        }
189    }
190
191    let started_at = chrono::Utc::now().to_rfc3339();
192
193    // Forward srv stdout to our log (info level; the launcher's log
194    // file is the same one srv-logs end up in once we wire log
195    // forwarding properly).
196    if let Some(stdout) = child.stdout.take() {
197        let pid_for_log = pid;
198        tokio::spawn(async move {
199            let mut reader = BufReader::new(stdout).lines();
200            while let Ok(Some(line)) = reader.next_line().await {
201                crate::log(&format!("[srv {} stdout] {}", pid_for_log, line));
202            }
203        });
204    }
205
206    // Parse stderr for AGENTMUXSRV-ESTART (the readiness signal). srv
207    // writes other diagnostic lines too (AGENTMUXSRV-EVENT:..., plain
208    // text); for B.1 we just log them. Phase B sub-PR B.2 will
209    // forward AGENTMUXSRV-EVENT messages to subscribers via the IPC
210    // event stream.
211    let stderr = child
212        .stderr
213        .take()
214        .ok_or_else(|| SrvSpawnError::SpawnFailed("no stderr handle".to_string()))?;
215    let (tx, mut rx) = mpsc::channel::<SrvSpawnResult>(1);
216    let auth_key_for_estart = auth_key.clone();
217    let started_at_for_estart = started_at.clone();
218    let pid_for_log = pid;
219    tokio::spawn(async move {
220        let mut reader = BufReader::new(stderr).lines();
221        let mut estart_sent = false;
222        while let Ok(Some(line)) = reader.next_line().await {
223            if !estart_sent && line.starts_with("AGENTMUXSRV-ESTART") {
224                let parsed = parse_estart(&line);
225                let result = SrvSpawnResult {
226                    pid: pid_for_log,
227                    ws_endpoint: parsed.ws_endpoint,
228                    web_endpoint: parsed.web_endpoint,
229                    instance_id: parsed.instance_id,
230                    auth_key: auth_key_for_estart.clone(),
231                    started_at: started_at_for_estart.clone(),
232                };
233                crate::log(&format!(
234                    "srv {} ready: ws={} web={} instance={}",
235                    result.pid, result.ws_endpoint, result.web_endpoint, result.instance_id
236                ));
237                let _ = tx.send(result).await;
238                estart_sent = true;
239            } else if line.starts_with("AGENTMUXSRV-EVENT:") {
240                crate::log(&format!("[srv {} event] {}", pid_for_log, line));
241                // Phase B.2 will forward these to subscribers.
242            } else {
243                crate::log(&format!("[srv {} stderr] {}", pid_for_log, line));
244            }
245        }
246        // EOF on stderr → srv exited (or its stderr closed). Logged
247        // by the wait-task in main; nothing else to do here.
248    });
249
250    // Wait for ESTART. Both error paths must explicitly start_kill
251    // the child before returning — same kill_on_drop(false) leak
252    // class as the assign/resume failures above. Without this, the
253    // 30s timeout in degraded mode (J0 absent) would leak a fully-
254    // running srv that keeps the data dir lockfile, blocking the
255    // next launch. (codex P2 @ srv_spawner.rs:240, PR #571 round-4.)
256    let recv = tokio::time::timeout(std::time::Duration::from_secs(30), rx.recv()).await;
257    match recv {
258        Err(_) => {
259            let _ = child.start_kill();
260            Err(SrvSpawnError::EstartTimeout)
261        }
262        Ok(None) => {
263            let _ = child.start_kill();
264            Err(SrvSpawnError::EstartChannelClosed)
265        }
266        Ok(Some(result)) => Ok((result, child)),
267    }
268}
269
270/// Resolve the agentmux-srv binary path from the LAUNCHER's vantage
271/// point.
272///
273/// Search order, mirroring the host's `resolve_backend_binary`
274/// (sidecar.rs:318-402) but anchored at the launcher's exe dir:
275///   1. `<launcher_dir>/runtime/agentmux-srv-{ver}-{os}.{arch}.exe`
276///      (versioned portable layout)
277///   2. `<launcher_dir>/runtime/agentmux-srv.exe` (dev fallback)
278///   3. `<launcher_dir>/agentmux-srv-{ver}-{os}.{arch}.exe`
279///      (launcher in same dir as srv — should not happen in portable
280///      but covers cargo-built dev mode where launcher + srv both
281///      land in target/release/)
282///   4. `<launcher_dir>/agentmux-srv.exe` (dev fallback)
283fn resolve_srv_binary(launcher_exe_dir: &Path) -> Result<PathBuf, SrvSpawnError> {
284    let backend_name = "agentmux-srv";
285    let exe_suffix = if cfg!(target_os = "windows") { ".exe" } else { "" };
286    let version = env!("CARGO_PKG_VERSION");
287    let (os_name, arch) = if cfg!(target_os = "macos") {
288        ("darwin", if cfg!(target_arch = "aarch64") { "arm64" } else { "x64" })
289    } else if cfg!(target_os = "linux") {
290        ("linux", if cfg!(target_arch = "aarch64") { "arm64" } else { "x64" })
291    } else {
292        ("windows", if cfg!(target_arch = "aarch64") { "arm64" } else { "x64" })
293    };
294
295    let candidates = [
296        // Portable: srv lives in launcher_dir/runtime/
297        launcher_exe_dir
298            .join("runtime")
299            .join(format!("{}-{}-{}.{}{}", backend_name, version, os_name, arch, exe_suffix)),
300        launcher_exe_dir
301            .join("runtime")
302            .join(format!("{}{}", backend_name, exe_suffix)),
303        // Dev: launcher and srv side-by-side in target/release/
304        launcher_exe_dir
305            .join(format!("{}-{}-{}.{}{}", backend_name, version, os_name, arch, exe_suffix)),
306        launcher_exe_dir.join(format!("{}{}", backend_name, exe_suffix)),
307    ];
308
309    for p in &candidates {
310        if p.exists() {
311            return Ok(p.clone());
312        }
313    }
314
315    Err(SrvSpawnError::BinaryNotFound(format!(
316        "{} v{} not found. Searched:\n  {}",
317        backend_name,
318        version,
319        candidates
320            .iter()
321            .map(|p| p.display().to_string())
322            .collect::<Vec<_>>()
323            .join("\n  ")
324    )))
325}
326
/// The three values `parse_estart` pulls out of an
/// `AGENTMUXSRV-ESTART` line. Same shape as the host's `parse_estart`
/// result (sidecar.rs:404-420). A field is the empty string when its
/// token is missing from the line.
struct EstartFields {
    /// Text following the `ws:` prefix.
    ws_endpoint: String,
    /// Text following the `web:` prefix.
    web_endpoint: String,
    /// Text following the `instance:` prefix.
    instance_id: String,
}
334
335fn parse_estart(line: &str) -> EstartFields {
336    let parts: Vec<&str> = line.split_whitespace().collect();
337    let get = |prefix: &str| -> String {
338        parts
339            .iter()
340            .find_map(|p| p.strip_prefix(prefix))
341            .unwrap_or_default()
342            .to_string()
343    };
344    EstartFields {
345        ws_endpoint: get("ws:"),
346        web_endpoint: get("web:"),
347        instance_id: get("instance:"),
348    }
349}
350
/// Assign a process to the launcher's Job Object J0. Used by
/// `spawn_srv` for srv and exported for `main.rs` to use for the
/// host. Separated from job creation because both children join
/// the SAME job (only one J0 ever exists per launcher run).
///
/// Opens `pid` with `PROCESS_SET_QUOTA | PROCESS_TERMINATE`, calls
/// `AssignProcessToJobObject(job, process)`, and always closes the
/// process handle it opened. `job` is used as-is; the caller is
/// responsible for passing a live job handle.
///
/// # Errors
///
/// Returns a message naming the failing call and the OS error
/// (`GetLastError`) captured immediately after it, so access-denied,
/// invalid-handle, and similar failures can be told apart in the
/// launcher log.
#[cfg(target_os = "windows")]
pub fn assign_pid_to_job(
    pid: u32,
    job: windows_sys::Win32::Foundation::HANDLE,
) -> Result<(), String> {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::JobObjects::AssignProcessToJobObject;
    use windows_sys::Win32::System::Threading::{
        OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE,
    };

    // SAFETY: OpenProcess takes only plain integer arguments; a
    // failure is reported as a null handle, which is checked below.
    let process = unsafe { OpenProcess(PROCESS_SET_QUOTA | PROCESS_TERMINATE, 0, pid) };
    if process.is_null() {
        return Err(format!(
            "OpenProcess({}) returned null: {}",
            pid,
            std::io::Error::last_os_error()
        ));
    }

    // SAFETY: `process` is the non-null handle opened above and not
    // yet closed; `job` is passed through from the caller, who must
    // supply a valid job handle.
    let ok = unsafe { AssignProcessToJobObject(job, process) };
    // Read the error code BEFORE CloseHandle, which may overwrite the
    // thread's last-error value.
    let assign_err = if ok == 0 {
        Some(std::io::Error::last_os_error())
    } else {
        None
    };

    // SAFETY: `process` was opened by this function and is closed
    // exactly once, here. Best-effort: the return value is ignored.
    unsafe {
        CloseHandle(process);
    }

    match assign_err {
        Some(e) => Err(format!(
            "AssignProcessToJobObject failed for pid={}: {}",
            pid, e
        )),
        None => Ok(()),
    }
}