agentmux_srv\backend\blockcontroller/subprocess.rs
1// Copyright 2025, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! SubprocessController: manages agent CLI as stateless per-turn subprocess invocations.
5//!
6//! Architecture:
7//! Each user message spawns a fresh `claude -p` process.
8//! Multi-turn continuity uses `--resume <session-id>`.
9//! The process reads one JSON message from stdin, runs the agentic loop,
10//! streams NDJSON on stdout, then exits.
11//!
12//! State machine:
13//! INIT ─(spawn)─> RUNNING ─(process exits)─> DONE
14//! DONE ─(new message)─> RUNNING (re-spawn with --resume)
15//!
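//! I/O model (per turn: one OS thread + four async tasks):
//!   1. stdin writer (OS thread): writes the user message to the child's stdin, then closes it
//!   2. stdout_reader: piped stdout → .jsonl persistence + WPS blockfile events on "output" subject
//!   3. stderr reader: logs each non-empty stderr line (not published)
//!   4. health watchdog: checks the health monitor every 5s while the turn is active
//!   5. process_waiter: wait for exit or kill request, update status, publish lifecycle event,
//!      then drain the next queued message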
19
20
21use std::collections::{HashMap, VecDeque};
22use std::sync::atomic::{AtomicBool, Ordering};
23use std::sync::{Arc, Mutex};
24
25use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
26
27
28use super::{
29 BlockControllerRuntimeStatus, BlockInputUnion, Controller, STATUS_DONE, STATUS_INIT,
30 STATUS_RUNNING,
31};
32use super::health::{classify_output_line, HealthMonitor};
33use crate::backend::eventbus::EventBus;
34use crate::backend::storage::filestore::FileStore;
35use crate::backend::storage::wstore::WaveStore;
36use crate::backend::wps;
37
/// WPS file subject name for subprocess output (replaces "term" from PTY).
///
/// Every non-empty stdout line of a turn is appended to the block file
/// under this subject by the stdout reader in `spawn_turn`.
pub const SUBPROCESS_OUTPUT_SUBJECT: &str = "output";

/// Controller type constant, returned by
/// `SubprocessController::controller_type`.
pub const BLOCK_CONTROLLER_SUBPROCESS: &str = "subprocess";
43
/// Configuration for spawning a subprocess turn.
///
/// One value describes exactly one user message. `spawn_turn` either runs
/// it immediately or, when a turn is already in flight, queues it unchanged
/// until the running turn exits.
#[derive(Debug, Clone)]
pub struct SubprocessSpawnConfig {
    /// CLI executable (e.g., "claude").
    pub cli_command: String,
    /// CLI arguments (e.g., ["-p", "--output-format", "stream-json", ...]).
    /// `spawn_turn` appends `resume_flag` and the known session id after
    /// these when both are available.
    pub cli_args: Vec<String>,
    /// Working directory for the subprocess. Empty means "do not set one".
    /// A leading `~` is expanded to the home directory and the directory is
    /// created if it does not exist yet.
    pub working_dir: String,
    /// Environment variables to set. Each value is passed through
    /// `expand_home_dir_safe` before being handed to the child.
    pub env_vars: HashMap<String, String>,
    /// The user's JSON message to write to stdin (a trailing newline is
    /// added by `spawn_turn`).
    pub message: String,
    /// Flag used to resume a previous session, e.g. "--resume" (Claude), "-r" (Gemini).
    /// Empty string means this provider does not support simple-flag resume.
    pub resume_flag: String,
    /// JSON field name in the CLI's init event that contains the session/thread ID.
    /// e.g. "session_id" (Claude/Gemini) or "thread_id" (Codex).
    /// The stdout reader looks this key up at the top level of every JSON
    /// line the CLI prints.
    pub session_id_field: String,
    /// Optional client-supplied message id. Echoed back via the
    /// `agent-message-accepted` event when this config transitions from
    /// queued → running so the frontend can pair the event with a
    /// pending `PendingMessage`. None means no feedback is emitted.
    pub message_id: Option<String>,
    /// Session id to hydrate `inner.session_id` with BEFORE the first
    /// turn — used by the picker "My Agents" reattach path. The
    /// caller reads this from `block.meta["agent:sessionid"]` which
    /// the frontend pre-populates from the prior block's session id
    /// when launching with `continueOfInstanceId`. Without this
    /// hydration `spawn_turn` would only see the captured session id
    /// AFTER the first turn — meaning the first turn always launches
    /// the CLI fresh (no `--resume <sid>`) and starts a new
    /// conversation that re-injects the startup context.
    ///
    /// Empty / `None` means "no prior session" (greenfield launch).
    ///
    /// Best-effort, not authoritative: if the hydrated id is stale,
    /// the CLI's stdout-emitted session id always overwrites it at
    /// capture time (see `spawn_turn`'s stdout-reader block). The
    /// reattach turn passes the (possibly stale) hydrated id via
    /// `--resume`; the CLI either accepts it or starts a new
    /// session and emits its own id, which then becomes the in-
    /// memory authority for every subsequent turn.
    pub session_id: Option<String>,
}
89
/// Inner state protected by mutex.
///
/// Shared (via `Arc<Mutex<_>>`) between the controller and the per-turn
/// tasks spawned by `spawn_turn`.
struct SubprocessControllerInner {
    /// Current process status (one of the `STATUS_*` constants, or whatever
    /// status string `Controller::stop` was asked to record).
    proc_status: String,
    /// Process exit code from the most recent turn. `-1` is recorded when
    /// the spawn fails, the turn is killed, or no exit code is available.
    proc_exit_code: i32,
    /// Status version counter (incremented on each change).
    status_version: i32,
    /// Session ID used for `--resume`. Either hydrated from
    /// `SubprocessSpawnConfig::session_id` before the first turn, or
    /// captured from the CLI's stdout; a captured id always replaces a
    /// hydrated one.
    session_id: Option<String>,
    /// PID of the currently running subprocess (None if idle).
    current_pid: Option<u32>,
    /// Handle to kill the current subprocess. The `bool` payload is the
    /// `force` flag read by the process_waiter task.
    kill_tx: Option<tokio::sync::oneshot::Sender<bool>>,
    /// Messages queued while a turn is in progress.
    /// Drained sequentially after the current turn exits.
    pending_messages: VecDeque<SubprocessSpawnConfig>,
}
108
/// SubprocessController manages per-turn subprocess lifecycle for agent blocks.
///
/// Unlike `ShellController` which maintains a long-running PTY process,
/// `SubprocessController` spawns a fresh process for each user turn.
/// Multi-turn continuity comes from `--resume <session-id>`.
///
/// At most one turn runs at a time: `run_lock` gates `spawn_turn`, and
/// messages that arrive while it is held are parked in the inner
/// `pending_messages` queue.
pub struct SubprocessController {
    /// Parent tab UUID.
    #[allow(dead_code)]
    tab_id: String,
    /// Block UUID.
    block_id: String,
    /// Prevents concurrent spawns. `true` while a turn is in progress.
    run_lock: Arc<AtomicBool>,
    /// Protected inner state.
    inner: Arc<Mutex<SubprocessControllerInner>>,
    /// WPS broker for publishing events (blockfile, controllerstatus).
    /// `None` disables all publishing.
    broker: Option<Arc<wps::Broker>>,
    /// Event bus for obj:update broadcasts.
    event_bus: Option<Arc<EventBus>>,
    /// Wave object store for block metadata persistence.
    wstore: Option<Arc<WaveStore>>,
    /// FileStore for write-through persistence of output lines (Phase 1.3).
    filestore: Option<Arc<FileStore>>,
    /// Agent health monitor (output activity + error tracking).
    health_monitor: Arc<HealthMonitor>,
    /// Weak self-reference for queue drain. Set by `set_self_ref` after
    /// the controller is wrapped in Arc. While it is `None`, queued
    /// messages cannot be re-submitted by the process_waiter task.
    self_ref: Mutex<Option<std::sync::Weak<Self>>>,
}
138
139impl SubprocessController {
140 /// Create a new SubprocessController.
141 pub fn new(
142 tab_id: String,
143 block_id: String,
144 broker: Option<Arc<wps::Broker>>,
145 event_bus: Option<Arc<EventBus>>,
146 wstore: Option<Arc<WaveStore>>,
147 filestore: Option<Arc<FileStore>>,
148 ) -> Self {
149 let health_monitor = Arc::new(HealthMonitor::new(
150 block_id.clone(),
151 broker.clone(),
152 ));
153 Self {
154 tab_id,
155 block_id,
156 run_lock: Arc::new(AtomicBool::new(false)),
157 inner: Arc::new(Mutex::new(SubprocessControllerInner {
158 proc_status: STATUS_INIT.to_string(),
159 proc_exit_code: 0,
160 status_version: 0,
161 session_id: None,
162 current_pid: None,
163 kill_tx: None,
164 pending_messages: VecDeque::new(),
165 })),
166 broker,
167 event_bus,
168 wstore,
169 filestore,
170 health_monitor,
171 self_ref: Mutex::new(None),
172 }
173 }
174
175 /// Store a weak self-reference so the process_waiter can drain queued
176 /// messages by calling spawn_turn after the current turn exits.
177 /// Must be called after wrapping in Arc.
178 pub fn set_self_ref(self: &Arc<Self>) {
179 *self.self_ref.lock().unwrap() = Some(Arc::downgrade(self));
180 }
181
182 /// Try to acquire the run lock. Returns false if a turn is already in progress.
183 fn try_lock_run(&self) -> bool {
184 self.run_lock
185 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
186 .is_ok()
187 }
188
    /// Release the run lock.
    ///
    /// Unconditionally clears the flag; the caller is expected to be the
    /// turn that acquired it through `try_lock_run`.
    fn unlock_run(&self) {
        self.run_lock.store(false, Ordering::SeqCst);
    }
193
194 /// Update process status and increment version (must hold inner lock).
195 fn set_status(inner: &mut SubprocessControllerInner, status: &str) {
196 inner.proc_status = status.to_string();
197 inner.status_version += 1;
198 }
199
200 /// Get the runtime status (snapshot).
201 fn get_status_snapshot(&self) -> BlockControllerRuntimeStatus {
202 let inner = self.inner.lock().unwrap();
203 BlockControllerRuntimeStatus {
204 blockid: self.block_id.clone(),
205 version: inner.status_version,
206 shellprocstatus: inner.proc_status.clone(),
207 shellprocconnname: "local".to_string(),
208 shellprocexitcode: inner.proc_exit_code,
209 spawn_ts_ms: None,
210 is_agent_pane: false,
211 }
212 }
213
214 /// Publish current controller status via the WPS broker.
215 fn publish_status(&self) {
216 if let Some(ref broker) = self.broker {
217 let status = self.get_status_snapshot();
218 super::publish_controller_status(broker, &status);
219 }
220 }
221
222 /// Emit `agent-message-accepted` for the given config, if it carries
223 /// a `message_id`. Called from both `spawn_turn` (direct path) and
224 /// the `process_waiter` drain site (queue path). No-op if the config
225 /// has no id, or the broker isn't configured.
226 fn emit_message_accepted(&self, config: &SubprocessSpawnConfig) {
227 let Some(id) = config.message_id.as_deref() else { return };
228 let Some(ref broker) = self.broker else { return };
229 let event = super::super::wps::WaveEvent {
230 event: super::super::wps::EVENT_AGENT_MESSAGE_ACCEPTED.to_string(),
231 scopes: vec![format!("block:{}", self.block_id)],
232 sender: String::new(),
233 persist: 0,
234 data: Some(serde_json::json!({
235 "block_id": self.block_id,
236 "message_id": id,
237 })),
238 };
239 broker.publish(event);
240 tracing::info!(
241 block_id = %self.block_id,
242 message_id = %id,
243 "emitted agent-message-accepted"
244 );
245 }
246
    /// Get the stored session ID (if any).
    ///
    /// Returns a clone of `inner.session_id`, which may have been hydrated
    /// from config or captured from the CLI's stdout.
    ///
    /// # Panics
    /// Panics if the `inner` mutex is poisoned.
    #[allow(dead_code)]
    pub fn session_id(&self) -> Option<String> {
        self.inner.lock().unwrap().session_id.clone()
    }
252
253 /// Record an authoritative session id captured from the CLI's
254 /// stdout init/`thread.started` event. The CLI is the source of
255 /// truth for which session is live, so this ALWAYS overwrites
256 /// any prior value of `inner.session_id` — including values
257 /// previously hydrated from config on a picker reattach (which
258 /// may be stale by the time the CLI speaks).
259 ///
260 /// Free-function form (taking `&Arc<Mutex<…Inner>>` instead of
261 /// `&self`) so the spawn_turn stdout-reader tokio task can call
262 /// it without holding an `Arc<Self>` reference. The
263 /// `&SubprocessController` method below just delegates.
264 ///
265 /// Returns `true` when the value changed (caller should
266 /// broadcast the meta update + persist to block meta). Returns
267 /// `false` when the new id matches the current one — common
268 /// when the CLI emits the same `session_id` on every NDJSON
269 /// frame within a single turn.
270 pub(crate) fn record_captured_session_id_inner(
271 inner: &Mutex<SubprocessControllerInner>,
272 sid: &str,
273 ) -> bool {
274 if sid.is_empty() {
275 return false;
276 }
277 let mut guard = inner.lock().unwrap();
278 let differs = guard.session_id.as_deref() != Some(sid);
279 if differs {
280 guard.session_id = Some(sid.to_string());
281 }
282 differs
283 }
284
    /// `&self` convenience wrapper around
    /// `record_captured_session_id_inner` — used by tests that
    /// already hold a `SubprocessController`.
    ///
    /// Returns `true` when the stored session id changed.
    #[cfg(test)]
    pub(crate) fn record_captured_session_id(&self, sid: &str) -> bool {
        Self::record_captured_session_id_inner(&self.inner, sid)
    }
292
293 /// Hydrate `inner.session_id` from a config-supplied id when the
294 /// controller hasn't seen a value yet.
295 ///
296 /// Picker reattach path: a fresh `SubprocessController` is
297 /// registered for the new block, so its `inner.session_id` is
298 /// `None`. The frontend persisted the prior block's session id
299 /// into `agent:sessionid` meta, the websocket / app_api caller
300 /// read it into `SubprocessSpawnConfig::session_id`, and this
301 /// method copies it to inner so the spawn_turn args-builder
302 /// appends `--resume <sid>` on the FIRST turn.
303 ///
304 /// **Hydration is best-effort, not authoritative.** If
305 /// `inner.session_id` is already `Some` we no-op (don't overwrite
306 /// a value already in place — could be a captured-from-stdout
307 /// id from an earlier turn, or a prior hydration on the same
308 /// reattach). Critically, the **CLI's stdout-emitted session id
309 /// is authoritative** and overwrites any prior value at capture
310 /// time (see the stdout-reader block in `spawn_turn`). So if the
311 /// hydrated value is stale, the FIRST turn passes the stale id
312 /// via `--resume` (likely accepted as a no-op or rejected with a
313 /// "no such session" error from the CLI), the CLI then emits its
314 /// own session id in the init event, and `inner.session_id` is
315 /// overwritten with that authoritative value for subsequent
316 /// turns. Without the capture overwrite, a stale hydrated id
317 /// would be re-used forever — that was the bug codex flagged on
318 /// PR #1018 first cut.
319 ///
320 /// Empty `&str` is treated as "no value" so the caller can use
321 /// it unconditionally without filtering.
322 pub(crate) fn hydrate_session_id_from_config(&self, config_sid: Option<&str>) {
323 let Some(sid) = config_sid.filter(|s| !s.is_empty()) else {
324 return;
325 };
326 let mut inner = self.inner.lock().unwrap();
327 if inner.session_id.is_some() {
328 return;
329 }
330 tracing::info!(
331 block_id = %self.block_id,
332 session_id = %sid,
333 "hydrated session_id from config (picker reattach)"
334 );
335 inner.session_id = Some(sid.to_string());
336 }
337
338 /// Spawn a single turn of the agent CLI.
339 ///
340 /// This is the core method — it spawns `claude -p`, writes the user message to stdin,
341 /// reads NDJSON from stdout (publishing WPS events), and waits for exit.
342 ///
343 /// If a session_id exists from a previous turn, `--resume <sid>` is appended to args.
344 pub fn spawn_turn(&self, config: SubprocessSpawnConfig) -> Result<(), String> {
345 if !self.try_lock_run() {
346 // Turn in progress — queue the message for after it exits.
347 let mut inner = self.inner.lock().unwrap();
348 tracing::info!(
349 block_id = %self.block_id,
350 queue_depth = inner.pending_messages.len() + 1,
351 "subprocess busy — message queued"
352 );
353 inner.pending_messages.push_back(config);
354 return Ok(());
355 }
356
357 // Direct-spawn path (queue was empty): emit the accepted event
358 // now so the frontend can promote its pending entry. The
359 // drain-from-queue path (in process_waiter) emits the same
360 // event just before calling spawn_turn recursively.
361 self.emit_message_accepted(&config);
362
363 // Hydrate inner.session_id from the config-supplied id if the
364 // controller hasn't captured one yet. See
365 // `hydrate_session_id_from_config` for the full rationale.
366 self.hydrate_session_id_from_config(config.session_id.as_deref());
367
368 // Build CLI args, appending resume flag + session_id if we have one and the provider supports it
369 let mut args = config.cli_args.clone();
370 {
371 let inner = self.inner.lock().unwrap();
372 if let Some(ref sid) = inner.session_id {
373 if !config.resume_flag.is_empty() {
374 args.push(config.resume_flag.clone());
375 args.push(sid.clone());
376 }
377 }
378 }
379
380 // Update status to running
381 {
382 let mut inner = self.inner.lock().unwrap();
383 Self::set_status(&mut inner, STATUS_RUNNING);
384 }
385 self.publish_status();
386 self.health_monitor.set_active_turn(true);
387
388 // Build command — on Windows, .cmd batch wrappers can't be reliably spawned
389 // via cmd.exe /C with piped stdio. Resolve to node <script> instead.
390 let mut cmd = crate::server::cli_handlers::make_cli_cmd(&config.cli_command);
391 cmd.args(&args);
392
393 // On Windows: suppress console-window allocation. Without CREATE_NO_WINDOW,
394 // node.exe spawned from a windowless sidecar may try to create/attach to a
395 // console, causing stdout to go to that console rather than the pipe.
396 #[cfg(windows)]
397 {
398 const CREATE_NO_WINDOW: u32 = 0x0800_0000;
399 cmd.creation_flags(CREATE_NO_WINDOW);
400 }
401 if !config.working_dir.is_empty() {
402 // Expand ~ to home directory (cross-platform)
403 let expanded_dir = if config.working_dir.starts_with("~/") || config.working_dir == "~" {
404 if let Some(home) = dirs::home_dir() {
405 home.join(config.working_dir.trim_start_matches("~/")).to_string_lossy().to_string()
406 } else {
407 config.working_dir.clone()
408 }
409 } else {
410 config.working_dir.clone()
411 };
412 // Create directory if it doesn't exist
413 let dir_path = std::path::Path::new(&expanded_dir);
414 if !dir_path.exists() {
415 if let Err(e) = std::fs::create_dir_all(dir_path) {
416 tracing::warn!(
417 block_id = %self.block_id,
418 dir = %expanded_dir,
419 error = %e,
420 "failed to create working directory, using current dir"
421 );
422 } else {
423 tracing::info!(
424 block_id = %self.block_id,
425 dir = %expanded_dir,
426 "created working directory"
427 );
428 }
429 }
430 if dir_path.exists() {
431 cmd.current_dir(&expanded_dir);
432 }
433 }
434 for (k, v) in &config.env_vars {
435 let expanded = crate::backend::base::expand_home_dir_safe(v);
436 cmd.env(k, expanded.to_string_lossy().as_ref());
437 }
438 cmd.stdin(std::process::Stdio::piped());
439 cmd.stdout(std::process::Stdio::piped());
440 cmd.stderr(std::process::Stdio::piped());
441
442 // Spawn
443 let mut child = cmd.spawn().map_err(|e| {
444 let mut inner = self.inner.lock().unwrap();
445 Self::set_status(&mut inner, STATUS_DONE);
446 inner.proc_exit_code = -1;
447 self.unlock_run();
448 format!("failed to spawn subprocess: {e}")
449 })?;
450
451 let pid = child.id().unwrap_or(0);
452 tracing::info!(
453 block_id = %self.block_id,
454 pid = pid,
455 cmd = %config.cli_command,
456 args = ?args,
457 "subprocess spawned"
458 );
459
460 // Assign the child to this block's process tracker so every
461 // descendant it spawns (bg bash, dev servers, watchers, etc.)
462 // is caught by the per-platform tracking mechanism and surfaces
463 // in the swarm activity panel. No-op if the tracker global
464 // hasn't been initialized (tests) or on platforms without a
465 // real tracker impl yet (stub handle accepts silently).
466 // See `backend::process_tracker`.
467 if pid != 0 {
468 if let Some(registry) = crate::backend::process_tracker::registry::global() {
469 let tracker = registry.ensure_tracker(&self.block_id);
470 if let Err(e) = tracker.assign_process(pid) {
471 tracing::warn!(
472 block_id = %self.block_id,
473 pid = pid,
474 err = %e,
475 "[process-tracker] assign_process failed"
476 );
477 }
478 }
479 }
480
481 // Store PID
482 let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<bool>();
483 {
484 let mut inner = self.inner.lock().unwrap();
485 inner.current_pid = Some(pid);
486 inner.kill_tx = Some(kill_tx);
487 }
488
489 // Take ownership of stdin/stdout
490 let stdin = child.stdin.take().unwrap();
491 let stdout = child.stdout.take().unwrap();
492 let stderr = child.stderr.take().unwrap();
493
494 // Write user message to stdin, then close it.
495 // CRITICAL: This must complete BEFORE the child's stdin timeout
496 // (Claude CLI: 3s). Using std::thread + synchronous write to
497 // bypass the Tokio task scheduler — a tokio::spawn'd task may
498 // not run for seconds on a busy runtime, causing the child to
499 // time out with "no stdin data received in 3s".
500 let message = config.message;
501 let block_id_stdin = self.block_id.clone();
502 {
503 // Convert Tokio's async ChildStdin to a raw OS handle, then
504 // wrap in a std::fs::File for synchronous write. The pipe
505 // buffer (4-64KB on Windows) easily fits our message, so
506 // write_all returns instantly without blocking.
507 #[cfg(unix)]
508 let raw_handle = {
509 use std::os::unix::io::{AsRawFd, FromRawFd};
510 let fd = stdin.as_raw_fd();
511 unsafe { std::fs::File::from_raw_fd(fd) }
512 };
513 #[cfg(windows)]
514 let raw_handle = {
515 use std::os::windows::io::{AsRawHandle, FromRawHandle};
516 let handle = stdin.as_raw_handle();
517 unsafe { std::fs::File::from_raw_handle(handle) }
518 };
519
520 // Spawn a real OS thread (not a Tokio task) for the write.
521 // This ensures it runs immediately regardless of runtime load.
522 // The raw handle is valid as long as `stdin` lives — we move
523 // `stdin` into the thread via a guard to keep it alive.
524 std::thread::spawn(move || {
525 use std::io::Write;
526 let _keep_alive = stdin; // prevent Tokio ChildStdin drop
527 let mut pipe = raw_handle;
528 let payload = format!("{}\n", message);
529 if let Err(e) = pipe.write_all(payload.as_bytes()) {
530 tracing::warn!(block_id = %block_id_stdin, "subprocess stdin write error: {}", e);
531 std::mem::forget(pipe); // don't close the handle — _keep_alive owns it
532 return;
533 }
534 if let Err(e) = pipe.flush() {
535 tracing::warn!(block_id = %block_id_stdin, "subprocess stdin flush error: {}", e);
536 }
537 std::mem::forget(pipe); // don't double-close — _keep_alive owns the handle
538 // _keep_alive (Tokio ChildStdin) drops here → EOF to the subprocess
539 });
540 }
541
542 // Spawn stdout_reader task
543 let block_id_read = self.block_id.clone();
544 let broker_read = self.broker.clone();
545 let inner_read = Arc::clone(&self.inner);
546 let wstore_read = self.wstore.clone();
547 let event_bus_read = self.event_bus.clone();
548 let filestore_read = self.filestore.clone();
549 let health_read = Arc::clone(&self.health_monitor);
550 let session_id_field = config.session_id_field.clone();
551 tokio::spawn(async move {
552 let reader = BufReader::new(stdout);
553 let mut lines = reader.lines();
554 let mut stats = super::session_stats::SessionStatsAccumulator::new(block_id_read.clone());
555
556 tracing::info!(block_id = %block_id_read, "stdout_reader started");
557
558 loop {
559 match lines.next_line().await {
560 Err(e) => {
561 tracing::warn!(block_id = %block_id_read, error = %e, "subprocess stdout read error");
562 break;
563 }
564 Ok(None) => {
565 tracing::info!(block_id = %block_id_read, "subprocess stdout EOF");
566 break;
567 }
568 Ok(Some(line)) => {
569 let trimmed = line.trim();
570 if trimmed.is_empty() {
571 continue;
572 }
573
574 // Track session metadata (debounced 1 s).
575 // Use `line.len()` (not `trimmed.len()`) to match persistent.rs
576 // so token_estimate stays consistent across controller types.
577 stats.record_line(line.len(), &wstore_read);
578
579 // Classify output for health monitoring
580 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(trimmed) {
581 let (meaningful, error) = classify_output_line(&parsed);
582 health_read.record_output(meaningful);
583 if let Some((class, msg)) = error {
584 health_read.record_error(class, msg);
585 }
586 }
587
588 // Try to capture session/thread ID from the provider's init event.
589 // Claude: {"type":"system","subtype":"init","session_id":"..."}
590 // Gemini: {"type":"init","session_id":"..."}
591 // Codex: {"type":"thread.started","thread_id":"..."}
592 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(trimmed) {
593 if let Some(sid) = parsed.get(&session_id_field).and_then(|v| v.as_str()) {
594 let sid_string = sid.to_string();
595 // Authoritative CLI capture —
596 // overwrites any prior value
597 // (including stale hydrated ids
598 // from picker reattach). De-dups
599 // when the same id repeats across
600 // turns. See
601 // `record_captured_session_id_inner`
602 // for the unit-tested form.
603 let changed = SubprocessController::record_captured_session_id_inner(
604 &inner_read,
605 &sid_string,
606 );
607 if changed {
608 tracing::info!(
609 block_id = %block_id_read,
610 field = %session_id_field,
611 session_id = %sid_string,
612 "captured session id"
613 );
614
615 // Persist session_id to block metadata
616 if let Some(ref store) = wstore_read {
617 let oref_str = format!("block:{}", block_id_read);
618 let mut meta_update =
619 crate::backend::obj::MetaMapType::new();
620 meta_update.insert(
621 "agent:sessionid".to_string(),
622 serde_json::Value::String(sid_string),
623 );
624 if let Err(e) = crate::server::service::update_object_meta(
625 store, &oref_str, &meta_update,
626 ) {
627 tracing::warn!(
628 block_id = %block_id_read,
629 error = %e,
630 "failed to persist agent:sessionid"
631 );
632 } else if let Some(ref event_bus) = event_bus_read {
633 // Broadcast metadata update to frontend
634 if let Ok(updated_block) = store.must_get::<crate::backend::obj::Block>(&block_id_read) {
635 let update_data = serde_json::to_value(
636 &crate::backend::obj::WaveObjUpdate {
637 updatetype: "update".into(),
638 otype: "block".into(),
639 oid: block_id_read.clone(),
640 obj: Some(crate::backend::obj::wave_obj_to_value(&updated_block)),
641 },
642 )
643 .ok();
644 event_bus.broadcast_event(
645 &crate::backend::eventbus::WSEventType {
646 eventtype: "waveobj:update".to_string(),
647 oref: oref_str,
648 data: update_data,
649 },
650 );
651 }
652 }
653 }
654 }
655 }
656 }
657
658 // Publish the NDJSON line as a WPS blockfile event on the "output" subject
659 // and write-through to FileStore for persistent history (Phase 1.3).
660 if let Some(ref broker) = broker_read {
661 tracing::info!(block_id = %block_id_read, line = %trimmed, "subprocess stdout → blockfile");
662 // Include the newline so the frontend line splitter works correctly
663 let line_with_newline = format!("{}\n", trimmed);
664 super::shell::handle_append_block_file(
665 broker,
666 &block_id_read,
667 SUBPROCESS_OUTPUT_SUBJECT,
668 line_with_newline.as_bytes(),
669 filestore_read.as_ref(),
670 );
671 }
672 }
673 }
674 }
675
676 tracing::info!(block_id = %block_id_read, "stdout_reader exiting");
677 });
678
679 // Spawn stderr reader (log warnings, don't publish)
680 let block_id_err = self.block_id.clone();
681 tokio::spawn(async move {
682 let reader = BufReader::new(stderr);
683 let mut lines = reader.lines();
684 loop {
685 match lines.next_line().await {
686 Err(e) => {
687 tracing::warn!(block_id = %block_id_err, error = %e, "subprocess stderr read error");
688 break;
689 }
690 Ok(None) => break,
691 Ok(Some(line)) => {
692 if !line.trim().is_empty() {
693 tracing::info!(
694 block_id = %block_id_err,
695 stderr = %line,
696 "subprocess stderr"
697 );
698 }
699 }
700 }
701 }
702 });
703
704 // Spawn health watchdog (checks every 5s while turn is active)
705 let health_watchdog = Arc::clone(&self.health_monitor);
706 tokio::spawn(async move {
707 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
708 loop {
709 interval.tick().await;
710 if !health_watchdog.is_active_turn() {
711 break;
712 }
713 health_watchdog.check();
714 }
715 });
716
717 // Spawn process_waiter task
718 let inner_wait = Arc::clone(&self.inner);
719 let block_id_wait = self.block_id.clone();
720 let broker_wait = self.broker.clone();
721 let run_lock = Arc::clone(&self.run_lock);
722 let health_wait = Arc::clone(&self.health_monitor);
723 let self_ref_wait = self.self_ref.lock().unwrap().clone().unwrap_or_default();
724 tokio::spawn(async move {
725 // Wait for either process exit or kill signal
726 tokio::select! {
727 exit_result = child.wait() => {
728 let exit_code = match exit_result {
729 Ok(status) => status.code().unwrap_or(-1),
730 Err(e) => {
731 tracing::warn!(
732 block_id = %block_id_wait,
733 error = %e,
734 "subprocess wait error"
735 );
736 -1
737 }
738 };
739
740 tracing::info!(
741 block_id = %block_id_wait,
742 exit_code = exit_code,
743 "subprocess exited"
744 );
745
746 // Update inner state
747 {
748 let mut inner = inner_wait.lock().unwrap();
749 inner.proc_exit_code = exit_code;
750 SubprocessController::set_status(&mut inner, STATUS_DONE);
751 inner.current_pid = None;
752 inner.kill_tx = None;
753 }
754 }
755 force = kill_rx => {
756 let force = force.unwrap_or(false);
757 tracing::info!(
758 block_id = %block_id_wait,
759 force = force,
760 "subprocess kill requested"
761 );
762
763 if force {
764 let _ = child.kill().await;
765 } else {
766 // On Unix, send SIGTERM. On Windows, kill() is the only option.
767 #[cfg(unix)]
768 {
769 if let Some(pid) = child.id() {
770 unsafe { libc::kill(pid as i32, libc::SIGTERM); }
771 }
772 // Give it a moment to exit gracefully
773 tokio::time::sleep(tokio::time::Duration::from_millis(
774 super::DEFAULT_GRACEFUL_KILL_WAIT_MS,
775 )).await;
776 let _ = child.kill().await;
777 }
778 #[cfg(not(unix))]
779 {
780 let _ = child.kill().await;
781 }
782 }
783
784 let _ = child.wait().await;
785
786 {
787 let mut inner = inner_wait.lock().unwrap();
788 inner.proc_exit_code = -1;
789 SubprocessController::set_status(&mut inner, STATUS_DONE);
790 inner.current_pid = None;
791 inner.kill_tx = None;
792 }
793 }
794 }
795
796 // Update health monitor with exit status
797 {
798 let inner = inner_wait.lock().unwrap();
799 health_wait.set_exited(inner.proc_exit_code);
800 }
801
802 // Publish done status
803 if let Some(ref broker) = broker_wait {
804 let status = {
805 let inner = inner_wait.lock().unwrap();
806 BlockControllerRuntimeStatus {
807 blockid: block_id_wait.clone(),
808 version: inner.status_version,
809 shellprocstatus: inner.proc_status.clone(),
810 shellprocconnname: "local".to_string(),
811 shellprocexitcode: inner.proc_exit_code,
812 spawn_ts_ms: None,
813 is_agent_pane: false,
814 }
815 };
816 super::publish_controller_status(broker, &status);
817 }
818
819 // Release run lock
820 run_lock.store(false, Ordering::SeqCst);
821
822 // Drain message queue: if messages were queued while this turn
823 // was running, pop the next one and spawn it via the weak
824 // self-reference.
825 let next_config = {
826 let mut inner = inner_wait.lock().unwrap();
827 inner.pending_messages.pop_front()
828 };
829 if let Some(config) = next_config {
830 if let Some(ctrl) = self_ref_wait.upgrade() {
831 tracing::info!(
832 block_id = %block_id_wait,
833 "draining queued message"
834 );
835 if let Err(e) = ctrl.spawn_turn(config) {
836 tracing::warn!(
837 block_id = %block_id_wait,
838 error = %e,
839 "failed to spawn queued turn"
840 );
841 }
842 }
843 }
844 });
845
846 Ok(())
847 }
848
849 /// Stop the currently running subprocess.
850 pub fn stop_subprocess(&self, force: bool) -> Result<(), String> {
851 let kill_tx = {
852 let mut inner = self.inner.lock().unwrap();
853 inner.kill_tx.take()
854 };
855 match kill_tx {
856 Some(tx) => {
857 let _ = tx.send(force);
858 Ok(())
859 }
860 None => Ok(()), // No running process
861 }
862 }
863}
864
865impl Controller for SubprocessController {
866 fn start(
867 &self,
868 _block_meta: super::super::obj::MetaMapType,
869 _rt_opts: Option<serde_json::Value>,
870 _force: bool,
871 ) -> Result<(), String> {
872 // SubprocessController doesn't auto-start on resync.
873 // Turns are initiated by SubprocessSpawnCommand / AgentInputCommand.
874 tracing::info!(
875 block_id = %self.block_id,
876 "subprocess controller registered (no auto-start)"
877 );
878 Ok(())
879 }
880
881 fn stop(&self, _graceful: bool, new_status: &str) -> Result<(), String> {
882 // Stop any running subprocess
883 self.stop_subprocess(true)?;
884
885 let mut inner = self.inner.lock().unwrap();
886 if inner.proc_status != new_status {
887 Self::set_status(&mut inner, new_status);
888 }
889
890 Ok(())
891 }
892
893 fn get_runtime_status(&self) -> BlockControllerRuntimeStatus {
894 self.get_status_snapshot()
895 }
896
897 fn send_input(&self, input: BlockInputUnion, _seq: Option<u64>) -> Result<(), String> {
898 // SubprocessController doesn't accept raw PTY input — user messages
899 // go through spawn_turn() (via AgentInputCommand RPC).
900 //
901 // Signals ARE accepted though: the agent-pane composer's Esc
902 // handler sends SIGINT via `ControllerInputCommand({signame:"SIGINT"})`
903 // when the user wants to cancel an in-flight turn. Route that to
904 // `stop_subprocess(force=true)` so the current subprocess is
905 // killed via `kill_tx`. Without this, Esc was silently rejected
906 // and the agent kept running.
907 if let Some(sig) = input.sig_name.as_deref() {
908 if sig == "SIGINT" || sig == "SIGTERM" {
909 tracing::info!(
910 block_id = %self.block_id,
911 sig = %sig,
912 "subprocess controller: received signal, killing current turn"
913 );
914 return self.stop_subprocess(true);
915 }
916 return Err(format!(
917 "subprocess controller: unsupported signal {sig} (only SIGINT/SIGTERM)"
918 ));
919 }
920 if input.input_data.is_some() {
921 return Err("subprocess controller does not accept raw input; use AgentInputCommand".to_string());
922 }
923 // Term resize / other input types: accepted-no-op.
924 Ok(())
925 }
926
927 fn controller_type(&self) -> &str {
928 BLOCK_CONTROLLER_SUBPROCESS
929 }
930
931 fn block_id(&self) -> &str {
932 &self.block_id
933 }
934
935 fn as_any(&self) -> &dyn std::any::Any {
936 self
937 }
938}
939
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a controller on tab "tab-1" for `block_id`, passing `None` for
    /// the four trailing constructor arguments.
    fn idle_controller(block_id: &str) -> SubprocessController {
        SubprocessController::new(
            "tab-1".to_string(),
            block_id.to_string(),
            None,
            None,
            None,
            None,
        )
    }

    /// Sets the run lock by hand so the next `spawn_turn` sees a busy
    /// controller.
    fn mark_busy(ctrl: &SubprocessController) {
        ctrl.run_lock.store(true, Ordering::SeqCst);
    }

    #[test]
    fn test_subprocess_controller_new() {
        let ctrl = idle_controller("block-1");

        assert_eq!(ctrl.controller_type(), BLOCK_CONTROLLER_SUBPROCESS);
        assert_eq!(ctrl.block_id(), "block-1");

        let snapshot = ctrl.get_runtime_status();
        assert_eq!(snapshot.shellprocstatus, STATUS_INIT);
        assert_eq!(snapshot.blockid, "block-1");
    }

    #[test]
    fn test_subprocess_controller_rejects_raw_input() {
        let ctrl = idle_controller("block-1");

        let err = ctrl
            .send_input(BlockInputUnion::data(b"hello".to_vec()), None)
            .expect_err("raw input must be rejected");
        assert!(err.contains("AgentInputCommand"));
    }

    #[test]
    fn test_subprocess_controller_start_is_noop() {
        let ctrl = idle_controller("block-1");

        assert!(ctrl.start(HashMap::new(), None, false).is_ok());

        // start() must not kick anything off: the status is still INIT.
        assert_eq!(ctrl.get_runtime_status().shellprocstatus, STATUS_INIT);
    }

    #[test]
    fn test_subprocess_controller_stop_when_idle() {
        let ctrl = idle_controller("block-1");

        assert!(ctrl.stop(true, STATUS_DONE).is_ok());
        assert_eq!(ctrl.get_runtime_status().shellprocstatus, STATUS_DONE);
    }

    #[test]
    fn test_subprocess_controller_session_id_initially_none() {
        let ctrl = idle_controller("block-1");
        assert!(ctrl.session_id().is_none());
    }

    #[test]
    fn test_subprocess_controller_concurrent_spawn_blocked() {
        let ctrl = idle_controller("block-1");

        // Pretend a turn is already running.
        mark_busy(&ctrl);

        let config = SubprocessSpawnConfig {
            cli_command: "echo".to_string(),
            cli_args: vec![],
            working_dir: String::new(),
            env_vars: HashMap::new(),
            message: "test".to_string(),
            resume_flag: String::new(),
            session_id_field: "session_id".to_string(),
            message_id: None,
            session_id: None,
        };

        // A busy controller queues the turn rather than rejecting it.
        assert!(ctrl.spawn_turn(config).is_ok());

        // The message must now be sitting in the pending queue.
        {
            let state = ctrl.inner.lock().unwrap();
            assert_eq!(state.pending_messages.len(), 1);
            assert_eq!(state.pending_messages[0].message, "test");
        }

        // Hand the run lock back.
        ctrl.run_lock.store(false, Ordering::SeqCst);
    }

    #[test]
    fn hydrate_session_id_populates_inner_when_none() {
        // Regression test for the 2026-05-24 report "clicking My Agents
        // re-inserts the startup context".
        //
        // The reattached block gets a fresh SubprocessController whose
        // inner.session_id starts out as None. The picker reattach flow
        // persists the prior block's session id into `agent:sessionid`
        // meta; the caller plumbs it into
        // `SubprocessSpawnConfig::session_id`; and spawn_turn calls
        // `hydrate_session_id_from_config` before building args. Once
        // hydrated, the existing args-builder appends `--resume <sid>` on
        // this very first turn, so no startup context is re-injected.
        let ctrl = idle_controller("block-reattach");
        assert!(ctrl.inner.lock().unwrap().session_id.is_none());

        ctrl.hydrate_session_id_from_config(Some("prior-sid-from-meta"));

        assert_eq!(
            ctrl.inner.lock().unwrap().session_id.as_deref(),
            Some("prior-sid-from-meta")
        );
    }

    #[test]
    fn hydrate_session_id_is_noop_when_value_already_present() {
        // Hydration is best-effort rather than authoritative: it fills
        // `inner.session_id` only while it is None. This is not about
        // captured-id-wins (that is enforced at CAPTURE time, see the
        // tests below); it simply avoids re-hydrating on every spawn_turn
        // call within a controller lifetime. A stale value surviving here
        // is acceptable because the next CLI emit handled by
        // `record_captured_session_id_inner` overwrites it.
        let ctrl = idle_controller("block-resume");
        ctrl.inner.lock().unwrap().session_id = Some("captured-sid".to_string());

        ctrl.hydrate_session_id_from_config(Some("different-config-sid"));

        assert_eq!(
            ctrl.inner.lock().unwrap().session_id.as_deref(),
            Some("captured-sid"),
            "hydration must not overwrite an existing value"
        );
    }

    #[test]
    fn record_captured_overwrites_hydrated_value() {
        // Once the CLI speaks, it is the authority on the session id.
        //
        // Background (Codex P1 on PR #1018, first cut): an
        // `if !already_captured` guard in the stdout reader let a hydrated
        // (possibly stale) session id lock out every later CLI-emitted
        // value, so a wrong `--resume <stale>` would have been passed
        // forever. The fix, `record_captured_session_id_inner`, always
        // overwrites and reports whether the value changed.
        let ctrl = idle_controller("block-overwrite");

        ctrl.hydrate_session_id_from_config(Some("stale-hydrated-sid"));
        assert_eq!(ctrl.session_id().as_deref(), Some("stale-hydrated-sid"));

        let changed = ctrl.record_captured_session_id("authoritative-sid");

        assert!(changed, "value differs from hydrated; must report changed");
        assert_eq!(
            ctrl.session_id().as_deref(),
            Some("authoritative-sid"),
            "CLI-emitted id must overwrite hydrated value"
        );
    }

    #[test]
    fn record_captured_dedups_same_value() {
        // Real CLI streams carry `session_id` on every NDJSON frame, not
        // only the first one. De-duplication is a perf knob (it skips the
        // meta-update broadcast on repeats), not a correctness gate: the
        // captured id is still authoritative on its first emit.
        let ctrl = idle_controller("block-dedup");

        let first = ctrl.record_captured_session_id("sid-1");
        assert!(first);

        let repeat = ctrl.record_captured_session_id("sid-1");
        assert!(
            !repeat,
            "second call with same value must return false (no broadcast)"
        );

        assert_eq!(ctrl.session_id().as_deref(), Some("sid-1"));
    }

    #[test]
    fn record_captured_ignores_empty() {
        // Defensive: an empty string from a malformed CLI emit must not
        // wipe out a valid earlier value.
        let ctrl = idle_controller("block-empty");
        ctrl.record_captured_session_id("real-sid");

        let accepted = ctrl.record_captured_session_id("");
        assert!(!accepted, "empty CLI emit must be ignored");

        assert_eq!(ctrl.session_id().as_deref(), Some("real-sid"));
    }

    #[test]
    fn hydrate_session_id_ignores_empty_and_none() {
        // Greenfield launches pass `None` (or `Some("")` when the caller
        // did not filter). Hydration must do nothing in both cases, so
        // inner.session_id stays None until the CLI captures its own id.
        let ctrl = idle_controller("block-greenfield");

        ctrl.hydrate_session_id_from_config(None);
        assert!(ctrl.inner.lock().unwrap().session_id.is_none());

        ctrl.hydrate_session_id_from_config(Some(""));
        assert!(ctrl.inner.lock().unwrap().session_id.is_none());
    }

    #[test]
    fn spawn_turn_preserves_session_id_in_queued_config() {
        // A busy controller queues the config for the drain-from-queue
        // path. Hydration runs ONLY on the direct-spawn path (after
        // try_lock_run), so the queued config has to carry session_id
        // through untouched for the drain path's recursive call to see it.
        let ctrl = idle_controller("block-queued");
        mark_busy(&ctrl);

        let config = SubprocessSpawnConfig {
            cli_command: "claude".to_string(),
            cli_args: vec!["-p".to_string()],
            working_dir: String::new(),
            env_vars: HashMap::new(),
            message: "hi".to_string(),
            resume_flag: "--resume".to_string(),
            session_id_field: "session_id".to_string(),
            message_id: None,
            session_id: Some("prior-sid".to_string()),
        };
        let _ = ctrl.spawn_turn(config);

        let state = ctrl.inner.lock().unwrap();
        assert_eq!(state.pending_messages.len(), 1);
        assert_eq!(
            state.pending_messages[0].session_id.as_deref(),
            Some("prior-sid"),
        );
        // The busy lock bypassed the direct-spawn path, so hydration has
        // not happened yet; the drain hydrates when it dequeues.
        assert!(state.session_id.is_none());
    }
}