1use std::path::Path;
28use std::sync::Mutex;
29
30use agentmux_common::ipc::{Command, Event};
31use rusqlite::{params, Connection, OptionalExtension};
32use serde::{Deserialize, Serialize};
33
34use crate::backend::storage::error::StoreError;
35use crate::backend::storage::migrations::{
36 check_schema_compat, run_saga_log_migrations, stamp_version, SAGA_LOG_SCHEMA_VERSION,
37};
38
/// Terminal outcome recorded for a saga when it is terminated.
///
/// Each variant maps to the state string stored in the `saga.state` column
/// (see `state_str`) and to an optional failure reason (see `reason`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SagaOutcome {
    /// Stored as state `"completed"` with no failure reason.
    Completed,
    /// Stored as state `"failed"` together with `reason`.
    Failed { reason: String },
    /// Stored as state `"compensated"` together with `reason`.
    Compensated { reason: String },
}

impl SagaOutcome {
    /// State string persisted for this outcome.
    fn state_str(&self) -> &'static str {
        match *self {
            Self::Completed => "completed",
            Self::Failed { .. } => "failed",
            Self::Compensated { .. } => "compensated",
        }
    }

    /// Failure reason carried by the outcome; `None` for `Completed`.
    fn reason(&self) -> Option<&str> {
        if let Self::Failed { reason } | Self::Compensated { reason } = self {
            Some(reason.as_str())
        } else {
            None
        }
    }
}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
75#[allow(dead_code)] pub struct UnresolvedSaga {
77 pub saga_id: u64,
78 pub name: String,
79 pub state: String,
80 pub started_at: i64,
81 pub input_json: String,
82 pub steps: Vec<UnresolvedStep>,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
89#[allow(dead_code)] pub struct UnresolvedStep {
91 pub step_index: u32,
92 pub name: String,
93 pub state: String,
94 pub cmd_json: String,
95 pub output_json: Option<String>,
96 pub started_at: i64,
97 pub ended_at: Option<i64>,
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
102#[allow(dead_code)] pub struct SagaSnapshot {
104 pub saga_id: u64,
105 pub name: String,
106 pub state: String,
107 pub started_at: i64,
108 pub terminal_at: Option<i64>,
109 pub failure_reason: Option<String>,
110 pub step_count: u32,
111 pub input_json: String,
115}
116
117pub struct SagaLog {
121 conn: Mutex<Connection>,
122}
123
124impl SagaLog {
125 pub fn open(path: &Path) -> Result<Self, StoreError> {
129 let conn = Connection::open(path)?;
130 Self::configure_and_migrate(conn)
131 }
132
133 #[allow(dead_code)]
135 pub fn open_in_memory() -> Result<Self, StoreError> {
136 let conn = Connection::open_in_memory()?;
137 Self::configure_and_migrate(conn)
138 }
139
140 fn configure_and_migrate(conn: Connection) -> Result<Self, StoreError> {
141 conn.execute_batch(
147 "PRAGMA journal_mode=WAL;
148 PRAGMA busy_timeout=5000;
149 PRAGMA synchronous=NORMAL;
150 PRAGMA temp_store=MEMORY;
151 PRAGMA foreign_keys=ON;",
152 )?;
153 check_schema_compat(&conn, SAGA_LOG_SCHEMA_VERSION, "sagas.db")?;
157 run_saga_log_migrations(&conn)?;
158 stamp_version(&conn, SAGA_LOG_SCHEMA_VERSION)?;
159 Ok(Self {
160 conn: Mutex::new(conn),
161 })
162 }
163
164 pub fn max_saga_id(&self) -> Result<u64, StoreError> {
170 let conn = self.conn.lock().unwrap();
171 let max: Option<i64> = conn.query_row("SELECT MAX(saga_id) FROM saga", [], |r| r.get(0))?;
181 Ok(max.unwrap_or(0).max(0) as u64)
182 }
183
184 pub fn start_saga(
188 &self,
189 saga_id: u64,
190 name: &str,
191 input: &serde_json::Value,
192 ) -> Result<(), StoreError> {
193 let now = now_ms();
194 let input_json = serde_json::to_string(input)?;
195 let conn = self.conn.lock().unwrap();
196 conn.execute(
203 "INSERT INTO saga
204 (saga_id, name, state, started_at, terminal_at, failure_reason, input_json)
205 VALUES (?1, ?2, 'running', ?3, NULL, NULL, ?4)",
206 params![saga_id as i64, name, now, input_json],
207 )?;
208 Ok(())
209 }
210
211 pub fn start_step(
215 &self,
216 saga_id: u64,
217 step_index: u32,
218 name: &str,
219 cmd: &Command,
220 ) -> Result<(), StoreError> {
221 let now = now_ms();
222 let cmd_json = serde_json::to_string(cmd)?;
223 let conn = self.conn.lock().unwrap();
224 conn.execute(
226 "INSERT INTO saga_step
227 (saga_id, step_index, name, state, cmd_json, output_json, started_at, ended_at)
228 VALUES (?1, ?2, ?3, 'pending', ?4, NULL, ?5, NULL)",
229 params![saga_id as i64, step_index, name, cmd_json, now],
230 )?;
231 Ok(())
232 }
233
234 pub fn finish_step(
237 &self,
238 saga_id: u64,
239 step_index: u32,
240 output_events: &[Event],
241 ) -> Result<(), StoreError> {
242 let now = now_ms();
243 let output_json = serde_json::to_string(output_events)?;
244 let conn = self.conn.lock().unwrap();
245 conn.execute(
246 "UPDATE saga_step
247 SET state = 'succeeded', output_json = ?1, ended_at = ?2
248 WHERE saga_id = ?3 AND step_index = ?4",
249 params![output_json, now, saga_id as i64, step_index],
250 )?;
251 Ok(())
252 }
253
254 pub fn fail_step(
258 &self,
259 saga_id: u64,
260 step_index: u32,
261 reason: &str,
262 ) -> Result<(), StoreError> {
263 let now = now_ms();
264 let output_json = serde_json::to_string(&serde_json::json!({ "error": reason }))
265 ?;
266 let conn = self.conn.lock().unwrap();
267 conn.execute(
268 "UPDATE saga_step
269 SET state = 'failed', output_json = ?1, ended_at = ?2
270 WHERE saga_id = ?3 AND step_index = ?4",
271 params![output_json, now, saga_id as i64, step_index],
272 )?;
273 Ok(())
274 }
275
276 pub fn compensate_step(
281 &self,
282 saga_id: u64,
283 step_index: u32,
284 output_events: &[Event],
285 ) -> Result<(), StoreError> {
286 let now = now_ms();
287 let output_json = serde_json::to_string(output_events)?;
288 let conn = self.conn.lock().unwrap();
289 conn.execute(
290 "INSERT OR REPLACE INTO saga_step
291 (saga_id, step_index, name, state, cmd_json, output_json, started_at, ended_at)
292 VALUES (
293 ?1, ?2,
294 COALESCE((SELECT name FROM saga_step WHERE saga_id=?1 AND step_index=?2), 'compensate'),
295 'compensated',
296 COALESCE((SELECT cmd_json FROM saga_step WHERE saga_id=?1 AND step_index=?2), ''),
297 ?3,
298 COALESCE((SELECT started_at FROM saga_step WHERE saga_id=?1 AND step_index=?2), ?4),
299 ?4
300 )",
301 params![saga_id as i64, step_index, output_json, now],
302 )?;
303 Ok(())
304 }
305
306 pub fn next_step_index(&self, saga_id: u64) -> Result<u32, StoreError> {
312 let conn = self.conn.lock().unwrap();
313 let max: Option<i64> = conn.query_row(
314 "SELECT MAX(step_index) FROM saga_step WHERE saga_id = ?1",
315 params![saga_id as i64],
316 |r| r.get(0),
317 )?;
318 Ok((max.unwrap_or(-1) + 1) as u32)
319 }
320
321 pub fn append_recovery_step(
330 &self,
331 saga_id: u64,
332 step_index: u32,
333 name: &str,
334 cmd: &Command,
335 events: &[Event],
336 error: Option<&str>,
337 ) -> Result<(), StoreError> {
338 let now = now_ms();
339 let cmd_json = serde_json::to_string(cmd)?;
340 let (state, output_json) = match error {
341 None => ("compensated", serde_json::to_string(events)?),
342 Some(err) => (
343 "failed",
344 serde_json::to_string(&serde_json::json!({ "error": err }))?,
345 ),
346 };
347 let conn = self.conn.lock().unwrap();
348 conn.execute(
349 "INSERT INTO saga_step
350 (saga_id, step_index, name, state, cmd_json, output_json, started_at, ended_at)
351 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?7)",
352 params![saga_id as i64, step_index, name, state, cmd_json, output_json, now],
353 )?;
354 Ok(())
355 }
356
357 pub fn mark_compensating(&self, saga_id: u64) -> Result<(), StoreError> {
367 let conn = self.conn.lock().unwrap();
368 conn.execute(
369 "UPDATE saga SET state = 'compensating' WHERE saga_id = ?1",
370 params![saga_id as i64],
371 )?;
372 Ok(())
373 }
374
375 pub fn mark_all_succeeded_steps_compensated(
387 &self,
388 saga_id: u64,
389 ) -> Result<(), StoreError> {
390 let now = now_ms();
391 let conn = self.conn.lock().unwrap();
392 conn.execute(
393 "UPDATE saga_step
394 SET state = 'compensated', ended_at = ?1
395 WHERE saga_id = ?2 AND state = 'succeeded'",
396 params![now, saga_id as i64],
397 )?;
398 Ok(())
399 }
400
401 pub fn mark_step_compensated(
412 &self,
413 saga_id: u64,
414 step_index: u32,
415 ) -> Result<(), StoreError> {
416 let now = now_ms();
417 let conn = self.conn.lock().unwrap();
418 conn.execute(
419 "UPDATE saga_step
420 SET state = 'compensated', ended_at = ?1
421 WHERE saga_id = ?2 AND step_index = ?3 AND state = 'succeeded'",
422 params![now, saga_id as i64, step_index],
423 )?;
424 Ok(())
425 }
426
427 pub fn mark_failed_compensation(
432 &self,
433 saga_id: u64,
434 reason: &str,
435 ) -> Result<(), StoreError> {
436 let now = now_ms();
437 let conn = self.conn.lock().unwrap();
438 conn.execute(
439 "UPDATE saga
440 SET state = 'failed_compensation', terminal_at = ?1, failure_reason = ?2
441 WHERE saga_id = ?3",
442 params![now, reason, saga_id as i64],
443 )?;
444 Ok(())
445 }
446
447 pub fn terminate(&self, saga_id: u64, outcome: SagaOutcome) -> Result<(), StoreError> {
450 let now = now_ms();
451 let state = outcome.state_str();
452 let reason = outcome.reason();
453 let conn = self.conn.lock().unwrap();
454 conn.execute(
455 "UPDATE saga
456 SET state = ?1, terminal_at = ?2, failure_reason = ?3
457 WHERE saga_id = ?4",
458 params![state, now, reason, saga_id as i64],
459 )?;
460 Ok(())
461 }
462
463 #[allow(dead_code)] pub fn unresolved_sagas(&self) -> Result<Vec<UnresolvedSaga>, StoreError> {
477 let conn = self.conn.lock().unwrap();
478 let mut stmt = conn.prepare(
479 "SELECT saga_id, name, state, started_at, input_json
480 FROM saga
481 WHERE state IN ('running', 'compensating', 'failed')
482 ORDER BY saga_id ASC",
483 )?;
484 let saga_rows: Vec<(i64, String, String, i64, String)> = stmt
485 .query_map([], |row| {
486 Ok((
487 row.get::<_, i64>(0)?,
488 row.get::<_, String>(1)?,
489 row.get::<_, String>(2)?,
490 row.get::<_, i64>(3)?,
491 row.get::<_, String>(4)?,
492 ))
493 })?
494 .collect::<Result<Vec<_>, _>>()?;
495 drop(stmt);
496
497 let mut out = Vec::with_capacity(saga_rows.len());
498 for (saga_id, name, state, started_at, input_json) in saga_rows {
499 let mut step_stmt = conn.prepare(
500 "SELECT step_index, name, state, cmd_json, output_json, started_at, ended_at
501 FROM saga_step
502 WHERE saga_id = ?1
503 ORDER BY step_index ASC",
504 )?;
505 let steps: Vec<UnresolvedStep> = step_stmt
506 .query_map(params![saga_id], |row| {
507 Ok(UnresolvedStep {
508 step_index: row.get::<_, i64>(0)? as u32,
509 name: row.get::<_, String>(1)?,
510 state: row.get::<_, String>(2)?,
511 cmd_json: row.get::<_, String>(3)?,
512 output_json: row.get::<_, Option<String>>(4)?,
513 started_at: row.get::<_, i64>(5)?,
514 ended_at: row.get::<_, Option<i64>>(6)?,
515 })
516 })?
517 .collect::<Result<Vec<_>, _>>()?;
518 out.push(UnresolvedSaga {
519 saga_id: saga_id as u64,
520 name,
521 state,
522 started_at,
523 input_json,
524 steps,
525 });
526 }
527 Ok(out)
528 }
529
530 #[allow(dead_code)] pub fn snapshot_recent(&self, limit: u32) -> Result<Vec<SagaSnapshot>, StoreError> {
535 let conn = self.conn.lock().unwrap();
536 let mut stmt = conn.prepare(
537 "SELECT saga_id, name, state, started_at, terminal_at, failure_reason, input_json
538 FROM saga
539 ORDER BY COALESCE(terminal_at, started_at) DESC
540 LIMIT ?1",
541 )?;
542 let rows: Vec<(i64, String, String, i64, Option<i64>, Option<String>, String)> = stmt
543 .query_map(params![limit], |row| {
544 Ok((
545 row.get::<_, i64>(0)?,
546 row.get::<_, String>(1)?,
547 row.get::<_, String>(2)?,
548 row.get::<_, i64>(3)?,
549 row.get::<_, Option<i64>>(4)?,
550 row.get::<_, Option<String>>(5)?,
551 row.get::<_, String>(6)?,
552 ))
553 })?
554 .collect::<Result<Vec<_>, _>>()?;
555 drop(stmt);
556
557 let mut out = Vec::with_capacity(rows.len());
558 for (saga_id, name, state, started_at, terminal_at, failure_reason, input_json) in rows {
559 let count: Option<i64> = conn
560 .query_row(
561 "SELECT COUNT(*) FROM saga_step
562 WHERE saga_id = ?1 AND state IN ('succeeded', 'compensated')",
563 params![saga_id],
564 |row| row.get(0),
565 )
566 .optional()?;
567 out.push(SagaSnapshot {
568 saga_id: saga_id as u64,
569 name,
570 state,
571 started_at,
572 terminal_at,
573 failure_reason,
574 step_count: count.unwrap_or(0) as u32,
575 input_json,
576 });
577 }
578 Ok(out)
579 }
580}
581
582fn now_ms() -> i64 {
583 chrono::Utc::now().timestamp_millis()
584}
585
586pub(crate) fn command_discriminant_name(cmd: &Command) -> String {
590 match serde_json::to_value(cmd) {
591 Ok(serde_json::Value::Object(map)) => map
592 .get("cmd")
593 .and_then(|v| v.as_str())
594 .map(|s| s.to_string())
595 .unwrap_or_else(|| "unknown".to_string()),
596 _ => "unknown".to_string(),
597 }
598}
599
#[cfg(test)]
mod tests {
    use super::*;
    use agentmux_common::ipc::{ClientKind, ErrorCode};
    use tempfile::NamedTempFile;

