// Copyright 2026, AgentMux Corp.
// SPDX-License-Identifier: Apache-2.0
//
// Phase F.5 — pool-respawn-on-promote saga.
//
// **What this saga does**
//
// When the host promotes a pool window to a user-visible top-level
// window (the `promote_pool_window` flow in
// `agentmux-cef::commands::window_pool`), the host immediately
// calls `spawn_pool_window` to refill the pool. Without a saga, that
// refill fires implicitly between `Event::PoolWindowPromoted` and
// `Event::PoolWindowAdded`, and no saga lifecycle event marks the
// transaction. Renderers that want to buffer "you're getting a
// tear-off + the pool is refilling" atomically have nothing to
// pivot on.
//
// This saga formalizes the implicit flow as an explicit cross-process
// state machine so the renderer sees a `SagaStarted` /
// `SagaCompleted` bracket. It implements the spec sketch from
// `docs/specs/SPEC_PHASE_F_HOST_REDUCER_2026-05-01.md` §7.1:
//
//   saga PoolRespawnOnPromote(promoted_label):
//       Step 1 — start: emit Event::PoolWindowPromoted { promoted_label }
//                       (already exists in Phase B; F.5 adds the wire
//                       variant + host-side report.)
//       Step 2 — issue Command::SpawnPoolWindow → host
//                       wait for: Event::PoolWindowAdded { new_label }
//       Step 3 — Done
//
//       Compensation: none (failure to refill is logged + retried
//       on next promote).
//
// **Scope of F.5 (this PR)**
//
// F.5 ships:
//   - the saga state machine itself (this file),
//   - the wire types (`Command::ReportPoolWindowPromoted`,
//     `Command::SpawnPoolWindow`, `Event::PoolWindowPromoted`),
//   - host-side `report_pool_window_promoted` call inside
//     `promote_pool_window` (between the remove and the open),
//   - launcher reducer arm translating the report into the typed
//     event,
//   - coordinator wiring that registers the saga as a consumer of
//     `Event::PoolWindowPromoted`.
//
// F.5 does NOT ship:
//   - F.6 window-cleanup cascade saga.
//   - Launcher-side saga durability (separate concern; srv-side
//     durability already shipped).
//
// **CPD-3 update.** Step 2's `SagaAction::IssueCmd` is now LIVE: the
// coordinator dispatches `Command::SpawnPoolWindow { saga_id }`
// through `HostPipe::send_command()` (see
// `agentmux-launcher/src/saga/mod.rs::apply_action`). The saga is
// structurally unchanged; what changed is the coordinator's
// `IssueCmd::Host` arm. The saga is no longer a passive narrator of
// the host's implicit refill: it now causally drives it.
//
// **Why a saga at all when Step 2 started out passive?**
//
// The renderer-visible value is the `SagaStarted` / `SagaCompleted`
// bracket: subscribers see "saga foo running" and can buffer
// related events for that saga_id until the bracket closes. That
// works the same way whether the launcher actively dispatches the
// refill or merely observes the host doing it. When the cross-process
// pipe landed (CPD-3), only the coordinator's `IssueCmd` handling
// had to change; the renderer-facing semantics stayed stable.
//
// **What if `Event::PoolWindowAdded` never arrives?**
//
// The saga waits indefinitely: until the bus closes (saga
// abandoned on launcher restart) or, once F.6+ adds saga
// timeouts, a per-saga deadline force-fails it. F.5 keeps the
// behavior deliberately simple: if refill genuinely fails, the
// next promote will start a fresh saga; the prior failed saga
// stays in `in_flight` until the launcher exits. This matches the
// F-spec compensation strategy ("none — failure to refill is
// logged + retried on next promote").
//
// **Resolved limitation: concurrent-promote correlation** (codex P1
// PR #634). Before CPD-4, if two promotes were in flight
// simultaneously, the coordinator broadcast every event to every
// in-flight saga (`saga::mod.rs::run_coordinator`) and the first
// `PoolWindowAdded` completed BOTH sagas: an early `SagaCompleted`
// for the second promote's bracket, with the later refill event
// left unbracketed.
//
// Concurrent promotes require the user to tear off two windows
// in rapid succession (under the host's spawn_pool_window
// completion latency). The user-visible consequence was a brief
// renderer-side bracketing inconsistency; reducer + SQLite state
// remained correct.
//
// CPD-4 closes the gap with per-saga correlation: `on_event` only
// advances on a `PoolWindowAdded` whose `saga_id` matches
// `ctx.saga_id`. This depended on the cross-process dispatch landing
// first (CPD-3), so that the saga's `IssueCmd` is actually causal.

