1mod agents;
5mod backend;
6mod config;
7mod event_log;
8mod identity;
9mod persist;
10mod persist_subscriber;
11mod reducer;
12mod registry;
13mod sagas;
14mod server;
15mod srv_ipc;
16mod state;
17mod drone;
18#[cfg(windows)]
19mod crash_monitor;
20
21use std::future::IntoFuture;
22use std::sync::Arc;
23
24use clap::Parser;
25use config::CliArgs;
26use server::{AppState, build_router};
27use tokio::net::TcpListener;
28use tokio::signal;
29
30use backend::eventbus::EventBus;
31use backend::reactive::{self, Poller, PollerConfig};
32use backend::storage::filestore::FileStore;
33use backend::storage::wstore::WaveStore;
34use backend::wps::Broker;
35use backend::wconfig;
36use backend::{docsite, sysinfo, base, wcore};
37
38#[cfg(any(target_os = "linux", target_os = "macos"))]
43fn start_ppid_watchdog() {
44 let original_ppid = unsafe { libc::getppid() };
45 std::thread::spawn(move || {
46 loop {
47 std::thread::sleep(std::time::Duration::from_secs(2));
48 let current_ppid = unsafe { libc::getppid() };
49 if current_ppid != original_ppid {
50 eprintln!(
51 "parent process died (ppid changed {} -> {}), shutting down",
52 original_ppid, current_ppid
53 );
54 std::process::exit(0);
55 }
56 }
57 });
58}
59
/// Spawns a thread that terminates this process when `parent_pid` exits (macOS).
///
/// How it works:
/// - Registers a one-shot `EVFILT_PROC` / `NOTE_EXIT` kqueue filter on the parent.
/// - Blocks until that filter fires, then calls `std::process::exit(0)`.
///
/// Failure handling:
/// - If the parent is already gone (`ESRCH` at registration, or on the post-registration
///   `kill(pid, 0)` probe), the process exits immediately.
/// - If kqueue setup fails for any other reason, falls back to `start_ppid_watchdog`.
///
/// An interrupted wait (`EINTR`, e.g. a signal handler ran on this thread) is retried
/// rather than being treated as parent exit.
#[cfg(target_os = "macos")]
fn start_parent_watcher(parent_pid: u32) {
    std::thread::spawn(move || {
        // SAFETY:
        // - These are plain libc calls on a kqueue fd created and owned by this thread.
        // - The changelist/eventlist arrays are initialized stack values that outlive
        //   every kevent() call that receives pointers to them.
        unsafe {
            let kq = libc::kqueue();
            if kq < 0 {
                eprintln!(
                    "kqueue() failed (errno={}), falling back to ppid watchdog",
                    *libc::__error()
                );
                start_ppid_watchdog();
                return;
            }

            let changelist = [libc::kevent {
                ident: parent_pid as usize,
                filter: libc::EVFILT_PROC,
                flags: libc::EV_ADD | libc::EV_ONESHOT,
                fflags: libc::NOTE_EXIT,
                data: 0,
                udata: std::ptr::null_mut(),
            }];

            // Register only (no event buffer), so this call does not block.
            let ret = libc::kevent(
                kq,
                changelist.as_ptr(),
                1,
                std::ptr::null_mut(),
                0,
                std::ptr::null(),
            );

            if ret < 0 {
                // Capture errno before close(), which may overwrite it.
                let errno = *libc::__error();
                libc::close(kq);
                if errno == libc::ESRCH {
                    eprintln!(
                        "parent process {} already exited (ESRCH during kqueue registration), shutting down",
                        parent_pid
                    );
                    std::process::exit(0);
                }
                eprintln!(
                    "kevent() registration failed (errno={}), falling back to ppid watchdog",
                    errno
                );
                start_ppid_watchdog();
                return;
            }

            eprintln!("kqueue EVFILT_PROC registered for parent pid {}", parent_pid);

            // Defensive re-check that the parent still exists now that the filter is armed.
            // NOTE(review): kill(pid, 0) cannot detect PID reuse.
            if libc::kill(parent_pid as i32, 0) != 0 && *libc::__error() == libc::ESRCH {
                libc::close(kq);
                eprintln!(
                    "parent process {} already exited (post-registration check), shutting down",
                    parent_pid
                );
                std::process::exit(0);
            }

            let mut eventlist: [libc::kevent; 1] = std::mem::zeroed();
            // Block with no timeout until the filter fires. A signal delivered to this
            // thread interrupts the wait with EINTR; retry instead of shutting down.
            let (n, wait_errno) = loop {
                let n = libc::kevent(
                    kq,
                    std::ptr::null(),
                    0,
                    eventlist.as_mut_ptr(),
                    1,
                    std::ptr::null(),
                );
                if n >= 0 {
                    break (n, 0);
                }
                let errno = *libc::__error();
                if errno != libc::EINTR {
                    break (n, errno);
                }
            };
            libc::close(kq);

            if n > 0 {
                eprintln!(
                    "parent process {} exited (kqueue EVFILT_PROC), shutting down",
                    parent_pid
                );
            } else {
                eprintln!(
                    "kevent() wait returned {} (errno={}), shutting down",
                    n, wait_errno
                );
            }
            std::process::exit(0);
        }
    });
}
158
159#[cfg(target_os = "linux")]
162fn start_parent_watcher(parent_pid: u32) {
163 std::thread::spawn(move || {
164 unsafe {
165 let pidfd = libc::syscall(libc::SYS_pidfd_open, parent_pid as libc::c_int, 0 as libc::c_int);
167
168 if pidfd < 0 {
169 let errno = *libc::__errno_location();
170 if errno == libc::ESRCH {
171 eprintln!(
173 "parent process {} already exited (ESRCH from pidfd_open), shutting down",
174 parent_pid
175 );
176 std::process::exit(0);
177 }
178 eprintln!(
180 "pidfd_open() failed (errno={}), falling back to ppid watchdog",
181 errno
182 );
183 start_ppid_watchdog();
184 return;
185 }
186
187 let pidfd = pidfd as libc::c_int;
188
189 if libc::kill(parent_pid as i32, 0) != 0 && *libc::__errno_location() == libc::ESRCH {
191 libc::close(pidfd);
192 eprintln!(
193 "parent process {} already exited (post-pidfd check), shutting down",
194 parent_pid
195 );
196 std::process::exit(0);
197 }
198
199 let mut pfd = libc::pollfd {
201 fd: pidfd,
202 events: libc::POLLIN,
203 revents: 0,
204 };
205
206 let ret = libc::poll(&mut pfd, 1, -1); libc::close(pidfd);
208
209 if ret > 0 {
210 eprintln!(
211 "parent process {} exited (pidfd poll), shutting down",
212 parent_pid
213 );
214 } else {
215 eprintln!(
216 "poll() on pidfd returned {} (errno={}), shutting down",
217 ret,
218 *libc::__errno_location()
219 );
220 }
221 std::process::exit(0);
222 }
223 });
224}
225
226#[tokio::main]
227async fn main() {
228 #[cfg(windows)]
232 if std::env::args().any(|a| a == "--crash-monitor") {
233 crash_monitor::run_monitor();
234 return;
235 }
236
237 #[cfg(any(target_os = "linux", target_os = "macos"))]
242 {
243 let ppid = unsafe { libc::getppid() } as u32;
244 if ppid <= 1 {
245 start_ppid_watchdog();
247 } else {
248 start_parent_watcher(ppid);
249 }
250 }
251
252 #[cfg(windows)]
258 let _crash_guard = crash_monitor::spawn_and_attach();
259
260 let _log_guard = init_logging();
262
263 let args = CliArgs::parse();
265 let config = config::Config::from_env_and_args(&args).unwrap_or_else(|e| {
266 tracing::error!("Failed to load config: {}", e);
267 std::process::exit(1);
268 });
269
270 let version = config.version.to_string();
271 let build_time = config.build_time.to_string();
272
273 crate::backend::reactive::registry::init_local_auth_key(&config.auth_key);
280
281 base::set_version(&version);
283 base::set_build_time(&build_time);
284
285 base::migrate_legacy_data_dir();
287
288 if !config.data_home.is_empty() {
290 std::env::set_var("AGENTMUX_DATA_HOME", &config.data_home);
291 }
292 if !config.config_home.is_empty() {
293 std::env::set_var("AGENTMUX_CONFIG_HOME", &config.config_home);
294 }
295 if !config.app_path.is_empty() {
296 std::env::set_var("AGENTMUX_APP_PATH", &config.app_path);
297 }
298
299 base::ensure_wave_data_dir().unwrap_or_else(|e| {
300 tracing::error!("Failed to ensure data dir: {}", e);
301 std::process::exit(1);
302 });
303 base::ensure_wave_db_dir().unwrap_or_else(|e| {
304 tracing::error!("Failed to ensure db dir: {}", e);
305 std::process::exit(1);
306 });
307
308 tracing::info!(
310 data_dir = %base::get_wave_data_dir().display(),
311 db_dir = %base::get_wave_db_dir().display(),
312 app_path = %config.app_path,
313 instance_id = %config.instance_id,
314 "backend directories initialized"
315 );
316
317 let db_dir = base::get_wave_db_dir();
319 let wstore_raw = WaveStore::open(&db_dir.join("objects.db")).unwrap_or_else(|e| {
320 tracing::error!("Failed to open object store: {}", e);
321 std::process::exit(1);
322 });
323 if let Some(root) = registry::resolve_shared_registry_dir() {
328 match registry::Registry::open(root.clone()) {
329 Ok(reg) => {
330 let shared_home = root
341 .parent()
342 .and_then(|p| p.parent())
343 .map(|p| p.to_path_buf());
344 let migration_ok = match shared_home {
345 Some(home) => match registry::migrate_from_sqlite_once(&home, ®) {
346 Ok(stats) => {
347 if stats.versions_scanned > 0 || stats.records_written > 0 {
348 tracing::info!(
349 versions_scanned = stats.versions_scanned,
350 rows_seen = stats.rows_seen,
351 records_written = stats.records_written,
352 records_skipped_existing = stats.records_skipped_existing,
353 records_skipped_unmappable = stats.records_skipped_unmappable,
354 complete = stats.complete,
355 "registry: one-shot SQLite migration finished"
356 );
357 }
358 stats.complete
364 }
365 Err(e) => {
366 tracing::warn!(
367 error = %e,
368 "registry: SQLite migration errored — leaving registry detached; SQLite stays authoritative, next launch retries"
369 );
370 false
371 }
372 },
373 None => {
374 tracing::warn!(
375 root = %root.display(),
376 "registry: cannot resolve shared home (root has fewer than 2 ancestors) — leaving registry detached"
377 );
378 false
379 }
380 };
381 if migration_ok {
382 tracing::info!(root = %root.display(), "registry: shared agent registry attached");
383 wstore_raw.set_registry(Arc::new(reg));
384 }
385 }
386 Err(e) => tracing::warn!(
387 root = %root.display(),
388 error = %e,
389 "registry: failed to open shared agent registry — SQLite remains authoritative"
390 ),
391 }
392 } else {
393 tracing::warn!("registry: could not resolve shared registry dir — mirror disabled");
394 }
395 let wstore = Arc::new(wstore_raw);
396 let filestore = Arc::new(FileStore::open(&db_dir.join("filestore.db")).unwrap_or_else(|e| {
397 tracing::error!("Failed to open file store: {}", e);
398 std::process::exit(1);
399 }));
400 let saga_log = Arc::new(
406 crate::sagas::log::SagaLog::open(&db_dir.join("sagas.db")).unwrap_or_else(|e| {
407 tracing::error!("Failed to open saga log: {}", e);
408 std::process::exit(1);
409 }),
410 );
411 let saga_id_seed = saga_log.max_saga_id().unwrap_or_else(|e| {
417 tracing::warn!(
418 "[saga] failed to read MAX(saga_id) for allocator seed: {} — defaulting to 0; ID collisions on restart possible until next successful query",
419 e
420 );
421 0
422 });
423 if saga_id_seed > 0 {
424 tracing::info!(
425 "[saga] seeded saga_id_alloc from durable log: next saga_id = {}",
426 saga_id_seed + 1
427 );
428 }
429
430 let first_launch = wcore::ensure_initial_data(&wstore).unwrap_or_else(|e| {
432 tracing::error!("Failed to ensure initial data: {}", e);
433 std::process::exit(1);
434 });
435 if first_launch {
436 tracing::info!("First launch: created initial data");
437 }
438
439 if let Some(home) = dirs::home_dir() {
444 let data_dir = home.join(".agentmux");
445 if data_dir.is_dir() {
446 let gitignore = data_dir.join(".gitignore");
447 if !gitignore.exists() {
448 let _ = std::fs::write(&gitignore, "*\n!.gitignore\n");
449 }
450 }
451 }
452
453 heal_all_layouts(&wstore);
456
457 let _agent_zones_migration_stats = backend::agent_session::migrate_block_zones_v1(
466 &wstore,
467 &filestore,
468 &base::get_wave_data_dir(),
469 );
470
471 let _template_promote_stats = backend::agent_session::migrate_promote_template_sessions_v1(
481 &wstore,
482 &filestore,
483 &base::get_wave_data_dir(),
484 );
485
486 let orphan_count = backend::blockcontroller::session_recovery::scan_orphans(&wstore);
491 if orphan_count > 0 {
492 tracing::info!(
493 orphan_count = orphan_count,
494 "session_recovery: flagged {} interrupted sessions for user reconnect",
495 orphan_count
496 );
497 }
498
499 backend::agent_seed::auto_seed_on_startup(&wstore);
501
502 match wstore.run_agents_consolidate(Some(&base::get_wave_data_dir())) {
510 Ok(stats) if stats.already_done => {
511 tracing::debug!("agents_consolidate: marker present; backfill already done");
512 }
513 Ok(stats) => {
514 tracing::info!(
515 templates_inserted = stats.templates_inserted,
516 user_defs_inserted = stats.user_defs_inserted,
517 instances_as_clone_inserted = stats.instances_as_clone_inserted,
518 instances_folded_into_def = stats.instances_folded_into_def,
519 instances_skipped_continuation = stats.instances_skipped_continuation,
520 instances_skipped_no_definition = stats.instances_skipped_no_definition,
521 instances_collision_warned = stats.instances_collision_warned,
522 "agents_consolidate: Phase 3a backfill done",
523 );
524 }
525 Err(e) => {
526 tracing::warn!(
527 error = %e,
528 "agents_consolidate: backfill failed; old tables remain authoritative",
529 );
530 }
531 }
532
533 let event_bus = Arc::new(EventBus::new());
535 let broker = Arc::new(Broker::new());
536
537 let bridge = backend::eventbus::EventBusBridge::new(event_bus.clone());
539 broker.set_client(Box::new(bridge));
540
541 let _oauth_migration_stats = identity::migration::run_default_bundle_migration(
554 &wstore,
555 Some(&broker),
556 None,
557 );
558
559 let config_watcher = Arc::new(wconfig::ConfigWatcher::with_config(wconfig::build_default_config()));
561
562 backend::config_watcher_fs::load_settings_from_disk(&config_watcher);
564
565 let _settings_watcher = backend::config_watcher_fs::spawn_settings_watcher(
567 config_watcher.clone(),
568 event_bus.clone(),
569 );
570
571 let sysinfo_broker = broker.clone();
573 let sysinfo_config = config_watcher.clone();
574 tokio::spawn(async move {
575 sysinfo::run_sysinfo_loop(sysinfo_broker, sysinfo_config, "local".to_string()).await;
576 });
577
578 let watchdog_config = config_watcher.clone();
580 tokio::spawn(async move {
581 backend::blockcontroller::watchdog::run_watchdog_loop(watchdog_config).await;
582 });
583
584 let reactive_handler = reactive::get_global_handler();
586 reactive_handler.set_input_sender(Arc::new(|block_id: &str, data: &[u8]| {
587 backend::blockcontroller::send_input(
588 block_id,
589 backend::blockcontroller::BlockInputUnion::data(data.to_vec()),
590 None,
591 )
592 }));
593 let poller = Arc::new(Poller::new(
594 PollerConfig {
595 agentmux_url: None,
596 agentmux_token: None,
597 poll_interval_secs: reactive::DEFAULT_POLL_INTERVAL_SECS,
598 },
599 reactive_handler,
600 ));
601
602 if let Some(app_path) = base::get_wave_app_path() {
604 let docsite_dir = app_path.join("docsite");
605 docsite::set_docsite_dir(docsite_dir);
606 }
607
608 let messagebus = Arc::new(backend::messagebus::MessageBus::new());
610
611 let subagent_watcher = backend::subagent_watcher::SubagentWatcher::spawn(event_bus.clone());
613
614 let history_service = Arc::new(backend::history::HistoryService::new());
616
617 if let Some(archive_dir) = backend::session_archive::default_archive_dir() {
621 let archiver = Arc::new(backend::session_archive::SessionArchiver::new(
622 wstore.clone(),
623 filestore.clone(),
624 7, 2 * 1024 * 1024 * 1024, archive_dir,
627 ));
628 tokio::spawn(async move {
629 loop {
630 tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
631 match archiver.sweep().await {
632 Ok(stats) => tracing::info!(?stats, "session archiver sweep complete"),
633 Err(e) => tracing::warn!(error = %e, "session archiver sweep failed"),
634 }
635 }
636 });
637 } else {
638 tracing::warn!("session archiver: home dir unavailable, archiver disabled");
639 }
640
641 let web_listener = TcpListener::bind("127.0.0.1:0")
643 .await
644 .expect("failed to bind web listener");
645 let ws_listener = TcpListener::bind("127.0.0.1:0")
646 .await
647 .expect("failed to bind ws listener");
648
649 let web_addr = web_listener.local_addr().unwrap();
650 let ws_addr = ws_listener.local_addr().unwrap();
651 let local_web_url = format!("http://{}", web_addr);
652
653 std::env::set_var("AGENTMUX_LOCAL_URL", &local_web_url);
657
658 let lan_discovery_enabled = config_watcher.get_settings().network_lan_discovery;
662 let hostname = whoami::fallible::hostname().unwrap_or_else(|_| "unknown".to_string());
663 let lan_discovery = if lan_discovery_enabled {
664 match backend::lan_discovery::LanDiscovery::start(
665 config.instance_id.clone(),
666 hostname,
667 version.clone(),
668 web_addr.port(),
669 event_bus.clone(),
670 ) {
671 Ok(d) => Some(d),
672 Err(e) => {
673 tracing::warn!("LAN discovery unavailable: {e}");
674 None
675 }
676 }
677 } else {
678 tracing::info!("LAN discovery disabled (enable via network:lan_discovery setting)");
679 None
680 };
681
682 backend::reactive::registry::cleanup_stale(
684 &base::get_wave_data_dir(),
685 4 * 60 * 60 * 1000,
686 );
687
688 let process_tracker = std::sync::Arc::new(
692 backend::process_tracker::registry::AgentProcessRegistry::new(Some(broker.clone())),
693 );
694 backend::process_tracker::registry::set_global(process_tracker.clone());
695 backend::process_tracker::registry::spawn_poller(process_tracker.clone());
696
697 let wstore_for_persist = Arc::clone(&wstore);
704 let srv_state = std::sync::Arc::new(tokio::sync::Mutex::new(state::State::default()));
705 let (srv_events_tx, _) =
706 tokio::sync::broadcast::channel::<agentmux_common::ipc::Event>(1024);
707 let srv_event_log = std::sync::Arc::new(event_log::EventLog::new(Some(
708 base::get_wave_data_dir().join("srv-events.log"),
709 )));
710
711 persist::bootstrap_state_from_wstore(&srv_state, &wstore_for_persist).await;
715
716 let disk_writer_rx = srv_events_tx.subscribe();
719 let log_for_writer = std::sync::Arc::clone(&srv_event_log);
720 tokio::spawn(event_log::run_disk_writer(log_for_writer, disk_writer_rx));
721 let subscriber_rx = srv_events_tx.subscribe();
722 persist_subscriber::spawn_persist_subscriber(
723 subscriber_rx,
724 std::sync::Arc::clone(&wstore_for_persist),
725 std::sync::Arc::clone(&srv_state),
726 );
727
728 let bridge_rx = srv_events_tx.subscribe();
741 let bridge_handle = server::wave_obj_bridge::spawn_wave_obj_bridge(
742 bridge_rx,
743 std::sync::Arc::clone(&wstore_for_persist),
744 std::sync::Arc::clone(&event_bus),
745 );
746 tokio::spawn(async move {
747 match bridge_handle.await {
748 Ok(()) => tracing::info!(
749 target: "wave-obj-bridge",
750 "bridge task exited normally (events channel closed at srv shutdown)"
751 ),
752 Err(e) if e.is_panic() => tracing::error!(
753 target: "wave-obj-bridge",
754 "bridge task PANICKED at top level — frontend WOS will stop receiving updates until srv restart. Panic: {}",
755 e
756 ),
757 Err(e) => tracing::error!(
758 target: "wave-obj-bridge",
759 "bridge task terminated unexpectedly (non-panic JoinError): {}",
760 e
761 ),
762 }
763 });
764
765 let state = AppState {
766 auth_key: config.auth_key.clone(),
767 version: version.clone(),
768 app_path: config.app_path.clone(),
769 wstore,
770 filestore,
771 event_bus,
772 broker,
773 reactive_handler,
774 poller,
775 config_watcher,
776 messagebus,
777 subagent_watcher,
778 history_service,
779 lan_discovery,
780 local_web_url: local_web_url.clone(),
781 http_client: reqwest::Client::new(),
782 process_tracker,
783 srv_state: std::sync::Arc::clone(&srv_state),
788 srv_events_tx: srv_events_tx.clone(),
789 saga_id_alloc: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(saga_id_seed)),
794 saga_log: Arc::clone(&saga_log),
795 auth_session_manager: std::sync::Arc::new(
796 crate::identity::auth_session::AuthSessionManager::new(),
797 ),
798 install_sessions: crate::server::install_handlers::InstallSessionRegistry::new(),
799 };
800
801 let resumed = sagas::recovery::compensate_unresolved(&state)
814 .await
815 .unwrap_or_else(|e| {
816 tracing::error!(
817 "[saga] resume-on-startup failed: {} — continuing; operator review needed",
818 e
819 );
820 0
821 });
822 if resumed > 0 {
823 tracing::info!(
824 "[saga] resume-on-startup compensated {} unresolved saga(s) from prior run",
825 resumed
826 );
827 }
828
829 #[cfg(target_os = "windows")]
844 if let Ok(srv_pipe_path) = std::env::var("AGENTMUX_SRV_PIPE_PATH") {
845 if !srv_pipe_path.is_empty() {
846 match srv_ipc::server::bind_first_pipe_instance(&srv_pipe_path) {
847 Ok(first_pipe) => {
848 let srv_ctx = srv_ipc::ServerCtx {
853 srv_pid: std::process::id(),
854 srv_version: version.clone(),
855 state: std::sync::Arc::clone(&srv_state),
856 events_tx: srv_events_tx.clone(),
857 event_log: std::sync::Arc::clone(&srv_event_log),
858 };
859 let _srv_ipc_handle = srv_ipc::run_srv_ipc_server(
860 srv_pipe_path.clone(),
861 first_pipe,
862 srv_ctx,
863 );
864 tracing::info!(
865 target: "srv-ipc",
866 "[srv-ipc] bound + spawned on {}",
867 srv_pipe_path
868 );
869 }
870 Err(e) => {
871 tracing::error!(
872 target: "srv-ipc",
873 "[srv-ipc] bind failed on {}: {} — srv runs without pipe IPC",
874 srv_pipe_path,
875 e
876 );
877 }
878 }
879 }
880 }
881
882 eprintln!(
884 "AGENTMUXSRV-ESTART ws:{} web:{} version:{} buildtime:{} instance:{}",
885 ws_addr, web_addr, version, build_time, config.instance_id
886 );
887
888 let router = build_router(state);
890
891 let web_server = axum::serve(web_listener, router.clone());
892 let ws_server = axum::serve(ws_listener, router);
893
894 let stdin_token = tokio_util::sync::CancellationToken::new();
896 let stdin_shutdown = stdin_token.clone();
897 std::thread::spawn(move || {
898 use std::io::Read;
899 let mut stdin = std::io::stdin().lock();
900 let mut buf = [0u8; 1024];
901 loop {
902 match stdin.read(&mut buf) {
903 Ok(0) => {
904 eprintln!("stdin closed, shutting down");
905 stdin_shutdown.cancel();
906 break;
907 }
908 Ok(_) => {}
909 Err(e) => {
910 eprintln!("stdin read error: {}, shutting down", e);
911 stdin_shutdown.cancel();
912 break;
913 }
914 }
915 }
916 });
917
918 let signal_token = stdin_token.clone();
920 tokio::spawn(async move {
921 let ctrl_c = signal::ctrl_c();
922 #[cfg(unix)]
923 {
924 let mut sigterm =
925 signal::unix::signal(signal::unix::SignalKind::terminate()).unwrap();
926 tokio::select! {
927 _ = ctrl_c => {
928 tracing::info!("received SIGINT, shutting down");
929 }
930 _ = sigterm.recv() => {
931 tracing::info!("received SIGTERM, shutting down");
932 }
933 }
934 }
935 #[cfg(not(unix))]
936 {
937 ctrl_c.await.ok();
938 tracing::info!("received Ctrl+C, shutting down");
939 }
940 signal_token.cancel();
941 });
942
943 tokio::select! {
945 result = web_server.into_future() => {
946 if let Err(e) = result {
947 tracing::error!("web server error: {}", e);
948 }
949 }
950 result = ws_server.into_future() => {
951 if let Err(e) = result {
952 tracing::error!("ws server error: {}", e);
953 }
954 }
955 _ = stdin_token.cancelled() => {
956 tracing::info!("shutdown signal received, exiting");
957 }
958 }
959}
960
961fn init_logging() -> tracing_appender::non_blocking::WorkerGuard {
964 use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter};
965
966 let version = env!("CARGO_PKG_VERSION");
970 let log_dir = dirs::home_dir()
971 .unwrap_or_default()
972 .join(".agentmux")
973 .join("logs");
974 let _ = std::fs::create_dir_all(&log_dir);
975
976 cleanup_old_logs(&log_dir, 7);
978
979 let log_prefix = format!("agentmuxsrv-v{}.log", version);
981 let file_appender = tracing_appender::rolling::daily(&log_dir, &log_prefix);
982 let (non_blocking_file, guard) = tracing_appender::non_blocking(file_appender);
983
984 let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
987 let current_filename = format!("{}.{}", log_prefix, today);
988 let pointer_name = format!("current-srv-v{}.path", version);
989 let _ = std::fs::write(log_dir.join(&pointer_name), ¤t_filename);
990
991 {
994 let log_dir = log_dir.clone();
995 let log_prefix = log_prefix.clone();
996 let pointer_name = pointer_name.clone();
997 std::thread::Builder::new()
998 .name("srv-log-pointer".into())
999 .spawn(move || {
1000 let mut last_date = chrono::Utc::now().format("%Y-%m-%d").to_string();
1001 loop {
1002 std::thread::sleep(std::time::Duration::from_secs(60));
1003 let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
1004 if last_date != today {
1005 last_date = today.clone();
1006 let filename = format!("{}.{}", log_prefix, today);
1007 let _ = std::fs::write(log_dir.join(&pointer_name), &filename);
1008 }
1009 }
1010 })
1011 .ok();
1012 }
1013
1014 let subscriber = tracing_subscriber::registry()
1015 .with(
1016 EnvFilter::try_from_default_env()
1017 .unwrap_or_else(|_| EnvFilter::new("agentmuxsrv=info,info")),
1018 )
1019 .with(
1020 fmt::layer()
1021 .json()
1022 .with_writer(non_blocking_file)
1023 .with_target(true)
1024 .with_thread_ids(true),
1025 )
1026 .with(
1027 fmt::layer()
1028 .with_writer(std::io::stderr)
1029 .with_ansi(true),
1030 );
1031
1032 tracing::subscriber::set_global_default(subscriber).ok();
1033
1034 tracing::info!(
1035 version = env!("CARGO_PKG_VERSION"),
1036 os = std::env::consts::OS,
1037 arch = std::env::consts::ARCH,
1038 log_dir = %log_dir.display(),
1039 "agentmuxsrv starting"
1040 );
1041
1042 guard
1043}
1044
/// Deletes rotated log files in `log_dir` last modified more than `days` days ago.
///
/// Which entries are considered:
/// - Only direct children that are regular files.
/// - The *file name* (not the full path) must contain `.log.`. A directory name such as
///   `foo.log.bar` therefore cannot make unrelated files eligible.
///
/// This is best-effort:
/// - An unreadable directory or entry is skipped, and failed removals are ignored.
/// - An enormous `days` saturates and then deletes nothing, instead of overflowing or
///   panicking.
fn cleanup_old_logs(log_dir: &std::path::Path, days: u64) {
    let max_age = std::time::Duration::from_secs(days.saturating_mul(86_400));
    // If the cutoff would precede the clock's representable range, nothing is that old.
    let Some(cutoff) = std::time::SystemTime::now().checked_sub(max_age) else {
        return;
    };
    let Ok(entries) = std::fs::read_dir(log_dir) else {
        return;
    };
    for entry in entries.flatten() {
        if !entry.file_name().to_string_lossy().contains(".log.") {
            continue;
        }
        let Ok(meta) = entry.metadata() else {
            continue;
        };
        if !meta.is_file() {
            continue;
        }
        let Ok(modified) = meta.modified() else {
            continue;
        };
        if modified < cutoff {
            let _ = std::fs::remove_file(entry.path());
        }
    }
}
1065
1066fn heal_all_layouts(store: &WaveStore) {
1068 use backend::obj::Tab;
1069
1070 let tabs: Vec<Tab> = match store.get_all::<Tab>() {
1071 Ok(tabs) => tabs,
1072 Err(e) => {
1073 tracing::warn!(error = %e, "heal_all_layouts: failed to list tabs");
1074 return;
1075 }
1076 };
1077
1078 let mut healed = 0;
1079 for tab in &tabs {
1080 match backend::wcore::heal_layout(store, &tab.oid) {
1081 Ok(true) => {
1082 tracing::info!(tab_id = %tab.oid, tab_name = %tab.name, "layout healed on startup");
1083 healed += 1;
1084 }
1085 Ok(false) => {}
1086 Err(e) => {
1087 tracing::warn!(tab_id = %tab.oid, error = %e, "heal_layout failed");
1088 }
1089 }
1090 }
1091 if healed > 0 {
1092 tracing::info!(tabs_healed = healed, "layout self-healing complete");
1093 } else {
1094 tracing::info!("layout self-healing: all layouts clean");
1095 }
1096}