agentmux_launcher\host_pipe/mod.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// CPD-2 — launcher → host pipe wrapper.
5//
6// This module wraps the writer half of the launcher's named-pipe
7// connection to the host process and exposes a stable interface for
8// the rest of the launcher to push **Commands** (saga-issued, future
9// CPD-3 wiring) and **Events** (existing reducer fanout) down to the
10// host as `HostFrame` envelopes.
11//
// CPD-2 introduced this module as infrastructure only (at that point the
// saga coordinator's `apply_action` for `IssueCmd::Host` stayed
// log-only); CPD-3 replaced that log with a `HostPipe::send_command` call.
15//
16// See `docs/specs/SPEC_CROSS_PROCESS_DISPATCH_2026-05-01.md` §3.5 +
17// §3.9 for the design + spec acceptance criteria.
18//
19// ## Lifecycle / connection model
20//
21// The IPC server's per-connection fanout task hands the host's writer
22// half to `HostPipe::set_writer` once the connecting client has
23// registered as `ClientKind::Host`. On disconnect (read EOF or write
24// failure), the same task calls `HostPipe::clear_writer`, which arms
25// the disconnect timer (30s — see §3.9). Reconnect re-arms via
26// `set_writer` again.
27//
// While disconnected, `send_command` BUFFERS into a bounded
// `pending_buffer` (cap `PENDING_BUFFER_CAP` = 64 frames). On reconnect,
// frames drain in FIFO order. On overflow, only the OLDEST buffered frame
// is evicted to make room; after 30s of disconnection, the whole pending
// buffer is flushed. Either way, every dropped Command frame that carries
// a saga id gets a saga-failure event on the broadcast bus (currently
// `Event::SagaFailed` — see the next paragraph). Events are never
// buffered: while the host is disconnected they are simply dropped, since
// other clients already receive them via the per-connection fanout, so the
// host missing one isn't a saga-correctness issue.
36//
37// Until CPD-1 (schema PR) lands, `Event::SagaActionFailed` does not
38// exist on the agentmux-common Event enum. To avoid a hard dep on
39// CPD-1's merge order, this module emits failures via the existing
40// `Event::SagaFailed { saga_id, reason, version }` event with a
41// `reason` prefix of `"host pipe ..."` so a downstream observer can
42// filter. CPD-3 + CPD-1 will tighten the variant surface.
43//
44// ## HostFrame envelope
45//
// Per spec §3.1, the launcher → host wire carries a tagged union of
// Event vs Command. `HostFrame` now lives in agentmux-common (CPD-1) and
// is re-exported from this module. Commands are written as
// `HostFrame::Command`; events are still written as raw `Event` JSON
// until the host's parser understands the envelope (see
// `HostPipe::send_event_for_session`).
51
52use std::collections::VecDeque;
53use std::io;
54use std::sync::Arc;
55use std::time::{Duration, Instant};
56
57use agentmux_common::ipc::{Command, Event};
58pub use agentmux_common::ipc::HostFrame;
59use tokio::io::{AsyncWrite, AsyncWriteExt};
60use tokio::sync::Mutex;
61
62mod connection;
63#[cfg(test)]
64mod tests;
65
/// Upper bound on frames held in the pending buffer while no host is
/// attached. When a new frame arrives at capacity, the oldest frame is
/// evicted; if the evicted frame is a Command carrying a saga id,
/// `Event::SagaFailed` is published for that saga. See spec §3.9.
pub const PENDING_BUFFER_CAP: usize = 64;

