SubprocessController

Struct SubprocessController 

Source
pub struct SubprocessController {
    tab_id: String,
    block_id: String,
    run_lock: Arc<AtomicBool>,
    inner: Arc<Mutex<SubprocessControllerInner>>,
    broker: Option<Arc<Broker>>,
    event_bus: Option<Arc<EventBus>>,
    wstore: Option<Arc<WaveStore>>,
    filestore: Option<Arc<FileStore>>,
    health_monitor: Arc<HealthMonitor>,
    self_ref: Mutex<Option<Weak<Self>>>,
}
Expand description

SubprocessController manages per-turn subprocess lifecycle for agent blocks.

Unlike ShellController which maintains a long-running PTY process, SubprocessController spawns a fresh process for each user turn. Multi-turn continuity comes from --resume <session-id>.

Fields§

§tab_id: String

Parent tab UUID.

§block_id: String

Block UUID.

§run_lock: Arc<AtomicBool>

Prevents concurrent spawns.

§inner: Arc<Mutex<SubprocessControllerInner>>

Protected inner state.

§broker: Option<Arc<Broker>>

WPS broker for publishing events (blockfile, controllerstatus).

§event_bus: Option<Arc<EventBus>>

Event bus for obj:update broadcasts.

§wstore: Option<Arc<WaveStore>>

Wave object store for block metadata persistence.

§filestore: Option<Arc<FileStore>>

FileStore for write-through persistence of output lines (Phase 1.3).

§health_monitor: Arc<HealthMonitor>

Agent health monitor (output activity + error tracking).

§self_ref: Mutex<Option<Weak<Self>>>

Weak self-reference for queue drain. Set by set_self_ref after the controller is wrapped in Arc.

Implementations§

Source§

impl SubprocessController

Source

pub fn new( tab_id: String, block_id: String, broker: Option<Arc<Broker>>, event_bus: Option<Arc<EventBus>>, wstore: Option<Arc<WaveStore>>, filestore: Option<Arc<FileStore>>, ) -> Self

Create a new SubprocessController.

Source

pub fn set_self_ref(self: &Arc<Self>)

Store a weak self-reference so the process_waiter can drain queued messages by calling spawn_turn after the current turn exits. Must be called after wrapping in Arc.

Source

fn try_lock_run(&self) -> bool

Try to acquire the run lock. Returns false if a turn is already in progress.

Source

fn unlock_run(&self)

Release the run lock.

Source

fn set_status(inner: &mut SubprocessControllerInner, status: &str)

Update process status and increment version (must hold inner lock).

Source

fn get_status_snapshot(&self) -> BlockControllerRuntimeStatus

Get the runtime status (snapshot).

Source

fn publish_status(&self)

Publish current controller status via the WPS broker.

Source

fn emit_message_accepted(&self, config: &SubprocessSpawnConfig)

Emit agent-message-accepted for the given config, if it carries a message_id. Called from both spawn_turn (direct path) and the process_waiter drain site (queue path). No-op if the config has no id, or the broker isn’t configured.

Source

pub fn session_id(&self) -> Option<String>

Get the stored session ID (if any).

Source

pub(crate) fn record_captured_session_id_inner( inner: &Mutex<SubprocessControllerInner>, sid: &str, ) -> bool

Record an authoritative session id captured from the CLI’s stdout init/thread.started event. The CLI is the source of truth for which session is live, so this ALWAYS overwrites any prior value of inner.session_id — including values previously hydrated from config on a picker reattach (which may be stale by the time the CLI speaks).

Free-function form (taking &Arc<Mutex<…Inner>> instead of &self) so the spawn_turn stdout-reader tokio task can call it without holding an Arc<Self> reference. The &SubprocessController method below just delegates.

Returns true when the value changed (caller should broadcast the meta update + persist to block meta). Returns false when the new id matches the current one — common when the CLI emits the same session_id on every NDJSON frame within a single turn.

Source

pub(crate) fn hydrate_session_id_from_config(&self, config_sid: Option<&str>)

