agentmux_launcher\saga/
recovery.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// LSD-3 — startup recovery walker for unresolved launcher sagas.
5//
6// Spec: `docs/specs/SPEC_LAUNCHER_SAGA_DURABILITY_2026-05-01.md` §3.5.
7// Companion plan: `docs/specs/SAGA_ARCHITECTURE_EXECUTION_PLAN_2026-05-01.md` §2 batch 3.
8//
9// Called from `main.rs::run_windows` AFTER opening `LauncherSagaLog`
10// and BEFORE spawning the saga coordinator. The walker:
11//   1. Reads `saga_log.unresolved_sagas()` (sagas in `running` /
12//      `compensating` / `failed` state — see LSD spec §3.3).
13//   2. For each unresolved saga, calls
14//      `saga_log.mark_failed_compensation(saga_id, ..reason..)` with
15//      a human-readable reason that names the prior state.
16//   3. Logs a `[saga-recovery]` warning per saga (id + name + state)
17//      and an info line with the total count.
18//
19// **Critical: we DO NOT auto-replay or auto-compensate launcher sagas.**
20// LSD spec §3.5 — launcher sagas drive host-side effects on live OS
21// state (pane reaps, pool drains) that may already be partially
22// applied. Re-issuing the forward command might double-act; deriving
23// an inverse without pre-state is unsound. The right escape hatch is
24// operator review via `--diag sagas`. The recovery walker's job is to
25// move the row from "in-flight at restart" to a clearly-marked
26// terminal state so the operator can see it and decide.
27//
28// Compare with srv's recovery (`agentmux-srv/src/sagas/recovery.rs`):
29// srv DOES drive inverse-command compensation through the reducer,
30// because srv saga effects are reducer-level state mutations the
31// reducer can undo. Launcher sagas don't have that property — they
32// dispatch real-world host commands. Hence the asymmetry.
33//
// The walker is idempotent: `mark_failed_compensation` is itself
// idempotent (LSD-1 PR), and `unresolved_sagas` only returns rows
// not already in `failed_compensation`. If the launcher crashes again
// mid-recovery, the next restart picks up only the rows that were not
// yet marked and marks them with the same reason string.
39
40use std::sync::Arc;
41
42use super::LauncherSagaLog;
43
44/// Walk all unresolved sagas in the durable log and mark each as
45/// `failed_compensation`. Returns the count of sagas touched.
46///
47/// Errors propagate from `LauncherSagaLog::unresolved_sagas` (read
48/// failure: corrupt SQLite, schema mismatch). The caller (`main.rs`)
49/// treats a walker failure as **non-fatal** — the launcher logs a
50/// WARN and continues. Rationale: the saga log is open (open()
51/// succeeded), so the schema is intact; a transient SELECT failure
52/// is best-surfaced as a launcher-log warning so the saga
53/// coordinator still spawns. Prior crashed sagas stay in `running`
54/// for one more restart cycle in that case (and get cleaned up
55/// next time). (reagent P2 PR #647 round 1: doc/contract sync.)
56///
57/// Per-saga `mark_failed_compensation` failures are logged but NOT
58/// fatal — the walker continues to subsequent sagas. Stopping on one
59/// row's write failure would leave later unresolved sagas in `running`
60/// when we could have cleaned them. Operators see the per-saga error
61/// in the launcher log.
62pub async fn compensate_unresolved_launcher_sagas(
63    saga_log: &Arc<LauncherSagaLog>,
64) -> Result<usize, String> {
65    let unresolved = saga_log
66        .unresolved_sagas()
67        .map_err(|e| format!("read unresolved sagas: {}", e))?;
68
69    if unresolved.is_empty() {
70        crate::log("[saga-recovery] no unresolved sagas at startup");
71        return Ok(0);
72    }
73
74    let total = unresolved.len();
75    crate::log(&format!(
76        "[saga-recovery] found {} unresolved saga(s) at startup — marking failed_compensation",
77        total
78    ));
79
80    let mut touched = 0usize;
81    for saga in &unresolved {
82        let reason = format!(
83            "launcher restarted while saga in state '{}'",
84            saga.state
85        );
86        crate::log(&format!(
87            "[saga-recovery] WARN saga {} ({}) was '{}' when launcher last exited; marking failed_compensation",
88            saga.saga_id, saga.name, saga.state
89        ));
90        match saga_log.mark_failed_compensation(saga.saga_id, &reason) {
91            Ok(()) => {
92                touched += 1;
93            }
94            Err(e) => {
95                // Best-effort: log + keep walking. A single SQLite
96                // write hiccup shouldn't block the rest of the cleanup.
97                crate::log(&format!(
98                    "[saga-recovery] WARN saga {} mark_failed_compensation failed: {} — leaving in '{}'",
99                    saga.saga_id, e, saga.state
100                ));
101            }
102        }
103    }
104
105    crate::log(&format!(
106        "[saga-recovery] processed {}/{} unresolved sagas",
107        touched, total
108    ));
109    Ok(touched)
110}
111
#[cfg(test)]
mod tests {
    use super::*;
    use crate::saga::PipeTarget;
    use agentmux_common::ipc::Command;

