agentmux_launcher/srv_spawner.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Spawn the agentmux-srv backend sidecar from the LAUNCHER (Phase B.1).
5//
6// Today's flow (pre-Phase-B): launcher spawns host; host spawns srv;
7// host owns a Job Object J1 wrapping srv; renderers inherit launcher's
8// J0 via host. Result: when host crashes, J1 closes and srv dies.
9//
10// Phase B.1 flow: launcher spawns BOTH srv and host as siblings,
11// assigns BOTH to launcher's J0 directly. Host's J1 on srv is
12// deleted (it would actively defeat "srv survives host crash" if
13// kept). Renderers continue to inherit J0 via host as before.
14//
15// The launcher passes srv's endpoints to the host via env vars
16// (AGENTMUX_BACKEND_WS, _WEB, _PID). Host detects them and skips
17// its own spawn_backend path (which is preserved for `task dev`
18// fallback where launcher isn't in the loop).
19//
20// Adapted from `agentmux-cef/src/sidecar.rs::spawn_backend` —
21// kept structurally similar so divergence is auditable. Key
22// differences:
23// * Tokio process API (not std::process)
24// * CREATE_SUSPENDED + assign-to-job + ResumeThread (PR #570 race
25// pattern, applied to srv too)
26// * No separate Job Object on srv; launcher's J0 covers it
27// * Auth key generated here, not consumed from a shared AppState
28// * stderr ESTART parsing returns the result via tokio mpsc
29
30use std::path::{Path, PathBuf};
31use std::process::Stdio;
32
33use tokio::io::{AsyncBufReadExt, BufReader};
34use tokio::process::{Child, Command};
35use tokio::sync::mpsc;
36
37use crate::data_dir::DataPaths;
38
/// Everything the launcher knows about srv once srv has signalled ready.
///
/// `spawn_srv` builds one of these from the `AGENTMUXSRV-ESTART` line srv
/// prints on stderr. The launcher holds on to it and uses it to populate
/// the env vars the host reads.
#[derive(Clone, Debug)]
pub struct SrvSpawnResult {
    /// OS process id of the spawned srv child.
    pub pid: u32,
    /// Value of the `ws:` token on the ESTART line (empty if the token
    /// was absent).
    pub ws_endpoint: String,
    /// Value of the `web:` token on the ESTART line (empty if the token
    /// was absent).
    pub web_endpoint: String,
    /// Value of the `instance:` token on the ESTART line (empty if the
    /// token was absent).
    pub instance_id: String,
    /// Per-run auth key generated by `spawn_srv` and handed to srv via
    /// the `AGENTMUX_AUTH_KEY` env var.
    pub auth_key: String,
    /// RFC3339 UTC timestamp. `spawn_srv` captures it right after the
    /// child has been spawned (and, on Windows, resumed) — i.e. before
    /// ESTART arrives, not at the moment it arrives.
    ///
    /// Carried on the result for `--diag` / debug observability; not
    /// currently propagated into env. F.7 cleanup audit: keep with
    /// allow + this note rather than delete — a future `--diag srv`
    /// printer is the natural reader.
    #[allow(dead_code)]
    pub started_at: String,
}
56
/// Errors during srv spawn — granular enough that the launcher can
/// log the right diagnostic.
///
/// Implements [`std::error::Error`], so it can be propagated with `?`
/// into `Box<dyn Error>` (or any error type with a blanket `From`).
#[derive(Debug)]
pub enum SrvSpawnError {
    /// None of the candidate srv binary paths exist. The payload lists
    /// the paths that were searched.
    BinaryNotFound(String),
    /// The OS-level spawn failed, or the freshly spawned child was
    /// missing something `spawn_srv` needs (PID, stderr handle).
    SpawnFailed(String),
    /// Assigning the child to the launcher's Job Object failed
    /// (Windows only).
    JobAssignFailed(String),
    /// Resuming the suspended child's main thread failed (Windows only).
    ResumeFailed(String),
    /// srv did not print `AGENTMUXSRV-ESTART` within the timeout.
    EstartTimeout,
    /// The stderr reader finished (EOF or read error) before srv
    /// printed `AGENTMUXSRV-ESTART`.
    EstartChannelClosed,
}

