agentmux_srv\sagas/
delete_workspace.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase E.5.7 (Step 5 PR 2) — DeleteWorkspace saga.
5//
6// Replaces the inline cascade-on-dispatch pattern in
7// `service.rs::("workspace", "DeleteWorkspace")` with a saga that
8// records lifecycle brackets in the durable saga log. Closes the
9// remaining gap in the Step 5 series:
10//
11// * PR 1 already migrated `DeleteBlock` and `DeleteTab` to sagas.
12// * The `("workspace", "DeleteWorkspace")` RPC handler still issued
13//   a single `Command::DeleteWorkspace` whose reducer arm cascaded
14//   through tabs+blocks atomically, but a crash mid-cascade left
15//   inconsistent state with no durable record of WHAT was deleted.
16//
17// **Saga-as-narrator pattern.** The reducer's `handle_delete_workspace`
18// remains the canonical mutator (it cascades through tabs+blocks and
19// drops window mappings in a single in-memory step). The saga's
20// contribution is durability: it walks the workspace's tabs first via
21// per-tab `Command::DeleteTab { force: true }` dispatches, recording
22// each in the saga log, then issues the final
23// `Command::DeleteWorkspace { force: true }` to drop the (now-empty)
24// workspace + window mappings.
25//
26// This decomposition means crash recovery (`recovery::compensate_unresolved`,
27// merged in #636) sees per-tab progress markers and can mark the saga
28// `failed_compensation` for operator review if the cascade was
29// interrupted — versus the legacy single-shot `DeleteWorkspace` which
30// either fully applied or left the reducer state untouched, with no
31// durable trace either way.
32//
33// **Steps:**
34// 1. Snapshot the workspace's tabs+blocks (read-only) for the saga
35//    log's `input` field. Provenance for `--diag sagas`: which entities
36//    existed at saga start, so an operator can reason about a partial
37//    cascade later.
38// 2. For each tab (in `tab_ids` order): dispatch
39//    `Command::DeleteTab { force: true }`. The reducer cascades the
40//    tab's blocks atomically; the persist subscriber writes SQLite
41//    via `wcore::delete_tab` (which kills PTY controllers via
42//    `delete_tab_inner` → `delete_controller(block_id)`).
43//    `force: true` bypasses the reducer's last-tab guard — we're
44//    intentionally draining the workspace.
45// 3. Final dispatch: `Command::DeleteWorkspace { force: true }`. The
46//    workspace is empty by this point (step 2 deleted every tab), so
47//    the reducer's cascade only removes the workspace record + drops
48//    window mappings (emitting `SrvWindowClosed` per affected window).
49//    The `force: true` flag is provenance-only — the reducer's
50//    behaviour is identical regardless.
51//
52// **Compensation.** Delete sagas are awkward to compensate by design:
53// once a tab + its blocks are gone (SQLite rows deleted, controllers
54// killed), reconstruction would require persisting the pre-state, which
55// no current saga does. Per the brief: compensation is **record-only**.
56// If a step fails mid-cascade we rely on `classify_run_saga_result` to
57// classify the error path:
58//
59//   * `Err` (non-timeout) → `Compensated` — the saga's per-step log
60//     rows already record what was deleted; subsequent crash-recovery
61//     does NOT replay (Delete commands have no derivable inverse in
62//     `recovery::derive_inverse_command`, by design).
63//   * `Err` containing `"timed out"` → `Failed` — the saga timed out
64//     before completing; recovery will see `running` lifecycle + the
65//     succeeded step prefix and mark it `failed_compensation` (since
66//     Delete has no derivable inverse, the operator must reconcile).
67//   * `Ok` → `Completed` — clean cascade.
68//
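// **Pre-condition:** the workspace must exist in the reducer state;
// `run` looks it up there and returns `workspace not found` otherwise,
// before any saga row is written. Bootstrap loads SQLite into the
// reducer at startup, so the two normally match. Note that `run` has
// no SQLite fallback: a workspace present only in SQLite (e.g. a
// migration-window SQLite-direct write that hasn't yet flowed through
// the reducer) is rejected as not found.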
73
74use agentmux_common::ipc::Command;
75use serde_json::{json, Value};
76
77use super::{
78    alloc_saga_id, classify_run_saga_result, emit_saga_started, emit_terminal, run_saga, SagaCtx,
79};
80use crate::server::AppState;
81
82/// Run the DeleteWorkspace saga. On success returns
83/// `{"workspace_id": "...", "deleted_tab_count": N, "deleted_block_count": M}`.
84pub async fn run(state: &AppState, workspace_id: String) -> Result<Value, String> {
85    // Pre-condition + snapshot: read the workspace's tabs+blocks
86    // before any dispatch. We need the tab list to drive step 2's
87    // per-tab DeleteTab dispatches, and we record block ids in the
88    // saga log so `--diag sagas` can show what was destroyed.
89    let (tab_ids, block_count) = {
90        let s = state.srv_state.lock().await;
91        let Some(workspace) = s.workspaces.get(&workspace_id) else {
92            return Err(format!(
93                "DeleteWorkspace: workspace not found: {}",
94                workspace_id
95            ));
96        };
97        let tab_ids: Vec<String> = workspace.tab_ids.clone();
98        let block_count: usize = tab_ids
99            .iter()
100            .map(|tid| s.tabs.get(tid).map(|t| t.block_ids.len()).unwrap_or(0))
101            .sum();
102        (tab_ids, block_count)
103    };
104
105    let saga_id = alloc_saga_id(state);
106    if let Err(e) = emit_saga_started(
107        state,
108        saga_id,
109        "delete_workspace",
110        json!({
111            "workspace_id": &workspace_id,
112            "tab_ids": &tab_ids,
113            "block_count": block_count,
114        }),
115    )
116    .await
117    {
118        return Err(e);
119    }
120    let ctx = SagaCtx::new(state, saga_id);
121    let result = run_saga(
122        "delete_workspace",
123        run_inner(ctx, workspace_id.clone(), tab_ids.clone(), block_count),
124    )
125    .await;
126    emit_terminal(state, saga_id, classify_run_saga_result(&result)).await;
127    result
128}
129
130async fn run_inner(
131    ctx: SagaCtx<'_>,
132    workspace_id: String,
133    tab_ids: Vec<String>,
134    block_count: usize,
135) -> Result<Value, String> {
136    // Step 2: per-tab DeleteTab dispatch. `force: true` bypasses the
137    // reducer's last-tab guard — the saga is intentionally draining
138    // the workspace, the guard exists to protect user-facing CloseTab
139    // flows from emptying a workspace by accident. The persist
140    // subscriber's `apply_tab_deleted` runs `wcore::delete_tab` which
141    // kills each block's PTY controller via `delete_tab_inner` →
142    // `delete_controller(block_id)`. That's the same controller-cleanup
143    // path the user-facing DeleteTab saga (Step 5 PR 1) relies on,
144    // so we don't replicate the controller-kill here.
145    //
146    // **No compensation.** If a tab's DeleteTab dispatch fails mid-
147    // cascade, the already-deleted prefix is gone — we can't
148    // reconstruct the SQLite rows or respawn the PTY controllers.
149    // Returning `Err` records the failure in the saga log (per-step
150    // succeeded rows for the prefix + a failed row for the rejecting
151    // tab). `classify_run_saga_result` maps non-timeout `Err` to
152    // `Compensated`, which is technically inaccurate (nothing was
153    // un-done) but matches the saga framework's convention for
154    // non-timeout errors and avoids surfacing the saga to recovery's
155    // `failed`-state replay path (Delete has no derivable inverse
156    // anyway, so recovery would just mark `failed_compensation` —
157    // which is fine; either outcome surfaces in `--diag sagas`).
158    for (i, tab_id) in tab_ids.iter().enumerate() {
159        if let Err(reason) = ctx
160            .dispatch(Command::DeleteTab {
161                workspace_id: workspace_id.clone(),
162                tab_id: tab_id.clone(),
163                // Saga is draining the workspace; bypass the
164                // user-facing last-tab guard. (Saga's own pre-checks
165                // already validated the workspace exists.)
166                force: true,
167            })
168            .await
169        {
170            tracing::warn!(
171                workspace_id = %workspace_id,
172                tab_id = %tab_id,
173                step = i,
174                "[saga] DeleteWorkspace step 2 (DeleteTab): dispatch failed: {} — succeeded prefix already gone, no compensation possible",
175                reason,
176            );
177            return Err(format!(
178                "DeleteWorkspace step 2 (DeleteTab {}): {}",
179                tab_id, reason
180            ));
181        }
182    }
183
184    // Step 3: drop the (now-empty) workspace + cascade window mappings.
185    // The reducer's `handle_delete_workspace` removes the workspace
186    // record + emits `SrvWindowClosed` per affected window; the
187    // persist subscriber's `apply_workspace_deleted` runs
188    // `wcore::delete_workspace` for SQLite (idempotent if the
189    // workspace was already removed by the per-tab cascade in step 2).
190    if let Err(reason) = ctx
191        .dispatch(Command::DeleteWorkspace {
192            workspace_id: workspace_id.clone(),
193            // Saga-driven dispatch — provenance flag for the durable
194            // log. Reducer behaviour is identical for both values.
195            force: true,
196        })
197        .await
198    {
199        tracing::warn!(
200            workspace_id = %workspace_id,
201            "[saga] DeleteWorkspace step 3 (DeleteWorkspace): dispatch failed: {} — tabs already gone, workspace partially deleted",
202            reason,
203        );
204        return Err(format!("DeleteWorkspace step 3 (DeleteWorkspace): {}", reason));
205    }
206
207    Ok(json!({
208        "workspace_id": workspace_id,
209        "deleted_tab_count": tab_ids.len(),
210        "deleted_block_count": block_count,
211    }))
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::obj::{Block, Tab, Workspace};
    use crate::server::tests::test_state;
    use agentmux_common::ipc::Event;

    /// Dispatch `cmd` to the reducer, apply each resulting event to the
    /// wstore, and hand the events back to the caller.
    async fn dispatch_apply(state: &crate::server::AppState, cmd: Command) -> Vec<Event> {
        let emitted = crate::server::service::dispatch_to_reducer(state, cmd).await;
        for event in emitted.iter() {
            crate::persist_subscriber::apply_event_to_wstore(event, &state.wstore).unwrap();
        }
        emitted
    }

    /// Id carried by the first `WorkspaceCreated` event; panics if none.
    fn created_workspace_id(events: &[Event]) -> String {
        events
            .iter()
            .find_map(|ev| {
                if let Event::WorkspaceCreated { workspace_id, .. } = ev {
                    Some(workspace_id.clone())
                } else {
                    None
                }
            })
            .unwrap()
    }

    /// Id carried by the first `TabCreated` event; panics if none.
    fn created_tab_id(events: &[Event]) -> String {
        events
            .iter()
            .find_map(|ev| {
                if let Event::TabCreated { tab_id, .. } = ev {
                    Some(tab_id.clone())
                } else {
                    None
                }
            })
            .unwrap()
    }

    /// Id carried by the first `BlockCreated` event; panics if none.
    fn created_block_id(events: &[Event]) -> String {
        events
            .iter()
            .find_map(|ev| {
                if let Event::BlockCreated { block_id, .. } = ev {
                    Some(block_id.clone())
                } else {
                    None
                }
            })
            .unwrap()
    }

    /// Seed a workspace with `n` tabs, each containing one block.
    /// Returns `(state, workspace_id, tab_ids, block_ids)`.
    async fn seed_workspace_with_tabs_and_blocks(
        n: usize,
    ) -> (
        crate::server::AppState,
        String,
        Vec<String>,
        Vec<String>,
    ) {
        let state = test_state();

        let create_ws = Command::CreateWorkspace { name: "w".into() };
        let ws_id = created_workspace_id(&dispatch_apply(&state, create_ws).await);

        let mut tab_ids = Vec::new();
        let mut block_ids = Vec::new();
        for idx in 0..n {
            let create_tab = Command::CreateTab {
                workspace_id: ws_id.clone(),
                name: format!("tab-{}", idx),
            };
            let tab_id = created_tab_id(&dispatch_apply(&state, create_tab).await);

            let create_block = Command::CreateBlock {
                tab_id: tab_id.clone(),
                meta: serde_json::Value::Null,
            };
            let block_id = created_block_id(&dispatch_apply(&state, create_block).await);

            tab_ids.push(tab_id);
            block_ids.push(block_id);
        }

        (state, ws_id, tab_ids, block_ids)
    }

    #[tokio::test]
    async fn happy_path_cascades_tabs_and_blocks() {
        let (state, ws_id, tab_ids, block_ids) =
            seed_workspace_with_tabs_and_blocks(2).await;

        // Sanity check on the seeded pre-state: the reducer holds the
        // workspace with two tabs and two blocks, and the wstore has a
        // row for every tab and block.
        {
            let before = state.srv_state.lock().await;
            assert!(before.workspaces.contains_key(&ws_id));
            assert_eq!(before.workspaces[&ws_id].tab_ids.len(), 2);
            assert_eq!(before.tabs.len(), 2);
            assert_eq!(before.blocks.len(), 2);
        }
        for tid in &tab_ids {
            assert!(state.wstore.get::<Tab>(tid).unwrap().is_some());
        }
        for bid in &block_ids {
            assert!(state.wstore.get::<Block>(bid).unwrap().is_some());
        }

        let result = run(&state, ws_id.clone()).await.unwrap();
        assert_eq!(result["workspace_id"], ws_id);
        assert_eq!(result["deleted_tab_count"], 2);
        assert_eq!(result["deleted_block_count"], 2);

        // Reducer side: the workspace, every tab and every block are gone.
        {
            let after = state.srv_state.lock().await;
            assert!(!after.workspaces.contains_key(&ws_id));
            assert!(after.tabs.is_empty());
            assert!(after.blocks.is_empty());
        }

        // wstore side: the same entities are gone.
        assert!(state.wstore.get::<Workspace>(&ws_id).unwrap().is_none());
        for tid in &tab_ids {
            assert!(state.wstore.get::<Tab>(tid).unwrap().is_none());
        }
        for bid in &block_ids {
            assert!(state.wstore.get::<Block>(bid).unwrap().is_none());
        }
    }

    #[tokio::test]
    async fn rejects_when_workspace_not_found() {
        let state = test_state();
        let err = run(&state, "ghost-ws".into()).await.unwrap_err();
        assert!(err.contains("workspace not found"), "got: {}", err);
    }

    #[tokio::test]
    async fn empty_workspace_succeeds() {
        // A workspace with zero tabs: the per-tab loop has nothing to
        // do, so the saga goes straight to the final DeleteWorkspace.
        let state = test_state();
        let create_ws = Command::CreateWorkspace { name: "empty".into() };
        let ws_id = created_workspace_id(&dispatch_apply(&state, create_ws).await);

        let result = run(&state, ws_id.clone()).await.unwrap();
        assert_eq!(result["deleted_tab_count"], 0);
        assert_eq!(result["deleted_block_count"], 0);

        let after = state.srv_state.lock().await;
        assert!(!after.workspaces.contains_key(&ws_id));
    }

    #[tokio::test]
    async fn writes_lifecycle_brackets_to_saga_log() {
        // After a clean run the saga log should show a
        // `delete_workspace` saga in state `completed` with at least
        // two steps, and that saga must not be listed as unresolved.
        let (state, ws_id, _tab_ids, _block_ids) =
            seed_workspace_with_tabs_and_blocks(1).await;
        run(&state, ws_id.clone()).await.unwrap();

        let recent = state.saga_log.snapshot_recent(10).unwrap();
        let saga = recent
            .iter()
            .find(|entry| entry.name == "delete_workspace")
            .expect("delete_workspace saga should appear in snapshot");
        assert_eq!(saga.state, "completed", "saga should terminate completed");

        // One DeleteTab step plus one DeleteWorkspace step.
        assert!(
            saga.step_count >= 2,
            "expected >= 2 steps, got {}",
            saga.step_count
        );

        // A completed saga must not show up for recovery.
        let unresolved = state.saga_log.unresolved_sagas().unwrap();
        assert!(
            !unresolved.iter().any(|entry| entry.saga_id == saga.saga_id),
            "saga should not be unresolved post-completion"
        );
    }

    #[tokio::test]
    async fn cascade_drops_window_mappings() {
        // Map a window onto the seeded workspace, run the saga, and
        // check that the window mapping is gone from reducer state.
        let (state, ws_id, _tab_ids, _block_ids) =
            seed_workspace_with_tabs_and_blocks(1).await;
        let win_id = "win-test".to_string();
        let create_window = Command::CreateWindow {
            window_id: win_id.clone(),
            workspace_id: ws_id.clone(),
        };
        let _ = dispatch_apply(&state, create_window).await;
        {
            let before = state.srv_state.lock().await;
            assert!(before.windows.contains_key(&win_id));
        }

        run(&state, ws_id.clone()).await.unwrap();

        let after = state.srv_state.lock().await;
        assert!(!after.windows.contains_key(&win_id));
    }
}