    /// Integration: simulate a "crashed" saga by writing a `running`
    /// row directly via `LauncherSagaLog`, then run the recovery walker.
    /// Asserts the saga is now `failed_compensation` and that its failure
    /// reason names the prior state.
    #[tokio::test]
    async fn recovery_walker_marks_running_saga_failed_compensation() {
        let log = Arc::new(LauncherSagaLog::open_in_memory().unwrap());
        // Crashed saga: started + one step dispatched, never terminated.
        log.start_saga(11, "window_cleanup_cascade", &serde_json::json!({"label": "win-3"}))
            .unwrap();
        log.start_step(
            11,
            0,
            "issue_cmd_host_reap_panes",
            PipeTarget::Host,
            &Command::Ping { nonce: 0 },
        )
        .unwrap();

        let touched = compensate_unresolved_launcher_sagas(&log).await.unwrap();
        assert_eq!(touched, 1);

        // Saga has moved to failed_compensation; no longer unresolved.
        let unresolved = log.unresolved_sagas().unwrap();
        assert!(
            unresolved.is_empty(),
            "saga 11 should no longer be unresolved, got: {:?}",
            unresolved
        );
        let snap = log.snapshot_recent(10).unwrap();
        let saga = snap.iter().find(|s| s.saga_id == 11).unwrap();
        assert_eq!(saga.state, "failed_compensation");
        assert!(
            saga.failure_reason
                .as_deref()
                .unwrap_or("")
                .contains("launcher restarted while saga in state 'running'"),
            "expected failure_reason to name the prior state, got: {:?}",
            saga.failure_reason
        );
    }

    /// Recovery on an empty log is a no-op and returns 0.
    #[tokio::test]
    async fn recovery_walker_empty_log_returns_zero() {
        let log = Arc::new(LauncherSagaLog::open_in_memory().unwrap());
        let touched = compensate_unresolved_launcher_sagas(&log).await.unwrap();
        assert_eq!(touched, 0);
    }

    /// The recovery walker is idempotent: calling it twice doesn't error,
    /// doesn't re-touch sagas already in `failed_compensation`, and leaves
    /// their terminal state intact. Mirrors srv's recovery idempotence
    /// guarantee.
    #[tokio::test]
    async fn recovery_walker_is_idempotent_across_repeated_calls() {
        let log = Arc::new(LauncherSagaLog::open_in_memory().unwrap());
        log.start_saga(7, "saga_a", &serde_json::json!({})).unwrap();

        let first = compensate_unresolved_launcher_sagas(&log).await.unwrap();
        assert_eq!(first, 1);

        // Second call: saga 7 is now failed_compensation, NOT unresolved,
        // so the walker touches 0.
        let second = compensate_unresolved_launcher_sagas(&log).await.unwrap();
        assert_eq!(second, 0);

        // The second pass must not have disturbed the terminal state.
        let snap = log.snapshot_recent(10).unwrap();
        let saga = snap.iter().find(|s| s.saga_id == 7).unwrap();
        assert_eq!(saga.state, "failed_compensation");
    }

    /// Multiple unresolved sagas: all get marked, and the return value is
    /// the total touched count.
    #[tokio::test]
    async fn recovery_walker_handles_multiple_unresolved_sagas() {
        let log = Arc::new(LauncherSagaLog::open_in_memory().unwrap());
        log.start_saga(1, "saga_a", &serde_json::json!({})).unwrap();
        log.start_saga(2, "saga_b", &serde_json::json!({"x": 2})).unwrap();
        log.start_saga(3, "saga_c", &serde_json::json!({})).unwrap();
        // Saga 3 is `failed`. It still surfaces via unresolved_sagas
        // (per LSD-1 + spec §3.5), so recovery upgrades it too.
        log.terminate_saga(
            3,
            super::super::log::SagaOutcome::Failed {
                reason: "evicted".into(),
            },
        )
        .unwrap();

        let touched = compensate_unresolved_launcher_sagas(&log).await.unwrap();
        assert_eq!(touched, 3);

        let unresolved = log.unresolved_sagas().unwrap();
        assert!(unresolved.is_empty(), "all sagas resolved post-walk");

        // Leaving the unresolved set is not enough. Every saga, including
        // the already-`failed` one, must have landed in `failed_compensation`.
        let snap = log.snapshot_recent(10).unwrap();
        for id in [1, 2, 3] {
            let saga = snap
                .iter()
                .find(|s| s.saga_id == id)
                .unwrap_or_else(|| panic!("saga {} missing from snapshot", id));
            assert_eq!(
                saga.state, "failed_compensation",
                "saga {} should be failed_compensation",
                id
            );
        }
    }
}