1use axum::{extract::State, response::Json};
6use serde_json::json;
7
8use crate::backend::blockcontroller;
9use crate::backend::service::{self, CloseTabRtnType, WebCallType, WebReturnType};
10use crate::backend::storage::wstore::WaveStore;
11use crate::backend::obj::*;
12use crate::backend::wcore;
13
14use super::AppState;
15
16pub(super) async fn handle_service(
17 State(state): State<AppState>,
18 body: axum::body::Bytes,
19) -> Json<WebReturnType> {
20 let service_start = std::time::Instant::now();
21 let call: WebCallType = match serde_json::from_slice(&body) {
22 Ok(c) => c,
23 Err(e) => return Json(WebReturnType::error(format!("invalid request body: {e}"))),
24 };
25 let result = dispatch_service(&state, &call).await;
26 let elapsed = service_start.elapsed();
27 tracing::info!(
28 "[http-perf] {}.{}: {:.2}ms",
29 call.service,
30 call.method,
31 elapsed.as_secs_f64() * 1000.0,
32 );
33
34 if let Some(updates) = &result.updates {
43 for update in updates {
44 if let Ok(data) = serde_json::to_value(update) {
45 let oref = format!("{}:{}", update.otype, update.oid);
46 state.event_bus.broadcast_event(
47 &crate::backend::eventbus::WSEventType {
48 eventtype: "waveobj:update".to_string(),
49 oref,
50 data: Some(data),
51 },
52 );
53 }
54 }
55 }
56
57 Json(result)
58}
59
60async fn dispatch_service(state: &AppState, call: &WebCallType) -> WebReturnType {
61 let store = &state.wstore;
62 let args = &call.args;
63
64 match (call.service.as_str(), call.method.as_str()) {
65 ("object", "GetObject") => {
67 let oref_str: String = match service::get_arg(args, 0) {
68 Ok(v) => v,
69 Err(e) => return WebReturnType::error(e),
70 };
71 match get_object_by_oref(store, &oref_str) {
72 Ok(data) => WebReturnType::success(data),
73 Err(e) => WebReturnType::error(e),
74 }
75 }
76 ("object", "GetObjects") => {
77 let orefs: Vec<String> = match service::get_arg(args, 0) {
78 Ok(v) => v,
79 Err(e) => return WebReturnType::error(e),
80 };
81 let mut results = Vec::new();
82 for oref_str in &orefs {
83 match get_object_by_oref(store, oref_str) {
84 Ok(data) => results.push(data),
85 Err(_) => results.push(serde_json::Value::Null),
86 }
87 }
88 WebReturnType::success(serde_json::json!(results))
89 }
90 ("object", "CreateBlock") => {
91 let block_def: BlockDef = match service::get_arg(args, 0) {
92 Ok(v) => v,
93 Err(e) => return WebReturnType::error(e),
94 };
95 let explicit_tab_id: Option<String> = match service::get_optional_arg::<String>(args, 2) {
109 Ok(opt) => opt.map(|s| s.trim().to_string()).filter(|s| !s.is_empty()),
110 Err(e) => return WebReturnType::error(format!("invalid tabId arg: {}", e)),
111 };
112 let tab_id = match explicit_tab_id {
113 Some(id) => id,
114 None => match call
115 .uicontext
116 .as_ref()
117 .map(|ctx| ctx.active_tab_id.clone())
118 {
119 Some(id) if !id.is_empty() => id,
120 _ => return WebReturnType::error("missing uicontext.activetabid"),
121 },
122 };
123 let meta_value =
128 serde_json::to_value(&block_def.meta).unwrap_or(serde_json::Value::Null);
129 let events = dispatch_to_reducer(
130 state,
131 agentmux_common::ipc::Command::CreateBlock {
132 tab_id: tab_id.clone(),
133 meta: meta_value,
134 },
135 )
136 .await;
137 if let Some(err_msg) = events.iter().find_map(|e| match e {
139 agentmux_common::ipc::Event::Error { message, .. } => Some(message.clone()),
140 _ => None,
141 }) {
142 return WebReturnType::error(err_msg);
143 }
144 let block_id = events.iter().find_map(|e| match e {
145 agentmux_common::ipc::Event::BlockCreated { block_id, .. } => {
146 Some(block_id.clone())
147 }
148 _ => None,
149 });
150 let mut apply_err: Option<String> = None;
151 for ev in &events {
152 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
153 apply_err = Some(e.to_string());
154 break;
155 }
156 }
157 if let Some(err) = apply_err {
158 if let Some(bid) = block_id.as_ref() {
159 compensate_via_reducer(
160 state,
161 agentmux_common::ipc::Command::DeleteBlock {
162 tab_id: tab_id.clone(),
163 block_id: bid.clone(),
164 },
165 store,
166 )
167 .await;
168 }
169 return WebReturnType::error(format!("CreateBlock: SQLite write failed: {}", err));
170 }
171 publish_events(state, &events);
172 match block_id {
173 Some(bid) => {
174 let mut updates = vec![];
175 if let Ok(block) = store.must_get::<Block>(&bid) {
176 updates.push(WaveObjUpdate {
177 updatetype: "update".into(),
178 otype: OTYPE_BLOCK.to_string(),
179 oid: bid.clone(),
180 obj: Some(wave_obj_to_value(&block)),
181 });
182 }
183 if let Ok(tab) = store.must_get::<Tab>(&tab_id) {
184 updates.push(WaveObjUpdate {
185 updatetype: "update".into(),
186 otype: OTYPE_TAB.to_string(),
187 oid: tab_id.clone(),
188 obj: Some(wave_obj_to_value(&tab)),
189 });
190 }
191 WebReturnType::success_data_updates(serde_json::json!(bid), updates)
192 }
193 None => WebReturnType::error(
194 "CreateBlock: reducer did not emit BlockCreated".to_string(),
195 ),
196 }
197 }
198 ("object", "DeleteBlock") => {
205 let tab_id = match call
206 .uicontext
207 .as_ref()
208 .map(|ctx| ctx.active_tab_id.clone())
209 {
210 Some(id) if !id.is_empty() => id,
211 _ => return WebReturnType::error("missing uicontext.activetabid"),
212 };
213 let block_id: String = match service::get_arg(args, 0) {
214 Ok(v) => v,
215 Err(e) => return WebReturnType::error(e),
216 };
217 if let Err(reason) = crate::sagas::delete_block::run(state, tab_id, block_id).await {
218 return WebReturnType::error(reason);
219 }
220 WebReturnType::success_empty()
221 }
222 ("object", "UpdateObject") => {
223 let wave_obj_value: serde_json::Value = match service::get_arg(args, 0) {
224 Ok(v) => v,
225 Err(e) => return WebReturnType::error(e),
226 };
227 let layout_slice: Option<(String, String, String)> = if wave_obj_value
245 .get("otype")
246 .and_then(|v| v.as_str())
247 == Some(OTYPE_LAYOUT)
248 {
249 wave_obj_value
250 .get("oid")
251 .and_then(|v| v.as_str())
252 .and_then(|layout_oid| find_tab_for_layout(store, layout_oid))
253 .map(|tab_id| {
254 let new_focused = wave_obj_value
255 .get("focusednodeid")
256 .and_then(|v| v.as_str())
257 .unwrap_or("")
258 .to_string();
259 let new_magnified = wave_obj_value
260 .get("magnifiednodeid")
261 .and_then(|v| v.as_str())
262 .unwrap_or("")
263 .to_string();
264 (tab_id, new_focused, new_magnified)
265 })
266 } else {
267 None
268 };
269 match update_object(store, wave_obj_value) {
270 Ok((otype, oid, obj_val)) => {
271 if let Some((tab_id, new_focused, new_magnified)) = layout_slice {
276 let focus_events = dispatch_to_reducer(
277 state,
278 agentmux_common::ipc::Command::SetFocusedNode {
279 tab_id: tab_id.clone(),
280 node_id: new_focused,
281 },
282 )
283 .await;
284 publish_events(state, &focus_events);
285 let mag_events = dispatch_to_reducer(
286 state,
287 agentmux_common::ipc::Command::SetMagnifiedNode {
288 tab_id,
289 node_id: new_magnified,
290 },
291 )
292 .await;
293 publish_events(state, &mag_events);
294 }
295 let update = WaveObjUpdate {
296 updatetype: "update".into(),
297 otype,
298 oid,
299 obj: Some(obj_val),
300 };
301 WebReturnType::success_with_updates(vec![update])
302 }
303 Err(e) => WebReturnType::error(e),
304 }
305 }
306 ("object", "UpdateObjectMeta") => {
312 let oref_str: String = match service::get_arg(args, 0) {
313 Ok(v) => v,
314 Err(e) => return WebReturnType::error(e),
315 };
316 let meta_update: MetaMapType = match service::get_arg(args, 1) {
317 Ok(v) => v,
318 Err(e) => return WebReturnType::error(e),
319 };
320 let oref = match crate::backend::ORef::parse(&oref_str) {
321 Ok(v) => v,
322 Err(e) => return WebReturnType::error(e.to_string()),
323 };
324 let meta_value = serde_json::to_value(&meta_update).unwrap_or(serde_json::Value::Null);
325 let cmd = match oref.otype.as_str() {
326 t if t == OTYPE_WORKSPACE => agentmux_common::ipc::Command::UpdateWorkspaceMeta {
327 workspace_id: oref.oid.clone(),
328 meta_patch: meta_value,
329 },
330 t if t == OTYPE_TAB => agentmux_common::ipc::Command::UpdateTabMeta {
331 tab_id: oref.oid.clone(),
332 meta_patch: meta_value,
333 },
334 t if t == OTYPE_BLOCK => agentmux_common::ipc::Command::UpdateBlockMeta {
335 block_id: oref.oid.clone(),
336 meta_patch: meta_value,
337 },
338 t if t == OTYPE_WINDOW => agentmux_common::ipc::Command::UpdateWindowMeta {
339 window_id: oref.oid.clone(),
340 meta_patch: meta_value,
341 },
342 other => {
343 return match update_object_meta(store, &oref_str, &meta_update) {
353 Ok(()) => WebReturnType::success_empty(),
354 Err(e) => WebReturnType::error(format!(
355 "UpdateObjectMeta: unsupported otype {} via reducer; wcore fallback failed: {}",
356 other, e
357 )),
358 };
359 }
360 };
361 let events = dispatch_to_reducer(state, cmd).await;
362 if let Some(err_msg) = events.iter().find_map(|e| match e {
363 agentmux_common::ipc::Event::Error { message, .. } => Some(message.clone()),
364 _ => None,
365 }) {
366 return WebReturnType::error(err_msg);
367 }
368 for ev in &events {
369 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
370 return WebReturnType::error(format!(
371 "UpdateObjectMeta: SQLite write failed: {}",
372 e
373 ));
374 }
375 }
376 publish_events(state, &events);
377 if oref.otype == OTYPE_BLOCK {
379 if let Ok(block) = store.must_get::<Block>(&oref.oid) {
380 return WebReturnType::success_with_updates(vec![WaveObjUpdate {
381 updatetype: "update".into(),
382 otype: OTYPE_BLOCK.to_string(),
383 oid: oref.oid.clone(),
384 obj: Some(wave_obj_to_value(&block)),
385 }]);
386 }
387 }
388 if oref.otype == OTYPE_TAB {
389 if let Ok(tab) = store.must_get::<Tab>(&oref.oid) {
390 return WebReturnType::success_with_updates(vec![WaveObjUpdate {
391 updatetype: "update".into(),
392 otype: OTYPE_TAB.to_string(),
393 oid: oref.oid.clone(),
394 obj: Some(wave_obj_to_value(&tab)),
395 }]);
396 }
397 }
398 WebReturnType::success_empty()
399 }
400 ("object", "UpdateTabName") => {
402 let tab_id: String = match service::get_arg(args, 0) {
403 Ok(v) => v,
404 Err(e) => return WebReturnType::error(e),
405 };
406 let name: String = match service::get_arg(args, 1) {
407 Ok(v) => v,
408 Err(e) => return WebReturnType::error(e),
409 };
410 let events = dispatch_to_reducer(
411 state,
412 agentmux_common::ipc::Command::RenameTab {
413 tab_id: tab_id.clone(),
414 name,
415 },
416 )
417 .await;
418 if let Some(err_msg) = events.iter().find_map(|e| match e {
419 agentmux_common::ipc::Event::Error { message, .. } => Some(message.clone()),
420 _ => None,
421 }) {
422 return WebReturnType::error(err_msg);
423 }
424 for ev in &events {
425 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
426 return WebReturnType::error(format!(
427 "UpdateTabName: SQLite write failed: {}",
428 e
429 ));
430 }
431 }
432 publish_events(state, &events);
433 if let Ok(updated_tab) = store.must_get::<Tab>(&tab_id) {
434 let update = WaveObjUpdate {
435 updatetype: "update".into(),
436 otype: OTYPE_TAB.to_string(),
437 oid: tab_id.clone(),
438 obj: Some(wave_obj_to_value(&updated_tab)),
439 };
440 return WebReturnType::success_with_updates(vec![update]);
441 }
442 WebReturnType::success_empty()
443 }
444 ("client", "GetClientData") => match wcore::get_client(store) {
446 Ok(client) => {
447 WebReturnType::success(serde_json::to_value(&client).unwrap_or_default())
448 }
449 Err(e) => WebReturnType::error(e.to_string()),
450 },
451 ("client", "GetTab") => {
452 let tab_id: String = match service::get_arg(args, 0) {
453 Ok(v) => v,
454 Err(e) => return WebReturnType::error(e),
455 };
456 match store.must_get::<Tab>(&tab_id) {
457 Ok(tab) => WebReturnType::success(serde_json::to_value(&tab).unwrap_or_default()),
458 Err(e) => WebReturnType::error(e.to_string()),
459 }
460 }
461 ("client", "FocusWindow") => {
462 let window_id: String = match service::get_arg(args, 0) {
463 Ok(v) => v,
464 Err(e) => return WebReturnType::error(e),
465 };
466 match wcore::focus_window(store, &window_id) {
467 Ok(()) => WebReturnType::success_empty(),
468 Err(e) => WebReturnType::error(e.to_string()),
469 }
470 }
471 ("client", "AgreeTos") => match wcore::get_client(store) {
472 Ok(mut client) => {
473 client.tosagreed = chrono::Utc::now().timestamp_millis();
474 match store.update(&mut client) {
475 Ok(_) => WebReturnType::success_empty(),
476 Err(e) => WebReturnType::error(e.to_string()),
477 }
478 }
479 Err(e) => WebReturnType::error(e.to_string()),
480 },
481 ("client", "GetAllConnStatus") => {
482 WebReturnType::success_empty()
485 }
486 ("client", "TelemetryUpdate") => {
487 WebReturnType::success_empty()
489 }
490
491 ("window", "GetWindow") => {
493 let window_id: String = match service::get_arg(args, 0) {
494 Ok(v) => v,
495 Err(e) => return WebReturnType::error(e),
496 };
497 match store.must_get::<Window>(&window_id) {
498 Ok(win) => WebReturnType::success(serde_json::to_value(&win).unwrap_or_default()),
499 Err(e) => WebReturnType::error(e.to_string()),
500 }
501 }
502 ("window", "CreateWindow") => {
512 let requested_ws_id: String = service::get_arg(args, 1).unwrap_or_default();
513 let (ws_id, fresh_workspace_events): (String, Vec<agentmux_common::ipc::Event>) =
515 if requested_ws_id.is_empty() {
516 let ws_events = dispatch_to_reducer(
518 state,
519 agentmux_common::ipc::Command::CreateWorkspace {
520 name: String::new(),
521 },
522 )
523 .await;
524 if let Some(err_msg) = ws_events.iter().find_map(|e| match e {
525 agentmux_common::ipc::Event::Error { message, .. } => {
526 Some(message.clone())
527 }
528 _ => None,
529 }) {
530 return WebReturnType::error(err_msg);
531 }
532 for ev in &ws_events {
533 if let Err(e) =
534 crate::persist_subscriber::apply_event_to_wstore(ev, store)
535 {
536 return WebReturnType::error(format!(
537 "CreateWindow: SQLite write failed: {}",
538 e
539 ));
540 }
541 }
542 let new_ws_id = ws_events
543 .iter()
544 .find_map(|e| match e {
545 agentmux_common::ipc::Event::WorkspaceCreated {
546 workspace_id, ..
547 } => Some(workspace_id.clone()),
548 _ => None,
549 })
550 .unwrap_or_default();
551 let tab_events = dispatch_to_reducer(
553 state,
554 agentmux_common::ipc::Command::CreateTab {
555 workspace_id: new_ws_id.clone(),
556 name: String::new(),
557 },
558 )
559 .await;
560 if let Some(err_msg) = tab_events.iter().find_map(|e| match e {
561 agentmux_common::ipc::Event::Error { message, .. } => {
562 Some(message.clone())
563 }
564 _ => None,
565 }) {
566 let comp = dispatch_to_reducer(
568 state,
569 agentmux_common::ipc::Command::DeleteWorkspace {
570 workspace_id: new_ws_id.clone(),
571 force: false,
575 },
576 )
577 .await;
578 for ev in &comp {
579 let _ = crate::persist_subscriber::apply_event_to_wstore(ev, store);
580 }
581 publish_events(state, &comp);
582 return WebReturnType::error(err_msg);
583 }
584 for ev in &tab_events {
585 if let Err(e) =
586 crate::persist_subscriber::apply_event_to_wstore(ev, store)
587 {
588 return WebReturnType::error(format!(
589 "CreateWindow: SQLite write failed: {}",
590 e
591 ));
592 }
593 }
594 let mut combined = ws_events;
595 combined.extend(tab_events);
596 (new_ws_id, combined)
597 } else {
598 let exists_in_sqlite = match store.get::<Workspace>(&requested_ws_id) {
601 Ok(opt) => opt.is_some(),
602 Err(e) => {
603 return WebReturnType::error(format!(
604 "CreateWindow: workspace lookup failed: {}",
605 e
606 ));
607 }
608 };
609 if !exists_in_sqlite {
610 return WebReturnType::error(format!(
611 "CreateWindow: workspace not found: {}",
612 requested_ws_id
613 ));
614 }
615 (requested_ws_id, Vec::new())
616 };
617
618 let window_id = uuid::Uuid::new_v4().to_string();
620 let win_events = dispatch_to_reducer(
621 state,
622 agentmux_common::ipc::Command::CreateWindow {
623 window_id: window_id.clone(),
624 workspace_id: ws_id.clone(),
625 },
626 )
627 .await;
628 if let Some(err_msg) = win_events.iter().find_map(|e| match e {
629 agentmux_common::ipc::Event::Error { message, .. } => Some(message.clone()),
630 _ => None,
631 }) {
632 if !fresh_workspace_events.is_empty() {
634 let comp = dispatch_to_reducer(
635 state,
636 agentmux_common::ipc::Command::DeleteWorkspace {
637 workspace_id: ws_id.clone(),
638 force: false,
640 },
641 )
642 .await;
643 for ev in &comp {
644 let _ = crate::persist_subscriber::apply_event_to_wstore(ev, store);
645 }
646 publish_events(state, &comp);
647 }
648 return WebReturnType::error(err_msg);
649 }
650 for ev in &win_events {
651 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
652 return WebReturnType::error(format!(
653 "CreateWindow: SQLite write failed: {}",
654 e
655 ));
656 }
657 }
658 if let Ok(mut win) = store.must_get::<Window>(&window_id) {
662 if !win.isnew {
663 win.isnew = true;
664 let _ = store.update(&mut win);
665 }
666 }
667 let mut all_events = fresh_workspace_events;
669 all_events.extend(win_events);
670 publish_events(state, &all_events);
671 match store.must_get::<Window>(&window_id) {
673 Ok(win) => WebReturnType::success(serde_json::to_value(&win).unwrap_or_default()),
674 Err(e) => WebReturnType::error(format!(
675 "CreateWindow: window read-back failed: {}",
676 e
677 )),
678 }
679 }
680 ("window", "CloseWindow") => {
692 let window_id: String = match service::get_arg(args, 0) {
693 Ok(v) => v,
694 Err(e) => return WebReturnType::error(e),
695 };
696 let ws_id: Option<String> = match store.get::<Window>(&window_id) {
699 Ok(Some(w)) => Some(w.workspaceid.clone()),
700 Ok(None) => None,
701 Err(e) => {
702 return WebReturnType::error(format!(
703 "CloseWindow: window lookup failed: {}",
704 e
705 ));
706 }
707 };
708 let close_events = dispatch_to_reducer(
710 state,
711 agentmux_common::ipc::Command::CloseWindowInternal {
712 window_id: window_id.clone(),
713 },
714 )
715 .await;
716 if let Some(err_msg) = close_events.iter().find_map(|e| match e {
720 agentmux_common::ipc::Event::Error { message, .. } => Some(message.clone()),
721 _ => None,
722 }) {
723 return WebReturnType::error(err_msg);
724 }
725 for ev in &close_events {
726 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
727 return WebReturnType::error(format!(
728 "CloseWindow: SQLite write failed: {}",
729 e
730 ));
731 }
732 }
733 publish_events(state, &close_events);
734 if let Some(ws_id) = ws_id {
748 let any_other_window = {
749 let s = state.srv_state.lock().await;
750 s.windows.values().any(|w| w.workspace_id == ws_id)
751 };
752 if !any_other_window {
753 if let Err(e) =
754 crate::sagas::delete_workspace::run(state, ws_id.clone()).await
755 {
756 tracing::warn!(
757 workspace_id = %ws_id,
758 "CloseWindow: delete_workspace saga failed: {}",
759 e,
760 );
761 }
762 }
763 }
764 WebReturnType::success_empty()
768 }
769 ("window", "SwitchWorkspace") => {
774 let window_id: String = match service::get_arg(args, 0) {
775 Ok(v) => v,
776 Err(e) => return WebReturnType::error(e),
777 };
778 let ws_id: String = match service::get_arg(args, 1) {
779 Ok(v) => v,
780 Err(e) => return WebReturnType::error(e),
781 };
782 let events = dispatch_to_reducer(
783 state,
784 agentmux_common::ipc::Command::SwitchWorkspace {
785 window_id: window_id.clone(),
786 workspace_id: ws_id.clone(),
787 },
788 )
789 .await;
790 if let Some(err_msg) = events.iter().find_map(|e| match e {
791 agentmux_common::ipc::Event::Error { message, .. } => Some(message.clone()),
792 _ => None,
793 }) {
794 return WebReturnType::error(err_msg);
795 }
796 for ev in &events {
797 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
798 return WebReturnType::error(format!(
799 "SwitchWorkspace: SQLite write failed: {}",
800 e
801 ));
802 }
803 }
804 publish_events(state, &events);
805 WebReturnType::success_empty()
806 }
807 ("window", "SetWindowPosAndSize") => {
808 let window_id: String = match service::get_arg(args, 0) {
809 Ok(v) => v,
810 Err(e) => return WebReturnType::error(e),
811 };
812 let pos: Option<Point> = service::get_optional_arg(args, 1).unwrap_or(None);
813 let size: Option<WinSize> = service::get_optional_arg(args, 2).unwrap_or(None);
814 match store.must_get::<Window>(&window_id) {
815 Ok(mut win) => {
816 if let Some(p) = pos {
817 win.pos = p;
818 }
819 if let Some(s) = size {
820 win.winsize = s;
821 }
822 match store.update(&mut win) {
823 Ok(_) => WebReturnType::success_empty(),
824 Err(e) => WebReturnType::error(e.to_string()),
825 }
826 }
827 Err(e) => WebReturnType::error(e.to_string()),
828 }
829 }
830
831 ("workspace", "CreateWorkspace") => {
854 let name: String = service::get_arg(args, 0).unwrap_or_default();
855 let events = dispatch_to_reducer(
856 state,
857 agentmux_common::ipc::Command::CreateWorkspace { name: name.clone() },
858 )
859 .await;
860 let workspace_id = events.iter().find_map(|e| match e {
861 agentmux_common::ipc::Event::WorkspaceCreated { workspace_id, .. } => {
862 Some(workspace_id.clone())
863 }
864 _ => None,
865 });
866 let mut apply_err: Option<String> = None;
872 for ev in &events {
873 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
874 apply_err = Some(e.to_string());
875 break;
876 }
877 }
878 if let Some(err) = apply_err {
879 if let Some(id) = workspace_id.as_ref() {
880 compensate_via_reducer(
881 state,
882 agentmux_common::ipc::Command::DeleteWorkspace {
883 workspace_id: id.clone(),
884 force: false,
888 },
889 store,
890 )
891 .await;
892 }
893 return WebReturnType::error(format!(
894 "CreateWorkspace: SQLite write failed: {}",
895 err
896 ));
897 }
898 publish_events(state, &events);
899 match workspace_id {
900 Some(id) => match wcore::get_workspace(store, &id) {
901 Ok(ws) => {
902 WebReturnType::success(serde_json::to_value(&ws).unwrap_or_default())
903 }
904 Err(e) => WebReturnType::error(format!(
905 "CreateWorkspace: post-write read failed: {}",
906 e
907 )),
908 },
909 None => WebReturnType::error(
910 "CreateWorkspace: reducer did not emit WorkspaceCreated".to_string(),
911 ),
912 }
913 }
914 ("workspace", "GetWorkspace") => {
915 let ws_id: String = match service::get_arg(args, 0) {
916 Ok(v) => v,
917 Err(e) => return WebReturnType::error(e),
918 };
919 match wcore::get_workspace(store, &ws_id) {
924 Ok(ws) => WebReturnType::success(serde_json::to_value(&ws).unwrap_or_default()),
925 Err(e) => WebReturnType::error(e.to_string()),
926 }
927 }
928 ("workspace", "DeleteWorkspace") => {
929 let ws_id: String = match service::get_arg(args, 0) {
930 Ok(v) => v,
931 Err(e) => return WebReturnType::error(e),
932 };
933 let exists_in_wstore = match wstore_workspace_exists(store, &ws_id) {
957 Ok(v) => v,
958 Err(e) => {
959 return WebReturnType::error(format!(
960 "DeleteWorkspace: SQLite read failed: {}",
961 e
962 ))
963 }
964 };
965 if !exists_in_wstore {
966 let exists_in_state = state
967 .srv_state
968 .lock()
969 .await
970 .workspaces
971 .contains_key(&ws_id);
972 if !exists_in_state {
973 return WebReturnType::error(format!(
974 "DeleteWorkspace: workspace not found: {}",
975 ws_id
976 ));
977 }
978 }
979 match crate::sagas::delete_workspace::run(state, ws_id.clone()).await {
980 Ok(_) => WebReturnType::success_empty(),
981 Err(e) => WebReturnType::error(format!("DeleteWorkspace failed: {}", e)),
982 }
983 }
984 ("workspace", "ListWorkspaces") => match wcore::list_workspaces(store) {
985 Ok(list) => WebReturnType::success(serde_json::to_value(&list).unwrap_or_default()),
986 Err(e) => WebReturnType::error(e.to_string()),
987 },
988 ("workspace", "CreateTab") => {
995 let ws_id: String = match service::get_arg(args, 0) {
996 Ok(v) => v,
997 Err(e) => return WebReturnType::error(e),
998 };
999 let tab_name: String = service::get_arg(args, 1).unwrap_or_default();
1000 let activate: bool = service::get_arg(args, 2).unwrap_or(true);
1001 let resolved_name = if tab_name.is_empty() {
1009 match store.get::<Workspace>(&ws_id) {
1010 Ok(Some(ws)) => {
1011 format!("tab{}", ws.tabids.len() + ws.pinnedtabids.len() + 1)
1012 }
1013 _ => "tab1".to_string(),
1014 }
1015 } else {
1016 tab_name.clone()
1017 };
1018 let events = dispatch_to_reducer(
1019 state,
1020 agentmux_common::ipc::Command::CreateTab {
1021 workspace_id: ws_id.clone(),
1022 name: resolved_name,
1023 },
1024 )
1025 .await;
1026 if let Some(err_msg) = events.iter().find_map(|e| match e {
1033 agentmux_common::ipc::Event::Error { message, .. } => Some(message.clone()),
1034 _ => None,
1035 }) {
1036 return WebReturnType::error(err_msg);
1037 }
1038 let tab_id = events.iter().find_map(|e| match e {
1039 agentmux_common::ipc::Event::TabCreated { tab_id, .. } => Some(tab_id.clone()),
1040 _ => None,
1041 });
1042 let mut apply_err: Option<String> = None;
1045 for ev in &events {
1046 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
1047 apply_err = Some(e.to_string());
1048 break;
1049 }
1050 }
1051 if let Some(err) = apply_err {
1052 if let Some(tid) = tab_id.as_ref() {
1053 compensate_via_reducer(
1054 state,
1055 agentmux_common::ipc::Command::DeleteTab {
1056 workspace_id: ws_id.clone(),
1057 tab_id: tid.clone(),
1058 force: true,
1063 },
1064 store,
1065 )
1066 .await;
1067 }
1068 return WebReturnType::error(format!("CreateTab: SQLite write failed: {}", err));
1069 }
1070 publish_events(state, &events);
1071 let auto_activated = events
1074 .iter()
1075 .any(|e| matches!(e, agentmux_common::ipc::Event::ActiveTabChanged { .. }));
1076 if activate && !auto_activated {
1077 if let Some(tid) = tab_id.as_ref() {
1078 let active_events = dispatch_to_reducer(
1079 state,
1080 agentmux_common::ipc::Command::SetActiveTab {
1081 workspace_id: ws_id.clone(),
1082 tab_id: tid.clone(),
1083 },
1084 )
1085 .await;
1086 let mut active_err: Option<String> = None;
1087 for ev in &active_events {
1088 if let Err(e) =
1089 crate::persist_subscriber::apply_event_to_wstore(ev, store)
1090 {
1091 active_err = Some(e.to_string());
1092 break;
1093 }
1094 }
1095 if active_err.is_none() {
1096 publish_events(state, &active_events);
1097 }
1098 if let Some(err) = active_err {
1102 tracing::warn!(
1103 "CreateTab: post-create activate failed: {}",
1104 err
1105 );
1106 }
1107 }
1108 }
1109 match tab_id {
1110 Some(id) => {
1111 let mut updates = vec![];
1112 if let Ok(tab) = store.must_get::<Tab>(&id) {
1113 updates.push(WaveObjUpdate {
1114 updatetype: "update".into(),
1115 otype: OTYPE_TAB.to_string(),
1116 oid: id.clone(),
1117 obj: Some(wave_obj_to_value(&tab)),
1118 });
1119 }
1120 if let Ok(ws) = store.must_get::<Workspace>(&ws_id) {
1121 updates.push(WaveObjUpdate {
1122 updatetype: "update".into(),
1123 otype: OTYPE_WORKSPACE.to_string(),
1124 oid: ws_id.clone(),
1125 obj: Some(wave_obj_to_value(&ws)),
1126 });
1127 }
1128 WebReturnType::success_data_updates(
1129 serde_json::to_value(&id).unwrap_or_default(),
1130 updates,
1131 )
1132 }
1133 None => WebReturnType::error(
1134 "CreateTab: reducer did not emit TabCreated".to_string(),
1135 ),
1136 }
1137 }
1138 ("workspace", "SetActiveTab") => {
1143 let ws_id: String = match service::get_arg(args, 0) {
1144 Ok(v) => v,
1145 Err(e) => return WebReturnType::error(e),
1146 };
1147 let tab_id: String = match service::get_arg(args, 1) {
1148 Ok(v) => v,
1149 Err(e) => return WebReturnType::error(e),
1150 };
1151 let healed = wcore::heal_layout(store, &tab_id).unwrap_or(false);
1159 if healed {
1160 if let Ok(tab) = store.must_get::<Tab>(&tab_id) {
1161 if !tab.layoutstate.is_empty() {
1162 if let Ok(Some(layout)) = store.get::<LayoutState>(&tab.layoutstate) {
1163 let focus_events = dispatch_to_reducer(
1166 state,
1167 agentmux_common::ipc::Command::SetFocusedNode {
1168 tab_id: tab_id.clone(),
1169 node_id: layout.focusednodeid.clone(),
1170 },
1171 )
1172 .await;
1173 publish_events(state, &focus_events);
1174 let mag_events = dispatch_to_reducer(
1175 state,
1176 agentmux_common::ipc::Command::SetMagnifiedNode {
1177 tab_id: tab_id.clone(),
1178 node_id: layout.magnifiednodeid.clone(),
1179 },
1180 )
1181 .await;
1182 publish_events(state, &mag_events);
1183 }
1184 }
1185 }
1186 }
1187
1188 let events = dispatch_to_reducer(
1194 state,
1195 agentmux_common::ipc::Command::SetActiveTab {
1196 workspace_id: ws_id.clone(),
1197 tab_id: tab_id.clone(),
1198 },
1199 )
1200 .await;
1201 if let Some(err_msg) = events.iter().find_map(|e| match e {
1204 agentmux_common::ipc::Event::Error { message, .. } => Some(message.clone()),
1205 _ => None,
1206 }) {
1207 return WebReturnType::error(err_msg);
1208 }
1209 let mut apply_err: Option<String> = None;
1216 for ev in &events {
1217 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
1218 apply_err = Some(e.to_string());
1219 break;
1220 }
1221 }
1222 if let Some(err) = apply_err {
1223 return WebReturnType::error(format!("SetActiveTab: SQLite write failed: {}", err));
1224 }
1225 publish_events(state, &events);
1226 if let Ok(ws) = store.must_get::<Workspace>(&ws_id) {
1227 let update = WaveObjUpdate {
1228 updatetype: "update".into(),
1229 otype: OTYPE_WORKSPACE.to_string(),
1230 oid: ws_id.clone(),
1231 obj: Some(wave_obj_to_value(&ws)),
1232 };
1233 WebReturnType::success_with_updates(vec![update])
1234 } else {
1235 WebReturnType::success_empty()
1236 }
1237 }
1238 ("workspace", "CloseTab") => {
1248 let ws_id: String = match service::get_arg(args, 0) {
1249 Ok(v) => v,
1250 Err(e) => return WebReturnType::error(e),
1251 };
1252 let tab_id: String = match service::get_arg(args, 1) {
1253 Ok(v) => v,
1254 Err(e) => return WebReturnType::error(e),
1255 };
1256 if let Err(reason) =
1257 crate::sagas::delete_tab::run(state, ws_id.clone(), tab_id.clone()).await
1258 {
1259 return WebReturnType::error(reason);
1260 }
1261 let rtn = CloseTabRtnType {
1262 closewindow: false,
1263 newactivetabid: String::new(),
1264 };
1265 let mut updates = vec![WaveObjUpdate {
1266 updatetype: "delete".into(),
1267 otype: OTYPE_TAB.to_string(),
1268 oid: tab_id.clone(),
1269 obj: None,
1270 }];
1271 if let Ok(ws) = store.must_get::<Workspace>(&ws_id) {
1272 updates.push(WaveObjUpdate {
1273 updatetype: "update".into(),
1274 otype: OTYPE_WORKSPACE.to_string(),
1275 oid: ws_id.clone(),
1276 obj: Some(wave_obj_to_value(&ws)),
1277 });
1278 }
1279 WebReturnType::success_data_updates(
1280 serde_json::to_value(&rtn).unwrap_or_default(),
1281 updates,
1282 )
1283 }
1284 ("workspace", "UpdateWorkspace") => {
1289 let ws_id: String = match service::get_arg(args, 0) {
1290 Ok(v) => v,
1291 Err(e) => return WebReturnType::error(e),
1292 };
1293 let name: Option<String> = service::get_optional_arg(args, 1).unwrap_or(None);
1294 let Some(name) = name else {
1295 return WebReturnType::success_empty();
1296 };
1297 let events = dispatch_to_reducer(
1298 state,
1299 agentmux_common::ipc::Command::RenameWorkspace {
1300 workspace_id: ws_id.clone(),
1301 name,
1302 },
1303 )
1304 .await;
1305 if let Some(err_msg) = events.iter().find_map(|e| match e {
1306 agentmux_common::ipc::Event::Error { message, .. } => Some(message.clone()),
1307 _ => None,
1308 }) {
1309 return WebReturnType::error(err_msg);
1310 }
1311 for ev in &events {
1312 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
1313 return WebReturnType::error(format!(
1314 "UpdateWorkspace: SQLite write failed: {}",
1315 e
1316 ));
1317 }
1318 }
1319 publish_events(state, &events);
1320 WebReturnType::success_empty()
1321 }
1322 ("workspace", "UpdateTabIds") => {
1333 let ws_id: String = match service::get_arg(args, 0) {
1334 Ok(v) => v,
1335 Err(e) => return WebReturnType::error(e),
1336 };
1337 let tab_ids: Vec<String> = match service::get_arg(args, 1) {
1338 Ok(v) => v,
1339 Err(e) => return WebReturnType::error(e),
1340 };
1341 let events = dispatch_to_reducer(
1343 state,
1344 agentmux_common::ipc::Command::ReorderTabsBulk {
1345 workspace_id: ws_id.clone(),
1346 tab_ids,
1347 },
1348 )
1349 .await;
1350 if let Some(err_msg) = events.iter().find_map(|e| match e {
1351 agentmux_common::ipc::Event::Error { message, .. } => Some(message.clone()),
1352 _ => None,
1353 }) {
1354 return WebReturnType::error(err_msg);
1355 }
1356 for ev in &events {
1357 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
1358 return WebReturnType::error(format!(
1359 "UpdateTabIds: SQLite write failed: {}",
1360 e
1361 ));
1362 }
1363 }
1364 publish_events(state, &events);
1365 if let Ok(updated_ws) = store.must_get::<Workspace>(&ws_id) {
1366 let update = WaveObjUpdate {
1367 updatetype: "update".into(),
1368 otype: OTYPE_WORKSPACE.to_string(),
1369 oid: ws_id.clone(),
1370 obj: Some(wave_obj_to_value(&updated_ws)),
1371 };
1372 return WebReturnType::success_with_updates(vec![update]);
1373 }
1374 WebReturnType::success_empty()
1375 }
1376 ("workspace", "MoveBlockToTab") => {
1382 let ws_id: String = match service::get_arg(args, 0) {
1383 Ok(v) => v,
1384 Err(e) => return WebReturnType::error(e),
1385 };
1386 let block_id: String = match service::get_arg(args, 1) {
1387 Ok(v) => v,
1388 Err(e) => return WebReturnType::error(e),
1389 };
1390 let source_tab_id: String = match service::get_arg(args, 2) {
1391 Ok(v) => v,
1392 Err(e) => return WebReturnType::error(e),
1393 };
1394 let dest_tab_id: String = match service::get_arg(args, 3) {
1395 Ok(v) => v,
1396 Err(e) => return WebReturnType::error(e),
1397 };
1398 let auto_close: bool = service::get_arg(args, 4).unwrap_or(true);
1399 tracing::info!(ws_id = %ws_id, block_id = %block_id, source_tab = %source_tab_id, dest_tab = %dest_tab_id, "[dnd:svc] MoveBlockToTab via reducer");
1400 if source_tab_id == dest_tab_id {
1409 return WebReturnType::success_empty();
1410 }
1411 let events = dispatch_to_reducer(
1417 state,
1418 agentmux_common::ipc::Command::MoveBlock {
1419 block_id: block_id.clone(),
1420 src_tab_id: source_tab_id.clone(),
1421 dst_tab_id: dest_tab_id.clone(),
1422 dst_index: u32::MAX,
1423 },
1424 )
1425 .await;
1426 if let Some(err_msg) = events.iter().find_map(|e| match e {
1427 agentmux_common::ipc::Event::Error { message, .. } => Some(message.clone()),
1428 _ => None,
1429 }) {
1430 return WebReturnType::error(err_msg);
1431 }
1432 for ev in &events {
1433 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
1434 return WebReturnType::error(format!(
1435 "MoveBlockToTab: SQLite write failed: {}",
1436 e
1437 ));
1438 }
1439 }
1440 publish_events(state, &events);
1441 if auto_close {
1443 let should_close = match store.must_get::<Tab>(&source_tab_id) {
1444 Ok(t) => t.blockids.is_empty(),
1445 Err(_) => false,
1446 };
1447 if should_close {
1448 let total_tabs = match store.must_get::<Workspace>(&ws_id) {
1449 Ok(ws) => ws.tabids.len() + ws.pinnedtabids.len(),
1450 Err(_) => 0,
1451 };
1452 if total_tabs > 1 {
1453 let close_events = dispatch_to_reducer(
1454 state,
1455 agentmux_common::ipc::Command::DeleteTab {
1456 workspace_id: ws_id.clone(),
1457 tab_id: source_tab_id.clone(),
1458 force: false,
1463 },
1464 )
1465 .await;
1466 for ev in &close_events {
1467 let _ = crate::persist_subscriber::apply_event_to_wstore(ev, store);
1468 }
1469 publish_events(state, &close_events);
1470 }
1471 }
1472 }
1473 let mut updates = vec![];
1474 if let Ok(src) = store.must_get::<Tab>(&source_tab_id) {
1475 updates.push(WaveObjUpdate {
1476 updatetype: "update".into(),
1477 otype: OTYPE_TAB.to_string(),
1478 oid: source_tab_id.clone(),
1479 obj: Some(wave_obj_to_value(&src)),
1480 });
1481 }
1482 if let Ok(dst) = store.must_get::<Tab>(&dest_tab_id) {
1483 updates.push(WaveObjUpdate {
1484 updatetype: "update".into(),
1485 otype: OTYPE_TAB.to_string(),
1486 oid: dest_tab_id.clone(),
1487 obj: Some(wave_obj_to_value(&dst)),
1488 });
1489 }
1490 if let Ok(ws) = store.must_get::<Workspace>(&ws_id) {
1491 updates.push(WaveObjUpdate {
1492 updatetype: "update".into(),
1493 otype: OTYPE_WORKSPACE.to_string(),
1494 oid: ws_id.clone(),
1495 obj: Some(wave_obj_to_value(&ws)),
1496 });
1497 }
1498 WebReturnType::success_with_updates(updates)
1499 }
1500 ("workspace", "PromoteBlockToTab") => {
1505 let ws_id: String = match service::get_arg(args, 0) {
1506 Ok(v) => v,
1507 Err(e) => return WebReturnType::error(e),
1508 };
1509 let block_id: String = match service::get_arg(args, 1) {
1510 Ok(v) => v,
1511 Err(e) => return WebReturnType::error(e),
1512 };
1513 let source_tab_id: String = match service::get_arg(args, 2) {
1514 Ok(v) => v,
1515 Err(e) => return WebReturnType::error(e),
1516 };
1517 let auto_close: bool = service::get_arg(args, 3).unwrap_or(true);
1518 tracing::info!(ws_id = %ws_id, block_id = %block_id, source_tab = %source_tab_id, "[dnd:svc] PromoteBlockToTab via saga");
1519 let saga_result = crate::sagas::promote_block_to_tab::run(
1520 state,
1521 block_id.clone(),
1522 source_tab_id.clone(),
1523 ws_id.clone(),
1524 )
1525 .await;
1526 let new_tab_oid = match saga_result {
1527 Ok(v) => v
1528 .get("new_tab_id")
1529 .and_then(|v| v.as_str())
1530 .unwrap_or_default()
1531 .to_string(),
1532 Err(reason) => return WebReturnType::error(reason),
1533 };
1534
1535 if let Err(e) = setup_torn_off_block_layout(store, &new_tab_oid, &block_id) {
1539 tracing::warn!(new_tab = %new_tab_oid, "PromoteBlockToTab: layout setup failed: {}", e);
1540 }
1541 if let Err(e) = queue_source_layout_delete(store, &source_tab_id, &block_id) {
1543 tracing::warn!(source_tab = %source_tab_id, "PromoteBlockToTab: source layout delete-action enqueue failed: {}", e);
1544 }
1545 let active_events = dispatch_to_reducer(
1547 state,
1548 agentmux_common::ipc::Command::SetActiveTab {
1549 workspace_id: ws_id.clone(),
1550 tab_id: new_tab_oid.clone(),
1551 },
1552 )
1553 .await;
1554 for ev in &active_events {
1555 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
1556 tracing::warn!("PromoteBlockToTab: SetActiveTab apply failed: {}", e);
1557 }
1558 }
1559 publish_events(state, &active_events);
1560
1561 if auto_close {
1563 let should_close = match store.must_get::<Tab>(&source_tab_id) {
1564 Ok(t) => t.blockids.is_empty(),
1565 Err(_) => false,
1566 };
1567 if should_close {
1568 let total_tabs = match store.must_get::<Workspace>(&ws_id) {
1569 Ok(ws) => ws.tabids.len() + ws.pinnedtabids.len(),
1570 Err(_) => 0,
1571 };
1572 if total_tabs > 1 {
1573 let close_events = dispatch_to_reducer(
1574 state,
1575 agentmux_common::ipc::Command::DeleteTab {
1576 workspace_id: ws_id.clone(),
1577 tab_id: source_tab_id.clone(),
1578 force: false,
1583 },
1584 )
1585 .await;
1586 for ev in &close_events {
1587 let _ = crate::persist_subscriber::apply_event_to_wstore(ev, store);
1588 }
1589 publish_events(state, &close_events);
1590 }
1591 }
1592 }
1593
1594 let mut updates = vec![];
1595 if let Ok(new_tab) = store.must_get::<Tab>(&new_tab_oid) {
1596 updates.push(WaveObjUpdate {
1597 updatetype: "update".into(),
1598 otype: OTYPE_TAB.to_string(),
1599 oid: new_tab_oid.clone(),
1600 obj: Some(wave_obj_to_value(&new_tab)),
1601 });
1602 }
1603 if let Ok(src) = store.must_get::<Tab>(&source_tab_id) {
1604 updates.push(WaveObjUpdate {
1605 updatetype: "update".into(),
1606 otype: OTYPE_TAB.to_string(),
1607 oid: source_tab_id.clone(),
1608 obj: Some(wave_obj_to_value(&src)),
1609 });
1610 }
1611 if let Ok(ws) = store.must_get::<Workspace>(&ws_id) {
1612 updates.push(WaveObjUpdate {
1613 updatetype: "update".into(),
1614 otype: OTYPE_WORKSPACE.to_string(),
1615 oid: ws_id.clone(),
1616 obj: Some(wave_obj_to_value(&ws)),
1617 });
1618 }
1619 WebReturnType::success_data_updates(
1620 serde_json::to_value(&new_tab_oid).unwrap_or_default(),
1621 updates,
1622 )
1623 }
1624 ("workspace", "ReorderTab") => {
1631 let ws_id: String = match service::get_arg(args, 0) {
1632 Ok(v) => v,
1633 Err(e) => return WebReturnType::error(e),
1634 };
1635 let tab_id: String = match service::get_arg(args, 1) {
1636 Ok(v) => v,
1637 Err(e) => return WebReturnType::error(e),
1638 };
1639 let new_index: usize = match service::get_arg(args, 2) {
1640 Ok(v) => v,
1641 Err(e) => return WebReturnType::error(e),
1642 };
1643 tracing::info!(ws_id = %ws_id, tab_id = %tab_id, new_index = %new_index, "[dnd:svc] ReorderTab");
1644 let new_index_u32 = u32::try_from(new_index).unwrap_or(u32::MAX);
1650 let events = dispatch_to_reducer(
1651 state,
1652 agentmux_common::ipc::Command::ReorderTab {
1653 workspace_id: ws_id.clone(),
1654 tab_id: tab_id.clone(),
1655 new_index: new_index_u32,
1656 },
1657 )
1658 .await;
1659 if let Some(err_msg) = events.iter().find_map(|e| match e {
1661 agentmux_common::ipc::Event::Error { message, .. } => Some(message.clone()),
1662 _ => None,
1663 }) {
1664 return WebReturnType::error(err_msg);
1665 }
1666 let mut apply_err: Option<String> = None;
1667 for ev in &events {
1668 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
1669 apply_err = Some(e.to_string());
1670 break;
1671 }
1672 }
1673 if let Some(err) = apply_err {
1674 return WebReturnType::error(format!("ReorderTab: SQLite write failed: {}", err));
1675 }
1676 publish_events(state, &events);
1677 if let Ok(ws) = store.must_get::<Workspace>(&ws_id) {
1678 let update = WaveObjUpdate {
1679 updatetype: "update".into(),
1680 otype: OTYPE_WORKSPACE.to_string(),
1681 oid: ws_id.clone(),
1682 obj: Some(wave_obj_to_value(&ws)),
1683 };
1684 WebReturnType::success_with_updates(vec![update])
1685 } else {
1686 WebReturnType::success_empty()
1687 }
1688 }
1689 ("workspace", "MoveTabToWorkspace") => {
1696 let tab_id: String = match service::get_arg(args, 0) {
1697 Ok(v) => v,
1698 Err(e) => return WebReturnType::error(e),
1699 };
1700 let source_ws_id: String = match service::get_arg(args, 1) {
1701 Ok(v) => v,
1702 Err(e) => return WebReturnType::error(e),
1703 };
1704 let dest_ws_id: String = match service::get_arg(args, 2) {
1705 Ok(v) => v,
1706 Err(e) => return WebReturnType::error(e),
1707 };
1708 let insert_index: Option<u32> = service::get_arg::<usize>(args, 3)
1709 .ok()
1710 .map(|v| v.try_into().unwrap_or(u32::MAX));
1711 tracing::info!(tab_id = %tab_id, source_ws = %source_ws_id, dest_ws = %dest_ws_id, insert_index = ?insert_index, "[dnd:svc] MoveTabToWorkspace via reducer");
1712 if source_ws_id == dest_ws_id {
1718 return WebReturnType::success_empty();
1719 }
1720 match store.get::<Workspace>(&source_ws_id) {
1731 Ok(Some(src_ws)) => {
1732 let total_tabs = src_ws.tabids.len() + src_ws.pinnedtabids.len();
1733 if total_tabs <= 1 {
1734 return WebReturnType::error(
1735 "cannot move last tab out of workspace".to_string(),
1736 );
1737 }
1738 }
1739 Ok(None) => {
1740 return WebReturnType::error(format!(
1741 "MoveTabToWorkspace: source workspace not found: {}",
1742 source_ws_id
1743 ));
1744 }
1745 Err(e) => {
1746 return WebReturnType::error(format!(
1747 "MoveTabToWorkspace: workspace read failed: {}",
1748 e
1749 ));
1750 }
1751 }
1752 let dst_index = insert_index.unwrap_or(u32::MAX);
1753 let events = dispatch_to_reducer(
1754 state,
1755 agentmux_common::ipc::Command::MoveTab {
1756 tab_id: tab_id.clone(),
1757 src_workspace_id: source_ws_id.clone(),
1758 dst_workspace_id: dest_ws_id.clone(),
1759 dst_index,
1760 },
1761 )
1762 .await;
1763 if let Some(err_msg) = events.iter().find_map(|e| match e {
1764 agentmux_common::ipc::Event::Error { message, .. } => Some(message.clone()),
1765 _ => None,
1766 }) {
1767 return WebReturnType::error(err_msg);
1768 }
1769 for ev in &events {
1770 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
1771 return WebReturnType::error(format!(
1772 "MoveTabToWorkspace: SQLite write failed: {}",
1773 e
1774 ));
1775 }
1776 }
1777 publish_events(state, &events);
1778 let mut updates = Vec::new();
1779 if let Ok(src_ws) = store.must_get::<Workspace>(&source_ws_id) {
1780 updates.push(WaveObjUpdate {
1781 updatetype: "update".into(),
1782 otype: OTYPE_WORKSPACE.to_string(),
1783 oid: source_ws_id.clone(),
1784 obj: Some(wave_obj_to_value(&src_ws)),
1785 });
1786 }
1787 if let Ok(dst_ws) = store.must_get::<Workspace>(&dest_ws_id) {
1788 updates.push(WaveObjUpdate {
1789 updatetype: "update".into(),
1790 otype: OTYPE_WORKSPACE.to_string(),
1791 oid: dest_ws_id.clone(),
1792 obj: Some(wave_obj_to_value(&dst_ws)),
1793 });
1794 }
1795 WebReturnType::success_with_updates(updates)
1796 }
1797 ("workspace", "RestoreTornOffTab") => {
1803 let tab_id: String = match service::get_arg(args, 0) {
1804 Ok(v) => v,
1805 Err(e) => return WebReturnType::error(e),
1806 };
1807 let source_ws_id: String = match service::get_arg(args, 1) {
1808 Ok(v) => v,
1809 Err(e) => return WebReturnType::error(e),
1810 };
1811 let dest_ws_id: String = match service::get_arg(args, 2) {
1812 Ok(v) => v,
1813 Err(e) => return WebReturnType::error(e),
1814 };
1815 let insert_index: Option<u32> = service::get_arg::<usize>(args, 3)
1816 .ok()
1817 .map(|v| v.try_into().unwrap_or(u32::MAX));
1818 tracing::info!(tab_id = %tab_id, source_ws = %source_ws_id, dest_ws = %dest_ws_id, insert_index = ?insert_index, "[dnd:svc] RestoreTornOffTab via saga");
1819 let saga_result = crate::sagas::restore_torn_off_tab::run(
1820 state,
1821 tab_id,
1822 source_ws_id.clone(),
1823 dest_ws_id.clone(),
1824 insert_index,
1825 )
1826 .await;
1827 match saga_result {
1828 Ok(_) => {
1829 let mut updates = Vec::new();
1830 match store.get::<Workspace>(&source_ws_id) {
1831 Ok(Some(src_ws)) => {
1832 updates.push(WaveObjUpdate {
1833 updatetype: "update".into(),
1834 otype: OTYPE_WORKSPACE.to_string(),
1835 oid: source_ws_id.clone(),
1836 obj: Some(wave_obj_to_value(&src_ws)),
1837 });
1838 }
1839 Ok(None) => {
1840 updates.push(WaveObjUpdate {
1841 updatetype: "delete".into(),
1842 otype: OTYPE_WORKSPACE.to_string(),
1843 oid: source_ws_id.clone(),
1844 obj: None,
1845 });
1846 }
1847 Err(_) => {}
1848 }
1849 if let Ok(dst_ws) = store.must_get::<Workspace>(&dest_ws_id) {
1850 updates.push(WaveObjUpdate {
1851 updatetype: "update".into(),
1852 otype: OTYPE_WORKSPACE.to_string(),
1853 oid: dest_ws_id.clone(),
1854 obj: Some(wave_obj_to_value(&dst_ws)),
1855 });
1856 }
1857 WebReturnType::success_with_updates(updates)
1858 }
1859 Err(reason) => WebReturnType::error(reason),
1860 }
1861 }
1862 ("workspace", "TearOffBlock") => {
1874 let block_id: String = match service::get_arg(args, 0) {
1875 Ok(v) => v,
1876 Err(e) => return WebReturnType::error(e),
1877 };
1878 let source_tab_id: String = match service::get_arg(args, 1) {
1879 Ok(v) => v,
1880 Err(e) => return WebReturnType::error(e),
1881 };
1882 let source_ws_id: String = match service::get_arg(args, 2) {
1883 Ok(v) => v,
1884 Err(e) => return WebReturnType::error(e),
1885 };
1886 let auto_close: bool = service::get_arg(args, 3).unwrap_or(true);
1887 tracing::info!(block_id = %block_id, source_tab = %source_tab_id, source_ws = %source_ws_id, "[dnd:svc] TearOffBlock via saga");
1888 let saga_result = crate::sagas::tear_off_block::run(
1889 state,
1890 block_id.clone(),
1891 source_tab_id.clone(),
1892 source_ws_id.clone(),
1893 )
1894 .await;
1895 let (new_ws_oid, new_tab_oid) = match saga_result {
1896 Ok(value) => {
1897 let new_ws_oid = value
1898 .get("new_workspace_id")
1899 .and_then(|v| v.as_str())
1900 .unwrap_or_default()
1901 .to_string();
1902 let new_tab_oid = value
1903 .get("new_tab_id")
1904 .and_then(|v| v.as_str())
1905 .unwrap_or_default()
1906 .to_string();
1907 (new_ws_oid, new_tab_oid)
1908 }
1909 Err(reason) => return WebReturnType::error(reason),
1910 };
1911
1912 if let Err(e) = setup_torn_off_block_layout(store, &new_tab_oid, &block_id) {
1917 tracing::warn!(new_tab = %new_tab_oid, "TearOffBlock: layout setup failed: {} (block in tab but layout malformed)", e);
1918 }
1919 if let Err(e) = queue_source_layout_delete(store, &source_tab_id, &block_id) {
1922 tracing::warn!(source_tab = %source_tab_id, "TearOffBlock: source layout delete-action enqueue failed: {}", e);
1923 }
1924
1925 if auto_close {
1930 let should_close = match store.must_get::<Tab>(&source_tab_id) {
1931 Ok(t) => t.blockids.is_empty(),
1932 Err(_) => false,
1933 };
1934 if should_close {
1935 let total_tabs = match store.must_get::<Workspace>(&source_ws_id) {
1936 Ok(ws) => ws.tabids.len() + ws.pinnedtabids.len(),
1937 Err(_) => 0,
1938 };
1939 if total_tabs > 1 {
1940 tracing::info!(source_tab = %source_tab_id, "[dnd:svc] auto-closing empty source tab after TearOffBlock");
1941 let close_events = dispatch_to_reducer(
1942 state,
1943 agentmux_common::ipc::Command::DeleteTab {
1944 workspace_id: source_ws_id.clone(),
1945 tab_id: source_tab_id.clone(),
1946 force: false,
1949 },
1950 )
1951 .await;
1952 for ev in &close_events {
1953 let _ = crate::persist_subscriber::apply_event_to_wstore(ev, store);
1954 }
1955 publish_events(state, &close_events);
1956 }
1957 }
1958 }
1959
1960 let mut updates = Vec::new();
1961 if let Ok(src_tab) = store.must_get::<Tab>(&source_tab_id) {
1962 updates.push(WaveObjUpdate {
1963 updatetype: "update".into(),
1964 otype: OTYPE_TAB.to_string(),
1965 oid: source_tab_id.clone(),
1966 obj: Some(wave_obj_to_value(&src_tab)),
1967 });
1968 }
1969 if let Ok(src_ws) = store.must_get::<Workspace>(&source_ws_id) {
1970 updates.push(WaveObjUpdate {
1971 updatetype: "update".into(),
1972 otype: OTYPE_WORKSPACE.to_string(),
1973 oid: source_ws_id.clone(),
1974 obj: Some(wave_obj_to_value(&src_ws)),
1975 });
1976 }
1977 if let Ok(new_ws) = store.must_get::<Workspace>(&new_ws_oid) {
1978 updates.push(WaveObjUpdate {
1979 updatetype: "update".into(),
1980 otype: OTYPE_WORKSPACE.to_string(),
1981 oid: new_ws_oid.clone(),
1982 obj: Some(wave_obj_to_value(&new_ws)),
1983 });
1984 }
1985 WebReturnType::success_data_updates(
1986 serde_json::to_value(&new_ws_oid).unwrap_or_default(),
1987 updates,
1988 )
1989 }
1990 ("workspace", "TearOffTab") => {
1996 let tab_id: String = match service::get_arg(args, 0) {
1997 Ok(v) => v,
1998 Err(e) => return WebReturnType::error(e),
1999 };
2000 let source_ws_id: String = match service::get_arg(args, 1) {
2001 Ok(v) => v,
2002 Err(e) => return WebReturnType::error(e),
2003 };
2004 tracing::info!(tab_id = %tab_id, source_ws = %source_ws_id, "[dnd:svc] TearOffTab via saga");
2005 match crate::sagas::tear_off_tab::run(state, tab_id, source_ws_id.clone()).await {
2006 Ok(saga_result) => {
2007 let new_ws_oid = saga_result
2008 .get("new_workspace_id")
2009 .and_then(|v| v.as_str())
2010 .unwrap_or_default()
2011 .to_string();
2012 let mut updates = Vec::new();
2013 if let Ok(src_ws) = store.must_get::<Workspace>(&source_ws_id) {
2014 updates.push(WaveObjUpdate {
2015 updatetype: "update".into(),
2016 otype: OTYPE_WORKSPACE.to_string(),
2017 oid: source_ws_id.clone(),
2018 obj: Some(wave_obj_to_value(&src_ws)),
2019 });
2020 }
2021 if let Ok(new_ws) = store.must_get::<Workspace>(&new_ws_oid) {
2022 updates.push(WaveObjUpdate {
2023 updatetype: "update".into(),
2024 otype: OTYPE_WORKSPACE.to_string(),
2025 oid: new_ws_oid.clone(),
2026 obj: Some(wave_obj_to_value(&new_ws)),
2027 });
2028 }
2029 WebReturnType::success_data_updates(
2030 serde_json::to_value(&new_ws_oid).unwrap_or_default(),
2031 updates,
2032 )
2033 }
2034 Err(reason) => WebReturnType::error(reason),
2035 }
2036 }
2037 ("userinput", "SendUserInputResponse") => {
2039 WebReturnType::success_empty()
2041 }
2042
2043 ("block", "GetControllerStatus") => {
2045 let block_id: String = match service::get_arg(args, 0) {
2046 Ok(v) => v,
2047 Err(e) => return WebReturnType::error(e),
2048 };
2049 match crate::backend::blockcontroller::get_block_controller_status(&block_id) {
2050 Some(status) => WebReturnType::success(
2051 serde_json::to_value(&status).unwrap_or(serde_json::Value::Null),
2052 ),
2053 None => {
2054 let default_status = crate::backend::blockcontroller::BlockControllerRuntimeStatus {
2055 blockid: block_id,
2056 ..Default::default()
2057 };
2058 WebReturnType::success(
2059 serde_json::to_value(&default_status).unwrap_or(serde_json::Value::Null),
2060 )
2061 }
2062 }
2063 }
2064 ("block", "SendCommand") | ("block", "SaveTerminalState") => {
2065 WebReturnType::success_empty()
2066 }
2067
2068 ("subagent", "ListActive") => {
2070 let subagents = state.subagent_watcher.list_active();
2071 WebReturnType::success(serde_json::to_value(&subagents).unwrap_or_default())
2072 }
2073 ("subagent", "GetHistory") => {
2074 let agent_id: String = match service::get_arg(args, 0) {
2075 Ok(v) => v,
2076 Err(e) => return WebReturnType::error(e),
2077 };
2078 let limit: usize = service::get_arg(args, 1).unwrap_or(100);
2079 let history = state.subagent_watcher.get_history(&agent_id, limit);
2080 WebReturnType::success(serde_json::to_value(&history).unwrap_or_default())
2081 }
2082 ("history", "List") => {
2084 let provider: Option<String> = service::get_optional_arg(args, 0).unwrap_or(None);
2085 let project: Option<String> = service::get_optional_arg(args, 1).unwrap_or(None);
2086 let offset: usize = service::get_arg(args, 2).unwrap_or(0);
2087 let limit: usize = service::get_arg(args, 3).unwrap_or(50);
2088 let sort_by: String = service::get_arg(args, 4).unwrap_or_else(|_| "modified_at".to_string());
2089 let sort_dir: String = service::get_arg(args, 5).unwrap_or_else(|_| "desc".to_string());
2090 let result = state.history_service.list(
2091 provider.as_deref(),
2092 project.as_deref(),
2093 offset,
2094 limit,
2095 &sort_by,
2096 &sort_dir,
2097 );
2098 WebReturnType::success(result)
2099 }
2100 ("history", "Get") => {
2101 let session_id: String = match service::get_arg(args, 0) {
2102 Ok(v) => v,
2103 Err(e) => return WebReturnType::error(e),
2104 };
2105 let result = state.history_service.get(&session_id);
2106 WebReturnType::success(result)
2107 }
2108 ("history", "Refresh") => {
2109 let result = state.history_service.refresh();
2110 WebReturnType::success(result)
2111 }
2112
2113 ("subagent", "WatchAgent") => {
2114 let agent_id: String = match service::get_arg(args, 0) {
2115 Ok(v) => v,
2116 Err(e) => return WebReturnType::error(e),
2117 };
2118 let config_dir: String = match service::get_arg(args, 1) {
2119 Ok(v) => v,
2120 Err(e) => return WebReturnType::error(e),
2121 };
2122 state.subagent_watcher.watch_agent(&agent_id, std::path::PathBuf::from(config_dir));
2123 WebReturnType::success_empty()
2124 }
2125
2126 _ => WebReturnType::error(format!(
2127 "unknown service method: {}.{}",
2128 call.service, call.method
2129 )),
2130 }
2131}
2132
2133fn find_tab_for_layout(store: &WaveStore, layout_oid: &str) -> Option<String> {
2142 let tabs = store.get_all::<Tab>().ok()?;
2143 tabs.into_iter()
2144 .find(|t| t.layoutstate == layout_oid)
2145 .map(|t| t.oid)
2146}
2147
2148fn get_object_by_oref(store: &WaveStore, oref_str: &str) -> Result<serde_json::Value, String> {
2150 let oref = crate::backend::ORef::parse(oref_str).map_err(|e| e.to_string())?;
2151
2152 match oref.otype.as_str() {
2154 OTYPE_CLIENT | OTYPE_WINDOW | OTYPE_WORKSPACE | OTYPE_TAB | OTYPE_LAYOUT | OTYPE_BLOCK => {}
2155 _ => return Err(format!("unknown otype: {}", oref.otype)),
2156 }
2157
2158 store
2162 .get_raw(&oref.otype, &oref.oid)
2163 .map_err(|e| e.to_string())?
2164 .ok_or_else(|| format!("not found: {}", oref_str))
2165}
2166
2167fn update_object(
2172 store: &WaveStore,
2173 mut value: serde_json::Value,
2174) -> Result<(String, String, serde_json::Value), String> {
2175 let otype = value
2176 .get("otype")
2177 .and_then(|v| v.as_str())
2178 .ok_or_else(|| "UpdateObject: missing otype field".to_string())?
2179 .to_string();
2180 let oid = value
2181 .get("oid")
2182 .and_then(|v| v.as_str())
2183 .ok_or_else(|| "UpdateObject: missing oid field".to_string())?
2184 .to_string();
2185
2186 match otype.as_str() {
2188 OTYPE_CLIENT | OTYPE_WINDOW | OTYPE_WORKSPACE | OTYPE_TAB | OTYPE_LAYOUT | OTYPE_BLOCK => {}
2189 _ => return Err(format!("UpdateObject: unsupported otype: {}", otype)),
2190 }
2191
2192 let new_version = store
2196 .update_raw(&otype, &oid, &value)
2197 .map_err(|e| format!("UpdateObject: {}", e))?;
2198
2199 if let Some(obj) = value.as_object_mut() {
2201 obj.insert("version".to_string(), serde_json::json!(new_version));
2202 }
2203
2204 Ok((otype, oid, value))
2205}
2206
2207pub(crate) fn update_object_meta(
2209 store: &WaveStore,
2210 oref_str: &str,
2211 meta_update: &MetaMapType,
2212) -> Result<(), String> {
2213 let oref = crate::backend::ORef::parse(oref_str).map_err(|e| e.to_string())?;
2214 match oref.otype.as_str() {
2215 OTYPE_CLIENT => {
2216 let mut obj = store.must_get::<Client>(&oref.oid).map_err(|e| e.to_string())?;
2217 obj.meta = merge_meta(&obj.meta, meta_update, true);
2218 store.update(&mut obj).map_err(|e| e.to_string())?;
2219 }
2220 OTYPE_WINDOW => {
2221 let mut obj = store.must_get::<Window>(&oref.oid).map_err(|e| e.to_string())?;
2222 obj.meta = merge_meta(&obj.meta, meta_update, true);
2223 store.update(&mut obj).map_err(|e| e.to_string())?;
2224 }
2225 OTYPE_WORKSPACE => {
2226 let mut obj = store
2227 .must_get::<Workspace>(&oref.oid)
2228 .map_err(|e| e.to_string())?;
2229 obj.meta = merge_meta(&obj.meta, meta_update, true);
2230 store.update(&mut obj).map_err(|e| e.to_string())?;
2231 }
2232 OTYPE_TAB => {
2233 let mut obj = store.must_get::<Tab>(&oref.oid).map_err(|e| e.to_string())?;
2234 obj.meta = merge_meta(&obj.meta, meta_update, true);
2235 store.update(&mut obj).map_err(|e| e.to_string())?;
2236 }
2237 OTYPE_BLOCK => {
2238 let mut obj = store.must_get::<Block>(&oref.oid).map_err(|e| e.to_string())?;
2239 obj.meta = merge_meta(&obj.meta, meta_update, true);
2240 store.update(&mut obj).map_err(|e| e.to_string())?;
2241 }
2242 _ => return Err(format!("cannot update meta for otype: {}", oref.otype)),
2243 }
2244 Ok(())
2245}
2246
2247
2248pub(crate) async fn dispatch_to_reducer(
2255 state: &AppState,
2256 cmd: agentmux_common::ipc::Command,
2257) -> Vec<agentmux_common::ipc::Event> {
2258 let now = chrono::Utc::now().to_rfc3339();
2259 let mut s = state.srv_state.lock().await;
2260 let ctx = crate::reducer::Ctx {
2261 now_rfc3339: now,
2262 conn_id: 0,
2264 registered_pid: None,
2265 };
2266 crate::reducer::update(&mut s, cmd, &ctx)
2267}
2268
2269pub(crate) fn publish_events(state: &AppState, events: &[agentmux_common::ipc::Event]) {
2272 for event in events {
2273 let _ = state.srv_events_tx.send(event.clone());
2274 }
2275}
2276
2277async fn compensate_via_reducer(
2285 state: &AppState,
2286 cmd: agentmux_common::ipc::Command,
2287 store: &WaveStore,
2288) {
2289 let events = dispatch_to_reducer(state, cmd).await;
2290 for ev in &events {
2291 if let Err(e) = crate::persist_subscriber::apply_event_to_wstore(ev, store) {
2292 tracing::warn!(
2293 "compensation: SQLite cleanup failed for event {:?}: {}",
2294 std::mem::discriminant(ev),
2295 e
2296 );
2297 }
2298 }
2299}
2300
2301fn setup_torn_off_block_layout(
2313 store: &WaveStore,
2314 new_tab_id: &str,
2315 block_id: &str,
2316) -> Result<(), Box<dyn std::error::Error>> {
2317 let new_tab = store.must_get::<Tab>(new_tab_id)?;
2318 let mut layout = store.must_get::<LayoutState>(&new_tab.layoutstate)?;
2319 let node_id = uuid::Uuid::new_v4().to_string();
2320 layout.rootnode = Some(LayoutNode {
2322 id: node_id.clone(),
2323 flex_direction: FlexDirection::Row,
2324 size: 1.0,
2325 children: Vec::new(),
2326 data: Some(LayoutNodeData {
2327 block_id: block_id.to_string(),
2328 ..Default::default()
2329 }),
2330 ..Default::default()
2331 });
2332 layout.leaforder = Some(vec![LeafOrderEntry {
2333 nodeid: node_id,
2334 blockid: block_id.to_string(),
2335 }]);
2336 store.update(&mut layout)?;
2337 Ok(())
2338}
2339
2340fn queue_source_layout_delete(
2346 store: &WaveStore,
2347 source_tab_id: &str,
2348 block_id: &str,
2349) -> Result<(), Box<dyn std::error::Error>> {
2350 let source_tab = store.must_get::<Tab>(source_tab_id)?;
2351 let mut source_layout = store.must_get::<LayoutState>(&source_tab.layoutstate)?;
2352 let mut actions = source_layout.pendingbackendactions.take().unwrap_or_default();
2353 actions.push(LayoutActionData {
2354 actiontype: "delete".to_string(),
2355 actionid: uuid::Uuid::new_v4().to_string(),
2356 blockid: block_id.to_string(),
2357 nodesize: None,
2358 indexarr: None,
2359 focused: false,
2360 magnified: false,
2361 ephemeral: false,
2362 targetblockid: String::new(),
2363 position: String::new(),
2364 });
2365 source_layout.pendingbackendactions = Some(actions);
2366 store.update(&mut source_layout)?;
2367 Ok(())
2368}
2369
2370fn wstore_workspace_exists(
2379 store: &WaveStore,
2380 workspace_id: &str,
2381) -> Result<bool, crate::backend::storage::StoreError> {
2382 Ok(store.get::<Workspace>(workspace_id)?.is_some())
2383}
2384
2385