SagaLog

Struct SagaLog 

Source
pub struct SagaLog {
    conn: Mutex<Connection>,
}

SQLite-backed saga log. Owned by AppState as Arc<SagaLog>; every SagaCtx::dispatch and compensate call writes through it. See module-level docs for design notes.
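The ownership pattern described here (one connection behind a Mutex, shared through an Arc) can be sketched with the standard library alone. This is only an illustration: `FakeLog`, `write`, `entry_count`, and `write_from_threads` are hypothetical names, and a `Vec<String>` stands in for the real SQLite `Connection`.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for SagaLog: a Vec<String> replaces the SQLite connection.
pub struct FakeLog {
    conn: Mutex<Vec<String>>,
}

impl FakeLog {
    pub fn new() -> Self {
        Self { conn: Mutex::new(Vec::new()) }
    }

    /// Every write takes the lock, so concurrent callers are serialized.
    pub fn write(&self, line: &str) {
        let mut guard = self.conn.lock().expect("log mutex poisoned");
        guard.push(line.to_string());
    }

    /// Number of entries written so far.
    pub fn entry_count(&self) -> usize {
        self.conn.lock().expect("log mutex poisoned").len()
    }
}

/// Spawn `n` writer threads that all share the same log through an Arc.
pub fn write_from_threads(log: Arc<FakeLog>, n: usize) {
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let log = Arc::clone(&log);
            thread::spawn(move || log.write(&format!("step {}", i)))
        })
        .collect();
    for handle in handles {
        handle.join().expect("writer thread panicked");
    }
}
```

Because the methods take `&self` and lock internally, callers never need `&mut` access, which is what makes sharing through an `Arc` workable.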

Fields§

§conn: Mutex<Connection>

Implementations§

Source§

impl SagaLog

Source

pub fn open(path: &Path) -> Result<Self, StoreError>

Open a saga log backed by the given SQLite file. Configures WAL mode and a 5 s busy timeout (mirroring WaveStore::open), then applies the schema migration.

Source

pub fn open_in_memory() -> Result<Self, StoreError>

Open an in-memory saga log for testing.

Source

fn configure_and_migrate(conn: Connection) -> Result<Self, StoreError>

Source

pub fn max_saga_id(&self) -> Result<u64, StoreError>

Returns the highest existing saga_id in the durable log, or 0 if the log is empty. Used at startup to seed state.saga_id_alloc so that new sagas do not reuse IDs from prior srv-process runs. (reagent P1 PR #631.) A colliding INSERT would fail, but only after the saga itself had already started, so the allocator is seeded defensively.
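The seeding step might look like the following minimal sketch. `SagaIdAlloc`, `seeded_from`, and `alloc` are hypothetical names standing in for whatever `state.saga_id_alloc` actually is, and starting at `max + 1` is an assumption this page does not confirm.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for `state.saga_id_alloc`.
pub struct SagaIdAlloc {
    next: AtomicU64,
}

impl SagaIdAlloc {
    /// Seed from the value `max_saga_id` would return (0 when the log is empty).
    /// Assumption: the first ID handed out is one past the highest existing ID.
    pub fn seeded_from(max_existing: u64) -> Self {
        Self { next: AtomicU64::new(max_existing.saturating_add(1)) }
    }

    /// Hand out the next unused ID.
    pub fn alloc(&self) -> u64 {
        self.next.fetch_add(1, Ordering::Relaxed)
    }
}
```

Seeding once at startup means a restarted process continues above the IDs already in the log instead of restarting from the beginning.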

Source

pub fn start_saga( &self, saga_id: u64, name: &str, input: &Value, ) -> Result<(), StoreError>

Insert a fresh saga row in running state. Called by the coordinator immediately after alloc_saga_id (and before any per-step writes).

Source

pub fn start_step( &self, saga_id: u64, step_index: u32, name: &str, cmd: &Command, ) -> Result<(), StoreError>

Insert a pending step row before dispatching the command. name is a short discriminant string (e.g. “MoveTab”); cmd is serialized as JSON for replay/debugging.

Source

pub fn finish_step( &self, saga_id: u64, step_index: u32, output_events: &[Event], ) -> Result<(), StoreError>

Mark a step succeeded and store its emitted events as JSON, used by compensation to reconstruct context if needed.

Source

pub fn fail_step( &self, saga_id: u64, step_index: u32, reason: &str, ) -> Result<(), StoreError>

Mark a step failed. Stores the reducer’s error message in output_json as {"error": ...} so PR 2’s --diag sagas can surface it without a separate column.

Source

pub fn compensate_step( &self, saga_id: u64, step_index: u32, output_events: &[Event], ) -> Result<(), StoreError>

Mark a compensation step compensated. Same shape as finish_step but distinct state. Called from SagaCtx::compensate after the reducer applies the compensating command successfully.

Source

pub fn next_step_index(&self, saga_id: u64) -> Result<u32, StoreError>

Highest existing step_index for a saga, +1, or 0 if no steps exist yet. Used by PR 2’s resume-on-startup so recovery compensation rows are appended AFTER the saga’s original step indices (rather than overwriting in place via compensate_step, which would lose the original step’s output_json provenance).
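The "highest index plus one, or 0 when empty" rule can be sketched over an in-memory list of indices. The free function below is a hypothetical model, not the method's actual SQL-backed body.

```rust
/// Model of the rule: max(step_index) + 1, or 0 if the saga has no steps yet.
pub fn next_step_index(existing: &[u32]) -> u32 {
    existing.iter().max().map_or(0, |max| max + 1)
}
```

With original steps at indices 0, 1, and 2, the first recovery row would land at index 3, so no original row is overwritten.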

Source

pub fn append_recovery_step( &self, saga_id: u64, step_index: u32, name: &str, cmd: &Command, events: &[Event], error: Option<&str>, ) -> Result<(), StoreError>

Append a new compensation step row at step_index. Distinct from compensate_step() (which overwrites an existing step row by index): used by PR 2’s resume-on-startup to record a recovery dispatch as a NEW row so the original succeeded step’s provenance is preserved.

state='compensated' if error.is_none(), else 'failed' (with the reason in output_json).
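A rough in-memory model of the append behaviour, assuming a simplified row shape: `StepRow` and `append_recovery_row` are hypothetical, and the real method also stores the command and events as JSON, which this sketch omits.

```rust
/// Hypothetical, simplified step row (the real table has more columns).
#[derive(Debug, Clone, PartialEq)]
pub struct StepRow {
    pub step_index: u32,
    pub name: String,
    pub state: &'static str,
    pub output: Option<String>,
}

/// Append a NEW row; existing rows are never touched.
/// The state is "compensated" when there is no error, otherwise "failed".
pub fn append_recovery_row(
    rows: &mut Vec<StepRow>,
    step_index: u32,
    name: &str,
    error: Option<&str>,
) {
    let state = if error.is_none() { "compensated" } else { "failed" };
    rows.push(StepRow {
        step_index,
        name: name.to_string(),
        state,
        // Only the failure reason is modelled here.
        output: error.map(String::from),
    });
}
```

The original succeeded row keeps its state and output, which is the provenance the description above refers to.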

Source

pub fn mark_compensating(&self, saga_id: u64) -> Result<(), StoreError>

Mark a saga as compensating — used by PR 2’s resume-on-startup to flag a saga as undergoing recovery before walking its succeeded steps in reverse. After all compensating dispatches run, the caller invokes terminate() with the appropriate outcome (Compensated / Failed).

(codex follow-up.) Distinct from terminate(state='compensated') because the saga is only mid-compensation here — its terminal_at and failure_reason are not known yet.

Source

pub fn mark_all_succeeded_steps_compensated( &self, saga_id: u64, ) -> Result<(), StoreError>

Mark ALL of a saga’s remaining succeeded forward steps as compensated in one shot. (codex P1 PR #636 round 5.) Used at emit_terminal time when the outcome is Compensated: the saga author is asserting “I’ve unwound everything,” so any forward step still in succeeded is treated as already unwound. This catches the case where one compensating command undoes multiple forward steps (e.g. tear_off_block’s single DeleteWorkspace undoes both CreateWorkspace and CreateTab). SagaCtx::compensate’s per-call stack pop marks only one of them, leaving the other still succeeded and triggering double-compensation on restart.
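The sweep can be pictured with a small in-memory model. `StepState` and the free function below are hypothetical; the real method presumably issues a single UPDATE against the step table, which is an assumption.

```rust
/// Hypothetical in-memory model of a step's state column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepState {
    Pending,
    Succeeded,
    Failed,
    Compensated,
}

/// Flip every step still in `Succeeded` to `Compensated`.
/// Returns how many steps were changed; other states are left alone.
pub fn mark_all_succeeded_compensated(steps: &mut [StepState]) -> usize {
    let mut changed = 0;
    for state in steps.iter_mut() {
        if *state == StepState::Succeeded {
            *state = StepState::Compensated;
            changed += 1;
        }
    }
    changed
}
```

In the tear_off_block case, the per-call pop would have marked one forward step, and this sweep picks up the other so a restart finds nothing left in succeeded.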

Source

pub fn mark_step_compensated( &self, saga_id: u64, step_index: u32, ) -> Result<(), StoreError>

Mark an original forward step as compensated. (codex P1 PR #636 round 2.) Without this, a second crash-recovery startup re-reads the still-succeeded original steps and replays their inverses again — flipping a previously-recovered saga into failed_compensation (or worse, applying duplicate side effects like a second delete). The recovery rows (append_recovery_step) live at fresh indices and don’t affect the original step’s state on their own; this update is the explicit “this forward step has been compensated” marker that the next recovery scan needs to see.
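To see why the marker matters, consider this simplified model of a recovery pass. `Step`, `StepState`, `steps_to_compensate`, and `recover` are hypothetical names, and the sketch assumes steps are stored in ascending index order.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepState {
    Succeeded,
    Compensated,
}

#[derive(Debug, Clone)]
pub struct Step {
    pub index: u32,
    pub state: StepState,
}

/// Indices a recovery scan would compensate, newest first.
pub fn steps_to_compensate(steps: &[Step]) -> Vec<u32> {
    steps
        .iter()
        .rev()
        .filter(|step| step.state == StepState::Succeeded)
        .map(|step| step.index)
        .collect()
}

/// One recovery pass: pick the succeeded steps, then mark the originals
/// as compensated so the next scan no longer sees them.
pub fn recover(steps: &mut [Step]) -> Vec<u32> {
    let todo = steps_to_compensate(steps);
    for step in steps.iter_mut() {
        if todo.contains(&step.index) {
            step.state = StepState::Compensated;
        }
    }
    todo
}
```

Running `recover` twice on the same steps yields work only the first time; without the marking loop, the second pass would replay the same inverses.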

Source

pub fn mark_failed_compensation( &self, saga_id: u64, reason: &str, ) -> Result<(), StoreError>

Mark a saga as failed_compensation. Used by PR 2’s resume-on-startup when at least one compensating dispatch errored. The saga is left in this terminal state for operator review; --diag sagas surfaces it.

Source

pub fn terminate( &self, saga_id: u64, outcome: SagaOutcome, ) -> Result<(), StoreError>

Write the saga’s terminal lifecycle row. Called from emit_terminal after the inner future returns.

Source

pub fn unresolved_sagas(&self) -> Result<Vec<UnresolvedSaga>, StoreError>

Return all sagas still in running, compensating, or failed state, each with its full step list (for PR 2’s compensate-on-restart reverse walk). Used at startup before the API server begins accepting requests.

(codex P1 PR #631 round 2.) failed is included because classify_run_saga_result records timeout/cancel paths as failed — those are exactly the partial-apply cases this durability layer exists to recover. A failed saga’s step list may have succeeded rows whose effects need compensation; if we filtered them out here, recovery would silently leave that state in place.
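The selection rule can be sketched as a predicate over saga states. The `SagaState` enum is a hypothetical model; in particular the `Completed` variant is an assumed name for a successful terminal state that this page does not spell out.

```rust
/// Hypothetical model of the saga state column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SagaState {
    Running,
    Compensating,
    Failed,
    Compensated,
    FailedCompensation,
    /// Assumed name for a successful terminal state.
    Completed,
}

/// True for the states the startup recovery scan needs to look at.
/// `Failed` is included on purpose: timeout/cancel paths may have left
/// succeeded steps behind.
pub fn is_unresolved(state: SagaState) -> bool {
    matches!(
        state,
        SagaState::Running | SagaState::Compensating | SagaState::Failed
    )
}
```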

Source

pub fn snapshot_recent( &self, limit: u32, ) -> Result<Vec<SagaSnapshot>, StoreError>

Return up to limit recent sagas for --diag sagas. Sorted most-recent-first. step_count is the count of succeeded or compensated steps (i.e. progress through the saga).
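As an illustration of the two rules above, here is a sketch over in-memory data. The names are hypothetical, and ordering by a `started_at` timestamp is an assumption; this page does not say which column defines "recent".

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepState {
    Pending,
    Succeeded,
    Failed,
    Compensated,
}

/// Progress count: only succeeded or compensated steps are counted.
pub fn step_count(steps: &[StepState]) -> usize {
    steps
        .iter()
        .filter(|s| matches!(s, StepState::Succeeded | StepState::Compensated))
        .count()
}

/// Takes (started_at, saga_id) pairs and returns saga IDs,
/// most recent first, truncated to `limit`.
pub fn recent(mut sagas: Vec<(u64, u64)>, limit: u32) -> Vec<u64> {
    sagas.sort_by(|a, b| b.0.cmp(&a.0));
    sagas.truncate(limit as usize);
    sagas.into_iter().map(|(_, id)| id).collect()
}
```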

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
§

impl<T> Downcast for T
where T: Any,

§

fn into_any(self: Box<T>) -> Box<dyn Any>

Convert Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>. Box<dyn Any> can then be further downcast into Box<ConcreteType> where ConcreteType implements Trait.
§

fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>

Convert Rc<Trait> (where Trait: Downcast) to Rc<Any>. Rc<Any> can then be further downcast into Rc<ConcreteType> where ConcreteType implements Trait.
§

fn as_any(&self) -> &(dyn Any + 'static)

Convert &Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot generate &Any’s vtable from &Trait’s.
§

fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)

Convert &mut Trait (where Trait: Downcast) to &mut Any. This is needed since Rust cannot generate &mut Any’s vtable from &mut Trait’s.
§

impl<T> DowncastSync for T
where T: Any + Send + Sync,

§

fn into_any_arc(self: Arc<T>) -> Arc<dyn Any + Sync + Send>

Convert Arc<Trait> (where Trait: Downcast) to Arc<Any>. Arc<Any> can then be further downcast into Arc<ConcreteType> where ConcreteType implements Trait.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

§

impl<T> PolicyExt for T
where T: ?Sized,

§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.
§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.
§

impl<A, B, T> HttpServerConnExec<A, B> for T
where B: Body,