// agentmux_launcher/host_pipe/mod.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// CPD-2 — launcher → host pipe wrapper.
5//
6// This module wraps the writer half of the launcher's named-pipe
7// connection to the host process and exposes a stable interface for
8// the rest of the launcher to push **Commands** (saga-issued, future
9// CPD-3 wiring) and **Events** (existing reducer fanout) down to the
10// host as `HostFrame` envelopes.
11//
// CPD-2 introduced this module as infrastructure only; CPD-3 then
// replaced the saga coordinator's log-only `apply_action` for
// `IssueCmd::Host` with a `HostPipe::send_command` call.
15//
16// See `docs/specs/SPEC_CROSS_PROCESS_DISPATCH_2026-05-01.md` §3.5 +
17// §3.9 for the design + spec acceptance criteria.
18//
19// ## Lifecycle / connection model
20//
21// The IPC server's per-connection fanout task hands the host's writer
22// half to `HostPipe::set_writer` once the connecting client has
23// registered as `ClientKind::Host`. On disconnect (read EOF or write
24// failure), the same task calls `HostPipe::clear_writer`, which arms
// the disconnect timer (30s — see §3.9). A reconnect calls
// `set_writer` again, which clears that timer and drains the pending
// buffer into the new writer.
27//
// While disconnected, `send_command` BUFFERS into a bounded
// `pending_buffer` (cap 64 frames); on reconnect, frames drain in
// FIFO order. On overflow, the OLDEST buffered frame is dropped; once
// the host has been disconnected for 30s, the whole pending buffer is
// flushed. In both cases `Event::SagaFailed` is emitted on the
// broadcast bus for every dropped Command frame that carries a
// saga_id. The 30s check is lazy: it runs at the start of the next
// `send_command`, not on a background timer.
//
// Events sent via `send_event_for_session` are not buffered at all —
// they are silently dropped while the host is disconnected. They're
// already broadcast to other clients via the per-connection fanout,
// so the host missing one isn't a saga-correctness issue.
36//
// This module reports dropped Commands through the existing
// `Event::SagaFailed { saga_id, reason, version }` event (the spec's
// `Event::SagaActionFailed` is not used here). `reason` is
// `"host pipe backpressure overflow"` for overflow drops and
// `"host unreachable"` for 30s-timeout drops, so a downstream observer
// can tell them apart. CPD-3 + CPD-1 may tighten the variant surface.
43//
44// ## HostFrame envelope
45//
// Per spec §3.1, the launcher → host wire eventually carries a
// tagged union of Event vs Command. The `HostFrame` envelope now lives
// in agentmux-common and is re-exported from this module. Today only
// `send_command` writes `HostFrame` envelopes; events sent via
// `send_event_for_session` are still written as raw `Event` JSON for
// compatibility with the host's current parser.
51
52use std::collections::VecDeque;
53use std::io;
54use std::sync::Arc;
55use std::time::{Duration, Instant};
56
57use agentmux_common::ipc::{Command, Event};
58pub use agentmux_common::ipc::HostFrame;
59use tokio::io::{AsyncWrite, AsyncWriteExt};
60use tokio::sync::Mutex;
61
62mod connection;
63#[cfg(test)]
64mod tests;
65
/// Upper bound on frames parked in the pending buffer while the host is
/// away (spec §3.9). When a new frame arrives and the buffer is already
/// at this size, the oldest entry is evicted; an evicted Command that
/// carries a saga_id is reported via `Event::SagaFailed`.
pub const PENDING_BUFFER_CAP: usize = 64;

