agentmux_srv\backend\blockcontroller/
shell.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! ShellController: manages lifecycle of shell and command blocks.
5//! Port of Go's pkg/blockcontroller/shellcontroller.go.
6//!
7//! State machine:
8//!   INIT ─(start)─> RUNNING ─(exit/stop)─> DONE
9//!   DONE ─(resync+force)─> RUNNING
10//!
11//! I/O model (3 async tasks when running):
12//! 1. PTY read loop: process stdout → FileStore + WPS event
13//! 2. Input loop: input channel → process stdin
14//! 3. Wait loop: monitor process exit, update status
15
16
17use std::io::Read as _;
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::Arc;
20use std::sync::Mutex;
21use std::time::{Instant, SystemTime, UNIX_EPOCH};
22
23#[cfg(unix)]
24use libc;
25
26use base64::Engine as _;
27use portable_pty::{native_pty_system, CommandBuilder, PtySize};
28use tokio::sync::mpsc;
29
30use super::{
31    BlockControllerRuntimeStatus, BlockInputUnion, Controller, META_KEY_CMD, META_KEY_CMD_ARGS,
32    META_KEY_CMD_CLEAR_ON_START, META_KEY_CMD_CLOSE_ON_EXIT, META_KEY_CMD_CLOSE_ON_EXIT_DELAY,
33    META_KEY_CMD_CLOSE_ON_EXIT_FORCE, META_KEY_CMD_ENV, META_KEY_CMD_RUN_ONCE,
34    META_KEY_CMD_RUN_ON_START, META_KEY_CONNECTION, STATUS_DONE, STATUS_INIT, STATUS_RUNNING,
35};
36use crate::backend::eventbus::EventBus;
37use crate::backend::shellexec::{ConnInterface, ShellProc};
38use crate::backend::storage::filestore::FileStore;
39use crate::backend::storage::wstore::WaveStore;
40use crate::backend::obj::{self, MetaMapType};
41use crate::backend::wps;
42
/// Channel buffer size for shell input (matches Go's 32).
///
/// Passed as the capacity to `mpsc::channel` when `start()` creates the
/// input channel for a run.
const SHELL_INPUT_CH_SIZE: usize = 32;
45
/// Pick the interactive shell to launch on Windows.
///
/// Mirrors the original Go logic from pkg/util/shellutil/shellutil.go
/// DetectLocalShellPath(). Candidates are probed in order of preference and
/// the first one that `where` resolves successfully is returned:
///   1. `pwsh`        (PowerShell 7 — cross-platform)
///   2. `powershell`  (Windows PowerShell 5.x)
///
/// If neither probe succeeds (including when `where` itself cannot be
/// launched), the result is `cmd.exe`.
///
/// The returned value is the bare command name, not a resolved path.
#[cfg(windows)]
fn detect_local_shell_path_windows() -> String {
    use std::os::windows::process::CommandExt;
    use std::process::Command;

    // Win32 process-creation flag applied to every `where` probe.
    const CREATE_NO_WINDOW: u32 = 0x08000000;

    // A candidate counts as available only when `where <exe>` could be run
    // AND exited successfully; a failure to launch `where` counts as "absent".
    let resolves_via_where = |exe: &str| -> bool {
        match Command::new("where")
            .arg(exe)
            .creation_flags(CREATE_NO_WINDOW)
            .output()
        {
            Ok(probe) => probe.status.success(),
            Err(_) => false,
        }
    };

    for candidate in ["pwsh", "powershell"] {
        if resolves_via_where(candidate) {
            return candidate.to_string();
        }
    }

    "cmd.exe".to_string()
}
79
/// Placeholder used on non-Windows builds.
///
/// The only call site is behind an `if cfg!(windows)` check, so this body is
/// never executed on these targets; the function exists so that call site
/// still compiles on every platform.
#[cfg(not(windows))]
fn detect_local_shell_path_windows() -> String {
    String::from("cmd.exe")
}
85
/// PTY read buffer size (matches Go's 4096).
// NOTE(review): presumably a byte count for a single PTY read — confirm
// against the read loop that consumes this constant.
const PTY_READ_BUF_SIZE: usize = 4096;
88
/// Grace period (seconds) between SIGTERM and SIGKILL during stop().
// NOTE(review): `dead_code` is allowed here, presumably because the constant
// is only referenced on some targets — confirm against stop().
#[allow(dead_code)]
const KILL_GRACE_SECS: u64 = 5;

/// Inner state protected by mutex.
///
/// `ShellController` holds this as `Arc<Mutex<ShellControllerInner>>`; every
/// field below is read or written only while that lock is held.
struct ShellControllerInner {
    /// Current process status (`STATUS_INIT` on construction; `start()` moves
    /// it to `STATUS_RUNNING` and then `STATUS_DONE`).
    proc_status: String,
    /// Process exit code. `start()` stores -1 here when the connection, PTY,
    /// or spawn step fails.
    proc_exit_code: i32,
    /// Status version counter (incremented on each change via `set_status`).
    status_version: i32,
    /// Connection name for the shell process (taken from block meta in `start()`).
    conn_name: String,
    /// Input channel sender (sends to the PTY input loop).
    /// `Some` while a run is active; reset to `None` when a run fails or ends.
    input_tx: Option<mpsc::Sender<BlockInputUnion>>,
    /// Input channel receiver (consumed by the PTY input loop).
    // NOTE(review): initialised to `None` and not assigned in the code visible
    // here — `start()` keeps the receiver in a local instead. Confirm whether
    // this field is still needed.
    #[allow(dead_code)]
    input_rx: Option<mpsc::Receiver<BlockInputUnion>>,
    /// OS PID of the running child process, kept for signal delivery in stop().
    child_pid: Option<u32>,
    /// Unix timestamp (ms) when the process was spawned; None until first spawn.
    spawn_ts_ms: Option<i64>,
    /// Monotonic instant of the most recent PTY read; None until first output.
    last_pty_output: Option<Instant>,
    /// True if this pane is running an agent CLI (e.g. claude).
    is_agent_pane: bool,
    /// Next expected input seq number (per-TermViewModel monotonic counter).
    /// Reset to 0 each time `start()` creates a new input channel.
    input_seq_next: u64,
    /// Out-of-order input packets waiting for their seq slot (capped at SHELL_INPUT_CH_SIZE).
    /// Cleared each time `start()` creates a new input channel.
    input_seq_buf: std::collections::BTreeMap<u64, BlockInputUnion>,
}
121
/// Factory function type for creating ConnInterface instances.
/// This allows dependency injection for testing.
///
/// `start()` calls the factory with the connection name and the block meta.
/// On `Ok` it receives the boxed connection to start; on `Err` the string is
/// embedded in the "failed to create connection" error returned to the caller.
pub type ConnFactory =
    Box<dyn Fn(&str, &MetaMapType) -> Result<Box<dyn ConnInterface>, String> + Send + Sync>;
