agentmux_launcher\saga/mod.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase E.1a — saga coordinator infrastructure.
5// Phase F.5 — first concrete saga consumer (`pool_respawn`).
6//
7// A saga is a state machine that orchestrates a multi-step,
8// multi-reducer flow (e.g. tear-off touches host pool + srv
9// workspace + launcher window registration). Sagas exist where
10// per-flow correctness needs explicit coordination — distributed
11// subscriber callbacks aren't enough.
12//
13// E.1a shipped this module as framework-only (no real sagas).
14// **F.5 lights up the first concrete saga (`pool_respawn`) plus the
15// minimal coordinator wiring needed to drive it**: the bus-
16// subscription loop now starts sagas in response to trigger events,
17// routes subsequent bus events into in-flight sagas via
18// `Saga::on_event`, dispatches `SagaAction` results, and emits
19// `SagaStarted` / `SagaCompleted` / `SagaFailed` brackets.
20//
21// Design (per `docs/specs/SPEC_PHASE_E_SRV_REDUCER_2026_04_29.md` §7
22// + `docs/specs/SPEC_PHASE_F_HOST_REDUCER_2026-05-01.md` §7.1):
23//
24// trait Saga {
25// fn start(&mut self, ctx: &SagaCtx) -> SagaAction;
26// fn on_event(&mut self, event: &Event, ctx: &SagaCtx) -> SagaAction;
27// fn name(&self) -> &'static str;
28// }
29//
30// enum SagaAction {
31// IssueCmd { target: PipeTarget, cmd: Command },
32// Done,
33// Failed { reason: String },
34// Wait,
35// }
36//
37// The `SagaCoordinator` task subscribes to the broadcast bus,
38// routes events to in-flight sagas, dispatches IssueCmd actions to
39// the appropriate pipe, and emits `SagaStarted` / `SagaCompleted`
40// / `SagaFailed` so subscribers (renderer) can buffer-until-complete.
41//
42// **Cross-process dispatch — CPD-3 LIVE.** The launcher → host
43// command pipe is now wired via `HostPipe::send_command()` (CPD-2
44// + CPD-3). F.5's `pool_respawn` and F.6's `window_cleanup_cascade`
45// sagas issue `IssueCmd::Host` actions; the coordinator dispatches
46// them through the wire instead of merely logging. Per-saga
47// timeouts (`Saga::timeout()`, default 5s, F.6 overrides 30s)
48// + per-saga deadline tasks fire `SagaFailed` if a host action
49// stays unresolved past its budget. Host-emitted
50// `Command::ReportSagaActionFailed` translates into
51// `Event::SagaActionFailed` and terminates the matching saga.
52//
// Per-variant `saga_id` tagging on existing Command/Event variants
// was originally deferred in F.5, on the grounds that:
//   - only one `pool_respawn` saga can be in flight per promote
//     (sagas are dispatched on `PoolWindowPromoted` 1:1);
//   - `Event::PoolWindowAdded` from the *implicit* refill is the
//     unique terminal signal — no foreign event type collides.
// That deferral has since been lifted for host-bound commands: the
// coordinator stamps its allocated `saga_id` onto them at dispatch
// time (`inject_saga_id`, CPD-3), and CPD-4 correlates concurrent
// same-kind sagas on that id rather than on event type alone.
61//
62// Saga state is in-memory. Launcher restart abandons in-flight
63// sagas; renderer-side timeouts cover the visible consequence.
64
65use std::sync::Arc;
66use std::time::Duration;
67
68use agentmux_common::ipc::{Command, Event};
69
70use crate::host_pipe::HostPipe;
71
72pub mod pool_respawn;
73pub mod window_cleanup;
74
75#[cfg(test)]
76mod integration_tests;
77
// LSD-1 (PR LSD-1) — durable launcher saga log + API. LSD-1 shipped
// this module as foundations only; since LSD-2 the coordinator writes
// through `LauncherSagaLog` on saga lifecycle transitions
// (`start_saga`, `start_step`, `terminate_saga`) whenever a log has
// been installed via `SagaCoordinator::with_log`. See
// `docs/specs/SPEC_LAUNCHER_SAGA_DURABILITY_2026-05-01.md` §4 PR1 for
// the staged-rollout rationale.
84pub(crate) mod log;
85pub use log::LauncherSagaLog;
86use log::SagaOutcome;
87
88// LSD-3 — startup recovery walker for unresolved launcher sagas.
89// `main.rs::run_windows` calls `compensate_unresolved_launcher_sagas`
90// after opening the durable saga log and BEFORE spawning the saga
91// coordinator. See `saga/recovery.rs` for design notes.
92mod recovery;
93pub use recovery::compensate_unresolved_launcher_sagas;
94
/// Where a `SagaAction::IssueCmd` should be dispatched.
///
/// `LauncherSelf` means "feed this command to the launcher's own
/// reducer" (in-process); `Host` and `Srv` mean "forward to the
/// peer's pipe."
///
/// **CPD-3 status:** `Host` is LIVE — the saga coordinator's
/// `apply_action` dispatches `IssueCmd` actions targeting `Host`
/// through `HostPipe::send_command()` over the launcher → host wire
/// (falling back to log-only when no host pipe is installed).
/// `LauncherSelf` and `Srv` are handled log-only by `apply_action`
/// and remain reserved for class-D/E sagas.
///
/// F.7 cleanup audit: only `Host` is constructed today (F.5/F.6 saga
/// IssueCmds). `LauncherSelf` and `Srv` are framework slots reserved
/// for the cross-process dispatch follow-up phase per
/// `SPEC_PHASE_F_HOST_REDUCER_2026-05-01.md` §4.3. Allow stays.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)] // LauncherSelf + Srv reserved for future cross-process sagas
pub enum PipeTarget {
    /// The launcher's own reducer (in-process). Log-only in `apply_action` today.
    LauncherSelf,
    /// The host process, reached through `HostPipe::send_command()`.
    Host,
    /// The srv process. Log-only in `apply_action` today.
    Srv,
}
117
/// What a saga decides to do next, returned from `start` /
/// `on_event`. The coordinator drives the saga forward by reacting
/// to this enum (see `SagaCoordinator::apply_action`).
///
/// F.7 cleanup audit: `IssueCmd`, `Done`, `Wait` are all constructed
/// by F.5 + F.6 sagas. `Failed` is consumed in `apply_action` (and
/// its corresponding `emit_failed` is now actively called by the
/// evict-and-replace path), but no shipped saga *constructs*
/// `Failed` yet — sagas today only succeed or wait. Variant-level
/// allow scopes the dead-code suppression precisely to that one
/// reserved variant.
#[derive(Debug)]
pub enum SagaAction {
    /// Dispatch a command on the target pipe; saga remains in flight
    /// and the coordinator records the step it is now awaiting.
    IssueCmd {
        /// Which pipe (or the launcher itself) receives `cmd`.
        target: PipeTarget,
        /// The command to dispatch. For `PipeTarget::Host` the
        /// coordinator rewrites its `saga_id` field before sending.
        cmd: Command,
    },
    /// Saga succeeded. Coordinator removes it from `in_flight`
    /// and emits `Event::SagaCompleted`.
    Done,
    /// Saga failed irrecoverably. Coordinator emits `Event::SagaFailed`
    /// after dispatching any compensation IssueCmds the saga issued
    /// before returning Failed. Reserved for sagas with explicit
    /// failure conditions (e.g. cross-process dispatch follow-up
    /// + saga timeouts).
    #[allow(dead_code)] // reserved for sagas that explicitly fail; F.5/F.6 only succeed or wait
    Failed { reason: String },
    /// Saga is waiting for an event it hasn't seen yet. No-op for
    /// the coordinator until the next bus event.
    Wait,
}
150
/// Read-only context passed to saga callbacks (`Saga::start` and
/// `Saga::on_event`). Currently carries only the saga_id; kept as a
/// struct (rather than a bare u64) so future fields can be added
/// without touching every `Saga` impl.
///
/// F.7 cleanup audit: `saga_id` is unread by the shipped sagas
/// (F.5/F.6 don't need it — coordinator already routes events
/// per-registry). Reserved for the per-event saga_id correlation
/// follow-up; allow stays.
#[allow(dead_code)] // saga_id reserved for per-event correlation follow-up
#[derive(Debug, Clone, Copy)]
pub struct SagaCtx {
    /// Coordinator-allocated id of the saga being driven.
    pub saga_id: u64,
}
164
/// A multi-step, multi-reducer state machine. Implementations
/// describe one logical operation (tear-off, pool-respawn, etc.).
///
/// Lifecycle: coordinator calls `start` once when the saga is
/// added to `in_flight`. After that, every event on the bus is
/// passed to `on_event`. The saga inspects the event, advances
/// its internal state, returns the next action.
///
/// **Identification:** sagas know which events belong to them by
/// inspecting saga_id in lifecycle events or by matching patterns
/// in event payloads (e.g. specific labels). F.5 sagas correlate
/// by event type only (`pool_respawn` matches any
/// `PoolWindowAdded` after start). Per-variant `saga_id` tagging
/// on commands/events is deferred until concurrent same-type
/// sagas are needed.
///
/// NOTE(review): the paragraph above predates CPD-3/CPD-4. The
/// coordinator now stamps `saga_id` onto host-bound commands and the
/// CPD-4 note in `spawn_saga` says sagas filter on
/// `ctx.saga_id == event.saga_id` — confirm against the concrete
/// saga impls and refresh this paragraph.
pub trait Saga: Send {
    /// Called once by the coordinator, right after the saga has been
    /// inserted into the in-flight registry. Returns the saga's first
    /// action. Invoked while the registry lock is held, so it must
    /// not block.
    fn start(&mut self, ctx: &SagaCtx) -> SagaAction;
    /// Offered a bus event; returns the saga's next action
    /// (`SagaAction::Wait` when the event is not relevant to it).
    fn on_event(&mut self, event: &Event, ctx: &SagaCtx) -> SagaAction;
    /// Static saga kind name. Used in coordinator log lines, as the
    /// `name` of `Event::SagaStarted`, in the durable log's
    /// `start_saga` row, and to count same-kind sagas in flight.
    fn name(&self) -> &'static str;
    /// LSD-2 — saga's input arguments serialized for the durable log's
    /// `input_json` column. The coordinator calls this once at
    /// `spawn_saga` time (before `start`) and writes the result via
    /// `LauncherSagaLog::start_saga`. Operators see this in
    /// `--diag sagas` output (e.g. `{"closed_label":"win-3"}`) so they
    /// can tell which window's cleanup a recovered-failed saga
    /// belonged to.
    ///
    /// Default `Value::Null` — sagas with no input fields can ignore
    /// it. Concrete sagas should override with a `serde_json::json!`
    /// of their constructor args.
    fn input_snapshot(&self) -> serde_json::Value {
        serde_json::Value::Null
    }

