1use std::collections::{HashMap, VecDeque};
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use parking_lot::Mutex;
13use serde::{Deserialize, Serialize};
14use tokio::sync::mpsc;
15
/// Upper bound on the number of messages kept in a single agent's offline
/// queue; once reached, the oldest queued message is dropped to make room.
const MAX_OFFLINE_QUEUE: usize = 1_000;

/// Largest batch a single `read_messages` call will hand back, regardless of
/// the limit the caller asks for.
const MAX_READ_LIMIT: usize = 500;

/// Age, in seconds, after which a queued message is considered expired
/// (one hour).
const MESSAGE_TTL_SECS: u64 = 60 * 60;
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
28#[serde(rename_all = "snake_case")]
29pub enum MessageType {
30 Send,
31 Inject,
32 Broadcast,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36#[serde(rename_all = "snake_case")]
37pub enum Priority {
38 Normal,
39 High,
40 Urgent,
41}
42
43impl Default for Priority {
44 fn default() -> Self {
45 Priority::Normal
46 }
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct BusMessage {
51 pub id: String,
52 pub from: String,
53 pub to: String,
54 #[serde(rename = "type")]
55 pub msg_type: MessageType,
56 pub payload: String,
57 #[serde(default)]
58 pub priority: Priority,
59 pub timestamp: u64,
60}
61
62impl BusMessage {
63 pub fn new(from: &str, to: &str, msg_type: MessageType, payload: &str, priority: Priority) -> Self {
64 let now = SystemTime::now()
65 .duration_since(UNIX_EPOCH)
66 .unwrap_or_default()
67 .as_secs();
68 Self {
69 id: uuid::Uuid::new_v4().to_string(),
70 from: from.to_string(),
71 to: to.to_string(),
72 msg_type,
73 payload: payload.to_string(),
74 priority,
75 timestamp: now,
76 }
77 }
78
79 fn is_expired(&self) -> bool {
80 let now = SystemTime::now()
81 .duration_since(UNIX_EPOCH)
82 .unwrap_or_default()
83 .as_secs();
84 now.saturating_sub(self.timestamp) > MESSAGE_TTL_SECS
85 }
86}
87
88#[derive(Debug, Clone, Serialize)]
90pub struct AgentInfo {
91 pub id: String,
92 pub registered_at: u64,
93 pub last_seen: u64,
94 pub connection_type: String, }
96
97struct AgentConnection {
99 info: AgentInfo,
100 ws_sender: Option<mpsc::UnboundedSender<BusMessage>>,
103}
104
105pub struct MessageBus {
108 agents: Mutex<HashMap<String, AgentConnection>>,
109 offline_queues: Mutex<HashMap<String, VecDeque<BusMessage>>>,
110}
111
112impl MessageBus {
113 pub fn new() -> Self {
114 Self {
115 agents: Mutex::new(HashMap::new()),
116 offline_queues: Mutex::new(HashMap::new()),
117 }
118 }
119
120 pub fn register(&self, agent_id: &str, connection_type: &str) -> mpsc::UnboundedReceiver<BusMessage> {
123 let (tx, rx) = mpsc::unbounded_channel();
124 let now = SystemTime::now()
125 .duration_since(UNIX_EPOCH)
126 .unwrap_or_default()
127 .as_secs();
128
129 let conn = AgentConnection {
130 info: AgentInfo {
131 id: agent_id.to_string(),
132 registered_at: now,
133 last_seen: now,
134 connection_type: connection_type.to_string(),
135 },
136 ws_sender: Some(tx),
137 };
138
139 self.agents.lock().insert(agent_id.to_string(), conn);
140
141 tracing::info!("messagebus: agent '{}' registered ({})", agent_id, connection_type);
142
143 self.drain_offline_queue(agent_id);
145
146 rx
147 }
148
149 pub fn register_http(&self, agent_id: &str) {
153 let now = SystemTime::now()
154 .duration_since(UNIX_EPOCH)
155 .unwrap_or_default()
156 .as_secs();
157
158 let conn = AgentConnection {
159 info: AgentInfo {
160 id: agent_id.to_string(),
161 registered_at: now,
162 last_seen: now,
163 connection_type: "http".to_string(),
164 },
165 ws_sender: None,
166 };
167
168 self.agents.lock().insert(agent_id.to_string(), conn);
169
170 tracing::info!("messagebus: agent '{}' registered (http-polling)", agent_id);
171 }
172
173 pub fn unregister(&self, agent_id: &str) {
175 if self.agents.lock().remove(agent_id).is_some() {
176 tracing::info!("messagebus: agent '{}' unregistered", agent_id);
177 }
178 }
179
180 pub fn touch(&self, agent_id: &str) {
182 if let Some(conn) = self.agents.lock().get_mut(agent_id) {
183 conn.info.last_seen = SystemTime::now()
184 .duration_since(UNIX_EPOCH)
185 .unwrap_or_default()
186 .as_secs();
187 }
188 }
189
190 pub fn send(&self, msg: BusMessage) -> Result<(), String> {
192 let target = msg.to.clone();
193
194 let sent = {
196 let agents = self.agents.lock();
197 if let Some(conn) = agents.get(&target) {
198 if let Some(ref tx) = conn.ws_sender {
199 tx.send(msg.clone()).is_ok()
200 } else {
201 false
202 }
203 } else {
204 false
205 }
206 };
207
208 if sent {
209 return Ok(());
210 }
211
212 self.queue_offline(msg);
214 Ok(())
215 }
216
217 pub fn inject(&self, from: &str, target: &str, message: &str, priority: Priority) -> Result<String, String> {
220 let msg = BusMessage::new(from, target, MessageType::Inject, message, priority);
221 let msg_id = msg.id.clone();
222 self.send(msg)?;
223 Ok(msg_id)
224 }
225
226 pub fn broadcast(&self, from: &str, payload: &str, priority: Priority) -> Result<usize, String> {
228 let targets: Vec<(String, mpsc::UnboundedSender<BusMessage>)> = {
230 let agents = self.agents.lock();
231 agents
232 .iter()
233 .filter(|(id, _)| id.as_str() != from)
234 .filter_map(|(id, conn)| {
235 conn.ws_sender.as_ref().map(|tx| (id.clone(), tx.clone()))
236 })
237 .collect()
238 };
239
240 let mut delivered = 0;
241 for (agent_id, tx) in targets {
242 let msg = BusMessage::new(from, &agent_id, MessageType::Broadcast, payload, priority.clone());
243 if tx.send(msg).is_ok() {
244 delivered += 1;
245 }
246 }
247
248 Ok(delivered)
249 }
250
251 pub fn list_agents(&self) -> Vec<AgentInfo> {
253 self.agents.lock().values().map(|c| c.info.clone()).collect()
254 }
255
256 pub fn read_messages(&self, agent_id: &str, limit: usize) -> Vec<BusMessage> {
259 self.touch(agent_id);
260 let limit = limit.min(MAX_READ_LIMIT);
261 let mut queues = self.offline_queues.lock();
262 let queue = match queues.get_mut(agent_id) {
263 Some(q) => q,
264 None => return Vec::new(),
265 };
266
267 queue.retain(|m| !m.is_expired());
269
270 let count = limit.min(queue.len());
271 queue.drain(..count).collect()
272 }
273
274 pub fn delete_messages(&self, agent_id: &str, message_ids: &[String]) -> usize {
276 let mut queues = self.offline_queues.lock();
277 let queue = match queues.get_mut(agent_id) {
278 Some(q) => q,
279 None => return 0,
280 };
281
282 let before = queue.len();
283 queue.retain(|m| !message_ids.contains(&m.id));
284 before - queue.len()
285 }
286
287 fn queue_offline(&self, msg: BusMessage) {
290 let target = msg.to.clone();
291 let mut queues = self.offline_queues.lock();
292 let queue = queues.entry(target).or_insert_with(VecDeque::new);
293
294 if queue.len() >= MAX_OFFLINE_QUEUE {
296 queue.pop_front();
297 }
298 queue.push_back(msg);
299 }
300
301 fn drain_offline_queue(&self, agent_id: &str) {
302 let messages: Vec<BusMessage> = {
304 let mut queues = self.offline_queues.lock();
305 match queues.get_mut(agent_id) {
306 Some(queue) => {
307 queue.retain(|m| !m.is_expired());
308 queue.drain(..).collect()
309 }
310 None => return,
311 }
312 };
313
314 if messages.is_empty() {
315 return;
316 }
317
318 let tx = {
320 let agents = self.agents.lock();
321 agents
322 .get(agent_id)
323 .and_then(|conn| conn.ws_sender.clone())
324 };
325
326 if let Some(tx) = tx {
327 let count = messages.len();
328 for msg in messages {
329 let _ = tx.send(msg);
330 }
331 tracing::info!("messagebus: drained {} offline messages to '{}'", count, agent_id);
332 }
333 }
334}