126
/// ShellController manages one shell or command block.
///
/// All mutable runtime state lives in `inner` behind a mutex; `run_lock`
/// separately guards against two overlapping `start()` calls.
pub struct ShellController {
    /// Controller type: "shell" or "cmd".
    controller_type: String,
    /// Parent tab UUID.
    tab_id: String,
    /// Block UUID.
    block_id: String,
    /// Prevents concurrent run() calls (set by `try_lock_run`, cleared by `unlock_run`).
    run_lock: Arc<AtomicBool>,
    /// Protected inner state.
    inner: Arc<Mutex<ShellControllerInner>>,
    /// Optional factory for creating ConnInterface (for testing).
    /// When set, `start()` takes the mock path instead of opening a real PTY.
    conn_factory: Mutex<Option<ConnFactory>>,
    /// WPS broker for publishing events (blockfile, controllerstatus).
    /// When `None`, `publish_status` does nothing.
    broker: Option<Arc<wps::Broker>>,
    /// Event bus. `start()` uses it to broadcast a `waveobj:update` event after
    /// seeding cmd:cwd, so the frontend sees the new block meta.
    // NOTE(review): the `allow(dead_code)` below looks stale now that the
    // field is read in `start()` — confirm and consider removing it.
    #[allow(dead_code)]
    event_bus: Option<Arc<EventBus>>,
    /// Wave object store — used to seed cmd:cwd on shell spawn.
    wstore: Option<Arc<WaveStore>>,
}
149
150impl ShellController {
151    /// Create a new ShellController.
152    pub fn new(
153        controller_type: String,
154        tab_id: String,
155        block_id: String,
156        broker: Option<Arc<wps::Broker>>,
157        event_bus: Option<Arc<EventBus>>,
158        wstore: Option<Arc<WaveStore>>,
159    ) -> Self {
160        Self {
161            controller_type,
162            tab_id,
163            block_id,
164            run_lock: Arc::new(AtomicBool::new(false)),
165            inner: Arc::new(Mutex::new(ShellControllerInner {
166                proc_status: STATUS_INIT.to_string(),
167                proc_exit_code: 0,
168                status_version: 0,
169                conn_name: String::new(),
170                input_tx: None,
171                input_rx: None,
172                child_pid: None,
173                spawn_ts_ms: None,
174                last_pty_output: None,
175                is_agent_pane: false,
176                input_seq_next: 0,
177                input_seq_buf: std::collections::BTreeMap::new(),
178            })),
179            conn_factory: Mutex::new(None),
180            broker,
181            event_bus,
182            wstore,
183        }
184    }
185
186    /// Set a custom ConnInterface factory (for testing).
187    #[allow(dead_code)]
188    pub fn set_conn_factory(&self, factory: ConnFactory) {
189        *self.conn_factory.lock().unwrap() = Some(factory);
190    }
191
192    /// Try to acquire the run lock. Returns false if already running.
193    fn try_lock_run(&self) -> bool {
194        self.run_lock
195            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
196            .is_ok()
197    }
198
199    /// Release the run lock.
200    fn unlock_run(&self) {
201        self.run_lock.store(false, Ordering::SeqCst);
202    }
203
204    /// Update process status and increment version (must hold inner lock).
205    fn set_status(inner: &mut ShellControllerInner, status: &str) {
206        inner.proc_status = status.to_string();
207        inner.status_version += 1;
208    }
209
210    /// Get the runtime status (snapshot).
211    fn get_status_snapshot(&self) -> BlockControllerRuntimeStatus {
212        let inner = self.inner.lock().unwrap();
213        BlockControllerRuntimeStatus {
214            blockid: self.block_id.clone(),
215            version: inner.status_version,
216            shellprocstatus: inner.proc_status.clone(),
217            shellprocconnname: inner.conn_name.clone(),
218            shellprocexitcode: inner.proc_exit_code,
219            spawn_ts_ms: inner.spawn_ts_ms,
220            is_agent_pane: inner.is_agent_pane,
221        }
222    }
223
224    /// Seconds since last PTY output, or None if no output yet.
225    pub fn last_output_secs_ago(&self) -> Option<u64> {
226        self.inner.lock().unwrap().last_pty_output.map(|t| t.elapsed().as_secs())
227    }
228
229    /// True if this pane is running an agent CLI.
230    #[allow(dead_code)]
231    pub fn is_agent_pane(&self) -> bool {
232        self.inner.lock().unwrap().is_agent_pane
233    }
234
235    /// Check block meta for whether to run on start.
236    fn should_run_on_start(meta: &MetaMapType) -> bool {
237        obj::meta_get_bool(meta, META_KEY_CMD_RUN_ON_START, true)
238    }
239
240    /// Check block meta for run-once mode (used in full lifecycle integration).
241    #[allow(dead_code)]
242    fn should_run_once(meta: &MetaMapType) -> bool {
243        obj::meta_get_bool(meta, META_KEY_CMD_RUN_ONCE, false)
244    }
245
246    /// Check block meta for clear-on-start (used in full lifecycle integration).
247    #[allow(dead_code)]
248    fn should_clear_on_start(meta: &MetaMapType) -> bool {
249        obj::meta_get_bool(meta, META_KEY_CMD_CLEAR_ON_START, false)
250    }
251
252    /// Check block meta for close-on-exit (used in full lifecycle integration).
253    #[allow(dead_code)]
254    fn should_close_on_exit(meta: &MetaMapType) -> bool {
255        obj::meta_get_bool(meta, META_KEY_CMD_CLOSE_ON_EXIT, false)
256    }
257
258    /// Check block meta for force close-on-exit (used in full lifecycle integration).
259    #[allow(dead_code)]
260    fn should_close_on_exit_force(meta: &MetaMapType) -> bool {
261        obj::meta_get_bool(meta, META_KEY_CMD_CLOSE_ON_EXIT_FORCE, false)
262    }
263
264    /// Get the close-on-exit delay in ms (defaults to 2000, used in full lifecycle integration).
265    #[allow(dead_code)]
266    fn close_on_exit_delay_ms(meta: &MetaMapType) -> u64 {
267        match meta.get(META_KEY_CMD_CLOSE_ON_EXIT_DELAY) {
268            Some(serde_json::Value::Number(n)) => n.as_u64().unwrap_or(2000),
269            _ => 2000,
270        }
271    }
272
273    /// Get the connection name from block meta.
274    fn get_conn_name(meta: &MetaMapType) -> String {
275        obj::meta_get_string(meta, META_KEY_CONNECTION, "local")
276    }
277
278    /// Get the command string from block meta.
279    fn get_cmd_str(meta: &MetaMapType) -> String {
280        obj::meta_get_string(meta, META_KEY_CMD, "")
281    }
282
283    /// Get cmd:args array from block meta.
284    fn get_cmd_args(meta: &MetaMapType) -> Vec<String> {
285        match meta.get(META_KEY_CMD_ARGS) {
286            Some(serde_json::Value::Array(arr)) => arr
287                .iter()
288                .filter_map(|v| v.as_str().map(|s| s.to_string()))
289                .collect(),
290            _ => vec![],
291        }
292    }
293
294    /// Check if cmd:interactive is set in block meta.
295    fn is_interactive(meta: &MetaMapType) -> bool {
296        obj::meta_get_bool(meta, "cmd:interactive", false)
297    }
298
299    /// Publish current controller status via the WPS broker.
300    fn publish_status(&self) {
301        if let Some(ref broker) = self.broker {
302            let status = self.get_status_snapshot();
303            super::publish_controller_status(broker, &status);
304        }
305    }
306}
307
308impl Controller for ShellController {
309    fn start(
310        &self,
311        block_meta: MetaMapType,
312        _rt_opts: Option<serde_json::Value>,
313        force: bool,
314    ) -> Result<(), String> {
315        let cmd_str_preview = Self::get_cmd_str(&block_meta);
316        let interactive_preview = Self::is_interactive(&block_meta);
317        tracing::info!(
318            block_id = %self.block_id,
319            controller = %self.controller_type,
320            cmd = %cmd_str_preview,
321            interactive = interactive_preview,
322            force = force,
323            "block start requested"
324        );
325
326        // Check if we should run
327        if !force && !Self::should_run_on_start(&block_meta) {
328            tracing::info!(block_id = %self.block_id, "skipping start: run_on_start is false");
329            return Ok(());
330        }
331
332        // Try to acquire run lock
333        if !self.try_lock_run() {
334            return Err("controller is already running".to_string());
335        }
336
337        // Get connection info
338        let conn_name = Self::get_conn_name(&block_meta);
339
340        // Update status to running and publish
341        {
342            let mut inner = self.inner.lock().unwrap();
343            Self::set_status(&mut inner, STATUS_RUNNING);
344            inner.conn_name = conn_name.clone();
345        }
346        self.publish_status();
347
348        // Create input channel
349        let (input_tx, input_rx) = mpsc::channel(SHELL_INPUT_CH_SIZE);
350        {
351            let mut inner = self.inner.lock().unwrap();
352            inner.input_tx = Some(input_tx);
353            inner.input_seq_next = 0;
354            inner.input_seq_buf.clear();
355        }
356
357        // Check if we have a conn_factory (test/mock path)
358        let has_factory = self.conn_factory.lock().unwrap().is_some();
359
360        if has_factory {
361            // Mock path: use ConnInterface factory (synchronous, for tests)
362            let conn_result = {
363                let factory = self.conn_factory.lock().unwrap();
364                factory.as_ref().unwrap()(&conn_name, &block_meta)
365            };
366
367            let mut conn = match conn_result {
368                Ok(c) => c,
369                Err(e) => {
370                    let mut inner = self.inner.lock().unwrap();
371                    Self::set_status(&mut inner, STATUS_DONE);
372                    inner.proc_exit_code = -1;
373                    inner.input_tx = None;
374                    self.unlock_run();
375                    return Err(format!("failed to create connection: {e}"));
376                }
377            };
378
379            if let Err(e) = conn.start() {
380                let mut inner = self.inner.lock().unwrap();
381                Self::set_status(&mut inner, STATUS_DONE);
382                inner.proc_exit_code = -1;
383                inner.input_tx = None;
384                self.unlock_run();
385                return Err(format!("failed to start process: {e}"));
386            }
387
388            let mut shell_proc = ShellProc::new(conn_name, conn);
389            let _done_rx = shell_proc.take_done_rx();
390            let exit_code = shell_proc.wait_and_signal();
391
392            {
393                let mut inner = self.inner.lock().unwrap();
394                inner.proc_exit_code = exit_code;
395                Self::set_status(&mut inner, STATUS_DONE);
396                inner.input_tx = None;
397            }
398            self.publish_status();
399            self.unlock_run();
400            return Ok(());
401        }
402
403        // Real PTY path.
404        //
405        // Initial cols bumped to 200 (was 80) so the agent-pane live-log
406        // doesn't render hard-wrapped at ~80 chars before the frontend's
407        // dynamic resize lands (see hooks/usePtyWidth.ts and
408        // docs/analysis/AGENT_PANE_PTY_WRAP_2026_05_23.md). The dynamic
409        // resize coalesces over a ~150 ms debounce after mount, so the
410        // PTY uses this default for the very first batch of output.
411        let pty_system = native_pty_system();
412        let pty_size = PtySize {
413            rows: 25,
414            cols: 200,
415            pixel_width: 0,
416            pixel_height: 0,
417        };
418
419        let pair = pty_system.openpty(pty_size).map_err(|e| {
420            tracing::error!(block_id = %self.block_id, error = %e, "failed to open PTY");
421            let mut inner = self.inner.lock().unwrap();
422            Self::set_status(&mut inner, STATUS_DONE);
423            inner.proc_exit_code = -1;
424            inner.input_tx = None;
425            self.unlock_run();
426            format!("failed to open PTY: {e}")
427        })?;
428        tracing::info!(block_id = %self.block_id, rows = 25, cols = 200, "PTY opened");
429
430        // Determine shell command
431        let cmd_str = Self::get_cmd_str(&block_meta);
432        let cmd_args = Self::get_cmd_args(&block_meta);
433        let interactive = Self::is_interactive(&block_meta);
434
435        // Resolve effective AGENTMUX_AGENT_ID for jekt auto-registration.
436        // Priority: block metadata > global settings > WAVEMUX_AGENT_ID env compat.
437        let agent_id_for_jekt: Option<String> = block_meta
438            .get(META_KEY_CMD_ENV)
439            .and_then(|m| m.as_object())
440            .and_then(|obj| obj.get("AGENTMUX_AGENT_ID"))
441            .and_then(|v| v.as_str())
442            .map(|s| s.to_string())
443            .or_else(|| {
444                let cfg = crate::backend::wconfig::ConfigWatcher::with_config(
445                    crate::backend::wconfig::build_default_config(),
446                );
447                cfg.get_settings().cmd_env.get("AGENTMUX_AGENT_ID").cloned()
448            })
449            .or_else(|| std::env::var("WAVEMUX_AGENT_ID").ok());
450
451        let mut cmd = if !cmd_str.is_empty() && (!cmd_args.is_empty() || interactive) {
452            // Direct spawn: cmd:args provided or cmd:interactive set.
453            // Spawn the CLI directly (no sh -c wrapper) so args are passed correctly.
454            tracing::info!(block_id = %self.block_id, cmd = %cmd_str, args = ?cmd_args, "direct spawn path");
455            let mut c = CommandBuilder::new(&cmd_str);
456            if !cmd_args.is_empty() {
457                let arg_refs: Vec<&str> = cmd_args.iter().map(|s| s.as_str()).collect();
458                c.args(arg_refs);
459            }
460            c
461        } else if !cmd_str.is_empty() {
462            // "cmd" controller: run a specific command string via shell wrapper
463            tracing::info!(block_id = %self.block_id, cmd = %cmd_str, "shell-wrapped spawn path");
464            if cfg!(windows) {
465                let mut c = CommandBuilder::new("cmd.exe");
466                c.args(["/C", &cmd_str]);
467                c
468            } else {
469                let mut c = CommandBuilder::new("/bin/sh");
470                c.args(["-c", &cmd_str]);
471                c
472            }
473        } else {
474            // "shell" controller: interactive shell with AgentMux integration
475            // On Windows: prefer pwsh (PowerShell 7), fall back to powershell.exe (5.x), then cmd.exe
476            let shell_path = if cfg!(windows) {
477                detect_local_shell_path_windows()
478            } else {
479                std::env::var("SHELL").unwrap_or_else(|_| "/bin/bash".to_string())
480            };
481
482            let shell_type = crate::backend::shellintegration::detect_shell_type(&shell_path);
483
484            // Deploy shell integration scripts to ~/.agentmux/ (the user's home-based
485            // data dir) instead of AGENTMUX_DATA_HOME.  MSIX packages virtualise writes
486            // to %LocalAppData%, so files written by the packaged backend aren't visible
487            // to child processes (pwsh, bash, etc.) spawned via ConPTY.  The home dir is
488            // never virtualised, so the scripts are always reachable at their literal path.
489            let shell_home = crate::backend::base::get_home_dir().join(".agentmux");
490            crate::backend::shellintegration::deploy_scripts(&shell_home);
491
492            tracing::info!(block_id = %self.block_id, shell = %shell_path, shell_type = ?shell_type, "interactive shell path");
493
494            let mut c = CommandBuilder::new(&shell_path);
495
496            // Apply shell-specific startup args (--rcfile, -File, etc.)
497            if let Some(startup) = crate::backend::shellintegration::get_shell_startup(shell_type, &shell_home) {
498                for arg in &startup.extra_args {
499                    c.arg(arg);
500                }
501                for (k, v) in &startup.env_vars {
502                    c.env(k, v);
503                }
504            }
505
506            // Inject terminal capability env vars into the PTY environment.
507            // ConPTY on Windows fully supports VT/ANSI sequences, so set TERM
508            // on all platforms. Without this, CLI tools (e.g. Claude Code) use
509            // different Unicode width tables, causing ANSI color offset on Windows.
510            c.env("TERM", "xterm-256color");
511            c.env("COLORTERM", "truecolor");
512            c.env("TERM_PROGRAM", "agentmux");
513            c.env("AGENTMUX_BLOCKID", &self.block_id);
514            c.env("AGENTMUX_TABID", &self.tab_id);
515            c.env("AGENTMUX_VERSION", env!("CARGO_PKG_VERSION"));
516
517            // Inject log directory so agents can find logs without guessing.
518            // Always ~/.agentmux/logs/ — matches both host and sidecar.
519            let log_dir = dirs::home_dir()
520                .unwrap_or_default()
521                .join(".agentmux")
522                .join("logs");
523            c.env("AGENTMUX_LOG_DIR", log_dir.to_string_lossy().as_ref());
524
525            // Propagate local backend URL so agentbus-client prefers local PTY delivery.
526            // Set by main.rs after binding; absent in test/mock contexts (graceful no-op).
527            if let Ok(local_url) = std::env::var("AGENTMUX_LOCAL_URL") {
528                c.env("AGENTMUX_LOCAL_URL", &local_url);
529            }
530
531            // AGENTMUX is a plain "1" sentinel — wsh has been retired.
532            // Shell integrations check for the presence of AGENTMUX but no
533            // longer prepend a path to $PATH based on its value.
534            // See specs/SPEC_RETIRE_WSH_2026_04_12.md.
535            c.env("AGENTMUX", "1");
536
537            // Append AgentMux-managed tool dirs to PATH so agents can use
538            // jq, rg, etc. without the host having them installed system-wide.
539            // Appended (not prepended) so system PATH always wins.
540            {
541                let sep = if cfg!(windows) { ";" } else { ":" };
542                let current_path = std::env::var("PATH").unwrap_or_default();
543                let mut extra: Vec<String> = Vec::new();
544
545                // User-managed store (~/.agentmux/tools/bin/)
546                if let Some(user_bin) = crate::backend::tool_store::user_tools_dir() {
547                    if user_bin.exists() {
548                        extra.push(user_bin.to_string_lossy().into_owned());
549                    }
550                }
551
552                // Bundled store (ships with the app, e.g. runtime/tools/bin/)
553                if let Some(bundled_bin) = crate::backend::tool_store::bundled_tools_dir() {
554                    if bundled_bin.exists() {
555                        extra.push(bundled_bin.to_string_lossy().into_owned());
556                    }
557                }
558
559                if !extra.is_empty() {
560                    c.env("PATH", format!("{current_path}{sep}{}", extra.join(sep)));
561                }
562            }
563
564            // Inject cmd:env from wconfig settings and block metadata.
565            // Track whether AGENTMUX_AGENT_ID is explicitly set so we know
566            // whether to apply the backward-compat WAVEMUX bridge.
567            let mut has_agent_id = false;
568
569            // Settings (global defaults, lowest priority)
570            let config = crate::backend::wconfig::ConfigWatcher::with_config(
571                crate::backend::wconfig::build_default_config(),
572            );
573            let settings = config.get_settings();
574            for (k, v) in &settings.cmd_env {
575                if k == "AGENTMUX_AGENT_ID" {
576                    has_agent_id = true;
577                }
578                let expanded = crate::backend::base::expand_home_dir_safe(v);
579                c.env(k, expanded.to_string_lossy().as_ref());
580            }
581
582            // Block metadata (per-block overrides, highest priority)
583            if let Some(env_map) = block_meta.get(META_KEY_CMD_ENV) {
584                if let Some(obj) = env_map.as_object() {
585                    for (k, v) in obj {
586                        if let Some(val) = v.as_str() {
587                            if k == "AGENTMUX_AGENT_ID" {
588                                has_agent_id = true;
589                            }
590                            let expanded = crate::backend::base::expand_home_dir_safe(val);
591                            c.env(k, expanded.to_string_lossy().as_ref());
592                        }
593                    }
594                }
595            }
596
597            // Strip host-inherited agent identity unless explicitly configured
598            // in settings.cmd_env or block cmd:env metadata.
599            // This also supersedes the old WAVEMUX backward-compat bridge —
600            // both AGENTMUX_* and WAVEMUX_* vars are removed so new panes
601            // start as plain "Terminal".
602            if !has_agent_id {
603                c.env_remove("AGENTMUX_AGENT_ID");
604                c.env_remove("AGENTMUX_AGENT_COLOR");
605                c.env_remove("AGENTMUX_AGENT_TEXT_COLOR");
606                c.env_remove("WAVEMUX_AGENT_ID");
607                c.env_remove("WAVEMUX_AGENT_COLOR");
608            }
609
610            c
611        };
612
613        // Set working directory if specified
614        let cwd = obj::meta_get_string(&block_meta, super::META_KEY_CMD_CWD, "");
615        if !cwd.is_empty() {
616            cmd.cwd(&cwd);
617        }
618
619        let mut child = pair.slave.spawn_command(cmd).map_err(|e| {
620            tracing::error!(block_id = %self.block_id, error = %e, cmd = %cmd_str, "spawn failed");
621            let mut inner = self.inner.lock().unwrap();
622            Self::set_status(&mut inner, STATUS_DONE);
623            inner.proc_exit_code = -1;
624            inner.input_tx = None;
625            self.unlock_run();
626            format!("failed to spawn command: {e}")
627        })?;
628        tracing::info!(block_id = %self.block_id, "process spawned successfully");
629
630        // Detect agent pane: cmd contains a known agent CLI or has AGENTMUX_AGENT_ID set.
631        let is_agent = agent_id_for_jekt.is_some()
632            || cmd_str.to_lowercase().contains("claude")
633            || cmd_str.to_lowercase().contains("codex")
634            || cmd_str.to_lowercase().contains("gemini");
635
636        // Register PID and record spawn metadata.
637        let spawn_ts_ms = SystemTime::now()
638            .duration_since(UNIX_EPOCH)
639            .map(|d| d.as_millis() as i64)
640            .unwrap_or(0);
641        {
642            let mut inner = self.inner.lock().unwrap();
643            if let Some(pid) = child.process_id() {
644                super::pidregistry::register(&self.block_id, pid);
645                inner.child_pid = Some(pid);
646            }
647            inner.spawn_ts_ms = Some(spawn_ts_ms);
648            inner.is_agent_pane = is_agent;
649        }
650
651        // Auto-register with jekt if AGENTMUX_AGENT_ID was set in the block env.
652        // This maps agent_id → block_id in the ReactiveHandler so jekt can deliver
653        // messages directly to this PTY without a separate /agentmux/reactive/register call.
654        if let Some(ref agent_id) = agent_id_for_jekt {
655            match crate::backend::reactive::get_global_handler()
656                .register_agent(agent_id, &self.block_id, Some(&self.tab_id))
657            {
658                Ok(()) => {
659                    tracing::info!(
660                        block_id = %self.block_id,
661                        agent_id = %agent_id,
662                        "jekt: auto-registered"
663                    );
664                    // Also write to cross-instance file registry.
665                    if let Ok(local_url) = std::env::var("AGENTMUX_LOCAL_URL") {
666                        let data_dir = crate::backend::base::get_wave_data_dir();
667                        crate::backend::reactive::registry::write(
668                            &data_dir,
669                            agent_id,
670                            &local_url,
671                            &self.block_id,
672                        );
673                    }
674                }
675                Err(e) => tracing::warn!(
676                    block_id = %self.block_id,
677                    agent_id = %agent_id,
678                    error = %e,
679                    "jekt: auto-register failed"
680                ),
681            }
682        }
683        tracing::info!(
684            block_id = %self.block_id,
685            wstore_present = self.wstore.is_some(),
686            event_bus_present = self.event_bus.is_some(),
687            "[dnd-debug] pre-seed state after spawn"
688        );
689
690        // Seed cmd:cwd in block meta immediately after spawn so drag-and-drop works
691        // before the shell emits its first OSC 7 (or for shells without integration).
692        if let Some(ref store) = self.wstore {
693            let effective_cwd = if !cwd.is_empty() {
694                cwd.clone()
695            } else {
696                std::env::current_dir()
697                    .map(|p| p.to_string_lossy().to_string())
698                    .unwrap_or_default()
699            };
700            tracing::debug!(block_id = %self.block_id, cwd = %effective_cwd, "seeding cmd:cwd");
701            if !effective_cwd.is_empty() {
702                let oref_str = format!("block:{}", self.block_id);
703                let mut meta_update = MetaMapType::new();
704                meta_update.insert(
705                    super::META_KEY_CMD_CWD.to_string(),
706                    serde_json::Value::String(effective_cwd),
707                );
708                // Only set if not already populated — don't clobber a restored session CWD
709                match store.must_get::<crate::backend::obj::Block>(&self.block_id) {
710                    Ok(block) if obj::meta_get_string(&block.meta, super::META_KEY_CMD_CWD, "").is_empty() => {
711                        match crate::server::service::update_object_meta(store, &oref_str, &meta_update) {
712                            Ok(()) => {
713                                // Re-read updated block and broadcast obj:update so the
714                                // frontend Jotai atom refreshes (update_object_meta only writes
715                                // to SQLite — it does NOT send a WebSocket event on its own).
716                                if let Ok(updated_block) = store.must_get::<crate::backend::obj::Block>(&self.block_id) {
717                                    if let Some(ref event_bus) = self.event_bus {
718                                        let update_data = serde_json::to_value(&obj::WaveObjUpdate {
719                                            updatetype: "update".into(),
720                                            otype: "block".into(),
721                                            oid: self.block_id.clone(),
722                                            obj: Some(obj::wave_obj_to_value(&updated_block)),
723                                        }).ok();
724                                        event_bus.broadcast_event(&crate::backend::eventbus::WSEventType {
725                                            eventtype: "waveobj:update".to_string(),
726                                            oref: oref_str.clone(),
727                                            data: update_data,
728                                        });
729                                        tracing::info!(block_id = %self.block_id, "cmd:cwd seeded and broadcast to frontend");
730                                    } else {
731                                        tracing::warn!(block_id = %self.block_id, "cmd:cwd written to store but no event_bus to broadcast — frontend won't update");
732                                    }
733                                }
734                            }
735                            Err(e) => {
736                                tracing::warn!(block_id = %self.block_id, error = %e, "failed to seed cmd:cwd in store");
737                            }
738                        }
739                    }
740                    Ok(_) => {
741                        tracing::debug!(block_id = %self.block_id, "cmd:cwd already set, skipping seed");
742                    }
743                    Err(e) => {
744                        tracing::warn!(block_id = %self.block_id, error = %e, "failed to read block for cmd:cwd seed");
745                    }
746                }
747            }
748        }
749
750        // Get reader/writer from master
751        let reader = pair.master.try_clone_reader().map_err(|e| {
752            let _ = child.kill();
753            let mut inner = self.inner.lock().unwrap();
754            Self::set_status(&mut inner, STATUS_DONE);
755            inner.proc_exit_code = -1;
756            inner.input_tx = None;
757            self.unlock_run();
758            format!("failed to clone PTY reader: {e}")
759        })?;
760
761        let writer = pair.master.take_writer().map_err(|e| {
762            let _ = child.kill();
763            let mut inner = self.inner.lock().unwrap();
764            Self::set_status(&mut inner, STATUS_DONE);
765            inner.proc_exit_code = -1;
766            inner.input_tx = None;
767            self.unlock_run();
768            format!("failed to take PTY writer: {e}")
769        })?;
770
771        // Spawn PTY read task (blocking I/O → spawn_blocking)
772        let block_id_read = self.block_id.clone();
773        let broker_read = self.broker.clone();
774        let inner_read = self.inner.clone();
775        let is_agent_read = is_agent;
776        tokio::task::spawn_blocking(move || {
777            let mut reader = reader;
778            let mut buf = [0u8; PTY_READ_BUF_SIZE];
779
780            // Phase 1.5 PR 1 (additive): if this is an agent pane,
781            // also try to interpret stdout as Claude Code stream-json
782            // line-by-line, feeding successful parses through
783            // ClaudeTranslator and emitting AgentEvents on a new WPS
784            // scope `agent_event:<block_id>`. The existing raw-chunk
785            // path stays byte-equal — interactive panes (which don't
786            // emit JSON) see no behavior change because every line
787            // fails the JSON parse and is silently dropped. The
788            // future stream-json-mode pane and the drone inspector
789            // (issue #830 / Phase 1.5 PR 3) will be the first real
790            // consumers.
791            let mut translator: Option<crate::agents::translator::claude::ClaudeTranslator> =
792                if is_agent_read {
793                    Some(crate::agents::translator::claude::ClaudeTranslator::new())
794                } else {
795                    None
796                };
797            // Per-block line-buffer (raw bytes — see
798            // `extract_agent_events`). Capped to AGENT_LINE_BUFFER_CAP
799            // so a producer that never emits a newline can't grow
800            // the buffer unboundedly.
801            let mut line_buf: Vec<u8> = Vec::new();
802
803            loop {
804                match reader.read(&mut buf) {
805                    Ok(0) => break, // EOF
806                    Ok(n) => {
807                        inner_read.lock().unwrap().last_pty_output = Some(Instant::now());
808                        if let Some(ref broker) = broker_read {
809                            handle_append_block_file(
810                                broker,
811                                &block_id_read,
812                                "term",
813                                &buf[..n],
814                                None, // PTY output is raw terminal data; no FileStore write-through
815                            );
816                            if let Some(ref mut t) = translator {
817                                accumulate_and_translate(
818                                    broker,
819                                    &block_id_read,
820                                    &mut line_buf,
821                                    &buf[..n],
822                                    t,
823                                );
824                            }
825                        }
826                    }
827                    Err(e) => {
828                        tracing::debug!("PTY read error for {}: {}", block_id_read, e);
829                        break;
830                    }
831                }
832            }
833        });
834
835        // Spawn input task (routes input channel → PTY writer + resize + signals)
836        // Owns writer and master — dropping them closes the PTY, causing child to exit.
837        let master = pair.master;
838        tokio::spawn(async move {
839            let mut writer = writer;
840            let mut input_rx = input_rx;
841            while let Some(input) = input_rx.recv().await {
842                if let Some(data) = input.input_data {
843                    use std::io::Write;
844                    if let Err(e) = writer.write_all(&data) {
845                        tracing::debug!("PTY write error: {}", e);
846                        break;
847                    }
848                }
849                if let Some(ref size) = input.term_size {
850                    let pty_size = PtySize {
851                        rows: size.rows as u16,
852                        cols: size.cols as u16,
853                        pixel_width: 0,
854                        pixel_height: 0,
855                    };
856                    if let Err(e) = master.resize(pty_size) {
857                        tracing::debug!("PTY resize error: {}", e);
858                    }
859                }
860                if input.sig_name.is_some() {
861                    // Drop writer + master to close PTY, which terminates the child
862                    break;
863                }
864            }
865            // writer and master drop here → PTY closes → child gets EOF/terminates
866        });
867
868        // Spawn wait task (monitors process exit)
869        let inner_wait = Arc::clone(&self.inner);
870        let block_id_wait = self.block_id.clone();
871        let agent_id_wait = agent_id_for_jekt.clone();
872        let broker_wait = self.broker.clone();
873        let run_lock = Arc::clone(&self.run_lock);
874        tokio::task::spawn_blocking(move || {
875            let mut child = child;
876
877            // Wait for child to exit (blocking)
878            let exit_status = child.wait();
879            let exit_code = match exit_status {
880                Ok(status) => {
881                    if status.success() {
882                        0
883                    } else {
884                        // portable-pty ExitStatus doesn't expose raw code on all platforms
885                        1
886                    }
887                }
888                Err(e) => {
889                    tracing::warn!("wait error for block {}: {}", block_id_wait, e);
890                    -1
891                }
892            };
893
894            tracing::info!(block_id = %block_id_wait, exit_code = exit_code, "process exited");
895
896            // Unregister PID from per-pane metrics
897            super::pidregistry::unregister(&block_id_wait);
898
899            // Deregister from jekt — removes the agent_id → block_id mapping so
900            // subsequent jekt attempts fall back to MessageBus rather than a dead PTY.
901            crate::backend::reactive::get_global_handler().unregister_block(&block_id_wait);
902
903            // Also remove from cross-instance file registry.
904            if let Some(ref agent_id) = agent_id_wait {
905                let data_dir = crate::backend::base::get_wave_data_dir();
906                crate::backend::reactive::registry::remove(&data_dir, agent_id);
907            }
908
909            // Update inner state
910            {
911                let mut inner = inner_wait.lock().unwrap();
912                inner.proc_exit_code = exit_code;
913                ShellController::set_status(&mut inner, STATUS_DONE);
914                inner.input_tx = None;
915            }
916
917            // Publish done status
918            if let Some(ref broker) = broker_wait {
919                let status = {
920                    let inner = inner_wait.lock().unwrap();
921                    BlockControllerRuntimeStatus {
922                        blockid: block_id_wait.clone(),
923                        version: inner.status_version,
924                        shellprocstatus: inner.proc_status.clone(),
925                        shellprocconnname: inner.conn_name.clone(),
926                        shellprocexitcode: inner.proc_exit_code,
927                        spawn_ts_ms: inner.spawn_ts_ms,
928                        is_agent_pane: inner.is_agent_pane,
929                    }
930                };
931                super::publish_controller_status(broker, &status);
932            }
933
934            // Release run lock
935            run_lock.store(false, Ordering::SeqCst);
936        });
937
938        // Return immediately — PTY tasks run in background
939        Ok(())
940    }
941
942    fn stop(&self, _graceful: bool, new_status: &str) -> Result<(), String> {
943        // Extract what we need from the lock, release it before any async work.
944        #[allow(unused_variables)] // used under #[cfg(unix)] only
945        let pid_to_kill = {
946            let mut inner = self.inner.lock().unwrap();
947            if inner.proc_status == new_status {
948                return Ok(());
949            }
950            let pid = inner.child_pid;
951            // Drop the input channel — closes PTY writer → delivers EOF/SIGHUP as
952            // belt-and-suspenders in case signal delivery fails on the platform.
953            inner.input_tx = None;
954            Self::set_status(&mut inner, new_status);
955            pid
956        };
957
958        // Send SIGTERM to the process group so that child processes spawned by
959        // the shell (e.g. `claude --dangerously-skip-permissions` and its subtree)
960        // are also signalled. Negative pid targets the whole process group.
961        // Schedule SIGKILL after KILL_GRACE_SECS as a backstop for processes
962        // that ignore or delay on SIGTERM.
963        #[cfg(unix)]
964        if let Some(pid) = pid_to_kill {
965            // SAFETY: kill() is a well-defined POSIX syscall.
966            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGTERM) };
967            tokio::spawn(async move {
968                tokio::time::sleep(tokio::time::Duration::from_secs(KILL_GRACE_SECS)).await;
969                unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
970            });
971        }
972
973        Ok(())
974    }
975
    /// Returns the controller's current runtime status by delegating to
    /// `get_status_snapshot`.
    fn get_runtime_status(&self) -> BlockControllerRuntimeStatus {
        self.get_status_snapshot()
    }
