agentmux_srv\backend\blockcontroller/
mod.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Block controller: manages lifecycle of each block (terminal, command, web app).
5//! Port of Go's pkg/blockcontroller/blockcontroller.go.
6
7//!
8//! Architecture:
9//! - Global controller registry maps block_id → Controller
10//! - Each controller manages the lifecycle of one block
11//! - ShellController handles "shell" and "cmd" block types
12//! - Controllers dispatch I/O between the user and the process/service
13
14pub mod acp;
15pub mod health;
16pub mod persistent;
17pub mod pidregistry;
18pub mod process_tree;
19pub mod session_recovery;
20pub mod session_stats;
21pub mod shell;
22pub mod subprocess;
23pub mod watchdog;
24
25use std::any::Any;
26use std::collections::HashMap;
27use std::sync::{Arc, RwLock};
28
29use serde::{Deserialize, Serialize};
30
31use super::eventbus::EventBus;
32use super::storage::filestore::FileStore;
33use super::storage::wstore::WaveStore;
34use super::obj::{Block, MetaMapType, TermSize};
35use super::wps::Broker;
36
37// ---- Controller status constants (match Go) ----
38
39pub const STATUS_INIT: &str = "init";
40pub const STATUS_RUNNING: &str = "running";
41pub const STATUS_DONE: &str = "done";
42
43// ---- Controller type constants (match Go) ----
44
45pub const BLOCK_CONTROLLER_SHELL: &str = "shell";
46pub const BLOCK_CONTROLLER_CMD: &str = "cmd";
47pub const BLOCK_CONTROLLER_TSUNAMI: &str = "tsunami";
48pub const BLOCK_CONTROLLER_SUBPROCESS: &str = "subprocess";
49pub const BLOCK_CONTROLLER_PERSISTENT: &str = "persistent";
50pub const BLOCK_CONTROLLER_ACP: &str = "acp";
51
52// ---- Block metadata key constants (match Go) ----
53
54pub const META_KEY_CONTROLLER: &str = "controller";
55pub const META_KEY_CONNECTION: &str = "connection";
56pub const META_KEY_CMD: &str = "cmd";
57pub const META_KEY_CMD_CWD: &str = "cmd:cwd";
58#[allow(dead_code)]
59pub const META_KEY_CMD_SHELL: &str = "cmd:shell";
60pub const META_KEY_CMD_ARGS: &str = "cmd:args";
61pub const META_KEY_CMD_ENV: &str = "cmd:env";
62#[allow(dead_code)]
63pub const META_KEY_CMD_JWT: &str = "cmd:jwt";
64pub const META_KEY_CMD_RUN_ON_START: &str = "cmd:runonstart";
65pub const META_KEY_CMD_RUN_ONCE: &str = "cmd:runonce";
66pub const META_KEY_CMD_CLEAR_ON_START: &str = "cmd:clearonstart";
67pub const META_KEY_CMD_CLOSE_ON_EXIT: &str = "cmd:closeonexit";
68pub const META_KEY_CMD_CLOSE_ON_EXIT_FORCE: &str = "cmd:closeonexitforce";
69pub const META_KEY_CMD_CLOSE_ON_EXIT_DELAY: &str = "cmd:closeonexitdelay";
70#[allow(dead_code)]
71pub const META_KEY_CMD_INIT_SCRIPT: &str = "cmd:initscript";
72#[allow(dead_code)]
73pub const META_KEY_CMD_INIT_SCRIPT_BASH: &str = "cmd:initscript.bash";
74#[allow(dead_code)]
75pub const META_KEY_CMD_INIT_SCRIPT_ZSH: &str = "cmd:initscript.zsh";
76#[allow(dead_code)]
77pub const META_KEY_CMD_INIT_SCRIPT_FISH: &str = "cmd:initscript.fish";
78#[allow(dead_code)]
79pub const META_KEY_CMD_INIT_SCRIPT_PWSH: &str = "cmd:initscript.pwsh";
80#[allow(dead_code)]
81pub const META_KEY_TERM_LOCAL_SHELL_PATH: &str = "term:localshellpath";
82#[allow(dead_code)]
83pub const META_KEY_TERM_LOCAL_SHELL_OPTS: &str = "term:localshellopts";
84
85// ---- Default timeouts ----
86
87/// Default controller operation timeout in milliseconds.
88#[allow(dead_code)]
89pub const DEFAULT_TIMEOUT_MS: u64 = 2000;
90
91/// Grace period before forceful kill in milliseconds.
92#[allow(dead_code)]
93pub const DEFAULT_GRACEFUL_KILL_WAIT_MS: u64 = 400;
94
95// ---- Input union (matches Go's BlockInputUnion) ----
96
97/// Input sent to a block controller.
98/// Can be raw terminal data, a signal, or a resize event.
99#[derive(Debug, Clone)]
100pub struct BlockInputUnion {
101    /// Raw terminal input bytes (base64 decoded from wire format).
102    pub input_data: Option<Vec<u8>>,
103    /// Signal name (e.g., "SIGTERM", "SIGINT").
104    pub sig_name: Option<String>,
105    /// Terminal resize event.
106    pub term_size: Option<TermSize>,
107}
108
109impl BlockInputUnion {
110    pub fn data(data: Vec<u8>) -> Self {
111        Self {
112            input_data: Some(data),
113            sig_name: None,
114            term_size: None,
115        }
116    }
117
118    pub fn signal(name: &str) -> Self {
119        Self {
120            input_data: None,
121            sig_name: Some(name.to_string()),
122            term_size: None,
123        }
124    }
125
126    pub fn resize(size: TermSize) -> Self {
127        Self {
128            input_data: None,
129            sig_name: None,
130            term_size: Some(size),
131        }
132    }
133}
134
/// Returns `true` when the flag is unset.
///
/// Takes `&bool` because it is referenced by name from a serde
/// `skip_serializing_if` attribute, which passes the field by reference.
fn is_false(v: &bool) -> bool {
    !*v
}
138
139// ---- Runtime status (matches Go's BlockControllerRuntimeStatus) ----
140
141/// Runtime status of a block controller, sent to the UI.
142#[derive(Debug, Clone, Serialize, Deserialize, Default)]
143pub struct BlockControllerRuntimeStatus {
144    pub blockid: String,
145    #[serde(default)]
146    pub version: i32,
147    #[serde(default, skip_serializing_if = "String::is_empty")]
148    pub shellprocstatus: String,
149    #[serde(default, skip_serializing_if = "String::is_empty")]
150    pub shellprocconnname: String,
151    #[serde(default)]
152    pub shellprocexitcode: i32,
153    /// Unix timestamp (ms) when the process was spawned; None until first spawn.
154    #[serde(default, skip_serializing_if = "Option::is_none")]
155    pub spawn_ts_ms: Option<i64>,
156    /// True if this pane is running an agent CLI (e.g. claude, codex, gemini, kimi, openclaw, pi).
157    #[serde(default, skip_serializing_if = "is_false")]
158    pub is_agent_pane: bool,
159}
160
161// ---- Controller trait ----
162
163/// Trait for block controllers. Each block type has its own implementation.
164/// Port of Go's `blockcontroller.Controller` interface.
165pub trait Controller: Send + Sync {
166    /// Start the controller. May spawn background tasks.
167    /// `force` restarts even if already running.
168    fn start(
169        &self,
170        block_meta: MetaMapType,
171        rt_opts: Option<serde_json::Value>,
172        force: bool,
173    ) -> Result<(), String>;
174
175    /// Stop the controller.
176    /// `graceful` waits for process to exit; `new_status` is the target state.
177    fn stop(&self, graceful: bool, new_status: &str) -> Result<(), String>;
178
179    /// Get the current runtime status.
180    fn get_runtime_status(&self) -> BlockControllerRuntimeStatus;
181
182    /// Send input (terminal data, signal, or resize) to the controller.
183    /// `seq` is the per-TermViewModel monotonic counter; `None` means fire-and-forget (no ordering).
184    fn send_input(&self, input: BlockInputUnion, seq: Option<u64>) -> Result<(), String>;
185
186    /// Get the controller type (e.g., "shell", "cmd").
187    fn controller_type(&self) -> &str;
188
189    /// Get the block ID.
190    #[allow(dead_code)]
191    fn block_id(&self) -> &str;
192
193    /// Downcast support for concrete controller types.
194    fn as_any(&self) -> &dyn Any;
195}
196
197// ---- Global controller registry ----
198
199/// Thread-safe global controller registry.
200/// Maps block_id → Arc<dyn Controller>.
201static CONTROLLER_REGISTRY: std::sync::LazyLock<RwLock<HashMap<String, Arc<dyn Controller>>>> =
202    std::sync::LazyLock::new(|| RwLock::new(HashMap::new()));
203
204/// Get a controller by block ID.
205pub fn get_controller(block_id: &str) -> Option<Arc<dyn Controller>> {
206    CONTROLLER_REGISTRY
207        .read()
208        .unwrap()
209        .get(block_id)
210        .cloned()
211}
212
213/// Register a controller, stopping any previous one for the same block.
214pub fn register_controller(block_id: &str, controller: Arc<dyn Controller>) {
215    let mut registry = CONTROLLER_REGISTRY.write().unwrap();
216    if let Some(old) = registry.remove(block_id) {
217        // Stop the old controller before replacing
218        let _ = old.stop(true, STATUS_DONE);
219    }
220    registry.insert(block_id.to_string(), controller);
221}
222
223/// Unregister (delete) a controller by block ID, stopping it first.
224/// Removes from the registry before calling stop() so no new callers can reach it.
225pub fn delete_controller(block_id: &str) {
226    let ctrl = CONTROLLER_REGISTRY.write().unwrap().remove(block_id);
227    if let Some(ctrl) = ctrl {
228        let _ = ctrl.stop(true, STATUS_DONE);
229    }
230    // Drop the process tracker for this block. On Windows the job
231    // object's `KILL_ON_JOB_CLOSE` flag nukes the whole descendant
232    // tree; on Linux/macOS the tracker's `Drop` does the same.
233    // No-op if the tracker global isn't initialized.
234    if let Some(registry) = crate::backend::process_tracker::registry::global() {
235        registry.remove(block_id);
236    }
237}
238
239/// Get all controllers (snapshot).
240pub fn get_all_controllers() -> HashMap<String, Arc<dyn Controller>> {
241    CONTROLLER_REGISTRY.read().unwrap().clone()
242}
243
244/// Stop all running controllers gracefully.
245#[allow(dead_code)]
246pub fn stop_all_controllers() {
247    let controllers = get_all_controllers();
248    for (_, ctrl) in controllers {
249        let _ = ctrl.stop(true, STATUS_DONE);
250    }
251}
252
253// ---- Public API functions ----
254
255/// Get the runtime status for a block's controller.
256/// Returns None if no controller is registered.
257pub fn get_block_controller_status(block_id: &str) -> Option<BlockControllerRuntimeStatus> {
258    get_controller(block_id).map(|c| c.get_runtime_status())
259}
260
261/// Stop a block's controller gracefully.
262#[allow(dead_code)]
263pub fn stop_block_controller(block_id: &str) -> Result<(), String> {
264    match get_controller(block_id) {
265        Some(ctrl) => ctrl.stop(true, STATUS_DONE),
266        None => Ok(()), // No controller = already stopped
267    }
268}
269
270/// Send input to a block's controller.
271pub fn send_input(block_id: &str, input: BlockInputUnion, seq: Option<u64>) -> Result<(), String> {
272    match get_controller(block_id) {
273        Some(ctrl) => ctrl.send_input(input, seq),
274        None => Err(format!("no controller for block {block_id}")),
275    }
276}
277
278/// Resync a block's controller — the main entry point for starting/restarting blocks.
279/// Port of Go's `ResyncController`.
280///
281/// Logic:
282/// 1. Load block from database
283/// 2. Determine controller type from meta["controller"]
284/// 3. If existing controller needs replacing (type changed, conn changed, force), stop it
285/// 4. Create new controller if needed
286/// 5. Start if status is init or done
287pub fn resync_controller(
288    block: &Block,
289    tab_id: &str,
290    rt_opts: Option<serde_json::Value>,
291    force: bool,
292    broker: Option<Arc<Broker>>,
293    event_bus: Option<Arc<EventBus>>,
294    wstore: Option<Arc<WaveStore>>,
295    filestore: Option<Arc<FileStore>>,
296) -> Result<(), String> {
297    let block_id = &block.oid;
298    let block_meta = &block.meta;
299
300    // Get controller type from block meta
301    let controller_type = super::obj::meta_get_string(block_meta, META_KEY_CONTROLLER, "");
302
303    if controller_type.is_empty() {
304        // No controller type = web/static block, nothing to manage
305        return Ok(());
306    }
307
308    tracing::info!(
309        block_id = %block_id,
310        controller_type = %controller_type,
311        wstore_present = wstore.is_some(),
312        event_bus_present = event_bus.is_some(),
313        force,
314        "[dnd-debug] resync_controller entry"
315    );
316
317    // Check if existing controller needs to be replaced
318    let existing = get_controller(block_id);
319    if let Some(ref ctrl) = existing {
320        let needs_replace = if ctrl.controller_type() != controller_type || force {
321            true // Type changed or forced restart
322        } else {
323            let status = ctrl.get_runtime_status();
324            // Check if connection changed
325            let new_conn =
326                super::obj::meta_get_string(block_meta, META_KEY_CONNECTION, "local");
327            status.shellprocconnname != new_conn
328        };
329
330        if needs_replace {
331            let _ = ctrl.stop(true, STATUS_DONE);
332            delete_controller(block_id);
333        } else {
334            // Existing controller is fine, just check if it needs starting
335            let status = ctrl.get_runtime_status();
336            tracing::info!(
337                block_id = %block_id,
338                status = %status.shellprocstatus,
339                "[dnd-debug] existing controller — skipping spawn (no cmd:cwd seed)"
340            );
341            if status.shellprocstatus == STATUS_INIT || status.shellprocstatus == STATUS_DONE {
342                return ctrl.start(block_meta.clone(), rt_opts, force);
343            }
344            return Ok(());
345        }
346    }
347
348    // Create new controller
349    match controller_type.as_str() {
350        BLOCK_CONTROLLER_SHELL | BLOCK_CONTROLLER_CMD => {
351            let ctrl = shell::ShellController::new(
352                controller_type.clone(),
353                tab_id.to_string(),
354                block_id.to_string(),
355                broker,
356                event_bus,
357                wstore,
358            );
359            let ctrl = Arc::new(ctrl);
360            register_controller(block_id, ctrl.clone());
361            ctrl.start(block_meta.clone(), rt_opts, force)
362        }
363        BLOCK_CONTROLLER_SUBPROCESS => {
364            let ctrl = subprocess::SubprocessController::new(
365                tab_id.to_string(),
366                block_id.to_string(),
367                broker,
368                event_bus,
369                wstore,
370                filestore,
371            );
372            let ctrl = Arc::new(ctrl);
373            ctrl.set_self_ref();
374            register_controller(block_id, ctrl.clone());
375            ctrl.start(block_meta.clone(), rt_opts, force)
376        }
377        BLOCK_CONTROLLER_PERSISTENT => {
378            let ctrl = persistent::PersistentSubprocessController::new(
379                tab_id.to_string(),
380                block_id.to_string(),
381                broker,
382                event_bus,
383                wstore,
384                filestore,
385            );
386            let ctrl = Arc::new(ctrl);
387            register_controller(block_id, ctrl.clone());
388            ctrl.start(block_meta.clone(), rt_opts, force)
389        }
390        BLOCK_CONTROLLER_ACP => {
391            let ctrl = acp::AcpController::new(
392                tab_id.to_string(),
393                block_id.to_string(),
394                broker,
395                event_bus,
396                wstore,
397                filestore,
398            );
399            let ctrl = Arc::new(ctrl);
400            register_controller(block_id, ctrl.clone());
401            ctrl.start(block_meta.clone(), rt_opts, force)
402        }
403        BLOCK_CONTROLLER_TSUNAMI => {
404            // Tsunami controller deferred to later phase
405            Err("tsunami controller not yet implemented".to_string())
406        }
407        _ => Err(format!("unknown controller type: {controller_type}")),
408    }
409}
410
411/// Publish a controller status event via WPS broker.
412pub fn publish_controller_status(
413    broker: &super::wps::Broker,
414    status: &BlockControllerRuntimeStatus,
415) {
416    use super::wps::{WaveEvent, EVENT_CONTROLLER_STATUS};
417
418    let event = WaveEvent {
419        event: EVENT_CONTROLLER_STATUS.to_string(),
420        scopes: vec![format!("block:{}", status.blockid)],
421        sender: String::new(),
422        persist: 0,
423        data: serde_json::to_value(status).ok(),
424    };
425    broker.publish(event);
426}
427
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal controller that only counts how often it was stopped.
    struct MockController {
        block_id: String,
        stops: AtomicUsize,
    }

    impl MockController {
        fn new(block_id: &str) -> Self {
            Self {
                block_id: block_id.to_string(),
                stops: AtomicUsize::new(0),
            }
        }

        fn stop_count(&self) -> usize {
            self.stops.load(Ordering::SeqCst)
        }
    }

    impl Controller for MockController {
        fn start(
            &self,
            _block_meta: MetaMapType,
            _rt_opts: Option<serde_json::Value>,
            _force: bool,
        ) -> Result<(), String> {
            Ok(())
        }

        fn stop(&self, _graceful: bool, _new_status: &str) -> Result<(), String> {
            self.stops.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn get_runtime_status(&self) -> BlockControllerRuntimeStatus {
            BlockControllerRuntimeStatus {
                blockid: self.block_id.clone(),
                shellprocstatus: STATUS_RUNNING.to_string(),
                ..Default::default()
            }
        }

        fn send_input(&self, _input: BlockInputUnion, _seq: Option<u64>) -> Result<(), String> {
            Ok(())
        }

        fn controller_type(&self) -> &str {
            "mock"
        }

        fn block_id(&self) -> &str {
            &self.block_id
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    #[test]
    fn test_status_constants() {
        assert_eq!(STATUS_INIT, "init");
        assert_eq!(STATUS_RUNNING, "running");
        assert_eq!(STATUS_DONE, "done");
    }

    #[test]
    fn test_controller_type_constants() {
        assert_eq!(BLOCK_CONTROLLER_SHELL, "shell");
        assert_eq!(BLOCK_CONTROLLER_CMD, "cmd");
        assert_eq!(BLOCK_CONTROLLER_TSUNAMI, "tsunami");
        assert_eq!(BLOCK_CONTROLLER_SUBPROCESS, "subprocess");
        assert_eq!(BLOCK_CONTROLLER_PERSISTENT, "persistent");
        assert_eq!(BLOCK_CONTROLLER_ACP, "acp");
    }

    #[test]
    fn test_meta_key_constants() {
        assert_eq!(META_KEY_CONTROLLER, "controller");
        assert_eq!(META_KEY_CONNECTION, "connection");
        assert_eq!(META_KEY_CMD, "cmd");
        assert_eq!(META_KEY_CMD_RUN_ON_START, "cmd:runonstart");
    }

    #[test]
    fn test_block_input_union_data() {
        let input = BlockInputUnion::data(b"hello".to_vec());
        assert_eq!(input.input_data.as_ref().unwrap(), b"hello");
        assert!(input.sig_name.is_none());
        assert!(input.term_size.is_none());
    }

    #[test]
    fn test_block_input_union_signal() {
        let input = BlockInputUnion::signal("SIGTERM");
        assert!(input.input_data.is_none());
        assert_eq!(input.sig_name.as_ref().unwrap(), "SIGTERM");
        assert!(input.term_size.is_none());
    }

    #[test]
    fn test_block_input_union_resize() {
        let size = TermSize { rows: 40, cols: 120 };
        let input = BlockInputUnion::resize(size.clone());
        assert!(input.input_data.is_none());
        assert!(input.sig_name.is_none());
        let ts = input.term_size.unwrap();
        assert_eq!(ts.rows, 40);
        assert_eq!(ts.cols, 120);
    }

    #[test]
    fn test_is_false() {
        assert!(is_false(&false));
        assert!(!is_false(&true));
    }

    #[test]
    fn test_runtime_status_default() {
        let status = BlockControllerRuntimeStatus::default();
        assert!(status.blockid.is_empty());
        assert_eq!(status.version, 0);
        assert!(status.shellprocstatus.is_empty());
        assert_eq!(status.shellprocexitcode, 0);
        assert!(status.spawn_ts_ms.is_none());
        assert!(!status.is_agent_pane);
    }

    #[test]
    fn test_runtime_status_serde() {
        let status = BlockControllerRuntimeStatus {
            blockid: "block-123".to_string(),
            version: 3,
            shellprocstatus: STATUS_RUNNING.to_string(),
            shellprocconnname: "local".to_string(),
            shellprocexitcode: 0,
            ..Default::default()
        };
        let json = serde_json::to_string(&status).unwrap();
        assert!(json.contains("\"blockid\":\"block-123\""));
        assert!(json.contains("\"shellprocstatus\":\"running\""));

        let parsed: BlockControllerRuntimeStatus = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.blockid, "block-123");
        assert_eq!(parsed.version, 3);
    }

    #[test]
    fn test_runtime_status_skips_empty_fields() {
        // Empty strings, a missing spawn timestamp and a false agent flag are omitted.
        let value = serde_json::to_value(BlockControllerRuntimeStatus::default()).unwrap();
        let obj = value.as_object().unwrap();
        assert!(obj.contains_key("blockid"));
        assert!(obj.contains_key("version"));
        assert!(obj.contains_key("shellprocexitcode"));
        assert!(!obj.contains_key("shellprocstatus"));
        assert!(!obj.contains_key("shellprocconnname"));
        assert!(!obj.contains_key("spawn_ts_ms"));
        assert!(!obj.contains_key("is_agent_pane"));

        // Once set, the optional fields are emitted.
        let status = BlockControllerRuntimeStatus {
            blockid: "b".to_string(),
            spawn_ts_ms: Some(42),
            is_agent_pane: true,
            ..Default::default()
        };
        let value = serde_json::to_value(status).unwrap();
        assert_eq!(value["spawn_ts_ms"], 42);
        assert_eq!(value["is_agent_pane"], true);
    }

    #[test]
    fn test_runtime_status_deserialize_minimal() {
        // Every field except `blockid` falls back to its default when absent.
        let parsed: BlockControllerRuntimeStatus =
            serde_json::from_str(r#"{"blockid":"b"}"#).unwrap();
        assert_eq!(parsed.blockid, "b");
        assert_eq!(parsed.version, 0);
        assert!(parsed.shellprocstatus.is_empty());
        assert!(parsed.shellprocconnname.is_empty());
        assert_eq!(parsed.shellprocexitcode, 0);
        assert!(parsed.spawn_ts_ms.is_none());
        assert!(!parsed.is_agent_pane);
    }

    #[test]
    fn test_get_nonexistent_controller() {
        assert!(get_controller("nonexistent-block").is_none());
    }

    #[test]
    fn test_get_block_controller_status_none() {
        assert!(get_block_controller_status("nonexistent").is_none());
    }

    #[test]
    fn test_stop_nonexistent_controller() {
        // Should be ok (no-op)
        assert!(stop_block_controller("nonexistent").is_ok());
    }

    #[test]
    fn test_send_input_no_controller() {
        let result = send_input("nonexistent", BlockInputUnion::data(b"test".to_vec()), None);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("no controller"));
    }

    #[test]
    fn test_registry_lifecycle_with_mock() {
        // Unique ID so tests running in parallel never share a registry entry.
        let id = "mock-registry-lifecycle";

        let first = Arc::new(MockController::new(id));
        register_controller(id, first.clone());
        assert_eq!(get_controller(id).unwrap().controller_type(), "mock");
        assert_eq!(get_block_controller_status(id).unwrap().blockid, id);
        assert!(send_input(id, BlockInputUnion::signal("SIGINT"), Some(1)).is_ok());
        assert!(get_all_controllers().contains_key(id));

        // Re-registering stops the displaced controller exactly once.
        let second = Arc::new(MockController::new(id));
        register_controller(id, second.clone());
        assert_eq!(first.stop_count(), 1);
        assert_eq!(second.stop_count(), 0);

        // An explicit stop leaves the controller registered.
        assert!(stop_block_controller(id).is_ok());
        assert_eq!(second.stop_count(), 1);
        assert!(get_controller(id).is_some());

        // Deleting stops it again and removes it from the registry.
        delete_controller(id);
        assert_eq!(second.stop_count(), 2);
        assert!(get_controller(id).is_none());
    }

    #[test]
    fn test_resync_no_controller_type() {
        let block = Block {
            oid: "test-block".to_string(),
            version: 1,
            meta: HashMap::new(),
            ..Default::default()
        };
        // No "controller" key in meta = no-op
        let result = resync_controller(&block, "tab-1", None, false, None, None, None, None);
        assert!(result.is_ok());
    }

    #[test]
    fn test_resync_unknown_controller_type() {
        let mut meta = MetaMapType::new();
        meta.insert(
            "controller".to_string(),
            serde_json::Value::String("unknown_type".to_string()),
        );
        let block = Block {
            oid: "test-block".to_string(),
            version: 1,
            meta,
            ..Default::default()
        };
        let result = resync_controller(&block, "tab-1", None, false, None, None, None, None);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("unknown controller type"));
    }

    #[test]
    fn test_resync_tsunami_not_implemented() {
        let mut meta = MetaMapType::new();
        meta.insert(
            "controller".to_string(),
            serde_json::Value::String(BLOCK_CONTROLLER_TSUNAMI.to_string()),
        );
        let block = Block {
            oid: "test-block-tsunami".to_string(),
            version: 1,
            meta,
            ..Default::default()
        };
        let result = resync_controller(&block, "tab-1", None, false, None, None, None, None);
        assert!(result.unwrap_err().contains("not yet implemented"));
    }
}