impl std::fmt::Display for SrvSpawnError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BinaryNotFound(detail) => write!(f, "srv binary not found: {}", detail),
            Self::SpawnFailed(detail) => write!(f, "spawn failed: {}", detail),
            Self::JobAssignFailed(detail) => {
                write!(f, "AssignProcessToJobObject failed: {}", detail)
            }
            Self::ResumeFailed(detail) => write!(f, "ResumeThread failed: {}", detail),
            Self::EstartTimeout => f.write_str("timeout waiting for AGENTMUXSRV-ESTART (30s)"),
            Self::EstartChannelClosed => {
                f.write_str("ESTART channel closed before srv signalled ready")
            }
        }
    }
}

// No underlying `source()` to expose: every payload is already a
// rendered string, so the default trait methods are sufficient.
impl std::error::Error for SrvSpawnError {}
83
84/// Spawn srv as a child of the launcher, assigned to launcher's
85/// Job Object J0 so it dies cleanly with the launcher tree.
86///
87/// `launcher_exe_dir` is used to locate the srv binary alongside the
88/// launcher (or in `runtime/` for portable). `paths` carries the
89/// data + config dirs and is propagated to srv via env vars.
90/// `job_handle` is the launcher's Job Object so srv joins the same
91/// kill-on-job-close contract as the host. Returns once srv prints
92/// `AGENTMUXSRV-ESTART` (or the 30s timeout fires).
93///
94/// Caller keeps the returned `Child` alive — drop closes srv's
95/// stdin and srv's existing PPID death-watcher takes over (already
96/// part of agentmux-srv per `SPEC_BACKEND_LIFECYCLE.md`).
97// Phase E.1b — `srv_pipe_path` is the launcher-computed pipe path
98// (same data-dir hash as launcher's own pipe, different leaf name).
99// Passed via `AGENTMUX_SRV_PIPE_PATH` so srv doesn't have to
100// recompute the hash; launcher is the single source of truth.
101pub async fn spawn_srv(
102 launcher_exe_dir: &Path,
103 paths: &DataPaths,
104 srv_pipe_path: &str,
105 #[cfg(target_os = "windows")] job_handle: windows_sys::Win32::Foundation::HANDLE,
106) -> Result<(SrvSpawnResult, Child), SrvSpawnError> {
107 let backend_path = resolve_srv_binary(launcher_exe_dir)?;
108
109 // Generate a fresh auth_key per run (UUID v4 — same as host did).
110 // This is the launcher's responsibility now; host receives it via
111 // AGENTMUX_AUTH_KEY env so srv + host + frontend agree on the key.
112 let auth_key = uuid::Uuid::new_v4().to_string();
113 let version = env!("CARGO_PKG_VERSION");
114 let instance_id = format!("v{}", version);
115
116 // app_path = launcher's exe dir (used by srv for finding bundled
117 // tooling like jq.exe / rg.exe). In portable mode this is the
118 // top of the portable folder; the runtime/tools/bin/ subdir
119 // lives under runtime/, but srv's app_path lookup is currently
120 // exe_dir-based per the host's code.
121 //
122 // For B.1 we keep parity: pass exe_dir of the LAUNCHER. If srv
123 // tooling lookup breaks, follow up — log it loudly.
124 let app_path_str = launcher_exe_dir.to_string_lossy().to_string();
125
126 let mut cmd = Command::new(&backend_path);
127 cmd.args([
128 "--wavedata",
129 &paths.data_dir.to_string_lossy(),
130 "--instance",
131 &instance_id,
132 ])
133 .env("AGENTMUX_AUTH_KEY", &auth_key)
134 // Canonical AGENTMUX_* env vars (INSTANCE_DIR / DATA_DIR /
135 // CONFIG_DIR / LOG_DIR / CEF_CACHE_DIR / AGENTS_DIR / INSTANCE_
136 // RUNTIME_DIR / SHARED_DIR / RUNTIME_MODE). Replaces the old
137 // AGENTMUX_DATA_HOME / AGENTMUX_DEV / AGENTMUX_CONFIG_HOME /
138 // AGENTMUX_SETTINGS_DIR pre-unification names. srv reads them
139 // via `DataPaths::from_env()` (or the raw var names directly).
140 .envs(paths.common.to_env_vars())
141 .env("AGENTMUX_APP_PATH", &app_path_str)
142 .env("AGENTMUX_SRV_PIPE_PATH", srv_pipe_path)
143 .stdin(Stdio::piped())
144 .stdout(Stdio::piped())
145 .stderr(Stdio::piped())
146 .kill_on_drop(false); // Job Object handles cleanup; tokio's kill-on-drop would force-kill.
147
148 // Windows: spawn suspended so we can assign-to-job before any
149 // child code runs (PR #570 race pattern, now applied to srv).
150 // Without this, srv could open files / sockets before joining
151 // the job — those resources would survive launcher death.
152 #[cfg(target_os = "windows")]
153 {
154 const CREATE_SUSPENDED: u32 = 0x00000004;
155 const CREATE_NO_WINDOW: u32 = 0x08000000;
156 cmd.creation_flags(CREATE_SUSPENDED | CREATE_NO_WINDOW);
157 }
158
159 let mut child = cmd
160 .spawn()
161 .map_err(|e| SrvSpawnError::SpawnFailed(e.to_string()))?;
162 let pid = child
163 .id()
164 .ok_or_else(|| SrvSpawnError::SpawnFailed("child has no PID".to_string()))?;
165
166 // Windows: assign srv to launcher's job, then resume.
167 // Skip job assignment if launcher's job creation failed (J0 is
168 // null) — that's the degraded mode logged by main.rs. Srv still
169 // runs but won't be reaped on launcher death.
170 //
171 // Both error paths must explicitly start_kill the suspended
172 // child before returning. We set kill_on_drop(false) (J0 normally
173 // handles cleanup), so dropping the Child wouldn't terminate the
174 // suspended srv — it would orphan as a permanent zombie holding
175 // resources and the data dir lockfile, blocking subsequent
176 // launches. (codex P1 @ srv_spawner.rs:161, PR #571 round-3.)
177 #[cfg(target_os = "windows")]
178 {
179 if !job_handle.is_null() {
180 if let Err(e) = assign_pid_to_job(pid, job_handle) {
181 let _ = child.start_kill();
182 return Err(SrvSpawnError::JobAssignFailed(e));
183 }
184 }
185 if let Err(e) = crate::resume_main_thread(pid) {
186 let _ = child.start_kill();
187 return Err(SrvSpawnError::ResumeFailed(e));
188 }
189 }
190
191 let started_at = chrono::Utc::now().to_rfc3339();
192
193 // Forward srv stdout to our log (info level; the launcher's log
194 // file is the same one srv-logs end up in once we wire log
195 // forwarding properly).
196 if let Some(stdout) = child.stdout.take() {
197 let pid_for_log = pid;
198 tokio::spawn(async move {
199 let mut reader = BufReader::new(stdout).lines();
200 while let Ok(Some(line)) = reader.next_line().await {
201 crate::log(&format!("[srv {} stdout] {}", pid_for_log, line));
202 }
203 });
204 }
205
206 // Parse stderr for AGENTMUXSRV-ESTART (the readiness signal). srv
207 // writes other diagnostic lines too (AGENTMUXSRV-EVENT:..., plain
208 // text); for B.1 we just log them. Phase B sub-PR B.2 will
209 // forward AGENTMUXSRV-EVENT messages to subscribers via the IPC
210 // event stream.
211 let stderr = child
212 .stderr
213 .take()
214 .ok_or_else(|| SrvSpawnError::SpawnFailed("no stderr handle".to_string()))?;
215 let (tx, mut rx) = mpsc::channel::<SrvSpawnResult>(1);
216 let auth_key_for_estart = auth_key.clone();
217 let started_at_for_estart = started_at.clone();
218 let pid_for_log = pid;
219 tokio::spawn(async move {
220 let mut reader = BufReader::new(stderr).lines();
221 let mut estart_sent = false;
222 while let Ok(Some(line)) = reader.next_line().await {
223 if !estart_sent && line.starts_with("AGENTMUXSRV-ESTART") {
224 let parsed = parse_estart(&line);
225 let result = SrvSpawnResult {
226 pid: pid_for_log,
227 ws_endpoint: parsed.ws_endpoint,
228 web_endpoint: parsed.web_endpoint,
229 instance_id: parsed.instance_id,
230 auth_key: auth_key_for_estart.clone(),
231 started_at: started_at_for_estart.clone(),
232 };
233 crate::log(&format!(
234 "srv {} ready: ws={} web={} instance={}",
235 result.pid, result.ws_endpoint, result.web_endpoint, result.instance_id
236 ));
237 let _ = tx.send(result).await;
238 estart_sent = true;
239 } else if line.starts_with("AGENTMUXSRV-EVENT:") {
240 crate::log(&format!("[srv {} event] {}", pid_for_log, line));
241 // Phase B.2 will forward these to subscribers.
242 } else {
243 crate::log(&format!("[srv {} stderr] {}", pid_for_log, line));
244 }
245 }
246 // EOF on stderr → srv exited (or its stderr closed). Logged
247 // by the wait-task in main; nothing else to do here.
248 });
249
250 // Wait for ESTART. Both error paths must explicitly start_kill
251 // the child before returning — same kill_on_drop(false) leak
252 // class as the assign/resume failures above. Without this, the
253 // 30s timeout in degraded mode (J0 absent) would leak a fully-
254 // running srv that keeps the data dir lockfile, blocking the
255 // next launch. (codex P2 @ srv_spawner.rs:240, PR #571 round-4.)
256 let recv = tokio::time::timeout(std::time::Duration::from_secs(30), rx.recv()).await;
257 match recv {
258 Err(_) => {
259 let _ = child.start_kill();
260 Err(SrvSpawnError::EstartTimeout)
261 }
262 Ok(None) => {
263 let _ = child.start_kill();
264 Err(SrvSpawnError::EstartChannelClosed)
265 }
266 Ok(Some(result)) => Ok((result, child)),
267 }
268}
269
270/// Resolve the agentmux-srv binary path from the LAUNCHER's vantage
271/// point.
272///
273/// Search order, mirroring the host's `resolve_backend_binary`
274/// (sidecar.rs:318-402) but anchored at the launcher's exe dir:
275/// 1. `<launcher_dir>/runtime/agentmux-srv-{ver}-{os}.{arch}.exe`
276/// (versioned portable layout)
277/// 2. `<launcher_dir>/runtime/agentmux-srv.exe` (dev fallback)
278/// 3. `<launcher_dir>/agentmux-srv-{ver}-{os}.{arch}.exe`
279/// (launcher in same dir as srv — should not happen in portable
280/// but covers cargo-built dev mode where launcher + srv both
281/// land in target/release/)
282/// 4. `<launcher_dir>/agentmux-srv.exe` (dev fallback)
283fn resolve_srv_binary(launcher_exe_dir: &Path) -> Result<PathBuf, SrvSpawnError> {
284 let backend_name = "agentmux-srv";
285 let exe_suffix = if cfg!(target_os = "windows") { ".exe" } else { "" };
286 let version = env!("CARGO_PKG_VERSION");
287 let (os_name, arch) = if cfg!(target_os = "macos") {
288 ("darwin", if cfg!(target_arch = "aarch64") { "arm64" } else { "x64" })
289 } else if cfg!(target_os = "linux") {
290 ("linux", if cfg!(target_arch = "aarch64") { "arm64" } else { "x64" })
291 } else {
292 ("windows", if cfg!(target_arch = "aarch64") { "arm64" } else { "x64" })
293 };
294
295 let candidates = [
296 // Portable: srv lives in launcher_dir/runtime/
297 launcher_exe_dir
298 .join("runtime")
299 .join(format!("{}-{}-{}.{}{}", backend_name, version, os_name, arch, exe_suffix)),
300 launcher_exe_dir
301 .join("runtime")
302 .join(format!("{}{}", backend_name, exe_suffix)),
303 // Dev: launcher and srv side-by-side in target/release/
304 launcher_exe_dir
305 .join(format!("{}-{}-{}.{}{}", backend_name, version, os_name, arch, exe_suffix)),
306 launcher_exe_dir.join(format!("{}{}", backend_name, exe_suffix)),
307 ];
308
309 for p in &candidates {
310 if p.exists() {
311 return Ok(p.clone());
312 }
313 }
314
315 Err(SrvSpawnError::BinaryNotFound(format!(
316 "{} v{} not found. Searched:\n {}",
317 backend_name,
318 version,
319 candidates
320 .iter()
321 .map(|p| p.display().to_string())
322 .collect::<Vec<_>>()
323 .join("\n ")
324 )))
325}
326
/// The three values carried by an `AGENTMUXSRV-ESTART` line. Same shape
/// as the host's `parse_estart` (sidecar.rs:404-420).
struct EstartFields {
    /// Text following `ws:` in the line, or empty if no such token.
    ws_endpoint: String,
    /// Text following `web:` in the line, or empty if no such token.
    web_endpoint: String,
    /// Text following `instance:` in the line, or empty if no such token.
    instance_id: String,
}

