agentmux_srv\backend/
messagebus.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Local MessageBus for inter-agent communication.
5//!
6//! Provides agent registration, point-to-point messaging, terminal injection,
7//! and broadcast — all over localhost with no cloud dependency.
8
9use std::collections::{HashMap, VecDeque};
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use parking_lot::Mutex;
13use serde::{Deserialize, Serialize};
14use tokio::sync::mpsc;
15
/// Cap on the number of messages held in a single agent's offline queue.
/// When the cap is reached, the oldest queued message is evicted to make room.
const MAX_OFFLINE_QUEUE: usize = 1_000;

/// Upper bound on how many messages one `read_messages` call returns,
/// regardless of the limit the caller asks for.
const MAX_READ_LIMIT: usize = 500;

/// How long a message stays valid, in seconds (60 min x 60 s = 1 hour).
const MESSAGE_TTL_SECS: u64 = 60 * 60;
24
25// ---- Types ----
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28#[serde(rename_all = "snake_case")]
29pub enum MessageType {
30    Send,
31    Inject,
32    Broadcast,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36#[serde(rename_all = "snake_case")]
37pub enum Priority {
38    Normal,
39    High,
40    Urgent,
41}
42
43impl Default for Priority {
44    fn default() -> Self {
45        Priority::Normal
46    }
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct BusMessage {
51    pub id: String,
52    pub from: String,
53    pub to: String,
54    #[serde(rename = "type")]
55    pub msg_type: MessageType,
56    pub payload: String,
57    #[serde(default)]
58    pub priority: Priority,
59    pub timestamp: u64,
60}
61
62impl BusMessage {
63    pub fn new(from: &str, to: &str, msg_type: MessageType, payload: &str, priority: Priority) -> Self {
64        let now = SystemTime::now()
65            .duration_since(UNIX_EPOCH)
66            .unwrap_or_default()
67            .as_secs();
68        Self {
69            id: uuid::Uuid::new_v4().to_string(),
70            from: from.to_string(),
71            to: to.to_string(),
72            msg_type,
73            payload: payload.to_string(),
74            priority,
75            timestamp: now,
76        }
77    }
78
79    fn is_expired(&self) -> bool {
80        let now = SystemTime::now()
81            .duration_since(UNIX_EPOCH)
82            .unwrap_or_default()
83            .as_secs();
84        now.saturating_sub(self.timestamp) > MESSAGE_TTL_SECS
85    }
86}
87
88/// Info about a connected agent.
89#[derive(Debug, Clone, Serialize)]
90pub struct AgentInfo {
91    pub id: String,
92    pub registered_at: u64,
93    pub last_seen: u64,
94    pub connection_type: String, // "websocket" or "http"
95}
96
/// Internal agent connection state.
///
/// One of these is stored per registered agent id in `MessageBus::agents`.
struct AgentConnection {
    /// Public metadata about the agent (what `MessageBus::list_agents` returns).
    info: AgentInfo,
    /// Channel sender for pushing messages to this agent's WebSocket.
    /// None for HTTP-polling agents (they read from the offline queue instead).
    /// The channel is unbounded, so pushing never blocks the sender.
    ws_sender: Option<mpsc::UnboundedSender<BusMessage>>,
}
104
105// ---- MessageBus ----
106
/// In-process message router for agents.
///
/// Holds two independently locked maps: the registered agents and the
/// per-recipient queues of messages that could not be pushed immediately.
pub struct MessageBus {
    /// Registered agents, keyed by agent id.
    agents: Mutex<HashMap<String, AgentConnection>>,
    /// Pending messages keyed by recipient id. Filled when a message cannot be
    /// pushed over a live channel (recipient unknown, HTTP-polling, or its
    /// channel is closed).
    offline_queues: Mutex<HashMap<String, VecDeque<BusMessage>>>,
}
111
112impl MessageBus {
113    pub fn new() -> Self {
114        Self {
115            agents: Mutex::new(HashMap::new()),
116            offline_queues: Mutex::new(HashMap::new()),
117        }
118    }
119
120    /// Register an agent on the bus with a WebSocket push channel.
121    /// Returns a receiver for messages pushed to this agent.
122    pub fn register(&self, agent_id: &str, connection_type: &str) -> mpsc::UnboundedReceiver<BusMessage> {
123        let (tx, rx) = mpsc::unbounded_channel();
124        let now = SystemTime::now()
125            .duration_since(UNIX_EPOCH)
126            .unwrap_or_default()
127            .as_secs();
128
129        let conn = AgentConnection {
130            info: AgentInfo {
131                id: agent_id.to_string(),
132                registered_at: now,
133                last_seen: now,
134                connection_type: connection_type.to_string(),
135            },
136            ws_sender: Some(tx),
137        };
138
139        self.agents.lock().insert(agent_id.to_string(), conn);
140
141        tracing::info!("messagebus: agent '{}' registered ({})", agent_id, connection_type);
142
143        // Drain any offline queued messages
144        self.drain_offline_queue(agent_id);
145
146        rx
147    }
148
149    /// Register an HTTP-polling agent (no WebSocket push channel).
150    /// Messages sent to this agent are queued in the offline queue and
151    /// retrieved via `read_messages`.
152    pub fn register_http(&self, agent_id: &str) {
153        let now = SystemTime::now()
154            .duration_since(UNIX_EPOCH)
155            .unwrap_or_default()
156            .as_secs();
157
158        let conn = AgentConnection {
159            info: AgentInfo {
160                id: agent_id.to_string(),
161                registered_at: now,
162                last_seen: now,
163                connection_type: "http".to_string(),
164            },
165            ws_sender: None,
166        };
167
168        self.agents.lock().insert(agent_id.to_string(), conn);
169
170        tracing::info!("messagebus: agent '{}' registered (http-polling)", agent_id);
171    }
172
173    /// Unregister an agent from the bus.
174    pub fn unregister(&self, agent_id: &str) {
175        if self.agents.lock().remove(agent_id).is_some() {
176            tracing::info!("messagebus: agent '{}' unregistered", agent_id);
177        }
178    }
179
180    /// Update last_seen timestamp for an agent (called on HTTP polling).
181    pub fn touch(&self, agent_id: &str) {
182        if let Some(conn) = self.agents.lock().get_mut(agent_id) {
183            conn.info.last_seen = SystemTime::now()
184                .duration_since(UNIX_EPOCH)
185                .unwrap_or_default()
186                .as_secs();
187        }
188    }
189
190    /// Send a message to a specific agent.
191    pub fn send(&self, msg: BusMessage) -> Result<(), String> {
192        let target = msg.to.clone();
193
194        // Try to push via WebSocket channel (short lock scope)
195        let sent = {
196            let agents = self.agents.lock();
197            if let Some(conn) = agents.get(&target) {
198                if let Some(ref tx) = conn.ws_sender {
199                    tx.send(msg.clone()).is_ok()
200                } else {
201                    false
202                }
203            } else {
204                false
205            }
206        };
207
208        if sent {
209            return Ok(());
210        }
211
212        // Agent not connected or send failed — queue for later
213        self.queue_offline(msg);
214        Ok(())
215    }
216
217    /// Inject a message into an agent's terminal (jekt).
218    /// This is the same as send but with MessageType::Inject.
219    pub fn inject(&self, from: &str, target: &str, message: &str, priority: Priority) -> Result<String, String> {
220        let msg = BusMessage::new(from, target, MessageType::Inject, message, priority);
221        let msg_id = msg.id.clone();
222        self.send(msg)?;
223        Ok(msg_id)
224    }
225
226    /// Broadcast a message to all connected agents (except sender).
227    pub fn broadcast(&self, from: &str, payload: &str, priority: Priority) -> Result<usize, String> {
228        // Collect senders under short lock
229        let targets: Vec<(String, mpsc::UnboundedSender<BusMessage>)> = {
230            let agents = self.agents.lock();
231            agents
232                .iter()
233                .filter(|(id, _)| id.as_str() != from)
234                .filter_map(|(id, conn)| {
235                    conn.ws_sender.as_ref().map(|tx| (id.clone(), tx.clone()))
236                })
237                .collect()
238        };
239
240        let mut delivered = 0;
241        for (agent_id, tx) in targets {
242            let msg = BusMessage::new(from, &agent_id, MessageType::Broadcast, payload, priority.clone());
243            if tx.send(msg).is_ok() {
244                delivered += 1;
245            }
246        }
247
248        Ok(delivered)
249    }
250
251    /// List all registered agents.
252    pub fn list_agents(&self) -> Vec<AgentInfo> {
253        self.agents.lock().values().map(|c| c.info.clone()).collect()
254    }
255
256    /// Read (and drain) queued offline messages for an agent.
257    /// Used by HTTP-polling agents that don't have a WebSocket connection.
258    pub fn read_messages(&self, agent_id: &str, limit: usize) -> Vec<BusMessage> {
259        self.touch(agent_id);
260        let limit = limit.min(MAX_READ_LIMIT);
261        let mut queues = self.offline_queues.lock();
262        let queue = match queues.get_mut(agent_id) {
263            Some(q) => q,
264            None => return Vec::new(),
265        };
266
267        // Purge expired messages
268        queue.retain(|m| !m.is_expired());
269
270        let count = limit.min(queue.len());
271        queue.drain(..count).collect()
272    }
273
274    /// Delete specific messages by ID from an agent's offline queue.
275    pub fn delete_messages(&self, agent_id: &str, message_ids: &[String]) -> usize {
276        let mut queues = self.offline_queues.lock();
277        let queue = match queues.get_mut(agent_id) {
278            Some(q) => q,
279            None => return 0,
280        };
281
282        let before = queue.len();
283        queue.retain(|m| !message_ids.contains(&m.id));
284        before - queue.len()
285    }
286
287    // ---- Internal ----
288
289    fn queue_offline(&self, msg: BusMessage) {
290        let target = msg.to.clone();
291        let mut queues = self.offline_queues.lock();
292        let queue = queues.entry(target).or_insert_with(VecDeque::new);
293
294        // Evict oldest if at capacity
295        if queue.len() >= MAX_OFFLINE_QUEUE {
296            queue.pop_front();
297        }
298        queue.push_back(msg);
299    }
300
301    fn drain_offline_queue(&self, agent_id: &str) {
302        // Collect messages and the sender under separate lock scopes
303        let messages: Vec<BusMessage> = {
304            let mut queues = self.offline_queues.lock();
305            match queues.get_mut(agent_id) {
306                Some(queue) => {
307                    queue.retain(|m| !m.is_expired());
308                    queue.drain(..).collect()
309                }
310                None => return,
311            }
312        };
313
314        if messages.is_empty() {
315            return;
316        }
317
318        // Get the sender clone, then drop the lock before sending
319        let tx = {
320            let agents = self.agents.lock();
321            agents
322                .get(agent_id)
323                .and_then(|conn| conn.ws_sender.clone())
324        };
325
326        if let Some(tx) = tx {
327            let count = messages.len();
328            for msg in messages {
329                let _ = tx.send(msg);
330            }
331            tracing::info!("messagebus: drained {} offline messages to '{}'", count, agent_id);
332        }
333    }
334}