use agentmux_common::ipc::{Command, Event};

use super::{PipeTarget, Saga, SagaAction, SagaCtx};

/// State of one in-flight pool-respawn saga.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Phase {
    /// Saga just constructed — waiting for the coordinator to call
    /// `start`. The first `start` call transitions to
    /// `WaitingForRefill` and emits the `IssueCmd` for
    /// `SpawnPoolWindow`.
    Initial,
    /// `SpawnPoolWindow` has been issued; the saga is now waiting for
    /// the `Event::PoolWindowAdded` tagged with this saga's id to
    /// land on the bus (CPD-4). The label of the new pool entry
    /// doesn't need to match the promoted label: the refill produces
    /// a *new* label, and correlation is by `saga_id`, not by label.
    WaitingForRefill,
}

/// Pool-respawn saga: fires once per promote, waits for the matching
/// refill, then completes.
pub struct PoolRespawn {
    /// Label of the window that was promoted (Step 1's input). Held
    /// for log correlation only — the saga doesn't validate the
    /// new pool label against this one. Kept on the struct so future
    /// failure-mode logging (timeout / explicit fail) can include
    /// "which promote did this saga belong to?" without rethreading.
    /// LSD-2 — also surfaces in `--diag sagas` via `input_snapshot`.
    promoted_label: String,
    /// New pool label observed in `Event::PoolWindowAdded` (Step 2's
    /// output). `None` until refill is observed.
    refilled_label: Option<String>,
    phase: Phase,
}

impl PoolRespawn {
    /// Construct a fresh saga for a promote of `promoted_label`.
    /// Coordinator allocates the saga_id and calls `start` once.
    pub fn new(promoted_label: String) -> Self {
        Self {
            promoted_label,
            refilled_label: None,
            phase: Phase::Initial,
        }
    }

    /// Label of the new pool window observed in `PoolWindowAdded`.
    /// `None` until the saga has progressed past `WaitingForRefill`.
    /// Exported for tests.
    #[cfg(test)]
    pub fn refilled_label(&self) -> Option<&str> {
        self.refilled_label.as_deref()
    }
}

