1use std::sync::Arc;
11
12use base64::Engine;
13use serde_json::json;
14
15use crate::backend::blockcontroller;
16use crate::backend::obj::{self, Block, Tab, Workspace, MetaMapType};
17use crate::backend::providers;
18use crate::backend::rpc::engine::WshRpcEngine;
19use crate::backend::rpc_types::*;
20use crate::backend::session_archive;
21use crate::backend::storage::wstore::WaveStore;
22
23use super::AppState;
24use crate::server::cli_handlers::resolve_cli_on_path;
25
26pub fn register_app_api_handlers(engine: &Arc<WshRpcEngine>, state: &AppState) {
28 register_agent_open(engine, state);
29 register_agent_send(engine, state);
30 register_agent_stop(engine, state);
31 register_agent_status(engine, state);
32 register_agent_list(engine, state);
33 register_agent_output(engine, state);
34 register_agent_process_list(engine, state);
35 register_agent_tracked_blocks(engine, state);
36 register_agent_kill_process(engine, state);
37 register_agent_kill_tree(engine, state);
38 register_pane_open(engine, state);
39 register_blockfile_line_count(engine, state);
40 register_blockfile_read_range(engine, state);
41 register_blockfile_read_state(engine, state);
42 register_blockfile_write_state(engine, state);
43 register_session_digest(engine, state);
44 register_session_archive_handler(engine, state);
45 register_session_restore_handler(engine, state);
46 register_session_export_handler(engine, state);
47}
48
49fn register_agent_process_list(engine: &Arc<WshRpcEngine>, state: &AppState) {
54 let process_tracker = state.process_tracker.clone();
55 engine.register_handler(
56 COMMAND_AGENT_PROCESS_LIST,
57 Box::new(move |data, _ctx| {
58 let process_tracker = process_tracker.clone();
59 Box::pin(async move {
60 let cmd: AgentProcessListCommand = serde_json::from_value(data)
61 .map_err(|e| format!("agent.process-list: {e}"))?;
62 let members = process_tracker.list_block(&cmd.block_id);
63 let confidence = match process_tracker.confidence_of(&cmd.block_id) {
64 crate::backend::process_tracker::TrackingConfidence::High => "high",
65 crate::backend::process_tracker::TrackingConfidence::BestEffort => "best_effort",
66 crate::backend::process_tracker::TrackingConfidence::None => "none",
67 };
68 let processes: Vec<AgentProcessInfo> = members
69 .into_iter()
70 .map(|m| AgentProcessInfo {
71 pid: m.pid,
72 command: m.command,
73 rss_bytes: m.rss_bytes,
74 started_at_ms: m.started_at_ms,
75 })
76 .collect();
77 Ok(Some(serde_json::to_value(&AgentProcessListResult {
78 block_id: cmd.block_id,
79 confidence: confidence.to_string(),
80 processes,
81 }).unwrap()))
82 })
83 }),
84 );
85}
86
87fn register_agent_tracked_blocks(engine: &Arc<WshRpcEngine>, state: &AppState) {
88 let process_tracker = state.process_tracker.clone();
89 engine.register_handler(
90 COMMAND_AGENT_TRACKED_BLOCKS,
91 Box::new(move |_data, _ctx| {
92 let process_tracker = process_tracker.clone();
93 Box::pin(async move {
94 Ok(Some(serde_json::to_value(&AgentTrackedBlocksResult {
95 block_ids: process_tracker.list_all_blocks(),
96 }).unwrap()))
97 })
98 }),
99 );
100}
101
102fn register_agent_kill_process(engine: &Arc<WshRpcEngine>, state: &AppState) {
103 let process_tracker = state.process_tracker.clone();
104 engine.register_handler(
105 COMMAND_AGENT_KILL_PROCESS,
106 Box::new(move |data, _ctx| {
107 let process_tracker = process_tracker.clone();
108 Box::pin(async move {
109 let cmd: AgentKillProcessCommand = serde_json::from_value(data)
110 .map_err(|e| format!("agent.kill-process: {e}"))?;
111 tracing::info!(
112 block_id = %cmd.block_id,
113 pid = cmd.pid,
114 "agent.kill-process"
115 );
116 let ok = process_tracker.kill_pid(&cmd.block_id, cmd.pid);
117 Ok(Some(serde_json::to_value(&AgentKillResult { ok }).unwrap()))
118 })
119 }),
120 );
121}
122
123fn register_agent_kill_tree(engine: &Arc<WshRpcEngine>, state: &AppState) {
124 let process_tracker = state.process_tracker.clone();
125 engine.register_handler(
126 COMMAND_AGENT_KILL_TREE,
127 Box::new(move |data, _ctx| {
128 let process_tracker = process_tracker.clone();
129 Box::pin(async move {
130 let cmd: AgentKillTreeCommand = serde_json::from_value(data)
131 .map_err(|e| format!("agent.kill-tree: {e}"))?;
132 tracing::info!(block_id = %cmd.block_id, "agent.kill-tree");
133 let ok = process_tracker.kill_tree(&cmd.block_id);
134 Ok(Some(serde_json::to_value(&AgentKillResult { ok }).unwrap()))
135 })
136 }),
137 );
138}
139
140fn register_agent_open(engine: &Arc<WshRpcEngine>, state: &AppState) {
145 let wstore = state.wstore.clone();
146 let broker = state.broker.clone();
147 let event_bus = state.event_bus.clone();
148 let filestore = state.filestore.clone();
149
150 engine.register_handler(
151 COMMAND_AGENT_OPEN,
152 Box::new(move |data, _ctx| {
153 let wstore = wstore.clone();
154 let broker = broker.clone();
155 let event_bus = event_bus.clone();
156 let filestore = filestore.clone();
157 Box::pin(async move {
158 let cmd: CommandAgentOpenData = serde_json::from_value(data)
159 .map_err(|e| format!("agent.open: {e}"))?;
160
161 tracing::info!(agent_id = %cmd.agent_id, "agent.open");
162
163 let agents = wstore.agent_def_list()
165 .map_err(|e| format!("agent.open: {e}"))?;
166 let agent = agents.iter()
167 .find(|a| a.id == cmd.agent_id || a.name.eq_ignore_ascii_case(&cmd.agent_id))
168 .ok_or_else(|| format!("AGENT_NOT_FOUND: no agent definition with id '{}'", cmd.agent_id))?
169 .clone();
170
171 let provider = providers::get_provider(&agent.provider)
173 .ok_or_else(|| format!("INVALID_PROVIDER: unknown provider '{}'", agent.provider))?;
174
175 let tab_id = resolve_tab_id(&wstore, cmd.tab_id.as_deref())?;
177
178 if let Some(existing) = find_agent_block(&wstore, &tab_id, &agent.id)? {
181 if blockcontroller::get_controller(&existing.oid).is_none() {
184 let controller_type = provider.controller_type_str();
185 let mut meta_update = obj::MetaMapType::new();
187 meta_update.insert("controller".to_string(), json!(controller_type));
188 meta_update.insert("agentProvider".to_string(), json!(&agent.provider));
189 let _ = crate::server::service::update_object_meta(
190 &wstore, &format!("block:{}", existing.oid), &meta_update,
191 );
192 let block_for_resync = wstore.must_get::<Block>(&existing.oid)
194 .map_err(|e| format!("agent.open: reload block: {e}"))?;
195 let _ = blockcontroller::resync_controller(
196 &block_for_resync, &tab_id, None, true,
197 Some(broker.clone()), Some(event_bus.clone()), Some(wstore.clone()),
198 Some(filestore.clone()),
199 );
200 }
201 let status = blockcontroller::get_block_controller_status(&existing.oid)
202 .map(|s| s.shellprocstatus)
203 .unwrap_or_else(|| "init".to_string());
204 return Ok(Some(serde_json::to_value(&AgentOpenResult {
205 block_id: existing.oid,
206 tab_id,
207 agent_id: cmd.agent_id,
208 provider: agent.provider,
209 controller_type: provider.controller_type_str().to_string(),
210 status,
211 created: false,
212 }).unwrap()));
213 }
214
215 let version = env!("CARGO_PKG_VERSION");
217 let home = std::env::var("HOME")
218 .or_else(|_| std::env::var("USERPROFILE"))
219 .map_err(|_| "cannot determine home directory".to_string())?;
220 let provider_dir = format!("{}/.agentmux/{}/cli/{}", home, version, provider.id);
221 let npm_bin = if cfg!(windows) {
222 format!("{}/node_modules/.bin/{}.cmd", provider_dir, provider.cli_command)
223 } else {
224 format!("{}/node_modules/.bin/{}", provider_dir, provider.cli_command)
225 };
226 let mut resolved_cli_path = npm_bin.clone();
227 if !std::path::Path::new(&resolved_cli_path).exists() {
228 if provider.npm_package.is_empty() {
232 if let Some(path) = resolve_cli_on_path(provider.cli_command).await {
233 resolved_cli_path = path;
234 }
235 }
236 if !std::path::Path::new(&resolved_cli_path).exists() {
237 return Err(format!(
238 "CLI_NOT_AVAILABLE: {} not installed at {}. Open an agent pane in the UI to trigger installation.",
239 provider.cli_command, npm_bin
240 ));
241 }
242 }
243
244 let controller_type = provider.controller_type_str();
246 let is_persistent = controller_type == "persistent";
247 let cli_args: Vec<String> = if is_persistent {
248 provider.persistent_launch_args
249 .unwrap_or(provider.launch_args)
250 .iter().map(|s| s.to_string()).collect()
251 } else {
252 provider.launch_args.iter().map(|s| s.to_string()).collect()
253 };
254
255 let agent_slug = agent.name.to_lowercase()
256 .chars().map(|c| if c.is_alphanumeric() || c == '-' || c == '_' { c } else { '-' })
257 .collect::<String>();
258 let work_dir = if agent.working_directory.is_empty() {
259 format!("~/.agentmux/agents/{}", agent_slug)
260 } else {
261 agent.working_directory.clone()
262 };
263
264 let mut env_vars = serde_json::Map::new();
266 for key in provider.unset_env {
267 env_vars.insert(key.to_string(), json!(""));
268 }
269 let config_home = std::env::var("AGENTMUX_CONFIG_HOME")
272 .unwrap_or_else(|_| format!("{}/.agentmux/config", home));
273 let auth_dir = format!("{}/auth/{}", config_home, provider.auth_dir_name);
275 let _ = std::fs::create_dir_all(&auth_dir);
276 env_vars.insert(provider.auth_config_dir_env_var.to_string(), json!(auth_dir));
277 for (k, v) in provider.auth_extra_env {
278 env_vars.insert(k.to_string(), json!(v));
279 }
280 env_vars.insert("GH_CONFIG_DIR".to_string(), json!(format!("{}/gh-{}", config_home, agent_slug)));
282 env_vars.insert("AGENTMUX_AGENT_ID".to_string(), json!(&agent.name));
283 if !is_persistent {
285 env_vars.insert("CLAUDE_CODE_EXIT_AFTER_STOP_DELAY".to_string(), json!("30000"));
286 }
287
288 let mut meta = MetaMapType::new();
289 meta.insert("view".to_string(), json!("agent"));
290 meta.insert("agentId".to_string(), json!(&agent.id));
291 meta.insert("agentProvider".to_string(), json!(&agent.provider));
292 meta.insert("agentName".to_string(), json!(&agent.name));
293 meta.insert("agentIcon".to_string(), json!(if agent.icon.is_empty() { "sparkles" } else { &agent.icon }));
294 meta.insert("agentMode".to_string(), json!(if agent.agent_type.is_empty() { "host" } else { &agent.agent_type }));
295 let output_format = match provider.id {
297 "claude" => "claude-stream-json",
298 "codex" => "codex-json",
299 "gemini" => "gemini-json",
300 "kimi" => "kimi-stream-json",
301 _ => "claude-stream-json",
302 };
303 meta.insert("agentOutputFormat".to_string(), json!(output_format));
304 meta.insert("controller".to_string(), json!(controller_type));
305 meta.insert("cmd".to_string(), json!(&resolved_cli_path));
306 meta.insert("cmd:args".to_string(), json!(cli_args));
307 meta.insert("cmd:cwd".to_string(), json!(&work_dir));
308 meta.insert("cmd:env".to_string(), serde_json::Value::Object(env_vars));
309 meta.insert("agent:resume_flag".to_string(), json!(provider.resume_flag.unwrap_or("")));
310 meta.insert("agent:session_id_field".to_string(), json!(provider.session_id_field));
311
312 let block = crate::backend::wcore::create_block(&wstore, &tab_id, meta)
314 .map_err(|e| format!("agent.open: create_block: {e}"))?;
315 let block_id = block.oid.clone();
316
317 {
322 let tab: Tab = wstore.must_get(&tab_id)
323 .map_err(|e| format!("agent.open: reload tab: {e}"))?;
324 if let Ok(mut layout) = wstore.must_get::<obj::LayoutState>(&tab.layoutstate) {
325 let mut actions = layout.pendingbackendactions.take().unwrap_or_default();
326 actions.push(obj::LayoutActionData {
327 actiontype: "insert".to_string(),
328 actionid: uuid::Uuid::new_v4().to_string(),
329 blockid: block_id.clone(),
330 nodesize: None,
331 indexarr: None,
332 focused: true,
333 magnified: false,
334 ephemeral: false,
335 targetblockid: String::new(),
336 position: String::new(),
337 });
338 layout.pendingbackendactions = Some(actions);
339 let _ = wstore.update(&mut layout);
340 }
341 }
342
343 tracing::info!(
344 block_id = %block_id,
345 agent_id = %cmd.agent_id,
346 provider = %agent.provider,
347 controller_type = %controller_type,
348 "agent.open: block created + layout updated"
349 );
350
351 write_agent_config_files(&wstore, &agent, &agent_slug, &work_dir)?;
357
358 let block_for_resync = wstore.must_get::<Block>(&block_id)
360 .map_err(|e| format!("agent.open: reload block: {e}"))?;
361 blockcontroller::resync_controller(
362 &block_for_resync,
363 &tab_id,
364 None,
365 true,
366 Some(broker.clone()),
367 Some(event_bus.clone()),
368 Some(wstore.clone()),
369 Some(filestore.clone()),
370 )?;
371
372 {
374 let mut updates = Vec::new();
375 if let Ok(updated_block) = wstore.must_get::<Block>(&block_id) {
376 updates.push(obj::WaveObjUpdate {
377 updatetype: "update".into(),
378 otype: "block".into(),
379 oid: block_id.clone(),
380 obj: Some(obj::wave_obj_to_value(&updated_block)),
381 });
382 }
383 if let Ok(updated_tab) = wstore.must_get::<Tab>(&tab_id) {
384 updates.push(obj::WaveObjUpdate {
385 updatetype: "update".into(),
386 otype: "tab".into(),
387 oid: tab_id.clone(),
388 obj: Some(obj::wave_obj_to_value(&updated_tab)),
389 });
390 if let Ok(updated_layout) = wstore.must_get::<obj::LayoutState>(&updated_tab.layoutstate) {
391 updates.push(obj::WaveObjUpdate {
392 updatetype: "update".into(),
393 otype: "layout".into(),
394 oid: updated_tab.layoutstate.clone(),
395 obj: Some(obj::wave_obj_to_value(&updated_layout)),
396 });
397 }
398 }
399 for update in &updates {
400 let oref = format!("{}:{}", update.otype, update.oid);
401 if let Ok(data) = serde_json::to_value(update) {
402 event_bus.broadcast_event(
403 &crate::backend::eventbus::WSEventType {
404 eventtype: "waveobj:update".to_string(),
405 oref,
406 data: Some(data),
407 },
408 );
409 }
410 }
411 }
412
413 Ok(Some(serde_json::to_value(&AgentOpenResult {
414 block_id,
415 tab_id,
416 agent_id: cmd.agent_id,
417 provider: agent.provider,
418 controller_type: controller_type.to_string(),
419 status: "init".to_string(),
420 created: true,
421 }).unwrap()))
422 })
423 }),
424 );
425}
426
427fn register_agent_send(engine: &Arc<WshRpcEngine>, state: &AppState) {
432 let wstore = state.wstore.clone();
433 let broker = state.broker.clone();
434
435 engine.register_handler(
436 COMMAND_AGENT_SEND,
437 Box::new(move |data, _ctx| {
438 let wstore = wstore.clone();
439 let broker = broker.clone();
440 Box::pin(async move {
441 let cmd: CommandAgentSendData = serde_json::from_value(data)
442 .map_err(|e| format!("agent.send: {e}"))?;
443
444 tracing::info!(block_id = %cmd.block_id, "agent.send");
445
446 let ctrl = blockcontroller::get_controller(&cmd.block_id)
447 .ok_or_else(|| format!("NOT_RUNNING: no controller for block {}", cmd.block_id))?;
448
449 let block: Block = wstore
451 .get(&cmd.block_id)
452 .map_err(|e| format!("agent.send: {e}"))?
453 .ok_or_else(|| format!("BLOCK_NOT_FOUND: {}", cmd.block_id))?;
454
455 let cli_command = obj::meta_get_string(&block.meta, "cmd", "claude");
456 let cli_args: Vec<String> = match block.meta.get("cmd:args") {
457 Some(serde_json::Value::Array(arr)) => arr
458 .iter()
459 .filter_map(|v| v.as_str().map(|s| s.to_string()))
460 .collect(),
461 _ => vec![],
462 };
463 let working_dir = obj::meta_get_string(&block.meta, "cmd:cwd", "");
464 let mut env_vars: std::collections::HashMap<String, String> = match block.meta.get("cmd:env") {
465 Some(serde_json::Value::Object(obj)) => obj
466 .iter()
467 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
468 .collect(),
469 _ => std::collections::HashMap::new(),
470 };
471 crate::identity::resolver::inject_identity_env_with_broker(
478 wstore.clone(),
479 Some(broker.clone()),
480 &cmd.block_id,
481 &mut env_vars,
482 );
483 let session_id_field = obj::meta_get_string(
484 &block.meta, "agent:session_id_field", "session_id",
485 );
486
487 let mut session_id = None;
489 if let Some(persistent_ctrl) = ctrl
490 .as_any()
491 .downcast_ref::<blockcontroller::persistent::PersistentSubprocessController>()
492 {
493 let config = blockcontroller::persistent::PersistentSpawnConfig {
494 cli_command,
495 cli_args,
496 working_dir,
497 env_vars,
498 session_id_field,
499 };
500 persistent_ctrl.send_message(cmd.message, config)?;
501 session_id = persistent_ctrl.session_id();
502 } else if let Some(subprocess_ctrl) = ctrl
503 .as_any()
504 .downcast_ref::<blockcontroller::subprocess::SubprocessController>()
505 {
506 let resume_flag = obj::meta_get_string(
507 &block.meta, "agent:resume_flag", "--resume",
508 );
509 let persisted_session_id = obj::meta_get_string(
514 &block.meta, "agent:sessionid", "",
515 );
516 let config = blockcontroller::subprocess::SubprocessSpawnConfig {
517 cli_command,
518 cli_args,
519 working_dir,
520 env_vars,
521 message: cmd.message,
522 resume_flag,
523 session_id_field,
524 message_id: None,
525 session_id: if persisted_session_id.is_empty() {
526 None
527 } else {
528 Some(persisted_session_id)
529 },
530 };
531 subprocess_ctrl.spawn_turn(config)?;
532 } else {
533 return Err("NOT_RUNNING: controller type not supported".to_string());
534 }
535
536 Ok(Some(serde_json::to_value(&AgentSendResult {
537 block_id: cmd.block_id,
538 status: "running".to_string(),
539 session_id,
540 }).unwrap()))
541 })
542 }),
543 );
544}
545
546fn register_agent_stop(engine: &Arc<WshRpcEngine>, _state: &AppState) {
551 engine.register_handler(
552 COMMAND_AGENT_STOP_API,
553 Box::new(|data, _ctx| {
554 Box::pin(async move {
555 let cmd: CommandAgentStopApiData = serde_json::from_value(data)
556 .map_err(|e| format!("agent.stop: {e}"))?;
557
558 tracing::info!(block_id = %cmd.block_id, signal = ?cmd.signal, "agent.stop");
559
560 let ctrl = blockcontroller::get_controller(&cmd.block_id)
561 .ok_or_else(|| format!("NOT_RUNNING: no controller for block {}", cmd.block_id))?;
562
563 let force = matches!(cmd.signal.as_deref(), Some("SIGKILL") | Some("SIGTERM"));
564 ctrl.stop(!force, blockcontroller::STATUS_DONE)?;
565
566 let exit_code = blockcontroller::get_block_controller_status(&cmd.block_id)
567 .map(|s| s.shellprocexitcode);
568
569 Ok(Some(serde_json::to_value(&AgentStopResult {
570 block_id: cmd.block_id,
571 status: "done".to_string(),
572 exit_code,
573 }).unwrap()))
574 })
575 }),
576 );
577}
578
579fn register_agent_status(engine: &Arc<WshRpcEngine>, state: &AppState) {
584 let wstore = state.wstore.clone();
585
586 engine.register_handler(
587 COMMAND_AGENT_STATUS,
588 Box::new(move |data, _ctx| {
589 let wstore = wstore.clone();
590 Box::pin(async move {
591 let cmd: CommandAgentStatusData = serde_json::from_value(data)
592 .map_err(|e| format!("agent.status: {e}"))?;
593
594 let block: Block = wstore
595 .get(&cmd.block_id)
596 .map_err(|e| format!("agent.status: {e}"))?
597 .ok_or_else(|| format!("BLOCK_NOT_FOUND: {}", cmd.block_id))?;
598
599 let agent_id = obj::meta_get_string(&block.meta, "agentId", "");
600 let provider = obj::meta_get_string(&block.meta, "agentProvider", "");
601 let controller_type = obj::meta_get_string(&block.meta, "controller", "");
602
603 let runtime = blockcontroller::get_block_controller_status(&cmd.block_id);
604 let status = runtime.as_ref()
605 .map(|s| s.shellprocstatus.clone())
606 .unwrap_or_else(|| "init".to_string());
607 let exit_code = runtime.as_ref()
608 .and_then(|s| if s.shellprocstatus == "done" { Some(s.shellprocexitcode) } else { None });
609 let pid = None; let session_id = block.meta.get("agent:sessionid")
613 .and_then(|v| v.as_str())
614 .map(|s| s.to_string());
615
616 Ok(Some(serde_json::to_value(&AgentStatusResult {
617 block_id: cmd.block_id,
618 agent_id,
619 provider,
620 controller_type,
621 status,
622 session_id,
623 pid,
624 exit_code,
625 }).unwrap()))
626 })
627 }),
628 );
629}
630
631fn register_agent_list(engine: &Arc<WshRpcEngine>, state: &AppState) {
636 let wstore = state.wstore.clone();
637
638 engine.register_handler(
639 COMMAND_AGENT_LIST,
640 Box::new(move |_data, _ctx| {
641 let wstore = wstore.clone();
642 Box::pin(async move {
643 let tabs: Vec<Tab> = wstore.get_all::<Tab>()
644 .map_err(|e| format!("agent.list: {e}"))?;
645
646 let mut agents = Vec::new();
647 for tab in &tabs {
648 for block_id in &tab.blockids {
649 if let Ok(Some(block)) = wstore.get::<Block>(block_id) {
650 let agent_id = obj::meta_get_string(&block.meta, "agentId", "");
651 if agent_id.is_empty() {
652 continue;
653 }
654 let provider = obj::meta_get_string(&block.meta, "agentProvider", "");
655 let status = blockcontroller::get_block_controller_status(block_id)
656 .map(|s| s.shellprocstatus)
657 .unwrap_or_else(|| "init".to_string());
658 let session_id = block.meta.get("agent:sessionid")
659 .and_then(|v| v.as_str())
660 .map(|s| s.to_string());
661
662 agents.push(AgentListEntry {
663 block_id: block_id.clone(),
664 tab_id: tab.oid.clone(),
665 agent_id,
666 provider,
667 status,
668 session_id,
669 });
670 }
671 }
672 }
673
674 Ok(Some(serde_json::to_value(&AgentListResult { agents }).unwrap()))
675 })
676 }),
677 );
678}
679
680fn register_agent_output(engine: &Arc<WshRpcEngine>, state: &AppState) {
685 let broker = state.broker.clone();
686
687 engine.register_handler(
688 COMMAND_AGENT_OUTPUT,
689 Box::new(move |data, _ctx| {
690 let broker = broker.clone();
691 Box::pin(async move {
692 let cmd: CommandAgentOutputData = serde_json::from_value(data)
693 .map_err(|e| format!("agent.output: {e}"))?;
694
695 let scope = format!("block:{}", cmd.block_id);
696 let max = cmd.max_lines.unwrap_or(1000);
697 let after = cmd.after_line.unwrap_or(0);
698
699 let mut all_lines: Vec<String> = Vec::new();
701 {
702 let events = broker.read_event_history(
703 crate::backend::wps::EVENT_BLOCK_FILE,
704 &scope,
705 max + after, );
707 for event in events {
708 if let Some(ref data) = event.data {
709 if let Some(data64) = data.get("data64").and_then(|v| v.as_str()) {
710 if let Ok(bytes) = base64::engine::general_purpose::STANDARD
711 .decode(data64)
712 {
713 let text = String::from_utf8_lossy(&bytes);
714 for line in text.lines() {
715 if !line.trim().is_empty() {
716 all_lines.push(line.to_string());
717 }
718 }
719 }
720 }
721 }
722 }
723 }
724
725 let total = all_lines.len();
726 let lines: Vec<String> = all_lines.into_iter()
727 .skip(after)
728 .take(max)
729 .collect();
730 let has_more = after + lines.len() < total;
731
732 Ok(Some(serde_json::to_value(&AgentOutputResult {
733 block_id: cmd.block_id,
734 lines,
735 total_lines: total,
736 has_more,
737 }).unwrap()))
738 })
739 }),
740 );
741}
742
743fn register_pane_open(engine: &Arc<WshRpcEngine>, state: &AppState) {
748 let wstore = state.wstore.clone();
749 let event_bus = state.event_bus.clone();
750
751 engine.register_handler(
752 COMMAND_PANE_OPEN,
753 Box::new(move |data, _ctx| {
754 let wstore = wstore.clone();
755 let event_bus = event_bus.clone();
756 Box::pin(async move {
757 let cmd: CommandPaneOpenData = serde_json::from_value(data)
758 .map_err(|e| format!("pane.open: {e}"))?;
759
760 tracing::info!(view = %cmd.view, "pane.open");
761
762 let meta = build_pane_meta(&cmd)?;
764
765 let tab_id = resolve_tab_id(&wstore, cmd.tab_id.as_deref())?;
767
768 let block = crate::backend::wcore::create_block(&wstore, &tab_id, meta)
770 .map_err(|e| format!("pane.open: create_block: {e}"))?;
771 let block_id = block.oid.clone();
772
773 let (actiontype, targetblockid, position) = resolve_placement(
775 cmd.split_direction.as_deref(),
776 cmd.split_reference_block_id.as_deref(),
777 );
778 let focused = cmd.focus.unwrap_or(true);
779
780 {
781 let tab: Tab = wstore.must_get(&tab_id)
782 .map_err(|e| format!("pane.open: reload tab: {e}"))?;
783 if let Ok(mut layout) = wstore.must_get::<obj::LayoutState>(&tab.layoutstate) {
784 let mut actions = layout.pendingbackendactions.take().unwrap_or_default();
785 actions.push(obj::LayoutActionData {
786 actiontype,
787 actionid: uuid::Uuid::new_v4().to_string(),
788 blockid: block_id.clone(),
789 nodesize: None,
790 indexarr: None,
791 focused,
792 magnified: false,
793 ephemeral: false,
794 targetblockid,
795 position,
796 });
797 layout.pendingbackendactions = Some(actions);
798 let _ = wstore.update(&mut layout);
799 }
800 }
801
802 tracing::info!(
803 block_id = %block_id,
804 view = %cmd.view,
805 "pane.open: block created + layout updated"
806 );
807
808 {
810 let mut updates = Vec::new();
811 if let Ok(updated_block) = wstore.must_get::<Block>(&block_id) {
812 updates.push(obj::WaveObjUpdate {
813 updatetype: "update".into(),
814 otype: "block".into(),
815 oid: block_id.clone(),
816 obj: Some(obj::wave_obj_to_value(&updated_block)),
817 });
818 }
819 if let Ok(updated_tab) = wstore.must_get::<Tab>(&tab_id) {
820 updates.push(obj::WaveObjUpdate {
821 updatetype: "update".into(),
822 otype: "tab".into(),
823 oid: tab_id.clone(),
824 obj: Some(obj::wave_obj_to_value(&updated_tab)),
825 });
826 if let Ok(updated_layout) = wstore.must_get::<obj::LayoutState>(&updated_tab.layoutstate) {
827 updates.push(obj::WaveObjUpdate {
828 updatetype: "update".into(),
829 otype: "layout".into(),
830 oid: updated_tab.layoutstate.clone(),
831 obj: Some(obj::wave_obj_to_value(&updated_layout)),
832 });
833 }
834 }
835 for update in &updates {
836 let oref = format!("{}:{}", update.otype, update.oid);
837 if let Ok(data) = serde_json::to_value(update) {
838 event_bus.broadcast_event(
839 &crate::backend::eventbus::WSEventType {
840 eventtype: "waveobj:update".to_string(),
841 oref,
842 data: Some(data),
843 },
844 );
845 }
846 }
847 }
848
849 Ok(Some(serde_json::to_value(&PaneOpenResult {
850 block_id,
851 tab_id,
852 view: cmd.view,
853 created: true,
854 }).unwrap()))
855 })
856 }),
857 );
858}
859
860fn build_pane_meta(cmd: &CommandPaneOpenData) -> Result<MetaMapType, String> {
862 let mut meta = MetaMapType::new();
863
864 match cmd.view.as_str() {
865 "editor" => {
866 let file = cmd.file.as_deref().filter(|s| !s.is_empty())
867 .ok_or_else(|| "MISSING_ARG: view=editor requires 'file'".to_string())?;
868 meta.insert("view".to_string(), json!("editor"));
869 meta.insert("file".to_string(), json!(file));
870 }
871 "term" => {
872 meta.insert("view".to_string(), json!("term"));
873 meta.insert("controller".to_string(), json!("shell"));
874 if let Some(cwd) = cmd.cwd.as_deref().filter(|s| !s.is_empty()) {
875 meta.insert("cmd:cwd".to_string(), json!(cwd));
876 }
877 }
878 "browser" => {
879 let url = cmd.url.as_deref().filter(|s| !s.is_empty())
880 .ok_or_else(|| "MISSING_ARG: view=browser requires 'url'".to_string())?;
881 meta.insert("view".to_string(), json!("browser"));
882 meta.insert("url".to_string(), json!(url));
883 }
884 "sysinfo" => {
885 meta.insert("view".to_string(), json!("sysinfo"));
886 }
887 "help" => {
888 meta.insert("view".to_string(), json!("help"));
889 }
890 other => {
891 return Err(format!(
892 "INVALID_VIEW: unsupported view '{other}' (expected editor/term/browser/sysinfo/help)"
893 ));
894 }
895 }
896
897 if let Some(title) = cmd.title.as_deref().filter(|s| !s.is_empty()) {
898 meta.insert("frame:title".to_string(), json!(title));
899 }
900
901 Ok(meta)
902}
903
904fn resolve_placement(
908 direction: Option<&str>,
909 reference: Option<&str>,
910) -> (String, String, String) {
911 let reference = match reference.filter(|s| !s.is_empty()) {
912 Some(r) => r,
913 None => return ("insert".to_string(), String::new(), String::new()),
914 };
915
916 let (actiontype, position) = match direction {
917 Some("right") => (crate::backend::wcore::LAYOUT_ACTION_SPLIT_HORIZONTAL, "after"),
918 Some("left") => (crate::backend::wcore::LAYOUT_ACTION_SPLIT_HORIZONTAL, "before"),
919 Some("down") | Some("below") => (crate::backend::wcore::LAYOUT_ACTION_SPLIT_VERTICAL, "after"),
920 Some("up") | Some("above") => (crate::backend::wcore::LAYOUT_ACTION_SPLIT_VERTICAL, "before"),
921 _ => return ("insert".to_string(), String::new(), String::new()),
922 };
923
924 (actiontype.to_string(), reference.to_string(), position.to_string())
925}
926
927fn register_blockfile_line_count(engine: &Arc<WshRpcEngine>, state: &AppState) {
932 let broker = state.broker.clone();
933 let wstore = state.wstore.clone();
934
935 engine.register_handler(
936 COMMAND_BLOCKFILE_LINE_COUNT,
937 Box::new(move |data, _ctx| {
938 let broker = broker.clone();
939 let wstore = wstore.clone();
940 Box::pin(async move {
941 let cmd: CommandBlockfileLineCountData = serde_json::from_value(data)
942 .map_err(|e| format!("blockfile:line_count: {e}"))?;
943
944 tracing::info!(block_id = %cmd.block_id, filename = %cmd.filename, "blockfile:line_count");
945
946 if cmd.filename == "output" {
955 if let Ok(Some(block)) = wstore.get::<Block>(&cmd.block_id) {
956 if let Some(count) = block.meta.get("session:line_count").and_then(|v| v.as_u64()) {
957 return Ok(Some(serde_json::to_value(
958 &BlockfileLineCountResult { count },
959 ).unwrap()));
960 }
961 }
962 }
963
964 let scope = format!("block:{}", cmd.block_id);
966 let events = broker.read_event_history(
967 crate::backend::wps::EVENT_BLOCK_FILE,
968 &scope,
969 usize::MAX, );
971
972 let mut count: u64 = 0;
973 for event in events {
974 if let Some(ref event_data) = event.data {
975 let ev_filename = event_data.get("filename")
976 .and_then(|v| v.as_str()).unwrap_or("");
977 if ev_filename != cmd.filename {
978 continue;
979 }
980 if let Some(data64) = event_data.get("data64").and_then(|v| v.as_str()) {
981 if let Ok(bytes) = base64::engine::general_purpose::STANDARD.decode(data64) {
982 let text = String::from_utf8_lossy(&bytes);
983 for line in text.lines() {
984 if !line.trim().is_empty() {
985 count += 1;
986 }
987 }
988 }
989 }
990 }
991 }
992
993 Ok(Some(serde_json::to_value(&BlockfileLineCountResult { count }).unwrap()))
994 })
995 }),
996 );
997}
998
999fn register_blockfile_read_range(engine: &Arc<WshRpcEngine>, state: &AppState) {
1004 let broker = state.broker.clone();
1005 let filestore = state.filestore.clone();
1006
1007 engine.register_handler(
1008 COMMAND_BLOCKFILE_READ_RANGE,
1009 Box::new(move |data, _ctx| {
1010 let broker = broker.clone();
1011 let filestore = filestore.clone();
1012 Box::pin(async move {
1013 let cmd: CommandBlockfileReadRangeData = serde_json::from_value(data)
1014 .map_err(|e| format!("blockfile:read_range: {e}"))?;
1015
1016 tracing::info!(block_id = %cmd.block_id, filename = %cmd.filename, offset = cmd.offset, limit = cmd.limit, "blockfile:read_range");
1017
1018 let limit = cmd.limit.min(10_000) as usize;
1019 let offset = cmd.offset as usize;
1020 let end = offset.saturating_add(limit);
1021
1022 let filestore_lines = match filestore.stat(&cmd.block_id, &cmd.filename) {
1028 Ok(Some(ref wf)) if wf.size > 0 => {
1029 match filestore.read_file(&cmd.block_id, &cmd.filename) {
1030 Ok(Some(bytes)) => {
1031 let text = String::from_utf8_lossy(&bytes);
1032 let lines: Vec<String> = text.lines()
1033 .filter(|l| !l.trim().is_empty())
1034 .map(|l| l.to_string())
1035 .collect();
1036 Some(lines)
1037 }
1038 Ok(None) => None,
1039 Err(e) => {
1040 tracing::warn!(
1041 block_id = %cmd.block_id,
1042 filename = %cmd.filename,
1043 error = %e,
1044 "blockfile:read_range: filestore read failed, falling back to ring buffer"
1045 );
1046 None
1047 }
1048 }
1049 }
1050 Ok(_) => None, Err(e) => {
1052 tracing::warn!(
1053 block_id = %cmd.block_id,
1054 error = %e,
1055 "blockfile:read_range: filestore stat failed, falling back to ring buffer"
1056 );
1057 None
1058 }
1059 };
1060
1061 let all_lines = if let Some(lines) = filestore_lines {
1062 lines
1063 } else {
1064 let scope = format!("block:{}", cmd.block_id);
1068 let events = broker.read_event_history(
1069 crate::backend::wps::EVENT_BLOCK_FILE,
1070 &scope,
1071 usize::MAX, );
1073
1074 let mut lines: Vec<String> = Vec::new();
1075 for event in events {
1076 let Some(ref event_data) = event.data else { continue };
1077 let ev_filename = event_data.get("filename")
1078 .and_then(|v| v.as_str()).unwrap_or("");
1079 if ev_filename != cmd.filename {
1080 continue;
1081 }
1082 let Some(data64) = event_data.get("data64").and_then(|v| v.as_str()) else { continue };
1083 let Ok(bytes) = base64::engine::general_purpose::STANDARD.decode(data64) else { continue };
1084 let text = String::from_utf8_lossy(&bytes);
1085 for line in text.lines() {
1086 if !line.trim().is_empty() {
1087 lines.push(line.to_string());
1088 }
1089 }
1090 }
1091 lines
1092 };
1093
1094 let total = all_lines.len() as u64;
1095 let clamped_offset = offset.min(all_lines.len());
1096 let clamped_end = end.min(all_lines.len());
1097 let lines: Vec<String> = if clamped_offset >= clamped_end {
1098 Vec::new()
1099 } else {
1100 all_lines[clamped_offset..clamped_end].to_vec()
1101 };
1102
1103 Ok(Some(serde_json::to_value(&BlockfileReadRangeResult {
1104 lines,
1105 total,
1106 }).unwrap()))
1107 })
1108 }),
1109 );
1110}
1111
1112fn register_blockfile_read_state(engine: &Arc<WshRpcEngine>, state: &AppState) {
1118 let filestore = state.filestore.clone();
1119 engine.register_handler(
1120 COMMAND_BLOCKFILE_READ_STATE,
1121 Box::new(move |data, _ctx| {
1122 let filestore = filestore.clone();
1123 Box::pin(async move {
1124 let cmd: CommandBlockfileReadStateData = serde_json::from_value(data)
1125 .map_err(|e| format!("blockfile:read_state: {e}"))?;
1126 if cmd.filename.contains('/') || cmd.filename.contains('\\') || cmd.filename.contains("..") {
1127 return Err("blockfile:read_state: filename must not contain path separators".to_string());
1128 }
1129 tracing::debug!(block_id = %cmd.block_id, filename = %cmd.filename, "blockfile:read_state");
1130
1131 let content = match filestore.read_file(&cmd.block_id, &cmd.filename) {
1132 Ok(Some(bytes)) => Some(String::from_utf8_lossy(&bytes).into_owned()),
1133 Ok(None) => None,
1134 Err(e) => {
1135 if matches!(e, crate::backend::storage::StoreError::NotFound) {
1137 None
1138 } else {
1139 tracing::warn!(block_id = %cmd.block_id, error = %e, "blockfile:read_state: read failed");
1140 None
1141 }
1142 }
1143 };
1144
1145 Ok(Some(serde_json::to_value(&BlockfileReadStateResult { content }).unwrap()))
1146 })
1147 }),
1148 );
1149}
1150
1151fn register_blockfile_write_state(engine: &Arc<WshRpcEngine>, state: &AppState) {
1157 let filestore = state.filestore.clone();
1158 engine.register_handler(
1159 COMMAND_BLOCKFILE_WRITE_STATE,
1160 Box::new(move |data, _ctx| {
1161 let filestore = filestore.clone();
1162 Box::pin(async move {
1163 let cmd: CommandBlockfileWriteStateData = serde_json::from_value(data)
1164 .map_err(|e| format!("blockfile:write_state: {e}"))?;
1165 if cmd.filename.contains('/') || cmd.filename.contains('\\') || cmd.filename.contains("..") {
1166 return Err("blockfile:write_state: filename must not contain path separators".to_string());
1167 }
1168 let bytes = cmd.content.as_bytes();
1169 let bytes_written = bytes.len() as u64;
1170 tracing::debug!(block_id = %cmd.block_id, filename = %cmd.filename, bytes = bytes_written, "blockfile:write_state");
1171
1172 use crate::backend::storage::filestore::{FileMeta, FileOpts};
1176 use crate::backend::storage::StoreError;
1177 match filestore.write_file(&cmd.block_id, &cmd.filename, bytes) {
1178 Ok(()) => {}
1179 Err(StoreError::NotFound) => {
1180 filestore
1181 .make_file(&cmd.block_id, &cmd.filename, FileMeta::default(), FileOpts::default())
1182 .map_err(|e| format!("blockfile:write_state: make_file: {e}"))?;
1183 filestore
1184 .write_file(&cmd.block_id, &cmd.filename, bytes)
1185 .map_err(|e| format!("blockfile:write_state: write_file: {e}"))?;
1186 }
1187 Err(e) => return Err(format!("blockfile:write_state: {e}")),
1188 }
1189
1190 Ok(Some(serde_json::to_value(&BlockfileWriteStateResult { bytes_written }).unwrap()))
1191 })
1192 }),
1193 );
1194}
1195
1196fn register_session_archive_handler(engine: &Arc<WshRpcEngine>, state: &AppState) {
1201 let wstore = state.wstore.clone();
1202 let filestore = state.filestore.clone();
1203
1204 engine.register_handler(
1205 COMMAND_SESSION_ARCHIVE,
1206 Box::new(move |data, _ctx| {
1207 let wstore = wstore.clone();
1208 let filestore = filestore.clone();
1209 Box::pin(async move {
1210 let cmd: CommandSessionArchiveData = serde_json::from_value(data)
1211 .map_err(|e| format!("session:archive: {e}"))?;
1212
1213 tracing::info!(block_id = %cmd.block_id, "session:archive");
1214
1215 let archive_dir = session_archive::default_archive_dir()
1216 .ok_or_else(|| "cannot determine home directory".to_string())?;
1217
1218 let (archived_bytes, archived_at) = session_archive::archive_session_output(
1219 &wstore,
1220 &filestore,
1221 &cmd.block_id,
1222 &archive_dir,
1223 )?;
1224
1225 Ok(Some(serde_json::to_value(&SessionArchiveResult {
1226 block_id: cmd.block_id,
1227 archived_bytes,
1228 archived_at,
1229 }).unwrap()))
1230 })
1231 }),
1232 );
1233}
1234
1235fn register_session_restore_handler(engine: &Arc<WshRpcEngine>, state: &AppState) {
1240 let wstore = state.wstore.clone();
1241 let filestore = state.filestore.clone();
1242
1243 engine.register_handler(
1244 COMMAND_SESSION_RESTORE,
1245 Box::new(move |data, _ctx| {
1246 let wstore = wstore.clone();
1247 let filestore = filestore.clone();
1248 Box::pin(async move {
1249 let cmd: CommandSessionRestoreData = serde_json::from_value(data)
1250 .map_err(|e| format!("session:restore: {e}"))?;
1251
1252 tracing::info!(block_id = %cmd.block_id, "session:restore");
1253
1254 let restored_bytes = session_archive::restore_session_output(
1255 &wstore,
1256 &filestore,
1257 &cmd.block_id,
1258 )?;
1259
1260 Ok(Some(serde_json::to_value(&SessionRestoreResult {
1261 block_id: cmd.block_id,
1262 restored_bytes,
1263 }).unwrap()))
1264 })
1265 }),
1266 );
1267}
1268
1269fn register_session_export_handler(engine: &Arc<WshRpcEngine>, state: &AppState) {
1274 let wstore = state.wstore.clone();
1275 let filestore = state.filestore.clone();
1276
1277 engine.register_handler(
1278 COMMAND_SESSION_EXPORT,
1279 Box::new(move |data, _ctx| {
1280 let wstore = wstore.clone();
1281 let filestore = filestore.clone();
1282 Box::pin(async move {
1283 let cmd: CommandSessionExportData = serde_json::from_value(data)
1284 .map_err(|e| format!("session:export: {e}"))?;
1285
1286 tracing::info!(block_id = %cmd.block_id, "session:export");
1287
1288 let (raw_bytes, line_count) = session_archive::read_session_output(
1289 &wstore,
1290 &filestore,
1291 &cmd.block_id,
1292 )?;
1293
1294 let byte_count = raw_bytes.len() as u64;
1295 let content = base64::engine::general_purpose::STANDARD.encode(&raw_bytes);
1296
1297 Ok(Some(serde_json::to_value(&SessionExportResult {
1298 content,
1299 line_count,
1300 byte_count,
1301 }).unwrap()))
1302 })
1303 }),
1304 );
1305}
1306
1307fn register_session_digest(engine: &Arc<WshRpcEngine>, state: &AppState) {
1312 let wstore = state.wstore.clone();
1313 let filestore = state.filestore.clone();
1314 let broker = state.broker.clone();
1315
1316 engine.register_handler(
1317 COMMAND_SESSION_DIGEST,
1318 Box::new(move |data, _ctx| {
1319 let wstore = wstore.clone();
1320 let filestore = filestore.clone();
1321 let broker = broker.clone();
1322 Box::pin(async move {
1323 let cmd: CommandSessionDigestData = serde_json::from_value(data)
1324 .map_err(|e| format!("session:digest: {e}"))?;
1325
1326 tracing::info!(block_id = %cmd.block_id, force = ?cmd.force, "session:digest");
1327
1328 let force = cmd.force.unwrap_or(false);
1329
1330 let block: Block = wstore
1332 .get(&cmd.block_id)
1333 .map_err(|e| format!("session:digest: {e}"))?
1334 .ok_or_else(|| format!("BLOCK_NOT_FOUND: {}", cmd.block_id))?;
1335
1336 let cached_summary = block.meta.get("session:digest_summary")
1338 .and_then(|v| v.as_str())
1339 .map(|s| s.to_string());
1340 let cached_generated_at = block.meta.get("session:digest_generated_at")
1341 .and_then(|v| v.as_i64())
1342 .unwrap_or(0);
1343 let digest_last_line_count = block.meta.get("session:digest_last_line_count")
1344 .and_then(|v| v.as_u64())
1345 .unwrap_or(0);
1346
1347 let current_line_count = block.meta.get("session:line_count")
1349 .and_then(|v| v.as_u64())
1350 .unwrap_or(0);
1351
1352 let lines_since_digest = current_line_count.saturating_sub(digest_last_line_count);
1355 if !force && cached_summary.is_some() && lines_since_digest < 20 {
1356 return Ok(Some(serde_json::to_value(&SessionDigestResult {
1357 summary: cached_summary.unwrap(),
1358 generated_at: cached_generated_at,
1359 cached: true,
1360 }).unwrap()));
1361 }
1362
1363 let all_lines: Vec<String> = {
1367 let filestore_lines = match filestore.stat(&cmd.block_id, "output") {
1368 Ok(Some(ref wf)) if wf.size > 0 => {
1369 match filestore.read_file(&cmd.block_id, "output") {
1370 Ok(Some(bytes)) => {
1371 let text = String::from_utf8_lossy(&bytes);
1372 let lines: Vec<String> = text.lines()
1373 .filter(|l| !l.trim().is_empty())
1374 .map(|l| l.to_string())
1375 .collect();
1376 Some(lines)
1377 }
1378 _ => None,
1379 }
1380 }
1381 _ => None,
1382 };
1383
1384 if let Some(lines) = filestore_lines {
1385 lines
1386 } else {
1387 let scope = format!("block:{}", cmd.block_id);
1389 let events = broker.read_event_history(
1390 crate::backend::wps::EVENT_BLOCK_FILE,
1391 &scope,
1392 usize::MAX,
1393 );
1394 let mut lines: Vec<String> = Vec::new();
1395 for event in events {
1396 let Some(ref ed) = event.data else { continue };
1397 let fname = ed.get("filename").and_then(|v| v.as_str()).unwrap_or("");
1398 if fname != "output" { continue; }
1399 if let Some(d64) = ed.get("data64").and_then(|v| v.as_str()) {
1400 if let Ok(bytes) = base64::engine::general_purpose::STANDARD.decode(d64) {
1401 let text = String::from_utf8_lossy(&bytes);
1402 for line in text.lines() {
1403 if !line.trim().is_empty() {
1404 lines.push(line.to_string());
1405 }
1406 }
1407 }
1408 }
1409 }
1410 lines
1411 }
1412 };
1413
1414 let n = all_lines.len();
1416 let start = n.saturating_sub(200);
1417 let window: Vec<&str> = all_lines[start..].iter().map(|s| s.as_str()).collect();
1418
1419 if window.is_empty() {
1420 return Ok(Some(serde_json::to_value(&SessionDigestResult {
1421 summary: String::new(),
1422 generated_at: 0,
1423 cached: false,
1424 }).unwrap()));
1425 }
1426
1427 let extracted = extract_digest_text(&window);
1429
1430 if extracted.is_empty() {
1431 return Ok(Some(serde_json::to_value(&SessionDigestResult {
1432 summary: String::new(),
1433 generated_at: 0,
1434 cached: false,
1435 }).unwrap()));
1436 }
1437
1438 let cli_path = obj::meta_get_string(&block.meta, "cmd", "");
1440 if cli_path.is_empty() {
1441 tracing::warn!(block_id = %cmd.block_id, "session:digest: no CLI path in meta");
1442 return Ok(Some(serde_json::to_value(&SessionDigestResult {
1443 summary: String::new(),
1444 generated_at: 0,
1445 cached: false,
1446 }).unwrap()));
1447 }
1448
1449 let prompt = format!(
1451 "Summarize this AI coding session in 3-4 sentences. Focus on: what was worked on, \
1452 tools used, any errors encountered, and the current state. Be concise and factual.\n\n\
1453 Session content (last 200 events):\n\n{}",
1454 extracted
1455 );
1456
1457 let summary = match invoke_cli_for_digest(&cli_path, &prompt, &block.meta).await {
1459 Ok(text) => text,
1460 Err(e) => {
1461 tracing::warn!(block_id = %cmd.block_id, error = %e, "session:digest: CLI invocation failed");
1462 String::new()
1463 }
1464 };
1465
1466 if summary.is_empty() {
1467 return Ok(Some(serde_json::to_value(&SessionDigestResult {
1468 summary: String::new(),
1469 generated_at: 0,
1470 cached: false,
1471 }).unwrap()));
1472 }
1473
1474 let generated_at = std::time::SystemTime::now()
1476 .duration_since(std::time::UNIX_EPOCH)
1477 .map(|d| d.as_millis() as i64)
1478 .unwrap_or(0);
1479
1480 let mut meta_update = obj::MetaMapType::new();
1481 meta_update.insert("session:digest_summary".to_string(), json!(summary.clone()));
1482 meta_update.insert("session:digest_generated_at".to_string(), json!(generated_at));
1483 meta_update.insert("session:digest_last_line_count".to_string(), json!(current_line_count));
1484
1485 if let Err(e) = crate::server::service::update_object_meta(
1486 &wstore,
1487 &format!("block:{}", cmd.block_id),
1488 &meta_update,
1489 ) {
1490 tracing::warn!(block_id = %cmd.block_id, error = %e, "session:digest: failed to cache in meta");
1491 }
1492
1493 Ok(Some(serde_json::to_value(&SessionDigestResult {
1494 summary,
1495 generated_at,
1496 cached: false,
1497 }).unwrap()))
1498 })
1499 }),
1500 );
1501}
1502
1503fn extract_digest_text(lines: &[&str]) -> String {
1507 let mut parts: Vec<String> = Vec::new();
1508
1509 for line in lines {
1510 let Ok(val) = serde_json::from_str::<serde_json::Value>(line) else { continue };
1511
1512 let msg_type = val.get("type").and_then(|v| v.as_str()).unwrap_or("");
1513
1514 match msg_type {
1515 "assistant" => {
1516 if let Some(content) = val.get("message")
1517 .and_then(|m| m.get("content"))
1518 .and_then(|c| c.as_array())
1519 {
1520 for block in content {
1521 let btype = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
1522 if btype == "text" {
1523 if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
1524 let trimmed = text.trim();
1525 if !trimmed.is_empty() {
1526 parts.push(format!("[assistant] {}", trimmed));
1527 }
1528 }
1529 } else if btype == "tool_use" {
1530 let tool_name = block.get("name")
1531 .and_then(|v| v.as_str())
1532 .unwrap_or("unknown");
1533 parts.push(format!("[tool] {}", tool_name));
1534 }
1535 }
1536 }
1537 }
1538 "user" => {
1539 if let Some(content) = val.get("message")
1540 .and_then(|m| m.get("content"))
1541 .and_then(|c| c.as_array())
1542 {
1543 for block in content {
1544 let btype = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
1545 if btype == "tool_result" {
1546 let is_error = block.get("is_error")
1547 .and_then(|v| v.as_bool())
1548 .unwrap_or(false);
1549 if is_error {
1550 let err_text = block.get("content")
1551 .and_then(|c| c.as_str())
1552 .unwrap_or("(error)")
1553 .chars().take(120).collect::<String>();
1554 parts.push(format!("[error] {}", err_text));
1555 }
1556 } else if btype == "text" {
1557 if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
1558 let trimmed = text.trim();
1559 if !trimmed.is_empty() {
1560 parts.push(format!("[user] {}", trimmed));
1561 }
1562 }
1563 }
1564 }
1565 }
1566 }
1567 "result" => {
1568 if let Some(cost) = val.get("total_cost_usd").and_then(|v| v.as_f64()) {
1569 if let Some(turns) = val.get("num_turns").and_then(|v| v.as_u64()) {
1570 parts.push(format!("[summary] {} turns, ${:.4} total cost", turns, cost));
1571 }
1572 }
1573 }
1574 _ => {}
1576 }
1577 }
1578
1579 parts.join("\n")
1580}
1581
1582async fn invoke_cli_for_digest(
1585 cli_path: &str,
1586 prompt: &str,
1587 meta: &obj::MetaMapType,
1588) -> Result<String, String> {
1589 let auth_env: std::collections::HashMap<String, String> = match meta.get("cmd:env") {
1591 Some(serde_json::Value::Object(obj_map)) => obj_map
1592 .iter()
1593 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
1594 .collect(),
1595 _ => std::collections::HashMap::new(),
1596 };
1597
1598 let mut child = crate::server::cli_handlers::make_cli_cmd(cli_path)
1604 .args(["-p", "--output-format", "stream-json", "--verbose"])
1605 .envs(&auth_env)
1606 .stdin(std::process::Stdio::piped())
1607 .stdout(std::process::Stdio::piped())
1608 .stderr(std::process::Stdio::null())
1609 .kill_on_drop(true)
1610 .spawn()
1611 .map_err(|e| format!("failed to spawn digest CLI: {e}"))?;
1612
1613 if let Some(mut stdin) = child.stdin.take() {
1614 use tokio::io::AsyncWriteExt;
1615 stdin
1616 .write_all(prompt.as_bytes())
1617 .await
1618 .map_err(|e| format!("digest CLI stdin write: {e}"))?;
1619 stdin
1620 .shutdown()
1621 .await
1622 .map_err(|e| format!("digest CLI stdin shutdown: {e}"))?;
1623 }
1624
1625 let output = tokio::time::timeout(
1626 std::time::Duration::from_secs(60),
1627 child.wait_with_output(),
1628 )
1629 .await
1630 .map_err(|_| "digest CLI timed out after 60s".to_string())?
1631 .map_err(|e| format!("digest CLI wait: {e}"))?;
1632
1633 if !output.status.success() {
1634 return Err(format!("digest CLI exited with status {}", output.status));
1635 }
1636
1637 let stdout = String::from_utf8_lossy(&output.stdout);
1639 let mut last_text = String::new();
1640
1641 for line in stdout.lines() {
1642 let Ok(val) = serde_json::from_str::<serde_json::Value>(line) else { continue };
1643 let msg_type = val.get("type").and_then(|v| v.as_str()).unwrap_or("");
1644 if msg_type == "assistant" {
1645 if let Some(content) = val.get("message")
1646 .and_then(|m| m.get("content"))
1647 .and_then(|c| c.as_array())
1648 {
1649 for block in content {
1650 if block.get("type").and_then(|v| v.as_str()) == Some("text") {
1651 if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
1652 last_text = text.trim().to_string();
1653 }
1654 }
1655 }
1656 }
1657 }
1658 }
1659
1660 if last_text.is_empty() {
1661 return Err("no text content in digest CLI response".to_string());
1662 }
1663
1664 Ok(last_text)
1665}
1666
1667fn resolve_tab_id(wstore: &WaveStore, explicit: Option<&str>) -> Result<String, String> {
1673 if let Some(tid) = explicit {
1674 return Ok(tid.to_string());
1675 }
1676
1677 let workspaces: Vec<Workspace> = wstore.get_all::<Workspace>()
1679 .map_err(|e| format!("agent.open: list workspaces: {e}"))?;
1680
1681 for ws in &workspaces {
1682 if !ws.activetabid.is_empty() {
1683 return Ok(ws.activetabid.clone());
1684 }
1685 if let Some(first_tab) = ws.tabids.first() {
1686 return Ok(first_tab.clone());
1687 }
1688 }
1689
1690 Err("no tabs found in any workspace".to_string())
1691}
1692
1693fn find_agent_block(wstore: &WaveStore, tab_id: &str, agent_id: &str) -> Result<Option<Block>, String> {
1695 let tab: Tab = wstore.must_get(tab_id)
1696 .map_err(|e| format!("TAB_NOT_FOUND: {e}"))?;
1697
1698 for block_id in &tab.blockids {
1699 if let Ok(Some(block)) = wstore.get::<Block>(block_id) {
1700 let block_agent_id = obj::meta_get_string(&block.meta, "agentId", "");
1701 if block_agent_id == agent_id {
1702 return Ok(Some(block));
1703 }
1704 }
1705 }
1706 Ok(None)
1707}
1708
1709pub fn allocate_agent_workdir(desired: &str) -> Result<String, String> {
1724 let p = std::path::Path::new(desired);
1725 if let Some(parent) = p.parent() {
1726 if !parent.as_os_str().is_empty() {
1727 std::fs::create_dir_all(parent)
1728 .map_err(|e| format!("allocate_agent_workdir: parent {}: {e}", parent.display()))?;
1729 }
1730 }
1731 match std::fs::create_dir(p) {
1732 Ok(()) => return Ok(desired.to_string()),
1733 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
1734 Err(e) => return Err(format!("allocate_agent_workdir: create_dir({}): {e}", desired)),
1735 }
1736 for n in 1..=99u32 {
1737 let candidate = format!("{desired}-{n}");
1738 match std::fs::create_dir(std::path::Path::new(&candidate)) {
1739 Ok(()) => return Ok(candidate),
1740 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => continue,
1741 Err(e) => return Err(format!("allocate_agent_workdir: create_dir({candidate}): {e}")),
1742 }
1743 }
1744 Err(format!(
1745 "allocate_agent_workdir: too many collisions (>99) under {desired}-N — clean up old runs"
1746 ))
1747}
1748
1749fn write_agent_config_files(
1751 wstore: &WaveStore,
1752 agent: &crate::backend::storage::AgentDefinition,
1753 agent_slug: &str,
1754 work_dir: &str,
1755) -> Result<(), String> {
1756 let contents = wstore.agent_content_get_all(&agent.id)
1758 .unwrap_or_default();
1759 let skills = wstore.agent_skill_list(&agent.id)
1760 .unwrap_or_default();
1761
1762 let mut content_map = std::collections::HashMap::new();
1763 for fc in &contents {
1764 content_map.insert(fc.content_type.clone(), fc.content.clone());
1765 }
1766
1767 let config_files = crate::backend::agent_config::build_config_files(
1768 &content_map,
1769 &skills,
1770 &agent.name,
1771 &agent.id,
1772 );
1773
1774 let expanded_dir = if work_dir.starts_with("~/") || work_dir == "~" {
1776 if let Ok(home) = std::env::var("HOME").or_else(|_| std::env::var("USERPROFILE")) {
1777 format!("{}/{}", home, work_dir.trim_start_matches("~/"))
1778 } else {
1779 work_dir.to_string()
1780 }
1781 } else {
1782 work_dir.to_string()
1783 };
1784
1785 let base_path = std::path::Path::new(&expanded_dir);
1786 if !base_path.exists() {
1787 std::fs::create_dir_all(base_path)
1788 .map_err(|e| format!("failed to create working dir: {e}"))?;
1789 }
1790
1791 for file in &config_files {
1792 let file_path = base_path.join(&file.filename);
1793 if let Some(parent) = file_path.parent() {
1794 if !parent.exists() {
1795 let _ = std::fs::create_dir_all(parent);
1796 }
1797 }
1798 std::fs::write(&file_path, &file.content)
1799 .map_err(|e| format!("failed to write {}: {e}", file.filename))?;
1800 }
1801
1802 tracing::info!(
1803 agent_id = %agent.id,
1804 work_dir = %expanded_dir,
1805 file_count = config_files.len(),
1806 "agent.open: wrote config files"
1807 );
1808
1809 Ok(())
1810}
1811