agentmux_srv\server/
reactive.rs1use axum::{
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Json, Response},
8};
9use serde_json::json;
10
11use crate::backend::reactive::InjectionRequest;
12use crate::backend::reactive::registry as agent_registry;
13use crate::backend::subagent_watcher;
14use crate::backend::base;
15
16use super::AppState;
17
18pub(super) async fn handle_reactive_inject(
19 State(state): State<AppState>,
20 Json(req): Json<InjectionRequest>,
21) -> Json<serde_json::Value> {
22 tracing::info!(
23 target_agent = %req.target_agent,
24 source_agent = ?req.source_agent,
25 msg_len = req.message.len(),
26 "reactive inject request received"
27 );
28
29 let resp = state.reactive_handler.inject_message(req.clone());
31 if resp.success {
32 return Json(serde_json::to_value(&resp).unwrap_or_default());
33 }
34
35 let is_not_found = resp
37 .error
38 .as_deref()
39 .map(|e| e.starts_with("agent not found"))
40 .unwrap_or(false);
41
42 if is_not_found {
43 let data_dir = base::get_wave_data_dir();
44 if let Some(entry) = agent_registry::lookup(&data_dir, &req.target_agent) {
45 if entry.local_url != state.local_web_url {
47 let forward_url = format!("{}/agentmux/reactive/inject", entry.local_url);
48 tracing::debug!(
49 target = %req.target_agent,
50 url = %forward_url,
51 "cross-instance inject forward"
52 );
53 let mut fwd = state.http_client.post(&forward_url).json(&req);
60 if !entry.auth_key.is_empty() {
61 fwd = fwd.header("X-AuthKey", &entry.auth_key);
62 }
63 match fwd.send().await {
64 Ok(r) if r.status().is_success() => {
65 if let Ok(body) = r.json::<serde_json::Value>().await {
66 return Json(body);
67 }
68 }
69 Ok(r) => {
70 tracing::warn!(
71 target = %req.target_agent,
72 status = %r.status(),
73 url = %forward_url,
74 "cross-instance forward: non-success status"
75 );
76 }
77 Err(e) => {
78 tracing::warn!(
79 target = %req.target_agent,
80 error = %e,
81 url = %forward_url,
82 "cross-instance forward failed — removing stale registry entry"
83 );
84 agent_registry::remove(&data_dir, &req.target_agent);
86 }
87 }
88 }
89 }
90 }
91
92 Json(serde_json::to_value(&resp).unwrap_or_default())
94}
95
96pub(super) async fn handle_reactive_agents(
97 State(state): State<AppState>,
98) -> Json<serde_json::Value> {
99 let agents = state.reactive_handler.list_agents();
100 Json(serde_json::to_value(&agents).unwrap_or(json!([])))
101}
102
103#[derive(serde::Deserialize)]
104pub(super) struct AgentQuery {
105 id: Option<String>,
106}
107
108pub(super) async fn handle_reactive_agent(
109 State(state): State<AppState>,
110 Query(params): Query<AgentQuery>,
111) -> Response {
112 let id = match ¶ms.id {
113 Some(id) if !id.is_empty() => id.as_str(),
114 _ => {
115 return (
116 StatusCode::BAD_REQUEST,
117 Json(json!({"error": "missing id param"})),
118 )
119 .into_response()
120 }
121 };
122 match state.reactive_handler.get_agent(id) {
123 Some(agent) => Json(serde_json::to_value(&agent).unwrap_or_default()).into_response(),
124 None => (
125 StatusCode::NOT_FOUND,
126 Json(json!({"error": "agent not found"})),
127 )
128 .into_response(),
129 }
130}
131
132#[derive(serde::Deserialize)]
133pub(super) struct AuditQuery {
134 #[serde(default = "default_audit_limit")]
135 limit: usize,
136}
137fn default_audit_limit() -> usize {
138 100
139}
140
141pub(super) async fn handle_reactive_audit(
142 State(state): State<AppState>,
143 Query(params): Query<AuditQuery>,
144) -> Json<serde_json::Value> {
145 let log = state.reactive_handler.get_audit_log(params.limit);
146 Json(serde_json::to_value(&log).unwrap_or(json!([])))
147}
148
149#[derive(serde::Deserialize)]
150pub(super) struct RegisterRequest {
151 agent_id: String,
152 block_id: String,
153 tab_id: Option<String>,
154}
155
156pub(super) async fn handle_reactive_register(
157 State(state): State<AppState>,
158 Json(req): Json<RegisterRequest>,
159) -> Response {
160 tracing::info!(
161 agent_id = %req.agent_id,
162 block_id = %req.block_id,
163 "reactive register request"
164 );
165 match state
166 .reactive_handler
167 .register_agent(&req.agent_id, &req.block_id, req.tab_id.as_deref())
168 {
169 Ok(()) => {
170 let data_dir = base::get_wave_data_dir();
173 agent_registry::write(&data_dir, &req.agent_id, &state.local_web_url, &req.block_id);
174
175 if let Some(config_dir) = subagent_watcher::derive_claude_config_dir(&req.agent_id) {
177 state.subagent_watcher.watch_agent(&req.agent_id, config_dir);
178 }
179
180 Json(json!({"success": true})).into_response()
181 }
182 Err(e) => (
183 StatusCode::BAD_REQUEST,
184 Json(json!({"error": e})),
185 )
186 .into_response(),
187 }
188}
189
190#[derive(serde::Deserialize)]
191pub(super) struct UnregisterRequest {
192 agent_id: String,
193}
194
195pub(super) async fn handle_reactive_unregister(
196 State(state): State<AppState>,
197 Json(req): Json<UnregisterRequest>,
198) -> Json<serde_json::Value> {
199 state.reactive_handler.unregister_agent(&req.agent_id);
200 let data_dir = base::get_wave_data_dir();
202 agent_registry::remove(&data_dir, &req.agent_id);
203 Json(json!({"success": true}))
204}
205
206pub(super) async fn handle_reactive_poller_stats(
207 State(state): State<AppState>,
208) -> Json<serde_json::Value> {
209 let stats = state.poller.stats();
210 Json(serde_json::to_value(&stats).unwrap_or(json!({})))
211}
212
213#[derive(serde::Deserialize)]
214pub(super) struct PollerConfigRequest {
215 url: Option<String>,
216 token: Option<String>,
217}
218
219pub(super) async fn handle_reactive_poller_config(
220 State(state): State<AppState>,
221 Json(req): Json<PollerConfigRequest>,
222) -> Json<serde_json::Value> {
223 state.poller.reconfigure(req.url, req.token);
224 Json(json!({"success": true}))
225}
226
227pub(super) async fn handle_reactive_poller_status(
228 State(state): State<AppState>,
229) -> Json<serde_json::Value> {
230 let status = state.poller.status();
231 Json(serde_json::to_value(&status).unwrap_or(json!({})))
232}