agentmux_srv\server/wave_obj_bridge.rs
1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! WaveObjUpdate broadcast bridge.
5//!
6//! Subscribes to `srv_events_tx` (the internal sidecar event bus that the
7//! reducer publishes mutations to) and translates each event into one or
8//! more `WaveObjUpdate` records, broadcast to all connected WS clients via
9//! the existing `event_bus.broadcast_event(...)` plumbing — the same path
10//! that `service.rs:39-52`'s response-broadcast loop uses.
11//!
12//! Why this exists: per-RPC handlers were responsible for attaching
13//! `WaveObjUpdate`s to their responses (`success_with_updates(...)`).
14//! Forgetting that call left the frontend WOS cache stale (e.g. workspace
15//! renames not propagating to the OS title or the InstancePanel — see
16//! `docs/specs/SPEC_REACTIVE_WORKSPACE_SYNC_2026-05-14.md`).
17//!
18//! With this bridge in place, any reducer event automatically reaches the
19//! frontend, so the per-handler convention becomes belt-and-suspenders
20//! instead of load-bearing.
21//!
22//! Spec: `docs/specs/SPEC_OBJ_UPDATE_BRIDGE_2026-05-14.md`.
23//!
//! Scope: Phase 1 (workspace events — the fix for the user-reported bug)
//! and Phase 2 (windows / tabs / blocks, plus the focused/magnified-node
//! layout events; the remaining layout-tree events are not yet bridged)
//! are both handled by `dispatch_event` below. Phase 3 retires the
//! per-handler `success_with_updates(...)` calls once the bridge covers them.
28
29use std::sync::Arc;
30
31use agentmux_common::ipc::Event;
32use tokio::sync::broadcast;
33
34use crate::backend::eventbus::{EventBus, WSEventType};
35use crate::backend::obj::{
36 wave_obj_to_value, Client, Tab, WaveObj, OTYPE_BLOCK, OTYPE_CLIENT, OTYPE_LAYOUT, OTYPE_TAB,
37 OTYPE_WINDOW, OTYPE_WORKSPACE,
38};
39use crate::backend::storage::wstore::WaveStore;
40
41/// JSON shape that gets broadcast as the `data` payload of a
42/// `waveobj:update` WS event. Matches the shape of `WaveObjUpdate` in
43/// `agentmux-srv/src/backend/obj.rs:465-474` so the frontend's existing
44/// `updateWaveObject` handler accepts it without changes.
45fn build_update_payload(
46 updatetype: &str,
47 otype: &str,
48 oid: &str,
49 obj: Option<serde_json::Value>,
50) -> serde_json::Value {
51 let mut map = serde_json::Map::with_capacity(4);
52 map.insert("updatetype".into(), serde_json::Value::String(updatetype.into()));
53 map.insert("otype".into(), serde_json::Value::String(otype.into()));
54 map.insert("oid".into(), serde_json::Value::String(oid.into()));
55 if let Some(o) = obj {
56 map.insert("obj".into(), o);
57 }
58 serde_json::Value::Object(map)
59}
60
61/// Push one `WaveObjUpdate` payload to all connected WS clients via the
62/// shared event_bus. Mirrors the response-broadcast loop in
63/// `service.rs:39-52`.
64fn emit(event_bus: &EventBus, otype: &str, oid: &str, payload: serde_json::Value) {
65 let oref = format!("{otype}:{oid}");
66 event_bus.broadcast_event(&WSEventType {
67 eventtype: "waveobj:update".to_string(),
68 oref,
69 data: Some(payload),
70 });
71}
72
73/// Fetch one WaveObj by oid and broadcast it as a `waveobj:update`. The
74/// SQLite read is offloaded to the blocking thread pool per ReAgent P1
75/// on PR #852 (WaveStore is `std::sync::Mutex<Connection>`; brief in
76/// steady state but a long reducer transaction would block the tokio
77/// worker thread). Silently logs + skips on missing/error to satisfy
78/// the §8.15 idempotency contract — duplicate or stale events fold to
79/// no-op.
80async fn emit_fetched<T: WaveObj + Send + 'static>(
81 wstore: &Arc<WaveStore>,
82 event_bus: &Arc<EventBus>,
83 otype: &'static str,
84 oid: String,
85 context: &'static str,
86) {
87 let id = oid.clone();
88 let store = Arc::clone(wstore);
89 let result = tokio::task::spawn_blocking(move || store.get::<T>(&id)).await;
90 match result {
91 Ok(Ok(Some(obj))) => {
92 let payload = build_update_payload(
93 "update",
94 otype,
95 &oid,
96 Some(wave_obj_to_value(&obj)),
97 );
98 emit(event_bus, otype, &oid, payload);
99 }
100 Ok(Ok(None)) => {
101 tracing::warn!(
102 target: "wave-obj-bridge",
103 oid = %oid, otype = otype, ctx = context,
104 "object not found in wstore; skipping broadcast"
105 );
106 }
107 Ok(Err(e)) => {
108 tracing::error!(
109 target: "wave-obj-bridge",
110 oid = %oid, otype = otype, ctx = context, error = %e,
111 "wstore.get failed; skipping broadcast"
112 );
113 }
114 Err(join_err) => {
115 tracing::error!(
116 target: "wave-obj-bridge",
117 oid = %oid, otype = otype, ctx = context, error = %join_err,
118 "spawn_blocking join failed (likely panicked); skipping broadcast"
119 );
120 }
121 }
122}
123
124/// Broadcast a "delete" `waveobj:update` for the given oid. No fetch
125/// needed — the frontend's `updateWaveObject` (`wos.ts:263-265`) handles
126/// the delete arm with just the oid.
127fn emit_delete(event_bus: &EventBus, otype: &'static str, oid: &str) {
128 let payload = build_update_payload("delete", otype, oid, None);
129 emit(event_bus, otype, oid, payload);
130}
131
132/// Broadcast the singleton `Client` WaveObj. SrvWindowOpened /
133/// SrvWindowClosed mutate `Client.windowids` (per
134/// `apply_srv_window_opened` in persist_subscriber.rs:518) so renderers
135/// holding a pinned Client need to see the new windowids list — without
136/// this broadcast they'd render stale window membership until reload.
137/// Codex P2 on PR #861.
138///
139/// Client is a singleton — the first `get_all::<Client>()` row is THE
140/// client. Same lookup pattern persist_subscriber uses.
141async fn emit_client_singleton(
142 wstore: &Arc<WaveStore>,
143 event_bus: &Arc<EventBus>,
144 context: &'static str,
145) {
146 let store = Arc::clone(wstore);
147 let result = tokio::task::spawn_blocking(move || store.get_all::<Client>()).await;
148 match result {
149 Ok(Ok(clients)) => {
150 if let Some(client) = clients.into_iter().next() {
151 let oid = client.oid.clone();
152 let payload = build_update_payload(
153 "update",
154 OTYPE_CLIENT,
155 &oid,
156 Some(wave_obj_to_value(&client)),
157 );
158 emit(event_bus, OTYPE_CLIENT, &oid, payload);
159 } else {
160 tracing::warn!(
161 target: "wave-obj-bridge",
162 ctx = context,
163 "no Client row in wstore; skipping Client broadcast"
164 );
165 }
166 }
167 Ok(Err(e)) => {
168 tracing::error!(
169 target: "wave-obj-bridge",
170 ctx = context, error = %e,
171 "wstore.get_all::<Client> failed; skipping broadcast"
172 );
173 }
174 Err(je) => {
175 tracing::error!(
176 target: "wave-obj-bridge",
177 ctx = context, error = %je,
178 "spawn_blocking join failed during Client lookup"
179 );
180 }
181 }
182}
183
184/// Layout events all reference a `tab_id`; the affected WaveObj is the
185/// `LayoutState` referenced by the tab's `layoutstate` field. Two
186/// SQLite reads chained inside one `spawn_blocking` to keep the lock
187/// hold short.
188async fn emit_layout_for_tab(
189 wstore: &Arc<WaveStore>,
190 event_bus: &Arc<EventBus>,
191 tab_id: String,
192 context: &'static str,
193) {
194 use crate::backend::obj::LayoutState;
195 let id_for_log = tab_id.clone();
196 let store = Arc::clone(wstore);
197 let result = tokio::task::spawn_blocking(move || -> Result<Option<LayoutState>, _> {
198 match store.get::<Tab>(&tab_id) {
199 Ok(Some(tab)) => {
200 if tab.layoutstate.is_empty() {
201 Ok(None)
202 } else {
203 store.get::<LayoutState>(&tab.layoutstate)
204 }
205 }
206 Ok(None) => Ok(None),
207 Err(e) => Err(e),
208 }
209 })
210 .await;
211 match result {
212 Ok(Ok(Some(layout))) => {
213 let layout_id = layout.oid.clone();
214 let payload = build_update_payload(
215 "update",
216 OTYPE_LAYOUT,
217 &layout_id,
218 Some(wave_obj_to_value(&layout)),
219 );
220 emit(event_bus, OTYPE_LAYOUT, &layout_id, payload);
221 }
222 Ok(Ok(None)) => {
223 tracing::warn!(
224 target: "wave-obj-bridge",
225 tab_id = %id_for_log, ctx = context,
226 "layout event but tab/layoutstate not found; skipping broadcast"
227 );
228 }
229 Ok(Err(e)) => {
230 tracing::error!(
231 target: "wave-obj-bridge",
232 tab_id = %id_for_log, ctx = context, error = %e,
233 "wstore.get failed during layout resolution; skipping broadcast"
234 );
235 }
236 Err(join_err) => {
237 tracing::error!(
238 target: "wave-obj-bridge",
239 tab_id = %id_for_log, ctx = context, error = %join_err,
240 "spawn_blocking join failed during layout resolution"
241 );
242 }
243 }
244}
245
246/// Translate one reducer event into zero or more `waveobj:update` broadcasts.
247///
248/// **Read source — post-event state guarantee:**
249/// For events emitted via the HTTP `service.rs` RPC handlers,
250/// `apply_event_to_wstore` is called synchronously (`service.rs:1297-1304`
251/// for workspace; equivalent path for tab/block/window/layout commands)
252/// before `publish_events` (`service.rs:1305`). So when the bridge
253/// receives such an event, SQLite is already up-to-date.
254///
255/// **IPC-path caveat:** the launcher → IPC path in `srv_ipc/server.rs:295`
256/// dispatches reducer events directly without first calling
257/// `apply_event_to_wstore`; the persist subscriber and bridge then race.
258/// At time of writing none of the events the bridge handles are emitted
259/// via that path (verified for `Command::UpdateWindowMeta` and the
260/// workspace family). When that changes, options are: (a) make the IPC
261/// path apply synchronously like HTTP does, or (b) read from the
262/// in-memory `srv_state` reducer rather than SQLite. Tracked in
263/// `SPEC_OBJ_UPDATE_BRIDGE §11.1`.
264///
265/// **Lock discipline (per ReAgent P1 on PR #852):** every `wstore.get<T>()`
266/// is wrapped in `tokio::task::spawn_blocking` via the helpers above so
267/// the async runtime stays responsive even under reducer-transaction
268/// contention.
269///
270/// **Coverage:** Phase 1 + 2 covers workspace, window, tab, block, layout
271/// events. Saga events, OS facts, launcher-domain events all fall through
272/// to the catch-all `_ => {}` arm.
273async fn dispatch_event(event: Event, wstore: Arc<WaveStore>, event_bus: Arc<EventBus>) {
274 use crate::backend::obj::{Block, Window, Workspace};
275
276 match event {
277 // ----- Workspace -----
278 Event::WorkspaceRenamed { workspace_id, .. }
279 | Event::WorkspaceMetaUpdated { workspace_id, .. }
280 | Event::WorkspaceCreated { workspace_id, .. } => {
281 emit_fetched::<Workspace>(
282 &wstore, &event_bus, OTYPE_WORKSPACE, workspace_id, "Workspace*",
283 )
284 .await;
285 }
286 Event::WorkspaceDeleted { workspace_id, .. } => {
287 emit_delete(&event_bus, OTYPE_WORKSPACE, &workspace_id);
288 }
289
290 // ----- Window (Phase 2 + #855) -----
291 Event::WindowMetaUpdated { window_id, .. } => {
292 emit_fetched::<Window>(
293 &wstore, &event_bus, OTYPE_WINDOW, window_id, "WindowMetaUpdated",
294 )
295 .await;
296 }
297 Event::SrvWindowWorkspaceChanged { window_id, .. } => {
298 emit_fetched::<Window>(
299 &wstore, &event_bus, OTYPE_WINDOW, window_id, "SrvWindowWorkspaceChanged",
300 )
301 .await;
302 }
303 // SrvWindowOpened/Closed: the persist path
304 // (apply_srv_window_opened / apply_srv_window_closed) ALSO
305 // mutates Client.windowids inside the same transaction, so
306 // renderers with a pinned Client need to see the updated
307 // singleton too — otherwise their window list lags behind
308 // until reload. (Codex P2 on PR #861.)
309 Event::SrvWindowOpened { window_id, .. } => {
310 emit_fetched::<Window>(
311 &wstore, &event_bus, OTYPE_WINDOW, window_id, "SrvWindowOpened",
312 )
313 .await;
314 emit_client_singleton(&wstore, &event_bus, "SrvWindowOpened").await;
315 }
316 Event::SrvWindowClosed { window_id, .. } => {
317 emit_delete(&event_bus, OTYPE_WINDOW, &window_id);
318 emit_client_singleton(&wstore, &event_bus, "SrvWindowClosed").await;
319 }
320
321 // ----- Tab (Phase 2) -----
322 // TabCreated also touches the parent workspace's tab_ids field
323 // (reducer mutates both in one dispatch). Broadcast both so the
324 // frontend WOS sees the new Tab AND the updated parent ordering.
325 Event::TabCreated {
326 workspace_id,
327 tab_id,
328 ..
329 } => {
330 emit_fetched::<Tab>(&wstore, &event_bus, OTYPE_TAB, tab_id, "TabCreated").await;
331 emit_fetched::<Workspace>(
332 &wstore, &event_bus, OTYPE_WORKSPACE, workspace_id, "TabCreated parent",
333 )
334 .await;
335 }
336 Event::TabDeleted {
337 workspace_id,
338 tab_id,
339 ..
340 } => {
341 emit_delete(&event_bus, OTYPE_TAB, &tab_id);
342 emit_fetched::<Workspace>(
343 &wstore, &event_bus, OTYPE_WORKSPACE, workspace_id, "TabDeleted parent",
344 )
345 .await;
346 }
347 Event::TabRenamed { tab_id, .. } | Event::TabMetaUpdated { tab_id, .. } => {
348 emit_fetched::<Tab>(&wstore, &event_bus, OTYPE_TAB, tab_id, "Tab*").await;
349 }
350 Event::ActiveTabChanged { workspace_id, .. }
351 | Event::TabReordered { workspace_id, .. }
352 | Event::TabsReorderedBulk { workspace_id, .. } => {
353 emit_fetched::<Workspace>(
354 &wstore,
355 &event_bus,
356 OTYPE_WORKSPACE,
357 workspace_id,
358 "ActiveTab/Reorder",
359 )
360 .await;
361 }
362 Event::TabMoved {
363 tab_id,
364 src_workspace_id,
365 dst_workspace_id,
366 ..
367 } => {
368 emit_fetched::<Tab>(&wstore, &event_bus, OTYPE_TAB, tab_id, "TabMoved").await;
369 emit_fetched::<Workspace>(
370 &wstore,
371 &event_bus,
372 OTYPE_WORKSPACE,
373 src_workspace_id,
374 "TabMoved src",
375 )
376 .await;
377 emit_fetched::<Workspace>(
378 &wstore,
379 &event_bus,
380 OTYPE_WORKSPACE,
381 dst_workspace_id,
382 "TabMoved dst",
383 )
384 .await;
385 }
386
387 // ----- Block (Phase 2) -----
388 // BlockCreated/BlockDeleted touch the parent tab's blockids field.
389 Event::BlockCreated {
390 tab_id, block_id, ..
391 } => {
392 emit_fetched::<Block>(
393 &wstore, &event_bus, OTYPE_BLOCK, block_id, "BlockCreated",
394 )
395 .await;
396 emit_fetched::<Tab>(
397 &wstore, &event_bus, OTYPE_TAB, tab_id, "BlockCreated parent",
398 )
399 .await;
400 }
401 Event::BlockDeleted {
402 tab_id, block_id, ..
403 } => {
404 emit_delete(&event_bus, OTYPE_BLOCK, &block_id);
405 emit_fetched::<Tab>(
406 &wstore, &event_bus, OTYPE_TAB, tab_id, "BlockDeleted parent",
407 )
408 .await;
409 }
410 Event::BlockMetaUpdated { block_id, .. } => {
411 emit_fetched::<Block>(
412 &wstore,
413 &event_bus,
414 OTYPE_BLOCK,
415 block_id,
416 "BlockMetaUpdated",
417 )
418 .await;
419 }
420 Event::BlockMoved {
421 block_id,
422 src_tab_id,
423 dst_tab_id,
424 ..
425 } => {
426 emit_fetched::<Block>(&wstore, &event_bus, OTYPE_BLOCK, block_id, "BlockMoved").await;
427 emit_fetched::<Tab>(&wstore, &event_bus, OTYPE_TAB, src_tab_id, "BlockMoved src")
428 .await;
429 emit_fetched::<Tab>(&wstore, &event_bus, OTYPE_TAB, dst_tab_id, "BlockMoved dst")
430 .await;
431 }
432
433 // ----- Layout (Phase 2 — partial) -----
434 // ONLY the focused/magnified events are persisted by
435 // `apply_event_to_wstore` (persist_subscriber.rs:289-292).
436 // The other 11 layout-tree events (Insert/Delete/Move/Swap/
437 // Resize/Replace/Split*/Clear/TreeReplaced) are persisted via
438 // wcore-direct in their RPC handlers — they DON'T appear in
439 // `apply_event_to_wstore`. If the bridge tried to broadcast
440 // LayoutState for those, `wstore.get<LayoutState>` could return
441 // pre-event tree state (subscriber and bridge race; only this
442 // one is the unsafe direction). Tracked as a follow-up issue
443 // for the layout-tree-event persistence migration. Until then,
444 // those handlers' existing `success_with_updates(...)` response
445 // broadcasts cover the frontend (Codex P2 on PR #861).
446 Event::FocusedNodeChanged { tab_id, .. }
447 | Event::MagnifiedNodeChanged { tab_id, .. } => {
448 emit_layout_for_tab(&wstore, &event_bus, tab_id, "Focused/MagnifiedNodeChanged").await;
449 }
450
451 // Saga lifecycle, launcher-domain events, OS facts, etc. — not
452 // WaveObj changes. The catch-all keeps the bridge future-proof
453 // for new event variants the reducer may add.
454 _ => {}
455 }
456}
457
458/// Spawn the bridge task. Returns the `JoinHandle` so callers can keep it
459/// alive (typically forever — the task lives for the lifetime of the srv
460/// process). Per ReAgent P1 on PR #852: the loop is panic-resilient — a
461/// panic inside `dispatch_event` is caught and logged, and the loop
462/// continues processing subsequent events. Without this, a single
463/// malformed event could silently kill the entire bridge task and
464/// frontend WOS would stop seeing updates.
465///
466/// Subscribe ordering: per `SPEC §11.1` the bridge can subscribe in any
467/// order relative to the persist subscriber. For Phase 1's workspace
468/// events the HTTP RPC handler applies SQLite synchronously before
469/// publishing the event, so the bridge always sees post-event state.
470pub fn spawn_wave_obj_bridge(
471 events_rx: broadcast::Receiver<Event>,
472 wstore: Arc<WaveStore>,
473 event_bus: Arc<EventBus>,
474) -> tokio::task::JoinHandle<()> {
475 tokio::spawn(run_wave_obj_bridge(events_rx, wstore, event_bus))
476}
477
478async fn run_wave_obj_bridge(
479 mut events_rx: broadcast::Receiver<Event>,
480 wstore: Arc<WaveStore>,
481 event_bus: Arc<EventBus>,
482) {
483 tracing::info!(target: "wave-obj-bridge", "[wave-obj-bridge] started (Phase 1: workspace events)");
484 loop {
485 match events_rx.recv().await {
486 Ok(event) => {
487 // Per-event panic isolation (ReAgent P1 on PR #852): use
488 // FuturesUnordered with a catch_unwind future would be the
489 // textbook fix, but for a single event-at-a-time loop the
490 // simpler pattern is to spawn the dispatch as its own task
491 // and observe the JoinError if it panics. We `await` it
492 // immediately so events still process serially (matching
493 // the broadcast channel's send order), but a panic in one
494 // event can't kill the bridge.
495 let store = Arc::clone(&wstore);
496 let bus = Arc::clone(&event_bus);
497 let event_dbg = format!("{:?}", &event);
498 let join = tokio::spawn(dispatch_event(event, store, bus)).await;
499 if let Err(join_err) = join {
500 if join_err.is_panic() {
501 tracing::error!(
502 target: "wave-obj-bridge",
503 event = %event_dbg,
504 "dispatch_event panicked; bridge continues with next event. Panic: {}",
505 join_err,
506 );
507 } else {
508 tracing::error!(
509 target: "wave-obj-bridge",
510 event = %event_dbg,
511 error = %join_err,
512 "dispatch_event task aborted unexpectedly"
513 );
514 }
515 }
516 }
517 Err(broadcast::error::RecvError::Lagged(n)) => {
518 // The broadcast channel has 1024 capacity (main.rs:624).
519 // If we lag, frontend WOS state diverges silently — log it
520 // loudly so operators can correlate with user-visible drift
521 // (e.g. the InstancePanel/title showing stale names).
522 // No automatic recovery; the next event resyncs the affected
523 // object and frontend reads everything else from its cache.
524 tracing::error!(
525 target: "wave-obj-bridge",
526 skipped = n,
527 "broadcast channel lagged; some waveobj:update events were dropped — frontend WOS may show stale state until the affected object is mutated again"
528 );
529 }
530 Err(broadcast::error::RecvError::Closed) => {
531 tracing::info!(target: "wave-obj-bridge", "events channel closed; bridge exiting");
532 return;
533 }
534 }
535 }
536}