SagaCoordinator

Struct SagaCoordinator 

pub struct SagaCoordinator {
    next_saga_id: AtomicU64,
    in_flight: Mutex<HashMap<u64, InFlightSaga>>,
    events_tx: Sender<Event>,
    state: Arc<Mutex<State>>,
    log: Option<Arc<LauncherSagaLog>>,
    host_pipe: Option<Arc<HostPipe>>,
}

Fields§

§next_saga_id: AtomicU64

Monotonic saga-id allocator.

§in_flight: Mutex<HashMap<u64, InFlightSaga>>

In-flight sagas keyed by saga_id, each wrapped with the durable-log bookkeeping (LSD-2: awaiting_step + next_step_index).

§events_tx: Sender<Event>

Reference to the broadcast bus so the coordinator can emit SagaStarted / SagaCompleted / SagaFailed.

§state: Arc<Mutex<State>>

Reference to the launcher’s reducer state for bump_version when emitting saga lifecycle events.

§log: Option<Arc<LauncherSagaLog>>

LSD-2: durable saga log. None in tests that don’t exercise the durability path (the saga then only logs and remains in flight, preserving the pre-LSD-2 behavior that tests rely on for end-to-end bracket assertions). When Some, every saga lifecycle transition writes to this log: spawn_saga → start_saga, IssueCmd → start_step, awaited event consumed → finish_step, Done → terminate_saga(Completed), Failed / evicted → terminate_saga(Failed).

§host_pipe: Option<Arc<HostPipe>>

CPD-3 — launcher → host pipe wrapper. apply_action for IssueCmd::Host dispatches via host_pipe.send_command() when installed. None in tests that don’t exercise host dispatch (those drive sagas via synthetic terminal events on the bus).

Implementations§

impl SagaCoordinator

pub fn new(events_tx: Sender<Event>, state: Arc<Mutex<State>>) -> Self

pub fn with_host_pipe(self, host_pipe: Arc<HostPipe>) -> Self

CPD-3: install the host pipe so IssueCmd::Host actions are dispatched live (instead of log-only). Builder-style setter, so existing tests and the next_id_is_monotonic smoke test don’t have to construct a HostPipe. Production wiring (main.rs) calls this once before run_coordinator is spawned.

pub fn with_log(self, log: Arc<LauncherSagaLog>) -> Result<Self, LogError>

LSD-2: install the durable saga log so saga lifecycle transitions are persisted to <data-dir>/db/launcher-sagas.db. Builder-style setter rather than a constructor parameter, so existing tests and the next_id_is_monotonic smoke test don’t have to construct an in-memory log. Production wiring (in main.rs) calls this once before run_coordinator is spawned.
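
The builder-style wiring described for with_host_pipe and with_log can be sketched roughly as follows. This is a minimal illustration, not the launcher's code: Coordinator, build_for_production, and the empty HostPipe, LauncherSagaLog and LogError types are hypothetical stand-ins, and the documentation does not say why the real with_log can fail, so the sketch always succeeds.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the real types, which are defined elsewhere.
struct HostPipe;
struct LauncherSagaLog;
#[derive(Debug)]
struct LogError;

struct Coordinator {
    log: Option<Arc<LauncherSagaLog>>,
    host_pipe: Option<Arc<HostPipe>>,
}

impl Coordinator {
    // Tests can stop here: both optional collaborators stay None.
    fn new() -> Self {
        Coordinator { log: None, host_pipe: None }
    }

    // Consumes and returns self so calls can be chained.
    fn with_host_pipe(mut self, host_pipe: Arc<HostPipe>) -> Self {
        self.host_pipe = Some(host_pipe);
        self
    }

    // Fallible like the documented signature; this sketch never returns Err.
    fn with_log(mut self, log: Arc<LauncherSagaLog>) -> Result<Self, LogError> {
        self.log = Some(log);
        Ok(self)
    }
}

// Hypothetical production wiring: install both collaborators once, up front.
fn build_for_production() -> Result<Coordinator, LogError> {
    Coordinator::new()
        .with_host_pipe(Arc::new(HostPipe))
        .with_log(Arc::new(LauncherSagaLog))
}
```

Keeping both fields optional means a test only pays for the collaborators it actually exercises.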

pub fn next_id(&self) -> u64

Allocate the next saga_id. Monotonic per launcher run.
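
A monotonic allocator over an AtomicU64 is commonly written with fetch_add. The sketch below assumes a starting value of 1 and Relaxed ordering; neither is stated in the documentation, and IdAllocator is a hypothetical name.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical wrapper around the documented next_saga_id field.
struct IdAllocator {
    next_saga_id: AtomicU64,
}

impl IdAllocator {
    // Assumption: ids start at 1; the real starting value is not documented.
    fn new() -> Self {
        IdAllocator { next_saga_id: AtomicU64::new(1) }
    }

    // fetch_add returns the previous value, so each caller gets a unique id
    // and ids only ever increase within one process run.
    fn next_id(&self) -> u64 {
        self.next_saga_id.fetch_add(1, Ordering::Relaxed)
    }
}
```

Because the counter lives in memory only, uniqueness holds per launcher run, which matches the "monotonic per launcher run" wording.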

async fn next_event_version(&self) -> u64

Bump the launcher’s event_version for a coordinator-emitted lifecycle event. Brief mutex hold — no I/O between lock and drop.
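
The "brief mutex hold" pattern can be illustrated as below. This is a sketch under assumptions: it uses std::sync::Mutex rather than an async mutex, and the event_version field on State is inferred from the description, not confirmed by it.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical reduced State; the real reducer state holds more than this.
struct State {
    event_version: u64,
}

// Lock, bump, read, unlock. Nothing slow happens while the guard is alive.
fn next_event_version(state: &Arc<Mutex<State>>) -> u64 {
    let mut guard = state.lock().expect("state mutex poisoned");
    guard.event_version += 1;
    guard.event_version
    // The guard is dropped here, before the caller does any I/O.
}
```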

async fn emit_started(&self, saga_id: u64, name: &'static str)

Emit Event::SagaStarted after registering a saga.

async fn emit_completed(&self, saga_id: u64)

Emit Event::SagaCompleted after a saga returns Done.

async fn emit_failed(&self, saga_id: u64, reason: String)

Emit Event::SagaFailed after a saga returns Failed or after evict-and-replace cancels a same-kind in-flight saga. F.7 cleanup audit: prior #[allow(dead_code)] removed — F.6 evict-and-replace policy now actively dispatches this on every concurrent same-kind retrigger.

async fn apply_action(&self, saga_id: u64, name: &'static str, action: SagaAction, step_index: u32) -> ApplyOutcome

Apply a SagaAction returned by start or on_event. Returns an ApplyOutcome carrying:

  • in_flight: true → caller keeps the saga in in_flight, false → caller removes it.
  • awaiting_step: Some(idx) if the action was an IssueCmd that allocated step idx and the saga is now parked waiting for the echo event. Caller stores this on the InFlightSaga so the next non-Wait on_event return can call LauncherSagaLog::finish_step(saga_id, idx, ev).
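
The two fields listed above suggest a shape like the following. This is a hedged sketch: the payload-free SagaAction enum and the outcome_for function are hypothetical simplifications (the real variants carry data), and the host-send-error path, which also ends the saga, is not modeled.

```rust
// Field names follow the description above; the derives are an assumption.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ApplyOutcome {
    in_flight: bool,
    awaiting_step: Option<u32>,
}

// Hypothetical, payload-free version of the saga action.
enum SagaAction {
    IssueCmd,
    Wait,
    Done,
    Failed,
}

// Maps an action to what the caller should do with the registry entry.
fn outcome_for(action: &SagaAction, step_index: u32) -> ApplyOutcome {
    match action {
        // A command was issued: stay registered and remember the step
        // that is waiting for its echo event.
        SagaAction::IssueCmd => ApplyOutcome { in_flight: true, awaiting_step: Some(step_index) },
        // Nothing dispatched: stay registered, no new step to finish.
        SagaAction::Wait => ApplyOutcome { in_flight: true, awaiting_step: None },
        // Terminal: the caller removes the saga from in_flight.
        SagaAction::Done | SagaAction::Failed => ApplyOutcome { in_flight: false, awaiting_step: None },
    }
}
```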

CPD-3 — IssueCmd::Host dispatches live through HostPipe when one is installed. Tests without a host_pipe (the existing F.5/F.6 unit + integration tests) fall back to log-only so synthetic-terminal-event drivers continue to work. LauncherSelf and Srv targets remain log-only — reserved for class-D/E sagas, no consumer today.

LSD-2: when a LauncherSagaLog is installed, every state transition writes to it (IssueCmd → start_step, Done → terminate_saga(Completed), Failed → terminate_saga(Failed)).

step_index is the caller-allocated index for THIS dispatch. On the first dispatch in spawn_saga, the caller passes 0; on subsequent dispatches from the event loop, the caller pulls and bumps the per-InFlightSaga counter.
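
The step-index allocation can be pictured as a read-then-increment on a per-saga counter. In this sketch the next_step_index field name comes from the in_flight field description, while take_step_index, after_first_dispatch, and the assumption that the counter sits at 1 once spawn_saga has used index 0 are illustrative only.

```rust
// Hypothetical reduced bookkeeping record for one in-flight saga.
struct InFlightSaga {
    next_step_index: u32,
}

impl InFlightSaga {
    // Assumption: spawn_saga already dispatched step 0, so the counter is 1.
    fn after_first_dispatch() -> Self {
        InFlightSaga { next_step_index: 1 }
    }

    // Return the current index for this dispatch, then advance the counter.
    fn take_step_index(&mut self) -> u32 {
        let idx = self.next_step_index;
        self.next_step_index += 1;
        idx
    }
}
```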

Terminal paths (Done, Failed, host-send-error) all go through claim_terminal so only one of {normal Done/Failed, timeout-task, SagaActionFailed listener, host-send-error} can win the terminal-event race for a given saga_id. (reagent P1 PR #644 round 1 + 2.)

async fn claim_terminal(&self, saga_id: u64) -> bool

Atomically remove saga_id from in_flight and return whether the caller was the one who removed it (i.e. has the right to emit the terminal SagaCompleted / SagaFailed). Idempotent: a second caller for the same saga_id sees false. Used by the SagaAction::Done and SagaAction::Failed paths in apply_action, by the per-saga timeout task in spawn_saga, by the Event::SagaActionFailed listener in run_coordinator, by the host-send-failure path in apply_action::IssueCmd::Host, and by the eviction path in run_coordinator. (reagent P1 PR #644 round 1.)
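
One way to get this "first caller wins" behavior is to make removal from the map the claim itself. The sketch below uses std::sync::Mutex in place of the async mutex; Registry and the empty InFlightSaga are hypothetical stand-ins.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Placeholder for the real per-saga bookkeeping.
struct InFlightSaga;

// Hypothetical holder for the documented in_flight map.
struct Registry {
    in_flight: Mutex<HashMap<u64, InFlightSaga>>,
}

impl Registry {
    // Removal and the "did I remove it?" check happen under one lock,
    // so exactly one of several racing callers can observe true.
    fn claim_terminal(&self, saga_id: u64) -> bool {
        self.in_flight
            .lock()
            .expect("registry mutex poisoned")
            .remove(&saga_id)
            .is_some()
    }
}
```

A caller would emit the terminal event only when claim_terminal returns true, which is what keeps the timeout task, the failure listener, and the normal Done/Failed path from each emitting their own.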

async fn spawn_saga(self: &Arc<Self>, saga: Box<dyn Saga>) -> u64

Register a fresh saga, calling start and applying its first action. The caller has already determined that the saga should fire (e.g. matched a trigger event). Returns the saga’s allocated id (logged + bracketed in SagaStarted).

LSD-2 — also writes a running row to the durable saga log before invoking saga.start(), and (via apply_action) records the saga’s first dispatched step.

CPD-3: also arms a per-saga deadline timer. If the saga is still in_flight when saga.timeout() elapses, a background task force-fails it (SagaFailed { reason: "saga timeout" }) and removes it from the registry, per spec §3.10. Insert-then-start ordering is required so that apply_action’s claim_terminal guard can succeed for immediate-completion sagas (codex P2 PR #644 round 2).
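
Why the insert-then-start ordering matters can be shown with a small synchronous model. Everything here is hypothetical scaffolding (Registry, register, spawn_immediate, the insert_first flag); it only models a saga whose start completes immediately, and it omits the timer and the durable log.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical registry keyed by saga_id, storing only the saga name.
struct Registry {
    in_flight: Mutex<HashMap<u64, &'static str>>,
}

impl Registry {
    fn new() -> Self {
        Registry { in_flight: Mutex::new(HashMap::new()) }
    }

    fn register(&self, saga_id: u64, name: &'static str) {
        self.in_flight.lock().expect("registry mutex poisoned").insert(saga_id, name);
    }

    fn len(&self) -> usize {
        self.in_flight.lock().expect("registry mutex poisoned").len()
    }

    // Same claim rule as the coordinator describes: removal is the claim.
    fn claim_terminal(&self, saga_id: u64) -> bool {
        self.in_flight.lock().expect("registry mutex poisoned").remove(&saga_id).is_some()
    }

    // Models a saga that is Done as soon as it starts. Returns whether the
    // terminal event would be emitted.
    fn spawn_immediate(&self, saga_id: u64, name: &'static str, insert_first: bool) -> bool {
        if insert_first {
            self.register(saga_id, name);
        }
        // The saga starts and immediately finishes, so the terminal path
        // tries to claim it right away.
        let emitted = self.claim_terminal(saga_id);
        if !insert_first {
            // Wrong order: the claim already failed, and this entry is now stale.
            self.register(saga_id, name);
        }
        emitted
    }
}
```

With insert_first set, the claim succeeds and the registry ends up empty; with the opposite order, no terminal event is emitted and a stale entry is left behind.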
