agentmux_srv\backend\wshutil/
event.rs

1#![allow(dead_code)]
2// Copyright 2025-2026, AgentMux Corp.
3// SPDX-License-Identifier: Apache-2.0
4
5//! Event listener system for AgentMux RPC.
6//! Port of Go's `pkg/wshutil/wshevent.go`.
7//!
8//! Provides a pub/sub event system that converts WaveEvents into
9//! a listener-based API with register/unregister support.
10
11
12use std::collections::HashMap;
13use std::sync::Mutex;
14use uuid::Uuid;
15
16/// Callback type for event listeners.
17pub type EventCallback = Box<dyn Fn(&WaveEvent) + Send + Sync>;
18
19/// A wave event with a name and JSON payload.
20#[derive(Debug, Clone)]
21pub struct WaveEvent {
22    pub event: String,
23    pub scopes: Vec<String>,
24    pub data: Option<serde_json::Value>,
25}
26
27struct SingleListener {
28    id: String,
29    callback: EventCallback,
30}
31
32/// Thread-safe event listener with pub/sub support.
33pub struct EventListener {
34    listeners: Mutex<HashMap<String, Vec<SingleListener>>>,
35}
36
37impl EventListener {
38    pub fn new() -> Self {
39        Self {
40            listeners: Mutex::new(HashMap::new()),
41        }
42    }
43
44    /// Register a listener for an event. Returns a listener ID for unregistration.
45    pub fn on(&self, event_name: &str, callback: EventCallback) -> String {
46        let id = Uuid::new_v4().to_string();
47        let mut listeners = self.listeners.lock().unwrap();
48        let entry = listeners.entry(event_name.to_string()).or_default();
49        entry.push(SingleListener {
50            id: id.clone(),
51            callback,
52        });
53        id
54    }
55
56    /// Unregister a listener by event name and listener ID.
57    pub fn unregister(&self, event_name: &str, id: &str) {
58        let mut listeners = self.listeners.lock().unwrap();
59        if let Some(entry) = listeners.get_mut(event_name) {
60            entry.retain(|sl| sl.id != id);
61        }
62    }
63
64    /// Dispatch an event to all registered listeners.
65    pub fn recv_event(&self, event: &WaveEvent) {
66        let listeners = self.listeners.lock().unwrap();
67        if let Some(entry) = listeners.get(&event.event) {
68            for sl in entry {
69                (sl.callback)(event);
70            }
71        }
72    }
73
74    /// Get the count of listeners for a specific event.
75    pub fn listener_count(&self, event_name: &str) -> usize {
76        let listeners = self.listeners.lock().unwrap();
77        listeners.get(event_name).map_or(0, |v| v.len())
78    }
79}
80
81impl Default for EventListener {
82    fn default() -> Self {
83        Self::new()
84    }
85}
86
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Builds an event with the given name, no scopes and no payload.
    fn bare_event(name: &str) -> WaveEvent {
        WaveEvent {
            event: name.to_string(),
            scopes: Vec::new(),
            data: None,
        }
    }

    /// Registers a listener on `name` that increments a shared counter each
    /// time it is called; returns the listener ID and the counter.
    fn counting_listener(el: &EventListener, name: &str) -> (String, Arc<AtomicUsize>) {
        let hits = Arc::new(AtomicUsize::new(0));
        let hits_in_callback = Arc::clone(&hits);
        let id = el.on(
            name,
            Box::new(move |_| {
                hits_in_callback.fetch_add(1, Ordering::SeqCst);
            }),
        );
        (id, hits)
    }

    #[test]
    fn test_event_listener_on_and_recv() {
        let el = EventListener::new();
        let (_id, hits) = counting_listener(&el, "test_event");
        let event = bare_event("test_event");

        // Each dispatch must reach the listener exactly once.
        el.recv_event(&event);
        assert_eq!(hits.load(Ordering::SeqCst), 1);

        el.recv_event(&event);
        assert_eq!(hits.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn test_event_listener_unregister() {
        let el = EventListener::new();
        let (id, hits) = counting_listener(&el, "test_event");

        assert_eq!(el.listener_count("test_event"), 1);
        el.unregister("test_event", &id);
        assert_eq!(el.listener_count("test_event"), 0);

        // A removed listener must not see later events.
        el.recv_event(&bare_event("test_event"));
        assert_eq!(hits.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn test_event_listener_no_listeners() {
        let el = EventListener::new();
        // Dispatching an event nobody listens for should not panic.
        el.recv_event(&bare_event("unknown"));
    }
}