agentmux_srv\backend/
sysinfo.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
//! Sysinfo data collection loop: collects CPU, memory, network, and disk metrics
//! (plus per-block process-tree CPU/memory stats) and publishes them via the WPS
//! broker. Sampling interval is configurable via the `telemetry:interval` setting
//! (clamped to 0.2s–2.0s, default 1.0s).
7
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11
12use sysinfo::{Disks, Networks, Pid, ProcessRefreshKind, ProcessesToUpdate};
13use tokio::time::MissedTickBehavior;
14
15use crate::backend::blockcontroller::pidregistry;
16use crate::backend::blockcontroller::process_tree;
17use crate::backend::rpc_types::TimeSeriesData;
18use crate::backend::wconfig::ConfigWatcher;
19use crate::backend::wps::{Broker, WaveEvent, EVENT_BLOCK_STATS, EVENT_SYS_INFO};
20
/// Bytes in one gibibyte (2^30); memory metrics are reported in this unit.
const BYTES_PER_GB: f64 = (1u64 << 30) as f64;
/// Bytes in one mebibyte (2^20); network and disk rates are reported in this unit.
const BYTES_PER_MB: f64 = (1u64 << 20) as f64;
/// Value used as `WaveEvent::persist` for the connection-level sysinfo event.
const PERSIST_COUNT: usize = 1024;
/// Sampling interval used when `telemetry:interval` is unset (non-positive).
const DEFAULT_INTERVAL_SECS: f64 = 1.0;
/// Lower bound applied to a configured sampling interval.
const MIN_INTERVAL_SECS: f64 = 0.2;
/// Upper bound applied to a configured sampling interval.
const MAX_INTERVAL_SECS: f64 = 2.0;
27
28/// Collect CPU usage (total + per-core).
29fn get_cpu_data(sys: &sysinfo::System, values: &mut HashMap<String, f64>) {
30    let cpus = sys.cpus();
31    if cpus.is_empty() {
32        return;
33    }
34    // Total CPU usage (average across all cores)
35    let total: f64 = cpus.iter().map(|c| c.cpu_usage() as f64).sum::<f64>() / cpus.len() as f64;
36    values.insert("cpu".to_string(), total);
37    // Per-core usage
38    for (idx, cpu) in cpus.iter().enumerate() {
39        values.insert(format!("cpu:{}", idx), cpu.cpu_usage() as f64);
40    }
41}
42
43/// Collect memory metrics (in GB).
44fn get_mem_data(sys: &sysinfo::System, values: &mut HashMap<String, f64>) {
45    let total = sys.total_memory() as f64 / BYTES_PER_GB;
46    let used = sys.used_memory() as f64 / BYTES_PER_GB;
47    let available = sys.available_memory() as f64 / BYTES_PER_GB;
48    let free = sys.free_memory() as f64 / BYTES_PER_GB;
49    values.insert("mem:total".to_string(), total);
50    values.insert("mem:used".to_string(), used);
51    values.insert("mem:available".to_string(), available);
52    values.insert("mem:free".to_string(), free);
53}
54
55/// Network I/O tracking state for rate calculations.
56struct NetState {
57    prev_sent: u64,
58    prev_recv: u64,
59    prev_time: Option<Instant>,
60}
61
62impl NetState {
63    fn new() -> Self {
64        Self {
65            prev_sent: 0,
66            prev_recv: 0,
67            prev_time: None,
68        }
69    }
70
71    /// Collect network I/O rates (in MB/s).
72    fn get_net_data(&mut self, networks: &Networks, values: &mut HashMap<String, f64>) {
73        // Sum across all interfaces
74        let mut total_sent: u64 = 0;
75        let mut total_recv: u64 = 0;
76        for (_name, data) in networks.iter() {
77            total_sent += data.total_transmitted();
78            total_recv += data.total_received();
79        }
80
81        let now = Instant::now();
82        if let Some(prev_time) = self.prev_time {
83            let elapsed = now.duration_since(prev_time).as_secs_f64();
84            if elapsed > 0.0 {
85                let sent_rate = (total_sent.saturating_sub(self.prev_sent)) as f64 / elapsed / BYTES_PER_MB;
86                let recv_rate = (total_recv.saturating_sub(self.prev_recv)) as f64 / elapsed / BYTES_PER_MB;
87                values.insert("net:bytessent".to_string(), sent_rate);
88                values.insert("net:bytesrecv".to_string(), recv_rate);
89                values.insert("net:bytestotal".to_string(), sent_rate + recv_rate);
90            }
91        }
92
93        self.prev_sent = total_sent;
94        self.prev_recv = total_recv;
95        self.prev_time = Some(now);
96    }
97}
98
99/// Collect disk I/O rates (in MB/s).
100/// sysinfo Disk::usage() returns deltas (bytes since last refresh) so we
101/// divide by elapsed time to get rates.
102fn get_disk_data(disks: &Disks, elapsed_secs: f64, values: &mut HashMap<String, f64>) {
103    if elapsed_secs <= 0.0 {
104        return;
105    }
106    let (total_read, total_write) = disks.list().iter().fold((0u64, 0u64), |(r, w), disk| {
107        let u = disk.usage();
108        (r + u.read_bytes, w + u.written_bytes)
109    });
110    let read_rate = total_read as f64 / elapsed_secs / BYTES_PER_MB;
111    let write_rate = total_write as f64 / elapsed_secs / BYTES_PER_MB;
112    values.insert("disk:read".to_string(), read_rate);
113    values.insert("disk:write".to_string(), write_rate);
114    values.insert("disk:total".to_string(), read_rate + write_rate);
115}
116
117/// Read the telemetry interval from config, clamped to [MIN, MAX].
118fn get_interval_secs(config_watcher: &ConfigWatcher) -> f64 {
119    let val = config_watcher.get_settings().telemetry_interval;
120    if val <= 0.0 {
121        return DEFAULT_INTERVAL_SECS;
122    }
123    val.clamp(MIN_INTERVAL_SECS, MAX_INTERVAL_SECS)
124}
125
126/// Run the sysinfo collection loop. Uses `tokio::time::interval` for steady
127/// tick rate regardless of refresh duration. Interval is re-read from config
128/// each tick and the timer is reset if it changes.
129pub async fn run_sysinfo_loop(broker: Arc<Broker>, config_watcher: Arc<ConfigWatcher>, conn_name: String) {
130    let mut sys = sysinfo::System::new_all();
131    let mut networks = Networks::new_with_refreshed_list();
132    let mut net_state = NetState::new();
133    let mut disks = Disks::new_with_refreshed_list();
134    let mut last_tick = Instant::now();
135
136    let mut current_interval = get_interval_secs(&config_watcher);
137    let mut ticker = tokio::time::interval(Duration::from_secs_f64(current_interval));
138    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
139    // Skip the first immediate tick
140    ticker.tick().await;
141
142    tracing::info!("sysinfo loop started for conn:{}", conn_name);
143
144    loop {
145        ticker.tick().await;
146
147        // Check if interval changed and reset ticker if so
148        let new_interval = get_interval_secs(&config_watcher);
149        if (new_interval - current_interval).abs() > 0.001 {
150            current_interval = new_interval;
151            ticker = tokio::time::interval(Duration::from_secs_f64(current_interval));
152            ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
153            ticker.tick().await; // consume immediate first tick
154            tracing::info!("sysinfo interval changed to {}s", current_interval);
155        }
156
157        // Refresh all metrics
158        sys.refresh_cpu_usage();
159        sys.refresh_memory();
160        networks.refresh(true);
161        disks.refresh(true);
162
163        let now_instant = Instant::now();
164        let elapsed_secs = now_instant.duration_since(last_tick).as_secs_f64();
165        last_tick = now_instant;
166
167        let mut values = HashMap::new();
168        get_cpu_data(&sys, &mut values);
169        get_mem_data(&sys, &mut values);
170        net_state.get_net_data(&networks, &mut values);
171        get_disk_data(&disks, elapsed_secs, &mut values);
172
173        let now = std::time::SystemTime::now()
174            .duration_since(std::time::UNIX_EPOCH)
175            .unwrap_or_default()
176            .as_millis() as i64;
177
178        let ts_data = TimeSeriesData { ts: now, values };
179
180        let event = WaveEvent {
181            event: EVENT_SYS_INFO.to_string(),
182            scopes: vec![conn_name.clone()],
183            sender: String::new(),
184            persist: PERSIST_COUNT,
185            data: serde_json::to_value(&ts_data).ok(),
186        };
187
188        broker.publish(event);
189
190        // Per-pane process tree metrics: aggregate CPU/mem across each block's
191        // shell process and all its descendants.
192        let block_pids = pidregistry::get_all();
193        if !block_pids.is_empty() {
194            // Pass 1: cheap minimal refresh of all processes to populate parent()
195            // links. ProcessRefreshKind::new() skips CPU accounting and memory
196            // queries — just PID/PPID/name. ~0.5ms on a typical desktop.
197            sys.refresh_processes_specifics(
198                ProcessesToUpdate::All,
199                false, // keep stale entries — pass 2 removes dead ones
200                ProcessRefreshKind::nothing(),
201            );
202
203            // For each block, BFS the process tree from the shell PID.
204            let mut block_trees: Vec<(String, Vec<Pid>)> = block_pids
205                .iter()
206                .map(|(block_id, pid)| {
207                    let root = Pid::from(*pid as usize);
208                    let tree = process_tree::collect_descendants(
209                        &sys,
210                        root,
211                        process_tree::MAX_PIDS_PER_BLOCK,
212                    );
213                    (block_id.clone(), tree)
214                })
215                .collect();
216
217            // Pass 2: targeted deep refresh (CPU + mem) for only the PIDs we care about.
218            // Deduplicate across blocks so each PID is refreshed at most once.
219            let mut all_pids: Vec<Pid> = block_trees
220                .iter()
221                .flat_map(|(_, pids)| pids.iter().copied())
222                .collect();
223            all_pids.sort_unstable();
224            all_pids.dedup();
225            sys.refresh_processes_specifics(
226                ProcessesToUpdate::Some(&all_pids),
227                true, // remove dead processes on this authoritative pass
228                ProcessRefreshKind::everything(),
229            );
230
231            // Aggregate per block and publish.
232            // After Pass 2 (remove_dead=true), sys.process() returns None for any
233            // PID that no longer exists — use this to detect orphaned registry entries.
234            let mut dead_block_ids: Vec<String> = Vec::new();
235
236            for (block_id, pids) in &mut block_trees {
237                // collect_descendants() always puts the root PID first.
238                let root_pid = pids.first().copied().unwrap_or(Pid::from(0usize));
239                let mut total_cpu: f64 = 0.0;
240                let mut total_mem: u64 = 0;
241                let mut live_count: u32 = 0;
242
243                for pid in pids.iter() {
244                    if let Some(proc) = sys.process(*pid) {
245                        total_cpu += proc.cpu_usage() as f64;
246                        total_mem += proc.memory();
247                        live_count += 1;
248                    }
249                }
250
251                // Root process is gone — evict from registry.  This is the last-resort
252                // cleanup for processes that exit without normal wait-task teardown
253                // (SIGKILL by the OS, unexpected crash, or stop() race).
254                if sys.process(root_pid).is_none() {
255                    dead_block_ids.push(block_id.clone());
256                    continue; // skip publishing stale stats for a dead block
257                }
258
259                let mut block_values = HashMap::new();
260                block_values.insert("cpu".to_string(), total_cpu);
261                block_values.insert("mem".to_string(), total_mem as f64);
262                block_values.insert("pids".to_string(), live_count as f64);
263
264                let block_ts = TimeSeriesData {
265                    ts: now,
266                    values: block_values,
267                };
268                let block_event = WaveEvent {
269                    event: EVENT_BLOCK_STATS.to_string(),
270                    scopes: vec![format!("block:{}", block_id)],
271                    sender: String::new(),
272                    persist: 0,
273                    data: serde_json::to_value(&block_ts).ok(),
274                };
275                broker.publish(block_event);
276            }
277
278            for block_id in &dead_block_ids {
279                pidregistry::unregister(block_id);
280                tracing::warn!(
281                    block_id = %block_id,
282                    "sysinfo: evicted dead root PID — process exited without normal cleanup"
283                );
284            }
285        }
286    }
287}