agentmux_srv\server/
websocket.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use axum::{
8    extract::{
9        ws::{Message, WebSocket},
10        State, WebSocketUpgrade,
11    },
12    response::Response,
13};
14use base64::Engine as _;
15use serde::Deserialize;
16use serde_json::json;
17
18use crate::backend::base::expand_home_dir_safe;
19use crate::backend::blockcontroller;
20use crate::backend::rpc::engine::WshRpcEngine;
21use crate::backend::rpc_types::{
22    CommandBlockInputData, CommandControllerResyncData, CommandEventReadHistoryData,
23    CommandGetMetaData, CommandSetMetaData, CommandToolDecisionData,
24    RpcMessage, COMMAND_CONTROLLER_INPUT,
25    COMMAND_CONTROLLER_RESYNC, COMMAND_EVENT_READ_HISTORY, COMMAND_EVENT_SUB, COMMAND_EVENT_UNSUB,
26    COMMAND_EVENT_UNSUB_ALL, COMMAND_GET_FULL_CONFIG, COMMAND_GET_META,
27    COMMAND_GET_AI_RATE_LIMIT, COMMAND_ROUTE_ANNOUNCE, COMMAND_ROUTE_UNANNOUNCE,
28    COMMAND_SET_META, COMMAND_SET_CONFIG, COMMAND_APP_INFO,
29    COMMAND_SUBPROCESS_SPAWN, COMMAND_AGENT_INPUT, COMMAND_AGENT_STOP, COMMAND_TOOL_DECISION,
30    COMMAND_WRITE_AGENT_CONFIG,
31    CommandSubprocessSpawnData, CommandAgentInputData, CommandAgentStopData, CommandWriteAgentConfigData,
32};
33use crate::backend::obj::{Block, TermSize, WaveObjUpdate, wave_obj_to_value};
34use super::service::update_object_meta;
35
36use super::AppState;
37
38/// Incoming WebSocket message envelope.
39/// Supports both ping/pong messages and wscommand-based RPC.
40#[derive(Deserialize)]
41struct WSIncoming {
42    #[serde(rename = "type")]
43    msg_type: Option<String>,
44    #[allow(dead_code)]
45    stime: Option<i64>,
46    wscommand: Option<String>,
47    message: Option<RpcMessage>,
48    // Fields for setblocktermsize / blockinput
49    blockid: Option<String>,
50    inputdata64: Option<String>,
51    termsize: Option<serde_json::Value>,
52    // Fields for bus:* commands
53    agent_id: Option<String>,
54    from: Option<String>,
55    to: Option<String>,
56    target: Option<String>,
57    payload: Option<String>,
58    #[serde(rename = "bus_message")]
59    bus_message_text: Option<String>,
60    priority: Option<String>,
61}
62
63pub(super) async fn handle_ws(
64    State(state): State<AppState>,
65    ws: WebSocketUpgrade,
66) -> Response {
67    ws.on_upgrade(move |socket| handle_ws_connection(socket, state))
68}
69
70async fn handle_ws_connection(mut socket: WebSocket, state: AppState) {
71    let ws_start = std::time::Instant::now();
72    let conn_id = uuid::Uuid::new_v4().to_string();
73    let tab_id = String::new();
74
75    tracing::info!(conn_id = %conn_id, "WebSocket client connected");
76
77    let mut event_rx = state.event_bus.register_ws(&conn_id, &tab_id);
78    tracing::info!("[ws-perf] register_ws: {:.2}ms", ws_start.elapsed().as_secs_f64() * 1000.0);
79
80    // Optional messagebus receiver — activated when pane sends bus:register
81    let mut bus_rx: Option<tokio::sync::mpsc::UnboundedReceiver<crate::backend::messagebus::BusMessage>> = None;
82    let mut bus_agent_id: Option<String> = None;
83
84    // Send initial "config" wave event via the RPC eventrecv path so the frontend
85    // populates fullConfigAtom (and shows the widget bar).
86    // Frontend only processes events via: {"eventtype":"rpc","data":{"command":"eventrecv","data":{"event":"config","data":{...}}}}
87    {
88        let t = std::time::Instant::now();
89        let config = state.config_watcher.get_full_config();
90        if let Ok(config_val) = serde_json::to_value(config.as_ref()) {
91            let config_event = json!({
92                "eventtype": "rpc",
93                "data": {
94                    "command": "eventrecv",
95                    "data": {
96                        "event": "config",
97                        "data": { "fullconfig": config_val }
98                    }
99                }
100            });
101            if let Ok(msg) = serde_json::to_string(&config_event) {
102                let _ = socket.send(Message::Text(msg.into())).await;
103            }
104        }
105        tracing::info!("[ws-perf] send_initial_config: {:.2}ms", t.elapsed().as_secs_f64() * 1000.0);
106    }
107
108    // Create RPC engine for this connection
109    let t = std::time::Instant::now();
110    let (engine, mut rpc_output_rx) = WshRpcEngine::new();
111
112    // Register handlers
113    register_handlers(&engine, state.clone());
114    tracing::info!("[ws-perf] create_engine+register_handlers: {:.2}ms", t.elapsed().as_secs_f64() * 1000.0);
115    tracing::info!("[ws-perf] TOTAL ws_setup: {:.2}ms", ws_start.elapsed().as_secs_f64() * 1000.0);
116
117    // Periodic ping interval (10 seconds)
118    let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(10));
119    ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
120
121    loop {
122        tokio::select! {
123            // Forward event bus events → WebSocket.
124            // Two sources feed the event bus:
125            //   1. WPS Broker (via EventBusBridge) — already wrapped as
126            //      { eventtype: "rpc", data: { command: "eventrecv", data: WaveEvent } }
127            //   2. Direct broadcasts (e.g., SetMeta's obj:update) — raw
128            //      { eventtype: "waveobj:update", oref: "block:xxx", data: ... }
129            // Type 1: forward as-is (already RPC-wrapped).
130            // Type 2: wrap as RPC "eventrecv" so the frontend WshRouter routes
131            //         it to handleWaveEvent → updateWaveObject → Jotai re-render.
132            Some(event) = event_rx.recv() => {
133                let msg = if event["eventtype"] == "rpc" {
134                    // Already an RPC message (from WPS broker via EventBusBridge)
135                    serde_json::to_string(&event).unwrap_or_default()
136                } else {
137                    // Raw event bus event — wrap as RPC eventrecv
138                    let wave_event = json!({
139                        "event": event["eventtype"],
140                        "scopes": [event["oref"]],
141                        "data": event["data"],
142                    });
143                    let wrapped = json!({
144                        "eventtype": "rpc",
145                        "data": {
146                            "command": "eventrecv",
147                            "data": wave_event,
148                        },
149                    });
150                    serde_json::to_string(&wrapped).unwrap_or_default()
151                };
152                if socket.send(Message::Text(msg.into())).await.is_err() {
153                    break;
154                }
155            }
156
157            // Forward RPC engine output → WebSocket (wrapped as eventtype:rpc)
158            Some(rpc_msg) = rpc_output_rx.recv() => {
159                let wrapped = json!({
160                    "eventtype": "rpc",
161                    "data": rpc_msg,
162                });
163                let msg = serde_json::to_string(&wrapped).unwrap_or_default();
164                if socket.send(Message::Text(msg.into())).await.is_err() {
165                    break;
166                }
167            }
168
169            // Forward MessageBus messages → WebSocket (if registered as agent)
170            Some(bus_msg) = async {
171                match bus_rx.as_mut() {
172                    Some(rx) => rx.recv().await,
173                    None => std::future::pending().await,
174                }
175            } => {
176                let wrapped = json!({
177                    "type": "bus:message",
178                    "data": bus_msg,
179                });
180                let msg = serde_json::to_string(&wrapped).unwrap_or_default();
181                if socket.send(Message::Text(msg.into())).await.is_err() {
182                    break;
183                }
184            }
185
186            // Incoming WebSocket messages → parse & dispatch
187            msg = socket.recv() => {
188                match msg {
189                    Some(Ok(Message::Close(_))) | None => break,
190                    Some(Ok(Message::Ping(data))) => {
191                        let _ = socket.send(Message::Pong(data)).await;
192                    }
193                    Some(Ok(Message::Text(text))) => {
194                        match handle_incoming_text(&text, &engine, &state, &mut socket).await {
195                            Err(true) => break,
196                            Ok(Some((new_rx, agent_id))) => {
197                                // bus:register returned a new receiver
198                                bus_rx = Some(new_rx);
199                                bus_agent_id = Some(agent_id);
200                            }
201                            _ => {}
202                        }
203                    }
204                    Some(Ok(_)) => {
205                        // Binary or other message types — ignore
206                    }
207                    Some(Err(_)) => break,
208                }
209            }
210
211            // Periodic ping
212            _ = ping_interval.tick() => {
213                let now = SystemTime::now()
214                    .duration_since(UNIX_EPOCH)
215                    .unwrap_or_default()
216                    .as_millis() as i64;
217                let ping = json!({ "type": "ping", "stime": now });
218                let msg = serde_json::to_string(&ping).unwrap_or_default();
219                if socket.send(Message::Text(msg.into())).await.is_err() {
220                    break;
221                }
222            }
223        }
224    }
225
226    tracing::info!(conn_id = %conn_id, "WebSocket client disconnected");
227    state.event_bus.unregister_ws(&conn_id);
228
229    // Unregister from messagebus if this connection was an agent
230    if let Some(ref agent_id) = bus_agent_id {
231        state.messagebus.unregister(agent_id);
232    }
233}
234
235/// Handle an incoming text message.
236/// Returns Err(true) if the socket send failed.
237/// Returns Ok(Some((rx, agent_id))) if a bus:register was processed.
238async fn handle_incoming_text(
239    text: &str,
240    engine: &Arc<WshRpcEngine>,
241    state: &AppState,
242    socket: &mut WebSocket,
243) -> Result<Option<(tokio::sync::mpsc::UnboundedReceiver<crate::backend::messagebus::BusMessage>, String)>, bool> {
244    let incoming: WSIncoming = match serde_json::from_str(text) {
245        Ok(v) => v,
246        Err(e) => {
247            tracing::warn!("ws: invalid JSON: {}", e);
248            return Ok(None);
249        }
250    };
251
252    // Handle ping/pong by type field
253    if let Some(ref msg_type) = incoming.msg_type {
254        match msg_type.as_str() {
255            "ping" => {
256                let now = SystemTime::now()
257                    .duration_since(UNIX_EPOCH)
258                    .unwrap_or_default()
259                    .as_millis() as i64;
260                let pong = json!({ "type": "pong", "stime": now });
261                let msg = serde_json::to_string(&pong).unwrap_or_default();
262                if socket.send(Message::Text(msg.into())).await.is_err() {
263                    return Err(true);
264                }
265                return Ok(None);
266            }
267            "pong" => {
268                return Ok(None);
269            }
270            "bus:register" => {
271                if let Some(ref agent_id) = incoming.agent_id {
272                    let rx = state.messagebus.register(agent_id, "websocket");
273                    let ack = json!({ "type": "bus:registered", "agent_id": agent_id });
274                    let msg = serde_json::to_string(&ack).unwrap_or_default();
275                    if socket.send(Message::Text(msg.into())).await.is_err() {
276                        return Err(true);
277                    }
278                    return Ok(Some((rx, agent_id.clone())));
279                }
280                return Ok(None);
281            }
282            "bus:send" => {
283                if let (Some(ref from), Some(ref to), Some(ref payload)) =
284                    (&incoming.from, &incoming.to, &incoming.payload)
285                {
286                    let priority = match incoming.priority.as_deref() {
287                        Some("high") => crate::backend::messagebus::Priority::High,
288                        Some("urgent") => crate::backend::messagebus::Priority::Urgent,
289                        _ => crate::backend::messagebus::Priority::Normal,
290                    };
291                    let bus_msg = crate::backend::messagebus::BusMessage::new(
292                        from, to, crate::backend::messagebus::MessageType::Send, payload, priority,
293                    );
294                    let msg_id = bus_msg.id.clone();
295                    let _ = state.messagebus.send(bus_msg);
296                    let ack = json!({ "type": "bus:sent", "message_id": msg_id });
297                    let msg = serde_json::to_string(&ack).unwrap_or_default();
298                    if socket.send(Message::Text(msg.into())).await.is_err() {
299                        return Err(true);
300                    }
301                }
302                return Ok(None);
303            }
304            "bus:inject" => {
305                let from = incoming.from.as_deref().unwrap_or("unknown");
306                if let (Some(ref target), Some(ref message)) =
307                    (&incoming.target, &incoming.bus_message_text)
308                {
309                    // Try direct PTY injection via ReactiveHandler first
310                    let reactive_req = crate::backend::reactive::InjectionRequest {
311                        target_agent: target.clone(),
312                        message: message.clone(),
313                        source_agent: Some(from.to_string()),
314                        request_id: None,
315                        priority: incoming.priority.clone(),
316                        wait_for_idle: false,
317                    };
318                    let resp = state.reactive_handler.inject_message(reactive_req);
319                    if resp.success {
320                        let ack = json!({ "type": "bus:injected", "via": "pty", "block_id": resp.block_id });
321                        let msg = serde_json::to_string(&ack).unwrap_or_default();
322                        if socket.send(Message::Text(msg.into())).await.is_err() {
323                            return Err(true);
324                        }
325                        return Ok(None);
326                    }
327
328                    // Non-"agent not found" error — report it
329                    let is_not_found = resp.error.as_deref().map(|e| e.contains("not found")).unwrap_or(false);
330                    if !is_not_found {
331                        let err = json!({ "type": "bus:error", "error": resp.error });
332                        let msg = serde_json::to_string(&err).unwrap_or_default();
333                        if socket.send(Message::Text(msg.into())).await.is_err() {
334                            return Err(true);
335                        }
336                        return Ok(None);
337                    }
338
339                    // Fall back to MessageBus WebSocket push
340                    let priority = match incoming.priority.as_deref() {
341                        Some("high") => crate::backend::messagebus::Priority::High,
342                        Some("urgent") => crate::backend::messagebus::Priority::Urgent,
343                        _ => crate::backend::messagebus::Priority::Normal,
344                    };
345                    match state.messagebus.inject(from, target, message, priority) {
346                        Ok(msg_id) => {
347                            let ack = json!({ "type": "bus:injected", "via": "messagebus", "message_id": msg_id });
348                            let msg = serde_json::to_string(&ack).unwrap_or_default();
349                            if socket.send(Message::Text(msg.into())).await.is_err() {
350                                return Err(true);
351                            }
352                        }
353                        Err(e) => {
354                            let err = json!({ "type": "bus:error", "error": e });
355                            let msg = serde_json::to_string(&err).unwrap_or_default();
356                            if socket.send(Message::Text(msg.into())).await.is_err() {
357                                return Err(true);
358                            }
359                        }
360                    }
361                }
362                return Ok(None);
363            }
364            "bus:broadcast" => {
365                let from = incoming.from.as_deref().unwrap_or("unknown");
366                if let Some(ref payload) = incoming.payload {
367                    let priority = match incoming.priority.as_deref() {
368                        Some("high") => crate::backend::messagebus::Priority::High,
369                        Some("urgent") => crate::backend::messagebus::Priority::Urgent,
370                        _ => crate::backend::messagebus::Priority::Normal,
371                    };
372                    let _ = state.messagebus.broadcast(from, payload, priority);
373                }
374                return Ok(None);
375            }
376            _ => {}
377        }
378    }
379
380    // Handle wscommand-based messages
381    if let Some(ref wscommand) = incoming.wscommand {
382        match wscommand.as_str() {
383            "rpc" => {
384                if let Some(rpc_msg) = incoming.message {
385                    engine.handle_message(rpc_msg);
386                } else {
387                    tracing::warn!("ws: rpc wscommand missing message field");
388                }
389            }
390            "blockinput" => {
391                if let Some(ref block_id) = incoming.blockid {
392                    if let Some(ref data64) = incoming.inputdata64 {
393                        if !data64.is_empty() {
394                            match base64::engine::general_purpose::STANDARD.decode(data64) {
395                                Ok(data) => {
396                                    let input = blockcontroller::BlockInputUnion::data(data);
397                                    if let Err(e) = blockcontroller::send_input(block_id, input, None) {
398                                        tracing::debug!("ws: blockinput error: {}", e);
399                                    }
400                                }
401                                Err(e) => {
402                                    tracing::warn!("ws: blockinput base64 decode error: {}", e);
403                                }
404                            }
405                        }
406                    }
407                }
408            }
409            "setblocktermsize" => {
410                if let Some(ref block_id) = incoming.blockid {
411                    if let Some(ref ts_val) = incoming.termsize {
412                        match serde_json::from_value::<TermSize>(ts_val.clone()) {
413                            Ok(ts) => {
414                                let input = blockcontroller::BlockInputUnion::resize(ts);
415                                if let Err(e) = blockcontroller::send_input(block_id, input, None) {
416                                    tracing::debug!("ws: setblocktermsize error: {}", e);
417                                }
418                            }
419                            Err(e) => {
420                                tracing::warn!("ws: setblocktermsize parse error: {}", e);
421                            }
422                        }
423                    }
424                }
425            }
426            other => {
427                tracing::warn!("ws: unknown wscommand: {}", other);
428            }
429        }
430    }
431
432    Ok(None)
433}
434
435fn register_handlers(engine: &Arc<WshRpcEngine>, state: AppState) {
436    // getfullconfig → return full config as JSON
437    let config_watcher = state.config_watcher.clone();
438    engine.register_handler(
439        COMMAND_GET_FULL_CONFIG,
440        Box::new(move |_data, _ctx| {
441            let cw = config_watcher.clone();
442            Box::pin(async move {
443                let config = cw.get_full_config();
444                match serde_json::to_value(config.as_ref()) {
445                    Ok(v) => Ok(Some(v)),
446                    Err(e) => Err(format!("failed to serialize config: {}", e)),
447                }
448            })
449        }),
450    );
451
452    // routeannounce → log + no-op (fire-and-forget, may have no reqid)
453    engine.register_handler(
454        COMMAND_ROUTE_ANNOUNCE,
455        Box::new(|data, _ctx| {
456            Box::pin(async move {
457                tracing::debug!("routeannounce: {:?}", data);
458                Ok(None)
459            })
460        }),
461    );
462
463    // routeunannounce → no-op
464    engine.register_handler(
465        COMMAND_ROUTE_UNANNOUNCE,
466        Box::new(|_data, _ctx| Box::pin(async move { Ok(None) })),
467    );
468
469    // eventsub → register subscription with the WPS broker
470    let broker_sub = state.broker.clone();
471    engine.register_handler(
472        COMMAND_EVENT_SUB,
473        Box::new(move |data, _ctx| {
474            let broker = broker_sub.clone();
475            Box::pin(async move {
476                let sub: crate::backend::wps::SubscriptionRequest =
477                    serde_json::from_value(data).map_err(|e| format!("eventsub: {e}"))?;
478                tracing::debug!("eventsub: event={} scopes={:?} allscopes={}", sub.event, sub.scopes, sub.allscopes);
479                broker.subscribe("ws-main", sub);
480                Ok(None)
481            })
482        }),
483    );
484
485    // eventunsub → unsubscribe from the WPS broker
486    let broker_unsub = state.broker.clone();
487    engine.register_handler(
488        COMMAND_EVENT_UNSUB,
489        Box::new(move |data, _ctx| {
490            let broker = broker_unsub.clone();
491            Box::pin(async move {
492                let event_name = data.as_str().unwrap_or("").to_string();
493                if !event_name.is_empty() {
494                    broker.unsubscribe("ws-main", &event_name);
495                }
496                Ok(None)
497            })
498        }),
499    );
500
501    // eventunsuball → unsubscribe all from the WPS broker
502    let broker_unsub_all = state.broker.clone();
503    engine.register_handler(
504        COMMAND_EVENT_UNSUB_ALL,
505        Box::new(move |_data, _ctx| {
506            let broker = broker_unsub_all.clone();
507            Box::pin(async move {
508                broker.unsubscribe_all("ws-main");
509                Ok(None)
510            })
511        }),
512    );
513
514    // setmeta → update object metadata in the DB, broadcast update event
515    let wstore_sm = state.wstore.clone();
516    let event_bus_sm = state.event_bus.clone();
517    engine.register_handler(
518        COMMAND_SET_META,
519        Box::new(move |data, _ctx| {
520            let wstore = wstore_sm.clone();
521            let event_bus = event_bus_sm.clone();
522            Box::pin(async move {
523                let cmd: CommandSetMetaData =
524                    serde_json::from_value(data).map_err(|e| format!("setmeta: {e}"))?;
525                let oref_str = cmd.oref.to_string();
526                let meta_keys: Vec<&String> = cmd.meta.keys().collect();
527                tracing::info!(oref = %oref_str, keys = ?meta_keys, "SetMeta");
528                update_object_meta(&wstore, &oref_str, &cmd.meta)?;
529                // Read the updated object and broadcast a proper WaveObjUpdate
530                // so all WS clients refresh their atoms with the new data.
531                let oref = crate::backend::ORef::parse(&oref_str)
532                    .map_err(|e| e.to_string())?;
533                let update_data = if oref.otype == "block" {
534                    if let Ok(block) = wstore.must_get::<Block>(&oref.oid) {
535                        Some(serde_json::to_value(&WaveObjUpdate {
536                            updatetype: "update".into(),
537                            otype: oref.otype.clone(),
538                            oid: oref.oid.clone(),
539                            obj: Some(wave_obj_to_value(&block)),
540                        }).unwrap_or_default())
541                    } else { None }
542                } else { None };
543                event_bus.broadcast_event(&crate::backend::eventbus::WSEventType {
544                    eventtype: "waveobj:update".to_string(),
545                    oref: oref_str,
546                    data: update_data,
547                });
548                Ok(None)
549            })
550        }),
551    );
552
553    // getmeta → return metadata for a wave object
554    let wstore_gm = state.wstore.clone();
555    engine.register_handler(
556        COMMAND_GET_META,
557        Box::new(move |data, _ctx| {
558            let wstore = wstore_gm.clone();
559            Box::pin(async move {
560                let cmd: CommandGetMetaData =
561                    serde_json::from_value(data).map_err(|e| format!("getmeta: {e}"))?;
562                let obj: Option<serde_json::Value> = wstore
563                    .get_raw(&cmd.oref.otype, &cmd.oref.oid)
564                    .map_err(|e| format!("getmeta: {e}"))?;
565                match obj {
566                    Some(val) => {
567                        // Return the "meta" field if present, otherwise the full object
568                        let meta = val.get("meta").cloned().unwrap_or(val);
569                        Ok(Some(meta))
570                    }
571                    None => Err(format!("getmeta: object {} not found", cmd.oref)),
572                }
573            })
574        }),
575    );
576
577    // waveinfo → return version and build info
578    let version_info = state.version.clone();
579    engine.register_handler(
580        COMMAND_APP_INFO,
581        Box::new(move |_data, _ctx| {
582            let version = version_info.clone();
583            Box::pin(async move {
584                Ok(Some(serde_json::json!({
585                    "version": version,
586                })))
587            })
588        }),
589    );
590
591    // getwaveairatelimit → AgentMux has no rate limits; return unlimited/unknown
592    engine.register_handler(
593        COMMAND_GET_AI_RATE_LIMIT,
594        Box::new(|_data, _ctx| {
595            Box::pin(async move {
596                Ok(Some(serde_json::json!({
597                    "req": 9999,
598                    "reqlimit": 9999,
599                    "preq": 9999,
600                    "preqlimit": 9999,
601                    "resetepoch": 0,
602                    "unknown": true
603                })))
604            })
605        }),
606    );
607
608    // controllerresync → load block from DB, create/restart controller with PTY
609    let wstore_resync = state.wstore.clone();
610    let broker_resync = state.broker.clone();
611    let event_bus_resync = state.event_bus.clone();
612    let filestore_resync = state.filestore.clone();
613    engine.register_handler(
614        COMMAND_CONTROLLER_RESYNC,
615        Box::new(move |data, _ctx| {
616            let wstore = wstore_resync.clone();
617            let broker = broker_resync.clone();
618            let event_bus = event_bus_resync.clone();
619            let filestore = filestore_resync.clone();
620            Box::pin(async move {
621                let cmd: CommandControllerResyncData = serde_json::from_value(data)
622                    .map_err(|e| format!("controllerresync: {e}"))?;
623                tracing::info!(
624                    block_id = %cmd.blockid,
625                    tab_id = %cmd.tabid,
626                    forcerestart = cmd.forcerestart,
627                    "ControllerResync"
628                );
629                let block: Block = wstore
630                    .get(&cmd.blockid)
631                    .map_err(|e| format!("controllerresync: load block: {e}"))?
632                    .ok_or_else(|| format!("controllerresync: block {} not found", cmd.blockid))?;
633                blockcontroller::resync_controller(
634                    &block,
635                    &cmd.tabid,
636                    cmd.rtopts,
637                    cmd.forcerestart,
638                    Some(broker),
639                    Some(event_bus),
640                    Some(wstore),
641                    Some(filestore),
642                )?;
643                Ok(None)
644            })
645        }),
646    );
647
648    // controllerinput → route keyboard input / signals / resize to block controller
649    engine.register_handler(
650        COMMAND_CONTROLLER_INPUT,
651        Box::new(|data, _ctx| {
652            Box::pin(async move {
653                let cmd: CommandBlockInputData = serde_json::from_value(data)
654                    .map_err(|e| format!("controllerinput: {e}"))?;
655                let input = parse_block_input(&cmd)?;
656                blockcontroller::send_input(&cmd.blockid, input, cmd.seq)?;
657                Ok(None)
658            })
659        }),
660    );
661
662    // tooldecision → reply to a per-tool-call permission gate.
663    //
664    // The original PR-3a draft tried to write `y\n` / `n\n` to the
665    // subprocess's stdin via `blockcontroller::send_input`. Codex P1
666    // on PR #557 caught that this would fail: `SubprocessController::
667    // send_input` (and `PersistentSubprocessController::send_input`)
668    // both reject raw `input_data`, returning `Err("...use
669    // AgentInputCommand")`. The deeper truth is that AgentMux runs
670    // the agent CLI in non-interactive `--print` mode — the CLI
671    // never reads stdin and a y/n write would be a no-op even if
672    // the controller accepted it. See SPEC_DECISION_PROMPT
673    // _2026_04_24.md §9.1.
674    //
675    // For now this handler accepts the decision, validates the
676    // payload, logs it (audit trail via `~/.agentmux/logs/`), and
677    // returns Ok. The actual delivery mechanism (rule-persistence
678    // for next-turn application, or interactive-mode subprocess
679    // launch with stdin write) is decided in PR-3b / PR-4 once we
680    // pick a CLI integration strategy.
681    engine.register_handler(
682        COMMAND_TOOL_DECISION,
683        Box::new(|data, _ctx| {
684            Box::pin(async move {
685                let cmd: CommandToolDecisionData = serde_json::from_value(data)
686                    .map_err(|e| format!("tooldecision: {e}"))?;
687                match cmd.outcome.as_str() {
688                    "allow" | "deny" => {}
689                    other => {
690                        return Err(format!(
691                            "tooldecision: invalid outcome '{}' (expected 'allow' or 'deny')",
692                            other
693                        ));
694                    }
695                }
696                // Validate scope so PR-3b's rules-persistence layer
697                // can trust the value without re-checking. Reagent P1
698                // round-3 on PR #557.
699                match cmd.scope.as_str() {
700                    "once" | "session" | "project" | "global" => {}
701                    other => {
702                        return Err(format!(
703                            "tooldecision: invalid scope '{}' (expected 'once'/'session'/'project'/'global')",
704                            other
705                        ));
706                    }
707                }
708                tracing::info!(
709                    block_id = %cmd.blockid,
710                    request_id = %cmd.request_id,
711                    outcome = %cmd.outcome,
712                    scope = %cmd.scope,
713                    has_feedback = cmd.feedback.is_some(),
714                    "[tooldecision] received (delivery mechanism deferred to PR-3b/PR-4)"
715                );
716                Ok(None)
717            })
718        }),
719    );
720
721    // subprocessspawn → spawn agent CLI as subprocess for a single turn
722    let wstore_spawn = state.wstore.clone();
723    let broker_spawn = state.broker.clone();
724    let event_bus_spawn = state.event_bus.clone();
725    let filestore_spawn = state.filestore.clone();
726    engine.register_handler(
727        COMMAND_SUBPROCESS_SPAWN,
728        Box::new(move |data, _ctx| {
729            let wstore = wstore_spawn.clone();
730            let broker = broker_spawn.clone();
731            let event_bus = event_bus_spawn.clone();
732            let filestore = filestore_spawn.clone();
733            Box::pin(async move {
734                let cmd: CommandSubprocessSpawnData = serde_json::from_value(data)
735                    .map_err(|e| format!("subprocessspawn: {e}"))?;
736                tracing::info!(
737                    block_id = %cmd.blockid,
738                    cli = %cmd.cli_command,
739                    "SubprocessSpawn"
740                );
741
742                // Get or create a SubprocessController for this block
743                let ctrl = match blockcontroller::get_controller(&cmd.blockid) {
744                    Some(c) if c.controller_type() == blockcontroller::BLOCK_CONTROLLER_SUBPROCESS => c,
745                    _ => {
746                        // Create and register a new SubprocessController
747                        let ctrl = blockcontroller::subprocess::SubprocessController::new(
748                            cmd.tabid.clone(),
749                            cmd.blockid.clone(),
750                            Some(broker),
751                            Some(event_bus),
752                            Some(wstore),
753                            Some(filestore),
754                        );
755                        let ctrl = std::sync::Arc::new(ctrl);
756                        ctrl.set_self_ref();
757                        blockcontroller::register_controller(&cmd.blockid, ctrl.clone());
758                        ctrl as std::sync::Arc<dyn blockcontroller::Controller>
759                    }
760                };
761
762                // Downcast to SubprocessController to call spawn_turn
763                let subprocess_ctrl = ctrl
764                    .as_any()
765                    .downcast_ref::<blockcontroller::subprocess::SubprocessController>()
766                    .ok_or_else(|| "controller is not a SubprocessController".to_string())?;
767
768                let config = blockcontroller::subprocess::SubprocessSpawnConfig {
769                    cli_command: cmd.cli_command,
770                    cli_args: cmd.cli_args,
771                    working_dir: cmd.working_dir,
772                    env_vars: cmd.env_vars,
773                    message: cmd.message,
774                    resume_flag: "--resume".to_string(),
775                    session_id_field: "session_id".to_string(),
776                    message_id: None,
777                    // Direct-spawn legacy command — caller doesn't
778                    // carry a reattach context. Greenfield session id
779                    // is None; spawn_turn captures it from CLI stdout
780                    // on the first turn as before.
781                    session_id: None,
782                };
783                subprocess_ctrl.spawn_turn(config)?;
784                Ok(None)
785            })
786        }),
787    );
788
789    // agentinput → send message to agent (persistent or per-turn subprocess)
790    let wstore_ai = state.wstore.clone();
791    // Streaming-bash wrapper auth — clone the per-launch auth_key into the
792    // handler's closure so each spawn can inject it into Claude's env.
793    // See SPEC_STREAMING_BASH_RUNNER_2026_05_11.md §7.
794    let auth_key_ai = state.auth_key.clone();
795    // Broker — passed into the identity-injection path so the OAuth
796    // expiry probe (PR D, spec §4.4) can publish
797    // `identitybundlebindings:changed:<bundle_id>` on status change.
798    let broker_ai = state.broker.clone();
799    engine.register_handler(
800        COMMAND_AGENT_INPUT,
801        Box::new(move |data, _ctx| {
802            let wstore = wstore_ai.clone();
803            let auth_key = auth_key_ai.clone();
804            let broker = broker_ai.clone();
805            Box::pin(async move {
806                let cmd: CommandAgentInputData = serde_json::from_value(data)
807                    .map_err(|e| format!("agentinput: {e}"))?;
808                tracing::info!(block_id = %cmd.blockid, "AgentInput");
809
810                let ctrl = blockcontroller::get_controller(&cmd.blockid)
811                    .ok_or_else(|| format!("no controller for block {}", cmd.blockid))?;
812
813                // Re-read the spawn config from block metadata
814                let block: Block = wstore
815                    .get(&cmd.blockid)
816                    .map_err(|e| format!("agentinput: load block: {e}"))?
817                    .ok_or_else(|| format!("block {} not found", cmd.blockid))?;
818
819                let cli_command = crate::backend::obj::meta_get_string(
820                    &block.meta, "cmd", "claude",
821                );
822                let cli_args: Vec<String> = match block.meta.get("cmd:args") {
823                    Some(serde_json::Value::Array(arr)) => arr
824                        .iter()
825                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
826                        .collect(),
827                    _ => vec![
828                        "-p".to_string(),
829                        "--input-format".to_string(),
830                        "stream-json".to_string(),
831                        "--output-format".to_string(),
832                        "stream-json".to_string(),
833                    ],
834                };
835                let working_dir = crate::backend::obj::meta_get_string(
836                    &block.meta, "cmd:cwd", "",
837                );
838                let mut env_vars: std::collections::HashMap<String, String> = match block.meta.get("cmd:env") {
839                    Some(serde_json::Value::Object(obj)) => obj
840                        .iter()
841                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
842                        .collect(),
843                    _ => std::collections::HashMap::new(),
844                };
845                // Identity injection: look up the active AgentInstance for
846                // this block, resolve its identity_id's bindings, and merge
847                // each per-provider env var into the spawn map. Failures
848                // are logged and skipped — the agent CLI launches with
849                // whatever resolved cleanly plus the static cmd:env block.
850                // See agentmux-srv/src/identity/resolver.rs. Broker
851                // hand-in lets the OAuth expiry probe (PR D, spec §4.4)
852                // publish `identitybundlebindings:changed:<bundle_id>`
853                // when it flips a token's status valid→expired etc.
854                crate::identity::resolver::inject_identity_env_with_broker(
855                    wstore.clone(),
856                    Some(broker.clone()),
857                    &cmd.blockid,
858                    &mut env_vars,
859                );
860                // Streaming-bash wrapper auth + discovery
861                // (SPEC_STREAMING_BASH_RUNNER_2026_05_11.md §7).
862                //
863                // 1. AGENTMUX_AUTH_KEY — config.rs:42 removed it from
864                //    the process env at startup (security PR #801).
865                //    Re-inject for this spawn so the wrapper (running
866                //    inside Claude's bash subprocess tree) can
867                //    authenticate against the auth_middleware-gated
868                //    /agentmux/wps/publish endpoint via X-AuthKey.
869                // 2. PATH — prepend the bundled tools/bin dir so
870                //    `agentmux-bashwrap.exe` resolves when the
871                //    PreToolUse hook (auto-injected by agent_config.rs)
872                //    rewrites the command to invoke it. AGENTMUX_LOCAL_URL
873                //    is already in the inherited process env (main.rs:498).
874                env_vars.insert("AGENTMUX_AUTH_KEY".to_string(), auth_key.clone());
875                // Block id so the wrapper can scope its WPS publishes
876                // to `block:<id>`. Without this, chunks publish without
877                // a scope and the frontend's per-block subscription
878                // doesn't receive them.
879                env_vars.insert("AGENTMUX_BLOCKID".to_string(), cmd.blockid.clone());
880                // PATH includes BOTH bundled tools dir (portable
881                // builds, runtime/tools/bin/) AND user tools dir
882                // (~/.agentmux/tools/bin/). bundled is None in dev
883                // mode (target/debug exclusion in tool_store), so
884                // without user_tools_dir the wrapper wouldn't be on
885                // the agent's PATH during `task dev`.
886                {
887                    let existing = env_vars
888                        .get("PATH")
889                        .cloned()
890                        .or_else(|| std::env::var("PATH").ok())
891                        .unwrap_or_default();
892                    let sep = if cfg!(windows) { ";" } else { ":" };
893                    let mut extras: Vec<String> = Vec::new();
894                    if let Some(d) = crate::backend::tool_store::bundled_tools_dir() {
895                        if d.exists() {
896                            extras.push(d.to_string_lossy().into_owned());
897                        }
898                    }
899                    if let Some(d) = crate::backend::tool_store::user_tools_dir() {
900                        if d.exists() {
901                            extras.push(d.to_string_lossy().into_owned());
902                        }
903                    }
904                    if !extras.is_empty() {
905                        let new_path = format!("{}{}{}", extras.join(sep), sep, existing);
906                        env_vars.insert("PATH".to_string(), new_path);
907                    }
908                }
909
910                let session_id_field = crate::backend::obj::meta_get_string(
911                    &block.meta, "agent:session_id_field", "session_id",
912                );
913
914                // Try persistent controller first, fall back to subprocess
915                if let Some(persistent_ctrl) = ctrl
916                    .as_any()
917                    .downcast_ref::<blockcontroller::persistent::PersistentSubprocessController>()
918                {
919                    let config = blockcontroller::persistent::PersistentSpawnConfig {
920                        cli_command,
921                        cli_args,
922                        working_dir,
923                        env_vars,
924                        session_id_field,
925                    };
926                    persistent_ctrl.send_message(cmd.message, config)?;
927                } else if let Some(subprocess_ctrl) = ctrl
928                    .as_any()
929                    .downcast_ref::<blockcontroller::subprocess::SubprocessController>()
930                {
931                    let resume_flag = crate::backend::obj::meta_get_string(
932                        &block.meta, "agent:resume_flag", "--resume",
933                    );
934                    // Picker reattach: the frontend writes the prior
935                    // block's session id here when launching with
936                    // `continueOfInstanceId`. spawn_turn hydrates its
937                    // inner.session_id from this on the first turn so
938                    // --resume <sid> lands on the very first launch.
939                    let persisted_session_id = crate::backend::obj::meta_get_string(
940                        &block.meta, "agent:sessionid", "",
941                    );
942                    let config = blockcontroller::subprocess::SubprocessSpawnConfig {
943                        cli_command,
944                        cli_args,
945                        working_dir,
946                        env_vars,
947                        message: cmd.message,
948                        resume_flag,
949                        session_id_field,
950                        message_id: cmd.message_id,
951                        session_id: if persisted_session_id.is_empty() {
952                            None
953                        } else {
954                            Some(persisted_session_id)
955                        },
956                    };
957                    subprocess_ctrl.spawn_turn(config)?;
958                } else {
959                    return Err("controller is not a SubprocessController or PersistentSubprocessController".to_string());
960                }
961
962                Ok(None)
963            })
964        }),
965    );
966
967    // agentstop → stop the running agent subprocess
968    engine.register_handler(
969        COMMAND_AGENT_STOP,
970        Box::new(|data, _ctx| {
971            Box::pin(async move {
972                let cmd: CommandAgentStopData = serde_json::from_value(data)
973                    .map_err(|e| format!("agentstop: {e}"))?;
974                tracing::info!(block_id = %cmd.blockid, force = cmd.force, "AgentStop");
975                match blockcontroller::get_controller(&cmd.blockid) {
976                    Some(ctrl) => {
977                        ctrl.stop(!cmd.force, blockcontroller::STATUS_DONE)?;
978                        Ok(None)
979                    }
980                    None => Ok(None),
981                }
982            })
983        }),
984    );
985
986    // writeagentconfig → write config files atomically to agent working directory
987    engine.register_handler(
988        COMMAND_WRITE_AGENT_CONFIG,
989        Box::new(|data, _ctx| {
990            Box::pin(async move {
991                let cmd: CommandWriteAgentConfigData = serde_json::from_value(data)
992                    .map_err(|e| format!("writeagentconfig: {e}"))?;
993                tracing::info!(
994                    working_dir = %cmd.working_dir,
995                    file_count = cmd.files.len(),
996                    auto_allocate = cmd.auto_allocate,
997                    "WriteAgentConfig"
998                );
999
1000                // Resolve to a final on-disk path. For auto-generated
1001                // instance paths (`auto_allocate: true`), use the
1002                // atomic `<base>-N` allocator so concurrent same-hour
1003                // launches don't share a workdir. For user-specified
1004                // paths, mkdir-p as before — never rewrite.
1005                let expanded_working_dir = expand_home_dir_safe(&cmd.working_dir);
1006                let final_working_dir = if cmd.auto_allocate {
1007                    let desired = expanded_working_dir.to_string_lossy().to_string();
1008                    crate::server::app_api::allocate_agent_workdir(&desired)?
1009                } else {
1010                    let p = expanded_working_dir.as_path();
1011                    if !p.exists() {
1012                        std::fs::create_dir_all(p)
1013                            .map_err(|e| format!("failed to create working dir: {e}"))?;
1014                    }
1015                    expanded_working_dir.to_string_lossy().to_string()
1016                };
1017                let base_path = std::path::Path::new(&final_working_dir);
1018                // Canonicalize base ONCE (it exists — allocate_agent_workdir
1019                // or the explicit-path mkdir-p created it just above). Used
1020                // by the per-file symlink-escape verifier so we catch a
1021                // symlinked ancestor like `<base>/.claude -> /tmp/outside`
1022                // before fs::write follows it.
1023                let canonical_base = base_path.canonicalize().map_err(|e| {
1024                    format!("failed to canonicalize working dir {}: {e}", base_path.display())
1025                })?;
1026
1027                for file in &cmd.files {
1028                    // Lexical join + traversal check — works on Windows where
1029                    // canonicalize() adds the `\\?\` UNC prefix and breaks
1030                    // starts_with against not-yet-created files. Catches `..`,
1031                    // absolute paths, drive-letter prefixes (root and inner).
1032                    let file_path = crate::backend::base::safe_join_within_base(
1033                        base_path,
1034                        &file.path,
1035                    )
1036                    .map_err(|e| format!("path traversal denied: {} ({e})", file.path))?;
1037                    // Symlink-escape guard: if any EXISTING ancestor is a
1038                    // symlink that resolves outside the workdir, reject.
1039                    // No-op for fully-fresh agent dirs (the common case
1040                    // where every component is new).
1041                    crate::backend::base::verify_no_symlink_escape(&file_path, &canonical_base)
1042                        .map_err(|e| format!("path traversal denied: {} ({e})", file.path))?;
1043                    // Create parent directories if needed
1044                    if let Some(parent) = file_path.parent() {
1045                        if !parent.exists() {
1046                            std::fs::create_dir_all(parent)
1047                                .map_err(|e| format!("failed to create dir for {}: {e}", file.path))?;
1048                        }
1049                    }
1050                    std::fs::write(&file_path, &file.content)
1051                        .map_err(|e| format!("failed to write {}: {e}", file.path))?;
1052                    tracing::debug!(path = %file_path.display(), "wrote config file");
1053                }
1054
1055                // Return the final path so the caller can patch
1056                // `cmd:cwd` if collision resolution changed it.
1057                Ok(Some(serde_json::json!({
1058                    "working_dir": final_working_dir,
1059                })))
1060            })
1061        }),
1062    );
1063
1064    // readeditorfile → read file from disk for the editor pane
1065    engine.register_handler(
1066        "readeditorfile",
1067        Box::new(|data, _ctx| {
1068            Box::pin(async move {
1069                #[derive(serde::Deserialize)]
1070                struct Cmd { path: String }
1071                let cmd: Cmd = serde_json::from_value(data)
1072                    .map_err(|e| format!("readeditorfile: {e}"))?;
1073                let expanded = expand_home_dir_safe(&cmd.path);
1074                let path = expanded.as_path();
1075
1076                // Size guard: reject files > 10MB
1077                let metadata = std::fs::metadata(path)
1078                    .map_err(|e| format!("readeditorfile: {e}"))?;
1079                if metadata.len() > 10_000_000 {
1080                    return Err("File too large (>10MB)".to_string());
1081                }
1082
1083                let content = std::fs::read_to_string(path)
1084                    .map_err(|e| format!("readeditorfile: {e}"))?;
1085                let read_only = metadata.permissions().readonly();
1086
1087                Ok(Some(serde_json::json!({
1088                    "content": content,
1089                    "read_only": read_only,
1090                })))
1091            })
1092        }),
1093    );
1094
1095    // writeeditorfile → write file to disk from the editor pane
1096    engine.register_handler(
1097        "writeeditorfile",
1098        Box::new(|data, _ctx| {
1099            Box::pin(async move {
1100                #[derive(serde::Deserialize)]
1101                struct Cmd { path: String, content: String }
1102                let cmd: Cmd = serde_json::from_value(data)
1103                    .map_err(|e| format!("writeeditorfile: {e}"))?;
1104
1105                // Size guard: match readeditorfile's 10MB limit
1106                if cmd.content.len() > 10_000_000 {
1107                    return Err("Content too large (>10MB)".to_string());
1108                }
1109
1110                let expanded = expand_home_dir_safe(&cmd.path);
1111                let path = expanded.as_path();
1112
1113                // Path safety: restrict writes to under the user's home directory.
1114                // Allowlist approach — safer than an incomplete denylist.
1115                let home = dirs::home_dir()
1116                    .ok_or("writeeditorfile: cannot determine home directory")?;
1117                let canonical_home = home.canonicalize()
1118                    .map_err(|e| format!("writeeditorfile: home dir: {e}"))?;
1119
1120                // Resolve the target path (canonicalize existing, or parent + filename for new files)
1121                let canonical = path.canonicalize().or_else(|_| {
1122                    path.parent()
1123                        .and_then(|p| p.canonicalize().ok())
1124                        .map(|p| p.join(path.file_name().unwrap_or_default()))
1125                        .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "invalid path"))
1126                }).map_err(|e| format!("writeeditorfile: {e}"))?;
1127
1128                if !canonical.starts_with(&canonical_home) {
1129                    return Err(format!(
1130                        "writeeditorfile: path {} is outside home directory",
1131                        canonical.display()
1132                    ));
1133                }
1134
1135                std::fs::write(&canonical, &cmd.content)
1136                    .map_err(|e| format!("writeeditorfile: {e}"))?;
1137                tracing::info!(path = %canonical.display(), bytes = cmd.content.len(), "editor file saved");
1138
1139                Ok(None)
1140            })
1141        }),
1142    );
1143
1144    // CLI handlers (resolvecli, checkcliauth, runclilogin)
1145    super::cli_handlers::register_cli_handlers(engine, &state);
1146
1147    // Tool store handlers (gettoolstatus, installtool)
1148    super::tool_handlers::register_tool_handlers(engine, &state);
1149
1150    // eventreadhistory → read persisted event history from the WPS broker
1151    let broker_history = state.broker.clone();
1152    engine.register_handler(
1153        COMMAND_EVENT_READ_HISTORY,
1154        Box::new(move |data, _ctx| {
1155            let broker = broker_history.clone();
1156            Box::pin(async move {
1157                let cmd: CommandEventReadHistoryData = serde_json::from_value(data)
1158                    .map_err(|e| format!("eventreadhistory: {e}"))?;
1159                let max_items = if cmd.maxitems == 0 { 1024 } else { cmd.maxitems };
1160                let events = broker.read_event_history(&cmd.event, &cmd.scope, max_items);
1161                Ok(Some(serde_json::to_value(&events).unwrap_or_default()))
1162            })
1163        }),
1164    );
1165
1166    // setconfig → merge settings keys into settings.json AND update in-memory config immediately.
1167    // Writing to disk + broadcasting directly gives instant UI response without waiting for
1168    // the fs watcher (which has a ~300-800ms debounce + polling delay on Windows).
1169    // The fs watcher's subsequent reload is a no-op (settings already up to date).
1170    let config_watcher_setconfig = state.config_watcher.clone();
1171    let event_bus_setconfig = state.event_bus.clone();
1172    engine.register_handler(
1173        COMMAND_SET_CONFIG,
1174        Box::new(move |data, _ctx| {
1175            let cw = config_watcher_setconfig.clone();
1176            let eb = event_bus_setconfig.clone();
1177            Box::pin(async move {
1178                let new_keys: serde_json::Map<String, serde_json::Value> =
1179                    serde_json::from_value(data).map_err(|e| format!("setconfig: {e}"))?;
1180
1181                // 1. Write to disk (fs watcher will re-broadcast, harmlessly)
1182                crate::backend::config_watcher_fs::merge_settings_to_disk(new_keys.clone())
1183                    .map_err(|e| format!("setconfig write: {e}"))?;
1184
1185                // 2. Update in-memory config immediately
1186                let merged_settings = crate::backend::config_watcher_fs::merge_settings_into_current(&cw, new_keys);
1187                cw.update_settings(merged_settings);
1188
1189                // 3. Broadcast updated config now — no waiting for fs watcher
1190                let config = cw.get_full_config();
1191                if let Ok(config_val) = serde_json::to_value(config.as_ref()) {
1192                    let event = crate::backend::eventbus::WSEventType {
1193                        eventtype: crate::backend::eventbus::WS_EVENT_RPC.to_string(),
1194                        oref: String::new(),
1195                        data: Some(serde_json::json!({
1196                            "command": "eventrecv",
1197                            "data": {
1198                                "event": "config",
1199                                "data": { "fullconfig": config_val }
1200                            }
1201                        })),
1202                    };
1203                    eb.broadcast_event(&event);
1204                }
1205                Ok(None)
1206            })
1207        }),
1208    );
1209
1210    // Agent handlers (definitions, content, skills, history, import, reseed)
1211    super::agent_handlers::register_agent_handlers(engine, &state);
1212
1213    // Drone handlers (issue #753 — Drone pane DAG executor)
1214    super::drone_handlers::register_drone_handlers(engine, &state);
1215
1216    // Pre-launch OAuth handlers (auth.start / poll / submitcallback /
1217    // cancel / submitapikey — see docs/specs/SPEC_PRE_LAUNCH_OAUTH_FLOW_2026_05_14.md)
1218    super::identity_handlers::register_identity_handlers(engine, &state);
1219
1220    // Install handlers (install.start / install.cancel — see
1221    // docs/specs/SPEC_AGENT_INSTALL_STAGE_2026_05_17.md)
1222    super::install_handlers::register_install_handlers(engine, &state);
1223
1224    // App API handlers (agent.open, agent.send, agent.stop, agent.status, agent.list, agent.output)
1225    super::app_api::register_app_api_handlers(engine, &state);
1226}
1227
1228
1229/// Parse a CommandBlockInputData into a BlockInputUnion.
1230fn parse_block_input(
1231    cmd: &CommandBlockInputData,
1232) -> Result<blockcontroller::BlockInputUnion, String> {
1233    if !cmd.inputdata64.is_empty() {
1234        let data = base64::engine::general_purpose::STANDARD
1235            .decode(&cmd.inputdata64)
1236            .map_err(|e| format!("controllerinput: base64 decode: {e}"))?;
1237        return Ok(blockcontroller::BlockInputUnion::data(data));
1238    }
1239    if !cmd.signame.is_empty() {
1240        return Ok(blockcontroller::BlockInputUnion::signal(&cmd.signame));
1241    }
1242    if let Some(ref ts_val) = cmd.termsize {
1243        let ts: TermSize =
1244            serde_json::from_value(ts_val.clone()).map_err(|e| format!("controllerinput: {e}"))?;
1245        return Ok(blockcontroller::BlockInputUnion::resize(ts));
1246    }
1247    Err("controllerinput: no input data, signal, or termsize".to_string())
1248}