1use std::sync::Arc;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use axum::{
8 extract::{
9 ws::{Message, WebSocket},
10 State, WebSocketUpgrade,
11 },
12 response::Response,
13};
14use base64::Engine as _;
15use serde::Deserialize;
16use serde_json::json;
17
18use crate::backend::base::expand_home_dir_safe;
19use crate::backend::blockcontroller;
20use crate::backend::rpc::engine::WshRpcEngine;
21use crate::backend::rpc_types::{
22 CommandBlockInputData, CommandControllerResyncData, CommandEventReadHistoryData,
23 CommandGetMetaData, CommandSetMetaData, CommandToolDecisionData,
24 RpcMessage, COMMAND_CONTROLLER_INPUT,
25 COMMAND_CONTROLLER_RESYNC, COMMAND_EVENT_READ_HISTORY, COMMAND_EVENT_SUB, COMMAND_EVENT_UNSUB,
26 COMMAND_EVENT_UNSUB_ALL, COMMAND_GET_FULL_CONFIG, COMMAND_GET_META,
27 COMMAND_GET_AI_RATE_LIMIT, COMMAND_ROUTE_ANNOUNCE, COMMAND_ROUTE_UNANNOUNCE,
28 COMMAND_SET_META, COMMAND_SET_CONFIG, COMMAND_APP_INFO,
29 COMMAND_SUBPROCESS_SPAWN, COMMAND_AGENT_INPUT, COMMAND_AGENT_STOP, COMMAND_TOOL_DECISION,
30 COMMAND_WRITE_AGENT_CONFIG,
31 CommandSubprocessSpawnData, CommandAgentInputData, CommandAgentStopData, CommandWriteAgentConfigData,
32};
33use crate::backend::obj::{Block, TermSize, WaveObjUpdate, wave_obj_to_value};
34use super::service::update_object_meta;
35
36use super::AppState;
37
38#[derive(Deserialize)]
41struct WSIncoming {
42 #[serde(rename = "type")]
43 msg_type: Option<String>,
44 #[allow(dead_code)]
45 stime: Option<i64>,
46 wscommand: Option<String>,
47 message: Option<RpcMessage>,
48 blockid: Option<String>,
50 inputdata64: Option<String>,
51 termsize: Option<serde_json::Value>,
52 agent_id: Option<String>,
54 from: Option<String>,
55 to: Option<String>,
56 target: Option<String>,
57 payload: Option<String>,
58 #[serde(rename = "bus_message")]
59 bus_message_text: Option<String>,
60 priority: Option<String>,
61}
62
63pub(super) async fn handle_ws(
64 State(state): State<AppState>,
65 ws: WebSocketUpgrade,
66) -> Response {
67 ws.on_upgrade(move |socket| handle_ws_connection(socket, state))
68}
69
70async fn handle_ws_connection(mut socket: WebSocket, state: AppState) {
71 let ws_start = std::time::Instant::now();
72 let conn_id = uuid::Uuid::new_v4().to_string();
73 let tab_id = String::new();
74
75 tracing::info!(conn_id = %conn_id, "WebSocket client connected");
76
77 let mut event_rx = state.event_bus.register_ws(&conn_id, &tab_id);
78 tracing::info!("[ws-perf] register_ws: {:.2}ms", ws_start.elapsed().as_secs_f64() * 1000.0);
79
80 let mut bus_rx: Option<tokio::sync::mpsc::UnboundedReceiver<crate::backend::messagebus::BusMessage>> = None;
82 let mut bus_agent_id: Option<String> = None;
83
84 {
88 let t = std::time::Instant::now();
89 let config = state.config_watcher.get_full_config();
90 if let Ok(config_val) = serde_json::to_value(config.as_ref()) {
91 let config_event = json!({
92 "eventtype": "rpc",
93 "data": {
94 "command": "eventrecv",
95 "data": {
96 "event": "config",
97 "data": { "fullconfig": config_val }
98 }
99 }
100 });
101 if let Ok(msg) = serde_json::to_string(&config_event) {
102 let _ = socket.send(Message::Text(msg.into())).await;
103 }
104 }
105 tracing::info!("[ws-perf] send_initial_config: {:.2}ms", t.elapsed().as_secs_f64() * 1000.0);
106 }
107
108 let t = std::time::Instant::now();
110 let (engine, mut rpc_output_rx) = WshRpcEngine::new();
111
112 register_handlers(&engine, state.clone());
114 tracing::info!("[ws-perf] create_engine+register_handlers: {:.2}ms", t.elapsed().as_secs_f64() * 1000.0);
115 tracing::info!("[ws-perf] TOTAL ws_setup: {:.2}ms", ws_start.elapsed().as_secs_f64() * 1000.0);
116
117 let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(10));
119 ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
120
121 loop {
122 tokio::select! {
123 Some(event) = event_rx.recv() => {
133 let msg = if event["eventtype"] == "rpc" {
134 serde_json::to_string(&event).unwrap_or_default()
136 } else {
137 let wave_event = json!({
139 "event": event["eventtype"],
140 "scopes": [event["oref"]],
141 "data": event["data"],
142 });
143 let wrapped = json!({
144 "eventtype": "rpc",
145 "data": {
146 "command": "eventrecv",
147 "data": wave_event,
148 },
149 });
150 serde_json::to_string(&wrapped).unwrap_or_default()
151 };
152 if socket.send(Message::Text(msg.into())).await.is_err() {
153 break;
154 }
155 }
156
157 Some(rpc_msg) = rpc_output_rx.recv() => {
159 let wrapped = json!({
160 "eventtype": "rpc",
161 "data": rpc_msg,
162 });
163 let msg = serde_json::to_string(&wrapped).unwrap_or_default();
164 if socket.send(Message::Text(msg.into())).await.is_err() {
165 break;
166 }
167 }
168
169 Some(bus_msg) = async {
171 match bus_rx.as_mut() {
172 Some(rx) => rx.recv().await,
173 None => std::future::pending().await,
174 }
175 } => {
176 let wrapped = json!({
177 "type": "bus:message",
178 "data": bus_msg,
179 });
180 let msg = serde_json::to_string(&wrapped).unwrap_or_default();
181 if socket.send(Message::Text(msg.into())).await.is_err() {
182 break;
183 }
184 }
185
186 msg = socket.recv() => {
188 match msg {
189 Some(Ok(Message::Close(_))) | None => break,
190 Some(Ok(Message::Ping(data))) => {
191 let _ = socket.send(Message::Pong(data)).await;
192 }
193 Some(Ok(Message::Text(text))) => {
194 match handle_incoming_text(&text, &engine, &state, &mut socket).await {
195 Err(true) => break,
196 Ok(Some((new_rx, agent_id))) => {
197 bus_rx = Some(new_rx);
199 bus_agent_id = Some(agent_id);
200 }
201 _ => {}
202 }
203 }
204 Some(Ok(_)) => {
205 }
207 Some(Err(_)) => break,
208 }
209 }
210
211 _ = ping_interval.tick() => {
213 let now = SystemTime::now()
214 .duration_since(UNIX_EPOCH)
215 .unwrap_or_default()
216 .as_millis() as i64;
217 let ping = json!({ "type": "ping", "stime": now });
218 let msg = serde_json::to_string(&ping).unwrap_or_default();
219 if socket.send(Message::Text(msg.into())).await.is_err() {
220 break;
221 }
222 }
223 }
224 }
225
226 tracing::info!(conn_id = %conn_id, "WebSocket client disconnected");
227 state.event_bus.unregister_ws(&conn_id);
228
229 if let Some(ref agent_id) = bus_agent_id {
231 state.messagebus.unregister(agent_id);
232 }
233}
234
235async fn handle_incoming_text(
239 text: &str,
240 engine: &Arc<WshRpcEngine>,
241 state: &AppState,
242 socket: &mut WebSocket,
243) -> Result<Option<(tokio::sync::mpsc::UnboundedReceiver<crate::backend::messagebus::BusMessage>, String)>, bool> {
244 let incoming: WSIncoming = match serde_json::from_str(text) {
245 Ok(v) => v,
246 Err(e) => {
247 tracing::warn!("ws: invalid JSON: {}", e);
248 return Ok(None);
249 }
250 };
251
252 if let Some(ref msg_type) = incoming.msg_type {
254 match msg_type.as_str() {
255 "ping" => {
256 let now = SystemTime::now()
257 .duration_since(UNIX_EPOCH)
258 .unwrap_or_default()
259 .as_millis() as i64;
260 let pong = json!({ "type": "pong", "stime": now });
261 let msg = serde_json::to_string(&pong).unwrap_or_default();
262 if socket.send(Message::Text(msg.into())).await.is_err() {
263 return Err(true);
264 }
265 return Ok(None);
266 }
267 "pong" => {
268 return Ok(None);
269 }
270 "bus:register" => {
271 if let Some(ref agent_id) = incoming.agent_id {
272 let rx = state.messagebus.register(agent_id, "websocket");
273 let ack = json!({ "type": "bus:registered", "agent_id": agent_id });
274 let msg = serde_json::to_string(&ack).unwrap_or_default();
275 if socket.send(Message::Text(msg.into())).await.is_err() {
276 return Err(true);
277 }
278 return Ok(Some((rx, agent_id.clone())));
279 }
280 return Ok(None);
281 }
282 "bus:send" => {
283 if let (Some(ref from), Some(ref to), Some(ref payload)) =
284 (&incoming.from, &incoming.to, &incoming.payload)
285 {
286 let priority = match incoming.priority.as_deref() {
287 Some("high") => crate::backend::messagebus::Priority::High,
288 Some("urgent") => crate::backend::messagebus::Priority::Urgent,
289 _ => crate::backend::messagebus::Priority::Normal,
290 };
291 let bus_msg = crate::backend::messagebus::BusMessage::new(
292 from, to, crate::backend::messagebus::MessageType::Send, payload, priority,
293 );
294 let msg_id = bus_msg.id.clone();
295 let _ = state.messagebus.send(bus_msg);
296 let ack = json!({ "type": "bus:sent", "message_id": msg_id });
297 let msg = serde_json::to_string(&ack).unwrap_or_default();
298 if socket.send(Message::Text(msg.into())).await.is_err() {
299 return Err(true);
300 }
301 }
302 return Ok(None);
303 }
304 "bus:inject" => {
305 let from = incoming.from.as_deref().unwrap_or("unknown");
306 if let (Some(ref target), Some(ref message)) =
307 (&incoming.target, &incoming.bus_message_text)
308 {
309 let reactive_req = crate::backend::reactive::InjectionRequest {
311 target_agent: target.clone(),
312 message: message.clone(),
313 source_agent: Some(from.to_string()),
314 request_id: None,
315 priority: incoming.priority.clone(),
316 wait_for_idle: false,
317 };
318 let resp = state.reactive_handler.inject_message(reactive_req);
319 if resp.success {
320 let ack = json!({ "type": "bus:injected", "via": "pty", "block_id": resp.block_id });
321 let msg = serde_json::to_string(&ack).unwrap_or_default();
322 if socket.send(Message::Text(msg.into())).await.is_err() {
323 return Err(true);
324 }
325 return Ok(None);
326 }
327
328 let is_not_found = resp.error.as_deref().map(|e| e.contains("not found")).unwrap_or(false);
330 if !is_not_found {
331 let err = json!({ "type": "bus:error", "error": resp.error });
332 let msg = serde_json::to_string(&err).unwrap_or_default();
333 if socket.send(Message::Text(msg.into())).await.is_err() {
334 return Err(true);
335 }
336 return Ok(None);
337 }
338
339 let priority = match incoming.priority.as_deref() {
341 Some("high") => crate::backend::messagebus::Priority::High,
342 Some("urgent") => crate::backend::messagebus::Priority::Urgent,
343 _ => crate::backend::messagebus::Priority::Normal,
344 };
345 match state.messagebus.inject(from, target, message, priority) {
346 Ok(msg_id) => {
347 let ack = json!({ "type": "bus:injected", "via": "messagebus", "message_id": msg_id });
348 let msg = serde_json::to_string(&ack).unwrap_or_default();
349 if socket.send(Message::Text(msg.into())).await.is_err() {
350 return Err(true);
351 }
352 }
353 Err(e) => {
354 let err = json!({ "type": "bus:error", "error": e });
355 let msg = serde_json::to_string(&err).unwrap_or_default();
356 if socket.send(Message::Text(msg.into())).await.is_err() {
357 return Err(true);
358 }
359 }
360 }
361 }
362 return Ok(None);
363 }
364 "bus:broadcast" => {
365 let from = incoming.from.as_deref().unwrap_or("unknown");
366 if let Some(ref payload) = incoming.payload {
367 let priority = match incoming.priority.as_deref() {
368 Some("high") => crate::backend::messagebus::Priority::High,
369 Some("urgent") => crate::backend::messagebus::Priority::Urgent,
370 _ => crate::backend::messagebus::Priority::Normal,
371 };
372 let _ = state.messagebus.broadcast(from, payload, priority);
373 }
374 return Ok(None);
375 }
376 _ => {}
377 }
378 }
379
380 if let Some(ref wscommand) = incoming.wscommand {
382 match wscommand.as_str() {
383 "rpc" => {
384 if let Some(rpc_msg) = incoming.message {
385 engine.handle_message(rpc_msg);
386 } else {
387 tracing::warn!("ws: rpc wscommand missing message field");
388 }
389 }
390 "blockinput" => {
391 if let Some(ref block_id) = incoming.blockid {
392 if let Some(ref data64) = incoming.inputdata64 {
393 if !data64.is_empty() {
394 match base64::engine::general_purpose::STANDARD.decode(data64) {
395 Ok(data) => {
396 let input = blockcontroller::BlockInputUnion::data(data);
397 if let Err(e) = blockcontroller::send_input(block_id, input, None) {
398 tracing::debug!("ws: blockinput error: {}", e);
399 }
400 }
401 Err(e) => {
402 tracing::warn!("ws: blockinput base64 decode error: {}", e);
403 }
404 }
405 }
406 }
407 }
408 }
409 "setblocktermsize" => {
410 if let Some(ref block_id) = incoming.blockid {
411 if let Some(ref ts_val) = incoming.termsize {
412 match serde_json::from_value::<TermSize>(ts_val.clone()) {
413 Ok(ts) => {
414 let input = blockcontroller::BlockInputUnion::resize(ts);
415 if let Err(e) = blockcontroller::send_input(block_id, input, None) {
416 tracing::debug!("ws: setblocktermsize error: {}", e);
417 }
418 }
419 Err(e) => {
420 tracing::warn!("ws: setblocktermsize parse error: {}", e);
421 }
422 }
423 }
424 }
425 }
426 other => {
427 tracing::warn!("ws: unknown wscommand: {}", other);
428 }
429 }
430 }
431
432 Ok(None)
433}
434
435fn register_handlers(engine: &Arc<WshRpcEngine>, state: AppState) {
436 let config_watcher = state.config_watcher.clone();
438 engine.register_handler(
439 COMMAND_GET_FULL_CONFIG,
440 Box::new(move |_data, _ctx| {
441 let cw = config_watcher.clone();
442 Box::pin(async move {
443 let config = cw.get_full_config();
444 match serde_json::to_value(config.as_ref()) {
445 Ok(v) => Ok(Some(v)),
446 Err(e) => Err(format!("failed to serialize config: {}", e)),
447 }
448 })
449 }),
450 );
451
452 engine.register_handler(
454 COMMAND_ROUTE_ANNOUNCE,
455 Box::new(|data, _ctx| {
456 Box::pin(async move {
457 tracing::debug!("routeannounce: {:?}", data);
458 Ok(None)
459 })
460 }),
461 );
462
463 engine.register_handler(
465 COMMAND_ROUTE_UNANNOUNCE,
466 Box::new(|_data, _ctx| Box::pin(async move { Ok(None) })),
467 );
468
469 let broker_sub = state.broker.clone();
471 engine.register_handler(
472 COMMAND_EVENT_SUB,
473 Box::new(move |data, _ctx| {
474 let broker = broker_sub.clone();
475 Box::pin(async move {
476 let sub: crate::backend::wps::SubscriptionRequest =
477 serde_json::from_value(data).map_err(|e| format!("eventsub: {e}"))?;
478 tracing::debug!("eventsub: event={} scopes={:?} allscopes={}", sub.event, sub.scopes, sub.allscopes);
479 broker.subscribe("ws-main", sub);
480 Ok(None)
481 })
482 }),
483 );
484
485 let broker_unsub = state.broker.clone();
487 engine.register_handler(
488 COMMAND_EVENT_UNSUB,
489 Box::new(move |data, _ctx| {
490 let broker = broker_unsub.clone();
491 Box::pin(async move {
492 let event_name = data.as_str().unwrap_or("").to_string();
493 if !event_name.is_empty() {
494 broker.unsubscribe("ws-main", &event_name);
495 }
496 Ok(None)
497 })
498 }),
499 );
500
501 let broker_unsub_all = state.broker.clone();
503 engine.register_handler(
504 COMMAND_EVENT_UNSUB_ALL,
505 Box::new(move |_data, _ctx| {
506 let broker = broker_unsub_all.clone();
507 Box::pin(async move {
508 broker.unsubscribe_all("ws-main");
509 Ok(None)
510 })
511 }),
512 );
513
514 let wstore_sm = state.wstore.clone();
516 let event_bus_sm = state.event_bus.clone();
517 engine.register_handler(
518 COMMAND_SET_META,
519 Box::new(move |data, _ctx| {
520 let wstore = wstore_sm.clone();
521 let event_bus = event_bus_sm.clone();
522 Box::pin(async move {
523 let cmd: CommandSetMetaData =
524 serde_json::from_value(data).map_err(|e| format!("setmeta: {e}"))?;
525 let oref_str = cmd.oref.to_string();
526 let meta_keys: Vec<&String> = cmd.meta.keys().collect();
527 tracing::info!(oref = %oref_str, keys = ?meta_keys, "SetMeta");
528 update_object_meta(&wstore, &oref_str, &cmd.meta)?;
529 let oref = crate::backend::ORef::parse(&oref_str)
532 .map_err(|e| e.to_string())?;
533 let update_data = if oref.otype == "block" {
534 if let Ok(block) = wstore.must_get::<Block>(&oref.oid) {
535 Some(serde_json::to_value(&WaveObjUpdate {
536 updatetype: "update".into(),
537 otype: oref.otype.clone(),
538 oid: oref.oid.clone(),
539 obj: Some(wave_obj_to_value(&block)),
540 }).unwrap_or_default())
541 } else { None }
542 } else { None };
543 event_bus.broadcast_event(&crate::backend::eventbus::WSEventType {
544 eventtype: "waveobj:update".to_string(),
545 oref: oref_str,
546 data: update_data,
547 });
548 Ok(None)
549 })
550 }),
551 );
552
553 let wstore_gm = state.wstore.clone();
555 engine.register_handler(
556 COMMAND_GET_META,
557 Box::new(move |data, _ctx| {
558 let wstore = wstore_gm.clone();
559 Box::pin(async move {
560 let cmd: CommandGetMetaData =
561 serde_json::from_value(data).map_err(|e| format!("getmeta: {e}"))?;
562 let obj: Option<serde_json::Value> = wstore
563 .get_raw(&cmd.oref.otype, &cmd.oref.oid)
564 .map_err(|e| format!("getmeta: {e}"))?;
565 match obj {
566 Some(val) => {
567 let meta = val.get("meta").cloned().unwrap_or(val);
569 Ok(Some(meta))
570 }
571 None => Err(format!("getmeta: object {} not found", cmd.oref)),
572 }
573 })
574 }),
575 );
576
577 let version_info = state.version.clone();
579 engine.register_handler(
580 COMMAND_APP_INFO,
581 Box::new(move |_data, _ctx| {
582 let version = version_info.clone();
583 Box::pin(async move {
584 Ok(Some(serde_json::json!({
585 "version": version,
586 })))
587 })
588 }),
589 );
590
591 engine.register_handler(
593 COMMAND_GET_AI_RATE_LIMIT,
594 Box::new(|_data, _ctx| {
595 Box::pin(async move {
596 Ok(Some(serde_json::json!({
597 "req": 9999,
598 "reqlimit": 9999,
599 "preq": 9999,
600 "preqlimit": 9999,
601 "resetepoch": 0,
602 "unknown": true
603 })))
604 })
605 }),
606 );
607
608 let wstore_resync = state.wstore.clone();
610 let broker_resync = state.broker.clone();
611 let event_bus_resync = state.event_bus.clone();
612 let filestore_resync = state.filestore.clone();
613 engine.register_handler(
614 COMMAND_CONTROLLER_RESYNC,
615 Box::new(move |data, _ctx| {
616 let wstore = wstore_resync.clone();
617 let broker = broker_resync.clone();
618 let event_bus = event_bus_resync.clone();
619 let filestore = filestore_resync.clone();
620 Box::pin(async move {
621 let cmd: CommandControllerResyncData = serde_json::from_value(data)
622 .map_err(|e| format!("controllerresync: {e}"))?;
623 tracing::info!(
624 block_id = %cmd.blockid,
625 tab_id = %cmd.tabid,
626 forcerestart = cmd.forcerestart,
627 "ControllerResync"
628 );
629 let block: Block = wstore
630 .get(&cmd.blockid)
631 .map_err(|e| format!("controllerresync: load block: {e}"))?
632 .ok_or_else(|| format!("controllerresync: block {} not found", cmd.blockid))?;
633 blockcontroller::resync_controller(
634 &block,
635 &cmd.tabid,
636 cmd.rtopts,
637 cmd.forcerestart,
638 Some(broker),
639 Some(event_bus),
640 Some(wstore),
641 Some(filestore),
642 )?;
643 Ok(None)
644 })
645 }),
646 );
647
648 engine.register_handler(
650 COMMAND_CONTROLLER_INPUT,
651 Box::new(|data, _ctx| {
652 Box::pin(async move {
653 let cmd: CommandBlockInputData = serde_json::from_value(data)
654 .map_err(|e| format!("controllerinput: {e}"))?;
655 let input = parse_block_input(&cmd)?;
656 blockcontroller::send_input(&cmd.blockid, input, cmd.seq)?;
657 Ok(None)
658 })
659 }),
660 );
661
662 engine.register_handler(
682 COMMAND_TOOL_DECISION,
683 Box::new(|data, _ctx| {
684 Box::pin(async move {
685 let cmd: CommandToolDecisionData = serde_json::from_value(data)
686 .map_err(|e| format!("tooldecision: {e}"))?;
687 match cmd.outcome.as_str() {
688 "allow" | "deny" => {}
689 other => {
690 return Err(format!(
691 "tooldecision: invalid outcome '{}' (expected 'allow' or 'deny')",
692 other
693 ));
694 }
695 }
696 match cmd.scope.as_str() {
700 "once" | "session" | "project" | "global" => {}
701 other => {
702 return Err(format!(
703 "tooldecision: invalid scope '{}' (expected 'once'/'session'/'project'/'global')",
704 other
705 ));
706 }
707 }
708 tracing::info!(
709 block_id = %cmd.blockid,
710 request_id = %cmd.request_id,
711 outcome = %cmd.outcome,
712 scope = %cmd.scope,
713 has_feedback = cmd.feedback.is_some(),
714 "[tooldecision] received (delivery mechanism deferred to PR-3b/PR-4)"
715 );
716 Ok(None)
717 })
718 }),
719 );
720
721 let wstore_spawn = state.wstore.clone();
723 let broker_spawn = state.broker.clone();
724 let event_bus_spawn = state.event_bus.clone();
725 let filestore_spawn = state.filestore.clone();
726 engine.register_handler(
727 COMMAND_SUBPROCESS_SPAWN,
728 Box::new(move |data, _ctx| {
729 let wstore = wstore_spawn.clone();
730 let broker = broker_spawn.clone();
731 let event_bus = event_bus_spawn.clone();
732 let filestore = filestore_spawn.clone();
733 Box::pin(async move {
734 let cmd: CommandSubprocessSpawnData = serde_json::from_value(data)
735 .map_err(|e| format!("subprocessspawn: {e}"))?;
736 tracing::info!(
737 block_id = %cmd.blockid,
738 cli = %cmd.cli_command,
739 "SubprocessSpawn"
740 );
741
742 let ctrl = match blockcontroller::get_controller(&cmd.blockid) {
744 Some(c) if c.controller_type() == blockcontroller::BLOCK_CONTROLLER_SUBPROCESS => c,
745 _ => {
746 let ctrl = blockcontroller::subprocess::SubprocessController::new(
748 cmd.tabid.clone(),
749 cmd.blockid.clone(),
750 Some(broker),
751 Some(event_bus),
752 Some(wstore),
753 Some(filestore),
754 );
755 let ctrl = std::sync::Arc::new(ctrl);
756 ctrl.set_self_ref();
757 blockcontroller::register_controller(&cmd.blockid, ctrl.clone());
758 ctrl as std::sync::Arc<dyn blockcontroller::Controller>
759 }
760 };
761
762 let subprocess_ctrl = ctrl
764 .as_any()
765 .downcast_ref::<blockcontroller::subprocess::SubprocessController>()
766 .ok_or_else(|| "controller is not a SubprocessController".to_string())?;
767
768 let config = blockcontroller::subprocess::SubprocessSpawnConfig {
769 cli_command: cmd.cli_command,
770 cli_args: cmd.cli_args,
771 working_dir: cmd.working_dir,
772 env_vars: cmd.env_vars,
773 message: cmd.message,
774 resume_flag: "--resume".to_string(),
775 session_id_field: "session_id".to_string(),
776 message_id: None,
777 session_id: None,
782 };
783 subprocess_ctrl.spawn_turn(config)?;
784 Ok(None)
785 })
786 }),
787 );
788
789 let wstore_ai = state.wstore.clone();
791 let auth_key_ai = state.auth_key.clone();
795 let broker_ai = state.broker.clone();
799 engine.register_handler(
800 COMMAND_AGENT_INPUT,
801 Box::new(move |data, _ctx| {
802 let wstore = wstore_ai.clone();
803 let auth_key = auth_key_ai.clone();
804 let broker = broker_ai.clone();
805 Box::pin(async move {
806 let cmd: CommandAgentInputData = serde_json::from_value(data)
807 .map_err(|e| format!("agentinput: {e}"))?;
808 tracing::info!(block_id = %cmd.blockid, "AgentInput");
809
810 let ctrl = blockcontroller::get_controller(&cmd.blockid)
811 .ok_or_else(|| format!("no controller for block {}", cmd.blockid))?;
812
813 let block: Block = wstore
815 .get(&cmd.blockid)
816 .map_err(|e| format!("agentinput: load block: {e}"))?
817 .ok_or_else(|| format!("block {} not found", cmd.blockid))?;
818
819 let cli_command = crate::backend::obj::meta_get_string(
820 &block.meta, "cmd", "claude",
821 );
822 let cli_args: Vec<String> = match block.meta.get("cmd:args") {
823 Some(serde_json::Value::Array(arr)) => arr
824 .iter()
825 .filter_map(|v| v.as_str().map(|s| s.to_string()))
826 .collect(),
827 _ => vec![
828 "-p".to_string(),
829 "--input-format".to_string(),
830 "stream-json".to_string(),
831 "--output-format".to_string(),
832 "stream-json".to_string(),
833 ],
834 };
835 let working_dir = crate::backend::obj::meta_get_string(
836 &block.meta, "cmd:cwd", "",
837 );
838 let mut env_vars: std::collections::HashMap<String, String> = match block.meta.get("cmd:env") {
839 Some(serde_json::Value::Object(obj)) => obj
840 .iter()
841 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
842 .collect(),
843 _ => std::collections::HashMap::new(),
844 };
845 crate::identity::resolver::inject_identity_env_with_broker(
855 wstore.clone(),
856 Some(broker.clone()),
857 &cmd.blockid,
858 &mut env_vars,
859 );
860 env_vars.insert("AGENTMUX_AUTH_KEY".to_string(), auth_key.clone());
875 env_vars.insert("AGENTMUX_BLOCKID".to_string(), cmd.blockid.clone());
880 {
887 let existing = env_vars
888 .get("PATH")
889 .cloned()
890 .or_else(|| std::env::var("PATH").ok())
891 .unwrap_or_default();
892 let sep = if cfg!(windows) { ";" } else { ":" };
893 let mut extras: Vec<String> = Vec::new();
894 if let Some(d) = crate::backend::tool_store::bundled_tools_dir() {
895 if d.exists() {
896 extras.push(d.to_string_lossy().into_owned());
897 }
898 }
899 if let Some(d) = crate::backend::tool_store::user_tools_dir() {
900 if d.exists() {
901 extras.push(d.to_string_lossy().into_owned());
902 }
903 }
904 if !extras.is_empty() {
905 let new_path = format!("{}{}{}", extras.join(sep), sep, existing);
906 env_vars.insert("PATH".to_string(), new_path);
907 }
908 }
909
910 let session_id_field = crate::backend::obj::meta_get_string(
911 &block.meta, "agent:session_id_field", "session_id",
912 );
913
914 if let Some(persistent_ctrl) = ctrl
916 .as_any()
917 .downcast_ref::<blockcontroller::persistent::PersistentSubprocessController>()
918 {
919 let config = blockcontroller::persistent::PersistentSpawnConfig {
920 cli_command,
921 cli_args,
922 working_dir,
923 env_vars,
924 session_id_field,
925 };
926 persistent_ctrl.send_message(cmd.message, config)?;
927 } else if let Some(subprocess_ctrl) = ctrl
928 .as_any()
929 .downcast_ref::<blockcontroller::subprocess::SubprocessController>()
930 {
931 let resume_flag = crate::backend::obj::meta_get_string(
932 &block.meta, "agent:resume_flag", "--resume",
933 );
934 let persisted_session_id = crate::backend::obj::meta_get_string(
940 &block.meta, "agent:sessionid", "",
941 );
942 let config = blockcontroller::subprocess::SubprocessSpawnConfig {
943 cli_command,
944 cli_args,
945 working_dir,
946 env_vars,
947 message: cmd.message,
948 resume_flag,
949 session_id_field,
950 message_id: cmd.message_id,
951 session_id: if persisted_session_id.is_empty() {
952 None
953 } else {
954 Some(persisted_session_id)
955 },
956 };
957 subprocess_ctrl.spawn_turn(config)?;
958 } else {
959 return Err("controller is not a SubprocessController or PersistentSubprocessController".to_string());
960 }
961
962 Ok(None)
963 })
964 }),
965 );
966
967 engine.register_handler(
969 COMMAND_AGENT_STOP,
970 Box::new(|data, _ctx| {
971 Box::pin(async move {
972 let cmd: CommandAgentStopData = serde_json::from_value(data)
973 .map_err(|e| format!("agentstop: {e}"))?;
974 tracing::info!(block_id = %cmd.blockid, force = cmd.force, "AgentStop");
975 match blockcontroller::get_controller(&cmd.blockid) {
976 Some(ctrl) => {
977 ctrl.stop(!cmd.force, blockcontroller::STATUS_DONE)?;
978 Ok(None)
979 }
980 None => Ok(None),
981 }
982 })
983 }),
984 );
985
986 engine.register_handler(
988 COMMAND_WRITE_AGENT_CONFIG,
989 Box::new(|data, _ctx| {
990 Box::pin(async move {
991 let cmd: CommandWriteAgentConfigData = serde_json::from_value(data)
992 .map_err(|e| format!("writeagentconfig: {e}"))?;
993 tracing::info!(
994 working_dir = %cmd.working_dir,
995 file_count = cmd.files.len(),
996 auto_allocate = cmd.auto_allocate,
997 "WriteAgentConfig"
998 );
999
1000 let expanded_working_dir = expand_home_dir_safe(&cmd.working_dir);
1006 let final_working_dir = if cmd.auto_allocate {
1007 let desired = expanded_working_dir.to_string_lossy().to_string();
1008 crate::server::app_api::allocate_agent_workdir(&desired)?
1009 } else {
1010 let p = expanded_working_dir.as_path();
1011 if !p.exists() {
1012 std::fs::create_dir_all(p)
1013 .map_err(|e| format!("failed to create working dir: {e}"))?;
1014 }
1015 expanded_working_dir.to_string_lossy().to_string()
1016 };
1017 let base_path = std::path::Path::new(&final_working_dir);
1018 let canonical_base = base_path.canonicalize().map_err(|e| {
1024 format!("failed to canonicalize working dir {}: {e}", base_path.display())
1025 })?;
1026
1027 for file in &cmd.files {
1028 let file_path = crate::backend::base::safe_join_within_base(
1033 base_path,
1034 &file.path,
1035 )
1036 .map_err(|e| format!("path traversal denied: {} ({e})", file.path))?;
1037 crate::backend::base::verify_no_symlink_escape(&file_path, &canonical_base)
1042 .map_err(|e| format!("path traversal denied: {} ({e})", file.path))?;
1043 if let Some(parent) = file_path.parent() {
1045 if !parent.exists() {
1046 std::fs::create_dir_all(parent)
1047 .map_err(|e| format!("failed to create dir for {}: {e}", file.path))?;
1048 }
1049 }
1050 std::fs::write(&file_path, &file.content)
1051 .map_err(|e| format!("failed to write {}: {e}", file.path))?;
1052 tracing::debug!(path = %file_path.display(), "wrote config file");
1053 }
1054
1055 Ok(Some(serde_json::json!({
1058 "working_dir": final_working_dir,
1059 })))
1060 })
1061 }),
1062 );
1063
1064 engine.register_handler(
1066 "readeditorfile",
1067 Box::new(|data, _ctx| {
1068 Box::pin(async move {
1069 #[derive(serde::Deserialize)]
1070 struct Cmd { path: String }
1071 let cmd: Cmd = serde_json::from_value(data)
1072 .map_err(|e| format!("readeditorfile: {e}"))?;
1073 let expanded = expand_home_dir_safe(&cmd.path);
1074 let path = expanded.as_path();
1075
1076 let metadata = std::fs::metadata(path)
1078 .map_err(|e| format!("readeditorfile: {e}"))?;
1079 if metadata.len() > 10_000_000 {
1080 return Err("File too large (>10MB)".to_string());
1081 }
1082
1083 let content = std::fs::read_to_string(path)
1084 .map_err(|e| format!("readeditorfile: {e}"))?;
1085 let read_only = metadata.permissions().readonly();
1086
1087 Ok(Some(serde_json::json!({
1088 "content": content,
1089 "read_only": read_only,
1090 })))
1091 })
1092 }),
1093 );
1094
1095 engine.register_handler(
1097 "writeeditorfile",
1098 Box::new(|data, _ctx| {
1099 Box::pin(async move {
1100 #[derive(serde::Deserialize)]
1101 struct Cmd { path: String, content: String }
1102 let cmd: Cmd = serde_json::from_value(data)
1103 .map_err(|e| format!("writeeditorfile: {e}"))?;
1104
1105 if cmd.content.len() > 10_000_000 {
1107 return Err("Content too large (>10MB)".to_string());
1108 }
1109
1110 let expanded = expand_home_dir_safe(&cmd.path);
1111 let path = expanded.as_path();
1112
1113 let home = dirs::home_dir()
1116 .ok_or("writeeditorfile: cannot determine home directory")?;
1117 let canonical_home = home.canonicalize()
1118 .map_err(|e| format!("writeeditorfile: home dir: {e}"))?;
1119
1120 let canonical = path.canonicalize().or_else(|_| {
1122 path.parent()
1123 .and_then(|p| p.canonicalize().ok())
1124 .map(|p| p.join(path.file_name().unwrap_or_default()))
1125 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "invalid path"))
1126 }).map_err(|e| format!("writeeditorfile: {e}"))?;
1127
1128 if !canonical.starts_with(&canonical_home) {
1129 return Err(format!(
1130 "writeeditorfile: path {} is outside home directory",
1131 canonical.display()
1132 ));
1133 }
1134
1135 std::fs::write(&canonical, &cmd.content)
1136 .map_err(|e| format!("writeeditorfile: {e}"))?;
1137 tracing::info!(path = %canonical.display(), bytes = cmd.content.len(), "editor file saved");
1138
1139 Ok(None)
1140 })
1141 }),
1142 );
1143
1144 super::cli_handlers::register_cli_handlers(engine, &state);
1146
1147 super::tool_handlers::register_tool_handlers(engine, &state);
1149
1150 let broker_history = state.broker.clone();
1152 engine.register_handler(
1153 COMMAND_EVENT_READ_HISTORY,
1154 Box::new(move |data, _ctx| {
1155 let broker = broker_history.clone();
1156 Box::pin(async move {
1157 let cmd: CommandEventReadHistoryData = serde_json::from_value(data)
1158 .map_err(|e| format!("eventreadhistory: {e}"))?;
1159 let max_items = if cmd.maxitems == 0 { 1024 } else { cmd.maxitems };
1160 let events = broker.read_event_history(&cmd.event, &cmd.scope, max_items);
1161 Ok(Some(serde_json::to_value(&events).unwrap_or_default()))
1162 })
1163 }),
1164 );
1165
1166 let config_watcher_setconfig = state.config_watcher.clone();
1171 let event_bus_setconfig = state.event_bus.clone();
1172 engine.register_handler(
1173 COMMAND_SET_CONFIG,
1174 Box::new(move |data, _ctx| {
1175 let cw = config_watcher_setconfig.clone();
1176 let eb = event_bus_setconfig.clone();
1177 Box::pin(async move {
1178 let new_keys: serde_json::Map<String, serde_json::Value> =
1179 serde_json::from_value(data).map_err(|e| format!("setconfig: {e}"))?;
1180
1181 crate::backend::config_watcher_fs::merge_settings_to_disk(new_keys.clone())
1183 .map_err(|e| format!("setconfig write: {e}"))?;
1184
1185 let merged_settings = crate::backend::config_watcher_fs::merge_settings_into_current(&cw, new_keys);
1187 cw.update_settings(merged_settings);
1188
1189 let config = cw.get_full_config();
1191 if let Ok(config_val) = serde_json::to_value(config.as_ref()) {
1192 let event = crate::backend::eventbus::WSEventType {
1193 eventtype: crate::backend::eventbus::WS_EVENT_RPC.to_string(),
1194 oref: String::new(),
1195 data: Some(serde_json::json!({
1196 "command": "eventrecv",
1197 "data": {
1198 "event": "config",
1199 "data": { "fullconfig": config_val }
1200 }
1201 })),
1202 };
1203 eb.broadcast_event(&event);
1204 }
1205 Ok(None)
1206 })
1207 }),
1208 );
1209
1210 super::agent_handlers::register_agent_handlers(engine, &state);
1212
1213 super::drone_handlers::register_drone_handlers(engine, &state);
1215
1216 super::identity_handlers::register_identity_handlers(engine, &state);
1219
1220 super::install_handlers::register_install_handlers(engine, &state);
1223
1224 super::app_api::register_app_api_handlers(engine, &state);
1226}
1227
1228
1229fn parse_block_input(
1231 cmd: &CommandBlockInputData,
1232) -> Result<blockcontroller::BlockInputUnion, String> {
1233 if !cmd.inputdata64.is_empty() {
1234 let data = base64::engine::general_purpose::STANDARD
1235 .decode(&cmd.inputdata64)
1236 .map_err(|e| format!("controllerinput: base64 decode: {e}"))?;
1237 return Ok(blockcontroller::BlockInputUnion::data(data));
1238 }
1239 if !cmd.signame.is_empty() {
1240 return Ok(blockcontroller::BlockInputUnion::signal(&cmd.signame));
1241 }
1242 if let Some(ref ts_val) = cmd.termsize {
1243 let ts: TermSize =
1244 serde_json::from_value(ts_val.clone()).map_err(|e| format!("controllerinput: {e}"))?;
1245 return Ok(blockcontroller::BlockInputUnion::resize(ts));
1246 }
1247 Err("controllerinput: no input data, signal, or termsize".to_string())
1248}