    /// Opens a saga log on a fresh temp file. The file handle is returned so
    /// the caller keeps it alive for the duration of the test.
    fn temp_log() -> (NamedTempFile, SagaLog) {
        let file = NamedTempFile::new().expect("tempfile");
        let log = SagaLog::open(file.path()).expect("open");
        (file, log)
    }

    fn ping(nonce: u64) -> Command {
        Command::Ping { nonce }
    }

    fn pong(nonce: u64) -> Event {
        Event::Pong { nonce, version: 0 }
    }

    /// Empty JSON object used as saga input where the payload is irrelevant.
    fn no_input() -> serde_json::Value {
        serde_json::json!({})
    }

    /// Looks up a saga by id, panicking if it is absent.
    fn by_id(sagas: &[UnresolvedSaga], id: u64) -> &UnresolvedSaga {
        sagas.iter().find(|saga| saga.saga_id == id).unwrap()
    }

    /// Indices of the saga's `succeeded` steps, highest index first.
    fn succeeded_newest_first(saga: &UnresolvedSaga) -> Vec<u32> {
        let mut indices = Vec::new();
        for step in saga.steps.iter().rev() {
            if step.state == "succeeded" {
                indices.push(step.step_index);
            }
        }
        indices
    }

    #[test]
    fn schema_migration_clean_db() {
        let file = NamedTempFile::new().unwrap();
        // Opening the same file twice must succeed; both handles stay alive
        // until the end of the test.
        let _first = SagaLog::open(file.path()).unwrap();
        let _second = SagaLog::open(file.path()).unwrap();
    }

