agentmux_launcher\saga/pool_respawn.rs
// Copyright 2026, AgentMux Corp.
// SPDX-License-Identifier: Apache-2.0
//
// Phase F.5: pool-respawn-on-promote saga.
//
// **What this saga does**
//
// When the host promotes a pool window to a user-visible top-level
// window (the `promote_pool_window` flow in
// `agentmux-cef::commands::window_pool`), the host immediately
// calls `spawn_pool_window` to refill the pool. Without this saga,
// that refill fires implicitly between `Event::PoolWindowPromoted`
// and `Event::PoolWindowAdded`: no saga lifecycle event marks the
// transaction, so renderers that want to buffer "you're getting a
// tear-off + the pool is refilling" atomically have nothing to
// pivot on.
//
// This saga formalizes the implicit flow as an explicit
// cross-process state machine so the renderer sees a `SagaStarted` /
// `SagaCompleted` bracket. It implements the spec sketch from
// `docs/specs/SPEC_PHASE_F_HOST_REDUCER_2026-05-01.md` §7.1:
//
//     saga PoolRespawnOnPromote(promoted_label):
//       Step 1 (start): emit Event::PoolWindowPromoted { promoted_label }
//                       (already exists in Phase B; F.5 adds the wire
//                       variant + host-side report.)
//       Step 2: issue Command::SpawnPoolWindow → host
//               wait for: Event::PoolWindowAdded { new_label }
//       Step 3: Done
//
// Compensation: none (failure to refill is logged + retried
// on next promote).
//
// **Scope of F.5 (this PR)**
//
// F.5 ships:
// - the saga state machine itself (this file),
// - the wire types (`Command::ReportPoolWindowPromoted`,
//   `Command::SpawnPoolWindow`, `Event::PoolWindowPromoted`),
// - the host-side `report_pool_window_promoted` call inside
//   `promote_pool_window` (between the remove and the open),
// - the launcher reducer arm translating the report into the typed
//   event,
// - coordinator wiring that registers the saga as a consumer of
//   `Event::PoolWindowPromoted`.
//
// F.5 does NOT ship:
// - the F.6 window-cleanup cascade saga,
// - launcher-side saga durability (separate concern; srv-side
//   durability already shipped).
//
// **CPD-3 update.** Step 2's `SagaAction::IssueCmd` is now LIVE: the
// coordinator dispatches `Command::SpawnPoolWindow { saga_id }`
// through `HostPipe::send_command()` (see
// `agentmux-launcher/src/saga/mod.rs::apply_action`). The saga itself
// is structurally unchanged; what changed is the coordinator's
// `IssueCmd::Host` arm. The saga is no longer a passive narrator of
// the host's implicit refill: it now causally drives it.
//
// **Why a saga at all, given Step 2 was originally passive?**
//
// The renderer-visible value is the `SagaStarted` / `SagaCompleted`
// bracket: subscribers see "saga foo running" and can buffer
// related events for that saga_id until the bracket closes. That
// works the same way whether the launcher actively dispatches the
// refill or merely observes the host doing it. Landing the
// cross-process pipe (CPD-3) changed only the coordinator's
// `IssueCmd` handling; the renderer-facing semantics stayed stable.
//
// **What if `Event::PoolWindowAdded` never arrives?**
//
// The saga waits indefinitely until either the bus closes (saga
// abandoned on launcher restart) or, once F.6+ adds saga timeouts,
// a per-saga deadline force-fails it. F.5 keeps the behavior
// deliberately simple: if refill genuinely fails, the next promote
// starts a fresh saga, and the prior failed saga stays in
// `in_flight` until the launcher exits. This matches the F-spec
// compensation strategy (none; failure to refill is logged +
// retried on next promote).
//
// **Concurrent-promote correlation** (codex P1, PR #634). The
// coordinator broadcasts every event to every in-flight saga
// (`saga::mod.rs::run_coordinator`). Before CPD-4, if two promotes
// were in flight simultaneously, the first `PoolWindowAdded`
// completed BOTH sagas: an early `SagaCompleted` for the second
// promote's bracket, with the later refill event left unbracketed.
//
// Concurrent promotes require the user to tear off two windows
// in rapid succession (faster than the host's `spawn_pool_window`
// completion latency). The user-visible consequence was a brief
// renderer-side bracketing inconsistency; reducer and SQLite state
// remained correct.
//
// A proper fix needed coordinator-level FIFO routing or per-saga
// sequence-number correlation, and depended on the cross-process
// dispatch landing first (so the saga's `IssueCmd` is actually
// causal). CPD-4 adds per-saga correlation on top of CPD-3's
// dispatch: the host's refill `PoolWindowAdded` carries the
// originating `saga_id`, and `on_event` ignores events tagged with
// any other id (or none).

use agentmux_common::ipc::{Command, Event};

use super::{PipeTarget, Saga, SagaAction, SagaCtx};

/// State of one in-flight pool-respawn saga.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Phase {
    /// Saga just constructed: waiting for the coordinator to call
    /// `start`. The `start` call transitions to `WaitingForRefill`
    /// and emits the `IssueCmd` for `SpawnPoolWindow`.
    Initial,
    /// `SpawnPoolWindow` has been issued; the saga is now waiting for
    /// the `Event::PoolWindowAdded` tagged with its own `saga_id`
    /// (CPD-4). The new pool label doesn't need to match the promoted
    /// label: the refill produces a *new* label, and the `saga_id`
    /// tag, not the label, identifies the refill this saga is
    /// tracking.
    WaitingForRefill,
}

/// Pool-respawn saga: fires once per promote, waits for the matching
/// refill, then completes.
pub struct PoolRespawn {
    /// Label of the window that was promoted (Step 1's input). Held
    /// for log correlation only: the saga doesn't validate the new
    /// pool label against this one. Kept on the struct so future
    /// failure-mode logging (timeout / explicit fail) can report
    /// which promote this saga belonged to without rethreading.
    /// LSD-2: also surfaced in `--diag sagas` via `input_snapshot`.
    promoted_label: String,
    /// New pool label observed in `Event::PoolWindowAdded` (Step 2's
    /// output). `None` until the refill is observed.
    refilled_label: Option<String>,
    phase: Phase,
}

impl PoolRespawn {
    /// Construct a fresh saga for a promote of `promoted_label`.
    /// The coordinator allocates the saga_id and calls `start` once.
    pub fn new(promoted_label: String) -> Self {
        Self {
            promoted_label,
            refilled_label: None,
            phase: Phase::Initial,
        }
    }

    /// Label of the new pool window observed in `PoolWindowAdded`.
    /// `None` until the saga has progressed past `WaitingForRefill`.
    /// Test-only accessor.
    #[cfg(test)]
    pub fn refilled_label(&self) -> Option<&str> {
        self.refilled_label.as_deref()
    }
}

impl Saga for PoolRespawn {
    fn name(&self) -> &'static str {
        "pool_respawn_on_promote"
    }

    /// LSD-2: record the promote's source label for `--diag sagas`.
    /// `refilled_label` is `None` at start (it is only known after
    /// Step 2's echo lands), so it is not included. The durable log
    /// captures the inputs the saga was constructed with, not its
    /// evolving state; that lives in step rows.
    fn input_snapshot(&self) -> serde_json::Value {
        serde_json::json!({ "promoted_label": self.promoted_label })
    }

    fn start(&mut self, _ctx: &SagaCtx) -> SagaAction {
        // Step 2: issue the SpawnPoolWindow command and transition
        // into the wait state. The action targets `PipeTarget::Host`;
        // since CPD-3 the coordinator transmits it to the host via
        // `HostPipe::send_command()` (see module docstring).
        debug_assert_eq!(self.phase, Phase::Initial);
        self.phase = Phase::WaitingForRefill;
        // CPD-1 added `saga_id` to the wire shape. The saga passes `0`
        // as a placeholder; CPD-3's `inject_saga_id()` helper rewrites
        // it to the live saga's id at dispatch time.
        SagaAction::IssueCmd {
            target: PipeTarget::Host,
            cmd: Command::SpawnPoolWindow { saga_id: 0 },
        }
    }

    fn on_event(&mut self, event: &Event, ctx: &SagaCtx) -> SagaAction {
        // Only pivot on `PoolWindowAdded`. Every other event the bus
        // carries is unrelated to this saga's terminal condition.
        // A coordinator that drove this saga to Done because of an
        // unrelated event would fire the `SagaCompleted` bracket
        // before refill actually finished, and the renderer's
        // buffered state would flush prematurely.
        //
        // **CPD-4: per-saga event correlation.** Filter
        // `PoolWindowAdded` by `saga_id`: only the event tagged with
        // *this* saga's id advances us. Untagged events (organic pool
        // refills) and events from sibling concurrent sagas are
        // ignored. This retires the evict-and-replace workaround from
        // PR #634: concurrent same-kind sagas now coexist without
        // false-positive `SagaCompleted` cross-talk.
        match (&self.phase, event) {
            (Phase::WaitingForRefill, Event::PoolWindowAdded { label, saga_id, .. })
                if *saga_id == Some(ctx.saga_id) =>
            {
                // The refill produced a new label. Record + complete.
                // (Spec §7.1: Step 3 is just `Done`; no further
                // dispatch.)
                self.refilled_label = Some(label.clone());
                SagaAction::Done
            }
            // Still waiting; or (for `Initial`) the coordinator
            // hasn't called `start` yet; or the event's saga_id
            // doesn't match (concurrent saga or organic refill).
            // In every case: no-op.
            _ => SagaAction::Wait,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    fn ctx(saga_id: u64) -> SagaCtx {
        SagaCtx { saga_id }
    }

    #[test]
    fn start_issues_spawn_pool_window_to_host() {
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        let action = saga.start(&ctx(7));
        match action {
            SagaAction::IssueCmd { target, cmd } => {
                assert_eq!(target, PipeTarget::Host);
                assert!(matches!(cmd, Command::SpawnPoolWindow { .. }));
            }
            other => panic!(
                "expected IssueCmd(SpawnPoolWindow) on start, got {:?}",
                other
            ),
        }
    }
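
    /// Sketch of a check on the CPD-1 placeholder: `start` emits
    /// `saga_id: 0` and relies on the coordinator's `inject_saga_id()`
    /// to substitute the live id at dispatch time, even though `ctx`
    /// already carries the real id. It uses only this module's types
    /// and assumes the placeholder value stays `0`.
    #[test]
    fn start_emits_placeholder_saga_id_for_coordinator_injection() {
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        let action = saga.start(&ctx(7));
        match action {
            // Destructure the command to read the placeholder id.
            SagaAction::IssueCmd {
                cmd: Command::SpawnPoolWindow { saga_id },
                ..
            } => assert_eq!(saga_id, 0),
            other => panic!("expected IssueCmd(SpawnPoolWindow), got {:?}", other),
        }
    }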

    #[test]
    fn pool_window_added_completes_the_saga() {
        // CPD-4: a saga_id-tagged event for *this* saga advances it.
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        let _ = saga.start(&ctx(7));
        let action = saga.on_event(
            &Event::PoolWindowAdded {
                label: "window-pool-xyz".into(),
                version: 100,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Done));
        assert_eq!(saga.refilled_label(), Some("window-pool-xyz"));
    }

    /// CPD-4: `PoolWindowAdded` with `saga_id: None` (organic refill,
    /// no saga in flight on the host side) does NOT terminate the
    /// saga. Before CPD-4, ANY `PoolWindowAdded` completed the saga;
    /// per-saga correlation now scopes terminal events to the
    /// originating saga only.
    #[test]
    fn organic_pool_window_added_does_not_complete_saga() {
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        let _ = saga.start(&ctx(7));
        let action = saga.on_event(
            &Event::PoolWindowAdded {
                label: "window-pool-organic".into(),
                version: 100,
                saga_id: None,
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Wait));
        assert!(saga.refilled_label().is_none());
    }

    /// CPD-4: `PoolWindowAdded` tagged with a *different* saga_id
    /// (a sibling concurrent saga's echo) is ignored. This is the
    /// invariant that retires evict-and-replace: two concurrent
    /// PoolRespawn sagas can coexist, each consuming only its own
    /// echo.
    #[test]
    fn pool_window_added_with_foreign_saga_id_is_ignored() {
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        let _ = saga.start(&ctx(7));
        let action = saga.on_event(
            &Event::PoolWindowAdded {
                label: "window-pool-sibling".into(),
                version: 100,
                saga_id: Some(8),
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Wait));
        assert!(saga.refilled_label().is_none());
    }
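
    /// Sketch of the `Initial`-phase guard in `on_event`: a
    /// `PoolWindowAdded` delivered before `start` has run is a no-op,
    /// even when its `saga_id` matches, because the terminal arm only
    /// fires from `WaitingForRefill`. It uses only this module's types.
    #[test]
    fn pool_window_added_before_start_is_ignored() {
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        // Deliberately skip `saga.start(..)`: the saga is still `Initial`.
        let action = saga.on_event(
            &Event::PoolWindowAdded {
                label: "window-pool-early".into(),
                version: 100,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        assert!(matches!(action, SagaAction::Wait));
        assert!(saga.refilled_label().is_none());
    }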

    #[test]
    fn unrelated_events_keep_saga_waiting() {
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        let _ = saga.start(&ctx(7));
        let unrelated = Event::WindowOpened {
            label: "main".into(),
            kind: agentmux_common::ipc::WindowKind::FullInstance,
            parent_label: None,
            version: 50,
        };
        let action = saga.on_event(&unrelated, &ctx(7));
        assert!(matches!(action, SagaAction::Wait));
        assert!(saga.refilled_label().is_none());

        // Pool removal alone (the original promote signal) doesn't
        // complete the saga; only PoolWindowAdded does.
        let pool_removed = Event::PoolWindowRemoved {
            label: "window-pool-abc".into(),
            version: 51,
        };
        let action = saga.on_event(&pool_removed, &ctx(7));
        assert!(matches!(action, SagaAction::Wait));

        // PoolWindowPromoted is the start trigger, but this saga is
        // already past that point. The coordinator feeds every event
        // it receives to every in-flight saga, so the saga MUST NOT
        // mistake a stray promote for a refill.
        let pool_promoted = Event::PoolWindowPromoted {
            label: "window-pool-other".into(),
            version: 52,
        };
        let action = saga.on_event(&pool_promoted, &ctx(7));
        assert!(matches!(action, SagaAction::Wait));
    }

    #[test]
    fn first_pool_window_added_after_start_wins() {
        // The saga records the FIRST PoolWindowAdded *for its own
        // saga_id* after start as the refill. CPD-4: the coordinator
        // dispatches the saga's IssueCmd with `saga_id` injected, so
        // the host's matching report carries `saga_id: Some(N)`. The
        // saga itself filters by `ctx.saga_id`; events with foreign
        // ids (sibling concurrent sagas) are ignored.
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        let _ = saga.start(&ctx(7));
        let _ = saga.on_event(
            &Event::PoolWindowAdded {
                label: "window-pool-first".into(),
                version: 100,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        // A second event after Done is irrelevant: the saga is no
        // longer in flight (the coordinator removes it from the
        // registry on Done). We don't model that here; we just assert
        // the first was captured.
        assert_eq!(saga.refilled_label(), Some("window-pool-first"));
    }
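
    /// Sketch of the LSD-2 snapshot contract: `input_snapshot` records
    /// only the construction input (`promoted_label`), not the evolving
    /// `refilled_label`, and `name()` is the name subscribers see in
    /// `SagaStarted`. It uses only this module's types plus
    /// `serde_json`, which `input_snapshot` already depends on.
    #[test]
    fn input_snapshot_records_only_promoted_label() {
        let mut saga = PoolRespawn::new("window-pool-abc".into());
        assert_eq!(saga.name(), "pool_respawn_on_promote");
        let expected = serde_json::json!({ "promoted_label": "window-pool-abc" });
        assert_eq!(saga.input_snapshot(), expected);

        // Driving the saga to Done must not change the snapshot.
        let _ = saga.start(&ctx(7));
        let _ = saga.on_event(
            &Event::PoolWindowAdded {
                label: "window-pool-xyz".into(),
                version: 100,
                saga_id: Some(7),
            },
            &ctx(7),
        );
        assert_eq!(saga.input_snapshot(), expected);
    }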

    /// End-to-end coordinator integration: emit a promote, watch the
    /// coordinator start the saga, emit a synthetic `PoolWindowAdded`
    /// tagged with the allocated saga_id, and watch the coordinator
    /// emit the `SagaStarted` then `SagaCompleted` bracket events on
    /// the bus.
    #[tokio::test]
    async fn coordinator_brackets_promote_with_saga_lifecycle_events() {
        use crate::saga::{run_coordinator, SagaCoordinator};

        let (events_tx, _) = tokio::sync::broadcast::channel::<Event>(64);
        let state = Arc::new(tokio::sync::Mutex::new(crate::state::State::default()));
        let coord = Arc::new(SagaCoordinator::new(events_tx.clone(), Arc::clone(&state)));

        // Subscribe a witness BEFORE the coordinator subscribes so we
        // observe both the input and the coordinator's emitted
        // SagaStarted/SagaCompleted events.
        let mut witness = events_tx.subscribe();
        let coord_rx = events_tx.subscribe();
        let _handle = tokio::spawn(run_coordinator(Arc::clone(&coord), coord_rx));

        // Yield so the coordinator's recv loop is parked on its
        // first recv() before we publish the trigger.
        tokio::task::yield_now().await;

        // Step 1: the promote signal kicks off the saga.
        let _ = events_tx.send(Event::PoolWindowPromoted {
            label: "window-pool-abc".into(),
            version: 1,
        });

        // CPD-4: wait for SagaStarted to learn the
        // coordinator-allocated saga_id before publishing the matching
        // PoolWindowAdded. Before CPD-4 we could send `saga_id: None`
        // because every PoolWindowAdded advanced every in-flight
        // saga; under per-saga correlation the saga only consumes
        // its own echo.
        let mut saga_id_started: Option<u64> = None;
        let mut saw_started = false;
        let mut saw_completed_after_started = false;
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
        while std::time::Instant::now() < deadline {
            match tokio::time::timeout(std::time::Duration::from_millis(50), witness.recv()).await
            {
                Ok(Ok(Event::SagaStarted { saga_id, name, .. })) => {
                    assert_eq!(name, "pool_respawn_on_promote");
                    saw_started = true;
                    saga_id_started = Some(saga_id);
                    // Step 2: the saga waits for refill. Push a
                    // synthetic PoolWindowAdded tagged with the saga's
                    // allocated id (CPD-4 per-saga correlation).
                    let _ = events_tx.send(Event::PoolWindowAdded {
                        label: "window-pool-xyz".into(),
                        version: 2,
                        saga_id: Some(saga_id),
                    });
                }
                Ok(Ok(Event::SagaCompleted { saga_id, .. })) => {
                    if saw_started {
                        assert_eq!(Some(saga_id), saga_id_started);
                        saw_completed_after_started = true;
                        break;
                    }
                }
                Ok(Ok(_)) => {}
                Ok(Err(_)) => break,
                Err(_) => continue,
            }
        }
        assert!(
            saw_started,
            "expected coordinator to emit Event::SagaStarted for pool_respawn_on_promote"
        );
        assert!(
            saw_completed_after_started,
            "expected coordinator to emit Event::SagaCompleted after SagaStarted"
        );
    }
}