pub struct HostPipe {
inner: Arc<Mutex<HostPipeInner>>,
events_tx: Sender<Event>,
state: Arc<Mutex<State>>,
}Expand description
Public wrapper around the launcher → host pipe.
Cheap to clone (one Arc); shared by the IPC server’s per-
connection task (set/clear writer) and the saga coordinator
(send_command, CPD-3) and the per-host fanout task (send_event,
refactored in this PR).
Manual Debug (rather than derive) so we don’t have to require
Debug on the boxed writer trait object.
Fields§
§inner: Arc<Mutex<HostPipeInner>>§events_tx: Sender<Event>Broadcast bus reference — used to emit Event::SagaFailed
when a buffered Command is dropped (overflow or 30s timeout).
state: Arc<Mutex<State>>Reference to the launcher’s reducer state for bump_version
when emitting saga lifecycle events. Mirrors the saga
coordinator’s pattern so emitted events get a fresh,
monotonic global version.
Implementations§
Source§impl HostPipe
impl HostPipe
pub fn new(events_tx: Sender<Event>, state: Arc<Mutex<State>>) -> Self
Sourcepub async fn set_writer(
&self,
writer: Arc<Mutex<Box<dyn AsyncWrite + Unpin + Send>>>,
) -> u64
pub async fn set_writer( &self, writer: Arc<Mutex<Box<dyn AsyncWrite + Unpin + Send>>>, ) -> u64
Hand the host’s writer half to the pipe. Called by the IPC
server’s per-connection task once the connecting client
registers as ClientKind::Host. Drains any pending frames
in FIFO order before returning.
If a writer is already set (a prior host registered and this is a second), the prior writer is replaced. The drain logic still runs against the new writer.
Locking: the inner mutex is held across the per-frame
write_all().await. That’s intentional — concurrent
send_frame calls during drain would race FIFO ordering
against the freshly-installed writer. Holding the lock keeps
the drain-then-install transition atomic from the caller’s
perspective. Hold time is bounded by pending_buffer size
(cap 64) × per-frame write latency (microseconds on a healthy
pipe), well under the existing state.lock() discipline.
Returns the new host_session_id. The caller (IPC server’s
host-connection setup) MUST capture this and pass it to the
per-connection fanout task; the fanout task uses it as a
stale-writer guard via send_event_for_session.
Sourcepub async fn clear_writer(&self)
pub async fn clear_writer(&self)
Mark the host as disconnected. Arms the 30s timer; subsequent
send_command calls buffer up to PENDING_BUFFER_CAP.
Sourcepub async fn cancel_saga(&self, saga_id: u64) -> usize
pub async fn cancel_saga(&self, saga_id: u64) -> usize
Remove every buffered frame whose saga_id matches saga_id.
Returns the count of removed frames.
Called from SagaCoordinator::claim_terminal when a saga
terminates (Done / Failed / timeout / eviction / etc.) — the
terminal event closes the saga bracket, so any frames the
saga left in the pending buffer would, if drained on reconnect,
produce orphan side effects after the bracket is already
closed (and risk a second SagaFailed via the 30s expiry
path’s emit_drop_failure). Purging here keeps the saga’s
terminal slot atomic across both the bus and the wire.
(codex + reagent P1 PR #644 round 7.)
Sourceasync fn drain_pending_to(
&self,
writer: &Arc<Mutex<Box<dyn AsyncWrite + Unpin + Send>>>,
)
async fn drain_pending_to( &self, writer: &Arc<Mutex<Box<dyn AsyncWrite + Unpin + Send>>>, )
Drain pending_buffer through the given writer in FIFO order.
Used by send_frame’s ptr_eq-mismatch path to flush a frame
that landed in the buffer after a writer-replacement race
(codex P1 PR #642 round 4). Symmetric with set_writer’s
drain logic but operates on an existing writer rather than
installing a new one. On per-frame failure, re-buffers the
remaining frames at the front and arms the disconnect timer.
Sourcepub async fn send_command(&self, cmd: &Command) -> Result<(), HostPipeError>
pub async fn send_command(&self, cmd: &Command) -> Result<(), HostPipeError>
Push a Command down to the host. On disconnect, buffers up to
PENDING_BUFFER_CAP; on overflow or 30s timeout, drops + emits
Event::SagaFailed for the dropped Command’s saga.
CPD-3 — called from the saga coordinator’s apply_action when
dispatching IssueCmd::Host actions.
Sourcepub async fn send_event_for_session(
&self,
session_id: u64,
event: &Event,
) -> Result<(), HostPipeError>
pub async fn send_event_for_session( &self, session_id: u64, event: &Event, ) -> Result<(), HostPipeError>
Push an Event down to the host. Called by the per-connection
fanout task in ipc::server for the host’s connection (replaces
the prior direct send_event call). Other client kinds keep
the direct path because they’re not subject to HostPipe’s
pending-buffer / reconnect semantics.
session_id is the value captured from set_writer when the
host registered. If the current writer’s session_id no longer
matches (because a new host registered and replaced the
writer), this returns Err(HostPipeError::StaleSession) so the
fanout task exits cleanly without writing to the new host.
(codex P1 PR #642 round 2.)
Wire format: writes RAW Event JSON (no HostFrame envelope).
The host’s existing parser (agentmux-cef/src/launcher_ipc.rs)
expects raw Event lines; CPD-3 will update host to handle
HostFrame and flip events to envelope form. Until then events
stay raw to preserve compat. Commands (sent via send_command)
already use HostFrame::Command because they’re new and not
yet read by host. (codex P1 PR #642 round 3.)
Atomicity: session check + writer fetch happen under a single inner-lock acquisition to close the session-bypass TOCTOU window. (codex P1 + reagent P1 PR #642 round 3.)
No pending buffer for events. If the host is disconnected,
events are dropped silently. They’re broadcast-bus-driven —
stale events post-reconnect would be incorrect anyway, and the
renderer/srv subscribers still get them. The pending buffer is
reserved for send_command (saga-issued, duty-cycle critical).
async fn send_frame( &self, frame: HostFrame, saga_id: Option<u64>, ) -> Result<(), HostPipeError>
Sourceasync fn expire_pending_if_timed_out(&self)
async fn expire_pending_if_timed_out(&self)
If the disconnect timer has elapsed, flush the pending buffer
and emit Event::SagaFailed for every buffered Command.