    #[test]
    fn round_trip_completed() {
        let (_f, log) = temp_log();
        let input = serde_json::json!({"tab_id": "abc"});
        log.start_saga(1, "tear_off_tab", &input).unwrap();
        for (index, nonce) in [(0u32, 7u64), (1, 8)] {
            log.start_step(1, index, "Ping", &ping(nonce)).unwrap();
            log.finish_step(1, index, &[pong(nonce)]).unwrap();
        }
        log.terminate(1, SagaOutcome::Completed).unwrap();

        let snap = log.snapshot_recent(10).unwrap();
        assert_eq!(snap.len(), 1);
        let only = &snap[0];
        assert_eq!(only.saga_id, 1);
        assert_eq!(only.name, "tear_off_tab");
        assert_eq!(only.state, "completed");
        assert!(only.terminal_at.is_some());
        assert!(only.failure_reason.is_none());
        assert_eq!(only.step_count, 2);
    }

    #[test]
    fn round_trip_failed_with_compensation() {
        let (_f, log) = temp_log();
        log.start_saga(2, "tear_off_block", &no_input()).unwrap();
        log.start_step(2, 0, "Ping", &ping(1)).unwrap();
        log.finish_step(2, 0, &[pong(1)]).unwrap();
        log.start_step(2, 1, "Ping", &ping(2)).unwrap();
        log.fail_step(2, 1, "boom").unwrap();
        log.compensate_step(2, 0, &[pong(99)]).unwrap();
        let outcome = SagaOutcome::Compensated {
            reason: "boom".to_string(),
        };
        log.terminate(2, outcome).unwrap();

        let snap = log.snapshot_recent(10).unwrap();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0].state, "compensated");
        assert_eq!(snap[0].failure_reason.as_deref(), Some("boom"));
    }

    #[test]
    fn unresolved_sagas_returns_only_in_flight() {
        let (_f, log) = temp_log();

        // Saga 1: completed, so it must not be reported.
        log.start_saga(1, "saga_a", &no_input()).unwrap();
        log.terminate(1, SagaOutcome::Completed).unwrap();

        // Saga 2: still running with one succeeded step.
        log.start_saga(2, "saga_b", &serde_json::json!({"x": 1}))
            .unwrap();
        log.start_step(2, 0, "Ping", &ping(0)).unwrap();
        log.finish_step(2, 0, &[pong(0)]).unwrap();

        // Saga 3: terminated as failed.
        log.start_saga(3, "saga_c", &no_input()).unwrap();
        let failed = SagaOutcome::Failed {
            reason: "oops".to_string(),
        };
        log.terminate(3, failed).unwrap();

        // Saga 4: forced into `compensating` directly through SQL.
        log.start_saga(4, "saga_d", &no_input()).unwrap();
        {
            let conn = log.conn.lock().unwrap();
            conn.execute(
                "UPDATE saga SET state = 'compensating' WHERE saga_id = 4",
                [],
            )
            .unwrap();
        }

        // Saga 5: fully compensated, so it must not be reported.
        log.start_saga(5, "saga_e", &no_input()).unwrap();
        let rolled_back = SagaOutcome::Compensated {
            reason: "rolled back".to_string(),
        };
        log.terminate(5, rolled_back).unwrap();

        let unresolved = log.unresolved_sagas().unwrap();
        let mut ids: Vec<u64> = unresolved.iter().map(|u| u.saga_id).collect();
        ids.sort_unstable();
        assert_eq!(ids, vec![2, 3, 4]);

        let saga2 = by_id(&unresolved, 2);
        assert_eq!(saga2.state, "running");
        assert_eq!(saga2.steps.len(), 1);
        assert_eq!(saga2.steps[0].state, "succeeded");
        assert_eq!(saga2.steps[0].name, "Ping");

        let saga3 = by_id(&unresolved, 3);
        assert_eq!(saga3.state, "failed");

        let saga4 = by_id(&unresolved, 4);
        assert_eq!(saga4.state, "compensating");
        assert!(saga4.steps.is_empty());
    }

    #[test]
    fn fail_step_records_reason_in_output_json() {
        let (_f, log) = temp_log();
        log.start_saga(5, "saga_fail", &no_input()).unwrap();
        log.start_step(5, 0, "Ping", &ping(0)).unwrap();
        log.fail_step(5, 0, "reducer rejected").unwrap();

        let unresolved = log.unresolved_sagas().unwrap();
        assert_eq!(unresolved.len(), 1);
        let step = &unresolved[0].steps[0];
        assert_eq!(step.state, "failed");
        let raw_output = step.output_json.as_ref().unwrap();
        let parsed: serde_json::Value = serde_json::from_str(raw_output).unwrap();
        assert_eq!(parsed["error"], "reducer rejected");
    }

    #[test]
    fn snapshot_recent_orders_most_recent_first_and_respects_limit() {
        let (_f, log) = temp_log();
        for id in 1..=5 {
            let name = format!("saga_{id}");
            log.start_saga(id, &name, &no_input()).unwrap();
            log.terminate(id, SagaOutcome::Completed).unwrap();
            // Keeps the millisecond timestamps of successive sagas distinct.
            std::thread::sleep(std::time::Duration::from_millis(2));
        }

        let snap = log.snapshot_recent(3).unwrap();
        assert_eq!(snap.len(), 3);
        assert_eq!(snap[0].saga_id, 5);
        assert_eq!(snap[1].saga_id, 4);
        assert_eq!(snap[2].saga_id, 3);
    }

    #[test]
    fn command_discriminant_name_uses_serde_tag() {
        assert_eq!(command_discriminant_name(&ping(0)), "ping");
        assert_eq!(command_discriminant_name(&Command::Goodbye), "goodbye");

        let register = Command::Register {
            kind: ClientKind::Tool,
            pid: 1,
            version: "v".to_string(),
        };
        assert_eq!(command_discriminant_name(&register), "register");
    }

    #[test]
    fn imports_are_live() {
        // Keeps the `ErrorCode` import referenced.
        let _ = ErrorCode::InvalidCommand;
    }

    #[test]
    fn start_saga_rejects_duplicate_id() {
        let (_tmp, log) = temp_log();
        log.start_saga(42, "tear_off_tab", &serde_json::json!({"a": 1}))
            .unwrap();
        let err = log
            .start_saga(42, "tear_off_tab", &serde_json::json!({"a": 2}))
            .unwrap_err();
        let msg = err.to_string();
        let lowered = msg.to_lowercase();
        assert!(
            lowered.contains("unique") || lowered.contains("constraint"),
            "expected unique-constraint error, got: {msg}",
        );
    }

    #[test]
    fn max_saga_id_returns_highest_persisted() {
        let (_tmp, log) = temp_log();
        assert_eq!(log.max_saga_id().unwrap(), 0);

        let null_input = serde_json::Value::Null;
        log.start_saga(7, "a", &null_input).unwrap();
        log.start_saga(42, "b", &null_input).unwrap();
        log.start_saga(13, "c", &null_input).unwrap();
        assert_eq!(log.max_saga_id().unwrap(), 42);
    }

    #[test]
    fn foreign_keys_enforced_on_saga_step() {
        let (_tmp, log) = temp_log();
        // No saga 999 exists, so the step insert must be rejected.
        let err = log.start_step(999, 0, "MoveTab", &ping(1)).unwrap_err();
        let msg = err.to_string();
        let lowered = msg.to_lowercase();
        assert!(
            lowered.contains("foreign key") || lowered.contains("constraint"),
            "expected foreign-key error, got: {msg}",
        );
    }

    #[test]
    fn start_saga_persists_input_json() {
        let (_tmp, log) = temp_log();
        let input = serde_json::json!({
            "tab_id": "tab-abc",
            "source_workspace_id": "ws-1",
        });
        log.start_saga(1, "tear_off_tab", &input).unwrap();

        let snapshot = log.snapshot_recent(1).unwrap();
        assert_eq!(snapshot.len(), 1);
        let parsed: serde_json::Value = serde_json::from_str(&snapshot[0].input_json).unwrap();
        assert_eq!(parsed, input);
    }

    #[test]
    fn unresolved_saga_exposes_succeeded_steps_in_index_order_for_reverse_walk() {
        let (_f, log) = temp_log();

        let input = serde_json::json!({
            "tab_id": "tab-abc",
            "source_workspace_id": "ws-1",
        });
        log.start_saga(101, "tear_off_tab", &input).unwrap();

        let create_cmd = Command::Ping { nonce: 1 };
        let move_cmd = Command::Ping { nonce: 2 };
        log.start_step(101, 0, "CreateWorkspace", &create_cmd)
            .unwrap();
        log.finish_step(101, 0, &[pong(1)]).unwrap();
        log.start_step(101, 1, "MoveTab", &move_cmd).unwrap();
        log.finish_step(101, 1, &[pong(2)]).unwrap();

        let unresolved = log.unresolved_sagas().unwrap();
        assert_eq!(unresolved.len(), 1);
        let saga = &unresolved[0];

        assert_eq!(saga.saga_id, 101);
        assert_eq!(saga.name, "tear_off_tab");
        assert_eq!(saga.state, "running");

        assert_eq!(saga.steps.len(), 2);
        let (first, second) = (&saga.steps[0], &saga.steps[1]);
        assert_eq!(first.step_index, 0);
        assert_eq!(first.name, "CreateWorkspace");
        assert_eq!(first.state, "succeeded");
        assert_eq!(second.step_index, 1);
        assert_eq!(second.name, "MoveTab");
        assert_eq!(second.state, "succeeded");

        let parsed_create: Command = serde_json::from_str(&first.cmd_json).unwrap();
        let parsed_move: Command = serde_json::from_str(&second.cmd_json).unwrap();
        assert!(matches!(parsed_create, Command::Ping { nonce: 1 }));
        assert!(matches!(parsed_move, Command::Ping { nonce: 2 }));

        let parsed_input: serde_json::Value = serde_json::from_str(&saga.input_json).unwrap();
        assert_eq!(parsed_input["tab_id"], "tab-abc");
        assert_eq!(parsed_input["source_workspace_id"], "ws-1");

        assert_eq!(succeeded_newest_first(saga), vec![1, 0]);
    }

    #[test]
    fn unresolved_saga_with_mid_step_failure_exposes_succeeded_prefix() {
        let (_f, log) = temp_log();
        log.start_saga(202, "tear_off_block", &no_input()).unwrap();
        log.start_step(202, 0, "Ping", &ping(1)).unwrap();
        log.finish_step(202, 0, &[pong(1)]).unwrap();
        log.start_step(202, 1, "Ping", &ping(2)).unwrap();
        log.fail_step(202, 1, "reducer rejected").unwrap();

        let unresolved = log.unresolved_sagas().unwrap();
        let saga = by_id(&unresolved, 202);
        assert_eq!(saga.state, "running");
        assert_eq!(saga.steps.len(), 2);
        assert_eq!(saga.steps[0].state, "succeeded");
        assert_eq!(saga.steps[1].state, "failed");

        // Only the succeeded prefix is eligible for compensation.
        assert_eq!(succeeded_newest_first(saga), vec![0]);
    }
}