pub struct SagaCoordinator {
next_saga_id: AtomicU64,
in_flight: Mutex<HashMap<u64, InFlightSaga>>,
events_tx: Sender<Event>,
state: Arc<Mutex<State>>,
log: Option<Arc<LauncherSagaLog>>,
host_pipe: Option<Arc<HostPipe>>,
}
Fields
next_saga_id: AtomicU64
Monotonic saga-id allocator.
in_flight: Mutex<HashMap<u64, InFlightSaga>>
In-flight sagas keyed by saga_id, each wrapped with the
durable-log bookkeeping (LSD-2: awaiting_step +
next_step_index).
events_tx: Sender<Event>
Reference to the broadcast bus so the coordinator can emit
SagaStarted / SagaCompleted / SagaFailed.
state: Arc<Mutex<State>>
Reference to the launcher’s reducer state for bump_version
when emitting saga lifecycle events.
log: Option<Arc<LauncherSagaLog>>
LSD-2: durable saga log. None in tests that don’t exercise
the durability path (the saga then only logs and remains in flight,
preserving the pre-LSD-2 behavior that tests rely on for end-to-end
bracket assertions). When Some, every saga lifecycle
transition (spawn_saga → start_saga, IssueCmd →
start_step, awaited event consumed → finish_step,
Done → terminate_saga(Completed), Failed / evicted →
terminate_saga(Failed)) writes to this log.
host_pipe: Option<Arc<HostPipe>>
CPD-3: launcher → host pipe wrapper. apply_action for
IssueCmd::Host dispatches via host_pipe.send_command() when
installed. None in tests that don’t exercise host dispatch
(those drive sagas via synthetic terminal events on the bus).
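The monotonic allocator behind next_saga_id can be illustrated with a minimal sketch. SagaIdAllocator and next_id are hypothetical names, and starting the counter at 1 is an assumption; the source only says the allocator is monotonic and backed by an AtomicU64.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for the `next_saga_id` field.
pub struct SagaIdAllocator {
    next: AtomicU64,
}

impl SagaIdAllocator {
    /// Starting at 1 is an assumption; the source does not state the first id.
    pub fn new() -> Self {
        Self { next: AtomicU64::new(1) }
    }

    /// Returns the current value and advances the counter in one atomic step,
    /// so concurrent callers never receive the same id.
    pub fn next_id(&self) -> u64 {
        self.next.fetch_add(1, Ordering::Relaxed)
    }
}
```

Relaxed ordering is enough here because the only requirement is that each call observes a distinct value; no other memory is synchronized through the counter.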
Implementations
impl SagaCoordinator
pub fn new(events_tx: Sender<Event>, state: Arc<Mutex<State>>) -> Self
pub fn with_host_pipe(self, host_pipe: Arc<HostPipe>) -> Self
CPD-3: install the host pipe so IssueCmd::Host actions are
dispatched live (instead of log-only). This is a builder-style setter
so that existing tests and the next_id_is_monotonic smoke test don’t
have to construct a HostPipe. Production wiring (main.rs) calls this
once before run_coordinator is spawned.
pub fn with_log(self, log: Arc<LauncherSagaLog>) -> Result<Self, LogError>
LSD-2: install the durable saga log so saga lifecycle
transitions are persisted to <data-dir>/db/launcher-sagas.db.
This is a builder-style setter rather than a constructor parameter so
that existing tests and the next_id_is_monotonic smoke test don’t have
to construct an in-memory log. Production wiring (in
main.rs) calls this once before run_coordinator is spawned.
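The two builder-style setters can be sketched as follows. Coordinator, has_log, has_host_pipe, and wire_production are hypothetical names, the unit structs are stand-ins for the real types, and this with_log always succeeds because the source does not say what makes the real one fail.

```rust
use std::sync::Arc;

// Hypothetical stand-ins; the real HostPipe, LauncherSagaLog and LogError
// are defined elsewhere in the crate.
pub struct HostPipe;
pub struct LauncherSagaLog;
#[derive(Debug)]
pub struct LogError;

pub struct Coordinator {
    log: Option<Arc<LauncherSagaLog>>,
    host_pipe: Option<Arc<HostPipe>>,
}

impl Coordinator {
    /// Both optional collaborators start out absent, which is the test default.
    pub fn new() -> Self {
        Self { log: None, host_pipe: None }
    }

    /// Infallible setter: consumes self and returns it with the pipe installed.
    pub fn with_host_pipe(mut self, host_pipe: Arc<HostPipe>) -> Self {
        self.host_pipe = Some(host_pipe);
        self
    }

    /// Fallible setter, mirroring the documented `Result<Self, LogError>`.
    /// This sketch never fails; the real failure modes are not in the source.
    pub fn with_log(mut self, log: Arc<LauncherSagaLog>) -> Result<Self, LogError> {
        self.log = Some(log);
        Ok(self)
    }

    pub fn has_log(&self) -> bool {
        self.log.is_some()
    }

    pub fn has_host_pipe(&self) -> bool {
        self.host_pipe.is_some()
    }
}

/// Production-style wiring: chain both setters before the event loop starts.
pub fn wire_production() -> Result<Coordinator, LogError> {
    Coordinator::new()
        .with_host_pipe(Arc::new(HostPipe))
        .with_log(Arc::new(LauncherSagaLog))
}
```

Tests that need neither collaborator can call only new(), which is the point of keeping both out of the constructor signature.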
async fn next_event_version(&self) -> u64
Bump the launcher’s event_version for a coordinator-emitted
lifecycle event. Brief mutex hold — no I/O between lock and
drop.
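A minimal sketch of the brief-hold pattern, under several assumptions: it uses std::sync::Mutex in a synchronous free function (the real method is async and its mutex type is not shown), State is reduced to a single hypothetical event_version field, and returning the post-increment value is a guess.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical slice of the reducer state; only the counter matters here.
pub struct State {
    pub event_version: u64,
}

/// Lock, increment, copy the new value out, and let the guard drop when the
/// function returns. No I/O or other blocking work runs while the guard lives.
pub fn next_event_version(state: &Arc<Mutex<State>>) -> u64 {
    let mut guard = state.lock().expect("state mutex poisoned");
    guard.event_version += 1;
    guard.event_version
}
```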
async fn emit_started(&self, saga_id: u64, name: &'static str)
Emit Event::SagaStarted after registering a saga.
async fn emit_completed(&self, saga_id: u64)
Emit Event::SagaCompleted after a saga returns Done.
async fn emit_failed(&self, saga_id: u64, reason: String)
Emit Event::SagaFailed after a saga returns Failed or
after evict-and-replace cancels a same-kind in-flight saga.
F.7 cleanup audit: prior #[allow(dead_code)] removed — F.6
evict-and-replace policy now actively dispatches this on
every concurrent same-kind retrigger.
async fn apply_action(
    &self,
    saga_id: u64,
    name: &'static str,
    action: SagaAction,
    step_index: u32,
) -> ApplyOutcome
Apply a SagaAction returned by start or on_event. Returns
an ApplyOutcome carrying:
- in_flight: true → caller keeps the saga in in_flight, false → caller removes it.
- awaiting_step: Some(idx) if the action was an IssueCmd that allocated step idx and the saga is now parked waiting for the echo event. Caller stores this on the InFlightSaga so the next non-Wait on_event return can call LauncherSagaLog::finish_step(saga_id, idx, ev).
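The caller-side handling of these two fields can be sketched as below. The struct layouts and the record_outcome helper are hypothetical; only the field names and their meaning come from the description above.

```rust
/// Hypothetical shape of the outcome; field names follow the description above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ApplyOutcome {
    pub in_flight: bool,
    pub awaiting_step: Option<u32>,
}

/// Hypothetical, trimmed-down per-saga bookkeeping.
#[derive(Debug, Default)]
pub struct InFlightSaga {
    pub awaiting_step: Option<u32>,
    pub next_step_index: u32,
}

/// Caller-side handling: returns true if the saga should stay registered.
pub fn record_outcome(entry: &mut InFlightSaga, outcome: ApplyOutcome) -> bool {
    if let Some(idx) = outcome.awaiting_step {
        // Remember which step is parked so it can be finished when the
        // echo event arrives.
        entry.awaiting_step = Some(idx);
    }
    outcome.in_flight
}
```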
CPD-3 — IssueCmd::Host dispatches live through HostPipe
when one is installed. Tests without a host_pipe (the existing
F.5/F.6 unit + integration tests) fall back to log-only so
synthetic-terminal-event drivers continue to work.
LauncherSelf and Srv targets remain log-only — reserved
for class-D/E sagas, no consumer today.
LSD-2 — every state transition writes to LauncherSagaLog
when one is installed: IssueCmd → start_step, Done →
terminate_saga(Completed), Failed →
terminate_saga(Failed).
step_index is the caller-allocated index for THIS dispatch.
On the first dispatch in spawn_saga, the caller passes 0;
on subsequent dispatches from the event loop, the caller pulls
and bumps the per-InFlightSaga counter.
Terminal paths (Done, Failed, host-send-error) all go
through claim_terminal so only one of {normal Done/Failed,
timeout-task, SagaActionFailed listener, host-send-error}
can win the terminal-event race for a given saga_id.
(reagent P1 PR #644 round 1 + 2.)
async fn claim_terminal(&self, saga_id: u64) -> bool
Atomically remove saga_id from in_flight and return whether
the caller was the one who removed it (i.e. has the right to
emit the terminal SagaCompleted / SagaFailed). Idempotent:
a second caller for the same saga_id sees false. Used by the
SagaAction::Done and SagaAction::Failed paths in
apply_action, by the per-saga timeout task in spawn_saga,
by the Event::SagaActionFailed listener in run_coordinator,
by the host-send-failure path in apply_action::IssueCmd::Host,
and by the eviction path in run_coordinator.
(reagent P1 PR #644 round 1.)
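The remove-and-report idea can be sketched as follows. Registry and insert are hypothetical names, the unit value stands in for InFlightSaga, and std::sync::Mutex replaces whatever async mutex the real coordinator uses.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical registry; the unit value stands in for InFlightSaga.
pub struct Registry {
    in_flight: Mutex<HashMap<u64, ()>>,
}

impl Registry {
    pub fn new() -> Self {
        Self { in_flight: Mutex::new(HashMap::new()) }
    }

    pub fn insert(&self, saga_id: u64) {
        self.in_flight.lock().expect("poisoned").insert(saga_id, ());
    }

    /// `HashMap::remove` returns `Some` only for the caller that actually
    /// removed the entry, and the lock makes check-and-remove a single step.
    pub fn claim_terminal(&self, saga_id: u64) -> bool {
        self.in_flight
            .lock()
            .expect("poisoned")
            .remove(&saga_id)
            .is_some()
    }
}
```

Because the removal itself is the claim, every terminal path can call claim_terminal unconditionally and emit its event only when it returns true.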
async fn spawn_saga(self: &Arc<Self>, saga: Box<dyn Saga>) -> u64
Register a fresh saga, calling start and applying its first
action. The caller has already determined that the saga
should fire (e.g. matched a trigger event). Returns the saga’s
allocated id (logged + bracketed in SagaStarted).
LSD-2 — also writes a running row to the durable saga log
before invoking saga.start(), and (via apply_action)
records the saga’s first dispatched step.
CPD-3 — also arms a per-saga deadline timer. If the saga is
still in_flight when saga.timeout() elapses, a background
task force-fails it (SagaFailed { reason: "saga timeout" })
and removes it from the registry. Per spec §3.10.
Insert-then-start ordering is required so apply_action’s
claim_terminal guard can succeed for immediate-completion sagas
(codex P2 PR #644 round 2).
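A minimal sketch of the insert-then-start ordering and the deadline race. It uses OS threads in place of the coordinator's async tasks, the Registry type and the finishes_immediately flag are hypothetical, and arming the timer between insert and start is an assumption (the source only fixes insert before start).

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical registry of in-flight saga ids.
#[derive(Default)]
pub struct Registry {
    in_flight: Mutex<HashSet<u64>>,
}

impl Registry {
    /// True only for the caller that actually removed the id.
    pub fn claim_terminal(&self, saga_id: u64) -> bool {
        self.in_flight.lock().expect("poisoned").remove(&saga_id)
    }
}

/// Registers the saga, arms a deadline thread, then runs the start step.
/// `finishes_immediately` models a saga whose start() returns Done.
/// Returns the terminal outcome decided synchronously (if any) and the
/// timer handle, whose result is Some only if the timeout won the claim.
pub fn spawn_saga(
    reg: &Arc<Registry>,
    saga_id: u64,
    timeout: Duration,
    finishes_immediately: bool,
) -> (Option<&'static str>, thread::JoinHandle<Option<&'static str>>) {
    // 1. Insert first so a terminal claim made during start can succeed.
    reg.in_flight.lock().expect("poisoned").insert(saga_id);

    // 2. Arm the deadline. The timer fails the saga only if it wins the claim.
    let timer_reg = Arc::clone(reg);
    let timer = thread::spawn(move || {
        thread::sleep(timeout);
        timer_reg.claim_terminal(saga_id).then_some("saga timeout")
    });

    // 3. Start. An immediate Done claims the terminal slot right away.
    let now = if finishes_immediately && reg.claim_terminal(saga_id) {
        Some("completed")
    } else {
        None
    };
    (now, timer)
}
```

If the insert happened after start, the claim in step 3 would find nothing to remove, and an immediately completing saga would never emit its terminal event.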