979
980    fn send_input(&self, input: BlockInputUnion, seq: Option<u64>) -> Result<(), String> {
981        let mut inner = self.inner.lock().unwrap();
982        let tx = match &inner.input_tx {
983            Some(tx) => tx.clone(),
984            None => return Err("controller is not running".to_string()),
985        };
986        match seq {
987            None => tx.try_send(input).map_err(|e| format!("send_input: {e}")),
988            Some(s) => {
989                // Detect session reset: seq==0 means the TermViewModel
990                // restarted and its per-block counter is back at zero.
991                // (A gap-threshold heuristic was tried and removed — a
992                // stale/duplicate packet far behind could falsely trigger
993                // a reset and replay old input.)
994                if s == 0 && inner.input_seq_next > 0 {
995                    tracing::info!(
996                        block_id = %self.block_id,
997                        prev_next = inner.input_seq_next,
998                        new_seq = s,
999                        "input seq reset (session reset detected)"
1000                    );
1001                    inner.input_seq_next = s;
1002                    inner.input_seq_buf.clear();
1003                }
1004
1005                let next = inner.input_seq_next;
1006                if s == next {
1007                    // Advance before sending — a try_send failure must not leave the
1008                    // backend stuck waiting for a seq the frontend will never resend.
1009                    inner.input_seq_next += 1;
1010                    if let Err(e) = tx.try_send(input) {
1011                        tracing::warn!(
1012                            block_id = %self.block_id,
1013                            seq = s,
1014                            "send_input: channel full, dropping in-order packet: {e}"
1015                        );
1016                        return Ok(());
1017                    }
1018                    // Drain any buffered out-of-order packets now in order.
1019                    loop {
1020                        let expected = inner.input_seq_next;
1021                        match inner.input_seq_buf.remove(&expected) {
1022                            Some(buffered) => {
1023                                inner.input_seq_next += 1;
1024                                if let Err(e) = tx.try_send(buffered) {
1025                                    tracing::warn!(
1026                                        block_id = %self.block_id,
1027                                        seq = expected,
1028                                        "send_input drain: channel full, dropping buffered packet: {e}"
1029                                    );
1030                                }
1031                            }
1032                            None => break,
1033                        }
1034                    }
1035                    Ok(())
1036                } else if s > next {
1037                    if inner.input_seq_buf.len() < SHELL_INPUT_CH_SIZE {
1038                        inner.input_seq_buf.insert(s, input);
1039                    } else {
1040                        tracing::warn!(block_id = %self.block_id, seq = s, "input reorder buffer full, dropping");
1041                    }
1042                    Ok(())
1043                } else {
1044                    tracing::warn!(block_id = %self.block_id, seq = s, next, "duplicate input seq, discarding");
1045                    Ok(())
1046                }
1047            }
1048        }
1049    }
1050
    /// Returns the controller type string stored on this controller.
    fn controller_type(&self) -> &str {
        &self.controller_type
    }
