1use std::io::Read as _;
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::Arc;
20use std::sync::Mutex;
21use std::time::{Instant, SystemTime, UNIX_EPOCH};
22
23#[cfg(unix)]
24use libc;
25
26use base64::Engine as _;
27use portable_pty::{native_pty_system, CommandBuilder, PtySize};
28use tokio::sync::mpsc;
29
30use super::{
31 BlockControllerRuntimeStatus, BlockInputUnion, Controller, META_KEY_CMD, META_KEY_CMD_ARGS,
32 META_KEY_CMD_CLEAR_ON_START, META_KEY_CMD_CLOSE_ON_EXIT, META_KEY_CMD_CLOSE_ON_EXIT_DELAY,
33 META_KEY_CMD_CLOSE_ON_EXIT_FORCE, META_KEY_CMD_ENV, META_KEY_CMD_RUN_ONCE,
34 META_KEY_CMD_RUN_ON_START, META_KEY_CONNECTION, STATUS_DONE, STATUS_INIT, STATUS_RUNNING,
35};
36use crate::backend::eventbus::EventBus;
37use crate::backend::shellexec::{ConnInterface, ShellProc};
38use crate::backend::storage::filestore::FileStore;
39use crate::backend::storage::wstore::WaveStore;
40use crate::backend::obj::{self, MetaMapType};
41use crate::backend::wps;
42
43const SHELL_INPUT_CH_SIZE: usize = 32;
45
46#[cfg(windows)]
53fn detect_local_shell_path_windows() -> String {
54 use std::os::windows::process::CommandExt;
55 use std::process::Command;
56 const CREATE_NO_WINDOW: u32 = 0x08000000;
57 if Command::new("where")
59 .arg("pwsh")
60 .creation_flags(CREATE_NO_WINDOW)
61 .output()
62 .map(|o| o.status.success())
63 .unwrap_or(false)
64 {
65 return "pwsh".to_string();
66 }
67 if Command::new("where")
69 .arg("powershell")
70 .creation_flags(CREATE_NO_WINDOW)
71 .output()
72 .map(|o| o.status.success())
73 .unwrap_or(false)
74 {
75 return "powershell".to_string();
76 }
77 "cmd.exe".to_string()
78}
79
80#[cfg(not(windows))]
82fn detect_local_shell_path_windows() -> String {
83 "cmd.exe".to_string()
84}
85
86const PTY_READ_BUF_SIZE: usize = 4096;
88
89#[allow(dead_code)]
92const KILL_GRACE_SECS: u64 = 5;
93
94struct ShellControllerInner {
95 proc_status: String,
97 proc_exit_code: i32,
99 status_version: i32,
101 conn_name: String,
103 input_tx: Option<mpsc::Sender<BlockInputUnion>>,
105 #[allow(dead_code)]
107 input_rx: Option<mpsc::Receiver<BlockInputUnion>>,
108 child_pid: Option<u32>,
110 spawn_ts_ms: Option<i64>,
112 last_pty_output: Option<Instant>,
114 is_agent_pane: bool,
116 input_seq_next: u64,
118 input_seq_buf: std::collections::BTreeMap<u64, BlockInputUnion>,
120}
121
122pub type ConnFactory =
125 Box<dyn Fn(&str, &MetaMapType) -> Result<Box<dyn ConnInterface>, String> + Send + Sync>;
126
127pub struct ShellController {
129 controller_type: String,
131 tab_id: String,
133 block_id: String,
135 run_lock: Arc<AtomicBool>,
137 inner: Arc<Mutex<ShellControllerInner>>,
139 conn_factory: Mutex<Option<ConnFactory>>,
141 broker: Option<Arc<wps::Broker>>,
143 #[allow(dead_code)]
145 event_bus: Option<Arc<EventBus>>,
146 wstore: Option<Arc<WaveStore>>,
148}
149
150impl ShellController {
151 pub fn new(
153 controller_type: String,
154 tab_id: String,
155 block_id: String,
156 broker: Option<Arc<wps::Broker>>,
157 event_bus: Option<Arc<EventBus>>,
158 wstore: Option<Arc<WaveStore>>,
159 ) -> Self {
160 Self {
161 controller_type,
162 tab_id,
163 block_id,
164 run_lock: Arc::new(AtomicBool::new(false)),
165 inner: Arc::new(Mutex::new(ShellControllerInner {
166 proc_status: STATUS_INIT.to_string(),
167 proc_exit_code: 0,
168 status_version: 0,
169 conn_name: String::new(),
170 input_tx: None,
171 input_rx: None,
172 child_pid: None,
173 spawn_ts_ms: None,
174 last_pty_output: None,
175 is_agent_pane: false,
176 input_seq_next: 0,
177 input_seq_buf: std::collections::BTreeMap::new(),
178 })),
179 conn_factory: Mutex::new(None),
180 broker,
181 event_bus,
182 wstore,
183 }
184 }
185
186 #[allow(dead_code)]
188 pub fn set_conn_factory(&self, factory: ConnFactory) {
189 *self.conn_factory.lock().unwrap() = Some(factory);
190 }
191
192 fn try_lock_run(&self) -> bool {
194 self.run_lock
195 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
196 .is_ok()
197 }
198
199 fn unlock_run(&self) {
201 self.run_lock.store(false, Ordering::SeqCst);
202 }
203
204 fn set_status(inner: &mut ShellControllerInner, status: &str) {
206 inner.proc_status = status.to_string();
207 inner.status_version += 1;
208 }
209
210 fn get_status_snapshot(&self) -> BlockControllerRuntimeStatus {
212 let inner = self.inner.lock().unwrap();
213 BlockControllerRuntimeStatus {
214 blockid: self.block_id.clone(),
215 version: inner.status_version,
216 shellprocstatus: inner.proc_status.clone(),
217 shellprocconnname: inner.conn_name.clone(),
218 shellprocexitcode: inner.proc_exit_code,
219 spawn_ts_ms: inner.spawn_ts_ms,
220 is_agent_pane: inner.is_agent_pane,
221 }
222 }
223
224 pub fn last_output_secs_ago(&self) -> Option<u64> {
226 self.inner.lock().unwrap().last_pty_output.map(|t| t.elapsed().as_secs())
227 }
228
229 #[allow(dead_code)]
231 pub fn is_agent_pane(&self) -> bool {
232 self.inner.lock().unwrap().is_agent_pane
233 }
234
235 fn should_run_on_start(meta: &MetaMapType) -> bool {
237 obj::meta_get_bool(meta, META_KEY_CMD_RUN_ON_START, true)
238 }
239
240 #[allow(dead_code)]
242 fn should_run_once(meta: &MetaMapType) -> bool {
243 obj::meta_get_bool(meta, META_KEY_CMD_RUN_ONCE, false)
244 }
245
246 #[allow(dead_code)]
248 fn should_clear_on_start(meta: &MetaMapType) -> bool {
249 obj::meta_get_bool(meta, META_KEY_CMD_CLEAR_ON_START, false)
250 }
251
252 #[allow(dead_code)]
254 fn should_close_on_exit(meta: &MetaMapType) -> bool {
255 obj::meta_get_bool(meta, META_KEY_CMD_CLOSE_ON_EXIT, false)
256 }
257
258 #[allow(dead_code)]
260 fn should_close_on_exit_force(meta: &MetaMapType) -> bool {
261 obj::meta_get_bool(meta, META_KEY_CMD_CLOSE_ON_EXIT_FORCE, false)
262 }
263
264 #[allow(dead_code)]
266 fn close_on_exit_delay_ms(meta: &MetaMapType) -> u64 {
267 match meta.get(META_KEY_CMD_CLOSE_ON_EXIT_DELAY) {
268 Some(serde_json::Value::Number(n)) => n.as_u64().unwrap_or(2000),
269 _ => 2000,
270 }
271 }
272
273 fn get_conn_name(meta: &MetaMapType) -> String {
275 obj::meta_get_string(meta, META_KEY_CONNECTION, "local")
276 }
277
278 fn get_cmd_str(meta: &MetaMapType) -> String {
280 obj::meta_get_string(meta, META_KEY_CMD, "")
281 }
282
283 fn get_cmd_args(meta: &MetaMapType) -> Vec<String> {
285 match meta.get(META_KEY_CMD_ARGS) {
286 Some(serde_json::Value::Array(arr)) => arr
287 .iter()
288 .filter_map(|v| v.as_str().map(|s| s.to_string()))
289 .collect(),
290 _ => vec![],
291 }
292 }
293
294 fn is_interactive(meta: &MetaMapType) -> bool {
296 obj::meta_get_bool(meta, "cmd:interactive", false)
297 }
298
299 fn publish_status(&self) {
301 if let Some(ref broker) = self.broker {
302 let status = self.get_status_snapshot();
303 super::publish_controller_status(broker, &status);
304 }
305 }
306}
307
308impl Controller for ShellController {
309 fn start(
310 &self,
311 block_meta: MetaMapType,
312 _rt_opts: Option<serde_json::Value>,
313 force: bool,
314 ) -> Result<(), String> {
315 let cmd_str_preview = Self::get_cmd_str(&block_meta);
316 let interactive_preview = Self::is_interactive(&block_meta);
317 tracing::info!(
318 block_id = %self.block_id,
319 controller = %self.controller_type,
320 cmd = %cmd_str_preview,
321 interactive = interactive_preview,
322 force = force,
323 "block start requested"
324 );
325
326 if !force && !Self::should_run_on_start(&block_meta) {
328 tracing::info!(block_id = %self.block_id, "skipping start: run_on_start is false");
329 return Ok(());
330 }
331
332 if !self.try_lock_run() {
334 return Err("controller is already running".to_string());
335 }
336
337 let conn_name = Self::get_conn_name(&block_meta);
339
340 {
342 let mut inner = self.inner.lock().unwrap();
343 Self::set_status(&mut inner, STATUS_RUNNING);
344 inner.conn_name = conn_name.clone();
345 }
346 self.publish_status();
347
348 let (input_tx, input_rx) = mpsc::channel(SHELL_INPUT_CH_SIZE);
350 {
351 let mut inner = self.inner.lock().unwrap();
352 inner.input_tx = Some(input_tx);
353 inner.input_seq_next = 0;
354 inner.input_seq_buf.clear();
355 }
356
357 let has_factory = self.conn_factory.lock().unwrap().is_some();
359
360 if has_factory {
361 let conn_result = {
363 let factory = self.conn_factory.lock().unwrap();
364 factory.as_ref().unwrap()(&conn_name, &block_meta)
365 };
366
367 let mut conn = match conn_result {
368 Ok(c) => c,
369 Err(e) => {
370 let mut inner = self.inner.lock().unwrap();
371 Self::set_status(&mut inner, STATUS_DONE);
372 inner.proc_exit_code = -1;
373 inner.input_tx = None;
374 self.unlock_run();
375 return Err(format!("failed to create connection: {e}"));
376 }
377 };
378
379 if let Err(e) = conn.start() {
380 let mut inner = self.inner.lock().unwrap();
381 Self::set_status(&mut inner, STATUS_DONE);
382 inner.proc_exit_code = -1;
383 inner.input_tx = None;
384 self.unlock_run();
385 return Err(format!("failed to start process: {e}"));
386 }
387
388 let mut shell_proc = ShellProc::new(conn_name, conn);
389 let _done_rx = shell_proc.take_done_rx();
390 let exit_code = shell_proc.wait_and_signal();
391
392 {
393 let mut inner = self.inner.lock().unwrap();
394 inner.proc_exit_code = exit_code;
395 Self::set_status(&mut inner, STATUS_DONE);
396 inner.input_tx = None;
397 }
398 self.publish_status();
399 self.unlock_run();
400 return Ok(());
401 }
402
403 let pty_system = native_pty_system();
412 let pty_size = PtySize {
413 rows: 25,
414 cols: 200,
415 pixel_width: 0,
416 pixel_height: 0,
417 };
418
419 let pair = pty_system.openpty(pty_size).map_err(|e| {
420 tracing::error!(block_id = %self.block_id, error = %e, "failed to open PTY");
421 let mut inner = self.inner.lock().unwrap();
422 Self::set_status(&mut inner, STATUS_DONE);
423 inner.proc_exit_code = -1;
424 inner.input_tx = None;
425 self.unlock_run();
426 format!("failed to open PTY: {e}")
427 })?;
428 tracing::info!(block_id = %self.block_id, rows = 25, cols = 200, "PTY opened");
429
430 let cmd_str = Self::get_cmd_str(&block_meta);
432 let cmd_args = Self::get_cmd_args(&block_meta);
433 let interactive = Self::is_interactive(&block_meta);
434
435 let agent_id_for_jekt: Option<String> = block_meta
438 .get(META_KEY_CMD_ENV)
439 .and_then(|m| m.as_object())
440 .and_then(|obj| obj.get("AGENTMUX_AGENT_ID"))
441 .and_then(|v| v.as_str())
442 .map(|s| s.to_string())
443 .or_else(|| {
444 let cfg = crate::backend::wconfig::ConfigWatcher::with_config(
445 crate::backend::wconfig::build_default_config(),
446 );
447 cfg.get_settings().cmd_env.get("AGENTMUX_AGENT_ID").cloned()
448 })
449 .or_else(|| std::env::var("WAVEMUX_AGENT_ID").ok());
450
451 let mut cmd = if !cmd_str.is_empty() && (!cmd_args.is_empty() || interactive) {
452 tracing::info!(block_id = %self.block_id, cmd = %cmd_str, args = ?cmd_args, "direct spawn path");
455 let mut c = CommandBuilder::new(&cmd_str);
456 if !cmd_args.is_empty() {
457 let arg_refs: Vec<&str> = cmd_args.iter().map(|s| s.as_str()).collect();
458 c.args(arg_refs);
459 }
460 c
461 } else if !cmd_str.is_empty() {
462 tracing::info!(block_id = %self.block_id, cmd = %cmd_str, "shell-wrapped spawn path");
464 if cfg!(windows) {
465 let mut c = CommandBuilder::new("cmd.exe");
466 c.args(["/C", &cmd_str]);
467 c
468 } else {
469 let mut c = CommandBuilder::new("/bin/sh");
470 c.args(["-c", &cmd_str]);
471 c
472 }
473 } else {
474 let shell_path = if cfg!(windows) {
477 detect_local_shell_path_windows()
478 } else {
479 std::env::var("SHELL").unwrap_or_else(|_| "/bin/bash".to_string())
480 };
481
482 let shell_type = crate::backend::shellintegration::detect_shell_type(&shell_path);
483
484 let shell_home = crate::backend::base::get_home_dir().join(".agentmux");
490 crate::backend::shellintegration::deploy_scripts(&shell_home);
491
492 tracing::info!(block_id = %self.block_id, shell = %shell_path, shell_type = ?shell_type, "interactive shell path");
493
494 let mut c = CommandBuilder::new(&shell_path);
495
496 if let Some(startup) = crate::backend::shellintegration::get_shell_startup(shell_type, &shell_home) {
498 for arg in &startup.extra_args {
499 c.arg(arg);
500 }
501 for (k, v) in &startup.env_vars {
502 c.env(k, v);
503 }
504 }
505
506 c.env("TERM", "xterm-256color");
511 c.env("COLORTERM", "truecolor");
512 c.env("TERM_PROGRAM", "agentmux");
513 c.env("AGENTMUX_BLOCKID", &self.block_id);
514 c.env("AGENTMUX_TABID", &self.tab_id);
515 c.env("AGENTMUX_VERSION", env!("CARGO_PKG_VERSION"));
516
517 let log_dir = dirs::home_dir()
520 .unwrap_or_default()
521 .join(".agentmux")
522 .join("logs");
523 c.env("AGENTMUX_LOG_DIR", log_dir.to_string_lossy().as_ref());
524
525 if let Ok(local_url) = std::env::var("AGENTMUX_LOCAL_URL") {
528 c.env("AGENTMUX_LOCAL_URL", &local_url);
529 }
530
531 c.env("AGENTMUX", "1");
536
537 {
541 let sep = if cfg!(windows) { ";" } else { ":" };
542 let current_path = std::env::var("PATH").unwrap_or_default();
543 let mut extra: Vec<String> = Vec::new();
544
545 if let Some(user_bin) = crate::backend::tool_store::user_tools_dir() {
547 if user_bin.exists() {
548 extra.push(user_bin.to_string_lossy().into_owned());
549 }
550 }
551
552 if let Some(bundled_bin) = crate::backend::tool_store::bundled_tools_dir() {
554 if bundled_bin.exists() {
555 extra.push(bundled_bin.to_string_lossy().into_owned());
556 }
557 }
558
559 if !extra.is_empty() {
560 c.env("PATH", format!("{current_path}{sep}{}", extra.join(sep)));
561 }
562 }
563
564 let mut has_agent_id = false;
568
569 let config = crate::backend::wconfig::ConfigWatcher::with_config(
571 crate::backend::wconfig::build_default_config(),
572 );
573 let settings = config.get_settings();
574 for (k, v) in &settings.cmd_env {
575 if k == "AGENTMUX_AGENT_ID" {
576 has_agent_id = true;
577 }
578 let expanded = crate::backend::base::expand_home_dir_safe(v);
579 c.env(k, expanded.to_string_lossy().as_ref());
580 }
581
582 if let Some(env_map) = block_meta.get(META_KEY_CMD_ENV) {
584 if let Some(obj) = env_map.as_object() {
585 for (k, v) in obj {
586 if let Some(val) = v.as_str() {
587 if k == "AGENTMUX_AGENT_ID" {
588 has_agent_id = true;
589 }
590 let expanded = crate::backend::base::expand_home_dir_safe(val);
591 c.env(k, expanded.to_string_lossy().as_ref());
592 }
593 }
594 }
595 }
596
597 if !has_agent_id {
603 c.env_remove("AGENTMUX_AGENT_ID");
604 c.env_remove("AGENTMUX_AGENT_COLOR");
605 c.env_remove("AGENTMUX_AGENT_TEXT_COLOR");
606 c.env_remove("WAVEMUX_AGENT_ID");
607 c.env_remove("WAVEMUX_AGENT_COLOR");
608 }
609
610 c
611 };
612
613 let cwd = obj::meta_get_string(&block_meta, super::META_KEY_CMD_CWD, "");
615 if !cwd.is_empty() {
616 cmd.cwd(&cwd);
617 }
618
619 let mut child = pair.slave.spawn_command(cmd).map_err(|e| {
620 tracing::error!(block_id = %self.block_id, error = %e, cmd = %cmd_str, "spawn failed");
621 let mut inner = self.inner.lock().unwrap();
622 Self::set_status(&mut inner, STATUS_DONE);
623 inner.proc_exit_code = -1;
624 inner.input_tx = None;
625 self.unlock_run();
626 format!("failed to spawn command: {e}")
627 })?;
628 tracing::info!(block_id = %self.block_id, "process spawned successfully");
629
630 let is_agent = agent_id_for_jekt.is_some()
632 || cmd_str.to_lowercase().contains("claude")
633 || cmd_str.to_lowercase().contains("codex")
634 || cmd_str.to_lowercase().contains("gemini");
635
636 let spawn_ts_ms = SystemTime::now()
638 .duration_since(UNIX_EPOCH)
639 .map(|d| d.as_millis() as i64)
640 .unwrap_or(0);
641 {
642 let mut inner = self.inner.lock().unwrap();
643 if let Some(pid) = child.process_id() {
644 super::pidregistry::register(&self.block_id, pid);
645 inner.child_pid = Some(pid);
646 }
647 inner.spawn_ts_ms = Some(spawn_ts_ms);
648 inner.is_agent_pane = is_agent;
649 }
650
651 if let Some(ref agent_id) = agent_id_for_jekt {
655 match crate::backend::reactive::get_global_handler()
656 .register_agent(agent_id, &self.block_id, Some(&self.tab_id))
657 {
658 Ok(()) => {
659 tracing::info!(
660 block_id = %self.block_id,
661 agent_id = %agent_id,
662 "jekt: auto-registered"
663 );
664 if let Ok(local_url) = std::env::var("AGENTMUX_LOCAL_URL") {
666 let data_dir = crate::backend::base::get_wave_data_dir();
667 crate::backend::reactive::registry::write(
668 &data_dir,
669 agent_id,
670 &local_url,
671 &self.block_id,
672 );
673 }
674 }
675 Err(e) => tracing::warn!(
676 block_id = %self.block_id,
677 agent_id = %agent_id,
678 error = %e,
679 "jekt: auto-register failed"
680 ),
681 }
682 }
683 tracing::info!(
684 block_id = %self.block_id,
685 wstore_present = self.wstore.is_some(),
686 event_bus_present = self.event_bus.is_some(),
687 "[dnd-debug] pre-seed state after spawn"
688 );
689
690 if let Some(ref store) = self.wstore {
693 let effective_cwd = if !cwd.is_empty() {
694 cwd.clone()
695 } else {
696 std::env::current_dir()
697 .map(|p| p.to_string_lossy().to_string())
698 .unwrap_or_default()
699 };
700 tracing::debug!(block_id = %self.block_id, cwd = %effective_cwd, "seeding cmd:cwd");
701 if !effective_cwd.is_empty() {
702 let oref_str = format!("block:{}", self.block_id);
703 let mut meta_update = MetaMapType::new();
704 meta_update.insert(
705 super::META_KEY_CMD_CWD.to_string(),
706 serde_json::Value::String(effective_cwd),
707 );
708 match store.must_get::<crate::backend::obj::Block>(&self.block_id) {
710 Ok(block) if obj::meta_get_string(&block.meta, super::META_KEY_CMD_CWD, "").is_empty() => {
711 match crate::server::service::update_object_meta(store, &oref_str, &meta_update) {
712 Ok(()) => {
713 if let Ok(updated_block) = store.must_get::<crate::backend::obj::Block>(&self.block_id) {
717 if let Some(ref event_bus) = self.event_bus {
718 let update_data = serde_json::to_value(&obj::WaveObjUpdate {
719 updatetype: "update".into(),
720 otype: "block".into(),
721 oid: self.block_id.clone(),
722 obj: Some(obj::wave_obj_to_value(&updated_block)),
723 }).ok();
724 event_bus.broadcast_event(&crate::backend::eventbus::WSEventType {
725 eventtype: "waveobj:update".to_string(),
726 oref: oref_str.clone(),
727 data: update_data,
728 });
729 tracing::info!(block_id = %self.block_id, "cmd:cwd seeded and broadcast to frontend");
730 } else {
731 tracing::warn!(block_id = %self.block_id, "cmd:cwd written to store but no event_bus to broadcast — frontend won't update");
732 }
733 }
734 }
735 Err(e) => {
736 tracing::warn!(block_id = %self.block_id, error = %e, "failed to seed cmd:cwd in store");
737 }
738 }
739 }
740 Ok(_) => {
741 tracing::debug!(block_id = %self.block_id, "cmd:cwd already set, skipping seed");
742 }
743 Err(e) => {
744 tracing::warn!(block_id = %self.block_id, error = %e, "failed to read block for cmd:cwd seed");
745 }
746 }
747 }
748 }
749
750 let reader = pair.master.try_clone_reader().map_err(|e| {
752 let _ = child.kill();
753 let mut inner = self.inner.lock().unwrap();
754 Self::set_status(&mut inner, STATUS_DONE);
755 inner.proc_exit_code = -1;
756 inner.input_tx = None;
757 self.unlock_run();
758 format!("failed to clone PTY reader: {e}")
759 })?;
760
761 let writer = pair.master.take_writer().map_err(|e| {
762 let _ = child.kill();
763 let mut inner = self.inner.lock().unwrap();
764 Self::set_status(&mut inner, STATUS_DONE);
765 inner.proc_exit_code = -1;
766 inner.input_tx = None;
767 self.unlock_run();
768 format!("failed to take PTY writer: {e}")
769 })?;
770
771 let block_id_read = self.block_id.clone();
773 let broker_read = self.broker.clone();
774 let inner_read = self.inner.clone();
775 let is_agent_read = is_agent;
776 tokio::task::spawn_blocking(move || {
777 let mut reader = reader;
778 let mut buf = [0u8; PTY_READ_BUF_SIZE];
779
780 let mut translator: Option<crate::agents::translator::claude::ClaudeTranslator> =
792 if is_agent_read {
793 Some(crate::agents::translator::claude::ClaudeTranslator::new())
794 } else {
795 None
796 };
797 let mut line_buf: Vec<u8> = Vec::new();
802
803 loop {
804 match reader.read(&mut buf) {
805 Ok(0) => break, Ok(n) => {
807 inner_read.lock().unwrap().last_pty_output = Some(Instant::now());
808 if let Some(ref broker) = broker_read {
809 handle_append_block_file(
810 broker,
811 &block_id_read,
812 "term",
813 &buf[..n],
814 None, );
816 if let Some(ref mut t) = translator {
817 accumulate_and_translate(
818 broker,
819 &block_id_read,
820 &mut line_buf,
821 &buf[..n],
822 t,
823 );
824 }
825 }
826 }
827 Err(e) => {
828 tracing::debug!("PTY read error for {}: {}", block_id_read, e);
829 break;
830 }
831 }
832 }
833 });
834
835 let master = pair.master;
838 tokio::spawn(async move {
839 let mut writer = writer;
840 let mut input_rx = input_rx;
841 while let Some(input) = input_rx.recv().await {
842 if let Some(data) = input.input_data {
843 use std::io::Write;
844 if let Err(e) = writer.write_all(&data) {
845 tracing::debug!("PTY write error: {}", e);
846 break;
847 }
848 }
849 if let Some(ref size) = input.term_size {
850 let pty_size = PtySize {
851 rows: size.rows as u16,
852 cols: size.cols as u16,
853 pixel_width: 0,
854 pixel_height: 0,
855 };
856 if let Err(e) = master.resize(pty_size) {
857 tracing::debug!("PTY resize error: {}", e);
858 }
859 }
860 if input.sig_name.is_some() {
861 break;
863 }
864 }
865 });
867
868 let inner_wait = Arc::clone(&self.inner);
870 let block_id_wait = self.block_id.clone();
871 let agent_id_wait = agent_id_for_jekt.clone();
872 let broker_wait = self.broker.clone();
873 let run_lock = Arc::clone(&self.run_lock);
874 tokio::task::spawn_blocking(move || {
875 let mut child = child;
876
877 let exit_status = child.wait();
879 let exit_code = match exit_status {
880 Ok(status) => {
881 if status.success() {
882 0
883 } else {
884 1
886 }
887 }
888 Err(e) => {
889 tracing::warn!("wait error for block {}: {}", block_id_wait, e);
890 -1
891 }
892 };
893
894 tracing::info!(block_id = %block_id_wait, exit_code = exit_code, "process exited");
895
896 super::pidregistry::unregister(&block_id_wait);
898
899 crate::backend::reactive::get_global_handler().unregister_block(&block_id_wait);
902
903 if let Some(ref agent_id) = agent_id_wait {
905 let data_dir = crate::backend::base::get_wave_data_dir();
906 crate::backend::reactive::registry::remove(&data_dir, agent_id);
907 }
908
909 {
911 let mut inner = inner_wait.lock().unwrap();
912 inner.proc_exit_code = exit_code;
913 ShellController::set_status(&mut inner, STATUS_DONE);
914 inner.input_tx = None;
915 }
916
917 if let Some(ref broker) = broker_wait {
919 let status = {
920 let inner = inner_wait.lock().unwrap();
921 BlockControllerRuntimeStatus {
922 blockid: block_id_wait.clone(),
923 version: inner.status_version,
924 shellprocstatus: inner.proc_status.clone(),
925 shellprocconnname: inner.conn_name.clone(),
926 shellprocexitcode: inner.proc_exit_code,
927 spawn_ts_ms: inner.spawn_ts_ms,
928 is_agent_pane: inner.is_agent_pane,
929 }
930 };
931 super::publish_controller_status(broker, &status);
932 }
933
934 run_lock.store(false, Ordering::SeqCst);
936 });
937
938 Ok(())
940 }
941
942 fn stop(&self, _graceful: bool, new_status: &str) -> Result<(), String> {
943 #[allow(unused_variables)] let pid_to_kill = {
946 let mut inner = self.inner.lock().unwrap();
947 if inner.proc_status == new_status {
948 return Ok(());
949 }
950 let pid = inner.child_pid;
951 inner.input_tx = None;
954 Self::set_status(&mut inner, new_status);
955 pid
956 };
957
958 #[cfg(unix)]
964 if let Some(pid) = pid_to_kill {
965 unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGTERM) };
967 tokio::spawn(async move {
968 tokio::time::sleep(tokio::time::Duration::from_secs(KILL_GRACE_SECS)).await;
969 unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
970 });
971 }
972
973 Ok(())
974 }
975
976 fn get_runtime_status(&self) -> BlockControllerRuntimeStatus {
977 self.get_status_snapshot()
978 }
979
980 fn send_input(&self, input: BlockInputUnion, seq: Option<u64>) -> Result<(), String> {
981 let mut inner = self.inner.lock().unwrap();
982 let tx = match &inner.input_tx {
983 Some(tx) => tx.clone(),
984 None => return Err("controller is not running".to_string()),
985 };
986 match seq {
987 None => tx.try_send(input).map_err(|e| format!("send_input: {e}")),
988 Some(s) => {
989 if s == 0 && inner.input_seq_next > 0 {
995 tracing::info!(
996 block_id = %self.block_id,
997 prev_next = inner.input_seq_next,
998 new_seq = s,
999 "input seq reset (session reset detected)"
1000 );
1001 inner.input_seq_next = s;
1002 inner.input_seq_buf.clear();
1003 }
1004
1005 let next = inner.input_seq_next;
1006 if s == next {
1007 inner.input_seq_next += 1;
1010 if let Err(e) = tx.try_send(input) {
1011 tracing::warn!(
1012 block_id = %self.block_id,
1013 seq = s,
1014 "send_input: channel full, dropping in-order packet: {e}"
1015 );
1016 return Ok(());
1017 }
1018 loop {
1020 let expected = inner.input_seq_next;
1021 match inner.input_seq_buf.remove(&expected) {
1022 Some(buffered) => {
1023 inner.input_seq_next += 1;
1024 if let Err(e) = tx.try_send(buffered) {
1025 tracing::warn!(
1026 block_id = %self.block_id,
1027 seq = expected,
1028 "send_input drain: channel full, dropping buffered packet: {e}"
1029 );
1030 }
1031 }
1032 None => break,
1033 }
1034 }
1035 Ok(())
1036 } else if s > next {
1037 if inner.input_seq_buf.len() < SHELL_INPUT_CH_SIZE {
1038 inner.input_seq_buf.insert(s, input);
1039 } else {
1040 tracing::warn!(block_id = %self.block_id, seq = s, "input reorder buffer full, dropping");
1041 }
1042 Ok(())
1043 } else {
1044 tracing::warn!(block_id = %self.block_id, seq = s, next, "duplicate input seq, discarding");
1045 Ok(())
1046 }
1047 }
1048 }
1049 }
1050
1051 fn controller_type(&self) -> &str {
1052 &self.controller_type
1053 }
1054
1055 fn block_id(&self) -> &str {
1056 &self.block_id
1057 }
1058
1059 fn as_any(&self) -> &dyn std::any::Any {
1060 self
1061 }
1062}
1063
1064const AGENT_LINE_BUFFER_CAP: usize = 1024 * 1024;
1072
1073fn extract_agent_events(
1091 line_buf: &mut Vec<u8>,
1092 chunk: &[u8],
1093 translator: &mut crate::agents::translator::claude::ClaudeTranslator,
1094) -> Vec<crate::agents::types::AgentEvent> {
1095 use crate::agents::translator::Translator as _;
1096 let mut out: Vec<crate::agents::types::AgentEvent> = Vec::new();
1097 line_buf.extend_from_slice(chunk);
1098 if line_buf.len() > AGENT_LINE_BUFFER_CAP {
1099 line_buf.clear();
1102 return out;
1103 }
1104 while let Some(nl) = line_buf.iter().position(|&b| b == b'\n') {
1105 let line_bytes: Vec<u8> = line_buf.drain(..=nl).collect();
1106 let line = String::from_utf8_lossy(&line_bytes);
1110 let trimmed = line.trim_end_matches(['\n', '\r']);
1111 if !trimmed.starts_with('{') {
1112 continue;
1115 }
1116 let Ok(frame) = serde_json::from_str::<serde_json::Value>(trimmed) else {
1117 continue;
1118 };
1119 out.extend(translator.translate(frame));
1120 }
1121 out
1122}
1123
1124fn accumulate_and_translate(
1129 broker: &wps::Broker,
1130 block_id: &str,
1131 line_buf: &mut Vec<u8>,
1132 chunk: &[u8],
1133 translator: &mut crate::agents::translator::claude::ClaudeTranslator,
1134) {
1135 for event in extract_agent_events(line_buf, chunk, translator) {
1136 broker.publish(wps::WaveEvent {
1137 event: format!("agent_event:{}", block_id),
1138 scopes: vec![],
1139 sender: String::new(),
1140 persist: 0,
1141 data: Some(serde_json::to_value(&event).unwrap_or_default()),
1142 });
1143 }
1144}
1145
1146pub fn handle_append_block_file(
1153 broker: &wps::Broker,
1154 block_id: &str,
1155 filename: &str,
1156 data: &[u8],
1157 filestore: Option<&Arc<FileStore>>,
1158) {
1159 let data64 = base64::engine::general_purpose::STANDARD.encode(data);
1160
1161 let event_data = wps::WSFileEventData {
1162 zoneid: block_id.to_string(),
1163 filename: filename.to_string(),
1164 fileop: wps::FILE_OP_APPEND.to_string(),
1165 data64,
1166 };
1167
1168 let event = wps::WaveEvent {
1169 event: wps::EVENT_BLOCK_FILE.to_string(),
1170 scopes: vec![format!("block:{block_id}")],
1171 sender: String::new(),
1172 persist: 0,
1173 data: serde_json::to_value(&event_data).ok(),
1174 };
1175
1176 broker.publish(event);
1177
1178 if let Some(fs) = filestore {
1182 let needs_create = match fs.stat(block_id, filename) {
1183 Ok(None) => true,
1184 Ok(Some(_)) => false,
1185 Err(e) => {
1186 tracing::warn!(
1187 block_id = %block_id,
1188 filename = %filename,
1189 error = %e,
1190 "filestore stat failed; skipping write-through"
1191 );
1192 return;
1193 }
1194 };
1195
1196 if needs_create {
1197 if let Err(e) = fs.make_file(
1198 block_id,
1199 filename,
1200 std::collections::HashMap::new(),
1201 crate::backend::storage::filestore::FileOpts::default(),
1202 ) {
1203 use crate::backend::storage::error::StoreError;
1206 if !matches!(e, StoreError::AlreadyExists) {
1207 tracing::warn!(
1208 block_id = %block_id,
1209 filename = %filename,
1210 error = %e,
1211 "filestore make_file failed; skipping write-through"
1212 );
1213 return;
1214 }
1215 }
1216 }
1217
1218 if let Err(e) = fs.append_data(block_id, filename, data) {
1219 tracing::warn!(
1220 block_id = %block_id,
1221 filename = %filename,
1222 error = %e,
1223 "filestore append_data failed"
1224 );
1225 }
1226 }
1227}
1228
1229#[allow(dead_code)]
1232pub fn handle_truncate_block_file(broker: &wps::Broker, block_id: &str, filename: &str) {
1233 let event_data = wps::WSFileEventData {
1234 zoneid: block_id.to_string(),
1235 filename: filename.to_string(),
1236 fileop: wps::FILE_OP_TRUNCATE.to_string(),
1237 data64: String::new(),
1238 };
1239
1240 let event = wps::WaveEvent {
1241 event: wps::EVENT_BLOCK_FILE.to_string(),
1242 scopes: vec![format!("block:{block_id}")],
1243 sender: String::new(),
1244 persist: 0,
1245 data: serde_json::to_value(&event_data).ok(),
1246 };
1247
1248 broker.publish(event);
1249}
1250
1251#[cfg(test)]
1252mod tests {
1253 use super::*;
1254 use crate::backend::shellexec::MockConn;
1255 use std::sync::Arc;
1256
1257 fn make_shell_meta() -> MetaMapType {
1258 let mut meta = MetaMapType::new();
1259 meta.insert(
1260 "controller".to_string(),
1261 serde_json::Value::String("shell".to_string()),
1262 );
1263 meta
1264 }
1265
1266 fn make_cmd_meta(cmd: &str) -> MetaMapType {
1267 let mut meta = MetaMapType::new();
1268 meta.insert(
1269 "controller".to_string(),
1270 serde_json::Value::String("cmd".to_string()),
1271 );
1272 meta.insert(
1273 "cmd".to_string(),
1274 serde_json::Value::String(cmd.to_string()),
1275 );
1276 meta
1277 }
1278
1279 #[test]
1280 fn test_shell_controller_new() {
1281 let ctrl = ShellController::new(
1282 "shell".to_string(),
1283 "tab-1".to_string(),
1284 "block-1".to_string(),
1285 None,
1286 None,
1287 None,
1288 );
1289 assert_eq!(ctrl.controller_type(), "shell");
1290 assert_eq!(ctrl.block_id(), "block-1");
1291
1292 let status = ctrl.get_runtime_status();
1293 assert_eq!(status.shellprocstatus, STATUS_INIT);
1294 assert_eq!(status.blockid, "block-1");
1295 assert_eq!(status.version, 0);
1296 }
1297
1298 #[test]
1299 fn test_shell_controller_start_stop() {
1300 let ctrl = ShellController::new(
1301 "shell".to_string(),
1302 "tab-1".to_string(),
1303 "block-1".to_string(),
1304 None,
1305 None,
1306 None,
1307 );
1308
1309 ctrl.set_conn_factory(Box::new(|_conn_name, _meta| {
1311 Ok(Box::new(MockConn::new(0)) as Box<dyn ConnInterface>)
1312 }));
1313
1314 let meta = make_shell_meta();
1315 let result = ctrl.start(meta, None, false);
1316 assert!(result.is_ok());
1317
1318 let status = ctrl.get_runtime_status();
1320 assert_eq!(status.shellprocstatus, STATUS_DONE);
1321
1322 let result = ctrl.stop(true, STATUS_DONE);
1324 assert!(result.is_ok());
1325 }
1326
1327 #[test]
1328 fn test_shell_controller_run_on_start_false() {
1329 let ctrl = ShellController::new(
1330 "shell".to_string(),
1331 "tab-1".to_string(),
1332 "block-1".to_string(),
1333 None,
1334 None,
1335 None,
1336 );
1337
1338 let mut meta = make_shell_meta();
1339 meta.insert(
1340 META_KEY_CMD_RUN_ON_START.to_string(),
1341 serde_json::Value::Bool(false),
1342 );
1343
1344 let result = ctrl.start(meta, None, false);
1345 assert!(result.is_ok());
1346
1347 let status = ctrl.get_runtime_status();
1349 assert_eq!(status.shellprocstatus, STATUS_INIT);
1350 }
1351
1352 #[test]
1353 fn test_shell_controller_force_start() {
1354 let ctrl = ShellController::new(
1355 "shell".to_string(),
1356 "tab-1".to_string(),
1357 "block-1".to_string(),
1358 None,
1359 None,
1360 None,
1361 );
1362
1363 ctrl.set_conn_factory(Box::new(|_conn_name, _meta| {
1364 Ok(Box::new(MockConn::new(0)) as Box<dyn ConnInterface>)
1365 }));
1366
1367 let mut meta = make_shell_meta();
1368 meta.insert(
1369 META_KEY_CMD_RUN_ON_START.to_string(),
1370 serde_json::Value::Bool(false),
1371 );
1372
1373 let result = ctrl.start(meta, None, true);
1375 assert!(result.is_ok());
1376
1377 let status = ctrl.get_runtime_status();
1378 assert_eq!(status.shellprocstatus, STATUS_DONE);
1380 }
1381
1382 #[test]
1383 fn test_shell_controller_with_conn_factory() {
1384 let ctrl = ShellController::new(
1385 "cmd".to_string(),
1386 "tab-1".to_string(),
1387 "block-1".to_string(),
1388 None,
1389 None,
1390 None,
1391 );
1392
1393 ctrl.set_conn_factory(Box::new(|_conn_name, _meta| {
1395 Ok(Box::new(MockConn::new(42)) as Box<dyn ConnInterface>)
1396 }));
1397
1398 let meta = make_cmd_meta("echo hello");
1399 let result = ctrl.start(meta, None, true);
1400 assert!(result.is_ok());
1401
1402 let status = ctrl.get_runtime_status();
1403 assert_eq!(status.shellprocstatus, STATUS_DONE);
1404 assert_eq!(status.shellprocexitcode, 42);
1405 }
1406
1407 #[test]
1408 fn test_shell_controller_conn_factory_error() {
1409 let ctrl = ShellController::new(
1410 "shell".to_string(),
1411 "tab-1".to_string(),
1412 "block-1".to_string(),
1413 None,
1414 None,
1415 None,
1416 );
1417
1418 ctrl.set_conn_factory(Box::new(|_conn_name, _meta| {
1419 Err("connection refused".to_string())
1420 }));
1421
1422 let meta = make_shell_meta();
1423 let result = ctrl.start(meta, None, true);
1424 assert!(result.is_err());
1425 assert!(result.unwrap_err().contains("connection refused"));
1426
1427 let status = ctrl.get_runtime_status();
1428 assert_eq!(status.shellprocstatus, STATUS_DONE);
1429 assert_eq!(status.shellprocexitcode, -1);
1430 }
1431
1432 #[test]
1433 fn test_shell_controller_send_input_not_running() {
1434 let ctrl = ShellController::new(
1435 "shell".to_string(),
1436 "tab-1".to_string(),
1437 "block-1".to_string(),
1438 None,
1439 None,
1440 None,
1441 );
1442
1443 let result = ctrl.send_input(BlockInputUnion::data(b"hello".to_vec()), None);
1444 assert!(result.is_err());
1445 assert!(result.unwrap_err().contains("not running"));
1446 }
1447
1448 #[test]
1449 fn test_shell_controller_status_version_increments() {
1450 let ctrl = ShellController::new(
1451 "shell".to_string(),
1452 "tab-1".to_string(),
1453 "block-1".to_string(),
1454 None,
1455 None,
1456 None,
1457 );
1458
1459 ctrl.set_conn_factory(Box::new(|_conn_name, _meta| {
1460 Ok(Box::new(MockConn::new(0)) as Box<dyn ConnInterface>)
1461 }));
1462
1463 let v0 = ctrl.get_runtime_status().version;
1464
1465 let meta = make_shell_meta();
1466 ctrl.start(meta, None, true).unwrap();
1467
1468 let v_after = ctrl.get_runtime_status().version;
1469 assert!(v_after > v0);
1471 }
1472
1473 #[test]
1474 fn test_controller_trait_as_arc() {
1475 let ctrl: Arc<dyn Controller> = Arc::new(ShellController::new(
1476 "shell".to_string(),
1477 "tab-1".to_string(),
1478 "block-1".to_string(),
1479 None,
1480 None,
1481 None,
1482 ));
1483
1484 assert_eq!(ctrl.controller_type(), "shell");
1485 assert_eq!(ctrl.block_id(), "block-1");
1486 let status = ctrl.get_runtime_status();
1487 assert_eq!(status.shellprocstatus, STATUS_INIT);
1488 }
1489
1490 #[test]
1491 fn test_meta_helpers() {
1492 let mut meta = MetaMapType::new();
1493 assert!(ShellController::should_run_on_start(&meta)); assert!(!ShellController::should_run_once(&meta)); assert!(!ShellController::should_clear_on_start(&meta)); assert!(!ShellController::should_close_on_exit(&meta)); meta.insert(
1499 META_KEY_CMD_RUN_ON_START.to_string(),
1500 serde_json::Value::Bool(false),
1501 );
1502 assert!(!ShellController::should_run_on_start(&meta));
1503
1504 meta.insert(
1505 META_KEY_CMD_RUN_ONCE.to_string(),
1506 serde_json::Value::Bool(true),
1507 );
1508 assert!(ShellController::should_run_once(&meta));
1509
1510 meta.insert(
1511 META_KEY_CMD_CLEAR_ON_START.to_string(),
1512 serde_json::Value::Bool(true),
1513 );
1514 assert!(ShellController::should_clear_on_start(&meta));
1515 }
1516
1517 #[test]
1518 fn test_close_on_exit_delay() {
1519 let mut meta = MetaMapType::new();
1520 assert_eq!(ShellController::close_on_exit_delay_ms(&meta), 2000); meta.insert(
1523 META_KEY_CMD_CLOSE_ON_EXIT_DELAY.to_string(),
1524 serde_json::json!(5000),
1525 );
1526 assert_eq!(ShellController::close_on_exit_delay_ms(&meta), 5000);
1527 }
1528
1529 #[test]
1530 fn test_conn_name_from_meta() {
1531 let mut meta = MetaMapType::new();
1532 assert_eq!(ShellController::get_conn_name(&meta), "local"); meta.insert(
1535 META_KEY_CONNECTION.to_string(),
1536 serde_json::Value::String("user@host".to_string()),
1537 );
1538 assert_eq!(ShellController::get_conn_name(&meta), "user@host");
1539 }
1540
1541 #[test]
1542 fn test_handle_append_block_file() {
1543 let broker = wps::Broker::new();
1544
1545 broker.subscribe(
1547 "test-route",
1548 wps::SubscriptionRequest {
1549 event: wps::EVENT_BLOCK_FILE.to_string(),
1550 scopes: vec!["block:block-1".to_string()],
1551 allscopes: false,
1552 },
1553 );
1554
1555 handle_append_block_file(&broker, "block-1", "term", b"hello world", None);
1556
1557 let _history = broker.read_event_history(wps::EVENT_BLOCK_FILE, "block:block-1", 10);
1559 }
1562
1563 #[test]
1564 fn test_handle_truncate_block_file() {
1565 let broker = wps::Broker::new();
1566 handle_truncate_block_file(&broker, "block-1", "term");
1568 }
1569
1570 #[test]
1571 fn test_register_and_get_controller() {
1572 let ctrl: Arc<dyn Controller> = Arc::new(ShellController::new(
1573 "shell".to_string(),
1574 "tab-1".to_string(),
1575 "test-register-block".to_string(),
1576 None,
1577 None,
1578 None,
1579 ));
1580
1581 super::super::register_controller("test-register-block", ctrl.clone());
1582
1583 let retrieved = super::super::get_controller("test-register-block");
1584 assert!(retrieved.is_some());
1585 assert_eq!(retrieved.unwrap().block_id(), "test-register-block");
1586
1587 super::super::delete_controller("test-register-block");
1589 assert!(super::super::get_controller("test-register-block").is_none());
1590 }
1591
1592 #[test]
1593 fn test_resync_creates_shell_controller() {
1594 use crate::backend::obj::Block;
1595
1596 let mut meta = MetaMapType::new();
1597 meta.insert(
1598 "controller".to_string(),
1599 serde_json::Value::String("shell".to_string()),
1600 );
1601 meta.insert(
1603 META_KEY_CMD_RUN_ON_START.to_string(),
1604 serde_json::Value::Bool(false),
1605 );
1606
1607 let block = Block {
1608 oid: "resync-test-block".to_string(),
1609 version: 1,
1610 meta,
1611 ..Default::default()
1612 };
1613
1614 let result = super::super::resync_controller(&block, "tab-1", None, false, None, None, None, None);
1615 assert!(result.is_ok());
1616
1617 let ctrl = super::super::get_controller("resync-test-block");
1618 assert!(ctrl.is_some());
1619 assert_eq!(ctrl.unwrap().controller_type(), "shell");
1620
1621 super::super::delete_controller("resync-test-block");
1623 }
1624
1625 #[test]
1628 fn test_handle_append_block_file_writes_to_filestore() {
1629 use crate::backend::storage::filestore::FileStore;
1630 use std::sync::Arc;
1631
1632 let broker = wps::Broker::new();
1633 let fs = Arc::new(FileStore::open_in_memory().expect("open in-memory filestore"));
1634
1635 let block_id = "test-block-fs";
1636 let filename = "output";
1637
1638 let line1 = b"line one\n";
1640 handle_append_block_file(&broker, block_id, filename, line1, Some(&fs));
1641
1642 let line2 = b"line two\n";
1644 handle_append_block_file(&broker, block_id, filename, line2, Some(&fs));
1645
1646 let data = fs.read_file(block_id, filename)
1648 .expect("read_file ok")
1649 .expect("data present");
1650
1651 let text = String::from_utf8(data).expect("valid utf8");
1652 assert!(text.contains("line one"), "expected 'line one' in {:?}", text);
1653 assert!(text.contains("line two"), "expected 'line two' in {:?}", text);
1654
1655 let stat = fs.stat(block_id, filename).unwrap().unwrap();
1657 assert_eq!(stat.size, (line1.len() + line2.len()) as i64);
1658
1659 broker.subscribe(
1661 "test-route-fs",
1662 wps::SubscriptionRequest {
1663 event: wps::EVENT_BLOCK_FILE.to_string(),
1664 scopes: vec![format!("block:{}", block_id)],
1665 allscopes: false,
1666 },
1667 );
1668 handle_append_block_file(&broker, block_id, filename, b"line three\n", Some(&fs));
1670 let stat_after = fs.stat(block_id, filename).unwrap().unwrap();
1671 assert_eq!(stat_after.size, (line1.len() + line2.len() + b"line three\n".len()) as i64);
1672 }
1673
1674 use crate::agents::translator::claude::ClaudeTranslator;
1679 use crate::agents::types::AgentEvent;
1680
1681 #[test]
1682 fn extract_agent_events_full_line_translates() {
1683 let mut t = ClaudeTranslator::new();
1684 let mut buf: Vec<u8> = Vec::new();
1685 let line =
1686 br#"{"type":"stream_event","event":{"type":"content_block_delta","delta":{"type":"text_delta","text":"hello"}}}
1687"#;
1688 let events = extract_agent_events(&mut buf, line, &mut t);
1689 assert_eq!(events.len(), 1);
1690 match &events[0] {
1691 AgentEvent::AssistantText { delta } => assert_eq!(delta, "hello"),
1692 other => panic!("expected AssistantText, got {other:?}"),
1693 }
1694 assert!(buf.is_empty());
1696 }
1697
1698 #[test]
1699 fn extract_agent_events_chunked_line_accumulates() {
1700 let mut t = ClaudeTranslator::new();
1704 let mut buf: Vec<u8> = Vec::new();
1705 let events = extract_agent_events(
1707 &mut buf,
1708 br#"{"type":"stream_event","event":{"type":"content_"#,
1709 &mut t,
1710 );
1711 assert!(events.is_empty());
1712 let events = extract_agent_events(
1714 &mut buf,
1715 br#"block_delta","delta":{"type":"text_delta","text":"hi"}}}
1716"#,
1717 &mut t,
1718 );
1719 assert_eq!(events.len(), 1);
1720 match &events[0] {
1721 AgentEvent::AssistantText { delta } => assert_eq!(delta, "hi"),
1722 other => panic!("expected AssistantText, got {other:?}"),
1723 }
1724 }
1725
1726 #[test]
1727 fn extract_agent_events_drops_non_json_lines() {
1728 let mut t = ClaudeTranslator::new();
1731 let mut buf: Vec<u8> = Vec::new();
1732 let pty_text = b"\x1b[2K\x1b[0;0H> some prompt\n[m\nplain text\n";
1733 let events = extract_agent_events(&mut buf, pty_text, &mut t);
1734 assert!(events.is_empty(), "got unexpected events: {events:?}");
1735 assert!(buf.is_empty());
1738 }
1739
1740 #[test]
1741 fn extract_agent_events_drops_carriage_returns() {
1742 let mut t = ClaudeTranslator::new();
1745 let mut buf: Vec<u8> = Vec::new();
1746 let line = br#"{"type":"stream_event","event":{"type":"content_block_delta","delta":{"type":"text_delta","text":"crlf"}}}
1747"#;
1748 let mut bytes: Vec<u8> = line.to_vec();
1750 let last = bytes.len() - 1;
1751 bytes.insert(last, b'\r');
1752 let events = extract_agent_events(&mut buf, &bytes, &mut t);
1753 assert_eq!(events.len(), 1);
1754 }
1755
1756 #[test]
1757 fn extract_agent_events_drops_malformed_json() {
1758 let mut t = ClaudeTranslator::new();
1761 let mut buf: Vec<u8> = Vec::new();
1762 let events = extract_agent_events(&mut buf, b"{not_valid_json\n", &mut t);
1763 assert!(events.is_empty());
1764 }
1765
1766 #[test]
1767 fn extract_agent_events_resets_oversized_buffer() {
1768 let mut t = ClaudeTranslator::new();
1771 let mut buf: Vec<u8> = Vec::new();
1772 let chunk = vec![b'x'; AGENT_LINE_BUFFER_CAP + 1];
1773 let events = extract_agent_events(&mut buf, &chunk, &mut t);
1774 assert!(events.is_empty());
1775 assert!(
1776 buf.is_empty(),
1777 "buffer should reset past cap, was {} bytes",
1778 buf.len()
1779 );
1780 }
1781
1782 #[test]
1783 fn extract_agent_events_preserves_utf8_across_read_boundary() {
1784 let mut t = ClaudeTranslator::new();
1793 let mut buf: Vec<u8> = Vec::new();
1794 let frame = r#"{"type":"stream_event","event":{"type":"content_block_delta","delta":{"type":"text_delta","text":"こんにちは"}}}
1795"#;
1796 let frame_bytes = frame.as_bytes();
1797 let split = frame_bytes
1802 .iter()
1803 .position(|&b| b == 0xe3)
1804 .expect("expected to find a multi-byte codepoint")
1805 + 1; let (a, b) = frame_bytes.split_at(split);
1807 let events = extract_agent_events(&mut buf, a, &mut t);
1808 assert!(events.is_empty());
1810 let events = extract_agent_events(&mut buf, b, &mut t);
1811 assert_eq!(events.len(), 1);
1812 match &events[0] {
1813 AgentEvent::AssistantText { delta } => {
1814 assert_eq!(
1815 delta, "こんにちは",
1816 "UTF-8 must round-trip cleanly across read boundary; got {delta:?}"
1817 );
1818 assert!(!delta.contains('\u{FFFD}'), "no replacement chars");
1819 }
1820 other => panic!("expected AssistantText, got {other:?}"),
1821 }
1822 }
1823
1824 #[test]
1825 fn extract_agent_events_two_lines_one_chunk() {
1826 let mut t = ClaudeTranslator::new();
1829 let mut buf: Vec<u8> = Vec::new();
1830 let two_lines = br#"{"type":"stream_event","event":{"type":"content_block_delta","delta":{"type":"text_delta","text":"a"}}}
1831{"type":"stream_event","event":{"type":"content_block_delta","delta":{"type":"text_delta","text":"b"}}}
1832"#;
1833 let events = extract_agent_events(&mut buf, two_lines, &mut t);
1834 assert_eq!(events.len(), 2);
1835 }
1836}