agentmux_srv\server/
wave_obj_bridge.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! WaveObjUpdate broadcast bridge.
5//!
6//! Subscribes to `srv_events_tx` (the internal sidecar event bus that the
7//! reducer publishes mutations to) and translates each event into one or
8//! more `WaveObjUpdate` records, broadcast to all connected WS clients via
9//! the existing `event_bus.broadcast_event(...)` plumbing — the same path
10//! that `service.rs:39-52`'s response-broadcast loop uses.
11//!
12//! Why this exists: per-RPC handlers were responsible for attaching
13//! `WaveObjUpdate`s to their responses (`success_with_updates(...)`).
14//! Forgetting that call left the frontend WOS cache stale (e.g. workspace
15//! renames not propagating to the OS title or the InstancePanel — see
16//! `docs/specs/SPEC_REACTIVE_WORKSPACE_SYNC_2026-05-14.md`).
17//!
18//! With this bridge in place, any reducer event automatically reaches the
19//! frontend, so the per-handler convention becomes belt-and-suspenders
20//! instead of load-bearing.
21//!
22//! Spec: `docs/specs/SPEC_OBJ_UPDATE_BRIDGE_2026-05-14.md`.
23//!
//! Scope: Phase 1 (workspace events) immediately fixed the user-reported
//! bug. Phase 2 (implemented in this file) adds window, tab and block
//! events plus the focused/magnified layout events. Phase 3 retires the
//! per-handler `success_with_updates(...)` calls once the bridge covers them.
28
29use std::sync::Arc;
30
31use agentmux_common::ipc::Event;
32use tokio::sync::broadcast;
33
34use crate::backend::eventbus::{EventBus, WSEventType};
35use crate::backend::obj::{
36    wave_obj_to_value, Client, Tab, WaveObj, OTYPE_BLOCK, OTYPE_CLIENT, OTYPE_LAYOUT, OTYPE_TAB,
37    OTYPE_WINDOW, OTYPE_WORKSPACE,
38};
39use crate::backend::storage::wstore::WaveStore;
40
41/// JSON shape that gets broadcast as the `data` payload of a
42/// `waveobj:update` WS event. Matches the shape of `WaveObjUpdate` in
43/// `agentmux-srv/src/backend/obj.rs:465-474` so the frontend's existing
44/// `updateWaveObject` handler accepts it without changes.
45fn build_update_payload(
46    updatetype: &str,
47    otype: &str,
48    oid: &str,
49    obj: Option<serde_json::Value>,
50) -> serde_json::Value {
51    let mut map = serde_json::Map::with_capacity(4);
52    map.insert("updatetype".into(), serde_json::Value::String(updatetype.into()));
53    map.insert("otype".into(), serde_json::Value::String(otype.into()));
54    map.insert("oid".into(), serde_json::Value::String(oid.into()));
55    if let Some(o) = obj {
56        map.insert("obj".into(), o);
57    }
58    serde_json::Value::Object(map)
59}
60
61/// Push one `WaveObjUpdate` payload to all connected WS clients via the
62/// shared event_bus. Mirrors the response-broadcast loop in
63/// `service.rs:39-52`.
64fn emit(event_bus: &EventBus, otype: &str, oid: &str, payload: serde_json::Value) {
65    let oref = format!("{otype}:{oid}");
66    event_bus.broadcast_event(&WSEventType {
67        eventtype: "waveobj:update".to_string(),
68        oref,
69        data: Some(payload),
70    });
71}
72
73/// Fetch one WaveObj by oid and broadcast it as a `waveobj:update`. The
74/// SQLite read is offloaded to the blocking thread pool per ReAgent P1
75/// on PR #852 (WaveStore is `std::sync::Mutex<Connection>`; brief in
76/// steady state but a long reducer transaction would block the tokio
77/// worker thread). Silently logs + skips on missing/error to satisfy
78/// the §8.15 idempotency contract — duplicate or stale events fold to
79/// no-op.
80async fn emit_fetched<T: WaveObj + Send + 'static>(
81    wstore: &Arc<WaveStore>,
82    event_bus: &Arc<EventBus>,
83    otype: &'static str,
84    oid: String,
85    context: &'static str,
86) {
87    let id = oid.clone();
88    let store = Arc::clone(wstore);
89    let result = tokio::task::spawn_blocking(move || store.get::<T>(&id)).await;
90    match result {
91        Ok(Ok(Some(obj))) => {
92            let payload = build_update_payload(
93                "update",
94                otype,
95                &oid,
96                Some(wave_obj_to_value(&obj)),
97            );
98            emit(event_bus, otype, &oid, payload);
99        }
100        Ok(Ok(None)) => {
101            tracing::warn!(
102                target: "wave-obj-bridge",
103                oid = %oid, otype = otype, ctx = context,
104                "object not found in wstore; skipping broadcast"
105            );
106        }
107        Ok(Err(e)) => {
108            tracing::error!(
109                target: "wave-obj-bridge",
110                oid = %oid, otype = otype, ctx = context, error = %e,
111                "wstore.get failed; skipping broadcast"
112            );
113        }
114        Err(join_err) => {
115            tracing::error!(
116                target: "wave-obj-bridge",
117                oid = %oid, otype = otype, ctx = context, error = %join_err,
118                "spawn_blocking join failed (likely panicked); skipping broadcast"
119            );
120        }
121    }
122}
123
124/// Broadcast a "delete" `waveobj:update` for the given oid. No fetch
125/// needed — the frontend's `updateWaveObject` (`wos.ts:263-265`) handles
126/// the delete arm with just the oid.
127fn emit_delete(event_bus: &EventBus, otype: &'static str, oid: &str) {
128    let payload = build_update_payload("delete", otype, oid, None);
129    emit(event_bus, otype, oid, payload);
130}
131
132/// Broadcast the singleton `Client` WaveObj. SrvWindowOpened /
133/// SrvWindowClosed mutate `Client.windowids` (per
134/// `apply_srv_window_opened` in persist_subscriber.rs:518) so renderers
135/// holding a pinned Client need to see the new windowids list — without
136/// this broadcast they'd render stale window membership until reload.
137/// Codex P2 on PR #861.
138///
139/// Client is a singleton — the first `get_all::<Client>()` row is THE
140/// client. Same lookup pattern persist_subscriber uses.
141async fn emit_client_singleton(
142    wstore: &Arc<WaveStore>,
143    event_bus: &Arc<EventBus>,
144    context: &'static str,
145) {
146    let store = Arc::clone(wstore);
147    let result = tokio::task::spawn_blocking(move || store.get_all::<Client>()).await;
148    match result {
149        Ok(Ok(clients)) => {
150            if let Some(client) = clients.into_iter().next() {
151                let oid = client.oid.clone();
152                let payload = build_update_payload(
153                    "update",
154                    OTYPE_CLIENT,
155                    &oid,
156                    Some(wave_obj_to_value(&client)),
157                );
158                emit(event_bus, OTYPE_CLIENT, &oid, payload);
159            } else {
160                tracing::warn!(
161                    target: "wave-obj-bridge",
162                    ctx = context,
163                    "no Client row in wstore; skipping Client broadcast"
164                );
165            }
166        }
167        Ok(Err(e)) => {
168            tracing::error!(
169                target: "wave-obj-bridge",
170                ctx = context, error = %e,
171                "wstore.get_all::<Client> failed; skipping broadcast"
172            );
173        }
174        Err(je) => {
175            tracing::error!(
176                target: "wave-obj-bridge",
177                ctx = context, error = %je,
178                "spawn_blocking join failed during Client lookup"
179            );
180        }
181    }
182}
183
184/// Layout events all reference a `tab_id`; the affected WaveObj is the
185/// `LayoutState` referenced by the tab's `layoutstate` field. Two
186/// SQLite reads chained inside one `spawn_blocking` to keep the lock
187/// hold short.
188async fn emit_layout_for_tab(
189    wstore: &Arc<WaveStore>,
190    event_bus: &Arc<EventBus>,
191    tab_id: String,
192    context: &'static str,
193) {
194    use crate::backend::obj::LayoutState;
195    let id_for_log = tab_id.clone();
196    let store = Arc::clone(wstore);
197    let result = tokio::task::spawn_blocking(move || -> Result<Option<LayoutState>, _> {
198        match store.get::<Tab>(&tab_id) {
199            Ok(Some(tab)) => {
200                if tab.layoutstate.is_empty() {
201                    Ok(None)
202                } else {
203                    store.get::<LayoutState>(&tab.layoutstate)
204                }
205            }
206            Ok(None) => Ok(None),
207            Err(e) => Err(e),
208        }
209    })
210    .await;
211    match result {
212        Ok(Ok(Some(layout))) => {
213            let layout_id = layout.oid.clone();
214            let payload = build_update_payload(
215                "update",
216                OTYPE_LAYOUT,
217                &layout_id,
218                Some(wave_obj_to_value(&layout)),
219            );
220            emit(event_bus, OTYPE_LAYOUT, &layout_id, payload);
221        }
222        Ok(Ok(None)) => {
223            tracing::warn!(
224                target: "wave-obj-bridge",
225                tab_id = %id_for_log, ctx = context,
226                "layout event but tab/layoutstate not found; skipping broadcast"
227            );
228        }
229        Ok(Err(e)) => {
230            tracing::error!(
231                target: "wave-obj-bridge",
232                tab_id = %id_for_log, ctx = context, error = %e,
233                "wstore.get failed during layout resolution; skipping broadcast"
234            );
235        }
236        Err(join_err) => {
237            tracing::error!(
238                target: "wave-obj-bridge",
239                tab_id = %id_for_log, ctx = context, error = %join_err,
240                "spawn_blocking join failed during layout resolution"
241            );
242        }
243    }
244}
245
246/// Translate one reducer event into zero or more `waveobj:update` broadcasts.
247///
248/// **Read source — post-event state guarantee:**
249/// For events emitted via the HTTP `service.rs` RPC handlers,
250/// `apply_event_to_wstore` is called synchronously (`service.rs:1297-1304`
251/// for workspace; equivalent path for tab/block/window/layout commands)
252/// before `publish_events` (`service.rs:1305`). So when the bridge
253/// receives such an event, SQLite is already up-to-date.
254///
255/// **IPC-path caveat:** the launcher → IPC path in `srv_ipc/server.rs:295`
256/// dispatches reducer events directly without first calling
257/// `apply_event_to_wstore`; the persist subscriber and bridge then race.
258/// At time of writing none of the events the bridge handles are emitted
259/// via that path (verified for `Command::UpdateWindowMeta` and the
260/// workspace family). When that changes, options are: (a) make the IPC
261/// path apply synchronously like HTTP does, or (b) read from the
262/// in-memory `srv_state` reducer rather than SQLite. Tracked in
263/// `SPEC_OBJ_UPDATE_BRIDGE §11.1`.
264///
265/// **Lock discipline (per ReAgent P1 on PR #852):** every `wstore.get<T>()`
266/// is wrapped in `tokio::task::spawn_blocking` via the helpers above so
267/// the async runtime stays responsive even under reducer-transaction
268/// contention.
269///
270/// **Coverage:** Phase 1 + 2 covers workspace, window, tab, block, layout
271/// events. Saga events, OS facts, launcher-domain events all fall through
272/// to the catch-all `_ => {}` arm.
273async fn dispatch_event(event: Event, wstore: Arc<WaveStore>, event_bus: Arc<EventBus>) {
274    use crate::backend::obj::{Block, Window, Workspace};
275
276    match event {
277        // ----- Workspace -----
278        Event::WorkspaceRenamed { workspace_id, .. }
279        | Event::WorkspaceMetaUpdated { workspace_id, .. }
280        | Event::WorkspaceCreated { workspace_id, .. } => {
281            emit_fetched::<Workspace>(
282                &wstore, &event_bus, OTYPE_WORKSPACE, workspace_id, "Workspace*",
283            )
284            .await;
285        }
286        Event::WorkspaceDeleted { workspace_id, .. } => {
287            emit_delete(&event_bus, OTYPE_WORKSPACE, &workspace_id);
288        }
289
290        // ----- Window (Phase 2 + #855) -----
291        Event::WindowMetaUpdated { window_id, .. } => {
292            emit_fetched::<Window>(
293                &wstore, &event_bus, OTYPE_WINDOW, window_id, "WindowMetaUpdated",
294            )
295            .await;
296        }
297        Event::SrvWindowWorkspaceChanged { window_id, .. } => {
298            emit_fetched::<Window>(
299                &wstore, &event_bus, OTYPE_WINDOW, window_id, "SrvWindowWorkspaceChanged",
300            )
301            .await;
302        }
303        // SrvWindowOpened/Closed: the persist path
304        // (apply_srv_window_opened / apply_srv_window_closed) ALSO
305        // mutates Client.windowids inside the same transaction, so
306        // renderers with a pinned Client need to see the updated
307        // singleton too — otherwise their window list lags behind
308        // until reload. (Codex P2 on PR #861.)
309        Event::SrvWindowOpened { window_id, .. } => {
310            emit_fetched::<Window>(
311                &wstore, &event_bus, OTYPE_WINDOW, window_id, "SrvWindowOpened",
312            )
313            .await;
314            emit_client_singleton(&wstore, &event_bus, "SrvWindowOpened").await;
315        }
316        Event::SrvWindowClosed { window_id, .. } => {
317            emit_delete(&event_bus, OTYPE_WINDOW, &window_id);
318            emit_client_singleton(&wstore, &event_bus, "SrvWindowClosed").await;
319        }
320
321        // ----- Tab (Phase 2) -----
322        // TabCreated also touches the parent workspace's tab_ids field
323        // (reducer mutates both in one dispatch). Broadcast both so the
324        // frontend WOS sees the new Tab AND the updated parent ordering.
325        Event::TabCreated {
326            workspace_id,
327            tab_id,
328            ..
329        } => {
330            emit_fetched::<Tab>(&wstore, &event_bus, OTYPE_TAB, tab_id, "TabCreated").await;
331            emit_fetched::<Workspace>(
332                &wstore, &event_bus, OTYPE_WORKSPACE, workspace_id, "TabCreated parent",
333            )
334            .await;
335        }
336        Event::TabDeleted {
337            workspace_id,
338            tab_id,
339            ..
340        } => {
341            emit_delete(&event_bus, OTYPE_TAB, &tab_id);
342            emit_fetched::<Workspace>(
343                &wstore, &event_bus, OTYPE_WORKSPACE, workspace_id, "TabDeleted parent",
344            )
345            .await;
346        }
347        Event::TabRenamed { tab_id, .. } | Event::TabMetaUpdated { tab_id, .. } => {
348            emit_fetched::<Tab>(&wstore, &event_bus, OTYPE_TAB, tab_id, "Tab*").await;
349        }
350        Event::ActiveTabChanged { workspace_id, .. }
351        | Event::TabReordered { workspace_id, .. }
352        | Event::TabsReorderedBulk { workspace_id, .. } => {
353            emit_fetched::<Workspace>(
354                &wstore,
355                &event_bus,
356                OTYPE_WORKSPACE,
357                workspace_id,
358                "ActiveTab/Reorder",
359            )
360            .await;
361        }
362        Event::TabMoved {
363            tab_id,
364            src_workspace_id,
365            dst_workspace_id,
366            ..
367        } => {
368            emit_fetched::<Tab>(&wstore, &event_bus, OTYPE_TAB, tab_id, "TabMoved").await;
369            emit_fetched::<Workspace>(
370                &wstore,
371                &event_bus,
372                OTYPE_WORKSPACE,
373                src_workspace_id,
374                "TabMoved src",
375            )
376            .await;
377            emit_fetched::<Workspace>(
378                &wstore,
379                &event_bus,
380                OTYPE_WORKSPACE,
381                dst_workspace_id,
382                "TabMoved dst",
383            )
384            .await;
385        }
386
387        // ----- Block (Phase 2) -----
388        // BlockCreated/BlockDeleted touch the parent tab's blockids field.
389        Event::BlockCreated {
390            tab_id, block_id, ..
391        } => {
392            emit_fetched::<Block>(
393                &wstore, &event_bus, OTYPE_BLOCK, block_id, "BlockCreated",
394            )
395            .await;
396            emit_fetched::<Tab>(
397                &wstore, &event_bus, OTYPE_TAB, tab_id, "BlockCreated parent",
398            )
399            .await;
400        }
401        Event::BlockDeleted {
402            tab_id, block_id, ..
403        } => {
404            emit_delete(&event_bus, OTYPE_BLOCK, &block_id);
405            emit_fetched::<Tab>(
406                &wstore, &event_bus, OTYPE_TAB, tab_id, "BlockDeleted parent",
407            )
408            .await;
409        }
410        Event::BlockMetaUpdated { block_id, .. } => {
411            emit_fetched::<Block>(
412                &wstore,
413                &event_bus,
414                OTYPE_BLOCK,
415                block_id,
416                "BlockMetaUpdated",
417            )
418            .await;
419        }
420        Event::BlockMoved {
421            block_id,
422            src_tab_id,
423            dst_tab_id,
424            ..
425        } => {
426            emit_fetched::<Block>(&wstore, &event_bus, OTYPE_BLOCK, block_id, "BlockMoved").await;
427            emit_fetched::<Tab>(&wstore, &event_bus, OTYPE_TAB, src_tab_id, "BlockMoved src")
428                .await;
429            emit_fetched::<Tab>(&wstore, &event_bus, OTYPE_TAB, dst_tab_id, "BlockMoved dst")
430                .await;
431        }
432
433        // ----- Layout (Phase 2 — partial) -----
434        // ONLY the focused/magnified events are persisted by
435        // `apply_event_to_wstore` (persist_subscriber.rs:289-292).
436        // The other 11 layout-tree events (Insert/Delete/Move/Swap/
437        // Resize/Replace/Split*/Clear/TreeReplaced) are persisted via
438        // wcore-direct in their RPC handlers — they DON'T appear in
439        // `apply_event_to_wstore`. If the bridge tried to broadcast
440        // LayoutState for those, `wstore.get<LayoutState>` could return
441        // pre-event tree state (subscriber and bridge race; only this
442        // one is the unsafe direction). Tracked as a follow-up issue
443        // for the layout-tree-event persistence migration. Until then,
444        // those handlers' existing `success_with_updates(...)` response
445        // broadcasts cover the frontend (Codex P2 on PR #861).
446        Event::FocusedNodeChanged { tab_id, .. }
447        | Event::MagnifiedNodeChanged { tab_id, .. } => {
448            emit_layout_for_tab(&wstore, &event_bus, tab_id, "Focused/MagnifiedNodeChanged").await;
449        }
450
451        // Saga lifecycle, launcher-domain events, OS facts, etc. — not
452        // WaveObj changes. The catch-all keeps the bridge future-proof
453        // for new event variants the reducer may add.
454        _ => {}
455    }
456}
457
458/// Spawn the bridge task. Returns the `JoinHandle` so callers can keep it
459/// alive (typically forever — the task lives for the lifetime of the srv
460/// process). Per ReAgent P1 on PR #852: the loop is panic-resilient — a
461/// panic inside `dispatch_event` is caught and logged, and the loop
462/// continues processing subsequent events. Without this, a single
463/// malformed event could silently kill the entire bridge task and
464/// frontend WOS would stop seeing updates.
465///
466/// Subscribe ordering: per `SPEC §11.1` the bridge can subscribe in any
467/// order relative to the persist subscriber. For Phase 1's workspace
468/// events the HTTP RPC handler applies SQLite synchronously before
469/// publishing the event, so the bridge always sees post-event state.
470pub fn spawn_wave_obj_bridge(
471    events_rx: broadcast::Receiver<Event>,
472    wstore: Arc<WaveStore>,
473    event_bus: Arc<EventBus>,
474) -> tokio::task::JoinHandle<()> {
475    tokio::spawn(run_wave_obj_bridge(events_rx, wstore, event_bus))
476}
477
478async fn run_wave_obj_bridge(
479    mut events_rx: broadcast::Receiver<Event>,
480    wstore: Arc<WaveStore>,
481    event_bus: Arc<EventBus>,
482) {
483    tracing::info!(target: "wave-obj-bridge", "[wave-obj-bridge] started (Phase 1: workspace events)");
484    loop {
485        match events_rx.recv().await {
486            Ok(event) => {
487                // Per-event panic isolation (ReAgent P1 on PR #852): use
488                // FuturesUnordered with a catch_unwind future would be the
489                // textbook fix, but for a single event-at-a-time loop the
490                // simpler pattern is to spawn the dispatch as its own task
491                // and observe the JoinError if it panics. We `await` it
492                // immediately so events still process serially (matching
493                // the broadcast channel's send order), but a panic in one
494                // event can't kill the bridge.
495                let store = Arc::clone(&wstore);
496                let bus = Arc::clone(&event_bus);
497                let event_dbg = format!("{:?}", &event);
498                let join = tokio::spawn(dispatch_event(event, store, bus)).await;
499                if let Err(join_err) = join {
500                    if join_err.is_panic() {
501                        tracing::error!(
502                            target: "wave-obj-bridge",
503                            event = %event_dbg,
504                            "dispatch_event panicked; bridge continues with next event. Panic: {}",
505                            join_err,
506                        );
507                    } else {
508                        tracing::error!(
509                            target: "wave-obj-bridge",
510                            event = %event_dbg,
511                            error = %join_err,
512                            "dispatch_event task aborted unexpectedly"
513                        );
514                    }
515                }
516            }
517            Err(broadcast::error::RecvError::Lagged(n)) => {
518                // The broadcast channel has 1024 capacity (main.rs:624).
519                // If we lag, frontend WOS state diverges silently — log it
520                // loudly so operators can correlate with user-visible drift
521                // (e.g. the InstancePanel/title showing stale names).
522                // No automatic recovery; the next event resyncs the affected
523                // object and frontend reads everything else from its cache.
524                tracing::error!(
525                    target: "wave-obj-bridge",
526                    skipped = n,
527                    "broadcast channel lagged; some waveobj:update events were dropped — frontend WOS may show stale state until the affected object is mutated again"
528                );
529            }
530            Err(broadcast::error::RecvError::Closed) => {
531                tracing::info!(target: "wave-obj-bridge", "events channel closed; bridge exiting");
532                return;
533            }
534        }
535    }
536}