1054
    /// Returns the id of the block this controller manages.
    fn block_id(&self) -> &str {
        &self.block_id
    }
1058
    /// Returns `self` as `&dyn Any`, so callers can attempt a downcast to the
    /// concrete controller type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
1062}
1063
1064// ---- File operation helpers ----
1065
/// Upper bound, in bytes, on the per-block line buffer that feeds the
/// agent-event translation path. Once the buffer grows past this it is
/// reset, so a producer that never emits a newline cannot grow it
/// without limit. 1 MiB is far beyond any plausible stream-json frame
/// size.
const AGENT_LINE_BUFFER_CAP: usize = 1 << 20;
1072
1073/// Append `chunk` to `line_buf`, drain complete lines, JSON-parse
1074/// each, and run successful parses through `translator`. Returns the
1075/// events the translator emitted. Caller is responsible for
1076/// publishing them.
1077///
1078/// Buffers RAW BYTES (not lossy-decoded strings) so a multi-byte
1079/// UTF-8 character split across two PTY reads decodes cleanly when
1080/// the complete line arrives. Decoding each chunk lossily would
1081/// insert U+FFFD into the middle of words for non-ASCII content
1082/// (CJK, emoji, accented chars), silently corrupting
1083/// `AssistantText`/`Done.response` while the parallel raw-byte WPS
1084/// path stays correct — drone consumers would have no way to
1085/// recover. Reagent P1 + codex P2 on PR #833.
1086///
1087/// Pure function — split out from `accumulate_and_translate` so the
1088/// line-buffering + JSON-fast-reject + translator-call logic is
1089/// unit-testable without spinning up a broker.
1090fn extract_agent_events(
1091    line_buf: &mut Vec<u8>,
1092    chunk: &[u8],
1093    translator: &mut crate::agents::translator::claude::ClaudeTranslator,
1094) -> Vec<crate::agents::types::AgentEvent> {
1095    use crate::agents::translator::Translator as _;
1096    let mut out: Vec<crate::agents::types::AgentEvent> = Vec::new();
1097    line_buf.extend_from_slice(chunk);
1098    if line_buf.len() > AGENT_LINE_BUFFER_CAP {
1099        // No newline in a megabyte — definitely not stream-json.
1100        // Reset to keep memory bounded.
1101        line_buf.clear();
1102        return out;
1103    }
1104    while let Some(nl) = line_buf.iter().position(|&b| b == b'\n') {
1105        let line_bytes: Vec<u8> = line_buf.drain(..=nl).collect();
1106        // Decode the COMPLETE line as lossy UTF-8 — split codepoints
1107        // are now fully present, so lossy-vs-strict only matters for
1108        // genuinely malformed bytes which would also fail strict.
1109        let line = String::from_utf8_lossy(&line_bytes);
1110        let trimmed = line.trim_end_matches(['\n', '\r']);
1111        if !trimmed.starts_with('{') {
1112            // Fast-reject: stream-json frames are JSON objects.
1113            // Skips ANSI escapes, prompts, blank lines, etc.
1114            continue;
1115        }
1116        let Ok(frame) = serde_json::from_str::<serde_json::Value>(trimmed) else {
1117            continue;
1118        };
1119        out.extend(translator.translate(frame));
1120    }
1121    out
1122}
1123
1124/// Publish the events `extract_agent_events` produced on the WPS
1125/// scope `agent_event:<block_id>`. Phase 1.5 PR 1 hook for agent
1126/// panes. Called only when `is_agent` is true at spawn time (see
1127/// read-task closure in `start()`).
1128fn accumulate_and_translate(
1129    broker: &wps::Broker,
1130    block_id: &str,
1131    line_buf: &mut Vec<u8>,
1132    chunk: &[u8],
1133    translator: &mut crate::agents::translator::claude::ClaudeTranslator,
1134) {
1135    for event in extract_agent_events(line_buf, chunk, translator) {
1136        broker.publish(wps::WaveEvent {
1137            event: format!("agent_event:{}", block_id),
1138            scopes: vec![],
1139            sender: String::new(),
1140            persist: 0,
1141            data: Some(serde_json::to_value(&event).unwrap_or_default()),
1142        });
1143    }
1144}
1145
1146/// and write-through to FileStore (if provided).
1147///
1148/// Port of Go's `HandleAppendBlockFile`.
1149///
1150/// The FileStore write is fire-and-forget: if it fails we emit a warning but
1151/// never propagate the error back to the hot stdout-reader path.
1152pub fn handle_append_block_file(
1153    broker: &wps::Broker,
1154    block_id: &str,
1155    filename: &str,
1156    data: &[u8],
1157    filestore: Option<&Arc<FileStore>>,
1158) {
1159    let data64 = base64::engine::general_purpose::STANDARD.encode(data);
1160
1161    let event_data = wps::WSFileEventData {
1162        zoneid: block_id.to_string(),
1163        filename: filename.to_string(),
1164        fileop: wps::FILE_OP_APPEND.to_string(),
1165        data64,
1166    };
1167
1168    let event = wps::WaveEvent {
1169        event: wps::EVENT_BLOCK_FILE.to_string(),
1170        scopes: vec![format!("block:{block_id}")],
1171        sender: String::new(),
1172        persist: 0,
1173        data: serde_json::to_value(&event_data).ok(),
1174    };
1175
1176    broker.publish(event);
1177
1178    // Write-through to FileStore for persistent history (Phase 1.3).
1179    // Create the file lazily on first append; if the file already exists
1180    // we skip make_file and go straight to append_data.
1181    if let Some(fs) = filestore {
1182        let needs_create = match fs.stat(block_id, filename) {
1183            Ok(None) => true,
1184            Ok(Some(_)) => false,
1185            Err(e) => {
1186                tracing::warn!(
1187                    block_id = %block_id,
1188                    filename = %filename,
1189                    error = %e,
1190                    "filestore stat failed; skipping write-through"
1191                );
1192                return;
1193            }
1194        };
1195
1196        if needs_create {
1197            if let Err(e) = fs.make_file(
1198                block_id,
1199                filename,
1200                std::collections::HashMap::new(),
1201                crate::backend::storage::filestore::FileOpts::default(),
1202            ) {
1203                // AlreadyExists is benign (race between two appends); anything
1204                // else is worth warning about.
1205                use crate::backend::storage::error::StoreError;
1206                if !matches!(e, StoreError::AlreadyExists) {
1207                    tracing::warn!(
1208                        block_id = %block_id,
1209                        filename = %filename,
1210                        error = %e,
1211                        "filestore make_file failed; skipping write-through"
1212                    );
1213                    return;
1214                }
1215            }
1216        }
1217
1218        if let Err(e) = fs.append_data(block_id, filename, data) {
1219            tracing::warn!(
1220                block_id = %block_id,
1221                filename = %filename,
1222                error = %e,
1223                "filestore append_data failed"
1224            );
1225        }
1226    }
1227}
1228
1229/// Truncate a block's terminal output file and publish a WPS event.
1230/// Port of Go's `HandleTruncateBlockFile`.
1231#[allow(dead_code)]
1232pub fn handle_truncate_block_file(broker: &wps::Broker, block_id: &str, filename: &str) {
1233    let event_data = wps::WSFileEventData {
1234        zoneid: block_id.to_string(),
1235        filename: filename.to_string(),
1236        fileop: wps::FILE_OP_TRUNCATE.to_string(),
1237        data64: String::new(),
1238    };
1239
1240    let event = wps::WaveEvent {
1241        event: wps::EVENT_BLOCK_FILE.to_string(),
1242        scopes: vec![format!("block:{block_id}")],
1243        sender: String::new(),
1244        persist: 0,
1245        data: serde_json::to_value(&event_data).ok(),
1246    };
1247
1248    broker.publish(event);
1249}
1250
1251#[cfg(test)]
1252mod tests {
1253    use super::*;
1254    use crate::backend::shellexec::MockConn;
1255    use std::sync::Arc;
1256
1257    fn make_shell_meta() -> MetaMapType {
1258        let mut meta = MetaMapType::new();
1259        meta.insert(
1260            "controller".to_string(),
1261            serde_json::Value::String("shell".to_string()),
1262        );
1263        meta
1264    }
1265
1266    fn make_cmd_meta(cmd: &str) -> MetaMapType {
1267        let mut meta = MetaMapType::new();
1268        meta.insert(
1269            "controller".to_string(),
1270            serde_json::Value::String("cmd".to_string()),
1271        );
1272        meta.insert(
1273            "cmd".to_string(),
1274            serde_json::Value::String(cmd.to_string()),
1275        );
1276        meta
1277    }
1278
1279    #[test]
1280    fn test_shell_controller_new() {
1281        let ctrl = ShellController::new(
1282            "shell".to_string(),
1283            "tab-1".to_string(),
1284            "block-1".to_string(),
1285            None,
1286            None,
1287            None,
1288        );
1289        assert_eq!(ctrl.controller_type(), "shell");
1290        assert_eq!(ctrl.block_id(), "block-1");
1291
1292        let status = ctrl.get_runtime_status();
1293        assert_eq!(status.shellprocstatus, STATUS_INIT);
1294        assert_eq!(status.blockid, "block-1");
1295        assert_eq!(status.version, 0);
1296    }
1297
1298    #[test]
1299    fn test_shell_controller_start_stop() {
1300        let ctrl = ShellController::new(
1301            "shell".to_string(),
1302            "tab-1".to_string(),
1303            "block-1".to_string(),
1304            None,
1305            None,
1306            None,
1307        );
1308
1309        // Use mock factory so we don't open a real PTY in tests
1310        ctrl.set_conn_factory(Box::new(|_conn_name, _meta| {
1311            Ok(Box::new(MockConn::new(0)) as Box<dyn ConnInterface>)
1312        }));
1313
1314        let meta = make_shell_meta();
1315        let result = ctrl.start(meta, None, false);
1316        assert!(result.is_ok());
1317
1318        // After start with mock, process immediately exits → status is done
1319        let status = ctrl.get_runtime_status();
1320        assert_eq!(status.shellprocstatus, STATUS_DONE);
1321
1322        // Stop should work
1323        let result = ctrl.stop(true, STATUS_DONE);
1324        assert!(result.is_ok());
1325    }
1326
1327    #[test]
1328    fn test_shell_controller_run_on_start_false() {
1329        let ctrl = ShellController::new(
1330            "shell".to_string(),
1331            "tab-1".to_string(),
1332            "block-1".to_string(),
1333            None,
1334            None,
1335            None,
1336        );
1337
1338        let mut meta = make_shell_meta();
1339        meta.insert(
1340            META_KEY_CMD_RUN_ON_START.to_string(),
1341            serde_json::Value::Bool(false),
1342        );
1343
1344        let result = ctrl.start(meta, None, false);
1345        assert!(result.is_ok());
1346
1347        // Should still be in init state (didn't start)
1348        let status = ctrl.get_runtime_status();
1349        assert_eq!(status.shellprocstatus, STATUS_INIT);
1350    }
1351
1352    #[test]
1353    fn test_shell_controller_force_start() {
1354        let ctrl = ShellController::new(
1355            "shell".to_string(),
1356            "tab-1".to_string(),
1357            "block-1".to_string(),
1358            None,
1359            None,
1360            None,
1361        );
1362
1363        ctrl.set_conn_factory(Box::new(|_conn_name, _meta| {
1364            Ok(Box::new(MockConn::new(0)) as Box<dyn ConnInterface>)
1365        }));
1366
1367        let mut meta = make_shell_meta();
1368        meta.insert(
1369            META_KEY_CMD_RUN_ON_START.to_string(),
1370            serde_json::Value::Bool(false),
1371        );
1372
1373        // Force should override run_on_start=false
1374        let result = ctrl.start(meta, None, true);
1375        assert!(result.is_ok());
1376
1377        let status = ctrl.get_runtime_status();
1378        // With mock, immediately exits to done
1379        assert_eq!(status.shellprocstatus, STATUS_DONE);
1380    }
1381
1382    #[test]
1383    fn test_shell_controller_with_conn_factory() {
1384        let ctrl = ShellController::new(
1385            "cmd".to_string(),
1386            "tab-1".to_string(),
1387            "block-1".to_string(),
1388            None,
1389            None,
1390            None,
1391        );
1392
1393        // Set a custom factory that returns a mock with exit code 42
1394        ctrl.set_conn_factory(Box::new(|_conn_name, _meta| {
1395            Ok(Box::new(MockConn::new(42)) as Box<dyn ConnInterface>)
1396        }));
1397
1398        let meta = make_cmd_meta("echo hello");
1399        let result = ctrl.start(meta, None, true);
1400        assert!(result.is_ok());
1401
1402        let status = ctrl.get_runtime_status();
1403        assert_eq!(status.shellprocstatus, STATUS_DONE);
1404        assert_eq!(status.shellprocexitcode, 42);
1405    }
1406
1407    #[test]
1408    fn test_shell_controller_conn_factory_error() {
1409        let ctrl = ShellController::new(
1410            "shell".to_string(),
1411            "tab-1".to_string(),
1412            "block-1".to_string(),
1413            None,
1414            None,
1415            None,
1416        );
1417
1418        ctrl.set_conn_factory(Box::new(|_conn_name, _meta| {
1419            Err("connection refused".to_string())
1420        }));
1421
1422        let meta = make_shell_meta();
1423        let result = ctrl.start(meta, None, true);
1424        assert!(result.is_err());
1425        assert!(result.unwrap_err().contains("connection refused"));
1426
1427        let status = ctrl.get_runtime_status();
1428        assert_eq!(status.shellprocstatus, STATUS_DONE);
1429        assert_eq!(status.shellprocexitcode, -1);
1430    }
1431
1432    #[test]
1433    fn test_shell_controller_send_input_not_running() {
1434        let ctrl = ShellController::new(
1435            "shell".to_string(),
1436            "tab-1".to_string(),
1437            "block-1".to_string(),
1438            None,
1439            None,
1440            None,
1441        );
1442
1443        let result = ctrl.send_input(BlockInputUnion::data(b"hello".to_vec()), None);
1444        assert!(result.is_err());
1445        assert!(result.unwrap_err().contains("not running"));
1446    }
1447
1448    #[test]
1449    fn test_shell_controller_status_version_increments() {
1450        let ctrl = ShellController::new(
1451            "shell".to_string(),
1452            "tab-1".to_string(),
1453            "block-1".to_string(),
1454            None,
1455            None,
1456            None,
1457        );
1458
1459        ctrl.set_conn_factory(Box::new(|_conn_name, _meta| {
1460            Ok(Box::new(MockConn::new(0)) as Box<dyn ConnInterface>)
1461        }));
1462
1463        let v0 = ctrl.get_runtime_status().version;
1464
1465        let meta = make_shell_meta();
1466        ctrl.start(meta, None, true).unwrap();
1467
1468        let v_after = ctrl.get_runtime_status().version;
1469        // Status changed from init → running → done = at least 2 increments
1470        assert!(v_after > v0);
1471    }
1472
1473    #[test]
1474    fn test_controller_trait_as_arc() {
1475        let ctrl: Arc<dyn Controller> = Arc::new(ShellController::new(
1476            "shell".to_string(),
1477            "tab-1".to_string(),
1478            "block-1".to_string(),
1479            None,
1480            None,
1481            None,
1482        ));
1483
1484        assert_eq!(ctrl.controller_type(), "shell");
1485        assert_eq!(ctrl.block_id(), "block-1");
1486        let status = ctrl.get_runtime_status();
1487        assert_eq!(status.shellprocstatus, STATUS_INIT);
1488    }
1489
/// Checks the boolean meta helpers: first their defaults on an empty map,
/// then the value each returns once its key is set explicitly.
#[test]
fn test_meta_helpers() {
    let mut meta = MetaMapType::new();

    // Empty map: run-on-start defaults to true, everything else to false.
    assert!(ShellController::should_run_on_start(&meta));
    assert!(!ShellController::should_run_once(&meta));
    assert!(!ShellController::should_clear_on_start(&meta));
    assert!(!ShellController::should_close_on_exit(&meta));

    // Explicitly turning run-on-start off is honoured.
    meta.insert(META_KEY_CMD_RUN_ON_START.to_string(), serde_json::json!(false));
    assert!(!ShellController::should_run_on_start(&meta));

    // Explicitly turning run-once on is honoured.
    meta.insert(META_KEY_CMD_RUN_ONCE.to_string(), serde_json::json!(true));
    assert!(ShellController::should_run_once(&meta));

    // Explicitly turning clear-on-start on is honoured.
    meta.insert(META_KEY_CMD_CLEAR_ON_START.to_string(), serde_json::json!(true));
    assert!(ShellController::should_clear_on_start(&meta));
}
1516
/// The close-on-exit delay is 2000 when unset and follows the meta value
/// once one is provided.
#[test]
fn test_close_on_exit_delay() {
    let mut meta = MetaMapType::new();

    let default_delay = ShellController::close_on_exit_delay_ms(&meta);
    assert_eq!(default_delay, 2000);

    meta.insert(
        META_KEY_CMD_CLOSE_ON_EXIT_DELAY.to_string(),
        serde_json::Value::from(5000),
    );
    let configured_delay = ShellController::close_on_exit_delay_ms(&meta);
    assert_eq!(configured_delay, 5000);
}
1528
/// The connection name is "local" when unset and otherwise mirrors the
/// string stored under the connection meta key.
#[test]
fn test_conn_name_from_meta() {
    let mut meta = MetaMapType::new();

    let default_name = ShellController::get_conn_name(&meta);
    assert_eq!(default_name, "local");

    meta.insert(META_KEY_CONNECTION.to_string(), serde_json::json!("user@host"));
    let configured_name = ShellController::get_conn_name(&meta);
    assert_eq!(configured_name, "user@host");
}
1540
/// Smoke test: appending block-file data with a subscriber registered and
/// no FileStore attached must complete without panicking.
#[test]
fn test_handle_append_block_file() {
    let broker = wps::Broker::new();

    // Register a subscriber scoped to the block we are about to write to.
    let request = wps::SubscriptionRequest {
        event: wps::EVENT_BLOCK_FILE.to_string(),
        scopes: vec!["block:block-1".to_string()],
        allscopes: false,
    };
    broker.subscribe("test-route", request);

    handle_append_block_file(&broker, "block-1", "term", b"hello world", None);

    // The history lookup is exercised but its result is deliberately unused:
    // events are only persisted when persist > 0, so reaching this point
    // without a panic is what this test verifies.
    let _history = broker.read_event_history(wps::EVENT_BLOCK_FILE, "block:block-1", 10);
}
1562
/// Smoke test: truncating a block file against a fresh broker must not panic.
#[test]
fn test_handle_truncate_block_file() {
    let fresh_broker = wps::Broker::new();
    handle_truncate_block_file(&fresh_broker, "block-1", "term");
}
1569
/// Round-trips a controller through the module-level registry:
/// register, look up, delete, and confirm it is gone.
#[test]
fn test_register_and_get_controller() {
    const BLOCK_ID: &str = "test-register-block";

    let ctrl: Arc<dyn Controller> = Arc::new(ShellController::new(
        "shell".to_string(),
        "tab-1".to_string(),
        BLOCK_ID.to_string(),
        None,
        None,
        None,
    ));

    super::super::register_controller(BLOCK_ID, ctrl.clone());

    // The registry hands back a controller carrying the same block id.
    let found = super::super::get_controller(BLOCK_ID);
    assert!(found.is_some());
    assert_eq!(found.unwrap().block_id(), BLOCK_ID);

    // Remove the entry again and confirm the lookup now comes back empty.
    super::super::delete_controller(BLOCK_ID);
    let after_delete = super::super::get_controller(BLOCK_ID);
    assert!(after_delete.is_none());
}
1591
/// Resyncing a block whose meta names the "shell" controller must register
/// a controller of that type under the block's id.
#[test]
fn test_resync_creates_shell_controller() {
    use crate::backend::obj::Block;

    const BLOCK_ID: &str = "resync-test-block";

    let mut meta = MetaMapType::new();
    meta.insert("controller".to_string(), serde_json::json!("shell"));
    // Auto-start is switched off so the test does not open a real PTY.
    meta.insert(META_KEY_CMD_RUN_ON_START.to_string(), serde_json::json!(false));

    let block = Block {
        oid: BLOCK_ID.to_string(),
        version: 1,
        meta,
        ..Default::default()
    };

    let outcome =
        super::super::resync_controller(&block, "tab-1", None, false, None, None, None, None);
    assert!(outcome.is_ok());

    let registered = super::super::get_controller(BLOCK_ID);
    assert!(registered.is_some());
    assert_eq!(registered.unwrap().controller_type(), "shell");

    // Remove the registry entry this test created.
    super::super::delete_controller(BLOCK_ID);
}
1624
/// Phase 1.3 integration test: output appended through
/// `handle_append_block_file` with a FileStore attached must be readable
/// back from that store, with the stored size tracking every append.
#[test]
fn test_handle_append_block_file_writes_to_filestore() {
    use crate::backend::storage::filestore::FileStore;
    use std::sync::Arc;

    let broker = wps::Broker::new();
    let store = Arc::new(FileStore::open_in_memory().expect("open in-memory filestore"));

    let block_id = "test-block-fs";
    let filename = "output";
    let line1 = b"line one\n";
    let line2 = b"line two\n";
    let line3 = b"line three\n";

    // The file does not exist before the first append, so the helper has to
    // create it on demand; the second append then extends it.
    handle_append_block_file(&broker, block_id, filename, line1, Some(&store));
    handle_append_block_file(&broker, block_id, filename, line2, Some(&store));

    // Both lines must come back out of the store.
    let stored = store
        .read_file(block_id, filename)
        .expect("read_file ok")
        .expect("data present");
    let text = String::from_utf8(stored).expect("valid utf8");
    assert!(text.contains("line one"), "expected 'line one' in {:?}", text);
    assert!(text.contains("line two"), "expected 'line two' in {:?}", text);

    // The recorded size equals the sum of the two appended payloads.
    let size_after_two = store.stat(block_id, filename).unwrap().unwrap().size;
    assert_eq!(size_after_two, (line1.len() + line2.len()) as i64);

    // Add a subscriber, then append once more to confirm the store keeps
    // growing while the broker path is also in use.
    let request = wps::SubscriptionRequest {
        event: wps::EVENT_BLOCK_FILE.to_string(),
        scopes: vec![format!("block:{}", block_id)],
        allscopes: false,
    };
    broker.subscribe("test-route-fs", request);

    handle_append_block_file(&broker, block_id, filename, line3, Some(&store));
    let size_after_three = store.stat(block_id, filename).unwrap().unwrap().size;
    assert_eq!(
        size_after_three,
        (line1.len() + line2.len() + line3.len()) as i64
    );
}
1673
1674    // ────────────────────────────────────────────────────────────────
1675    // extract_agent_events — Phase 1.5 PR 1
1676    // ────────────────────────────────────────────────────────────────
1677
1678    use crate::agents::translator::claude::ClaudeTranslator;
1679    use crate::agents::types::AgentEvent;
1680
/// One complete, newline-terminated frame yields exactly one translated
/// event and leaves nothing behind in the line buffer.
#[test]
fn extract_agent_events_full_line_translates() {
    let mut translator = ClaudeTranslator::new();
    let mut pending: Vec<u8> = Vec::new();
    let frame =
        br#"{"type":"stream_event","event":{"type":"content_block_delta","delta":{"type":"text_delta","text":"hello"}}}
"#;

    let events = extract_agent_events(&mut pending, frame, &mut translator);
    assert_eq!(events.len(), 1);

    let first = &events[0];
    match first {
        AgentEvent::AssistantText { delta } => assert_eq!(delta, "hello"),
        other => panic!("expected AssistantText, got {other:?}"),
    }

    // The whole frame was consumed.
    assert!(pending.is_empty());
}
1697
/// A logical line may arrive split over several PTY reads. Nothing is
/// emitted until the newline shows up, at which point the accumulated
/// bytes are translated as a single frame.
#[test]
fn extract_agent_events_chunked_line_accumulates() {
    let mut translator = ClaudeTranslator::new();
    let mut pending: Vec<u8> = Vec::new();

    // Leading part of the JSON, with no newline yet.
    let first_half = br#"{"type":"stream_event","event":{"type":"content_"#;
    // Remainder of the JSON followed by the terminating newline.
    let second_half = br#"block_delta","delta":{"type":"text_delta","text":"hi"}}}
"#;

    let after_first = extract_agent_events(&mut pending, first_half, &mut translator);
    assert!(after_first.is_empty());

    let after_second = extract_agent_events(&mut pending, second_half, &mut translator);
    assert_eq!(after_second.len(), 1);

    let first = &after_second[0];
    match first {
        AgentEvent::AssistantText { delta } => assert_eq!(delta, "hi"),
        other => panic!("expected AssistantText, got {other:?}"),
    }
}
1725
/// Ordinary interactive pane output (ANSI escapes, a prompt, plain text)
/// must not be turned into agent events.
#[test]
fn extract_agent_events_drops_non_json_lines() {
    let mut translator = ClaudeTranslator::new();
    let mut pending: Vec<u8> = Vec::new();
    let pty_text = b"\x1b[2K\x1b[0;0H> some prompt\n[m\nplain text\n";

    let events = extract_agent_events(&mut pending, pty_text, &mut translator);
    assert!(events.is_empty(), "got unexpected events: {events:?}");

    // The input ends with a newline, so there is no partial trailing line
    // to retain and the buffer ends up empty.
    assert!(pending.is_empty());
}
1739
/// Despite the name, this checks that a frame terminated by `\r\n` rather
/// than a bare `\n` is still translated into one event.
#[test]
fn extract_agent_events_drops_carriage_returns() {
    let mut translator = ClaudeTranslator::new();
    let mut pending: Vec<u8> = Vec::new();
    let frame: &[u8] = br#"{"type":"stream_event","event":{"type":"content_block_delta","delta":{"type":"text_delta","text":"crlf"}}}
"#;

    // Rebuild the frame with a '\r' slotted in just before its final byte,
    // turning the trailing "\n" into "\r\n".
    let (body, terminator) = frame.split_at(frame.len() - 1);
    let mut crlf_frame: Vec<u8> = Vec::with_capacity(frame.len() + 1);
    crlf_frame.extend_from_slice(body);
    crlf_frame.push(b'\r');
    crlf_frame.extend_from_slice(terminator);

    let events = extract_agent_events(&mut pending, &crlf_frame, &mut translator);
    assert_eq!(events.len(), 1);
}
1755
/// A line that opens with `{` but is not valid JSON produces no events.
#[test]
fn extract_agent_events_drops_malformed_json() {
    let mut translator = ClaudeTranslator::new();
    let mut pending: Vec<u8> = Vec::new();

    let events = extract_agent_events(&mut pending, b"{not_valid_json\n", &mut translator);

    assert!(events.is_empty());
}
1765
/// Feeding more than `AGENT_LINE_BUFFER_CAP` bytes without any newline
/// must reset the line buffer so its memory use stays bounded.
#[test]
fn extract_agent_events_resets_oversized_buffer() {
    let mut translator = ClaudeTranslator::new();
    let mut pending: Vec<u8> = Vec::new();

    // One byte past the cap, with no newline anywhere.
    let oversized = vec![b'x'; AGENT_LINE_BUFFER_CAP + 1];

    let events = extract_agent_events(&mut pending, &oversized, &mut translator);

    assert!(events.is_empty());
    assert!(
        pending.is_empty(),
        "buffer should reset past cap, was {} bytes",
        pending.len()
    );
}
1781
/// Reagent P1 / Codex P2 on PR #833: when a multi-byte UTF-8 character is
/// split across two PTY reads, the completed line must decode cleanly
/// instead of each half being lossily decoded into U+FFFD.
///
/// 'こんにちは' = e3 81 93 / e3 82 93 / e3 81 ab / e3 81 a1 / e3 81 af,
/// i.e. every character is three bytes long.
#[test]
fn extract_agent_events_preserves_utf8_across_read_boundary() {
    let mut translator = ClaudeTranslator::new();
    let mut pending: Vec<u8> = Vec::new();
    let frame = r#"{"type":"stream_event","event":{"type":"content_block_delta","delta":{"type":"text_delta","text":"こんにちは"}}}
"#;
    let frame_bytes = frame.as_bytes();

    // Find the first 0xe3 lead byte (the start of the first multi-byte
    // character) and cut immediately after it, so the boundary lands in
    // the middle of a codepoint.
    let lead_index = frame_bytes
        .iter()
        .position(|&b| b == 0xe3)
        .expect("expected to find a multi-byte codepoint");
    let (head, tail) = frame_bytes.split_at(lead_index + 1);

    // The first piece carries no newline, so nothing can be emitted yet.
    let after_head = extract_agent_events(&mut pending, head, &mut translator);
    assert!(after_head.is_empty());

    // The second piece completes the line.
    let after_tail = extract_agent_events(&mut pending, tail, &mut translator);
    assert_eq!(after_tail.len(), 1);

    let first = &after_tail[0];
    match first {
        AgentEvent::AssistantText { delta } => {
            assert_eq!(
                delta, "こんにちは",
                "UTF-8 must round-trip cleanly across read boundary; got {delta:?}"
            );
            assert!(!delta.contains('\u{FFFD}'), "no replacement chars");
        }
        other => panic!("expected AssistantText, got {other:?}"),
    }
}
1823
/// A single PTY read may carry several complete lines; each of them must
/// be translated.
#[test]
fn extract_agent_events_two_lines_one_chunk() {
    let mut translator = ClaudeTranslator::new();
    let mut pending: Vec<u8> = Vec::new();
    let two_lines = br#"{"type":"stream_event","event":{"type":"content_block_delta","delta":{"type":"text_delta","text":"a"}}}
{"type":"stream_event","event":{"type":"content_block_delta","delta":{"type":"text_delta","text":"b"}}}
"#;

    let events = extract_agent_events(&mut pending, two_lines, &mut translator);

    assert_eq!(events.len(), 2);
}
1836}