impl Saga for PoolRespawn {
    fn name(&self) -> &'static str {
        "pool_respawn_on_promote"
    }

    /// LSD-2 — record the promote's source label for `--diag sagas`.
    /// `refilled_label` is None at start (only known after Step 2's
    /// echo lands) so we don't include it; the durable log captures
    /// the inputs the saga was constructed with, not its evolving
    /// state — that lives in step rows.
    fn input_snapshot(&self) -> serde_json::Value {
        serde_json::json!({ "promoted_label": self.promoted_label })
    }

    fn start(&mut self, _ctx: &SagaCtx) -> SagaAction {
        // Step 2 — issue the SpawnPoolWindow command, transition into
        // the wait state. The action targets `PipeTarget::Host`; as of
        // CPD-3 the coordinator dispatches it on the launcher→host
        // pipe via `HostPipe::send_command()` (see module comment).
        debug_assert_eq!(self.phase, Phase::Initial);
        self.phase = Phase::WaitingForRefill;
        // `saga_id: 0` is a placeholder: the saga does not stamp its
        // own id here. CPD-3's `inject_saga_id()` helper rewrites it
        // to the live saga's id at dispatch time.
        SagaAction::IssueCmd {
            target: PipeTarget::Host,
            cmd: Command::SpawnPoolWindow { saga_id: 0 },
        }
    }

    fn on_event(&mut self, event: &Event, ctx: &SagaCtx) -> SagaAction {
        // Only pivot on `PoolWindowAdded`. Every other event the bus
        // carries is unrelated to this saga's terminal condition.
        // A coordinator that drove this saga to Done because of an
        // unrelated event would fire the `SagaCompleted` bracket
        // before refill actually finished — the renderer's buffered
        // state would flush prematurely.
        //
        // **CPD-4 — per-saga event correlation.** Filter
        // `PoolWindowAdded` by `saga_id`: only the event tagged with
        // *this* saga's id advances us. Untagged events (organic pool
        // refills) and events from sibling concurrent sagas are
        // ignored. Retires the evict-and-replace workaround from
        // PR #634 — concurrent same-kind sagas now coexist without
        // false-positive `SagaCompleted` cross-talk.
        match (&self.phase, event) {
            (Phase::WaitingForRefill, Event::PoolWindowAdded { label, saga_id, .. })
                if *saga_id == Some(ctx.saga_id) =>
            {
                // The refill produced a new label. Record + complete.
                // (Spec § 7.1: Step 3 is just `Done`; no further
                // dispatch.)
                self.refilled_label = Some(label.clone());
                SagaAction::Done
            }
            // Still waiting; or — for `Initial` — coordinator
            // hasn't called `start` yet; or the event's saga_id
            // doesn't match (concurrent saga or organic refill).
            // In every case: no-op.
            _ => SagaAction::Wait,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    fn ctx(saga_id: u64) -> SagaCtx {
        SagaCtx { saga_id }
    }

    #[test]
    fn start_issues_spawn_pool_window_to_host() {
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        let action = saga.start(&ctx(7));
        match action {
            SagaAction::IssueCmd { target, cmd } => {
                assert_eq!(target, PipeTarget::Host);
                assert!(matches!(cmd, Command::SpawnPoolWindow { .. }));
            }
            other => panic!(
                "expected IssueCmd(SpawnPoolWindow) on start, got {:?}",
                other
            ),
        }
    }

    #[test]
    fn pool_window_added_completes_the_saga() {
        // CPD-4 — saga_id-tagged event for *this* saga advances it.
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        let _ = saga.start(&ctx(7));
        let action = saga.on_event(
            &Event::PoolWindowAdded {
                label: "window-pool-xyz".into(),
                version: 100,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Done));
        assert_eq!(saga.refilled_label(), Some("window-pool-xyz"));
    }

    /// CPD-4 — `PoolWindowAdded` with `saga_id: None` (organic refill,
    /// no saga in flight on the host side) does NOT terminate the
    /// saga. Pre-CPD-4, ANY `PoolWindowAdded` would complete the saga;
    /// per-saga correlation now scopes terminal events to the
    /// originating saga only.
    #[test]
    fn organic_pool_window_added_does_not_complete_saga() {
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        let _ = saga.start(&ctx(7));
        let action = saga.on_event(
            &Event::PoolWindowAdded {
                label: "window-pool-organic".into(),
                version: 100,
                saga_id: None,
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Wait));
        assert!(saga.refilled_label().is_none());
    }

    /// CPD-4 — `PoolWindowAdded` tagged with a *different* saga_id
    /// (sibling concurrent saga's echo) is ignored. This is the
    /// invariant that retires evict-and-replace: two concurrent
    /// PoolRespawn sagas can coexist, each consuming only its own
    /// echo.
    #[test]
    fn pool_window_added_with_foreign_saga_id_is_ignored() {
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        let _ = saga.start(&ctx(7));
        let action = saga.on_event(
            &Event::PoolWindowAdded {
                label: "window-pool-sibling".into(),
                version: 100,
                saga_id: Some(8),
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Wait));
        assert!(saga.refilled_label().is_none());
    }

    #[test]
    fn unrelated_events_keep_saga_waiting() {
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        let _ = saga.start(&ctx(7));
        let unrelated = Event::WindowOpened {
            label: "main".into(),
            kind: agentmux_common::ipc::WindowKind::FullInstance,
            parent_label: None,
            version: 50,
        };
        let action = saga.on_event(&unrelated, &ctx(7));
        assert!(matches!(action, SagaAction::Wait));
        assert!(saga.refilled_label().is_none());

        // Pool removal alone (the original promote signal) doesn't
        // complete the saga — only PoolWindowAdded does.
        let pool_removed = Event::PoolWindowRemoved {
            label: "window-pool-abc".into(),
            version: 51,
        };
        let action = saga.on_event(&pool_removed, &ctx(7));
        assert!(matches!(action, SagaAction::Wait));

        // PoolWindowPromoted is the start trigger but the saga is
        // already past that — coordinator only feeds events it
        // *receives*; the saga MUST NOT mistake a stray promote for
        // a refill.
        let pool_promoted = Event::PoolWindowPromoted {
            label: "window-pool-other".into(),
            version: 52,
        };
        let action = saga.on_event(&pool_promoted, &ctx(7));
        assert!(matches!(action, SagaAction::Wait));
    }

    #[test]
    fn first_pool_window_added_after_start_wins() {
        // The saga records the FIRST PoolWindowAdded *for its own
        // saga_id* after start as the refill. CPD-4 — the coordinator
        // dispatches the saga's IssueCmd with `saga_id` injected, so
        // the host's matching report carries `saga_id: Some(N)`. The
        // saga itself filters by `ctx.saga_id`; events with foreign
        // ids (sibling concurrent sagas) are ignored.
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        let _ = saga.start(&ctx(7));
        let _ = saga.on_event(
            &Event::PoolWindowAdded {
                label: "window-pool-first".into(),
                version: 100,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        // Second event after Done is irrelevant; saga is no longer
        // in flight (coordinator removes it from the registry on
        // Done). We don't model that here — just assert the first
        // was captured.
        assert_eq!(saga.refilled_label(), Some("window-pool-first"));
    }

    /// End-to-end coordinator integration: emit a promote, watch the
    /// coordinator start the saga, observe the IssueCmd via log, emit
    /// a synthetic PoolWindowAdded, watch the coordinator emit
    /// SagaStarted then SagaCompleted bracket events on the bus.
    #[tokio::test]
    async fn coordinator_brackets_promote_with_saga_lifecycle_events() {
        use crate::saga::{run_coordinator, SagaCoordinator};

        let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(64);
        let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
        let coord = Arc::new(SagaCoordinator::new(events_tx.clone(), Arc::clone(&state)));

        // Subscribe a witness BEFORE the coordinator subscribes so we
        // observe both the input and the coordinator's emitted
        // SagaStarted/SagaCompleted events.
        let mut witness = events_tx.subscribe();
        let coord_rx = events_tx.subscribe();
        let _handle = tokio::spawn(run_coordinator(Arc::clone(&coord), coord_rx));

        // Yield so the coordinator's recv loop is parked on its
        // first recv() before we publish the trigger.
        tokio::task::yield_now().await;

        // Step 1 — the promote signal kicks off the saga.
        let _ = events_tx.send(Event::PoolWindowPromoted {
            label: "window-pool-abc".into(),
            version: 1,
        });

        // CPD-4: wait for SagaStarted to learn the coordinator-
        // allocated saga_id before publishing the matching
        // PoolWindowAdded. Pre-CPD-4 we could send `saga_id: None`
        // because every PoolWindowAdded advanced every in-flight
        // saga; under per-saga correlation the saga only consumes
        // its own echo.
        let mut saga_id_started: Option<u64> = None;
        let mut saw_started = false;
        let mut saw_completed_after_started = false;
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
        while std::time::Instant::now() < deadline {
            match tokio::time::timeout(std::time::Duration::from_millis(50), witness.recv()).await
            {
                Ok(Ok(Event::SagaStarted { saga_id, name, .. })) => {
                    assert_eq!(name, "pool_respawn_on_promote");
                    saw_started = true;
                    saga_id_started = Some(saga_id);
                    // Step 2 — the saga waits for refill. Push a
                    // synthetic PoolWindowAdded tagged with the saga's
                    // allocated id (CPD-4 per-saga correlation).
                    let _ = events_tx.send(Event::PoolWindowAdded {
                        label: "window-pool-xyz".into(),
                        version: 2,
                        saga_id: Some(saga_id),
                    });
                }
                Ok(Ok(Event::SagaCompleted { saga_id, .. })) => {
                    if saw_started {
                        assert_eq!(Some(saga_id), saga_id_started);
                        saw_completed_after_started = true;
                        break;
                    }
                }
                Ok(Ok(_)) => {}
                Ok(Err(_)) => break,
                Err(_) => continue,
            }
        }
        assert!(
            saw_started,
            "expected coordinator to emit Event::SagaStarted for pool_respawn_on_promote"
        );
        assert!(
            saw_completed_after_started,
            "expected coordinator to emit Event::SagaCompleted after SagaStarted"
        );
    }
}