agentmux_srv\server/
mod.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4pub(crate) mod cli_handlers;
5mod files;
6mod app_api;
7mod agent_handlers;
8mod identity_handlers;
9pub mod install_handlers;
10mod messagebus;
11mod reactive;
12pub(crate) mod service;
13mod tool_handlers;
14pub(crate) mod wave_obj_bridge;
15mod websocket;
16mod drone_handlers;
17
18#[cfg(test)]
19pub(crate) mod tests;
20
21use std::sync::Arc;
22
23use axum::{
24    body::Body,
25    extract::{Request, State},
26    http::{header, Method, StatusCode},
27    middleware::{self, Next},
28    response::{IntoResponse, Json, Response},
29    routing::{get, post},
30    Router,
31};
32use serde_json::json;
33use tower_http::cors::{Any, CorsLayer};
34
35use crate::backend::eventbus::EventBus;
36use crate::backend::lan_discovery::LanDiscovery;
37use crate::backend::messagebus::MessageBus;
38use crate::backend::reactive::{Poller, ReactiveHandler};
39use crate::backend::storage::filestore::FileStore;
40use crate::backend::storage::wstore::WaveStore;
41use crate::backend::history::HistoryService;
42use crate::backend::subagent_watcher::SubagentWatcher;
43use crate::backend::wconfig;
44use crate::backend::wps::Broker;
45
46// ---- AppState ----
47
48#[derive(Clone)]
49pub struct AppState {
50    pub auth_key: String,
51    pub version: String,
52    pub app_path: String,
53    pub wstore: Arc<WaveStore>,
54    pub filestore: Arc<FileStore>,
55    pub event_bus: Arc<EventBus>,
56    pub broker: Arc<Broker>,
57    pub reactive_handler: &'static ReactiveHandler,
58    pub poller: Arc<Poller>,
59    pub config_watcher: Arc<wconfig::ConfigWatcher>,
60    pub messagebus: Arc<MessageBus>,
61    pub subagent_watcher: Arc<SubagentWatcher>,
62    pub history_service: Arc<HistoryService>,
63    /// Tracks every OS-level process each agent CLI has spawned, via
64    /// platform-specific mechanisms (Windows Job Objects, Linux cgroups,
65    /// macOS process groups). Surfaces the tree to the swarm pane and
66    /// provides kill-tree on pane close / host exit.
67    /// See `backend::process_tracker` + `agentmux-ai/AGENT_SPAWNED_PROCESSES_SPEC.md`.
68    pub process_tracker: Arc<crate::backend::process_tracker::registry::AgentProcessRegistry>,
69    pub lan_discovery: Option<Arc<LanDiscovery>>,
70    /// Local HTTP URL of this instance (e.g. "http://127.0.0.1:PORT").
71    /// Used for cross-instance inject forwarding and file registry entries.
72    pub local_web_url: String,
73    /// Shared HTTP client for cross-instance inject forwarding.
74    pub http_client: reqwest::Client,
75    /// Phase E.2c.2 — srv reducer's canonical state. Workspace HTTP/WS
76    /// RPC handlers route through the reducer (dispatch
77    /// `Command::Create/Delete/...Workspace` and read out of
78    /// `state.workspaces`); the persist subscriber mirrors emitted
79    /// events back to SQLite. Tab/Block RPC migrations land in
80    /// E.2c.3 / E.2c.4.
81    pub srv_state: std::sync::Arc<tokio::sync::Mutex<crate::state::State>>,
82    /// Phase E.2c.2 — broadcast bus for srv reducer events. RPC
83    /// handlers publish reducer-emitted events here so the persist
84    /// subscriber writes them back to SQLite. Pipe IPC server (when
85    /// bound) shares the same bus.
86    pub srv_events_tx: tokio::sync::broadcast::Sender<agentmux_common::ipc::Event>,
87    /// Phase E.5.5 — monotonic saga-id allocator. Each saga
88    /// (TearOffTab, TearOffBlock, RestoreTornOffTab, etc.) calls
89    /// `fetch_add` to claim a unique id; the id is stamped onto
90    /// `Event::SagaStarted/Completed/Failed` so subscribers can
91    /// correlate. Per-instance scope (no cross-process sharing — see
92    /// `docs/retro/saga-coordinator-location-analysis-2026-04-30.md`).
93    pub saga_id_alloc: std::sync::Arc<std::sync::atomic::AtomicU64>,
94    /// Saga durability — durable on-disk log of saga lifecycle.
95    /// Written by `SagaCtx::dispatch` / `compensate` (per-step) and
96    /// `emit_terminal` (per-saga) so a srv crash mid-saga leaves a
97    /// recoverable trail. PR 1 ships the log + instrumentation; PR 2
98    /// adds resume-on-startup + `--diag sagas`.
99    /// See `docs/specs/SPEC_SAGA_DURABILITY_2026-05-01.md`.
100    pub saga_log: std::sync::Arc<crate::sagas::log::SagaLog>,
101    /// Pre-launch OAuth session state — one entry per in-flight
102    /// "Connect with OAuth" attempt from the launch modal. See
103    /// `docs/specs/SPEC_PRE_LAUNCH_OAUTH_FLOW_2026_05_14.md`.
104    pub auth_session_manager: std::sync::Arc<crate::identity::auth_session::AuthSessionManager>,
105
106    /// In-flight `install.start` sessions. Frontend subscribes to
107    /// `install_chunk` WPS events scoped by session id; the registry
108    /// holds per-session cancel handles so `install.cancel` can abort
109    /// an install mid-flight.
110    /// See `SPEC_AGENT_INSTALL_STAGE_2026_05_17.md` §9.
111    pub install_sessions: std::sync::Arc<crate::server::install_handlers::InstallSessionRegistry>,
112}
113
114/// Build the Axum router with all routes, auth middleware, and CORS.
115pub fn build_router(state: AppState) -> Router {
116    // CORS: reflect only loopback origins.
117    //
118    // Before the 2026-05-11 security audit (C3) this allowed any origin
119    // (matching the historical Go pkg/web/web.go). That made every web
120    // page the user happened to have open a potential CSRF source —
121    // localhost is not a trust boundary on a developer machine.
122    //
123    // The legitimate cross-origin callers are:
124    //   - The CEF frontend served from `http://127.0.0.1:<host-port>`
125    //   - Vite dev server at `http://localhost:5173` (and similar)
126    //
127    // Both are loopback. The predicate accepts http://127.0.0.1:* and
128    // http://localhost:* (any port, http only; https is irrelevant for
129    // loopback). External origins are denied, which means a malicious
130    // page in the user's browser can't drive the sidecar even if it
131    // discovers the port.
132    use tower_http::cors::AllowOrigin;
133    let cors = CorsLayer::new()
134        .allow_origin(AllowOrigin::predicate(|origin, _req| {
135            let Ok(s) = origin.to_str() else { return false };
136            s.starts_with("http://127.0.0.1:")
137                || s.starts_with("http://localhost:")
138                || s == "http://127.0.0.1"
139                || s == "http://localhost"
140        }))
141        .allow_methods(Any)
142        .allow_headers(vec![
143            header::CONTENT_TYPE,
144            header::AUTHORIZATION,
145            header::ACCEPT,
146            "X-Session-Id".parse().unwrap(),
147            "X-AuthKey".parse().unwrap(),
148            "X-Requested-With".parse().unwrap(),
149            "x-vercel-ai-ui-message-stream".parse().unwrap(),
150        ]);
151
152    // Reactive routes. Previously registered without auth on the
153    // assumption that localhost is a trust boundary; the 2026-05-11
154    // security audit (C1 + C2) showed that any local process — or a
155    // web page driving 127.0.0.1 via the permissive CORS layer — could
156    // drive `/agentmux/reactive/inject` and reconfigure the cloud
157    // agentbus poller. These routes are now merged into `authed_routes`
158    // below and gated by `auth_middleware`.
159    let reactive_routes = Router::new()
160        .route("/agentmux/reactive/inject", post(reactive::handle_reactive_inject))
161        .route("/agentmux/reactive/agents", get(reactive::handle_reactive_agents))
162        .route("/agentmux/reactive/agent", get(reactive::handle_reactive_agent))
163        .route("/agentmux/reactive/audit", get(reactive::handle_reactive_audit))
164        .route("/agentmux/reactive/register", post(reactive::handle_reactive_register))
165        .route(
166            "/agentmux/reactive/unregister",
167            post(reactive::handle_reactive_unregister),
168        )
169        .route(
170            "/agentmux/reactive/poller/stats",
171            get(reactive::handle_reactive_poller_stats),
172        )
173        .route(
174            "/agentmux/reactive/poller/config",
175            post(reactive::handle_reactive_poller_config),
176        )
177        .route(
178            "/agentmux/reactive/poller/status",
179            get(reactive::handle_reactive_poller_status),
180        );
181
182    // MessageBus routes (authed, localhost-only)
183    let bus_routes = Router::new()
184        .route("/api/bus/register", post(messagebus::handle_register))
185        .route("/api/bus/send", post(messagebus::handle_send))
186        .route("/api/bus/inject", post(messagebus::handle_inject))
187        .route("/api/bus/broadcast", post(messagebus::handle_broadcast))
188        .route("/api/bus/messages", get(messagebus::handle_read_messages))
189        .route("/api/bus/messages/delete", post(messagebus::handle_delete_messages))
190        .route("/api/bus/agents", get(messagebus::handle_list_agents));
191
192    let authed_routes = Router::new()
193        .route("/ws", get(websocket::handle_ws))
194        .route("/agentmux/service", post(service::handle_service))
195        .route("/agentmux/file", get(files::handle_wave_file))
196        .route("/agentmux/stream-file", get(stub_501))
197        .route("/agentmux/stream-file/*path", get(stub_501))
198        .route("/agentmux/stream-local-file", get(stub_501))
199        .route("/api/post-chat-message", get(stub_501).post(stub_501))
200        .route("/docsite/*path", get(files::handle_docsite))
201        .route("/schema/*path", get(files::handle_schema))
202        .route("/api/lan-instances", get(handle_lan_instances))
203        .route("/agentmux/diag/sagas", get(handle_diag_sagas))
204        // Streaming-bash wrapper publish endpoint
205        // (SPEC_STREAMING_BASH_RUNNER_2026_05_11.md §4.3). agentmux-bashwrap
206        // POSTs `{event, scopes, data}` here while a PreToolUse-rewritten
207        // Bash command is running; we forward to the in-process WPS broker.
208        // Auth-gated like the other reactive routes (PR #801 pattern).
209        .route("/agentmux/wps/publish", post(handle_wps_publish))
210        .merge(bus_routes)
211        .merge(reactive_routes)
212        .route_layer(middleware::from_fn_with_state(
213            state.clone(),
214            auth_middleware,
215        ));
216
217    // Health endpoint (no auth)
218    let health = Router::new().route("/", get(health_handler));
219
220    Router::new()
221        .merge(health)
222        .merge(authed_routes)
223        .layer(cors)
224        .with_state(state)
225}
226
227// ---- Health ----
228
229async fn health_handler(State(state): State<AppState>) -> Json<serde_json::Value> {
230    Json(json!({
231        "status": "ok",
232        "version": state.version,
233    }))
234}
235
236/// Saga durability PR 2 — operator visibility into the durable saga
237/// log. Returns the most-recent 50 saga lifecycle rows + an in-flight
238/// count derived from `unresolved_sagas`.
239///
240/// **Why a JSON HTTP endpoint and not a launcher `--diag sagas`
241/// pipe-IPC client.** The `--diag srv` pipe transport (see
242/// `agentmux-launcher/src/diag.rs`) routes through `Tool` registration
243/// + a 2 s observation window with `GetSrvSnapshot` + `GetEvents`.
244/// Adding `GetSagaLogSnapshot` to the IPC `Command` enum + an
245/// `Event::SagaLogSnapshot` variant with a Vec of `SagaSnapshot`
246/// triples the touched-files surface for one operator command.
247/// JSON HTTP is the precedent for raw operator queries (cf
248/// `/api/lan-instances`) and matches the spec §9 PR 2 phrasing
249/// "tightened scope". Promoting to first-class `--diag sagas` via
250/// pipe IPC is a follow-up if anyone asks.
251///
252/// Operator workflow today:
253/// ```text
254/// curl -s -H "X-AuthKey: $KEY" http://127.0.0.1:$PORT/agentmux/diag/sagas | jq .
255/// ```
256/// Response shape:
257/// ```json
258/// {
259///   "recent": [ { "saga_id": ..., "name": ..., "state": ..., ... }, ... ],
260///   "in_flight_count": 1,
261///   "recently_failed_count": 0,
262///   "total_returned": 50
263/// }
264/// ```
265async fn handle_diag_sagas(State(state): State<AppState>) -> Json<serde_json::Value> {
266    const LIMIT: u32 = 50;
267    let recent = match state.saga_log.snapshot_recent(LIMIT) {
268        Ok(rows) => rows,
269        Err(e) => {
270            return Json(json!({
271                "error": format!("snapshot_recent failed: {}", e),
272            }));
273        }
274    };
275    let in_flight = match state.saga_log.unresolved_sagas() {
276        Ok(rows) => rows.len(),
277        Err(e) => {
278            tracing::warn!("[diag/sagas] unresolved_sagas failed: {}", e);
279            0
280        }
281    };
282    let recently_failed = recent
283        .iter()
284        .filter(|s| s.state == "failed" || s.state == "failed_compensation")
285        .count();
286    Json(json!({
287        "recent": recent,
288        "in_flight_count": in_flight,
289        "recently_failed_count": recently_failed,
290        "total_returned": recent.len(),
291    }))
292}
293
294async fn handle_lan_instances(State(state): State<AppState>) -> Json<serde_json::Value> {
295    let instances = state
296        .lan_discovery
297        .as_ref()
298        .map(|d| d.get_instances())
299        .unwrap_or_default();
300    Json(json!(instances))
301}
302
303async fn stub_501() -> impl IntoResponse {
304    (
305        StatusCode::NOT_IMPLEMENTED,
306        Json(json!({"error": "not implemented"})),
307    )
308}
309
310/// Wire shape for `POST /agentmux/wps/publish`. Mirrors `WaveEvent`
311/// but keeps the field set narrow for what `agentmux-bashwrap`
312/// actually needs (no `sender`).
313#[derive(serde::Deserialize)]
314struct WpsPublishRequest {
315    /// WPS event name. We use a fixed `tool_chunk` for every
316    /// streaming chunk (the tool_use_id lives in the payload), but
317    /// the handler is general-purpose.
318    event: String,
319    /// Optional scope filters (e.g. `["block:<id>"]`) so only
320    /// subscribers watching that block receive the event.
321    #[serde(default)]
322    scopes: Vec<String>,
323    /// Per-scope event ring size. Lets late subscribers replay
324    /// events that landed before they subscribed. agentmux-bashwrap
325    /// sets this to 1024 for `tool_chunk` so the frontend's
326    /// subscription (installed on pane mount) picks up chunks that
327    /// flew before Claude's stream-json caught up enough to surface
328    /// the tool_use_id. Zero (or omitted) disables persistence —
329    /// pure fan-out. See SPEC_STREAMING_BASH_RUNNER_2026_05_11.md §6.
330    #[serde(default)]
331    persist: usize,
332    /// Free-form payload. For tool_chunk events this is the
333    /// `{op, kind, content, timestamp}` shape from
334    /// `SPEC_STREAMING_BASH_RUNNER_2026_05_11.md` §4.3.
335    data: serde_json::Value,
336}
337
338/// Auth-gated WPS publish endpoint
339/// (SPEC_STREAMING_BASH_RUNNER_2026_05_11.md §3.2). `agentmux-bashwrap`
340/// POSTs here while running a Bash command; we forward to the
341/// in-process WPS broker so subscribed frontends receive the event.
342async fn handle_wps_publish(
343    State(state): State<AppState>,
344    Json(req): Json<WpsPublishRequest>,
345) -> impl IntoResponse {
346    let event = crate::backend::wps::WaveEvent {
347        event: req.event,
348        scopes: req.scopes,
349        sender: String::new(),
350        persist: req.persist,
351        data: Some(req.data),
352    };
353    state.broker.publish(event);
354    (StatusCode::OK, Json(json!({"ok": true})))
355}
356
357// ---- Auth Middleware ----
358
359/// Auth middleware matching Go pkg/authkey/authkey.go:18-42.
360async fn auth_middleware(
361    State(state): State<AppState>,
362    req: Request<Body>,
363    next: Next,
364) -> Response {
365    if req.method() == Method::OPTIONS {
366        return next.run(req).await;
367    }
368
369    let auth_key = req
370        .headers()
371        .get("X-AuthKey")
372        .and_then(|v| v.to_str().ok())
373        .map(|s| s.to_string());
374
375    // 2026-05-11 audit (C3): the query-string `?authkey=` fallback
376    // bypasses CORS preflight and is preserved in browser history,
377    // navigation `Referer` headers, server access logs, etc. — a CSRF
378    // amplifier whenever the key leaks. It is allowed **only** on the
379    // WebSocket upgrade route (`/ws`), where the browser WS API doesn't
380    // permit custom headers and there is no other practical channel
381    // for the key. Every other route requires the header.
382    let auth_key = auth_key.or_else(|| {
383        if req.uri().path() != "/ws" {
384            return None;
385        }
386        req.uri().query().and_then(|q| {
387            q.split('&')
388                .filter_map(|pair| pair.split_once('='))
389                .find(|(k, _)| *k == "authkey")
390                .map(|(_, v)| v.to_string())
391        })
392    });
393
394    match auth_key {
395        Some(key) if key == state.auth_key => next.run(req).await,
396        _ => (
397            StatusCode::UNAUTHORIZED,
398            Json(json!({"error": "unauthorized"})),
399        )
400            .into_response(),
401    }
402}