/// Extract the `ws:`, `web:` and `instance:` values from an ESTART line.
///
/// The line is split on whitespace; for each key the first token that
/// starts with the key's prefix supplies the value (prefix removed).
/// A missing key yields an empty string rather than an error.
fn parse_estart(line: &str) -> EstartFields {
    /// First whitespace-separated token of `line` beginning with
    /// `prefix`, minus the prefix; empty string when there is none.
    fn field(line: &str, prefix: &str) -> String {
        line.split_whitespace()
            .find_map(|token| token.strip_prefix(prefix))
            .map(str::to_owned)
            .unwrap_or_default()
    }

    EstartFields {
        ws_endpoint: field(line, "ws:"),
        web_endpoint: field(line, "web:"),
        instance_id: field(line, "instance:"),
    }
}
350
/// Assign a process to the launcher's Job Object J0. Used by
/// `spawn_srv` for srv and exported for `main.rs` to use for the
/// host. Separated from job creation because both children join
/// the SAME job (only one J0 ever exists per launcher run).
///
/// Opens the process identified by `pid` with
/// `PROCESS_SET_QUOTA | PROCESS_TERMINATE`, calls
/// `AssignProcessToJobObject(job, process)` and closes the process
/// handle again regardless of the outcome.
///
/// # Errors
///
/// Returns a human-readable message, including the OS error captured
/// right after the failing call, when either `OpenProcess` or
/// `AssignProcessToJobObject` fails.
#[cfg(target_os = "windows")]
pub fn assign_pid_to_job(
    pid: u32,
    job: windows_sys::Win32::Foundation::HANDLE,
) -> Result<(), String> {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::JobObjects::AssignProcessToJobObject;
    use windows_sys::Win32::System::Threading::{
        OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE,
    };

    // SAFETY: OpenProcess takes only plain integer arguments; a bad pid
    // results in a null handle, which is checked below.
    let process = unsafe { OpenProcess(PROCESS_SET_QUOTA | PROCESS_TERMINATE, 0, pid) };
    if process.is_null() {
        // Read the OS error immediately, before anything else can
        // overwrite the thread's last-error value.
        let os_err = std::io::Error::last_os_error();
        return Err(format!("OpenProcess({}) returned null: {}", pid, os_err));
    }

    // SAFETY: `process` is the non-null handle opened above and is
    // closed exactly once here; `job` is supplied by the caller, who is
    // responsible for it being a valid Job Object handle.
    let assign_failure = unsafe {
        let ok = AssignProcessToJobObject(job, process);
        // Capture the error BEFORE CloseHandle, which may itself
        // change the thread's last-error value.
        let failure = if ok == 0 {
            Some(std::io::Error::last_os_error())
        } else {
            None
        };
        CloseHandle(process);
        failure
    };

    match assign_failure {
        Some(os_err) => Err(format!(
            "AssignProcessToJobObject failed for pid={}: {}",
            pid, os_err
        )),
        None => Ok(()),
    }
}