agentmux_srv\backend\blockcontroller/
persistent.rs1use std::collections::HashMap;
24use std::sync::{Arc, Mutex};
25
26use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
27use tokio::sync::mpsc;
28
29use super::{
30 BlockControllerRuntimeStatus, BlockInputUnion, Controller, STATUS_DONE, STATUS_INIT,
31 STATUS_RUNNING,
32};
33use super::health::{classify_output_line, HealthMonitor};
34use crate::backend::eventbus::EventBus;
35use crate::backend::storage::filestore::FileStore;
36use crate::backend::storage::wstore::WaveStore;
37use crate::backend::wps;
38
/// Controller type identifier reported by
/// `PersistentSubprocessController::controller_type`.
pub const BLOCK_CONTROLLER_PERSISTENT: &str = "persistent";

/// Block-file subject under which every stdout line of the persistent
/// process is appended.
pub const PERSISTENT_OUTPUT_SUBJECT: &str = "output";
44
/// Everything needed to launch the persistent CLI subprocess.
#[derive(Clone, Debug)]
pub struct PersistentSpawnConfig {
    /// Program handed to the CLI command builder.
    pub cli_command: String,
    /// Arguments appended to the command, in order.
    pub cli_args: Vec<String>,
    /// Working directory for the child; an empty string means "do not set one".
    /// A leading `~` is expanded to the user's home directory at spawn time.
    pub working_dir: String,
    /// Extra environment variables; each value is home-dir expanded before
    /// being set on the child.
    pub env_vars: HashMap<String, String>,
    /// Name of the top-level JSON key in the child's stdout lines that carries
    /// the session id.
    pub session_id_field: String,
}
54
55struct PersistentInner {
57 proc_status: String,
58 proc_exit_code: i32,
59 status_version: i32,
60 session_id: Option<String>,
61 current_pid: Option<u32>,
62 stdin_tx: Option<mpsc::Sender<String>>,
64 kill_tx: Option<tokio::sync::oneshot::Sender<bool>>,
66}
67
68pub struct PersistentSubprocessController {
71 #[allow(dead_code)]
72 tab_id: String,
73 block_id: String,
74 inner: Arc<Mutex<PersistentInner>>,
75 broker: Option<Arc<wps::Broker>>,
76 event_bus: Option<Arc<EventBus>>,
77 wstore: Option<Arc<WaveStore>>,
78 filestore: Option<Arc<FileStore>>,
80 health_monitor: Arc<HealthMonitor>,
81}
82
83impl PersistentSubprocessController {
84 pub fn new(
85 tab_id: String,
86 block_id: String,
87 broker: Option<Arc<wps::Broker>>,
88 event_bus: Option<Arc<EventBus>>,
89 wstore: Option<Arc<WaveStore>>,
90 filestore: Option<Arc<FileStore>>,
91 ) -> Self {
92 let health_monitor = Arc::new(HealthMonitor::new(
93 block_id.clone(),
94 broker.clone(),
95 ));
96 Self {
97 tab_id,
98 block_id,
99 inner: Arc::new(Mutex::new(PersistentInner {
100 proc_status: STATUS_INIT.to_string(),
101 proc_exit_code: 0,
102 status_version: 0,
103 session_id: None,
104 current_pid: None,
105 stdin_tx: None,
106 kill_tx: None,
107 })),
108 broker,
109 event_bus,
110 wstore,
111 filestore,
112 health_monitor,
113 }
114 }
115
116 fn set_status(inner: &mut PersistentInner, status: &str) {
117 inner.proc_status = status.to_string();
118 inner.status_version += 1;
119 }
120
121 fn get_status_snapshot(&self) -> BlockControllerRuntimeStatus {
122 let inner = self.inner.lock().unwrap();
123 BlockControllerRuntimeStatus {
124 blockid: self.block_id.clone(),
125 version: inner.status_version,
126 shellprocstatus: inner.proc_status.clone(),
127 shellprocconnname: "local".to_string(),
128 shellprocexitcode: inner.proc_exit_code,
129 spawn_ts_ms: None,
130 is_agent_pane: true,
131 }
132 }
133
134 fn publish_status(&self) {
135 if let Some(ref broker) = self.broker {
136 let status = self.get_status_snapshot();
137 super::publish_controller_status(broker, &status);
138 }
139 }
140
141 fn is_running(&self) -> bool {
142 let inner = self.inner.lock().unwrap();
143 inner.stdin_tx.is_some()
144 }
145
146 pub fn send_message(&self, message: String, config: PersistentSpawnConfig) -> Result<(), String> {
149 if !self.is_running() {
151 self.spawn_process(config)?;
152 }
153
154 let json_msg = serde_json::json!({
156 "type": "user",
157 "message": {
158 "role": "user",
159 "content": message
160 }
161 });
162
163 let inner = self.inner.lock().unwrap();
164 let tx = inner.stdin_tx.as_ref()
165 .ok_or("persistent process not running after spawn")?;
166 tx.try_send(json_msg.to_string())
167 .map_err(|e| format!("stdin send failed: {e}"))
168 }
169
170 fn spawn_process(&self, config: PersistentSpawnConfig) -> Result<(), String> {
172 let mut cmd = crate::server::cli_handlers::make_cli_cmd(&config.cli_command);
174 cmd.args(&config.cli_args);
175
176 if !config.working_dir.is_empty() {
178 let expanded_dir = if config.working_dir.starts_with("~/") || config.working_dir == "~" {
179 if let Some(home) = dirs::home_dir() {
180 home.join(config.working_dir.trim_start_matches("~/")).to_string_lossy().to_string()
181 } else {
182 config.working_dir.clone()
183 }
184 } else {
185 config.working_dir.clone()
186 };
187 let dir_path = std::path::Path::new(&expanded_dir);
188 if !dir_path.exists() {
189 if let Err(e) = std::fs::create_dir_all(dir_path) {
190 tracing::warn!(
191 block_id = %self.block_id,
192 dir = %expanded_dir,
193 error = %e,
194 "failed to create working directory"
195 );
196 }
197 }
198 if dir_path.exists() {
199 cmd.current_dir(&expanded_dir);
200
201 let looks_like_agent_workspace = expanded_dir.contains("/.agentmux/agents/")
212 || expanded_dir.contains("\\.agentmux\\agents\\");
213 if looks_like_agent_workspace {
214 let git_dir = dir_path.join(".git");
215 if git_dir.exists() {
216 tracing::warn!(
217 block_id = %self.block_id,
218 cwd = %expanded_dir,
219 ".git detected inside agent workspace — this is \
220 usually an unintended nested clone and can waste \
221 gigabytes of disk. Clean up with: rm -rf {}/.git",
222 expanded_dir
223 );
224 }
225 }
226 }
227 }
228
229 for (k, v) in &config.env_vars {
231 let expanded = crate::backend::base::expand_home_dir_safe(v);
232 cmd.env(k, expanded.to_string_lossy().as_ref());
233 }
234
235 cmd.stdin(std::process::Stdio::piped());
236 cmd.stdout(std::process::Stdio::piped());
237 cmd.stderr(std::process::Stdio::piped());
238
239 let mut child = cmd.spawn().map_err(|e| {
240 tracing::error!(block_id = %self.block_id, error = %e, "persistent process spawn failed");
241 format!("failed to spawn persistent process: {e}")
242 })?;
243
244 let pid = child.id().unwrap_or(0);
245
246 self.health_monitor.set_active_turn(true);
250
251 tracing::info!(
252 block_id = %self.block_id,
253 pid = pid,
254 cmd = %config.cli_command,
255 args = ?config.cli_args,
256 working_dir = %config.working_dir,
257 "persistent process spawned"
258 );
259
260 if pid != 0 {
264 if let Some(registry) = crate::backend::process_tracker::registry::global() {
265 let tracker = registry.ensure_tracker(&self.block_id);
266 if let Err(e) = tracker.assign_process(pid) {
267 tracing::warn!(
268 block_id = %self.block_id,
269 pid = pid,
270 err = %e,
271 "[process-tracker] assign_process failed"
272 );
273 }
274 }
275 }
276
277 let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<bool>();
278 let stdin = child.stdin.take().unwrap();
279 let stdout = child.stdout.take().unwrap();
280 let stderr = child.stderr.take();
281
282 if let Some(stderr_pipe) = stderr {
284 let block_id_stderr = self.block_id.clone();
285 tokio::spawn(async move {
286 let mut reader = BufReader::new(stderr_pipe).lines();
287 while let Ok(Some(line)) = reader.next_line().await {
288 tracing::warn!(
289 block_id = %block_id_stderr,
290 line = %line,
291 "persistent stderr"
292 );
293 }
294 });
295 }
296
297 let (msg_tx, mut msg_rx) = mpsc::channel::<String>(32);
299
300 {
301 let mut inner = self.inner.lock().unwrap();
302 inner.current_pid = Some(pid);
303 inner.kill_tx = Some(kill_tx);
304 inner.stdin_tx = Some(msg_tx);
305 Self::set_status(&mut inner, STATUS_RUNNING);
306 }
307 self.publish_status();
308
309 if let Some(ref wstore) = self.wstore {
313 super::session_recovery::mark_active_pid(wstore, &self.block_id, pid);
314 }
315
316 tokio::spawn(async move {
318 let mut stdin = stdin;
319 while let Some(msg) = msg_rx.recv().await {
320 if let Err(e) = stdin.write_all(msg.as_bytes()).await {
321 tracing::warn!("persistent stdin write error: {}", e);
322 break;
323 }
324 if let Err(e) = stdin.write_all(b"\n").await {
325 tracing::warn!("persistent stdin newline error: {}", e);
326 break;
327 }
328 if let Err(e) = stdin.flush().await {
329 tracing::warn!("persistent stdin flush error: {}", e);
330 break;
331 }
332 }
333 drop(stdin);
335 });
336
337 let block_id_read = self.block_id.clone();
339 let broker_read = self.broker.clone();
340 let inner_read = Arc::clone(&self.inner);
341 let wstore_read = self.wstore.clone();
342 let event_bus_read = self.event_bus.clone();
343 let filestore_read = self.filestore.clone();
344 let health_read = Arc::clone(&self.health_monitor);
345 let session_id_field = config.session_id_field.clone();
346
347 tokio::spawn(async move {
348 let reader = BufReader::new(stdout);
349 let mut lines = reader.lines();
350 let mut stats = super::session_stats::SessionStatsAccumulator::new(block_id_read.clone());
351
352 while let Ok(Some(line)) = lines.next_line().await {
353 if line.trim().is_empty() {
354 continue;
355 }
356
357 stats.record_line(line.len(), &wstore_read);
359
360 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&line) {
362 let (meaningful, _error) = classify_output_line(&parsed);
363 health_read.record_output(meaningful);
364 if let Some(sid) = parsed.get(&session_id_field).and_then(|v| v.as_str()) {
365 let sid_string = sid.to_string();
366 let already_captured = inner_read.lock().unwrap().session_id.is_some();
367 if !already_captured {
368 tracing::info!(
369 block_id = %block_id_read,
370 session_id = %sid_string,
371 "persistent session ID captured"
372 );
373 {
374 let mut inner = inner_read.lock().unwrap();
375 inner.session_id = Some(sid_string.clone());
376 }
377 if let Some(ref store) = wstore_read {
379 let oref_str = format!("block:{}", block_id_read);
380 let mut meta_update =
381 crate::backend::obj::MetaMapType::new();
382 meta_update.insert(
383 "agent:sessionid".to_string(),
384 serde_json::Value::String(sid_string),
385 );
386 if let Err(e) = crate::server::service::update_object_meta(
387 store, &oref_str, &meta_update,
388 ) {
389 tracing::warn!(
390 block_id = %block_id_read,
391 error = %e,
392 "failed to persist agent:sessionid"
393 );
394 } else if let Some(ref event_bus) = event_bus_read {
395 if let Ok(updated_block) = store.must_get::<crate::backend::obj::Block>(&block_id_read) {
396 let update_data = serde_json::to_value(
397 &crate::backend::obj::WaveObjUpdate {
398 updatetype: "update".into(),
399 otype: "block".into(),
400 oid: block_id_read.clone(),
401 obj: Some(crate::backend::obj::wave_obj_to_value(&updated_block)),
402 },
403 )
404 .ok();
405 event_bus.broadcast_event(
406 &crate::backend::eventbus::WSEventType {
407 eventtype: "waveobj:update".to_string(),
408 oref: oref_str,
409 data: update_data,
410 },
411 );
412 }
413 }
414 }
415 }
416 }
417 }
418
419 tracing::info!(
422 block_id = %block_id_read,
423 line_len = line.len(),
424 "persistent stdout → blockfile"
425 );
426 let line_with_newline = format!("{}\n", line);
427 if let Some(ref broker) = broker_read {
428 super::shell::handle_append_block_file(
429 broker,
430 &block_id_read,
431 PERSISTENT_OUTPUT_SUBJECT,
432 line_with_newline.as_bytes(),
433 filestore_read.as_ref(),
434 );
435 } else {
436 tracing::warn!(block_id = %block_id_read, "persistent stdout: no broker available");
437 }
438 }
439
440 tracing::info!(block_id = %block_id_read, "persistent stdout reader finished");
441 });
442
443 let health_watchdog = Arc::clone(&self.health_monitor);
448 tokio::spawn(async move {
449 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
450 loop {
451 interval.tick().await;
452 if !health_watchdog.is_active_turn() {
453 break;
454 }
455 health_watchdog.check();
456 }
457 });
458
459 let block_id_wait = self.block_id.clone();
461 let inner_wait = Arc::clone(&self.inner);
462 let broker_wait = self.broker.clone();
463 let wstore_wait = self.wstore.clone();
464 let health_wait = Arc::clone(&self.health_monitor);
465
466 tokio::spawn(async move {
467 tokio::select! {
468 status = child.wait() => {
469 let exit_code = status.map(|s| s.code().unwrap_or(-1)).unwrap_or(-1);
470 tracing::info!(
471 block_id = %block_id_wait,
472 exit_code = exit_code,
473 "persistent process exited"
474 );
475
476 health_wait.set_exited(exit_code);
478
479 let mut inner = inner_wait.lock().unwrap();
480 inner.proc_exit_code = exit_code;
481 inner.current_pid = None;
482 inner.stdin_tx = None;
483 inner.kill_tx = None;
484 Self::set_status(&mut inner, STATUS_DONE);
485 drop(inner);
486
487 if let Some(ref wstore) = wstore_wait {
489 super::session_recovery::clear_active_pid(wstore, &block_id_wait);
490 }
491
492 if let Some(ref broker) = broker_wait {
494 let status = BlockControllerRuntimeStatus {
495 blockid: block_id_wait.clone(),
496 version: 0,
497 shellprocstatus: STATUS_DONE.to_string(),
498 shellprocconnname: "local".to_string(),
499 shellprocexitcode: exit_code,
500 spawn_ts_ms: None,
501 is_agent_pane: true,
502 };
503 super::publish_controller_status(broker, &status);
504 }
505 }
506 Ok(force) = kill_rx => {
507 tracing::info!(
508 block_id = %block_id_wait,
509 force = force,
510 "persistent process kill requested"
511 );
512 if force {
513 let _ = child.kill().await;
514 } else {
515 {
517 let mut inner = inner_wait.lock().unwrap();
518 inner.stdin_tx = None; }
520 tokio::select! {
521 _ = child.wait() => {}
522 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
523 let _ = child.kill().await;
524 }
525 }
526 }
527
528 health_wait.set_exited(-1);
529
530 let mut inner = inner_wait.lock().unwrap();
531 inner.proc_exit_code = -1;
532 inner.current_pid = None;
533 inner.stdin_tx = None;
534 inner.kill_tx = None;
535 Self::set_status(&mut inner, STATUS_DONE);
536 drop(inner);
537
538 if let Some(ref wstore) = wstore_wait {
540 super::session_recovery::clear_active_pid(wstore, &block_id_wait);
541 }
542 }
543 }
544 });
545
546 Ok(())
547 }
548
549 pub fn stop_process(&self, force: bool) -> Result<(), String> {
550 let kill_tx = {
551 let mut inner = self.inner.lock().unwrap();
552 inner.kill_tx.take()
553 };
554 match kill_tx {
555 Some(tx) => {
556 let _ = tx.send(force);
557 Ok(())
558 }
559 None => Ok(()),
560 }
561 }
562
563 pub fn session_id(&self) -> Option<String> {
564 self.inner.lock().unwrap().session_id.clone()
565 }
566}
567
568impl Controller for PersistentSubprocessController {
569 fn start(
570 &self,
571 _block_meta: super::super::obj::MetaMapType,
572 _rt_opts: Option<serde_json::Value>,
573 _force: bool,
574 ) -> Result<(), String> {
575 tracing::info!(
576 block_id = %self.block_id,
577 "persistent controller registered (spawns on first message)"
578 );
579 Ok(())
580 }
581
582 fn stop(&self, _graceful: bool, new_status: &str) -> Result<(), String> {
583 self.stop_process(true)?;
584 let mut inner = self.inner.lock().unwrap();
585 if inner.proc_status != new_status {
586 Self::set_status(&mut inner, new_status);
587 }
588 Ok(())
589 }
590
591 fn get_runtime_status(&self) -> BlockControllerRuntimeStatus {
592 self.get_status_snapshot()
593 }
594
595 fn send_input(&self, _input: BlockInputUnion, _seq: Option<u64>) -> Result<(), String> {
596 Err("persistent controller does not accept raw input; use send_message()".to_string())
597 }
598
599 fn controller_type(&self) -> &str {
600 BLOCK_CONTROLLER_PERSISTENT
601 }
602
603 fn block_id(&self) -> &str {
604 &self.block_id
605 }
606
607 fn as_any(&self) -> &dyn std::any::Any {
608 self
609 }
610}