Hydrate inner.session_id from a config-supplied id when the controller hasn’t seen a value yet.

Picker reattach path: a fresh SubprocessController is registered for the new block, so its inner.session_id is None. The frontend persisted the prior block’s session id into agent:sessionid meta, the websocket / app_api caller read it into SubprocessSpawnConfig::session_id, and this method copies it to inner so the spawn_turn args-builder appends --resume <sid> on the FIRST turn.

Hydration is best-effort, not authoritative. If inner.session_id is already Some we no-op (don’t overwrite a value already in place — could be a captured-from-stdout id from an earlier turn, or a prior hydration on the same reattach). Critically, the CLI’s stdout-emitted session id is authoritative and overwrites any prior value at capture time (see the stdout-reader block in spawn_turn). So if the hydrated value is stale, the FIRST turn passes the stale id via --resume (likely accepted as a no-op or rejected with a “no such session” error from the CLI), the CLI then emits its own session id in the init event, and inner.session_id is overwritten with that authoritative value for subsequent turns. Without the capture overwrite, a stale hydrated id would be re-used forever — that was the bug codex flagged on PR #1018 first cut.

Empty &str is treated as “no value” so the caller can use it unconditionally without filtering.

Source

pub fn spawn_turn(&self, config: SubprocessSpawnConfig) -> Result<(), String>

Spawn a single turn of the agent CLI.

This is the core method — it spawns claude -p, writes the user message to stdin, reads NDJSON from stdout (publishing WPS events), and waits for exit.

If a session_id exists from a previous turn, --resume <sid> is appended to args.

Source

pub fn stop_subprocess(&self, force: bool) -> Result<(), String>

Stop the currently running subprocess.

Trait Implementations§

Source§

impl Controller for SubprocessController

Source§

fn start( &self, _block_meta: HashMap<String, Value>, _rt_opts: Option<Value>, _force: bool, ) -> Result<(), String>

Start the controller. May spawn background tasks. force restarts even if already running.
Source§

fn stop(&self, _graceful: bool, new_status: &str) -> Result<(), String>

Stop the controller. graceful waits for process to exit; new_status is the target state.
Source§

fn get_runtime_status(&self) -> BlockControllerRuntimeStatus

Get the current runtime status.
Source§

fn send_input( &self, input: BlockInputUnion, _seq: Option<u64>, ) -> Result<(), String>

Send input (terminal data, signal, or resize) to the controller. seq is the per-TermViewModel monotonic counter; None means fire-and-forget (no ordering).
Source§

fn controller_type(&self) -> &str

Get the controller type (e.g., “shell”, “cmd”).
Source§

fn block_id(&self) -> &str

Get the block ID.
Source§

fn as_any(&self) -> &dyn Any

Downcast support for concrete controller types.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
§

impl<T> Downcast for T
where T: Any,

§

fn into_any(self: Box<T>) -> Box<dyn Any>

Convert Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>. Box<dyn Any> can then be further downcast into Box<ConcreteType> where ConcreteType implements Trait.
§

fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>

Convert Rc<Trait> (where Trait: Downcast) to Rc<Any>. Rc<Any> can then be further downcast into Rc<ConcreteType> where ConcreteType implements Trait.
§

fn as_any(&self) -> &(dyn Any + 'static)

Convert &Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot generate &Any’s vtable from &Trait’s.
§

fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)

Convert &mut Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot generate &mut Any’s vtable from &mut Trait’s.
§

impl<T> DowncastSync for T
where T: Any + Send + Sync,

§

fn into_any_arc(self: Arc<T>) -> Arc<dyn Any + Sync + Send>

Convert Arc<Trait> (where Trait: Downcast) to Arc<Any>. Arc<Any> can then be further downcast into Arc<ConcreteType> where ConcreteType implements Trait.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

§

impl<T> PolicyExt for T
where T: ?Sized,

§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns [Action::Follow] only if self and other return Action::Follow. Read more
§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns [Action::Follow] if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

impl<A, B, T> HttpServerConnExec<A, B> for T
where B: Body,