// agentmux_srv/backend/lan_discovery.rs

// Copyright 2026, AgentMux Corp.
// SPDX-License-Identifier: Apache-2.0

//! LAN instance discovery via mDNS/DNS-SD.
//!
//! Each AgentMux backend advertises itself as `_agentmux._tcp.local.` and
//! continuously browses for peers. Discovered instances are tracked in memory
//! and broadcast to frontend clients via the `EventBus`.
9
10use std::collections::HashMap;
11use std::net::IpAddr;
12use std::sync::Arc;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use mdns_sd::{ServiceDaemon, ServiceEvent, ServiceInfo};
16use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19
20use super::eventbus::{EventBus, WSEventType};
21
/// DNS-SD service type under which every AgentMux backend registers itself
/// and which it browses to find peers.
const SERVICE_TYPE: &str = "_agentmux._tcp.local.";
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct LanInstance {
26    pub instance_id: String,
27    pub hostname: String,
28    pub version: String,
29    pub address: String,
30    pub port: u16,
31    pub agents: Vec<String>,
32    pub first_seen: u64,
33    pub last_seen: u64,
34}
35
36pub struct LanDiscovery {
37    daemon: ServiceDaemon,
38    instances: Arc<RwLock<HashMap<String, LanInstance>>>,
39    instance_id: String,
40    event_bus: Arc<EventBus>,
41    service_fullname: String,
42}
43
44impl LanDiscovery {
45    /// Start LAN discovery: register this instance and browse for peers.
46    pub fn start(
47        instance_id: String,
48        hostname: String,
49        version: String,
50        port: u16,
51        event_bus: Arc<EventBus>,
52    ) -> Result<Arc<Self>, String> {
53        let daemon = ServiceDaemon::new().map_err(|e| format!("mDNS daemon failed: {e}"))?;
54
55        // Register this instance
56        let service_name = format!("agentmux-{}", &instance_id);
57        let properties = [
58            ("version", version.as_str()),
59            ("hostname", hostname.as_str()),
60            ("instance_id", instance_id.as_str()),
61        ];
62        let service_info = ServiceInfo::new(
63            SERVICE_TYPE,
64            &service_name,
65            &hostname,
66            "",  // empty = auto-detect IP
67            port,
68            &properties[..],
69        )
70        .map_err(|e| format!("ServiceInfo creation failed: {e}"))?;
71
72        let service_fullname = service_info.get_fullname().to_string();
73
74        daemon
75            .register(service_info)
76            .map_err(|e| format!("mDNS register failed: {e}"))?;
77
78        // Browse for peers — keep the receiver for the event loop
79        let browse_receiver = daemon
80            .browse(SERVICE_TYPE)
81            .map_err(|e| format!("mDNS browse failed: {e}"))?;
82
83        let instances = Arc::new(RwLock::new(HashMap::new()));
84
85        let discovery = Arc::new(Self {
86            daemon,
87            instances: instances.clone(),
88            instance_id: instance_id.clone(),
89            event_bus: event_bus.clone(),
90            service_fullname,
91        });
92
93        // Spawn event receiver on a blocking thread to avoid starving the tokio runtime
94        let disc = discovery.clone();
95        tokio::task::spawn_blocking(move || {
96            disc.event_loop(browse_receiver);
97        });
98
99        tracing::info!(
100            instance_id = %instance_id,
101            port = port,
102            "LAN discovery started (mDNS)"
103        );
104
105        Ok(discovery)
106    }
107
108    fn event_loop(&self, receiver: mdns_sd::Receiver<ServiceEvent>) {
109        loop {
110            match receiver.recv() {
111                Ok(event) => self.handle_event(event),
112                Err(_) => {
113                    tracing::warn!("mDNS event receiver closed");
114                    break;
115                }
116            }
117        }
118    }
119
120    fn handle_event(&self, event: ServiceEvent) {
121        match event {
122            ServiceEvent::ServiceResolved(info) => {
123                let peer_id = info
124                    .get_property_val_str("instance_id")
125                    .unwrap_or_default()
126                    .to_string();
127
128                // Skip self
129                if peer_id == self.instance_id {
130                    return;
131                }
132
133                let now = SystemTime::now()
134                    .duration_since(UNIX_EPOCH)
135                    .unwrap_or_default()
136                    .as_secs();
137
138                let address = info
139                    .get_addresses()
140                    .iter()
141                    .find(|a| matches!(a, IpAddr::V4(_)))
142                    .or_else(|| info.get_addresses().iter().next())
143                    .map(|a| a.to_string())
144                    .unwrap_or_default();
145
146                let hostname = info
147                    .get_property_val_str("hostname")
148                    .unwrap_or_default()
149                    .to_string();
150                let version = info
151                    .get_property_val_str("version")
152                    .unwrap_or_default()
153                    .to_string();
154
155                let fullname = info.get_fullname().to_string();
156                let mut instances = self.instances.write();
157                let entry = instances.entry(fullname).or_insert_with(|| LanInstance {
158                    instance_id: peer_id.clone(),
159                    hostname: hostname.clone(),
160                    version: version.clone(),
161                    address: address.clone(),
162                    port: info.get_port(),
163                    agents: Vec::new(),
164                    first_seen: now,
165                    last_seen: now,
166                });
167                entry.last_seen = now;
168                entry.hostname = hostname;
169                entry.version = version;
170                entry.address = address;
171                entry.port = info.get_port();
172                drop(instances);
173
174                tracing::info!(
175                    peer_id = %peer_id,
176                    address = %info.get_addresses().iter().next().map(|a| a.to_string()).unwrap_or_default(),
177                    port = info.get_port(),
178                    "LAN peer discovered"
179                );
180
181                self.broadcast_instances();
182            }
183            ServiceEvent::ServiceRemoved(_, fullname) => {
184                let removed = {
185                    let mut instances = self.instances.write();
186                    instances.remove(&fullname).is_some()
187                };
188                if removed {
189                    tracing::info!(fullname = %fullname, "LAN peer removed");
190                    self.broadcast_instances();
191                }
192            }
193            _ => {}
194        }
195    }
196
197    fn broadcast_instances(&self) {
198        let instances: Vec<LanInstance> = self.instances.read().values().cloned().collect();
199        self.event_bus.broadcast_event(&WSEventType {
200            eventtype: "laninstances".to_string(),
201            oref: String::new(),
202            data: Some(json!(instances)),
203        });
204    }
205
206    /// Get current list of discovered LAN peers (excludes self).
207    pub fn get_instances(&self) -> Vec<LanInstance> {
208        self.instances.read().values().cloned().collect()
209    }
210
211    /// Get peer count (excludes self).
212    #[allow(dead_code)]
213    pub fn peer_count(&self) -> usize {
214        self.instances.read().len()
215    }
216}
217
218impl Drop for LanDiscovery {
219    fn drop(&mut self) {
220        // Gracefully unregister from mDNS
221        if let Err(e) = self.daemon.unregister(&self.service_fullname) {
222            tracing::warn!("mDNS unregister failed: {e}");
223        }
224        if let Err(e) = self.daemon.shutdown() {
225            tracing::warn!("mDNS daemon shutdown failed: {e}");
226        }
227    }
228}