1use agentmux_common::ipc::{Command, Event};
46
47use crate::sagas::log::{command_discriminant_name, SagaOutcome, UnresolvedSaga, UnresolvedStep};
48use crate::server::AppState;
49
50pub async fn compensate_unresolved(state: &AppState) -> Result<usize, String> {
58 let unresolved = state
59 .saga_log
60 .unresolved_sagas()
61 .map_err(|e| format!("read unresolved sagas: {}", e))?;
62
63 if unresolved.is_empty() {
64 return Ok(0);
65 }
66
67 tracing::info!(
68 "[saga] resume-on-startup: found {} unresolved saga(s) from prior run",
69 unresolved.len()
70 );
71
72 let mut resumed = 0usize;
73 for saga in &unresolved {
74 match recover_saga(state, saga).await {
75 Ok(()) => {
76 resumed += 1;
77 tracing::info!(
78 saga_id = saga.saga_id,
79 name = %saga.name,
80 "[saga] resume-on-startup compensated saga"
81 );
82 }
83 Err(e) => {
84 tracing::error!(
85 saga_id = saga.saga_id,
86 name = %saga.name,
87 "[saga] resume-on-startup failed to compensate: {} — saga marked failed_compensation",
88 e
89 );
90 if let Err(log_err) = state
91 .saga_log
92 .mark_failed_compensation(saga.saga_id, &e)
93 {
94 tracing::warn!(
95 saga_id = saga.saga_id,
96 "[saga] mark_failed_compensation log write failed: {}",
97 log_err
98 );
99 }
100 resumed += 1;
104 }
105 }
106 }
107
108 Ok(resumed)
109}
110
111async fn recover_saga(state: &AppState, saga: &UnresolvedSaga) -> Result<(), String> {
115 state
119 .saga_log
120 .mark_compensating(saga.saga_id)
121 .map_err(|e| format!("mark compensating: {}", e))?;
122
123 let mut next_idx = state
127 .saga_log
128 .next_step_index(saga.saga_id)
129 .map_err(|e| format!("next_step_index: {}", e))?;
130
131 let succeeded_steps: Vec<&UnresolvedStep> = saga
132 .steps
133 .iter()
134 .rev()
135 .filter(|s| s.state == "succeeded")
136 .collect();
137
138 let pending_steps: Vec<&UnresolvedStep> = saga
147 .steps
148 .iter()
149 .filter(|s| s.state == "pending")
150 .collect();
151 if !pending_steps.is_empty() {
152 return Err(format!(
153 "saga has {} pending step(s) (crashed mid-dispatch); cannot auto-recover — operator review needed",
154 pending_steps.len()
155 ));
156 }
157
158 if succeeded_steps.is_empty() {
159 let reason = format!(
164 "no succeeded steps to compensate (saga state at restart: {})",
165 saga.state
166 );
167 return state
168 .saga_log
169 .terminate(saga.saga_id, SagaOutcome::Compensated { reason })
170 .map_err(|e| format!("terminate(Compensated, no-op): {}", e));
171 }
172
173 let mut errors: Vec<String> = Vec::new();
174 for step in succeeded_steps {
175 let forward_cmd: Command = match serde_json::from_str(&step.cmd_json) {
176 Ok(c) => c,
177 Err(e) => {
178 let msg = format!(
179 "step {} cmd_json deserialization failed: {} — skipping",
180 step.step_index, e
181 );
182 tracing::warn!(saga_id = saga.saga_id, "[saga] {}", msg);
183 errors.push(msg);
184 continue;
185 }
186 };
187 let inverse = match derive_inverse_command(&forward_cmd, step) {
188 Some(inv) => inv,
189 None => {
190 let msg = format!(
201 "step {} ({}): no inverse derivable from saga log — operator review needed",
202 step.step_index, step.name,
203 );
204 tracing::error!(
205 saga_id = saga.saga_id,
206 step_index = step.step_index,
207 forward = %step.name,
208 "[saga] {}",
209 msg,
210 );
211 if let Err(log_err) = state.saga_log.append_recovery_step(
212 saga.saga_id,
213 next_idx,
214 &format!("no_inverse_for_{}", step.name),
215 &forward_cmd,
216 &[],
217 Some("no inverse derivable from saga log"),
218 ) {
219 tracing::warn!(
220 saga_id = saga.saga_id,
221 step_index = next_idx,
222 "[saga] append_recovery_step (no-inverse) log write failed: {}",
223 log_err
224 );
225 }
226 next_idx += 1;
227 errors.push(msg);
228 continue;
229 }
230 };
231 let inv_name = command_discriminant_name(&inverse);
232 match dispatch_inverse(state, inverse.clone()).await {
233 Ok(events) => {
234 if let Err(e) = state.saga_log.append_recovery_step(
235 saga.saga_id,
236 next_idx,
237 &inv_name,
238 &inverse,
239 &events,
240 None,
241 ) {
242 tracing::warn!(
243 saga_id = saga.saga_id,
244 step_index = next_idx,
245 "[saga] append_recovery_step (success) log write failed: {}",
246 e
247 );
248 }
249 if let Err(e) = state
255 .saga_log
256 .mark_step_compensated(saga.saga_id, step.step_index)
257 {
258 tracing::warn!(
259 saga_id = saga.saga_id,
260 step_index = step.step_index,
261 "[saga] mark_step_compensated log write failed: {} — second restart may re-replay this inverse",
262 e
263 );
264 }
265 }
266 Err(e) => {
267 tracing::warn!(
268 saga_id = saga.saga_id,
269 step_index = next_idx,
270 "[saga] recovery dispatch of inverse '{}' failed: {}",
271 inv_name,
272 e
273 );
274 if let Err(log_err) = state.saga_log.append_recovery_step(
275 saga.saga_id,
276 next_idx,
277 &inv_name,
278 &inverse,
279 &[],
280 Some(&e),
281 ) {
282 tracing::warn!(
283 saga_id = saga.saga_id,
284 step_index = next_idx,
285 "[saga] append_recovery_step (failure) log write failed: {}",
286 log_err
287 );
288 }
289 errors.push(format!("step {}: {}", step.step_index, e));
290 }
291 }
292 next_idx += 1;
293 }
294
295 if errors.is_empty() {
296 state
297 .saga_log
298 .terminate(
299 saga.saga_id,
300 SagaOutcome::Compensated {
301 reason: format!(
302 "resumed on srv restart (was {} at startup)",
303 saga.state
304 ),
305 },
306 )
307 .map_err(|e| format!("terminate(Compensated): {}", e))
308 } else {
309 Err(errors.join("; "))
310 }
311}
312
313async fn dispatch_inverse(state: &AppState, cmd: Command) -> Result<Vec<Event>, String> {
318 let events = crate::server::service::dispatch_to_reducer(state, cmd).await;
319 if let Some(message) = events.iter().find_map(|e| match e {
320 Event::Error { message, .. } => Some(message.clone()),
321 _ => None,
322 }) {
323 return Err(message);
324 }
325 for ev in &events {
326 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, &state.wstore) {
327 return Err(format!("wstore apply failed: {}", e));
328 }
329 }
330 crate::server::service::publish_events(state, &events);
331 Ok(events)
332}
333
334pub fn derive_inverse_command(forward: &Command, step: &UnresolvedStep) -> Option<Command> {
357 match forward {
358 Command::CreateWorkspace { .. } => {
361 let new_id = extract_workspace_id_from_output(step)?;
362 Some(Command::DeleteWorkspace {
363 workspace_id: new_id,
364 force: false,
368 })
369 }
370 Command::CreateTab { workspace_id, .. } => {
371 let new_id = extract_tab_id_from_output(step)?;
372 Some(Command::DeleteTab {
373 workspace_id: workspace_id.clone(),
374 tab_id: new_id,
375 force: true,
379 })
380 }
381 Command::CreateBlock { tab_id, .. } => {
382 let new_id = extract_block_id_from_output(step)?;
383 Some(Command::DeleteBlock {
384 tab_id: tab_id.clone(),
385 block_id: new_id,
386 })
387 }
388 Command::MoveTab {
390 tab_id,
391 src_workspace_id,
392 dst_workspace_id,
393 ..
394 } => Some(Command::MoveTab {
395 tab_id: tab_id.clone(),
396 src_workspace_id: dst_workspace_id.clone(),
397 dst_workspace_id: src_workspace_id.clone(),
398 dst_index: 0,
399 }),
400 Command::MoveBlock {
401 block_id,
402 src_tab_id,
403 dst_tab_id,
404 ..
405 } => Some(Command::MoveBlock {
406 block_id: block_id.clone(),
407 src_tab_id: dst_tab_id.clone(),
408 dst_tab_id: src_tab_id.clone(),
409 dst_index: 0,
410 }),
411 Command::CreateWindow { window_id, .. } => Some(Command::CloseWindowInternal {
415 window_id: window_id.clone(),
416 }),
417 _ => None,
427 }
428}
429
430fn extract_workspace_id_from_output(step: &UnresolvedStep) -> Option<String> {
433 let output = step.output_json.as_ref()?;
434 let events: Vec<Event> = serde_json::from_str(output).ok()?;
435 events.iter().find_map(|e| match e {
436 Event::WorkspaceCreated { workspace_id, .. } => Some(workspace_id.clone()),
437 _ => None,
438 })
439}
440
441fn extract_tab_id_from_output(step: &UnresolvedStep) -> Option<String> {
442 let output = step.output_json.as_ref()?;
443 let events: Vec<Event> = serde_json::from_str(output).ok()?;
444 events.iter().find_map(|e| match e {
445 Event::TabCreated { tab_id, .. } => Some(tab_id.clone()),
446 _ => None,
447 })
448}
449
450fn extract_block_id_from_output(step: &UnresolvedStep) -> Option<String> {
451 let output = step.output_json.as_ref()?;
452 let events: Vec<Event> = serde_json::from_str(output).ok()?;
453 events.iter().find_map(|e| match e {
454 Event::BlockCreated { block_id, .. } => Some(block_id.clone()),
455 _ => None,
456 })
457}
458
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sagas::log::SagaLog;

    /// Builds an `UnresolvedStep` fixture with the given state, command JSON and
    /// optional output JSON; every other field is a fixed placeholder.
    fn dummy_step(state: &str, cmd_json: &str, output_json: Option<&str>) -> UnresolvedStep {
        let output_json = output_json.map(str::to_owned);
        UnresolvedStep {
            step_index: 0,
            name: "test".into(),
            state: state.to_owned(),
            cmd_json: cmd_json.to_owned(),
            output_json,
            started_at: 0,
            ended_at: None,
        }
    }

    /// Shorthand for a `"succeeded"` fixture step whose command JSON is `{}`.
    fn succeeded_step(output_json: Option<&str>) -> UnresolvedStep {
        dummy_step("succeeded", "{}", output_json)
    }

    /// Serializes `events` as a JSON list, the form the id extractors parse.
    fn events_json(events: &[Event]) -> String {
        serde_json::to_string(events).unwrap()
    }

    /// `CreateWorkspace` is undone by a non-forced `DeleteWorkspace` of the created id.
    #[test]
    fn create_workspace_inverse_is_delete_workspace_with_new_id() {
        let output = events_json(&[Event::WorkspaceCreated {
            workspace_id: "ws-1".into(),
            name: "test".into(),
            version: 1,
        }]);
        let step = succeeded_step(Some(&output));
        let cmd = Command::CreateWorkspace { name: "test".into() };

        match derive_inverse_command(&cmd, &step).expect("should derive") {
            Command::DeleteWorkspace { workspace_id, force } => {
                assert_eq!(workspace_id, "ws-1");
                assert!(!force, "recovery-time inverse should not be saga-flagged");
            }
            other => panic!("expected DeleteWorkspace, got {:?}", other),
        }
    }

    /// Without recorded output there is no created id, so no inverse is derived.
    #[test]
    fn create_workspace_without_output_returns_none() {
        let step = succeeded_step(None);
        let cmd = Command::CreateWorkspace { name: "test".into() };
        assert!(derive_inverse_command(&cmd, &step).is_none());
    }

    /// `CreateTab` is undone by a forced `DeleteTab` in the same workspace.
    #[test]
    fn create_tab_inverse_is_delete_tab_force_true() {
        let output = events_json(&[Event::TabCreated {
            workspace_id: "ws-1".into(),
            tab_id: "tab-1".into(),
            name: "tab".into(),
            version: 1,
        }]);
        let step = succeeded_step(Some(&output));
        let cmd = Command::CreateTab {
            workspace_id: "ws-1".into(),
            name: "tab".into(),
        };

        match derive_inverse_command(&cmd, &step).expect("should derive") {
            Command::DeleteTab {
                workspace_id,
                tab_id,
                force,
            } => {
                assert_eq!(workspace_id, "ws-1");
                assert_eq!(tab_id, "tab-1");
                assert!(force, "compensating DeleteTab must bypass last-tab guard");
            }
            other => panic!("expected DeleteTab, got {:?}", other),
        }
    }

    /// `CreateBlock` is undone by `DeleteBlock` of the created block id.
    #[test]
    fn create_block_inverse_is_delete_block() {
        let output = events_json(&[Event::BlockCreated {
            tab_id: "tab-1".into(),
            block_id: "blk-1".into(),
            meta: serde_json::Value::Null,
            version: 1,
        }]);
        let step = succeeded_step(Some(&output));
        let cmd = Command::CreateBlock {
            tab_id: "tab-1".into(),
            meta: serde_json::Value::Null,
        };

        match derive_inverse_command(&cmd, &step).expect("should derive") {
            Command::DeleteBlock { tab_id, block_id } => {
                assert_eq!(tab_id, "tab-1");
                assert_eq!(block_id, "blk-1");
            }
            other => panic!("expected DeleteBlock, got {:?}", other),
        }
    }

    /// The inverse of `MoveTab` swaps the workspaces and uses index 0.
    #[test]
    fn move_tab_inverse_swaps_src_and_dst() {
        let step = succeeded_step(None);
        let cmd = Command::MoveTab {
            tab_id: "tab-1".into(),
            src_workspace_id: "ws-src".into(),
            dst_workspace_id: "ws-dst".into(),
            dst_index: 5,
        };

        match derive_inverse_command(&cmd, &step).expect("should derive") {
            Command::MoveTab {
                tab_id,
                src_workspace_id,
                dst_workspace_id,
                dst_index,
            } => {
                assert_eq!(tab_id, "tab-1");
                assert_eq!(src_workspace_id, "ws-dst");
                assert_eq!(dst_workspace_id, "ws-src");
                assert_eq!(dst_index, 0);
            }
            other => panic!("expected MoveTab, got {:?}", other),
        }
    }

    /// The inverse of `MoveBlock` swaps the tabs and uses index 0.
    #[test]
    fn move_block_inverse_swaps_src_and_dst() {
        let step = succeeded_step(None);
        let cmd = Command::MoveBlock {
            block_id: "blk-1".into(),
            src_tab_id: "tab-src".into(),
            dst_tab_id: "tab-dst".into(),
            dst_index: 3,
        };

        match derive_inverse_command(&cmd, &step).expect("should derive") {
            Command::MoveBlock {
                block_id,
                src_tab_id,
                dst_tab_id,
                dst_index,
            } => {
                assert_eq!(block_id, "blk-1");
                assert_eq!(src_tab_id, "tab-dst");
                assert_eq!(dst_tab_id, "tab-src");
                assert_eq!(dst_index, 0);
            }
            other => panic!("expected MoveBlock, got {:?}", other),
        }
    }

    /// `CreateWindow` is undone by `CloseWindowInternal` for the same window.
    #[test]
    fn create_window_inverse_is_close_window_internal() {
        let step = succeeded_step(None);
        let cmd = Command::CreateWindow {
            window_id: "win-1".into(),
            workspace_id: "ws-1".into(),
        };

        match derive_inverse_command(&cmd, &step).expect("should derive") {
            Command::CloseWindowInternal { window_id } => {
                assert_eq!(window_id, "win-1");
            }
            other => panic!("expected CloseWindowInternal, got {:?}", other),
        }
    }

    /// Delete commands yield no inverse.
    #[test]
    fn delete_commands_have_no_derivable_inverse() {
        let step = succeeded_step(None);
        let delete_cmds = vec![
            Command::DeleteWorkspace {
                workspace_id: "ws-1".into(),
                force: false,
            },
            Command::DeleteTab {
                workspace_id: "ws-1".into(),
                tab_id: "tab-1".into(),
                force: false,
            },
            Command::DeleteBlock {
                tab_id: "tab-1".into(),
                block_id: "blk-1".into(),
            },
        ];

        for cmd in delete_cmds {
            assert!(derive_inverse_command(&cmd, &step).is_none());
        }
    }

    /// Rename / set-active commands yield no inverse.
    #[test]
    fn pure_meta_commands_have_no_derivable_inverse() {
        let step = succeeded_step(None);
        let meta_cmds = vec![
            Command::RenameWorkspace {
                workspace_id: "ws-1".into(),
                name: "new".into(),
            },
            Command::SetActiveTab {
                workspace_id: "ws-1".into(),
                tab_id: "tab-1".into(),
            },
        ];

        for cmd in meta_cmds {
            assert!(derive_inverse_command(&cmd, &step).is_none());
        }
    }

    /// With nothing unresolved in the log, the sweep reports zero sagas.
    #[tokio::test]
    async fn compensate_unresolved_no_unresolved_returns_zero() {
        let state = crate::server::tests::test_state();
        assert_eq!(compensate_unresolved(&state).await.unwrap(), 0);
    }

    /// A started saga without any steps ends up in the `compensated` state.
    #[tokio::test]
    async fn compensate_unresolved_with_no_succeeded_steps_marks_compensated() {
        let state = crate::server::tests::test_state();
        let payload = serde_json::json!({"x": 1});
        state
            .saga_log
            .start_saga(99, "ghost_saga", &payload)
            .unwrap();

        let handled = compensate_unresolved(&state).await.unwrap();
        assert_eq!(handled, 1);

        let unresolved = state.saga_log.unresolved_sagas().unwrap();
        assert!(unresolved.is_empty());

        let snap = state.saga_log.snapshot_recent(10).unwrap();
        let ghost = snap.iter().find(|s| s.saga_id == 99).unwrap();
        assert_eq!(ghost.state, "compensated");
    }

    /// A saga with two succeeded steps is picked up by the sweep and leaves the
    /// unresolved set, ending as either `compensated` or `failed_compensation`.
    #[tokio::test]
    async fn compensate_unresolved_picks_up_running_saga_with_succeeded_steps() {
        // `tmp` must outlive the log: the backing file is removed when it drops.
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let saga_log = std::sync::Arc::new(SagaLog::open(tmp.path()).unwrap());

        let payload = serde_json::json!({"tab_id": "tab-x"});
        saga_log.start_saga(1, "tear_off_tab", &payload).unwrap();

        // Step 0: workspace creation, finished successfully.
        let create_cmd = Command::CreateWorkspace {
            name: "".to_string(),
        };
        let created = [Event::WorkspaceCreated {
            workspace_id: "new-ws".into(),
            name: "".into(),
            version: 1,
        }];
        saga_log
            .start_step(1, 0, "CreateWorkspace", &create_cmd)
            .unwrap();
        saga_log.finish_step(1, 0, &created).unwrap();

        // Step 1: tab move into the new workspace, finished successfully.
        let move_cmd = Command::MoveTab {
            tab_id: "tab-x".into(),
            src_workspace_id: "src-ws".into(),
            dst_workspace_id: "new-ws".into(),
            dst_index: 0,
        };
        let moved = [Event::TabMoved {
            tab_id: "tab-x".into(),
            src_workspace_id: "src-ws".into(),
            dst_workspace_id: "new-ws".into(),
            dst_index: 0,
            new_src_active_tab_id: None,
            new_dst_active_tab_id: None,
            version: 2,
        }];
        saga_log.start_step(1, 1, "MoveTab", &move_cmd).unwrap();
        saga_log.finish_step(1, 1, &moved).unwrap();

        // Point a fresh test state at the pre-populated log.
        let mut state = crate::server::tests::test_state();
        state.saga_log = std::sync::Arc::clone(&saga_log);

        let handled = compensate_unresolved(&state).await.unwrap();
        assert_eq!(handled, 1, "expected to recover exactly 1 saga");

        let unresolved = state.saga_log.unresolved_sagas().unwrap();
        assert!(
            unresolved.is_empty(),
            "saga should no longer be unresolved, got: {:?}",
            unresolved
        );

        let snap = state.saga_log.snapshot_recent(10).unwrap();
        let resumed = snap.iter().find(|s| s.saga_id == 1).expect("saga 1 missing");
        assert!(
            resumed.state == "compensated" || resumed.state == "failed_compensation",
            "expected compensated or failed_compensation, got {}",
            resumed.state
        );
    }

    /// A saga with one succeeded and one failed step is resolved by the sweep.
    #[tokio::test]
    async fn compensate_unresolved_skips_failed_steps() {
        // `tmp` must outlive the log: the backing file is removed when it drops.
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let saga_log = std::sync::Arc::new(SagaLog::open(tmp.path()).unwrap());

        let payload = serde_json::json!({});
        saga_log.start_saga(2, "test_saga", &payload).unwrap();

        // Step 0: block creation, finished successfully.
        let cmd_a = Command::CreateBlock {
            tab_id: "tab-1".into(),
            meta: serde_json::Value::Null,
        };
        let created = [Event::BlockCreated {
            tab_id: "tab-1".into(),
            block_id: "blk-A".into(),
            meta: serde_json::Value::Null,
            version: 1,
        }];
        saga_log.start_step(2, 0, "CreateBlock", &cmd_a).unwrap();
        saga_log.finish_step(2, 0, &created).unwrap();

        // Step 1: tab move, recorded as failed.
        let cmd_b = Command::MoveTab {
            tab_id: "tab-1".into(),
            src_workspace_id: "ws-src".into(),
            dst_workspace_id: "ws-dst".into(),
            dst_index: 0,
        };
        saga_log.start_step(2, 1, "MoveTab", &cmd_b).unwrap();
        saga_log.fail_step(2, 1, "reducer rejected").unwrap();

        let mut state = crate::server::tests::test_state();
        state.saga_log = std::sync::Arc::clone(&saga_log);

        compensate_unresolved(&state).await.unwrap();

        let unresolved = state.saga_log.unresolved_sagas().unwrap();
        assert!(unresolved.is_empty());

        let snap = state.saga_log.snapshot_recent(10).unwrap();
        let saga = snap.iter().find(|s| s.saga_id == 2).unwrap();
        assert!(saga.step_count >= 1);
    }
}