// Copyright 2026, AgentMux Corp.
// SPDX-License-Identifier: Apache-2.0
//
// Phase F.6 — window-cleanup cascade saga.
//
// **What this saga does**
//
// When a user-visible top-level window closes, two cleanup steps
// happen implicitly in the host today:
//
//   1. Browser-pane HWNDs that belong to the closing window get
//      reaped (lifecycle entries drained, pane HWND map cleared,
//      subwindow cascade closes children).
//   2. If that window was the LAST user-visible window, Stage 1 of
//      the wrr two-stage close cascade in
//      `agentmux-cef::client::on_before_close` posts WM_CLOSE to
//      every warm-pool browser so the app can drain to zero.
//      Otherwise the pool stays warm.
//
// Both steps fire inside the same `on_before_close` body. Renderers
// that want to buffer "this window is closing AND its post-close
// cleanup is settling" atomically have nothing to pivot on today —
// the launcher's `Event::WindowClosed` is the only signal, and it
// fires BEFORE the cleanup steps complete.
//
// This saga formalizes the implicit cleanup as an explicit state
// machine so the renderer sees a `SagaStarted` / `SagaCompleted`
// bracket. It implements the spec sketch from
// `docs/specs/SPEC_PHASE_F_HOST_REDUCER_2026-05-01.md` §7.2:
//
//   saga WindowCleanupCascade(closed_label):
//       Step 1 — on Event::WindowClosed { label }:
//                 issue Command::ReapPanes { label } → host
//                 wait for: Event::PanesReaped { label }
//       Step 2 — issue Command::DrainPoolIfLast { label } → host
//                 wait for: Event::PoolDrained or Event::PoolNotLast
//       Step 3 — Done
//
//       Compensation: none (cleanup failure is logged; next close
//                     retries).
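/*
A minimal, self-contained model of the §7.2 sketch above, assuming
nothing beyond the three host events it names. `Step`, `HostEvent`,
`Issued` and `CascadeModel` are hypothetical stand-ins invented for
illustration; they are not the crate's `Phase`, `Event` or
`SagaAction` types, and they leave out `saga_id` correlation.

```rust
// Hypothetical model types; not the crate's API.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Step {
    Initial,
    ReapingPanes,
    DrainingPool,
    Done,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HostEvent {
    PanesReaped { label: String },
    PoolDrained { label: String },
    PoolNotLast { label: String },
}

/// What the model asks the host to do next.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Issued {
    ReapPanes(String),
    DrainPoolIfLast(String),
    Nothing,
}

pub struct CascadeModel {
    label: String,
    pub step: Step,
    pub drained: Option<bool>,
}

impl CascadeModel {
    pub fn new(label: &str) -> Self {
        Self { label: label.to_string(), step: Step::Initial, drained: None }
    }

    /// Step 1: issue ReapPanes and wait for PanesReaped.
    pub fn start(&mut self) -> Issued {
        self.step = Step::ReapingPanes;
        Issued::ReapPanes(self.label.clone())
    }

    /// Advance only on the terminal event the current step waits for.
    pub fn on_event(&mut self, ev: &HostEvent) -> Issued {
        match (self.step, ev) {
            (Step::ReapingPanes, HostEvent::PanesReaped { label }) if *label == self.label => {
                // Step 2: ask the host for its drain decision.
                self.step = Step::DrainingPool;
                Issued::DrainPoolIfLast(self.label.clone())
            }
            (Step::DrainingPool, HostEvent::PoolDrained { label }) if *label == self.label => {
                self.step = Step::Done;
                self.drained = Some(true);
                Issued::Nothing
            }
            (Step::DrainingPool, HostEvent::PoolNotLast { label }) if *label == self.label => {
                self.step = Step::Done;
                self.drained = Some(false);
                Issued::Nothing
            }
            // Wrong step or wrong label: keep waiting.
            _ => Issued::Nothing,
        }
    }
}
```

Either `PoolDrained` or `PoolNotLast` ends the model in `Step::Done`.
Events that arrive in the wrong step are ignored rather than treated as
errors, mirroring how `on_event` below returns `SagaAction::Wait` for
them.
*/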
//
// **Scope of F.6 (this PR)**
//
// F.6 ships:
//   - the saga state machine itself (this file),
//   - the wire types (`Command::ReapPanes`,
//     `Command::DrainPoolIfLast`, `Command::ReportPanesReaped`,
//     `Command::ReportPoolDrainDecision`, `Event::PanesReaped`,
//     `Event::PoolDrained`, `Event::PoolNotLast`),
//   - host-side `report_panes_reaped` + `report_pool_drain_decision`
//     calls inside `on_before_close`,
//   - launcher reducer arms translating the reports into the typed
//     events,
//   - coordinator wiring that registers the saga as a consumer of
//     `Event::WindowClosed`.
//
// F.6 does NOT ship:
//   - F.7 (cleanup audit + property tests).
//   - Launcher-side saga durability — separate concern.
//
// **CPD-3 update.** Both `IssueCmd::Host` actions
// (`Command::ReapPanes` and `Command::DrainPoolIfLast`) are now LIVE:
// the coordinator dispatches them through `HostPipe::send_command()`
// (see `agentmux-launcher/src/saga/mod.rs::apply_action`). This saga
// also overrides `Saga::timeout()` to 30s (vs. 5s default) since
// pane drain on a workspace with many panes can legitimately take
// that long — see SPEC_CROSS_PROCESS_DISPATCH §3.10.
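/*
The timeout override described above is the ordinary trait
default-method pattern. A sketch, assuming a hypothetical `TimedSaga`
trait that models only the name and timeout hooks of the real `Saga`
trait; the 5s default and 30s override come from the text above, and
`QuickSaga` / `CleanupCascade` are illustrative types only.

```rust
use std::time::Duration;

// Hypothetical stand-in for the crate's `Saga` trait.
pub trait TimedSaga {
    fn name(&self) -> &'static str;

    /// Default deadline shared by sagas that do not override it.
    fn timeout(&self) -> Duration {
        Duration::from_secs(5)
    }
}

/// Inherits the 5s default.
pub struct QuickSaga;

impl TimedSaga for QuickSaga {
    fn name(&self) -> &'static str {
        "quick"
    }
}

/// Overrides the default, as this file's saga does.
pub struct CleanupCascade;

impl TimedSaga for CleanupCascade {
    fn name(&self) -> &'static str {
        "window_cleanup_cascade"
    }

    fn timeout(&self) -> Duration {
        // Pane drain on a large workspace can legitimately be slow.
        Duration::from_secs(30)
    }
}
```

Keeping the deadline on the saga rather than in the coordinator lets
each saga kind declare its own budget without the coordinator knowing
about pane counts.
*/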
//
// **Why a saga at all?**
//
// Same reasoning as F.5: the renderer-visible value is the
// `SagaStarted` / `SagaCompleted` bracket. Subscribers see "saga
// foo running" and can buffer related events for that saga_id
// until the bracket closes. That works the same way whether the
// launcher actively dispatches the cleanup (CPD-3 onward) or
// merely observes the host doing it (F.6 as first shipped, when
// the `IssueCmd`s were passive). The move to the cross-process
// pipe changed only the saga's `IssueCmd` handling; the
// renderer-facing semantics stayed stable.
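/*
A sketch of the subscriber-side buffering this bracket enables.
`BusEvent` and `BracketBuffer` are hypothetical: the real bus carries
`agentmux_common::ipc::Event`, and a real subscriber would presumably
also close a bracket on `SagaFailed`, which this sketch leaves out.

```rust
use std::collections::HashMap;

// Hypothetical, trimmed-down bus events keyed by saga_id.
#[derive(Debug, Clone, PartialEq)]
pub enum BusEvent {
    SagaStarted { saga_id: u64 },
    Related { saga_id: u64, what: String },
    SagaCompleted { saga_id: u64 },
}

/// Holds events for each open bracket and releases them as one batch
/// when the bracket closes.
#[derive(Default)]
pub struct BracketBuffer {
    open: HashMap<u64, Vec<String>>,
}

impl BracketBuffer {
    /// Returns a batch to render now, if any.
    pub fn push(&mut self, ev: BusEvent) -> Option<Vec<String>> {
        match ev {
            BusEvent::SagaStarted { saga_id } => {
                self.open.insert(saga_id, Vec::new());
                None
            }
            BusEvent::Related { saga_id, what } => match self.open.get_mut(&saga_id) {
                // Inside an open bracket: hold it back.
                Some(buf) => {
                    buf.push(what);
                    None
                }
                // No open bracket for this saga_id: render immediately.
                None => Some(vec![what]),
            },
            // Bracket closed: flush everything buffered for it.
            BusEvent::SagaCompleted { saga_id } => self.open.remove(&saga_id),
        }
    }
}
```
*/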
//
// **What if a terminal event never arrives?**
//
// Same behavior as F.5, bounded by the per-saga timeout: the saga
// waits until either the bus closes (saga abandoned on launcher
// restart) or its `Saga::timeout()` deadline (30s for this saga;
// see the CPD-3 update above) force-fails it. F.6 keeps failure
// handling deliberately simple: if cleanup genuinely fails, the
// next window close starts a fresh saga rather than retrying the
// stuck one, and the stuck saga stays in `in_flight` only until
// its deadline fires.
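/*
A sketch of how a coordinator might force-fail sagas whose deadline
has passed, assuming the deadline is recorded as start time plus the
saga's timeout. `InFlight`, `Outcome` and the "timeout" reason string
are hypothetical; the real coordinator in `saga::mod.rs` may track and
report deadlines differently.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical failure record the coordinator would publish.
#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    Failed { saga_id: u64, reason: &'static str },
}

/// Hypothetical in-flight registry: saga_id -> deadline.
#[derive(Default)]
pub struct InFlight {
    deadlines: HashMap<u64, Instant>,
}

impl InFlight {
    pub fn start(&mut self, saga_id: u64, now: Instant, timeout: Duration) {
        self.deadlines.insert(saga_id, now + timeout);
    }

    /// A saga that completes normally leaves the registry.
    pub fn complete(&mut self, saga_id: u64) {
        self.deadlines.remove(&saga_id);
    }

    /// Force-fail and remove every saga whose deadline has passed.
    /// Results are sorted by saga_id so the order is deterministic.
    pub fn sweep(&mut self, now: Instant) -> Vec<Outcome> {
        let mut expired: Vec<u64> = self
            .deadlines
            .iter()
            .filter(|(_, deadline)| now >= **deadline)
            .map(|(id, _)| *id)
            .collect();
        expired.sort_unstable();
        for id in &expired {
            self.deadlines.remove(id);
        }
        expired
            .into_iter()
            .map(|saga_id| Outcome::Failed { saga_id, reason: "timeout" })
            .collect()
    }

    pub fn in_flight_count(&self) -> usize {
        self.deadlines.len()
    }
}
```
*/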
//
// **Concurrent closes (formerly a known limitation, inherited
// from F.5).** The coordinator broadcasts every event to every
// in-flight saga. Before CPD-4, if two windows closed
// simultaneously, the first `PanesReaped` (or
// `PoolDrained`/`PoolNotLast`) advanced ALL matching sagas
// regardless of which window's close they belonged to: an early
// `SagaCompleted` for one window's bracket, with the other
// window's terminal event left unbracketed.
//
// In practice this required the user to close two windows in
// rapid succession (within the host's `on_before_close` execution
// latency, a few ms). The user-visible consequence was a brief
// renderer-side bracketing inconsistency; reducer + SQLite state
// remained correct.
//
// The interim mitigation was `saga::mod.rs::run_coordinator`'s
// evict-and-replace policy (F.5 round 4), which kept only ONE
// window-cleanup-cascade saga in flight at a time: the second
// `WindowClosed` evicted the first saga (emitting `SagaFailed`
// with an "evicted" reason), then started a fresh one.
// Renderer-visible: one premature `SagaFailed` + one correct
// `SagaCompleted`, the same trade-off F.5 documented for
// concurrent promotes.
//
// The proper fix needed coordinator-level FIFO routing or per-saga
// correlation, and both depended on the cross-process dispatch
// (CPD-3) making the saga's `IssueCmd` actually causal. CPD-4 lands
// per-saga correlation: terminal events carry the originating
// `saga_id`, `on_event` advances only on events tagged with this
// saga's id, and evict-and-replace (PR #634) is retired, so two
// simultaneous `WindowClosed`s now produce two clean `SagaStarted`
// / `SagaCompleted` brackets.
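/*
A minimal sketch of per-saga correlation by `saga_id` tag, using the
same guard shape as `on_event` below (tag must equal this saga's id,
label must match as defense-in-depth). `Terminal`, `Waiter` and
`broadcast` are hypothetical; every event is delivered to every
waiter, as the coordinator's broadcast does, and only correctly tagged
events advance a saga.

```rust
// Hypothetical, trimmed-down terminal event; the real events also carry
// `version`, and `saga_id` is `None` for organic host reports.
#[derive(Debug, Clone)]
pub struct Terminal {
    pub label: String,
    pub saga_id: Option<u64>,
}

pub struct Waiter {
    pub saga_id: u64,
    pub label: String,
    pub done: bool,
}

impl Waiter {
    /// Advance only on events tagged with this saga's id AND label.
    pub fn on_event(&mut self, ev: &Terminal) {
        if ev.saga_id == Some(self.saga_id) && ev.label == self.label {
            self.done = true;
        }
    }
}

/// Deliver every event to every waiter, like a broadcast bus.
pub fn broadcast(waiters: &mut [Waiter], events: &[Terminal]) {
    for ev in events {
        for w in waiters.iter_mut() {
            w.on_event(ev);
        }
    }
}
```

Untagged organic reports and events tagged for a sibling saga fall
through the guard, so two concurrent waiters never cross-talk.
*/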

use agentmux_common::ipc::{Command, Event};

use super::{PipeTarget, Saga, SagaAction, SagaCtx};

/// State of one in-flight window-cleanup-cascade saga.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Phase {
    /// Saga just constructed — waiting for the coordinator to call
    /// `start`. The first `start` call transitions to `ReapingPanes`
    /// and emits the `IssueCmd` for `ReapPanes`.
    Initial,
    /// `ReapPanes` has been issued; waiting for an
    /// `Event::PanesReaped` tagged with our saga_id and window label.
    ReapingPanes,
    /// `DrainPoolIfLast` has been issued; waiting for either
    /// `Event::PoolDrained` or `Event::PoolNotLast`. Both are
    /// terminal — the saga doesn't care WHICH branch fires, only
    /// that ONE of them does.
    DrainingPool,
}

/// Window-cleanup-cascade saga: fires once per window close, drives
/// the implicit pane-reap + pool-drain-decision flow into an
/// explicit two-step state machine, then completes.
pub struct WindowCleanupCascade {
    /// Label of the window that closed (the trigger event's payload).
    /// The saga uses this for label-matching on terminal events:
    /// `PanesReaped { label }` only advances Step 1 when `label`
    /// matches; same for `PoolDrained`/`PoolNotLast` in Step 2.
    ///
    /// Note: since CPD-4, terminal events are primarily correlated
    /// by `saga_id`, so label-matching is technically redundant for
    /// correctness. Keep it anyway as cheap defense-in-depth + a
    /// clear invariant-statement: "this saga belongs to *this*
    /// window's cleanup, not whoever's terminal event happens to
    /// land first."
    closed_label: String,
    /// Whether the host's drain decision said "yes, that was the
    /// last window" (`PoolDrained` arm) or "no, more windows
    /// remain" (`PoolNotLast` arm). `None` until Step 2 resolves.
    /// Tests read it through the `drained_pool()` accessor.
    drained_pool: Option<bool>,
    phase: Phase,
}

impl WindowCleanupCascade {
    /// Construct a fresh saga for a close of `closed_label`.
    /// Coordinator allocates the saga_id and calls `start` once.
    pub fn new(closed_label: String) -> Self {
        Self {
            closed_label,
            drained_pool: None,
            phase: Phase::Initial,
        }
    }

    /// Whether the host's drain decision flagged this close as
    /// "last user-visible window" (`true` → `PoolDrained` branch
    /// fired) or not (`false` → `PoolNotLast` branch fired). `None`
    /// before Step 2 resolves. Exported for tests.
    #[cfg(test)]
    pub fn drained_pool(&self) -> Option<bool> {
        self.drained_pool
    }

    /// Label this saga is tracking. Exported for tests.
    #[cfg(test)]
    pub fn closed_label(&self) -> &str {
        &self.closed_label
    }
}

impl Saga for WindowCleanupCascade {
    fn name(&self) -> &'static str {
        "window_cleanup_cascade"
    }

    /// LSD-2 — record the closing window's label for `--diag sagas`.
    /// `drained_pool` is None at start (only known after Step 2)
    /// so we don't include it; the durable log captures the inputs
    /// the saga was constructed with, not its evolving state — that
    /// lives in step rows.
    fn input_snapshot(&self) -> serde_json::Value {
        serde_json::json!({ "closed_label": self.closed_label })
    }

    /// CPD-3 — override the default 5s saga timeout. Pane drain
    /// (Stage 1 of wrr's two-stage close cascade) on a workspace
    /// with many panes can legitimately take longer than 5s. Per
    /// SPEC_CROSS_PROCESS_DISPATCH §3.10.
    fn timeout(&self) -> std::time::Duration {
        std::time::Duration::from_secs(30)
    }

    fn start(&mut self, _ctx: &SagaCtx) -> SagaAction {
        // Step 1: issue the ReapPanes command and transition into the
        // wait state. The action targets `PipeTarget::Host`; since
        // CPD-3 the coordinator dispatches it to the host through
        // `HostPipe::send_command()` (see module docstring).
        debug_assert_eq!(self.phase, Phase::Initial);
        self.phase = Phase::ReapingPanes;
        // The saga_id placeholder (0) is a CPD-1 schema artifact; the
        // coordinator's `apply_action` injects the live saga's id at
        // dispatch time (CPD-3).
        SagaAction::IssueCmd {
            target: PipeTarget::Host,
            cmd: Command::ReapPanes {
                label: self.closed_label.clone(),
                saga_id: 0,
            },
        }
    }

    fn on_event(&mut self, event: &Event, ctx: &SagaCtx) -> SagaAction {
        // **CPD-4 — per-saga event correlation.** Filter
        // `PanesReaped` / `PoolDrained` / `PoolNotLast` by `saga_id`:
        // only events tagged with *this* saga's id advance us. The
        // label match remains as defense-in-depth (events for the
        // wrong window cannot end up with this saga's id, but the
        // double-check costs nothing). Untagged events (organic host
        // reports) and events from sibling concurrent sagas are
        // ignored — retires the evict-and-replace workaround from
        // PR #634 so two simultaneous `WindowClosed`s now produce
        // two clean `SagaCompleted` brackets instead of one
        // premature `SagaFailed { reason: "evicted" }` + one
        // `SagaCompleted`.
        match (&self.phase, event) {
            // Step 1 → Step 2 transition. `PanesReaped` arrives from
            // the host (`report_panes_reaped` inside
            // `on_before_close`).
            (Phase::ReapingPanes, Event::PanesReaped { label, saga_id, .. })
                if *saga_id == Some(ctx.saga_id) && label == &self.closed_label =>
            {
                self.phase = Phase::DrainingPool;
                // CPD-1 schema-only: saga_id placeholder (0); CPD-3
                // injects the live saga's id at dispatch time.
                SagaAction::IssueCmd {
                    target: PipeTarget::Host,
                    cmd: Command::DrainPoolIfLast {
                        label: self.closed_label.clone(),
                        saga_id: 0,
                    },
                }
            }
            // Step 2 terminal — drain happened (last window closed).
            (Phase::DrainingPool, Event::PoolDrained { label, saga_id, .. })
                if *saga_id == Some(ctx.saga_id) && label == &self.closed_label =>
            {
                self.drained_pool = Some(true);
                SagaAction::Done
            }
            // Step 2 terminal — drain skipped (other windows remain).
            // Equally a success: the saga's job is to bracket the
            // drain *decision*, not enforce a particular outcome.
            (Phase::DrainingPool, Event::PoolNotLast { label, saga_id, .. })
                if *saga_id == Some(ctx.saga_id) && label == &self.closed_label =>
            {
                self.drained_pool = Some(false);
                SagaAction::Done
            }
            // Anything else: still waiting; or (for `Initial`) the
            // coordinator hasn't called `start` yet; or the event's
            // saga_id doesn't match (concurrent saga or organic
            // report). In every case: no-op.
            _ => SagaAction::Wait,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    fn ctx(saga_id: u64) -> SagaCtx {
        SagaCtx { saga_id }
    }

    #[test]
    fn start_issues_reap_panes_to_host() {
        let mut saga = WindowCleanupCascade::new("main".into());
        let action = saga.start(&ctx(7));
        match action {
            SagaAction::IssueCmd { target, cmd } => {
                assert_eq!(target, PipeTarget::Host);
                match cmd {
                    Command::ReapPanes { label, .. } => assert_eq!(label, "main"),
                    other => panic!("expected ReapPanes, got {:?}", other),
                }
            }
            other => panic!(
                "expected IssueCmd(ReapPanes) on start, got {:?}",
                other
            ),
        }
        assert_eq!(saga.closed_label(), "main");
    }

    #[test]
    fn panes_reaped_advances_to_drain_pool_step() {
        // CPD-4 — saga_id-tagged terminal event for *this* saga.
        let mut saga = WindowCleanupCascade::new("main".into());
        let _ = saga.start(&ctx(7));
        let action = saga.on_event(
            &Event::PanesReaped {
                label: "main".into(),
                version: 100,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        match action {
            SagaAction::IssueCmd { target, cmd } => {
                assert_eq!(target, PipeTarget::Host);
                match cmd {
                    Command::DrainPoolIfLast { label, .. } => assert_eq!(label, "main"),
                    other => panic!("expected DrainPoolIfLast, got {:?}", other),
                }
            }
            other => panic!(
                "expected IssueCmd(DrainPoolIfLast) on PanesReaped, got {:?}",
                other
            ),
        }
        // Drain decision not yet known.
        assert_eq!(saga.drained_pool(), None);
    }

    #[test]
    fn pool_drained_completes_the_saga_with_drain_flag_true() {
        let mut saga = WindowCleanupCascade::new("main".into());
        let _ = saga.start(&ctx(7));
        let _ = saga.on_event(
            &Event::PanesReaped {
                label: "main".into(),
                version: 100,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        let action = saga.on_event(
            &Event::PoolDrained {
                label: "main".into(),
                version: 101,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Done));
        assert_eq!(saga.drained_pool(), Some(true));
    }

    #[test]
    fn pool_not_last_completes_the_saga_with_drain_flag_false() {
        let mut saga = WindowCleanupCascade::new("secondary".into());
        let _ = saga.start(&ctx(7));
        let _ = saga.on_event(
            &Event::PanesReaped {
                label: "secondary".into(),
                version: 100,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        let action = saga.on_event(
            &Event::PoolNotLast {
                label: "secondary".into(),
                version: 101,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Done));
        assert_eq!(saga.drained_pool(), Some(false));
    }

    /// CPD-4 — terminal events with `saga_id: None` (organic host
    /// reports) must NOT advance the saga. Pre-CPD-4 ANY label-
    /// matching `PanesReaped` would advance Step 1; per-saga
    /// correlation now scopes terminal events to their originating
    /// saga.
    #[test]
    fn organic_panes_reaped_does_not_advance_saga() {
        let mut saga = WindowCleanupCascade::new("main".into());
        let _ = saga.start(&ctx(7));
        let action = saga.on_event(
            &Event::PanesReaped {
                label: "main".into(),
                version: 100,
                saga_id: None,
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Wait));
    }

    /// CPD-4 — a terminal event tagged with a *foreign* saga_id
    /// (sibling concurrent cleanup-cascade saga) is ignored. This is
    /// the invariant that retires evict-and-replace.
    #[test]
    fn foreign_saga_id_panes_reaped_does_not_advance_saga() {
        let mut saga = WindowCleanupCascade::new("main".into());
        let _ = saga.start(&ctx(7));
        let action = saga.on_event(
            &Event::PanesReaped {
                label: "main".into(),
                version: 100,
                saga_id: Some(99),
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Wait));
    }

    #[test]
    fn unrelated_events_keep_saga_waiting() {
        let mut saga = WindowCleanupCascade::new("main".into());
        let _ = saga.start(&ctx(7));

        // WindowOpened — wholly unrelated.
        let unrelated = Event::WindowOpened {
            label: "other".into(),
            kind: agentmux_common::ipc::WindowKind::FullInstance,
            parent_label: None,
            version: 50,
        };
        let action = saga.on_event(&unrelated, &ctx(7));
        assert!(matches!(action, SagaAction::Wait));

        // PanesReaped for a DIFFERENT label — must not advance us
        // (defense-in-depth label match).
        let action = saga.on_event(
            &Event::PanesReaped {
                label: "different-window".into(),
                version: 51,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Wait));

        // PoolDrained while we're still in Step 1 — must not advance
        // us (wrong phase).
        let action = saga.on_event(
            &Event::PoolDrained {
                label: "main".into(),
                version: 52,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Wait));

        // The trigger itself (WindowClosed) is what the coordinator
        // uses to START the saga; the saga MUST NOT mistake a stray
        // WindowClosed (e.g. from another close) for a step
        // advancement.
        let action = saga.on_event(
            &Event::WindowClosed {
                label: "main".into(),
                version: 53,
                crash_detected: false,
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Wait));
    }

    #[test]
    fn label_mismatch_in_drain_phase_keeps_saga_waiting() {
        let mut saga = WindowCleanupCascade::new("main".into());
        let _ = saga.start(&ctx(7));
        let _ = saga.on_event(
            &Event::PanesReaped {
                label: "main".into(),
                version: 100,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        // Now in DrainingPool. A drain event for a different label
        // must NOT terminate this saga.
        let action = saga.on_event(
            &Event::PoolDrained {
                label: "other".into(),
                version: 101,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Wait));
        let action = saga.on_event(
            &Event::PoolNotLast {
                label: "other".into(),
                version: 102,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Wait));
        // Drain decision still unresolved.
        assert_eq!(saga.drained_pool(), None);
    }

    /// End-to-end coordinator integration: emit `WindowClosed`,
    /// watch the coordinator start the saga, emit synthetic
    /// saga_id-tagged `PanesReaped` then `PoolDrained`, and watch the
    /// coordinator emit `SagaStarted` then `SagaCompleted` bracket
    /// events on the bus.
    #[tokio::test]
    async fn coordinator_brackets_close_with_saga_lifecycle_events() {
        use crate::saga::{run_coordinator, SagaCoordinator};

        let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(64);
        let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
        let coord = Arc::new(SagaCoordinator::new(events_tx.clone(), Arc::clone(&state)));

        let mut witness = events_tx.subscribe();
        let coord_rx = events_tx.subscribe();
        let _handle = tokio::spawn(run_coordinator(Arc::clone(&coord), coord_rx));

        // Yield so coordinator's recv loop is parked before publish.
        tokio::task::yield_now().await;

        // Trigger.
        let _ = events_tx.send(Event::WindowClosed {
            label: "main".into(),
            version: 1,
            crash_detected: false,
        });

        // CPD-4: wait for SagaStarted to learn the coordinator-
        // allocated saga_id, then publish step-1 + step-2 terminals
        // tagged with that id.
        let mut saga_id_started: Option<u64> = None;
        let mut saw_started = false;
        let mut saw_completed_after_started = false;
        let mut sent_terminals = false;
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
        while std::time::Instant::now() < deadline {
            match tokio::time::timeout(std::time::Duration::from_millis(50), witness.recv()).await
            {
                Ok(Ok(Event::SagaStarted { saga_id, name, .. })) => {
                    if name == "window_cleanup_cascade" {
                        saw_started = true;
                        saga_id_started = Some(saga_id);
                        if !sent_terminals {
                            // Step 1 terminal.
                            let _ = events_tx.send(Event::PanesReaped {
                                label: "main".into(),
                                version: 2,
                                saga_id: Some(saga_id),
                            });
                            // Step 2 terminal — pick the "drained"
                            // branch.
                            let _ = events_tx.send(Event::PoolDrained {
                                label: "main".into(),
                                version: 3,
                                saga_id: Some(saga_id),
                            });
                            sent_terminals = true;
                        }
                    }
                }
                Ok(Ok(Event::SagaCompleted { saga_id, .. })) => {
                    if saw_started && Some(saga_id) == saga_id_started {
                        saw_completed_after_started = true;
                        break;
                    }
                }
                Ok(Ok(_)) => {}
                Ok(Err(_)) => break,
                Err(_) => continue,
            }
        }
        assert!(
            saw_started,
            "expected coordinator to emit Event::SagaStarted for window_cleanup_cascade"
        );
        assert!(
            saw_completed_after_started,
            "expected coordinator to emit Event::SagaCompleted after SagaStarted"
        );
    }

    /// **CPD-4 — concurrent same-kind sagas correlate cleanly.**
    /// Two `WindowClosed` events arrive in close succession; both
    /// sagas coexist in the registry. Each consumes only its own
    /// saga_id-tagged terminal events. Both produce a clean
    /// `SagaStarted` + `SagaCompleted` bracket — no premature
    /// `SagaFailed { reason: "evicted" }` from the retired
    /// evict-and-replace policy (PR #634).
    ///
    /// Pre-CPD-4 this test (formerly
    /// `coordinator_evicts_on_concurrent_window_close`) asserted the
    /// inverse: one premature `SagaFailed` + one `SagaCompleted`. The
    /// behavioral inversion is the core of CPD-4.
    #[tokio::test]
    async fn coordinator_concurrent_window_close_runs_two_sagas_to_completion() {
        use crate::saga::{run_coordinator, SagaCoordinator};

        let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(64);
        let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
        let coord = Arc::new(SagaCoordinator::new(events_tx.clone(), Arc::clone(&state)));

        let mut witness = events_tx.subscribe();
        let coord_rx = events_tx.subscribe();
        let _handle = tokio::spawn(run_coordinator(Arc::clone(&coord), coord_rx));

        tokio::task::yield_now().await;

        // First close → starts saga A.
        let _ = events_tx.send(Event::WindowClosed {
            label: "main".into(),
            version: 1,
            crash_detected: false,
        });
        // Second close BEFORE saga A's terminals arrive → with
        // evict-and-replace removed, saga B coexists with saga A.
        let _ = events_tx.send(Event::WindowClosed {
            label: "secondary".into(),
            version: 2,
            crash_detected: false,
        });

        // Drain the bus: capture both saga_ids from SagaStarted, then
        // send each saga's terminal events tagged with its own id.
        // Track a per-saga state machine: we send PanesReaped only
        // after the saga's start is observed, then PoolDrained/
        // PoolNotLast.
        let mut saga_a_id: Option<u64> = None;
        let mut saga_b_id: Option<u64> = None;
        let mut saga_a_complete = false;
        let mut saga_b_complete = false;
        let mut saga_a_panes_sent = false;
        let mut saga_b_panes_sent = false;
        let mut version: u64 = 100;
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(3);
        while std::time::Instant::now() < deadline {
            match tokio::time::timeout(std::time::Duration::from_millis(50), witness.recv()).await
            {
                Ok(Ok(Event::SagaStarted { saga_id, name, .. })) => {
                    if name == "window_cleanup_cascade" {
                        if saga_a_id.is_none() {
                            saga_a_id = Some(saga_id);
                            // Saga A's PanesReaped (Step 1 terminal).
                            version += 1;
                            let _ = events_tx.send(Event::PanesReaped {
                                label: "main".into(),
                                version,
                                saga_id: Some(saga_id),
                            });
                            saga_a_panes_sent = true;
                        } else if saga_b_id.is_none() {
                            saga_b_id = Some(saga_id);
                            version += 1;
                            let _ = events_tx.send(Event::PanesReaped {
                                label: "secondary".into(),
                                version,
                                saga_id: Some(saga_id),
                            });
                            saga_b_panes_sent = true;
                        }
                    }
                }
                Ok(Ok(Event::PanesReaped { saga_id: Some(sid), .. })) => {
                    // After Step 1 echo lands, send Step 2 terminal
                    // tagged with the same saga id.
                    if Some(sid) == saga_a_id && saga_a_panes_sent {
                        version += 1;
                        let _ = events_tx.send(Event::PoolDrained {
                            label: "main".into(),
                            version,
                            saga_id: Some(sid),
                        });
                    } else if Some(sid) == saga_b_id && saga_b_panes_sent {
                        version += 1;
                        let _ = events_tx.send(Event::PoolNotLast {
                            label: "secondary".into(),
                            version,
                            saga_id: Some(sid),
                        });
                    }
                }
                Ok(Ok(Event::SagaCompleted { saga_id, .. })) => {
                    if Some(saga_id) == saga_a_id {
                        saga_a_complete = true;
                    } else if Some(saga_id) == saga_b_id {
                        saga_b_complete = true;
                    }
                    if saga_a_complete && saga_b_complete {
                        break;
                    }
                }
                Ok(Ok(Event::SagaFailed { reason, saga_id, .. })) => {
                    panic!(
                        "CPD-4 invariant violated: saga_id={} got SagaFailed reason={} — \
                         evict-and-replace should be retired so concurrent sagas no \
                         longer cross-talk",
                        saga_id, reason,
                    );
                }
                Ok(Ok(_)) => {}
                Ok(Err(_)) => break,
                Err(_) => continue,
            }
        }

        assert!(
            saga_a_id.is_some(),
            "expected saga A's SagaStarted from first WindowClosed"
        );
        assert!(
            saga_b_id.is_some(),
            "expected saga B's SagaStarted from second WindowClosed (no eviction)"
        );
        assert_ne!(saga_a_id, saga_b_id, "concurrent sagas must have distinct ids");
        assert!(
            saga_a_complete,
            "saga A (label=main) should complete cleanly with its own saga_id-tagged terminals"
        );
        assert!(
            saga_b_complete,
            "saga B (label=secondary) should complete cleanly with its own saga_id-tagged terminals"
        );
    }
}