    /// CPD-3 — per-saga deadline budget for completing all
    /// `IssueCmd`+wait cycles. `spawn_saga` reads this once up front
    /// and arms a timer after the saga's first action has been
    /// applied (only if the saga is still in flight at that point);
    /// if the saga is still in_flight when the budget elapses, it is
    /// force-failed (`SagaFailed { reason: "saga timeout" }`) and
    /// removed from the registry.
    ///
    /// Default 5s — fits class-C single-step host dispatch (e.g.
    /// pool respawn). `WindowCleanupCascade` overrides to 30s
    /// because pane drain on a workspace with many panes can
    /// legitimately take that long. Per spec §3.10.
    fn timeout(&self) -> Duration {
        Duration::from_secs(5)
    }
}
213
/// LSD-2 — coordinator's per-saga book-keeping. Wraps the boxed saga
/// with the durable-log step bookkeeping the coordinator needs to call
/// `LauncherSagaLog::finish_step` when the awaited bus event lands.
///
/// Tracking the step state on the in-flight record (rather than
/// inside each saga impl) keeps the durability concern out of saga
/// authors' hands — they only deal with `SagaAction`.
struct InFlightSaga {
    /// The saga state machine being driven.
    saga: Box<dyn Saga>,
    /// `Some(idx)` when the saga is parked on a `Wait`-then-event
    /// pivot (the most recent `IssueCmd` allocated step `idx`);
    /// `None` between dispatches and after termination.
    awaiting_step: Option<u32>,
    /// Monotonic counter the coordinator bumps on every `IssueCmd`
    /// for this saga. Mirrors srv's `SagaCtx::step_index`. Lives
    /// per-saga (not coordinator-global) so concurrent sagas don't
    /// interleave indices in the log.
    next_step_index: u32,
}

