agentmux_srv/
persist_subscriber.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase E.2c.1+E.2c.2 — persist subscriber. A tokio task that
5// consumes the srv reducer's broadcast bus and mirrors
6// workspace/tab/block lifecycle events back to SQLite via wcore.
7//
8// Status:
9//   * E.2c.1 (#614) shipped the per-event apply path. Subscriber was
10//     dead-code in production because no producer existed.
11//   * E.2c.2 (this) wires the workspace HTTP/WS RPC handlers through
12//     the reducer, making the subscriber LIVE for workspace events.
13//     Adds full-resync-on-`Lagged` for workspaces (RPC-migrated
14//     entities only — tab/block resync lands in E.2c.3 / E.2c.4).
15//
16// Bus-lag handling:
17//   On RecvError::Lagged(n) the subscriber drops `n` events.
18//   Naive HWM advancement past dropped events would permanently
19//   diverge SQLite from reducer state. Instead we do a workspace-
20//   scoped resync: snapshot `state.workspaces` under the reducer
21//   mutex, write each workspace into SQLite (idempotent insert /
22//   no-op / update). The resync is INSERT/UPDATE only — no deletes —
23//   because workspaces can also be created OUTSIDE the reducer
24//   during the migration window (e.g., the still-wcore-direct
25//   `CreateWindow` flow calls `wcore::create_window_full` which
26//   creates a workspace under the hood). Deleting workspaces
27//   missing from the reducer snapshot would lose those legitimate
28//   rows. Stale workspaces deleted-via-reducer that the subscriber
29//   missed on Lagged linger on disk until the next user-driven
30//   DeleteWorkspace cleans them up via the wcore fallback in
31//   service.rs. (codex P1 #615.)
32//
33//   IMPORTANT: tab/block resync is NOT done here yet. Tabs and
34//   blocks remain RPC-direct via wcore; the reducer's view of them
35//   is FROZEN at bootstrap. Doing tab/block resync before E.2c.3
36//   and E.2c.4 land would clobber RPC-driven writes the reducer
37//   doesn't see. Resync expands as each entity migrates.
38
39use std::sync::Arc;
40
41use agentmux_common::ipc::Event;
42use tokio::sync::{broadcast, Mutex};
43
44use crate::backend::obj::{Block, Client, LayoutState as PersistedLayoutState, Tab, Window, Workspace};
45use crate::backend::storage::wstore::WaveStore;
46use crate::backend::wcore;
47use crate::state::State;
48
49/// Spawn the persist subscriber task. Runs until the broadcast
50/// channel closes (i.e., the reducer's bus is dropped at process
51/// shutdown). The `state` handle is used for workspace-scoped
52/// full-resync after a `RecvError::Lagged`.
53pub fn spawn_persist_subscriber(
54    events_rx: broadcast::Receiver<Event>,
55    wstore: Arc<WaveStore>,
56    state: Arc<Mutex<State>>,
57) -> tokio::task::JoinHandle<()> {
58    tokio::spawn(run_persist_subscriber(events_rx, wstore, state))
59}
60
61async fn run_persist_subscriber(
62    mut events_rx: broadcast::Receiver<Event>,
63    wstore: Arc<WaveStore>,
64    state: Arc<Mutex<State>>,
65) {
66    tracing::info!(target: "srv-persist-subscriber", "[srv-persist-subscriber] started");
67    loop {
68        match events_rx.recv().await {
69            Ok(event) => {
70                if let Err(e) = apply_event_to_wstore(&event, &wstore) {
71                    tracing::warn!(
72                        target: "srv-persist-subscriber",
73                        "[srv-persist-subscriber] apply failed for event {:?}: {}",
74                        event_kind(&event),
75                        e
76                    );
77                }
78            }
79            Err(broadcast::error::RecvError::Lagged(n)) => {
80                // Workspace-scoped full resync. Subscriber dropped
81                // `n` events; rather than guess which entities were
82                // affected, snapshot the reducer's workspace map and
83                // reconcile SQLite against it. RPC-migrated entities
84                // (workspaces in E.2c.2) are correctly recovered;
85                // not-yet-migrated entities (tab/block) are left
86                // alone — see module-level docs.
87                tracing::warn!(
88                    target: "srv-persist-subscriber",
89                    "[srv-persist-subscriber] dropped {} event(s) — running workspace resync",
90                    n
91                );
92                if let Err(e) = resync_workspaces(&state, &wstore).await {
93                    tracing::error!(
94                        target: "srv-persist-subscriber",
95                        "[srv-persist-subscriber] resync failed: {} — SQLite may diverge from reducer until next event",
96                        e
97                    );
98                }
99            }
100            Err(broadcast::error::RecvError::Closed) => {
101                tracing::info!(target: "srv-persist-subscriber", "[srv-persist-subscriber] bus closed — exiting");
102                return;
103            }
104        }
105    }
106}
107
108/// Snapshot `state.workspaces` and reconcile against SQLite —
109/// INSERT/UPDATE only, no deletes. Workspace-scoped (tab/block
110/// resync expands here as each entity's RPC layer migrates).
111///
112/// Why no delete-phase: during the migration window, workspaces
113/// can be created OUTSIDE the reducer (e.g., the still-wcore-direct
114/// `CreateWindow` flow calls `wcore::create_window_full` which
115/// creates a workspace under the hood). Those workspaces aren't in
116/// `state.workspaces`. If the subscriber lagged and we did a
117/// delete-not-in-snapshot pass, we'd cascade-delete legitimate
118/// workspaces — data loss triggered by bus pressure rather than a
119/// user action. Far better for a stale workspace deleted-via-reducer
120/// to linger on disk after a Lagged event than to lose a real one.
121/// (codex P1 #615.)
122///
123/// Stale-after-Lagged scenario: reducer fires WorkspaceDeleted, the
124/// subscriber drops it on Lagged, resync runs but only insert/updates.
125/// SQLite still has the deleted workspace's row. The next user-driven
126/// DeleteWorkspace (which falls through to `wcore::delete_workspace`
127/// for unknown-to-reducer rows) cleans it up. Acceptable behaviour
128/// during the migration; tightens once all RPC is reducer-driven.
129///
130/// Strategy:
131///   1. Lock state, snapshot the workspace map (ids + names),
132///      release the lock.
133///   2. For each workspace in the snapshot: insert if missing,
134///      no-op if name matches, update if name differs.
135async fn resync_workspaces(
136    state: &Arc<Mutex<State>>,
137    wstore: &WaveStore,
138) -> Result<(), Box<dyn std::error::Error>> {
139    // Snapshot under lock; release before any I/O.
140    let snapshot: Vec<(String, String)> = {
141        let s = state.lock().await;
142        s.workspaces
143            .values()
144            .map(|w| (w.workspace_id.clone(), w.name.clone()))
145            .collect()
146    };
147
148    for (workspace_id, name) in &snapshot {
149        match wstore.get::<Workspace>(workspace_id)? {
150            Some(existing) if existing.name == *name => {
151                // Already in sync.
152            }
153            Some(mut existing) => {
154                existing.name = name.clone();
155                wstore.update(&mut existing)?;
156            }
157            None => {
158                let mut ws = Workspace {
159                    oid: workspace_id.clone(),
160                    name: name.clone(),
161                    ..Default::default()
162                };
163                wstore.insert(&mut ws)?;
164            }
165        }
166    }
167    Ok(())
168}
169
170/// Apply one reducer event to the on-disk store. Idempotent: each
171/// arm checks for the entity's current SQLite state before writing
172/// so duplicate events (from at-least-once delivery semantics) don't
173/// produce duplicate rows or wcore errors.
174///
175/// Phase E.2c.2 — exposed at crate visibility so RPC handlers in
176/// `service.rs` can apply events synchronously after dispatching
177/// through the reducer. This closes the race where the async
178/// subscriber hadn't yet written SQLite by the time a follow-up
179/// RPC (e.g., `CreateTab` against a just-created workspace) tried
180/// to read it. Calling this from the RPC handler followed by the
181/// subscriber receiving the broadcast event is safe because the
182/// arms are idempotent — the subscriber's later apply is a no-op.
183pub(crate) fn apply_event_to_wstore(
184    event: &Event,
185    wstore: &WaveStore,
186) -> Result<(), Box<dyn std::error::Error>> {
187    match event {
188        Event::WorkspaceCreated {
189            workspace_id, name, ..
190        } => apply_workspace_created(wstore, workspace_id, name),
191        Event::WorkspaceDeleted { workspace_id, .. } => {
192            apply_workspace_deleted(wstore, workspace_id)
193        }
194        Event::TabCreated {
195            workspace_id,
196            tab_id,
197            name,
198            ..
199        } => apply_tab_created(wstore, workspace_id, tab_id, name),
200        Event::TabDeleted {
201            workspace_id,
202            tab_id,
203            ..
204        } => apply_tab_deleted(wstore, workspace_id, tab_id),
205        Event::ActiveTabChanged {
206            workspace_id,
207            tab_id,
208            ..
209        } => apply_active_tab_changed(wstore, workspace_id, tab_id.as_deref()),
210        Event::TabReordered {
211            workspace_id,
212            tab_id,
213            new_index,
214            ..
215        } => apply_tab_reordered(wstore, workspace_id, tab_id, *new_index),
216        Event::BlockCreated {
217            tab_id,
218            block_id,
219            meta,
220            ..
221        } => apply_block_created(wstore, tab_id, block_id, meta),
222        Event::BlockDeleted {
223            tab_id, block_id, ..
224        } => apply_block_deleted(wstore, tab_id, block_id),
225        Event::SrvWindowOpened {
226            window_id,
227            workspace_id,
228            ..
229        } => apply_srv_window_opened(wstore, window_id, workspace_id),
230        Event::SrvWindowClosed { window_id, .. } => apply_srv_window_closed(wstore, window_id),
231        Event::SrvWindowWorkspaceChanged {
232            window_id,
233            workspace_id,
234            ..
235        } => apply_srv_window_workspace_changed(wstore, window_id, workspace_id),
236        Event::TabsReorderedBulk {
237            workspace_id,
238            tab_ids,
239            ..
240        } => apply_tabs_reordered_bulk(wstore, workspace_id, tab_ids),
241        Event::WorkspaceRenamed {
242            workspace_id, name, ..
243        } => apply_workspace_renamed(wstore, workspace_id, name),
244        Event::TabRenamed { tab_id, name, .. } => apply_tab_renamed(wstore, tab_id, name),
245        Event::WorkspaceMetaUpdated {
246            workspace_id,
247            meta_patch,
248            ..
249        } => apply_workspace_meta_updated(wstore, workspace_id, meta_patch),
250        Event::WindowMetaUpdated {
251            window_id,
252            meta_patch,
253            ..
254        } => apply_window_meta_updated(wstore, window_id, meta_patch),
255        Event::TabMetaUpdated {
256            tab_id,
257            meta_patch,
258            ..
259        } => apply_tab_meta_updated(wstore, tab_id, meta_patch),
260        Event::BlockMetaUpdated {
261            block_id,
262            meta_patch,
263            ..
264        } => apply_block_meta_updated(wstore, block_id, meta_patch),
265        Event::TabMoved {
266            tab_id,
267            src_workspace_id,
268            dst_workspace_id,
269            dst_index,
270            new_src_active_tab_id,
271            new_dst_active_tab_id,
272            ..
273        } => apply_tab_moved(
274            wstore,
275            tab_id,
276            src_workspace_id,
277            dst_workspace_id,
278            *dst_index,
279            new_src_active_tab_id.as_deref(),
280            new_dst_active_tab_id.as_deref(),
281        ),
282        Event::BlockMoved {
283            block_id,
284            src_tab_id,
285            dst_tab_id,
286            dst_index,
287            ..
288        } => apply_block_moved(wstore, block_id, src_tab_id, dst_tab_id, *dst_index),
289        Event::FocusedNodeChanged { tab_id, node_id, .. } => {
290            apply_focused_node_changed(wstore, tab_id, node_id)
291        }
292        Event::MagnifiedNodeChanged { tab_id, node_id, .. } => {
293            apply_magnified_node_changed(wstore, tab_id, node_id)
294        }
295        // All other event variants are not domain-state mutations
296        // (lifecycle, errors, snapshots). The subscriber ignores them.
297        _ => Ok(()),
298    }
299}
300
301fn apply_workspace_created(
302    wstore: &WaveStore,
303    workspace_id: &str,
304    name: &str,
305) -> Result<(), Box<dyn std::error::Error>> {
306    if wstore.get::<Workspace>(workspace_id)?.is_some() {
307        return Ok(()); // already present
308    }
309    let mut ws = Workspace {
310        oid: workspace_id.to_string(),
311        name: name.to_string(),
312        ..Default::default()
313    };
314    wstore.insert(&mut ws)?;
315    Ok(())
316}
317
318fn apply_workspace_deleted(
319    wstore: &WaveStore,
320    workspace_id: &str,
321) -> Result<(), Box<dyn std::error::Error>> {
322    if wstore.get::<Workspace>(workspace_id)?.is_none() {
323        return Ok(()); // already gone
324    }
325    wcore::delete_workspace(wstore, workspace_id)?;
326    Ok(())
327}
328
329fn apply_tab_created(
330    wstore: &WaveStore,
331    workspace_id: &str,
332    tab_id: &str,
333    name: &str,
334) -> Result<(), Box<dyn std::error::Error>> {
335    // F1.A — wrap the insert-layout + insert-tab + update-workspace
336    // sequence in a single SQLite transaction so a partial failure
337    // can't leave a tab without a layout or a workspace whose
338    // tab_ids points at a tab that doesn't exist on disk.
339    wstore.with_tx(|tx| {
340        if tx.get::<Tab>(tab_id)?.is_none() {
341            // Phase E.2c.2 — create a LayoutState row alongside the
342            // Tab so downstream flows that hard-require
343            // `Tab.layoutstate` (e.g., `wcore::tear_off_block`'s
344            // `must_get::<LayoutState>(&tab.layoutstate)`) work for
345            // reducer-originated tabs. Mirrors the wcore::create_tab
346            // pattern. (codex P2 #614.)
347            let mut layout = PersistedLayoutState {
348                oid: uuid::Uuid::new_v4().to_string(),
349                rootnode: None,
350                magnifiednodeid: String::new(),
351                focusednodeid: String::new(),
352                leaforder: None,
353                pendingbackendactions: None,
354                meta: None,
355                ..Default::default()
356            };
357            tx.insert(&mut layout)?;
358            let mut tab = Tab {
359                oid: tab_id.to_string(),
360                name: name.to_string(),
361                layoutstate: layout.oid.clone(),
362                ..Default::default()
363            };
364            tx.insert(&mut tab)?;
365        }
366        // Idempotently link tab into the workspace's tabids.
367        if let Some(mut ws) = tx.get::<Workspace>(workspace_id)? {
368            let already_present = ws.tabids.iter().any(|t| t == tab_id)
369                || ws.pinnedtabids.iter().any(|t| t == tab_id);
370            if !already_present {
371                ws.tabids.push(tab_id.to_string());
372                tx.update(&mut ws)?;
373            }
374        }
375        Ok(())
376    })?;
377    Ok(())
378}
379
380fn apply_tab_deleted(
381    wstore: &WaveStore,
382    workspace_id: &str,
383    tab_id: &str,
384) -> Result<(), Box<dyn std::error::Error>> {
385    if wstore.get::<Tab>(tab_id)?.is_none() {
386        return Ok(()); // already gone (or never existed)
387    }
388    // wcore::delete_tab handles unlinking from the workspace's
389    // tabids, deleting the tab's blocks + layout, and the tab row
390    // itself. Returns NotFound errors if any of those don't exist —
391    // we surface those as the operation having already happened.
392    match wcore::delete_tab(wstore, workspace_id, tab_id) {
393        Ok(()) => Ok(()),
394        Err(crate::backend::storage::StoreError::NotFound) => Ok(()),
395        Err(e) => Err(Box::new(e)),
396    }
397}
398
399fn apply_active_tab_changed(
400    wstore: &WaveStore,
401    workspace_id: &str,
402    tab_id: Option<&str>,
403) -> Result<(), Box<dyn std::error::Error>> {
404    let Some(mut ws) = wstore.get::<Workspace>(workspace_id)? else {
405        return Ok(()); // workspace already gone — nothing to update
406    };
407    let new = tab_id.unwrap_or("").to_string();
408    if ws.activetabid == new {
409        return Ok(());
410    }
411    ws.activetabid = new;
412    wstore.update(&mut ws)?;
413    Ok(())
414}
415
416fn apply_tab_reordered(
417    wstore: &WaveStore,
418    workspace_id: &str,
419    tab_id: &str,
420    new_index: u32,
421) -> Result<(), Box<dyn std::error::Error>> {
422    let Some(mut ws) = wstore.get::<Workspace>(workspace_id)? else {
423        return Ok(());
424    };
425    // Pinning was removed from AgentMux but legacy SQLite databases
426    // can still have entries in `Workspace.pinnedtabids`; bootstrap
427    // surfaces those as regular tabs in the reducer's `tab_ids`. The
428    // reducer can therefore emit a TabReordered for a tab that on
429    // disk still lives in `pinnedtabids`. Search both lists so the
430    // reorder lands wherever the tab actually is. (codex P2 #617.)
431    let target_index = (new_index as usize);
432    if let Some(current_pos) = ws.tabids.iter().position(|t| t == tab_id) {
433        let len = ws.tabids.len();
434        let target = target_index.min(len.saturating_sub(1));
435        if current_pos == target {
436            return Ok(());
437        }
438        let id = ws.tabids.remove(current_pos);
439        ws.tabids.insert(target, id);
440        wstore.update(&mut ws)?;
441    } else if let Some(current_pos) = ws.pinnedtabids.iter().position(|t| t == tab_id) {
442        let len = ws.pinnedtabids.len();
443        let target = target_index.min(len.saturating_sub(1));
444        if current_pos == target {
445            return Ok(());
446        }
447        let id = ws.pinnedtabids.remove(current_pos);
448        ws.pinnedtabids.insert(target, id);
449        wstore.update(&mut ws)?;
450    }
451    // Tab not in either list — silent no-op (idempotent).
452    Ok(())
453}
454
455fn apply_block_created(
456    wstore: &WaveStore,
457    tab_id: &str,
458    block_id: &str,
459    meta: &serde_json::Value,
460) -> Result<(), Box<dyn std::error::Error>> {
461    // F1.A — block insert + tab.blockids update wrapped in one tx.
462    let meta_map: crate::backend::obj::MetaMapType = match meta {
463        serde_json::Value::Object(_) => {
464            serde_json::from_value(meta.clone()).unwrap_or_default()
465        }
466        _ => Default::default(),
467    };
468    wstore.with_tx(|tx| {
469        if tx.get::<Block>(block_id)?.is_none() {
470            // Phase E.2c.4 — write the meta map carried in the event
471            // (`view`, layout hints, etc.) so reducer-routed
472            // CreateBlock RPC produces blocks with valid view/meta in
473            // SQLite. Without this, the frontend sees a block with
474            // empty meta and renders a blank pane.
475            let mut block = Block {
476                oid: block_id.to_string(),
477                parentoref: format!("tab:{}", tab_id),
478                meta: meta_map,
479                ..Default::default()
480            };
481            tx.insert(&mut block)?;
482        }
483        if let Some(mut tab) = tx.get::<Tab>(tab_id)? {
484            if !tab.blockids.iter().any(|b| b == block_id) {
485                tab.blockids.push(block_id.to_string());
486                tx.update(&mut tab)?;
487            }
488        }
489        Ok(())
490    })?;
491    Ok(())
492}
493
494fn apply_block_deleted(
495    wstore: &WaveStore,
496    tab_id: &str,
497    block_id: &str,
498) -> Result<(), Box<dyn std::error::Error>> {
499    if wstore.get::<Block>(block_id)?.is_none() {
500        return Ok(()); // already gone
501    }
502    match wcore::delete_block(wstore, tab_id, block_id) {
503        Ok(()) => Ok(()),
504        Err(crate::backend::storage::StoreError::NotFound) => Ok(()),
505        Err(e) => Err(Box::new(e)),
506    }
507}
508
509/// Phase E.5 — record/update a window's workspace pointer on the
510/// persisted Window row. Idempotent: inserts if missing, updates
511/// if `workspaceid` differs, no-ops if identical.
512///
513/// Also keeps `Client.windowids` in sync — appends the new window_id
514/// when the Window row is freshly created. The legacy
515/// `wcore::create_window_full` path does this too; without it, the
516/// Window row exists but `GetClientData` / focus-order logic can't
517/// see it. (codex P1 #619.)
518fn apply_srv_window_opened(
519    wstore: &WaveStore,
520    window_id: &str,
521    workspace_id: &str,
522) -> Result<(), Box<dyn std::error::Error>> {
523    // F1.A — Window upsert + Client.windowids update wrapped in one
524    // tx so a partial failure can't leave a Window row that
525    // `GetClientData` doesn't see (or a `windowids` entry pointing
526    // at a Window row that wasn't written).
527    wstore.with_tx(|tx| {
528        let was_new = match tx.get::<Window>(window_id)? {
529            Some(existing) if existing.workspaceid == workspace_id => false,
530            Some(mut existing) => {
531                existing.workspaceid = workspace_id.to_string();
532                tx.update(&mut existing)?;
533                false
534            }
535            None => {
536                let mut window = Window {
537                    oid: window_id.to_string(),
538                    workspaceid: workspace_id.to_string(),
539                    ..Default::default()
540                };
541                tx.insert(&mut window)?;
542                true
543            }
544        };
545        if was_new {
546            // Inline `wcore::get_client` against the tx-handle to keep
547            // the Client.windowids update in the same transaction.
548            let clients = tx.get_all::<Client>()?;
549            if let Some(mut client) = clients.into_iter().next() {
550                if !client.windowids.iter().any(|id| id == window_id) {
551                    client.windowids.push(window_id.to_string());
552                    tx.update(&mut client)?;
553                }
554            }
555        }
556        Ok(())
557    })?;
558    Ok(())
559}
560
561/// Phase E.5 — delete the persisted Window row + remove the id
562/// from `Client.windowids`. Mirrors `wcore::close_window`'s order:
563/// prune client.windowids FIRST (so any read between the two ops
564/// doesn't see a dangling id), then delete the Window row.
565/// (codex P1 #619.)
566fn apply_srv_window_closed(
567    wstore: &WaveStore,
568    window_id: &str,
569) -> Result<(), Box<dyn std::error::Error>> {
570    // F1.A — Client.windowids prune + Window-row delete in one tx.
571    // Order matters (mirrors `wcore::close_window`): prune client
572    // FIRST so any read between the two ops doesn't see a dangling
573    // id pointing at a Window row that's already been removed.
574    wstore.with_tx(|tx| {
575        let window_exists = tx.get::<Window>(window_id)?.is_some();
576        // Always defensively prune the client list — handles the
577        // already-gone case where divergence left a stale entry.
578        let clients = tx.get_all::<Client>()?;
579        if let Some(mut client) = clients.into_iter().next() {
580            if client.windowids.iter().any(|id| id == window_id) {
581                client.windowids.retain(|id| id != window_id);
582                tx.update(&mut client)?;
583            }
584        }
585        if window_exists {
586            tx.delete::<Window>(window_id)?;
587        }
588        Ok(())
589    })?;
590    Ok(())
591}
592
593/// Phase E.5 — same shape as `apply_srv_window_opened` for the
594/// upsert behavior; separate function for log-clarity since the
595/// emitted event is distinct.
596fn apply_srv_window_workspace_changed(
597    wstore: &WaveStore,
598    window_id: &str,
599    workspace_id: &str,
600) -> Result<(), Box<dyn std::error::Error>> {
601    apply_srv_window_opened(wstore, window_id, workspace_id)
602}
603
604/// Phase E.5.3 — replace the workspace's `tabids` with the new
605/// list and drain any legacy `pinnedtabids` rows. Pinning was a
606/// Waveterm feature removed from AgentMux; bootstrap merges legacy
607/// pinned tabs into the reducer's `tab_ids`, so the next reorder
608/// is the canonical full ordering. Leaving stale `pinnedtabids`
609/// in SQLite would cause UI double-insertion (`workspace.tsx`
610/// builds the displayed tab list as `[...pinnedtabids, ...tabids]`).
611fn apply_tabs_reordered_bulk(
612    wstore: &WaveStore,
613    workspace_id: &str,
614    tab_ids: &[String],
615) -> Result<(), Box<dyn std::error::Error>> {
616    let Some(mut ws) = wstore.get::<Workspace>(workspace_id)? else {
617        return Ok(());
618    };
619    let tabids_match = ws.tabids == tab_ids;
620    let pinned_clear = ws.pinnedtabids.is_empty();
621    if tabids_match && pinned_clear {
622        return Ok(());
623    }
624    ws.tabids = tab_ids.to_vec();
625    ws.pinnedtabids.clear();
626    wstore.update(&mut ws)?;
627    Ok(())
628}
629
630/// Phase E.5.3 — rename a persisted workspace.
631fn apply_workspace_renamed(
632    wstore: &WaveStore,
633    workspace_id: &str,
634    name: &str,
635) -> Result<(), Box<dyn std::error::Error>> {
636    let Some(mut ws) = wstore.get::<Workspace>(workspace_id)? else {
637        return Ok(());
638    };
639    if ws.name == name {
640        return Ok(());
641    }
642    ws.name = name.to_string();
643    wstore.update(&mut ws)?;
644    Ok(())
645}
646
647/// Phase E.5.3 — rename a persisted tab.
648fn apply_tab_renamed(
649    wstore: &WaveStore,
650    tab_id: &str,
651    name: &str,
652) -> Result<(), Box<dyn std::error::Error>> {
653    let Some(mut tab) = wstore.get::<Tab>(tab_id)? else {
654        return Ok(());
655    };
656    if tab.name == name {
657        return Ok(());
658    }
659    tab.name = name.to_string();
660    wstore.update(&mut tab)?;
661    Ok(())
662}
663
664/// Phase E.4 (Option A) — write the reducer-emitted focused node id
665/// onto the tab's `LayoutState.focusednodeid` column. Tab→layout is
666/// a join through `Tab.layoutstate`. Idempotent: silent no-op when the
667/// tab is unknown (deleted concurrently), the layout row is missing
668/// (legacy tab without one), or the value already matches. Other
669/// `LayoutState` fields stay on their existing wcore-direct path until
670/// Option B lands.
671fn apply_focused_node_changed(
672    wstore: &WaveStore,
673    tab_id: &str,
674    node_id: &str,
675) -> Result<(), Box<dyn std::error::Error>> {
676    let Some(tab) = wstore.get::<Tab>(tab_id)? else {
677        return Ok(());
678    };
679    if tab.layoutstate.is_empty() {
680        return Ok(());
681    }
682    let Some(mut layout) = wstore.get::<PersistedLayoutState>(&tab.layoutstate)? else {
683        return Ok(());
684    };
685    if layout.focusednodeid == node_id {
686        return Ok(());
687    }
688    layout.focusednodeid = node_id.to_string();
689    wstore.update(&mut layout)?;
690    Ok(())
691}
692
693/// Phase E.4 (Option A) — write the reducer-emitted magnified node id
694/// onto the tab's `LayoutState.magnifiednodeid` column. Same shape as
695/// `apply_focused_node_changed`.
696fn apply_magnified_node_changed(
697    wstore: &WaveStore,
698    tab_id: &str,
699    node_id: &str,
700) -> Result<(), Box<dyn std::error::Error>> {
701    let Some(tab) = wstore.get::<Tab>(tab_id)? else {
702        return Ok(());
703    };
704    if tab.layoutstate.is_empty() {
705        return Ok(());
706    }
707    let Some(mut layout) = wstore.get::<PersistedLayoutState>(&tab.layoutstate)? else {
708        return Ok(());
709    };
710    if layout.magnifiednodeid == node_id {
711        return Ok(());
712    }
713    layout.magnifiednodeid = node_id.to_string();
714    wstore.update(&mut layout)?;
715    Ok(())
716}
717
718/// Phase E.5.3 — apply a meta-patch to a workspace's `meta` map.
719/// Reducer doesn't track meta in `WorkspaceRecord`; this subscriber
720/// is the sole authority that mutates persisted meta. Patch is a
721/// JSON object that merges shallow-key-by-shallow-key on top of the
722/// existing meta. `null` values in the patch delete the key.
723fn apply_workspace_meta_updated(
724    wstore: &WaveStore,
725    workspace_id: &str,
726    meta_patch: &serde_json::Value,
727) -> Result<(), Box<dyn std::error::Error>> {
728    let Some(mut ws) = wstore.get::<Workspace>(workspace_id)? else {
729        return Ok(());
730    };
731    if merge_meta_patch(&mut ws.meta, meta_patch) {
732        wstore.update(&mut ws)?;
733    }
734    Ok(())
735}
736
737/// Phase E.5.x (issue #855) — apply a meta-patch to a window's `meta`
738/// map. Same shape as `apply_workspace_meta_updated`. Silent no-op if
739/// the window doesn't exist in wstore (preserves the idempotency
740/// contract — duplicate or stale events fold to no-op).
741fn apply_window_meta_updated(
742    wstore: &WaveStore,
743    window_id: &str,
744    meta_patch: &serde_json::Value,
745) -> Result<(), Box<dyn std::error::Error>> {
746    let Some(mut window) = wstore.get::<Window>(window_id)? else {
747        return Ok(());
748    };
749    if merge_meta_patch(&mut window.meta, meta_patch) {
750        wstore.update(&mut window)?;
751    }
752    Ok(())
753}
754
755/// Phase E.5.3 — apply a meta-patch to a tab's `meta` map.
756fn apply_tab_meta_updated(
757    wstore: &WaveStore,
758    tab_id: &str,
759    meta_patch: &serde_json::Value,
760) -> Result<(), Box<dyn std::error::Error>> {
761    let Some(mut tab) = wstore.get::<Tab>(tab_id)? else {
762        return Ok(());
763    };
764    if merge_meta_patch(&mut tab.meta, meta_patch) {
765        wstore.update(&mut tab)?;
766    }
767    Ok(())
768}
769
770/// Phase E.5.3 — apply a meta-patch to a block's `meta` map.
771fn apply_block_meta_updated(
772    wstore: &WaveStore,
773    block_id: &str,
774    meta_patch: &serde_json::Value,
775) -> Result<(), Box<dyn std::error::Error>> {
776    let Some(mut block) = wstore.get::<Block>(block_id)? else {
777        return Ok(());
778    };
779    if merge_meta_patch(&mut block.meta, meta_patch) {
780        wstore.update(&mut block)?;
781    }
782    Ok(())
783}
784
785/// Phase E.5.5 — apply a `TabMoved` event to SQLite.
786/// Idempotent: detects already-moved state (tab in dst, not in src)
787/// and returns `Ok(())` without re-mutating.
788///
789/// What this writes:
790/// * Removes `tab_id` from `src_workspace.tabids` (and `pinnedtabids`
791///   for legacy rows — bootstrap merges them into reducer state, so
792///   any leftover here is a stray legacy entry that should be drained).
793/// * Updates `src_workspace.activetabid` to `new_src_active_tab_id`
794///   (which may be empty when the source emptied).
795/// * Inserts `tab_id` at `dst_index` in `dst_workspace.tabids`,
796///   clamping to the dst list length.
797/// * Updates the `Tab` row's parent ref so loaders find it under
798///   the new workspace.
799fn apply_tab_moved(
800    wstore: &WaveStore,
801    tab_id: &str,
802    src_workspace_id: &str,
803    dst_workspace_id: &str,
804    dst_index: u32,
805    new_src_active_tab_id: Option<&str>,
806    new_dst_active_tab_id: Option<&str>,
807) -> Result<(), Box<dyn std::error::Error>> {
808    // F1.A — both workspaces' updates wrapped in one tx so a partial
809    // failure can't leave the tab in both src.tabids and dst.tabids
810    // simultaneously, or in neither.
811    wstore.with_tx(|tx| {
812        // Source workspace: remove the tab and update activetabid.
813        if let Some(mut src_ws) = tx.get::<Workspace>(src_workspace_id)? {
814            let len_before_tabids = src_ws.tabids.len();
815            let len_before_pinned = src_ws.pinnedtabids.len();
816            src_ws.tabids.retain(|id| id != tab_id);
817            src_ws.pinnedtabids.retain(|id| id != tab_id);
818            let new_active = new_src_active_tab_id.unwrap_or("").to_string();
819            let active_changed = src_ws.activetabid != new_active;
820            if src_ws.tabids.len() != len_before_tabids
821                || src_ws.pinnedtabids.len() != len_before_pinned
822                || active_changed
823            {
824                src_ws.activetabid = new_active;
825                tx.update(&mut src_ws)?;
826            }
827        }
828
829        // Dest workspace: insert at clamped index + update
830        // active_tab_id if the event carries one (codex P2 #621).
831        // Skip insert if the tab is already present (idempotent on
832        // duplicate delivery).
833        if let Some(mut dst_ws) = tx.get::<Workspace>(dst_workspace_id)? {
834            let mut changed = false;
835            if !dst_ws.tabids.iter().any(|id| id == tab_id) {
836                let clamped = (dst_index as usize).min(dst_ws.tabids.len());
837                dst_ws.tabids.insert(clamped, tab_id.to_string());
838                changed = true;
839            }
840            if let Some(new_active) = new_dst_active_tab_id {
841                if dst_ws.activetabid != new_active {
842                    dst_ws.activetabid = new_active.to_string();
843                    changed = true;
844                }
845            }
846            if changed {
847                tx.update(&mut dst_ws)?;
848            }
849        }
850
851        // No per-Tab parent ref to update — the workspace owns the
852        // parentage relationship via its `tabids` list.
853        Ok(())
854    })?;
855    Ok(())
856}
857
858/// Phase E.5.5 — apply a `BlockMoved` event to SQLite.
859/// Handles both cross-tab moves and intra-tab repositioning.
860/// Idempotent on re-delivery (checks current parent before mutating).
861fn apply_block_moved(
862    wstore: &WaveStore,
863    block_id: &str,
864    src_tab_id: &str,
865    dst_tab_id: &str,
866    dst_index: u32,
867) -> Result<(), Box<dyn std::error::Error>> {
868    // F1.A — src.blockids + dst.blockids + block.parentoref updates
869    // wrapped in one tx so a partial failure can't leave the block in
870    // both tabs' blockids or with parentoref pointing at the wrong
871    // tab.
872    wstore.with_tx(|tx| {
873        if src_tab_id == dst_tab_id {
874            // Intra-tab reposition: remove + re-insert in the same tab.
875            if let Some(mut tab) = tx.get::<Tab>(src_tab_id)? {
876                tab.blockids.retain(|id| id != block_id);
877                let clamped = (dst_index as usize).min(tab.blockids.len());
878                tab.blockids.insert(clamped, block_id.to_string());
879                tx.update(&mut tab)?;
880            }
881            return Ok(());
882        }
883
884        // Cross-tab: remove from src.
885        if let Some(mut src_tab) = tx.get::<Tab>(src_tab_id)? {
886            let before = src_tab.blockids.len();
887            src_tab.blockids.retain(|id| id != block_id);
888            if src_tab.blockids.len() != before {
889                tx.update(&mut src_tab)?;
890            }
891        }
892
893        // Insert into dst (skip if already there).
894        if let Some(mut dst_tab) = tx.get::<Tab>(dst_tab_id)? {
895            if !dst_tab.blockids.iter().any(|id| id == block_id) {
896                let clamped = (dst_index as usize).min(dst_tab.blockids.len());
897                dst_tab.blockids.insert(clamped, block_id.to_string());
898                tx.update(&mut dst_tab)?;
899            }
900        }
901
902        // Block row: update parent.
903        if let Some(mut block) = tx.get::<Block>(block_id)? {
904            let new_parent = format!("tab:{}", dst_tab_id);
905            if block.parentoref != new_parent {
906                block.parentoref = new_parent;
907                tx.update(&mut block)?;
908            }
909        }
910
911        Ok(())
912    })?;
913    Ok(())
914}
915
916/// Phase E.5.3 — shallow merge a JSON object patch into a
917/// `MetaMapType`. Mirrors `backend::obj::merge_meta` semantics so
918/// `UpdateObjectMeta` keeps behaving the same way after the reducer
919/// migration:
920/// - Keys ending in `:*` with a `true` value clear all keys with
921///   that prefix (e.g. `{"term:*": true}` removes every `term*`
922///   key) before regular merging.
923/// - `null` patch values delete the corresponding key.
924/// - Other values replace the key.
925/// Returns `true` if anything actually changed.
926fn merge_meta_patch(
927    meta: &mut crate::backend::obj::MetaMapType,
928    patch: &serde_json::Value,
929) -> bool {
930    let serde_json::Value::Object(patch_map) = patch else {
931        return false;
932    };
933    let mut changed = false;
934    // First pass: section-clear keys (`prefix:*` with `true`).
935    for (k, v) in patch_map {
936        if !k.ends_with(":*") {
937            continue;
938        }
939        if !matches!(v, serde_json::Value::Bool(true)) {
940            continue;
941        }
942        let prefix = k.trim_end_matches(":*");
943        if prefix.is_empty() {
944            continue;
945        }
946        let prefix_colon = format!("{prefix}:");
947        let before = meta.len();
948        meta.retain(|k2, _| k2 != prefix && !k2.starts_with(&prefix_colon));
949        if meta.len() != before {
950            changed = true;
951        }
952    }
953    // Second pass: regular merges and null deletes.
954    for (k, v) in patch_map {
955        if k.ends_with(":*") {
956            continue;
957        }
958        if v.is_null() {
959            if meta.remove(k).is_some() {
960                changed = true;
961            }
962            continue;
963        }
964        match meta.get(k) {
965            Some(existing) if existing == v => {}
966            _ => {
967                meta.insert(k.clone(), v.clone());
968                changed = true;
969            }
970        }
971    }
972    changed
973}
974
975/// Compact textual identifier for an event (debug logging only).
976fn event_kind(event: &Event) -> &'static str {
977    match event {
978        Event::WorkspaceCreated { .. } => "WorkspaceCreated",
979        Event::WorkspaceDeleted { .. } => "WorkspaceDeleted",
980        Event::TabCreated { .. } => "TabCreated",
981        Event::TabDeleted { .. } => "TabDeleted",
982        Event::ActiveTabChanged { .. } => "ActiveTabChanged",
983        Event::TabReordered { .. } => "TabReordered",
984        Event::SrvWindowOpened { .. } => "SrvWindowOpened",
985        Event::SrvWindowClosed { .. } => "SrvWindowClosed",
986        Event::SrvWindowWorkspaceChanged { .. } => "SrvWindowWorkspaceChanged",
987        Event::TabsReorderedBulk { .. } => "TabsReorderedBulk",
988        Event::WorkspaceRenamed { .. } => "WorkspaceRenamed",
989        Event::TabRenamed { .. } => "TabRenamed",
990        Event::WorkspaceMetaUpdated { .. } => "WorkspaceMetaUpdated",
991        Event::WindowMetaUpdated { .. } => "WindowMetaUpdated",
992        Event::TabMetaUpdated { .. } => "TabMetaUpdated",
993        Event::BlockMetaUpdated { .. } => "BlockMetaUpdated",
994        Event::TabMoved { .. } => "TabMoved",
995        Event::BlockMoved { .. } => "BlockMoved",
996        Event::BlockCreated { .. } => "BlockCreated",
997        Event::BlockDeleted { .. } => "BlockDeleted",
998        Event::FocusedNodeChanged { .. } => "FocusedNodeChanged",
999        Event::MagnifiedNodeChanged { .. } => "MagnifiedNodeChanged",
1000        _ => "Other",
1001    }
1002}
1003
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::storage::wstore::WaveStore;

    // ---- Shared fixtures ----
    //
    // Helpers that can panic are `#[track_caller]` so a failure is
    // reported at the calling test's line rather than inside the helper.

    /// In-memory SQLite for tests (matches existing wstore test pattern).
    fn store() -> Arc<WaveStore> {
        Arc::new(WaveStore::open_in_memory().expect("in-memory wstore"))
    }

    /// Applies `event` through the subscriber's per-event path,
    /// panicking if it returns an error.
    #[track_caller]
    fn apply(s: &Arc<WaveStore>, event: &Event) {
        apply_event_to_wstore(event, s).unwrap();
    }

    /// Loads a workspace row that must exist.
    #[track_caller]
    fn load_workspace(s: &Arc<WaveStore>, id: &str) -> Workspace {
        s.get::<Workspace>(id).unwrap().unwrap()
    }

    /// Loads a tab row that must exist.
    #[track_caller]
    fn load_tab(s: &Arc<WaveStore>, id: &str) -> Tab {
        s.get::<Tab>(id).unwrap().unwrap()
    }

    /// Loads a block row that must exist.
    #[track_caller]
    fn load_block(s: &Arc<WaveStore>, id: &str) -> Block {
        s.get::<Block>(id).unwrap().unwrap()
    }

    /// Builds a `WorkspaceCreated` event.
    fn workspace_created(workspace_id: &str, name: &str, version: u64) -> Event {
        Event::WorkspaceCreated {
            workspace_id: workspace_id.into(),
            name: name.into(),
            version,
        }
    }

    /// Builds a `TabCreated` event.
    fn tab_created(workspace_id: &str, tab_id: &str, name: &str, version: u64) -> Event {
        Event::TabCreated {
            workspace_id: workspace_id.into(),
            tab_id: tab_id.into(),
            name: name.into(),
            version,
        }
    }

    /// Builds a `BlockCreated` event carrying a `null` meta payload.
    fn block_created(tab_id: &str, block_id: &str, version: u64) -> Event {
        Event::BlockCreated {
            tab_id: tab_id.into(),
            block_id: block_id.into(),
            meta: serde_json::Value::Null,
            version,
        }
    }

    /// Seeds workspace `ws-1` ("Alpha", v1) containing tab `tab-1`
    /// ("Tab", v2), both through the subscriber path.
    #[track_caller]
    fn seed_workspace_with_tab(s: &Arc<WaveStore>) {
        apply(s, &workspace_created("ws-1", "Alpha", 1));
        apply(s, &tab_created("ws-1", "tab-1", "Tab", 2));
    }

    /// Seeds workspaces `src-ws` and `dst-ws` (v1 each) with tab
    /// `tab-1` ("T", v2) living in `src-ws`.
    #[track_caller]
    fn seed_src_dst_with_tab(s: &Arc<WaveStore>) {
        apply(s, &workspace_created("src-ws", "Src", 1));
        apply(s, &workspace_created("dst-ws", "Dst", 1));
        apply(s, &tab_created("src-ws", "tab-1", "T", 2));
    }

    /// `TabMoved` event relocating `tab-1` from `src-ws` to the front
    /// of `dst-ws`, making it the destination's active tab (per
    /// reducer semantics + codex P2 #621).
    fn tab_1_moved_to_dst() -> Event {
        Event::TabMoved {
            tab_id: "tab-1".into(),
            src_workspace_id: "src-ws".into(),
            dst_workspace_id: "dst-ws".into(),
            dst_index: 0,
            new_src_active_tab_id: None,
            new_dst_active_tab_id: Some("tab-1".into()),
            version: 3,
        }
    }

    // ---- Workspace / tab / block lifecycle ----

    #[test]
    fn workspace_created_inserts_row() {
        let s = store();
        apply(&s, &workspace_created("ws-1", "Alpha", 1));
        assert_eq!(load_workspace(&s, "ws-1").name, "Alpha");
    }

    #[test]
    fn workspace_created_idempotent_on_duplicate() {
        let s = store();
        let ev = workspace_created("ws-1", "Alpha", 1);
        apply(&s, &ev);
        // Second application should not error.
        apply(&s, &ev);
        // Original name preserved (no overwrite).
        assert_eq!(load_workspace(&s, "ws-1").name, "Alpha");
    }

    #[test]
    fn workspace_deleted_silent_when_missing() {
        let s = store();
        apply(
            &s,
            &Event::WorkspaceDeleted {
                workspace_id: "ghost".into(),
                version: 1,
            },
        );
    }

    #[test]
    fn tab_created_links_into_workspace() {
        let s = store();
        seed_workspace_with_tab(&s);
        assert_eq!(
            load_workspace(&s, "ws-1").tabids,
            vec!["tab-1".to_string()]
        );
        assert_eq!(load_tab(&s, "tab-1").name, "Tab");
    }

    #[test]
    fn tab_created_idempotent_on_duplicate_link() {
        let s = store();
        apply(&s, &workspace_created("ws-1", "Alpha", 1));
        let ev = tab_created("ws-1", "tab-1", "Tab", 2);
        apply(&s, &ev);
        apply(&s, &ev);
        assert_eq!(load_workspace(&s, "ws-1").tabids.len(), 1);
    }

    #[test]
    fn active_tab_changed_updates_workspace() {
        let s = store();
        seed_workspace_with_tab(&s);
        apply(
            &s,
            &Event::ActiveTabChanged {
                workspace_id: "ws-1".into(),
                tab_id: Some("tab-1".into()),
                version: 3,
            },
        );
        assert_eq!(load_workspace(&s, "ws-1").activetabid, "tab-1");
    }

    #[test]
    fn active_tab_changed_to_none_clears_activetabid() {
        let s = store();
        seed_workspace_with_tab(&s);
        apply(
            &s,
            &Event::ActiveTabChanged {
                workspace_id: "ws-1".into(),
                tab_id: Some("tab-1".into()),
                version: 3,
            },
        );
        apply(
            &s,
            &Event::ActiveTabChanged {
                workspace_id: "ws-1".into(),
                tab_id: None,
                version: 4,
            },
        );
        assert_eq!(load_workspace(&s, "ws-1").activetabid, "");
    }

    #[test]
    fn tab_created_provisions_layoutstate() {
        let s = store();
        seed_workspace_with_tab(&s);
        let tab = load_tab(&s, "tab-1");
        // codex P2 #614: tab must reference a real LayoutState row,
        // not be left with empty layoutstate.
        assert!(!tab.layoutstate.is_empty());
        let layout = s
            .get::<PersistedLayoutState>(&tab.layoutstate)
            .unwrap()
            .expect("layout row must exist");
        assert_eq!(layout.oid, tab.layoutstate);
    }

    #[test]
    fn workspace_deleted_cascades_pinned_tabs() {
        let s = store();
        // Build a workspace with a pinned tab via wcore (the bug
        // path: pinned tab created via wcore::create_tab_with_opts).
        let ws = wcore::create_workspace(&s, "Alpha").unwrap();
        let pinned_tab =
            wcore::create_tab_with_opts(&s, &ws.oid, "PinnedTab", true).unwrap();
        // Verify pinned tab is in pinnedtabids (sanity).
        assert!(load_workspace(&s, &ws.oid)
            .pinnedtabids
            .contains(&pinned_tab.oid));
        // Delete via the subscriber's WorkspaceDeleted handler.
        apply(
            &s,
            &Event::WorkspaceDeleted {
                workspace_id: ws.oid.clone(),
                version: 1,
            },
        );
        // codex P1 #614: pinned tab must be cascade-deleted, not
        // orphaned.
        assert!(s.get::<Tab>(&pinned_tab.oid).unwrap().is_none());
        assert!(s.get::<Workspace>(&ws.oid).unwrap().is_none());
    }

    #[test]
    fn block_created_links_into_tab() {
        let s = store();
        seed_workspace_with_tab(&s);
        apply(&s, &block_created("tab-1", "block-1", 3));
        assert_eq!(
            load_tab(&s, "tab-1").blockids,
            vec!["block-1".to_string()]
        );
        assert_eq!(load_block(&s, "block-1").parentoref, "tab:tab-1");
    }

    #[test]
    fn block_deleted_unlinks_from_tab() {
        let s = store();
        seed_workspace_with_tab(&s);
        apply(&s, &block_created("tab-1", "block-1", 3));
        apply(
            &s,
            &Event::BlockDeleted {
                tab_id: "tab-1".into(),
                block_id: "block-1".into(),
                version: 4,
            },
        );
        assert!(s.get::<Block>("block-1").unwrap().is_none());
        assert!(load_tab(&s, "tab-1").blockids.is_empty());
    }

    /// codex P1 #620: a workspace with legacy `pinnedtabids` rows must
    /// have those drained the first time `TabsReorderedBulk` writes
    /// the workspace; otherwise the UI's
    /// `[...pinnedtabids, ...tabids]` combine duplicates the pinned
    /// tab IDs once they have been merged into the reducer's
    /// `tab_ids` at bootstrap.
    #[test]
    fn tabs_reordered_bulk_drains_legacy_pinned_tabids() {
        let s = store();
        let ws = wcore::create_workspace(&s, "Alpha").unwrap();
        let pinned_tab =
            wcore::create_tab_with_opts(&s, &ws.oid, "Pinned", true).unwrap();
        let regular_tab =
            wcore::create_tab_with_opts(&s, &ws.oid, "Regular", false).unwrap();
        // Sanity: pinned tab is in `pinnedtabids` on disk.
        let ws_before = load_workspace(&s, &ws.oid);
        assert!(ws_before.pinnedtabids.contains(&pinned_tab.oid));

        // Reducer-driven bulk reorder treating the pinned tab as a
        // regular tab (mirrors what bootstrap-merge produces).
        apply(
            &s,
            &Event::TabsReorderedBulk {
                workspace_id: ws.oid.clone(),
                tab_ids: vec![pinned_tab.oid.clone(), regular_tab.oid.clone()],
                version: ws_before.version as u64 + 1,
            },
        );
        let ws_after = load_workspace(&s, &ws.oid);
        assert_eq!(
            ws_after.tabids,
            vec![pinned_tab.oid.clone(), regular_tab.oid.clone()]
        );
        assert!(
            ws_after.pinnedtabids.is_empty(),
            "pinnedtabids must be drained, was {:?}",
            ws_after.pinnedtabids
        );
    }

    /// codex P2 #620: `merge_meta_patch` must honour the existing
    /// `section:*` clear-prefix semantics so `UpdateObjectMeta`
    /// behaviour stays the same after the reducer migration.
    #[test]
    fn meta_updated_clears_section_prefix() {
        let s = store();
        // NOTE(review): the tab event is applied before its workspace
        // exists; this ordering is kept as originally written. Only the
        // tab row's meta is asserted on below.
        apply(&s, &tab_created("ws-1", "tab-1", "Tab", 1));
        apply(&s, &workspace_created("ws-1", "Alpha", 1));
        // Seed the tab with grouped meta entries.
        let mut tab = load_tab(&s, "tab-1");
        tab.meta
            .insert("term:fontsize".into(), serde_json::json!(14));
        tab.meta
            .insert("term:theme".into(), serde_json::json!("solarized"));
        tab.meta.insert("name".into(), serde_json::json!("keep"));
        s.update(&mut tab).unwrap();
        // Patch with `term:*` clear plus a single replacement key.
        apply(
            &s,
            &Event::TabMetaUpdated {
                tab_id: "tab-1".into(),
                meta_patch: serde_json::json!({
                    "term:*": true,
                    "term:fontsize": 18,
                }),
                version: 2,
            },
        );
        let after = load_tab(&s, "tab-1");
        assert!(
            !after.meta.contains_key("term:theme"),
            "term:theme should be cleared by `term:*` patch"
        );
        assert_eq!(
            after.meta.get("term:fontsize"),
            Some(&serde_json::json!(18)),
            "term:fontsize replacement must take effect after the section clear"
        );
        assert_eq!(after.meta.get("name"), Some(&serde_json::json!("keep")));
    }

    // ---- Phase E.5.5 — TabMoved / BlockMoved subscriber tests ----

    #[test]
    fn tab_moved_cross_workspace_rewrites_both_tabids() {
        let s = store();
        // Two workspaces pre-existing in SQLite, tab in src.
        seed_src_dst_with_tab(&s);
        assert_eq!(
            load_workspace(&s, "src-ws").tabids,
            vec!["tab-1".to_string()]
        );

        // Move it, setting dst active to the moved tab.
        apply(&s, &tab_1_moved_to_dst());
        let src = load_workspace(&s, "src-ws");
        let dst = load_workspace(&s, "dst-ws");
        assert!(src.tabids.is_empty());
        assert_eq!(src.activetabid, "");
        assert_eq!(dst.tabids, vec!["tab-1".to_string()]);
        assert_eq!(dst.activetabid, "tab-1", "dst active should be the moved tab");
    }

    #[test]
    fn tab_moved_idempotent_on_re_delivery() {
        let s = store();
        seed_src_dst_with_tab(&s);
        let ev = tab_1_moved_to_dst();
        apply(&s, &ev);
        // Re-deliver the same event.
        apply(&s, &ev);
        // Still exactly one entry — no duplicate insert.
        assert_eq!(
            load_workspace(&s, "dst-ws").tabids,
            vec!["tab-1".to_string()]
        );
    }

    #[test]
    fn block_moved_cross_tab_updates_block_lists_and_parent() {
        let s = store();
        let ws = wcore::create_workspace(&s, "W").unwrap();
        let src_tab = wcore::create_tab_with_opts(&s, &ws.oid, "src", false).unwrap();
        let dst_tab = wcore::create_tab_with_opts(&s, &ws.oid, "dst", false).unwrap();
        // Create a block in src via the subscriber path.
        apply(&s, &block_created(&src_tab.oid, "blk-1", 1));
        assert_eq!(
            load_tab(&s, &src_tab.oid).blockids,
            vec!["blk-1".to_string()]
        );

        apply(
            &s,
            &Event::BlockMoved {
                block_id: "blk-1".into(),
                src_tab_id: src_tab.oid.clone(),
                dst_tab_id: dst_tab.oid.clone(),
                dst_index: 0,
                version: 2,
            },
        );
        let src_after = load_tab(&s, &src_tab.oid);
        let dst_after = load_tab(&s, &dst_tab.oid);
        let block = load_block(&s, "blk-1");
        assert!(src_after.blockids.is_empty());
        assert_eq!(dst_after.blockids, vec!["blk-1".to_string()]);
        assert_eq!(block.parentoref, format!("tab:{}", dst_tab.oid));
    }

    #[test]
    fn block_moved_intra_tab_repositions() {
        let s = store();
        let ws = wcore::create_workspace(&s, "W").unwrap();
        let tab = wcore::create_tab_with_opts(&s, &ws.oid, "t", false).unwrap();
        for id in &["b1", "b2", "b3"] {
            apply(&s, &block_created(&tab.oid, id, 1));
        }
        // Move b1 to position 2 (post-removal end).
        apply(
            &s,
            &Event::BlockMoved {
                block_id: "b1".into(),
                src_tab_id: tab.oid.clone(),
                dst_tab_id: tab.oid.clone(),
                dst_index: 2,
                version: 2,
            },
        );
        assert_eq!(
            load_tab(&s, &tab.oid).blockids,
            vec!["b2".to_string(), "b3".to_string(), "b1".to_string()]
        );
    }
}
1586        assert_eq!(tab_after.blockids, vec!["b2".to_string(), "b3".to_string(), "b1".to_string()]);
1587    }
1588}