agentmux_srv\sagas/
log.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Saga durability — durable on-disk log of saga lifecycle.
5//
6// See `docs/specs/SPEC_SAGA_DURABILITY_2026-05-01.md` for the full
7// design. This module ships PR 1 of the spec: schema + log API +
8// `SagaCtx` instrumentation. Resume-on-startup, `--diag sagas`, and
9// crash-recovery integration tests follow in PR 2.
10//
11// Why an isolated SQLite file (`sagas.db`) rather than co-locating
12// inside `objects.db`: we want saga writes to commit independently
13// of the WaveStore migration / connection. A separate connection
14// also keeps the saga log's mutex contention isolated from the
15// reducer's persistence path. (The two-store atomicity concern from
16// spec §2.1 is not load-bearing for PR 1; saga steps are written
17// after the reducer-emitted event has already been applied to
18// wstore by `apply_event_to_wstore`. Compensate-on-restart in PR 2
19// will reconcile any divergence by walking succeeded steps in
20// reverse.)
21//
22// Concurrency: a single `Mutex<Connection>` serializes writes. Each
23// `start_step` / `finish_step` call holds the lock for <1ms. If
24// profiling shows the mutex becomes hot under load, switch to a
25// connection pool — defer until measurement justifies it (spec §5).
26
27use std::path::Path;
28use std::sync::Mutex;
29
30use agentmux_common::ipc::{Command, Event};
31use rusqlite::{params, Connection, OptionalExtension};
32use serde::{Deserialize, Serialize};
33
34use crate::backend::storage::error::StoreError;
35use crate::backend::storage::migrations::{
36    check_schema_compat, run_saga_log_migrations, stamp_version, SAGA_LOG_SCHEMA_VERSION,
37};
38
/// Terminal result of a saga, persisted by `SagaLog::terminate`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SagaOutcome {
    /// Every step ran and the saga finished successfully.
    Completed,
    /// The saga failed and compensation either never began or did
    /// not finish. `reason` is operator-readable.
    Failed { reason: String },
    /// The saga failed and compensation completed cleanly. `reason`
    /// is the original failure that triggered compensation.
    Compensated { reason: String },
}

