agentmux_srv\server/
reactive.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4use axum::{
5    extract::{Query, State},
6    http::StatusCode,
7    response::{IntoResponse, Json, Response},
8};
9use serde_json::json;
10
11use crate::backend::reactive::InjectionRequest;
12use crate::backend::reactive::registry as agent_registry;
13use crate::backend::subagent_watcher;
14use crate::backend::base;
15
16use super::AppState;
17
18pub(super) async fn handle_reactive_inject(
19    State(state): State<AppState>,
20    Json(req): Json<InjectionRequest>,
21) -> Json<serde_json::Value> {
22    tracing::info!(
23        target_agent = %req.target_agent,
24        source_agent = ?req.source_agent,
25        msg_len = req.message.len(),
26        "reactive inject request received"
27    );
28
29    // 1. Try local ReactiveHandler first (fast path — same instance).
30    let resp = state.reactive_handler.inject_message(req.clone());
31    if resp.success {
32        return Json(serde_json::to_value(&resp).unwrap_or_default());
33    }
34
35    // 2. On "agent not found", check cross-instance file registry and forward.
36    let is_not_found = resp
37        .error
38        .as_deref()
39        .map(|e| e.starts_with("agent not found"))
40        .unwrap_or(false);
41
42    if is_not_found {
43        let data_dir = base::get_wave_data_dir();
44        if let Some(entry) = agent_registry::lookup(&data_dir, &req.target_agent) {
45            // Guard against self-forwarding loops.
46            if entry.local_url != state.local_web_url {
47                let forward_url = format!("{}/agentmux/reactive/inject", entry.local_url);
48                tracing::debug!(
49                    target = %req.target_agent,
50                    url = %forward_url,
51                    "cross-instance inject forward"
52                );
53                // Reactive routes now require auth (audit C1/C2 fix).
54                // Peers authenticate using the entry's `auth_key` (the
55                // owning instance's per-launch UUID, written into the
56                // registry by `registry::write`). If a pre-fix entry has
57                // no auth_key, the peer returns 401 and the cloud
58                // agentbus fallback handles it.
59                let mut fwd = state.http_client.post(&forward_url).json(&req);
60                if !entry.auth_key.is_empty() {
61                    fwd = fwd.header("X-AuthKey", &entry.auth_key);
62                }
63                match fwd.send().await {
64                    Ok(r) if r.status().is_success() => {
65                        if let Ok(body) = r.json::<serde_json::Value>().await {
66                            return Json(body);
67                        }
68                    }
69                    Ok(r) => {
70                        tracing::warn!(
71                            target = %req.target_agent,
72                            status = %r.status(),
73                            url = %forward_url,
74                            "cross-instance forward: non-success status"
75                        );
76                    }
77                    Err(e) => {
78                        tracing::warn!(
79                            target = %req.target_agent,
80                            error = %e,
81                            url = %forward_url,
82                            "cross-instance forward failed — removing stale registry entry"
83                        );
84                        // Remove stale entry so next call doesn't retry a dead instance.
85                        agent_registry::remove(&data_dir, &req.target_agent);
86                    }
87                }
88            }
89        }
90    }
91
92    // 3. Return original error (agentbus-client will fall back to cloud).
93    Json(serde_json::to_value(&resp).unwrap_or_default())
94}
95
96pub(super) async fn handle_reactive_agents(
97    State(state): State<AppState>,
98) -> Json<serde_json::Value> {
99    let agents = state.reactive_handler.list_agents();
100    Json(serde_json::to_value(&agents).unwrap_or(json!([])))
101}
102
103#[derive(serde::Deserialize)]
104pub(super) struct AgentQuery {
105    id: Option<String>,
106}
107
108pub(super) async fn handle_reactive_agent(
109    State(state): State<AppState>,
110    Query(params): Query<AgentQuery>,
111) -> Response {
112    let id = match &params.id {
113        Some(id) if !id.is_empty() => id.as_str(),
114        _ => {
115            return (
116                StatusCode::BAD_REQUEST,
117                Json(json!({"error": "missing id param"})),
118            )
119                .into_response()
120        }
121    };
122    match state.reactive_handler.get_agent(id) {
123        Some(agent) => Json(serde_json::to_value(&agent).unwrap_or_default()).into_response(),
124        None => (
125            StatusCode::NOT_FOUND,
126            Json(json!({"error": "agent not found"})),
127        )
128            .into_response(),
129    }
130}
131
132#[derive(serde::Deserialize)]
133pub(super) struct AuditQuery {
134    #[serde(default = "default_audit_limit")]
135    limit: usize,
136}
137fn default_audit_limit() -> usize {
138    100
139}
140
141pub(super) async fn handle_reactive_audit(
142    State(state): State<AppState>,
143    Query(params): Query<AuditQuery>,
144) -> Json<serde_json::Value> {
145    let log = state.reactive_handler.get_audit_log(params.limit);
146    Json(serde_json::to_value(&log).unwrap_or(json!([])))
147}
148
149#[derive(serde::Deserialize)]
150pub(super) struct RegisterRequest {
151    agent_id: String,
152    block_id: String,
153    tab_id: Option<String>,
154}
155
156pub(super) async fn handle_reactive_register(
157    State(state): State<AppState>,
158    Json(req): Json<RegisterRequest>,
159) -> Response {
160    tracing::info!(
161        agent_id = %req.agent_id,
162        block_id = %req.block_id,
163        "reactive register request"
164    );
165    match state
166        .reactive_handler
167        .register_agent(&req.agent_id, &req.block_id, req.tab_id.as_deref())
168    {
169        Ok(()) => {
170            // Also write to cross-instance file registry so other AgentMux
171            // instances can forward inject requests to this one.
172            let data_dir = base::get_wave_data_dir();
173            agent_registry::write(&data_dir, &req.agent_id, &state.local_web_url, &req.block_id);
174
175            // Auto-watch this agent's Claude Code config dir for subagent JSONL files.
176            if let Some(config_dir) = subagent_watcher::derive_claude_config_dir(&req.agent_id) {
177                state.subagent_watcher.watch_agent(&req.agent_id, config_dir);
178            }
179
180            Json(json!({"success": true})).into_response()
181        }
182        Err(e) => (
183            StatusCode::BAD_REQUEST,
184            Json(json!({"error": e})),
185        )
186            .into_response(),
187    }
188}
189
190#[derive(serde::Deserialize)]
191pub(super) struct UnregisterRequest {
192    agent_id: String,
193}
194
195pub(super) async fn handle_reactive_unregister(
196    State(state): State<AppState>,
197    Json(req): Json<UnregisterRequest>,
198) -> Json<serde_json::Value> {
199    state.reactive_handler.unregister_agent(&req.agent_id);
200    // Also remove from cross-instance file registry.
201    let data_dir = base::get_wave_data_dir();
202    agent_registry::remove(&data_dir, &req.agent_id);
203    Json(json!({"success": true}))
204}
205
206pub(super) async fn handle_reactive_poller_stats(
207    State(state): State<AppState>,
208) -> Json<serde_json::Value> {
209    let stats = state.poller.stats();
210    Json(serde_json::to_value(&stats).unwrap_or(json!({})))
211}
212
213#[derive(serde::Deserialize)]
214pub(super) struct PollerConfigRequest {
215    url: Option<String>,
216    token: Option<String>,
217}
218
219pub(super) async fn handle_reactive_poller_config(
220    State(state): State<AppState>,
221    Json(req): Json<PollerConfigRequest>,
222) -> Json<serde_json::Value> {
223    state.poller.reconfigure(req.url, req.token);
224    Json(json!({"success": true}))
225}
226
227pub(super) async fn handle_reactive_poller_status(
228    State(state): State<AppState>,
229) -> Json<serde_json::Value> {
230    let status = state.poller.status();
231    Json(serde_json::to_value(&status).unwrap_or(json!({})))
232}