agentmux_srv\server/
messagebus.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! HTTP endpoint handlers for the local MessageBus.
5//!
6//! Routes:
7//!   POST /api/bus/register      - Register an agent
8//!   POST /api/bus/send          - Send a message to an agent
9//!   POST /api/bus/inject        - Inject into an agent's terminal (jekt)
10//!   POST /api/bus/broadcast     - Broadcast to all agents
11//!   GET  /api/bus/messages      - Read queued messages (polling fallback)
12//!   GET  /api/bus/agents        - List connected agents
13//!   POST /api/bus/messages/delete - Delete messages by ID
14
15use axum::{
16    extract::{Query, State},
17    response::Json,
18};
19use serde::Deserialize;
20use serde_json::{json, Value};
21
22use crate::backend::messagebus::{BusMessage, MessageType, Priority};
23use crate::backend::reactive::InjectionRequest;
24use super::AppState;
25
26// ---- Request types ----
27
28#[derive(Deserialize)]
29pub(super) struct RegisterRequest {
30    agent_id: String,
31}
32
33#[derive(Deserialize)]
34pub(super) struct SendRequest {
35    from: String,
36    to: String,
37    payload: String,
38    #[serde(default)]
39    priority: Option<String>,
40}
41
42#[derive(Deserialize)]
43pub(super) struct InjectRequest {
44    from: String,
45    target: String,
46    message: String,
47    #[serde(default)]
48    priority: Option<String>,
49}
50
51#[derive(Deserialize)]
52pub(super) struct BroadcastRequest {
53    from: String,
54    payload: String,
55    #[serde(default)]
56    priority: Option<String>,
57}
58
59#[derive(Deserialize)]
60pub(super) struct ReadMessagesQuery {
61    agent_id: String,
62    #[serde(default = "default_limit")]
63    limit: usize,
64}
65
66fn default_limit() -> usize {
67    100
68}
69
70#[derive(Deserialize)]
71pub(super) struct DeleteMessagesRequest {
72    agent_id: String,
73    message_ids: Vec<String>,
74}
75
76// ---- Helpers ----
77
78fn parse_priority(s: &Option<String>) -> Priority {
79    match s.as_deref() {
80        Some("high") => Priority::High,
81        Some("urgent") => Priority::Urgent,
82        _ => Priority::Normal,
83    }
84}
85
86// ---- Handlers ----
87
88/// POST /api/bus/register
89pub(super) async fn handle_register(
90    State(state): State<AppState>,
91    Json(req): Json<RegisterRequest>,
92) -> Json<Value> {
93    // HTTP-registered agents use polling via /api/bus/messages.
94    // WebSocket agents get their push channel wired in the WS handler.
95    state.messagebus.register_http(&req.agent_id);
96    Json(json!({
97        "status": "registered",
98        "agent_id": req.agent_id,
99    }))
100}
101
102/// POST /api/bus/send
103pub(super) async fn handle_send(
104    State(state): State<AppState>,
105    Json(req): Json<SendRequest>,
106) -> Json<Value> {
107    let priority = parse_priority(&req.priority);
108    let msg = BusMessage::new(&req.from, &req.to, MessageType::Send, &req.payload, priority);
109    let msg_id = msg.id.clone();
110
111    match state.messagebus.send(msg) {
112        Ok(()) => Json(json!({
113            "status": "sent",
114            "message_id": msg_id,
115            "to": req.to,
116        })),
117        Err(e) => Json(json!({
118            "status": "error",
119            "error": e,
120        })),
121    }
122}
123
124/// POST /api/bus/inject
125///
126/// Tries ReactiveHandler first (direct PTY write via blockcontroller).
127/// Falls back to MessageBus WebSocket push if agent has no block_id registered.
128pub(super) async fn handle_inject(
129    State(state): State<AppState>,
130    Json(req): Json<InjectRequest>,
131) -> Json<Value> {
132    // Try direct PTY injection via ReactiveHandler (agent has registered block_id)
133    let reactive_req = InjectionRequest {
134        target_agent: req.target.clone(),
135        message: req.message.clone(),
136        source_agent: Some(req.from.clone()),
137        request_id: None,
138        priority: req.priority.clone(),
139        wait_for_idle: false,
140    };
141    let resp = state.reactive_handler.inject_message(reactive_req);
142    if resp.success {
143        return Json(json!({
144            "status": "injected",
145            "via": "pty",
146            "block_id": resp.block_id,
147            "target": req.target,
148        }));
149    }
150
151    // Agent not registered with a block_id — fall back to MessageBus WS push
152    // (only fall back on "agent not found", propagate other errors)
153    let is_not_found = resp.error.as_deref().map(|e| e.contains("not found")).unwrap_or(false);
154    if !is_not_found {
155        return Json(json!({
156            "status": "error",
157            "error": resp.error,
158        }));
159    }
160
161    let priority = parse_priority(&req.priority);
162    match state.messagebus.inject(&req.from, &req.target, &req.message, priority) {
163        Ok(msg_id) => Json(json!({
164            "status": "injected",
165            "via": "messagebus",
166            "message_id": msg_id,
167            "target": req.target,
168        })),
169        Err(e) => Json(json!({
170            "status": "error",
171            "error": e,
172        })),
173    }
174}
175
176/// POST /api/bus/broadcast
177pub(super) async fn handle_broadcast(
178    State(state): State<AppState>,
179    Json(req): Json<BroadcastRequest>,
180) -> Json<Value> {
181    let priority = parse_priority(&req.priority);
182
183    match state.messagebus.broadcast(&req.from, &req.payload, priority) {
184        Ok(delivered) => Json(json!({
185            "status": "broadcast",
186            "delivered": delivered,
187        })),
188        Err(e) => Json(json!({
189            "status": "error",
190            "error": e,
191        })),
192    }
193}
194
195/// GET /api/bus/messages?agent_id=...&limit=...
196pub(super) async fn handle_read_messages(
197    State(state): State<AppState>,
198    Query(query): Query<ReadMessagesQuery>,
199) -> Json<Value> {
200    let messages = state.messagebus.read_messages(&query.agent_id, query.limit);
201    Json(json!({
202        "agent_id": query.agent_id,
203        "messages": messages,
204        "count": messages.len(),
205    }))
206}
207
208/// GET /api/bus/agents
209pub(super) async fn handle_list_agents(
210    State(state): State<AppState>,
211) -> Json<Value> {
212    let agents = state.messagebus.list_agents();
213    Json(json!({
214        "agents": agents,
215        "total_count": agents.len(),
216    }))
217}
218
219/// POST /api/bus/messages/delete
220pub(super) async fn handle_delete_messages(
221    State(state): State<AppState>,
222    Json(req): Json<DeleteMessagesRequest>,
223) -> Json<Value> {
224    let deleted = state.messagebus.delete_messages(&req.agent_id, &req.message_ids);
225    Json(json!({
226        "status": "deleted",
227        "deleted": deleted,
228    }))
229}