1use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11
12use sysinfo::{Disks, Networks, Pid, ProcessRefreshKind, ProcessesToUpdate};
13use tokio::time::MissedTickBehavior;
14
15use crate::backend::blockcontroller::pidregistry;
16use crate::backend::blockcontroller::process_tree;
17use crate::backend::rpc_types::TimeSeriesData;
18use crate::backend::wconfig::ConfigWatcher;
19use crate::backend::wps::{Broker, WaveEvent, EVENT_BLOCK_STATS, EVENT_SYS_INFO};
20
/// Divisor used to convert a byte count into gibibytes (1024^3 bytes).
const BYTES_PER_GB: f64 = 1024.0 * 1024.0 * 1024.0;
/// Divisor used to convert a byte count into mebibytes (1024^2 bytes).
const BYTES_PER_MB: f64 = 1024.0 * 1024.0;
/// Value placed in the `persist` field of system-info events.
const PERSIST_COUNT: usize = 1024;
/// Sampling interval, in seconds, used when the configured value is not positive.
const DEFAULT_INTERVAL_SECS: f64 = 1.0;
/// Lower bound, in seconds, applied to a configured sampling interval.
const MIN_INTERVAL_SECS: f64 = 0.2;
/// Upper bound, in seconds, applied to a configured sampling interval.
const MAX_INTERVAL_SECS: f64 = 2.0;
27
28fn get_cpu_data(sys: &sysinfo::System, values: &mut HashMap<String, f64>) {
30 let cpus = sys.cpus();
31 if cpus.is_empty() {
32 return;
33 }
34 let total: f64 = cpus.iter().map(|c| c.cpu_usage() as f64).sum::<f64>() / cpus.len() as f64;
36 values.insert("cpu".to_string(), total);
37 for (idx, cpu) in cpus.iter().enumerate() {
39 values.insert(format!("cpu:{}", idx), cpu.cpu_usage() as f64);
40 }
41}
42
43fn get_mem_data(sys: &sysinfo::System, values: &mut HashMap<String, f64>) {
45 let total = sys.total_memory() as f64 / BYTES_PER_GB;
46 let used = sys.used_memory() as f64 / BYTES_PER_GB;
47 let available = sys.available_memory() as f64 / BYTES_PER_GB;
48 let free = sys.free_memory() as f64 / BYTES_PER_GB;
49 values.insert("mem:total".to_string(), total);
50 values.insert("mem:used".to_string(), used);
51 values.insert("mem:available".to_string(), available);
52 values.insert("mem:free".to_string(), free);
53}
54
55struct NetState {
57 prev_sent: u64,
58 prev_recv: u64,
59 prev_time: Option<Instant>,
60}
61
62impl NetState {
63 fn new() -> Self {
64 Self {
65 prev_sent: 0,
66 prev_recv: 0,
67 prev_time: None,
68 }
69 }
70
71 fn get_net_data(&mut self, networks: &Networks, values: &mut HashMap<String, f64>) {
73 let mut total_sent: u64 = 0;
75 let mut total_recv: u64 = 0;
76 for (_name, data) in networks.iter() {
77 total_sent += data.total_transmitted();
78 total_recv += data.total_received();
79 }
80
81 let now = Instant::now();
82 if let Some(prev_time) = self.prev_time {
83 let elapsed = now.duration_since(prev_time).as_secs_f64();
84 if elapsed > 0.0 {
85 let sent_rate = (total_sent.saturating_sub(self.prev_sent)) as f64 / elapsed / BYTES_PER_MB;
86 let recv_rate = (total_recv.saturating_sub(self.prev_recv)) as f64 / elapsed / BYTES_PER_MB;
87 values.insert("net:bytessent".to_string(), sent_rate);
88 values.insert("net:bytesrecv".to_string(), recv_rate);
89 values.insert("net:bytestotal".to_string(), sent_rate + recv_rate);
90 }
91 }
92
93 self.prev_sent = total_sent;
94 self.prev_recv = total_recv;
95 self.prev_time = Some(now);
96 }
97}
98
99fn get_disk_data(disks: &Disks, elapsed_secs: f64, values: &mut HashMap<String, f64>) {
103 if elapsed_secs <= 0.0 {
104 return;
105 }
106 let (total_read, total_write) = disks.list().iter().fold((0u64, 0u64), |(r, w), disk| {
107 let u = disk.usage();
108 (r + u.read_bytes, w + u.written_bytes)
109 });
110 let read_rate = total_read as f64 / elapsed_secs / BYTES_PER_MB;
111 let write_rate = total_write as f64 / elapsed_secs / BYTES_PER_MB;
112 values.insert("disk:read".to_string(), read_rate);
113 values.insert("disk:write".to_string(), write_rate);
114 values.insert("disk:total".to_string(), read_rate + write_rate);
115}
116
117fn get_interval_secs(config_watcher: &ConfigWatcher) -> f64 {
119 let val = config_watcher.get_settings().telemetry_interval;
120 if val <= 0.0 {
121 return DEFAULT_INTERVAL_SECS;
122 }
123 val.clamp(MIN_INTERVAL_SECS, MAX_INTERVAL_SECS)
124}
125
126pub async fn run_sysinfo_loop(broker: Arc<Broker>, config_watcher: Arc<ConfigWatcher>, conn_name: String) {
130 let mut sys = sysinfo::System::new_all();
131 let mut networks = Networks::new_with_refreshed_list();
132 let mut net_state = NetState::new();
133 let mut disks = Disks::new_with_refreshed_list();
134 let mut last_tick = Instant::now();
135
136 let mut current_interval = get_interval_secs(&config_watcher);
137 let mut ticker = tokio::time::interval(Duration::from_secs_f64(current_interval));
138 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
139 ticker.tick().await;
141
142 tracing::info!("sysinfo loop started for conn:{}", conn_name);
143
144 loop {
145 ticker.tick().await;
146
147 let new_interval = get_interval_secs(&config_watcher);
149 if (new_interval - current_interval).abs() > 0.001 {
150 current_interval = new_interval;
151 ticker = tokio::time::interval(Duration::from_secs_f64(current_interval));
152 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
153 ticker.tick().await; tracing::info!("sysinfo interval changed to {}s", current_interval);
155 }
156
157 sys.refresh_cpu_usage();
159 sys.refresh_memory();
160 networks.refresh(true);
161 disks.refresh(true);
162
163 let now_instant = Instant::now();
164 let elapsed_secs = now_instant.duration_since(last_tick).as_secs_f64();
165 last_tick = now_instant;
166
167 let mut values = HashMap::new();
168 get_cpu_data(&sys, &mut values);
169 get_mem_data(&sys, &mut values);
170 net_state.get_net_data(&networks, &mut values);
171 get_disk_data(&disks, elapsed_secs, &mut values);
172
173 let now = std::time::SystemTime::now()
174 .duration_since(std::time::UNIX_EPOCH)
175 .unwrap_or_default()
176 .as_millis() as i64;
177
178 let ts_data = TimeSeriesData { ts: now, values };
179
180 let event = WaveEvent {
181 event: EVENT_SYS_INFO.to_string(),
182 scopes: vec![conn_name.clone()],
183 sender: String::new(),
184 persist: PERSIST_COUNT,
185 data: serde_json::to_value(&ts_data).ok(),
186 };
187
188 broker.publish(event);
189
190 let block_pids = pidregistry::get_all();
193 if !block_pids.is_empty() {
194 sys.refresh_processes_specifics(
198 ProcessesToUpdate::All,
199 false, ProcessRefreshKind::nothing(),
201 );
202
203 let mut block_trees: Vec<(String, Vec<Pid>)> = block_pids
205 .iter()
206 .map(|(block_id, pid)| {
207 let root = Pid::from(*pid as usize);
208 let tree = process_tree::collect_descendants(
209 &sys,
210 root,
211 process_tree::MAX_PIDS_PER_BLOCK,
212 );
213 (block_id.clone(), tree)
214 })
215 .collect();
216
217 let mut all_pids: Vec<Pid> = block_trees
220 .iter()
221 .flat_map(|(_, pids)| pids.iter().copied())
222 .collect();
223 all_pids.sort_unstable();
224 all_pids.dedup();
225 sys.refresh_processes_specifics(
226 ProcessesToUpdate::Some(&all_pids),
227 true, ProcessRefreshKind::everything(),
229 );
230
231 let mut dead_block_ids: Vec<String> = Vec::new();
235
236 for (block_id, pids) in &mut block_trees {
237 let root_pid = pids.first().copied().unwrap_or(Pid::from(0usize));
239 let mut total_cpu: f64 = 0.0;
240 let mut total_mem: u64 = 0;
241 let mut live_count: u32 = 0;
242
243 for pid in pids.iter() {
244 if let Some(proc) = sys.process(*pid) {
245 total_cpu += proc.cpu_usage() as f64;
246 total_mem += proc.memory();
247 live_count += 1;
248 }
249 }
250
251 if sys.process(root_pid).is_none() {
255 dead_block_ids.push(block_id.clone());
256 continue; }
258
259 let mut block_values = HashMap::new();
260 block_values.insert("cpu".to_string(), total_cpu);
261 block_values.insert("mem".to_string(), total_mem as f64);
262 block_values.insert("pids".to_string(), live_count as f64);
263
264 let block_ts = TimeSeriesData {
265 ts: now,
266 values: block_values,
267 };
268 let block_event = WaveEvent {
269 event: EVENT_BLOCK_STATS.to_string(),
270 scopes: vec![format!("block:{}", block_id)],
271 sender: String::new(),
272 persist: 0,
273 data: serde_json::to_value(&block_ts).ok(),
274 };
275 broker.publish(block_event);
276 }
277
278 for block_id in &dead_block_ids {
279 pidregistry::unregister(block_id);
280 tracing::warn!(
281 block_id = %block_id,
282 "sysinfo: evicted dead root PID — process exited without normal cleanup"
283 );
284 }
285 }
286 }
287}