agentmux_srv\sagas/mod.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase E.5.5 — srv-side saga coordinator.
5//
6// **Why srv, not launcher:** the existing E.1a coordinator
7// framework lives in `agentmux-launcher::saga` (which still does
8// nothing — no consumers). The original Phase E spec assumed
9// sagas would fan out across host/launcher/srv via cross-process
10// IPC; the actual implementation kept that fan-out in the frontend
11// (`requestTearOff` calls srv-rpc and host-rpc directly), so every
12// saga in the E.5 plan mutates only srv state. In-process oneshot
13// dispatch beats an IPC round-trip on every saga step. See
14// `docs/retro/saga-coordinator-location-analysis-2026-04-30.md` for
15// the full reasoning, including the robustness trade-offs (which
16// are the same for both placements).
17//
18// **Shape:** sagas are async functions that:
19// 1. allocate a fresh saga_id via `alloc_saga_id`,
20// 2. emit `Event::SagaStarted` via `emit_saga_started`,
21// 3. drive their state machine via `SagaCtx::dispatch` /
22// `SagaCtx::compensate`,
23// 4. emit `Event::SagaCompleted` or `Event::SagaFailed` via
24// `emit_terminal` once the inner work returns.
25//
// `run_saga(name, future)` only applies the 5 s timeout; steps 1, 2
// and 4 are performed by each saga around its `run_saga` call. Sagas
// pass the future directly (not a closure), avoiding the
// lifetime-of-SagaCtx complication that closure-style coordinators
// run into.
30//
31// **Compensation:** the saga's inner future is responsible for
32// driving compensation before returning `Err`. `SagaCtx::compensate`
33// is a best-effort dispatch that swallows errors (the saga is
34// already failing; secondary failures get logged). Idempotency of
35// compensating commands (`MoveTab` back to source, `DeleteWorkspace`,
36// etc.) keeps the cleanup safe even if a step partially applied.
37//
38// What this module does NOT close (per the location analysis §4.2):
39// * Per-step SQLite transactions in the subscriber (gap; F1.A).
40// * Host pool-promote and renderer registration outside the saga
41// (gap; Phase F).
42// * Saga state across srv restart (gap; Phase F+).
43
44pub mod delete_block;
45pub mod delete_tab;
46pub mod delete_workspace;
47pub mod log;
48pub mod promote_block_to_tab;
49pub mod recovery;
50pub mod restore_torn_off_tab;
51pub mod tear_off_block;
52pub mod tear_off_tab;
53
54// Step 7 — E.7 integration tests. Cross-saga end-to-end coverage
55// that exercises reducer + saga coordinator + persist subscriber +
56// saga log together against a real `AppState` (in-memory wstore +
57// sagalog). Per-saga unit tests under each saga module already cover
58// happy + reject paths in isolation; this module focuses on
59// multi-surface consistency (reducer/wstore/saga-log) that PR 2's
60// `compensate_unresolved` will rely on.
61#[cfg(test)]
62mod integration_tests;
63
64use std::sync::atomic::{AtomicU32, Ordering};
65
66use agentmux_common::ipc::{Command, Event};
67use serde_json::Value;
68
69use crate::sagas::log::{command_discriminant_name, SagaOutcome};
70use crate::server::AppState;
71
/// Wall-clock budget for a single saga run; `run_saga` force-fails any
/// saga whose inner future is still pending once it elapses.
///
/// Tear-off sagas normally finish in tens of milliseconds, so five
/// seconds is deliberately generous: it absorbs SQLite write spikes
/// without making CI flaky.
const SAGA_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(5_000);
77
78/// Read-only context passed to a saga's inner async function.
79/// Wraps the AppState handle and the saga's allocated id.
80///
81/// Construct via [`SagaCtx::new`] — the durability log requires the
82/// per-step counter to start at zero and be owned by the ctx (so
83/// concurrent sagas don't interleave step indices).
84pub struct SagaCtx<'a> {
85 pub(crate) state: &'a AppState,
86 pub(crate) saga_id: u64,
87 /// Monotonic step index (0, 1, 2, ...) for this saga. Each
88 /// `dispatch` / `compensate` call `fetch_add(1)`s and writes the
89 /// resulting index into the saga log. Atomic because saga inner
90 /// futures may parallelise dispatches in the future (today they
91 /// don't, but the cost is one cache line).
92 pub(crate) step_index: AtomicU32,
93 /// (codex P1 PR #636 round 4.) Stack of forward-step indices that
94 /// have completed successfully and are eligible to be undone by
95 /// the next `compensate` call. `dispatch` pushes on success;
96 /// `compensate` pops to determine which original forward step
97 /// it's reversing, and marks that step `compensated` in the log.
98 /// Without this, in-process compensation only writes new
99 /// `compensated` rows at fresh indices; the original `succeeded`
100 /// rows stay `succeeded`, so resume-on-restart re-replays them
101 /// and either no-ops or worse double-applies the inverse.
102 ///
103 /// `Mutex<Vec>` (rather than a lock-free counter) because saga
104 /// inner futures could in theory parallelize compensations; in
105 /// practice they're serial today, so contention is zero.
106 pub(crate) forward_step_stack: tokio::sync::Mutex<Vec<u32>>,
107}
108
109impl<'a> SagaCtx<'a> {
110 /// Construct a fresh context for a saga that has just allocated
111 /// its `saga_id` (via [`alloc_saga_id`]).
112 pub fn new(state: &'a AppState, saga_id: u64) -> Self {
113 Self {
114 state,
115 saga_id,
116 step_index: AtomicU32::new(0),
117 forward_step_stack: tokio::sync::Mutex::new(Vec::new()),
118 }
119 }
120
121 /// Saga-id this context belongs to. Used by sagas that need to
122 /// log progress with the saga prefix.
123 #[allow(dead_code)]
124 pub fn saga_id(&self) -> u64 {
125 self.saga_id
126 }
127
128 /// Acquire the reducer's state lock for read-only inspection.
129 /// Used by sagas that need to inspect post-step state to decide
130 /// the next step (e.g. RestoreTornOffTab checking whether the
131 /// source workspace is now empty before issuing the cascade
132 /// delete). Hold briefly — the reducer is single-mutex.
133 pub async fn state_lock(&self) -> tokio::sync::MutexGuard<'_, crate::state::State> {
134 self.state.srv_state.lock().await
135 }
136
137 /// Dispatch `cmd` through the srv reducer and apply the emitted
138 /// events to SQLite + the broadcast bus, exactly like the
139 /// in-handler reducer-dispatch helpers.
140 ///
141 /// Returns the emitted event vec on success. If the reducer
142 /// emits any `Event::Error`, the error message is returned and
143 /// SQLite/bus side-effects are skipped — the caller must then
144 /// dispatch compensation for the saga's already-applied steps.
145 pub async fn dispatch(&self, cmd: Command) -> Result<Vec<Event>, String> {
146 // Saga durability — write a `pending` step row before
147 // dispatch so a crash mid-dispatch leaves a recoverable
148 // breadcrumb (PR 2's compensate-on-restart will see it).
149 let idx = self.step_index.fetch_add(1, Ordering::Relaxed);
150 let step_name = command_discriminant_name(&cmd);
151 if let Err(e) = self
152 .state
153 .saga_log
154 .start_step(self.saga_id, idx, &step_name, &cmd)
155 {
156 // Log-write failure is non-fatal: the in-memory saga
157 // path is still authoritative for THIS srv run; we lose
158 // crash-recovery for this step, but the user's command
159 // shouldn't fail because durability hiccupped.
160 tracing::warn!(
161 saga_id = self.saga_id,
162 step_index = idx,
163 "[saga] start_step log write failed: {} — continuing without durable log for this step",
164 e
165 );
166 }
167
168 let events = crate::server::service::dispatch_to_reducer(self.state, cmd).await;
169 if let Some(message) = events.iter().find_map(|e| match e {
170 Event::Error { message, .. } => Some(message.clone()),
171 _ => None,
172 }) {
173 if let Err(e) = self.state.saga_log.fail_step(self.saga_id, idx, &message) {
174 tracing::warn!(
175 saga_id = self.saga_id,
176 step_index = idx,
177 "[saga] fail_step log write failed: {}",
178 e
179 );
180 }
181 return Err(message);
182 }
183 for ev in &events {
184 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, &self.state.wstore)
185 {
186 // (reagent P1 PR #631 round 2) Mark the step as
187 // failed in the durable log BEFORE returning. Without
188 // this, the step row stays in `pending` state even
189 // though the reducer already applied the command
190 // (line 139); PR 2's compensate-on-restart sees a
191 // `pending` step and can't determine whether the
192 // command was applied.
193 let err_msg = e.to_string();
194 if let Err(log_err) =
195 self.state.saga_log.fail_step(self.saga_id, idx, &err_msg)
196 {
197 tracing::warn!(
198 saga_id = self.saga_id,
199 step_index = idx,
200 "[saga] fail_step log write failed during wstore-apply error path: {}",
201 log_err,
202 );
203 }
204 return Err(err_msg);
205 }
206 }
207 if let Err(e) = self.state.saga_log.finish_step(self.saga_id, idx, &events) {
208 tracing::warn!(
209 saga_id = self.saga_id,
210 step_index = idx,
211 "[saga] finish_step log write failed: {}",
212 e
213 );
214 }
215 // (codex P1 PR #636 round 4.) Track this idx as a successful
216 // forward step eligible for compensation. The next
217 // `compensate` call will pop this and mark the original step
218 // `compensated`, preventing resume-on-restart from re-replaying
219 // an inverse that already ran in-process.
220 self.forward_step_stack.lock().await.push(idx);
221 crate::server::service::publish_events(self.state, &events);
222 Ok(events)
223 }
224
225 /// Best-effort compensating dispatch. Same as `dispatch` but
226 /// SQLite-write failures are logged and swallowed. Intended for
227 /// the unwind path: the saga is already returning an error to
228 /// the caller; throwing on cleanup hides the original cause and
229 /// prevents subsequent compensating commands from running.
230 pub async fn compensate(&self, cmd: Command) {
231 // Compensation gets its own step row so the durable log
232 // distinguishes "step that succeeded forward" from "step
233 // that ran in unwind". Index continues monotonically from
234 // forward steps so `--diag sagas` shows the full sequence.
235 let idx = self.step_index.fetch_add(1, Ordering::Relaxed);
236 let step_name = command_discriminant_name(&cmd);
237 if let Err(e) = self
238 .state
239 .saga_log
240 .start_step(self.saga_id, idx, &step_name, &cmd)
241 {
242 tracing::warn!(
243 saga_id = self.saga_id,
244 step_index = idx,
245 "[saga] compensate start_step log write failed: {}",
246 e
247 );
248 }
249 let events =
250 crate::server::service::dispatch_to_reducer(self.state, cmd.clone()).await;
251 if let Some(message) = events.iter().find_map(|e| match e {
252 Event::Error { message, .. } => Some(message.clone()),
253 _ => None,
254 }) {
255 tracing::warn!(
256 saga_id = self.saga_id,
257 "[saga] compensation rejected by reducer: {} (cmd discriminant: {:?})",
258 message,
259 std::mem::discriminant(&cmd),
260 );
261 if let Err(e) = self.state.saga_log.fail_step(self.saga_id, idx, &message) {
262 tracing::warn!(
263 saga_id = self.saga_id,
264 step_index = idx,
265 "[saga] compensate fail_step log write failed: {}",
266 e
267 );
268 }
269 return;
270 }
271 for ev in &events {
272 if let Err(e) =
273 crate::persist_subscriber::apply_event_to_wstore(ev, &self.state.wstore)
274 {
275 tracing::warn!(
276 saga_id = self.saga_id,
277 "[saga] compensation: SQLite write failed: {}",
278 e
279 );
280 }
281 }
282 if let Err(e) = self
283 .state
284 .saga_log
285 .compensate_step(self.saga_id, idx, &events)
286 {
287 tracing::warn!(
288 saga_id = self.saga_id,
289 step_index = idx,
290 "[saga] compensate_step log write failed: {}",
291 e
292 );
293 }
294 // (codex P1 PR #636 round 4.) Pop the most-recent successful
295 // forward step from the stack and mark its original log row
296 // as compensated. This prevents resume-on-restart from
297 // double-replaying the inverse of a step that already had
298 // in-process compensation. Idempotent — UPDATE only matches
299 // rows still in `succeeded` state.
300 if let Some(forward_idx) = self.forward_step_stack.lock().await.pop() {
301 if let Err(e) = self
302 .state
303 .saga_log
304 .mark_step_compensated(self.saga_id, forward_idx)
305 {
306 tracing::warn!(
307 saga_id = self.saga_id,
308 forward_step_index = forward_idx,
309 "[saga] mark_step_compensated (live) log write failed: {} — restart may re-replay this inverse",
310 e
311 );
312 }
313 }
314 crate::server::service::publish_events(self.state, &events);
315 }
316}
317
318/// Allocate the next saga_id. Monotonic per srv-process run.
319pub fn alloc_saga_id(state: &AppState) -> u64 {
320 state.saga_id_alloc.fetch_add(1, Ordering::Relaxed) + 1
321}
322
323/// Emit `Event::SagaStarted` for a freshly-allocated saga_id.
324/// Sagas call this immediately after `alloc_saga_id` so subscribers
325/// see the start record before any per-step events.
326///
327/// Also writes a `running` row to the durable saga log (PR 1 of
328/// SPEC_SAGA_DURABILITY_2026-05-01.md), recording `input` as the
329/// saga's arguments serialized to JSON. PR 2's `compensate_unresolved`
330/// + `--diag sagas` rely on this for crash-recovery provenance, so
331/// callers should pass a structured representation of their inputs
332/// (typically `serde_json::json!({...})`). (reagent P1 PR #631 —
333/// `Value::Null` placeholder erased provenance.)
334///
335/// **Fail-fast on log error.** (codex P1 PR #631 round 2.) If
336/// `start_saga` fails — most likely a UNIQUE constraint violation
337/// from a saga_id collision — the saga MUST NOT proceed. Otherwise
338/// later `terminate()` calls would `UPDATE saga SET ... WHERE saga_id=?`
339/// against a *different run's* row, mixing lifecycle data across
340/// sagas and silently corrupting the durability log. Returning
341/// `Err` here propagates up to the caller, which records the
342/// failure via `emit_terminal` (with a fresh saga_id allocated by
343/// the caller's `alloc_saga_id` retry path, if any).
344pub async fn emit_saga_started(
345 state: &AppState,
346 saga_id: u64,
347 name: &'static str,
348 input: serde_json::Value,
349) -> Result<(), String> {
350 if let Err(e) = state.saga_log.start_saga(saga_id, name, &input) {
351 let msg = format!(
352 "saga durable start row insert failed for saga_id={}: {} (likely ID collision; refusing to run)",
353 saga_id, e
354 );
355 tracing::error!(
356 saga_id,
357 name,
358 "[saga] {} — aborting saga to avoid corrupting prior run's lifecycle row",
359 msg,
360 );
361 return Err(msg);
362 }
363 let v = state.srv_state.lock().await.bump_version();
364 let _ = state.srv_events_tx.send(Event::SagaStarted {
365 saga_id,
366 name: name.to_string(),
367 version: v,
368 });
369 Ok(())
370}
371
/// Outcome a saga's inner future hands back to [`emit_terminal`].
///
/// (codex P1 PR #631) The original PR 1 implementation mapped every `Err`
/// to `SagaOutcome::Compensated`. That is wrong for timeout/abort paths:
/// `run_saga` wraps the inner future in `tokio::time::timeout`, and a
/// timeout cancels the future *before* it can run its compensation block.
/// Recording "compensated" when nothing was compensated would hide
/// partially-applied state from PR 2's `compensate_unresolved` resume
/// scan — exactly the failure mode this log exists to catch.
///
/// `Compensated` should only be recorded when compensation ran. Everything
/// else records `Failed`, which leaves the saga visible to PR 2's resume
/// scan. That covers timeouts, panics converted to errors, and early
/// returns before compensation.
///
/// The variants only borrow their `reason`, so the enum is cheap to copy.
/// `PartialEq`/`Eq` let callers and tests compare classifications
/// directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SagaTerminal<'a> {
    /// All steps applied successfully.
    Completed,
    /// Compensation block ran to completion. Intended for callers that know
    /// every compensating dispatch was accepted.
    ///
    /// Note that `classify_run_saga_result` also produces this variant for
    /// any non-timeout `Err`. It relies on the convention that sagas
    /// compensate in their inner future before returning `Err`.
    Compensated { reason: &'a str },
    /// Saga aborted before or during compensation, or by any other path
    /// where compensation can't be assumed to have run. Examples: timeout,
    /// panic, early return before compensation. This is the default for
    /// the "I don't know if compensation completed" case.
    Failed { reason: &'a str },
}
400
401/// Emit the saga's terminal lifecycle event + durable log row.
402///
403/// Maps `SagaTerminal` to:
404/// - `Completed` → `Event::SagaCompleted` + log state `completed`.
405/// - `Compensated { reason }` → `Event::SagaFailed { reason }` + log state `compensated`.
406/// - `Failed { reason }` → `Event::SagaFailed { reason }` + log state `failed`.
407///
408/// The renderer-facing event is the same `SagaFailed` for both
409/// non-success paths (the renderer doesn't currently distinguish).
410/// The durable log distinguishes — PR 2's resume scan picks up
411/// `failed` rows where compensation may not have run.
412pub async fn emit_terminal(state: &AppState, saga_id: u64, terminal: SagaTerminal<'_>) {
413 let log_outcome = match &terminal {
414 SagaTerminal::Completed => SagaOutcome::Completed,
415 SagaTerminal::Compensated { reason } => SagaOutcome::Compensated {
416 reason: reason.to_string(),
417 },
418 SagaTerminal::Failed { reason } => SagaOutcome::Failed {
419 reason: reason.to_string(),
420 },
421 };
422 // (codex P1 PR #636 round 7 — reverted from round 6.)
423 // Bulk-mark only on Compensated. Round 6 extended to Failed too,
424 // but BOTH bots flagged that as data-loss: timeout/abort paths
425 // classify as Failed and never run compensation, but the bulk-
426 // mark would relabel forward steps as `compensated`, hiding
427 // them from recovery and leaving side effects permanently
428 // applied.
429 //
430 // Sagas that DO unwind via inner-future ctx.compensate calls
431 // should classify as Compensated (the per-step pop already
432 // marks 1:1; this bulk call catches residual 1:N cases like
433 // tear_off_block's single DeleteWorkspace undoing both
434 // CreateWorkspace + CreateTab). `classify_run_saga_result`
435 // maps non-timeout Err → Compensated to support this; timeouts
436 // → Failed so recovery picks up un-undone rows.
437 if matches!(terminal, SagaTerminal::Compensated { .. }) {
438 if let Err(e) = state.saga_log.mark_all_succeeded_steps_compensated(saga_id) {
439 tracing::warn!(
440 saga_id,
441 "[saga] mark_all_succeeded_steps_compensated failed: {} — restart may re-replay an inverse",
442 e
443 );
444 }
445 }
446 if let Err(e) = state.saga_log.terminate(saga_id, log_outcome) {
447 tracing::warn!(
448 saga_id,
449 "[saga] terminate log write failed: {} — saga lifecycle row will look 'running' to PR 2's resume scan, which will then compensate it",
450 e
451 );
452 }
453 let v = state.srv_state.lock().await.bump_version();
454 let event = match terminal {
455 SagaTerminal::Completed => Event::SagaCompleted {
456 saga_id,
457 version: v,
458 },
459 SagaTerminal::Compensated { reason } | SagaTerminal::Failed { reason } => {
460 Event::SagaFailed {
461 saga_id,
462 reason: reason.to_string(),
463 version: v,
464 }
465 }
466 };
467 let _ = state.srv_events_tx.send(event);
468}
469
470/// Convenience: classify the standard `run_saga` `Result<Value, String>`
471/// outcome into a `SagaTerminal`.
472///
473/// - `Ok(_)` → `Completed`.
474/// - `Err(_)` → `Failed`.
475///
476/// (codex P1 PR #631 round 2.) The earlier round mapped non-timeout
477/// `Err` to `Compensated` on the assumption that "our sagas drive
478/// compensation in their inner future before returning `Err`."
479/// That's true for the *forward* dispatch failures, but
480/// `SagaCtx::compensate` is **best-effort** — if a compensating
481/// dispatch is itself rejected by the reducer, `compensate` logs a
482/// warning and returns without signaling failure. Marking those as
483/// `Compensated` would hide partially-applied state from PR 2's
484/// restart recovery (which scans for `running`/`failed` to know what
485/// to compensate).
486///
487/// Conservative default: classify all errors as `Failed`. Sagas that
488/// can *prove* compensation succeeded (e.g. a future per-step
489/// compensation-success log) construct `SagaTerminal::Compensated`
490/// directly without going through this helper.
491pub fn classify_run_saga_result(result: &Result<serde_json::Value, String>) -> SagaTerminal<'_> {
492 match result {
493 Ok(_) => SagaTerminal::Completed,
494 // Timeouts/aborts: compensation never ran (run_saga's
495 // tokio::time::timeout cancels the inner future before it
496 // can compensate). Classify as Failed so recovery picks up
497 // the un-undone forward steps.
498 Err(reason) if reason.contains("timed out") => SagaTerminal::Failed { reason },
499 // Other Err: by convention, our sagas drive compensation
500 // in their inner future before returning Err (each
501 // ctx.compensate call already marked its target). Classify
502 // as Compensated so emit_terminal's bulk-mark cleans up any
503 // residual succeeded rows from 1:N compensation patterns
504 // (e.g. tear_off_block's single DeleteWorkspace undoing
505 // multiple CreateX steps). Sagas that abort without
506 // compensating should explicitly construct
507 // SagaTerminal::Failed instead of using this helper.
508 // (codex round 7 reversal of round 1's blanket-Failed.)
509 Err(reason) => SagaTerminal::Compensated { reason },
510 }
511}
512
513/// Run a saga's inner future under a 5 s timeout. The inner future
514/// is responsible for emitting `SagaStarted` (the saga itself, since
515/// it owns the saga_id allocation) and any compensation it needs;
516/// `run_saga` only enforces the timeout and emits the terminal
517/// `SagaCompleted` / `SagaFailed`.
518///
519/// Concrete usage (per saga):
520/// ```ignore
521/// pub async fn run(state: &AppState, ...) -> Result<Value, String> {
522/// let saga_id = alloc_saga_id(state);
523/// emit_saga_started(state, saga_id, "tear_off_tab", serde_json::json!({})).await;
524/// let ctx = SagaCtx::new(state, saga_id);
525/// let result = run_saga(run_inner(ctx, ...)).await;
526/// emit_terminal(state, saga_id, classify_run_saga_result(&result)).await;
527/// result
528/// }
529/// ```
530pub async fn run_saga<Fut>(name: &'static str, fut: Fut) -> Result<Value, String>
531where
532 Fut: std::future::Future<Output = Result<Value, String>>,
533{
534 match tokio::time::timeout(SAGA_TIMEOUT, fut).await {
535 Ok(r) => r,
536 Err(_) => Err(format!("saga '{}' timed out after {:?}", name, SAGA_TIMEOUT)),
537 }
538}