/// Saga coordinator task.
///
/// Owns the registry of in-flight sagas, allocates saga ids,
/// routes events to sagas, dispatches IssueCmd actions to the
/// appropriate pipes, and emits lifecycle events on the broadcast
/// bus.
///
/// F.5 adds the first real consumer (`pool_respawn`). The
/// `in_flight` registry is now actually populated; the bus loop
/// dispatches events into it.
pub struct SagaCoordinator {
    /// Monotonic saga-id allocator.
    next_saga_id: std::sync::atomic::AtomicU64,
    /// In-flight sagas keyed by saga_id, each wrapped with the
    /// durable-log bookkeeping (LSD-2: `awaiting_step` +
    /// `next_step_index`).
    in_flight: tokio::sync::Mutex<std::collections::HashMap<u64, InFlightSaga>>,
    /// Reference to the broadcast bus so the coordinator can emit
    /// `SagaStarted` / `SagaCompleted` / `SagaFailed`.
    events_tx: tokio::sync::broadcast::Sender<Event>,
    /// Reference to the launcher's reducer state for `bump_version`
    /// when emitting saga lifecycle events.
    state: Arc<tokio::sync::Mutex<crate::state::State>>,
    /// LSD-2 — durable saga log. `None` in tests that don't exercise
    /// the durability path (the saga then logs + remains in flight,
    /// preserving the pre-LSD-2 behavior tests rely on for end-to-end
    /// bracket assertions). When `Some`, every saga lifecycle
    /// transition (`spawn_saga` → `start_saga`, `IssueCmd` →
    /// `start_step`, awaited-event consumed → `finish_step`,
    /// `Done` → `terminate_saga(Completed)`, `Failed` / evicted →
    /// `terminate_saga(Failed)`) writes to this log.
    log: Option<Arc<LauncherSagaLog>>,
    /// CPD-3 — launcher → host pipe wrapper. `apply_action` for
    /// `IssueCmd::Host` dispatches via `host_pipe.send_command()` when
    /// installed. `None` in tests that don't exercise host dispatch
    /// (those drive sagas via synthetic terminal events on the bus).
    host_pipe: Option<Arc<HostPipe>>,
}
273
274impl SagaCoordinator {
275 pub fn new(
276 events_tx: tokio::sync::broadcast::Sender<Event>,
277 state: Arc<tokio::sync::Mutex<crate::state::State>>,
278 ) -> Self {
279 Self {
280 next_saga_id: std::sync::atomic::AtomicU64::new(1),
281 in_flight: tokio::sync::Mutex::new(std::collections::HashMap::new()),
282 events_tx,
283 state,
284 log: None,
285 host_pipe: None,
286 }
287 }
288
289 /// CPD-3 — install the host pipe so `IssueCmd::Host` actions are
290 /// dispatched live (instead of log-only). Builder-style setter so
291 /// existing tests + the `next_id_is_monotonic` smoke don't have to
292 /// construct a HostPipe. Production wiring (`main.rs`) calls this
293 /// once before `run_coordinator` is spawned.
294 pub fn with_host_pipe(mut self, host_pipe: Arc<HostPipe>) -> Self {
295 self.host_pipe = Some(host_pipe);
296 self
297 }
298
299 /// LSD-2 — install the durable saga log so saga lifecycle
300 /// transitions are persisted to `<data-dir>/db/launcher-sagas.db`.
301 /// Builder-style setter rather than a constructor parameter so
302 /// existing tests + the `next_id_is_monotonic` smoke don't have
303 /// to construct an in-memory log. Production wiring (in
304 /// `main.rs`) calls this once before `run_coordinator` is spawned.
305 pub fn with_log(mut self, log: Arc<LauncherSagaLog>) -> Result<Self, crate::saga::log::LogError> {
306 // Seed the saga_id allocator from the highest persisted id
307 // so a launcher restart cannot reuse an id that already
308 // exists in `launcher_saga`. Reusing would (a) fail new
309 // INSERTs on duplicate-PK, and (b) silently mutate prior
310 // saga rows via `terminate_saga` / `finish_step` UPDATEs
311 // keyed by saga_id — corrupting saga history + recovery
312 // diagnostics. (codex P1 PR #645 round 1.)
313 //
314 // On `max_saga_id()` error: return Err so caller (main.rs)
315 // can fail launcher startup loudly. Continuing with a default
316 // next_saga_id=1 while the log is still attached would leave
317 // the coordinator in the exact failure mode this seed is
318 // meant to prevent. (codex P1 PR #645 round 2.)
319 let max = log.max_saga_id()?;
320 self.next_saga_id
321 .store(max + 1, std::sync::atomic::Ordering::Relaxed);
322 if max > 0 {
323 crate::log(&format!(
324 "[saga] seeded next_saga_id={} from launcher_saga.max(saga_id)={}",
325 max + 1,
326 max
327 ));
328 }
329 self.log = Some(log);
330 Ok(self)
331 }
332
333 /// Allocate the next saga_id. Monotonic per launcher run.
334 pub fn next_id(&self) -> u64 {
335 self.next_saga_id
336 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
337 }
338
339 /// Bump the launcher's `event_version` for a coordinator-emitted
340 /// lifecycle event. Brief mutex hold — no I/O between lock and
341 /// drop.
342 async fn next_event_version(&self) -> u64 {
343 let mut state = self.state.lock().await;
344 state.bump_version()
345 }
346
347 /// Emit `Event::SagaStarted` after registering a saga.
348 async fn emit_started(&self, saga_id: u64, name: &'static str) {
349 let v = self.next_event_version().await;
350 let _ = self.events_tx.send(Event::SagaStarted {
351 saga_id,
352 name: name.to_string(),
353 version: v,
354 });
355 }
356
357 /// Emit `Event::SagaCompleted` after a saga returns `Done`.
358 async fn emit_completed(&self, saga_id: u64) {
359 let v = self.next_event_version().await;
360 let _ = self.events_tx.send(Event::SagaCompleted {
361 saga_id,
362 version: v,
363 });
364 }
365
366 /// Emit `Event::SagaFailed` after a saga returns `Failed` or
367 /// after evict-and-replace cancels a same-kind in-flight saga.
368 /// F.7 cleanup audit: prior `#[allow(dead_code)]` removed — F.6
369 /// evict-and-replace policy now actively dispatches this on
370 /// every concurrent same-kind retrigger.
371 async fn emit_failed(&self, saga_id: u64, reason: String) {
372 let v = self.next_event_version().await;
373 let _ = self.events_tx.send(Event::SagaFailed {
374 saga_id,
375 reason,
376 version: v,
377 });
378 }
379
    /// Apply a `SagaAction` returned by `start` or `on_event`. Returns
    /// an `ApplyOutcome` carrying:
    ///   - `in_flight`: true → caller keeps the saga in `in_flight`,
    ///     false → the saga has been (or already was) removed.
    ///   - `awaiting_step`: `Some(idx)` if the action was an
    ///     `IssueCmd` that allocated step `idx` and the saga is now
    ///     parked waiting for the echo event. Caller stores this on
    ///     the InFlightSaga so the next non-`Wait` `on_event` return
    ///     can call `LauncherSagaLog::finish_step(saga_id, idx, ev)`.
    ///
    /// CPD-3 — `IssueCmd` targeting `Host` dispatches live through
    /// `HostPipe` when one is installed. Tests without a host_pipe
    /// (the existing F.5/F.6 unit + integration tests) fall back to
    /// log-only so synthetic-terminal-event drivers continue to work.
    /// `LauncherSelf` and `Srv` targets remain log-only — reserved
    /// for class-D/E sagas, no consumer today.
    ///
    /// LSD-2 — state transitions write to `LauncherSagaLog` when one
    /// is installed: `IssueCmd` → `start_step`, `Done` →
    /// `terminate_saga(Completed)`, `Failed` →
    /// `terminate_saga(Failed)`. Log write failures are logged and
    /// otherwise ignored.
    ///
    /// `step_index` is the caller-allocated index for THIS dispatch.
    /// On the first dispatch in `spawn_saga`, the caller passes 0;
    /// on subsequent dispatches from the event loop, the caller pulls
    /// + bumps the per-InFlightSaga counter.
    ///
    /// Terminal paths (`Done`, `Failed`) go through `claim_terminal`
    /// so only one of {normal Done/Failed, timeout-task,
    /// SagaActionFailed listener} can win the terminal-event race for
    /// a given saga_id; a loser returns `terminated()` without
    /// emitting anything. A host-pipe send error is NOT terminal —
    /// the saga stays in flight (round 7 design, see the `Err` arm).
    /// (reagent P1 PR #644 round 1 + 2.)
    ///
    /// # Panics
    ///
    /// Panics (via `inject_saga_id`) if a saga issues a `Host`-targeted
    /// command that is not one of the host-bound variants, when a host
    /// pipe is installed.
    async fn apply_action(
        &self,
        saga_id: u64,
        name: &'static str,
        action: SagaAction,
        step_index: u32,
    ) -> ApplyOutcome {
        match action {
            SagaAction::IssueCmd { target, cmd } => {
                // LSD-2 — write the durable `pending` step row BEFORE
                // dispatch so a crash mid-dispatch leaves a recoverable
                // breadcrumb (LSD-3's walker upgrades this saga to
                // `failed_compensation` on next launcher startup).
                let step_name = derive_step_name(&cmd, target);
                if let Some(log) = self.log.as_ref() {
                    if let Err(e) =
                        log.start_step(saga_id, step_index, &step_name, target, &cmd)
                    {
                        crate::log(&format!(
                            "[saga] saga_id={} name={} start_step log write failed: {} — continuing (in-memory path remains authoritative for this run)",
                            saga_id, name, e
                        ));
                    }
                }

                match target {
                    PipeTarget::Host => {
                        // CPD-3 — dispatch live through the host pipe
                        // when installed. Tests without a host_pipe
                        // (most existing F.5/F.6 unit + integration
                        // tests) fall back to log-only so synthetic-
                        // terminal-event drivers continue to work.
                        let Some(host_pipe) = self.host_pipe.as_ref() else {
                            crate::log(&format!(
                                "[saga] saga_id={} name={} IssueCmd target=Host cmd={:?} (no host_pipe installed — log-only)",
                                saga_id, name, cmd
                            ));
                            return ApplyOutcome::in_flight_awaiting(step_index);
                        };
                        // Replace the saga's placeholder saga_id with
                        // the coordinator-allocated one before sending.
                        let cmd_with_id = inject_saga_id(cmd, saga_id);
                        match host_pipe.send_command(&cmd_with_id).await {
                            Ok(()) => {
                                crate::log(&format!(
                                    "[saga] saga_id={} name={} IssueCmd::Host dispatched cmd={:?}",
                                    saga_id, name, cmd_with_id
                                ));
                                ApplyOutcome::in_flight_awaiting(step_index)
                            }
                            Err(e) => {
                                // CRITICAL: do NOT terminate the saga
                                // here. send_command's send_frame already
                                // re-buffered the failed frame for retry
                                // on reconnect (with the real saga_id).
                                // If we ALSO emit SagaFailed now:
                                //   - 30s later expire_pending_if_timed_out
                                //     drains the buffer + emit_drop_failure
                                //     emits a SECOND SagaFailed for the
                                //     same saga_id (double-emit), OR
                                //   - host reconnects in <30s and the
                                //     buffered command executes as an
                                //     orphaned side effect after the saga
                                //     bracket is already closed.
                                // (reagent P1 PR #644 round 6.)
                                //
                                // Round 7 design: keep the saga in_flight.
                                // The retry path resolves it (Report*
                                // arrives → on_event drives forward) OR
                                // the saga's own timeout (5s default,
                                // 30s for F.6) fires + claim_terminal'd
                                // SagaFailed. Either way, exactly one
                                // terminal event is emitted.
                                crate::log(&format!(
                                    "[saga] saga_id={} name={} host pipe send transient err: {} — frame buffered for retry; saga stays in-flight (round 7 fix)",
                                    saga_id, name, e
                                ));
                                ApplyOutcome::in_flight_awaiting(step_index)
                            }
                        }
                    }
                    PipeTarget::LauncherSelf | PipeTarget::Srv => {
                        // Reserved for class-D/E sagas; out of scope
                        // per SPEC_CROSS_PROCESS_DISPATCH §3.6. Log-
                        // only retains framework shape until a
                        // consumer arrives.
                        crate::log(&format!(
                            "[saga] saga_id={} name={} IssueCmd target={:?} cmd={:?} (target not yet wired; log-only)",
                            saga_id, name, target, cmd
                        ));
                        ApplyOutcome::in_flight_awaiting(step_index)
                    }
                }
            }
            // Nothing to dispatch; the saga stays registered untouched.
            SagaAction::Wait => ApplyOutcome::in_flight_no_change(),
            SagaAction::Done => {
                // Atomic claim: only one of {normal Done, timeout-task,
                // SagaActionFailed listener} can win the terminal-event
                // race. Without this guard a Done could fire
                // SagaCompleted while the timeout task simultaneously
                // fires SagaFailed, producing duplicate terminal events
                // for the same saga (reagent P1 PR #644 round 1).
                if !self.claim_terminal(saga_id).await {
                    return ApplyOutcome::terminated();
                }
                crate::log(&format!(
                    "[saga] saga_id={} name={} Done — emitting SagaCompleted",
                    saga_id, name
                ));
                // Durable log first, then the bus event.
                if let Some(log) = self.log.as_ref() {
                    if let Err(e) = log.terminate_saga(saga_id, SagaOutcome::Completed) {
                        crate::log(&format!(
                            "[saga] saga_id={} terminate_saga(Completed) log write failed: {}",
                            saga_id, e
                        ));
                    }
                }
                self.emit_completed(saga_id).await;
                ApplyOutcome::terminated()
            }
            SagaAction::Failed { reason } => {
                // Same single-winner claim as the Done arm.
                if !self.claim_terminal(saga_id).await {
                    return ApplyOutcome::terminated();
                }
                crate::log(&format!(
                    "[saga] saga_id={} name={} Failed reason={} — emitting SagaFailed",
                    saga_id, name, reason
                ));
                if let Some(log) = self.log.as_ref() {
                    if let Err(e) = log.terminate_saga(
                        saga_id,
                        SagaOutcome::Failed {
                            reason: reason.clone(),
                        },
                    ) {
                        crate::log(&format!(
                            "[saga] saga_id={} terminate_saga(Failed) log write failed: {}",
                            saga_id, e
                        ));
                    }
                }
                self.emit_failed(saga_id, reason).await;
                ApplyOutcome::terminated()
            }
        }
    }
557
558 /// Atomically remove `saga_id` from `in_flight` and return whether
559 /// the caller was the one who removed it (i.e. has the right to
560 /// emit the terminal `SagaCompleted` / `SagaFailed`). Idempotent:
561 /// a second caller for the same saga_id sees `false`. Used by the
562 /// `SagaAction::Done` and `SagaAction::Failed` paths in
563 /// `apply_action`, by the per-saga timeout task in `spawn_saga`,
564 /// by the `Event::SagaActionFailed` listener in `run_coordinator`,
565 /// by the host-send-failure path in `apply_action::IssueCmd::Host`,
566 /// and by the eviction path in `run_coordinator`.
567 /// (reagent P1 PR #644 round 1.)
568 async fn claim_terminal(&self, saga_id: u64) -> bool {
569 let claimed = {
570 let mut registry = self.in_flight.lock().await;
571 registry.remove(&saga_id).is_some()
572 };
573 // Purge any host-pipe pending frames tagged with this saga_id
574 // so a host reconnect post-terminal doesn't drain them as
575 // orphan side effects, and the 30s expiry path doesn't emit
576 // a second SagaFailed for the same saga_id.
577 // (codex + reagent P1 PR #644 round 7.)
578 if claimed {
579 if let Some(pipe) = self.host_pipe.as_ref() {
580 pipe.cancel_saga(saga_id).await;
581 }
582 }
583 claimed
584 }
585
    /// Register a fresh saga, calling `start` and applying its first
    /// action. The caller has already determined that the saga
    /// should fire (e.g. matched a trigger event). Returns the saga's
    /// allocated id (logged + bracketed in `SagaStarted`).
    ///
    /// LSD-2 — also writes a `running` row to the durable saga log
    /// before invoking `saga.start()`, and (via `apply_action`)
    /// records the saga's first dispatched step.
    ///
    /// CPD-3 — also arms a per-saga deadline timer, once the first
    /// action has been applied and only if the saga is still in
    /// flight at that point. If the saga is still in_flight when
    /// `saga.timeout()` elapses, a background task force-fails it
    /// (`SagaFailed { reason: "saga timeout" }`) and removes it from
    /// the registry. Per spec §3.10. Insert-then-start ordering is
    /// required so `apply_action`'s `claim_terminal` guard can succeed
    /// for immediate-completion sagas (codex P2 PR #644 round 2).
    async fn spawn_saga(self: &Arc<Self>, saga: Box<dyn Saga>) -> u64 {
        // Capture everything needed from the saga up front; the box
        // itself is moved into the registry below.
        let saga_id = self.next_id();
        let name = saga.name();
        let input = saga.input_snapshot();
        let saga_timeout = saga.timeout();
        crate::log(&format!(
            "[saga] starting saga_id={} name={}",
            saga_id, name
        ));
        // LSD-2 — write the durable `running` row BEFORE
        // `emit_started` so a crash between this point and
        // `apply_action` still leaves a recoverable breadcrumb
        // (LSD-3's walker will upgrade it to `failed_compensation` on
        // next launcher startup). Mirrors srv's `emit_saga_started`
        // ordering: durable log before bus.
        if let Some(log) = self.log.as_ref() {
            if let Err(e) = log.start_saga(saga_id, name, &input) {
                crate::log(&format!(
                    "[saga] saga_id={} start_saga log write failed: {} — continuing (in-memory path remains authoritative for this run)",
                    saga_id, e
                ));
            }
        }
        // Emit SagaStarted before any per-step events so any
        // subscriber buffering by saga_id sees the bracket open first.
        // Mirrors `agentmux-srv::sagas::emit_saga_started` ordering.
        self.emit_started(saga_id, name).await;

        // CPD-3 — insert the saga into in_flight BEFORE calling
        // start(), so that if start() returns SagaAction::Done or
        // SagaAction::Failed immediately, apply_action's terminal
        // paths can claim_terminal successfully and emit the matching
        // SagaCompleted/SagaFailed bracket. Pre-round-3 we started
        // saga.start() before insertion, which broke immediate-
        // completion sagas (claim_terminal returned false → no
        // terminal event → dangling SagaStarted bracket).
        // (codex P2 PR #644 round 2.)
        let action = {
            let mut registry = self.in_flight.lock().await;
            // CPD-4 — concurrent same-kind sagas now correlate by
            // saga_id (`Saga::on_event` filters on
            // `ctx.saga_id == event.saga_id`). The pre-CPD-4
            // observability warning (PR #634 codex P1) is retained
            // at info level for operators tracking concurrency
            // density via `--diag wrr`, but the underlying
            // correctness issue is closed.
            let same_kind_count = registry
                .values()
                .filter(|s| s.saga.name() == name)
                .count();
            if same_kind_count >= 1 {
                crate::log(&format!(
                    "[saga] starting {} saga_id={} while {} other(s) of same kind in flight (CPD-4 per-saga correlation — sagas coexist cleanly)",
                    name, saga_id, same_kind_count,
                ));
            }
            registry.insert(
                saga_id,
                InFlightSaga {
                    saga,
                    awaiting_step: None,
                    next_step_index: 0,
                },
            );
            // Drive start() while still under the registry lock so
            // we have a mutable reference to the just-inserted saga.
            // start() is non-async and does no I/O — bounded hold time.
            let in_flight_saga = registry.get_mut(&saga_id).expect("just inserted");
            let ctx = SagaCtx { saga_id };
            in_flight_saga.saga.start(&ctx)
        };

        // First step on a fresh saga is index 0. apply_action may
        // remove the saga via claim_terminal (Done/Failed) without
        // re-locking the registry from here.
        let outcome = self.apply_action(saga_id, name, action, 0).await;
        if outcome.in_flight {
            // Update the bookkeeping for the freshly-issued step.
            // If the first action allocated step 0, the saga is now
            // parked awaiting that step's echo event; next allocation
            // is index 1. Otherwise (Wait — saga has no first
            // dispatch yet), keep the counter at 0.
            let mut registry = self.in_flight.lock().await;
            // The entry may already be gone if another path claimed
            // the saga while apply_action was awaiting; then there is
            // nothing to update.
            if let Some(in_flight_saga) = registry.get_mut(&saga_id) {
                in_flight_saga.awaiting_step = outcome.awaiting_step;
                in_flight_saga.next_step_index =
                    if outcome.awaiting_step.is_some() { 1 } else { 0 };
            }
            drop(registry);

            // CPD-3 — arm the per-saga deadline timer. If the saga
            // is still in_flight when `saga_timeout` elapses, fail
            // it out. Spawned task captures an Arc clone of the
            // coordinator so it can outlive the saga's deadline.
            // Uses `claim_terminal` so it doesn't race with the
            // normal Done/Failed path. (reagent P1 PR #644 round 1.)
            // The task is detached: if the saga finishes first, the
            // claim below simply returns false and nothing is emitted.
            let coord_for_timeout = Arc::clone(self);
            tokio::spawn(async move {
                tokio::time::sleep(saga_timeout).await;
                if coord_for_timeout.claim_terminal(saga_id).await {
                    crate::log(&format!(
                        "[saga] saga_id={} name={} timed out after {:?} — emitting SagaFailed",
                        saga_id, name, saga_timeout
                    ));
                    let reason = "saga timeout".to_string();
                    if let Some(log) = coord_for_timeout.log.as_ref() {
                        if let Err(e) = log.terminate_saga(
                            saga_id,
                            SagaOutcome::Failed {
                                reason: reason.clone(),
                            },
                        ) {
                            crate::log(&format!(
                                "[saga] saga_id={} timeout terminate_saga(Failed) log write failed: {}",
                                saga_id, e
                            ));
                        }
                    }
                    coord_for_timeout
                        .emit_failed(saga_id, reason)
                        .await;
                }
            });
        }
        saga_id
    }
728}
729
/// LSD-2 — result of `apply_action`. Pairs the "is the saga still in
/// flight?" flag (previously a bare `bool`) with the step index the
/// coordinator must remember so it can call
/// `LauncherSagaLog::finish_step` when the awaited bus event lands.
#[derive(Debug, Clone, Copy)]
struct ApplyOutcome {
    /// `true` while the saga should stay registered; `false` once it
    /// has been terminated.
    in_flight: bool,
    /// Step index allocated by an `IssueCmd` action, which the
    /// coordinator parks on the InFlightSaga. `None` for
    /// `Wait` / `Done` / `Failed`.
    awaiting_step: Option<u32>,
}

