agentmux_srv\backend/
shellexec.rs

1#![allow(dead_code)]
2// Copyright 2025-2026, AgentMux Corp.
3// SPDX-License-Identifier: Apache-2.0
4
5//! Shell process execution: PTY management and process lifecycle.
6//! Port of Go's pkg/shellexec/shellexec.go + conninterface.go.
7
8//!
9//! Uses a trait-based abstraction (`ConnInterface`) so that:
10//! - Real PTY implementations can use `portable-pty` or platform APIs
11//! - Tests can use mock implementations
12//! - SSH/WSL connections implement the same interface
13
14use std::collections::HashMap;
15use std::io;
16use std::sync::atomic::{AtomicBool, Ordering};
17use tokio::sync::oneshot;
18
19use super::obj::TermSize;
20
21// ---- Constants ----
22
23/// Default terminal rows (matches Go's shellutil.DefaultTermRows).
24pub const DEFAULT_TERM_ROWS: i64 = 25;
25
26/// Default terminal columns (matches Go's shellutil.DefaultTermCols).
27pub const DEFAULT_TERM_COLS: i64 = 80;
28
29/// Connection type constants (match Go's conncontroller types).
30pub const CONN_TYPE_LOCAL: &str = "local";
31pub const CONN_TYPE_WSL: &str = "wsl";
32pub const CONN_TYPE_SSH: &str = "ssh";
33
34/// Block file name constants (match Go's base.BlockFile_*).
35pub const BLOCK_FILE_TERM: &str = "term";
36pub const BLOCK_FILE_CACHE: &str = "cache";
37pub const BLOCK_FILE_ENV: &str = "env";
38
39/// Default max file size for terminal circular buffer (256KB).
40pub const DEFAULT_TERM_MAX_FILE_SIZE: usize = 256 * 1024;
41
42/// Default max file size for HTML content (256KB).
43pub const DEFAULT_HTML_MAX_FILE_SIZE: usize = 256 * 1024;
44
45/// Max init script size (50KB).
46pub const MAX_INIT_SCRIPT_SIZE: usize = 50 * 1024;
47
48// ---- ConnInterface trait ----
49
50/// Abstraction over a PTY-connected process.
51/// Port of Go's `shellexec.ConnInterface` which embeds `pty.Pty`.
52///
53/// Implementations:
54/// - `CmdWrap` (local processes with PTY)
55/// - `SessionWrap` (SSH sessions)
56/// - `WslCmdWrap` (WSL processes)
57/// - `MockConn` (testing)
58pub trait ConnInterface: Send + Sync {
59    /// Start the process. Called once after creation.
60    fn start(&mut self) -> io::Result<()>;
61
62    /// Wait for process to exit. Returns exit status as error.
63    fn wait(&mut self) -> io::Result<i32>;
64
65    /// Kill the process immediately.
66    fn kill(&self) -> io::Result<()>;
67
68    /// Kill the process gracefully with a timeout.
69    fn kill_graceful(&self, timeout_ms: u64) -> io::Result<()>;
70
71    /// Get the process exit code (only valid after wait returns).
72    fn exit_code(&self) -> i32;
73
74    /// Write data to process stdin/PTY.
75    fn write_data(&self, data: &[u8]) -> io::Result<usize>;
76
77    /// Read data from process stdout/PTY.
78    fn read_data(&self, buf: &mut [u8]) -> io::Result<usize>;
79
80    /// Resize the PTY.
81    fn set_size(&self, rows: i64, cols: i64) -> io::Result<()>;
82
83    /// Close the connection.
84    fn close(&self) -> io::Result<()>;
85}
86
87// ---- Command options ----
88
89/// Options for shell command execution.
90/// Port of Go's `shellexec.CommandOptsType`.
91#[derive(Debug, Clone, Default)]
92pub struct CommandOpts {
93    /// Whether the shell should be interactive (-i flag).
94    pub interactive: bool,
95    /// Whether the shell should be a login shell (-l flag).
96    pub login: bool,
97    /// Working directory for the process.
98    pub cwd: String,
99    /// Path to the shell binary (e.g., /bin/bash).
100    pub shell_path: String,
101    /// Additional shell options/flags.
102    pub shell_opts: Vec<String>,
103    /// Environment variables to inject.
104    pub env: HashMap<String, String>,
105    /// Whether to include JWT token in environment.
106    pub force_jwt: bool,
107}
108
109// ---- ShellProc ----
110
111/// A running shell process wrapping a ConnInterface.
112/// Port of Go's `shellexec.ShellProc`.
113pub struct ShellProc {
114    /// Connection name ("local", "wsl://distro", "user@host").
115    pub conn_name: String,
116
117    /// The underlying PTY/process connection.
118    cmd: Box<dyn ConnInterface>,
119
120    /// Ensures close is called only once.
121    close_once: AtomicBool,
122
123    /// Signaled when the process exits. The i32 is the exit code.
124    done_tx: Option<oneshot::Sender<i32>>,
125
126    /// Receiver for wait completion.
127    done_rx: Option<oneshot::Receiver<i32>>,
128
129    /// Exit code after wait completes.
130    exit_code: std::sync::Mutex<Option<i32>>,
131}
132
133impl ShellProc {
134    /// Create a new ShellProc wrapping a ConnInterface.
135    pub fn new(conn_name: String, cmd: Box<dyn ConnInterface>) -> Self {
136        let (done_tx, done_rx) = oneshot::channel();
137        Self {
138            conn_name,
139            cmd,
140            close_once: AtomicBool::new(false),
141            done_tx: Some(done_tx),
142            done_rx: Some(done_rx),
143            exit_code: std::sync::Mutex::new(None),
144        }
145    }
146
147    /// Start the process.
148    pub fn start(&mut self) -> io::Result<()> {
149        self.cmd.start()
150    }
151
152    /// Write data to the process stdin/PTY.
153    pub fn write(&self, data: &[u8]) -> io::Result<usize> {
154        self.cmd.write_data(data)
155    }
156
157    /// Read data from the process stdout/PTY.
158    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
159        self.cmd.read_data(buf)
160    }
161
162    /// Resize the terminal.
163    pub fn set_size(&self, rows: i64, cols: i64) -> io::Result<()> {
164        self.cmd.set_size(rows, cols)
165    }
166
167    /// Close the process. Idempotent via AtomicBool.
168    pub fn close(&self) -> io::Result<()> {
169        if self
170            .close_once
171            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
172            .is_ok()
173        {
174            self.cmd.close()
175        } else {
176            Ok(())
177        }
178    }
179
180    /// Kill the process immediately.
181    pub fn kill(&self) -> io::Result<()> {
182        self.cmd.kill()
183    }
184
185    /// Wait for process exit and signal done channel.
186    /// This should be called from a dedicated task.
187    pub fn wait_and_signal(&mut self) -> i32 {
188        let exit_code = self.cmd.wait().unwrap_or(-1);
189        *self.exit_code.lock().unwrap() = Some(exit_code);
190        if let Some(tx) = self.done_tx.take() {
191            let _ = tx.send(exit_code);
192        }
193        exit_code
194    }
195
196    /// Take the done receiver (can only be called once).
197    /// Used by the block controller to await process completion.
198    pub fn take_done_rx(&mut self) -> Option<oneshot::Receiver<i32>> {
199        self.done_rx.take()
200    }
201
202    /// Get the exit code (only valid after wait completes).
203    pub fn get_exit_code(&self) -> Option<i32> {
204        *self.exit_code.lock().unwrap()
205    }
206}
207
208// ---- Mock implementation for testing ----
209
210/// Mock ConnInterface for testing without a real PTY.
211pub struct MockConn {
212    /// Data written to this mock (simulates stdin).
213    pub written: std::sync::Mutex<Vec<u8>>,
214    /// Data to return from read (simulates stdout).
215    pub read_data: std::sync::Mutex<Vec<u8>>,
216    /// Whether the process has been started.
217    pub started: AtomicBool,
218    /// Whether the process has been killed.
219    pub killed: AtomicBool,
220    /// Whether the process has been closed.
221    pub closed: AtomicBool,
222    /// Exit code to return.
223    pub mock_exit_code: i32,
224    /// Current terminal size.
225    pub term_size: std::sync::Mutex<(i64, i64)>,
226    /// Notify when wait should return (simulate process exit).
227    pub wait_tx: std::sync::Mutex<Option<oneshot::Sender<()>>>,
228    pub wait_rx: tokio::sync::Mutex<Option<oneshot::Receiver<()>>>,
229}
230
231impl MockConn {
232    pub fn new(mock_exit_code: i32) -> Self {
233        let (tx, rx) = oneshot::channel();
234        Self {
235            written: std::sync::Mutex::new(Vec::new()),
236            read_data: std::sync::Mutex::new(Vec::new()),
237            started: AtomicBool::new(false),
238            killed: AtomicBool::new(false),
239            closed: AtomicBool::new(false),
240            mock_exit_code,
241            term_size: std::sync::Mutex::new((DEFAULT_TERM_ROWS, DEFAULT_TERM_COLS)),
242            wait_tx: std::sync::Mutex::new(Some(tx)),
243            wait_rx: tokio::sync::Mutex::new(Some(rx)),
244        }
245    }
246
247    /// Set data that will be returned by read_data.
248    pub fn set_read_data(&self, data: Vec<u8>) {
249        *self.read_data.lock().unwrap() = data;
250    }
251
252    /// Signal that the mock process should exit.
253    pub fn signal_exit(&self) {
254        if let Some(tx) = self.wait_tx.lock().unwrap().take() {
255            let _ = tx.send(());
256        }
257    }
258
259    /// Get all data written to the mock.
260    pub fn get_written(&self) -> Vec<u8> {
261        self.written.lock().unwrap().clone()
262    }
263}
264
265impl ConnInterface for MockConn {
266    fn start(&mut self) -> io::Result<()> {
267        self.started.store(true, Ordering::SeqCst);
268        Ok(())
269    }
270
271    fn wait(&mut self) -> io::Result<i32> {
272        // In tests, this blocks until signal_exit is called.
273        // Since we can't do async in a sync trait method, we just return immediately.
274        Ok(self.mock_exit_code)
275    }
276
277    fn kill(&self) -> io::Result<()> {
278        self.killed.store(true, Ordering::SeqCst);
279        Ok(())
280    }
281
282    fn kill_graceful(&self, _timeout_ms: u64) -> io::Result<()> {
283        self.kill()
284    }
285
286    fn exit_code(&self) -> i32 {
287        self.mock_exit_code
288    }
289
290    fn write_data(&self, data: &[u8]) -> io::Result<usize> {
291        self.written.lock().unwrap().extend_from_slice(data);
292        Ok(data.len())
293    }
294
295    fn read_data(&self, buf: &mut [u8]) -> io::Result<usize> {
296        let mut read_data = self.read_data.lock().unwrap();
297        if read_data.is_empty() {
298            return Ok(0);
299        }
300        let len = buf.len().min(read_data.len());
301        buf[..len].copy_from_slice(&read_data[..len]);
302        read_data.drain(..len);
303        Ok(len)
304    }
305
306    fn set_size(&self, rows: i64, cols: i64) -> io::Result<()> {
307        *self.term_size.lock().unwrap() = (rows, cols);
308        Ok(())
309    }
310
311    fn close(&self) -> io::Result<()> {
312        self.closed.store(true, Ordering::SeqCst);
313        Ok(())
314    }
315}
316
317// ---- Helper functions ----
318
319/// Get the default TermSize.
320pub fn default_term_size() -> TermSize {
321    TermSize {
322        rows: DEFAULT_TERM_ROWS,
323        cols: DEFAULT_TERM_COLS,
324    }
325}
326
327/// Determine the shell type from a shell path.
328/// Returns one of: "bash", "zsh", "fish", "pwsh", "unknown".
329pub fn detect_shell_type(shell_path: &str) -> &'static str {
330    let basename = shell_path.rsplit('/').next().unwrap_or(shell_path);
331    let basename = basename.rsplit('\\').next().unwrap_or(basename);
332    match basename {
333        "bash" | "bash.exe" => "bash",
334        "zsh" | "zsh.exe" => "zsh",
335        "fish" | "fish.exe" => "fish",
336        "pwsh" | "pwsh.exe" | "powershell" | "powershell.exe" => "pwsh",
337        _ => "unknown",
338    }
339}
340
341/// Build standard AGENTMUX_* environment variables.
342pub fn build_wave_env(
343    block_id: &str,
344    tab_id: &str,
345    workspace_id: &str,
346    client_id: &str,
347    conn_name: &str,
348    version: &str,
349) -> HashMap<String, String> {
350    let mut env = HashMap::new();
351    env.insert("AGENTMUX".to_string(), "1".to_string());
352    env.insert("AGENTMUX_BLOCKID".to_string(), block_id.to_string());
353    env.insert("AGENTMUX_TABID".to_string(), tab_id.to_string());
354    env.insert(
355        "AGENTMUX_WORKSPACEID".to_string(),
356        workspace_id.to_string(),
357    );
358    env.insert("AGENTMUX_CLIENTID".to_string(), client_id.to_string());
359    env.insert("AGENTMUX_CONN".to_string(), conn_name.to_string());
360    env.insert("AGENTMUX_VERSION".to_string(), version.to_string());
361    env
362}
363
364#[cfg(test)]
365mod tests {
366    use super::*;
367
368    #[test]
369    fn test_default_term_size() {
370        let ts = default_term_size();
371        assert_eq!(ts.rows, 25);
372        assert_eq!(ts.cols, 80);
373    }
374
375    #[test]
376    fn test_detect_shell_type() {
377        assert_eq!(detect_shell_type("/bin/bash"), "bash");
378        assert_eq!(detect_shell_type("/usr/bin/zsh"), "zsh");
379        assert_eq!(detect_shell_type("/usr/bin/fish"), "fish");
380        assert_eq!(detect_shell_type("/usr/bin/pwsh"), "pwsh");
381        assert_eq!(detect_shell_type("C:\\Windows\\pwsh.exe"), "pwsh");
382        assert_eq!(detect_shell_type("/bin/sh"), "unknown");
383        assert_eq!(detect_shell_type("bash"), "bash");
384    }
385
386    #[test]
387    fn test_build_wave_env() {
388        let env = build_wave_env("block1", "tab1", "ws1", "client1", "local", "0.19.0");
389        assert_eq!(env.get("AGENTMUX").unwrap(), "1");
390        assert_eq!(env.get("AGENTMUX_BLOCKID").unwrap(), "block1");
391        assert_eq!(env.get("AGENTMUX_TABID").unwrap(), "tab1");
392        assert_eq!(env.get("AGENTMUX_WORKSPACEID").unwrap(), "ws1");
393        assert_eq!(env.get("AGENTMUX_CLIENTID").unwrap(), "client1");
394        assert_eq!(env.get("AGENTMUX_CONN").unwrap(), "local");
395        assert_eq!(env.get("AGENTMUX_VERSION").unwrap(), "0.19.0");
396        assert_eq!(env.len(), 7);
397    }
398
399    #[test]
400    fn test_mock_conn_write_read() {
401        let mut mock = MockConn::new(0);
402        mock.start().unwrap();
403        assert!(mock.started.load(Ordering::SeqCst));
404
405        // Write data
406        let written = mock.write_data(b"hello").unwrap();
407        assert_eq!(written, 5);
408        assert_eq!(mock.get_written(), b"hello");
409
410        // Set and read data
411        mock.set_read_data(b"world".to_vec());
412        let mut buf = [0u8; 10];
413        let n = mock.read_data(&mut buf).unwrap();
414        assert_eq!(n, 5);
415        assert_eq!(&buf[..n], b"world");
416    }
417
418    #[test]
419    fn test_mock_conn_resize() {
420        let mock = MockConn::new(0);
421        mock.set_size(40, 120).unwrap();
422        let (rows, cols) = *mock.term_size.lock().unwrap();
423        assert_eq!(rows, 40);
424        assert_eq!(cols, 120);
425    }
426
427    #[test]
428    fn test_mock_conn_kill_close() {
429        let mock = MockConn::new(42);
430        assert!(!mock.killed.load(Ordering::SeqCst));
431        mock.kill().unwrap();
432        assert!(mock.killed.load(Ordering::SeqCst));
433
434        assert!(!mock.closed.load(Ordering::SeqCst));
435        mock.close().unwrap();
436        assert!(mock.closed.load(Ordering::SeqCst));
437
438        assert_eq!(mock.exit_code(), 42);
439    }
440
441    #[test]
442    fn test_shell_proc_lifecycle() {
443        let mock = MockConn::new(0);
444        let mut proc = ShellProc::new("local".to_string(), Box::new(mock));
445
446        // Start
447        proc.start().unwrap();
448
449        // Write
450        proc.write(b"test input").unwrap();
451
452        // Resize
453        proc.set_size(30, 100).unwrap();
454
455        // Close (idempotent)
456        proc.close().unwrap();
457        proc.close().unwrap(); // Second call should be no-op
458    }
459
460    #[test]
461    fn test_shell_proc_wait_and_signal() {
462        let mock = MockConn::new(42);
463        let mut proc = ShellProc::new("local".to_string(), Box::new(mock));
464        proc.start().unwrap();
465
466        let exit_code = proc.wait_and_signal();
467        assert_eq!(exit_code, 42);
468        assert_eq!(proc.get_exit_code(), Some(42));
469    }
470
471    #[test]
472    fn test_shell_proc_take_done_rx() {
473        let mock = MockConn::new(0);
474        let mut proc = ShellProc::new("local".to_string(), Box::new(mock));
475
476        // First take should succeed
477        assert!(proc.take_done_rx().is_some());
478        // Second take should return None
479        assert!(proc.take_done_rx().is_none());
480    }
481
482    #[test]
483    fn test_command_opts_default() {
484        let opts = CommandOpts::default();
485        assert!(!opts.interactive);
486        assert!(!opts.login);
487        assert!(opts.cwd.is_empty());
488        assert!(opts.shell_path.is_empty());
489        assert!(opts.shell_opts.is_empty());
490        assert!(opts.env.is_empty());
491        assert!(!opts.force_jwt);
492    }
493
494    #[test]
495    fn test_constants() {
496        assert_eq!(CONN_TYPE_LOCAL, "local");
497        assert_eq!(CONN_TYPE_WSL, "wsl");
498        assert_eq!(CONN_TYPE_SSH, "ssh");
499        assert_eq!(BLOCK_FILE_TERM, "term");
500        assert_eq!(DEFAULT_TERM_MAX_FILE_SIZE, 256 * 1024);
501    }
502}