agentmux_srv\sagas/
delete_block.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase E.5.7 (Step 5 PR 1) — DeleteBlock saga.
5//
6// Replaces the SQLite-first delete pattern in `service.rs`'s
7// `("object", "DeleteBlock")` handler with a reducer-driven saga.
8// The legacy handler called `wcore::delete_block` first and then
9// dispatched `Command::DeleteBlock` to keep the reducer in sync — a
10// short-circuit that pre-dates the saga coordinator + persist
11// subscriber pattern (closes gap §4 in
12// `docs/retro/reducer-architecture-gaps-2026-05-01.md`).
13//
14// **Steps:**
15// 1. `DeleteBlock { tab_id, block_id }` — reducer removes the block
16//    from canonical state and emits `Event::BlockDeleted`. The
17//    persist subscriber writes SQLite via `wcore::delete_block`
18//    (cascades to layout pruning).
19//
// **Block controller side-effect:** the legacy RPC handler killed
// the block's PTY/controller via `blockcontroller::delete_controller`
// BEFORE the wcore SQLite delete. The persist subscriber's
// `wcore::delete_block` only handles SQLite and layout pruning, not
// process teardown, so `run` kills the controller itself — but only
// AFTER the saga has run, and only when the block is no longer present
// in reducer state (i.e. the reducer applied `DeleteBlock`, whether or
// not the subsequent wstore write succeeded). A request rejected by the
// pre-condition check, or one whose `emit_saga_started` fails, leaves
// the controller untouched. The controller registry is a process-local
// map, idempotent on missing keys.
29//
30// **Compensation:** delete sagas are awkward to compensate — once
31// the block row + controller are gone, "un-delete" requires
32// reconstructing both the SQLite row and the PTY/process subtree,
33// neither of which is meaningful from saga state. We follow the
34// brief's pragma: log a warning on dispatch failure, no automatic
35// re-create. The reducer's `DeleteBlock` is silent-no-op on missing
36// inputs (see reducer.rs handle_delete_block), so the only failure
37// path is wstore write errors surfaced by the persist subscriber —
38// in which case the controller is already gone (intentional; the
39// PTY can't be partially-killed) and the SQLite row may or may not
40// have been written. PR 2's `compensate_unresolved` resume scan
41// surfaces these via the durable saga log for operator follow-up.
42//
43// **Pre-condition:** the block must exist in the reducer state.
44// Without this, the reducer would silently no-op (handle_delete_block
45// returns an empty event vec on missing tab/block) and the user
46// would see "delete succeeded" while nothing happened. The saga
47// surfaces a clear "block not found" error instead. We check the
48// reducer state (not SQLite) because `Command::DeleteBlock` carries
49// `tab_id` from the RPC's `uicontext.activetabid` and we want to
50// validate against the reducer's view of (tab → blocks) — that's
51// what the dispatch will mutate.
52
53use agentmux_common::ipc::Command;
54use serde_json::{json, Value};
55
56use super::{
57    alloc_saga_id, classify_run_saga_result, emit_saga_started, emit_terminal, run_saga, SagaCtx,
58};
59use crate::server::AppState;
60
61/// Run the DeleteBlock saga. On success returns
62/// `{"block_id": "...", "tab_id": "..."}`.
63pub async fn run(
64    state: &AppState,
65    tab_id: String,
66    block_id: String,
67) -> Result<Value, String> {
68    // Pre-condition: block exists and is in the named tab. Reducer
69    // would silent-no-op otherwise (see handle_delete_block); the
70    // saga surfaces a clear error instead.
71    {
72        let s = state.srv_state.lock().await;
73        match s.blocks.get(&block_id) {
74            None => {
75                return Err(format!("DeleteBlock: block not found: {}", block_id));
76            }
77            Some(block) if block.tab_id != tab_id => {
78                return Err(format!(
79                    "DeleteBlock: block {} is in tab {}, not {}",
80                    block_id, block.tab_id, tab_id
81                ));
82            }
83            _ => {}
84        }
85        if !s.tabs.contains_key(&tab_id) {
86            return Err(format!("DeleteBlock: tab not found: {}", tab_id));
87        }
88    }
89
90    let saga_id = alloc_saga_id(state);
91    if let Err(e) = emit_saga_started(
92        state,
93        saga_id,
94        "delete_block",
95        json!({
96            "tab_id": &tab_id,
97            "block_id": &block_id,
98        }),
99    )
100    .await
101    {
102        return Err(e);
103    }
104    let ctx = SagaCtx::new(state, saga_id);
105    let result = run_saga("delete_block", run_inner(ctx, tab_id, block_id.clone())).await;
106    // Controller-kill ordering. Three rounds of bot review:
107    //   * Round 1 (agent): killed BEFORE emit_saga_started → reagent
108    //     P2: side-effect leak if start_saga collides.
109    //   * Round 2 (this PR): conditional on result.is_ok() → codex P2
110    //     round 2: leaks PTY when reducer succeeds but
111    //     `apply_event_to_wstore` fails inside `SagaCtx::dispatch`
112    //     (block already removed from reducer state, RPC returns
113    //     error, retry pre-check sees "block not found" → controller
114    //     never cleaned up).
115    //   * Round 3 (this fix): kill controller whenever the reducer
116    //     dispatched DeleteBlock — i.e., whenever block was removed
117    //     from reducer state. This includes both success and the
118    //     reducer-succeeded-wstore-failed cases. We approximate this
119    //     by checking reducer state for the block: if it's gone, the
120    //     reducer dispatched (regardless of wstore outcome), so kill
121    //     the controller. Idempotent on missing controller.
122    {
123        let block_still_in_reducer =
124            state.srv_state.lock().await.blocks.contains_key(&block_id);
125        if !block_still_in_reducer {
126            crate::backend::blockcontroller::delete_controller(&block_id);
127        }
128    }
129    emit_terminal(state, saga_id, classify_run_saga_result(&result)).await;
130    result
131}
132
133async fn run_inner(
134    ctx: SagaCtx<'_>,
135    tab_id: String,
136    block_id: String,
137) -> Result<Value, String> {
138    // Step 1: dispatch DeleteBlock through the reducer. The persist
139    // subscriber sees the BlockDeleted event and runs
140    // `wcore::delete_block` (SQLite delete + layout prune).
141    if let Err(reason) = ctx
142        .dispatch(Command::DeleteBlock {
143            tab_id: tab_id.clone(),
144            block_id: block_id.clone(),
145        })
146        .await
147    {
148        // No automatic compensation — un-deleting a block requires
149        // reconstructing both SQLite + PTY which we cannot do from
150        // saga state. Surface the failure; PR 2's restart-recovery
151        // scan picks up the durable log row for operator review.
152        tracing::warn!(
153            tab_id = %tab_id,
154            block_id = %block_id,
155            "[saga] DeleteBlock dispatch failed (no automatic compensation): {}",
156            reason
157        );
158        return Err(format!("DeleteBlock: {}", reason));
159    }
160
161    Ok(json!({
162        "tab_id": tab_id,
163        "block_id": block_id,
164    }))
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::obj::Block;
    use crate::server::tests::test_state;
    use agentmux_common::ipc::Event;

    /// Dispatch `cmd` through the reducer and apply every resulting event to
    /// the wstore, so SQLite stays in step with reducer state in these tests.
    async fn dispatch_apply(
        state: &crate::server::AppState,
        cmd: agentmux_common::ipc::Command,
    ) -> Vec<agentmux_common::ipc::Event> {
        let events = crate::server::service::dispatch_to_reducer(state, cmd).await;
        for ev in &events {
            crate::persist_subscriber::apply_event_to_wstore(ev, &state.wstore).unwrap();
        }
        events
    }

    /// Create a tab named `name` in `workspace_id` and return its id.
    async fn create_tab(
        state: &crate::server::AppState,
        workspace_id: &str,
        name: &str,
    ) -> String {
        let tab_evs = dispatch_apply(
            state,
            agentmux_common::ipc::Command::CreateTab {
                workspace_id: workspace_id.to_owned(),
                name: name.into(),
            },
        )
        .await;
        tab_evs
            .iter()
            .find_map(|e| match e {
                Event::TabCreated { tab_id, .. } => Some(tab_id.clone()),
                _ => None,
            })
            .expect("CreateTab should emit TabCreated")
    }

    /// Seed a workspace + tab + block and return their ids.
    async fn seed() -> (
        crate::server::AppState,
        String, // workspace_id
        String, // tab_id
        String, // block_id
    ) {
        let state = test_state();
        let ws_evs = dispatch_apply(
            &state,
            agentmux_common::ipc::Command::CreateWorkspace { name: "w".into() },
        )
        .await;
        let ws_id = ws_evs
            .iter()
            .find_map(|e| match e {
                Event::WorkspaceCreated { workspace_id, .. } => Some(workspace_id.clone()),
                _ => None,
            })
            .expect("CreateWorkspace should emit WorkspaceCreated");
        let tab_id = create_tab(&state, &ws_id, "t").await;
        let blk_evs = dispatch_apply(
            &state,
            agentmux_common::ipc::Command::CreateBlock {
                tab_id: tab_id.clone(),
                meta: serde_json::Value::Null,
            },
        )
        .await;
        let block_id = blk_evs
            .iter()
            .find_map(|e| match e {
                Event::BlockCreated { block_id, .. } => Some(block_id.clone()),
                _ => None,
            })
            .expect("CreateBlock should emit BlockCreated");
        (state, ws_id, tab_id, block_id)
    }

    #[tokio::test]
    async fn happy_path_removes_block_from_reducer_and_sqlite() {
        let (state, _ws_id, tab_id, block_id) = seed().await;

        // Sanity: block is present pre-delete.
        {
            let s = state.srv_state.lock().await;
            assert!(s.blocks.contains_key(&block_id));
            assert_eq!(s.tabs[&tab_id].block_ids, vec![block_id.clone()]);
        }
        assert!(state.wstore.get::<Block>(&block_id).unwrap().is_some());

        let result = run(&state, tab_id.clone(), block_id.clone()).await.unwrap();
        assert_eq!(result["block_id"], block_id);
        assert_eq!(result["tab_id"], tab_id);

        // Reducer: block gone, tab's block_ids empty.
        let s = state.srv_state.lock().await;
        assert!(!s.blocks.contains_key(&block_id));
        assert!(s.tabs[&tab_id].block_ids.is_empty());
        drop(s);

        // SQLite: block gone.
        assert!(state.wstore.get::<Block>(&block_id).unwrap().is_none());
    }

    #[tokio::test]
    async fn rejects_when_block_not_found() {
        let (state, _ws_id, tab_id, _block_id) = seed().await;
        let err = run(&state, tab_id, "ghost-block".into()).await.unwrap_err();
        assert!(err.contains("block not found"), "got: {}", err);
    }

    #[tokio::test]
    async fn second_delete_reports_block_not_found() {
        // Once the reducer has removed the block, a repeat request must be
        // rejected by the pre-check rather than silently "succeeding".
        let (state, _ws_id, tab_id, block_id) = seed().await;
        run(&state, tab_id.clone(), block_id.clone()).await.unwrap();
        let err = run(&state, tab_id, block_id).await.unwrap_err();
        assert!(err.contains("block not found"), "got: {}", err);
    }

    #[tokio::test]
    async fn rejects_when_block_in_different_tab() {
        let (state, ws_id, tab_id, block_id) = seed().await;
        // Create a second tab; ask to delete the block via that tab's id.
        let other_tab = create_tab(&state, &ws_id, "other").await;
        let err = run(&state, other_tab, block_id.clone()).await.unwrap_err();
        assert!(
            err.contains("is in tab") && err.contains(&tab_id),
            "got: {}",
            err
        );
        // Block must still be present (saga rejected pre-dispatch).
        let s = state.srv_state.lock().await;
        assert!(s.blocks.contains_key(&block_id));
    }
}
300}