agentmux_launcher\saga/
mod.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase E.1a — saga coordinator infrastructure.
5// Phase F.5 — first concrete saga consumer (`pool_respawn`).
6//
7// A saga is a state machine that orchestrates a multi-step,
8// multi-reducer flow (e.g. tear-off touches host pool + srv
9// workspace + launcher window registration). Sagas exist where
10// per-flow correctness needs explicit coordination — distributed
11// subscriber callbacks aren't enough.
12//
13// E.1a shipped this module as framework-only (no real sagas).
14// **F.5 lights up the first concrete saga (`pool_respawn`) plus the
15// minimal coordinator wiring needed to drive it**: the bus-
16// subscription loop now starts sagas in response to trigger events,
17// routes subsequent bus events into in-flight sagas via
18// `Saga::on_event`, dispatches `SagaAction` results, and emits
19// `SagaStarted` / `SagaCompleted` / `SagaFailed` brackets.
20//
21// Design (per `docs/specs/SPEC_PHASE_E_SRV_REDUCER_2026_04_29.md` §7
22// + `docs/specs/SPEC_PHASE_F_HOST_REDUCER_2026-05-01.md` §7.1):
23//
24//   trait Saga {
25//       fn start(&mut self, ctx: &SagaCtx) -> SagaAction;
26//       fn on_event(&mut self, event: &Event, ctx: &SagaCtx) -> SagaAction;
27//       fn name(&self) -> &'static str;
28//   }
29//
30//   enum SagaAction {
31//       IssueCmd { target: PipeTarget, cmd: Command },
32//       Done,
33//       Failed { reason: String },
34//       Wait,
35//   }
36//
37// The `SagaCoordinator` task subscribes to the broadcast bus,
38// routes events to in-flight sagas, dispatches IssueCmd actions to
39// the appropriate pipe, and emits `SagaStarted` / `SagaCompleted`
40// / `SagaFailed` so subscribers (renderer) can buffer-until-complete.
41//
42// **Cross-process dispatch — CPD-3 LIVE.** The launcher → host
43// command pipe is now wired via `HostPipe::send_command()` (CPD-2
44// + CPD-3). F.5's `pool_respawn` and F.6's `window_cleanup_cascade`
45// sagas issue `IssueCmd::Host` actions; the coordinator dispatches
46// them through the wire instead of merely logging. Per-saga
47// timeouts (`Saga::timeout()`, default 5s, F.6 overrides 30s)
48// + per-saga deadline tasks fire `SagaFailed` if a host action
49// stays unresolved past its budget. Host-emitted
50// `Command::ReportSagaActionFailed` translates into
51// `Event::SagaActionFailed` and terminates the matching saga.
52//
// Per-variant `saga_id` tagging on existing Command/Event variants
// was deferred at F.5; the F.5 coordinator didn't need it because:
//   - only one `pool_respawn` saga can be in flight per promote
//     (sagas are dispatched on `PoolWindowPromoted` 1:1);
//   - `Event::PoolWindowAdded` from the *implicit* refill is the
//     unique terminal signal — no foreign event type collides.
// Concurrent promotes (overlapping `PoolWindowAdded` streams) do need
// saga_id correlation, and CPD-4 has since added it: host-bound
// commands are tagged via `inject_saga_id`, and same-kind sagas
// filter events on `ctx.saga_id`.
61//
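// Saga state is in-memory. A launcher restart abandons in-flight
// sagas; renderer-side timeouts cover the visible consequence.
// Since LSD-2/LSD-3, when a `LauncherSagaLog` is installed every
// transition is also persisted. The startup recovery walker
// (`compensate_unresolved_launcher_sagas`) then marks sagas left
// unresolved by a restart as `failed_compensation`.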
64
65use std::sync::Arc;
66use std::time::Duration;
67
68use agentmux_common::ipc::{Command, Event};
69
70use crate::host_pipe::HostPipe;
71
72pub mod pool_respawn;
73pub mod window_cleanup;
74
75#[cfg(test)]
76mod integration_tests;
77
// LSD-1 (PR LSD-1) — durable launcher saga log + API. LSD-1 shipped
// foundations only; the module was declared here so it compiles and
// its tests run. PR LSD-2 then wired the coordinator to write through
// `LauncherSagaLog` on every state transition (`with_log`,
// `spawn_saga`, `apply_action`). See
// `docs/specs/SPEC_LAUNCHER_SAGA_DURABILITY_2026-05-01.md` §4 PR1 for
// the staged-rollout rationale.
84pub(crate) mod log;
85pub use log::LauncherSagaLog;
86use log::SagaOutcome;
87
88// LSD-3 — startup recovery walker for unresolved launcher sagas.
89// `main.rs::run_windows` calls `compensate_unresolved_launcher_sagas`
90// after opening the durable saga log and BEFORE spawning the saga
91// coordinator. See `saga/recovery.rs` for design notes.
92mod recovery;
93pub use recovery::compensate_unresolved_launcher_sagas;
94
/// Destination pipe for a `SagaAction::IssueCmd`.
///
/// - `LauncherSelf` — hand the command to the launcher's own reducer
///   (in-process).
/// - `Host` / `Srv` — forward the command over that peer's pipe.
///
/// **CPD-3 status:** only `Host` is live. The coordinator's
/// `apply_action` sends `IssueCmd::Host` actions over the launcher →
/// host wire via `HostPipe::send_command()`. `LauncherSelf` and `Srv`
/// are reserved for class-D/E sagas; today they are only logged.
///
/// F.7 cleanup audit: the F.5/F.6 sagas construct only `Host`.
/// `LauncherSelf` and `Srv` are framework slots kept for the
/// cross-process dispatch follow-up phase per
/// `SPEC_PHASE_F_HOST_REDUCER_2026-05-01.md` §4.3. That is why the
/// `dead_code` allow below stays.
#[allow(dead_code)] // LauncherSelf + Srv reserved for future cross-process sagas
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipeTarget {
    LauncherSelf,
    Host,
    Srv,
}
117
118/// What a saga decides to do next, returned from `start` /
119/// `on_event`. The coordinator drives the saga forward by reacting
120/// to this enum.
121///
122/// F.7 cleanup audit: `IssueCmd`, `Done`, `Wait` are all constructed
123/// by F.5 + F.6 sagas. `Failed` is consumed in `apply_action` (and
124/// its corresponding `emit_failed` is now actively called by the
125/// evict-and-replace path), but no shipped saga *constructs*
126/// `Failed` yet — sagas today only succeed or wait. Variant-level
127/// allow scopes the dead-code suppression precisely to that one
128/// reserved variant.
129#[derive(Debug)]
130pub enum SagaAction {
131    /// Dispatch a command on the target pipe; saga remains in flight.
132    IssueCmd {
133        target: PipeTarget,
134        cmd: Command,
135    },
136    /// Saga succeeded. Coordinator removes it from `in_flight`
137    /// and emits `Event::SagaCompleted`.
138    Done,
139    /// Saga failed irrecoverably. Coordinator emits `Event::SagaFailed`
140    /// after dispatching any compensation IssueCmds the saga issued
141    /// before returning Failed. Reserved for sagas with explicit
142    /// failure conditions (e.g. cross-process dispatch follow-up
143    /// + saga timeouts).
144    #[allow(dead_code)] // reserved for sagas that explicitly fail; F.5/F.6 only succeed or wait
145    Failed { reason: String },
146    /// Saga is waiting for an event it hasn't seen yet. No-op for
147    /// the coordinator until the next bus event.
148    Wait,
149}
150
/// Read-only context passed to saga callbacks. It currently carries
/// only the saga_id. It is kept as a struct (rather than a bare u64)
/// so future fields can be added without touching every `Saga` impl.
///
/// `saga_id` is the coordinator-allocated id of the saga receiving the
/// callback. The F.7 cleanup audit found it unread by the F.5/F.6
/// sagas. CPD-4 later made concurrent same-kind sagas correlate by
/// saga_id: `on_event` compares `ctx.saga_id` with the saga_id carried
/// by the event, and host-bound commands are tagged with it.
/// NOTE(review): if every shipped saga now reads `saga_id`, the
/// `dead_code` allow below can be dropped. Confirm before removing it.
#[allow(dead_code)] // saga_id was unread pre-CPD-4; see note above
#[derive(Debug, Clone, Copy)]
pub struct SagaCtx {
    /// Id allocated by `SagaCoordinator::next_id` for this saga.
    pub saga_id: u64,
}
164
165/// A multi-step, multi-reducer state machine. Implementations
166/// describe one logical operation (tear-off, pool-respawn, etc.).
167///
168/// Lifecycle: coordinator calls `start` once when the saga is
169/// added to `in_flight`. After that, every event on the bus is
170/// passed to `on_event`. The saga inspects the event, advances
171/// its internal state, returns the next action.
172///
173/// **Identification:** sagas know which events belong to them by
174/// inspecting saga_id in lifecycle events or by matching patterns
175/// in event payloads (e.g. specific labels). F.5 sagas correlate
176/// by event type only (`pool_respawn` matches any
177/// `PoolWindowAdded` after start). Per-variant `saga_id` tagging
178/// on commands/events is deferred until concurrent same-type
179/// sagas are needed.
180pub trait Saga: Send {
181    fn start(&mut self, ctx: &SagaCtx) -> SagaAction;
182    fn on_event(&mut self, event: &Event, ctx: &SagaCtx) -> SagaAction;
183    fn name(&self) -> &'static str;
184    /// LSD-2 — saga's input arguments serialized for the durable log's
185    /// `input_json` column. The coordinator calls this once at
186    /// `spawn_saga` time (before `start`) and writes the result via
187    /// `LauncherSagaLog::start_saga`. Operators see this in
188    /// `--diag sagas` output (e.g. `{"closed_label":"win-3"}`) so they
189    /// can tell which window's cleanup a recovered-failed saga
190    /// belonged to.
191    ///
192    /// Default `Value::Null` — sagas with no input fields can ignore
193    /// it. Concrete sagas should override with a `serde_json::json!`
194    /// of their constructor args.
195    fn input_snapshot(&self) -> serde_json::Value {
196        serde_json::Value::Null
197    }
198
199    /// CPD-3 — per-saga deadline budget for completing all
200    /// `IssueCmd`+wait cycles. Coordinator arms a timer when the saga
201    /// registers; if the saga is still in_flight when `timeout()`
202    /// elapses, it is force-failed (`SagaFailed { reason: "saga
203    /// timeout" }`) and removed from the registry.
204    ///
205    /// Default 5s — fits class-C single-step host dispatch (e.g.
206    /// pool respawn). `WindowCleanupCascade` overrides to 30s
207    /// because pane drain on a workspace with many panes can
208    /// legitimately take that long. Per spec §3.10.
209    fn timeout(&self) -> Duration {
210        Duration::from_secs(5)
211    }
212}
213
214/// Saga coordinator task.
215///
216/// Owns the registry of in-flight sagas, allocates saga ids,
217/// routes events to sagas, dispatches IssueCmd actions to the
218/// appropriate pipes, and emits lifecycle events on the broadcast
219/// bus.
220///
221/// F.5 adds the first real consumer (`pool_respawn`). The
222/// `in_flight` registry is now actually populated; the bus loop
223/// dispatches events into it.
224/// LSD-2 — coordinator's per-saga book-keeping. Wraps the boxed saga
225/// with the durable-log step bookkeeping the coordinator needs to call
226/// `LauncherSagaLog::finish_step` when the awaited bus event lands.
227///
228/// `awaiting_step` is `Some(idx)` when the saga is parked on a
229/// `Wait`-then-event pivot (the most recent `IssueCmd` allocated step
230/// `idx`); `None` between dispatches and after termination. Tracking
231/// it on the in-flight record (rather than inside each saga impl) keeps
232/// the durability concern out of saga authors' hands — they only deal
233/// with `SagaAction`.
234///
235/// `next_step_index` is the monotonic counter the coordinator
236/// `fetch_add(1)`s on every `IssueCmd` for this saga. Mirrors srv's
237/// `SagaCtx::step_index`. Lives per-saga (not coordinator-global) so
238/// concurrent sagas don't interleave indices in the log.
239struct InFlightSaga {
240    saga: Box<dyn Saga>,
241    awaiting_step: Option<u32>,
242    next_step_index: u32,
243}
244
245pub struct SagaCoordinator {
246    /// Monotonic saga-id allocator.
247    next_saga_id: std::sync::atomic::AtomicU64,
248    /// In-flight sagas keyed by saga_id, each wrapped with the
249    /// durable-log bookkeeping (LSD-2: `awaiting_step` +
250    /// `next_step_index`).
251    in_flight: tokio::sync::Mutex<std::collections::HashMap<u64, InFlightSaga>>,
252    /// Reference to the broadcast bus so the coordinator can emit
253    /// `SagaStarted` / `SagaCompleted` / `SagaFailed`.
254    events_tx: tokio::sync::broadcast::Sender<Event>,
255    /// Reference to the launcher's reducer state for `bump_version`
256    /// when emitting saga lifecycle events.
257    state: Arc<tokio::sync::Mutex<crate::state::State>>,
258    /// LSD-2 — durable saga log. `None` in tests that don't exercise
259    /// the durability path (the saga then logs + remains in flight,
260    /// preserving the pre-LSD-2 behavior tests rely on for end-to-end
261    /// bracket assertions). When `Some`, every saga lifecycle
262    /// transition (`spawn_saga` → `start_saga`, `IssueCmd` →
263    /// `start_step`, awaited-event consumed → `finish_step`,
264    /// `Done` → `terminate_saga(Completed)`, `Failed` / evicted →
265    /// `terminate_saga(Failed)`) writes to this log.
266    log: Option<Arc<LauncherSagaLog>>,
267    /// CPD-3 — launcher → host pipe wrapper. `apply_action` for
268    /// `IssueCmd::Host` dispatches via `host_pipe.send_command()` when
269    /// installed. `None` in tests that don't exercise host dispatch
270    /// (those drive sagas via synthetic terminal events on the bus).
271    host_pipe: Option<Arc<HostPipe>>,
272}
273
274impl SagaCoordinator {
275    pub fn new(
276        events_tx: tokio::sync::broadcast::Sender<Event>,
277        state: Arc<tokio::sync::Mutex<crate::state::State>>,
278    ) -> Self {
279        Self {
280            next_saga_id: std::sync::atomic::AtomicU64::new(1),
281            in_flight: tokio::sync::Mutex::new(std::collections::HashMap::new()),
282            events_tx,
283            state,
284            log: None,
285            host_pipe: None,
286        }
287    }
288
289    /// CPD-3 — install the host pipe so `IssueCmd::Host` actions are
290    /// dispatched live (instead of log-only). Builder-style setter so
291    /// existing tests + the `next_id_is_monotonic` smoke don't have to
292    /// construct a HostPipe. Production wiring (`main.rs`) calls this
293    /// once before `run_coordinator` is spawned.
294    pub fn with_host_pipe(mut self, host_pipe: Arc<HostPipe>) -> Self {
295        self.host_pipe = Some(host_pipe);
296        self
297    }
298
299    /// LSD-2 — install the durable saga log so saga lifecycle
300    /// transitions are persisted to `<data-dir>/db/launcher-sagas.db`.
301    /// Builder-style setter rather than a constructor parameter so
302    /// existing tests + the `next_id_is_monotonic` smoke don't have
303    /// to construct an in-memory log. Production wiring (in
304    /// `main.rs`) calls this once before `run_coordinator` is spawned.
305    pub fn with_log(mut self, log: Arc<LauncherSagaLog>) -> Result<Self, crate::saga::log::LogError> {
306        // Seed the saga_id allocator from the highest persisted id
307        // so a launcher restart cannot reuse an id that already
308        // exists in `launcher_saga`. Reusing would (a) fail new
309        // INSERTs on duplicate-PK, and (b) silently mutate prior
310        // saga rows via `terminate_saga` / `finish_step` UPDATEs
311        // keyed by saga_id — corrupting saga history + recovery
312        // diagnostics. (codex P1 PR #645 round 1.)
313        //
314        // On `max_saga_id()` error: return Err so caller (main.rs)
315        // can fail launcher startup loudly. Continuing with a default
316        // next_saga_id=1 while the log is still attached would leave
317        // the coordinator in the exact failure mode this seed is
318        // meant to prevent. (codex P1 PR #645 round 2.)
319        let max = log.max_saga_id()?;
320        self.next_saga_id
321            .store(max + 1, std::sync::atomic::Ordering::Relaxed);
322        if max > 0 {
323            crate::log(&format!(
324                "[saga] seeded next_saga_id={} from launcher_saga.max(saga_id)={}",
325                max + 1,
326                max
327            ));
328        }
329        self.log = Some(log);
330        Ok(self)
331    }
332
333    /// Allocate the next saga_id. Monotonic per launcher run.
334    pub fn next_id(&self) -> u64 {
335        self.next_saga_id
336            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
337    }
338
339    /// Bump the launcher's `event_version` for a coordinator-emitted
340    /// lifecycle event. Brief mutex hold — no I/O between lock and
341    /// drop.
342    async fn next_event_version(&self) -> u64 {
343        let mut state = self.state.lock().await;
344        state.bump_version()
345    }
346
347    /// Emit `Event::SagaStarted` after registering a saga.
348    async fn emit_started(&self, saga_id: u64, name: &'static str) {
349        let v = self.next_event_version().await;
350        let _ = self.events_tx.send(Event::SagaStarted {
351            saga_id,
352            name: name.to_string(),
353            version: v,
354        });
355    }
356
357    /// Emit `Event::SagaCompleted` after a saga returns `Done`.
358    async fn emit_completed(&self, saga_id: u64) {
359        let v = self.next_event_version().await;
360        let _ = self.events_tx.send(Event::SagaCompleted {
361            saga_id,
362            version: v,
363        });
364    }
365
366    /// Emit `Event::SagaFailed` after a saga returns `Failed` or
367    /// after evict-and-replace cancels a same-kind in-flight saga.
368    /// F.7 cleanup audit: prior `#[allow(dead_code)]` removed — F.6
369    /// evict-and-replace policy now actively dispatches this on
370    /// every concurrent same-kind retrigger.
371    async fn emit_failed(&self, saga_id: u64, reason: String) {
372        let v = self.next_event_version().await;
373        let _ = self.events_tx.send(Event::SagaFailed {
374            saga_id,
375            reason,
376            version: v,
377        });
378    }
379
    /// Apply a `SagaAction` returned by `start` or `on_event`. Returns
    /// an `ApplyOutcome` carrying:
    ///   - `in_flight`: true → the caller keeps the saga in `in_flight`;
    ///     false → the caller removes it.
    ///   - `awaiting_step`: `Some(idx)` if the action was an
    ///     `IssueCmd` that allocated step `idx` and the saga is now
    ///     parked waiting for the echo event. The caller stores this on
    ///     the InFlightSaga so the next non-`Wait` `on_event` return
    ///     can call `LauncherSagaLog::finish_step(saga_id, idx, ev)`.
    ///
    /// CPD-3 — `IssueCmd::Host` dispatches live through `HostPipe`
    /// when one is installed. Tests without a host_pipe (the existing
    /// F.5/F.6 unit + integration tests) fall back to log-only, so
    /// synthetic-terminal-event drivers continue to work.
    /// `LauncherSelf` and `Srv` targets remain log-only; they are
    /// reserved for class-D/E sagas and have no consumer today.
    ///
    /// LSD-2 — every state transition writes to `LauncherSagaLog`
    /// when one is installed: `IssueCmd` → `start_step`, `Done` →
    /// `terminate_saga(Completed)`, `Failed` →
    /// `terminate_saga(Failed)`. Log write failures are only logged;
    /// they never change the returned outcome.
    ///
    /// `step_index` is the caller-allocated index for THIS dispatch.
    /// On the first dispatch in `spawn_saga`, the caller passes 0.
    /// On subsequent dispatches from the event loop, the caller reads
    /// and bumps the per-InFlightSaga counter.
    ///
    /// The terminal paths (`Done`, `Failed`) both go through
    /// `claim_terminal`, so only one of {normal Done/Failed,
    /// timeout-task, SagaActionFailed listener} can win the
    /// terminal-event race for a given saga_id.
    /// (reagent P1 PR #644 round 1 + 2.)
    ///
    /// A host-pipe send error is deliberately NOT terminal. The frame
    /// is re-buffered and the saga stays in flight until a report or
    /// its own timeout resolves it (reagent P1 PR #644 rounds 6/7).
    ///
    /// NOTE(review): the `IssueCmd` arm does not re-check that
    /// `saga_id` is still present in `in_flight` before writing the
    /// step row and dispatching. If another terminal path already
    /// claimed the saga, the command still goes out. Confirm that
    /// callers rule this interleaving out.
    async fn apply_action(
        &self,
        saga_id: u64,
        name: &'static str,
        action: SagaAction,
        step_index: u32,
    ) -> ApplyOutcome {
        match action {
            SagaAction::IssueCmd { target, cmd } => {
                // LSD-2 — write the durable `pending` step row BEFORE
                // dispatch so a crash mid-dispatch leaves a recoverable
                // breadcrumb (LSD-3's walker upgrades this saga to
                // `failed_compensation` on next launcher startup).
                let step_name = derive_step_name(&cmd, target);
                if let Some(log) = self.log.as_ref() {
                    if let Err(e) =
                        log.start_step(saga_id, step_index, &step_name, target, &cmd)
                    {
                        crate::log(&format!(
                            "[saga] saga_id={} name={} start_step log write failed: {} — continuing (in-memory path remains authoritative for this run)",
                            saga_id, name, e
                        ));
                    }
                }

                match target {
                    PipeTarget::Host => {
                        // CPD-3 — dispatch live through the host pipe
                        // when it is installed. Tests without a
                        // host_pipe (most existing F.5/F.6 unit +
                        // integration tests) fall back to log-only, so
                        // synthetic-terminal-event drivers continue to
                        // work.
                        let Some(host_pipe) = self.host_pipe.as_ref() else {
                            crate::log(&format!(
                                "[saga] saga_id={} name={} IssueCmd target=Host cmd={:?} (no host_pipe installed — log-only)",
                                saga_id, name, cmd
                            ));
                            return ApplyOutcome::in_flight_awaiting(step_index);
                        };
                        // Tag the wire command with this saga's id so
                        // host replies and pipe-side buffering can be
                        // correlated back to it.
                        let cmd_with_id = inject_saga_id(cmd, saga_id);
                        match host_pipe.send_command(&cmd_with_id).await {
                            Ok(()) => {
                                crate::log(&format!(
                                    "[saga] saga_id={} name={} IssueCmd::Host dispatched cmd={:?}",
                                    saga_id, name, cmd_with_id
                                ));
                                ApplyOutcome::in_flight_awaiting(step_index)
                            }
                            Err(e) => {
                                // CRITICAL: do NOT terminate the saga
                                // here. send_command's send_frame already
                                // re-buffered the failed frame for retry
                                // on reconnect (with the real saga_id).
                                // If we ALSO emitted SagaFailed now:
                                //   - 30s later expire_pending_if_timed_out
                                //     would drain the buffer, and
                                //     emit_drop_failure would emit a
                                //     SECOND SagaFailed for the same
                                //     saga_id (double-emit), OR
                                //   - the host would reconnect in <30s and
                                //     the buffered command would execute
                                //     as an orphaned side effect after
                                //     the saga bracket was already closed.
                                // (reagent P1 PR #644 round 6.)
                                //
                                // Round 7 design: keep the saga in_flight.
                                // Either the retry path resolves it
                                // (Report* arrives → on_event drives it
                                // forward), or the saga's own timeout
                                // (5s default, 30s for F.6) fires a
                                // claim_terminal'd SagaFailed. Either
                                // way, exactly one terminal event is
                                // emitted.
                                crate::log(&format!(
                                    "[saga] saga_id={} name={} host pipe send transient err: {} — frame buffered for retry; saga stays in-flight (round 7 fix)",
                                    saga_id, name, e
                                ));
                                ApplyOutcome::in_flight_awaiting(step_index)
                            }
                        }
                    }
                    PipeTarget::LauncherSelf | PipeTarget::Srv => {
                        // Reserved for class-D/E sagas; out of scope
                        // per SPEC_CROSS_PROCESS_DISPATCH §3.6. Logging
                        // only keeps the framework shape until a
                        // consumer arrives. The saga is still treated
                        // as awaiting this step.
                        crate::log(&format!(
                            "[saga] saga_id={} name={} IssueCmd target={:?} cmd={:?} (target not yet wired; log-only)",
                            saga_id, name, target, cmd
                        ));
                        ApplyOutcome::in_flight_awaiting(step_index)
                    }
                }
            }
            SagaAction::Wait => ApplyOutcome::in_flight_no_change(),
            SagaAction::Done => {
                // Atomic claim: only one of {normal Done/Failed,
                // timeout-task, SagaActionFailed listener} can win the
                // terminal-event race. Without this guard, a Done
                // could fire SagaCompleted while the timeout task
                // simultaneously fires SagaFailed, producing duplicate
                // terminal events for the same saga (reagent P1 PR
                // #644 round 1). A host send error is not a terminal
                // path (see the IssueCmd::Host arm above).
                if !self.claim_terminal(saga_id).await {
                    return ApplyOutcome::terminated();
                }
                crate::log(&format!(
                    "[saga] saga_id={} name={} Done — emitting SagaCompleted",
                    saga_id, name
                ));
                if let Some(log) = self.log.as_ref() {
                    if let Err(e) = log.terminate_saga(saga_id, SagaOutcome::Completed) {
                        crate::log(&format!(
                            "[saga] saga_id={} terminate_saga(Completed) log write failed: {}",
                            saga_id, e
                        ));
                    }
                }
                self.emit_completed(saga_id).await;
                ApplyOutcome::terminated()
            }
            SagaAction::Failed { reason } => {
                // Same claim-first guard as `Done`: a loser of the
                // terminal race emits nothing and writes nothing.
                if !self.claim_terminal(saga_id).await {
                    return ApplyOutcome::terminated();
                }
                crate::log(&format!(
                    "[saga] saga_id={} name={} Failed reason={} — emitting SagaFailed",
                    saga_id, name, reason
                ));
                if let Some(log) = self.log.as_ref() {
                    if let Err(e) = log.terminate_saga(
                        saga_id,
                        SagaOutcome::Failed {
                            reason: reason.clone(),
                        },
                    ) {
                        crate::log(&format!(
                            "[saga] saga_id={} terminate_saga(Failed) log write failed: {}",
                            saga_id, e
                        ));
                    }
                }
                self.emit_failed(saga_id, reason).await;
                ApplyOutcome::terminated()
            }
        }
    }
557
558    /// Atomically remove `saga_id` from `in_flight` and return whether
559    /// the caller was the one who removed it (i.e. has the right to
560    /// emit the terminal `SagaCompleted` / `SagaFailed`). Idempotent:
561    /// a second caller for the same saga_id sees `false`. Used by the
562    /// `SagaAction::Done` and `SagaAction::Failed` paths in
563    /// `apply_action`, by the per-saga timeout task in `spawn_saga`,
564    /// by the `Event::SagaActionFailed` listener in `run_coordinator`,
565    /// by the host-send-failure path in `apply_action::IssueCmd::Host`,
566    /// and by the eviction path in `run_coordinator`.
567    /// (reagent P1 PR #644 round 1.)
568    async fn claim_terminal(&self, saga_id: u64) -> bool {
569        let claimed = {
570            let mut registry = self.in_flight.lock().await;
571            registry.remove(&saga_id).is_some()
572        };
573        // Purge any host-pipe pending frames tagged with this saga_id
574        // so a host reconnect post-terminal doesn't drain them as
575        // orphan side effects, and the 30s expiry path doesn't emit
576        // a second SagaFailed for the same saga_id.
577        // (codex + reagent P1 PR #644 round 7.)
578        if claimed {
579            if let Some(pipe) = self.host_pipe.as_ref() {
580                pipe.cancel_saga(saga_id).await;
581            }
582        }
583        claimed
584    }
585
586    /// Register a fresh saga, calling `start` and applying its first
587    /// action. The caller has already determined that the saga
588    /// should fire (e.g. matched a trigger event). Returns the saga's
589    /// allocated id (logged + bracketed in `SagaStarted`).
590    ///
591    /// LSD-2 — also writes a `running` row to the durable saga log
592    /// before invoking `saga.start()`, and (via `apply_action`)
593    /// records the saga's first dispatched step.
594    ///
595    /// CPD-3 — also arms a per-saga deadline timer. If the saga is
596    /// still in_flight when `saga.timeout()` elapses, a background
597    /// task force-fails it (`SagaFailed { reason: "saga timeout" }`)
598    /// and removes it from the registry. Per spec §3.10. Insert-then-
599    /// start ordering is required so `apply_action`'s `claim_terminal`
600    /// guard can succeed for immediate-completion sagas (codex P2 PR
601    /// #644 round 2).
602    async fn spawn_saga(self: &Arc<Self>, saga: Box<dyn Saga>) -> u64 {
603        let saga_id = self.next_id();
604        let name = saga.name();
605        let input = saga.input_snapshot();
606        let saga_timeout = saga.timeout();
607        crate::log(&format!(
608            "[saga] starting saga_id={} name={}",
609            saga_id, name
610        ));
611        // LSD-2 — write the durable `running` row BEFORE
612        // `emit_started` so a crash between this point and
613        // `apply_action` still leaves a recoverable breadcrumb
614        // (LSD-3's walker will upgrade it to `failed_compensation` on
615        // next launcher startup). Mirrors srv's `emit_saga_started`
616        // ordering: durable log before bus.
617        if let Some(log) = self.log.as_ref() {
618            if let Err(e) = log.start_saga(saga_id, name, &input) {
619                crate::log(&format!(
620                    "[saga] saga_id={} start_saga log write failed: {} — continuing (in-memory path remains authoritative for this run)",
621                    saga_id, e
622                ));
623            }
624        }
625        // Emit SagaStarted FIRST so any subscriber buffering by
626        // saga_id sees the bracket open before any per-step events.
627        // Mirrors `agentmux-srv::sagas::emit_saga_started` ordering.
628        self.emit_started(saga_id, name).await;
629
630        // CPD-3 — insert the saga into in_flight BEFORE calling
631        // start(), so that if start() returns SagaAction::Done or
632        // SagaAction::Failed immediately, apply_action's terminal
633        // paths can claim_terminal successfully and emit the matching
634        // SagaCompleted/SagaFailed bracket. Pre-round-3 we started
635        // saga.start() before insertion, which broke immediate-
636        // completion sagas (claim_terminal returned false → no
637        // terminal event → dangling SagaStarted bracket).
638        // (codex P2 PR #644 round 2.)
639        let action = {
640            let mut registry = self.in_flight.lock().await;
641            // CPD-4 — concurrent same-kind sagas now correlate by
642            // saga_id (`Saga::on_event` filters on
643            // `ctx.saga_id == event.saga_id`). The pre-CPD-4
644            // observability warning (PR #634 codex P1) is retained
645            // at info level for operators tracking concurrency
646            // density via `--diag wrr`, but the underlying
647            // correctness issue is closed.
648            let same_kind_count = registry
649                .values()
650                .filter(|s| s.saga.name() == name)
651                .count();
652            if same_kind_count >= 1 {
653                crate::log(&format!(
654                    "[saga] starting {} saga_id={} while {} other(s) of same kind in flight (CPD-4 per-saga correlation — sagas coexist cleanly)",
655                    name, saga_id, same_kind_count,
656                ));
657            }
658            registry.insert(
659                saga_id,
660                InFlightSaga {
661                    saga,
662                    awaiting_step: None,
663                    next_step_index: 0,
664                },
665            );
666            // Drive start() while still under the registry lock so
667            // we have a mutable reference to the just-inserted saga.
668            // start() is non-async and does no I/O — bounded hold time.
669            let in_flight_saga = registry.get_mut(&saga_id).expect("just inserted");
670            let ctx = SagaCtx { saga_id };
671            in_flight_saga.saga.start(&ctx)
672        };
673
674        // First step on a fresh saga is index 0. apply_action may
675        // remove the saga via claim_terminal (Done/Failed/host-send-
676        // error) without re-locking the registry from here.
677        let outcome = self.apply_action(saga_id, name, action, 0).await;
678        if outcome.in_flight {
679            // Update the bookkeeping for the freshly-issued step.
680            // If the first action allocated step 0, the saga is now
681            // parked awaiting that step's echo event; next allocation
682            // is index 1. Otherwise (Wait — saga has no first
683            // dispatch yet), keep the counter at 0.
684            let mut registry = self.in_flight.lock().await;
685            if let Some(in_flight_saga) = registry.get_mut(&saga_id) {
686                in_flight_saga.awaiting_step = outcome.awaiting_step;
687                in_flight_saga.next_step_index =
688                    if outcome.awaiting_step.is_some() { 1 } else { 0 };
689            }
690            drop(registry);
691
692            // CPD-3 — arm the per-saga deadline timer. If the saga
693            // is still in_flight when `saga_timeout` elapses, fail
694            // it out. Spawned task captures an Arc clone of the
695            // coordinator so it can outlive the saga's deadline.
696            // Uses `claim_terminal` so it doesn't race with the
697            // normal Done/Failed path. (reagent P1 PR #644 round 1.)
698            let coord_for_timeout = Arc::clone(self);
699            tokio::spawn(async move {
700                tokio::time::sleep(saga_timeout).await;
701                if coord_for_timeout.claim_terminal(saga_id).await {
702                    crate::log(&format!(
703                        "[saga] saga_id={} name={} timed out after {:?} — emitting SagaFailed",
704                        saga_id, name, saga_timeout
705                    ));
706                    let reason = "saga timeout".to_string();
707                    if let Some(log) = coord_for_timeout.log.as_ref() {
708                        if let Err(e) = log.terminate_saga(
709                            saga_id,
710                            SagaOutcome::Failed {
711                                reason: reason.clone(),
712                            },
713                        ) {
714                            crate::log(&format!(
715                                "[saga] saga_id={} timeout terminate_saga(Failed) log write failed: {}",
716                                saga_id, e
717                            ));
718                        }
719                    }
720                    coord_for_timeout
721                        .emit_failed(saga_id, reason)
722                        .await;
723                }
724            });
725        }
726        saga_id
727    }
728}
729
/// LSD-2 — result of `apply_action`.
///
/// Pairs the "is the saga still in flight?" flag (formerly a bare
/// `bool`) with the step index an `IssueCmd` just allocated. The
/// coordinator parks that index on the `InFlightSaga` so it can hand it
/// to `LauncherSagaLog::finish_step` once the awaited bus event lands.
#[derive(Debug, Clone, Copy)]
struct ApplyOutcome {
    in_flight: bool,
    /// Step index allocated by an `IssueCmd` action; `None` for
    /// `Wait` / `Done` / `Failed`.
    awaiting_step: Option<u32>,
}

impl ApplyOutcome {
    /// Saga stays registered and is now parked on `step_index`.
    fn in_flight_awaiting(step_index: u32) -> Self {
        let awaiting_step = Some(step_index);
        Self { in_flight: true, awaiting_step }
    }

    /// Saga stays registered but no new step was allocated (the
    /// `Wait` path — nothing dispatched yet).
    fn in_flight_no_change() -> Self {
        Self { in_flight: true, awaiting_step: None }
    }

    /// Saga reached a terminal state; callers treat it as no longer
    /// in flight.
    fn terminated() -> Self {
        Self { in_flight: false, awaiting_step: None }
    }
}
763
764/// LSD-2 — short, greppable name for a `Command` dispatched as part
765/// of a saga step. Mirrors srv's `command_discriminant_name` in spirit
766/// (snake_case strings rather than `Debug` formatting) but prefixes
767/// with `issue_cmd_<target>_<discriminant>` so `--diag sagas` output
768/// makes the dispatch target obvious without a separate column lookup.
769///
770/// Falls back to `issue_cmd_<target>_unknown` for variants serde can't
771/// stringify (shouldn't happen for the snake_case-tagged Command enum;
772/// defensive default).
773fn derive_step_name(cmd: &Command, target: PipeTarget) -> String {
774    let target_str = match target {
775        PipeTarget::LauncherSelf => "launcher_self",
776        PipeTarget::Host => "host",
777        PipeTarget::Srv => "srv",
778    };
779    let discriminant = match serde_json::to_value(cmd) {
780        Ok(serde_json::Value::Object(map)) => map
781            .get("cmd")
782            .and_then(|v| v.as_str())
783            .map(|s| s.to_string())
784            .unwrap_or_else(|| "unknown".to_string()),
785        _ => "unknown".to_string(),
786    };
787    format!("issue_cmd_{}_{}", target_str, discriminant)
788}
789
790/// CPD-3 — fill in the `saga_id` field on a host-bound `Command`
791/// before dispatch. Sagas construct their `IssueCmd` actions with a
792/// placeholder `saga_id: 0` (they don't know their coordinator-
793/// allocated id at action-construction time); the coordinator
794/// rewrites the field at dispatch time so the host can echo it back
795/// on the matching `Report*`.
796///
797/// Exhaustive match: any host-bound Command variant added later that
798/// forgets to plumb `saga_id` will refuse to compile here. Non-host-
799/// bound variants (Report*, Identify, etc.) panic loudly because
800/// sagas only emit `IssueCmd::Host` for the three host-bound
801/// command kinds covered below.
802fn inject_saga_id(cmd: Command, saga_id: u64) -> Command {
803    match cmd {
804        Command::SpawnPoolWindow { .. } => Command::SpawnPoolWindow { saga_id },
805        Command::ReapPanes { label, .. } => Command::ReapPanes { label, saga_id },
806        Command::DrainPoolIfLast { label, .. } => {
807            Command::DrainPoolIfLast { label, saga_id }
808        }
809        // Defense-in-depth: every non-host-dispatched Command variant
810        // (Report*, Identify, etc.) is a coding bug if it reaches
811        // this point — the F.5/F.6 sagas only emit IssueCmd::Host
812        // for the three command kinds above.
813        other => panic!(
814            "inject_saga_id called on non-host-bound Command variant: {:?}",
815            other
816        ),
817    }
818}
819
820/// Inspect a bus event for "should this start a fresh saga?" Returns
821/// the constructed saga (boxed) on a hit; `None` otherwise.
822///
823/// Triggers wired to date:
824/// * F.5: `Event::PoolWindowPromoted` → `pool_respawn::PoolRespawn`
825/// * F.6: `Event::WindowClosed` →
826///   `window_cleanup::WindowCleanupCascade`
827///
828/// Future sagas extend this match.
829///
830/// CPD-4 — the coordinator no longer evicts same-kind in-flight
831/// sagas before `spawn_saga`; per-saga event correlation
832/// (`Saga::on_event` filters by `ctx.saga_id`) lets concurrent
833/// same-kind sagas coexist cleanly. The pre-CPD-4 evict-and-replace
834/// gate (PR #634 round 3) is removed in `run_coordinator`.
835fn match_trigger(event: &Event) -> Option<Box<dyn Saga>> {
836    match event {
837        Event::PoolWindowPromoted { label, .. } => {
838            Some(Box::new(pool_respawn::PoolRespawn::new(label.clone())))
839        }
840        // (codex P1 PR #637.) Only fire the cleanup cascade on
841        // CLEAN closes. `Event::WindowClosed { crash_detected: true }`
842        // comes from `wrr::apply_hwnd_destroyed` after a host/renderer
843        // crash; the host never sent `ReportPanesReaped` /
844        // `ReportPoolDrainDecision`, so the saga would stay
845        // in-flight indefinitely (only cleared by a later same-kind
846        // eviction or launcher restart) leaving a SagaStarted
847        // bracket dangling for subscribers that buffer on lifecycle.
848        Event::WindowClosed { label, crash_detected: false, .. } => Some(Box::new(
849            window_cleanup::WindowCleanupCascade::new(label.clone()),
850        )),
851        _ => None,
852    }
853}
854
/// Run the coordinator's bus-subscription loop.
///
/// **Receiver is passed in, not subscribed inside.** Subscribing
/// before `tokio::spawn` (in `main.rs`) ensures events emitted
/// between coordinator construction and the first `recv()` aren't
/// lost to the race window. Same pattern as `event_log::run_disk_writer`.
/// (reagent P2 PR #609.)
///
/// Dispatch order on each event:
///   0. Early exits:
///      * Saga lifecycle events are dropped (see below).
///      * `Event::SagaActionFailed` terminates the saga it names. It
///        claims the terminal slot, writes the durable `Failed` outcome,
///        then emits `SagaFailed`. The event is then NOT passed to the
///        trigger table or to `on_event`.
///   1. Match against trigger table → start any new sagas via
///      `spawn_saga` (which emits `SagaStarted` and applies the
///      saga's initial action).
///   2. Feed the same event into every in-flight saga's `on_event`.
///      This includes a saga just started in step 1 by this same
///      event, if it is still in flight.
///      Sagas returning `Done` / `Failed` are removed from
///      `in_flight` after their lifecycle event is emitted.
///
/// **Self-emitted events (`SagaStarted` / `SagaCompleted` /
/// `SagaFailed`) are NOT re-fed into sagas.** A saga reacting to its
/// own start event would loop. Filtered at the top of the dispatch
/// path.
///
/// Returns only when the broadcast bus is closed. A `Lagged` receive
/// error is logged and the loop keeps going. Missed events are not
/// replayed, so a saga whose awaited event was dropped stays parked
/// until another path ends it (e.g. the per-saga timeout armed in
/// `spawn_saga`).
pub async fn run_coordinator(
    coord: Arc<SagaCoordinator>,
    mut events_rx: tokio::sync::broadcast::Receiver<Event>,
) {
    crate::log("[saga] coordinator started");

    loop {
        match events_rx.recv().await {
            Ok(event) => {
                // Skip our own lifecycle events to avoid loops.
                if matches!(
                    event,
                    Event::SagaStarted { .. } | Event::SagaCompleted { .. } | Event::SagaFailed { .. }
                ) {
                    continue;
                }

                // CPD-3 — host reported via `Command::ReportSagaActionFailed`
                // that a saga-issued action failed. Launcher reducer
                // translates this into `Event::SagaActionFailed`
                // (CPD-1 schema). Terminate the matching saga
                // immediately rather than waiting for a Report* that
                // won't come.
                if let Event::SagaActionFailed { saga_id, reason, .. } = &event {
                    let saga_id = *saga_id;
                    let reason = reason.clone();
                    // claim_terminal: atomic take-ownership of the
                    // terminal-event slot, preventing duplicate-emission
                    // races with the saga's own Done/Failed path or its
                    // timeout task. (reagent P1 PR #644 round 1.)
                    if coord.claim_terminal(saga_id).await {
                        crate::log(&format!(
                            "[saga] saga_id={} terminating from host SagaActionFailed reason={}",
                            saga_id, reason
                        ));
                        let full_reason = format!("host action failed: {}", reason);
                        // Durable log before bus, the same order as the
                        // timeout path in `spawn_saga`. A failed log
                        // write is only logged; the in-memory path
                        // still emits SagaFailed.
                        if let Some(log) = coord.log.as_ref() {
                            if let Err(e) = log.terminate_saga(
                                saga_id,
                                SagaOutcome::Failed {
                                    reason: full_reason.clone(),
                                },
                            ) {
                                crate::log(&format!(
                                    "[saga] saga_id={} SagaActionFailed terminate_saga(Failed) log write failed: {}",
                                    saga_id, e
                                ));
                            }
                        }
                        coord.emit_failed(saga_id, full_reason).await;
                    }
                    // If claim_terminal returned false, another path
                    // already owns (or owned) the terminal event, or the
                    // id is not in flight. The event is dropped without
                    // further processing.
                    continue;
                }

                // Step 1 — start any new sagas this event triggers.
                //
                // **CPD-4 — evict-and-replace retired.** Pre-CPD-4
                // (codex P1 PR #634 round 3) the coordinator forced
                // at-most-one same-kind saga in flight by emitting
                // `SagaFailed { reason: "evicted" }` on the prior
                // saga whenever a fresh trigger landed. That was a
                // workaround for the broadcast-routing limitation:
                // every `PoolWindowAdded` / `PanesReaped` /
                // `PoolDrained` / `PoolNotLast` advanced *every*
                // matching in-flight saga, so two concurrent sagas
                // would cross-talk and complete each other's bracket.
                //
                // CPD-4 closes that limitation. Saga `on_event` now
                // filters by `event.saga_id == ctx.saga_id`
                // (`pool_respawn::on_event`,
                // `window_cleanup::on_event`). Concurrent same-kind
                // sagas coexist; each consumes only events tagged
                // with its own coordinator-allocated id. Eviction is
                // no longer needed — and would actively harm
                // correctness (a healthy saga gets `SagaFailed`d the
                // moment a sibling fires).
                //
                // Per spec §3.7 + execution-plan §2 batch 3.
                if let Some(saga) = match_trigger(&event) {
                    coord.spawn_saga(saga).await;
                }

                // Step 2 — feed the event into every in-flight saga.
                // Two-pass to avoid holding the registry lock across
                // `apply_action` (which itself locks state to bump
                // version when emitting SagaCompleted/Failed).
                //
                // LSD-2 — when a saga's `on_event` returns anything
                // OTHER than `Wait`, it has consumed its awaited bus
                // event; record the step's success in the durable log
                // (`finish_step(saga_id, awaiting_step, &event)`) before
                // dispatching the next action. The new action then
                // either parks the saga on a fresh step (next index
                // pulled from `next_step_index`) or terminates it.
                //
                // Per-saga snapshot taken under the registry lock in
                // pass 1 and consumed lock-free in pass 2.
                struct PendingAction {
                    saga_id: u64,
                    name: &'static str,
                    action: SagaAction,
                    /// Awaited step index that this on_event return
                    /// "consumed" — `Some` only when on_event returned
                    /// non-`Wait`. Caller will `finish_step` on it
                    /// before invoking `apply_action` for the new
                    /// action.
                    consumed_step: Option<u32>,
                    /// Next free step index for this saga; if the new
                    /// action is `IssueCmd`, `apply_action` writes to
                    /// this index.
                    next_idx: u32,
                }
                // Pass 1: drive every saga's `on_event` under a single
                // registry lock and collect the resulting actions.
                let actions: Vec<PendingAction> = {
                    let mut in_flight = coord.in_flight.lock().await;
                    let mut out = Vec::new();
                    for (saga_id, in_flight_saga) in in_flight.iter_mut() {
                        let ctx = SagaCtx { saga_id: *saga_id };
                        let action = in_flight_saga.saga.on_event(&event, &ctx);
                        // If the saga consumed its awaited event,
                        // capture the awaited index now and clear it.
                        // `Wait` keeps awaiting_step unchanged.
                        let consumed_step = if matches!(action, SagaAction::Wait) {
                            None
                        } else {
                            in_flight_saga.awaiting_step.take()
                        };
                        let next_idx = in_flight_saga.next_step_index;
                        out.push(PendingAction {
                            saga_id: *saga_id,
                            name: in_flight_saga.saga.name(),
                            action,
                            consumed_step,
                            next_idx,
                        });
                    }
                    out
                };
                // Pass 2: registry lock released. Record finished
                // steps, apply each action, then re-lock briefly only
                // to update bookkeeping.
                for pending in actions {
                    let PendingAction {
                        saga_id,
                        name,
                        action,
                        consumed_step,
                        next_idx,
                    } = pending;
                    // LSD-2 — record the awaited step's success in
                    // the durable log. `event` is the event that
                    // caused the saga to advance.
                    if let Some(idx) = consumed_step {
                        if let Some(log) = coord.log.as_ref() {
                            if let Err(e) = log.finish_step(saga_id, idx, &event) {
                                crate::log(&format!(
                                    "[saga] saga_id={} finish_step log write failed: {}",
                                    saga_id, e
                                ));
                            }
                        }
                    }
                    // Captured before `action` is moved into apply_action.
                    let issued_cmd = matches!(action, SagaAction::IssueCmd { .. });
                    let outcome = coord.apply_action(saga_id, name, action, next_idx).await;
                    if !outcome.in_flight {
                        // apply_action's terminal paths may already have
                        // removed the saga (via claim_terminal); in that
                        // case this remove is a harmless no-op.
                        coord.in_flight.lock().await.remove(&saga_id);
                    } else if let Some(awaited) = outcome.awaiting_step {
                        // Update the saga's bookkeeping to reflect
                        // the freshly-issued step.
                        let mut registry = coord.in_flight.lock().await;
                        if let Some(in_flight_saga) = registry.get_mut(&saga_id) {
                            in_flight_saga.awaiting_step = Some(awaited);
                            // Bump only if this dispatch consumed the
                            // pre-allocated index (always true for
                            // IssueCmd; defensive guard).
                            if issued_cmd {
                                in_flight_saga.next_step_index = awaited + 1;
                            }
                        }
                    }
                }
            }
            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                // The receiver fell behind and `n` events were dropped.
                // They are not replayed; keep consuming from the
                // oldest event still buffered.
                crate::log(&format!("[saga] coordinator lagged, missed {} events", n));
            }
            Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                // All senders gone: nothing more can arrive.
                crate::log("[saga] coordinator stopping (bus closed)");
                return;
            }
        }
    }
}
1060
1061#[cfg(test)]
1062mod tests {
1063    use super::*;
1064    use agentmux_common::ipc::{ClientKind, LifecyclePhase};
1065
1066    /// Smoke test from E.1a, retained: coordinator drains unrelated
1067    /// events without panicking. F.5 expanded the loop body but the
1068    /// invariant — non-trigger events are no-ops — still holds.
1069    #[tokio::test]
1070    async fn coordinator_drains_unrelated_events_without_panic() {
1071        let (events_tx, _rx) = tokio::sync::broadcast::channel::<Event>(64);
1072        let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
1073        let coord = Arc::new(SagaCoordinator::new(events_tx.clone(), Arc::clone(&state)));
1074        let coord_rx = events_tx.subscribe();
1075        let handle = tokio::spawn(run_coordinator(Arc::clone(&coord), coord_rx));
1076
1077        for v in 1..=5 {
1078            let _ = events_tx.send(Event::LifecyclePhaseChanged {
1079                from: LifecyclePhase::Starting,
1080                to: LifecyclePhase::Running,
1081                version: v,
1082            });
1083        }
1084        let _ = events_tx.send(Event::ProcessSpawned {
1085            pid: 42,
1086            kind: ClientKind::Tool,
1087            client_version: "test".into(),
1088            version: 6,
1089        });
1090
1091        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1092        // No sagas should be in flight (no trigger events).
1093        assert!(coord.in_flight.lock().await.is_empty());
1094        drop(events_tx);
1095        let _ = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
1096    }
1097
1098    #[test]
1099    fn next_id_is_monotonic() {
1100        let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(64);
1101        let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
1102        let coord = SagaCoordinator::new(events_tx, state);
1103        let a = coord.next_id();
1104        let b = coord.next_id();
1105        let c = coord.next_id();
1106        assert!(a < b);
1107        assert!(b < c);
1108    }
1109
1110    /// F.5 — verify the trigger table picks up `PoolWindowPromoted`.
1111    /// The end-to-end coordinator test lives in
1112    /// `pool_respawn::tests::coordinator_brackets_promote_with_saga_lifecycle_events`
1113    /// where the pool-respawn saga's expected behavior is asserted.
1114    #[test]
1115    fn match_trigger_pool_window_promoted_starts_pool_respawn() {
1116        let event = Event::PoolWindowPromoted {
1117            label: "window-pool-abc".into(),
1118            version: 1,
1119        };
1120        let saga = match_trigger(&event).expect("PoolWindowPromoted should trigger a saga");
1121        assert_eq!(saga.name(), "pool_respawn_on_promote");
1122    }
1123
1124    /// F.6 — verify the trigger table picks up `WindowClosed`. The
1125    /// end-to-end coordinator test lives in
1126    /// `window_cleanup::tests::coordinator_brackets_close_with_saga_lifecycle_events`.
1127    #[test]
1128    fn match_trigger_window_closed_starts_window_cleanup_cascade() {
1129        let event = Event::WindowClosed {
1130            label: "main".into(),
1131            version: 1,
1132            crash_detected: false,
1133        };
1134        let saga = match_trigger(&event).expect("WindowClosed should trigger a saga");
1135        assert_eq!(saga.name(), "window_cleanup_cascade");
1136    }
1137
1138    /// (codex P1 PR #637 round 2.) Crash-detected closes (originating
1139    /// from `wrr::apply_hwnd_destroyed`) must NOT trigger the saga —
1140    /// the host never sent the cleanup reports, so the saga would
1141    /// stay in-flight forever.
1142    #[test]
1143    fn match_trigger_skips_crash_detected_window_closed() {
1144        let event = Event::WindowClosed {
1145            label: "crashed-window".into(),
1146            version: 1,
1147            crash_detected: true,
1148        };
1149        assert!(
1150            match_trigger(&event).is_none(),
1151            "crash-detected close should NOT spawn a saga",
1152        );
1153    }
1154
1155    #[test]
1156    fn match_trigger_returns_none_for_non_trigger_events() {
1157        let cases = vec![
1158            Event::PoolWindowAdded {
1159                label: "window-pool-abc".into(),
1160                version: 1,
1161                saga_id: None,
1162            },
1163            Event::PoolWindowRemoved {
1164                label: "window-pool-abc".into(),
1165                version: 1,
1166            },
1167            Event::WindowOpened {
1168                label: "main".into(),
1169                kind: agentmux_common::ipc::WindowKind::FullInstance,
1170                parent_label: None,
1171                version: 1,
1172            },
1173            Event::LifecyclePhaseChanged {
1174                from: LifecyclePhase::Starting,
1175                to: LifecyclePhase::Running,
1176                version: 1,
1177            },
1178            // F.6 step-1/step-2 terminal events are NOT triggers
1179            // themselves — the saga consumes them, but receiving one
1180            // when no saga is in flight should be a no-op.
1181            Event::PanesReaped {
1182                label: "main".into(),
1183                version: 1,
1184                saga_id: None,
1185            },
1186            Event::PoolDrained {
1187                label: "main".into(),
1188                version: 1,
1189                saga_id: None,
1190            },
1191            Event::PoolNotLast {
1192                label: "main".into(),
1193                version: 1,
1194                saga_id: None,
1195            },
1196        ];
1197        for event in cases {
1198            assert!(
1199                match_trigger(&event).is_none(),
1200                "non-trigger event spawned a saga: {:?}",
1201                event
1202            );
1203        }
1204    }
1205
1206    /// Saga lifecycle events on the bus must not feed back into
1207    /// sagas (would cause loops). Sanity-check the filter directly.
1208    #[tokio::test]
1209    async fn coordinator_does_not_self_trigger_on_lifecycle_events() {
1210        let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(64);
1211        let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
1212        let coord = Arc::new(SagaCoordinator::new(events_tx.clone(), Arc::clone(&state)));
1213        let coord_rx = events_tx.subscribe();
1214        let handle = tokio::spawn(run_coordinator(Arc::clone(&coord), coord_rx));
1215        tokio::task::yield_now().await;
1216
1217        // Push a SagaStarted-shaped event; if the coordinator
1218        // re-spawned, it would eventually exhaust the saga_id atomic
1219        // by looping. We verify simpler: in_flight stays empty.
1220        let _ = events_tx.send(Event::SagaStarted {
1221            saga_id: 999,
1222            name: "foreign_saga".into(),
1223            version: 1,
1224        });
1225        let _ = events_tx.send(Event::SagaCompleted {
1226            saga_id: 999,
1227            version: 2,
1228        });
1229        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1230        assert!(coord.in_flight.lock().await.is_empty());
1231        drop(events_tx);
1232        let _ = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
1233    }
1234
1235    // ---------- Phase F.7 — saga-lifecycle property tests ----------
1236    //
1237    // Random sequences of `WindowClosed` / `PoolWindowPromoted`
1238    // events fed through `match_trigger`. Asserts:
1239    //   1. `WindowClosed { crash_detected: true }` NEVER produces a
1240    //      saga (F.6 round-2 gate, codex P1 #637).
1241    //   2. `WindowClosed { crash_detected: false }` ALWAYS produces
1242    //      exactly one window_cleanup_cascade saga.
1243    //   3. `PoolWindowPromoted` ALWAYS produces exactly one
1244    //      pool_respawn_on_promote saga.
1245    //   4. The saga's name (the (kind, key) tuple's "kind" component)
1246    //      is one of the two known names — no foreign saga slips
1247    //      through the trigger table.
1248    //
1249    // The end-to-end coordinator-level "exactly one terminal lifecycle
1250    // event per saga" assertion lives in
1251    // `pool_respawn::tests::coordinator_brackets_promote_with_saga_lifecycle_events`
1252    // and `window_cleanup::tests::coordinator_brackets_close_with_saga_lifecycle_events`
1253    // (already shipped in F.5 + F.6). The proptests here add the
1254    // randomized-input dimension those happy-path tests don't cover.
1255    //
1256    // Cases capped at 64 (default 1024 too slow for CI). Same bound
1257    // the srv reducer's E.7 proptest uses.
1258
1259    use proptest::prelude::*;
1260
1261    /// Trigger event variants the F.5/F.6 sagas can be triggered by,
1262    /// plus crash-detected close (the negative case).
1263    #[derive(Debug, Clone)]
1264    enum F7TriggerEvent {
1265        /// Should spawn `window_cleanup_cascade`.
1266        WindowClosedClean { label: String },
1267        /// Should NOT spawn anything (F.6 round-2 gate).
1268        WindowClosedCrashed { label: String },
1269        /// Should spawn `pool_respawn_on_promote`.
1270        PoolWindowPromoted { label: String },
1271        /// A non-trigger event — must produce no saga.
1272        Unrelated,
1273    }
1274
1275    fn f7_trigger_strategy() -> impl Strategy<Value = F7TriggerEvent> {
1276        prop_oneof![
1277            3 => "[a-c]{1,3}".prop_map(|label| F7TriggerEvent::WindowClosedClean { label }),
1278            1 => "[a-c]{1,3}".prop_map(|label| F7TriggerEvent::WindowClosedCrashed { label }),
1279            3 => "[a-c]{1,3}".prop_map(|label| F7TriggerEvent::PoolWindowPromoted { label }),
1280            2 => Just(F7TriggerEvent::Unrelated),
1281        ]
1282    }
1283
1284    fn make_event(t: F7TriggerEvent, version: u64) -> Event {
1285        match t {
1286            F7TriggerEvent::WindowClosedClean { label } => Event::WindowClosed {
1287                label,
1288                version,
1289                crash_detected: false,
1290            },
1291            F7TriggerEvent::WindowClosedCrashed { label } => Event::WindowClosed {
1292                label,
1293                version,
1294                crash_detected: true,
1295            },
1296            F7TriggerEvent::PoolWindowPromoted { label } => {
1297                Event::PoolWindowPromoted { label, version }
1298            }
1299            F7TriggerEvent::Unrelated => Event::Pong { nonce: 0, version },
1300        }
1301    }
1302
1303    proptest! {
1304        #![proptest_config(ProptestConfig {
1305            cases: 64,
1306            ..ProptestConfig::default()
1307        })]
1308
1309        /// For every event in a random sequence, `match_trigger`
1310        /// returns the expected saga (or None for crash / unrelated).
1311        /// Exhaustive shape check — catches accidental regressions
1312        /// in the trigger table.
1313        #[test]
1314        fn f7_match_trigger_matches_expected_saga_kind(
1315            triggers in prop::collection::vec(f7_trigger_strategy(), 0..40)
1316        ) {
1317            for (i, t) in triggers.into_iter().enumerate() {
1318                let event = make_event(t.clone(), (i + 1) as u64);
1319                let saga = match_trigger(&event);
1320                match (&t, &saga) {
1321                    (F7TriggerEvent::WindowClosedClean { .. }, Some(s)) => {
1322                        prop_assert_eq!(s.name(), "window_cleanup_cascade");
1323                    }
1324                    (F7TriggerEvent::PoolWindowPromoted { .. }, Some(s)) => {
1325                        prop_assert_eq!(s.name(), "pool_respawn_on_promote");
1326                    }
1327                    (F7TriggerEvent::WindowClosedCrashed { .. }, None) => {
1328                        // Expected — crash-detected close must never
1329                        // spawn a saga (F.6 codex P1 #637).
1330                    }
1331                    (F7TriggerEvent::Unrelated, None) => {
1332                        // Expected — non-trigger event.
1333                    }
1334                    (other_t, other_s) => {
1335                        prop_assert!(
1336                            false,
1337                            "trigger {:?} produced unexpected saga match: {:?}",
1338                            other_t,
1339                            other_s.as_ref().map(|s| s.name()),
1340                        );
1341                    }
1342                }
1343            }
1344        }
1345
1346        /// Crash-detected `WindowClosed` events NEVER produce a
1347        /// saga, regardless of label. Reinforces invariant #1
1348        /// independently — a stricter shrinker target.
1349        #[test]
1350        fn f7_crash_detected_close_never_spawns_saga(
1351            label in "[a-c]{1,3}",
1352            version in 1u64..1_000_000,
1353        ) {
1354            let event = Event::WindowClosed { label, version, crash_detected: true };
1355            prop_assert!(
1356                match_trigger(&event).is_none(),
1357                "crash-detected WindowClosed must not spawn a saga",
1358            );
1359        }
1360
1361        /// Clean-close `WindowClosed` ALWAYS produces a
1362        /// window_cleanup_cascade saga, regardless of label.
1363        #[test]
1364        fn f7_clean_close_always_spawns_window_cleanup_cascade(
1365            label in "[a-c]{1,3}",
1366            version in 1u64..1_000_000,
1367        ) {
1368            let event = Event::WindowClosed { label, version, crash_detected: false };
1369            let saga = match_trigger(&event)
1370                .expect("clean-close should spawn a saga");
1371            prop_assert_eq!(saga.name(), "window_cleanup_cascade");
1372        }
1373    }
1374
1375    /// **CPD-4 — concurrent same-kind sagas coexist without
1376    /// cross-talk.** Random sequences of trigger events drive the
1377    /// coordinator; the registry is allowed to hold multiple sagas of
1378    /// the same kind simultaneously (the pre-CPD-4 evict-and-replace
1379    /// gate is retired). Invariant under test: NO `SagaFailed` event
1380    /// fires with reason `"evicted"` — sagas only terminate via
1381    /// `Done` (their tagged echo lands) or via the saga timeout
1382    /// (~5-30s, far longer than this test's 50ms settling window).
1383    ///
1384    /// Replaces `f7_evict_and_replace_keeps_one_saga_per_kind`. The
1385    /// behavioral inversion is the core of CPD-4: pre-CPD-4 we
1386    /// asserted ≤1 in-flight per kind (and accepted premature
1387    /// `SagaFailed { reason: "evicted" }`); CPD-4 asserts no
1388    /// premature evictions.
1389    #[tokio::test]
1390    async fn cpd4_concurrent_sagas_no_eviction() {
1391        // Tiny LCG for deterministic per-seed sequences. Glibc's
1392        // constants — quality irrelevant; we just need reproducible
1393        // bit patterns across CI runs.
1394        fn lcg_next(state: &mut u64) -> u64 {
1395            *state = state
1396                .wrapping_mul(1_103_515_245)
1397                .wrapping_add(12_345);
1398            *state
1399        }
1400
1401        let seeds: &[u64] = &[1, 7, 42, 99, 1234, 5678, 0xDEADBEEF, 0xC0FFEE];
1402        for seed in seeds {
1403            let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(256);
1404            let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
1405            let coord = Arc::new(SagaCoordinator::new(events_tx.clone(), Arc::clone(&state)));
1406            let mut witness = events_tx.subscribe();
1407            let coord_rx = events_tx.subscribe();
1408            let _handle = tokio::spawn(run_coordinator(Arc::clone(&coord), coord_rx));
1409            tokio::task::yield_now().await;
1410
1411            // Build a deterministic sequence from the seed.
1412            let mut rng = *seed;
1413            let labels = ["a", "b", "c"];
1414            for v in 1u64..=20 {
1415                let label_idx = (lcg_next(&mut rng) % labels.len() as u64) as usize;
1416                let label = labels[label_idx].to_string();
1417                let pick = lcg_next(&mut rng) % 3;
1418                let event = match pick {
1419                    0 => Event::WindowClosed {
1420                        label,
1421                        version: v,
1422                        crash_detected: false,
1423                    },
1424                    1 => Event::PoolWindowPromoted { label, version: v },
1425                    _ => Event::Pong { nonce: v, version: v },
1426                };
1427                let _ = events_tx.send(event);
1428                // Yield so the coordinator processes one event before
1429                // we send the next. Tightens the concurrent-overlap
1430                // window.
1431                tokio::task::yield_now().await;
1432            }
1433            // Brief settling — give the coordinator time to process
1434            // the last few events + emit terminal lifecycles.
1435            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1436
1437            // Invariant: NO SagaFailed with "evicted" reason.
1438            // We don't assert the registry size — concurrent sagas
1439            // are allowed to pile up under per-saga correlation
1440            // because we never feed them their tagged terminal
1441            // events in this test. They'll reach their timeout
1442            // eventually, but not within 50ms.
1443            let mut evictions = 0u32;
1444            while let Ok(Ok(ev)) = tokio::time::timeout(
1445                std::time::Duration::from_millis(5),
1446                witness.recv(),
1447            )
1448            .await
1449            {
1450                if let Event::SagaFailed { reason, .. } = ev {
1451                    if reason.contains("evicted") {
1452                        evictions += 1;
1453                    }
1454                }
1455            }
1456            assert_eq!(
1457                evictions, 0,
1458                "seed {} — CPD-4 invariant violated: observed {} SagaFailed events with reason='evicted'; \
1459                 evict-and-replace should be retired so concurrent same-kind sagas coexist",
1460                seed, evictions,
1461            );
1462        }
1463    }
1464
1465    /// Saga forward-path emission contract: every saga that
1466    /// terminates emits exactly ONE terminal lifecycle event
1467    /// (`SagaCompleted` xor `SagaFailed`). Drives a small batch of
1468    /// triggers through the coordinator and counts emitted
1469    /// lifecycle events on the bus. Catches regressions where a
1470    /// saga's `Done` action somehow fires both `emit_completed`
1471    /// and `emit_failed`, or neither.
1472    #[tokio::test]
1473    async fn f7_each_saga_emits_exactly_one_terminal_event() {
1474        let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(256);
1475        let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
1476        let coord = Arc::new(SagaCoordinator::new(events_tx.clone(), Arc::clone(&state)));
1477        let mut witness = events_tx.subscribe();
1478        let coord_rx = events_tx.subscribe();
1479        let _handle = tokio::spawn(run_coordinator(Arc::clone(&coord), coord_rx));
1480        tokio::task::yield_now().await;
1481
1482        // Drive ONE clean cleanup-cascade saga + ONE pool-respawn
1483        // saga to completion. CPD-4: route terminals by saga_id, so
1484        // we trigger each saga, capture its allocated id from
1485        // SagaStarted, then send the matching tagged terminal events.
1486        let _ = events_tx.send(Event::WindowClosed {
1487            label: "main".into(),
1488            version: 1,
1489            crash_detected: false,
1490        });
1491        let _ = events_tx.send(Event::PoolWindowPromoted {
1492            label: "pool-1".into(),
1493            version: 2,
1494        });
1495
1496        // Drain the bus with a deadline. Count one terminal event
1497        // (SagaCompleted xor SagaFailed) per saga_id. As each
1498        // SagaStarted lands, dispatch the matching tagged terminals.
1499        let mut started_ids = std::collections::HashSet::<u64>::new();
1500        let mut cleanup_saga_id: Option<u64> = None;
1501        let mut respawn_saga_id: Option<u64> = None;
1502        let mut version: u64 = 100;
1503        let mut terminal_for_id = std::collections::HashMap::<u64, &'static str>::new();
1504        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(3);
1505        while std::time::Instant::now() < deadline {
1506            match tokio::time::timeout(std::time::Duration::from_millis(50), witness.recv()).await
1507            {
1508                Ok(Ok(Event::SagaStarted { saga_id, name, .. })) => {
1509                    started_ids.insert(saga_id);
1510                    if name == "window_cleanup_cascade" && cleanup_saga_id.is_none() {
1511                        cleanup_saga_id = Some(saga_id);
1512                        version += 1;
1513                        let _ = events_tx.send(Event::PanesReaped {
1514                            label: "main".into(),
1515                            version,
1516                            saga_id: Some(saga_id),
1517                        });
1518                    } else if name == "pool_respawn_on_promote" && respawn_saga_id.is_none() {
1519                        respawn_saga_id = Some(saga_id);
1520                        version += 1;
1521                        let _ = events_tx.send(Event::PoolWindowAdded {
1522                            label: "pool-2".into(),
1523                            version,
1524                            saga_id: Some(saga_id),
1525                        });
1526                    }
1527                }
1528                Ok(Ok(Event::PanesReaped { saga_id: Some(sid), .. })) => {
1529                    if Some(sid) == cleanup_saga_id {
1530                        version += 1;
1531                        let _ = events_tx.send(Event::PoolDrained {
1532                            label: "main".into(),
1533                            version,
1534                            saga_id: Some(sid),
1535                        });
1536                    }
1537                }
1538                Ok(Ok(Event::SagaCompleted { saga_id, .. })) => {
1539                    let prev = terminal_for_id.insert(saga_id, "completed");
1540                    assert!(
1541                        prev.is_none(),
1542                        "saga_id {} got two terminal events: prior={:?} now=completed",
1543                        saga_id,
1544                        prev,
1545                    );
1546                }
1547                Ok(Ok(Event::SagaFailed { saga_id, .. })) => {
1548                    let prev = terminal_for_id.insert(saga_id, "failed");
1549                    assert!(
1550                        prev.is_none(),
1551                        "saga_id {} got two terminal events: prior={:?} now=failed",
1552                        saga_id,
1553                        prev,
1554                    );
1555                }
1556                Ok(Ok(_)) => {}
1557                Ok(Err(_)) => break,
1558                Err(_) => {
1559                    // No new events for 50ms — we've drained.
1560                    if started_ids.len() == 2
1561                        && started_ids.iter().all(|id| terminal_for_id.contains_key(id))
1562                    {
1563                        break;
1564                    }
1565                }
1566            }
1567        }
1568        // Both sagas started.
1569        assert_eq!(
1570            started_ids.len(),
1571            2,
1572            "expected 2 SagaStarted events (one per saga), got {}: {:?}",
1573            started_ids.len(),
1574            started_ids,
1575        );
1576        // Each got exactly one terminal event.
1577        for id in &started_ids {
1578            assert!(
1579                terminal_for_id.contains_key(id),
1580                "saga_id {} never got a terminal lifecycle event",
1581                id,
1582            );
1583        }
1584        // No spurious terminal events for sagas we didn't observe
1585        // start.
1586        for id in terminal_for_id.keys() {
1587            assert!(
1588                started_ids.contains(id),
1589                "saga_id {} got a terminal event but no SagaStarted observed",
1590                id,
1591            );
1592        }
1593    }
1594
1595    // ---------- LSD-2 — coordinator <-> LauncherSagaLog wiring ------
1596    //
    // Three end-to-end tests that drive the coordinator with a real
    // in-memory `LauncherSagaLog` and assert that lifecycle rows land in
    // the expected state. They mirror the test inventory pinned in
    // `docs/specs/SPEC_LAUNCHER_SAGA_DURABILITY_2026-05-01.md` §4 PR2.
    //
    // The first two tests use `pool_respawn::PoolRespawn` because its
    // single-step shape (one IssueCmd, one terminal echo) keeps the
    // assertions tight. The two-step `window_cleanup_cascade` shape is
    // exercised by the existing F.6 coordinator tests; the third test,
    // `saga_step_finish_records_event_payload`, adds a multi-step
    // example against the log.
1607
1608    use crate::saga::log::LauncherSagaLog;
1609    use std::sync::Arc as StdArc;
1610
1611    /// Helper: spin up coordinator + in-memory log.
1612    fn spawn_coord_with_log()
1613        -> (StdArc<SagaCoordinator>, StdArc<LauncherSagaLog>, tokio::sync::broadcast::Sender<Event>)
1614    {
1615        let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(256);
1616        let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
1617        let log = StdArc::new(LauncherSagaLog::open_in_memory().expect("in-memory log"));
1618        let coord = StdArc::new(
1619            SagaCoordinator::new(events_tx.clone(), Arc::clone(&state))
1620                .with_log(StdArc::clone(&log))
1621                .expect("with_log on fresh in-memory log should succeed"),
1622        );
1623        (coord, log, events_tx)
1624    }
1625
1626    /// LSD-2 — drive a saga (PoolRespawn) through to completion and
1627    /// verify the durable log records:
1628    ///   - `launcher_saga` row with state='completed', non-null
1629    ///     ended_at, no failure_reason, input_json round-trips the
1630    ///     `promoted_label`.
1631    ///   - one step row in 'succeeded' state with output_json
1632    ///     populated.
1633    #[tokio::test]
1634    async fn saga_completes_writes_lifecycle_to_log() {
1635        let (coord, log, events_tx) = spawn_coord_with_log();
1636        let mut witness = events_tx.subscribe();
1637        let coord_rx = events_tx.subscribe();
1638        let _handle = tokio::spawn(run_coordinator(StdArc::clone(&coord), coord_rx));
1639        tokio::task::yield_now().await;
1640
1641        // Trigger the saga.
1642        let _ = events_tx.send(Event::PoolWindowPromoted {
1643            label: "window-pool-abc".into(),
1644            version: 1,
1645        });
1646        // CPD-4: wait for SagaStarted to learn the saga_id, then send
1647        // the awaited terminal tagged with that id. Pre-CPD-4 we
1648        // could send `saga_id: None`; under per-saga correlation
1649        // the saga only consumes its own echo.
1650        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
1651        while std::time::Instant::now() < deadline {
1652            if let Ok(Ok(Event::SagaStarted { saga_id, .. })) =
1653                tokio::time::timeout(std::time::Duration::from_millis(50), witness.recv()).await
1654            {
1655                let _ = events_tx.send(Event::PoolWindowAdded {
1656                    label: "window-pool-xyz".into(),
1657                    version: 2,
1658                    saga_id: Some(saga_id),
1659                });
1660                break;
1661            }
1662        }
1663
1664        // Settle: coordinator runs apply_action which awaits state
1665        // mutex + bus send. 200ms is generous for the in-process loop.
1666        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1667
1668        let snapshots = log.snapshot_recent(10).expect("snapshot_recent");
1669        assert_eq!(snapshots.len(), 1, "expected exactly one saga row");
1670        let s = &snapshots[0];
1671        assert_eq!(s.name, "pool_respawn_on_promote");
1672        assert_eq!(s.state, "completed");
1673        assert!(s.ended_at.is_some(), "ended_at must be set on completion");
1674        assert!(s.failure_reason.is_none());
1675        assert_eq!(s.step_count, 1, "expected exactly one succeeded step");
1676        // Input snapshot round-trips the saga's constructor arg.
1677        let parsed: serde_json::Value = serde_json::from_str(&s.input_json).unwrap();
1678        assert_eq!(parsed["promoted_label"], "window-pool-abc");
1679    }
1680
1681    /// **CPD-4 — two concurrent same-kind sagas BOTH complete
1682    /// cleanly.** Repurposed from `saga_fails_writes_failed_state`
1683    /// (which previously verified the evict-and-replace path).
1684    /// Per-saga event correlation lets two `pool_respawn_on_promote`
1685    /// sagas coexist; each consumes its own saga_id-tagged refill
1686    /// echo, so the durable log records two `completed` rows — no
1687    /// `failed` rows from the retired eviction policy.
1688    #[tokio::test]
1689    async fn saga_concurrent_same_kind_both_complete() {
1690        let (coord, log, events_tx) = spawn_coord_with_log();
1691        let mut witness = events_tx.subscribe();
1692        let coord_rx = events_tx.subscribe();
1693        let _handle = tokio::spawn(run_coordinator(StdArc::clone(&coord), coord_rx));
1694        tokio::task::yield_now().await;
1695
1696        // First promote — kicks off saga A.
1697        let _ = events_tx.send(Event::PoolWindowPromoted {
1698            label: "window-pool-a".into(),
1699            version: 1,
1700        });
1701        // Second promote BEFORE A's terminal arrives — under CPD-4
1702        // saga B coexists with A (no eviction).
1703        let _ = events_tx.send(Event::PoolWindowPromoted {
1704            label: "window-pool-b".into(),
1705            version: 2,
1706        });
1707
1708        // Capture both saga_ids from their `SagaStarted` brackets,
1709        // then fire each saga's tagged refill so both terminate
1710        // Completed. The order doesn't matter because per-saga
1711        // correlation routes events by id, not by FIFO.
1712        let mut ids = Vec::<u64>::new();
1713        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
1714        let mut version: u64 = 100;
1715        while std::time::Instant::now() < deadline {
1716            if let Ok(Ok(ev)) =
1717                tokio::time::timeout(std::time::Duration::from_millis(50), witness.recv()).await
1718            {
1719                if let Event::SagaStarted { saga_id, name, .. } = ev {
1720                    if name == "pool_respawn_on_promote" {
1721                        ids.push(saga_id);
1722                        version += 1;
1723                        let _ = events_tx.send(Event::PoolWindowAdded {
1724                            label: format!("refill-for-{}", saga_id),
1725                            version,
1726                            saga_id: Some(saga_id),
1727                        });
1728                    }
1729                }
1730                if ids.len() == 2 {
1731                    break;
1732                }
1733            }
1734        }
1735        assert_eq!(ids.len(), 2, "expected 2 SagaStarted events");
1736
1737        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1738
1739        let snapshots = log.snapshot_recent(10).expect("snapshot_recent");
1740        assert_eq!(
1741            snapshots.len(),
1742            2,
1743            "expected two saga rows (concurrent, no eviction)"
1744        );
1745        // BOTH must be 'completed' — eviction is retired.
1746        for s in &snapshots {
1747            assert_eq!(
1748                s.state, "completed",
1749                "CPD-4: both concurrent sagas must complete cleanly, got state={:?} reason={:?}",
1750                s.state, s.failure_reason,
1751            );
1752            assert!(s.failure_reason.is_none());
1753        }
1754    }
1755
1756    /// LSD-2 — when a saga's `on_event` consumes its awaited bus
1757    /// event, the coordinator calls `LauncherSagaLog::finish_step`
1758    /// with the event payload. To inspect `output_json` directly via
1759    /// the public LSD-1 API, we drive a saga only PART of the way —
1760    /// the cleanup cascade's Step 1 lands its echo (`PanesReaped`)
1761    /// but Step 2 is left in `pending` so the saga stays unresolved.
1762    /// `unresolved_sagas` then exposes the full step list including
1763    /// Step 0's `output_json`.
1764    #[tokio::test]
1765    async fn saga_step_finish_records_event_payload() {
1766        let (coord, log, events_tx) = spawn_coord_with_log();
1767        let mut witness = events_tx.subscribe();
1768        let coord_rx = events_tx.subscribe();
1769        let _handle = tokio::spawn(run_coordinator(StdArc::clone(&coord), coord_rx));
1770        tokio::task::yield_now().await;
1771
1772        // Trigger the cascade. Window-cleanup has two steps; we feed
1773        // only the first echo (`PanesReaped`) so Step 0 transitions
1774        // pending→succeeded but Step 1 (`DrainPoolIfLast`) stays
1775        // pending — the saga remains in_flight + queryable as
1776        // unresolved.
1777        //
1778        // CPD-4: tag PanesReaped with the saga's id so the saga
1779        // actually advances (per-saga correlation).
1780        let _ = events_tx.send(Event::WindowClosed {
1781            label: "main".into(),
1782            version: 1,
1783            crash_detected: false,
1784        });
1785        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
1786        while std::time::Instant::now() < deadline {
1787            if let Ok(Ok(Event::SagaStarted { saga_id, .. })) =
1788                tokio::time::timeout(std::time::Duration::from_millis(50), witness.recv()).await
1789            {
1790                let _ = events_tx.send(Event::PanesReaped {
1791                    label: "main".into(),
1792                    version: 2,
1793                    saga_id: Some(saga_id),
1794                });
1795                break;
1796            }
1797        }
1798        // INTENTIONALLY no `PoolDrained` / `PoolNotLast`.
1799
1800        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1801
1802        let unresolved = log.unresolved_sagas().expect("unresolved_sagas");
1803        assert_eq!(unresolved.len(), 1, "expected one unresolved saga");
1804        let saga = &unresolved[0];
1805        assert_eq!(saga.name, "window_cleanup_cascade");
1806        assert_eq!(saga.state, "running");
1807        // Two step rows: Step 0 succeeded (PanesReaped echo), Step 1
1808        // pending (DrainPoolIfLast dispatched but no echo arrived).
1809        assert_eq!(saga.steps.len(), 2, "expected 2 step rows");
1810        let step0 = &saga.steps[0];
1811        assert_eq!(step0.step_index, 0);
1812        assert_eq!(step0.state, "succeeded");
1813        assert!(step0.ended_at.is_some(), "succeeded step needs ended_at");
1814        let output_json = step0
1815            .output_json
1816            .as_ref()
1817            .expect("output_json must be set after finish_step");
1818        // Round-trip — the JSON should deserialize to the awaited Event.
1819        let parsed: Event = serde_json::from_str(output_json).expect("event round-trips");
1820        match parsed {
1821            Event::PanesReaped { label, .. } => assert_eq!(label, "main"),
1822            other => panic!("expected PanesReaped, got {:?}", other),
1823        }
1824        let step1 = &saga.steps[1];
1825        assert_eq!(step1.step_index, 1);
1826        assert_eq!(step1.state, "pending");
1827        assert!(step1.output_json.is_none(), "pending step has no output");
1828    }
1829}