agentmux_launcher\saga\log/mod.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// LSD-1 — durable launcher saga log + API.
5//
6// Spec: `docs/specs/SPEC_LAUNCHER_SAGA_DURABILITY_2026-05-01.md`
7// - §3.1 storage (separate SQLite file at
8// `<data-dir>/db/launcher-sagas.db`, WAL + 5s busy timeout +
9// foreign_keys=ON; same configuration as srv saga log. Pre-
10// AUDIT_SQLITE_SYSTEMS_2026_05_19.md the file lived directly
11// in `<data-dir>/`; a back-compat migration in
12// `data_dir::launcher_saga_log_path` moves the legacy file
13// into `db/` on first launch.)
14// - §3.2 schema (see `schema.rs`)
15// - §3.3 API surface (this module)
16// - §4 PR1 scope: log exists in isolation, NO coordinator wiring
17//
18// Design parallels to `agentmux-srv/src/sagas/log.rs`. Method names
19// match where shape allows (`open`, `start_saga`, `terminate_saga`,
20// `start_step`, `finish_step`, `fail_step`, `unresolved_sagas`,
21// `mark_failed_compensation`, `snapshot_recent`, `max_saga_id`) so
22// anyone reading both feels at home (LSD spec §3.3 last paragraph).
23//
24// Differences from srv's `SagaLog` driven by LSD spec §3.2:
25// - `target` column on the step table — launcher sagas dispatch to
26// self / host / srv; the column carries the `PipeTarget` so
27// `--diag sagas` can show "where did this step go?"
28// - No `compensated` saga state — F.5/F.6 sagas don't auto-compensate
29// (LSD spec §3.5). `failed_compensation` is the recovery-marked
30// terminal state for unresolved sagas at startup (PR LSD-3 wires
31// the recovery walker; this PR just defines the row).
32// - Timestamps are RFC3339 TEXT instead of epoch-ms INTEGER. Same
33// storage cost; greppable in raw SQLite shells. Conversion happens
34// in `now_rfc3339()` below.
35// - `vacuum_older_than(cutoff)` API is NEW relative to srv's log
36// (LSD spec §3.6 retention; srv doesn't ship this yet).
37//
38// Concurrency: a single `Mutex<Connection>` serializes writes. Each
39// `start_step` / `finish_step` call holds the lock for <1ms; launcher
40// saga rate is ≤ a few per second (LSD spec §3.7 — F.5/F.6 fire on
41// rare user-initiated triggers). No connection pool needed.
42//
43// **PR LSD-1 is foundations-only.** The saga coordinator does NOT
44// call any of these methods yet; LSD-2 wires them in. The module is
45// declared on the saga tree (via `mod log;` in `saga/mod.rs`) and
46// re-exports `LauncherSagaLog` so coordinator code can pick it up
47// later without further plumbing changes.
48
49use std::path::Path;
50use std::sync::Mutex;
51
52use agentmux_common::ipc::{Command, Event};
53use chrono::{DateTime, Utc};
54use rusqlite::{params, Connection, OptionalExtension};
55use serde::{Deserialize, Serialize};
56
57use super::PipeTarget;
58
59mod schema;
60
61#[cfg(test)]
62mod tests;
63
64/// Errors from the launcher saga log. Wraps the three error sources
65/// the API can encounter: SQLite, JSON serialization, and (for the
66/// public `open(path)` constructor) underlying file IO. Distinct
67/// from srv's `StoreError` because srv's WaveStore wraps additional
68/// migration-specific variants the launcher log doesn't need.
69#[derive(Debug, thiserror::Error)]
70pub enum LogError {
71 #[error("sqlite: {0}")]
72 Sqlite(#[from] rusqlite::Error),
73 #[error("json: {0}")]
74 Json(#[from] serde_json::Error),
75 #[error("io: {0}")]
76 Io(#[from] std::io::Error),
77}
78
/// Terminal outcome of a launcher saga, recorded through `terminate_saga`.
///
/// Every variant exists already (PR LSD-1) so that the LSD-2 coordinator
/// wiring and the LSD-3 recovery walker can land without editing this
/// enum again. `dead_code` is allowed once on the type rather than on
/// each variant, because the variants are only exercised by tests today.
///
/// The shape follows srv's `SagaOutcome`, with two deliberate differences:
/// - `FailedCompensation` is added. It is the recovery walker's terminal
///   state (LSD spec §3.5).
/// - `Compensated` is absent. F.5/F.6 launcher sagas never
///   auto-compensate (LSD spec §3.2 + §7 open question), and the
///   `launcher_saga.state` CHECK constraint intentionally leaves out
///   `'compensated'`. A future class-D/E saga that needs compensation must
///   add the variant and the matching CHECK-constraint migration in the
///   same change, never one without the other.
///
/// Planned callers:
/// - PR LSD-2's `apply_action` passes `Completed` when a saga returns
///   `SagaAction::Done`.
/// - It passes `Failed` when a saga returns `SagaAction::Failed` or is
///   evicted by the same-kind concurrent gate.
/// - PR LSD-3's recovery walker does not use
///   `terminate_saga(FailedCompensation { .. })`. Instead it calls the
///   dedicated `mark_failed_compensation` helper, because recovery has to
///   tolerate repeated crash-restart cycles where the row may already be
///   `failed_compensation`.
#[derive(Clone, Debug, Eq, PartialEq)]
#[allow(dead_code)] // every variant wired by PR LSD-2 / LSD-3; pinned today by tests.rs
pub enum SagaOutcome {
    /// The saga ran to completion successfully (the `SagaAction::Done` path).
    Completed,
    /// The saga failed and no compensation ran.
    Failed { reason: String },
    /// The saga was still unresolved at launcher restart, and the
    /// recovery walker closed it out. Kept distinct from `Failed` so
    /// operators can filter for these "interesting" cases via
    /// `--diag sagas`. PR LSD-3 produces it via
    /// `mark_failed_compensation`; it is present here to mirror the
    /// schema CHECK constraint.
    FailedCompensation { reason: String },
}
122
123impl SagaOutcome {
124 fn state_str(&self) -> &'static str {
125 match self {
126 SagaOutcome::Completed => "completed",
127 SagaOutcome::Failed { .. } => "failed",
128 SagaOutcome::FailedCompensation { .. } => "failed_compensation",
129 }
130 }
131
132 fn reason(&self) -> Option<&str> {
133 match self {
134 SagaOutcome::Completed => None,
135 SagaOutcome::Failed { reason }
136 | SagaOutcome::FailedCompensation { reason } => Some(reason.as_str()),
137 }
138 }
139}
140
141/// Serialize a `PipeTarget` to the schema's `target` column. Mirrors
142/// srv's `command_discriminant_name` style (snake_case strings rather
143/// than Debug formatting) so `--diag sagas` output is greppable.
144fn pipe_target_str(t: PipeTarget) -> &'static str {
145 match t {
146 PipeTarget::LauncherSelf => "launcher_self",
147 PipeTarget::Host => "host",
148 PipeTarget::Srv => "srv",
149 }
150}
151
152/// A saga in `running`, `compensating`, or `failed` state at startup.
153/// Returned by `unresolved_sagas`; consumed by PR LSD-3's recovery
154/// walker to mark each as `failed_compensation` (LSD spec §3.5).
155#[derive(Debug, Clone, Serialize, Deserialize)]
156#[allow(dead_code)] // Fields consumed by PR LSD-3's recovery walker.
157pub struct UnresolvedLauncherSaga {
158 pub saga_id: u64,
159 pub name: String,
160 pub state: String,
161 pub started_at: String,
162 pub input_json: String,
163 pub failure_reason: Option<String>,
164 pub steps: Vec<UnresolvedLauncherStep>,
165}
166
167/// A step row attached to an `UnresolvedLauncherSaga`. Steps are
168/// returned in `step_index` ascending order.
169#[derive(Debug, Clone, Serialize, Deserialize)]
170#[allow(dead_code)] // Fields consumed by PR LSD-3's recovery walker.
171pub struct UnresolvedLauncherStep {
172 pub step_index: u32,
173 pub name: String,
174 pub state: String,
175 pub target: Option<String>,
176 pub cmd_json: Option<String>,
177 pub output_json: Option<String>,
178 pub started_at: String,
179 pub ended_at: Option<String>,
180 pub failure_reason: Option<String>,
181}
182
183/// Operator-facing snapshot of a recent saga, for `--diag sagas`.
184/// Returned by `snapshot_recent`. Sorted most-recent-first by
185/// `COALESCE(ended_at, started_at)`.
186#[derive(Debug, Clone, Serialize, Deserialize)]
187#[allow(dead_code)] // Fields consumed by PR LSD-3's `--diag sagas`.
188pub struct SagaSummary {
189 pub saga_id: u64,
190 pub name: String,
191 pub state: String,
192 pub started_at: String,
193 pub ended_at: Option<String>,
194 pub failure_reason: Option<String>,
195 /// Count of steps in `succeeded` or `compensated` state — i.e.
196 /// progress through the saga.
197 pub step_count: u32,
198 /// JSON of saga input args, for operator triage.
199 pub input_json: String,
200}
201
202/// SQLite-backed launcher saga log. Owned by `SagaCoordinator` as
203/// `Arc<LauncherSagaLog>` once PR LSD-2 wires it; PR LSD-1 only
204/// constructs and tests it in isolation.
205pub struct LauncherSagaLog {
206 conn: Mutex<Connection>,
207}
208
209impl LauncherSagaLog {
210 /// Open a saga log backed by the given SQLite file. Configures
211 /// WAL mode + 5s busy timeout + `foreign_keys=ON` (mirroring
212 /// `SagaLog::open` in `agentmux-srv/src/sagas/log.rs`) and
213 /// applies the schema migration from `schema.rs`.
214 ///
215 /// Idempotent: reopening the same DB applies the same DDL via
216 /// `CREATE TABLE IF NOT EXISTS` — no double-creation, no error.
217 #[allow(dead_code)] // wired in PR LSD-2 (`main.rs` opens the log on startup)
218 pub fn open(path: &Path) -> Result<Self, LogError> {
219 let conn = Connection::open(path)?;
220 Self::configure_and_migrate(conn)
221 }
222
223 /// Open the saga log in read-only mode. Used by `--diag sagas`
224 /// so an operator's diagnostic invocation can't accidentally
225 /// schema-migrate or modify a log that a running launcher owns.
226 /// Skips `configure_and_migrate` (read-only opens shouldn't run
227 /// migrations) and skips the `foreign_keys=ON` pragma since we
228 /// don't write. (codex P2 PR #647 round 3.)
229 pub fn open_read_only(path: &Path) -> Result<Self, LogError> {
230 let conn = Connection::open_with_flags(
231 path,
232 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
233 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
234 )?;
235 // Read-only PRAGMAs only: busy_timeout for WAL coexistence with
236 // a running launcher, no journal_mode change, no migrations.
237 conn.execute_batch("PRAGMA busy_timeout=5000;")?;
238 Ok(Self {
239 conn: Mutex::new(conn),
240 })
241 }
242
243 /// Open an in-memory saga log for testing. Used by `tests.rs`
244 /// and by future PR LSD-2 coordinator integration tests.
245 #[allow(dead_code)] // exercised under #[cfg(test)] only; see saga/log/tests.rs
246 pub fn open_in_memory() -> Result<Self, LogError> {
247 let conn = Connection::open_in_memory()?;
248 Self::configure_and_migrate(conn)
249 }
250
251 fn configure_and_migrate(conn: Connection) -> Result<Self, LogError> {
252 // Same pragma block as srv's `SagaLog::configure_and_migrate`
253 // (codex P2 PR #631). `foreign_keys=ON` enforces the
254 // `launcher_saga_step.saga_id REFERENCES launcher_saga(saga_id)`
255 // declaration; without it, orphan step rows can be written
256 // silently and corrupt diagnostics.
257 conn.execute_batch(
258 "PRAGMA journal_mode=WAL;
259 PRAGMA busy_timeout=5000;
260 PRAGMA synchronous=NORMAL;
261 PRAGMA temp_store=MEMORY;
262 PRAGMA foreign_keys=ON;",
263 )?;
264 schema::run_migrations(&conn)?;
265 schema::stamp_and_check_version(&conn)?;
266 Ok(Self {
267 conn: Mutex::new(conn),
268 })
269 }
270
271 /// Highest existing `saga_id` in the durable log, or 0 if empty.
272 /// PR LSD-2 calls this at coordinator startup to seed
273 /// `next_saga_id` so a launcher restart doesn't reuse ids that
274 /// already have rows in the log.
275 #[allow(dead_code)] // wired in PR LSD-2 (`SagaCoordinator::new` seed)
276 pub fn max_saga_id(&self) -> Result<u64, LogError> {
277 let conn = self.conn.lock().unwrap();
278 // Mirror srv's `max_saga_id` — propagate query errors so a
279 // transient SQLite read failure doesn't silently reseed the
280 // allocator to 0 and re-collide with prior rows (codex P2 PR
281 // #631 round 2 rationale; same hazard applies here).
282 let max: Option<i64> =
283 conn.query_row("SELECT MAX(saga_id) FROM launcher_saga", [], |r| r.get(0))?;
284 Ok(max.unwrap_or(0).max(0) as u64)
285 }
286
287 /// Insert a fresh saga row in `running` state. Plain INSERT (not
288 /// OR REPLACE): a duplicate saga_id is a bug worth surfacing,
289 /// not a silent overwrite. Same rationale as srv's `start_saga`
290 /// (codex P1 + reagent P1 PR #631).
291 #[allow(dead_code)] // wired in PR LSD-2 (`SagaCoordinator::spawn_saga`)
292 pub fn start_saga(
293 &self,
294 saga_id: u64,
295 name: &str,
296 input: &serde_json::Value,
297 ) -> Result<(), LogError> {
298 let now = now_rfc3339();
299 let input_json = serde_json::to_string(input)?;
300 let conn = self.conn.lock().unwrap();
301 conn.execute(
302 "INSERT INTO launcher_saga
303 (saga_id, name, state, started_at, ended_at, input_json, failure_reason)
304 VALUES (?1, ?2, 'running', ?3, NULL, ?4, NULL)",
305 params![saga_id as i64, name, now, input_json],
306 )?;
307 Ok(())
308 }
309
310 /// Write a saga's terminal lifecycle row. Called by PR LSD-2's
311 /// `apply_action` when the saga returns `Done` / `Failed`. The
312 /// recovery walker uses `mark_failed_compensation` instead.
313 #[allow(dead_code)] // wired in PR LSD-2
314 pub fn terminate_saga(&self, saga_id: u64, outcome: SagaOutcome) -> Result<(), LogError> {
315 let now = now_rfc3339();
316 let state = outcome.state_str();
317 let reason = outcome.reason();
318 let conn = self.conn.lock().unwrap();
319 conn.execute(
320 "UPDATE launcher_saga
321 SET state = ?1, ended_at = ?2, failure_reason = ?3
322 WHERE saga_id = ?4",
323 params![state, now, reason, saga_id as i64],
324 )?;
325 Ok(())
326 }
327
328 /// Insert a `pending` step row before dispatching the command.
329 /// `name` is a short discriminant string (e.g. "issue_cmd_host_reap_panes");
330 /// `cmd` is serialized as JSON for replay/debugging; `target`
331 /// records which pipe the command was destined for so `--diag
332 /// sagas` can show provenance.
333 #[allow(dead_code)] // wired in PR LSD-2 (`SagaCoordinator::apply_action::IssueCmd`)
334 pub fn start_step(
335 &self,
336 saga_id: u64,
337 step_index: u32,
338 name: &str,
339 target: PipeTarget,
340 cmd: &Command,
341 ) -> Result<(), LogError> {
342 let now = now_rfc3339();
343 let cmd_json = serde_json::to_string(cmd)?;
344 let target_str = pipe_target_str(target);
345 let conn = self.conn.lock().unwrap();
346 conn.execute(
347 "INSERT INTO launcher_saga_step
348 (saga_id, step_index, name, state, cmd_json, target, started_at, ended_at, output_json, failure_reason)
349 VALUES (?1, ?2, ?3, 'pending', ?4, ?5, ?6, NULL, NULL, NULL)",
350 params![
351 saga_id as i64,
352 step_index,
353 name,
354 cmd_json,
355 target_str,
356 now
357 ],
358 )?;
359 Ok(())
360 }
361
362 /// Mark a step `succeeded` and store the awaited event as JSON.
363 /// PR LSD-2 calls this from `route_event_to_sagas` when a saga's
364 /// `on_event` consumes its awaited bus event.
365 #[allow(dead_code)] // wired in PR LSD-2
366 pub fn finish_step(
367 &self,
368 saga_id: u64,
369 step_index: u32,
370 output: &Event,
371 ) -> Result<(), LogError> {
372 let now = now_rfc3339();
373 let output_json = serde_json::to_string(output)?;
374 let conn = self.conn.lock().unwrap();
375 conn.execute(
376 "UPDATE launcher_saga_step
377 SET state = 'succeeded', output_json = ?1, ended_at = ?2
378 WHERE saga_id = ?3 AND step_index = ?4",
379 params![output_json, now, saga_id as i64, step_index],
380 )?;
381 Ok(())
382 }
383
384 /// Mark a step `failed`. Stores the reason in the step's
385 /// `failure_reason` column (distinct from srv's log, which
386 /// stuffs the reason into `output_json` as `{"error": ...}`;
387 /// LSD schema gives us a dedicated column so we use it).
388 #[allow(dead_code)] // wired in PR LSD-2 (saga timeout / dispatch error path)
389 pub fn fail_step(
390 &self,
391 saga_id: u64,
392 step_index: u32,
393 reason: &str,
394 ) -> Result<(), LogError> {
395 let now = now_rfc3339();
396 let conn = self.conn.lock().unwrap();
397 conn.execute(
398 "UPDATE launcher_saga_step
399 SET state = 'failed', failure_reason = ?1, ended_at = ?2
400 WHERE saga_id = ?3 AND step_index = ?4",
401 params![reason, now, saga_id as i64, step_index],
402 )?;
403 Ok(())
404 }
405
406 /// Return all sagas still in `running`, `compensating`, or
407 /// `failed` state, each with its full step list. PR LSD-3's
408 /// startup recovery walker iterates this list and calls
409 /// `mark_failed_compensation` on each (LSD spec §3.5).
410 ///
411 /// `failed` is included for symmetry with srv's `unresolved_sagas`
412 /// (codex P1 PR #631 round 2): a launcher saga marked `failed`
413 /// without restart-time recovery would still benefit from the
414 /// `failed_compensation` upgrade so `--diag sagas` consistently
415 /// surfaces it as "needs operator attention."
416 pub fn unresolved_sagas(&self) -> Result<Vec<UnresolvedLauncherSaga>, LogError> {
417 let conn = self.conn.lock().unwrap();
418 let mut stmt = conn.prepare(
419 "SELECT saga_id, name, state, started_at, input_json, failure_reason
420 FROM launcher_saga
421 WHERE state IN ('running', 'compensating', 'failed')
422 ORDER BY saga_id ASC",
423 )?;
424 let saga_rows: Vec<(i64, String, String, String, String, Option<String>)> = stmt
425 .query_map([], |row| {
426 Ok((
427 row.get::<_, i64>(0)?,
428 row.get::<_, String>(1)?,
429 row.get::<_, String>(2)?,
430 row.get::<_, String>(3)?,
431 row.get::<_, String>(4)?,
432 row.get::<_, Option<String>>(5)?,
433 ))
434 })?
435 .collect::<Result<Vec<_>, _>>()?;
436 drop(stmt);
437
438 let mut out = Vec::with_capacity(saga_rows.len());
439 for (saga_id, name, state, started_at, input_json, failure_reason) in saga_rows {
440 let mut step_stmt = conn.prepare(
441 "SELECT step_index, name, state, target, cmd_json,
442 output_json, started_at, ended_at, failure_reason
443 FROM launcher_saga_step
444 WHERE saga_id = ?1
445 ORDER BY step_index ASC",
446 )?;
447 let steps: Vec<UnresolvedLauncherStep> = step_stmt
448 .query_map(params![saga_id], |row| {
449 Ok(UnresolvedLauncherStep {
450 step_index: row.get::<_, i64>(0)? as u32,
451 name: row.get::<_, String>(1)?,
452 state: row.get::<_, String>(2)?,
453 target: row.get::<_, Option<String>>(3)?,
454 cmd_json: row.get::<_, Option<String>>(4)?,
455 output_json: row.get::<_, Option<String>>(5)?,
456 started_at: row.get::<_, String>(6)?,
457 ended_at: row.get::<_, Option<String>>(7)?,
458 failure_reason: row.get::<_, Option<String>>(8)?,
459 })
460 })?
461 .collect::<Result<Vec<_>, _>>()?;
462 out.push(UnresolvedLauncherSaga {
463 saga_id: saga_id as u64,
464 name,
465 state,
466 started_at,
467 input_json,
468 failure_reason,
469 steps,
470 });
471 }
472 Ok(out)
473 }
474
475 /// Fetch the step rows for a single saga regardless of saga state.
476 /// `unresolved_sagas` filters out `failed_compensation` (and other
477 /// terminal states), but `--diag sagas` needs to surface step rows
478 /// for sagas the recovery walker just marked `failed_compensation`
479 /// — operators triaging a recovered crash need to see what was
480 /// pending when the launcher exited. (codex P1 PR #647 round 1.)
481 pub fn get_saga_steps(&self, saga_id: u64) -> Result<Vec<UnresolvedLauncherStep>, LogError> {
482 let conn = self.conn.lock().unwrap();
483 let mut stmt = conn.prepare(
484 "SELECT step_index, name, state, target, cmd_json,
485 output_json, started_at, ended_at, failure_reason
486 FROM launcher_saga_step
487 WHERE saga_id = ?1
488 ORDER BY step_index ASC",
489 )?;
490 let steps: Vec<UnresolvedLauncherStep> = stmt
491 .query_map(params![saga_id as i64], |row| {
492 Ok(UnresolvedLauncherStep {
493 step_index: row.get::<_, i64>(0)? as u32,
494 name: row.get::<_, String>(1)?,
495 state: row.get::<_, String>(2)?,
496 target: row.get::<_, Option<String>>(3)?,
497 cmd_json: row.get::<_, Option<String>>(4)?,
498 output_json: row.get::<_, Option<String>>(5)?,
499 started_at: row.get::<_, String>(6)?,
500 ended_at: row.get::<_, Option<String>>(7)?,
501 failure_reason: row.get::<_, Option<String>>(8)?,
502 })
503 })?
504 .collect::<Result<Vec<_>, _>>()?;
505 Ok(steps)
506 }
507
508 /// Mark a saga as `failed_compensation` — the recovery walker's
509 /// terminal write. Idempotent across repeated calls (the saga
510 /// stays in `failed_compensation`; `ended_at` is rewritten to
511 /// the latest call's timestamp; `failure_reason` is preserved
512 /// when already populated and the new reason is APPENDED rather
513 /// than overwritten — see SQL CASE WHEN below). This preserves
514 /// the original failure cause (e.g. timeout) while adding the
515 /// recovery context. (codex P2 PR #647 round 1: post-mortem
516 /// preservation.) LSD spec §3.5 — operator-review terminal state.
517 pub fn mark_failed_compensation(
518 &self,
519 saga_id: u64,
520 reason: &str,
521 ) -> Result<(), LogError> {
522 let now = now_rfc3339();
523 let conn = self.conn.lock().unwrap();
524 // Preserve original failure_reason when already populated.
525 // A saga in `failed` state pre-crash carries the precise
526 // original cause (timeout, dispatch error, etc.) that
527 // operators need for post-mortem. Recovery transitions
528 // state to `failed_compensation` but augments rather than
529 // replaces failure_reason — appends the restart context
530 // so both signals are visible in `--diag sagas`.
531 // (codex P2 PR #647 round 1.)
532 conn.execute(
533 "UPDATE launcher_saga
534 SET state = 'failed_compensation',
535 ended_at = ?1,
536 failure_reason = CASE
537 WHEN failure_reason IS NULL OR failure_reason = ''
538 THEN ?2
539 ELSE failure_reason || ' | recovered: ' || ?2
540 END
541 WHERE saga_id = ?3",
542 params![now, reason, saga_id as i64],
543 )?;
544 Ok(())
545 }
546
547 /// Return up to `limit` recent sagas for `--diag sagas`. Sorted
548 /// most-recent-first by `COALESCE(ended_at, started_at)`. Mirrors
549 /// srv's `snapshot_recent` shape.
550 pub fn snapshot_recent(&self, limit: usize) -> Result<Vec<SagaSummary>, LogError> {
551 let conn = self.conn.lock().unwrap();
552 let mut stmt = conn.prepare(
553 "SELECT saga_id, name, state, started_at, ended_at, failure_reason, input_json
554 FROM launcher_saga
555 ORDER BY COALESCE(ended_at, started_at) DESC, saga_id DESC
556 LIMIT ?1",
557 )?;
558 let rows: Vec<(
559 i64,
560 String,
561 String,
562 String,
563 Option<String>,
564 Option<String>,
565 String,
566 )> = stmt
567 .query_map(params![limit as i64], |row| {
568 Ok((
569 row.get::<_, i64>(0)?,
570 row.get::<_, String>(1)?,
571 row.get::<_, String>(2)?,
572 row.get::<_, String>(3)?,
573 row.get::<_, Option<String>>(4)?,
574 row.get::<_, Option<String>>(5)?,
575 row.get::<_, String>(6)?,
576 ))
577 })?
578 .collect::<Result<Vec<_>, _>>()?;
579 drop(stmt);
580
581 let mut out = Vec::with_capacity(rows.len());
582 for (saga_id, name, state, started_at, ended_at, failure_reason, input_json) in rows {
583 let count: Option<i64> = conn
584 .query_row(
585 "SELECT COUNT(*) FROM launcher_saga_step
586 WHERE saga_id = ?1 AND state IN ('succeeded', 'compensated')",
587 params![saga_id],
588 |row| row.get(0),
589 )
590 .optional()?;
591 out.push(SagaSummary {
592 saga_id: saga_id as u64,
593 name,
594 state,
595 started_at,
596 ended_at,
597 failure_reason,
598 step_count: count.unwrap_or(0) as u32,
599 input_json,
600 });
601 }
602 Ok(out)
603 }
604
605 /// Delete saga rows whose `ended_at` is before `cutoff` AND whose
606 /// state is terminal (`completed`, `failed`, `failed_compensation`).
607 /// Returns the number of rows deleted. In-flight sagas (`running`,
608 /// `compensating`) are NEVER vacuumed — that would mask
609 /// crashed-mid-saga incidents from the recovery walker (LSD spec §3.6).
610 ///
611 /// `ON DELETE CASCADE` on `launcher_saga_step.saga_id` ensures
612 /// the corresponding step rows go with the saga in a single
613 /// SQLite transaction — no manual cleanup needed.
614 // LSD-4: wired by `main.rs::run_windows` startup retention task.
615 pub fn vacuum_older_than(&self, cutoff: DateTime<Utc>) -> Result<usize, LogError> {
616 let cutoff_str = cutoff.to_rfc3339();
617 let conn = self.conn.lock().unwrap();
618 let removed = conn.execute(
619 "DELETE FROM launcher_saga
620 WHERE state IN ('completed', 'failed', 'failed_compensation')
621 AND ended_at IS NOT NULL
622 AND ended_at < ?1",
623 params![cutoff_str],
624 )?;
625 Ok(removed)
626 }
627}
628
629/// RFC3339 timestamp for `started_at` / `ended_at` columns. Single
630/// helper so test+production paths agree on format precisely.
631fn now_rfc3339() -> String {
632 Utc::now().to_rfc3339()
633}