agentmux_srv\backend/
config_watcher_fs.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Filesystem watcher for settings.json — detects saves and pushes updated
5//! config to all connected WebSocket clients in real time.
6
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::time::Duration;
10
11use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
12use serde_json::json;
13use tokio::sync::mpsc;
14
15use super::eventbus::{EventBus, WSEventType, WS_EVENT_RPC};
16use super::wconfig::{self, ConfigWatcher, SettingsType};
17
18/// Resolve the directory containing settings.json.
19///
20/// Priority:
21/// 1. `AGENTMUX_SETTINGS_DIR` env var (set by Tauri host to app_config_dir)
22/// 2. `AGENTMUX_CONFIG_HOME` env var (backend's config root)
23/// 3. `~/.agentmux` (legacy fallback)
24pub fn resolve_settings_dir() -> PathBuf {
25    if let Ok(dir) = std::env::var("AGENTMUX_SETTINGS_DIR") {
26        if !dir.is_empty() {
27            return PathBuf::from(dir);
28        }
29    }
30    // Fall back to config home parent (settings.json sits at app_config_dir root,
31    // not inside the instances subdir)
32    if let Ok(dir) = std::env::var("AGENTMUX_CONFIG_HOME") {
33        if !dir.is_empty() {
34            // AGENTMUX_CONFIG_HOME = .../instances/v0.31.XX — go up two levels
35            let path = PathBuf::from(&dir);
36            if let Some(root) = path.parent().and_then(|p| p.parent()) {
37                return root.to_path_buf();
38            }
39        }
40    }
41    dirs::home_dir().unwrap_or_default().join(".agentmux")
42}
43
44/// Load settings.json from disk into the ConfigWatcher.
45/// Called once at startup so the backend has the user's saved settings.
46pub fn load_settings_from_disk(config_watcher: &ConfigWatcher) {
47    let settings_dir = resolve_settings_dir();
48    let settings_path = settings_dir.join(wconfig::SETTINGS_FILE);
49
50    tracing::info!(
51        path = %settings_path.display(),
52        exists = settings_path.exists(),
53        "loading settings.json from disk"
54    );
55
56    let (settings, errors): (SettingsType, _) = wconfig::read_config_file(&settings_path);
57
58    if !errors.is_empty() {
59        for err in &errors {
60            tracing::warn!(file = %err.file, error = %err.err, "settings parse error at startup");
61        }
62        return;
63    }
64
65    config_watcher.update_settings(settings);
66    tracing::info!("settings.json loaded successfully");
67}
68
69/// Spawn a filesystem watcher that monitors `settings.json` and broadcasts
70/// config updates to all WebSocket clients on change.
71///
72/// Returns a handle to the watcher (must be held alive for the duration of the app).
73pub fn spawn_settings_watcher(
74    config_watcher: Arc<ConfigWatcher>,
75    event_bus: Arc<EventBus>,
76) -> Option<RecommendedWatcher> {
77    let settings_dir = resolve_settings_dir();
78    let settings_path = settings_dir.join(wconfig::SETTINGS_FILE);
79
80    if !settings_dir.exists() {
81        tracing::warn!(
82            dir = %settings_dir.display(),
83            "settings directory does not exist, file watcher not started"
84        );
85        return None;
86    }
87
88    // Channel to bridge sync notify callbacks into async tokio
89    let (tx, mut rx) = mpsc::unbounded_channel::<()>();
90
91    let watched_path = settings_path.clone();
92    let mut watcher = match notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
93        match res {
94            Ok(event) => {
95                let dominated = matches!(
96                    event.kind,
97                    EventKind::Modify(_) | EventKind::Create(_)
98                );
99                if dominated && event.paths.iter().any(|p| p.ends_with("settings.json")) {
100                    let _ = tx.send(());
101                }
102            }
103            Err(e) => {
104                tracing::warn!(error = %e, "filesystem watcher error");
105            }
106        }
107    }) {
108        Ok(w) => w,
109        Err(e) => {
110            tracing::warn!(error = %e, "failed to create settings file watcher");
111            return None;
112        }
113    };
114
115    if let Err(e) = watcher.watch(&settings_dir, RecursiveMode::NonRecursive) {
116        tracing::warn!(
117            dir = %settings_dir.display(),
118            error = %e,
119            "failed to watch settings directory"
120        );
121        return None;
122    }
123
124    tracing::info!(
125        path = %settings_path.display(),
126        dir = %settings_dir.display(),
127        "filesystem watcher active for settings.json"
128    );
129
130    // Spawn async task: debounce notifications and reload config
131    tokio::spawn(async move {
132        loop {
133            // Wait for first notification
134            if rx.recv().await.is_none() {
135                tracing::info!("settings watcher channel closed, stopping");
136                break;
137            }
138            // Debounce: drain any additional events within 300ms
139            tokio::time::sleep(Duration::from_millis(300)).await;
140            while rx.try_recv().is_ok() {}
141
142            reload_and_broadcast(&watched_path, &config_watcher, &event_bus);
143        }
144    });
145
146    Some(watcher)
147}
148
149/// Merge new keys into the current in-memory SettingsType and return the result.
150/// Used by the setconfig handler to update in-memory state before the fs watcher fires.
151pub fn merge_settings_into_current(
152    config_watcher: &wconfig::ConfigWatcher,
153    new_keys: serde_json::Map<String, serde_json::Value>,
154) -> wconfig::SettingsType {
155    let mut current = config_watcher.get_settings();
156    // Merge via JSON round-trip so the extra HashMap catches all dynamic keys
157    if let Ok(mut current_val) = serde_json::to_value(&current) {
158        if let serde_json::Value::Object(ref mut map) = current_val {
159            map.extend(new_keys.into_iter().filter(|(_, v)| !v.is_null()));
160        }
161        if let Ok(merged) = serde_json::from_value(current_val) {
162            current = merged;
163        }
164    }
165    current
166}
167
168/// Merge a flat map of settings keys into `settings.json` on disk.
169/// Existing keys not present in `new_keys` are preserved.
170/// The fs watcher will detect the write (~300ms) and broadcast the updated config.
171pub fn merge_settings_to_disk(new_keys: serde_json::Map<String, serde_json::Value>) -> Result<(), String> {
172    if new_keys.is_empty() {
173        return Ok(());
174    }
175    let settings_dir = resolve_settings_dir();
176    let settings_path = settings_dir.join(wconfig::SETTINGS_FILE);
177
178    let mut current = wconfig::read_settings_raw_jsonc(&settings_path);
179    current.extend(new_keys);
180
181    // Remove keys explicitly set to null (deletion semantics)
182    current.retain(|_, v| !v.is_null());
183
184    let merged = wconfig::merge_into_template(wconfig::SETTINGS_TEMPLATE, &current);
185    std::fs::write(&settings_path, &merged)
186        .map_err(|e| format!("write settings.json: {e}"))?;
187
188    tracing::info!(path = %settings_path.display(), "settings.json updated via setconfig");
189    Ok(())
190}
191
192fn reload_and_broadcast(
193    settings_path: &PathBuf,
194    config_watcher: &Arc<ConfigWatcher>,
195    event_bus: &Arc<EventBus>,
196) {
197    tracing::info!(path = %settings_path.display(), "settings.json changed, reloading");
198
199    let (settings, errors): (SettingsType, _) = wconfig::read_config_file(settings_path);
200
201    if !errors.is_empty() {
202        for err in &errors {
203            tracing::warn!(file = %err.file, error = %err.err, "settings reload parse error (keeping previous config)");
204        }
205        return;
206    }
207
208    config_watcher.update_settings(settings);
209    tracing::info!("settings.json reloaded, broadcasting to clients");
210
211    // Broadcast updated config to all connected clients (same format as initial config push)
212    let config = config_watcher.get_full_config();
213    let client_count = event_bus.connection_count();
214    if let Ok(config_val) = serde_json::to_value(config.as_ref()) {
215        let event = WSEventType {
216            eventtype: WS_EVENT_RPC.to_string(),
217            oref: String::new(),
218            data: Some(json!({
219                "command": "eventrecv",
220                "data": {
221                    "event": "config",
222                    "data": { "fullconfig": config_val }
223                }
224            })),
225        };
226        event_bus.broadcast_event(&event);
227        tracing::info!(clients = client_count, "config event broadcast complete");
228    }
229}