agentmux_srv\backend/
lan_discovery.rs1use std::collections::HashMap;
11use std::net::IpAddr;
12use std::sync::Arc;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use mdns_sd::{ServiceDaemon, ServiceEvent, ServiceInfo};
16use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19
20use super::eventbus::{EventBus, WSEventType};
21
/// mDNS service type used in this file both to register the local instance
/// and to browse for peers.
const SERVICE_TYPE: &str = "_agentmux._tcp.local.";
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct LanInstance {
26 pub instance_id: String,
27 pub hostname: String,
28 pub version: String,
29 pub address: String,
30 pub port: u16,
31 pub agents: Vec<String>,
32 pub first_seen: u64,
33 pub last_seen: u64,
34}
35
/// Handle for mDNS-based LAN discovery: advertises the local instance and
/// tracks the peers that have been resolved so far.
pub struct LanDiscovery {
    /// mDNS daemon used to register the local service and browse for peers.
    daemon: ServiceDaemon,
    /// Known peers, keyed by their mDNS service fullname.
    instances: Arc<RwLock<HashMap<String, LanInstance>>>,
    /// Identifier of the local instance; used to ignore our own announcement.
    instance_id: String,
    /// Bus on which the current peer list is broadcast after every change.
    event_bus: Arc<EventBus>,
    /// Fullname of the locally registered service, needed to unregister it.
    service_fullname: String,
}
43
44impl LanDiscovery {
45 pub fn start(
47 instance_id: String,
48 hostname: String,
49 version: String,
50 port: u16,
51 event_bus: Arc<EventBus>,
52 ) -> Result<Arc<Self>, String> {
53 let daemon = ServiceDaemon::new().map_err(|e| format!("mDNS daemon failed: {e}"))?;
54
55 let service_name = format!("agentmux-{}", &instance_id);
57 let properties = [
58 ("version", version.as_str()),
59 ("hostname", hostname.as_str()),
60 ("instance_id", instance_id.as_str()),
61 ];
62 let service_info = ServiceInfo::new(
63 SERVICE_TYPE,
64 &service_name,
65 &hostname,
66 "", port,
68 &properties[..],
69 )
70 .map_err(|e| format!("ServiceInfo creation failed: {e}"))?;
71
72 let service_fullname = service_info.get_fullname().to_string();
73
74 daemon
75 .register(service_info)
76 .map_err(|e| format!("mDNS register failed: {e}"))?;
77
78 let browse_receiver = daemon
80 .browse(SERVICE_TYPE)
81 .map_err(|e| format!("mDNS browse failed: {e}"))?;
82
83 let instances = Arc::new(RwLock::new(HashMap::new()));
84
85 let discovery = Arc::new(Self {
86 daemon,
87 instances: instances.clone(),
88 instance_id: instance_id.clone(),
89 event_bus: event_bus.clone(),
90 service_fullname,
91 });
92
93 let disc = discovery.clone();
95 tokio::task::spawn_blocking(move || {
96 disc.event_loop(browse_receiver);
97 });
98
99 tracing::info!(
100 instance_id = %instance_id,
101 port = port,
102 "LAN discovery started (mDNS)"
103 );
104
105 Ok(discovery)
106 }
107
108 fn event_loop(&self, receiver: mdns_sd::Receiver<ServiceEvent>) {
109 loop {
110 match receiver.recv() {
111 Ok(event) => self.handle_event(event),
112 Err(_) => {
113 tracing::warn!("mDNS event receiver closed");
114 break;
115 }
116 }
117 }
118 }
119
120 fn handle_event(&self, event: ServiceEvent) {
121 match event {
122 ServiceEvent::ServiceResolved(info) => {
123 let peer_id = info
124 .get_property_val_str("instance_id")
125 .unwrap_or_default()
126 .to_string();
127
128 if peer_id == self.instance_id {
130 return;
131 }
132
133 let now = SystemTime::now()
134 .duration_since(UNIX_EPOCH)
135 .unwrap_or_default()
136 .as_secs();
137
138 let address = info
139 .get_addresses()
140 .iter()
141 .find(|a| matches!(a, IpAddr::V4(_)))
142 .or_else(|| info.get_addresses().iter().next())
143 .map(|a| a.to_string())
144 .unwrap_or_default();
145
146 let hostname = info
147 .get_property_val_str("hostname")
148 .unwrap_or_default()
149 .to_string();
150 let version = info
151 .get_property_val_str("version")
152 .unwrap_or_default()
153 .to_string();
154
155 let fullname = info.get_fullname().to_string();
156 let mut instances = self.instances.write();
157 let entry = instances.entry(fullname).or_insert_with(|| LanInstance {
158 instance_id: peer_id.clone(),
159 hostname: hostname.clone(),
160 version: version.clone(),
161 address: address.clone(),
162 port: info.get_port(),
163 agents: Vec::new(),
164 first_seen: now,
165 last_seen: now,
166 });
167 entry.last_seen = now;
168 entry.hostname = hostname;
169 entry.version = version;
170 entry.address = address;
171 entry.port = info.get_port();
172 drop(instances);
173
174 tracing::info!(
175 peer_id = %peer_id,
176 address = %info.get_addresses().iter().next().map(|a| a.to_string()).unwrap_or_default(),
177 port = info.get_port(),
178 "LAN peer discovered"
179 );
180
181 self.broadcast_instances();
182 }
183 ServiceEvent::ServiceRemoved(_, fullname) => {
184 let removed = {
185 let mut instances = self.instances.write();
186 instances.remove(&fullname).is_some()
187 };
188 if removed {
189 tracing::info!(fullname = %fullname, "LAN peer removed");
190 self.broadcast_instances();
191 }
192 }
193 _ => {}
194 }
195 }
196
197 fn broadcast_instances(&self) {
198 let instances: Vec<LanInstance> = self.instances.read().values().cloned().collect();
199 self.event_bus.broadcast_event(&WSEventType {
200 eventtype: "laninstances".to_string(),
201 oref: String::new(),
202 data: Some(json!(instances)),
203 });
204 }
205
206 pub fn get_instances(&self) -> Vec<LanInstance> {
208 self.instances.read().values().cloned().collect()
209 }
210
211 #[allow(dead_code)]
213 pub fn peer_count(&self) -> usize {
214 self.instances.read().len()
215 }
216}
217
218impl Drop for LanDiscovery {
219 fn drop(&mut self) {
220 if let Err(e) = self.daemon.unregister(&self.service_fullname) {
222 tracing::warn!("mDNS unregister failed: {e}");
223 }
224 if let Err(e) = self.daemon.shutdown() {
225 tracing::warn!("mDNS daemon shutdown failed: {e}");
226 }
227 }
228}