pub struct SagaLog {
    conn: Mutex<Connection>,
}
SQLite-backed saga log. Owned by AppState as Arc<SagaLog>;
every SagaCtx::dispatch and compensate call writes through
it. See module-level docs for design notes.
Fields
conn: Mutex<Connection>
Implementations
impl SagaLog
pub fn open(path: &Path) -> Result<Self, StoreError>
Open a saga log backed by the given SQLite file. Configures
WAL mode + 5s busy timeout (mirroring WaveStore::open) and
applies the schema migration.
pub fn open_in_memory() -> Result<Self, StoreError>
Open an in-memory saga log for testing.
fn configure_and_migrate(conn: Connection) -> Result<Self, StoreError>
pub fn max_saga_id(&self) -> Result<u64, StoreError>
Highest existing saga_id in the durable log, or 0 if the log
is empty. Used at startup to seed state.saga_id_alloc so new
sagas don’t reuse IDs from prior srv-process runs (reagent P1
PR #631). On a collision the INSERT would fail, but only after
the saga itself had already started, so we seed defensively.
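The seeding described above can be sketched with a plain atomic counter. This is a minimal illustration, not the crate's allocator: `SagaIdAlloc`, `seeded_from`, and `alloc` are hypothetical names, and starting at `max + 1` (so an empty log hands out 1 first) is an assumption.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical allocator; the real `saga_id_alloc` type is not shown in these docs.
pub struct SagaIdAlloc {
    next: AtomicU64,
}

impl SagaIdAlloc {
    /// Seed from the highest ID already in the durable log (0 when the log is empty).
    /// Assumption: the first ID handed out is `max_existing + 1`.
    pub fn seeded_from(max_existing: u64) -> Self {
        Self {
            next: AtomicU64::new(max_existing + 1),
        }
    }

    /// Hand out the next unused ID.
    pub fn alloc(&self) -> u64 {
        self.next.fetch_add(1, Ordering::Relaxed)
    }
}
```

At startup the caller would pass the result of `max_saga_id()` to the seeding constructor before any saga is allowed to start.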
pub fn start_saga(&self, saga_id: u64, name: &str, input: &Value) -> Result<(), StoreError>
Insert a fresh saga row in running state. Called by the
coordinator immediately after alloc_saga_id (and before any
per-step writes).
pub fn start_step(&self, saga_id: u64, step_index: u32, name: &str, cmd: &Command) -> Result<(), StoreError>
Insert a pending step row before dispatching the command.
name is a short discriminant string (e.g. “MoveTab”); cmd
is serialized as JSON for replay/debugging.
pub fn finish_step(&self, saga_id: u64, step_index: u32, output_events: &[Event]) -> Result<(), StoreError>
Mark a step succeeded and store its emitted events as
JSON, used by compensation to reconstruct context if needed.
pub fn fail_step(&self, saga_id: u64, step_index: u32, reason: &str) -> Result<(), StoreError>
Mark a step failed. Stores the reducer’s error message in
output_json as {"error": ...} so PR 2’s --diag sagas
can surface it without a separate column.
pub fn compensate_step(&self, saga_id: u64, step_index: u32, output_events: &[Event]) -> Result<(), StoreError>
Mark a compensation step compensated. Same shape as
finish_step but distinct state. Called from
SagaCtx::compensate after the reducer applies the
compensating command successfully.
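The per-step lifecycle implied by start_step, finish_step, fail_step, and compensate_step can be modelled in memory as follows. This is a sketch under assumptions, not the SQLite implementation: `StepTable`, `Output`, and the `bool` return values are hypothetical, and the `Output` enum merely stands in for the `output_json` column.

```rust
use std::collections::BTreeMap;

/// Step states named in the method docs above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepState {
    Pending,
    Succeeded,
    Failed,
    Compensated,
}

/// Stand-in for the `output_json` column: emitted events or an error message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Output {
    Events(Vec<String>),
    Error(String),
}

/// Hypothetical in-memory step table keyed by (saga_id, step_index).
#[derive(Default)]
pub struct StepTable {
    rows: BTreeMap<(u64, u32), (StepState, Option<Output>)>,
}

impl StepTable {
    /// Record a pending row before the command is dispatched.
    pub fn start_step(&mut self, saga_id: u64, step_index: u32) {
        self.rows.insert((saga_id, step_index), (StepState::Pending, None));
    }

    /// Overwrite an existing row; returns false if the row was never started.
    fn set(&mut self, key: (u64, u32), state: StepState, output: Output) -> bool {
        match self.rows.get_mut(&key) {
            Some(row) => {
                *row = (state, Some(output));
                true
            }
            None => false,
        }
    }

    pub fn finish_step(&mut self, saga_id: u64, step_index: u32, events: Vec<String>) -> bool {
        self.set((saga_id, step_index), StepState::Succeeded, Output::Events(events))
    }

    pub fn fail_step(&mut self, saga_id: u64, step_index: u32, reason: &str) -> bool {
        self.set((saga_id, step_index), StepState::Failed, Output::Error(reason.to_string()))
    }

    pub fn compensate_step(&mut self, saga_id: u64, step_index: u32, events: Vec<String>) -> bool {
        self.set((saga_id, step_index), StepState::Compensated, Output::Events(events))
    }

    /// Current state of a step, if the row exists.
    pub fn state(&self, saga_id: u64, step_index: u32) -> Option<StepState> {
        self.rows.get(&(saga_id, step_index)).map(|row| row.0)
    }
}
```

Note that in this model compensate_step overwrites the row in place, which is the behaviour the recovery path later avoids by appending new rows instead.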
pub fn next_step_index(&self, saga_id: u64) -> Result<u32, StoreError>
Highest existing step_index for a saga, +1, or 0 if no steps
exist yet. Used by PR 2’s resume-on-startup so recovery
compensation rows are appended AFTER the saga’s original step
indices (rather than overwriting in place via compensate_step,
which would lose the original step’s output_json provenance).
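The index rule above (highest existing index plus one, or 0 for a saga with no steps) is small enough to state directly. The sketch below works on a slice of indices rather than the database; the free function and its slice argument are assumptions made for illustration.

```rust
/// Next free step index: highest existing index + 1, or 0 when there are no steps.
/// Hypothetical in-memory form; the real method takes a `saga_id` and queries the log.
pub fn next_step_index(existing: &[u32]) -> u32 {
    existing.iter().copied().max().map_or(0, |highest| highest + 1)
}
```

Because the result is derived from the maximum rather than the count, gaps in the existing indices never cause a recovery row to land on an occupied index.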
pub fn append_recovery_step(&self, saga_id: u64, step_index: u32, name: &str, cmd: &Command, events: &[Event], error: Option<&str>) -> Result<(), StoreError>
Append a new compensation step row at step_index. Distinct
from compensate_step() (which overwrites an existing step row
by index): used by PR 2’s resume-on-startup to record a
recovery dispatch as a NEW row so the original succeeded
step’s provenance is preserved.
state='compensated' if error.is_none(), else 'failed'
(with the reason in output_json).
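A minimal in-memory sketch of the append behaviour described above: the recovery dispatch becomes a new row, its state is chosen from `error`, and the original row is left untouched. `StepRow` and the plain-string `output` field are hypothetical stand-ins for the real table and its `output_json` column.

```rust
/// Hypothetical in-memory stand-in for one step row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StepRow {
    pub step_index: u32,
    pub name: String,
    pub state: &'static str,
    pub output: String,
}

/// Append a recovery row; never modifies rows that already exist.
pub fn append_recovery_step(
    rows: &mut Vec<StepRow>,
    step_index: u32,
    name: &str,
    events: &str,
    error: Option<&str>,
) {
    // 'compensated' when the dispatch succeeded, otherwise 'failed' with the reason.
    let (state, output) = match error {
        None => ("compensated", events.to_string()),
        Some(reason) => ("failed", reason.to_string()),
    };
    rows.push(StepRow {
        step_index,
        name: name.to_string(),
        state,
        output,
    });
}
```

The caller would obtain `step_index` from next_step_index so that the appended row sits after every original step.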
pub fn mark_compensating(&self, saga_id: u64) -> Result<(), StoreError>
Mark a saga as compensating — used by PR 2’s resume-on-startup
to flag a saga as undergoing recovery before walking its
succeeded steps in reverse. After all compensating dispatches
run, the caller invokes terminate() with the appropriate
outcome (Compensated / Failed).
(codex follow-up.) Distinct from terminate(state='compensated')
because the saga is only mid-compensation here — its
terminal_at and failure_reason are not known yet.
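The recovery sequence described above (flag the saga as compensating, walk succeeded steps in reverse, then record a terminal outcome) might look like the sketch below. Everything here is illustrative: `recover` and `dispatch_inverse` are hypothetical names, and continuing the walk after a failed dispatch is an assumption the docs do not confirm.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SagaState {
    Compensating,
    Compensated,
    FailedCompensation,
}

/// Returns the saga-state transitions and the order in which steps were visited.
pub fn recover(
    succeeded_steps: &[u32],
    mut dispatch_inverse: impl FnMut(u32) -> Result<(), String>,
) -> (Vec<SagaState>, Vec<u32>) {
    // Flag the saga as mid-recovery before touching any step (cf. mark_compensating).
    let mut transitions = vec![SagaState::Compensating];
    let mut visited = Vec::new();
    let mut any_failed = false;

    // Walk the succeeded steps newest-first.
    for &step_index in succeeded_steps.iter().rev() {
        visited.push(step_index);
        if dispatch_inverse(step_index).is_err() {
            // Assumption: keep going so the remaining inverses still get a chance to run.
            any_failed = true;
        }
    }

    // Only now is the terminal outcome known.
    transitions.push(if any_failed {
        SagaState::FailedCompensation
    } else {
        SagaState::Compensated
    });
    (transitions, visited)
}
```

The intermediate `Compensating` entry is the point of the sketch: if the process crashes mid-walk, the saga is visibly mid-recovery and has no terminal state recorded yet.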
pub fn mark_all_succeeded_steps_compensated(&self, saga_id: u64) -> Result<(), StoreError>
Mark ALL of a saga’s remaining succeeded forward steps as
compensated in one shot. (codex P1 PR #636 round 5.) Used
at emit_terminal time when the outcome is Compensated —
the saga author is asserting “I’ve unwound everything,” so
any forward step still in succeeded is by definition done.
Catches the case where one compensating command undoes
multiple forward steps (e.g. tear_off_block’s single
DeleteWorkspace undoes both CreateWorkspace and
CreateTab) — SagaCtx::compensate’s per-call stack pop
only marks one, leaving the other as still-succeeded and
triggering double-compensation on restart.
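The sweep described above can be illustrated on an in-memory list of step states. This is a sketch only: the real method operates on the saga's rows in the SQLite log, and the slice-based function below and its return count are assumptions.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepState {
    Pending,
    Succeeded,
    Failed,
    Compensated,
}

/// Flip every step still in `Succeeded` to `Compensated`; returns how many changed.
/// Steps in any other state are left alone.
pub fn mark_all_succeeded_compensated(steps: &mut [StepState]) -> usize {
    let mut changed = 0;
    for state in steps.iter_mut() {
        if *state == StepState::Succeeded {
            *state = StepState::Compensated;
            changed += 1;
        }
    }
    changed
}
```

In the tear_off_block case, the per-call stack pop would have marked only one of CreateWorkspace and CreateTab; the sweep picks up the other, so a later restart finds nothing left in the succeeded state.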
pub fn mark_step_compensated(&self, saga_id: u64, step_index: u32) -> Result<(), StoreError>
Mark an original forward step as compensated. (codex P1
PR #636 round 2.) Without this, a second crash-recovery
startup re-reads the still-succeeded original steps and
replays their inverses again — flipping a previously-recovered
saga into failed_compensation (or worse, applying duplicate
side effects like a second delete). The recovery rows
(append_recovery_step) live at fresh indices and don’t
affect the original step’s state on their own; this update
is the explicit “this forward step has been compensated”
marker that the next recovery scan needs to see.
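Why the explicit marker matters can be shown with a toy recovery scan. In the sketch below, `steps_to_compensate` is a hypothetical stand-in for whatever the startup scan does with a saga's step list; once each forward step is marked, a second scan comes back empty and nothing is replayed.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepState {
    Succeeded,
    Compensated,
}

/// Indices of forward steps a recovery scan would still compensate, newest first.
pub fn steps_to_compensate(steps: &[(u32, StepState)]) -> Vec<u32> {
    let mut pending: Vec<u32> = steps
        .iter()
        .filter(|(_, state)| *state == StepState::Succeeded)
        .map(|(index, _)| *index)
        .collect();
    pending.sort_unstable_by(|a, b| b.cmp(a));
    pending
}

/// Record that the forward step at `step_index` has been compensated.
pub fn mark_step_compensated(steps: &mut [(u32, StepState)], step_index: u32) {
    for (index, state) in steps.iter_mut() {
        if *index == step_index {
            *state = StepState::Compensated;
        }
    }
}
```

Without the marking step, the second scan would return the same indices as the first and the inverses would be dispatched again.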
pub fn mark_failed_compensation(&self, saga_id: u64, reason: &str) -> Result<(), StoreError>
Mark a saga as failed_compensation — used by PR 2’s resume-
on-startup when at least one compensating dispatch errored. The
saga is left in this terminal state for operator review;
--diag sagas surfaces it.
pub fn terminate(&self, saga_id: u64, outcome: SagaOutcome) -> Result<(), StoreError>
Write the saga’s terminal lifecycle row. Called from
emit_terminal after the inner future returns.
pub fn unresolved_sagas(&self) -> Result<Vec<UnresolvedSaga>, StoreError>
Return all sagas still in running, compensating, or
failed state, each with its full step list (for PR 2’s
compensate-on-restart reverse walk). Used at startup before
the API server begins accepting requests.
(codex P1 PR #631 round 2.) failed is included because
classify_run_saga_result records timeout/cancel paths as
failed — those are exactly the partial-apply cases this
durability layer exists to recover. A failed saga’s step
list may have succeeded rows whose effects need
compensation; if we filtered them out here, recovery would
silently leave that state in place.
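The selection rule above reduces to a predicate over saga states. The sketch below is an assumption-laden model: the enum lists only states named in these docs, the variant spellings are illustrative, and the real method also returns each saga's step list.

```rust
/// Saga states mentioned in the docs above (variant names are illustrative).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SagaState {
    Running,
    Compensating,
    Failed,
    Compensated,
    FailedCompensation,
}

/// True for the states the startup scan has to look at.
/// `Failed` is included because timeout/cancel paths can leave succeeded steps behind.
pub fn is_unresolved(state: SagaState) -> bool {
    matches!(
        state,
        SagaState::Running | SagaState::Compensating | SagaState::Failed
    )
}

/// IDs of sagas that still need recovery, in input order.
pub fn unresolved(sagas: &[(u64, SagaState)]) -> Vec<u64> {
    sagas
        .iter()
        .filter(|(_, state)| is_unresolved(*state))
        .map(|(id, _)| *id)
        .collect()
}
```

`FailedCompensation` is deliberately excluded here, matching the description of it as a terminal state left for operator review.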
pub fn snapshot_recent(&self, limit: u32) -> Result<Vec<SagaSnapshot>, StoreError>
Return up to limit recent sagas for --diag sagas. Sorted
most-recent-first. step_count is the count of succeeded
or compensated steps (i.e. progress through the saga).
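The ordering, limit, and step_count rules above can be sketched over in-memory records. `SagaRecord`, `Snapshot`, and in particular the `started_at` sort key are hypothetical: the docs say only "most-recent-first" and do not name the column used for ordering or the fields of SagaSnapshot.

```rust
/// Hypothetical in-memory saga record; `started_at` is an assumed recency key.
pub struct SagaRecord {
    pub saga_id: u64,
    pub started_at: u64,
    pub step_states: Vec<&'static str>,
}

/// Hypothetical reduced form of a snapshot row.
#[derive(Debug, PartialEq, Eq)]
pub struct Snapshot {
    pub saga_id: u64,
    pub step_count: usize,
}

/// Up to `limit` sagas, most recent first, with progress counted per saga.
pub fn snapshot_recent(sagas: &[SagaRecord], limit: u32) -> Vec<Snapshot> {
    let mut ordered: Vec<&SagaRecord> = sagas.iter().collect();
    // Descending by the assumed recency key.
    ordered.sort_by(|a, b| b.started_at.cmp(&a.started_at));
    ordered
        .into_iter()
        .take(limit as usize)
        .map(|saga| Snapshot {
            saga_id: saga.saga_id,
            // Only succeeded or compensated steps count as progress.
            step_count: saga
                .step_states
                .iter()
                .copied()
                .filter(|state| matches!(*state, "succeeded" | "compensated"))
                .count(),
        })
        .collect()
}
```

Pending and failed steps are excluded from the count, so step_count reads as how far the saga got rather than how many rows it has.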
Auto Trait Implementations
impl !Freeze for SagaLog
impl RefUnwindSafe for SagaLog
impl Send for SagaLog
impl Sync for SagaLog
impl Unpin for SagaLog
impl UnwindSafe for SagaLog
Blanket Implementations
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> Downcast for T where T: Any
fn into_any(self: Box<T>) -> Box<dyn Any>
Convert Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>. Box<dyn Any> can
then be further downcast into Box<ConcreteType> where ConcreteType implements Trait.
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Convert Rc<Trait> (where Trait: Downcast) to Rc<Any>. Rc<Any> can then be
further downcast into Rc<ConcreteType> where ConcreteType implements Trait.
fn as_any(&self) -> &(dyn Any + 'static)
Convert &Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot
generate &Any’s vtable from &Trait’s.
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
Convert &mut Trait (where Trait: Downcast) to &mut Any. This is needed since Rust cannot
generate &mut Any’s vtable from &mut Trait’s.