/// How long the host may stay disconnected before the pending buffer is
/// discarded. On expiry, each buffered Command with a saga id fails its
/// saga with reason `"host unreachable"`. See spec §3.9.
pub const DISCONNECT_TIMEOUT: Duration = Duration::from_millis(30_000);
76
77/// Reasons `send_command` / `send_event` can fail synchronously.
78///
79/// `HostNotConnected` is reserved for cases where the caller wants to
80/// **fail-fast** instead of buffering (CPD-3 will choose whichever
81/// fits per command kind). The default `send_command` code path
82/// buffers; see `try_send_command_no_buffer` for the fail-fast variant.
83#[derive(Debug)]
84pub enum HostPipeError {
85 /// No host is currently connected and the caller asked to fail
86 /// rather than buffer.
87 #[allow(dead_code)] // reserved for fail-fast callers (CPD-3+)
88 HostNotConnected,
89 /// Underlying write to the pipe failed.
90 WriteFailed(io::Error),
91 /// Frame failed to serialize. Should be impossible for well-
92 /// formed Command / Event variants but kept as a result variant
93 /// for defense-in-depth.
94 #[allow(dead_code)] // defense-in-depth; serde_json::to_vec on owned types should not fail
95 Serialize(serde_json::Error),
96 /// Pending buffer is full and we couldn't drop the oldest entry
97 /// to make room (e.g. caller asked for no-overflow semantics).
98 /// Default `send_command` does drop-oldest + emit-failed, so
99 /// callers won't see this from the public API today.
100 #[allow(dead_code)] // reserved for stricter callers
101 BufferOverflow,
102 /// Caller's captured `host_session_id` no longer matches the
103 /// pipe's current session (a new host registered, displacing the
104 /// old one). The caller — typically a per-host fanout task — should
105 /// exit immediately so it doesn't write into the new host's writer.
106 /// (codex P1 PR #642 round 2.)
107 StaleSession,
108}
109
110impl std::fmt::Display for HostPipeError {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 match self {
113 HostPipeError::HostNotConnected => write!(f, "host not connected"),
114 HostPipeError::WriteFailed(e) => write!(f, "write to host pipe failed: {}", e),
115 HostPipeError::Serialize(e) => write!(f, "frame serialize failed: {}", e),
116 HostPipeError::BufferOverflow => write!(f, "host pipe pending buffer overflow"),
117 HostPipeError::StaleSession => write!(f, "host pipe session stale (replaced by new host registration)"),
118 }
119 }
120}
121
122impl std::error::Error for HostPipeError {}
123
124/// Trait-object writer reference the HostPipe holds when connected.
125///
126/// `Arc<Mutex<Box<dyn AsyncWrite + Unpin + Send>>>` so the same
127/// physical write half is reachable both by HostPipe (event fanout +
128/// saga command dispatch) AND by the IPC server's per-connection
129/// handler (connection-private error replies — Event::Error on parse
130/// failures + register-first violations). The mutex serializes
131/// writes so frames can't interleave on the wire.
132///
133/// On set, HostPipe stores a clone of the same Arc the per-connection
134/// handler uses for its main loop. On clear, HostPipe drops its
135/// clone; the handler still has its own Arc so error replies (if
136/// any) keep working through the disconnect path.
137pub type SharedWriter = Arc<Mutex<BoxedWriter>>;
138
139/// Test-only convenience: own a Box directly. Tests construct one
140/// via `make_shared_writer(Box::new(duplex_a))`.
141pub type BoxedWriter = Box<dyn AsyncWrite + Unpin + Send>;
142
143/// Wrap a boxed writer into a SharedWriter. Used by tests +
144/// `ipc::server` to coerce a concrete WriteHalf into the trait
145/// object the HostPipe stores.
146///
147/// Implementation note: we go through an intermediate
148/// `Arc<Mutex<Box<dyn AsyncWrite + ...>>>` and then move the inner
149/// Box out at write time. This avoids relying on
150/// `Arc<Mutex<Concrete>>`-to-`Arc<Mutex<dyn>>` unsizing coercion,
151/// which is unstable across MSRVs for `tokio::sync::Mutex`.
152pub fn make_shared_writer(writer: BoxedWriter) -> SharedWriter {
153 Arc::new(Mutex::new(writer))
154}
155
156/// One frame waiting to be written to the host. Tracks the saga_id
157/// when known so we can emit a meaningful `SagaFailed` if the frame
158/// is dropped on overflow / timeout.
159#[derive(Debug)]
160struct PendingFrame {
161 frame: HostFrame,
162 /// `Some(id)` for Command frames whose Command variant carries
163 /// a saga_id. `None` for Event frames (the host is not the only
164 /// subscriber for events — they also flow to the renderer / Tool
165 /// clients via the broadcast bus, so a missed event-frame is not
166 /// a saga-level failure) and for Command frames whose variant
167 /// has no saga_id field yet (the schema additions land in CPD-1).
168 saga_id: Option<u64>,
169}
170
171struct HostPipeInner {
172 writer: Option<SharedWriter>,
173 pending_buffer: VecDeque<PendingFrame>,
174 /// Set when the writer transitions from Some -> None. Cleared on
175 /// reconnect. Used to enforce the 30s disconnect timeout.
176 host_disconnected_at: Option<Instant>,
177 /// Monotonic generation incremented on every `set_writer` call.
178 /// Per-host fanout tasks capture this at start and pass it back
179 /// into `send_event_for_session`, which no-ops if it doesn't
180 /// match the current generation. Prevents a stale fanout (from
181 /// an old host connection that hasn't fully torn down yet) from
182 /// writing into a freshly-registered host's writer.
183 /// (codex P1 PR #642 round 2.)
184 host_session_id: u64,
185}
186
187impl HostPipeInner {
188 fn new() -> Self {
189 Self {
190 writer: None,
191 pending_buffer: VecDeque::with_capacity(PENDING_BUFFER_CAP),
192 host_disconnected_at: None,
193 host_session_id: 0,
194 }
195 }
196}
197
198/// Public wrapper around the launcher → host pipe.
199///
200/// Cheap to clone (one `Arc`); shared by the IPC server's per-
201/// connection task (set/clear writer) and the saga coordinator
202/// (send_command, CPD-3) and the per-host fanout task (send_event,
203/// refactored in this PR).
204///
205/// Manual `Debug` (rather than derive) so we don't have to require
206/// `Debug` on the boxed writer trait object.
207#[derive(Clone)]
208pub struct HostPipe {
209 inner: Arc<Mutex<HostPipeInner>>,
210 /// Broadcast bus reference — used to emit `Event::SagaFailed`
211 /// when a buffered Command is dropped (overflow or 30s timeout).
212 events_tx: tokio::sync::broadcast::Sender<Event>,
213 /// Reference to the launcher's reducer state for `bump_version`
214 /// when emitting saga lifecycle events. Mirrors the saga
215 /// coordinator's pattern so emitted events get a fresh,
216 /// monotonic global version.
217 state: Arc<Mutex<crate::state::State>>,
218}
219
220impl std::fmt::Debug for HostPipe {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 f.debug_struct("HostPipe").finish_non_exhaustive()
223 }
224}
225
226impl HostPipe {
227 pub fn new(
228 events_tx: tokio::sync::broadcast::Sender<Event>,
229 state: Arc<Mutex<crate::state::State>>,
230 ) -> Self {
231 Self {
232 inner: Arc::new(Mutex::new(HostPipeInner::new())),
233 events_tx,
234 state,
235 }
236 }
237
238 /// Hand the host's writer half to the pipe. Called by the IPC
239 /// server's per-connection task once the connecting client
240 /// registers as `ClientKind::Host`. Drains any pending frames
241 /// in FIFO order before returning.
242 ///
243 /// If a writer is already set (a prior host registered and this
244 /// is a second), the prior writer is replaced. The drain logic
245 /// still runs against the new writer.
246 ///
247 /// **Locking:** the inner mutex is held across the per-frame
248 /// `write_all().await`. That's intentional — concurrent
249 /// `send_frame` calls during drain would race FIFO ordering
250 /// against the freshly-installed writer. Holding the lock keeps
251 /// the drain-then-install transition atomic from the caller's
252 /// perspective. Hold time is bounded by `pending_buffer` size
253 /// (cap 64) × per-frame write latency (microseconds on a healthy
254 /// pipe), well under the existing `state.lock()` discipline.
255 /// Returns the new `host_session_id`. The caller (IPC server's
256 /// host-connection setup) MUST capture this and pass it to the
257 /// per-connection fanout task; the fanout task uses it as a
258 /// stale-writer guard via `send_event_for_session`.
259 pub async fn set_writer(&self, writer: SharedWriter) -> u64 {
260 let mut inner = self.inner.lock().await;
261 inner.host_disconnected_at = None;
262 inner.host_session_id = inner.host_session_id.wrapping_add(1);
263 let session_id = inner.host_session_id;
264 // Drain pending frames against the new writer in FIFO order.
265 while let Some(p) = inner.pending_buffer.pop_front() {
266 let mut wlock = writer.lock().await;
267 let res = write_frame_async(&mut **wlock, &p.frame).await;
268 drop(wlock);
269 if let Err(e) = res {
270 crate::log(&format!(
271 "[host_pipe] drain failed mid-flight: {} — re-buffering rest",
272 e
273 ));
274 inner.pending_buffer.push_front(p);
275 inner.host_disconnected_at = Some(Instant::now());
276 return session_id;
277 }
278 }
279 inner.writer = Some(writer);
280 session_id
281 }
282
283 /// Mark the host as disconnected. Arms the 30s timer; subsequent
284 /// `send_command` calls buffer up to `PENDING_BUFFER_CAP`.
285 pub async fn clear_writer(&self) {
286 let mut inner = self.inner.lock().await;
287 if inner.writer.is_some() {
288 inner.writer = None;
289 inner.host_disconnected_at = Some(Instant::now());
290 crate::log("[host_pipe] host disconnected — buffering subsequent frames (30s budget)");
291 }
292 }
293
294 /// Remove every buffered frame whose `saga_id` matches `saga_id`.
295 /// Returns the count of removed frames.
296 ///
297 /// Called from `SagaCoordinator::claim_terminal` when a saga
298 /// terminates (Done / Failed / timeout / eviction / etc.) — the
299 /// terminal event closes the saga bracket, so any frames the
300 /// saga left in the pending buffer would, if drained on reconnect,
301 /// produce orphan side effects after the bracket is already
302 /// closed (and risk a second `SagaFailed` via the 30s expiry
303 /// path's `emit_drop_failure`). Purging here keeps the saga's
304 /// terminal slot atomic across both the bus and the wire.
305 /// (codex + reagent P1 PR #644 round 7.)
306 pub async fn cancel_saga(&self, saga_id: u64) -> usize {
307 let mut inner = self.inner.lock().await;
308 let before = inner.pending_buffer.len();
309 inner.pending_buffer.retain(|f| f.saga_id != Some(saga_id));
310 let removed = before - inner.pending_buffer.len();
311 if removed > 0 {
312 crate::log(&format!(
313 "[host_pipe] cancel_saga(saga_id={}) purged {} buffered frame(s)",
314 saga_id, removed
315 ));
316 }
317 removed
318 }
319
320 /// Drain `pending_buffer` through the given writer in FIFO order.
321 /// Used by `send_frame`'s ptr_eq-mismatch path to flush a frame
322 /// that landed in the buffer after a writer-replacement race
323 /// (codex P1 PR #642 round 4). Symmetric with `set_writer`'s
324 /// drain logic but operates on an existing writer rather than
325 /// installing a new one. On per-frame failure, re-buffers the
326 /// remaining frames at the front and arms the disconnect timer.
327 async fn drain_pending_to(&self, writer: &SharedWriter) {
328 let mut inner = self.inner.lock().await;
329 while let Some(p) = inner.pending_buffer.pop_front() {
330 let mut wlock = writer.lock().await;
331 let res = write_frame_async(&mut **wlock, &p.frame).await;
332 drop(wlock);
333 if let Err(e) = res {
334 crate::log(&format!(
335 "[host_pipe] mid-flight drain failed: {} — re-buffering rest",
336 e
337 ));
338 inner.pending_buffer.push_front(p);
339 // Best-effort: clear the writer slot if it's still ours
340 // (caller is expected to have already raced past clear)
341 // and arm the disconnect timer so the next reconnect
342 // resumes the drain.
343 let same_writer = inner
344 .writer
345 .as_ref()
346 .is_some_and(|cur| Arc::ptr_eq(cur, writer));
347 if same_writer {
348 inner.writer = None;
349 inner.host_disconnected_at = Some(Instant::now());
350 }
351 return;
352 }
353 }
354 }
355
356 /// Push a Command down to the host. On disconnect, buffers up to
357 /// `PENDING_BUFFER_CAP`; on overflow or 30s timeout, drops + emits
358 /// `Event::SagaFailed` for the dropped Command's saga.
359 ///
360 /// CPD-3 — called from the saga coordinator's `apply_action` when
361 /// dispatching `IssueCmd::Host` actions.
362 pub async fn send_command(&self, cmd: &Command) -> Result<(), HostPipeError> {
363 let saga_id = saga_id_of(cmd);
364 let frame = HostFrame::Command(cmd.clone());
365 self.send_frame(frame, saga_id).await
366 }
367
368 /// Push an Event down to the host. Called by the per-connection
369 /// fanout task in `ipc::server` for the host's connection (replaces
370 /// the prior direct `send_event` call). Other client kinds keep
371 /// the direct path because they're not subject to HostPipe's
372 /// pending-buffer / reconnect semantics.
373 ///
374 /// `session_id` is the value captured from `set_writer` when the
375 /// host registered. If the current writer's session_id no longer
376 /// matches (because a new host registered and replaced the
377 /// writer), this returns `Err(HostPipeError::StaleSession)` so the
378 /// fanout task exits cleanly without writing to the new host.
379 /// (codex P1 PR #642 round 2.)
380 ///
381 /// **Wire format:** writes RAW `Event` JSON (no `HostFrame` envelope).
382 /// The host's existing parser (`agentmux-cef/src/launcher_ipc.rs`)
383 /// expects raw Event lines; CPD-3 will update host to handle
384 /// `HostFrame` and flip events to envelope form. Until then events
385 /// stay raw to preserve compat. Commands (sent via `send_command`)
386 /// already use `HostFrame::Command` because they're new and not
387 /// yet read by host. (codex P1 PR #642 round 3.)
388 ///
389 /// **Atomicity:** session check + writer fetch happen under a
390 /// single inner-lock acquisition to close the session-bypass
391 /// TOCTOU window. (codex P1 + reagent P1 PR #642 round 3.)
392 ///
393 /// **No pending buffer for events.** If the host is disconnected,
394 /// events are dropped silently. They're broadcast-bus-driven —
395 /// stale events post-reconnect would be incorrect anyway, and the
396 /// renderer/srv subscribers still get them. The pending buffer is
397 /// reserved for `send_command` (saga-issued, duty-cycle critical).
398 pub async fn send_event_for_session(
399 &self,
400 session_id: u64,
401 event: &Event,
402 ) -> Result<(), HostPipeError> {
403 // Atomic: hold inner lock for the entire session-check +
404 // writer-clone window so a concurrent set_writer can't slip
405 // a fresh writer in between.
406 let writer = {
407 let inner = self.inner.lock().await;
408 if inner.host_session_id != session_id {
409 return Err(HostPipeError::StaleSession);
410 }
411 match inner.writer.as_ref() {
412 Some(w) => Arc::clone(w),
413 None => return Ok(()), // disconnected; events drop (not buffered)
414 }
415 };
416
417 // Serialize raw Event (NOT HostFrame::Event — see fn doc).
418 let mut bytes =
419 serde_json::to_vec(event).map_err(HostPipeError::Serialize)?;
420 bytes.push(b'\n');
421
422 let mut wlock = writer.lock().await;
423 wlock
424 .write_all(&bytes)
425 .await
426 .map_err(HostPipeError::WriteFailed)?;
427 wlock
428 .flush()
429 .await
430 .map_err(HostPipeError::WriteFailed)?;
431 Ok(())
432 }
433
434 /// Send an event without a session check — only safe in tests
435 /// or single-session contexts. Production fanout MUST use
436 /// `send_event_for_session` so a stale fanout from a previous
437 /// host connection cannot write into a fresh host's writer.
438 /// Mirrors `send_event_for_session`'s raw-Event wire format
439 /// (no HostFrame wrap).
440 #[cfg(test)]
441 pub async fn send_event(&self, event: &Event) -> Result<(), HostPipeError> {
442 let writer = {
443 let inner = self.inner.lock().await;
444 match inner.writer.as_ref() {
445 Some(w) => Arc::clone(w),
446 None => return Ok(()),
447 }
448 };
449 let mut bytes =
450 serde_json::to_vec(event).map_err(HostPipeError::Serialize)?;
451 bytes.push(b'\n');
452 let mut wlock = writer.lock().await;
453 wlock
454 .write_all(&bytes)
455 .await
456 .map_err(HostPipeError::WriteFailed)?;
457 wlock
458 .flush()
459 .await
460 .map_err(HostPipeError::WriteFailed)?;
461 Ok(())
462 }
463
464 async fn send_frame(
465 &self,
466 frame: HostFrame,
467 saga_id: Option<u64>,
468 ) -> Result<(), HostPipeError> {
469 // First, check the disconnect timer. If we've been disconnected
470 // > 30s, drain the pending buffer + emit failures BEFORE we
471 // queue a fresh frame on top.
472 self.expire_pending_if_timed_out().await;
473
474 // Atomic decision: under the inner lock, either pull a writer
475 // clone (then release lock and write) OR push to pending buffer
476 // (lock held to keep the buffer-vs-writer state consistent).
477 // Splitting this into two lock acquisitions creates the TOCTOU
478 // window where set_writer can land between "see None" and
479 // "push to buffer", causing frames to wedge in the buffer
480 // while the pipe is actually connected. (codex P1 + reagent P1
481 // PR #642 round 3.)
482 enum Decision {
483 Write(SharedWriter),
484 Buffered { dropped: Option<PendingFrame> },
485 }
486 let decision = {
487 let mut inner = self.inner.lock().await;
488 match inner.writer.as_ref() {
489 Some(w) => Decision::Write(Arc::clone(w)),
490 None => {
491 let dropped = if inner.pending_buffer.len() >= PENDING_BUFFER_CAP {
492 inner.pending_buffer.pop_front()
493 } else {
494 None
495 };
496 inner
497 .pending_buffer
498 .push_back(PendingFrame { frame: frame.clone(), saga_id });
499 Decision::Buffered { dropped }
500 }
501 }
502 };
503
504 match decision {
505 Decision::Write(writer) => {
506 let mut wlock = writer.lock().await;
507 match write_frame_async(&mut **wlock, &frame).await {
508 Ok(()) => Ok(()),
509 Err(e) => {
510 drop(wlock);
511 crate::log(&format!(
512 "[host_pipe] direct write failed: {} — clearing writer + buffering",
513 e
514 ));
515 // Arc::ptr_eq guards against clobbering a freshly-
516 // installed writer if set_writer raced in between
517 // the failed write and this clear. (reagent P1
518 // PR #642 round 3.)
519 let mut inner = self.inner.lock().await;
520 let same_writer = inner
521 .writer
522 .as_ref()
523 .is_some_and(|cur| Arc::ptr_eq(cur, &writer));
524 if same_writer {
525 inner.writer = None;
526 inner.host_disconnected_at = Some(Instant::now());
527 inner
528 .pending_buffer
529 .push_back(PendingFrame { frame, saga_id });
530 Err(HostPipeError::WriteFailed(e))
531 } else {
532 // Writer was concurrently replaced (set_writer
533 // raced and already drained whatever buffer
534 // existed at install time). Push to the new
535 // writer's buffer + immediately retry through
536 // the new writer; otherwise this frame sits
537 // until the next reconnect, even though the
538 // pipe is healthy. (codex P1 PR #642 round 4.)
539 let new_writer = inner.writer.as_ref().map(Arc::clone);
540 // Push the failed frame to buffer FIRST so
541 // the drain below picks it up in FIFO order
542 // alongside any other in-flight pending.
543 inner
544 .pending_buffer
545 .push_back(PendingFrame { frame, saga_id });
546 drop(inner);
547 if let Some(new_w) = new_writer {
548 self.drain_pending_to(&new_w).await;
549 }
550 // We still surface the original write failure
551 // to the caller — they asked for THIS frame
552 // to be sent on the pipe they had at call
553 // time. The drain attempt is best-effort
554 // recovery; saga-level retry should kick in
555 // if the drain fails too.
556 Err(HostPipeError::WriteFailed(e))
557 }
558 }
559 }
560 }
561 Decision::Buffered { dropped } => {
562 if let Some(dropped) = dropped {
563 self.emit_drop_failure(dropped, "host pipe backpressure overflow")
564 .await;
565 }
566 Ok(())
567 }
568 }
569 }
570
571 /// If the disconnect timer has elapsed, flush the pending buffer
572 /// and emit `Event::SagaFailed` for every buffered Command.
573 async fn expire_pending_if_timed_out(&self) {
574 let now = Instant::now();
575 let drained: Vec<PendingFrame> = {
576 let mut inner = self.inner.lock().await;
577 let Some(start) = inner.host_disconnected_at else {
578 return;
579 };
580 if now.duration_since(start) < DISCONNECT_TIMEOUT {
581 return;
582 }
583 // Reset the timer so we don't re-drain on the next call;
584 // the next disconnect transition rearms it.
585 inner.host_disconnected_at = None;
586 inner.pending_buffer.drain(..).collect()
587 };
588 if !drained.is_empty() {
589 crate::log(&format!(
590 "[host_pipe] disconnect exceeded {:?}; dropping {} buffered frames",
591 DISCONNECT_TIMEOUT,
592 drained.len()
593 ));
594 }
595 for f in drained {
596 self.emit_drop_failure(f, "host unreachable").await;
597 }
598 }
599
600 async fn emit_drop_failure(&self, dropped: PendingFrame, reason: &str) {
601 let Some(saga_id) = dropped.saga_id else {
602 // Event frames + Command frames without a saga_id are
603 // silently dropped — they don't gate any saga's progress.
604 return;
605 };
606 let v = {
607 let mut state = self.state.lock().await;
608 state.bump_version()
609 };
610 let evt = Event::SagaFailed {
611 saga_id,
612 reason: reason.to_string(),
613 version: v,
614 };
615 let _ = self.events_tx.send(evt);
616 }
617
618 /// Test-only inspection of the current pending buffer length.
619 #[cfg(test)]
620 pub async fn pending_len(&self) -> usize {
621 self.inner.lock().await.pending_buffer.len()
622 }
623
624 /// Test-only inspection of whether a writer is currently set.
625 #[cfg(test)]
626 pub async fn is_connected(&self) -> bool {
627 self.inner.lock().await.writer.is_some()
628 }
629
630 /// Test-only: force the disconnect timer back by `delta` so the
631 /// 30s timeout fires without sleeping in tests.
632 #[cfg(test)]
633 pub async fn rewind_disconnect_timer(&self, delta: Duration) {
634 let mut inner = self.inner.lock().await;
635 if let Some(t) = inner.host_disconnected_at.as_mut() {
636 *t = t.checked_sub(delta).unwrap_or(*t);
637 }
638 }
639}
640
641/// Pull the saga_id off a Command if its variant carries one.
642///
643/// Used by `send_frame`'s pending-buffer machinery: when a buffered
644/// command is dropped (overflow or 30s host-disconnect timeout),
645/// `emit_drop_failure` emits `Event::SagaFailed { saga_id }` so the
646/// saga's bracket closes cleanly instead of waiting forever.
647/// CPD-1 added `saga_id` fields on host-bound variants; CPD-3 made
648/// `send_command` live from the saga coordinator, so the drop
649/// machinery is now saga-correctness-relevant.
650fn saga_id_of(cmd: &Command) -> Option<u64> {
651 // CPD-1 added saga_id fields to host-bound Commands; CPD-3 made
652 // send_command live from the saga coordinator. Returning the real
653 // id here means HostPipe's drop-on-overflow + 30s-timeout-flush
654 // paths can emit `Event::SagaFailed` for the right saga, instead
655 // of orphaning the buffered command. (reagent P1 PR #644 round 5.)
656 match cmd {
657 Command::SpawnPoolWindow { saga_id } => Some(*saga_id),
658 Command::ReapPanes { saga_id, .. } => Some(*saga_id),
659 Command::DrainPoolIfLast { saga_id, .. } => Some(*saga_id),
660 _ => None,
661 }
662}
663
664/// Serialize a `HostFrame` as newline-delimited JSON and write it
665/// to a writer. Used both for direct writes (connected) and for
666/// pending-buffer drain (post-reconnect).
667async fn write_frame_async<W: AsyncWrite + Unpin + ?Sized>(
668 writer: &mut W,
669 frame: &HostFrame,
670) -> io::Result<()> {
671 let mut buf = serde_json::to_vec(frame)
672 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
673 buf.push(b'\n');
674 writer.write_all(&buf).await?;
675 writer.flush().await?;
676 Ok(())
677}