impl SagaOutcome {
    /// Text stored in the `saga.state` column for this outcome.
    fn state_str(&self) -> &'static str {
        match *self {
            Self::Compensated { .. } => "compensated",
            Self::Failed { .. } => "failed",
            Self::Completed => "completed",
        }
    }

    /// Failure reason carried by the outcome; `None` for `Completed`.
    fn reason(&self) -> Option<&str> {
        match self {
            Self::Completed => None,
            Self::Failed { reason: why } => Some(why.as_str()),
            Self::Compensated { reason: why } => Some(why.as_str()),
        }
    }
}
70
/// A saga whose row is in `running`, `compensating`, or `failed`
/// state at startup. Returned by `unresolved_sagas`; consumed by
/// PR 2's `compensate_unresolved` to walk succeeded steps in reverse
/// and dispatch compensation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)] // Fields consumed by PR 2's compensate_unresolved.
pub struct UnresolvedSaga {
    /// Value of the `saga.saga_id` column.
    pub saga_id: u64,
    /// Saga name as passed to `start_saga`.
    pub name: String,
    /// Raw `saga.state` column text.
    pub state: String,
    /// Timestamp captured by `start_saga` via `now_ms()` (UTC epoch
    /// milliseconds).
    pub started_at: i64,
    /// JSON text of the `input` value passed to `start_saga`.
    pub input_json: String,
    /// Every step row of this saga, ordered by `step_index` ascending.
    pub steps: Vec<UnresolvedStep>,
}
84
/// A step row attached to an `UnresolvedSaga`. Steps are returned in
/// `step_index` ascending order; PR 2's reverse-walker iterates over
/// the `succeeded` entries.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)] // Fields consumed by PR 2's compensate_unresolved.
pub struct UnresolvedStep {
    /// Position of the step within its saga.
    pub step_index: u32,
    /// Short step name as written by `start_step` /
    /// `append_recovery_step` (e.g. the command discriminant).
    pub name: String,
    /// Raw `saga_step.state` column text. The writers in this module
    /// use `pending`, `succeeded`, `failed`, and `compensated`.
    pub state: String,
    /// JSON text of the dispatched command.
    pub cmd_json: String,
    /// `None` while the step is `pending`; otherwise the serialized
    /// emitted events, or `{"error": ...}` for a failed step.
    pub output_json: Option<String>,
    /// `now_ms()` timestamp taken when the step row was written.
    pub started_at: i64,
    /// `now_ms()` timestamp of the last state transition; `None`
    /// while the step is still `pending`.
    pub ended_at: Option<i64>,
}
99
/// Operator-facing snapshot of a recent saga, for `--diag sagas`.
/// Produced by `SagaLog::snapshot_recent`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)] // Fields consumed by PR 2's `--diag sagas`.
pub struct SagaSnapshot {
    /// Value of the `saga.saga_id` column.
    pub saga_id: u64,
    /// Saga name as passed to `start_saga`.
    pub name: String,
    /// Raw `saga.state` column text.
    pub state: String,
    /// `now_ms()` timestamp captured by `start_saga`.
    pub started_at: i64,
    /// `now_ms()` timestamp written by `terminate` /
    /// `mark_failed_compensation`; `None` until one of them runs.
    pub terminal_at: Option<i64>,
    /// Reason recorded alongside a failed / compensated /
    /// failed_compensation outcome; `None` otherwise.
    pub failure_reason: Option<String>,
    /// Number of this saga's steps currently in `succeeded` or
    /// `compensated` state.
    pub step_count: u32,
    /// JSON serialization of the saga's input args, captured by the
    /// caller in `emit_saga_started` (reagent P1 PR #631 — was being
    /// stubbed `null`). Operator-readable provenance for `--diag sagas`.
    pub input_json: String,
}
116
/// SQLite-backed saga log. Owned by `AppState` as `Arc<SagaLog>`;
/// every `SagaCtx::dispatch` and `compensate` call writes through
/// it. See module-level docs for design notes.
pub struct SagaLog {
    /// The single SQLite connection. Every method locks this mutex
    /// for the duration of its statement(s), so all reads and writes
    /// through one `SagaLog` are serialized.
    conn: Mutex<Connection>,
}
123
124impl SagaLog {
125    /// Open a saga log backed by the given SQLite file. Configures
126    /// WAL mode + 5s busy timeout (mirroring `WaveStore::open`) and
127    /// applies the schema migration.
128    pub fn open(path: &Path) -> Result<Self, StoreError> {
129        let conn = Connection::open(path)?;
130        Self::configure_and_migrate(conn)
131    }
132
133    /// Open an in-memory saga log for testing.
134    #[allow(dead_code)]
135    pub fn open_in_memory() -> Result<Self, StoreError> {
136        let conn = Connection::open_in_memory()?;
137        Self::configure_and_migrate(conn)
138    }
139
140    fn configure_and_migrate(conn: Connection) -> Result<Self, StoreError> {
141        // `foreign_keys=ON` enforces the `saga_step.saga_id REFERENCES
142        // saga(saga_id)` declaration; SQLite defaults this to OFF
143        // (codex P2 PR #631). Without it, orphan step rows can be
144        // written silently — corrupts diagnostics + PR 2's resume
145        // logic which reconstructs state from saga + saga_step joins.
146        conn.execute_batch(
147            "PRAGMA journal_mode=WAL;
148             PRAGMA busy_timeout=5000;
149             PRAGMA synchronous=NORMAL;
150             PRAGMA temp_store=MEMORY;
151             PRAGMA foreign_keys=ON;",
152        )?;
153        // Safety lock BEFORE migrations — same discipline as wstore /
154        // filestore: refuse to touch a newer-schema DB on disk before
155        // any mutating step runs. See `check_schema_compat` doc.
156        check_schema_compat(&conn, SAGA_LOG_SCHEMA_VERSION, "sagas.db")?;
157        run_saga_log_migrations(&conn)?;
158        stamp_version(&conn, SAGA_LOG_SCHEMA_VERSION)?;
159        Ok(Self {
160            conn: Mutex::new(conn),
161        })
162    }
163
164    /// Highest existing `saga_id` in the durable log, or 0 if the log
165    /// is empty. Used at startup to seed `state.saga_id_alloc` so new
166    /// sagas don't reuse IDs from prior srv-process runs (reagent P1
167    /// PR #631 — `INSERT` would fail on collision but the saga itself
168    /// would have already started, so we seed defensively).
169    pub fn max_saga_id(&self) -> Result<u64, StoreError> {
170        let conn = self.conn.lock().unwrap();
171        // Propagate query errors. (codex P2 PR #631 round 2.) The
172        // earlier `.ok().flatten()` swallowed SQLite errors and
173        // returned 0 as if the log were empty — a transient read
174        // failure would then reseed `saga_id_alloc=0` on the next
175        // restart, sagas would reuse IDs 1, 2, 3..., and `start_saga`
176        // would reject them (no OR REPLACE), leaving live sagas with
177        // no durable lifecycle row. Better to surface the error and
178        // let the startup hook log the explicit warning + accept the
179        // collision risk knowingly.
180        let max: Option<i64> = conn.query_row("SELECT MAX(saga_id) FROM saga", [], |r| r.get(0))?;
181        Ok(max.unwrap_or(0).max(0) as u64)
182    }
183
184    /// Insert a fresh saga row in `running` state. Called by the
185    /// coordinator immediately after `alloc_saga_id` (and before any
186    /// per-step writes).
187    pub fn start_saga(
188        &self,
189        saga_id: u64,
190        name: &str,
191        input: &serde_json::Value,
192    ) -> Result<(), StoreError> {
193        let now = now_ms();
194        let input_json = serde_json::to_string(input)?;
195        let conn = self.conn.lock().unwrap();
196        // Plain INSERT (not OR REPLACE) so saga_id collisions surface
197        // as errors instead of silently overwriting prior runs'
198        // history. The allocator is seeded from MAX(saga_id) at
199        // startup (see `max_saga_id` + `main.rs`), so collisions
200        // shouldn't happen in practice — if they do, that's a bug
201        // worth surfacing. (codex P1 + reagent P1 PR #631.)
202        conn.execute(
203            "INSERT INTO saga
204             (saga_id, name, state, started_at, terminal_at, failure_reason, input_json)
205             VALUES (?1, ?2, 'running', ?3, NULL, NULL, ?4)",
206            params![saga_id as i64, name, now, input_json],
207        )?;
208        Ok(())
209    }
210
211    /// Insert a `pending` step row before dispatching the command.
212    /// `name` is a short discriminant string (e.g. "MoveTab"); `cmd`
213    /// is serialized as JSON for replay/debugging.
214    pub fn start_step(
215        &self,
216        saga_id: u64,
217        step_index: u32,
218        name: &str,
219        cmd: &Command,
220    ) -> Result<(), StoreError> {
221        let now = now_ms();
222        let cmd_json = serde_json::to_string(cmd)?;
223        let conn = self.conn.lock().unwrap();
224        // Plain INSERT — same rationale as `start_saga`.
225        conn.execute(
226            "INSERT INTO saga_step
227             (saga_id, step_index, name, state, cmd_json, output_json, started_at, ended_at)
228             VALUES (?1, ?2, ?3, 'pending', ?4, NULL, ?5, NULL)",
229            params![saga_id as i64, step_index, name, cmd_json, now],
230        )?;
231        Ok(())
232    }
233
234    /// Mark a step `succeeded` and store its emitted events as
235    /// JSON, used by compensation to reconstruct context if needed.
236    pub fn finish_step(
237        &self,
238        saga_id: u64,
239        step_index: u32,
240        output_events: &[Event],
241    ) -> Result<(), StoreError> {
242        let now = now_ms();
243        let output_json = serde_json::to_string(output_events)?;
244        let conn = self.conn.lock().unwrap();
245        conn.execute(
246            "UPDATE saga_step
247             SET state = 'succeeded', output_json = ?1, ended_at = ?2
248             WHERE saga_id = ?3 AND step_index = ?4",
249            params![output_json, now, saga_id as i64, step_index],
250        )?;
251        Ok(())
252    }
253
254    /// Mark a step `failed`. Stores the reducer's error message in
255    /// `output_json` as `{"error": ...}` so PR 2's `--diag sagas`
256    /// can surface it without a separate column.
257    pub fn fail_step(
258        &self,
259        saga_id: u64,
260        step_index: u32,
261        reason: &str,
262    ) -> Result<(), StoreError> {
263        let now = now_ms();
264        let output_json = serde_json::to_string(&serde_json::json!({ "error": reason }))
265            ?;
266        let conn = self.conn.lock().unwrap();
267        conn.execute(
268            "UPDATE saga_step
269             SET state = 'failed', output_json = ?1, ended_at = ?2
270             WHERE saga_id = ?3 AND step_index = ?4",
271            params![output_json, now, saga_id as i64, step_index],
272        )?;
273        Ok(())
274    }
275
276    /// Mark a compensation step `compensated`. Same shape as
277    /// `finish_step` but distinct state. Called from
278    /// `SagaCtx::compensate` after the reducer applies the
279    /// compensating command successfully.
280    pub fn compensate_step(
281        &self,
282        saga_id: u64,
283        step_index: u32,
284        output_events: &[Event],
285    ) -> Result<(), StoreError> {
286        let now = now_ms();
287        let output_json = serde_json::to_string(output_events)?;
288        let conn = self.conn.lock().unwrap();
289        conn.execute(
290            "INSERT OR REPLACE INTO saga_step
291             (saga_id, step_index, name, state, cmd_json, output_json, started_at, ended_at)
292             VALUES (
293                 ?1, ?2,
294                 COALESCE((SELECT name FROM saga_step WHERE saga_id=?1 AND step_index=?2), 'compensate'),
295                 'compensated',
296                 COALESCE((SELECT cmd_json FROM saga_step WHERE saga_id=?1 AND step_index=?2), ''),
297                 ?3,
298                 COALESCE((SELECT started_at FROM saga_step WHERE saga_id=?1 AND step_index=?2), ?4),
299                 ?4
300             )",
301            params![saga_id as i64, step_index, output_json, now],
302        )?;
303        Ok(())
304    }
305
306    /// Highest existing `step_index` for a saga, +1, or 0 if no steps
307    /// exist yet. Used by PR 2's resume-on-startup so recovery
308    /// compensation rows are appended AFTER the saga's original step
309    /// indices (rather than overwriting in place via `compensate_step`,
310    /// which would lose the original step's `output_json` provenance).
311    pub fn next_step_index(&self, saga_id: u64) -> Result<u32, StoreError> {
312        let conn = self.conn.lock().unwrap();
313        let max: Option<i64> = conn.query_row(
314            "SELECT MAX(step_index) FROM saga_step WHERE saga_id = ?1",
315            params![saga_id as i64],
316            |r| r.get(0),
317        )?;
318        Ok((max.unwrap_or(-1) + 1) as u32)
319    }
320
321    /// Append a new compensation step row at `step_index`. Distinct
322    /// from `compensate_step()` (which overwrites an existing step row
323    /// by index): used by PR 2's resume-on-startup to record a
324    /// recovery dispatch as a NEW row so the original succeeded
325    /// step's provenance is preserved.
326    ///
327    /// `state='compensated'` if `error.is_none()`, else `'failed'`
328    /// (with the reason in `output_json`).
329    pub fn append_recovery_step(
330        &self,
331        saga_id: u64,
332        step_index: u32,
333        name: &str,
334        cmd: &Command,
335        events: &[Event],
336        error: Option<&str>,
337    ) -> Result<(), StoreError> {
338        let now = now_ms();
339        let cmd_json = serde_json::to_string(cmd)?;
340        let (state, output_json) = match error {
341            None => ("compensated", serde_json::to_string(events)?),
342            Some(err) => (
343                "failed",
344                serde_json::to_string(&serde_json::json!({ "error": err }))?,
345            ),
346        };
347        let conn = self.conn.lock().unwrap();
348        conn.execute(
349            "INSERT INTO saga_step
350             (saga_id, step_index, name, state, cmd_json, output_json, started_at, ended_at)
351             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?7)",
352            params![saga_id as i64, step_index, name, state, cmd_json, output_json, now],
353        )?;
354        Ok(())
355    }
356
357    /// Mark a saga as `compensating` — used by PR 2's resume-on-startup
358    /// to flag a saga as undergoing recovery before walking its
359    /// succeeded steps in reverse. After all compensating dispatches
360    /// run, the caller invokes `terminate()` with the appropriate
361    /// outcome (`Compensated` / `Failed`).
362    ///
363    /// (codex follow-up.) Distinct from `terminate(state='compensated')`
364    /// because the saga is only **mid-compensation** here — its
365    /// terminal_at and failure_reason are not known yet.
366    pub fn mark_compensating(&self, saga_id: u64) -> Result<(), StoreError> {
367        let conn = self.conn.lock().unwrap();
368        conn.execute(
369            "UPDATE saga SET state = 'compensating' WHERE saga_id = ?1",
370            params![saga_id as i64],
371        )?;
372        Ok(())
373    }
374
375    /// Mark ALL of a saga's remaining `succeeded` forward steps as
376    /// `compensated` in one shot. (codex P1 PR #636 round 5.) Used
377    /// at `emit_terminal` time when the outcome is `Compensated` —
378    /// the saga author is asserting "I've unwound everything," so
379    /// any forward step still in `succeeded` is by definition done.
380    /// Catches the case where one compensating command undoes
381    /// multiple forward steps (e.g. `tear_off_block`'s single
382    /// `DeleteWorkspace` undoes both `CreateWorkspace` and
383    /// `CreateTab`) — `SagaCtx::compensate`'s per-call stack pop
384    /// only marks one, leaving the other as still-`succeeded` and
385    /// triggering double-compensation on restart.
386    pub fn mark_all_succeeded_steps_compensated(
387        &self,
388        saga_id: u64,
389    ) -> Result<(), StoreError> {
390        let now = now_ms();
391        let conn = self.conn.lock().unwrap();
392        conn.execute(
393            "UPDATE saga_step
394             SET state = 'compensated', ended_at = ?1
395             WHERE saga_id = ?2 AND state = 'succeeded'",
396            params![now, saga_id as i64],
397        )?;
398        Ok(())
399    }
400
401    /// Mark an *original* forward step as `compensated`. (codex P1
402    /// PR #636 round 2.) Without this, a second crash-recovery
403    /// startup re-reads the still-`succeeded` original steps and
404    /// replays their inverses again — flipping a previously-recovered
405    /// saga into `failed_compensation` (or worse, applying duplicate
406    /// side effects like a second delete). The recovery rows
407    /// (`append_recovery_step`) live at fresh indices and don't
408    /// affect the original step's state on their own; this update
409    /// is the explicit "this forward step has been compensated"
410    /// marker that the next recovery scan needs to see.
411    pub fn mark_step_compensated(
412        &self,
413        saga_id: u64,
414        step_index: u32,
415    ) -> Result<(), StoreError> {
416        let now = now_ms();
417        let conn = self.conn.lock().unwrap();
418        conn.execute(
419            "UPDATE saga_step
420             SET state = 'compensated', ended_at = ?1
421             WHERE saga_id = ?2 AND step_index = ?3 AND state = 'succeeded'",
422            params![now, saga_id as i64, step_index],
423        )?;
424        Ok(())
425    }
426
427    /// Mark a saga as `failed_compensation` — used by PR 2's resume-
428    /// on-startup when at least one compensating dispatch errored. The
429    /// saga is left in this terminal state for operator review;
430    /// `--diag sagas` surfaces it.
431    pub fn mark_failed_compensation(
432        &self,
433        saga_id: u64,
434        reason: &str,
435    ) -> Result<(), StoreError> {
436        let now = now_ms();
437        let conn = self.conn.lock().unwrap();
438        conn.execute(
439            "UPDATE saga
440             SET state = 'failed_compensation', terminal_at = ?1, failure_reason = ?2
441             WHERE saga_id = ?3",
442            params![now, reason, saga_id as i64],
443        )?;
444        Ok(())
445    }
446
447    /// Write the saga's terminal lifecycle row. Called from
448    /// `emit_terminal` after the inner future returns.
449    pub fn terminate(&self, saga_id: u64, outcome: SagaOutcome) -> Result<(), StoreError> {
450        let now = now_ms();
451        let state = outcome.state_str();
452        let reason = outcome.reason();
453        let conn = self.conn.lock().unwrap();
454        conn.execute(
455            "UPDATE saga
456             SET state = ?1, terminal_at = ?2, failure_reason = ?3
457             WHERE saga_id = ?4",
458            params![state, now, reason, saga_id as i64],
459        )?;
460        Ok(())
461    }
462
463    /// Return all sagas still in `running`, `compensating`, or
464    /// `failed` state, each with its full step list (for PR 2's
465    /// compensate-on-restart reverse walk). Used at startup before
466    /// the API server begins accepting requests.
467    ///
468    /// (codex P1 PR #631 round 2.) `failed` is included because
469    /// `classify_run_saga_result` records timeout/cancel paths as
470    /// `failed` — those are exactly the partial-apply cases this
471    /// durability layer exists to recover. A `failed` saga's step
472    /// list may have `succeeded` rows whose effects need
473    /// compensation; if we filtered them out here, recovery would
474    /// silently leave that state in place.
475    #[allow(dead_code)] // Used by PR 2's resume-on-startup; tests exercise it now.
476    pub fn unresolved_sagas(&self) -> Result<Vec<UnresolvedSaga>, StoreError> {
477        let conn = self.conn.lock().unwrap();
478        let mut stmt = conn.prepare(
479            "SELECT saga_id, name, state, started_at, input_json
480             FROM saga
481             WHERE state IN ('running', 'compensating', 'failed')
482             ORDER BY saga_id ASC",
483        )?;
484        let saga_rows: Vec<(i64, String, String, i64, String)> = stmt
485            .query_map([], |row| {
486                Ok((
487                    row.get::<_, i64>(0)?,
488                    row.get::<_, String>(1)?,
489                    row.get::<_, String>(2)?,
490                    row.get::<_, i64>(3)?,
491                    row.get::<_, String>(4)?,
492                ))
493            })?
494            .collect::<Result<Vec<_>, _>>()?;
495        drop(stmt);
496
497        let mut out = Vec::with_capacity(saga_rows.len());
498        for (saga_id, name, state, started_at, input_json) in saga_rows {
499            let mut step_stmt = conn.prepare(
500                "SELECT step_index, name, state, cmd_json, output_json, started_at, ended_at
501                 FROM saga_step
502                 WHERE saga_id = ?1
503                 ORDER BY step_index ASC",
504            )?;
505            let steps: Vec<UnresolvedStep> = step_stmt
506                .query_map(params![saga_id], |row| {
507                    Ok(UnresolvedStep {
508                        step_index: row.get::<_, i64>(0)? as u32,
509                        name: row.get::<_, String>(1)?,
510                        state: row.get::<_, String>(2)?,
511                        cmd_json: row.get::<_, String>(3)?,
512                        output_json: row.get::<_, Option<String>>(4)?,
513                        started_at: row.get::<_, i64>(5)?,
514                        ended_at: row.get::<_, Option<i64>>(6)?,
515                    })
516                })?
517                .collect::<Result<Vec<_>, _>>()?;
518            out.push(UnresolvedSaga {
519                saga_id: saga_id as u64,
520                name,
521                state,
522                started_at,
523                input_json,
524                steps,
525            });
526        }
527        Ok(out)
528    }
529
530    /// Return up to `limit` recent sagas for `--diag sagas`. Sorted
531    /// most-recent-first. `step_count` is the count of `succeeded`
532    /// or `compensated` steps (i.e. progress through the saga).
533    #[allow(dead_code)] // Used by PR 2's `--diag sagas`; tests exercise it now.
534    pub fn snapshot_recent(&self, limit: u32) -> Result<Vec<SagaSnapshot>, StoreError> {
535        let conn = self.conn.lock().unwrap();
536        let mut stmt = conn.prepare(
537            "SELECT saga_id, name, state, started_at, terminal_at, failure_reason, input_json
538             FROM saga
539             ORDER BY COALESCE(terminal_at, started_at) DESC
540             LIMIT ?1",
541        )?;
542        let rows: Vec<(i64, String, String, i64, Option<i64>, Option<String>, String)> = stmt
543            .query_map(params![limit], |row| {
544                Ok((
545                    row.get::<_, i64>(0)?,
546                    row.get::<_, String>(1)?,
547                    row.get::<_, String>(2)?,
548                    row.get::<_, i64>(3)?,
549                    row.get::<_, Option<i64>>(4)?,
550                    row.get::<_, Option<String>>(5)?,
551                    row.get::<_, String>(6)?,
552                ))
553            })?
554            .collect::<Result<Vec<_>, _>>()?;
555        drop(stmt);
556
557        let mut out = Vec::with_capacity(rows.len());
558        for (saga_id, name, state, started_at, terminal_at, failure_reason, input_json) in rows {
559            let count: Option<i64> = conn
560                .query_row(
561                    "SELECT COUNT(*) FROM saga_step
562                     WHERE saga_id = ?1 AND state IN ('succeeded', 'compensated')",
563                    params![saga_id],
564                    |row| row.get(0),
565                )
566                .optional()?;
567            out.push(SagaSnapshot {
568                saga_id: saga_id as u64,
569                name,
570                state,
571                started_at,
572                terminal_at,
573                failure_reason,
574                step_count: count.unwrap_or(0) as u32,
575                input_json,
576            });
577        }
578        Ok(out)
579    }
580}
581
582fn now_ms() -> i64 {
583    chrono::Utc::now().timestamp_millis()
584}
585
586/// Discriminant name for a `Command`. Uses the serde tag (the
587/// `cmd` field of the snake_case-tagged enum) so the saga log row
588/// is easy to read in `--diag sagas`.
589pub(crate) fn command_discriminant_name(cmd: &Command) -> String {
590    match serde_json::to_value(cmd) {
591        Ok(serde_json::Value::Object(map)) => map
592            .get("cmd")
593            .and_then(|v| v.as_str())
594            .map(|s| s.to_string())
595            .unwrap_or_else(|| "unknown".to_string()),
596        _ => "unknown".to_string(),
597    }
598}
599
600#[cfg(test)]
601mod tests {
602    use super::*;
603    use agentmux_common::ipc::{ClientKind, ErrorCode};
604    use tempfile::NamedTempFile;
605
606    fn temp_log() -> (NamedTempFile, SagaLog) {
607        let f = NamedTempFile::new().expect("tempfile");
608        let log = SagaLog::open(f.path()).expect("open");
609        (f, log)
610    }
611
    /// Minimal `Command` fixture: a `Ping` carrying `nonce`.
    fn ping(nonce: u64) -> Command {
        Command::Ping { nonce }
    }
615
    /// Minimal `Event` fixture: a `Pong` carrying `nonce`, with
    /// `version` fixed at 0.
    fn pong(nonce: u64) -> Event {
        Event::Pong { nonce, version: 0 }
    }
619
620    #[test]
621    fn schema_migration_clean_db() {
622        let f = NamedTempFile::new().unwrap();
623        // First open creates schema.
624        let _log = SagaLog::open(f.path()).unwrap();
625        // Second open is idempotent (CREATE TABLE IF NOT EXISTS).
626        let _log = SagaLog::open(f.path()).unwrap();
627    }
628
629    #[test]
630    fn round_trip_completed() {
631        let (_f, log) = temp_log();
632        log.start_saga(1, "tear_off_tab", &serde_json::json!({"tab_id": "abc"}))
633            .unwrap();
634        log.start_step(1, 0, "Ping", &ping(7)).unwrap();
635        log.finish_step(1, 0, &[pong(7)]).unwrap();
636        log.start_step(1, 1, "Ping", &ping(8)).unwrap();
637        log.finish_step(1, 1, &[pong(8)]).unwrap();
638        log.terminate(1, SagaOutcome::Completed).unwrap();
639
640        let snap = log.snapshot_recent(10).unwrap();
641        assert_eq!(snap.len(), 1);
642        assert_eq!(snap[0].saga_id, 1);
643        assert_eq!(snap[0].name, "tear_off_tab");
644        assert_eq!(snap[0].state, "completed");
645        assert!(snap[0].terminal_at.is_some());
646        assert!(snap[0].failure_reason.is_none());
647        assert_eq!(snap[0].step_count, 2);
648    }
649
650    #[test]
651    fn round_trip_failed_with_compensation() {
652        let (_f, log) = temp_log();
653        log.start_saga(2, "tear_off_block", &serde_json::json!({}))
654            .unwrap();
655        log.start_step(2, 0, "Ping", &ping(1)).unwrap();
656        log.finish_step(2, 0, &[pong(1)]).unwrap();
657        log.start_step(2, 1, "Ping", &ping(2)).unwrap();
658        log.fail_step(2, 1, "boom").unwrap();
659        // Compensation: walk the one succeeded step in reverse.
660        log.compensate_step(2, 0, &[pong(99)]).unwrap();
661        log.terminate(
662            2,
663            SagaOutcome::Compensated {
664                reason: "boom".to_string(),
665            },
666        )
667        .unwrap();
668
669        let snap = log.snapshot_recent(10).unwrap();
670        assert_eq!(snap.len(), 1);
671        assert_eq!(snap[0].state, "compensated");
672        assert_eq!(snap[0].failure_reason.as_deref(), Some("boom"));
673    }
674
675    #[test]
676    fn unresolved_sagas_returns_only_in_flight() {
677        let (_f, log) = temp_log();
678
679        // Saga 1: completed.
680        log.start_saga(1, "saga_a", &serde_json::json!({})).unwrap();
681        log.terminate(1, SagaOutcome::Completed).unwrap();
682
683        // Saga 2: still running.
684        log.start_saga(2, "saga_b", &serde_json::json!({"x": 1}))
685            .unwrap();
686        log.start_step(2, 0, "Ping", &ping(0)).unwrap();
687        log.finish_step(2, 0, &[pong(0)]).unwrap();
688
689        // Saga 3: failed (terminal). NOW INCLUDED in unresolved
690        // (codex P1 round 2): classify_run_saga_result records
691        // timeout/cancel paths as `failed`, and those are exactly
692        // the partial-apply cases recovery needs to handle.
693        log.start_saga(3, "saga_c", &serde_json::json!({})).unwrap();
694        log.terminate(
695            3,
696            SagaOutcome::Failed {
697                reason: "oops".to_string(),
698            },
699        )
700        .unwrap();
701
702        // Saga 4: compensating (terminal not reached).
703        log.start_saga(4, "saga_d", &serde_json::json!({})).unwrap();
704        // Manually flip to compensating without going through
705        // terminate (mirrors what PR 2's compensate path will do).
706        log.conn
707            .lock()
708            .unwrap()
709            .execute(
710                "UPDATE saga SET state = 'compensating' WHERE saga_id = 4",
711                [],
712            )
713            .unwrap();
714
715        // Saga 5: compensated (terminal — recovery already ran).
716        log.start_saga(5, "saga_e", &serde_json::json!({})).unwrap();
717        log.terminate(
718            5,
719            SagaOutcome::Compensated {
720                reason: "rolled back".to_string(),
721            },
722        )
723        .unwrap();
724
725        let unresolved = log.unresolved_sagas().unwrap();
726        let mut ids: Vec<u64> = unresolved.iter().map(|u| u.saga_id).collect();
727        ids.sort();
728        // 2 (running), 3 (failed), 4 (compensating). Excludes 1
729        // (completed) and 5 (compensated).
730        assert_eq!(ids, vec![2, 3, 4]);
731
732        // Saga 2 carries its succeeded step.
733        let saga2 = unresolved.iter().find(|u| u.saga_id == 2).unwrap();
734        assert_eq!(saga2.state, "running");
735        assert_eq!(saga2.steps.len(), 1);
736        assert_eq!(saga2.steps[0].state, "succeeded");
737        assert_eq!(saga2.steps[0].name, "Ping");
738
739        // Saga 3 is failed but eligible for recovery.
740        let saga3 = unresolved.iter().find(|u| u.saga_id == 3).unwrap();
741        assert_eq!(saga3.state, "failed");
742
743        // Saga 4 has no steps yet (started but never dispatched).
744        let saga4 = unresolved.iter().find(|u| u.saga_id == 4).unwrap();
745        assert_eq!(saga4.state, "compensating");
746        assert!(saga4.steps.is_empty());
747    }
748
749    #[test]
750    fn fail_step_records_reason_in_output_json() {
751        let (_f, log) = temp_log();
752        log.start_saga(5, "saga_fail", &serde_json::json!({})).unwrap();
753        log.start_step(5, 0, "Ping", &ping(0)).unwrap();
754        log.fail_step(5, 0, "reducer rejected").unwrap();
755
756        // Inspect via unresolved (saga is still 'running' since
757        // terminate wasn't called).
758        let unresolved = log.unresolved_sagas().unwrap();
759        assert_eq!(unresolved.len(), 1);
760        let step = &unresolved[0].steps[0];
761        assert_eq!(step.state, "failed");
762        let parsed: serde_json::Value =
763            serde_json::from_str(step.output_json.as_ref().unwrap()).unwrap();
764        assert_eq!(parsed["error"], "reducer rejected");
765    }
766
767    #[test]
768    fn snapshot_recent_orders_most_recent_first_and_respects_limit() {
769        let (_f, log) = temp_log();
770        for i in 1..=5 {
771            log.start_saga(i, &format!("saga_{i}"), &serde_json::json!({}))
772                .unwrap();
773            log.terminate(i, SagaOutcome::Completed).unwrap();
774            // Tiny pause so terminal_at differs (millisecond resolution
775            // on most platforms is enough but not guaranteed in
776            // tight loops).
777            std::thread::sleep(std::time::Duration::from_millis(2));
778        }
779
780        let snap = log.snapshot_recent(3).unwrap();
781        assert_eq!(snap.len(), 3);
782        // Most recent first → ids 5, 4, 3.
783        assert_eq!(snap[0].saga_id, 5);
784        assert_eq!(snap[1].saga_id, 4);
785        assert_eq!(snap[2].saga_id, 3);
786    }
787
788    #[test]
789    fn command_discriminant_name_uses_serde_tag() {
790        assert_eq!(command_discriminant_name(&ping(0)), "ping");
791        assert_eq!(
792            command_discriminant_name(&Command::Goodbye),
793            "goodbye"
794        );
795        assert_eq!(
796            command_discriminant_name(&Command::Register {
797                kind: ClientKind::Tool,
798                pid: 1,
799                version: "v".to_string()
800            }),
801            "register"
802        );
803    }
804
805    // Touch the imported types so test compilation surfaces any
806    // future enum-shape drift early; these symbols are otherwise
807    // referenced only by saga authors.
808    #[test]
809    fn imports_are_live() {
810        let _ = ErrorCode::InvalidCommand;
811    }
812
813    // codex P1 PR #631 — start_saga must NOT silently overwrite an
814    // existing saga_id row. With the seed-from-MAX(saga_id) startup
815    // logic, collisions should never happen in practice; if one ever
816    // does, it's a bug worth surfacing.
817    #[test]
818    fn start_saga_rejects_duplicate_id() {
819        let (_tmp, log) = temp_log();
820        log.start_saga(42, "tear_off_tab", &serde_json::json!({"a": 1}))
821            .unwrap();
822        // A second start_saga with the same id must fail (no OR REPLACE).
823        let err = log
824            .start_saga(42, "tear_off_tab", &serde_json::json!({"a": 2}))
825            .unwrap_err();
826        // Surfaces as a SQLite UNIQUE constraint violation.
827        let msg = format!("{}", err);
828        assert!(
829            msg.to_lowercase().contains("unique") || msg.to_lowercase().contains("constraint"),
830            "expected unique-constraint error, got: {msg}",
831        );
832    }
833
834    // reagent P1 + codex P1 PR #631 — max_saga_id seeds the
835    // allocator at startup; without this, restarts would reuse
836    // saga_ids 1, 2, 3... and silently overwrite prior runs.
837    #[test]
838    fn max_saga_id_returns_highest_persisted() {
839        let (_tmp, log) = temp_log();
840        assert_eq!(log.max_saga_id().unwrap(), 0); // empty
841        log.start_saga(7, "a", &serde_json::Value::Null).unwrap();
842        log.start_saga(42, "b", &serde_json::Value::Null).unwrap();
843        log.start_saga(13, "c", &serde_json::Value::Null).unwrap();
844        assert_eq!(log.max_saga_id().unwrap(), 42);
845    }
846
847    // codex P2 PR #631 — foreign keys enabled means a saga_step
848    // referencing a non-existent saga_id is rejected.
849    #[test]
850    fn foreign_keys_enforced_on_saga_step() {
851        let (_tmp, log) = temp_log();
852        // Try to insert a step for a saga that was never started.
853        let err = log
854            .start_step(999, 0, "MoveTab", &ping(1))
855            .unwrap_err();
856        let msg = format!("{}", err);
857        assert!(
858            msg.to_lowercase().contains("foreign key")
859                || msg.to_lowercase().contains("constraint"),
860            "expected foreign-key error, got: {msg}",
861        );
862    }
863
864    // reagent P1 PR #631 — input_json must round-trip the saga's
865    // actual input args (was being stubbed with Value::Null).
866    #[test]
867    fn start_saga_persists_input_json() {
868        let (_tmp, log) = temp_log();
869        let input = serde_json::json!({
870            "tab_id": "tab-abc",
871            "source_workspace_id": "ws-1",
872        });
873        log.start_saga(1, "tear_off_tab", &input).unwrap();
874        let snapshot = log.snapshot_recent(1).unwrap();
875        assert_eq!(snapshot.len(), 1);
876        let parsed: serde_json::Value =
877            serde_json::from_str(&snapshot[0].input_json).unwrap();
878        assert_eq!(parsed, input);
879    }
880
881    // Step 7 — E.7 recovery-from-crash scaffold.
882    //
883    // Asserts the API surface PR 2's `compensate_unresolved` will
884    // need to walk back partially-applied sagas. The full
885    // compensate-on-restart logic ships in PR 2; this test pins the
886    // shape of `unresolved_sagas()` so PR 2 can wire against a
887    // stable contract.
888    //
889    // Scenario: a saga went through TWO succeeded forward steps,
890    // then the srv process crashed mid-third-step (no terminal row).
891    // After restart, `unresolved_sagas()` must:
892    //   1. Return the saga in `running` state.
893    //   2. Surface its succeeded steps in `step_index ASC` order so
894    //      PR 2's reverse walker can iterate `.iter().rev()` and
895    //      dispatch compensation in LIFO order (most-recently-applied
896    //      first, matching standard saga compensation semantics).
897    //   3. Preserve `cmd_json` per step so PR 2 can reconstruct the
898    //      compensating command shape (e.g., `MoveTab src↔dst` swap).
899    //   4. Carry the saga's `input_json` for `--diag sagas` triage.
900    #[test]
901    fn unresolved_saga_exposes_succeeded_steps_in_index_order_for_reverse_walk() {
902        let (_f, log) = temp_log();
903
904        // A saga that got through two forward steps (CreateWorkspace
905        // + MoveTab from a tear_off_tab run), then "crashed" before
906        // terminate(). No fail/compensate writes — the lifecycle row
907        // stays in `running`, the steps stay `succeeded`.
908        let input = serde_json::json!({
909            "tab_id": "tab-abc",
910            "source_workspace_id": "ws-1",
911        });
912        log.start_saga(101, "tear_off_tab", &input).unwrap();
913
914        let create_cmd = Command::Ping { nonce: 1 }; // stand-in: CreateWorkspace
915        let move_cmd = Command::Ping { nonce: 2 }; // stand-in: MoveTab
916        log.start_step(101, 0, "CreateWorkspace", &create_cmd)
917            .unwrap();
918        log.finish_step(101, 0, &[pong(1)]).unwrap();
919        log.start_step(101, 1, "MoveTab", &move_cmd).unwrap();
920        log.finish_step(101, 1, &[pong(2)]).unwrap();
921
922        // Imagine srv crashed here — no terminate(). On restart we
923        // open a fresh SagaLog over the same DB; saga 101 should
924        // appear in unresolved_sagas with both succeeded steps.
925        let unresolved = log.unresolved_sagas().unwrap();
926        assert_eq!(unresolved.len(), 1);
927        let saga = &unresolved[0];
928
929        // (1) Saga is in `running` state — PR 2 picks these up first.
930        assert_eq!(saga.saga_id, 101);
931        assert_eq!(saga.name, "tear_off_tab");
932        assert_eq!(saga.state, "running");
933
934        // (2) Steps are returned in ascending step_index order — PR 2's
935        // reverse-walker iterates `.iter().rev()` to compensate LIFO.
936        assert_eq!(saga.steps.len(), 2);
937        assert_eq!(saga.steps[0].step_index, 0);
938        assert_eq!(saga.steps[0].name, "CreateWorkspace");
939        assert_eq!(saga.steps[0].state, "succeeded");
940        assert_eq!(saga.steps[1].step_index, 1);
941        assert_eq!(saga.steps[1].name, "MoveTab");
942        assert_eq!(saga.steps[1].state, "succeeded");
943
944        // (3) cmd_json round-trips so PR 2 can re-deserialize and
945        // construct the compensating command shape.
946        let parsed_create: Command =
947            serde_json::from_str(&saga.steps[0].cmd_json).unwrap();
948        let parsed_move: Command = serde_json::from_str(&saga.steps[1].cmd_json).unwrap();
949        assert!(matches!(parsed_create, Command::Ping { nonce: 1 }));
950        assert!(matches!(parsed_move, Command::Ping { nonce: 2 }));
951
952        // (4) input_json survives the restart for operator triage.
953        let parsed_input: serde_json::Value =
954            serde_json::from_str(&saga.input_json).unwrap();
955        assert_eq!(parsed_input["tab_id"], "tab-abc");
956        assert_eq!(parsed_input["source_workspace_id"], "ws-1");
957
958        // Sanity check: walking succeeded-only in reverse (the actual
959        // PR 2 algorithm) yields step indices [1, 0] — most-recently-
960        // applied first.
961        let reverse_succeeded: Vec<u32> = saga
962            .steps
963            .iter()
964            .rev()
965            .filter(|s| s.state == "succeeded")
966            .map(|s| s.step_index)
967            .collect();
968        assert_eq!(reverse_succeeded, vec![1, 0]);
969    }
970
971    // Step 7 — E.7 recovery scaffold continued.
972    //
973    // Failed-mid-step is a different recovery shape: the lifecycle
974    // is still `running`, but one step is `failed` and earlier steps
975    // are `succeeded`. PR 2's reverse-walker must skip the failed
976    // step (its effects didn't apply by definition) and compensate
977    // the prior succeeded ones.
978    #[test]
979    fn unresolved_saga_with_mid_step_failure_exposes_succeeded_prefix() {
980        let (_f, log) = temp_log();
981        log.start_saga(202, "tear_off_block", &serde_json::json!({})).unwrap();
982        log.start_step(202, 0, "Ping", &ping(1)).unwrap();
983        log.finish_step(202, 0, &[pong(1)]).unwrap();
984        log.start_step(202, 1, "Ping", &ping(2)).unwrap();
985        log.fail_step(202, 1, "reducer rejected").unwrap();
986        // Imagine crash before compensation could finish — saga
987        // lifecycle row stays in `running`.
988
989        let unresolved = log.unresolved_sagas().unwrap();
990        let saga = unresolved.iter().find(|s| s.saga_id == 202).unwrap();
991        assert_eq!(saga.state, "running");
992        // Both step rows present, in index order, with the failure
993        // distinguishable so PR 2's recovery skips it.
994        assert_eq!(saga.steps.len(), 2);
995        assert_eq!(saga.steps[0].state, "succeeded");
996        assert_eq!(saga.steps[1].state, "failed");
997        // PR 2 will iterate succeeded prefix for compensation —
998        // demonstrate the iteration shape.
999        let to_compensate: Vec<u32> = saga
1000            .steps
1001            .iter()
1002            .rev()
1003            .filter(|s| s.state == "succeeded")
1004            .map(|s| s.step_index)
1005            .collect();
1006        assert_eq!(to_compensate, vec![0]);
1007    }
1008}