agentmux_srv/
main.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4mod agents;
5mod backend;
6mod config;
7mod event_log;
8mod identity;
9mod persist;
10mod persist_subscriber;
11mod reducer;
12mod registry;
13mod sagas;
14mod server;
15mod srv_ipc;
16mod state;
17mod drone;
18#[cfg(windows)]
19mod crash_monitor;
20
21use std::future::IntoFuture;
22use std::sync::Arc;
23
24use clap::Parser;
25use config::CliArgs;
26use server::{AppState, build_router};
27use tokio::net::TcpListener;
28use tokio::signal;
29
30use backend::eventbus::EventBus;
31use backend::reactive::{self, Poller, PollerConfig};
32use backend::storage::filestore::FileStore;
33use backend::storage::wstore::WaveStore;
34use backend::wps::Broker;
35use backend::wconfig;
36use backend::{docsite, sysinfo, base, wcore};
37
38/// Start a ppid polling watchdog on Linux/macOS.
39/// If the parent process dies, getppid() changes (reparented to init/launchd).
40/// This is safer than PR_SET_PDEATHSIG which tracks the parent *thread*, not process,
41/// and can fire spuriously with async runtimes like Tokio.
42#[cfg(any(target_os = "linux", target_os = "macos"))]
43fn start_ppid_watchdog() {
44    let original_ppid = unsafe { libc::getppid() };
45    std::thread::spawn(move || {
46        loop {
47            std::thread::sleep(std::time::Duration::from_secs(2));
48            let current_ppid = unsafe { libc::getppid() };
49            if current_ppid != original_ppid {
50                eprintln!(
51                    "parent process died (ppid changed {} -> {}), shutting down",
52                    original_ppid, current_ppid
53                );
54                std::process::exit(0);
55            }
56        }
57    });
58}
59
/// Event-driven parent process watcher for macOS, built on kqueue.
///
/// Spawns a background thread that registers an `EVFILT_PROC` / `NOTE_EXIT`
/// filter for `parent_pid` and then blocks in `kevent()` until that process
/// terminates, at which point this process exits with status 0.
///
/// Fallbacks and early exits:
/// - If `kqueue()` or the filter registration fails, [`start_ppid_watchdog`]
///   is started instead and the watcher thread returns.
/// - If registration (or the follow-up liveness probe) reports `ESRCH`, the
///   parent is already gone and the process exits immediately.
///
/// The blocking wait is retried on `EINTR` so that a signal handled on this
/// thread does not get mistaken for the parent exiting.
#[cfg(target_os = "macos")]
fn start_parent_watcher(parent_pid: u32) {
    std::thread::spawn(move || {
        // SAFETY: every pointer handed to kevent() refers to a live local array
        // whose length matches the count argument (or is null with count 0);
        // `kq` is only used between a successful kqueue() and its close().
        unsafe {
            let kq = libc::kqueue();
            if kq < 0 {
                eprintln!(
                    "kqueue() failed (errno={}), falling back to ppid watchdog",
                    *libc::__error()
                );
                start_ppid_watchdog();
                return;
            }

            // Register EVFILT_PROC + NOTE_EXIT on the parent PID.
            let changelist: [libc::kevent; 1] = [libc::kevent {
                ident: parent_pid as usize,
                filter: libc::EVFILT_PROC,
                flags: libc::EV_ADD | libc::EV_ONESHOT,
                fflags: libc::NOTE_EXIT,
                data: 0,
                udata: std::ptr::null_mut(),
            }];

            let ret = libc::kevent(
                kq,
                changelist.as_ptr(),
                1,
                std::ptr::null_mut(),
                0,
                std::ptr::null(),
            );

            if ret < 0 {
                // Read errno before close(), which may overwrite it.
                let errno = *libc::__error();
                libc::close(kq);
                if errno == libc::ESRCH {
                    // Parent already dead
                    eprintln!(
                        "parent process {} already exited (ESRCH during kqueue registration), shutting down",
                        parent_pid
                    );
                    std::process::exit(0);
                }
                eprintln!(
                    "kevent() registration failed (errno={}), falling back to ppid watchdog",
                    errno
                );
                start_ppid_watchdog();
                return;
            }

            eprintln!("kqueue EVFILT_PROC registered for parent pid {}", parent_pid);

            // Extra liveness probe after registration: if the parent is
            // already gone (signal 0 reports ESRCH), exit now instead of
            // blocking on an event.
            if libc::kill(parent_pid as i32, 0) != 0 && *libc::__error() == libc::ESRCH {
                libc::close(kq);
                eprintln!(
                    "parent process {} already exited (post-registration check), shutting down",
                    parent_pid
                );
                std::process::exit(0);
            }

            // Block until the parent exits. A signal delivered to this thread
            // makes kevent() fail with EINTR; that says nothing about the
            // parent, so retry instead of shutting the server down.
            let mut eventlist: [libc::kevent; 1] = std::mem::zeroed();
            let (n, wait_errno) = loop {
                let n = libc::kevent(
                    kq,
                    std::ptr::null(),
                    0,
                    eventlist.as_mut_ptr(),
                    1,
                    std::ptr::null(),
                );
                if n > 0 {
                    break (n, 0);
                }
                // Capture errno right away, before close() can overwrite it.
                let errno = *libc::__error();
                if n < 0 && errno == libc::EINTR {
                    continue;
                }
                break (n, errno);
            };
            libc::close(kq);

            if n > 0 {
                eprintln!(
                    "parent process {} exited (kqueue EVFILT_PROC), shutting down",
                    parent_pid
                );
            } else {
                eprintln!(
                    "kevent() wait returned {} (errno={}), shutting down",
                    n, wait_errno
                );
            }
            std::process::exit(0);
        }
    });
}
158
159/// Event-driven parent process watcher using pidfd_open (Linux 5.3+).
160/// Falls back to PPID polling on older kernels without pidfd support.
161#[cfg(target_os = "linux")]
162fn start_parent_watcher(parent_pid: u32) {
163    std::thread::spawn(move || {
164        unsafe {
165            // Try pidfd_open (syscall 434 on x86_64, 434 on aarch64)
166            let pidfd = libc::syscall(libc::SYS_pidfd_open, parent_pid as libc::c_int, 0 as libc::c_int);
167
168            if pidfd < 0 {
169                let errno = *libc::__errno_location();
170                if errno == libc::ESRCH {
171                    // Parent already dead
172                    eprintln!(
173                        "parent process {} already exited (ESRCH from pidfd_open), shutting down",
174                        parent_pid
175                    );
176                    std::process::exit(0);
177                }
178                // ENOSYS means kernel doesn't support pidfd_open — fall back
179                eprintln!(
180                    "pidfd_open() failed (errno={}), falling back to ppid watchdog",
181                    errno
182                );
183                start_ppid_watchdog();
184                return;
185            }
186
187            let pidfd = pidfd as libc::c_int;
188
189            // Race condition guard: verify parent is still alive
190            if libc::kill(parent_pid as i32, 0) != 0 && *libc::__errno_location() == libc::ESRCH {
191                libc::close(pidfd);
192                eprintln!(
193                    "parent process {} already exited (post-pidfd check), shutting down",
194                    parent_pid
195                );
196                std::process::exit(0);
197            }
198
199            // poll() on the pidfd — blocks until the process exits
200            let mut pfd = libc::pollfd {
201                fd: pidfd,
202                events: libc::POLLIN,
203                revents: 0,
204            };
205
206            let ret = libc::poll(&mut pfd, 1, -1); // infinite timeout
207            libc::close(pidfd);
208
209            if ret > 0 {
210                eprintln!(
211                    "parent process {} exited (pidfd poll), shutting down",
212                    parent_pid
213                );
214            } else {
215                eprintln!(
216                    "poll() on pidfd returned {} (errno={}), shutting down",
217                    ret,
218                    *libc::__errno_location()
219                );
220            }
221            std::process::exit(0);
222        }
223    });
224}
225
226#[tokio::main]
227async fn main() {
228    // -1. Crash monitor branch — must be checked before any other initialization.
229    //     The monitor process runs a blocking minidumper::Server and exits when the
230    //     main process disconnects. It does not run any backend logic.
231    #[cfg(windows)]
232    if std::env::args().any(|a| a == "--crash-monitor") {
233        crash_monitor::run_monitor();
234        return;
235    }
236
237    // 0. Start parent process watcher BEFORE tokio runtime does real work (Linux/macOS only).
238    // On Windows, the frontend uses a Job Object with KILL_ON_JOB_CLOSE instead.
239    // Uses getppid() to get the parent PID, then kqueue/pidfd to watch it (event-driven,
240    // zero CPU). Falls back to PPID polling if kqueue/pidfd setup fails or parent is init/launchd.
241    #[cfg(any(target_os = "linux", target_os = "macos"))]
242    {
243        let ppid = unsafe { libc::getppid() } as u32;
244        if ppid <= 1 {
245            // Parent is init/launchd — can't meaningfully watch it, use polling fallback
246            start_ppid_watchdog();
247        } else {
248            start_parent_watcher(ppid);
249        }
250    }
251
252    // 0b. Attach out-of-process crash dump handler (Windows only).
253    //     Spawns self with --crash-monitor and installs a VEH handler.
254    //     _crash_guard must stay alive — dropping it uninstalls the VEH handler.
255    //     Non-fatal: if the monitor fails to start, the process continues normally
256    //     and WER LocalDumps still captures __fastfail crashes independently.
257    #[cfg(windows)]
258    let _crash_guard = crash_monitor::spawn_and_attach();
259
260    // 1. Init tracing (stderr + rolling file)
261    let _log_guard = init_logging();
262
263    // 2. Parse CLI args and build config
264    let args = CliArgs::parse();
265    let config = config::Config::from_env_and_args(&args).unwrap_or_else(|e| {
266        tracing::error!("Failed to load config: {}", e);
267        std::process::exit(1);
268    });
269
270    let version = config.version.to_string();
271    let build_time = config.build_time.to_string();
272
273    // Make the per-launch auth_key available to the cross-instance agent
274    // registry writer. Peers performing an HTTP forward of a missed inject
275    // use this to authenticate against the writing instance's sidecar.
276    // Must happen after Config::from_env_and_args (which removes
277    // AGENTMUX_AUTH_KEY from the process env) but before anything calls
278    // `agent_registry::write`.
279    crate::backend::reactive::registry::init_local_auth_key(&config.auth_key);
280
281    // 4. Initialize backend (matching Go cmd/server/main-server.go:374-590)
282    base::set_version(&version);
283    base::set_build_time(&build_time);
284
285    // Migrate ~/.waveterm → ~/.agentmux if needed (one-time, non-destructive)
286    base::migrate_legacy_data_dir();
287
288    // Set up data directory (uses AGENTMUX_DATA_HOME or default)
289    if !config.data_home.is_empty() {
290        std::env::set_var("AGENTMUX_DATA_HOME", &config.data_home);
291    }
292    if !config.config_home.is_empty() {
293        std::env::set_var("AGENTMUX_CONFIG_HOME", &config.config_home);
294    }
295    if !config.app_path.is_empty() {
296        std::env::set_var("AGENTMUX_APP_PATH", &config.app_path);
297    }
298
299    base::ensure_wave_data_dir().unwrap_or_else(|e| {
300        tracing::error!("Failed to ensure data dir: {}", e);
301        std::process::exit(1);
302    });
303    base::ensure_wave_db_dir().unwrap_or_else(|e| {
304        tracing::error!("Failed to ensure db dir: {}", e);
305        std::process::exit(1);
306    });
307
308    // Startup diagnostics
309    tracing::info!(
310        data_dir = %base::get_wave_data_dir().display(),
311        db_dir = %base::get_wave_db_dir().display(),
312        app_path = %config.app_path,
313        instance_id = %config.instance_id,
314        "backend directories initialized"
315    );
316
317    // Open databases
318    let db_dir = base::get_wave_db_dir();
319    let wstore_raw = WaveStore::open(&db_dir.join("objects.db")).unwrap_or_else(|e| {
320        tracing::error!("Failed to open object store: {}", e);
321        std::process::exit(1);
322    });
323    // Attach the cross-version named-agent registry. Falls back to a
324    // disabled registry when the shared home can't be resolved (CI,
325    // unusual envs); mutations still hit SQLite, just don't mirror.
326    // See docs/specs/SPEC_SHARED_AGENT_REGISTRY_2026_05_12.md.
327    if let Some(root) = registry::resolve_shared_registry_dir() {
328        match registry::Registry::open(root.clone()) {
329            Ok(reg) => {
330                // PR B — one-shot backfill from every per-version
331                // objects.db into the registry. Idempotent via marker
332                // file in the registry root. Read-only on SQLite.
333                //
334                // Gating: the registry is only attached to wstore if
335                // migration completes (Ok) — that way the read path
336                // never serves a partial view. On Err, mirror writes
337                // are also disabled (registry stays detached); SQLite
338                // remains authoritative and the next launch retries
339                // the migration via the same marker logic.
340                let shared_home = root
341                    .parent()
342                    .and_then(|p| p.parent())
343                    .map(|p| p.to_path_buf());
344                let migration_ok = match shared_home {
345                    Some(home) => match registry::migrate_from_sqlite_once(&home, &reg) {
346                        Ok(stats) => {
347                            if stats.versions_scanned > 0 || stats.records_written > 0 {
348                                tracing::info!(
349                                    versions_scanned = stats.versions_scanned,
350                                    rows_seen = stats.rows_seen,
351                                    records_written = stats.records_written,
352                                    records_skipped_existing = stats.records_skipped_existing,
353                                    records_skipped_unmappable = stats.records_skipped_unmappable,
354                                    complete = stats.complete,
355                                    "registry: one-shot SQLite migration finished"
356                                );
357                            }
358                            // Gate attach on `complete` — partial
359                            // migration leaves the registry detached
360                            // so the read path serves SQLite (full,
361                            // current-version-only view) rather than
362                            // a half-populated registry.
363                            stats.complete
364                        }
365                        Err(e) => {
366                            tracing::warn!(
367                                error = %e,
368                                "registry: SQLite migration errored — leaving registry detached; SQLite stays authoritative, next launch retries"
369                            );
370                            false
371                        }
372                    },
373                    None => {
374                        tracing::warn!(
375                            root = %root.display(),
376                            "registry: cannot resolve shared home (root has fewer than 2 ancestors) — leaving registry detached"
377                        );
378                        false
379                    }
380                };
381                if migration_ok {
382                    tracing::info!(root = %root.display(), "registry: shared agent registry attached");
383                    wstore_raw.set_registry(Arc::new(reg));
384                }
385            }
386            Err(e) => tracing::warn!(
387                root = %root.display(),
388                error = %e,
389                "registry: failed to open shared agent registry — SQLite remains authoritative"
390            ),
391        }
392    } else {
393        tracing::warn!("registry: could not resolve shared registry dir — mirror disabled");
394    }
395    let wstore = Arc::new(wstore_raw);
396    let filestore = Arc::new(FileStore::open(&db_dir.join("filestore.db")).unwrap_or_else(|e| {
397        tracing::error!("Failed to open file store: {}", e);
398        std::process::exit(1);
399    }));
400    // Saga durability — see SPEC_SAGA_DURABILITY_2026-05-01.md.
401    // Backed by its own SQLite file (`sagas.db`) so saga writes
402    // commit independently of the wstore connection. Failure here
403    // is fatal: without the log, a srv crash mid-saga leaves
404    // unrecoverable state divergence.
405    let saga_log = Arc::new(
406        crate::sagas::log::SagaLog::open(&db_dir.join("sagas.db")).unwrap_or_else(|e| {
407            tracing::error!("Failed to open saga log: {}", e);
408            std::process::exit(1);
409        }),
410    );
411    // Seed `saga_id_alloc` from the highest persisted saga_id so
412    // restarts don't reuse IDs from prior runs (reagent P1 + codex
413    // P1 PR #631). With this seed + the plain INSERT (no OR REPLACE)
414    // in `start_saga`, ID collisions become impossible by
415    // construction.
416    let saga_id_seed = saga_log.max_saga_id().unwrap_or_else(|e| {
417        tracing::warn!(
418            "[saga] failed to read MAX(saga_id) for allocator seed: {} — defaulting to 0; ID collisions on restart possible until next successful query",
419            e
420        );
421        0
422    });
423    if saga_id_seed > 0 {
424        tracing::info!(
425            "[saga] seeded saga_id_alloc from durable log: next saga_id = {}",
426            saga_id_seed + 1
427        );
428    }
429
430    // Bootstrap data (creates Client/Window/Workspace/Tab on first launch)
431    let first_launch = wcore::ensure_initial_data(&wstore).unwrap_or_else(|e| {
432        tracing::error!("Failed to ensure initial data: {}", e);
433        std::process::exit(1);
434    });
435    if first_launch {
436        tracing::info!("First launch: created initial data");
437    }
438
439    // Seed ~/.agentmux/.gitignore so accidental git operations inside the
440    // data directory (e.g. an agent running `git init` or `git clone` in its
441    // cwd) don't stage anything by default. Idempotent — written once per
442    // install; we don't overwrite an existing user-customized file.
443    if let Some(home) = dirs::home_dir() {
444        let data_dir = home.join(".agentmux");
445        if data_dir.is_dir() {
446            let gitignore = data_dir.join(".gitignore");
447            if !gitignore.exists() {
448                let _ = std::fs::write(&gitignore, "*\n!.gitignore\n");
449            }
450        }
451    }
452
453    // Self-heal layouts: remove orphaned block nodes that cause blank panes.
454    // Runs on every startup to catch any corruption from prior sessions.
455    heal_all_layouts(&wstore);
456
457    // Option E (PR 1 of 2) — one-shot migration of per-block agent
458    // session zones into per-agent zones. Gated by a marker file under
459    // the data dir; a second startup is a no-op. Failures on
460    // individual blocks are logged but do not abort startup; the
461    // marker file is written even on partial failure so we don't
462    // retry indefinitely (operators can delete the marker to force a
463    // re-run). See
464    // docs/specs/SPEC_CONTINUATION_SESSION_PERSISTENCE_2026_05_23.md.
465    let _agent_zones_migration_stats = backend::agent_session::migrate_block_zones_v1(
466        &wstore,
467        &filestore,
468        &base::get_wave_data_dir(),
469    );
470
471    // Two-tier picker — Phase 1 (SPEC_AGENT_PICKER_TWO_TIER_2026_05_24.md).
472    // Mandatory companion to the picker UI split: any seeded template
473    // that currently carries a session zone (e.g. `agent:claude:current`
474    // with Maks's conversation) is promoted to a new user-owned
475    // definition with a sensible default name, and its zones +
476    // referencing instances are moved over. Without this step the
477    // freshly-introduced "Templates" section of the picker would
478    // silently reattach into pre-existing user sessions. Marker-file
479    // gated; second start is a no-op.
480    let _template_promote_stats = backend::agent_session::migrate_promote_template_sessions_v1(
481        &wstore,
482        &filestore,
483        &base::get_wave_data_dir(),
484    );
485
486    // Session recovery (Phase 4.2): scan for agent blocks that still have
487    // `session:active_pid` from a previous run — those sessions were killed
488    // by a crash/reboot. Transfer to `session:was_interrupted` so the
489    // frontend can show a reconnect banner.
490    let orphan_count = backend::blockcontroller::session_recovery::scan_orphans(&wstore);
491    if orphan_count > 0 {
492        tracing::info!(
493            orphan_count = orphan_count,
494            "session_recovery: flagged {} interrupted sessions for user reconnect",
495            orphan_count
496        );
497    }
498
499    // Auto-seed agent definitions on first launch (or empty DB)
500    backend::agent_seed::auto_seed_on_startup(&wstore);
501
502    // Phase 3a — `db_agents` consolidation backfill. Marker-file gated
503    // under the data dir; idempotent across restarts. WRITE-ONLY in
504    // Phase 3a: dual-write keeps `db_agents` fresh; reads still hit
505    // `db_agent_definitions` / `db_agent_instances`. Phase 3b will
506    // flip readers over. Failures here are logged + tolerated — the
507    // old tables remain authoritative; a future startup retries.
508    // See docs/specs/SPEC_AGENT_CONCEPT_CONSOLIDATION_2026_05_24.md.
509    match wstore.run_agents_consolidate(Some(&base::get_wave_data_dir())) {
510        Ok(stats) if stats.already_done => {
511            tracing::debug!("agents_consolidate: marker present; backfill already done");
512        }
513        Ok(stats) => {
514            tracing::info!(
515                templates_inserted = stats.templates_inserted,
516                user_defs_inserted = stats.user_defs_inserted,
517                instances_as_clone_inserted = stats.instances_as_clone_inserted,
518                instances_folded_into_def = stats.instances_folded_into_def,
519                instances_skipped_continuation = stats.instances_skipped_continuation,
520                instances_skipped_no_definition = stats.instances_skipped_no_definition,
521                instances_collision_warned = stats.instances_collision_warned,
522                "agents_consolidate: Phase 3a backfill done",
523            );
524        }
525        Err(e) => {
526            tracing::warn!(
527                error = %e,
528                "agents_consolidate: backfill failed; old tables remain authoritative",
529            );
530        }
531    }
532
533    // Event infrastructure
534    let event_bus = Arc::new(EventBus::new());
535    let broker = Arc::new(Broker::new());
536
537    // Bridge WPS events to WebSocket clients via EventBus
538    let bridge = backend::eventbus::EventBusBridge::new(event_bus.clone());
539    broker.set_client(Box::new(bridge));
540
541    // OAuth-bundles startup migration (PR E, spec §5):
542    // on first launch after an upgrade, detect ambient OAuth
543    // credentials in `<HOME>/.<auth_dir_name>/.credentials.json` for
544    // each oauth-class provider (claude / codex / openclaw) and seed a
545    // "Default" identity bundle whose binding points at the ambient
546    // dir via `SecretRef::OAuthConfigDir`. Idempotent across restarts —
547    // a second invocation sees the existing binding and exits early
548    // for each already-covered provider. Legacy empty / "blank"
549    // identity_id rows on `db_agent_instances` are back-filled to the
550    // Default bundle in the same pass. Pure no-op when no ambient
551    // creds exist (fresh install) or every oauth-class provider is
552    // already bound by a user-driven flow.
553    let _oauth_migration_stats = identity::migration::run_default_bundle_migration(
554        &wstore,
555        Some(&broker),
556        None,
557    );
558
559    // Config watcher (created before sysinfo loop so it can read telemetry:interval)
560    let config_watcher = Arc::new(wconfig::ConfigWatcher::with_config(wconfig::build_default_config()));
561
562    // Load user's settings.json from disk (merges with defaults)
563    backend::config_watcher_fs::load_settings_from_disk(&config_watcher);
564
565    // Watch settings.json for changes and broadcast to WebSocket clients
566    let _settings_watcher = backend::config_watcher_fs::spawn_settings_watcher(
567        config_watcher.clone(),
568        event_bus.clone(),
569    );
570
571    // Start sysinfo collection loop (interval configurable via telemetry:interval)
572    let sysinfo_broker = broker.clone();
573    let sysinfo_config = config_watcher.clone();
574    tokio::spawn(async move {
575        sysinfo::run_sysinfo_loop(sysinfo_broker, sysinfo_config, "local".to_string()).await;
576    });
577
578    // Start agent process watchdog (kills panes that exceed max-runtime or idle-output limits)
579    let watchdog_config = config_watcher.clone();
580    tokio::spawn(async move {
581        backend::blockcontroller::watchdog::run_watchdog_loop(watchdog_config).await;
582    });
583
584    // Reactive handler (global singleton) + poller
585    let reactive_handler = reactive::get_global_handler();
586    reactive_handler.set_input_sender(Arc::new(|block_id: &str, data: &[u8]| {
587        backend::blockcontroller::send_input(
588            block_id,
589            backend::blockcontroller::BlockInputUnion::data(data.to_vec()),
590            None,
591        )
592    }));
593    let poller = Arc::new(Poller::new(
594        PollerConfig {
595            agentmux_url: None,
596            agentmux_token: None,
597            poll_interval_secs: reactive::DEFAULT_POLL_INTERVAL_SECS,
598        },
599        reactive_handler,
600    ));
601
602    // Set up docsite directory
603    if let Some(app_path) = base::get_wave_app_path() {
604        let docsite_dir = app_path.join("docsite");
605        docsite::set_docsite_dir(docsite_dir);
606    }
607
608    // Local MessageBus for inter-agent communication
609    let messagebus = Arc::new(backend::messagebus::MessageBus::new());
610
611    // Subagent watcher — monitors Claude Code session dirs for spawned subagents
612    let subagent_watcher = backend::subagent_watcher::SubagentWatcher::spawn(event_bus.clone());
613
614    // History service — discovers and indexes past CLI agent conversations
615    let history_service = Arc::new(backend::history::HistoryService::new());
616
617    // Session archiver — auto-archive sessions inactive for >7 days, cap at 2 GB.
618    // Skip if home directory can't be determined (would otherwise fall back to a
619    // relative path and create archives under the current working directory).
620    if let Some(archive_dir) = backend::session_archive::default_archive_dir() {
621        let archiver = Arc::new(backend::session_archive::SessionArchiver::new(
622            wstore.clone(),
623            filestore.clone(),
624            7,                              // inactive days
625            2 * 1024 * 1024 * 1024,         // 2 GB max
626            archive_dir,
627        ));
628        tokio::spawn(async move {
629            loop {
630                tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
631                match archiver.sweep().await {
632                    Ok(stats) => tracing::info!(?stats, "session archiver sweep complete"),
633                    Err(e) => tracing::warn!(error = %e, "session archiver sweep failed"),
634                }
635            }
636        });
637    } else {
638        tracing::warn!("session archiver: home dir unavailable, archiver disabled");
639    }
640
641    // 5. Bind 2 TCP listeners on 127.0.0.1:0 (web + ws — separate ports matching Go)
642    let web_listener = TcpListener::bind("127.0.0.1:0")
643        .await
644        .expect("failed to bind web listener");
645    let ws_listener = TcpListener::bind("127.0.0.1:0")
646        .await
647        .expect("failed to bind ws listener");
648
649    let web_addr = web_listener.local_addr().unwrap();
650    let ws_addr = ws_listener.local_addr().unwrap();
651    let local_web_url = format!("http://{}", web_addr);
652
653    // Make local backend URL available to child processes (PTY shells).
654    // agentbus-client reads AGENTMUX_LOCAL_URL and uses it for local PTY delivery
655    // instead of routing through the cloud agentbus.
656    std::env::set_var("AGENTMUX_LOCAL_URL", &local_web_url);
657
658    // LAN discovery via mDNS — opt-in to avoid Windows Firewall prompt.
659    // mDNS binds 0.0.0.0:5353 UDP which triggers the firewall dialog.
660    // Only start if explicitly enabled in settings.
661    let lan_discovery_enabled = config_watcher.get_settings().network_lan_discovery;
662    let hostname = whoami::fallible::hostname().unwrap_or_else(|_| "unknown".to_string());
663    let lan_discovery = if lan_discovery_enabled {
664        match backend::lan_discovery::LanDiscovery::start(
665            config.instance_id.clone(),
666            hostname,
667            version.clone(),
668            web_addr.port(),
669            event_bus.clone(),
670        ) {
671            Ok(d) => Some(d),
672            Err(e) => {
673                tracing::warn!("LAN discovery unavailable: {e}");
674                None
675            }
676        }
677    } else {
678        tracing::info!("LAN discovery disabled (enable via network:lan_discovery setting)");
679        None
680    };
681
682    // Clean up stale cross-instance agent registry entries (entries older than 4h).
683    backend::reactive::registry::cleanup_stale(
684        &base::get_wave_data_dir(),
685        4 * 60 * 60 * 1000,
686    );
687
688    // Tracks agent-spawned OS processes per block. Registered trackers
689    // live as long as their agent pane; the background poller emits
690    // delta events (`agent:process-added`/`-exited`) to the frontend.
691    let process_tracker = std::sync::Arc::new(
692        backend::process_tracker::registry::AgentProcessRegistry::new(Some(broker.clone())),
693    );
694    backend::process_tracker::registry::set_global(process_tracker.clone());
695    backend::process_tracker::registry::spawn_poller(process_tracker.clone());
696
697    // Phase E.2 / E.2c.2 — srv reducer plumbing, hoisted out of the
698    // (conditional) pipe-IPC bind block so HTTP/WS RPC handlers in
699    // dispatch_service can route through the reducer. State, event
700    // bus, event log, and persist subscriber all live unconditionally;
701    // the pipe IPC server is still conditional on
702    // `AGENTMUX_SRV_PIPE_PATH` being set (absent in `task dev` mode).
703    let wstore_for_persist = Arc::clone(&wstore);
704    let srv_state = std::sync::Arc::new(tokio::sync::Mutex::new(state::State::default()));
705    let (srv_events_tx, _) =
706        tokio::sync::broadcast::channel::<agentmux_common::ipc::Event>(1024);
707    let srv_event_log = std::sync::Arc::new(event_log::EventLog::new(Some(
708        base::get_wave_data_dir().join("srv-events.log"),
709    )));
710
711    // Bootstrap reducer state from SQLite. Always runs (even in
712    // `task dev` where there's no pipe IPC server) so RPC handlers
713    // dispatching through the reducer see populated state.
714    persist::bootstrap_state_from_wstore(&srv_state, &wstore_for_persist).await;
715
716    // Spawn the disk writer (forensic log of every reducer event)
717    // and the persist subscriber (idempotent SQLite write-back).
718    let disk_writer_rx = srv_events_tx.subscribe();
719    let log_for_writer = std::sync::Arc::clone(&srv_event_log);
720    tokio::spawn(event_log::run_disk_writer(log_for_writer, disk_writer_rx));
721    let subscriber_rx = srv_events_tx.subscribe();
722    persist_subscriber::spawn_persist_subscriber(
723        subscriber_rx,
724        std::sync::Arc::clone(&wstore_for_persist),
725        std::sync::Arc::clone(&srv_state),
726    );
727
728    // Phase 1 of the WaveObjUpdate bridge: subscribe to srv_events_tx and
729    // translate workspace mutations into `waveobj:update` WS broadcasts.
730    // Fixes the workspace-rename reactivity gap where UpdateWorkspace
731    // returned `success_empty()` and the response loop had nothing to
732    // broadcast — see docs/specs/SPEC_OBJ_UPDATE_BRIDGE_2026-05-14.md.
733    //
734    // Watchdog: capture the JoinHandle and observe it from a sibling task
735    // so a panic in the bridge's loop scaffolding (vs. an inner
736    // dispatch_event panic, which is already caught per-event) is logged
737    // loudly. Without this, a silent bridge death would manifest as
738    // "renaming a workspace stopped propagating" with no log evidence.
739    // (Per ReAgent P2 follow-up on PR #852.)
740    let bridge_rx = srv_events_tx.subscribe();
741    let bridge_handle = server::wave_obj_bridge::spawn_wave_obj_bridge(
742        bridge_rx,
743        std::sync::Arc::clone(&wstore_for_persist),
744        std::sync::Arc::clone(&event_bus),
745    );
746    tokio::spawn(async move {
747        match bridge_handle.await {
748            Ok(()) => tracing::info!(
749                target: "wave-obj-bridge",
750                "bridge task exited normally (events channel closed at srv shutdown)"
751            ),
752            Err(e) if e.is_panic() => tracing::error!(
753                target: "wave-obj-bridge",
754                "bridge task PANICKED at top level — frontend WOS will stop receiving updates until srv restart. Panic: {}",
755                e
756            ),
757            Err(e) => tracing::error!(
758                target: "wave-obj-bridge",
759                "bridge task terminated unexpectedly (non-panic JoinError): {}",
760                e
761            ),
762        }
763    });
764
765    let state = AppState {
766        auth_key: config.auth_key.clone(),
767        version: version.clone(),
768        app_path: config.app_path.clone(),
769        wstore,
770        filestore,
771        event_bus,
772        broker,
773        reactive_handler,
774        poller,
775        config_watcher,
776        messagebus,
777        subagent_watcher,
778        history_service,
779        lan_discovery,
780        local_web_url: local_web_url.clone(),
781        http_client: reqwest::Client::new(),
782        process_tracker,
783        // Phase E.2c.2 — reducer state + event bus exposed to HTTP/WS
784        // dispatch handlers. Workspace handlers route through the
785        // reducer and publish events to `srv_events_tx`; the persist
786        // subscriber writes back to SQLite asynchronously.
787        srv_state: std::sync::Arc::clone(&srv_state),
788        srv_events_tx: srv_events_tx.clone(),
789        // Phase E.5.5 — saga-id allocator. Seeded from
790        // `SagaLog::max_saga_id()` so restarts don't collide with
791        // prior runs' IDs. First new saga after restart gets
792        // `seed + 1`; on a fresh DB seed=0, first saga gets id 1.
793        saga_id_alloc: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(saga_id_seed)),
794        saga_log: Arc::clone(&saga_log),
795        auth_session_manager: std::sync::Arc::new(
796            crate::identity::auth_session::AuthSessionManager::new(),
797        ),
798        install_sessions: crate::server::install_handlers::InstallSessionRegistry::new(),
799    };
800
801    // Saga durability PR 2 — resume-on-startup. Walk any sagas the
802    // durable log says are unresolved (running / compensating /
803    // failed) from a prior srv-process run, dispatch their inverse
804    // commands, and mark them compensated. Runs AFTER reducer
805    // bootstrap + persist subscriber spawn so the recovery's reducer
806    // dispatches operate against fully-populated state, BUT BEFORE
807    // the API server starts accepting requests so resumed
808    // compensation can't interleave with new sagas.
809    //
810    // Failure here is non-fatal: the saga log read might be transient,
811    // and starting up without recovery beats refusing to start.
812    // Operator can still inspect via `--diag sagas` (PR 2 part 2).
813    let resumed = sagas::recovery::compensate_unresolved(&state)
814        .await
815        .unwrap_or_else(|e| {
816            tracing::error!(
817                "[saga] resume-on-startup failed: {} — continuing; operator review needed",
818                e
819            );
820            0
821        });
822    if resumed > 0 {
823        tracing::info!(
824            "[saga] resume-on-startup compensated {} unresolved saga(s) from prior run",
825            resumed
826        );
827    }
828
829    // Phase E.1b — srv pipe IPC server. Bound when launcher passes
830    // `AGENTMUX_SRV_PIPE_PATH`; absent in `task dev` mode (no
831    // launcher in the loop).
832    //
833    // Phase E.2 — bootstrap reducer state from SQLite at startup
834    // so the session-only projection starts populated. The persist
835    // subscriber that mirrors pipe-event effects back to SQLite is
836    // deferred to E.2c (alongside the RPC-through-reducer migration);
837    // until then, HTTP/WS RPC continues writing directly via wcore
838    // and pipe commands only mutate the reducer's session-only state.
839    //
840    // Bind happens BEFORE the AGENTMUXSRV-ESTART line so the
841    // launcher knows the pipe is ready when host starts. Non-fatal
842    // if the bind fails — srv keeps running with HTTP/WS only.
843    #[cfg(target_os = "windows")]
844    if let Ok(srv_pipe_path) = std::env::var("AGENTMUX_SRV_PIPE_PATH") {
845        if !srv_pipe_path.is_empty() {
846            match srv_ipc::server::bind_first_pipe_instance(&srv_pipe_path) {
847                Ok(first_pipe) => {
848                    // Phase E.2c.2 — pipe IPC server reuses the
849                    // hoisted srv_state / events_tx / event_log so
850                    // pipe-originated commands and HTTP/WS-originated
851                    // commands mutate the same canonical state.
852                    let srv_ctx = srv_ipc::ServerCtx {
853                        srv_pid: std::process::id(),
854                        srv_version: version.clone(),
855                        state: std::sync::Arc::clone(&srv_state),
856                        events_tx: srv_events_tx.clone(),
857                        event_log: std::sync::Arc::clone(&srv_event_log),
858                    };
859                    let _srv_ipc_handle = srv_ipc::run_srv_ipc_server(
860                        srv_pipe_path.clone(),
861                        first_pipe,
862                        srv_ctx,
863                    );
864                    tracing::info!(
865                        target: "srv-ipc",
866                        "[srv-ipc] bound + spawned on {}",
867                        srv_pipe_path
868                    );
869                }
870                Err(e) => {
871                    tracing::error!(
872                        target: "srv-ipc",
873                        "[srv-ipc] bind failed on {}: {} — srv runs without pipe IPC",
874                        srv_pipe_path,
875                        e
876                    );
877                }
878            }
879        }
880    }
881
882    // 6. Emit AGENTMUXSRV-ESTART on stderr (exact format from cmd/server/main-server.go:617)
883    eprintln!(
884        "AGENTMUXSRV-ESTART ws:{} web:{} version:{} buildtime:{} instance:{}",
885        ws_addr, web_addr, version, build_time, config.instance_id
886    );
887
888    // 7. Build router and serve on both listeners
889    let router = build_router(state);
890
891    let web_server = axum::serve(web_listener, router.clone());
892    let ws_server = axum::serve(ws_listener, router);
893
894    // 8. Spawn stdin watch thread (exit on EOF — matching Go's stdinReadWatch)
895    let stdin_token = tokio_util::sync::CancellationToken::new();
896    let stdin_shutdown = stdin_token.clone();
897    std::thread::spawn(move || {
898        use std::io::Read;
899        let mut stdin = std::io::stdin().lock();
900        let mut buf = [0u8; 1024];
901        loop {
902            match stdin.read(&mut buf) {
903                Ok(0) => {
904                    eprintln!("stdin closed, shutting down");
905                    stdin_shutdown.cancel();
906                    break;
907                }
908                Ok(_) => {}
909                Err(e) => {
910                    eprintln!("stdin read error: {}, shutting down", e);
911                    stdin_shutdown.cancel();
912                    break;
913                }
914            }
915        }
916    });
917
918    // 9. Spawn signal handler (SIGINT/SIGTERM → graceful shutdown)
919    let signal_token = stdin_token.clone();
920    tokio::spawn(async move {
921        let ctrl_c = signal::ctrl_c();
922        #[cfg(unix)]
923        {
924            let mut sigterm =
925                signal::unix::signal(signal::unix::SignalKind::terminate()).unwrap();
926            tokio::select! {
927                _ = ctrl_c => {
928                    tracing::info!("received SIGINT, shutting down");
929                }
930                _ = sigterm.recv() => {
931                    tracing::info!("received SIGTERM, shutting down");
932                }
933            }
934        }
935        #[cfg(not(unix))]
936        {
937            ctrl_c.await.ok();
938            tracing::info!("received Ctrl+C, shutting down");
939        }
940        signal_token.cancel();
941    });
942
943    // Run both servers until shutdown
944    tokio::select! {
945        result = web_server.into_future() => {
946            if let Err(e) = result {
947                tracing::error!("web server error: {}", e);
948            }
949        }
950        result = ws_server.into_future() => {
951            if let Err(e) = result {
952                tracing::error!("ws server error: {}", e);
953            }
954        }
955        _ = stdin_token.cancelled() => {
956            tracing::info!("shutdown signal received, exiting");
957        }
958    }
959}
960
961/// Initialize tracing with dual output: JSON rolling file + human-readable stderr.
962/// Returns a guard that must be held for the lifetime of the app to ensure log flushing.
963fn init_logging() -> tracing_appender::non_blocking::WorkerGuard {
964    use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter};
965
966    // Always log to ~/.agentmux/logs/ so all logs (host + sidecar) land in one
967    // discoverable directory. AGENTMUX_DATA_HOME controls the data dir, not logs.
968    // Version is embedded in the filename for side-by-side coexistence.
969    let version = env!("CARGO_PKG_VERSION");
970    let log_dir = dirs::home_dir()
971        .unwrap_or_default()
972        .join(".agentmux")
973        .join("logs");
974    let _ = std::fs::create_dir_all(&log_dir);
975
976    // Delete log files older than 7 days to prevent unbounded growth.
977    cleanup_old_logs(&log_dir, 7);
978
979    // Rolling daily log file with JSON structured output
980    let log_prefix = format!("agentmuxsrv-v{}.log", version);
981    let file_appender = tracing_appender::rolling::daily(&log_dir, &log_prefix);
982    let (non_blocking_file, guard) = tracing_appender::non_blocking(file_appender);
983
984    // Write pointer to current log file for zero-lookup agent discovery.
985    // Version-qualified name so multi-instance doesn't clobber pointers.
986    let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
987    let current_filename = format!("{}.{}", log_prefix, today);
988    let pointer_name = format!("current-srv-v{}.path", version);
989    let _ = std::fs::write(log_dir.join(&pointer_name), &current_filename);
990
991    // Spawn a background thread to refresh the pointer on UTC date rollover.
992    // tracing_appender::rolling::daily creates a new file at midnight UTC.
993    {
994        let log_dir = log_dir.clone();
995        let log_prefix = log_prefix.clone();
996        let pointer_name = pointer_name.clone();
997        std::thread::Builder::new()
998            .name("srv-log-pointer".into())
999            .spawn(move || {
1000                let mut last_date = chrono::Utc::now().format("%Y-%m-%d").to_string();
1001                loop {
1002                    std::thread::sleep(std::time::Duration::from_secs(60));
1003                    let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
1004                    if last_date != today {
1005                        last_date = today.clone();
1006                        let filename = format!("{}.{}", log_prefix, today);
1007                        let _ = std::fs::write(log_dir.join(&pointer_name), &filename);
1008                    }
1009                }
1010            })
1011            .ok();
1012    }
1013
1014    let subscriber = tracing_subscriber::registry()
1015        .with(
1016            EnvFilter::try_from_default_env()
1017                .unwrap_or_else(|_| EnvFilter::new("agentmuxsrv=info,info")),
1018        )
1019        .with(
1020            fmt::layer()
1021                .json()
1022                .with_writer(non_blocking_file)
1023                .with_target(true)
1024                .with_thread_ids(true),
1025        )
1026        .with(
1027            fmt::layer()
1028                .with_writer(std::io::stderr)
1029                .with_ansi(true),
1030        );
1031
1032    tracing::subscriber::set_global_default(subscriber).ok();
1033
1034    tracing::info!(
1035        version = env!("CARGO_PKG_VERSION"),
1036        os = std::env::consts::OS,
1037        arch = std::env::consts::ARCH,
1038        log_dir = %log_dir.display(),
1039        "agentmuxsrv starting"
1040    );
1041
1042    guard
1043}
1044
/// Delete rolled log files older than `days` days from `log_dir`.
///
/// Only directory entries whose *file name* contains `.log.` are considered, so
/// pointer files and other data in the directory are left alone even when the
/// directory path itself happens to contain `.log.`.
///
/// The age is taken from each entry's modification time. The sweep is
/// best-effort: an unreadable directory, unreadable metadata, or a failed
/// removal is silently skipped. If `days` is so large that the cutoff cannot be
/// represented, nothing can be old enough and the function returns without
/// touching anything.
fn cleanup_old_logs(log_dir: &std::path::Path, days: u64) {
    const SECS_PER_DAY: u64 = 86_400;

    // Saturate instead of overflowing on absurdly large retention periods.
    let max_age = std::time::Duration::from_secs(days.saturating_mul(SECS_PER_DAY));
    // `SystemTime - Duration` panics on underflow; `checked_sub` reports it instead.
    let Some(cutoff) = std::time::SystemTime::now().checked_sub(max_age) else {
        return;
    };
    let Ok(entries) = std::fs::read_dir(log_dir) else { return };

    for entry in entries.flatten() {
        // Match on the file name only, never on parent directory components.
        if !entry.file_name().to_string_lossy().contains(".log.") {
            continue;
        }
        let Ok(modified) = entry.metadata().and_then(|meta| meta.modified()) else {
            continue;
        };
        if modified < cutoff {
            let _ = std::fs::remove_file(entry.path());
        }
    }
}
1065
1066/// Walk all tabs and heal their layouts by removing orphaned block references.
1067fn heal_all_layouts(store: &WaveStore) {
1068    use backend::obj::Tab;
1069
1070    let tabs: Vec<Tab> = match store.get_all::<Tab>() {
1071        Ok(tabs) => tabs,
1072        Err(e) => {
1073            tracing::warn!(error = %e, "heal_all_layouts: failed to list tabs");
1074            return;
1075        }
1076    };
1077
1078    let mut healed = 0;
1079    for tab in &tabs {
1080        match backend::wcore::heal_layout(store, &tab.oid) {
1081            Ok(true) => {
1082                tracing::info!(tab_id = %tab.oid, tab_name = %tab.name, "layout healed on startup");
1083                healed += 1;
1084            }
1085            Ok(false) => {}
1086            Err(e) => {
1087                tracing::warn!(tab_id = %tab.oid, error = %e, "heal_layout failed");
1088            }
1089        }
1090    }
1091    if healed > 0 {
1092        tracing::info!(tabs_healed = healed, "layout self-healing complete");
1093    } else {
1094        tracing::info!("layout self-healing: all layouts clean");
1095    }
1096}