agentmux_srv\server/mod.rs
1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4pub(crate) mod cli_handlers;
5mod files;
6mod app_api;
7mod agent_handlers;
8mod identity_handlers;
9pub mod install_handlers;
10mod messagebus;
11mod reactive;
12pub(crate) mod service;
13mod tool_handlers;
14pub(crate) mod wave_obj_bridge;
15mod websocket;
16mod drone_handlers;
17
18#[cfg(test)]
19pub(crate) mod tests;
20
21use std::sync::Arc;
22
23use axum::{
24 body::Body,
25 extract::{Request, State},
26 http::{header, Method, StatusCode},
27 middleware::{self, Next},
28 response::{IntoResponse, Json, Response},
29 routing::{get, post},
30 Router,
31};
32use serde_json::json;
33use tower_http::cors::{Any, CorsLayer};
34
35use crate::backend::eventbus::EventBus;
36use crate::backend::lan_discovery::LanDiscovery;
37use crate::backend::messagebus::MessageBus;
38use crate::backend::reactive::{Poller, ReactiveHandler};
39use crate::backend::storage::filestore::FileStore;
40use crate::backend::storage::wstore::WaveStore;
41use crate::backend::history::HistoryService;
42use crate::backend::subagent_watcher::SubagentWatcher;
43use crate::backend::wconfig;
44use crate::backend::wps::Broker;
45
46// ---- AppState ----
47
48#[derive(Clone)]
49pub struct AppState {
50 pub auth_key: String,
51 pub version: String,
52 pub app_path: String,
53 pub wstore: Arc<WaveStore>,
54 pub filestore: Arc<FileStore>,
55 pub event_bus: Arc<EventBus>,
56 pub broker: Arc<Broker>,
57 pub reactive_handler: &'static ReactiveHandler,
58 pub poller: Arc<Poller>,
59 pub config_watcher: Arc<wconfig::ConfigWatcher>,
60 pub messagebus: Arc<MessageBus>,
61 pub subagent_watcher: Arc<SubagentWatcher>,
62 pub history_service: Arc<HistoryService>,
63 /// Tracks every OS-level process each agent CLI has spawned, via
64 /// platform-specific mechanisms (Windows Job Objects, Linux cgroups,
65 /// macOS process groups). Surfaces the tree to the swarm pane and
66 /// provides kill-tree on pane close / host exit.
67 /// See `backend::process_tracker` + `agentmux-ai/AGENT_SPAWNED_PROCESSES_SPEC.md`.
68 pub process_tracker: Arc<crate::backend::process_tracker::registry::AgentProcessRegistry>,
69 pub lan_discovery: Option<Arc<LanDiscovery>>,
70 /// Local HTTP URL of this instance (e.g. "http://127.0.0.1:PORT").
71 /// Used for cross-instance inject forwarding and file registry entries.
72 pub local_web_url: String,
73 /// Shared HTTP client for cross-instance inject forwarding.
74 pub http_client: reqwest::Client,
75 /// Phase E.2c.2 — srv reducer's canonical state. Workspace HTTP/WS
76 /// RPC handlers route through the reducer (dispatch
77 /// `Command::Create/Delete/...Workspace` and read out of
78 /// `state.workspaces`); the persist subscriber mirrors emitted
79 /// events back to SQLite. Tab/Block RPC migrations land in
80 /// E.2c.3 / E.2c.4.
81 pub srv_state: std::sync::Arc<tokio::sync::Mutex<crate::state::State>>,
82 /// Phase E.2c.2 — broadcast bus for srv reducer events. RPC
83 /// handlers publish reducer-emitted events here so the persist
84 /// subscriber writes them back to SQLite. Pipe IPC server (when
85 /// bound) shares the same bus.
86 pub srv_events_tx: tokio::sync::broadcast::Sender<agentmux_common::ipc::Event>,
87 /// Phase E.5.5 — monotonic saga-id allocator. Each saga
88 /// (TearOffTab, TearOffBlock, RestoreTornOffTab, etc.) calls
89 /// `fetch_add` to claim a unique id; the id is stamped onto
90 /// `Event::SagaStarted/Completed/Failed` so subscribers can
91 /// correlate. Per-instance scope (no cross-process sharing — see
92 /// `docs/retro/saga-coordinator-location-analysis-2026-04-30.md`).
93 pub saga_id_alloc: std::sync::Arc<std::sync::atomic::AtomicU64>,
94 /// Saga durability — durable on-disk log of saga lifecycle.
95 /// Written by `SagaCtx::dispatch` / `compensate` (per-step) and
96 /// `emit_terminal` (per-saga) so a srv crash mid-saga leaves a
97 /// recoverable trail. PR 1 ships the log + instrumentation; PR 2
98 /// adds resume-on-startup + `--diag sagas`.
99 /// See `docs/specs/SPEC_SAGA_DURABILITY_2026-05-01.md`.
100 pub saga_log: std::sync::Arc<crate::sagas::log::SagaLog>,
101 /// Pre-launch OAuth session state — one entry per in-flight
102 /// "Connect with OAuth" attempt from the launch modal. See
103 /// `docs/specs/SPEC_PRE_LAUNCH_OAUTH_FLOW_2026_05_14.md`.
104 pub auth_session_manager: std::sync::Arc<crate::identity::auth_session::AuthSessionManager>,
105
106 /// In-flight `install.start` sessions. Frontend subscribes to
107 /// `install_chunk` WPS events scoped by session id; the registry
108 /// holds per-session cancel handles so `install.cancel` can abort
109 /// an install mid-flight.
110 /// See `SPEC_AGENT_INSTALL_STAGE_2026_05_17.md` §9.
111 pub install_sessions: std::sync::Arc<crate::server::install_handlers::InstallSessionRegistry>,
112}
113
114/// Build the Axum router with all routes, auth middleware, and CORS.
115pub fn build_router(state: AppState) -> Router {
116 // CORS: reflect only loopback origins.
117 //
118 // Before the 2026-05-11 security audit (C3) this allowed any origin
119 // (matching the historical Go pkg/web/web.go). That made every web
120 // page the user happened to have open a potential CSRF source —
121 // localhost is not a trust boundary on a developer machine.
122 //
123 // The legitimate cross-origin callers are:
124 // - The CEF frontend served from `http://127.0.0.1:<host-port>`
125 // - Vite dev server at `http://localhost:5173` (and similar)
126 //
127 // Both are loopback. The predicate accepts http://127.0.0.1:* and
128 // http://localhost:* (any port, http only; https is irrelevant for
129 // loopback). External origins are denied, which means a malicious
130 // page in the user's browser can't drive the sidecar even if it
131 // discovers the port.
132 use tower_http::cors::AllowOrigin;
133 let cors = CorsLayer::new()
134 .allow_origin(AllowOrigin::predicate(|origin, _req| {
135 let Ok(s) = origin.to_str() else { return false };
136 s.starts_with("http://127.0.0.1:")
137 || s.starts_with("http://localhost:")
138 || s == "http://127.0.0.1"
139 || s == "http://localhost"
140 }))
141 .allow_methods(Any)
142 .allow_headers(vec![
143 header::CONTENT_TYPE,
144 header::AUTHORIZATION,
145 header::ACCEPT,
146 "X-Session-Id".parse().unwrap(),
147 "X-AuthKey".parse().unwrap(),
148 "X-Requested-With".parse().unwrap(),
149 "x-vercel-ai-ui-message-stream".parse().unwrap(),
150 ]);
151
152 // Reactive routes. Previously registered without auth on the
153 // assumption that localhost is a trust boundary; the 2026-05-11
154 // security audit (C1 + C2) showed that any local process — or a
155 // web page driving 127.0.0.1 via the permissive CORS layer — could
156 // drive `/agentmux/reactive/inject` and reconfigure the cloud
157 // agentbus poller. These routes are now merged into `authed_routes`
158 // below and gated by `auth_middleware`.
159 let reactive_routes = Router::new()
160 .route("/agentmux/reactive/inject", post(reactive::handle_reactive_inject))
161 .route("/agentmux/reactive/agents", get(reactive::handle_reactive_agents))
162 .route("/agentmux/reactive/agent", get(reactive::handle_reactive_agent))
163 .route("/agentmux/reactive/audit", get(reactive::handle_reactive_audit))
164 .route("/agentmux/reactive/register", post(reactive::handle_reactive_register))
165 .route(
166 "/agentmux/reactive/unregister",
167 post(reactive::handle_reactive_unregister),
168 )
169 .route(
170 "/agentmux/reactive/poller/stats",
171 get(reactive::handle_reactive_poller_stats),
172 )
173 .route(
174 "/agentmux/reactive/poller/config",
175 post(reactive::handle_reactive_poller_config),
176 )
177 .route(
178 "/agentmux/reactive/poller/status",
179 get(reactive::handle_reactive_poller_status),
180 );
181
182 // MessageBus routes (authed, localhost-only)
183 let bus_routes = Router::new()
184 .route("/api/bus/register", post(messagebus::handle_register))
185 .route("/api/bus/send", post(messagebus::handle_send))
186 .route("/api/bus/inject", post(messagebus::handle_inject))
187 .route("/api/bus/broadcast", post(messagebus::handle_broadcast))
188 .route("/api/bus/messages", get(messagebus::handle_read_messages))
189 .route("/api/bus/messages/delete", post(messagebus::handle_delete_messages))
190 .route("/api/bus/agents", get(messagebus::handle_list_agents));
191
192 let authed_routes = Router::new()
193 .route("/ws", get(websocket::handle_ws))
194 .route("/agentmux/service", post(service::handle_service))
195 .route("/agentmux/file", get(files::handle_wave_file))
196 .route("/agentmux/stream-file", get(stub_501))
197 .route("/agentmux/stream-file/*path", get(stub_501))
198 .route("/agentmux/stream-local-file", get(stub_501))
199 .route("/api/post-chat-message", get(stub_501).post(stub_501))
200 .route("/docsite/*path", get(files::handle_docsite))
201 .route("/schema/*path", get(files::handle_schema))
202 .route("/api/lan-instances", get(handle_lan_instances))
203 .route("/agentmux/diag/sagas", get(handle_diag_sagas))
204 // Streaming-bash wrapper publish endpoint
205 // (SPEC_STREAMING_BASH_RUNNER_2026_05_11.md §4.3). agentmux-bashwrap
206 // POSTs `{event, scopes, data}` here while a PreToolUse-rewritten
207 // Bash command is running; we forward to the in-process WPS broker.
208 // Auth-gated like the other reactive routes (PR #801 pattern).
209 .route("/agentmux/wps/publish", post(handle_wps_publish))
210 .merge(bus_routes)
211 .merge(reactive_routes)
212 .route_layer(middleware::from_fn_with_state(
213 state.clone(),
214 auth_middleware,
215 ));
216
217 // Health endpoint (no auth)
218 let health = Router::new().route("/", get(health_handler));
219
220 Router::new()
221 .merge(health)
222 .merge(authed_routes)
223 .layer(cors)
224 .with_state(state)
225}
226
227// ---- Health ----
228
229async fn health_handler(State(state): State<AppState>) -> Json<serde_json::Value> {
230 Json(json!({
231 "status": "ok",
232 "version": state.version,
233 }))
234}
235
236/// Saga durability PR 2 — operator visibility into the durable saga
237/// log. Returns the most-recent 50 saga lifecycle rows + an in-flight
238/// count derived from `unresolved_sagas`.
239///
240/// **Why a JSON HTTP endpoint and not a launcher `--diag sagas`
241/// pipe-IPC client.** The `--diag srv` pipe transport (see
242/// `agentmux-launcher/src/diag.rs`) routes through `Tool` registration
243/// + a 2 s observation window with `GetSrvSnapshot` + `GetEvents`.
244/// Adding `GetSagaLogSnapshot` to the IPC `Command` enum + an
245/// `Event::SagaLogSnapshot` variant with a Vec of `SagaSnapshot`
246/// triples the touched-files surface for one operator command.
247/// JSON HTTP is the precedent for raw operator queries (cf
248/// `/api/lan-instances`) and matches the spec §9 PR 2 phrasing
249/// "tightened scope". Promoting to first-class `--diag sagas` via
250/// pipe IPC is a follow-up if anyone asks.
251///
252/// Operator workflow today:
253/// ```text
254/// curl -s -H "X-AuthKey: $KEY" http://127.0.0.1:$PORT/agentmux/diag/sagas | jq .
255/// ```
256/// Response shape:
257/// ```json
258/// {
259/// "recent": [ { "saga_id": ..., "name": ..., "state": ..., ... }, ... ],
260/// "in_flight_count": 1,
261/// "recently_failed_count": 0,
262/// "total_returned": 50
263/// }
264/// ```
265async fn handle_diag_sagas(State(state): State<AppState>) -> Json<serde_json::Value> {
266 const LIMIT: u32 = 50;
267 let recent = match state.saga_log.snapshot_recent(LIMIT) {
268 Ok(rows) => rows,
269 Err(e) => {
270 return Json(json!({
271 "error": format!("snapshot_recent failed: {}", e),
272 }));
273 }
274 };
275 let in_flight = match state.saga_log.unresolved_sagas() {
276 Ok(rows) => rows.len(),
277 Err(e) => {
278 tracing::warn!("[diag/sagas] unresolved_sagas failed: {}", e);
279 0
280 }
281 };
282 let recently_failed = recent
283 .iter()
284 .filter(|s| s.state == "failed" || s.state == "failed_compensation")
285 .count();
286 Json(json!({
287 "recent": recent,
288 "in_flight_count": in_flight,
289 "recently_failed_count": recently_failed,
290 "total_returned": recent.len(),
291 }))
292}
293
294async fn handle_lan_instances(State(state): State<AppState>) -> Json<serde_json::Value> {
295 let instances = state
296 .lan_discovery
297 .as_ref()
298 .map(|d| d.get_instances())
299 .unwrap_or_default();
300 Json(json!(instances))
301}
302
303async fn stub_501() -> impl IntoResponse {
304 (
305 StatusCode::NOT_IMPLEMENTED,
306 Json(json!({"error": "not implemented"})),
307 )
308}
309
310/// Wire shape for `POST /agentmux/wps/publish`. Mirrors `WaveEvent`
311/// but keeps the field set narrow for what `agentmux-bashwrap`
312/// actually needs (no `sender`).
313#[derive(serde::Deserialize)]
314struct WpsPublishRequest {
315 /// WPS event name. We use a fixed `tool_chunk` for every
316 /// streaming chunk (the tool_use_id lives in the payload), but
317 /// the handler is general-purpose.
318 event: String,
319 /// Optional scope filters (e.g. `["block:<id>"]`) so only
320 /// subscribers watching that block receive the event.
321 #[serde(default)]
322 scopes: Vec<String>,
323 /// Per-scope event ring size. Lets late subscribers replay
324 /// events that landed before they subscribed. agentmux-bashwrap
325 /// sets this to 1024 for `tool_chunk` so the frontend's
326 /// subscription (installed on pane mount) picks up chunks that
327 /// flew before Claude's stream-json caught up enough to surface
328 /// the tool_use_id. Zero (or omitted) disables persistence —
329 /// pure fan-out. See SPEC_STREAMING_BASH_RUNNER_2026_05_11.md §6.
330 #[serde(default)]
331 persist: usize,
332 /// Free-form payload. For tool_chunk events this is the
333 /// `{op, kind, content, timestamp}` shape from
334 /// `SPEC_STREAMING_BASH_RUNNER_2026_05_11.md` §4.3.
335 data: serde_json::Value,
336}
337
338/// Auth-gated WPS publish endpoint
339/// (SPEC_STREAMING_BASH_RUNNER_2026_05_11.md §3.2). `agentmux-bashwrap`
340/// POSTs here while running a Bash command; we forward to the
341/// in-process WPS broker so subscribed frontends receive the event.
342async fn handle_wps_publish(
343 State(state): State<AppState>,
344 Json(req): Json<WpsPublishRequest>,
345) -> impl IntoResponse {
346 let event = crate::backend::wps::WaveEvent {
347 event: req.event,
348 scopes: req.scopes,
349 sender: String::new(),
350 persist: req.persist,
351 data: Some(req.data),
352 };
353 state.broker.publish(event);
354 (StatusCode::OK, Json(json!({"ok": true})))
355}
356
357// ---- Auth Middleware ----
358
359/// Auth middleware matching Go pkg/authkey/authkey.go:18-42.
360async fn auth_middleware(
361 State(state): State<AppState>,
362 req: Request<Body>,
363 next: Next,
364) -> Response {
365 if req.method() == Method::OPTIONS {
366 return next.run(req).await;
367 }
368
369 let auth_key = req
370 .headers()
371 .get("X-AuthKey")
372 .and_then(|v| v.to_str().ok())
373 .map(|s| s.to_string());
374
375 // 2026-05-11 audit (C3): the query-string `?authkey=` fallback
376 // bypasses CORS preflight and is preserved in browser history,
377 // navigation `Referer` headers, server access logs, etc. — a CSRF
378 // amplifier whenever the key leaks. It is allowed **only** on the
379 // WebSocket upgrade route (`/ws`), where the browser WS API doesn't
380 // permit custom headers and there is no other practical channel
381 // for the key. Every other route requires the header.
382 let auth_key = auth_key.or_else(|| {
383 if req.uri().path() != "/ws" {
384 return None;
385 }
386 req.uri().query().and_then(|q| {
387 q.split('&')
388 .filter_map(|pair| pair.split_once('='))
389 .find(|(k, _)| *k == "authkey")
390 .map(|(_, v)| v.to_string())
391 })
392 });
393
394 match auth_key {
395 Some(key) if key == state.auth_key => next.run(req).await,
396 _ => (
397 StatusCode::UNAUTHORIZED,
398 Json(json!({"error": "unauthorized"})),
399 )
400 .into_response(),
401 }
402}