impl ApplyOutcome {
    /// Saga stays in flight, parked on the step it just issued.
    fn in_flight_awaiting(step_index: u32) -> Self {
        let awaiting_step = Some(step_index);
        Self {
            awaiting_step,
            in_flight: true,
        }
    }

    /// Saga stays in flight and no new step was issued (`Wait`).
    fn in_flight_no_change() -> Self {
        Self {
            awaiting_step: None,
            in_flight: true,
        }
    }

    /// Saga is finished (or was already claimed by another path).
    fn terminated() -> Self {
        Self {
            awaiting_step: None,
            in_flight: false,
        }
    }
}
763
764/// LSD-2 — short, greppable name for a `Command` dispatched as part
765/// of a saga step. Mirrors srv's `command_discriminant_name` in spirit
766/// (snake_case strings rather than `Debug` formatting) but prefixes
767/// with `issue_cmd_<target>_<discriminant>` so `--diag sagas` output
768/// makes the dispatch target obvious without a separate column lookup.
769///
770/// Falls back to `issue_cmd_<target>_unknown` for variants serde can't
771/// stringify (shouldn't happen for the snake_case-tagged Command enum;
772/// defensive default).
773fn derive_step_name(cmd: &Command, target: PipeTarget) -> String {
774 let target_str = match target {
775 PipeTarget::LauncherSelf => "launcher_self",
776 PipeTarget::Host => "host",
777 PipeTarget::Srv => "srv",
778 };
779 let discriminant = match serde_json::to_value(cmd) {
780 Ok(serde_json::Value::Object(map)) => map
781 .get("cmd")
782 .and_then(|v| v.as_str())
783 .map(|s| s.to_string())
784 .unwrap_or_else(|| "unknown".to_string()),
785 _ => "unknown".to_string(),
786 };
787 format!("issue_cmd_{}_{}", target_str, discriminant)
788}
789
790/// CPD-3 — fill in the `saga_id` field on a host-bound `Command`
791/// before dispatch. Sagas construct their `IssueCmd` actions with a
792/// placeholder `saga_id: 0` (they don't know their coordinator-
793/// allocated id at action-construction time); the coordinator
794/// rewrites the field at dispatch time so the host can echo it back
795/// on the matching `Report*`.
796///
797/// Exhaustive match: any host-bound Command variant added later that
798/// forgets to plumb `saga_id` will refuse to compile here. Non-host-
799/// bound variants (Report*, Identify, etc.) panic loudly because
800/// sagas only emit `IssueCmd::Host` for the three host-bound
801/// command kinds covered below.
802fn inject_saga_id(cmd: Command, saga_id: u64) -> Command {
803 match cmd {
804 Command::SpawnPoolWindow { .. } => Command::SpawnPoolWindow { saga_id },
805 Command::ReapPanes { label, .. } => Command::ReapPanes { label, saga_id },
806 Command::DrainPoolIfLast { label, .. } => {
807 Command::DrainPoolIfLast { label, saga_id }
808 }
809 // Defense-in-depth: every non-host-dispatched Command variant
810 // (Report*, Identify, etc.) is a coding bug if it reaches
811 // this point — the F.5/F.6 sagas only emit IssueCmd::Host
812 // for the three command kinds above.
813 other => panic!(
814 "inject_saga_id called on non-host-bound Command variant: {:?}",
815 other
816 ),
817 }
818}
819
820/// Inspect a bus event for "should this start a fresh saga?" Returns
821/// the constructed saga (boxed) on a hit; `None` otherwise.
822///
823/// Triggers wired to date:
824/// * F.5: `Event::PoolWindowPromoted` → `pool_respawn::PoolRespawn`
825/// * F.6: `Event::WindowClosed` →
826/// `window_cleanup::WindowCleanupCascade`
827///
828/// Future sagas extend this match.
829///
830/// CPD-4 — the coordinator no longer evicts same-kind in-flight
831/// sagas before `spawn_saga`; per-saga event correlation
832/// (`Saga::on_event` filters by `ctx.saga_id`) lets concurrent
833/// same-kind sagas coexist cleanly. The pre-CPD-4 evict-and-replace
834/// gate (PR #634 round 3) is removed in `run_coordinator`.
835fn match_trigger(event: &Event) -> Option<Box<dyn Saga>> {
836 match event {
837 Event::PoolWindowPromoted { label, .. } => {
838 Some(Box::new(pool_respawn::PoolRespawn::new(label.clone())))
839 }
840 // (codex P1 PR #637.) Only fire the cleanup cascade on
841 // CLEAN closes. `Event::WindowClosed { crash_detected: true }`
842 // comes from `wrr::apply_hwnd_destroyed` after a host/renderer
843 // crash; the host never sent `ReportPanesReaped` /
844 // `ReportPoolDrainDecision`, so the saga would stay
845 // in-flight indefinitely (only cleared by a later same-kind
846 // eviction or launcher restart) leaving a SagaStarted
847 // bracket dangling for subscribers that buffer on lifecycle.
848 Event::WindowClosed { label, crash_detected: false, .. } => Some(Box::new(
849 window_cleanup::WindowCleanupCascade::new(label.clone()),
850 )),
851 _ => None,
852 }
853}
854
855/// Run the coordinator's bus-subscription loop.
856///
857/// **Receiver is passed in, not subscribed inside.** Subscribing
858/// before `tokio::spawn` (in `main.rs`) ensures events emitted
859/// between coordinator construction and the first `recv()` aren't
860/// lost to the race window. Same pattern as `event_log::run_disk_writer`.
861/// (reagent P2 PR #609.)
862///
863/// Dispatch order on each event:
864/// 1. Match against trigger table → start any new sagas via
865/// `spawn_saga` (which emits `SagaStarted` and applies the
866/// saga's initial action).
867/// 2. Feed the same event into every in-flight saga's `on_event`.
868/// Sagas returning `Done` / `Failed` are removed from
869/// `in_flight` after their lifecycle event is emitted.
870///
871/// **Self-emitted events (`SagaStarted` / `SagaCompleted` /
872/// `SagaFailed`) are NOT re-fed into sagas.** A saga reacting to its
873/// own start event would loop. Filtered at the top of the dispatch
874/// path.
875pub async fn run_coordinator(
876 coord: Arc<SagaCoordinator>,
877 mut events_rx: tokio::sync::broadcast::Receiver<Event>,
878) {
879 crate::log("[saga] coordinator started");
880
881 loop {
882 match events_rx.recv().await {
883 Ok(event) => {
884 // Skip our own lifecycle events to avoid loops.
885 if matches!(
886 event,
887 Event::SagaStarted { .. } | Event::SagaCompleted { .. } | Event::SagaFailed { .. }
888 ) {
889 continue;
890 }
891
892 // CPD-3 — host reported via `Command::ReportSagaActionFailed`
893 // that a saga-issued action failed. Launcher reducer
894 // translates this into `Event::SagaActionFailed`
895 // (CPD-1 schema). Terminate the matching saga
896 // immediately rather than waiting for a Report* that
897 // won't come.
898 if let Event::SagaActionFailed { saga_id, reason, .. } = &event {
899 let saga_id = *saga_id;
900 let reason = reason.clone();
901 // claim_terminal: atomic take-ownership of the
902 // terminal-event slot, preventing duplicate-emission
903 // races with the saga's own Done/Failed path or its
904 // timeout task. (reagent P1 PR #644 round 1.)
905 if coord.claim_terminal(saga_id).await {
906 crate::log(&format!(
907 "[saga] saga_id={} terminating from host SagaActionFailed reason={}",
908 saga_id, reason
909 ));
910 let full_reason = format!("host action failed: {}", reason);
911 if let Some(log) = coord.log.as_ref() {
912 if let Err(e) = log.terminate_saga(
913 saga_id,
914 SagaOutcome::Failed {
915 reason: full_reason.clone(),
916 },
917 ) {
918 crate::log(&format!(
919 "[saga] saga_id={} SagaActionFailed terminate_saga(Failed) log write failed: {}",
920 saga_id, e
921 ));
922 }
923 }
924 coord.emit_failed(saga_id, full_reason).await;
925 }
926 continue;
927 }
928
929 // Step 1 — start any new sagas this event triggers.
930 //
931 // **CPD-4 — evict-and-replace retired.** Pre-CPD-4
932 // (codex P1 PR #634 round 3) the coordinator forced
933 // at-most-one same-kind saga in flight by emitting
934 // `SagaFailed { reason: "evicted" }` on the prior
935 // saga whenever a fresh trigger landed. That was a
936 // workaround for the broadcast-routing limitation:
937 // every `PoolWindowAdded` / `PanesReaped` /
938 // `PoolDrained` / `PoolNotLast` advanced *every*
939 // matching in-flight saga, so two concurrent sagas
940 // would cross-talk and complete each other's bracket.
941 //
942 // CPD-4 closes that limitation. Saga `on_event` now
943 // filters by `event.saga_id == ctx.saga_id`
944 // (`pool_respawn::on_event`,
945 // `window_cleanup::on_event`). Concurrent same-kind
946 // sagas coexist; each consumes only events tagged
947 // with its own coordinator-allocated id. Eviction is
948 // no longer needed — and would actively harm
949 // correctness (a healthy saga gets `SagaFailed`d the
950 // moment a sibling fires).
951 //
952 // Per spec §3.7 + execution-plan §2 batch 3.
953 if let Some(saga) = match_trigger(&event) {
954 coord.spawn_saga(saga).await;
955 }
956
957 // Step 2 — feed the event into every in-flight saga.
958 // Two-pass to avoid holding the registry lock across
959 // `apply_action` (which itself locks state to bump
960 // version when emitting SagaCompleted/Failed).
961 //
962 // LSD-2 — when a saga's `on_event` returns anything
963 // OTHER than `Wait`, it has consumed its awaited bus
964 // event; record the step's success in the durable log
965 // (`finish_step(saga_id, awaiting_step, &event)`) before
966 // dispatching the next action. The new action then
967 // either parks the saga on a fresh step (next index
968 // pulled from `next_step_index`) or terminates it.
969 struct PendingAction {
970 saga_id: u64,
971 name: &'static str,
972 action: SagaAction,
973 /// Awaited step index that this on_event return
974 /// "consumed" — `Some` only when on_event returned
975 /// non-`Wait`. Caller will `finish_step` on it
976 /// before invoking `apply_action` for the new
977 /// action.
978 consumed_step: Option<u32>,
979 /// Next free step index for this saga; if the new
980 /// action is `IssueCmd`, `apply_action` writes to
981 /// this index.
982 next_idx: u32,
983 }
984 let actions: Vec<PendingAction> = {
985 let mut in_flight = coord.in_flight.lock().await;
986 let mut out = Vec::new();
987 for (saga_id, in_flight_saga) in in_flight.iter_mut() {
988 let ctx = SagaCtx { saga_id: *saga_id };
989 let action = in_flight_saga.saga.on_event(&event, &ctx);
990 // If the saga consumed its awaited event,
991 // capture the awaited index now and clear it.
992 // `Wait` keeps awaiting_step unchanged.
993 let consumed_step = if matches!(action, SagaAction::Wait) {
994 None
995 } else {
996 in_flight_saga.awaiting_step.take()
997 };
998 let next_idx = in_flight_saga.next_step_index;
999 out.push(PendingAction {
1000 saga_id: *saga_id,
1001 name: in_flight_saga.saga.name(),
1002 action,
1003 consumed_step,
1004 next_idx,
1005 });
1006 }
1007 out
1008 };
1009 for pending in actions {
1010 let PendingAction {
1011 saga_id,
1012 name,
1013 action,
1014 consumed_step,
1015 next_idx,
1016 } = pending;
1017 // LSD-2 — record the awaited step's success in
1018 // the durable log. `event` is the event that
1019 // caused the saga to advance.
1020 if let Some(idx) = consumed_step {
1021 if let Some(log) = coord.log.as_ref() {
1022 if let Err(e) = log.finish_step(saga_id, idx, &event) {
1023 crate::log(&format!(
1024 "[saga] saga_id={} finish_step log write failed: {}",
1025 saga_id, e
1026 ));
1027 }
1028 }
1029 }
1030 let issued_cmd = matches!(action, SagaAction::IssueCmd { .. });
1031 let outcome = coord.apply_action(saga_id, name, action, next_idx).await;
1032 if !outcome.in_flight {
1033 coord.in_flight.lock().await.remove(&saga_id);
1034 } else if let Some(awaited) = outcome.awaiting_step {
1035 // Update the saga's bookkeeping to reflect
1036 // the freshly-issued step.
1037 let mut registry = coord.in_flight.lock().await;
1038 if let Some(in_flight_saga) = registry.get_mut(&saga_id) {
1039 in_flight_saga.awaiting_step = Some(awaited);
1040 // Bump only if this dispatch consumed the
1041 // pre-allocated index (always true for
1042 // IssueCmd; defensive guard).
1043 if issued_cmd {
1044 in_flight_saga.next_step_index = awaited + 1;
1045 }
1046 }
1047 }
1048 }
1049 }
1050 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1051 crate::log(&format!("[saga] coordinator lagged, missed {} events", n));
1052 }
1053 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
1054 crate::log("[saga] coordinator stopping (bus closed)");
1055 return;
1056 }
1057 }
1058 }
1059}
1060
1061#[cfg(test)]
1062mod tests {
1063 use super::*;
1064 use agentmux_common::ipc::{ClientKind, LifecyclePhase};
1065
1066 /// Smoke test from E.1a, retained: coordinator drains unrelated
1067 /// events without panicking. F.5 expanded the loop body but the
1068 /// invariant — non-trigger events are no-ops — still holds.
1069 #[tokio::test]
1070 async fn coordinator_drains_unrelated_events_without_panic() {
1071 let (events_tx, _rx) = tokio::sync::broadcast::channel::<Event>(64);
1072 let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
1073 let coord = Arc::new(SagaCoordinator::new(events_tx.clone(), Arc::clone(&state)));
1074 let coord_rx = events_tx.subscribe();
1075 let handle = tokio::spawn(run_coordinator(Arc::clone(&coord), coord_rx));
1076
1077 for v in 1..=5 {
1078 let _ = events_tx.send(Event::LifecyclePhaseChanged {
1079 from: LifecyclePhase::Starting,
1080 to: LifecyclePhase::Running,
1081 version: v,
1082 });
1083 }
1084 let _ = events_tx.send(Event::ProcessSpawned {
1085 pid: 42,
1086 kind: ClientKind::Tool,
1087 client_version: "test".into(),
1088 version: 6,
1089 });
1090
1091 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1092 // No sagas should be in flight (no trigger events).
1093 assert!(coord.in_flight.lock().await.is_empty());
1094 drop(events_tx);
1095 let _ = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
1096 }
1097
1098 #[test]
1099 fn next_id_is_monotonic() {
1100 let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(64);
1101 let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
1102 let coord = SagaCoordinator::new(events_tx, state);
1103 let a = coord.next_id();
1104 let b = coord.next_id();
1105 let c = coord.next_id();
1106 assert!(a < b);
1107 assert!(b < c);
1108 }
1109
1110 /// F.5 — verify the trigger table picks up `PoolWindowPromoted`.
1111 /// The end-to-end coordinator test lives in
1112 /// `pool_respawn::tests::coordinator_brackets_promote_with_saga_lifecycle_events`
1113 /// where the pool-respawn saga's expected behavior is asserted.
1114 #[test]
1115 fn match_trigger_pool_window_promoted_starts_pool_respawn() {
1116 let event = Event::PoolWindowPromoted {
1117 label: "window-pool-abc".into(),
1118 version: 1,
1119 };
1120 let saga = match_trigger(&event).expect("PoolWindowPromoted should trigger a saga");
1121 assert_eq!(saga.name(), "pool_respawn_on_promote");
1122 }
1123
1124 /// F.6 — verify the trigger table picks up `WindowClosed`. The
1125 /// end-to-end coordinator test lives in
1126 /// `window_cleanup::tests::coordinator_brackets_close_with_saga_lifecycle_events`.
1127 #[test]
1128 fn match_trigger_window_closed_starts_window_cleanup_cascade() {
1129 let event = Event::WindowClosed {
1130 label: "main".into(),
1131 version: 1,
1132 crash_detected: false,
1133 };
1134 let saga = match_trigger(&event).expect("WindowClosed should trigger a saga");
1135 assert_eq!(saga.name(), "window_cleanup_cascade");
1136 }
1137
1138 /// (codex P1 PR #637 round 2.) Crash-detected closes (originating
1139 /// from `wrr::apply_hwnd_destroyed`) must NOT trigger the saga —
1140 /// the host never sent the cleanup reports, so the saga would
1141 /// stay in-flight forever.
1142 #[test]
1143 fn match_trigger_skips_crash_detected_window_closed() {
1144 let event = Event::WindowClosed {
1145 label: "crashed-window".into(),
1146 version: 1,
1147 crash_detected: true,
1148 };
1149 assert!(
1150 match_trigger(&event).is_none(),
1151 "crash-detected close should NOT spawn a saga",
1152 );
1153 }
1154
1155 #[test]
1156 fn match_trigger_returns_none_for_non_trigger_events() {
1157 let cases = vec![
1158 Event::PoolWindowAdded {
1159 label: "window-pool-abc".into(),
1160 version: 1,
1161 saga_id: None,
1162 },
1163 Event::PoolWindowRemoved {
1164 label: "window-pool-abc".into(),
1165 version: 1,
1166 },
1167 Event::WindowOpened {
1168 label: "main".into(),
1169 kind: agentmux_common::ipc::WindowKind::FullInstance,
1170 parent_label: None,
1171 version: 1,
1172 },
1173 Event::LifecyclePhaseChanged {
1174 from: LifecyclePhase::Starting,
1175 to: LifecyclePhase::Running,
1176 version: 1,
1177 },
1178 // F.6 step-1/step-2 terminal events are NOT triggers
1179 // themselves — the saga consumes them, but receiving one
1180 // when no saga is in flight should be a no-op.
1181 Event::PanesReaped {
1182 label: "main".into(),
1183 version: 1,
1184 saga_id: None,
1185 },
1186 Event::PoolDrained {
1187 label: "main".into(),
1188 version: 1,
1189 saga_id: None,
1190 },
1191 Event::PoolNotLast {
1192 label: "main".into(),
1193 version: 1,
1194 saga_id: None,
1195 },
1196 ];
1197 for event in cases {
1198 assert!(
1199 match_trigger(&event).is_none(),
1200 "non-trigger event spawned a saga: {:?}",
1201 event
1202 );
1203 }
1204 }
1205
1206 /// Saga lifecycle events on the bus must not feed back into
1207 /// sagas (would cause loops). Sanity-check the filter directly.
1208 #[tokio::test]
1209 async fn coordinator_does_not_self_trigger_on_lifecycle_events() {
1210 let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(64);
1211 let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
1212 let coord = Arc::new(SagaCoordinator::new(events_tx.clone(), Arc::clone(&state)));
1213 let coord_rx = events_tx.subscribe();
1214 let handle = tokio::spawn(run_coordinator(Arc::clone(&coord), coord_rx));
1215 tokio::task::yield_now().await;
1216
1217 // Push a SagaStarted-shaped event; if the coordinator
1218 // re-spawned, it would eventually exhaust the saga_id atomic
1219 // by looping. We verify simpler: in_flight stays empty.
1220 let _ = events_tx.send(Event::SagaStarted {
1221 saga_id: 999,
1222 name: "foreign_saga".into(),
1223 version: 1,
1224 });
1225 let _ = events_tx.send(Event::SagaCompleted {
1226 saga_id: 999,
1227 version: 2,
1228 });
1229 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1230 assert!(coord.in_flight.lock().await.is_empty());
1231 drop(events_tx);
1232 let _ = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
1233 }
1234
1235 // ---------- Phase F.7 — saga-lifecycle property tests ----------
1236 //
1237 // Random sequences of `WindowClosed` / `PoolWindowPromoted`
1238 // events fed through `match_trigger`. Asserts:
1239 // 1. `WindowClosed { crash_detected: true }` NEVER produces a
1240 // saga (F.6 round-2 gate, codex P1 #637).
1241 // 2. `WindowClosed { crash_detected: false }` ALWAYS produces
1242 // exactly one window_cleanup_cascade saga.
1243 // 3. `PoolWindowPromoted` ALWAYS produces exactly one
1244 // pool_respawn_on_promote saga.
1245 // 4. The saga's name (the (kind, key) tuple's "kind" component)
1246 // is one of the two known names — no foreign saga slips
1247 // through the trigger table.
1248 //
1249 // The end-to-end coordinator-level "exactly one terminal lifecycle
1250 // event per saga" assertion lives in
1251 // `pool_respawn::tests::coordinator_brackets_promote_with_saga_lifecycle_events`
1252 // and `window_cleanup::tests::coordinator_brackets_close_with_saga_lifecycle_events`
1253 // (already shipped in F.5 + F.6). The proptests here add the
1254 // randomized-input dimension those happy-path tests don't cover.
1255 //
1256 // Cases capped at 64 (default 1024 too slow for CI). Same bound
1257 // the srv reducer's E.7 proptest uses.
1258
1259 use proptest::prelude::*;
1260
    /// Inputs for the F.7 trigger-table property tests: the two event
    /// shapes that start an F.5/F.6 saga, plus two negative cases
    /// (a crash-detected close and an unrelated event). `make_event`
    /// turns each variant into the concrete bus `Event`.
    #[derive(Debug, Clone)]
    enum F7TriggerEvent {
        /// Clean close of `label`. Should spawn `window_cleanup_cascade`.
        WindowClosedClean { label: String },
        /// Crash-detected close of `label`. Should NOT spawn anything
        /// (F.6 round-2 gate).
        WindowClosedCrashed { label: String },
        /// Promotion of pool window `label`. Should spawn
        /// `pool_respawn_on_promote`.
        PoolWindowPromoted { label: String },
        /// A non-trigger event — must produce no saga.
        Unrelated,
    }
1274
1275 fn f7_trigger_strategy() -> impl Strategy<Value = F7TriggerEvent> {
1276 prop_oneof![
1277 3 => "[a-c]{1,3}".prop_map(|label| F7TriggerEvent::WindowClosedClean { label }),
1278 1 => "[a-c]{1,3}".prop_map(|label| F7TriggerEvent::WindowClosedCrashed { label }),
1279 3 => "[a-c]{1,3}".prop_map(|label| F7TriggerEvent::PoolWindowPromoted { label }),
1280 2 => Just(F7TriggerEvent::Unrelated),
1281 ]
1282 }
1283
1284 fn make_event(t: F7TriggerEvent, version: u64) -> Event {
1285 match t {
1286 F7TriggerEvent::WindowClosedClean { label } => Event::WindowClosed {
1287 label,
1288 version,
1289 crash_detected: false,
1290 },
1291 F7TriggerEvent::WindowClosedCrashed { label } => Event::WindowClosed {
1292 label,
1293 version,
1294 crash_detected: true,
1295 },
1296 F7TriggerEvent::PoolWindowPromoted { label } => {
1297 Event::PoolWindowPromoted { label, version }
1298 }
1299 F7TriggerEvent::Unrelated => Event::Pong { nonce: 0, version },
1300 }
1301 }
1302
1303 proptest! {
1304 #![proptest_config(ProptestConfig {
1305 cases: 64,
1306 ..ProptestConfig::default()
1307 })]
1308
1309 /// For every event in a random sequence, `match_trigger`
1310 /// returns the expected saga (or None for crash / unrelated).
1311 /// Exhaustive shape check — catches accidental regressions
1312 /// in the trigger table.
1313 #[test]
1314 fn f7_match_trigger_matches_expected_saga_kind(
1315 triggers in prop::collection::vec(f7_trigger_strategy(), 0..40)
1316 ) {
1317 for (i, t) in triggers.into_iter().enumerate() {
1318 let event = make_event(t.clone(), (i + 1) as u64);
1319 let saga = match_trigger(&event);
1320 match (&t, &saga) {
1321 (F7TriggerEvent::WindowClosedClean { .. }, Some(s)) => {
1322 prop_assert_eq!(s.name(), "window_cleanup_cascade");
1323 }
1324 (F7TriggerEvent::PoolWindowPromoted { .. }, Some(s)) => {
1325 prop_assert_eq!(s.name(), "pool_respawn_on_promote");
1326 }
1327 (F7TriggerEvent::WindowClosedCrashed { .. }, None) => {
1328 // Expected — crash-detected close must never
1329 // spawn a saga (F.6 codex P1 #637).
1330 }
1331 (F7TriggerEvent::Unrelated, None) => {
1332 // Expected — non-trigger event.
1333 }
1334 (other_t, other_s) => {
1335 prop_assert!(
1336 false,
1337 "trigger {:?} produced unexpected saga match: {:?}",
1338 other_t,
1339 other_s.as_ref().map(|s| s.name()),
1340 );
1341 }
1342 }
1343 }
1344 }
1345
1346 /// Crash-detected `WindowClosed` events NEVER produce a
1347 /// saga, regardless of label. Reinforces invariant #1
1348 /// independently — a stricter shrinker target.
1349 #[test]
1350 fn f7_crash_detected_close_never_spawns_saga(
1351 label in "[a-c]{1,3}",
1352 version in 1u64..1_000_000,
1353 ) {
1354 let event = Event::WindowClosed { label, version, crash_detected: true };
1355 prop_assert!(
1356 match_trigger(&event).is_none(),
1357 "crash-detected WindowClosed must not spawn a saga",
1358 );
1359 }
1360
1361 /// Clean-close `WindowClosed` ALWAYS produces a
1362 /// window_cleanup_cascade saga, regardless of label.
1363 #[test]
1364 fn f7_clean_close_always_spawns_window_cleanup_cascade(
1365 label in "[a-c]{1,3}",
1366 version in 1u64..1_000_000,
1367 ) {
1368 let event = Event::WindowClosed { label, version, crash_detected: false };
1369 let saga = match_trigger(&event)
1370 .expect("clean-close should spawn a saga");
1371 prop_assert_eq!(saga.name(), "window_cleanup_cascade");
1372 }
1373 }
1374
1375 /// **CPD-4 — concurrent same-kind sagas coexist without
1376 /// cross-talk.** Random sequences of trigger events drive the
1377 /// coordinator; the registry is allowed to hold multiple sagas of
1378 /// the same kind simultaneously (the pre-CPD-4 evict-and-replace
1379 /// gate is retired). Invariant under test: NO `SagaFailed` event
1380 /// fires with reason `"evicted"` — sagas only terminate via
1381 /// `Done` (their tagged echo lands) or via the saga timeout
1382 /// (~5-30s, far longer than this test's 50ms settling window).
1383 ///
1384 /// Replaces `f7_evict_and_replace_keeps_one_saga_per_kind`. The
1385 /// behavioral inversion is the core of CPD-4: pre-CPD-4 we
1386 /// asserted ≤1 in-flight per kind (and accepted premature
1387 /// `SagaFailed { reason: "evicted" }`); CPD-4 asserts no
1388 /// premature evictions.
1389 #[tokio::test]
1390 async fn cpd4_concurrent_sagas_no_eviction() {
1391 // Tiny LCG for deterministic per-seed sequences. Glibc's
1392 // constants — quality irrelevant; we just need reproducible
1393 // bit patterns across CI runs.
1394 fn lcg_next(state: &mut u64) -> u64 {
1395 *state = state
1396 .wrapping_mul(1_103_515_245)
1397 .wrapping_add(12_345);
1398 *state
1399 }
1400
1401 let seeds: &[u64] = &[1, 7, 42, 99, 1234, 5678, 0xDEADBEEF, 0xC0FFEE];
1402 for seed in seeds {
1403 let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(256);
1404 let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
1405 let coord = Arc::new(SagaCoordinator::new(events_tx.clone(), Arc::clone(&state)));
1406 let mut witness = events_tx.subscribe();
1407 let coord_rx = events_tx.subscribe();
1408 let _handle = tokio::spawn(run_coordinator(Arc::clone(&coord), coord_rx));
1409 tokio::task::yield_now().await;
1410
1411 // Build a deterministic sequence from the seed.
1412 let mut rng = *seed;
1413 let labels = ["a", "b", "c"];
1414 for v in 1u64..=20 {
1415 let label_idx = (lcg_next(&mut rng) % labels.len() as u64) as usize;
1416 let label = labels[label_idx].to_string();
1417 let pick = lcg_next(&mut rng) % 3;
1418 let event = match pick {
1419 0 => Event::WindowClosed {
1420 label,
1421 version: v,
1422 crash_detected: false,
1423 },
1424 1 => Event::PoolWindowPromoted { label, version: v },
1425 _ => Event::Pong { nonce: v, version: v },
1426 };
1427 let _ = events_tx.send(event);
1428 // Yield so the coordinator processes one event before
1429 // we send the next. Tightens the concurrent-overlap
1430 // window.
1431 tokio::task::yield_now().await;
1432 }
1433 // Brief settling — give the coordinator time to process
1434 // the last few events + emit terminal lifecycles.
1435 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1436
1437 // Invariant: NO SagaFailed with "evicted" reason.
1438 // We don't assert the registry size — concurrent sagas
1439 // are allowed to pile up under per-saga correlation
1440 // because we never feed them their tagged terminal
1441 // events in this test. They'll reach their timeout
1442 // eventually, but not within 50ms.
1443 let mut evictions = 0u32;
1444 while let Ok(Ok(ev)) = tokio::time::timeout(
1445 std::time::Duration::from_millis(5),
1446 witness.recv(),
1447 )
1448 .await
1449 {
1450 if let Event::SagaFailed { reason, .. } = ev {
1451 if reason.contains("evicted") {
1452 evictions += 1;
1453 }
1454 }
1455 }
1456 assert_eq!(
1457 evictions, 0,
1458 "seed {} — CPD-4 invariant violated: observed {} SagaFailed events with reason='evicted'; \
1459 evict-and-replace should be retired so concurrent same-kind sagas coexist",
1460 seed, evictions,
1461 );
1462 }
1463 }
1464
1465 /// Saga forward-path emission contract: every saga that
1466 /// terminates emits exactly ONE terminal lifecycle event
1467 /// (`SagaCompleted` xor `SagaFailed`). Drives a small batch of
1468 /// triggers through the coordinator and counts emitted
1469 /// lifecycle events on the bus. Catches regressions where a
1470 /// saga's `Done` action somehow fires both `emit_completed`
1471 /// and `emit_failed`, or neither.
1472 #[tokio::test]
1473 async fn f7_each_saga_emits_exactly_one_terminal_event() {
1474 let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(256);
1475 let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
1476 let coord = Arc::new(SagaCoordinator::new(events_tx.clone(), Arc::clone(&state)));
1477 let mut witness = events_tx.subscribe();
1478 let coord_rx = events_tx.subscribe();
1479 let _handle = tokio::spawn(run_coordinator(Arc::clone(&coord), coord_rx));
1480 tokio::task::yield_now().await;
1481
1482 // Drive ONE clean cleanup-cascade saga + ONE pool-respawn
1483 // saga to completion. CPD-4: route terminals by saga_id, so
1484 // we trigger each saga, capture its allocated id from
1485 // SagaStarted, then send the matching tagged terminal events.
1486 let _ = events_tx.send(Event::WindowClosed {
1487 label: "main".into(),
1488 version: 1,
1489 crash_detected: false,
1490 });
1491 let _ = events_tx.send(Event::PoolWindowPromoted {
1492 label: "pool-1".into(),
1493 version: 2,
1494 });
1495
1496 // Drain the bus with a deadline. Count one terminal event
1497 // (SagaCompleted xor SagaFailed) per saga_id. As each
1498 // SagaStarted lands, dispatch the matching tagged terminals.
1499 let mut started_ids = std::collections::HashSet::<u64>::new();
1500 let mut cleanup_saga_id: Option<u64> = None;
1501 let mut respawn_saga_id: Option<u64> = None;
1502 let mut version: u64 = 100;
1503 let mut terminal_for_id = std::collections::HashMap::<u64, &'static str>::new();
1504 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(3);
1505 while std::time::Instant::now() < deadline {
1506 match tokio::time::timeout(std::time::Duration::from_millis(50), witness.recv()).await
1507 {
1508 Ok(Ok(Event::SagaStarted { saga_id, name, .. })) => {
1509 started_ids.insert(saga_id);
1510 if name == "window_cleanup_cascade" && cleanup_saga_id.is_none() {
1511 cleanup_saga_id = Some(saga_id);
1512 version += 1;
1513 let _ = events_tx.send(Event::PanesReaped {
1514 label: "main".into(),
1515 version,
1516 saga_id: Some(saga_id),
1517 });
1518 } else if name == "pool_respawn_on_promote" && respawn_saga_id.is_none() {
1519 respawn_saga_id = Some(saga_id);
1520 version += 1;
1521 let _ = events_tx.send(Event::PoolWindowAdded {
1522 label: "pool-2".into(),
1523 version,
1524 saga_id: Some(saga_id),
1525 });
1526 }
1527 }
1528 Ok(Ok(Event::PanesReaped { saga_id: Some(sid), .. })) => {
1529 if Some(sid) == cleanup_saga_id {
1530 version += 1;
1531 let _ = events_tx.send(Event::PoolDrained {
1532 label: "main".into(),
1533 version,
1534 saga_id: Some(sid),
1535 });
1536 }
1537 }
1538 Ok(Ok(Event::SagaCompleted { saga_id, .. })) => {
1539 let prev = terminal_for_id.insert(saga_id, "completed");
1540 assert!(
1541 prev.is_none(),
1542 "saga_id {} got two terminal events: prior={:?} now=completed",
1543 saga_id,
1544 prev,
1545 );
1546 }
1547 Ok(Ok(Event::SagaFailed { saga_id, .. })) => {
1548 let prev = terminal_for_id.insert(saga_id, "failed");
1549 assert!(
1550 prev.is_none(),
1551 "saga_id {} got two terminal events: prior={:?} now=failed",
1552 saga_id,
1553 prev,
1554 );
1555 }
1556 Ok(Ok(_)) => {}
1557 Ok(Err(_)) => break,
1558 Err(_) => {
1559 // No new events for 50ms — we've drained.
1560 if started_ids.len() == 2
1561 && started_ids.iter().all(|id| terminal_for_id.contains_key(id))
1562 {
1563 break;
1564 }
1565 }
1566 }
1567 }
1568 // Both sagas started.
1569 assert_eq!(
1570 started_ids.len(),
1571 2,
1572 "expected 2 SagaStarted events (one per saga), got {}: {:?}",
1573 started_ids.len(),
1574 started_ids,
1575 );
1576 // Each got exactly one terminal event.
1577 for id in &started_ids {
1578 assert!(
1579 terminal_for_id.contains_key(id),
1580 "saga_id {} never got a terminal lifecycle event",
1581 id,
1582 );
1583 }
1584 // No spurious terminal events for sagas we didn't observe
1585 // start.
1586 for id in terminal_for_id.keys() {
1587 assert!(
1588 started_ids.contains(id),
1589 "saga_id {} got a terminal event but no SagaStarted observed",
1590 id,
1591 );
1592 }
1593 }
1594
1595 // ---------- LSD-2 — coordinator <-> LauncherSagaLog wiring ------
1596 //
1597 // Three end-to-end tests that drive the coordinator with a real
1598 // in-memory `LauncherSagaLog` and assert lifecycle rows land in
1599 // the expected state. Mirror the test inventory pinned in
1600 // `docs/specs/SPEC_LAUNCHER_SAGA_DURABILITY_2026-05-01.md` §4 PR2.
1601 //
    // The tests use `pool_respawn::PoolRespawn` because its single-
    // step shape (one IssueCmd, one terminal echo) keeps assertions
    // tight. The `window_cleanup_cascade` two-step shape is exercised
    // implicitly by the existing F.6 coordinator tests, and
    // `saga_step_finish_records_event_payload` adds a multi-step example.
