agentmux_srv\backend/
eventbus.rs1use std::collections::HashMap;
9use std::sync::{Arc, Mutex};
10
11use serde::{Deserialize, Serialize};
12
13use super::wps::{WaveEvent, WpsClient};
14
// Event-type tags for WebSocket events. Within this file only `WS_EVENT_RPC`
// is used (as the `eventtype` of relayed events); the window/tab tags are
// not referenced here.
//
// NOTE(review): the `TAURI` constants keep the `electron:` prefix in their
// values — presumably so existing listeners keep working; confirm before
// changing the strings.

/// Tag for the new-window event.
#[allow(dead_code)]
pub const WS_EVENT_TAURI_NEW_WINDOW: &str = "electron:newwindow";

/// Tag for the close-window event.
#[allow(dead_code)]
pub const WS_EVENT_TAURI_CLOSE_WINDOW: &str = "electron:closewindow";

/// Tag for the update-active-tab event.
#[allow(dead_code)]
pub const WS_EVENT_TAURI_UPDATE_ACTIVE_TAB: &str = "electron:updateactivetab";

/// Tag for RPC messages.
pub const WS_EVENT_RPC: &str = "rpc";
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct WSEventType {
29 pub eventtype: String,
30 #[serde(skip_serializing_if = "String::is_empty", default)]
31 pub oref: String,
32 #[serde(skip_serializing_if = "Option::is_none")]
33 pub data: Option<serde_json::Value>,
34}
35
36struct WindowWatchData {
37 sender: tokio::sync::mpsc::UnboundedSender<serde_json::Value>,
38 #[allow(dead_code)]
39 tab_id: String,
40}
41
42pub struct EventBus {
44 watches: Mutex<HashMap<String, WindowWatchData>>,
45}
46
47impl EventBus {
48 pub fn new() -> Self {
49 Self {
50 watches: Mutex::new(HashMap::new()),
51 }
52 }
53
54 pub fn register_ws(
57 &self,
58 conn_id: &str,
59 tab_id: &str,
60 ) -> tokio::sync::mpsc::UnboundedReceiver<serde_json::Value> {
61 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
62 let mut watches = self.watches.lock().unwrap();
63 watches.insert(
64 conn_id.to_string(),
65 WindowWatchData {
66 sender: tx,
67 tab_id: tab_id.to_string(),
68 },
69 );
70 rx
71 }
72
73 pub fn unregister_ws(&self, conn_id: &str) {
75 let mut watches = self.watches.lock().unwrap();
76 watches.remove(conn_id);
77 }
78
79 #[allow(dead_code)]
81 pub fn has_connections_for(&self, tab_id: &str) -> bool {
82 let watches = self.watches.lock().unwrap();
83 watches.values().any(|w| w.tab_id == tab_id)
84 }
85
86 #[allow(dead_code)]
88 pub async fn wait_for_connection(
89 &self,
90 tab_id: &str,
91 timeout: std::time::Duration,
92 ) -> bool {
93 let deadline = tokio::time::Instant::now() + timeout;
94 loop {
95 if self.has_connections_for(tab_id) {
96 return true;
97 }
98 if tokio::time::Instant::now() >= deadline {
99 return false;
100 }
101 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
102 }
103 }
104
105 pub fn broadcast_event(&self, event: &WSEventType) {
107 let data = match serde_json::to_value(event) {
108 Ok(v) => v,
109 Err(e) => {
110 tracing::error!("cannot marshal event: {}", e);
111 return;
112 }
113 };
114 let watches = self.watches.lock().unwrap();
115 for (conn_id, watch) in watches.iter() {
116 if watch.sender.send(data.clone()).is_err() {
117 tracing::warn!("failed to send event to conn {}", conn_id);
118 }
119 }
120 }
121
122 #[allow(dead_code)]
124 pub fn send_to_tab(&self, tab_id: &str, event: &WSEventType) {
125 let data = match serde_json::to_value(event) {
126 Ok(v) => v,
127 Err(e) => {
128 tracing::error!("cannot marshal event: {}", e);
129 return;
130 }
131 };
132 let watches = self.watches.lock().unwrap();
133 for watch in watches.values() {
134 if watch.tab_id == tab_id {
135 let _ = watch.sender.send(data.clone());
136 }
137 }
138 }
139
140 pub fn connection_count(&self) -> usize {
142 self.watches.lock().unwrap().len()
143 }
144}
145
146impl Default for EventBus {
147 fn default() -> Self {
148 Self::new()
149 }
150}
151
152pub struct EventBusBridge {
155 event_bus: Arc<EventBus>,
156}
157
158impl EventBusBridge {
159 pub fn new(event_bus: Arc<EventBus>) -> Self {
160 Self { event_bus }
161 }
162}
163
164impl WpsClient for EventBusBridge {
165 fn send_event(&self, _route_id: &str, event: WaveEvent) {
166 let ws_event = WSEventType {
168 eventtype: WS_EVENT_RPC.to_string(),
169 oref: String::new(),
170 data: Some(serde_json::json!({
171 "command": "eventrecv",
172 "data": event
173 })),
174 };
175 self.event_bus.broadcast_event(&ws_event);
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `rpc` event with an empty `oref` and the given payload.
    fn rpc_event(data: Option<serde_json::Value>) -> WSEventType {
        WSEventType {
            eventtype: WS_EVENT_RPC.to_string(),
            oref: String::new(),
            data,
        }
    }

    /// Registering adds a connection for its tab; unregistering removes it.
    #[test]
    fn test_register_unregister() {
        let bus = EventBus::new();

        // Bind the receiver so it stays alive for the whole test.
        let _rx = bus.register_ws("conn-1", "tab-1");
        assert_eq!(bus.connection_count(), 1);
        assert!(bus.has_connections_for("tab-1"));
        assert!(!bus.has_connections_for("tab-2"));

        bus.unregister_ws("conn-1");
        assert_eq!(bus.connection_count(), 0);
        assert!(!bus.has_connections_for("tab-1"));
    }

    /// A broadcast reaches every registered connection, whatever its tab.
    #[test]
    fn test_broadcast_event() {
        let bus = EventBus::new();
        let mut first = bus.register_ws("conn-1", "tab-1");
        let mut second = bus.register_ws("conn-2", "tab-2");

        bus.broadcast_event(&rpc_event(Some(serde_json::json!({"test": true}))));

        assert!(first.try_recv().is_ok());
        assert!(second.try_recv().is_ok());
    }

    /// A tab-scoped send reaches only the connections of that tab.
    #[test]
    fn test_send_to_tab() {
        let bus = EventBus::new();
        let mut first = bus.register_ws("conn-1", "tab-1");
        let mut second = bus.register_ws("conn-2", "tab-2");

        bus.send_to_tab("tab-1", &rpc_event(None));

        assert!(first.try_recv().is_ok());
        // The connection for the other tab must have nothing queued.
        assert!(second.try_recv().is_err());
    }

    /// An empty `oref` is left out of the serialized JSON.
    #[test]
    fn test_ws_event_serialization() {
        let encoded = serde_json::to_string(&WSEventType {
            eventtype: String::from("test"),
            oref: String::new(),
            data: Some(serde_json::json!(42)),
        })
        .unwrap();

        assert!(encoded.contains(r#""eventtype":"test""#));
        assert!(!encoded.contains(r#""oref""#));
    }
}