agentmux_srv\backend/
wps.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Wave Pub/Sub system: event brokering with scoped subscriptions.
5//! Port of Go's pkg/wps/wps.go + wpstypes.go.
6
7//!
8//! The Broker supports:
9//! - All-scope subscriptions (receive all events of a type)
10//! - Exact-scope subscriptions (e.g., "block:uuid")
11//! - Star-scope subscriptions (e.g., "block:*")
12//! - Event persistence (history/replay)
13
14use std::collections::{HashMap, HashSet};
15use std::sync::Mutex;
16
17use serde::{Deserialize, Serialize};
18
19// ---- Event type constants (match Go) ----
20
21#[allow(dead_code)]
22pub const EVENT_BLOCK_CLOSE: &str = "blockclose";
23#[allow(dead_code)]
24pub const EVENT_CONN_CHANGE: &str = "connchange";
25pub const EVENT_SYS_INFO: &str = "sysinfo";
26pub const EVENT_CONTROLLER_STATUS: &str = "controllerstatus";
27pub const EVENT_WAVE_OBJ_UPDATE: &str = "waveobj:update";
28pub const EVENT_BLOCK_FILE: &str = "blockfile";
29pub const EVENT_INSTALL_PROGRESS: &str = "install_progress";
30#[allow(dead_code)]
31pub const EVENT_CONFIG: &str = "config";
32#[allow(dead_code)]
33pub const EVENT_USER_INPUT: &str = "userinput";
34/// Fired by `SubprocessController::spawn_turn` when a user message is
35/// picked up (either direct-spawn or queue drain). Frontend uses this to
36/// promote pending `PendingMessage` entries into the conversation
37/// document. Payload: `{ block_id, message_id }`.
38pub const EVENT_AGENT_MESSAGE_ACCEPTED: &str = "agent-message-accepted";
39#[allow(dead_code)]
40pub const EVENT_ROUTE_GONE: &str = "route:gone";
41pub const EVENT_BLOCK_STATS: &str = "blockstats";
42pub const EVENT_AGENT_HEALTH: &str = "agenthealth";
43
44// File operation constants
45#[allow(dead_code)]
46pub const FILE_OP_CREATE: &str = "create";
47#[allow(dead_code)]
48pub const FILE_OP_DELETE: &str = "delete";
49pub const FILE_OP_APPEND: &str = "append";
50pub const FILE_OP_TRUNCATE: &str = "truncate";
51#[allow(dead_code)]
52pub const FILE_OP_INVALIDATE: &str = "invalidate";
53
54const MAX_PERSIST: usize = 4096;
55const REMAKE_ARR_THRESHOLD: usize = 10 * 1024;
56
57// ---- Types ----
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct WaveEvent {
61    pub event: String,
62    #[serde(skip_serializing_if = "Vec::is_empty", default)]
63    pub scopes: Vec<String>,
64    #[serde(skip_serializing_if = "String::is_empty", default)]
65    pub sender: String,
66    #[serde(skip_serializing_if = "is_zero", default)]
67    pub persist: usize,
68    #[serde(skip_serializing_if = "Option::is_none")]
69    pub data: Option<serde_json::Value>,
70}
71
72fn is_zero(v: &usize) -> bool {
73    *v == 0
74}
75
76impl WaveEvent {
77    #[allow(dead_code)]
78    pub fn has_scope(&self, scope: &str) -> bool {
79        self.scopes.iter().any(|s| s == scope)
80    }
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct SubscriptionRequest {
85    pub event: String,
86    #[serde(skip_serializing_if = "Vec::is_empty", default)]
87    pub scopes: Vec<String>,
88    #[serde(default)]
89    pub allscopes: bool,
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct WSFileEventData {
94    pub zoneid: String,
95    pub filename: String,
96    pub fileop: String,
97    #[serde(skip_serializing_if = "String::is_empty", default)]
98    pub data64: String,
99}
100
101// ---- Client trait ----
102
103/// Trait for event delivery to connected clients.
104pub trait WpsClient: Send + Sync {
105    fn send_event(&self, route_id: &str, event: WaveEvent);
106}
107
108// ---- Subscription internals ----
109
110#[derive(Default)]
111struct BrokerSubscription {
112    /// Route IDs subscribed to all scopes for this event.
113    all_subs: Vec<String>,
114    /// Exact scope → route IDs.
115    scope_subs: HashMap<String, Vec<String>>,
116    /// Star/wildcard scope → route IDs.
117    star_subs: HashMap<String, Vec<String>>,
118}
119
120impl BrokerSubscription {
121    fn is_empty(&self) -> bool {
122        self.all_subs.is_empty() && self.scope_subs.is_empty() && self.star_subs.is_empty()
123    }
124}
125
126#[derive(Hash, Eq, PartialEq, Clone)]
127struct PersistKey {
128    event: String,
129    scope: String,
130}
131
132struct PersistEventWrap {
133    arr_total_adds: usize,
134    events: Vec<WaveEvent>,
135}
136
137// ---- Broker ----
138
139/// The central pub/sub broker for WaveEvents.
140pub struct Broker {
141    inner: Mutex<BrokerInner>,
142}
143
144/// Tracks `(route_id, event_name, scope)` tuples whose persisted
145/// history has already been replayed to a given route. Skipping
146/// replay on resubscribe prevents the frontend's `eventsub`
147/// flushes (sent on every listener add/remove against the shared
148/// `ws-main` route) from re-emitting completed bash logs on every
149/// pane mount or tab switch. Codex P2 on PR #817.
150type ReplayKey = (String, String, String);
151
152struct BrokerInner {
153    client: Option<Box<dyn WpsClient>>,
154    sub_map: HashMap<String, BrokerSubscription>,
155    persist_map: HashMap<PersistKey, PersistEventWrap>,
156    replayed: HashSet<ReplayKey>,
157}
158
159impl Broker {
160    pub fn new() -> Self {
161        Self {
162            inner: Mutex::new(BrokerInner {
163                client: None,
164                sub_map: HashMap::new(),
165                persist_map: HashMap::new(),
166                replayed: HashSet::new(),
167            }),
168        }
169    }
170
171    pub fn set_client(&self, client: Box<dyn WpsClient>) {
172        let mut inner = self.inner.lock().unwrap();
173        inner.client = Some(client);
174    }
175
176    /// Subscribe a route to an event, optionally scoped.
177    ///
178    /// **Replay-on-subscribe**: after registering the route, immediately
179    /// deliver any persisted events that match the subscription. Lets
180    /// late subscribers catch up on the most recent state without
181    /// waiting for the next publish — closes the race for live-log
182    /// streaming where the frontend learns the tool_use_id only after
183    /// the wrapper has already finished publishing.
184    pub fn subscribe(&self, route_id: &str, sub: SubscriptionRequest) {
185        if sub.event.is_empty() {
186            return;
187        }
188        let mut inner = self.inner.lock().unwrap();
189        // Remove existing subscription first (re-subscribe)
190        Self::unsubscribe_nolock(&mut inner, route_id, &sub.event);
191
192        let bs = inner
193            .sub_map
194            .entry(sub.event.clone())
195            .or_default();
196
197        if sub.allscopes {
198            add_unique(&mut bs.all_subs, route_id);
199        } else {
200            for scope in &sub.scopes {
201                if scope_has_star(scope) {
202                    add_to_scope_map(&mut bs.star_subs, scope, route_id);
203                } else {
204                    add_to_scope_map(&mut bs.scope_subs, scope, route_id);
205                }
206            }
207        }
208
209        Self::replay_to_route(&mut inner, route_id, &sub);
210    }
211
212    /// Deliver any persisted events matching `sub` to `route_id`.
213    /// Called inside `subscribe` so replay happens atomically under
214    /// the broker lock — no live event published mid-replay can
215    /// interleave.
216    ///
217    /// **Once-per-(route, event, scope).** The frontend
218    /// (`frontend/app/store/wps.ts`) flushes `eventsub` for the
219    /// shared `ws-main` route on every listener add/remove. Replaying
220    /// persisted history on each of those flushes would re-emit
221    /// completed bash logs every pane mount / tab switch / sibling
222    /// subscription. The `replayed` set tracks tuples that already
223    /// received their backfill and short-circuits subsequent
224    /// resubscribes. Cleared per-route in `unsubscribe_all` so a true
225    /// reconnect (route dropped + re-registered) gets a fresh replay.
226    ///
227    /// Star-scope replay is intentionally not implemented (rare,
228    /// requires scanning every persist key; can add later if needed).
229    fn replay_to_route(
230        inner: &mut BrokerInner,
231        route_id: &str,
232        sub: &SubscriptionRequest,
233    ) {
234        let client = match &inner.client {
235            Some(c) => c,
236            None => return,
237        };
238
239        let mut scopes_to_deliver: Vec<String> = Vec::new();
240        if sub.allscopes {
241            // "" key holds the global history per persist_event's scope_set.
242            scopes_to_deliver.push(String::new());
243        } else {
244            for scope in &sub.scopes {
245                if !scope_has_star(scope) {
246                    scopes_to_deliver.push(scope.clone());
247                }
248            }
249        }
250
251        let mut to_send: Vec<WaveEvent> = Vec::new();
252        for scope in scopes_to_deliver {
253            let key = (route_id.to_string(), sub.event.clone(), scope.clone());
254            if inner.replayed.contains(&key) {
255                continue;
256            }
257            let pkey = PersistKey {
258                event: sub.event.clone(),
259                scope: scope.clone(),
260            };
261            if let Some(pe) = inner.persist_map.get(&pkey) {
262                for event in &pe.events {
263                    to_send.push(event.clone());
264                }
265            }
266            inner.replayed.insert(key);
267        }
268        for event in to_send {
269            client.send_event(route_id, event);
270        }
271    }
272
273    /// Unsubscribe a route from a specific event.
274    pub fn unsubscribe(&self, route_id: &str, event_name: &str) {
275        let mut inner = self.inner.lock().unwrap();
276        Self::unsubscribe_nolock(&mut inner, route_id, event_name);
277    }
278
279    /// Unsubscribe a route from all events.
280    ///
281    /// Also clears the `replayed` tracker for this route so a future
282    /// reconnect (route registers again from scratch) gets a fresh
283    /// replay of persisted history. Without this, a transient
284    /// WebSocket drop would silently lose all subsequent replay.
285    pub fn unsubscribe_all(&self, route_id: &str) {
286        let mut inner = self.inner.lock().unwrap();
287        let events: Vec<String> = inner.sub_map.keys().cloned().collect();
288        for event in events {
289            Self::unsubscribe_nolock(&mut inner, route_id, &event);
290        }
291        inner.replayed.retain(|(r, _, _)| r != route_id);
292    }
293
294    fn unsubscribe_nolock(inner: &mut BrokerInner, route_id: &str, event_name: &str) {
295        let bs = match inner.sub_map.get_mut(event_name) {
296            Some(bs) => bs,
297            None => return,
298        };
299        bs.all_subs.retain(|s| s != route_id);
300        remove_from_all_scopes(&mut bs.scope_subs, route_id);
301        remove_from_all_scopes(&mut bs.star_subs, route_id);
302        if bs.is_empty() {
303            inner.sub_map.remove(event_name);
304        }
305    }
306
307    /// Publish an event to all matching subscribers.
308    pub fn publish(&self, event: WaveEvent) {
309        let mut inner = self.inner.lock().unwrap();
310
311        // Persist if requested
312        if event.persist > 0 {
313            Self::persist_event(&mut inner, &event);
314        }
315
316        let client = match &inner.client {
317            Some(c) => c,
318            None => return,
319        };
320
321        let route_ids = Self::get_matching_routes(&inner, &event);
322        for route_id in route_ids {
323            client.send_event(&route_id, event.clone());
324        }
325    }
326
327    /// Read persisted event history.
328    pub fn read_event_history(
329        &self,
330        event_type: &str,
331        scope: &str,
332        max_items: usize,
333    ) -> Vec<WaveEvent> {
334        if max_items == 0 {
335            return Vec::new();
336        }
337        let inner = self.inner.lock().unwrap();
338        let key = PersistKey {
339            event: event_type.to_string(),
340            scope: scope.to_string(),
341        };
342        match inner.persist_map.get(&key) {
343            Some(pe) if !pe.events.is_empty() => {
344                let n = max_items.min(pe.events.len());
345                pe.events[pe.events.len() - n..].to_vec()
346            }
347            _ => Vec::new(),
348        }
349    }
350
351    fn persist_event(inner: &mut BrokerInner, event: &WaveEvent) {
352        let num_persist = event.persist.min(MAX_PERSIST);
353        let mut scope_set: Vec<String> = event.scopes.clone();
354        scope_set.push(String::new()); // "" scope for global persistence
355
356        for scope in scope_set {
357            let key = PersistKey {
358                event: event.event.clone(),
359                scope,
360            };
361            let pe = inner.persist_map.entry(key).or_insert_with(|| {
362                PersistEventWrap {
363                    arr_total_adds: 0,
364                    events: Vec::with_capacity(num_persist),
365                }
366            });
367            pe.events.push(event.clone());
368            pe.arr_total_adds += 1;
369            // Trim to max persist
370            if pe.events.len() > num_persist {
371                pe.events.drain(..pe.events.len() - num_persist);
372            }
373            // Compact if too many additions (reduce memory fragmentation)
374            if pe.arr_total_adds > REMAKE_ARR_THRESHOLD {
375                let compacted: Vec<WaveEvent> = pe.events.drain(..).collect();
376                pe.events = compacted;
377                pe.arr_total_adds = pe.events.len();
378            }
379        }
380    }
381
382    fn get_matching_routes(inner: &BrokerInner, event: &WaveEvent) -> Vec<String> {
383        let bs = match inner.sub_map.get(&event.event) {
384            Some(bs) => bs,
385            None => return Vec::new(),
386        };
387
388        let mut route_ids: HashMap<&str, ()> = HashMap::new();
389
390        // All-scope subscribers
391        for route_id in &bs.all_subs {
392            route_ids.insert(route_id, ());
393        }
394
395        // Exact-scope subscribers
396        for scope in &event.scopes {
397            if let Some(routes) = bs.scope_subs.get(scope) {
398                for route_id in routes {
399                    route_ids.insert(route_id, ());
400                }
401            }
402            // Star-scope subscribers
403            for (star_scope, routes) in &bs.star_subs {
404                if star_match(star_scope, scope, ":") {
405                    for route_id in routes {
406                        route_ids.insert(route_id, ());
407                    }
408                }
409            }
410        }
411
412        route_ids.keys().map(|s| s.to_string()).collect()
413    }
414}
415
416impl Default for Broker {
417    fn default() -> Self {
418        Self::new()
419    }
420}
421
422// ---- Helpers ----
423
424fn scope_has_star(scope: &str) -> bool {
425    scope.split(':').any(|part| part == "*" || part == "**")
426}
427
428/// Simple star matching: each segment separated by `sep` is compared.
429/// "*" matches any single segment, "**" matches any remaining segments.
430fn star_match(pattern: &str, value: &str, sep: &str) -> bool {
431    let pat_parts: Vec<&str> = pattern.split(sep).collect();
432    let val_parts: Vec<&str> = value.split(sep).collect();
433
434    let mut pi = 0;
435    let mut vi = 0;
436    while pi < pat_parts.len() && vi < val_parts.len() {
437        if pat_parts[pi] == "**" {
438            return true; // matches everything remaining
439        }
440        if pat_parts[pi] != "*" && pat_parts[pi] != val_parts[vi] {
441            return false;
442        }
443        pi += 1;
444        vi += 1;
445    }
446    pi == pat_parts.len() && vi == val_parts.len()
447}
448
449fn add_unique(vec: &mut Vec<String>, val: &str) {
450    if !vec.iter().any(|s| s == val) {
451        vec.push(val.to_string());
452    }
453}
454
455fn add_to_scope_map(map: &mut HashMap<String, Vec<String>>, scope: &str, route_id: &str) {
456    let entry = map.entry(scope.to_string()).or_default();
457    add_unique(entry, route_id);
458}
459
460fn remove_from_all_scopes(map: &mut HashMap<String, Vec<String>>, route_id: &str) {
461    let empty_scopes: Vec<String> = map
462        .iter_mut()
463        .filter_map(|(scope, routes)| {
464            routes.retain(|r| r != route_id);
465            if routes.is_empty() {
466                Some(scope.clone())
467            } else {
468                None
469            }
470        })
471        .collect();
472    for scope in empty_scopes {
473        map.remove(&scope);
474    }
475}
476
477/// Publish a single install-progress line to the frontend for a given block.
478/// The frontend subscribes to `install_progress` events scoped to `block:{block_id}`
479/// and displays each message as a log line in the agent presentation view.
480pub fn publish_install_progress(broker: &Broker, block_id: &str, message: &str) {
481    let scope = format!("block:{}", block_id);
482    broker.publish(WaveEvent {
483        event: EVENT_INSTALL_PROGRESS.to_string(),
484        scopes: vec![scope],
485        sender: String::new(),
486        persist: 0,
487        data: Some(serde_json::json!({ "message": message })),
488    });
489}
490
491// ====================================================================
492// Tests
493// ====================================================================
494
495#[cfg(test)]
496mod tests {
497    use super::*;
498    use std::sync::Arc;
499
500    struct TestClient {
501        events: Mutex<Vec<(String, WaveEvent)>>,
502    }
503
504    impl TestClient {
505        fn new() -> Self {
506            Self {
507                events: Mutex::new(Vec::new()),
508            }
509        }
510
511        fn received_events(&self) -> Vec<(String, WaveEvent)> {
512            self.events.lock().unwrap().clone()
513        }
514    }
515
516    impl WpsClient for TestClient {
517        fn send_event(&self, route_id: &str, event: WaveEvent) {
518            self.events
519                .lock()
520                .unwrap()
521                .push((route_id.to_string(), event));
522        }
523    }
524
525    impl WpsClient for Arc<TestClient> {
526        fn send_event(&self, route_id: &str, event: WaveEvent) {
527            self.events
528                .lock()
529                .unwrap()
530                .push((route_id.to_string(), event));
531        }
532    }
533
534    #[test]
535    fn test_subscribe_all_scopes() {
536        let broker = Broker::new();
537        let client = Arc::new(TestClient::new());
538        broker.set_client(Box::new(Arc::clone(&client)));
539
540        broker.subscribe(
541            "route-1",
542            SubscriptionRequest {
543                event: EVENT_WAVE_OBJ_UPDATE.to_string(),
544                scopes: vec![],
545                allscopes: true,
546            },
547        );
548
549        broker.publish(WaveEvent {
550            event: EVENT_WAVE_OBJ_UPDATE.to_string(),
551            scopes: vec!["block:abc".to_string()],
552            sender: String::new(),
553            persist: 0,
554            data: None,
555        });
556
557        let events = client.received_events();
558        assert_eq!(events.len(), 1);
559        assert_eq!(events[0].0, "route-1");
560    }
561
562    #[test]
563    fn test_subscribe_exact_scope() {
564        let broker = Broker::new();
565        let client = Arc::new(TestClient::new());
566        broker.set_client(Box::new(Arc::clone(&client)));
567
568        broker.subscribe(
569            "route-1",
570            SubscriptionRequest {
571                event: EVENT_WAVE_OBJ_UPDATE.to_string(),
572                scopes: vec!["block:abc".to_string()],
573                allscopes: false,
574            },
575        );
576
577        // Should match
578        broker.publish(WaveEvent {
579            event: EVENT_WAVE_OBJ_UPDATE.to_string(),
580            scopes: vec!["block:abc".to_string()],
581            sender: String::new(),
582            persist: 0,
583            data: None,
584        });
585
586        // Should NOT match
587        broker.publish(WaveEvent {
588            event: EVENT_WAVE_OBJ_UPDATE.to_string(),
589            scopes: vec!["block:xyz".to_string()],
590            sender: String::new(),
591            persist: 0,
592            data: None,
593        });
594
595        let events = client.received_events();
596        assert_eq!(events.len(), 1);
597    }
598
599    #[test]
600    fn test_subscribe_star_scope() {
601        let broker = Broker::new();
602        let client = Arc::new(TestClient::new());
603        broker.set_client(Box::new(Arc::clone(&client)));
604
605        broker.subscribe(
606            "route-1",
607            SubscriptionRequest {
608                event: EVENT_WAVE_OBJ_UPDATE.to_string(),
609                scopes: vec!["block:*".to_string()],
610                allscopes: false,
611            },
612        );
613
614        broker.publish(WaveEvent {
615            event: EVENT_WAVE_OBJ_UPDATE.to_string(),
616            scopes: vec!["block:abc".to_string()],
617            sender: String::new(),
618            persist: 0,
619            data: None,
620        });
621
622        broker.publish(WaveEvent {
623            event: EVENT_WAVE_OBJ_UPDATE.to_string(),
624            scopes: vec!["tab:xyz".to_string()],
625            sender: String::new(),
626            persist: 0,
627            data: None,
628        });
629
630        let events = client.received_events();
631        assert_eq!(events.len(), 1); // only block:* matches block:abc
632    }
633
634    #[test]
635    fn test_unsubscribe() {
636        let broker = Broker::new();
637        let client = Arc::new(TestClient::new());
638        broker.set_client(Box::new(Arc::clone(&client)));
639
640        broker.subscribe(
641            "route-1",
642            SubscriptionRequest {
643                event: EVENT_BLOCK_CLOSE.to_string(),
644                scopes: vec![],
645                allscopes: true,
646            },
647        );
648
649        broker.unsubscribe("route-1", EVENT_BLOCK_CLOSE);
650
651        broker.publish(WaveEvent {
652            event: EVENT_BLOCK_CLOSE.to_string(),
653            scopes: vec![],
654            sender: String::new(),
655            persist: 0,
656            data: None,
657        });
658
659        assert!(client.received_events().is_empty());
660    }
661
662    #[test]
663    fn test_unsubscribe_all() {
664        let broker = Broker::new();
665        let client = Arc::new(TestClient::new());
666        broker.set_client(Box::new(Arc::clone(&client)));
667
668        broker.subscribe(
669            "route-1",
670            SubscriptionRequest {
671                event: EVENT_BLOCK_CLOSE.to_string(),
672                scopes: vec![],
673                allscopes: true,
674            },
675        );
676        broker.subscribe(
677            "route-1",
678            SubscriptionRequest {
679                event: EVENT_CONFIG.to_string(),
680                scopes: vec![],
681                allscopes: true,
682            },
683        );
684
685        broker.unsubscribe_all("route-1");
686
687        broker.publish(WaveEvent {
688            event: EVENT_BLOCK_CLOSE.to_string(),
689            scopes: vec![],
690            sender: String::new(),
691            persist: 0,
692            data: None,
693        });
694        broker.publish(WaveEvent {
695            event: EVENT_CONFIG.to_string(),
696            scopes: vec![],
697            sender: String::new(),
698            persist: 0,
699            data: None,
700        });
701
702        assert!(client.received_events().is_empty());
703    }
704
705    /// Regression: replay-on-subscribe delivers persisted events that
706    /// were published BEFORE the route subscribed. This closes the
707    /// late-subscribe race for tool_chunk streaming.
708    #[test]
709    fn test_replay_on_subscribe_exact_scope() {
710        let broker = Broker::new();
711        let client = Arc::new(TestClient::new());
712        broker.set_client(Box::new(Arc::clone(&client)));
713
714        // Publish 5 persisted events BEFORE any subscriber exists.
715        for i in 0..5 {
716            broker.publish(WaveEvent {
717                event: "tool_chunk".to_string(),
718                scopes: vec!["block:abc".to_string()],
719                sender: String::new(),
720                persist: 10,
721                data: Some(serde_json::json!({"tool_id": "t1", "n": i})),
722            });
723        }
724        assert!(
725            client.received_events().is_empty(),
726            "no subscriber yet, no delivery"
727        );
728
729        // Subscribe to the matching scope — replay should fire.
730        broker.subscribe(
731            "route-1",
732            SubscriptionRequest {
733                event: "tool_chunk".to_string(),
734                scopes: vec!["block:abc".to_string()],
735                allscopes: false,
736            },
737        );
738
739        let events = client.received_events();
740        assert_eq!(events.len(), 5, "all 5 persisted events replayed");
741        assert_eq!(events[0].1.data, Some(serde_json::json!({"tool_id": "t1", "n": 0})));
742        assert_eq!(events[4].1.data, Some(serde_json::json!({"tool_id": "t1", "n": 4})));
743    }
744
745    /// Regression: re-subscribing the SAME route to the SAME
746    /// (event, scope) does NOT replay again. The frontend's
747    /// `eventsub` flushes happen on every listener add/remove, so
748    /// the broker has to be idempotent across resubscribe calls.
749    /// Codex P2 on PR #817.
750    #[test]
751    fn test_replay_on_resubscribe_is_idempotent() {
752        let broker = Broker::new();
753        let client = Arc::new(TestClient::new());
754        broker.set_client(Box::new(Arc::clone(&client)));
755
756        for i in 0..3 {
757            broker.publish(WaveEvent {
758                event: "tool_chunk".to_string(),
759                scopes: vec!["block:abc".to_string()],
760                sender: String::new(),
761                persist: 10,
762                data: Some(serde_json::json!({"n": i})),
763            });
764        }
765
766        let sub = SubscriptionRequest {
767            event: "tool_chunk".to_string(),
768            scopes: vec!["block:abc".to_string()],
769            allscopes: false,
770        };
771
772        broker.subscribe("route-1", sub.clone());
773        assert_eq!(
774            client.received_events().len(),
775            3,
776            "first subscribe replays all 3 persisted events"
777        );
778
779        // Re-subscribe (same route, same event+scope) — must not
780        // replay a second time.
781        broker.subscribe("route-1", sub.clone());
782        assert_eq!(
783            client.received_events().len(),
784            3,
785            "re-subscribe is a no-op for replay; received count stays at 3"
786        );
787
788        // Live publish after re-subscribe still delivers.
789        broker.publish(WaveEvent {
790            event: "tool_chunk".to_string(),
791            scopes: vec!["block:abc".to_string()],
792            sender: String::new(),
793            persist: 10,
794            data: Some(serde_json::json!({"n": "live"})),
795        });
796        assert_eq!(
797            client.received_events().len(),
798            4,
799            "live publish after resubscribe is delivered exactly once"
800        );
801
802        // Disconnect (unsubscribe_all) + reconnect — fresh replay.
803        broker.unsubscribe_all("route-1");
804        broker.subscribe("route-1", sub.clone());
805        assert_eq!(
806            client.received_events().len(),
807            8,
808            "reconnect after unsubscribe_all clears the replayed tracker; \
809             gets all 4 persisted events again"
810        );
811    }
812
813    /// Regression: replay does NOT cross-pollute scopes. Subscriber to
814    /// `block:abc` must not receive events persisted for `block:xyz`.
815    #[test]
816    fn test_replay_on_subscribe_scope_isolation() {
817        let broker = Broker::new();
818        let client = Arc::new(TestClient::new());
819        broker.set_client(Box::new(Arc::clone(&client)));
820
821        broker.publish(WaveEvent {
822            event: "tool_chunk".to_string(),
823            scopes: vec!["block:xyz".to_string()],
824            sender: String::new(),
825            persist: 10,
826            data: Some(serde_json::json!({"tool_id": "other"})),
827        });
828
829        broker.subscribe(
830            "route-1",
831            SubscriptionRequest {
832                event: "tool_chunk".to_string(),
833                scopes: vec!["block:abc".to_string()],
834                allscopes: false,
835            },
836        );
837
838        let events = client.received_events();
839        assert_eq!(events.len(), 0, "scope:abc must not get block:xyz events");
840    }
841
842    #[test]
843    fn test_event_persistence() {
844        let broker = Broker::new();
845        let client = Arc::new(TestClient::new());
846        broker.set_client(Box::new(Arc::clone(&client)));
847
848        // Subscribe so events are dispatched
849        broker.subscribe(
850            "route-1",
851            SubscriptionRequest {
852                event: EVENT_SYS_INFO.to_string(),
853                scopes: vec![],
854                allscopes: true,
855            },
856        );
857
858        // Publish persistent events
859        for i in 0..5 {
860            broker.publish(WaveEvent {
861                event: EVENT_SYS_INFO.to_string(),
862                scopes: vec!["cpu".to_string()],
863                sender: String::new(),
864                persist: 3, // keep last 3
865                data: Some(serde_json::json!(i)),
866            });
867        }
868
869        // Read history (global scope "")
870        let history = broker.read_event_history(EVENT_SYS_INFO, "", 10);
871        assert_eq!(history.len(), 3);
872        assert_eq!(history[0].data, Some(serde_json::json!(2)));
873        assert_eq!(history[2].data, Some(serde_json::json!(4)));
874
875        // Read scoped history
876        let scoped = broker.read_event_history(EVENT_SYS_INFO, "cpu", 2);
877        assert_eq!(scoped.len(), 2);
878    }
879
880    #[test]
881    fn test_star_match() {
882        assert!(star_match("block:*", "block:abc", ":"));
883        assert!(star_match("*:abc", "block:abc", ":"));
884        assert!(!star_match("block:*", "tab:abc", ":"));
885        assert!(star_match("**", "block:abc:xyz", ":"));
886        assert!(!star_match("block:*", "block:abc:xyz", ":")); // * matches one segment only
887    }
888
889    #[test]
890    fn test_wave_event_serialization() {
891        let event = WaveEvent {
892            event: "test".to_string(),
893            scopes: vec!["scope1".to_string()],
894            sender: String::new(),
895            persist: 0,
896            data: Some(serde_json::json!({"key": "value"})),
897        };
898        let json = serde_json::to_string(&event).unwrap();
899        let parsed: WaveEvent = serde_json::from_str(&json).unwrap();
900        assert_eq!(parsed.event, "test");
901        assert_eq!(parsed.scopes, vec!["scope1"]);
902        // Empty sender and zero persist should be omitted
903        assert!(!json.contains("\"sender\""));
904        assert!(!json.contains("\"persist\""));
905    }
906
907    #[test]
908    fn test_subscription_request_serialization() {
909        let req = SubscriptionRequest {
910            event: "blockclose".to_string(),
911            scopes: vec!["block:123".to_string()],
912            allscopes: false,
913        };
914        let json = serde_json::to_string(&req).unwrap();
915        let parsed: SubscriptionRequest = serde_json::from_str(&json).unwrap();
916        assert_eq!(parsed.event, "blockclose");
917    }
918
919    #[test]
920    fn test_no_client_publish_does_not_panic() {
921        let broker = Broker::new();
922        // No client set — should not panic
923        broker.publish(WaveEvent {
924            event: "test".to_string(),
925            scopes: vec![],
926            sender: String::new(),
927            persist: 0,
928            data: None,
929        });
930    }
931
932    #[test]
933    fn test_double_star_scope() {
934        let broker = Broker::new();
935        let client = Arc::new(TestClient::new());
936        broker.set_client(Box::new(Arc::clone(&client)));
937
938        broker.subscribe(
939            "route-1",
940            SubscriptionRequest {
941                event: EVENT_WAVE_OBJ_UPDATE.to_string(),
942                scopes: vec!["**".to_string()],
943                allscopes: false,
944            },
945        );
946
947        broker.publish(WaveEvent {
948            event: EVENT_WAVE_OBJ_UPDATE.to_string(),
949            scopes: vec!["block:abc:def".to_string()],
950            sender: String::new(),
951            persist: 0,
952            data: None,
953        });
954
955        assert_eq!(client.received_events().len(), 1);
956    }
957}