1use axum::{
16 extract::{Query, State},
17 response::Json,
18};
19use serde::Deserialize;
20use serde_json::{json, Value};
21
22use crate::backend::messagebus::{BusMessage, MessageType, Priority};
23use crate::backend::reactive::InjectionRequest;
24use super::AppState;
25
/// JSON body accepted by `handle_register`.
#[derive(Deserialize)]
pub(super) struct RegisterRequest {
    /// Identifier handed to `messagebus.register_http` and echoed back in the response.
    agent_id: String,
}
32
/// JSON body accepted by `handle_send`.
#[derive(Deserialize)]
pub(super) struct SendRequest {
    /// Sender identifier passed to `BusMessage::new`.
    from: String,
    /// Recipient identifier passed to `BusMessage::new` and echoed back on success.
    to: String,
    /// Message content passed to `BusMessage::new`.
    payload: String,
    /// Optional priority string, interpreted by `parse_priority`; may be omitted.
    #[serde(default)]
    priority: Option<String>,
}
41
/// JSON body accepted by `handle_inject`.
#[derive(Deserialize)]
pub(super) struct InjectRequest {
    /// Sender identifier; forwarded as `source_agent` and to `messagebus.inject`.
    from: String,
    /// Target agent identifier; echoed back in successful responses.
    target: String,
    /// Text to inject into the target.
    message: String,
    /// Optional priority string; forwarded verbatim to the `InjectionRequest` and
    /// interpreted by `parse_priority` for the message-bus fallback. May be omitted.
    #[serde(default)]
    priority: Option<String>,
}
50
/// JSON body accepted by `handle_broadcast`.
#[derive(Deserialize)]
pub(super) struct BroadcastRequest {
    /// Sender identifier passed to `messagebus.broadcast`.
    from: String,
    /// Message content passed to `messagebus.broadcast`.
    payload: String,
    /// Optional priority string, interpreted by `parse_priority`; may be omitted.
    #[serde(default)]
    priority: Option<String>,
}
58
/// Query-string parameters accepted by `handle_read_messages`.
#[derive(Deserialize)]
pub(super) struct ReadMessagesQuery {
    /// Agent whose messages are requested; passed to `messagebus.read_messages`.
    agent_id: String,
    /// Limit passed to `messagebus.read_messages`; falls back to `default_limit()`
    /// when the parameter is absent.
    #[serde(default = "default_limit")]
    limit: usize,
}
65
/// Serde default for `ReadMessagesQuery::limit`, used when the query omits `limit`.
fn default_limit() -> usize {
    const DEFAULT_READ_LIMIT: usize = 100;
    DEFAULT_READ_LIMIT
}
69
/// JSON body accepted by `handle_delete_messages`.
#[derive(Deserialize)]
pub(super) struct DeleteMessagesRequest {
    /// Agent identifier passed to `messagebus.delete_messages`.
    agent_id: String,
    /// Identifiers of the messages to delete, passed to `messagebus.delete_messages`.
    message_ids: Vec<String>,
}
75
76fn parse_priority(s: &Option<String>) -> Priority {
79 match s.as_deref() {
80 Some("high") => Priority::High,
81 Some("urgent") => Priority::Urgent,
82 _ => Priority::Normal,
83 }
84}
85
86pub(super) async fn handle_register(
90 State(state): State<AppState>,
91 Json(req): Json<RegisterRequest>,
92) -> Json<Value> {
93 state.messagebus.register_http(&req.agent_id);
96 Json(json!({
97 "status": "registered",
98 "agent_id": req.agent_id,
99 }))
100}
101
102pub(super) async fn handle_send(
104 State(state): State<AppState>,
105 Json(req): Json<SendRequest>,
106) -> Json<Value> {
107 let priority = parse_priority(&req.priority);
108 let msg = BusMessage::new(&req.from, &req.to, MessageType::Send, &req.payload, priority);
109 let msg_id = msg.id.clone();
110
111 match state.messagebus.send(msg) {
112 Ok(()) => Json(json!({
113 "status": "sent",
114 "message_id": msg_id,
115 "to": req.to,
116 })),
117 Err(e) => Json(json!({
118 "status": "error",
119 "error": e,
120 })),
121 }
122}
123
124pub(super) async fn handle_inject(
129 State(state): State<AppState>,
130 Json(req): Json<InjectRequest>,
131) -> Json<Value> {
132 let reactive_req = InjectionRequest {
134 target_agent: req.target.clone(),
135 message: req.message.clone(),
136 source_agent: Some(req.from.clone()),
137 request_id: None,
138 priority: req.priority.clone(),
139 wait_for_idle: false,
140 };
141 let resp = state.reactive_handler.inject_message(reactive_req);
142 if resp.success {
143 return Json(json!({
144 "status": "injected",
145 "via": "pty",
146 "block_id": resp.block_id,
147 "target": req.target,
148 }));
149 }
150
151 let is_not_found = resp.error.as_deref().map(|e| e.contains("not found")).unwrap_or(false);
154 if !is_not_found {
155 return Json(json!({
156 "status": "error",
157 "error": resp.error,
158 }));
159 }
160
161 let priority = parse_priority(&req.priority);
162 match state.messagebus.inject(&req.from, &req.target, &req.message, priority) {
163 Ok(msg_id) => Json(json!({
164 "status": "injected",
165 "via": "messagebus",
166 "message_id": msg_id,
167 "target": req.target,
168 })),
169 Err(e) => Json(json!({
170 "status": "error",
171 "error": e,
172 })),
173 }
174}
175
176pub(super) async fn handle_broadcast(
178 State(state): State<AppState>,
179 Json(req): Json<BroadcastRequest>,
180) -> Json<Value> {
181 let priority = parse_priority(&req.priority);
182
183 match state.messagebus.broadcast(&req.from, &req.payload, priority) {
184 Ok(delivered) => Json(json!({
185 "status": "broadcast",
186 "delivered": delivered,
187 })),
188 Err(e) => Json(json!({
189 "status": "error",
190 "error": e,
191 })),
192 }
193}
194
195pub(super) async fn handle_read_messages(
197 State(state): State<AppState>,
198 Query(query): Query<ReadMessagesQuery>,
199) -> Json<Value> {
200 let messages = state.messagebus.read_messages(&query.agent_id, query.limit);
201 Json(json!({
202 "agent_id": query.agent_id,
203 "messages": messages,
204 "count": messages.len(),
205 }))
206}
207
208pub(super) async fn handle_list_agents(
210 State(state): State<AppState>,
211) -> Json<Value> {
212 let agents = state.messagebus.list_agents();
213 Json(json!({
214 "agents": agents,
215 "total_count": agents.len(),
216 }))
217}
218
219pub(super) async fn handle_delete_messages(
221 State(state): State<AppState>,
222 Json(req): Json<DeleteMessagesRequest>,
223) -> Json<Value> {
224 let deleted = state.messagebus.delete_messages(&req.agent_id, &req.message_ids);
225 Json(json!({
226 "status": "deleted",
227 "deleted": deleted,
228 }))
229}