/// How long the host may stay disconnected before the pending buffer is
/// flushed and every buffered saga Command is failed with
/// `"host unreachable"` (spec §3.9). Thirty seconds.
pub const DISCONNECT_TIMEOUT: Duration = Duration::from_millis(30_000);
76
77/// Reasons `send_command` / `send_event` can fail synchronously.
78///
79/// `HostNotConnected` is reserved for cases where the caller wants to
80/// **fail-fast** instead of buffering (CPD-3 will choose whichever
81/// fits per command kind). The default `send_command` code path
82/// buffers; see `try_send_command_no_buffer` for the fail-fast variant.
83#[derive(Debug)]
84pub enum HostPipeError {
85    /// No host is currently connected and the caller asked to fail
86    /// rather than buffer.
87    #[allow(dead_code)] // reserved for fail-fast callers (CPD-3+)
88    HostNotConnected,
89    /// Underlying write to the pipe failed.
90    WriteFailed(io::Error),
91    /// Frame failed to serialize. Should be impossible for well-
92    /// formed Command / Event variants but kept as a result variant
93    /// for defense-in-depth.
94    #[allow(dead_code)] // defense-in-depth; serde_json::to_vec on owned types should not fail
95    Serialize(serde_json::Error),
96    /// Pending buffer is full and we couldn't drop the oldest entry
97    /// to make room (e.g. caller asked for no-overflow semantics).
98    /// Default `send_command` does drop-oldest + emit-failed, so
99    /// callers won't see this from the public API today.
100    #[allow(dead_code)] // reserved for stricter callers
101    BufferOverflow,
102    /// Caller's captured `host_session_id` no longer matches the
103    /// pipe's current session (a new host registered, displacing the
104    /// old one). The caller — typically a per-host fanout task — should
105    /// exit immediately so it doesn't write into the new host's writer.
106    /// (codex P1 PR #642 round 2.)
107    StaleSession,
108}
109
110impl std::fmt::Display for HostPipeError {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        match self {
113            HostPipeError::HostNotConnected => write!(f, "host not connected"),
114            HostPipeError::WriteFailed(e) => write!(f, "write to host pipe failed: {}", e),
115            HostPipeError::Serialize(e) => write!(f, "frame serialize failed: {}", e),
116            HostPipeError::BufferOverflow => write!(f, "host pipe pending buffer overflow"),
117            HostPipeError::StaleSession => write!(f, "host pipe session stale (replaced by new host registration)"),
118        }
119    }
120}
121
122impl std::error::Error for HostPipeError {}
123
124/// Trait-object writer reference the HostPipe holds when connected.
125///
126/// `Arc<Mutex<Box<dyn AsyncWrite + Unpin + Send>>>` so the same
127/// physical write half is reachable both by HostPipe (event fanout +
128/// saga command dispatch) AND by the IPC server's per-connection
129/// handler (connection-private error replies — Event::Error on parse
130/// failures + register-first violations). The mutex serializes
131/// writes so frames can't interleave on the wire.
132///
133/// On set, HostPipe stores a clone of the same Arc the per-connection
134/// handler uses for its main loop. On clear, HostPipe drops its
135/// clone; the handler still has its own Arc so error replies (if
136/// any) keep working through the disconnect path.
137pub type SharedWriter = Arc<Mutex<BoxedWriter>>;
138
139/// Test-only convenience: own a Box directly. Tests construct one
140/// via `make_shared_writer(Box::new(duplex_a))`.
141pub type BoxedWriter = Box<dyn AsyncWrite + Unpin + Send>;
142
143/// Wrap a boxed writer into a SharedWriter. Used by tests +
144/// `ipc::server` to coerce a concrete WriteHalf into the trait
145/// object the HostPipe stores.
146///
147/// Implementation note: we go through an intermediate
148/// `Arc<Mutex<Box<dyn AsyncWrite + ...>>>` and then move the inner
149/// Box out at write time. This avoids relying on
150/// `Arc<Mutex<Concrete>>`-to-`Arc<Mutex<dyn>>` unsizing coercion,
151/// which is unstable across MSRVs for `tokio::sync::Mutex`.
152pub fn make_shared_writer(writer: BoxedWriter) -> SharedWriter {
153    Arc::new(Mutex::new(writer))
154}
155
156/// One frame waiting to be written to the host. Tracks the saga_id
157/// when known so we can emit a meaningful `SagaFailed` if the frame
158/// is dropped on overflow / timeout.
159#[derive(Debug)]
160struct PendingFrame {
161    frame: HostFrame,
162    /// `Some(id)` for Command frames whose Command variant carries
163    /// a saga_id. `None` for Event frames (the host is not the only
164    /// subscriber for events — they also flow to the renderer / Tool
165    /// clients via the broadcast bus, so a missed event-frame is not
166    /// a saga-level failure) and for Command frames whose variant
167    /// has no saga_id field yet (the schema additions land in CPD-1).
168    saga_id: Option<u64>,
169}
170
171struct HostPipeInner {
172    writer: Option<SharedWriter>,
173    pending_buffer: VecDeque<PendingFrame>,
174    /// Set when the writer transitions from Some -> None. Cleared on
175    /// reconnect. Used to enforce the 30s disconnect timeout.
176    host_disconnected_at: Option<Instant>,
177    /// Monotonic generation incremented on every `set_writer` call.
178    /// Per-host fanout tasks capture this at start and pass it back
179    /// into `send_event_for_session`, which no-ops if it doesn't
180    /// match the current generation. Prevents a stale fanout (from
181    /// an old host connection that hasn't fully torn down yet) from
182    /// writing into a freshly-registered host's writer.
183    /// (codex P1 PR #642 round 2.)
184    host_session_id: u64,
185}
186
187impl HostPipeInner {
188    fn new() -> Self {
189        Self {
190            writer: None,
191            pending_buffer: VecDeque::with_capacity(PENDING_BUFFER_CAP),
192            host_disconnected_at: None,
193            host_session_id: 0,
194        }
195    }
196}
197
198/// Public wrapper around the launcher → host pipe.
199///
200/// Cheap to clone (one `Arc`); shared by the IPC server's per-
201/// connection task (set/clear writer) and the saga coordinator
202/// (send_command, CPD-3) and the per-host fanout task (send_event,
203/// refactored in this PR).
204///
205/// Manual `Debug` (rather than derive) so we don't have to require
206/// `Debug` on the boxed writer trait object.
207#[derive(Clone)]
208pub struct HostPipe {
209    inner: Arc<Mutex<HostPipeInner>>,
210    /// Broadcast bus reference — used to emit `Event::SagaFailed`
211    /// when a buffered Command is dropped (overflow or 30s timeout).
212    events_tx: tokio::sync::broadcast::Sender<Event>,
213    /// Reference to the launcher's reducer state for `bump_version`
214    /// when emitting saga lifecycle events. Mirrors the saga
215    /// coordinator's pattern so emitted events get a fresh,
216    /// monotonic global version.
217    state: Arc<Mutex<crate::state::State>>,
218}
219
220impl std::fmt::Debug for HostPipe {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        f.debug_struct("HostPipe").finish_non_exhaustive()
223    }
224}
225
226impl HostPipe {
227    pub fn new(
228        events_tx: tokio::sync::broadcast::Sender<Event>,
229        state: Arc<Mutex<crate::state::State>>,
230    ) -> Self {
231        Self {
232            inner: Arc::new(Mutex::new(HostPipeInner::new())),
233            events_tx,
234            state,
235        }
236    }
237
238    /// Hand the host's writer half to the pipe. Called by the IPC
239    /// server's per-connection task once the connecting client
240    /// registers as `ClientKind::Host`. Drains any pending frames
241    /// in FIFO order before returning.
242    ///
243    /// If a writer is already set (a prior host registered and this
244    /// is a second), the prior writer is replaced. The drain logic
245    /// still runs against the new writer.
246    ///
247    /// **Locking:** the inner mutex is held across the per-frame
248    /// `write_all().await`. That's intentional — concurrent
249    /// `send_frame` calls during drain would race FIFO ordering
250    /// against the freshly-installed writer. Holding the lock keeps
251    /// the drain-then-install transition atomic from the caller's
252    /// perspective. Hold time is bounded by `pending_buffer` size
253    /// (cap 64) × per-frame write latency (microseconds on a healthy
254    /// pipe), well under the existing `state.lock()` discipline.
255    /// Returns the new `host_session_id`. The caller (IPC server's
256    /// host-connection setup) MUST capture this and pass it to the
257    /// per-connection fanout task; the fanout task uses it as a
258    /// stale-writer guard via `send_event_for_session`.
259    pub async fn set_writer(&self, writer: SharedWriter) -> u64 {
260        let mut inner = self.inner.lock().await;
261        inner.host_disconnected_at = None;
262        inner.host_session_id = inner.host_session_id.wrapping_add(1);
263        let session_id = inner.host_session_id;
264        // Drain pending frames against the new writer in FIFO order.
265        while let Some(p) = inner.pending_buffer.pop_front() {
266            let mut wlock = writer.lock().await;
267            let res = write_frame_async(&mut **wlock, &p.frame).await;
268            drop(wlock);
269            if let Err(e) = res {
270                crate::log(&format!(
271                    "[host_pipe] drain failed mid-flight: {} — re-buffering rest",
272                    e
273                ));
274                inner.pending_buffer.push_front(p);
275                inner.host_disconnected_at = Some(Instant::now());
276                return session_id;
277            }
278        }
279        inner.writer = Some(writer);
280        session_id
281    }
282
283    /// Mark the host as disconnected. Arms the 30s timer; subsequent
284    /// `send_command` calls buffer up to `PENDING_BUFFER_CAP`.
285    pub async fn clear_writer(&self) {
286        let mut inner = self.inner.lock().await;
287        if inner.writer.is_some() {
288            inner.writer = None;
289            inner.host_disconnected_at = Some(Instant::now());
290            crate::log("[host_pipe] host disconnected — buffering subsequent frames (30s budget)");
291        }
292    }
293
294    /// Remove every buffered frame whose `saga_id` matches `saga_id`.
295    /// Returns the count of removed frames.
296    ///
297    /// Called from `SagaCoordinator::claim_terminal` when a saga
298    /// terminates (Done / Failed / timeout / eviction / etc.) — the
299    /// terminal event closes the saga bracket, so any frames the
300    /// saga left in the pending buffer would, if drained on reconnect,
301    /// produce orphan side effects after the bracket is already
302    /// closed (and risk a second `SagaFailed` via the 30s expiry
303    /// path's `emit_drop_failure`). Purging here keeps the saga's
304    /// terminal slot atomic across both the bus and the wire.
305    /// (codex + reagent P1 PR #644 round 7.)
306    pub async fn cancel_saga(&self, saga_id: u64) -> usize {
307        let mut inner = self.inner.lock().await;
308        let before = inner.pending_buffer.len();
309        inner.pending_buffer.retain(|f| f.saga_id != Some(saga_id));
310        let removed = before - inner.pending_buffer.len();
311        if removed > 0 {
312            crate::log(&format!(
313                "[host_pipe] cancel_saga(saga_id={}) purged {} buffered frame(s)",
314                saga_id, removed
315            ));
316        }
317        removed
318    }
319
320    /// Drain `pending_buffer` through the given writer in FIFO order.
321    /// Used by `send_frame`'s ptr_eq-mismatch path to flush a frame
322    /// that landed in the buffer after a writer-replacement race
323    /// (codex P1 PR #642 round 4). Symmetric with `set_writer`'s
324    /// drain logic but operates on an existing writer rather than
325    /// installing a new one. On per-frame failure, re-buffers the
326    /// remaining frames at the front and arms the disconnect timer.
327    async fn drain_pending_to(&self, writer: &SharedWriter) {
328        let mut inner = self.inner.lock().await;
329        while let Some(p) = inner.pending_buffer.pop_front() {
330            let mut wlock = writer.lock().await;
331            let res = write_frame_async(&mut **wlock, &p.frame).await;
332            drop(wlock);
333            if let Err(e) = res {
334                crate::log(&format!(
335                    "[host_pipe] mid-flight drain failed: {} — re-buffering rest",
336                    e
337                ));
338                inner.pending_buffer.push_front(p);
339                // Best-effort: clear the writer slot if it's still ours
340                // (caller is expected to have already raced past clear)
341                // and arm the disconnect timer so the next reconnect
342                // resumes the drain.
343                let same_writer = inner
344                    .writer
345                    .as_ref()
346                    .is_some_and(|cur| Arc::ptr_eq(cur, writer));
347                if same_writer {
348                    inner.writer = None;
349                    inner.host_disconnected_at = Some(Instant::now());
350                }
351                return;
352            }
353        }
354    }
355
356    /// Push a Command down to the host. On disconnect, buffers up to
357    /// `PENDING_BUFFER_CAP`; on overflow or 30s timeout, drops + emits
358    /// `Event::SagaFailed` for the dropped Command's saga.
359    ///
360    /// CPD-3 — called from the saga coordinator's `apply_action` when
361    /// dispatching `IssueCmd::Host` actions.
362    pub async fn send_command(&self, cmd: &Command) -> Result<(), HostPipeError> {
363        let saga_id = saga_id_of(cmd);
364        let frame = HostFrame::Command(cmd.clone());
365        self.send_frame(frame, saga_id).await
366    }
367
368    /// Push an Event down to the host. Called by the per-connection
369    /// fanout task in `ipc::server` for the host's connection (replaces
370    /// the prior direct `send_event` call). Other client kinds keep
371    /// the direct path because they're not subject to HostPipe's
372    /// pending-buffer / reconnect semantics.
373    ///
374    /// `session_id` is the value captured from `set_writer` when the
375    /// host registered. If the current writer's session_id no longer
376    /// matches (because a new host registered and replaced the
377    /// writer), this returns `Err(HostPipeError::StaleSession)` so the
378    /// fanout task exits cleanly without writing to the new host.
379    /// (codex P1 PR #642 round 2.)
380    ///
381    /// **Wire format:** writes RAW `Event` JSON (no `HostFrame` envelope).
382    /// The host's existing parser (`agentmux-cef/src/launcher_ipc.rs`)
383    /// expects raw Event lines; CPD-3 will update host to handle
384    /// `HostFrame` and flip events to envelope form. Until then events
385    /// stay raw to preserve compat. Commands (sent via `send_command`)
386    /// already use `HostFrame::Command` because they're new and not
387    /// yet read by host. (codex P1 PR #642 round 3.)
388    ///
389    /// **Atomicity:** session check + writer fetch happen under a
390    /// single inner-lock acquisition to close the session-bypass
391    /// TOCTOU window. (codex P1 + reagent P1 PR #642 round 3.)
392    ///
393    /// **No pending buffer for events.** If the host is disconnected,
394    /// events are dropped silently. They're broadcast-bus-driven —
395    /// stale events post-reconnect would be incorrect anyway, and the
396    /// renderer/srv subscribers still get them. The pending buffer is
397    /// reserved for `send_command` (saga-issued, duty-cycle critical).
398    pub async fn send_event_for_session(
399        &self,
400        session_id: u64,
401        event: &Event,
402    ) -> Result<(), HostPipeError> {
403        // Atomic: hold inner lock for the entire session-check +
404        // writer-clone window so a concurrent set_writer can't slip
405        // a fresh writer in between.
406        let writer = {
407            let inner = self.inner.lock().await;
408            if inner.host_session_id != session_id {
409                return Err(HostPipeError::StaleSession);
410            }
411            match inner.writer.as_ref() {
412                Some(w) => Arc::clone(w),
413                None => return Ok(()), // disconnected; events drop (not buffered)
414            }
415        };
416
417        // Serialize raw Event (NOT HostFrame::Event — see fn doc).
418        let mut bytes =
419            serde_json::to_vec(event).map_err(HostPipeError::Serialize)?;
420        bytes.push(b'\n');
421
422        let mut wlock = writer.lock().await;
423        wlock
424            .write_all(&bytes)
425            .await
426            .map_err(HostPipeError::WriteFailed)?;
427        wlock
428            .flush()
429            .await
430            .map_err(HostPipeError::WriteFailed)?;
431        Ok(())
432    }
433
434    /// Send an event without a session check — only safe in tests
435    /// or single-session contexts. Production fanout MUST use
436    /// `send_event_for_session` so a stale fanout from a previous
437    /// host connection cannot write into a fresh host's writer.
438    /// Mirrors `send_event_for_session`'s raw-Event wire format
439    /// (no HostFrame wrap).
440    #[cfg(test)]
441    pub async fn send_event(&self, event: &Event) -> Result<(), HostPipeError> {
442        let writer = {
443            let inner = self.inner.lock().await;
444            match inner.writer.as_ref() {
445                Some(w) => Arc::clone(w),
446                None => return Ok(()),
447            }
448        };
449        let mut bytes =
450            serde_json::to_vec(event).map_err(HostPipeError::Serialize)?;
451        bytes.push(b'\n');
452        let mut wlock = writer.lock().await;
453        wlock
454            .write_all(&bytes)
455            .await
456            .map_err(HostPipeError::WriteFailed)?;
457        wlock
458            .flush()
459            .await
460            .map_err(HostPipeError::WriteFailed)?;
461        Ok(())
462    }
463
464    async fn send_frame(
465        &self,
466        frame: HostFrame,
467        saga_id: Option<u64>,
468    ) -> Result<(), HostPipeError> {
469        // First, check the disconnect timer. If we've been disconnected
470        // > 30s, drain the pending buffer + emit failures BEFORE we
471        // queue a fresh frame on top.
472        self.expire_pending_if_timed_out().await;
473
474        // Atomic decision: under the inner lock, either pull a writer
475        // clone (then release lock and write) OR push to pending buffer
476        // (lock held to keep the buffer-vs-writer state consistent).
477        // Splitting this into two lock acquisitions creates the TOCTOU
478        // window where set_writer can land between "see None" and
479        // "push to buffer", causing frames to wedge in the buffer
480        // while the pipe is actually connected. (codex P1 + reagent P1
481        // PR #642 round 3.)
482        enum Decision {
483            Write(SharedWriter),
484            Buffered { dropped: Option<PendingFrame> },
485        }
486        let decision = {
487            let mut inner = self.inner.lock().await;
488            match inner.writer.as_ref() {
489                Some(w) => Decision::Write(Arc::clone(w)),
490                None => {
491                    let dropped = if inner.pending_buffer.len() >= PENDING_BUFFER_CAP {
492                        inner.pending_buffer.pop_front()
493                    } else {
494                        None
495                    };
496                    inner
497                        .pending_buffer
498                        .push_back(PendingFrame { frame: frame.clone(), saga_id });
499                    Decision::Buffered { dropped }
500                }
501            }
502        };
503
504        match decision {
505            Decision::Write(writer) => {
506                let mut wlock = writer.lock().await;
507                match write_frame_async(&mut **wlock, &frame).await {
508                    Ok(()) => Ok(()),
509                    Err(e) => {
510                        drop(wlock);
511                        crate::log(&format!(
512                            "[host_pipe] direct write failed: {} — clearing writer + buffering",
513                            e
514                        ));
515                        // Arc::ptr_eq guards against clobbering a freshly-
516                        // installed writer if set_writer raced in between
517                        // the failed write and this clear. (reagent P1
518                        // PR #642 round 3.)
519                        let mut inner = self.inner.lock().await;
520                        let same_writer = inner
521                            .writer
522                            .as_ref()
523                            .is_some_and(|cur| Arc::ptr_eq(cur, &writer));
524                        if same_writer {
525                            inner.writer = None;
526                            inner.host_disconnected_at = Some(Instant::now());
527                            inner
528                                .pending_buffer
529                                .push_back(PendingFrame { frame, saga_id });
530                            Err(HostPipeError::WriteFailed(e))
531                        } else {
532                            // Writer was concurrently replaced (set_writer
533                            // raced and already drained whatever buffer
534                            // existed at install time). Push to the new
535                            // writer's buffer + immediately retry through
536                            // the new writer; otherwise this frame sits
537                            // until the next reconnect, even though the
538                            // pipe is healthy. (codex P1 PR #642 round 4.)
539                            let new_writer = inner.writer.as_ref().map(Arc::clone);
540                            // Push the failed frame to buffer FIRST so
541                            // the drain below picks it up in FIFO order
542                            // alongside any other in-flight pending.
543                            inner
544                                .pending_buffer
545                                .push_back(PendingFrame { frame, saga_id });
546                            drop(inner);
547                            if let Some(new_w) = new_writer {
548                                self.drain_pending_to(&new_w).await;
549                            }
550                            // We still surface the original write failure
551                            // to the caller — they asked for THIS frame
552                            // to be sent on the pipe they had at call
553                            // time. The drain attempt is best-effort
554                            // recovery; saga-level retry should kick in
555                            // if the drain fails too.
556                            Err(HostPipeError::WriteFailed(e))
557                        }
558                    }
559                }
560            }
561            Decision::Buffered { dropped } => {
562                if let Some(dropped) = dropped {
563                    self.emit_drop_failure(dropped, "host pipe backpressure overflow")
564                        .await;
565                }
566                Ok(())
567            }
568        }
569    }
570
571    /// If the disconnect timer has elapsed, flush the pending buffer
572    /// and emit `Event::SagaFailed` for every buffered Command.
573    async fn expire_pending_if_timed_out(&self) {
574        let now = Instant::now();
575        let drained: Vec<PendingFrame> = {
576            let mut inner = self.inner.lock().await;
577            let Some(start) = inner.host_disconnected_at else {
578                return;
579            };
580            if now.duration_since(start) < DISCONNECT_TIMEOUT {
581                return;
582            }
583            // Reset the timer so we don't re-drain on the next call;
584            // the next disconnect transition rearms it.
585            inner.host_disconnected_at = None;
586            inner.pending_buffer.drain(..).collect()
587        };
588        if !drained.is_empty() {
589            crate::log(&format!(
590                "[host_pipe] disconnect exceeded {:?}; dropping {} buffered frames",
591                DISCONNECT_TIMEOUT,
592                drained.len()
593            ));
594        }
595        for f in drained {
596            self.emit_drop_failure(f, "host unreachable").await;
597        }
598    }
599
600    async fn emit_drop_failure(&self, dropped: PendingFrame, reason: &str) {
601        let Some(saga_id) = dropped.saga_id else {
602            // Event frames + Command frames without a saga_id are
603            // silently dropped — they don't gate any saga's progress.
604            return;
605        };
606        let v = {
607            let mut state = self.state.lock().await;
608            state.bump_version()
609        };
610        let evt = Event::SagaFailed {
611            saga_id,
612            reason: reason.to_string(),
613            version: v,
614        };
615        let _ = self.events_tx.send(evt);
616    }
617
618    /// Test-only inspection of the current pending buffer length.
619    #[cfg(test)]
620    pub async fn pending_len(&self) -> usize {
621        self.inner.lock().await.pending_buffer.len()
622    }
623
624    /// Test-only inspection of whether a writer is currently set.
625    #[cfg(test)]
626    pub async fn is_connected(&self) -> bool {
627        self.inner.lock().await.writer.is_some()
628    }
629
630    /// Test-only: force the disconnect timer back by `delta` so the
631    /// 30s timeout fires without sleeping in tests.
632    #[cfg(test)]
633    pub async fn rewind_disconnect_timer(&self, delta: Duration) {
634        let mut inner = self.inner.lock().await;
635        if let Some(t) = inner.host_disconnected_at.as_mut() {
636            *t = t.checked_sub(delta).unwrap_or(*t);
637        }
638    }
639}
640
641/// Pull the saga_id off a Command if its variant carries one.
642///
643/// Used by `send_frame`'s pending-buffer machinery: when a buffered
644/// command is dropped (overflow or 30s host-disconnect timeout),
645/// `emit_drop_failure` emits `Event::SagaFailed { saga_id }` so the
646/// saga's bracket closes cleanly instead of waiting forever.
647/// CPD-1 added `saga_id` fields on host-bound variants; CPD-3 made
648/// `send_command` live from the saga coordinator, so the drop
649/// machinery is now saga-correctness-relevant.
650fn saga_id_of(cmd: &Command) -> Option<u64> {
651    // CPD-1 added saga_id fields to host-bound Commands; CPD-3 made
652    // send_command live from the saga coordinator. Returning the real
653    // id here means HostPipe's drop-on-overflow + 30s-timeout-flush
654    // paths can emit `Event::SagaFailed` for the right saga, instead
655    // of orphaning the buffered command. (reagent P1 PR #644 round 5.)
656    match cmd {
657        Command::SpawnPoolWindow { saga_id } => Some(*saga_id),
658        Command::ReapPanes { saga_id, .. } => Some(*saga_id),
659        Command::DrainPoolIfLast { saga_id, .. } => Some(*saga_id),
660        _ => None,
661    }
662}
663
664/// Serialize a `HostFrame` as newline-delimited JSON and write it
665/// to a writer. Used both for direct writes (connected) and for
666/// pending-buffer drain (post-reconnect).
667async fn write_frame_async<W: AsyncWrite + Unpin + ?Sized>(
668    writer: &mut W,
669    frame: &HostFrame,
670) -> io::Result<()> {
671    let mut buf = serde_json::to_vec(frame)
672        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
673    buf.push(b'\n');
674    writer.write_all(&buf).await?;
675    writer.flush().await?;
676    Ok(())
677}