agentmux_launcher\saga/window_cleanup.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase F.6 — window-cleanup cascade saga.
5//
6// **What this saga does**
7//
8// When a user-visible top-level window closes, two cleanup steps
9// happen implicitly in the host today:
10//
11// 1. Browser-pane HWNDs that belong to the closing window get
12// reaped (lifecycle entries drained, pane HWND map cleared,
13// subwindow cascade closes children).
14// 2. If that window was the LAST user-visible window, Stage 1 of
15// the wrr two-stage close cascade in
16// `agentmux-cef::client::on_before_close` posts WM_CLOSE to
17// every warm-pool browser so the app can drain to zero.
18// Otherwise the pool stays warm.
19//
20// Both steps fire inside the same `on_before_close` body. Renderers
21// that want to buffer "this window is closing AND its post-close
22// cleanup is settling" atomically have nothing to pivot on today —
23// the launcher's `Event::WindowClosed` is the only signal, and it
24// fires BEFORE the cleanup steps complete.
25//
26// This saga formalizes the implicit cleanup as an explicit state
27// machine so the renderer sees a `SagaStarted` / `SagaCompleted`
28// bracket. Implementing the spec sketch from
29// `docs/specs/SPEC_PHASE_F_HOST_REDUCER_2026-05-01.md` §7.2:
30//
31// saga WindowCleanupCascade(closed_label):
32// Step 1 — on Event::WindowClosed { label }:
33// issue Command::ReapPanes { label } → host
34// wait for: Event::PanesReaped { label }
35// Step 2 — issue Command::DrainPoolIfLast { label } → host
36// wait for: Event::PoolDrained or Event::PoolNotLast
37// Step 3 — Done
38//
39// Compensation: none (cleanup failure is logged; next close
40// retries).
41//
42// **Scope of F.6 (this PR)**
43//
44// F.6 ships:
45// - the saga state machine itself (this file),
46// - the wire types (`Command::ReapPanes`,
47// `Command::DrainPoolIfLast`, `Command::ReportPanesReaped`,
48// `Command::ReportPoolDrainDecision`, `Event::PanesReaped`,
49// `Event::PoolDrained`, `Event::PoolNotLast`),
50// - host-side `report_panes_reaped` + `report_pool_drain_decision`
51// calls inside `on_before_close`,
52// - launcher reducer arms translating the reports into the typed
53// events,
54// - coordinator wiring that registers the saga as a consumer of
55// `Event::WindowClosed`.
56//
57// F.6 does NOT ship:
58// - F.7 (cleanup audit + property tests).
59// - Launcher-side saga durability — separate concern.
60//
61// **CPD-3 update.** Both `IssueCmd::Host` actions
62// (`Command::ReapPanes` and `Command::DrainPoolIfLast`) are now LIVE:
63// the coordinator dispatches them through `HostPipe::send_command()`
64// (see `agentmux-launcher/src/saga/mod.rs::apply_action`). This saga
65// also overrides `Saga::timeout()` to 30s (vs. 5s default) since
66// pane drain on a workspace with many panes can legitimately take
67// that long — see SPEC_CROSS_PROCESS_DISPATCH §3.10.
68//
69// **Why a saga at all if the IssueCmds are currently passive?**
70//
71// Same reasoning as F.5: the renderer-visible value is the
72// `SagaStarted` / `SagaCompleted` bracket. Subscribers see "saga
73// foo running" and can buffer related events for that saga_id
74// until the bracket closes. That works the same way whether the
75// launcher actively dispatches the cleanup or merely observes the
76// host doing it. When the cross-process pipe lands, only the
77// saga's `IssueCmd` handling needs to change — the renderer-facing
78// semantics are stable.
79//
80// **What if a terminal event never arrives?**
81//
82// Same behavior as F.5: the saga waits forever until the bus
83// closes (saga abandoned on launcher restart) or — once
84// per-saga timeouts land — a deadline force-fails it. F.6 keeps
85// behavior deliberately simple: if cleanup genuinely fails, the
86// next window close starts a fresh saga; the prior failed saga
87// stays in `in_flight` until the launcher exits or it gets
88// evicted by a same-kind successor (see `saga::mod.rs`'s
89// evict-and-replace policy from F.5 round 4).
90//
91// **Known limitation: concurrent-correlation** (inherited from
92// F.5). If two windows close simultaneously, the coordinator
93// broadcasts every event to every in-flight saga. The first
94// `PanesReaped` (or `PoolDrained`/`PoolNotLast`) advances ALL
95// matching sagas regardless of which window's close they belong
96// to — early `SagaCompleted` for one window's bracket, the other
97// window's terminal event left unbracketed.
98//
99// In practice this requires the user to close two windows in
100// rapid succession (under the host's `on_before_close` execution
101// latency, ~few ms). The user-visible consequence is a brief
102// renderer-side bracketing inconsistency; reducer + SQLite state
103// remain correct.
104//
105// Mitigation today: `saga::mod.rs::run_coordinator`'s
106// evict-and-replace policy (F.5 round 4) ensures only ONE
107// window-cleanup-cascade saga is in flight at a time. The second
108// `WindowClosed` evicts the first saga (emitting `SagaFailed`
109// with "evicted" reason), then starts a fresh one. Renderer-
110// visible: one premature `SagaFailed` + one correct
111// `SagaCompleted`. Same trade-off as F.5 documented for
112// concurrent promotes.
113//
114// Proper fix requires coordinator-level FIFO routing or per-saga
115// sequence-number correlation. Both depend on the cross-process
116// dispatch landing first (so the saga's `IssueCmd` is actually
117// causal). Closed alongside the launcher→host command pipe in a
118// future phase.
119
120use agentmux_common::ipc::{Command, Event};
121
122use super::{PipeTarget, Saga, SagaAction, SagaCtx};
123
124/// State of one in-flight window-cleanup-cascade saga.
125#[derive(Debug, Clone, PartialEq, Eq)]
126enum Phase {
127 /// Saga just constructed — waiting for the coordinator to call
128 /// `start`. The first `start` call transitions to `ReapingPanes`
129 /// and emits the `IssueCmd` for `ReapPanes`.
130 Initial,
131 /// `ReapPanes` has been issued; waiting for any
132 /// `Event::PanesReaped` that matches our window label.
133 ReapingPanes,
134 /// `DrainPoolIfLast` has been issued; waiting for either
135 /// `Event::PoolDrained` or `Event::PoolNotLast`. Both are
136 /// terminal — the saga doesn't care WHICH branch fires, only
137 /// that ONE of them does.
138 DrainingPool,
139}
140
141/// Window-cleanup-cascade saga: fires once per window close, drives
142/// the implicit pane-reap + pool-drain-decision flow into an
143/// explicit two-step state machine, then completes.
144pub struct WindowCleanupCascade {
145 /// Label of the window that closed (the trigger event's payload).
146 /// The saga uses this for label-matching on terminal events:
147 /// `PanesReaped { label }` only advances Step 1 when `label`
148 /// matches; same for `PoolDrained`/`PoolNotLast` in Step 2.
149 ///
150 /// Note: under the coordinator's evict-and-replace policy, only
151 /// one window-cleanup-cascade saga is ever in flight at a time,
152 /// so label-matching is technically redundant for correctness.
153 /// Keep it anyway as cheap defense-in-depth + a clear
154 /// invariant-statement: "this saga belongs to *this* window's
155 /// cleanup, not whoever's terminal event happens to land first."
156 closed_label: String,
157 /// Whether the host's drain decision said "yes, that was the
158 /// last window" (`PoolDrained` arm) or "no, more windows
159 /// remain" (`PoolNotLast` arm). `None` until Step 2 resolves.
160 /// Exported for tests.
161 drained_pool: Option<bool>,
162 phase: Phase,
163}
164
165impl WindowCleanupCascade {
166 /// Construct a fresh saga for a close of `closed_label`.
167 /// Coordinator allocates the saga_id and calls `start` once.
168 pub fn new(closed_label: String) -> Self {
169 Self {
170 closed_label,
171 drained_pool: None,
172 phase: Phase::Initial,
173 }
174 }
175
176 /// Whether the host's drain decision flagged this close as
177 /// "last user-visible window" (`true` → `PoolDrained` branch
178 /// fired) or not (`false` → `PoolNotLast` branch fired). `None`
179 /// before Step 2 resolves. Exported for tests.
180 #[cfg(test)]
181 pub fn drained_pool(&self) -> Option<bool> {
182 self.drained_pool
183 }
184
185 /// Label this saga is tracking. Exported for tests.
186 #[cfg(test)]
187 pub fn closed_label(&self) -> &str {
188 &self.closed_label
189 }
190}
191
192impl Saga for WindowCleanupCascade {
193 fn name(&self) -> &'static str {
194 "window_cleanup_cascade"
195 }
196
197 /// LSD-2 — record the closing window's label for `--diag sagas`.
198 /// `drained_pool` is None at start (only known after Step 2)
199 /// so we don't include it; the durable log captures the inputs
200 /// the saga was constructed with, not its evolving state — that
201 /// lives in step rows.
202 fn input_snapshot(&self) -> serde_json::Value {
203 serde_json::json!({ "closed_label": self.closed_label })
204 }
205
206 /// CPD-3 — override the default 5s saga timeout. Pane drain
207 /// (Stage 1 of wrr's two-stage close cascade) on a workspace
208 /// with many panes can legitimately take longer than 5s. Per
209 /// SPEC_CROSS_PROCESS_DISPATCH §3.10.
210 fn timeout(&self) -> std::time::Duration {
211 std::time::Duration::from_secs(30)
212 }
213
214 fn start(&mut self, _ctx: &SagaCtx) -> SagaAction {
215 // Step 1 — issue the ReapPanes command, transition into the
216 // wait state. F.6 routes the action to `PipeTarget::Host`
217 // for forward compatibility with the cross-process dispatch
218 // follow-up; today the coordinator logs the IssueCmd and
219 // doesn't actually transmit it on a launcher→host pipe (no
220 // such pipe exists yet — see module docstring).
221 debug_assert_eq!(self.phase, Phase::Initial);
222 self.phase = Phase::ReapingPanes;
223 // CPD-1 schema-only: saga_id placeholder (0) — coordinator's
224 // `apply_action` for `IssueCmd::Host` remains log-only;
225 // CPD-3 will inject the live saga's id at dispatch time.
226 SagaAction::IssueCmd {
227 target: PipeTarget::Host,
228 cmd: Command::ReapPanes {
229 label: self.closed_label.clone(),
230 saga_id: 0,
231 },
232 }
233 }
234
235 fn on_event(&mut self, event: &Event, ctx: &SagaCtx) -> SagaAction {
236 // **CPD-4 — per-saga event correlation.** Filter
237 // `PanesReaped` / `PoolDrained` / `PoolNotLast` by `saga_id`:
238 // only events tagged with *this* saga's id advance us. The
239 // label match remains as defense-in-depth (events for the
240 // wrong window cannot end up with this saga's id, but the
241 // double-check costs nothing). Untagged events (organic host
242 // reports) and events from sibling concurrent sagas are
243 // ignored — retires the evict-and-replace workaround from
244 // PR #634 so two simultaneous `WindowClosed`s now produce
245 // two clean `SagaCompleted` brackets instead of one
246 // premature `SagaFailed { reason: "evicted" }` + one
247 // `SagaCompleted`.
248 match (&self.phase, event) {
249 // Step 1 → Step 2 transition. `PanesReaped` arrives from
250 // the host (`report_panes_reaped` inside
251 // `on_before_close`).
252 (Phase::ReapingPanes, Event::PanesReaped { label, saga_id, .. })
253 if *saga_id == Some(ctx.saga_id) && label == &self.closed_label =>
254 {
255 self.phase = Phase::DrainingPool;
256 // CPD-1 schema-only: saga_id placeholder (0); CPD-3
257 // injects the live saga's id at dispatch time.
258 SagaAction::IssueCmd {
259 target: PipeTarget::Host,
260 cmd: Command::DrainPoolIfLast {
261 label: self.closed_label.clone(),
262 saga_id: 0,
263 },
264 }
265 }
266 // Step 2 terminal — drain happened (last window closed).
267 (Phase::DrainingPool, Event::PoolDrained { label, saga_id, .. })
268 if *saga_id == Some(ctx.saga_id) && label == &self.closed_label =>
269 {
270 self.drained_pool = Some(true);
271 SagaAction::Done
272 }
273 // Step 2 terminal — drain skipped (other windows remain).
274 // Equally a success: the saga's job is to bracket the
275 // drain *decision*, not enforce a particular outcome.
276 (Phase::DrainingPool, Event::PoolNotLast { label, saga_id, .. })
277 if *saga_id == Some(ctx.saga_id) && label == &self.closed_label =>
278 {
279 self.drained_pool = Some(false);
280 SagaAction::Done
281 }
282 // Anything else: still waiting; or — for `Initial` —
283 // coordinator hasn't called `start` yet; or the event's
284 // saga_id doesn't match (concurrent saga or organic
285 // report). Either way: no-op.
286 _ => SagaAction::Wait,
287 }
288 }
289}
290
291#[cfg(test)]
292mod tests {
293 use super::*;
294 use std::sync::Arc;
295
296 fn ctx(saga_id: u64) -> SagaCtx {
297 SagaCtx { saga_id }
298 }
299
300 #[test]
301 fn start_issues_reap_panes_to_host() {
302 let mut saga = WindowCleanupCascade::new("main".into());
303 let action = saga.start(&ctx(7));
304 match action {
305 SagaAction::IssueCmd { target, cmd } => {
306 assert_eq!(target, PipeTarget::Host);
307 match cmd {
308 Command::ReapPanes { label, .. } => assert_eq!(label, "main"),
309 other => panic!("expected ReapPanes, got {:?}", other),
310 }
311 }
312 other => panic!(
313 "expected IssueCmd(ReapPanes) on start, got {:?}",
314 other
315 ),
316 }
317 assert_eq!(saga.closed_label(), "main");
318 }
319
320 #[test]
321 fn panes_reaped_advances_to_drain_pool_step() {
322 // CPD-4 — saga_id-tagged terminal event for *this* saga.
323 let mut saga = WindowCleanupCascade::new("main".into());
324 let _ = saga.start(&ctx(7));
325 let action = saga.on_event(
326 &Event::PanesReaped {
327 label: "main".into(),
328 version: 100,
329 saga_id: Some(7),
330 },
331 &ctx(7),
332 );
333 match action {
334 SagaAction::IssueCmd { target, cmd } => {
335 assert_eq!(target, PipeTarget::Host);
336 match cmd {
337 Command::DrainPoolIfLast { label, .. } => assert_eq!(label, "main"),
338 other => panic!("expected DrainPoolIfLast, got {:?}", other),
339 }
340 }
341 other => panic!(
342 "expected IssueCmd(DrainPoolIfLast) on PanesReaped, got {:?}",
343 other
344 ),
345 }
346 // Drain decision not yet known.
347 assert_eq!(saga.drained_pool(), None);
348 }
349
350 #[test]
351 fn pool_drained_completes_the_saga_with_drain_flag_true() {
352 let mut saga = WindowCleanupCascade::new("main".into());
353 let _ = saga.start(&ctx(7));
354 let _ = saga.on_event(
355 &Event::PanesReaped {
356 label: "main".into(),
357 version: 100,
358 saga_id: Some(7),
359 },
360 &ctx(7),
361 );
362 let action = saga.on_event(
363 &Event::PoolDrained {
364 label: "main".into(),
365 version: 101,
366 saga_id: Some(7),
367 },
368 &ctx(7),
369 );
370 assert!(matches!(action, SagaAction::Done));
371 assert_eq!(saga.drained_pool(), Some(true));
372 }
373
374 #[test]
375 fn pool_not_last_completes_the_saga_with_drain_flag_false() {
376 let mut saga = WindowCleanupCascade::new("secondary".into());
377 let _ = saga.start(&ctx(7));
378 let _ = saga.on_event(
379 &Event::PanesReaped {
380 label: "secondary".into(),
381 version: 100,
382 saga_id: Some(7),
383 },
384 &ctx(7),
385 );
386 let action = saga.on_event(
387 &Event::PoolNotLast {
388 label: "secondary".into(),
389 version: 101,
390 saga_id: Some(7),
391 },
392 &ctx(7),
393 );
394 assert!(matches!(action, SagaAction::Done));
395 assert_eq!(saga.drained_pool(), Some(false));
396 }
397
398 /// CPD-4 — terminal events with `saga_id: None` (organic host
399 /// reports) must NOT advance the saga. Pre-CPD-4 ANY label-
400 /// matching `PanesReaped` would advance Step 1; per-saga
401 /// correlation now scopes terminal events to their originating
402 /// saga.
403 #[test]
404 fn organic_panes_reaped_does_not_advance_saga() {
405 let mut saga = WindowCleanupCascade::new("main".into());
406 let _ = saga.start(&ctx(7));
407 let action = saga.on_event(
408 &Event::PanesReaped {
409 label: "main".into(),
410 version: 100,
411 saga_id: None,
412 },
413 &ctx(7),
414 );
415 assert!(matches!(action, SagaAction::Wait));
416 }
417
418 /// CPD-4 — a terminal event tagged with a *foreign* saga_id
419 /// (sibling concurrent cleanup-cascade saga) is ignored. This is
420 /// the invariant that retires evict-and-replace.
421 #[test]
422 fn foreign_saga_id_panes_reaped_does_not_advance_saga() {
423 let mut saga = WindowCleanupCascade::new("main".into());
424 let _ = saga.start(&ctx(7));
425 let action = saga.on_event(
426 &Event::PanesReaped {
427 label: "main".into(),
428 version: 100,
429 saga_id: Some(99),
430 },
431 &ctx(7),
432 );
433 assert!(matches!(action, SagaAction::Wait));
434 }
435
436 #[test]
437 fn unrelated_events_keep_saga_waiting() {
438 let mut saga = WindowCleanupCascade::new("main".into());
439 let _ = saga.start(&ctx(7));
440
441 // WindowOpened — wholly unrelated.
442 let unrelated = Event::WindowOpened {
443 label: "other".into(),
444 kind: agentmux_common::ipc::WindowKind::FullInstance,
445 parent_label: None,
446 version: 50,
447 };
448 let action = saga.on_event(&unrelated, &ctx(7));
449 assert!(matches!(action, SagaAction::Wait));
450
451 // PanesReaped for a DIFFERENT label — must not advance us
452 // (defense-in-depth label match).
453 let action = saga.on_event(
454 &Event::PanesReaped {
455 label: "different-window".into(),
456 version: 51,
457 saga_id: Some(7),
458 },
459 &ctx(7),
460 );
461 assert!(matches!(action, SagaAction::Wait));
462
463 // PoolDrained while we're still in Step 1 — must not advance
464 // us (wrong phase).
465 let action = saga.on_event(
466 &Event::PoolDrained {
467 label: "main".into(),
468 version: 52,
469 saga_id: Some(7),
470 },
471 &ctx(7),
472 );
473 assert!(matches!(action, SagaAction::Wait));
474
475 // The trigger itself (WindowClosed) is what the coordinator
476 // uses to START the saga; the saga MUST NOT mistake a stray
477 // WindowClosed (e.g. from another close) for a step
478 // advancement.
479 let action = saga.on_event(
480 &Event::WindowClosed {
481 label: "main".into(),
482 version: 53,
483 crash_detected: false,
484 },
485 &ctx(7),
486 );
487 assert!(matches!(action, SagaAction::Wait));
488 }
489
490 #[test]
491 fn label_mismatch_in_drain_phase_keeps_saga_waiting() {
492 let mut saga = WindowCleanupCascade::new("main".into());
493 let _ = saga.start(&ctx(7));
494 let _ = saga.on_event(
495 &Event::PanesReaped {
496 label: "main".into(),
497 version: 100,
498 saga_id: Some(7),
499 },
500 &ctx(7),
501 );
502 // Now in DrainingPool. A drain event for a different label
503 // must NOT terminate this saga.
504 let action = saga.on_event(
505 &Event::PoolDrained {
506 label: "other".into(),
507 version: 101,
508 saga_id: Some(7),
509 },
510 &ctx(7),
511 );
512 assert!(matches!(action, SagaAction::Wait));
513 let action = saga.on_event(
514 &Event::PoolNotLast {
515 label: "other".into(),
516 version: 102,
517 saga_id: Some(7),
518 },
519 &ctx(7),
520 );
521 assert!(matches!(action, SagaAction::Wait));
522 // Drain decision still unresolved.
523 assert_eq!(saga.drained_pool(), None);
524 }
525
526 /// End-to-end coordinator integration: emit `WindowClosed`,
527 /// watch the coordinator start the saga, observe the IssueCmds
528 /// via log, emit synthetic `PanesReaped` then
529 /// `PoolDrained`/`PoolNotLast`, watch the coordinator emit
530 /// `SagaStarted` then `SagaCompleted` bracket events on the bus.
531 #[tokio::test]
532 async fn coordinator_brackets_close_with_saga_lifecycle_events() {
533 use crate::saga::{run_coordinator, SagaCoordinator};
534
535 let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(64);
536 let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
537 let coord = Arc::new(SagaCoordinator::new(events_tx.clone(), Arc::clone(&state)));
538
539 let mut witness = events_tx.subscribe();
540 let coord_rx = events_tx.subscribe();
541 let _handle = tokio::spawn(run_coordinator(Arc::clone(&coord), coord_rx));
542
543 // Yield so coordinator's recv loop is parked before publish.
544 tokio::task::yield_now().await;
545
546 // Trigger.
547 let _ = events_tx.send(Event::WindowClosed {
548 label: "main".into(),
549 version: 1,
550 crash_detected: false,
551 });
552
553 // CPD-4: wait for SagaStarted to learn the coordinator-
554 // allocated saga_id, then publish step-1 + step-2 terminals
555 // tagged with that id.
556 let mut saga_id_started: Option<u64> = None;
557 let mut saw_started = false;
558 let mut saw_completed_after_started = false;
559 let mut sent_terminals = false;
560 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
561 while std::time::Instant::now() < deadline {
562 match tokio::time::timeout(std::time::Duration::from_millis(50), witness.recv()).await
563 {
564 Ok(Ok(Event::SagaStarted { saga_id, name, .. })) => {
565 if name == "window_cleanup_cascade" {
566 saw_started = true;
567 saga_id_started = Some(saga_id);
568 if !sent_terminals {
569 // Step 1 terminal.
570 let _ = events_tx.send(Event::PanesReaped {
571 label: "main".into(),
572 version: 2,
573 saga_id: Some(saga_id),
574 });
575 // Step 2 terminal — pick the "drained"
576 // branch.
577 let _ = events_tx.send(Event::PoolDrained {
578 label: "main".into(),
579 version: 3,
580 saga_id: Some(saga_id),
581 });
582 sent_terminals = true;
583 }
584 }
585 }
586 Ok(Ok(Event::SagaCompleted { saga_id, .. })) => {
587 if saw_started && Some(saga_id) == saga_id_started {
588 saw_completed_after_started = true;
589 break;
590 }
591 }
592 Ok(Ok(_)) => {}
593 Ok(Err(_)) => break,
594 Err(_) => continue,
595 }
596 }
597 assert!(
598 saw_started,
599 "expected coordinator to emit Event::SagaStarted for window_cleanup_cascade"
600 );
601 assert!(
602 saw_completed_after_started,
603 "expected coordinator to emit Event::SagaCompleted after SagaStarted"
604 );
605 }
606
607 /// **CPD-4 — concurrent same-kind sagas correlate cleanly.**
608 /// Two `WindowClosed` events arrive in close succession; both
609 /// sagas coexist in the registry. Each consumes only its own
610 /// saga_id-tagged terminal events. Both produce a clean
611 /// `SagaStarted` + `SagaCompleted` bracket — no premature
612 /// `SagaFailed { reason: "evicted" }` from the retired
613 /// evict-and-replace policy (PR #634).
614 ///
615 /// Pre-CPD-4 this test (formerly
616 /// `coordinator_evicts_on_concurrent_window_close`) asserted the
617 /// inverse: one premature `SagaFailed` + one `SagaCompleted`. The
618 /// behavioral inversion is the core of CPD-4.
619 #[tokio::test]
620 async fn coordinator_concurrent_window_close_runs_two_sagas_to_completion() {
621 use crate::saga::{run_coordinator, SagaCoordinator};
622
623 let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(64);
624 let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
625 let coord = Arc::new(SagaCoordinator::new(events_tx.clone(), Arc::clone(&state)));
626
627 let mut witness = events_tx.subscribe();
628 let coord_rx = events_tx.subscribe();
629 let _handle = tokio::spawn(run_coordinator(Arc::clone(&coord), coord_rx));
630
631 tokio::task::yield_now().await;
632
633 // First close → starts saga A.
634 let _ = events_tx.send(Event::WindowClosed {
635 label: "main".into(),
636 version: 1,
637 crash_detected: false,
638 });
639 // Second close BEFORE saga A's terminals arrive → with
640 // evict-and-replace removed, saga B coexists with saga A.
641 let _ = events_tx.send(Event::WindowClosed {
642 label: "secondary".into(),
643 version: 2,
644 crash_detected: false,
645 });
646
647 // Drain the bus: capture both saga_ids from SagaStarted, then
648 // send each saga's terminal events tagged with its own id.
649 // Track a per-saga state machine: we send PanesReaped only
650 // after the saga's start is observed, then PoolDrained/
651 // PoolNotLast.
652 let mut saga_a_id: Option<u64> = None;
653 let mut saga_b_id: Option<u64> = None;
654 let mut saga_a_complete = false;
655 let mut saga_b_complete = false;
656 let mut saga_a_panes_sent = false;
657 let mut saga_b_panes_sent = false;
658 let mut version: u64 = 100;
659 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(3);
660 while std::time::Instant::now() < deadline {
661 match tokio::time::timeout(std::time::Duration::from_millis(50), witness.recv()).await
662 {
663 Ok(Ok(Event::SagaStarted { saga_id, name, .. })) => {
664 if name == "window_cleanup_cascade" {
665 if saga_a_id.is_none() {
666 saga_a_id = Some(saga_id);
667 // Saga A's PanesReaped (Step 1 terminal).
668 version += 1;
669 let _ = events_tx.send(Event::PanesReaped {
670 label: "main".into(),
671 version,
672 saga_id: Some(saga_id),
673 });
674 saga_a_panes_sent = true;
675 } else if saga_b_id.is_none() {
676 saga_b_id = Some(saga_id);
677 version += 1;
678 let _ = events_tx.send(Event::PanesReaped {
679 label: "secondary".into(),
680 version,
681 saga_id: Some(saga_id),
682 });
683 saga_b_panes_sent = true;
684 }
685 }
686 }
687 Ok(Ok(Event::PanesReaped { saga_id: Some(sid), .. })) => {
688 // After Step 1 echo lands, send Step 2 terminal
689 // tagged with the same saga id.
690 if Some(sid) == saga_a_id && saga_a_panes_sent {
691 version += 1;
692 let _ = events_tx.send(Event::PoolDrained {
693 label: "main".into(),
694 version,
695 saga_id: Some(sid),
696 });
697 } else if Some(sid) == saga_b_id && saga_b_panes_sent {
698 version += 1;
699 let _ = events_tx.send(Event::PoolNotLast {
700 label: "secondary".into(),
701 version,
702 saga_id: Some(sid),
703 });
704 }
705 }
706 Ok(Ok(Event::SagaCompleted { saga_id, .. })) => {
707 if Some(saga_id) == saga_a_id {
708 saga_a_complete = true;
709 } else if Some(saga_id) == saga_b_id {
710 saga_b_complete = true;
711 }
712 if saga_a_complete && saga_b_complete {
713 break;
714 }
715 }
716 Ok(Ok(Event::SagaFailed { reason, saga_id, .. })) => {
717 panic!(
718 "CPD-4 invariant violated: saga_id={} got SagaFailed reason={} — \
719 evict-and-replace should be retired so concurrent sagas no \
720 longer cross-talk",
721 saga_id, reason,
722 );
723 }
724 Ok(Ok(_)) => {}
725 Ok(Err(_)) => break,
726 Err(_) => continue,
727 }
728 }
729
730 assert!(
731 saga_a_id.is_some(),
732 "expected saga A's SagaStarted from first WindowClosed"
733 );
734 assert!(
735 saga_b_id.is_some(),
736 "expected saga B's SagaStarted from second WindowClosed (no eviction)"
737 );
738 assert_ne!(saga_a_id, saga_b_id, "concurrent sagas must have distinct ids");
739 assert!(
740 saga_a_complete,
741 "saga A (label=main) should complete cleanly with its own saga_id-tagged terminals"
742 );
743 assert!(
744 saga_b_complete,
745 "saga B (label=secondary) should complete cleanly with its own saga_id-tagged terminals"
746 );
747 }
748}