// agentmux_srv/sagas/mod.rs

// Copyright 2026, AgentMux Corp.
// SPDX-License-Identifier: Apache-2.0
//
// Phase E.5.5 — srv-side saga coordinator.
//
// **Why srv, not launcher:** the existing E.1a coordinator
// framework lives in `agentmux-launcher::saga` (which still has no
// consumers). The original Phase E spec assumed sagas would fan out
// across host/launcher/srv via cross-process IPC; the actual
// implementation kept that fan-out in the frontend (`requestTearOff`
// calls srv-rpc and host-rpc directly), so every saga in the E.5 plan
// mutates only srv state. In-process oneshot dispatch beats an IPC
// round-trip on every saga step. See
// `docs/retro/saga-coordinator-location-analysis-2026-04-30.md` for
// the full reasoning, including the robustness trade-offs (which
// are the same for both placements).
//
// **Shape:** sagas are async functions that:
//   1. allocate a fresh saga_id via `alloc_saga_id`,
//   2. emit `Event::SagaStarted` via `emit_saga_started` (and abort
//      if it returns `Err`),
//   3. drive their state machine via `SagaCtx::dispatch` /
//      `SagaCtx::compensate`,
//   4. emit `Event::SagaCompleted` or `Event::SagaFailed` via
//      `emit_terminal` once the inner work returns.
//
// `run_saga(name, future)` is a thin wrapper that only applies the
// 5 s timeout; each saga performs steps 1, 2 and 4 itself around that
// call. Sagas pass the future directly (not a closure), avoiding the
// lifetime-of-SagaCtx complication that closure-style coordinators
// run into.
//
// **Compensation:** the saga's inner future is responsible for
// driving compensation before returning `Err`. `SagaCtx::compensate`
// is a best-effort dispatch that swallows errors (the saga is
// already failing; secondary failures get logged). Idempotency of
// compensating commands (`MoveTab` back to source, `DeleteWorkspace`,
// etc.) keeps the cleanup safe even if a step partially applied.
//
// What this module does NOT close (per the location analysis §4.2):
// * Per-step SQLite transactions in the subscriber (gap; F1.A).
// * Host pool-promote and renderer registration outside the saga
//   (gap; Phase F).
// * Saga state across srv restart (gap; Phase F+).
43
44pub mod delete_block;
45pub mod delete_tab;
46pub mod delete_workspace;
47pub mod log;
48pub mod promote_block_to_tab;
49pub mod recovery;
50pub mod restore_torn_off_tab;
51pub mod tear_off_block;
52pub mod tear_off_tab;
53
54// Step 7 — E.7 integration tests. Cross-saga end-to-end coverage
55// that exercises reducer + saga coordinator + persist subscriber +
56// saga log together against a real `AppState` (in-memory wstore +
57// sagalog). Per-saga unit tests under each saga module already cover
58// happy + reject paths in isolation; this module focuses on
59// multi-surface consistency (reducer/wstore/saga-log) that PR 2's
60// `compensate_unresolved` will rely on.
61#[cfg(test)]
62mod integration_tests;
63
64use std::sync::atomic::{AtomicU32, Ordering};
65
66use agentmux_common::ipc::{Command, Event};
67use serde_json::Value;
68
69use crate::sagas::log::{command_discriminant_name, SagaOutcome};
70use crate::server::AppState;
71
/// Upper bound on how long a saga's inner future may run before
/// [`run_saga`] gives up on it and reports a timeout.
///
/// Tear-off sagas are expected to finish within tens of milliseconds;
/// five seconds leaves generous headroom for SQLite write spikes so CI
/// does not flap.
const SAGA_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(5_000);
77
78/// Read-only context passed to a saga's inner async function.
79/// Wraps the AppState handle and the saga's allocated id.
80///
81/// Construct via [`SagaCtx::new`] — the durability log requires the
82/// per-step counter to start at zero and be owned by the ctx (so
83/// concurrent sagas don't interleave step indices).
84pub struct SagaCtx<'a> {
85    pub(crate) state: &'a AppState,
86    pub(crate) saga_id: u64,
87    /// Monotonic step index (0, 1, 2, ...) for this saga. Each
88    /// `dispatch` / `compensate` call `fetch_add(1)`s and writes the
89    /// resulting index into the saga log. Atomic because saga inner
90    /// futures may parallelise dispatches in the future (today they
91    /// don't, but the cost is one cache line).
92    pub(crate) step_index: AtomicU32,
93    /// (codex P1 PR #636 round 4.) Stack of forward-step indices that
94    /// have completed successfully and are eligible to be undone by
95    /// the next `compensate` call. `dispatch` pushes on success;
96    /// `compensate` pops to determine which original forward step
97    /// it's reversing, and marks that step `compensated` in the log.
98    /// Without this, in-process compensation only writes new
99    /// `compensated` rows at fresh indices; the original `succeeded`
100    /// rows stay `succeeded`, so resume-on-restart re-replays them
101    /// and either no-ops or worse double-applies the inverse.
102    ///
103    /// `Mutex<Vec>` (rather than a lock-free counter) because saga
104    /// inner futures could in theory parallelize compensations; in
105    /// practice they're serial today, so contention is zero.
106    pub(crate) forward_step_stack: tokio::sync::Mutex<Vec<u32>>,
107}
108
109impl<'a> SagaCtx<'a> {
110    /// Construct a fresh context for a saga that has just allocated
111    /// its `saga_id` (via [`alloc_saga_id`]).
112    pub fn new(state: &'a AppState, saga_id: u64) -> Self {
113        Self {
114            state,
115            saga_id,
116            step_index: AtomicU32::new(0),
117            forward_step_stack: tokio::sync::Mutex::new(Vec::new()),
118        }
119    }
120
121    /// Saga-id this context belongs to. Used by sagas that need to
122    /// log progress with the saga prefix.
123    #[allow(dead_code)]
124    pub fn saga_id(&self) -> u64 {
125        self.saga_id
126    }
127
128    /// Acquire the reducer's state lock for read-only inspection.
129    /// Used by sagas that need to inspect post-step state to decide
130    /// the next step (e.g. RestoreTornOffTab checking whether the
131    /// source workspace is now empty before issuing the cascade
132    /// delete). Hold briefly — the reducer is single-mutex.
133    pub async fn state_lock(&self) -> tokio::sync::MutexGuard<'_, crate::state::State> {
134        self.state.srv_state.lock().await
135    }
136
137    /// Dispatch `cmd` through the srv reducer and apply the emitted
138    /// events to SQLite + the broadcast bus, exactly like the
139    /// in-handler reducer-dispatch helpers.
140    ///
141    /// Returns the emitted event vec on success. If the reducer
142    /// emits any `Event::Error`, the error message is returned and
143    /// SQLite/bus side-effects are skipped — the caller must then
144    /// dispatch compensation for the saga's already-applied steps.
145    pub async fn dispatch(&self, cmd: Command) -> Result<Vec<Event>, String> {
146        // Saga durability — write a `pending` step row before
147        // dispatch so a crash mid-dispatch leaves a recoverable
148        // breadcrumb (PR 2's compensate-on-restart will see it).
149        let idx = self.step_index.fetch_add(1, Ordering::Relaxed);
150        let step_name = command_discriminant_name(&cmd);
151        if let Err(e) = self
152            .state
153            .saga_log
154            .start_step(self.saga_id, idx, &step_name, &cmd)
155        {
156            // Log-write failure is non-fatal: the in-memory saga
157            // path is still authoritative for THIS srv run; we lose
158            // crash-recovery for this step, but the user's command
159            // shouldn't fail because durability hiccupped.
160            tracing::warn!(
161                saga_id = self.saga_id,
162                step_index = idx,
163                "[saga] start_step log write failed: {} — continuing without durable log for this step",
164                e
165            );
166        }
167
168        let events = crate::server::service::dispatch_to_reducer(self.state, cmd).await;
169        if let Some(message) = events.iter().find_map(|e| match e {
170            Event::Error { message, .. } => Some(message.clone()),
171            _ => None,
172        }) {
173            if let Err(e) = self.state.saga_log.fail_step(self.saga_id, idx, &message) {
174                tracing::warn!(
175                    saga_id = self.saga_id,
176                    step_index = idx,
177                    "[saga] fail_step log write failed: {}",
178                    e
179                );
180            }
181            return Err(message);
182        }
183        for ev in &events {
184            if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, &self.state.wstore)
185            {
186                // (reagent P1 PR #631 round 2) Mark the step as
187                // failed in the durable log BEFORE returning. Without
188                // this, the step row stays in `pending` state even
189                // though the reducer already applied the command
190                // (line 139); PR 2's compensate-on-restart sees a
191                // `pending` step and can't determine whether the
192                // command was applied.
193                let err_msg = e.to_string();
194                if let Err(log_err) =
195                    self.state.saga_log.fail_step(self.saga_id, idx, &err_msg)
196                {
197                    tracing::warn!(
198                        saga_id = self.saga_id,
199                        step_index = idx,
200                        "[saga] fail_step log write failed during wstore-apply error path: {}",
201                        log_err,
202                    );
203                }
204                return Err(err_msg);
205            }
206        }
207        if let Err(e) = self.state.saga_log.finish_step(self.saga_id, idx, &events) {
208            tracing::warn!(
209                saga_id = self.saga_id,
210                step_index = idx,
211                "[saga] finish_step log write failed: {}",
212                e
213            );
214        }
215        // (codex P1 PR #636 round 4.) Track this idx as a successful
216        // forward step eligible for compensation. The next
217        // `compensate` call will pop this and mark the original step
218        // `compensated`, preventing resume-on-restart from re-replaying
219        // an inverse that already ran in-process.
220        self.forward_step_stack.lock().await.push(idx);
221        crate::server::service::publish_events(self.state, &events);
222        Ok(events)
223    }
224
225    /// Best-effort compensating dispatch. Same as `dispatch` but
226    /// SQLite-write failures are logged and swallowed. Intended for
227    /// the unwind path: the saga is already returning an error to
228    /// the caller; throwing on cleanup hides the original cause and
229    /// prevents subsequent compensating commands from running.
230    pub async fn compensate(&self, cmd: Command) {
231        // Compensation gets its own step row so the durable log
232        // distinguishes "step that succeeded forward" from "step
233        // that ran in unwind". Index continues monotonically from
234        // forward steps so `--diag sagas` shows the full sequence.
235        let idx = self.step_index.fetch_add(1, Ordering::Relaxed);
236        let step_name = command_discriminant_name(&cmd);
237        if let Err(e) = self
238            .state
239            .saga_log
240            .start_step(self.saga_id, idx, &step_name, &cmd)
241        {
242            tracing::warn!(
243                saga_id = self.saga_id,
244                step_index = idx,
245                "[saga] compensate start_step log write failed: {}",
246                e
247            );
248        }
249        let events =
250            crate::server::service::dispatch_to_reducer(self.state, cmd.clone()).await;
251        if let Some(message) = events.iter().find_map(|e| match e {
252            Event::Error { message, .. } => Some(message.clone()),
253            _ => None,
254        }) {
255            tracing::warn!(
256                saga_id = self.saga_id,
257                "[saga] compensation rejected by reducer: {} (cmd discriminant: {:?})",
258                message,
259                std::mem::discriminant(&cmd),
260            );
261            if let Err(e) = self.state.saga_log.fail_step(self.saga_id, idx, &message) {
262                tracing::warn!(
263                    saga_id = self.saga_id,
264                    step_index = idx,
265                    "[saga] compensate fail_step log write failed: {}",
266                    e
267                );
268            }
269            return;
270        }
271        for ev in &events {
272            if let Err(e) =
273                crate::persist_subscriber::apply_event_to_wstore(ev, &self.state.wstore)
274            {
275                tracing::warn!(
276                    saga_id = self.saga_id,
277                    "[saga] compensation: SQLite write failed: {}",
278                    e
279                );
280            }
281        }
282        if let Err(e) = self
283            .state
284            .saga_log
285            .compensate_step(self.saga_id, idx, &events)
286        {
287            tracing::warn!(
288                saga_id = self.saga_id,
289                step_index = idx,
290                "[saga] compensate_step log write failed: {}",
291                e
292            );
293        }
294        // (codex P1 PR #636 round 4.) Pop the most-recent successful
295        // forward step from the stack and mark its original log row
296        // as compensated. This prevents resume-on-restart from
297        // double-replaying the inverse of a step that already had
298        // in-process compensation. Idempotent — UPDATE only matches
299        // rows still in `succeeded` state.
300        if let Some(forward_idx) = self.forward_step_stack.lock().await.pop() {
301            if let Err(e) = self
302                .state
303                .saga_log
304                .mark_step_compensated(self.saga_id, forward_idx)
305            {
306                tracing::warn!(
307                    saga_id = self.saga_id,
308                    forward_step_index = forward_idx,
309                    "[saga] mark_step_compensated (live) log write failed: {} — restart may re-replay this inverse",
310                    e
311                );
312            }
313        }
314        crate::server::service::publish_events(self.state, &events);
315    }
316}
317
318/// Allocate the next saga_id. Monotonic per srv-process run.
319pub fn alloc_saga_id(state: &AppState) -> u64 {
320    state.saga_id_alloc.fetch_add(1, Ordering::Relaxed) + 1
321}
322
323/// Emit `Event::SagaStarted` for a freshly-allocated saga_id.
324/// Sagas call this immediately after `alloc_saga_id` so subscribers
325/// see the start record before any per-step events.
326///
327/// Also writes a `running` row to the durable saga log (PR 1 of
328/// SPEC_SAGA_DURABILITY_2026-05-01.md), recording `input` as the
329/// saga's arguments serialized to JSON. PR 2's `compensate_unresolved`
330/// + `--diag sagas` rely on this for crash-recovery provenance, so
331/// callers should pass a structured representation of their inputs
332/// (typically `serde_json::json!({...})`). (reagent P1 PR #631 —
333/// `Value::Null` placeholder erased provenance.)
334///
335/// **Fail-fast on log error.** (codex P1 PR #631 round 2.) If
336/// `start_saga` fails — most likely a UNIQUE constraint violation
337/// from a saga_id collision — the saga MUST NOT proceed. Otherwise
338/// later `terminate()` calls would `UPDATE saga SET ... WHERE saga_id=?`
339/// against a *different run's* row, mixing lifecycle data across
340/// sagas and silently corrupting the durability log. Returning
341/// `Err` here propagates up to the caller, which records the
342/// failure via `emit_terminal` (with a fresh saga_id allocated by
343/// the caller's `alloc_saga_id` retry path, if any).
344pub async fn emit_saga_started(
345    state: &AppState,
346    saga_id: u64,
347    name: &'static str,
348    input: serde_json::Value,
349) -> Result<(), String> {
350    if let Err(e) = state.saga_log.start_saga(saga_id, name, &input) {
351        let msg = format!(
352            "saga durable start row insert failed for saga_id={}: {} (likely ID collision; refusing to run)",
353            saga_id, e
354        );
355        tracing::error!(
356            saga_id,
357            name,
358            "[saga] {} — aborting saga to avoid corrupting prior run's lifecycle row",
359            msg,
360        );
361        return Err(msg);
362    }
363    let v = state.srv_state.lock().await.bump_version();
364    let _ = state.srv_events_tx.send(Event::SagaStarted {
365        saga_id,
366        name: name.to_string(),
367        version: v,
368    });
369    Ok(())
370}
371
/// Terminal outcome a saga reports to [`emit_terminal`].
///
/// The variant decides both the renderer-facing lifecycle event and the
/// state written to the durable saga log, so picking the right one
/// matters for restart recovery.
///
/// (codex P1 PR #631) An earlier PR 1 revision recorded every `Err` as
/// compensated. That is wrong for timeout/abort paths: `run_saga` wraps
/// the inner future in `tokio::time::timeout`, and an expired timeout
/// cancels the future *before* its compensation block can run. Logging
/// such a saga as compensated would hide partially-applied state from
/// PR 2's `compensate_unresolved` resume scan — exactly the failure
/// mode the log exists to catch.
#[derive(Debug)]
pub enum SagaTerminal<'a> {
    /// Every step applied successfully.
    Completed,
    /// The saga unwound: its compensation block ran before it failed.
    /// `emit_terminal` additionally marks every step still logged as
    /// `succeeded` as compensated, so only report this when the saga
    /// really did undo its work.
    Compensated { reason: &'a str },
    /// The saga stopped without a known-complete unwind — timeout,
    /// panic converted to an error, early return before compensating,
    /// or any other case where compensation cannot be assumed to have
    /// run. This keeps the saga visible to PR 2's resume scan; it is the
    /// right choice whenever you are unsure.
    Failed { reason: &'a str },
}
400
401/// Emit the saga's terminal lifecycle event + durable log row.
402///
403/// Maps `SagaTerminal` to:
404/// - `Completed` → `Event::SagaCompleted` + log state `completed`.
405/// - `Compensated { reason }` → `Event::SagaFailed { reason }` + log state `compensated`.
406/// - `Failed { reason }` → `Event::SagaFailed { reason }` + log state `failed`.
407///
408/// The renderer-facing event is the same `SagaFailed` for both
409/// non-success paths (the renderer doesn't currently distinguish).
410/// The durable log distinguishes — PR 2's resume scan picks up
411/// `failed` rows where compensation may not have run.
412pub async fn emit_terminal(state: &AppState, saga_id: u64, terminal: SagaTerminal<'_>) {
413    let log_outcome = match &terminal {
414        SagaTerminal::Completed => SagaOutcome::Completed,
415        SagaTerminal::Compensated { reason } => SagaOutcome::Compensated {
416            reason: reason.to_string(),
417        },
418        SagaTerminal::Failed { reason } => SagaOutcome::Failed {
419            reason: reason.to_string(),
420        },
421    };
422    // (codex P1 PR #636 round 7 — reverted from round 6.)
423    // Bulk-mark only on Compensated. Round 6 extended to Failed too,
424    // but BOTH bots flagged that as data-loss: timeout/abort paths
425    // classify as Failed and never run compensation, but the bulk-
426    // mark would relabel forward steps as `compensated`, hiding
427    // them from recovery and leaving side effects permanently
428    // applied.
429    //
430    // Sagas that DO unwind via inner-future ctx.compensate calls
431    // should classify as Compensated (the per-step pop already
432    // marks 1:1; this bulk call catches residual 1:N cases like
433    // tear_off_block's single DeleteWorkspace undoing both
434    // CreateWorkspace + CreateTab). `classify_run_saga_result`
435    // maps non-timeout Err → Compensated to support this; timeouts
436    // → Failed so recovery picks up un-undone rows.
437    if matches!(terminal, SagaTerminal::Compensated { .. }) {
438        if let Err(e) = state.saga_log.mark_all_succeeded_steps_compensated(saga_id) {
439            tracing::warn!(
440                saga_id,
441                "[saga] mark_all_succeeded_steps_compensated failed: {} — restart may re-replay an inverse",
442                e
443            );
444        }
445    }
446    if let Err(e) = state.saga_log.terminate(saga_id, log_outcome) {
447        tracing::warn!(
448            saga_id,
449            "[saga] terminate log write failed: {} — saga lifecycle row will look 'running' to PR 2's resume scan, which will then compensate it",
450            e
451        );
452    }
453    let v = state.srv_state.lock().await.bump_version();
454    let event = match terminal {
455        SagaTerminal::Completed => Event::SagaCompleted {
456            saga_id,
457            version: v,
458        },
459        SagaTerminal::Compensated { reason } | SagaTerminal::Failed { reason } => {
460            Event::SagaFailed {
461                saga_id,
462                reason: reason.to_string(),
463                version: v,
464            }
465        }
466    };
467    let _ = state.srv_events_tx.send(event);
468}
469
470/// Convenience: classify the standard `run_saga` `Result<Value, String>`
471/// outcome into a `SagaTerminal`.
472///
473/// - `Ok(_)` → `Completed`.
474/// - `Err(_)` → `Failed`.
475///
476/// (codex P1 PR #631 round 2.) The earlier round mapped non-timeout
477/// `Err` to `Compensated` on the assumption that "our sagas drive
478/// compensation in their inner future before returning `Err`."
479/// That's true for the *forward* dispatch failures, but
480/// `SagaCtx::compensate` is **best-effort** — if a compensating
481/// dispatch is itself rejected by the reducer, `compensate` logs a
482/// warning and returns without signaling failure. Marking those as
483/// `Compensated` would hide partially-applied state from PR 2's
484/// restart recovery (which scans for `running`/`failed` to know what
485/// to compensate).
486///
487/// Conservative default: classify all errors as `Failed`. Sagas that
488/// can *prove* compensation succeeded (e.g. a future per-step
489/// compensation-success log) construct `SagaTerminal::Compensated`
490/// directly without going through this helper.
491pub fn classify_run_saga_result(result: &Result<serde_json::Value, String>) -> SagaTerminal<'_> {
492    match result {
493        Ok(_) => SagaTerminal::Completed,
494        // Timeouts/aborts: compensation never ran (run_saga's
495        // tokio::time::timeout cancels the inner future before it
496        // can compensate). Classify as Failed so recovery picks up
497        // the un-undone forward steps.
498        Err(reason) if reason.contains("timed out") => SagaTerminal::Failed { reason },
499        // Other Err: by convention, our sagas drive compensation
500        // in their inner future before returning Err (each
501        // ctx.compensate call already marked its target). Classify
502        // as Compensated so emit_terminal's bulk-mark cleans up any
503        // residual succeeded rows from 1:N compensation patterns
504        // (e.g. tear_off_block's single DeleteWorkspace undoing
505        // multiple CreateX steps). Sagas that abort without
506        // compensating should explicitly construct
507        // SagaTerminal::Failed instead of using this helper.
508        // (codex round 7 reversal of round 1's blanket-Failed.)
509        Err(reason) => SagaTerminal::Compensated { reason },
510    }
511}
512
513/// Run a saga's inner future under a 5 s timeout. The inner future
514/// is responsible for emitting `SagaStarted` (the saga itself, since
515/// it owns the saga_id allocation) and any compensation it needs;
516/// `run_saga` only enforces the timeout and emits the terminal
517/// `SagaCompleted` / `SagaFailed`.
518///
519/// Concrete usage (per saga):
520/// ```ignore
521/// pub async fn run(state: &AppState, ...) -> Result<Value, String> {
522///     let saga_id = alloc_saga_id(state);
523///     emit_saga_started(state, saga_id, "tear_off_tab", serde_json::json!({})).await;
524///     let ctx = SagaCtx::new(state, saga_id);
525///     let result = run_saga(run_inner(ctx, ...)).await;
526///     emit_terminal(state, saga_id, classify_run_saga_result(&result)).await;
527///     result
528/// }
529/// ```
530pub async fn run_saga<Fut>(name: &'static str, fut: Fut) -> Result<Value, String>
531where
532    Fut: std::future::Future<Output = Result<Value, String>>,
533{
534    match tokio::time::timeout(SAGA_TIMEOUT, fut).await {
535        Ok(r) => r,
536        Err(_) => Err(format!("saga '{}' timed out after {:?}", name, SAGA_TIMEOUT)),
537    }
538}