// agentmux_launcher/main.rs
// Copyright 2025-2026, AgentMux Corp.
// SPDX-License-Identifier: Apache-2.0

// AgentMux Launcher — sets the DLL search path, then spawns srv and the CEF host.
//
// Phase B.1: the launcher now spawns srv directly (as a sibling of the host)
// so srv survives host crashes. Both children are assigned to the launcher's
// Job Object J0 with KILL_ON_JOB_CLOSE; killing the launcher reaps the
// entire tree atomically via the OS. (Phase B.1 is Windows-only; on
// macOS/Linux the launcher still execs directly into the host.)
//
// This was previously a tiny synchronous wrapper that just called
// SetDllDirectoryW on runtime/ and then spawned the CEF host. Phase B grew it
// into the privileged owner described in
// specs/SPEC_WINDOW_PROCESS_STATE_MACHINE_2026_04_27.md.
//
// Process tree after B.1:
// launcher (J0)
// ├── srv  (assigned to J0; survives host crash)
// └── host (assigned to J0; CEF render workers inherit J0)
20
21#![cfg_attr(
22 all(not(debug_assertions), target_os = "windows"),
23 windows_subsystem = "windows"
24)]
25
26mod config;
27mod data_dir;
28mod diag;
29mod event_log;
30mod hash;
31mod host_pipe;
32mod ipc;
33mod reducer;
34mod saga;
35#[cfg(target_os = "windows")]
36mod splash;
37mod srv_spawner;
38mod state;
39mod wrr;
40
41/// Suppress the Windows "Application Error" / WER crash dialog so an unhandled
42/// fault terminates the process immediately instead of wedging it behind a
43/// modal. No-op off Windows. Spec:
44/// docs/specs/SPEC_SERVICE_SUPERVISION_AND_RECOVERY_2026_05_20.md.
45#[cfg(target_os = "windows")]
46fn suppress_os_crash_dialogs() {
47 use windows_sys::Win32::System::Diagnostics::Debug::{SetErrorMode, SEM_FAILCRITICALERRORS};
48 use windows_sys::Win32::System::ErrorReporting::{WerSetFlags, WER_FAULT_REPORTING_NO_UI};
49 unsafe {
50 // Suppress the WER crash-dialog UI WITHOUT disabling WER itself —
51 // SEM_NOGPFAULTERRORBOX would also kill WER/LocalDumps crash-dump
52 // collection, the postmortem diagnostics this stability work needs.
53 // WER_FAULT_REPORTING_NO_UI is the documented "no UI, keep
54 // reports" path.
55 let _ = WerSetFlags(WER_FAULT_REPORTING_NO_UI);
56 // SEM_FAILCRITICALERRORS suppresses the critical-error handler
57 // (e.g. "no disk in drive" popups) — unrelated to crash reporting.
58 SetErrorMode(SEM_FAILCRITICALERRORS);
59 }
60}
61
62#[cfg(not(target_os = "windows"))]
63fn suppress_os_crash_dialogs() {}
64
65/// Process entry point. `suppress_os_crash_dialogs()` runs FIRST — before the
66/// Tokio runtime is built. The runtime is built explicitly here (rather than
67/// via `#[tokio::main]`, whose generated wrapper would construct it before any
68/// of our code runs) so a fault during runtime construction can't surface the
69/// Windows crash modal either. Spec:
70/// docs/specs/SPEC_SERVICE_SUPERVISION_AND_RECOVERY_2026_05_20.md.
71fn main() {
72 suppress_os_crash_dialogs();
73 tokio::runtime::Runtime::new()
74 .expect("failed to build Tokio runtime")
75 .block_on(launcher_main());
76}
77
78async fn launcher_main() {
79 let exe_path = std::env::current_exe().expect("cannot resolve exe path");
80 let exe_dir = exe_path.parent().expect("exe has no parent directory");
81 let runtime_dir = exe_dir.join("runtime");
82
83 log(&format!(
84 "starting — exe={} runtime={}",
85 exe_path.display(),
86 runtime_dir.display()
87 ));
88
89 // Set DLL search path so libcef.dll (in runtime/) is found by the
90 // CEF host's load-time linker. SetDllDirectoryW is process-local
91 // and inherited by child processes — both srv (which doesn't
92 // need libcef but harmless) and host (which absolutely does).
93 #[cfg(target_os = "windows")]
94 {
95 use std::os::windows::ffi::OsStrExt;
96 let wide: Vec<u16> = runtime_dir
97 .as_os_str()
98 .encode_wide()
99 .chain(Some(0))
100 .collect();
101 unsafe {
102 windows_sys::Win32::System::LibraryLoader::SetDllDirectoryW(wide.as_ptr());
103 }
104 }
105 log("SetDllDirectoryW done");
106
107 let args: Vec<String> = std::env::args().skip(1).collect();
108
109 // LSD-3 — `agentmux.exe --diag sagas` is OFFLINE: it reads the
110 // launcher saga SQLite log directly, with no IPC and no running
111 // launcher. So it MUST run BEFORE the CEF runtime existence
112 // check below — the offline-diagnostic value is most needed
113 // exactly when the launcher won't start (e.g. corrupt runtime
114 // folder). (codex P1 + reagent P1 PR #647 round 3.)
115 if matches!(
116 (args.first().map(String::as_str), args.get(1).map(String::as_str)),
117 (Some("--diag"), Some("sagas"))
118 ) {
119 match diag::run_sagas_diag(exe_dir).await {
120 Ok(()) => std::process::exit(0),
121 Err(msg) => {
122 eprintln!("--diag sagas failed: {}", msg);
123 std::process::exit(1);
124 }
125 }
126 }
127
128 let real_exe = find_cef_binary(&runtime_dir);
129 log(&format!("resolved CEF binary: {}", real_exe.display()));
130 if !real_exe.exists() {
131 log(&format!(
132 "FATAL: CEF binary not found at {}",
133 real_exe.display()
134 ));
135 eprintln!(
136 "AgentMux runtime not found in: {}\nMake sure the runtime/ folder is intact.",
137 runtime_dir.display()
138 );
139 std::process::exit(1);
140 }
141
142 log(&format!("forwarding {} CLI args to host", args.len()));
143
144 // Phase B.8 — `agentmux.exe --diag wrr` and `--diag srv` Tool
145 // clients. Connect to the running launcher (or srv) over IPC,
146 // capture events for a short window, print summary, exit.
147 // (Note: --diag sagas is handled above, before the CEF runtime
148 // check, since it doesn't need IPC.)
149 if matches!(args.first().map(String::as_str), Some("--diag")) {
150 let topic = args.get(1).map(String::as_str).unwrap_or("");
151 match topic {
152 "wrr" => match diag::run_wrr_diag(exe_dir).await {
153 Ok(()) => std::process::exit(0),
154 Err(msg) => {
155 eprintln!("--diag wrr failed: {}", msg);
156 std::process::exit(1);
157 }
158 },
159 // Phase E.7 — operator visibility into the srv reducer's
160 // canonical state (workspaces / tabs / blocks / sagas) +
161 // recent activity. Same `Tool` IPC pattern as `--diag wrr`,
162 // talks to the srv pipe instead of the launcher pipe.
163 "srv" => match diag::run_srv_diag(exe_dir).await {
164 Ok(()) => std::process::exit(0),
165 Err(msg) => {
166 eprintln!("--diag srv failed: {}", msg);
167 std::process::exit(1);
168 }
169 },
170 // sagas is handled above, before the runtime check.
171 "sagas" => {
172 // Should never reach here — `sagas` is matched + handled
173 // above the CEF runtime check. Kept for completeness.
174 unreachable!("--diag sagas is handled before runtime check");
175 }
176 "" => {
177 eprintln!("usage: agentmux.exe --diag <topic>\nknown topics: wrr, srv, sagas");
178 std::process::exit(2);
179 }
180 other => {
181 eprintln!("unknown --diag topic: {} (known: wrr, srv, sagas)", other);
182 std::process::exit(2);
183 }
184 }
185 }
186
187 #[cfg(target_os = "windows")]
188 {
189 run_windows(exe_dir, &real_exe, &args).await;
190 }
191
192 #[cfg(not(target_os = "windows"))]
193 {
194 // Phase 7 covers cross-platform parity. For now the legacy
195 // exec-into-host path is preserved on macOS/Linux. Phase B.1
196 // is Windows-only.
197 log("exec into CEF host (Unix)");
198 use std::os::unix::process::CommandExt;
199 let err = std::process::Command::new(&real_exe).args(&args).exec();
200 log(&format!("FATAL: exec failed: {}", err));
201 eprintln!("Failed to launch AgentMux: {}", err);
202 std::process::exit(1);
203 }
204}
205
206/// Phase 1 host supervision (spec
207/// `docs/specs/SPEC_SERVICE_SUPERVISION_AND_RECOVERY_2026_05_20.md`): on an
208/// abnormal host exit the launcher relaunches the host, but at most
209/// `HOST_RESTART_BUDGET` times within `HOST_RESTART_WINDOW` — a crash budget
210/// so a deterministic crash cannot spin forever (spec §10-A).
211#[cfg(target_os = "windows")]
212const HOST_RESTART_BUDGET: usize = 3;
213#[cfg(target_os = "windows")]
214const HOST_RESTART_WINDOW: std::time::Duration = std::time::Duration::from_secs(60);
215
216/// Spawn the CEF host suspended, assign it to the launcher's Job Object, and
217/// resume it. Returns the running child, or `None` if any step failed — the
218/// caller decides (fatal on first launch, give-up on a restart). `splash_event`
219/// is passed on every launch — including restarts — so a relaunched host can
220/// still dismiss a splash left pending by a host that crashed pre-first-frame.
221/// `disable_gpu` is the retry ladder's rung-2 degraded mode (spec §7): when set
222/// the host is launched with `--disable-gpu` (software rendering).
223#[cfg(target_os = "windows")]
224fn spawn_host_supervised(
225 real_exe: &std::path::Path,
226 args: &[String],
227 srv: &srv_spawner::SrvSpawnResult,
228 host_env: &[(&'static str, std::ffi::OsString)],
229 pipe_path: &str,
230 job_present: bool,
231 job_handle: windows_sys::Win32::Foundation::HANDLE,
232 splash_event: Option<&str>,
233 disable_gpu: bool,
234) -> Option<tokio::process::Child> {
235 use windows_sys::Win32::System::Threading::CREATE_SUSPENDED;
236
237 let mut host_cmd = tokio::process::Command::new(real_exe);
238 host_cmd
239 .args(args)
240 .env("AGENTMUX_BACKEND_WS", &srv.ws_endpoint)
241 .env("AGENTMUX_BACKEND_WEB", &srv.web_endpoint)
242 .env("AGENTMUX_BACKEND_PID", srv.pid.to_string())
243 .env("AGENTMUX_AUTH_KEY", &srv.auth_key)
244 .env("AGENTMUX_INSTANCE_ID", &srv.instance_id)
245 .envs(host_env.iter().cloned())
246 .env("AGENTMUX_LAUNCHER_PIPE", pipe_path)
247 .creation_flags(CREATE_SUSPENDED)
248 .kill_on_drop(false); // J0 handles cleanup.
249 if let Some(name) = splash_event {
250 host_cmd.env("AGENTMUX_SPLASH_EVENT", name);
251 }
252 // Retry-ladder rung 2 (spec §7): software rendering — no GPU process to
253 // crash. A Chromium switch the host forwards to CEF.
254 if disable_gpu {
255 host_cmd.arg("--disable-gpu");
256 }
257
258 let mut host_child = match host_cmd.spawn() {
259 Ok(c) => c,
260 Err(e) => {
261 log(&format!("failed to spawn CEF host: {}", e));
262 return None;
263 }
264 };
265 let host_pid = host_child.id().unwrap_or(0);
266 log(&format!("spawned CEF host pid={} (suspended)", host_pid));
267
268 // Assign to J0 BEFORE resuming so CEF render children inherit the job.
269 if job_present && host_pid != 0 {
270 match srv_spawner::assign_pid_to_job(host_pid, job_handle) {
271 Ok(()) => log(&format!(
272 "Job Object assigned to host pid={}, KILL_ON_JOB_CLOSE active",
273 host_pid
274 )),
275 Err(e) => log(&format!(
276 "WARN: AssignProcessToJobObject(host) failed: {} — host children may escape job",
277 e
278 )),
279 }
280 }
281
282 // Resume the suspended main thread.
283 if let Err(e) = resume_main_thread(host_pid) {
284 log(&format!("failed to resume host pid={}: {}", host_pid, e));
285 let _ = host_child.start_kill();
286 return None;
287 }
288 Some(host_child)
289}
290
291/// Windows main flow: resolve paths → create J0 → spawn srv → spawn
292/// host with srv endpoints in env → supervised wait → cleanup.
293#[cfg(target_os = "windows")]
294async fn run_windows(
295 launcher_exe_dir: &std::path::Path,
296 real_exe: &std::path::Path,
297 args: &[String],
298) {
299
300 let version = env!("CARGO_PKG_VERSION");
301
302 // 1. Resolve data_dir / config_dir / user_home_dir. Both srv and
303 // host receive these via env so they don't recompute (and so they
304 // can't drift). Host's existing data_dir computation in sidecar.rs
305 // still runs as a fallback for `task dev` mode where the launcher
306 // is not in the loop.
307 let paths = match data_dir::resolve_paths(launcher_exe_dir, version) {
308 Ok(p) => p,
309 Err(e) => {
310 log(&format!("FATAL: path resolution failed: {}", e));
311 eprintln!("Failed to resolve AgentMux data directories: {}", e);
312 std::process::exit(1);
313 }
314 };
315 log(&format!(
316 "paths resolved: data={} config={} user_home={} portable={}",
317 paths.data_dir.display(),
318 paths.config_dir.display(),
319 paths.user_home_dir.display(),
320 paths.portable_root.is_some(),
321 ));
322 if let Err(e) = data_dir::ensure_dirs(&paths) {
323 log(&format!("FATAL: failed to create data dirs: {}", e));
324 eprintln!("{}", e);
325 std::process::exit(1);
326 }
327
328 // 2. Create the launcher's Job Object J0 BEFORE any spawn. Both
329 // srv and host will be assigned to it (so they're siblings under
330 // a single OS-enforced cleanup contract). Failure here drops us
331 // into "degraded mode" — children spawn but won't be reaped on
332 // launcher death.
333 let job: Option<JobHandle> = match create_job_object() {
334 Ok(handle) => {
335 log("Job Object created (KILL_ON_JOB_CLOSE active)");
336 Some(JobHandle(handle))
337 }
338 Err(e) => {
339 log(&format!(
340 "WARN: Job Object setup failed: {} (process-tree cleanup degraded)",
341 e
342 ));
343 None
344 }
345 };
346 let job_handle: windows_sys::Win32::Foundation::HANDLE =
347 job.as_ref().map(|j| j.0).unwrap_or(std::ptr::null_mut());
348
349 // Phase B.2: start the named-pipe IPC server BEFORE spawning
350 // any children. Host connects to this pipe at startup using the
351 // AGENTMUX_LAUNCHER_PIPE env var the launcher passes below.
352 //
353 // The server runs in its own Tokio task; the JoinHandle is held
354 // for the rest of run_windows so the task isn't cancelled mid-
355 // accept. Server owns the namespace `\\.\pipe\agentmux-{hash}\
356 // command` per data dir, so multi-instance launchers (different
357 // data dirs) get distinct pipes.
358 //
359 // Phase B.6: the bind itself is the single-instance signal.
360 // `bind_first_pipe_instance` synchronously reserves the pipe;
361 // a second launcher pointing at the same data dir gets
362 // ERROR_ACCESS_DENIED. We surface that to the user as
363 // "AgentMux is already running for this data directory" and
364 // exit cleanly BEFORE spawning srv/host (otherwise the second
365 // host would briefly contend on the CEF cache lockfile).
366 let dir_hash = hash::data_dir_hash16(&paths.data_dir);
367 let pipe_path = ipc::pipe_name(&dir_hash);
368 let first_pipe = match ipc::server::bind_first_pipe_instance(&pipe_path) {
369 Ok(p) => p,
370 Err(e) => {
371 // ERROR_ACCESS_DENIED (5) means another launcher already
372 // owns this pipe — i.e., another AgentMux is running for
373 // this data dir. The user-facing behavior matches the
374 // status-bar version popup's "new window": forward an
375 // `open_new_window` IPC POST to the existing host and
376 // exit 0. The named-pipe bind is the AUTHORITATIVE
377 // single-instance signal; this HTTP call is just the
378 // forwarding hint. Other errors (namespace misconfig,
379 // security descriptor failure) genuinely fail — show
380 // the dialog and exit 2.
381 const ERROR_ACCESS_DENIED: i32 = 5;
382 let already_running = e.raw_os_error() == Some(ERROR_ACCESS_DENIED);
383 log(&format!(
384 "pipe bind failed (already_running={}): {} pipe={}",
385 already_running, e, pipe_path
386 ));
387 if already_running {
388 match forward_open_new_window(&paths.data_dir) {
389 Ok(()) => {
390 log("forwarded open_new_window to existing instance — exiting 0");
391 std::process::exit(0);
392 }
393 Err(ForwardError::Transient(reason)) => {
394 // Transient race: the host is alive (pipe is
395 // held by the first launcher) but its
396 // forwarding hint isn't readable yet —
397 // typically because the host is mid-CEF-init
398 // and hasn't written `<data-dir>/ipc-port`
399 // yet. Silent exit so the user isn't punished
400 // for double-clicking quickly.
401 log(&format!("forward transient: {} — exiting 0 silently", reason));
402 std::process::exit(0);
403 }
404 Err(ForwardError::Fatal(reason)) => {
405 // Fatal forward failure: the port file IS
406 // readable, so the host got far enough to
407 // publish it, but the HTTP path is dead
408 // (connect refused, write failed). Could be
409 // a hung host, a port collision, or
410 // ERROR_ACCESS_DENIED that wasn't really
411 // "another instance" (namespace conflict).
412 // Surface the dialog so the user sees that
413 // something is genuinely broken rather than
414 // a silent no-op. (codex P2 PR #598.)
415 log(&format!("forward fatal: {} — surfacing dialog", reason));
416 show_fatal_dialog(
417 "AgentMux",
418 &format!(
419 "AgentMux appears to already be running but isn't responding.\n\nData dir: {}\nReason: {}\n\nClose any leftover AgentMux processes and try again. If the problem persists, check the launcher log.",
420 paths.data_dir.display(),
421 reason
422 ),
423 );
424 std::process::exit(2);
425 }
426 }
427 }
428 // Genuine bind failure (not "already running"). Surface
429 // it loudly because it indicates a system-level problem.
430 show_fatal_dialog(
431 "AgentMux",
432 &format!(
433 "AgentMux failed to start: could not bind IPC pipe.\n\nPipe: {}\nError: {}\n\nIf the problem persists, check the launcher log.",
434 pipe_path, e
435 ),
436 );
437 std::process::exit(2);
438 }
439 };
440 // Spawn the native pre-splash immediately after claiming the
441 // single-instance pipe — before srv spawn and CEF init.
442 // The event name is forwarded to the CEF host as
443 // AGENTMUX_SPLASH_EVENT so it can signal dismiss from on_load_end.
444 #[cfg(target_os = "windows")]
445 let splash_event_name = splash::spawn_splash(&dir_hash);
446 #[cfg(not(target_os = "windows"))]
447 let splash_event_name: Option<String> = None;
448
449 // Phase B.8 — broadcast bus for reducer-emitted events. Capacity
450 // 1024 is comfortable headroom for the launcher's event volume
451 // (~10–50 events per user action × handful of subscribers); a
452 // lagging client gets `RecvError::Lagged` and reconnects.
453 let (events_tx, _) = tokio::sync::broadcast::channel::<agentmux_common::ipc::Event>(1024);
454
455 // Phase D.2 — event log: in-memory ring (replay source for D.3's
456 // GetEvents) + optional disk persistence at
457 // `<data-dir>/launcher-events.log` for crash forensics.
458 let log_disk_path = paths.data_dir.join("launcher-events.log");
459 let event_log = std::sync::Arc::new(event_log::EventLog::new(Some(log_disk_path)));
460 let event_log_for_writer = std::sync::Arc::clone(&event_log);
461 let disk_writer_rx = events_tx.subscribe();
462 tokio::spawn(event_log::run_disk_writer(event_log_for_writer, disk_writer_rx));
463
464 // Phase E.1a — canonical state shared between IPC server + saga
465 // coordinator (and, in E.5, individual sagas). Single Mutex
466 // owner, multiple readers via Arc.
467 let state = std::sync::Arc::new(tokio::sync::Mutex::new(state::State::default()));
468
469 // LSD-2 — open the durable launcher saga log at
470 // `<data-dir>/db/launcher-sagas.db` (separate file from
471 // `launcher-events.log`; the saga log is structured SQLite, the
472 // event log is append-only JSONL). Failure to open is a launcher
473 // startup error — without the log, sagas have no crash-recovery
474 // story (LSD-3 walks `unresolved_sagas` to mark interrupted
475 // sagas `failed_compensation`). Spec
476 // `docs/specs/SPEC_LAUNCHER_SAGA_DURABILITY_2026-05-01.md` §3.1.
477 //
478 // `launcher_saga_log_path` performs the back-compat move from
479 // the pre-AUDIT_SQLITE_SYSTEMS_2026_05_19.md location
480 // (`<data-dir>/launcher-sagas.db` — outside `db/`) into the
481 // canonical `db/` subdir alongside srv's SQLite files.
482 let saga_log_path = data_dir::launcher_saga_log_path(&paths.data_dir);
483 let saga_log = match saga::LauncherSagaLog::open(&saga_log_path) {
484 Ok(l) => std::sync::Arc::new(l),
485 Err(e) => {
486 log(&format!(
487 "FATAL: failed to open launcher saga log at {:?}: {}",
488 saga_log_path, e
489 ));
490 std::process::exit(2);
491 }
492 };
493
494 // LSD-3 — startup recovery walker. Walks the durable saga log,
495 // marks any saga still in `running` / `compensating` / `failed`
496 // (left over from a crashed prior run) as `failed_compensation`
497 // so operators see them in `--diag sagas` and the next coordinator
498 // run can't accidentally double-act on partially-applied effects.
499 // MUST run BEFORE `tokio::spawn(saga::run_coordinator(..))` below
500 // (LSD spec §5 risk #5: don't spawn while recovery is in progress).
501 // Runs BEFORE LSD-4 vacuum so just-recovered sagas land in their
502 // failed_compensation state for the operator to see — vacuum
503 // honors the 7-day retention window and won't immediately purge.
504 // Spec `docs/specs/SPEC_LAUNCHER_SAGA_DURABILITY_2026-05-01.md` §3.5.
505 if let Err(e) = saga::compensate_unresolved_launcher_sagas(&saga_log).await {
506 log(&format!(
507 "[saga-recovery] WARN: walker failed: {} — coordinator will still spawn; prior crashed sagas remain unresolved until next restart",
508 e
509 ));
510 }
511
512 // LSD-4 — startup retention vacuum. Runs once per launcher boot,
513 // before the coordinator subscribes, so any rows it deletes are
514 // already terminal and can't possibly belong to an in-flight saga
515 // the coordinator is about to drive (see `vacuum_older_than` SQL —
516 // `running` and `compensating` rows are unreachable by the DELETE
517 // regardless of timing). Failure is non-fatal.
518 // Spec §3.6.
519 let retention_days =
520 config::load_saga_retention_days(&paths.user_home_dir, |w| log(w));
521 let cutoff = chrono::Utc::now() - chrono::Duration::days(retention_days);
522 match saga_log.vacuum_older_than(cutoff) {
523 Ok(removed) => log(&format!(
524 "[saga-log] vacuumed {} sagas older than {} (retention {} days)",
525 removed, cutoff, retention_days
526 )),
527 Err(e) => log(&format!("[saga-log] WARN: vacuum failed: {}", e)),
528 }
529
530 // CPD-2 — launcher → host pipe wrapper. Owns the writer half of
531 // the host's IPC connection (installed by the per-connection
532 // handler in `ipc::server` once the host registers) and exposes
533 // `send_command` / `send_event` to the rest of the launcher.
534 // CPD-2 wires the wrapper + refactors event fanout for the host
535 // connection to flow through here. CPD-3 wires this into the
536 // saga coordinator's `apply_action` so `IssueCmd::Host` actions
537 // dispatch live (no longer log-only).
538 let host_pipe = std::sync::Arc::new(host_pipe::HostPipe::new(
539 events_tx.clone(),
540 std::sync::Arc::clone(&state),
541 ));
542
543 // Phase E.1a — saga coordinator task. Subscribes to the broadcast
544 // bus, drives in-flight sagas. E.1a registry is empty — framework
545 // only. E.5 adds the first concrete saga consumer (tear-off).
546 // LSD-2 — durable saga log is now installed; every lifecycle
547 // transition is persisted.
548 // CPD-3 — install `host_pipe` so saga `IssueCmd::Host` actions
549 // dispatch through the launcher → host wire instead of being
550 // log-only.
551 //
552 // Subscribe BEFORE spawning so the race window between construction
553 // and first `recv()` doesn't drop early events. (reagent P2 PR #609.)
554 // Same pattern as the disk writer above.
555 // with_log() can fail if max_saga_id() fails (e.g. corrupted SQLite
556 // file). Treat as fatal — continuing with a default next_saga_id=1
557 // while the log is attached would let the coordinator silently
558 // mutate prior saga history on restart. Better to crash loudly so
559 // operators see + investigate. (codex P1 PR #645 round 2.)
560 let saga_coord_inner = saga::SagaCoordinator::new(events_tx.clone(), std::sync::Arc::clone(&state))
561 .with_log(std::sync::Arc::clone(&saga_log))
562 .unwrap_or_else(|e| {
563 log(&format!(
564 "[main] FATAL: failed to seed saga_id allocator from launcher_saga.max(saga_id): {} — refusing to start with degraded coordinator",
565 e
566 ));
567 std::process::exit(1);
568 })
569 .with_host_pipe(std::sync::Arc::clone(&host_pipe));
570 let saga_coord = std::sync::Arc::new(saga_coord_inner);
571 let saga_rx = events_tx.subscribe();
572 tokio::spawn(saga::run_coordinator(
573 std::sync::Arc::clone(&saga_coord),
574 saga_rx,
575 ));
576
577 let _ipc_handle = ipc::run_ipc_server(
578 pipe_path.clone(),
579 first_pipe,
580 ipc::server::ServerCtx {
581 launcher_pid: std::process::id(),
582 launcher_version: env!("CARGO_PKG_VERSION").to_string(),
583 state,
584 events_tx,
585 event_log,
586 host_pipe: std::sync::Arc::clone(&host_pipe),
587 },
588 );
589 log(&format!("IPC server started on {}", pipe_path));
590
591 // 3. Spawn srv first. Host needs srv's endpoints to skip its own
592 // spawn_backend path. Srv signals readiness via AGENTMUXSRV-ESTART on
593 // stderr; the spawner returns once we see that line (or after a
594 // 30s timeout).
595 // Phase E.1b — pre-compute srv's pipe path (same data-dir hash
596 // as launcher's pipe) and pass via env so srv binds it on
597 // startup. Launcher is the sole authority for the data-dir hash.
598 let srv_pipe_path = ipc::srv_pipe_name(&dir_hash);
599 log(&format!("[ipc] srv pipe path = {}", srv_pipe_path));
600
601 let (srv_result, mut srv_child) = match srv_spawner::spawn_srv(
602 launcher_exe_dir,
603 &paths,
604 &srv_pipe_path,
605 job_handle,
606 )
607 .await
608 {
609 Ok(pair) => pair,
610 Err(e) => {
611 log(&format!("FATAL: srv spawn failed: {}", e));
612 eprintln!("Failed to start backend: {}", e);
613 drop(job);
614 std::process::exit(1);
615 }
616 };
617
618 // CRITICAL: tokio::process::Child::wait() proactively drops
619 // self.stdin before waiting (tokio source comment: "Ensure stdin
620 // is closed so the child can't read from it any more"). agentmux-
621 // srv has a parent-watch loop on its own stdin — when stdin reads
622 // 0 bytes (EOF from a closed write end), it interprets that as
623 // "parent died" and shuts itself down. tokio's wait() would
624 // trigger that within milliseconds, causing srv to exit before
625 // the host even mounts its first browser. Move srv's stdin out
626 // of the Child into a launcher-scope binding so tokio can't see
627 // it (its take() returns None) and the pipe stays open for the
628 // launcher's lifetime. (Smoke test on v0.33.447 caught this.)
629 let _srv_stdin_keepalive = srv_child.stdin.take();
630
631 // 4-6. Spawn the host (suspended) → assign to J0 → resume, via
632 // spawn_host_supervised(). The splash event is passed on every launch
633 // (including restarts) so a relaunched host still dismisses a pending
634 // splash if the first host crashed before its first frame.
635 let host_env = paths.common.to_env_vars();
636 let mut host_child = match spawn_host_supervised(
637 real_exe,
638 args,
639 &srv_result,
640 &host_env,
641 &pipe_path,
642 job.is_some(),
643 job_handle,
644 splash_event_name.as_deref(),
645 false,
646 ) {
647 Some(c) => c,
648 None => {
649 // First-launch failure is fatal. Happy path: drop(job) →
650 // KILL_ON_JOB_CLOSE reaps srv. Degraded path (J0 absent):
651 // kill srv explicitly or it orphans (kill_on_drop is false).
652 log("FATAL: could not start CEF host — terminating");
653 eprintln!("Failed to launch AgentMux.");
654 if job.is_none() {
655 let _ = srv_child.start_kill();
656 }
657 drop(job);
658 std::process::exit(1);
659 }
660 };
661
662 // 7. Supervised wait loop (Phase 1 — host supervision). The host is
663 // auto-restarted on abnormal exit, bounded by a crash budget so a
664 // deterministic crash can't spin forever (spec §10-A). A clean host
665 // exit (code 0) ends the loop. srv is NOT yet supervised — an srv
666 // exit still terminates the launcher; srv supervision is Phase 2.
667 //
668 // We don't manually kill the surviving child in the happy path:
669 // dropping `job` below triggers KILL_ON_JOB_CLOSE which reaps the
670 // entire J0 membership. The explicit start_kill is the backstop for
671 // degraded mode (job == None) only.
672 log("entering supervised host + srv wait");
673 let mut host_restarts: Vec<std::time::Instant> = Vec::new();
674 let mut last_abnormal_code: Option<i32> = None;
675 let mut host_degraded = false;
676 let exit_code = loop {
677 tokio::select! {
678 host_status = host_child.wait() => {
679 let code = match host_status {
680 Ok(s) => s.code().unwrap_or(1),
681 Err(e) => {
682 log(&format!("FATAL: host wait failed: {}", e));
683 break 1;
684 }
685 };
686 if code == 0 {
687 log("CEF host exited cleanly (code 0) — shutting down");
688 break 0;
689 }
690 // Abnormal exit — relaunch within the crash budget.
691 let now = std::time::Instant::now();
692 host_restarts.retain(|t| now.duration_since(*t) < HOST_RESTART_WINDOW);
693 if host_restarts.len() >= HOST_RESTART_BUDGET {
694 log(&format!(
695 "CEF host exited abnormally (code {}); restart budget exhausted \
696 ({} in {}s) — giving up",
697 code,
698 host_restarts.len(),
699 HOST_RESTART_WINDOW.as_secs()
700 ));
701 break code;
702 }
703 host_restarts.push(now);
704 // Crash classification + retry ladder (spec §7): a crash that
705 // reproduces the previous abnormal exit code is deterministic —
706 // step down to a degraded (--disable-gpu) relaunch so the retry
707 // isn't "the same thing again". Degraded is sticky; the ladder
708 // only steps down.
709 if last_abnormal_code == Some(code) {
710 host_degraded = true;
711 }
712 last_abnormal_code = Some(code);
713 log(&format!(
714 "CEF host exited abnormally (code {}) — relaunching (restart {}/{}{})",
715 code,
716 host_restarts.len(),
717 HOST_RESTART_BUDGET,
718 if host_degraded { ", degraded: --disable-gpu" } else { "" }
719 ));
720 match spawn_host_supervised(
721 real_exe,
722 args,
723 &srv_result,
724 &host_env,
725 &pipe_path,
726 job.is_some(),
727 job_handle,
728 splash_event_name.as_deref(),
729 host_degraded,
730 ) {
731 Some(c) => host_child = c,
732 None => {
733 log("host relaunch failed to spawn — giving up");
734 break code;
735 }
736 }
737 }
738 srv_status = srv_child.wait() => {
739 match srv_status {
740 Ok(s) => log(&format!(
741 "srv exited UNEXPECTEDLY (host still running) with code {} — terminating launcher",
742 s.code().unwrap_or(1)
743 )),
744 Err(e) => log(&format!("FATAL: srv wait failed: {}", e)),
745 }
746 break 1;
747 }
748 }
749 };
750
751 // 8. Cleanup. Happy path: drop(job) → KILL_ON_JOB_CLOSE reaps
752 // the surviving child + CEF renderers. Degraded path (job is
753 // None): explicit start_kill on both — neither will be reaped
754 // by the OS, so we have to terminate them ourselves to avoid
755 // orphans. (gemini PR #570 round-1 MEDIUM L105 / round-2 P1
756 // backstop pattern.)
757 if job.is_none() {
758 log("WARN: J0 absent — explicitly killing surviving children");
759 let _ = host_child.start_kill();
760 let _ = srv_child.start_kill();
761 }
762 drop(job);
763 log(&format!("launcher exiting with code {}", exit_code));
764 std::process::exit(exit_code);
765}
766
767/// Append a timestamped line to ~/.agentmux/logs/agentmux-launcher.log.
768/// Best-effort — silently no-ops if the log dir doesn't exist yet.
769pub(crate) fn log(msg: &str) {
770 let log_dir = dirs_fallback_home().join(".agentmux").join("logs");
771 let _ = std::fs::create_dir_all(&log_dir);
772 let path = log_dir.join("agentmux-launcher.log");
773 if let Ok(mut f) = std::fs::OpenOptions::new()
774 .create(true)
775 .append(true)
776 .open(&path)
777 {
778 use std::io::Write;
779 let secs = std::time::SystemTime::now()
780 .duration_since(std::time::UNIX_EPOCH)
781 .map(|d| d.as_secs())
782 .unwrap_or(0);
783 let _ = writeln!(f, "[{}] v{} {}", secs, env!("CARGO_PKG_VERSION"), msg);
784 }
785}
786
787/// Home dir without depending on `dirs` for THIS specific lookup.
788/// Kept to avoid a dirs dep cycle from log() — log() is called from
789/// data_dir::resolve_paths via failure paths, and we want it to work
790/// even if `dirs` itself is mid-failure.
791fn dirs_fallback_home() -> std::path::PathBuf {
792 std::env::var("USERPROFILE")
793 .or_else(|_| std::env::var("HOME"))
794 .map(std::path::PathBuf::from)
795 .unwrap_or_else(|_| std::path::PathBuf::from("."))
796}
797
798/// Owns a Windows Job Object handle. CloseHandle on drop. The job's
799/// `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE` flag means closing the last handle
800/// terminates every assigned process — which is what we want as a backstop
801/// if this launcher dies abruptly.
802#[cfg(target_os = "windows")]
803struct JobHandle(windows_sys::Win32::Foundation::HANDLE);
804
805#[cfg(target_os = "windows")]
806unsafe impl Send for JobHandle {}
807
808#[cfg(target_os = "windows")]
809impl Drop for JobHandle {
810 fn drop(&mut self) {
811 if !self.0.is_null() {
812 unsafe {
813 windows_sys::Win32::Foundation::CloseHandle(self.0);
814 }
815 }
816 }
817}
818
/// Create a Job Object J0 configured with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`.
///
/// Returns the raw job handle; the caller owns it and is responsible for
/// closing it. The caller assigns processes to it via
/// `srv_spawner::assign_pid_to_job(pid, job)`.
///
/// # Errors
/// Returns a message that includes the Win32 error (`GetLastError`) if
/// `CreateJobObjectW` or `SetInformationJobObject` fails. On the latter
/// failure the freshly created job handle is closed before returning.
#[cfg(target_os = "windows")]
fn create_job_object() -> Result<windows_sys::Win32::Foundation::HANDLE, String> {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::JobObjects::*;

    unsafe {
        let job = CreateJobObjectW(std::ptr::null(), std::ptr::null());
        if job.is_null() {
            return Err(format!(
                "CreateJobObjectW returned null: {}",
                std::io::Error::last_os_error()
            ));
        }
        let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = std::mem::zeroed();
        info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
        let ok = SetInformationJobObject(
            job,
            JobObjectExtendedLimitInformation,
            &info as *const _ as *const std::ffi::c_void,
            std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
        );
        if ok == 0 {
            // Capture GetLastError before CloseHandle gets a chance to
            // overwrite it.
            let err = std::io::Error::last_os_error();
            CloseHandle(job);
            return Err(format!("SetInformationJobObject failed: {}", err));
        }
        Ok(job)
    }
}
846
/// Resume the (single) main thread of a CREATE_SUSPENDED process.
///
/// Walks a Toolhelp32 thread snapshot looking for threads owned by `pid`
/// (a freshly-spawned suspended process has only its main thread). The
/// first one that opens with `THREAD_SUSPEND_RESUME` is passed to
/// `ResumeThread`. A `ResumeThread` return of 0 means the thread was not
/// suspended (unexpected for a just-created suspended process) and is
/// treated as success.
///
/// # Errors
/// - The thread snapshot cannot be created (message includes the OS error).
/// - Every thread owned by `pid` failed `OpenThread` (message names the last
///   such tid and the OS error).
/// - `ResumeThread` returned `(DWORD)-1` (message includes the OS error).
/// - No thread owned by `pid` appears in the snapshot.
#[cfg(target_os = "windows")]
pub(crate) fn resume_main_thread(pid: u32) -> Result<(), String> {
    use windows_sys::Win32::Foundation::{CloseHandle, INVALID_HANDLE_VALUE};
    use windows_sys::Win32::System::Diagnostics::ToolHelp::{
        CreateToolhelp32Snapshot, Thread32First, Thread32Next, TH32CS_SNAPTHREAD,
        THREADENTRY32,
    };
    use windows_sys::Win32::System::Threading::{
        OpenThread, ResumeThread, THREAD_SUSPEND_RESUME,
    };

    unsafe {
        let snap = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0);
        if snap == INVALID_HANDLE_VALUE {
            return Err(format!(
                "CreateToolhelp32Snapshot failed: {}",
                std::io::Error::last_os_error()
            ));
        }

        let mut entry: THREADENTRY32 = std::mem::zeroed();
        entry.dwSize = std::mem::size_of::<THREADENTRY32>() as u32;

        // This is the result if the walk ends without resuming anything.
        // Once a matching thread fails to open, it is replaced by that
        // OpenThread error, so the caller is not told "no thread found"
        // when one was actually found.
        let mut outcome: Result<(), String> = Err(format!("no thread found for pid={}", pid));
        if Thread32First(snap, &mut entry) != 0 {
            loop {
                if entry.th32OwnerProcessID == pid {
                    let tid = entry.th32ThreadID;
                    let thread = OpenThread(THREAD_SUSPEND_RESUME, 0, tid);
                    if thread.is_null() {
                        // Keep scanning: another thread of `pid` may still open.
                        outcome = Err(format!(
                            "OpenThread failed for tid={}: {}",
                            tid,
                            std::io::Error::last_os_error()
                        ));
                    } else {
                        let prev = ResumeThread(thread);
                        // Read GetLastError before CloseHandle can clobber it.
                        let resume_err = if prev == u32::MAX {
                            Some(std::io::Error::last_os_error())
                        } else {
                            None
                        };
                        CloseHandle(thread);
                        outcome = match resume_err {
                            Some(e) => Err(format!("ResumeThread failed for tid={}: {}", tid, e)),
                            None => Ok(()),
                        };
                        break;
                    }
                }
                entry.dwSize = std::mem::size_of::<THREADENTRY32>() as u32;
                if Thread32Next(snap, &mut entry) == 0 {
                    break;
                }
            }
        }

        CloseHandle(snap);
        outcome
    }
}
910
/// Why forwarding `open_new_window` to an already-running host failed.
///
/// Failure classification (codex P2 PR #598):
/// - `Transient` — port file missing / unreadable / malformed.
///   The host is alive (pipe held) but mid-startup; caller exits
///   0 silently so the user isn't punished for double-clicking
///   quickly.
/// - `Fatal` — port file is readable, but the HTTP path failed
///   (connect refused or timed out, write failed or timed out, or the
///   host answered with a non-2xx status such as 401 for a stale
///   token). Either a host that is not serving its IPC port, or a
///   non-running-instance source of `ERROR_ACCESS_DENIED` (namespace
///   conflict, security descriptor failure). Caller surfaces the
///   dialog so the user sees a real problem rather than a silent no-op.
#[derive(Debug)]
enum ForwardError {
    /// Port file missing, unreadable, or not `port:token` with a valid port.
    Transient(String),
    /// Port file was usable, but the HTTP request to the host failed.
    Fatal(String),
}

/// Phase B.6 (post-fix) — forward an `open_new_window` request to
/// the already-running host and let this launcher exit 0.
///
/// The host writes `<data-dir>/ipc-port` after CEF init as
/// `port:token`. We open a TCP connection to 127.0.0.1:port, send a
/// minimal HTTP/1.1 POST to /ipc with the bearer token and a JSON
/// body, and read back the start of the response. We deliberately do
/// NOT pull in reqwest: the launcher binary should stay tiny (~325 KB)
/// and the protocol is fixed, so a hand-rolled request is the right tool.
///
/// # Errors
/// - [`ForwardError::Transient`] if the port file is missing, unreadable,
///   or malformed.
/// - [`ForwardError::Fatal`] if connecting (2 s timeout) or writing the
///   request fails. Also Fatal if the host returns a complete status line
///   whose code is not 2xx.
///
/// A failed or timed-out read, or a truncated status line, is tolerated
/// and reported as success. The request has already been written, and the
/// read exists mainly to keep the socket open until the host dispatches it.
fn forward_open_new_window(data_dir: &std::path::Path) -> Result<(), ForwardError> {
    use std::io::{Read, Write};
    use std::time::Duration;

    const IO_TIMEOUT: Duration = Duration::from_secs(2);

    let port_file = data_dir.join("ipc-port");
    let contents = std::fs::read_to_string(&port_file).map_err(|e| {
        ForwardError::Transient(format!("read {}: {}", port_file.display(), e))
    })?;
    let trimmed = contents.trim();
    let (port_str, token) = trimmed.split_once(':').ok_or_else(|| {
        ForwardError::Transient(format!(
            "malformed port file (expected port:token): {}",
            trimmed
        ))
    })?;
    let port: u16 = port_str
        .parse()
        .map_err(|e| ForwardError::Transient(format!("invalid port {:?}: {}", port_str, e)))?;

    // From here on the file was readable: any failure is a fatal
    // forward (the host got far enough to publish but isn't
    // serving the IPC port correctly).
    let addr: std::net::SocketAddr = ([127, 0, 0, 1], port).into();
    let mut stream = std::net::TcpStream::connect_timeout(&addr, IO_TIMEOUT)
        .map_err(|e| ForwardError::Fatal(format!("connect 127.0.0.1:{}: {}", port, e)))?;
    // Best-effort: without timeouts the calls below would just block longer.
    let _ = stream.set_write_timeout(Some(IO_TIMEOUT));
    let _ = stream.set_read_timeout(Some(IO_TIMEOUT));

    let body = r#"{"cmd":"open_new_window"}"#;
    let req = format!(
        "POST /ipc HTTP/1.1\r\nHost: 127.0.0.1\r\nContent-Type: application/json\r\nAuthorization: Bearer {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
        token,
        body.len(),
        body
    );
    stream
        .write_all(req.as_bytes())
        .map_err(|e| ForwardError::Fatal(format!("write request: {}", e)))?;

    // CRITICAL: read at least the status line. The host's axum
    // handler is async — if the launcher closes the TCP socket
    // before axum has finished parsing + dispatching to
    // `open_new_window`, the request can be dropped (smoke caught
    // exactly this on v0.33.481: the launcher logged "forwarded"
    // but no second window appeared because the process exited
    // before axum ran the handler). We don't care about the body
    // — `Connection: close` lets the server drop the socket once
    // the response is written, so a single short read is enough
    // to keep the connection alive past handler dispatch.
    let mut raw = [0u8; 64];
    let received = match stream.read(&mut raw) {
        Ok(n) => n,
        // Timeout/reset after the request was written: tolerated (see doc).
        Err(_) => return Ok(()),
    };
    let text = String::from_utf8_lossy(&raw[..received]);

    // Only judge a complete status line ("HTTP/1.1 401 Unauthorized\r\n").
    // A short read that cut the line off is treated like no response.
    let status_line = text.split_once("\r\n").map(|(line, _)| line);
    let status_code = status_line
        .filter(|line| line.starts_with("HTTP/"))
        .and_then(|line| line.split_whitespace().nth(1))
        .and_then(|code| code.parse::<u16>().ok());
    if let (Some(line), Some(code)) = (status_line, status_code) {
        if !(200..300).contains(&code) {
            return Err(ForwardError::Fatal(format!(
                "host rejected open_new_window: {}",
                line
            )));
        }
    }
    Ok(())
}
991
/// Display a topmost, foreground warning message box and block until the
/// user dismisses it.
///
/// Intended for genuine bind failures, not the "already running" path,
/// which silently forwards via `forward_open_new_window`. Release builds use
/// the `windows` subsystem, where `eprintln!` output is discarded, so without
/// this dialog the launcher would exit silently.
#[cfg(target_os = "windows")]
fn show_fatal_dialog(title: &str, body: &str) {
    use std::os::windows::ffi::OsStrExt;
    use windows_sys::Win32::UI::WindowsAndMessaging::{
        MessageBoxW, MB_ICONWARNING, MB_OK, MB_SETFOREGROUND, MB_TOPMOST,
    };

    // The W-suffixed API wants NUL-terminated UTF-16.
    let to_wide = |text: &str| -> Vec<u16> {
        std::ffi::OsStr::new(text)
            .encode_wide()
            .chain(std::iter::once(0))
            .collect()
    };
    let caption = to_wide(title);
    let message = to_wide(body);
    let style = MB_OK | MB_ICONWARNING | MB_SETFOREGROUND | MB_TOPMOST;

    // SAFETY: both buffers are NUL-terminated and outlive the call; a null
    // owner window is permitted.
    unsafe {
        MessageBoxW(std::ptr::null_mut(), message.as_ptr(), caption.as_ptr(), style);
    }
}
1020
/// Non-Windows stand-in for the modal dialog: there is no message box to
/// show, so `body` goes to stderr and the title is ignored.
#[cfg(not(target_os = "windows"))]
fn show_fatal_dialog(_title: &str, body: &str) {
    eprintln!("{body}");
}
1025
1026/// Find the CEF host binary in the runtime directory.
1027/// Tries versioned name first (agentmux-X.Y.Z.exe), then the old
1028/// agentmux-cef-X.Y.Z.exe pattern for backwards compat, then plain
1029/// agentmux-cef.exe (dev mode).
1030fn find_cef_binary(runtime_dir: &std::path::Path) -> std::path::PathBuf {
1031 let ext = if cfg!(target_os = "windows") { ".exe" } else { "" };
1032
1033 let versioned = format!("agentmux-{}{}", env!("CARGO_PKG_VERSION"), ext);
1034 let versioned_path = runtime_dir.join(&versioned);
1035 if versioned_path.exists() {
1036 return versioned_path;
1037 }
1038
1039 if let Ok(entries) = std::fs::read_dir(runtime_dir) {
1040 let prefix = "agentmux-";
1041 let cef_prefix = "agentmux-cef";
1042 for entry in entries.flatten() {
1043 let name = entry.file_name();
1044 let name = name.to_string_lossy();
1045 if name.starts_with(prefix)
1046 && !name.starts_with(cef_prefix)
1047 && !name.starts_with("agentmux-srv")
1048 && name.ends_with(ext)
1049 {
1050 return entry.path();
1051 }
1052 }
1053 }
1054
1055 let versioned_old = format!("agentmux-cef-{}{}", env!("CARGO_PKG_VERSION"), ext);
1056 let versioned_old_path = runtime_dir.join(&versioned_old);
1057 if versioned_old_path.exists() {
1058 return versioned_old_path;
1059 }
1060
1061 runtime_dir.join(format!("agentmux-cef{}", ext))
1062}