1use std::collections::{HashMap, HashSet};
15use std::sync::Mutex;
16
17use serde::{Deserialize, Serialize};
18
// Event names carried in `WaveEvent::event` and compared against
// `SubscriptionRequest::event` when the broker routes an event.
// NOTE(review): the constants marked `#[allow(dead_code)]` are presumably referenced
// only from other modules, or kept for protocol completeness — confirm before removing.
#[allow(dead_code)]
pub const EVENT_BLOCK_CLOSE: &str = "blockclose";
#[allow(dead_code)]
pub const EVENT_CONN_CHANGE: &str = "connchange";
pub const EVENT_SYS_INFO: &str = "sysinfo";
pub const EVENT_CONTROLLER_STATUS: &str = "controllerstatus";
pub const EVENT_WAVE_OBJ_UPDATE: &str = "waveobj:update";
pub const EVENT_BLOCK_FILE: &str = "blockfile";
// Event name used by `publish_install_progress`.
pub const EVENT_INSTALL_PROGRESS: &str = "install_progress";
#[allow(dead_code)]
pub const EVENT_CONFIG: &str = "config";
#[allow(dead_code)]
pub const EVENT_USER_INPUT: &str = "userinput";
pub const EVENT_AGENT_MESSAGE_ACCEPTED: &str = "agent-message-accepted";
#[allow(dead_code)]
pub const EVENT_ROUTE_GONE: &str = "route:gone";
pub const EVENT_BLOCK_STATS: &str = "blockstats";
pub const EVENT_AGENT_HEALTH: &str = "agenthealth";
43
// File operation names.
// NOTE(review): presumably the values carried in `WSFileEventData::fileop` — nothing in
// this part of the file reads them, so confirm against the callers.
#[allow(dead_code)]
pub const FILE_OP_CREATE: &str = "create";
#[allow(dead_code)]
pub const FILE_OP_DELETE: &str = "delete";
pub const FILE_OP_APPEND: &str = "append";
pub const FILE_OP_TRUNCATE: &str = "truncate";
#[allow(dead_code)]
pub const FILE_OP_INVALIDATE: &str = "invalidate";
53
/// Upper bound applied to `WaveEvent::persist`: no history list keeps more than this many
/// events, whatever the event asks for.
const MAX_PERSIST: usize = 4096;
/// Number of appends to one history list after which its backing storage is compacted
/// and the append counter is reset.
const REMAKE_ARR_THRESHOLD: usize = 10 * 1024;
56
/// An event routed by the `Broker`.
///
/// Serializable with serde. Empty `scopes`/`sender`, a zero `persist` and a `None` `data`
/// are left out of the serialized form; `scopes`, `sender` and `persist` fall back to their
/// default values when absent on deserialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WaveEvent {
    /// Event name; the broker looks up subscriptions by this string.
    pub event: String,
    /// Scopes the event applies to. Each one is compared against subscribers' exact scopes
    /// and wildcard scope patterns.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub scopes: Vec<String>,
    /// Sender of the event; omitted from the serialized form when empty.
    #[serde(skip_serializing_if = "String::is_empty", default)]
    pub sender: String,
    /// When greater than zero, `Broker::publish` also stores the event in history, keeping
    /// at most `min(persist, MAX_PERSIST)` events per history list.
    #[serde(skip_serializing_if = "is_zero", default)]
    pub persist: usize,
    /// Optional JSON payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<serde_json::Value>,
}
71
/// Serde `skip_serializing_if` predicate: `true` exactly when the value is zero.
///
/// Takes a reference because serde passes the field by reference.
fn is_zero(v: &usize) -> bool {
    matches!(*v, 0)
}
75
76impl WaveEvent {
77 #[allow(dead_code)]
78 pub fn has_scope(&self, scope: &str) -> bool {
79 self.scopes.iter().any(|s| s == scope)
80 }
81}
82
/// A route's request to receive events with a given name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionRequest {
    /// Event name to subscribe to. `Broker::subscribe` ignores requests where this is empty.
    pub event: String,
    /// Scopes to subscribe to; not consulted when `allscopes` is true. A scope with a `*`
    /// or `**` segment (segments are separated by `:`) is treated as a wildcard pattern.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub scopes: Vec<String>,
    /// When true, the route receives every event with this name regardless of scope.
    #[serde(default)]
    pub allscopes: bool,
}
91
/// Serializable description of a file operation.
///
/// NOTE(review): nothing in this part of the file constructs or reads this type;
/// presumably it is the `data` payload of `EVENT_BLOCK_FILE` events — confirm with callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WSFileEventData {
    pub zoneid: String,
    pub filename: String,
    // Presumably one of the `FILE_OP_*` constants — TODO confirm.
    pub fileop: String,
    // Omitted from the serialized form when empty. Presumably base64 content, judging by
    // the name — TODO confirm.
    #[serde(skip_serializing_if = "String::is_empty", default)]
    pub data64: String,
}
100
/// Delivery sink the `Broker` hands events to.
///
/// Implementations must be `Send + Sync` because the broker stores the client behind a
/// mutex as a boxed trait object.
pub trait WpsClient: Send + Sync {
    /// Delivers `event` to the route identified by `route_id`.
    ///
    /// The broker calls this once per matching route on publish, and once per replayed
    /// event on subscribe.
    fn send_event(&self, route_id: &str, event: WaveEvent);
}
107
/// All subscriptions registered for one event name.
#[derive(Default)]
struct BrokerSubscription {
    /// Route ids subscribed with `allscopes`.
    all_subs: Vec<String>,
    /// Exact scope -> route ids subscribed to that scope.
    scope_subs: HashMap<String, Vec<String>>,
    /// Wildcard scope pattern (one with a `*` or `**` segment) -> route ids subscribed to it.
    star_subs: HashMap<String, Vec<String>>,
}
119
120impl BrokerSubscription {
121 fn is_empty(&self) -> bool {
122 self.all_subs.is_empty() && self.scope_subs.is_empty() && self.star_subs.is_empty()
123 }
124}
125
/// Key of one history list: an event name plus a scope.
///
/// The empty scope `""` is the list that receives every persisted event of that name.
#[derive(Hash, Eq, PartialEq, Clone)]
struct PersistKey {
    event: String,
    scope: String,
}
131
/// History list stored under one `PersistKey`.
struct PersistEventWrap {
    /// Appends since the list was created or last compacted; compared against
    /// `REMAKE_ARR_THRESHOLD`.
    arr_total_adds: usize,
    /// Retained events, oldest first: new events are pushed at the back and excess ones
    /// are dropped from the front.
    events: Vec<WaveEvent>,
}
136
/// Publish/subscribe event broker.
///
/// All state lives in a single `Mutex`, so the public methods take `&self`.
pub struct Broker {
    inner: Mutex<BrokerInner>,
}
143
/// `(route_id, event name, scope)` — identifies one history list that has already been
/// replayed to one route.
type ReplayKey = (String, String, String);
151
/// Mutable broker state guarded by `Broker::inner`.
struct BrokerInner {
    /// Delivery sink; while it is `None`, events are still persisted but nothing is sent.
    client: Option<Box<dyn WpsClient>>,
    /// Event name -> subscriptions for that event.
    sub_map: HashMap<String, BrokerSubscription>,
    /// (event name, scope) -> retained history.
    persist_map: HashMap<PersistKey, PersistEventWrap>,
    /// History lists already replayed to a route, so re-subscribing does not replay them
    /// again. A route's entries are cleared by `unsubscribe_all`.
    replayed: HashSet<ReplayKey>,
}
158
159impl Broker {
160 pub fn new() -> Self {
161 Self {
162 inner: Mutex::new(BrokerInner {
163 client: None,
164 sub_map: HashMap::new(),
165 persist_map: HashMap::new(),
166 replayed: HashSet::new(),
167 }),
168 }
169 }
170
171 pub fn set_client(&self, client: Box<dyn WpsClient>) {
172 let mut inner = self.inner.lock().unwrap();
173 inner.client = Some(client);
174 }
175
176 pub fn subscribe(&self, route_id: &str, sub: SubscriptionRequest) {
185 if sub.event.is_empty() {
186 return;
187 }
188 let mut inner = self.inner.lock().unwrap();
189 Self::unsubscribe_nolock(&mut inner, route_id, &sub.event);
191
192 let bs = inner
193 .sub_map
194 .entry(sub.event.clone())
195 .or_default();
196
197 if sub.allscopes {
198 add_unique(&mut bs.all_subs, route_id);
199 } else {
200 for scope in &sub.scopes {
201 if scope_has_star(scope) {
202 add_to_scope_map(&mut bs.star_subs, scope, route_id);
203 } else {
204 add_to_scope_map(&mut bs.scope_subs, scope, route_id);
205 }
206 }
207 }
208
209 Self::replay_to_route(&mut inner, route_id, &sub);
210 }
211
212 fn replay_to_route(
230 inner: &mut BrokerInner,
231 route_id: &str,
232 sub: &SubscriptionRequest,
233 ) {
234 let client = match &inner.client {
235 Some(c) => c,
236 None => return,
237 };
238
239 let mut scopes_to_deliver: Vec<String> = Vec::new();
240 if sub.allscopes {
241 scopes_to_deliver.push(String::new());
243 } else {
244 for scope in &sub.scopes {
245 if !scope_has_star(scope) {
246 scopes_to_deliver.push(scope.clone());
247 }
248 }
249 }
250
251 let mut to_send: Vec<WaveEvent> = Vec::new();
252 for scope in scopes_to_deliver {
253 let key = (route_id.to_string(), sub.event.clone(), scope.clone());
254 if inner.replayed.contains(&key) {
255 continue;
256 }
257 let pkey = PersistKey {
258 event: sub.event.clone(),
259 scope: scope.clone(),
260 };
261 if let Some(pe) = inner.persist_map.get(&pkey) {
262 for event in &pe.events {
263 to_send.push(event.clone());
264 }
265 }
266 inner.replayed.insert(key);
267 }
268 for event in to_send {
269 client.send_event(route_id, event);
270 }
271 }
272
273 pub fn unsubscribe(&self, route_id: &str, event_name: &str) {
275 let mut inner = self.inner.lock().unwrap();
276 Self::unsubscribe_nolock(&mut inner, route_id, event_name);
277 }
278
279 pub fn unsubscribe_all(&self, route_id: &str) {
286 let mut inner = self.inner.lock().unwrap();
287 let events: Vec<String> = inner.sub_map.keys().cloned().collect();
288 for event in events {
289 Self::unsubscribe_nolock(&mut inner, route_id, &event);
290 }
291 inner.replayed.retain(|(r, _, _)| r != route_id);
292 }
293
294 fn unsubscribe_nolock(inner: &mut BrokerInner, route_id: &str, event_name: &str) {
295 let bs = match inner.sub_map.get_mut(event_name) {
296 Some(bs) => bs,
297 None => return,
298 };
299 bs.all_subs.retain(|s| s != route_id);
300 remove_from_all_scopes(&mut bs.scope_subs, route_id);
301 remove_from_all_scopes(&mut bs.star_subs, route_id);
302 if bs.is_empty() {
303 inner.sub_map.remove(event_name);
304 }
305 }
306
307 pub fn publish(&self, event: WaveEvent) {
309 let mut inner = self.inner.lock().unwrap();
310
311 if event.persist > 0 {
313 Self::persist_event(&mut inner, &event);
314 }
315
316 let client = match &inner.client {
317 Some(c) => c,
318 None => return,
319 };
320
321 let route_ids = Self::get_matching_routes(&inner, &event);
322 for route_id in route_ids {
323 client.send_event(&route_id, event.clone());
324 }
325 }
326
327 pub fn read_event_history(
329 &self,
330 event_type: &str,
331 scope: &str,
332 max_items: usize,
333 ) -> Vec<WaveEvent> {
334 if max_items == 0 {
335 return Vec::new();
336 }
337 let inner = self.inner.lock().unwrap();
338 let key = PersistKey {
339 event: event_type.to_string(),
340 scope: scope.to_string(),
341 };
342 match inner.persist_map.get(&key) {
343 Some(pe) if !pe.events.is_empty() => {
344 let n = max_items.min(pe.events.len());
345 pe.events[pe.events.len() - n..].to_vec()
346 }
347 _ => Vec::new(),
348 }
349 }
350
351 fn persist_event(inner: &mut BrokerInner, event: &WaveEvent) {
352 let num_persist = event.persist.min(MAX_PERSIST);
353 let mut scope_set: Vec<String> = event.scopes.clone();
354 scope_set.push(String::new()); for scope in scope_set {
357 let key = PersistKey {
358 event: event.event.clone(),
359 scope,
360 };
361 let pe = inner.persist_map.entry(key).or_insert_with(|| {
362 PersistEventWrap {
363 arr_total_adds: 0,
364 events: Vec::with_capacity(num_persist),
365 }
366 });
367 pe.events.push(event.clone());
368 pe.arr_total_adds += 1;
369 if pe.events.len() > num_persist {
371 pe.events.drain(..pe.events.len() - num_persist);
372 }
373 if pe.arr_total_adds > REMAKE_ARR_THRESHOLD {
375 let compacted: Vec<WaveEvent> = pe.events.drain(..).collect();
376 pe.events = compacted;
377 pe.arr_total_adds = pe.events.len();
378 }
379 }
380 }
381
382 fn get_matching_routes(inner: &BrokerInner, event: &WaveEvent) -> Vec<String> {
383 let bs = match inner.sub_map.get(&event.event) {
384 Some(bs) => bs,
385 None => return Vec::new(),
386 };
387
388 let mut route_ids: HashMap<&str, ()> = HashMap::new();
389
390 for route_id in &bs.all_subs {
392 route_ids.insert(route_id, ());
393 }
394
395 for scope in &event.scopes {
397 if let Some(routes) = bs.scope_subs.get(scope) {
398 for route_id in routes {
399 route_ids.insert(route_id, ());
400 }
401 }
402 for (star_scope, routes) in &bs.star_subs {
404 if star_match(star_scope, scope, ":") {
405 for route_id in routes {
406 route_ids.insert(route_id, ());
407 }
408 }
409 }
410 }
411
412 route_ids.keys().map(|s| s.to_string()).collect()
413 }
414}
415
416impl Default for Broker {
417 fn default() -> Self {
418 Self::new()
419 }
420}
421
/// Returns `true` when any `:`-separated segment of `scope` is exactly `*` or `**`.
///
/// A star embedded in a longer segment (for example `a*`) does not count.
fn scope_has_star(scope: &str) -> bool {
    scope
        .split(':')
        .any(|segment| matches!(segment, "*" | "**"))
}
427
/// Matches `value` against a wildcard `pattern`, both split into segments on `sep`.
///
/// * A literal segment matches an identical segment.
/// * `*` matches exactly one segment.
/// * `**` matches one or more segments.
///
/// The whole value must be consumed by the whole pattern. Segments that follow a `**` are
/// still matched (previously a `**` anywhere made the match succeed and the rest of the
/// pattern was ignored). Patterns without `**`, or with `**` only as the last segment,
/// behave exactly as before.
fn star_match(pattern: &str, value: &str, sep: &str) -> bool {
    let pat_parts: Vec<&str> = pattern.split(sep).collect();
    let val_parts: Vec<&str> = value.split(sep).collect();

    let mut pi = 0;
    let mut vi = 0;
    // Most recent `**`: (its pattern index, index just past the value segments it covers).
    let mut backtrack: Option<(usize, usize)> = None;

    while vi < val_parts.len() {
        match pat_parts.get(pi).copied() {
            Some("**") => {
                // `**` always takes at least the current segment.
                backtrack = Some((pi, vi + 1));
                pi += 1;
                vi += 1;
            }
            Some(part) if part == "*" || part == val_parts[vi] => {
                pi += 1;
                vi += 1;
            }
            _ => match backtrack {
                // Mismatch after a `**`: let it absorb one more segment and retry.
                Some((star_pi, covered_end)) => {
                    pi = star_pi + 1;
                    vi = covered_end + 1;
                    backtrack = Some((star_pi, covered_end + 1));
                }
                None => return false,
            },
        }
    }

    // Value exhausted: it matches only if the pattern is exhausted too.
    pi == pat_parts.len()
}
448
/// Appends `val` to `vec` unless an equal string is already present.
fn add_unique(vec: &mut Vec<String>, val: &str) {
    let already_present = vec.iter().any(|existing| existing == val);
    if already_present {
        return;
    }
    vec.push(val.to_owned());
}
454
455fn add_to_scope_map(map: &mut HashMap<String, Vec<String>>, scope: &str, route_id: &str) {
456 let entry = map.entry(scope.to_string()).or_default();
457 add_unique(entry, route_id);
458}
459
/// Removes `route_id` from every scope's route list and drops scopes whose list is empty
/// afterwards.
fn remove_from_all_scopes(map: &mut HashMap<String, Vec<String>>, route_id: &str) {
    map.retain(|_scope, routes| {
        routes.retain(|route| route != route_id);
        !routes.is_empty()
    });
}
476
477pub fn publish_install_progress(broker: &Broker, block_id: &str, message: &str) {
481 let scope = format!("block:{}", block_id);
482 broker.publish(WaveEvent {
483 event: EVENT_INSTALL_PROGRESS.to_string(),
484 scopes: vec![scope],
485 sender: String::new(),
486 persist: 0,
487 data: Some(serde_json::json!({ "message": message })),
488 });
489}
490
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Test double that records every `(route_id, event)` pair passed to `send_event`.
    struct TestClient {
        events: Mutex<Vec<(String, WaveEvent)>>,
    }

    impl TestClient {
        fn new() -> Self {
            Self {
                events: Mutex::new(Vec::new()),
            }
        }

        /// Snapshot of everything delivered so far, in delivery order.
        fn received_events(&self) -> Vec<(String, WaveEvent)> {
            self.events.lock().unwrap().clone()
        }
    }

    impl WpsClient for TestClient {
        fn send_event(&self, route_id: &str, event: WaveEvent) {
            let mut log = self.events.lock().unwrap();
            log.push((route_id.to_string(), event));
        }
    }

    // Lets the test keep one handle while the broker owns another.
    impl WpsClient for Arc<TestClient> {
        fn send_event(&self, route_id: &str, event: WaveEvent) {
            (**self).send_event(route_id, event);
        }
    }

    /// Builds a broker wired to a fresh recording client.
    fn setup() -> (Broker, Arc<TestClient>) {
        let broker = Broker::new();
        let client = Arc::new(TestClient::new());
        broker.set_client(Box::new(Arc::clone(&client)));
        (broker, client)
    }

    /// Builds an event with an empty sender.
    fn event(
        name: &str,
        scopes: &[&str],
        persist: usize,
        data: Option<serde_json::Value>,
    ) -> WaveEvent {
        WaveEvent {
            event: name.to_string(),
            scopes: scopes.iter().map(|s| (*s).to_owned()).collect(),
            sender: String::new(),
            persist,
            data,
        }
    }

    /// Builds a subscription request.
    fn sub_req(name: &str, scopes: &[&str], allscopes: bool) -> SubscriptionRequest {
        SubscriptionRequest {
            event: name.to_string(),
            scopes: scopes.iter().map(|s| (*s).to_owned()).collect(),
            allscopes,
        }
    }

    #[test]
    fn test_subscribe_all_scopes() {
        let (broker, client) = setup();

        broker.subscribe("route-1", sub_req(EVENT_WAVE_OBJ_UPDATE, &[], true));
        broker.publish(event(EVENT_WAVE_OBJ_UPDATE, &["block:abc"], 0, None));

        let events = client.received_events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, "route-1");
    }

    #[test]
    fn test_subscribe_exact_scope() {
        let (broker, client) = setup();

        broker.subscribe(
            "route-1",
            sub_req(EVENT_WAVE_OBJ_UPDATE, &["block:abc"], false),
        );

        // Matching scope is delivered, the other one is not.
        broker.publish(event(EVENT_WAVE_OBJ_UPDATE, &["block:abc"], 0, None));
        broker.publish(event(EVENT_WAVE_OBJ_UPDATE, &["block:xyz"], 0, None));

        let events = client.received_events();
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn test_subscribe_star_scope() {
        let (broker, client) = setup();

        broker.subscribe(
            "route-1",
            sub_req(EVENT_WAVE_OBJ_UPDATE, &["block:*"], false),
        );

        broker.publish(event(EVENT_WAVE_OBJ_UPDATE, &["block:abc"], 0, None));
        broker.publish(event(EVENT_WAVE_OBJ_UPDATE, &["tab:xyz"], 0, None));

        let events = client.received_events();
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn test_unsubscribe() {
        let (broker, client) = setup();

        broker.subscribe("route-1", sub_req(EVENT_BLOCK_CLOSE, &[], true));
        broker.unsubscribe("route-1", EVENT_BLOCK_CLOSE);
        broker.publish(event(EVENT_BLOCK_CLOSE, &[], 0, None));

        assert!(client.received_events().is_empty());
    }

    #[test]
    fn test_unsubscribe_all() {
        let (broker, client) = setup();

        broker.subscribe("route-1", sub_req(EVENT_BLOCK_CLOSE, &[], true));
        broker.subscribe("route-1", sub_req(EVENT_CONFIG, &[], true));

        broker.unsubscribe_all("route-1");

        broker.publish(event(EVENT_BLOCK_CLOSE, &[], 0, None));
        broker.publish(event(EVENT_CONFIG, &[], 0, None));

        assert!(client.received_events().is_empty());
    }

    #[test]
    fn test_replay_on_subscribe_exact_scope() {
        let (broker, client) = setup();

        for i in 0..5 {
            broker.publish(event(
                "tool_chunk",
                &["block:abc"],
                10,
                Some(serde_json::json!({"tool_id": "t1", "n": i})),
            ));
        }
        assert!(
            client.received_events().is_empty(),
            "no subscriber yet, no delivery"
        );

        broker.subscribe("route-1", sub_req("tool_chunk", &["block:abc"], false));

        let events = client.received_events();
        assert_eq!(events.len(), 5, "all 5 persisted events replayed");
        assert_eq!(
            events[0].1.data,
            Some(serde_json::json!({"tool_id": "t1", "n": 0}))
        );
        assert_eq!(
            events[4].1.data,
            Some(serde_json::json!({"tool_id": "t1", "n": 4}))
        );
    }

    #[test]
    fn test_replay_on_resubscribe_is_idempotent() {
        let (broker, client) = setup();

        for i in 0..3 {
            broker.publish(event(
                "tool_chunk",
                &["block:abc"],
                10,
                Some(serde_json::json!({"n": i})),
            ));
        }

        let sub = sub_req("tool_chunk", &["block:abc"], false);

        broker.subscribe("route-1", sub.clone());
        assert_eq!(
            client.received_events().len(),
            3,
            "first subscribe replays all 3 persisted events"
        );

        broker.subscribe("route-1", sub.clone());
        assert_eq!(
            client.received_events().len(),
            3,
            "re-subscribe is a no-op for replay; received count stays at 3"
        );

        broker.publish(event(
            "tool_chunk",
            &["block:abc"],
            10,
            Some(serde_json::json!({"n": "live"})),
        ));
        assert_eq!(
            client.received_events().len(),
            4,
            "live publish after resubscribe is delivered exactly once"
        );

        broker.unsubscribe_all("route-1");
        broker.subscribe("route-1", sub.clone());
        assert_eq!(
            client.received_events().len(),
            8,
            "reconnect after unsubscribe_all clears the replayed tracker; \
             gets all 4 persisted events again"
        );
    }

    #[test]
    fn test_replay_on_subscribe_scope_isolation() {
        let (broker, client) = setup();

        broker.publish(event(
            "tool_chunk",
            &["block:xyz"],
            10,
            Some(serde_json::json!({"tool_id": "other"})),
        ));

        broker.subscribe("route-1", sub_req("tool_chunk", &["block:abc"], false));

        let events = client.received_events();
        assert_eq!(events.len(), 0, "scope:abc must not get block:xyz events");
    }

    #[test]
    fn test_event_persistence() {
        let (broker, _client) = setup();

        broker.subscribe("route-1", sub_req(EVENT_SYS_INFO, &[], true));

        // Five events with a retention of three: only the last three survive.
        for i in 0..5 {
            broker.publish(event(
                EVENT_SYS_INFO,
                &["cpu"],
                3,
                Some(serde_json::json!(i)),
            ));
        }

        let history = broker.read_event_history(EVENT_SYS_INFO, "", 10);
        assert_eq!(history.len(), 3);
        assert_eq!(history[0].data, Some(serde_json::json!(2)));
        assert_eq!(history[2].data, Some(serde_json::json!(4)));

        let scoped = broker.read_event_history(EVENT_SYS_INFO, "cpu", 2);
        assert_eq!(scoped.len(), 2);
    }

    #[test]
    fn test_star_match() {
        assert!(star_match("block:*", "block:abc", ":"));
        assert!(star_match("*:abc", "block:abc", ":"));
        assert!(!star_match("block:*", "tab:abc", ":"));
        assert!(star_match("**", "block:abc:xyz", ":"));
        assert!(!star_match("block:*", "block:abc:xyz", ":"));
    }

    #[test]
    fn test_wave_event_serialization() {
        let original = event(
            "test",
            &["scope1"],
            0,
            Some(serde_json::json!({"key": "value"})),
        );
        let json = serde_json::to_string(&original).unwrap();
        let parsed: WaveEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.event, "test");
        assert_eq!(parsed.scopes, vec!["scope1"]);
        // Empty sender and zero persist must be left out of the JSON.
        assert!(!json.contains("\"sender\""));
        assert!(!json.contains("\"persist\""));
    }

    #[test]
    fn test_subscription_request_serialization() {
        let req = sub_req("blockclose", &["block:123"], false);
        let json = serde_json::to_string(&req).unwrap();
        let parsed: SubscriptionRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.event, "blockclose");
    }

    #[test]
    fn test_no_client_publish_does_not_panic() {
        let broker = Broker::new();
        broker.publish(event("test", &[], 0, None));
    }

    #[test]
    fn test_double_star_scope() {
        let (broker, client) = setup();

        broker.subscribe("route-1", sub_req(EVENT_WAVE_OBJ_UPDATE, &["**"], false));
        broker.publish(event(EVENT_WAVE_OBJ_UPDATE, &["block:abc:def"], 0, None));

        assert_eq!(client.received_events().len(), 1);
    }
}