pub struct SubprocessController {
    tab_id: String,
    block_id: String,
    run_lock: Arc<AtomicBool>,
    inner: Arc<Mutex<SubprocessControllerInner>>,
    broker: Option<Arc<Broker>>,
    event_bus: Option<Arc<EventBus>>,
    wstore: Option<Arc<WaveStore>>,
    filestore: Option<Arc<FileStore>>,
    health_monitor: Arc<HealthMonitor>,
    self_ref: Mutex<Option<Weak<Self>>>,
}
SubprocessController manages per-turn subprocess lifecycle for agent blocks.
Unlike ShellController which maintains a long-running PTY process,
SubprocessController spawns a fresh process for each user turn.
Multi-turn continuity comes from --resume <session-id>.
Fields
tab_id: String
Parent tab UUID.
block_id: String
Block UUID.
run_lock: Arc<AtomicBool>
Prevents concurrent spawns.
inner: Arc<Mutex<SubprocessControllerInner>>
Protected inner state.
broker: Option<Arc<Broker>>
WPS broker for publishing events (blockfile, controllerstatus).
event_bus: Option<Arc<EventBus>>
Event bus for obj:update broadcasts.
wstore: Option<Arc<WaveStore>>
Wave object store for block metadata persistence.
filestore: Option<Arc<FileStore>>
FileStore for write-through persistence of output lines (Phase 1.3).
health_monitor: Arc<HealthMonitor>
Agent health monitor (output activity + error tracking).
self_ref: Mutex<Option<Weak<Self>>>
Weak self-reference for queue drain. Set by set_self_ref after
the controller is wrapped in Arc.
Implementations
impl SubprocessController
pub fn new(
    tab_id: String,
    block_id: String,
    broker: Option<Arc<Broker>>,
    event_bus: Option<Arc<EventBus>>,
    wstore: Option<Arc<WaveStore>>,
    filestore: Option<Arc<FileStore>>,
) -> Self
Create a new SubprocessController.
pub fn set_self_ref(self: &Arc<Self>)
Store a weak self-reference so the process_waiter can drain queued messages by calling spawn_turn after the current turn exits. Must be called after wrapping in Arc.
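The weak self-reference pattern described here can be sketched as follows. This is a minimal illustration, not the crate's implementation: the `Controller` type and the `upgrade_self` helper are hypothetical names, and only the `self_ref` field and the `set_self_ref` signature come from the documentation above.

```rust
use std::sync::{Arc, Mutex, Weak};

/// Hypothetical, trimmed-down controller that holds only the self-reference.
pub struct Controller {
    self_ref: Mutex<Option<Weak<Controller>>>,
}

impl Controller {
    pub fn new() -> Self {
        Self { self_ref: Mutex::new(None) }
    }

    /// Callable only on an Arc-wrapped value; stores a non-owning pointer,
    /// so the controller does not keep itself alive.
    pub fn set_self_ref(self: &Arc<Self>) {
        *self.self_ref.lock().unwrap() = Some(Arc::downgrade(self));
    }

    /// Hypothetical helper: what a background waiter might do to recover a
    /// strong reference, if the controller has not been dropped yet.
    pub fn upgrade_self(&self) -> Option<Arc<Controller>> {
        self.self_ref.lock().unwrap().as_ref().and_then(Weak::upgrade)
    }
}
```

Storing a `Weak` rather than an `Arc` avoids a reference cycle: a task that fails to upgrade can simply skip the drain because the controller is already gone.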
fn try_lock_run(&self) -> bool
Try to acquire the run lock. Returns false if a turn is already in progress.
fn unlock_run(&self)
Release the run lock.
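One plausible way to build such a run lock on an `Arc<AtomicBool>` is a compare-and-exchange on acquire and a plain store on release. The sketch below assumes that approach; the `RunLock` wrapper type is hypothetical and the actual memory orderings used by the crate are not documented here.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the controller's `run_lock` field.
pub struct RunLock {
    flag: Arc<AtomicBool>,
}

impl RunLock {
    pub fn new() -> Self {
        Self { flag: Arc::new(AtomicBool::new(false)) }
    }

    /// Returns true only for the caller that flips the flag from false to
    /// true; every other caller sees a turn already in progress.
    pub fn try_lock_run(&self) -> bool {
        self.flag
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Clears the flag so the next turn can start.
    pub fn unlock_run(&self) {
        self.flag.store(false, Ordering::Release);
    }
}
```

Unlike a `Mutex` guard, this flag is not released automatically, so every exit path of a turn has to call `unlock_run`.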
fn set_status(inner: &mut SubprocessControllerInner, status: &str)
Update process status and increment version (must hold inner lock).
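Taking `&mut SubprocessControllerInner` instead of `&self` is what enforces the "must hold inner lock" rule at compile time: the only way to obtain that mutable reference is through the mutex guard. A rough sketch, in which the `StatusState` type, its field names, and the status strings are all assumptions made for illustration:

```rust
use std::sync::Mutex;

/// Hypothetical subset of the inner state; field names are assumptions.
pub struct StatusState {
    pub status: String,
    pub version: u64,
}

/// The caller must already hold the guard to produce `&mut StatusState`.
pub fn set_status(inner: &mut StatusState, status: &str) {
    inner.status = status.to_string();
    // Bump the version so observers can tell two snapshots apart.
    inner.version += 1;
}

/// Hypothetical caller: lock once, then update through the guard.
pub fn mark_running(state: &Mutex<StatusState>) {
    let mut guard = state.lock().unwrap();
    set_status(&mut guard, "running");
}
```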
fn get_status_snapshot(&self) -> BlockControllerRuntimeStatus
Get the runtime status (snapshot).
fn publish_status(&self)
Publish current controller status via the WPS broker.
fn emit_message_accepted(&self, config: &SubprocessSpawnConfig)
Emit agent-message-accepted for the given config, if it carries
a message_id. Called from both spawn_turn (direct path) and
the process_waiter drain site (queue path). No-op if the config
has no id, or the broker isn’t configured.
pub fn session_id(&self) -> Option<String>
Get the stored session ID (if any).
pub(crate) fn record_captured_session_id_inner(
    inner: &Mutex<SubprocessControllerInner>,
    sid: &str,
) -> bool
Record an authoritative session id captured from the CLI’s
stdout init/thread.started event. The CLI is the source of
truth for which session is live, so this ALWAYS overwrites
any prior value of inner.session_id — including values
previously hydrated from config on a picker reattach (which
may be stale by the time the CLI speaks).
Free-function form (taking &Arc<Mutex<…Inner>> instead of
&self) so the spawn_turn stdout-reader tokio task can call
it without holding an Arc<Self> reference. The
&SubprocessController method below just delegates.
Returns true when the value changed (caller should
broadcast the meta update + persist to block meta). Returns
false when the new id matches the current one — common
when the CLI emits the same session_id on every NDJSON
frame within a single turn.
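The overwrite-and-report-change contract can be sketched as below. This is an illustration under stated assumptions: `Inner` is a hypothetical one-field stand-in for `SubprocessControllerInner`, and the function name is shortened to make clear it is not the crate's code.

```rust
use std::sync::Mutex;

/// Hypothetical subset of the inner state.
#[derive(Default)]
pub struct Inner {
    pub session_id: Option<String>,
}

/// Overwrites unconditionally and reports whether the stored value changed.
pub fn record_captured_session_id(inner: &Mutex<Inner>, sid: &str) -> bool {
    let mut guard = inner.lock().unwrap();
    if guard.session_id.as_deref() == Some(sid) {
        // Same id repeated on a later frame: nothing to broadcast or persist.
        return false;
    }
    guard.session_id = Some(sid.to_string());
    true
}
```

Because the function needs only a reference to the mutex, a spawned reader task can hold a clone of the `Arc<Mutex<…>>` and call it without a handle to the controller itself.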
pub(crate) fn hydrate_session_id_from_config(&self, config_sid: Option<&str>)
Hydrate inner.session_id from a config-supplied id when the
controller hasn’t seen a value yet.
Picker reattach path: a fresh SubprocessController is
registered for the new block, so its inner.session_id is
None. The frontend persisted the prior block’s session id
into agent:sessionid meta, the websocket / app_api caller
read it into SubprocessSpawnConfig::session_id, and this
method copies it to inner so the spawn_turn args-builder
appends --resume <sid> on the FIRST turn.
Hydration is best-effort, not authoritative. If
inner.session_id is already Some we no-op (don’t overwrite
a value already in place — could be a captured-from-stdout
id from an earlier turn, or a prior hydration on the same
reattach). Critically, the CLI’s stdout-emitted session id
is authoritative and overwrites any prior value at capture
time (see the stdout-reader block in spawn_turn). So if the
hydrated value is stale, the FIRST turn passes the stale id
via --resume (likely accepted as a no-op or rejected with a
“no such session” error from the CLI), the CLI then emits its
own session id in the init event, and inner.session_id is
overwritten with that authoritative value for subsequent
turns. Without the capture overwrite, a stale hydrated id
would be re-used forever; that was the bug Codex flagged on
the first cut of PR #1018.
Empty &str is treated as “no value” so the caller can use
it unconditionally without filtering.
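The hydration rules above (fill only when empty, treat an empty string as absent) might look like the following sketch. `SessionState` and the free-function form are hypothetical; the real method takes `&self`.

```rust
use std::sync::Mutex;

/// Hypothetical subset of the inner state.
#[derive(Default)]
pub struct SessionState {
    pub session_id: Option<String>,
}

/// Best-effort hydration: never overwrites an id that is already present.
pub fn hydrate_session_id(inner: &Mutex<SessionState>, config_sid: Option<&str>) {
    // An empty string counts as "no value", so callers need not pre-filter.
    let Some(sid) = config_sid.filter(|s| !s.is_empty()) else {
        return;
    };
    let mut guard = inner.lock().unwrap();
    if guard.session_id.is_none() {
        guard.session_id = Some(sid.to_string());
    }
}
```

The asymmetry with the capture path is deliberate: hydration only fills a gap, while an id captured from stdout always wins, which lets a stale hydrated id be corrected after the first turn.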
pub fn spawn_turn(&self, config: SubprocessSpawnConfig) -> Result<(), String>
Spawn a single turn of the agent CLI.
This is the core method — it spawns claude -p, writes the user message to stdin,
reads NDJSON from stdout (publishing WPS events), and waits for exit.
If a session_id exists from a previous turn, --resume <sid> is appended to args.
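The resume rule in the last sentence can be illustrated with a small args-builder. `build_turn_args` is a hypothetical helper, and the sketch includes only the two flags named in this documentation (`-p` and `--resume`); the real argument list presumably carries more.

```rust
/// Hypothetical helper: builds the argument list for one turn.
pub fn build_turn_args(session_id: Option<&str>) -> Vec<String> {
    // `-p` is the only base flag mentioned in the docs above.
    let mut args = vec!["-p".to_string()];
    if let Some(sid) = session_id {
        // Continue the previous conversation instead of starting a new one.
        args.push("--resume".to_string());
        args.push(sid.to_string());
    }
    args
}
```

On a first turn with no stored or hydrated id, the flag is omitted and the CLI is left to report a session id of its own on stdout.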
Trait Implementations
impl Controller for SubprocessController
fn start(
    &self,
    _block_meta: HashMap<String, Value>,
    _rt_opts: Option<Value>,
    _force: bool,
) -> Result<(), String>
force restarts even if already running.
fn stop(&self, _graceful: bool, new_status: &str) -> Result<(), String>
graceful waits for process to exit; new_status is the target state.
fn get_runtime_status(&self) -> BlockControllerRuntimeStatus
fn send_input(
    &self,
    input: BlockInputUnion,
    _seq: Option<u64>,
) -> Result<(), String>
seq is the per-TermViewModel monotonic counter; None means fire-and-forget (no ordering).
fn controller_type(&self) -> &str
Auto Trait Implementations
impl !Freeze for SubprocessController
impl RefUnwindSafe for SubprocessController
impl Send for SubprocessController
impl Sync for SubprocessController
impl Unpin for SubprocessController
impl UnwindSafe for SubprocessController
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Downcast for T
where
    T: Any,
fn into_any(self: Box<T>) -> Box<dyn Any>
Convert Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>. Box<dyn Any> can
then be further downcast into Box<ConcreteType> where ConcreteType implements Trait.
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Convert Rc<Trait> (where Trait: Downcast) to Rc<Any>. Rc<Any> can then be
further downcast into Rc<ConcreteType> where ConcreteType implements Trait.
fn as_any(&self) -> &(dyn Any + 'static)
Convert &Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot
generate &Any’s vtable from &Trait’s.
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
Convert &mut Trait (where Trait: Downcast) to &mut Any. This is needed since Rust cannot
generate &mut Any’s vtable from &mut Trait’s.