agentmux_launcher\saga/recovery.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// LSD-3 — startup recovery walker for unresolved launcher sagas.
5//
6// Spec: `docs/specs/SPEC_LAUNCHER_SAGA_DURABILITY_2026-05-01.md` §3.5.
7// Companion plan: `docs/specs/SAGA_ARCHITECTURE_EXECUTION_PLAN_2026-05-01.md` §2 batch 3.
8//
9// Called from `main.rs::run_windows` AFTER opening `LauncherSagaLog`
10// and BEFORE spawning the saga coordinator. The walker:
11// 1. Reads `saga_log.unresolved_sagas()` (sagas in `running` /
12// `compensating` / `failed` state — see LSD spec §3.3).
13// 2. For each unresolved saga, calls
14// `saga_log.mark_failed_compensation(saga_id, ..reason..)` with
15// a human-readable reason that names the prior state.
16// 3. Logs a `[saga-recovery]` warning per saga (id + name + state)
17// and an info line with the total count.
18//
19// **Critical: we DO NOT auto-replay or auto-compensate launcher sagas.**
20// LSD spec §3.5 — launcher sagas drive host-side effects on live OS
21// state (pane reaps, pool drains) that may already be partially
22// applied. Re-issuing the forward command might double-act; deriving
23// an inverse without pre-state is unsound. The right escape hatch is
24// operator review via `--diag sagas`. The recovery walker's job is to
25// move the row from "in-flight at restart" to a clearly-marked
26// terminal state so the operator can see it and decide.
27//
28// Compare with srv's recovery (`agentmux-srv/src/sagas/recovery.rs`):
29// srv DOES drive inverse-command compensation through the reducer,
30// because srv saga effects are reducer-level state mutations the
31// reducer can undo. Launcher sagas don't have that property — they
32// dispatch real-world host commands. Hence the asymmetry.
33//
// The walker is idempotent: `mark_failed_compensation` is itself
// idempotent (LSD-1 PR), and `unresolved_sagas` only returns rows
// not already in `failed_compensation`. A second crash mid-recovery
// leaves only the not-yet-marked rows unresolved; the next restart
// picks up exactly those rows and marks them with the same reason
// string (the reason is derived from the row's persisted state).
39
40use std::sync::Arc;
41
42use super::LauncherSagaLog;
43
44/// Walk all unresolved sagas in the durable log and mark each as
45/// `failed_compensation`. Returns the count of sagas touched.
46///
47/// Errors propagate from `LauncherSagaLog::unresolved_sagas` (read
48/// failure: corrupt SQLite, schema mismatch). The caller (`main.rs`)
49/// treats a walker failure as **non-fatal** — the launcher logs a
50/// WARN and continues. Rationale: the saga log is open (open()
51/// succeeded), so the schema is intact; a transient SELECT failure
52/// is best-surfaced as a launcher-log warning so the saga
53/// coordinator still spawns. Prior crashed sagas stay in `running`
54/// for one more restart cycle in that case (and get cleaned up
55/// next time). (reagent P2 PR #647 round 1: doc/contract sync.)
56///
57/// Per-saga `mark_failed_compensation` failures are logged but NOT
58/// fatal — the walker continues to subsequent sagas. Stopping on one
59/// row's write failure would leave later unresolved sagas in `running`
60/// when we could have cleaned them. Operators see the per-saga error
61/// in the launcher log.
62pub async fn compensate_unresolved_launcher_sagas(
63 saga_log: &Arc<LauncherSagaLog>,
64) -> Result<usize, String> {
65 let unresolved = saga_log
66 .unresolved_sagas()
67 .map_err(|e| format!("read unresolved sagas: {}", e))?;
68
69 if unresolved.is_empty() {
70 crate::log("[saga-recovery] no unresolved sagas at startup");
71 return Ok(0);
72 }
73
74 let total = unresolved.len();
75 crate::log(&format!(
76 "[saga-recovery] found {} unresolved saga(s) at startup — marking failed_compensation",
77 total
78 ));
79
80 let mut touched = 0usize;
81 for saga in &unresolved {
82 let reason = format!(
83 "launcher restarted while saga in state '{}'",
84 saga.state
85 );
86 crate::log(&format!(
87 "[saga-recovery] WARN saga {} ({}) was '{}' when launcher last exited; marking failed_compensation",
88 saga.saga_id, saga.name, saga.state
89 ));
90 match saga_log.mark_failed_compensation(saga.saga_id, &reason) {
91 Ok(()) => {
92 touched += 1;
93 }
94 Err(e) => {
95 // Best-effort: log + keep walking. A single SQLite
96 // write hiccup shouldn't block the rest of the cleanup.
97 crate::log(&format!(
98 "[saga-recovery] WARN saga {} mark_failed_compensation failed: {} — leaving in '{}'",
99 saga.saga_id, e, saga.state
100 ));
101 }
102 }
103 }
104
105 crate::log(&format!(
106 "[saga-recovery] processed {}/{} unresolved sagas",
107 touched, total
108 ));
109 Ok(touched)
110}
111
#[cfg(test)]
mod tests {
    use super::super::log::SagaOutcome;
    use super::*;
    use crate::saga::PipeTarget;
    use agentmux_common::ipc::Command;