1607
1608 use crate::saga::log::LauncherSagaLog;
1609 use std::sync::Arc as StdArc;
1610
1611 /// Helper: spin up coordinator + in-memory log.
1612 fn spawn_coord_with_log()
1613 -> (StdArc<SagaCoordinator>, StdArc<LauncherSagaLog>, tokio::sync::broadcast::Sender<Event>)
1614 {
1615 let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(256);
1616 let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
1617 let log = StdArc::new(LauncherSagaLog::open_in_memory().expect("in-memory log"));
1618 let coord = StdArc::new(
1619 SagaCoordinator::new(events_tx.clone(), Arc::clone(&state))
1620 .with_log(StdArc::clone(&log))
1621 .expect("with_log on fresh in-memory log should succeed"),
1622 );
1623 (coord, log, events_tx)
1624 }
1625
1626 /// LSD-2 — drive a saga (PoolRespawn) through to completion and
1627 /// verify the durable log records:
1628 /// - `launcher_saga` row with state='completed', non-null
1629 /// ended_at, no failure_reason, input_json round-trips the
1630 /// `promoted_label`.
1631 /// - one step row in 'succeeded' state with output_json
1632 /// populated.
1633 #[tokio::test]
1634 async fn saga_completes_writes_lifecycle_to_log() {
1635 let (coord, log, events_tx) = spawn_coord_with_log();
1636 let mut witness = events_tx.subscribe();
1637 let coord_rx = events_tx.subscribe();
1638 let _handle = tokio::spawn(run_coordinator(StdArc::clone(&coord), coord_rx));
1639 tokio::task::yield_now().await;
1640
1641 // Trigger the saga.
1642 let _ = events_tx.send(Event::PoolWindowPromoted {
1643 label: "window-pool-abc".into(),
1644 version: 1,
1645 });
1646 // CPD-4: wait for SagaStarted to learn the saga_id, then send
1647 // the awaited terminal tagged with that id. Pre-CPD-4 we
1648 // could send `saga_id: None`; under per-saga correlation
1649 // the saga only consumes its own echo.
1650 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
1651 while std::time::Instant::now() < deadline {
1652 if let Ok(Ok(Event::SagaStarted { saga_id, .. })) =
1653 tokio::time::timeout(std::time::Duration::from_millis(50), witness.recv()).await
1654 {
1655 let _ = events_tx.send(Event::PoolWindowAdded {
1656 label: "window-pool-xyz".into(),
1657 version: 2,
1658 saga_id: Some(saga_id),
1659 });
1660 break;
1661 }
1662 }
1663
1664 // Settle: coordinator runs apply_action which awaits state
1665 // mutex + bus send. 200ms is generous for the in-process loop.
1666 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1667
1668 let snapshots = log.snapshot_recent(10).expect("snapshot_recent");
1669 assert_eq!(snapshots.len(), 1, "expected exactly one saga row");
1670 let s = &snapshots[0];
1671 assert_eq!(s.name, "pool_respawn_on_promote");
1672 assert_eq!(s.state, "completed");
1673 assert!(s.ended_at.is_some(), "ended_at must be set on completion");
1674 assert!(s.failure_reason.is_none());
1675 assert_eq!(s.step_count, 1, "expected exactly one succeeded step");
1676 // Input snapshot round-trips the saga's constructor arg.
1677 let parsed: serde_json::Value = serde_json::from_str(&s.input_json).unwrap();
1678 assert_eq!(parsed["promoted_label"], "window-pool-abc");
1679 }
1680
1681 /// **CPD-4 — two concurrent same-kind sagas BOTH complete
1682 /// cleanly.** Repurposed from `saga_fails_writes_failed_state`
1683 /// (which previously verified the evict-and-replace path).
1684 /// Per-saga event correlation lets two `pool_respawn_on_promote`
1685 /// sagas coexist; each consumes its own saga_id-tagged refill
1686 /// echo, so the durable log records two `completed` rows — no
1687 /// `failed` rows from the retired eviction policy.
1688 #[tokio::test]
1689 async fn saga_concurrent_same_kind_both_complete() {
1690 let (coord, log, events_tx) = spawn_coord_with_log();
1691 let mut witness = events_tx.subscribe();
1692 let coord_rx = events_tx.subscribe();
1693 let _handle = tokio::spawn(run_coordinator(StdArc::clone(&coord), coord_rx));
1694 tokio::task::yield_now().await;
1695
1696 // First promote — kicks off saga A.
1697 let _ = events_tx.send(Event::PoolWindowPromoted {
1698 label: "window-pool-a".into(),
1699 version: 1,
1700 });
1701 // Second promote BEFORE A's terminal arrives — under CPD-4
1702 // saga B coexists with A (no eviction).
1703 let _ = events_tx.send(Event::PoolWindowPromoted {
1704 label: "window-pool-b".into(),
1705 version: 2,
1706 });
1707
1708 // Capture both saga_ids from their `SagaStarted` brackets,
1709 // then fire each saga's tagged refill so both terminate
1710 // Completed. The order doesn't matter because per-saga
1711 // correlation routes events by id, not by FIFO.
1712 let mut ids = Vec::<u64>::new();
1713 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
1714 let mut version: u64 = 100;
1715 while std::time::Instant::now() < deadline {
1716 if let Ok(Ok(ev)) =
1717 tokio::time::timeout(std::time::Duration::from_millis(50), witness.recv()).await
1718 {
1719 if let Event::SagaStarted { saga_id, name, .. } = ev {
1720 if name == "pool_respawn_on_promote" {
1721 ids.push(saga_id);
1722 version += 1;
1723 let _ = events_tx.send(Event::PoolWindowAdded {
1724 label: format!("refill-for-{}", saga_id),
1725 version,
1726 saga_id: Some(saga_id),
1727 });
1728 }
1729 }
1730 if ids.len() == 2 {
1731 break;
1732 }
1733 }
1734 }
1735 assert_eq!(ids.len(), 2, "expected 2 SagaStarted events");
1736
1737 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1738
1739 let snapshots = log.snapshot_recent(10).expect("snapshot_recent");
1740 assert_eq!(
1741 snapshots.len(),
1742 2,
1743 "expected two saga rows (concurrent, no eviction)"
1744 );
1745 // BOTH must be 'completed' — eviction is retired.
1746 for s in &snapshots {
1747 assert_eq!(
1748 s.state, "completed",
1749 "CPD-4: both concurrent sagas must complete cleanly, got state={:?} reason={:?}",
1750 s.state, s.failure_reason,
1751 );
1752 assert!(s.failure_reason.is_none());
1753 }
1754 }
1755
1756 /// LSD-2 — when a saga's `on_event` consumes its awaited bus
1757 /// event, the coordinator calls `LauncherSagaLog::finish_step`
1758 /// with the event payload. To inspect `output_json` directly via
1759 /// the public LSD-1 API, we drive a saga only PART of the way —
1760 /// the cleanup cascade's Step 1 lands its echo (`PanesReaped`)
1761 /// but Step 2 is left in `pending` so the saga stays unresolved.
1762 /// `unresolved_sagas` then exposes the full step list including
1763 /// Step 0's `output_json`.
1764 #[tokio::test]
1765 async fn saga_step_finish_records_event_payload() {
1766 let (coord, log, events_tx) = spawn_coord_with_log();
1767 let mut witness = events_tx.subscribe();
1768 let coord_rx = events_tx.subscribe();
1769 let _handle = tokio::spawn(run_coordinator(StdArc::clone(&coord), coord_rx));
1770 tokio::task::yield_now().await;
1771
1772 // Trigger the cascade. Window-cleanup has two steps; we feed
1773 // only the first echo (`PanesReaped`) so Step 0 transitions
1774 // pending→succeeded but Step 1 (`DrainPoolIfLast`) stays
1775 // pending — the saga remains in_flight + queryable as
1776 // unresolved.
1777 //
1778 // CPD-4: tag PanesReaped with the saga's id so the saga
1779 // actually advances (per-saga correlation).
1780 let _ = events_tx.send(Event::WindowClosed {
1781 label: "main".into(),
1782 version: 1,
1783 crash_detected: false,
1784 });
1785 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
1786 while std::time::Instant::now() < deadline {
1787 if let Ok(Ok(Event::SagaStarted { saga_id, .. })) =
1788 tokio::time::timeout(std::time::Duration::from_millis(50), witness.recv()).await
1789 {
1790 let _ = events_tx.send(Event::PanesReaped {
1791 label: "main".into(),
1792 version: 2,
1793 saga_id: Some(saga_id),
1794 });
1795 break;
1796 }
1797 }
1798 // INTENTIONALLY no `PoolDrained` / `PoolNotLast`.
1799
1800 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1801
1802 let unresolved = log.unresolved_sagas().expect("unresolved_sagas");
1803 assert_eq!(unresolved.len(), 1, "expected one unresolved saga");
1804 let saga = &unresolved[0];
1805 assert_eq!(saga.name, "window_cleanup_cascade");
1806 assert_eq!(saga.state, "running");
1807 // Two step rows: Step 0 succeeded (PanesReaped echo), Step 1
1808 // pending (DrainPoolIfLast dispatched but no echo arrived).
1809 assert_eq!(saga.steps.len(), 2, "expected 2 step rows");
1810 let step0 = &saga.steps[0];
1811 assert_eq!(step0.step_index, 0);
1812 assert_eq!(step0.state, "succeeded");
1813 assert!(step0.ended_at.is_some(), "succeeded step needs ended_at");
1814 let output_json = step0
1815 .output_json
1816 .as_ref()
1817 .expect("output_json must be set after finish_step");
1818 // Round-trip — the JSON should deserialize to the awaited Event.
1819 let parsed: Event = serde_json::from_str(output_json).expect("event round-trips");
1820 match parsed {
1821 Event::PanesReaped { label, .. } => assert_eq!(label, "main"),
1822 other => panic!("expected PanesReaped, got {:?}", other),
1823 }
1824 let step1 = &saga.steps[1];
1825 assert_eq!(step1.step_index, 1);
1826 assert_eq!(step1.state, "pending");
1827 assert!(step1.output_json.is_none(), "pending step has no output");
1828 }
1829}