HostPipe

Struct HostPipe 

Source
pub struct HostPipe {
    inner: Arc<Mutex<HostPipeInner>>,
    events_tx: Sender<Event>,
    state: Arc<Mutex<State>>,
}
Expand description

Public wrapper around the launcher → host pipe.

Cheap to clone (one Arc); shared by the IPC server’s per- connection task (set/clear writer) and the saga coordinator (send_command, CPD-3) and the per-host fanout task (send_event, refactored in this PR).

Manual Debug (rather than derive) so we don’t have to require Debug on the boxed writer trait object.

Fields§

§inner: Arc<Mutex<HostPipeInner>>§events_tx: Sender<Event>

Broadcast bus reference — used to emit Event::SagaFailed when a buffered Command is dropped (overflow or 30s timeout).

§state: Arc<Mutex<State>>

Reference to the launcher’s reducer state for bump_version when emitting saga lifecycle events. Mirrors the saga coordinator’s pattern so emitted events get a fresh, monotonic global version.

Implementations§

Source§

impl HostPipe

Source

pub fn new(events_tx: Sender<Event>, state: Arc<Mutex<State>>) -> Self

Source

pub async fn set_writer( &self, writer: Arc<Mutex<Box<dyn AsyncWrite + Unpin + Send>>>, ) -> u64

Hand the host’s writer half to the pipe. Called by the IPC server’s per-connection task once the connecting client registers as ClientKind::Host. Drains any pending frames in FIFO order before returning.

If a writer is already set (a prior host registered and this is a second), the prior writer is replaced. The drain logic still runs against the new writer.

Locking: the inner mutex is held across the per-frame write_all().await. That’s intentional — concurrent send_frame calls during drain would race FIFO ordering against the freshly-installed writer. Holding the lock keeps the drain-then-install transition atomic from the caller’s perspective. Hold time is bounded by pending_buffer size (cap 64) × per-frame write latency (microseconds on a healthy pipe), well under the existing state.lock() discipline. Returns the new host_session_id. The caller (IPC server’s host-connection setup) MUST capture this and pass it to the per-connection fanout task; the fanout task uses it as a stale-writer guard via send_event_for_session.

Source

pub async fn clear_writer(&self)

Mark the host as disconnected. Arms the 30s timer; subsequent send_command calls buffer up to PENDING_BUFFER_CAP.

Source

pub async fn cancel_saga(&self, saga_id: u64) -> usize

Remove every buffered frame whose saga_id matches saga_id. Returns the count of removed frames.

Called from SagaCoordinator::claim_terminal when a saga terminates (Done / Failed / timeout / eviction / etc.) — the terminal event closes the saga bracket, so any frames the saga left in the pending buffer would, if drained on reconnect, produce orphan side effects after the bracket is already closed (and risk a second SagaFailed via the 30s expiry path’s emit_drop_failure). Purging here keeps the saga’s terminal slot atomic across both the bus and the wire. (codex + reagent P1 PR #644 round 7.)

Source

async fn drain_pending_to( &self, writer: &Arc<Mutex<Box<dyn AsyncWrite + Unpin + Send>>>, )

Drain pending_buffer through the given writer in FIFO order. Used by send_frame’s ptr_eq-mismatch path to flush a frame that landed in the buffer after a writer-replacement race (codex P1 PR #642 round 4). Symmetric with set_writer’s drain logic but operates on an existing writer rather than installing a new one. On per-frame failure, re-buffers the remaining frames at the front and arms the disconnect timer.

Source

pub async fn send_command(&self, cmd: &Command) -> Result<(), HostPipeError>

Push a Command down to the host. On disconnect, buffers up to PENDING_BUFFER_CAP; on overflow or 30s timeout, drops + emits Event::SagaFailed for the dropped Command’s saga.

CPD-3 — called from the saga coordinator’s apply_action when dispatching IssueCmd::Host actions.

Source

pub async fn send_event_for_session( &self, session_id: u64, event: &Event, ) -> Result<(), HostPipeError>

Push an Event down to the host. Called by the per-connection fanout task in ipc::server for the host’s connection (replaces the prior direct send_event call). Other client kinds keep the direct path because they’re not subject to HostPipe’s pending-buffer / reconnect semantics.

session_id is the value captured from set_writer when the host registered. If the current writer’s session_id no longer matches (because a new host registered and replaced the writer), this returns Err(HostPipeError::StaleSession) so the fanout task exits cleanly without writing to the new host. (codex P1 PR #642 round 2.)

Wire format: writes RAW Event JSON (no HostFrame envelope). The host’s existing parser (agentmux-cef/src/launcher_ipc.rs) expects raw Event lines; CPD-3 will update host to handle HostFrame and flip events to envelope form. Until then events stay raw to preserve compat. Commands (sent via send_command) already use HostFrame::Command because they’re new and not yet read by host. (codex P1 PR #642 round 3.)

Atomicity: session check + writer fetch happen under a single inner-lock acquisition to close the session-bypass TOCTOU window. (codex P1 + reagent P1 PR #642 round 3.)

No pending buffer for events. If the host is disconnected, events are dropped silently. They’re broadcast-bus-driven — stale events post-reconnect would be incorrect anyway, and the renderer/srv subscribers still get them. The pending buffer is reserved for send_command (saga-issued, duty-cycle critical).

Source

async fn send_frame( &self, frame: HostFrame, saga_id: Option<u64>, ) -> Result<(), HostPipeError>

Source

async fn expire_pending_if_timed_out(&self)

If the disconnect timer has elapsed, flush the pending buffer and emit Event::SagaFailed for every buffered Command.

Source

async fn emit_drop_failure(&self, dropped: PendingFrame, reason: &str)

Trait Implementations§

Source§

impl Clone for HostPipe

Source§

fn clone(&self) -> HostPipe

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for HostPipe

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more