agentmux_srv\server/
app_api.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! App API — high-level commands for programmatic control of AgentMux.
5//!
6//! These commands orchestrate multiple low-level operations (CreateBlock, SetMeta,
7//! ControllerResync) behind stable, intent-based interfaces. Callers express what
8//! they want ("open an agent pane with AgentX"), not how to do it.
9
10use std::sync::Arc;
11
12use base64::Engine;
13use serde_json::json;
14
15use crate::backend::blockcontroller;
16use crate::backend::obj::{self, Block, Tab, Workspace, MetaMapType};
17use crate::backend::providers;
18use crate::backend::rpc::engine::WshRpcEngine;
19use crate::backend::rpc_types::*;
20use crate::backend::session_archive;
21use crate::backend::storage::wstore::WaveStore;
22
23use super::AppState;
24use crate::server::cli_handlers::resolve_cli_on_path;
25
26/// Register all App API handlers on the RPC engine.
27pub fn register_app_api_handlers(engine: &Arc<WshRpcEngine>, state: &AppState) {
28    register_agent_open(engine, state);
29    register_agent_send(engine, state);
30    register_agent_stop(engine, state);
31    register_agent_status(engine, state);
32    register_agent_list(engine, state);
33    register_agent_output(engine, state);
34    register_agent_process_list(engine, state);
35    register_agent_tracked_blocks(engine, state);
36    register_agent_kill_process(engine, state);
37    register_agent_kill_tree(engine, state);
38    register_pane_open(engine, state);
39    register_blockfile_line_count(engine, state);
40    register_blockfile_read_range(engine, state);
41    register_blockfile_read_state(engine, state);
42    register_blockfile_write_state(engine, state);
43    register_session_digest(engine, state);
44    register_session_archive_handler(engine, state);
45    register_session_restore_handler(engine, state);
46    register_session_export_handler(engine, state);
47}
48
49// ---------------------------------------------------------------------------
50// agent.process-list + agent.tracked-blocks
51// ---------------------------------------------------------------------------
52
53fn register_agent_process_list(engine: &Arc<WshRpcEngine>, state: &AppState) {
54    let process_tracker = state.process_tracker.clone();
55    engine.register_handler(
56        COMMAND_AGENT_PROCESS_LIST,
57        Box::new(move |data, _ctx| {
58            let process_tracker = process_tracker.clone();
59            Box::pin(async move {
60                let cmd: AgentProcessListCommand = serde_json::from_value(data)
61                    .map_err(|e| format!("agent.process-list: {e}"))?;
62                let members = process_tracker.list_block(&cmd.block_id);
63                let confidence = match process_tracker.confidence_of(&cmd.block_id) {
64                    crate::backend::process_tracker::TrackingConfidence::High => "high",
65                    crate::backend::process_tracker::TrackingConfidence::BestEffort => "best_effort",
66                    crate::backend::process_tracker::TrackingConfidence::None => "none",
67                };
68                let processes: Vec<AgentProcessInfo> = members
69                    .into_iter()
70                    .map(|m| AgentProcessInfo {
71                        pid: m.pid,
72                        command: m.command,
73                        rss_bytes: m.rss_bytes,
74                        started_at_ms: m.started_at_ms,
75                    })
76                    .collect();
77                Ok(Some(serde_json::to_value(&AgentProcessListResult {
78                    block_id: cmd.block_id,
79                    confidence: confidence.to_string(),
80                    processes,
81                }).unwrap()))
82            })
83        }),
84    );
85}
86
87fn register_agent_tracked_blocks(engine: &Arc<WshRpcEngine>, state: &AppState) {
88    let process_tracker = state.process_tracker.clone();
89    engine.register_handler(
90        COMMAND_AGENT_TRACKED_BLOCKS,
91        Box::new(move |_data, _ctx| {
92            let process_tracker = process_tracker.clone();
93            Box::pin(async move {
94                Ok(Some(serde_json::to_value(&AgentTrackedBlocksResult {
95                    block_ids: process_tracker.list_all_blocks(),
96                }).unwrap()))
97            })
98        }),
99    );
100}
101
102fn register_agent_kill_process(engine: &Arc<WshRpcEngine>, state: &AppState) {
103    let process_tracker = state.process_tracker.clone();
104    engine.register_handler(
105        COMMAND_AGENT_KILL_PROCESS,
106        Box::new(move |data, _ctx| {
107            let process_tracker = process_tracker.clone();
108            Box::pin(async move {
109                let cmd: AgentKillProcessCommand = serde_json::from_value(data)
110                    .map_err(|e| format!("agent.kill-process: {e}"))?;
111                tracing::info!(
112                    block_id = %cmd.block_id,
113                    pid = cmd.pid,
114                    "agent.kill-process"
115                );
116                let ok = process_tracker.kill_pid(&cmd.block_id, cmd.pid);
117                Ok(Some(serde_json::to_value(&AgentKillResult { ok }).unwrap()))
118            })
119        }),
120    );
121}
122
123fn register_agent_kill_tree(engine: &Arc<WshRpcEngine>, state: &AppState) {
124    let process_tracker = state.process_tracker.clone();
125    engine.register_handler(
126        COMMAND_AGENT_KILL_TREE,
127        Box::new(move |data, _ctx| {
128            let process_tracker = process_tracker.clone();
129            Box::pin(async move {
130                let cmd: AgentKillTreeCommand = serde_json::from_value(data)
131                    .map_err(|e| format!("agent.kill-tree: {e}"))?;
132                tracing::info!(block_id = %cmd.block_id, "agent.kill-tree");
133                let ok = process_tracker.kill_tree(&cmd.block_id);
134                Ok(Some(serde_json::to_value(&AgentKillResult { ok }).unwrap()))
135            })
136        }),
137    );
138}
139
140// ---------------------------------------------------------------------------
141// agent.open
142// ---------------------------------------------------------------------------
143
144fn register_agent_open(engine: &Arc<WshRpcEngine>, state: &AppState) {
145    let wstore = state.wstore.clone();
146    let broker = state.broker.clone();
147    let event_bus = state.event_bus.clone();
148    let filestore = state.filestore.clone();
149
150    engine.register_handler(
151        COMMAND_AGENT_OPEN,
152        Box::new(move |data, _ctx| {
153            let wstore = wstore.clone();
154            let broker = broker.clone();
155            let event_bus = event_bus.clone();
156            let filestore = filestore.clone();
157            Box::pin(async move {
158                let cmd: CommandAgentOpenData = serde_json::from_value(data)
159                    .map_err(|e| format!("agent.open: {e}"))?;
160
161                tracing::info!(agent_id = %cmd.agent_id, "agent.open");
162
163                // 1. Load the agent definition (by id or name)
164                let agents = wstore.agent_def_list()
165                    .map_err(|e| format!("agent.open: {e}"))?;
166                let agent = agents.iter()
167                    .find(|a| a.id == cmd.agent_id || a.name.eq_ignore_ascii_case(&cmd.agent_id))
168                    .ok_or_else(|| format!("AGENT_NOT_FOUND: no agent definition with id '{}'", cmd.agent_id))?
169                    .clone();
170
171                // 2. Resolve provider
172                let provider = providers::get_provider(&agent.provider)
173                    .ok_or_else(|| format!("INVALID_PROVIDER: unknown provider '{}'", agent.provider))?;
174
175                // 3. Determine tab
176                let tab_id = resolve_tab_id(&wstore, cmd.tab_id.as_deref())?;
177
178                // 4. Check for existing agent pane in this tab (idempotent)
179                // Use resolved agent.id (not raw user input which could be a name)
180                if let Some(existing) = find_agent_block(&wstore, &tab_id, &agent.id)? {
181                    // Ensure the controller is registered (may be missing if block
182                    // was created by the frontend without backend initialization)
183                    if blockcontroller::get_controller(&existing.oid).is_none() {
184                        let controller_type = provider.controller_type_str();
185                        // Set essential metadata if missing
186                        let mut meta_update = obj::MetaMapType::new();
187                        meta_update.insert("controller".to_string(), json!(controller_type));
188                        meta_update.insert("agentProvider".to_string(), json!(&agent.provider));
189                        let _ = crate::server::service::update_object_meta(
190                            &wstore, &format!("block:{}", existing.oid), &meta_update,
191                        );
192                        // Register controller
193                        let block_for_resync = wstore.must_get::<Block>(&existing.oid)
194                            .map_err(|e| format!("agent.open: reload block: {e}"))?;
195                        let _ = blockcontroller::resync_controller(
196                            &block_for_resync, &tab_id, None, true,
197                            Some(broker.clone()), Some(event_bus.clone()), Some(wstore.clone()),
198                            Some(filestore.clone()),
199                        );
200                    }
201                    let status = blockcontroller::get_block_controller_status(&existing.oid)
202                        .map(|s| s.shellprocstatus)
203                        .unwrap_or_else(|| "init".to_string());
204                    return Ok(Some(serde_json::to_value(&AgentOpenResult {
205                        block_id: existing.oid,
206                        tab_id,
207                        agent_id: cmd.agent_id,
208                        provider: agent.provider,
209                        controller_type: provider.controller_type_str().to_string(),
210                        status,
211                        created: false,
212                    }).unwrap()));
213                }
214
215                // 5. Resolve CLI path
216                let version = env!("CARGO_PKG_VERSION");
217                let home = std::env::var("HOME")
218                    .or_else(|_| std::env::var("USERPROFILE"))
219                    .map_err(|_| "cannot determine home directory".to_string())?;
220                let provider_dir = format!("{}/.agentmux/{}/cli/{}", home, version, provider.id);
221                let npm_bin = if cfg!(windows) {
222                    format!("{}/node_modules/.bin/{}.cmd", provider_dir, provider.cli_command)
223                } else {
224                    format!("{}/node_modules/.bin/{}", provider_dir, provider.cli_command)
225                };
226                let mut resolved_cli_path = npm_bin.clone();
227                if !std::path::Path::new(&resolved_cli_path).exists() {
228                    // Fallback: provider not installed via npm — try system PATH.
229                    // This is used for Python-based CLIs like Kimi that are not
230                    // distributed on npm.
231                    if provider.npm_package.is_empty() {
232                        if let Some(path) = resolve_cli_on_path(provider.cli_command).await {
233                            resolved_cli_path = path;
234                        }
235                    }
236                    if !std::path::Path::new(&resolved_cli_path).exists() {
237                        return Err(format!(
238                            "CLI_NOT_AVAILABLE: {} not installed at {}. Open an agent pane in the UI to trigger installation.",
239                            provider.cli_command, npm_bin
240                        ));
241                    }
242                }
243
244                // 6. Build metadata
245                let controller_type = provider.controller_type_str();
246                let is_persistent = controller_type == "persistent";
247                let cli_args: Vec<String> = if is_persistent {
248                    provider.persistent_launch_args
249                        .unwrap_or(provider.launch_args)
250                        .iter().map(|s| s.to_string()).collect()
251                } else {
252                    provider.launch_args.iter().map(|s| s.to_string()).collect()
253                };
254
255                let agent_slug = agent.name.to_lowercase()
256                    .chars().map(|c| if c.is_alphanumeric() || c == '-' || c == '_' { c } else { '-' })
257                    .collect::<String>();
258                let work_dir = if agent.working_directory.is_empty() {
259                    format!("~/.agentmux/agents/{}", agent_slug)
260                } else {
261                    agent.working_directory.clone()
262                };
263
264                // Build env vars
265                let mut env_vars = serde_json::Map::new();
266                for key in provider.unset_env {
267                    env_vars.insert(key.to_string(), json!(""));
268                }
269                // Use AGENTMUX_CONFIG_HOME so portable installs stay self-contained.
270                // Falls back to ~/.agentmux/config for non-portable installs.
271                let config_home = std::env::var("AGENTMUX_CONFIG_HOME")
272                    .unwrap_or_else(|_| format!("{}/.agentmux/config", home));
273                // Auth dir
274                let auth_dir = format!("{}/auth/{}", config_home, provider.auth_dir_name);
275                let _ = std::fs::create_dir_all(&auth_dir);
276                env_vars.insert(provider.auth_config_dir_env_var.to_string(), json!(auth_dir));
277                for (k, v) in provider.auth_extra_env {
278                    env_vars.insert(k.to_string(), json!(v));
279                }
280                // Agent identity
281                env_vars.insert("GH_CONFIG_DIR".to_string(), json!(format!("{}/gh-{}", config_home, agent_slug)));
282                env_vars.insert("AGENTMUX_AGENT_ID".to_string(), json!(&agent.name));
283                // Exit delay only for subprocess
284                if !is_persistent {
285                    env_vars.insert("CLAUDE_CODE_EXIT_AFTER_STOP_DELAY".to_string(), json!("30000"));
286                }
287
288                let mut meta = MetaMapType::new();
289                meta.insert("view".to_string(), json!("agent"));
290                meta.insert("agentId".to_string(), json!(&agent.id));
291                meta.insert("agentProvider".to_string(), json!(&agent.provider));
292                meta.insert("agentName".to_string(), json!(&agent.name));
293                meta.insert("agentIcon".to_string(), json!(if agent.icon.is_empty() { "sparkles" } else { &agent.icon }));
294                meta.insert("agentMode".to_string(), json!(if agent.agent_type.is_empty() { "host" } else { &agent.agent_type }));
295                // Derive output format from provider ID (matches frontend providers/index.ts)
296                let output_format = match provider.id {
297                    "claude" => "claude-stream-json",
298                    "codex" => "codex-json",
299                    "gemini" => "gemini-json",
300                    "kimi" => "kimi-stream-json",
301                    _ => "claude-stream-json",
302                };
303                meta.insert("agentOutputFormat".to_string(), json!(output_format));
304                meta.insert("controller".to_string(), json!(controller_type));
305                meta.insert("cmd".to_string(), json!(&resolved_cli_path));
306                meta.insert("cmd:args".to_string(), json!(cli_args));
307                meta.insert("cmd:cwd".to_string(), json!(&work_dir));
308                meta.insert("cmd:env".to_string(), serde_json::Value::Object(env_vars));
309                meta.insert("agent:resume_flag".to_string(), json!(provider.resume_flag.unwrap_or("")));
310                meta.insert("agent:session_id_field".to_string(), json!(provider.session_id_field));
311
312                // 7. Create block + insert into layout tree
313                let block = crate::backend::wcore::create_block(&wstore, &tab_id, meta)
314                    .map_err(|e| format!("agent.open: create_block: {e}"))?;
315                let block_id = block.oid.clone();
316
317                // Enqueue a layout insert action for the frontend to process.
318                // The frontend's LayoutModel watches pendingbackendactions on the
319                // LayoutState and applies them via treeReducer — same mechanism
320                // used by cross-window drag-and-drop (dnd.rs).
321                {
322                    let tab: Tab = wstore.must_get(&tab_id)
323                        .map_err(|e| format!("agent.open: reload tab: {e}"))?;
324                    if let Ok(mut layout) = wstore.must_get::<obj::LayoutState>(&tab.layoutstate) {
325                        let mut actions = layout.pendingbackendactions.take().unwrap_or_default();
326                        actions.push(obj::LayoutActionData {
327                            actiontype: "insert".to_string(),
328                            actionid: uuid::Uuid::new_v4().to_string(),
329                            blockid: block_id.clone(),
330                            nodesize: None,
331                            indexarr: None,
332                            focused: true,
333                            magnified: false,
334                            ephemeral: false,
335                            targetblockid: String::new(),
336                            position: String::new(),
337                        });
338                        layout.pendingbackendactions = Some(actions);
339                        let _ = wstore.update(&mut layout);
340                    }
341                }
342
343                tracing::info!(
344                    block_id = %block_id,
345                    agent_id = %cmd.agent_id,
346                    provider = %agent.provider,
347                    controller_type = %controller_type,
348                    "agent.open: block created + layout updated"
349                );
350
351                // 8. Write agent config files. No collision resolution
352                //    in this path — the function creates the dir if
353                //    missing and overwrites whatever's there. Same-
354                //    name same-hour launches will share a workdir;
355                //    proper allocation is tracked as a follow-up.
356                write_agent_config_files(&wstore, &agent, &agent_slug, &work_dir)?;
357
358                // 9. Register controller (resync)
359                let block_for_resync = wstore.must_get::<Block>(&block_id)
360                    .map_err(|e| format!("agent.open: reload block: {e}"))?;
361                blockcontroller::resync_controller(
362                    &block_for_resync,
363                    &tab_id,
364                    None,
365                    true,
366                    Some(broker.clone()),
367                    Some(event_bus.clone()),
368                    Some(wstore.clone()),
369                    Some(filestore.clone()),
370                )?;
371
372                // 10. Broadcast block + tab + layout updates to frontend
373                {
374                    let mut updates = Vec::new();
375                    if let Ok(updated_block) = wstore.must_get::<Block>(&block_id) {
376                        updates.push(obj::WaveObjUpdate {
377                            updatetype: "update".into(),
378                            otype: "block".into(),
379                            oid: block_id.clone(),
380                            obj: Some(obj::wave_obj_to_value(&updated_block)),
381                        });
382                    }
383                    if let Ok(updated_tab) = wstore.must_get::<Tab>(&tab_id) {
384                        updates.push(obj::WaveObjUpdate {
385                            updatetype: "update".into(),
386                            otype: "tab".into(),
387                            oid: tab_id.clone(),
388                            obj: Some(obj::wave_obj_to_value(&updated_tab)),
389                        });
390                        if let Ok(updated_layout) = wstore.must_get::<obj::LayoutState>(&updated_tab.layoutstate) {
391                            updates.push(obj::WaveObjUpdate {
392                                updatetype: "update".into(),
393                                otype: "layout".into(),
394                                oid: updated_tab.layoutstate.clone(),
395                                obj: Some(obj::wave_obj_to_value(&updated_layout)),
396                            });
397                        }
398                    }
399                    for update in &updates {
400                        let oref = format!("{}:{}", update.otype, update.oid);
401                        if let Ok(data) = serde_json::to_value(update) {
402                            event_bus.broadcast_event(
403                                &crate::backend::eventbus::WSEventType {
404                                    eventtype: "waveobj:update".to_string(),
405                                    oref,
406                                    data: Some(data),
407                                },
408                            );
409                        }
410                    }
411                }
412
413                Ok(Some(serde_json::to_value(&AgentOpenResult {
414                    block_id,
415                    tab_id,
416                    agent_id: cmd.agent_id,
417                    provider: agent.provider,
418                    controller_type: controller_type.to_string(),
419                    status: "init".to_string(),
420                    created: true,
421                }).unwrap()))
422            })
423        }),
424    );
425}
426
427// ---------------------------------------------------------------------------
428// agent.send
429// ---------------------------------------------------------------------------
430
431fn register_agent_send(engine: &Arc<WshRpcEngine>, state: &AppState) {
432    let wstore = state.wstore.clone();
433    let broker = state.broker.clone();
434
435    engine.register_handler(
436        COMMAND_AGENT_SEND,
437        Box::new(move |data, _ctx| {
438            let wstore = wstore.clone();
439            let broker = broker.clone();
440            Box::pin(async move {
441                let cmd: CommandAgentSendData = serde_json::from_value(data)
442                    .map_err(|e| format!("agent.send: {e}"))?;
443
444                tracing::info!(block_id = %cmd.block_id, "agent.send");
445
446                let ctrl = blockcontroller::get_controller(&cmd.block_id)
447                    .ok_or_else(|| format!("NOT_RUNNING: no controller for block {}", cmd.block_id))?;
448
449                // Re-read spawn config from block metadata (same pattern as agentinput)
450                let block: Block = wstore
451                    .get(&cmd.block_id)
452                    .map_err(|e| format!("agent.send: {e}"))?
453                    .ok_or_else(|| format!("BLOCK_NOT_FOUND: {}", cmd.block_id))?;
454
455                let cli_command = obj::meta_get_string(&block.meta, "cmd", "claude");
456                let cli_args: Vec<String> = match block.meta.get("cmd:args") {
457                    Some(serde_json::Value::Array(arr)) => arr
458                        .iter()
459                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
460                        .collect(),
461                    _ => vec![],
462                };
463                let working_dir = obj::meta_get_string(&block.meta, "cmd:cwd", "");
464                let mut env_vars: std::collections::HashMap<String, String> = match block.meta.get("cmd:env") {
465                    Some(serde_json::Value::Object(obj)) => obj
466                        .iter()
467                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
468                        .collect(),
469                    _ => std::collections::HashMap::new(),
470                };
471                // Identity injection — same path as websocket.rs's
472                // AgentInputCommand. See identity/resolver.rs. Passes
473                // the broker so the OAuth-class branch can publish a
474                // `identitybundlebindings:changed:<bundle_id>` event
475                // when the expiry probe updates an account's status
476                // (PR D — spec §4.4).
477                crate::identity::resolver::inject_identity_env_with_broker(
478                    wstore.clone(),
479                    Some(broker.clone()),
480                    &cmd.block_id,
481                    &mut env_vars,
482                );
483                let session_id_field = obj::meta_get_string(
484                    &block.meta, "agent:session_id_field", "session_id",
485                );
486
487                // Dispatch to persistent or subprocess controller
488                let mut session_id = None;
489                if let Some(persistent_ctrl) = ctrl
490                    .as_any()
491                    .downcast_ref::<blockcontroller::persistent::PersistentSubprocessController>()
492                {
493                    let config = blockcontroller::persistent::PersistentSpawnConfig {
494                        cli_command,
495                        cli_args,
496                        working_dir,
497                        env_vars,
498                        session_id_field,
499                    };
500                    persistent_ctrl.send_message(cmd.message, config)?;
501                    session_id = persistent_ctrl.session_id();
502                } else if let Some(subprocess_ctrl) = ctrl
503                    .as_any()
504                    .downcast_ref::<blockcontroller::subprocess::SubprocessController>()
505                {
506                    let resume_flag = obj::meta_get_string(
507                        &block.meta, "agent:resume_flag", "--resume",
508                    );
509                    // Picker reattach (parallel of the websocket-path
510                    // logic): hydrate the persisted session id from
511                    // block meta so spawn_turn appends --resume <sid>
512                    // on the FIRST turn after reattach.
513                    let persisted_session_id = obj::meta_get_string(
514                        &block.meta, "agent:sessionid", "",
515                    );
516                    let config = blockcontroller::subprocess::SubprocessSpawnConfig {
517                        cli_command,
518                        cli_args,
519                        working_dir,
520                        env_vars,
521                        message: cmd.message,
522                        resume_flag,
523                        session_id_field,
524                        message_id: None,
525                        session_id: if persisted_session_id.is_empty() {
526                            None
527                        } else {
528                            Some(persisted_session_id)
529                        },
530                    };
531                    subprocess_ctrl.spawn_turn(config)?;
532                } else {
533                    return Err("NOT_RUNNING: controller type not supported".to_string());
534                }
535
536                Ok(Some(serde_json::to_value(&AgentSendResult {
537                    block_id: cmd.block_id,
538                    status: "running".to_string(),
539                    session_id,
540                }).unwrap()))
541            })
542        }),
543    );
544}
545
546// ---------------------------------------------------------------------------
547// agent.stop
548// ---------------------------------------------------------------------------
549
550fn register_agent_stop(engine: &Arc<WshRpcEngine>, _state: &AppState) {
551    engine.register_handler(
552        COMMAND_AGENT_STOP_API,
553        Box::new(|data, _ctx| {
554            Box::pin(async move {
555                let cmd: CommandAgentStopApiData = serde_json::from_value(data)
556                    .map_err(|e| format!("agent.stop: {e}"))?;
557
558                tracing::info!(block_id = %cmd.block_id, signal = ?cmd.signal, "agent.stop");
559
560                let ctrl = blockcontroller::get_controller(&cmd.block_id)
561                    .ok_or_else(|| format!("NOT_RUNNING: no controller for block {}", cmd.block_id))?;
562
563                let force = matches!(cmd.signal.as_deref(), Some("SIGKILL") | Some("SIGTERM"));
564                ctrl.stop(!force, blockcontroller::STATUS_DONE)?;
565
566                let exit_code = blockcontroller::get_block_controller_status(&cmd.block_id)
567                    .map(|s| s.shellprocexitcode);
568
569                Ok(Some(serde_json::to_value(&AgentStopResult {
570                    block_id: cmd.block_id,
571                    status: "done".to_string(),
572                    exit_code,
573                }).unwrap()))
574            })
575        }),
576    );
577}
578
579// ---------------------------------------------------------------------------
580// agent.status
581// ---------------------------------------------------------------------------
582
583fn register_agent_status(engine: &Arc<WshRpcEngine>, state: &AppState) {
584    let wstore = state.wstore.clone();
585
586    engine.register_handler(
587        COMMAND_AGENT_STATUS,
588        Box::new(move |data, _ctx| {
589            let wstore = wstore.clone();
590            Box::pin(async move {
591                let cmd: CommandAgentStatusData = serde_json::from_value(data)
592                    .map_err(|e| format!("agent.status: {e}"))?;
593
594                let block: Block = wstore
595                    .get(&cmd.block_id)
596                    .map_err(|e| format!("agent.status: {e}"))?
597                    .ok_or_else(|| format!("BLOCK_NOT_FOUND: {}", cmd.block_id))?;
598
599                let agent_id = obj::meta_get_string(&block.meta, "agentId", "");
600                let provider = obj::meta_get_string(&block.meta, "agentProvider", "");
601                let controller_type = obj::meta_get_string(&block.meta, "controller", "");
602
603                let runtime = blockcontroller::get_block_controller_status(&cmd.block_id);
604                let status = runtime.as_ref()
605                    .map(|s| s.shellprocstatus.clone())
606                    .unwrap_or_else(|| "init".to_string());
607                let exit_code = runtime.as_ref()
608                    .and_then(|s| if s.shellprocstatus == "done" { Some(s.shellprocexitcode) } else { None });
609                let pid = None; // PID not currently exposed in status struct
610
611                // Get session ID from block meta
612                let session_id = block.meta.get("agent:sessionid")
613                    .and_then(|v| v.as_str())
614                    .map(|s| s.to_string());
615
616                Ok(Some(serde_json::to_value(&AgentStatusResult {
617                    block_id: cmd.block_id,
618                    agent_id,
619                    provider,
620                    controller_type,
621                    status,
622                    session_id,
623                    pid,
624                    exit_code,
625                }).unwrap()))
626            })
627        }),
628    );
629}
630
631// ---------------------------------------------------------------------------
632// agent.list
633// ---------------------------------------------------------------------------
634
635fn register_agent_list(engine: &Arc<WshRpcEngine>, state: &AppState) {
636    let wstore = state.wstore.clone();
637
638    engine.register_handler(
639        COMMAND_AGENT_LIST,
640        Box::new(move |_data, _ctx| {
641            let wstore = wstore.clone();
642            Box::pin(async move {
643                let tabs: Vec<Tab> = wstore.get_all::<Tab>()
644                    .map_err(|e| format!("agent.list: {e}"))?;
645
646                let mut agents = Vec::new();
647                for tab in &tabs {
648                    for block_id in &tab.blockids {
649                        if let Ok(Some(block)) = wstore.get::<Block>(block_id) {
650                            let agent_id = obj::meta_get_string(&block.meta, "agentId", "");
651                            if agent_id.is_empty() {
652                                continue;
653                            }
654                            let provider = obj::meta_get_string(&block.meta, "agentProvider", "");
655                            let status = blockcontroller::get_block_controller_status(block_id)
656                                .map(|s| s.shellprocstatus)
657                                .unwrap_or_else(|| "init".to_string());
658                            let session_id = block.meta.get("agent:sessionid")
659                                .and_then(|v| v.as_str())
660                                .map(|s| s.to_string());
661
662                            agents.push(AgentListEntry {
663                                block_id: block_id.clone(),
664                                tab_id: tab.oid.clone(),
665                                agent_id,
666                                provider,
667                                status,
668                                session_id,
669                            });
670                        }
671                    }
672                }
673
674                Ok(Some(serde_json::to_value(&AgentListResult { agents }).unwrap()))
675            })
676        }),
677    );
678}
679
680// ---------------------------------------------------------------------------
681// agent.output
682// ---------------------------------------------------------------------------
683
684fn register_agent_output(engine: &Arc<WshRpcEngine>, state: &AppState) {
685    let broker = state.broker.clone();
686
687    engine.register_handler(
688        COMMAND_AGENT_OUTPUT,
689        Box::new(move |data, _ctx| {
690            let broker = broker.clone();
691            Box::pin(async move {
692                let cmd: CommandAgentOutputData = serde_json::from_value(data)
693                    .map_err(|e| format!("agent.output: {e}"))?;
694
695                let scope = format!("block:{}", cmd.block_id);
696                let max = cmd.max_lines.unwrap_or(1000);
697                let after = cmd.after_line.unwrap_or(0);
698
699                // Read persisted blockfile events from broker history
700                let mut all_lines: Vec<String> = Vec::new();
701                {
702                    let events = broker.read_event_history(
703                        crate::backend::wps::EVENT_BLOCK_FILE,
704                        &scope,
705                        max + after, // read enough to cover offset
706                    );
707                    for event in events {
708                        if let Some(ref data) = event.data {
709                            if let Some(data64) = data.get("data64").and_then(|v| v.as_str()) {
710                                if let Ok(bytes) = base64::engine::general_purpose::STANDARD
711                                    .decode(data64)
712                                {
713                                    let text = String::from_utf8_lossy(&bytes);
714                                    for line in text.lines() {
715                                        if !line.trim().is_empty() {
716                                            all_lines.push(line.to_string());
717                                        }
718                                    }
719                                }
720                            }
721                        }
722                    }
723                }
724
725                let total = all_lines.len();
726                let lines: Vec<String> = all_lines.into_iter()
727                    .skip(after)
728                    .take(max)
729                    .collect();
730                let has_more = after + lines.len() < total;
731
732                Ok(Some(serde_json::to_value(&AgentOutputResult {
733                    block_id: cmd.block_id,
734                    lines,
735                    total_lines: total,
736                    has_more,
737                }).unwrap()))
738            })
739        }),
740    );
741}
742
743// ---------------------------------------------------------------------------
744// pane.open
745// ---------------------------------------------------------------------------
746
747fn register_pane_open(engine: &Arc<WshRpcEngine>, state: &AppState) {
748    let wstore = state.wstore.clone();
749    let event_bus = state.event_bus.clone();
750
751    engine.register_handler(
752        COMMAND_PANE_OPEN,
753        Box::new(move |data, _ctx| {
754            let wstore = wstore.clone();
755            let event_bus = event_bus.clone();
756            Box::pin(async move {
757                let cmd: CommandPaneOpenData = serde_json::from_value(data)
758                    .map_err(|e| format!("pane.open: {e}"))?;
759
760                tracing::info!(view = %cmd.view, "pane.open");
761
762                // Build meta for the requested view, validating required args
763                let meta = build_pane_meta(&cmd)?;
764
765                // Resolve tab
766                let tab_id = resolve_tab_id(&wstore, cmd.tab_id.as_deref())?;
767
768                // Create block
769                let block = crate::backend::wcore::create_block(&wstore, &tab_id, meta)
770                    .map_err(|e| format!("pane.open: create_block: {e}"))?;
771                let block_id = block.oid.clone();
772
773                // Enqueue layout action — split if requested, else append
774                let (actiontype, targetblockid, position) = resolve_placement(
775                    cmd.split_direction.as_deref(),
776                    cmd.split_reference_block_id.as_deref(),
777                );
778                let focused = cmd.focus.unwrap_or(true);
779
780                {
781                    let tab: Tab = wstore.must_get(&tab_id)
782                        .map_err(|e| format!("pane.open: reload tab: {e}"))?;
783                    if let Ok(mut layout) = wstore.must_get::<obj::LayoutState>(&tab.layoutstate) {
784                        let mut actions = layout.pendingbackendactions.take().unwrap_or_default();
785                        actions.push(obj::LayoutActionData {
786                            actiontype,
787                            actionid: uuid::Uuid::new_v4().to_string(),
788                            blockid: block_id.clone(),
789                            nodesize: None,
790                            indexarr: None,
791                            focused,
792                            magnified: false,
793                            ephemeral: false,
794                            targetblockid,
795                            position,
796                        });
797                        layout.pendingbackendactions = Some(actions);
798                        let _ = wstore.update(&mut layout);
799                    }
800                }
801
802                tracing::info!(
803                    block_id = %block_id,
804                    view = %cmd.view,
805                    "pane.open: block created + layout updated"
806                );
807
808                // Broadcast block + tab + layout updates
809                {
810                    let mut updates = Vec::new();
811                    if let Ok(updated_block) = wstore.must_get::<Block>(&block_id) {
812                        updates.push(obj::WaveObjUpdate {
813                            updatetype: "update".into(),
814                            otype: "block".into(),
815                            oid: block_id.clone(),
816                            obj: Some(obj::wave_obj_to_value(&updated_block)),
817                        });
818                    }
819                    if let Ok(updated_tab) = wstore.must_get::<Tab>(&tab_id) {
820                        updates.push(obj::WaveObjUpdate {
821                            updatetype: "update".into(),
822                            otype: "tab".into(),
823                            oid: tab_id.clone(),
824                            obj: Some(obj::wave_obj_to_value(&updated_tab)),
825                        });
826                        if let Ok(updated_layout) = wstore.must_get::<obj::LayoutState>(&updated_tab.layoutstate) {
827                            updates.push(obj::WaveObjUpdate {
828                                updatetype: "update".into(),
829                                otype: "layout".into(),
830                                oid: updated_tab.layoutstate.clone(),
831                                obj: Some(obj::wave_obj_to_value(&updated_layout)),
832                            });
833                        }
834                    }
835                    for update in &updates {
836                        let oref = format!("{}:{}", update.otype, update.oid);
837                        if let Ok(data) = serde_json::to_value(update) {
838                            event_bus.broadcast_event(
839                                &crate::backend::eventbus::WSEventType {
840                                    eventtype: "waveobj:update".to_string(),
841                                    oref,
842                                    data: Some(data),
843                                },
844                            );
845                        }
846                    }
847                }
848
849                Ok(Some(serde_json::to_value(&PaneOpenResult {
850                    block_id,
851                    tab_id,
852                    view: cmd.view,
853                    created: true,
854                }).unwrap()))
855            })
856        }),
857    );
858}
859
860/// Build the metadata map for a pane.open request, validating required args.
861fn build_pane_meta(cmd: &CommandPaneOpenData) -> Result<MetaMapType, String> {
862    let mut meta = MetaMapType::new();
863
864    match cmd.view.as_str() {
865        "editor" => {
866            let file = cmd.file.as_deref().filter(|s| !s.is_empty())
867                .ok_or_else(|| "MISSING_ARG: view=editor requires 'file'".to_string())?;
868            meta.insert("view".to_string(), json!("editor"));
869            meta.insert("file".to_string(), json!(file));
870        }
871        "term" => {
872            meta.insert("view".to_string(), json!("term"));
873            meta.insert("controller".to_string(), json!("shell"));
874            if let Some(cwd) = cmd.cwd.as_deref().filter(|s| !s.is_empty()) {
875                meta.insert("cmd:cwd".to_string(), json!(cwd));
876            }
877        }
878        "browser" => {
879            let url = cmd.url.as_deref().filter(|s| !s.is_empty())
880                .ok_or_else(|| "MISSING_ARG: view=browser requires 'url'".to_string())?;
881            meta.insert("view".to_string(), json!("browser"));
882            meta.insert("url".to_string(), json!(url));
883        }
884        "sysinfo" => {
885            meta.insert("view".to_string(), json!("sysinfo"));
886        }
887        "help" => {
888            meta.insert("view".to_string(), json!("help"));
889        }
890        other => {
891            return Err(format!(
892                "INVALID_VIEW: unsupported view '{other}' (expected editor/term/browser/sysinfo/help)"
893            ));
894        }
895    }
896
897    if let Some(title) = cmd.title.as_deref().filter(|s| !s.is_empty()) {
898        meta.insert("frame:title".to_string(), json!(title));
899    }
900
901    Ok(meta)
902}
903
904/// Translate `split_direction` + `split_reference_block_id` into the backend
905/// layout action triple. Returns `(actiontype, targetblockid, position)`.
906/// Falls back to a plain `insert` if direction/reference are missing.
907fn resolve_placement(
908    direction: Option<&str>,
909    reference: Option<&str>,
910) -> (String, String, String) {
911    let reference = match reference.filter(|s| !s.is_empty()) {
912        Some(r) => r,
913        None => return ("insert".to_string(), String::new(), String::new()),
914    };
915
916    let (actiontype, position) = match direction {
917        Some("right") => (crate::backend::wcore::LAYOUT_ACTION_SPLIT_HORIZONTAL, "after"),
918        Some("left") => (crate::backend::wcore::LAYOUT_ACTION_SPLIT_HORIZONTAL, "before"),
919        Some("down") | Some("below") => (crate::backend::wcore::LAYOUT_ACTION_SPLIT_VERTICAL, "after"),
920        Some("up") | Some("above") => (crate::backend::wcore::LAYOUT_ACTION_SPLIT_VERTICAL, "before"),
921        _ => return ("insert".to_string(), String::new(), String::new()),
922    };
923
924    (actiontype.to_string(), reference.to_string(), position.to_string())
925}
926
927// ---------------------------------------------------------------------------
928// blockfile:line_count
929// ---------------------------------------------------------------------------
930
931fn register_blockfile_line_count(engine: &Arc<WshRpcEngine>, state: &AppState) {
932    let broker = state.broker.clone();
933    let wstore = state.wstore.clone();
934
935    engine.register_handler(
936        COMMAND_BLOCKFILE_LINE_COUNT,
937        Box::new(move |data, _ctx| {
938            let broker = broker.clone();
939            let wstore = wstore.clone();
940            Box::pin(async move {
941                let cmd: CommandBlockfileLineCountData = serde_json::from_value(data)
942                    .map_err(|e| format!("blockfile:line_count: {e}"))?;
943
944                tracing::info!(block_id = %cmd.block_id, filename = %cmd.filename, "blockfile:line_count");
945
946                // Fast path: read session:line_count meta (O(1), maintained
947                // by SessionStatsAccumulator). For "output" filename this is
948                // the authoritative total — matches the unbounded counter
949                // that SessionStats increments on every line. FileStore's
950                // persisted line count will trail meta by up to the debounce
951                // interval (1s), and reading the full file just to count
952                // lines is O(file size) which defeats the point of a fast
953                // line_count endpoint.
954                if cmd.filename == "output" {
955                    if let Ok(Some(block)) = wstore.get::<Block>(&cmd.block_id) {
956                        if let Some(count) = block.meta.get("session:line_count").and_then(|v| v.as_u64()) {
957                            return Ok(Some(serde_json::to_value(
958                                &BlockfileLineCountResult { count },
959                            ).unwrap()));
960                        }
961                    }
962                }
963
964                // Fallback: count from WPS event ring buffer (capped at MAX_PERSIST = 4096).
965                let scope = format!("block:{}", cmd.block_id);
966                let events = broker.read_event_history(
967                    crate::backend::wps::EVENT_BLOCK_FILE,
968                    &scope,
969                    usize::MAX, // broker clamps to MAX_PERSIST internally
970                );
971
972                let mut count: u64 = 0;
973                for event in events {
974                    if let Some(ref event_data) = event.data {
975                        let ev_filename = event_data.get("filename")
976                            .and_then(|v| v.as_str()).unwrap_or("");
977                        if ev_filename != cmd.filename {
978                            continue;
979                        }
980                        if let Some(data64) = event_data.get("data64").and_then(|v| v.as_str()) {
981                            if let Ok(bytes) = base64::engine::general_purpose::STANDARD.decode(data64) {
982                                let text = String::from_utf8_lossy(&bytes);
983                                for line in text.lines() {
984                                    if !line.trim().is_empty() {
985                                        count += 1;
986                                    }
987                                }
988                            }
989                        }
990                    }
991                }
992
993                Ok(Some(serde_json::to_value(&BlockfileLineCountResult { count }).unwrap()))
994            })
995        }),
996    );
997}
998
999// ---------------------------------------------------------------------------
1000// blockfile:read_range
1001// ---------------------------------------------------------------------------
1002
1003fn register_blockfile_read_range(engine: &Arc<WshRpcEngine>, state: &AppState) {
1004    let broker = state.broker.clone();
1005    let filestore = state.filestore.clone();
1006
1007    engine.register_handler(
1008        COMMAND_BLOCKFILE_READ_RANGE,
1009        Box::new(move |data, _ctx| {
1010            let broker = broker.clone();
1011            let filestore = filestore.clone();
1012            Box::pin(async move {
1013                let cmd: CommandBlockfileReadRangeData = serde_json::from_value(data)
1014                    .map_err(|e| format!("blockfile:read_range: {e}"))?;
1015
1016                tracing::info!(block_id = %cmd.block_id, filename = %cmd.filename, offset = cmd.offset, limit = cmd.limit, "blockfile:read_range");
1017
1018                let limit = cmd.limit.min(10_000) as usize;
1019                let offset = cmd.offset as usize;
1020                let end = offset.saturating_add(limit);
1021
1022                // Phase 1.3: Prefer FileStore (persistent, no size cap) over the
1023                // WPS broker ring buffer (MAX_PERSIST = 4096 events).
1024                //
1025                // If FileStore has the file and it is non-empty, read from disk.
1026                // Otherwise fall back to ring buffer for backward compatibility.
1027                let filestore_lines = match filestore.stat(&cmd.block_id, &cmd.filename) {
1028                    Ok(Some(ref wf)) if wf.size > 0 => {
1029                        match filestore.read_file(&cmd.block_id, &cmd.filename) {
1030                            Ok(Some(bytes)) => {
1031                                let text = String::from_utf8_lossy(&bytes);
1032                                let lines: Vec<String> = text.lines()
1033                                    .filter(|l| !l.trim().is_empty())
1034                                    .map(|l| l.to_string())
1035                                    .collect();
1036                                Some(lines)
1037                            }
1038                            Ok(None) => None,
1039                            Err(e) => {
1040                                tracing::warn!(
1041                                    block_id = %cmd.block_id,
1042                                    filename = %cmd.filename,
1043                                    error = %e,
1044                                    "blockfile:read_range: filestore read failed, falling back to ring buffer"
1045                                );
1046                                None
1047                            }
1048                        }
1049                    }
1050                    Ok(_) => None, // file absent or empty → fall back
1051                    Err(e) => {
1052                        tracing::warn!(
1053                            block_id = %cmd.block_id,
1054                            error = %e,
1055                            "blockfile:read_range: filestore stat failed, falling back to ring buffer"
1056                        );
1057                        None
1058                    }
1059                };
1060
1061                let all_lines = if let Some(lines) = filestore_lines {
1062                    lines
1063                } else {
1064                    // Fallback: reconstruct from WPS event ring buffer.
1065                    // The ring buffer holds at most MAX_PERSIST = 4096 events;
1066                    // older events are evicted. Offset 0 = oldest retained line.
1067                    let scope = format!("block:{}", cmd.block_id);
1068                    let events = broker.read_event_history(
1069                        crate::backend::wps::EVENT_BLOCK_FILE,
1070                        &scope,
1071                        usize::MAX, // broker clamps to MAX_PERSIST internally
1072                    );
1073
1074                    let mut lines: Vec<String> = Vec::new();
1075                    for event in events {
1076                        let Some(ref event_data) = event.data else { continue };
1077                        let ev_filename = event_data.get("filename")
1078                            .and_then(|v| v.as_str()).unwrap_or("");
1079                        if ev_filename != cmd.filename {
1080                            continue;
1081                        }
1082                        let Some(data64) = event_data.get("data64").and_then(|v| v.as_str()) else { continue };
1083                        let Ok(bytes) = base64::engine::general_purpose::STANDARD.decode(data64) else { continue };
1084                        let text = String::from_utf8_lossy(&bytes);
1085                        for line in text.lines() {
1086                            if !line.trim().is_empty() {
1087                                lines.push(line.to_string());
1088                            }
1089                        }
1090                    }
1091                    lines
1092                };
1093
1094                let total = all_lines.len() as u64;
1095                let clamped_offset = offset.min(all_lines.len());
1096                let clamped_end = end.min(all_lines.len());
1097                let lines: Vec<String> = if clamped_offset >= clamped_end {
1098                    Vec::new()
1099                } else {
1100                    all_lines[clamped_offset..clamped_end].to_vec()
1101                };
1102
1103                Ok(Some(serde_json::to_value(&BlockfileReadRangeResult {
1104                    lines,
1105                    total,
1106                }).unwrap()))
1107            })
1108        }),
1109    );
1110}
1111
1112// ---------------------------------------------------------------------------
1113// blockfile:read_state — sidecar JSON snapshot read
1114// Spec: docs/specs/SPEC_AGENT_PANE_STATE_PERSISTENCE_2026_05_15.md §4.6
1115// ---------------------------------------------------------------------------
1116
1117fn register_blockfile_read_state(engine: &Arc<WshRpcEngine>, state: &AppState) {
1118    let filestore = state.filestore.clone();
1119    engine.register_handler(
1120        COMMAND_BLOCKFILE_READ_STATE,
1121        Box::new(move |data, _ctx| {
1122            let filestore = filestore.clone();
1123            Box::pin(async move {
1124                let cmd: CommandBlockfileReadStateData = serde_json::from_value(data)
1125                    .map_err(|e| format!("blockfile:read_state: {e}"))?;
1126                if cmd.filename.contains('/') || cmd.filename.contains('\\') || cmd.filename.contains("..") {
1127                    return Err("blockfile:read_state: filename must not contain path separators".to_string());
1128                }
1129                tracing::debug!(block_id = %cmd.block_id, filename = %cmd.filename, "blockfile:read_state");
1130
1131                let content = match filestore.read_file(&cmd.block_id, &cmd.filename) {
1132                    Ok(Some(bytes)) => Some(String::from_utf8_lossy(&bytes).into_owned()),
1133                    Ok(None) => None,
1134                    Err(e) => {
1135                        // NotFound is the common case (no snapshot yet). Suppress.
1136                        if matches!(e, crate::backend::storage::StoreError::NotFound) {
1137                            None
1138                        } else {
1139                            tracing::warn!(block_id = %cmd.block_id, error = %e, "blockfile:read_state: read failed");
1140                            None
1141                        }
1142                    }
1143                };
1144
1145                Ok(Some(serde_json::to_value(&BlockfileReadStateResult { content }).unwrap()))
1146            })
1147        }),
1148    );
1149}
1150
1151// ---------------------------------------------------------------------------
1152// blockfile:write_state — sidecar JSON snapshot write (atomic via DB tx)
1153// Spec: docs/specs/SPEC_AGENT_PANE_STATE_PERSISTENCE_2026_05_15.md §4.3
1154// ---------------------------------------------------------------------------
1155
1156fn register_blockfile_write_state(engine: &Arc<WshRpcEngine>, state: &AppState) {
1157    let filestore = state.filestore.clone();
1158    engine.register_handler(
1159        COMMAND_BLOCKFILE_WRITE_STATE,
1160        Box::new(move |data, _ctx| {
1161            let filestore = filestore.clone();
1162            Box::pin(async move {
1163                let cmd: CommandBlockfileWriteStateData = serde_json::from_value(data)
1164                    .map_err(|e| format!("blockfile:write_state: {e}"))?;
1165                if cmd.filename.contains('/') || cmd.filename.contains('\\') || cmd.filename.contains("..") {
1166                    return Err("blockfile:write_state: filename must not contain path separators".to_string());
1167                }
1168                let bytes = cmd.content.as_bytes();
1169                let bytes_written = bytes.len() as u64;
1170                tracing::debug!(block_id = %cmd.block_id, filename = %cmd.filename, bytes = bytes_written, "blockfile:write_state");
1171
1172                // FileStore.write_file is atomic at the DB level (single
1173                // tx replaces all data parts) — no torn write surfaces.
1174                // Need make_file first if the sidecar doesn't yet exist.
1175                use crate::backend::storage::filestore::{FileMeta, FileOpts};
1176                use crate::backend::storage::StoreError;
1177                match filestore.write_file(&cmd.block_id, &cmd.filename, bytes) {
1178                    Ok(()) => {}
1179                    Err(StoreError::NotFound) => {
1180                        filestore
1181                            .make_file(&cmd.block_id, &cmd.filename, FileMeta::default(), FileOpts::default())
1182                            .map_err(|e| format!("blockfile:write_state: make_file: {e}"))?;
1183                        filestore
1184                            .write_file(&cmd.block_id, &cmd.filename, bytes)
1185                            .map_err(|e| format!("blockfile:write_state: write_file: {e}"))?;
1186                    }
1187                    Err(e) => return Err(format!("blockfile:write_state: {e}")),
1188                }
1189
1190                Ok(Some(serde_json::to_value(&BlockfileWriteStateResult { bytes_written }).unwrap()))
1191            })
1192        }),
1193    );
1194}
1195
1196// ---------------------------------------------------------------------------
1197// session:archive
1198// ---------------------------------------------------------------------------
1199
1200fn register_session_archive_handler(engine: &Arc<WshRpcEngine>, state: &AppState) {
1201    let wstore = state.wstore.clone();
1202    let filestore = state.filestore.clone();
1203
1204    engine.register_handler(
1205        COMMAND_SESSION_ARCHIVE,
1206        Box::new(move |data, _ctx| {
1207            let wstore = wstore.clone();
1208            let filestore = filestore.clone();
1209            Box::pin(async move {
1210                let cmd: CommandSessionArchiveData = serde_json::from_value(data)
1211                    .map_err(|e| format!("session:archive: {e}"))?;
1212
1213                tracing::info!(block_id = %cmd.block_id, "session:archive");
1214
1215                let archive_dir = session_archive::default_archive_dir()
1216                    .ok_or_else(|| "cannot determine home directory".to_string())?;
1217
1218                let (archived_bytes, archived_at) = session_archive::archive_session_output(
1219                    &wstore,
1220                    &filestore,
1221                    &cmd.block_id,
1222                    &archive_dir,
1223                )?;
1224
1225                Ok(Some(serde_json::to_value(&SessionArchiveResult {
1226                    block_id: cmd.block_id,
1227                    archived_bytes,
1228                    archived_at,
1229                }).unwrap()))
1230            })
1231        }),
1232    );
1233}
1234
1235// ---------------------------------------------------------------------------
1236// session:restore
1237// ---------------------------------------------------------------------------
1238
1239fn register_session_restore_handler(engine: &Arc<WshRpcEngine>, state: &AppState) {
1240    let wstore = state.wstore.clone();
1241    let filestore = state.filestore.clone();
1242
1243    engine.register_handler(
1244        COMMAND_SESSION_RESTORE,
1245        Box::new(move |data, _ctx| {
1246            let wstore = wstore.clone();
1247            let filestore = filestore.clone();
1248            Box::pin(async move {
1249                let cmd: CommandSessionRestoreData = serde_json::from_value(data)
1250                    .map_err(|e| format!("session:restore: {e}"))?;
1251
1252                tracing::info!(block_id = %cmd.block_id, "session:restore");
1253
1254                let restored_bytes = session_archive::restore_session_output(
1255                    &wstore,
1256                    &filestore,
1257                    &cmd.block_id,
1258                )?;
1259
1260                Ok(Some(serde_json::to_value(&SessionRestoreResult {
1261                    block_id: cmd.block_id,
1262                    restored_bytes,
1263                }).unwrap()))
1264            })
1265        }),
1266    );
1267}
1268
1269// ---------------------------------------------------------------------------
1270// session:export
1271// ---------------------------------------------------------------------------
1272
1273fn register_session_export_handler(engine: &Arc<WshRpcEngine>, state: &AppState) {
1274    let wstore = state.wstore.clone();
1275    let filestore = state.filestore.clone();
1276
1277    engine.register_handler(
1278        COMMAND_SESSION_EXPORT,
1279        Box::new(move |data, _ctx| {
1280            let wstore = wstore.clone();
1281            let filestore = filestore.clone();
1282            Box::pin(async move {
1283                let cmd: CommandSessionExportData = serde_json::from_value(data)
1284                    .map_err(|e| format!("session:export: {e}"))?;
1285
1286                tracing::info!(block_id = %cmd.block_id, "session:export");
1287
1288                let (raw_bytes, line_count) = session_archive::read_session_output(
1289                    &wstore,
1290                    &filestore,
1291                    &cmd.block_id,
1292                )?;
1293
1294                let byte_count = raw_bytes.len() as u64;
1295                let content = base64::engine::general_purpose::STANDARD.encode(&raw_bytes);
1296
1297                Ok(Some(serde_json::to_value(&SessionExportResult {
1298                    content,
1299                    line_count,
1300                    byte_count,
1301                }).unwrap()))
1302            })
1303        }),
1304    );
1305}
1306
1307// ---------------------------------------------------------------------------
1308// session:digest
1309// ---------------------------------------------------------------------------
1310
1311fn register_session_digest(engine: &Arc<WshRpcEngine>, state: &AppState) {
1312    let wstore = state.wstore.clone();
1313    let filestore = state.filestore.clone();
1314    let broker = state.broker.clone();
1315
1316    engine.register_handler(
1317        COMMAND_SESSION_DIGEST,
1318        Box::new(move |data, _ctx| {
1319            let wstore = wstore.clone();
1320            let filestore = filestore.clone();
1321            let broker = broker.clone();
1322            Box::pin(async move {
1323                let cmd: CommandSessionDigestData = serde_json::from_value(data)
1324                    .map_err(|e| format!("session:digest: {e}"))?;
1325
1326                tracing::info!(block_id = %cmd.block_id, force = ?cmd.force, "session:digest");
1327
1328                let force = cmd.force.unwrap_or(false);
1329
1330                // Read block meta
1331                let block: Block = wstore
1332                    .get(&cmd.block_id)
1333                    .map_err(|e| format!("session:digest: {e}"))?
1334                    .ok_or_else(|| format!("BLOCK_NOT_FOUND: {}", cmd.block_id))?;
1335
1336                // Check for a valid cached digest
1337                let cached_summary = block.meta.get("session:digest_summary")
1338                    .and_then(|v| v.as_str())
1339                    .map(|s| s.to_string());
1340                let cached_generated_at = block.meta.get("session:digest_generated_at")
1341                    .and_then(|v| v.as_i64())
1342                    .unwrap_or(0);
1343                let digest_last_line_count = block.meta.get("session:digest_last_line_count")
1344                    .and_then(|v| v.as_u64())
1345                    .unwrap_or(0);
1346
1347                // Current line count from meta (O(1))
1348                let current_line_count = block.meta.get("session:line_count")
1349                    .and_then(|v| v.as_u64())
1350                    .unwrap_or(0);
1351
1352                // Serve cache if: not forced, cached digest exists, AND fewer than 20 new lines
1353                // since the digest was last generated.
1354                let lines_since_digest = current_line_count.saturating_sub(digest_last_line_count);
1355                if !force && cached_summary.is_some() && lines_since_digest < 20 {
1356                    return Ok(Some(serde_json::to_value(&SessionDigestResult {
1357                        summary: cached_summary.unwrap(),
1358                        generated_at: cached_generated_at,
1359                        cached: true,
1360                    }).unwrap()));
1361                }
1362
1363                // --- Generate a new digest ---
1364
1365                // Read up to the last 200 lines from FileStore, falling back to the WPS ring buffer.
1366                let all_lines: Vec<String> = {
1367                    let filestore_lines = match filestore.stat(&cmd.block_id, "output") {
1368                        Ok(Some(ref wf)) if wf.size > 0 => {
1369                            match filestore.read_file(&cmd.block_id, "output") {
1370                                Ok(Some(bytes)) => {
1371                                    let text = String::from_utf8_lossy(&bytes);
1372                                    let lines: Vec<String> = text.lines()
1373                                        .filter(|l| !l.trim().is_empty())
1374                                        .map(|l| l.to_string())
1375                                        .collect();
1376                                    Some(lines)
1377                                }
1378                                _ => None,
1379                            }
1380                        }
1381                        _ => None,
1382                    };
1383
1384                    if let Some(lines) = filestore_lines {
1385                        lines
1386                    } else {
1387                        // Fallback: WPS ring buffer
1388                        let scope = format!("block:{}", cmd.block_id);
1389                        let events = broker.read_event_history(
1390                            crate::backend::wps::EVENT_BLOCK_FILE,
1391                            &scope,
1392                            usize::MAX,
1393                        );
1394                        let mut lines: Vec<String> = Vec::new();
1395                        for event in events {
1396                            let Some(ref ed) = event.data else { continue };
1397                            let fname = ed.get("filename").and_then(|v| v.as_str()).unwrap_or("");
1398                            if fname != "output" { continue; }
1399                            if let Some(d64) = ed.get("data64").and_then(|v| v.as_str()) {
1400                                if let Ok(bytes) = base64::engine::general_purpose::STANDARD.decode(d64) {
1401                                    let text = String::from_utf8_lossy(&bytes);
1402                                    for line in text.lines() {
1403                                        if !line.trim().is_empty() {
1404                                            lines.push(line.to_string());
1405                                        }
1406                                    }
1407                                }
1408                            }
1409                        }
1410                        lines
1411                    }
1412                };
1413
1414                // Take the last 200 lines
1415                let n = all_lines.len();
1416                let start = n.saturating_sub(200);
1417                let window: Vec<&str> = all_lines[start..].iter().map(|s| s.as_str()).collect();
1418
1419                if window.is_empty() {
1420                    return Ok(Some(serde_json::to_value(&SessionDigestResult {
1421                        summary: String::new(),
1422                        generated_at: 0,
1423                        cached: false,
1424                    }).unwrap()));
1425                }
1426
1427                // Extract meaningful text (skip system events and raw stream deltas)
1428                let extracted = extract_digest_text(&window);
1429
1430                if extracted.is_empty() {
1431                    return Ok(Some(serde_json::to_value(&SessionDigestResult {
1432                        summary: String::new(),
1433                        generated_at: 0,
1434                        cached: false,
1435                    }).unwrap()));
1436                }
1437
1438                // Locate the Claude CLI (stored in block meta as "cmd" by runLaunchFlow)
1439                let cli_path = obj::meta_get_string(&block.meta, "cmd", "");
1440                if cli_path.is_empty() {
1441                    tracing::warn!(block_id = %cmd.block_id, "session:digest: no CLI path in meta");
1442                    return Ok(Some(serde_json::to_value(&SessionDigestResult {
1443                        summary: String::new(),
1444                        generated_at: 0,
1445                        cached: false,
1446                    }).unwrap()));
1447                }
1448
1449                // Build the summarization prompt
1450                let prompt = format!(
1451                    "Summarize this AI coding session in 3-4 sentences. Focus on: what was worked on, \
1452                     tools used, any errors encountered, and the current state. Be concise and factual.\n\n\
1453                     Session content (last 200 events):\n\n{}",
1454                    extracted
1455                );
1456
1457                // Invoke the Claude CLI and extract the summary text
1458                let summary = match invoke_cli_for_digest(&cli_path, &prompt, &block.meta).await {
1459                    Ok(text) => text,
1460                    Err(e) => {
1461                        tracing::warn!(block_id = %cmd.block_id, error = %e, "session:digest: CLI invocation failed");
1462                        String::new()
1463                    }
1464                };
1465
1466                if summary.is_empty() {
1467                    return Ok(Some(serde_json::to_value(&SessionDigestResult {
1468                        summary: String::new(),
1469                        generated_at: 0,
1470                        cached: false,
1471                    }).unwrap()));
1472                }
1473
1474                // Cache in block meta
1475                let generated_at = std::time::SystemTime::now()
1476                    .duration_since(std::time::UNIX_EPOCH)
1477                    .map(|d| d.as_millis() as i64)
1478                    .unwrap_or(0);
1479
1480                let mut meta_update = obj::MetaMapType::new();
1481                meta_update.insert("session:digest_summary".to_string(), json!(summary.clone()));
1482                meta_update.insert("session:digest_generated_at".to_string(), json!(generated_at));
1483                meta_update.insert("session:digest_last_line_count".to_string(), json!(current_line_count));
1484
1485                if let Err(e) = crate::server::service::update_object_meta(
1486                    &wstore,
1487                    &format!("block:{}", cmd.block_id),
1488                    &meta_update,
1489                ) {
1490                    tracing::warn!(block_id = %cmd.block_id, error = %e, "session:digest: failed to cache in meta");
1491                }
1492
1493                Ok(Some(serde_json::to_value(&SessionDigestResult {
1494                    summary,
1495                    generated_at,
1496                    cached: false,
1497                }).unwrap()))
1498            })
1499        }),
1500    );
1501}
1502
1503/// Extract meaningful text from raw stream-json lines for digest summarization.
1504/// Skips system/result events and raw stream_event deltas; extracts assistant text
1505/// and tool call summaries.
1506fn extract_digest_text(lines: &[&str]) -> String {
1507    let mut parts: Vec<String> = Vec::new();
1508
1509    for line in lines {
1510        let Ok(val) = serde_json::from_str::<serde_json::Value>(line) else { continue };
1511
1512        let msg_type = val.get("type").and_then(|v| v.as_str()).unwrap_or("");
1513
1514        match msg_type {
1515            "assistant" => {
1516                if let Some(content) = val.get("message")
1517                    .and_then(|m| m.get("content"))
1518                    .and_then(|c| c.as_array())
1519                {
1520                    for block in content {
1521                        let btype = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
1522                        if btype == "text" {
1523                            if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
1524                                let trimmed = text.trim();
1525                                if !trimmed.is_empty() {
1526                                    parts.push(format!("[assistant] {}", trimmed));
1527                                }
1528                            }
1529                        } else if btype == "tool_use" {
1530                            let tool_name = block.get("name")
1531                                .and_then(|v| v.as_str())
1532                                .unwrap_or("unknown");
1533                            parts.push(format!("[tool] {}", tool_name));
1534                        }
1535                    }
1536                }
1537            }
1538            "user" => {
1539                if let Some(content) = val.get("message")
1540                    .and_then(|m| m.get("content"))
1541                    .and_then(|c| c.as_array())
1542                {
1543                    for block in content {
1544                        let btype = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
1545                        if btype == "tool_result" {
1546                            let is_error = block.get("is_error")
1547                                .and_then(|v| v.as_bool())
1548                                .unwrap_or(false);
1549                            if is_error {
1550                                let err_text = block.get("content")
1551                                    .and_then(|c| c.as_str())
1552                                    .unwrap_or("(error)")
1553                                    .chars().take(120).collect::<String>();
1554                                parts.push(format!("[error] {}", err_text));
1555                            }
1556                        } else if btype == "text" {
1557                            if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
1558                                let trimmed = text.trim();
1559                                if !trimmed.is_empty() {
1560                                    parts.push(format!("[user] {}", trimmed));
1561                                }
1562                            }
1563                        }
1564                    }
1565                }
1566            }
1567            "result" => {
1568                if let Some(cost) = val.get("total_cost_usd").and_then(|v| v.as_f64()) {
1569                    if let Some(turns) = val.get("num_turns").and_then(|v| v.as_u64()) {
1570                        parts.push(format!("[summary] {} turns, ${:.4} total cost", turns, cost));
1571                    }
1572                }
1573            }
1574            // Skip: system, stream_event (deltas), rate_limit_event
1575            _ => {}
1576        }
1577    }
1578
1579    parts.join("\n")
1580}
1581
1582/// Invoke the Claude CLI with a prompt and extract the text response.
1583/// Uses `-p --output-format stream-json --verbose` (non-interactive mode).
1584async fn invoke_cli_for_digest(
1585    cli_path: &str,
1586    prompt: &str,
1587    meta: &obj::MetaMapType,
1588) -> Result<String, String> {
1589    // Inherit auth env from block meta (CLAUDE_CONFIG_DIR, etc.)
1590    let auth_env: std::collections::HashMap<String, String> = match meta.get("cmd:env") {
1591        Some(serde_json::Value::Object(obj_map)) => obj_map
1592            .iter()
1593            .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
1594            .collect(),
1595        _ => std::collections::HashMap::new(),
1596    };
1597
1598    // Pipe the prompt via stdin rather than passing it as a CLI arg — Linux
1599    // caps individual argv entries at MAX_ARG_STRLEN (~128 KB), and a digest
1600    // over 200 lines of session content can easily exceed that.
1601    // `kill_on_drop(true)` ensures the child is terminated if the timeout
1602    // future below is dropped — tokio `Child` does NOT kill on drop by default.
1603    let mut child = crate::server::cli_handlers::make_cli_cmd(cli_path)
1604        .args(["-p", "--output-format", "stream-json", "--verbose"])
1605        .envs(&auth_env)
1606        .stdin(std::process::Stdio::piped())
1607        .stdout(std::process::Stdio::piped())
1608        .stderr(std::process::Stdio::null())
1609        .kill_on_drop(true)
1610        .spawn()
1611        .map_err(|e| format!("failed to spawn digest CLI: {e}"))?;
1612
1613    if let Some(mut stdin) = child.stdin.take() {
1614        use tokio::io::AsyncWriteExt;
1615        stdin
1616            .write_all(prompt.as_bytes())
1617            .await
1618            .map_err(|e| format!("digest CLI stdin write: {e}"))?;
1619        stdin
1620            .shutdown()
1621            .await
1622            .map_err(|e| format!("digest CLI stdin shutdown: {e}"))?;
1623    }
1624
1625    let output = tokio::time::timeout(
1626        std::time::Duration::from_secs(60),
1627        child.wait_with_output(),
1628    )
1629    .await
1630    .map_err(|_| "digest CLI timed out after 60s".to_string())?
1631    .map_err(|e| format!("digest CLI wait: {e}"))?;
1632
1633    if !output.status.success() {
1634        return Err(format!("digest CLI exited with status {}", output.status));
1635    }
1636
1637    // Parse stream-json output — capture the last assistant text block
1638    let stdout = String::from_utf8_lossy(&output.stdout);
1639    let mut last_text = String::new();
1640
1641    for line in stdout.lines() {
1642        let Ok(val) = serde_json::from_str::<serde_json::Value>(line) else { continue };
1643        let msg_type = val.get("type").and_then(|v| v.as_str()).unwrap_or("");
1644        if msg_type == "assistant" {
1645            if let Some(content) = val.get("message")
1646                .and_then(|m| m.get("content"))
1647                .and_then(|c| c.as_array())
1648            {
1649                for block in content {
1650                    if block.get("type").and_then(|v| v.as_str()) == Some("text") {
1651                        if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
1652                            last_text = text.trim().to_string();
1653                        }
1654                    }
1655                }
1656            }
1657        }
1658    }
1659
1660    if last_text.is_empty() {
1661        return Err("no text content in digest CLI response".to_string());
1662    }
1663
1664    Ok(last_text)
1665}
1666
1667// ---------------------------------------------------------------------------
1668// Helpers
1669// ---------------------------------------------------------------------------
1670
1671/// Resolve a tab ID: use the provided one, or fall back to the first workspace's active tab.
1672fn resolve_tab_id(wstore: &WaveStore, explicit: Option<&str>) -> Result<String, String> {
1673    if let Some(tid) = explicit {
1674        return Ok(tid.to_string());
1675    }
1676
1677    // Fall back to first workspace's active tab
1678    let workspaces: Vec<Workspace> = wstore.get_all::<Workspace>()
1679        .map_err(|e| format!("agent.open: list workspaces: {e}"))?;
1680
1681    for ws in &workspaces {
1682        if !ws.activetabid.is_empty() {
1683            return Ok(ws.activetabid.clone());
1684        }
1685        if let Some(first_tab) = ws.tabids.first() {
1686            return Ok(first_tab.clone());
1687        }
1688    }
1689
1690    Err("no tabs found in any workspace".to_string())
1691}
1692
1693/// Find an existing agent block in a tab by agent ID.
1694fn find_agent_block(wstore: &WaveStore, tab_id: &str, agent_id: &str) -> Result<Option<Block>, String> {
1695    let tab: Tab = wstore.must_get(tab_id)
1696        .map_err(|e| format!("TAB_NOT_FOUND: {e}"))?;
1697
1698    for block_id in &tab.blockids {
1699        if let Ok(Some(block)) = wstore.get::<Block>(block_id) {
1700            let block_agent_id = obj::meta_get_string(&block.meta, "agentId", "");
1701            if block_agent_id == agent_id {
1702                return Ok(Some(block));
1703            }
1704        }
1705    }
1706    Ok(None)
1707}
1708
/// Atomically allocate an agent working directory.
///
/// Tries to atomically create `desired` via `std::fs::create_dir`. If
/// that fails because the directory already exists, tries `<desired>-1`,
/// `<desired>-2`, …, up to `-99`. The atomic `create_dir` (NOT
/// `create_dir_all` for the leaf) is the reservation mechanism: two
/// concurrent callers competing for the same path race on the OS
/// `mkdir` syscall and one wins; the loser sees `AlreadyExists` and
/// moves on.
///
/// Missing parent directories of `desired` are created first. Trailing path
/// separators on `desired` are ignored when building the `-N` candidates, so
/// `foo/` collides into the sibling `foo-1` rather than the child `foo/-1`.
///
/// Caller is responsible for distinguishing auto-generated paths from
/// user-specified ones — this function rewrites the path on collision,
/// which would clobber a user's intent if they pointed an agent at
/// `~/projects/myrepo` and that already had a `CLAUDE.md`.
///
/// # Returns
///
/// The path of the directory that was created: `desired` itself (unchanged)
/// when it was free, otherwise the first free `-N` variant.
///
/// # Errors
///
/// Returns a message when the parent cannot be created, when `create_dir`
/// fails for any reason other than `AlreadyExists`, or when all 99 suffixed
/// candidates are taken.
pub fn allocate_agent_workdir(desired: &str) -> Result<String, String> {
    /// Highest `-N` suffix tried before giving up.
    const MAX_SUFFIX: u32 = 99;

    let p = std::path::Path::new(desired);
    if let Some(parent) = p.parent().filter(|parent| !parent.as_os_str().is_empty()) {
        std::fs::create_dir_all(parent)
            .map_err(|e| format!("allocate_agent_workdir: parent {}: {e}", parent.display()))?;
    }
    match std::fs::create_dir(p) {
        Ok(()) => return Ok(desired.to_string()),
        Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
        Err(e) => return Err(format!("allocate_agent_workdir: create_dir({}): {e}", desired)),
    }

    // Strip trailing separators so the suffix attaches to the directory name.
    // A path made only of separators (e.g. the root) is left untouched.
    let stem = match desired.trim_end_matches(std::path::is_separator) {
        "" => desired,
        trimmed => trimmed,
    };

    for n in 1..=MAX_SUFFIX {
        let candidate = format!("{stem}-{n}");
        match std::fs::create_dir(std::path::Path::new(&candidate)) {
            Ok(()) => return Ok(candidate),
            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => continue,
            Err(e) => return Err(format!("allocate_agent_workdir: create_dir({candidate}): {e}")),
        }
    }
    Err(format!(
        "allocate_agent_workdir: too many collisions (>99) under {desired}-N — clean up old runs"
    ))
}
1748
1749/// Write agent config files (CLAUDE.md, .mcp.json, etc.) to the working directory.
1750fn write_agent_config_files(
1751    wstore: &WaveStore,
1752    agent: &crate::backend::storage::AgentDefinition,
1753    agent_slug: &str,
1754    work_dir: &str,
1755) -> Result<(), String> {
1756    // Load agent content and skills
1757    let contents = wstore.agent_content_get_all(&agent.id)
1758        .unwrap_or_default();
1759    let skills = wstore.agent_skill_list(&agent.id)
1760        .unwrap_or_default();
1761
1762    let mut content_map = std::collections::HashMap::new();
1763    for fc in &contents {
1764        content_map.insert(fc.content_type.clone(), fc.content.clone());
1765    }
1766
1767    let config_files = crate::backend::agent_config::build_config_files(
1768        &content_map,
1769        &skills,
1770        &agent.name,
1771        &agent.id,
1772    );
1773
1774    // Expand ~ in work_dir
1775    let expanded_dir = if work_dir.starts_with("~/") || work_dir == "~" {
1776        if let Ok(home) = std::env::var("HOME").or_else(|_| std::env::var("USERPROFILE")) {
1777            format!("{}/{}", home, work_dir.trim_start_matches("~/"))
1778        } else {
1779            work_dir.to_string()
1780        }
1781    } else {
1782        work_dir.to_string()
1783    };
1784
1785    let base_path = std::path::Path::new(&expanded_dir);
1786    if !base_path.exists() {
1787        std::fs::create_dir_all(base_path)
1788            .map_err(|e| format!("failed to create working dir: {e}"))?;
1789    }
1790
1791    for file in &config_files {
1792        let file_path = base_path.join(&file.filename);
1793        if let Some(parent) = file_path.parent() {
1794            if !parent.exists() {
1795                let _ = std::fs::create_dir_all(parent);
1796            }
1797        }
1798        std::fs::write(&file_path, &file.content)
1799            .map_err(|e| format!("failed to write {}: {e}", file.filename))?;
1800    }
1801
1802    tracing::info!(
1803        agent_id = %agent.id,
1804        work_dir = %expanded_dir,
1805        file_count = config_files.len(),
1806        "agent.open: wrote config files"
1807    );
1808
1809    Ok(())
1810}
1811