agentmux_srv\sagas/delete_block.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase E.5.7 (Step 5 PR 1) — DeleteBlock saga.
5//
6// Replaces the SQLite-first delete pattern in `service.rs`'s
7// `("object", "DeleteBlock")` handler with a reducer-driven saga.
8// The legacy handler called `wcore::delete_block` first and then
9// dispatched `Command::DeleteBlock` to keep the reducer in sync — a
10// short-circuit that pre-dates the saga coordinator + persist
11// subscriber pattern (closes gap §4 in
12// `docs/retro/reducer-architecture-gaps-2026-05-01.md`).
13//
14// **Steps:**
15// 1. `DeleteBlock { tab_id, block_id }` — reducer removes the block
16// from canonical state and emits `Event::BlockDeleted`. The
17// persist subscriber writes SQLite via `wcore::delete_block`
18// (cascades to layout pruning).
19//
// **Block controller side-effect:** the legacy RPC handler killed
// the block's PTY/controller via `blockcontroller::delete_controller`
// BEFORE the wcore SQLite delete. The saga cannot delegate this to
// the persist subscriber — its `wcore::delete_block` only handles
// SQLite and layout pruning, not process teardown — so `run` tears
// the controller down itself, AFTER the reducer dispatch and only
// when the block is no longer present in the reducer state. Requests
// rejected by the pre-condition check (e.g. block-not-found) return
// before any controller teardown. The controller registry is a
// process-local map, idempotent on missing keys.
29//
30// **Compensation:** delete sagas are awkward to compensate — once
31// the block row + controller are gone, "un-delete" requires
32// reconstructing both the SQLite row and the PTY/process subtree,
33// neither of which is meaningful from saga state. We follow the
34// brief's pragma: log a warning on dispatch failure, no automatic
35// re-create. The reducer's `DeleteBlock` is silent-no-op on missing
36// inputs (see reducer.rs handle_delete_block), so the only failure
37// path is wstore write errors surfaced by the persist subscriber —
38// in which case the controller is already gone (intentional; the
39// PTY can't be partially-killed) and the SQLite row may or may not
40// have been written. PR 2's `compensate_unresolved` resume scan
41// surfaces these via the durable saga log for operator follow-up.
42//
43// **Pre-condition:** the block must exist in the reducer state.
44// Without this, the reducer would silently no-op (handle_delete_block
45// returns an empty event vec on missing tab/block) and the user
46// would see "delete succeeded" while nothing happened. The saga
47// surfaces a clear "block not found" error instead. We check the
48// reducer state (not SQLite) because `Command::DeleteBlock` carries
49// `tab_id` from the RPC's `uicontext.activetabid` and we want to
50// validate against the reducer's view of (tab → blocks) — that's
51// what the dispatch will mutate.
52
53use agentmux_common::ipc::Command;
54use serde_json::{json, Value};
55
56use super::{
57 alloc_saga_id, classify_run_saga_result, emit_saga_started, emit_terminal, run_saga, SagaCtx,
58};
59use crate::server::AppState;
60
61/// Run the DeleteBlock saga. On success returns
62/// `{"block_id": "...", "tab_id": "..."}`.
63pub async fn run(
64 state: &AppState,
65 tab_id: String,
66 block_id: String,
67) -> Result<Value, String> {
68 // Pre-condition: block exists and is in the named tab. Reducer
69 // would silent-no-op otherwise (see handle_delete_block); the
70 // saga surfaces a clear error instead.
71 {
72 let s = state.srv_state.lock().await;
73 match s.blocks.get(&block_id) {
74 None => {
75 return Err(format!("DeleteBlock: block not found: {}", block_id));
76 }
77 Some(block) if block.tab_id != tab_id => {
78 return Err(format!(
79 "DeleteBlock: block {} is in tab {}, not {}",
80 block_id, block.tab_id, tab_id
81 ));
82 }
83 _ => {}
84 }
85 if !s.tabs.contains_key(&tab_id) {
86 return Err(format!("DeleteBlock: tab not found: {}", tab_id));
87 }
88 }
89
90 let saga_id = alloc_saga_id(state);
91 if let Err(e) = emit_saga_started(
92 state,
93 saga_id,
94 "delete_block",
95 json!({
96 "tab_id": &tab_id,
97 "block_id": &block_id,
98 }),
99 )
100 .await
101 {
102 return Err(e);
103 }
104 let ctx = SagaCtx::new(state, saga_id);
105 let result = run_saga("delete_block", run_inner(ctx, tab_id, block_id.clone())).await;
106 // Controller-kill ordering. Three rounds of bot review:
107 // * Round 1 (agent): killed BEFORE emit_saga_started → reagent
108 // P2: side-effect leak if start_saga collides.
109 // * Round 2 (this PR): conditional on result.is_ok() → codex P2
110 // round 2: leaks PTY when reducer succeeds but
111 // `apply_event_to_wstore` fails inside `SagaCtx::dispatch`
112 // (block already removed from reducer state, RPC returns
113 // error, retry pre-check sees "block not found" → controller
114 // never cleaned up).
115 // * Round 3 (this fix): kill controller whenever the reducer
116 // dispatched DeleteBlock — i.e., whenever block was removed
117 // from reducer state. This includes both success and the
118 // reducer-succeeded-wstore-failed cases. We approximate this
119 // by checking reducer state for the block: if it's gone, the
120 // reducer dispatched (regardless of wstore outcome), so kill
121 // the controller. Idempotent on missing controller.
122 {
123 let block_still_in_reducer =
124 state.srv_state.lock().await.blocks.contains_key(&block_id);
125 if !block_still_in_reducer {
126 crate::backend::blockcontroller::delete_controller(&block_id);
127 }
128 }
129 emit_terminal(state, saga_id, classify_run_saga_result(&result)).await;
130 result
131}
132
133async fn run_inner(
134 ctx: SagaCtx<'_>,
135 tab_id: String,
136 block_id: String,
137) -> Result<Value, String> {
138 // Step 1: dispatch DeleteBlock through the reducer. The persist
139 // subscriber sees the BlockDeleted event and runs
140 // `wcore::delete_block` (SQLite delete + layout prune).
141 if let Err(reason) = ctx
142 .dispatch(Command::DeleteBlock {
143 tab_id: tab_id.clone(),
144 block_id: block_id.clone(),
145 })
146 .await
147 {
148 // No automatic compensation — un-deleting a block requires
149 // reconstructing both SQLite + PTY which we cannot do from
150 // saga state. Surface the failure; PR 2's restart-recovery
151 // scan picks up the durable log row for operator review.
152 tracing::warn!(
153 tab_id = %tab_id,
154 block_id = %block_id,
155 "[saga] DeleteBlock dispatch failed (no automatic compensation): {}",
156 reason
157 );
158 return Err(format!("DeleteBlock: {}", reason));
159 }
160
161 Ok(json!({
162 "tab_id": tab_id,
163 "block_id": block_id,
164 }))
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::obj::Block;
    use crate::server::tests::test_state;
    use agentmux_common::ipc::Event;

    /// Dispatch `cmd` through `dispatch_to_reducer` and apply every
    /// returned event with `apply_event_to_wstore`, panicking if a write
    /// fails. Returns the emitted events so callers can pull ids out.
    async fn dispatch_apply(
        state: &crate::server::AppState,
        cmd: agentmux_common::ipc::Command,
    ) -> Vec<agentmux_common::ipc::Event> {
        let events = crate::server::service::dispatch_to_reducer(state, cmd).await;
        for ev in &events {
            crate::persist_subscriber::apply_event_to_wstore(ev, &state.wstore).unwrap();
        }
        events
    }

    /// Create a tab named `name` in `workspace_id` and return the id
    /// carried by the resulting `TabCreated` event.
    async fn create_tab(
        state: &crate::server::AppState,
        workspace_id: &str,
        name: &str,
    ) -> String {
        let events = dispatch_apply(
            state,
            agentmux_common::ipc::Command::CreateTab {
                workspace_id: workspace_id.to_owned(),
                name: name.into(),
            },
        )
        .await;
        events
            .iter()
            .find_map(|e| match e {
                Event::TabCreated { tab_id, .. } => Some(tab_id.clone()),
                _ => None,
            })
            .expect("CreateTab should emit TabCreated")
    }

    /// Seed a workspace + tab + block and return their ids.
    async fn seed() -> (
        crate::server::AppState,
        String, // workspace_id
        String, // tab_id
        String, // block_id
    ) {
        let state = test_state();

        let ws_evs = dispatch_apply(
            &state,
            agentmux_common::ipc::Command::CreateWorkspace { name: "w".into() },
        )
        .await;
        let ws_id = ws_evs
            .iter()
            .find_map(|e| match e {
                Event::WorkspaceCreated { workspace_id, .. } => Some(workspace_id.clone()),
                _ => None,
            })
            .expect("CreateWorkspace should emit WorkspaceCreated");

        let tab_id = create_tab(&state, &ws_id, "t").await;

        let blk_evs = dispatch_apply(
            &state,
            agentmux_common::ipc::Command::CreateBlock {
                tab_id: tab_id.clone(),
                meta: serde_json::Value::Null,
            },
        )
        .await;
        let block_id = blk_evs
            .iter()
            .find_map(|e| match e {
                Event::BlockCreated { block_id, .. } => Some(block_id.clone()),
                _ => None,
            })
            .expect("CreateBlock should emit BlockCreated");

        (state, ws_id, tab_id, block_id)
    }

    #[tokio::test]
    async fn happy_path_removes_block_from_reducer_and_sqlite() {
        let (state, _ws_id, tab_id, block_id) = seed().await;

        // Sanity: block is present pre-delete.
        {
            let s = state.srv_state.lock().await;
            assert!(s.blocks.contains_key(&block_id));
            assert_eq!(s.tabs[&tab_id].block_ids, vec![block_id.clone()]);
        }
        assert!(state.wstore.get::<Block>(&block_id).unwrap().is_some());

        let result = run(&state, tab_id.clone(), block_id.clone()).await.unwrap();
        assert_eq!(result["block_id"], block_id);
        assert_eq!(result["tab_id"], tab_id);

        // Reducer: block gone, tab's block_ids empty.
        let s = state.srv_state.lock().await;
        assert!(!s.blocks.contains_key(&block_id));
        assert!(s.tabs[&tab_id].block_ids.is_empty());
        drop(s);

        // SQLite: block gone.
        assert!(state.wstore.get::<Block>(&block_id).unwrap().is_none());
    }

    #[tokio::test]
    async fn rejects_when_block_not_found() {
        let (state, _ws_id, tab_id, _block_id) = seed().await;
        let err = run(&state, tab_id, "ghost-block".into()).await.unwrap_err();
        assert!(err.contains("block not found"), "got: {}", err);
    }

    #[tokio::test]
    async fn rejects_when_block_in_different_tab() {
        let (state, ws_id, tab_id, block_id) = seed().await;
        // Create a second tab; ask to delete block via that tab's id.
        let other_tab = create_tab(&state, &ws_id, "other").await;
        let err = run(&state, other_tab, block_id.clone()).await.unwrap_err();
        assert!(
            err.contains("is in tab") && err.contains(&tab_id),
            "got: {}",
            err
        );
        // Block must still be present (saga rejected pre-dispatch).
        let s = state.srv_state.lock().await;
        assert!(s.blocks.contains_key(&block_id));
    }
}
300}