agentmux_srv\backend/
eventbus.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Event bus: WebSocket event dispatching to connected clients.
5//! Port of Go's pkg/eventbus/eventbus.go.
6
7
8use std::collections::HashMap;
9use std::sync::{Arc, Mutex};
10
11use serde::{Deserialize, Serialize};
12
13use super::wps::{WaveEvent, WpsClient};
14
15// ---- Event type constants ----
16
/// Event type string for a "new window" notification.
///
/// NOTE(review): the wire value keeps an `electron:` prefix although the
/// constant is named for Tauri — presumably for frontend compatibility; confirm
/// before changing it.
#[allow(dead_code)]
pub const WS_EVENT_TAURI_NEW_WINDOW: &str = "electron:newwindow";

/// Event type string for a "close window" notification.
#[allow(dead_code)]
pub const WS_EVENT_TAURI_CLOSE_WINDOW: &str = "electron:closewindow";

/// Event type string for an "update active tab" notification.
#[allow(dead_code)]
pub const WS_EVENT_TAURI_UPDATE_ACTIVE_TAB: &str = "electron:updateactivetab";

/// Event type string used for RPC messages.
pub const WS_EVENT_RPC: &str = "rpc";
24
25// ---- Types ----
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct WSEventType {
29    pub eventtype: String,
30    #[serde(skip_serializing_if = "String::is_empty", default)]
31    pub oref: String,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub data: Option<serde_json::Value>,
34}
35
36struct WindowWatchData {
37    sender: tokio::sync::mpsc::UnboundedSender<serde_json::Value>,
38    #[allow(dead_code)]
39    tab_id: String,
40}
41
42/// Global event bus for dispatching WebSocket events to connected clients.
43pub struct EventBus {
44    watches: Mutex<HashMap<String, WindowWatchData>>,
45}
46
47impl EventBus {
48    pub fn new() -> Self {
49        Self {
50            watches: Mutex::new(HashMap::new()),
51        }
52    }
53
54    /// Register a WebSocket connection for receiving events.
55    /// Returns a receiver channel for the connection.
56    pub fn register_ws(
57        &self,
58        conn_id: &str,
59        tab_id: &str,
60    ) -> tokio::sync::mpsc::UnboundedReceiver<serde_json::Value> {
61        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
62        let mut watches = self.watches.lock().unwrap();
63        watches.insert(
64            conn_id.to_string(),
65            WindowWatchData {
66                sender: tx,
67                tab_id: tab_id.to_string(),
68            },
69        );
70        rx
71    }
72
73    /// Unregister a WebSocket connection.
74    pub fn unregister_ws(&self, conn_id: &str) {
75        let mut watches = self.watches.lock().unwrap();
76        watches.remove(conn_id);
77    }
78
79    /// Check if any connections exist for a given window/tab ID.
80    #[allow(dead_code)]
81    pub fn has_connections_for(&self, tab_id: &str) -> bool {
82        let watches = self.watches.lock().unwrap();
83        watches.values().any(|w| w.tab_id == tab_id)
84    }
85
86    /// Wait for a connection to appear for the given tab_id (with timeout).
87    #[allow(dead_code)]
88    pub async fn wait_for_connection(
89        &self,
90        tab_id: &str,
91        timeout: std::time::Duration,
92    ) -> bool {
93        let deadline = tokio::time::Instant::now() + timeout;
94        loop {
95            if self.has_connections_for(tab_id) {
96                return true;
97            }
98            if tokio::time::Instant::now() >= deadline {
99                return false;
100            }
101            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
102        }
103    }
104
105    /// Send an event to all connected WebSocket clients.
106    pub fn broadcast_event(&self, event: &WSEventType) {
107        let data = match serde_json::to_value(event) {
108            Ok(v) => v,
109            Err(e) => {
110                tracing::error!("cannot marshal event: {}", e);
111                return;
112            }
113        };
114        let watches = self.watches.lock().unwrap();
115        for (conn_id, watch) in watches.iter() {
116            if watch.sender.send(data.clone()).is_err() {
117                tracing::warn!("failed to send event to conn {}", conn_id);
118            }
119        }
120    }
121
122    /// Send an event to connections matching a specific tab_id.
123    #[allow(dead_code)]
124    pub fn send_to_tab(&self, tab_id: &str, event: &WSEventType) {
125        let data = match serde_json::to_value(event) {
126            Ok(v) => v,
127            Err(e) => {
128                tracing::error!("cannot marshal event: {}", e);
129                return;
130            }
131        };
132        let watches = self.watches.lock().unwrap();
133        for watch in watches.values() {
134            if watch.tab_id == tab_id {
135                let _ = watch.sender.send(data.clone());
136            }
137        }
138    }
139
140    /// Get the number of active connections.
141    pub fn connection_count(&self) -> usize {
142        self.watches.lock().unwrap().len()
143    }
144}
145
146impl Default for EventBus {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152/// Bridge from WPS Broker to EventBus.
153/// Wraps WaveEvents as RPC eventrecv messages and broadcasts them to all WS clients.
154pub struct EventBusBridge {
155    event_bus: Arc<EventBus>,
156}
157
158impl EventBusBridge {
159    pub fn new(event_bus: Arc<EventBus>) -> Self {
160        Self { event_bus }
161    }
162}
163
164impl WpsClient for EventBusBridge {
165    fn send_event(&self, _route_id: &str, event: WaveEvent) {
166        // Wrap as RPC eventrecv message (format expected by frontend)
167        let ws_event = WSEventType {
168            eventtype: WS_EVENT_RPC.to_string(),
169            oref: String::new(),
170            data: Some(serde_json::json!({
171                "command": "eventrecv",
172                "data": event
173            })),
174        };
175        self.event_bus.broadcast_event(&ws_event);
176    }
177}
178
179// ====================================================================
180// Tests
181// ====================================================================
182
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an RPC-typed event with an empty `oref` and the given payload.
    fn rpc_event(data: Option<serde_json::Value>) -> WSEventType {
        WSEventType {
            eventtype: WS_EVENT_RPC.to_string(),
            oref: String::new(),
            data,
        }
    }

    /// Registering adds a connection visible by tab id; unregistering removes it.
    #[test]
    fn test_register_unregister() {
        let bus = EventBus::new();

        // Keep the receiver alive for the duration of the test.
        let _receiver = bus.register_ws("conn-1", "tab-1");
        assert_eq!(bus.connection_count(), 1);
        assert!(bus.has_connections_for("tab-1"));
        assert!(!bus.has_connections_for("tab-2"));

        bus.unregister_ws("conn-1");
        assert_eq!(bus.connection_count(), 0);
        assert!(!bus.has_connections_for("tab-1"));
    }

    /// A broadcast reaches every registered connection regardless of tab.
    #[test]
    fn test_broadcast_event() {
        let bus = EventBus::new();
        let mut first = bus.register_ws("conn-1", "tab-1");
        let mut second = bus.register_ws("conn-2", "tab-2");

        bus.broadcast_event(&rpc_event(Some(serde_json::json!({"test": true}))));

        assert!(first.try_recv().is_ok());
        assert!(second.try_recv().is_ok());
    }

    /// A targeted send reaches only connections registered with that tab id.
    #[test]
    fn test_send_to_tab() {
        let bus = EventBus::new();
        let mut targeted = bus.register_ws("conn-1", "tab-1");
        let mut other = bus.register_ws("conn-2", "tab-2");

        bus.send_to_tab("tab-1", &rpc_event(None));

        assert!(targeted.try_recv().is_ok());
        // tab-2 should not receive
        assert!(other.try_recv().is_err());
    }

    /// Serialization keeps `eventtype` and drops an empty `oref`.
    #[test]
    fn test_ws_event_serialization() {
        let event = WSEventType {
            eventtype: "test".to_string(),
            oref: String::new(),
            data: Some(serde_json::json!(42)),
        };
        let encoded = serde_json::to_string(&event).unwrap();

        assert!(encoded.contains("\"eventtype\":\"test\""));
        // Empty oref should be omitted
        assert!(!encoded.contains("\"oref\""));
    }
}
246}