    /// Integration: simulate a "crashed" saga by writing a `running`
    /// row directly via `LauncherSagaLog`. Run the recovery walker.
    /// Assert the saga is now `failed_compensation` and that the
    /// persisted reason names the prior state.
    #[tokio::test]
    async fn recovery_walker_marks_running_saga_failed_compensation() {
        let log = Arc::new(LauncherSagaLog::open_in_memory().unwrap());
        // Crashed saga: started + one step dispatched, never terminated.
        log.start_saga(11, "window_cleanup_cascade", &serde_json::json!({"label": "win-3"}))
            .unwrap();
        log.start_step(
            11,
            0,
            "issue_cmd_host_reap_panes",
            PipeTarget::Host,
            &Command::Ping { nonce: 0 },
        )
        .unwrap();

        let touched = compensate_unresolved_launcher_sagas(&log).await.unwrap();
        assert_eq!(touched, 1);

        // Saga has moved to failed_compensation; no longer unresolved.
        let unresolved = log.unresolved_sagas().unwrap();
        assert!(
            unresolved.is_empty(),
            "saga 11 should no longer be unresolved, got: {:?}",
            unresolved
        );
        let snap = log.snapshot_recent(10).unwrap();
        let saga = snap
            .iter()
            .find(|s| s.saga_id == 11)
            .expect("saga 11 should appear in the recent snapshot");
        assert_eq!(saga.state, "failed_compensation");
        assert!(
            saga.failure_reason
                .as_deref()
                .unwrap_or("")
                .contains("launcher restarted while saga in state 'running'"),
            "expected failure_reason to name the prior state, got: {:?}",
            saga.failure_reason
        );
    }

    /// Recovery on an empty log is a no-op + returns 0.
    #[tokio::test]
    async fn recovery_walker_empty_log_returns_zero() {
        let log = Arc::new(LauncherSagaLog::open_in_memory().unwrap());
        let touched = compensate_unresolved_launcher_sagas(&log).await.unwrap();
        assert_eq!(touched, 0);
    }

    /// Recovery walker is idempotent: calling it twice doesn't error,
    /// doesn't re-touch sagas already in `failed_compensation`, and
    /// leaves them in that terminal state.
    /// Mirrors srv's recovery idempotence guarantee.
    #[tokio::test]
    async fn recovery_walker_is_idempotent_across_repeated_calls() {
        let log = Arc::new(LauncherSagaLog::open_in_memory().unwrap());
        log.start_saga(7, "saga_a", &serde_json::json!({})).unwrap();

        let first = compensate_unresolved_launcher_sagas(&log).await.unwrap();
        assert_eq!(first, 1);

        // Second call: saga 7 is now failed_compensation, NOT
        // unresolved, so the walker touches 0.
        let second = compensate_unresolved_launcher_sagas(&log).await.unwrap();
        assert_eq!(second, 0);

        // The second pass must not have disturbed the terminal state.
        let snap = log.snapshot_recent(10).unwrap();
        let saga = snap
            .iter()
            .find(|s| s.saga_id == 7)
            .expect("saga 7 should appear in the recent snapshot");
        assert_eq!(saga.state, "failed_compensation");
    }

    /// Multiple unresolved sagas → all get marked `failed_compensation`,
    /// and the return value is the total touched count.
    #[tokio::test]
    async fn recovery_walker_handles_multiple_unresolved_sagas() {
        let log = Arc::new(LauncherSagaLog::open_in_memory().unwrap());
        log.start_saga(1, "saga_a", &serde_json::json!({})).unwrap();
        log.start_saga(2, "saga_b", &serde_json::json!({"x": 2})).unwrap();
        log.start_saga(3, "saga_c", &serde_json::json!({})).unwrap();
        // Saga 3 is also failed (still surfaces via unresolved_sagas
        // per LSD-1 + spec §3.5; recovery upgrades it too).
        log.terminate_saga(
            3,
            SagaOutcome::Failed {
                reason: "evicted".into(),
            },
        )
        .unwrap();

        let touched = compensate_unresolved_launcher_sagas(&log).await.unwrap();
        assert_eq!(touched, 3);

        let unresolved = log.unresolved_sagas().unwrap();
        assert!(unresolved.is_empty(), "all sagas resolved post-walk");

        // "Resolved" must mean "marked failed_compensation", not merely
        // absent from the unresolved query.
        let snap = log.snapshot_recent(10).unwrap();
        for id in 1..=3 {
            let saga = snap
                .iter()
                .find(|s| s.saga_id == id)
                .unwrap_or_else(|| panic!("saga {} missing from recent snapshot", id));
            assert_eq!(
                saga.state, "failed_compensation",
                "saga {} should be failed_compensation after recovery",
                id
            );
        }
    }
}