agentmux_srv\server/
drone_handlers.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! WSH RPC handlers for the Drone pane (issue #753 Phase 1).
5//!
6//! Commands:
7//!   * `listdrones`       → `Vec<DroneDefinition>`
8//!   * `getdrone`         → `Option<DroneDefinition>`
9//!   * `upsertdrone`      → `DroneDefinition` (echoed back, normalized)
10//!   * `deletedrone`      → `{ deleted: bool }`
11//!   * `rundrone`         → `{ run_id: String }` (synchronous; SSE
12//!                              streaming added in Phase 1 PR-4)
13//!   * `listdroneruns`    → `Vec<DroneRun>`
14//!
15//! Run streaming: the executor emits `RunEvent`s over an mpsc channel.
16//! Phase 1 of this PR drains the channel server-side and stores the
17//! final block-state snapshot in `db_drone_runs`. A future commit
18//! will tee the channel to the renderer via the existing `wps` event
19//! broker so `RunPanel` shows live per-block status.
20
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::time::{SystemTime, UNIX_EPOCH};
24
25use serde::{Deserialize, Serialize};
26
27use crate::backend::rpc_types::{
28    COMMAND_DELETE_DRONE, COMMAND_GET_DRONE, COMMAND_LIST_DRONES,
29    COMMAND_LIST_DRONE_RUNS, COMMAND_RUN_DRONE, COMMAND_UPSERT_DRONE,
30};
31use crate::backend::wps::WaveEvent;
32use crate::server::AppState;
33use crate::backend::rpc::engine::WshRpcEngine;
34use crate::drone::executor::{run_drone, RunEvent};
35use crate::drone::storage::DroneStore;
36use crate::drone::types::{RunStatus, DroneDefinition, DroneRun};
37
38#[derive(Debug, Deserialize)]
39struct GetDroneReq {
40    id: String,
41}
42
43#[derive(Debug, Deserialize)]
44struct DeleteDroneReq {
45    id: String,
46}
47
48#[derive(Debug, Deserialize)]
49struct RunDroneReq {
50    drone_id: String,
51}
52
53#[derive(Debug, Deserialize)]
54struct ListRunsReq {
55    drone_id: String,
56    #[serde(default = "default_limit")]
57    limit: i64,
58}
59
60fn default_limit() -> i64 {
61    50
62}
63
64/// Hard cap on `ListRunsReq.limit` — guards against a malicious or
65/// buggy client passing e.g. `i64::MAX` and pulling the entire run
66/// history (DoS / memory blow-up). 200 covers any plausible UI page
67/// size with headroom; Phase 2 pagination cursor work will move this
68/// to client-driven slicing. (kimi P1 on PR #755.)
69const MAX_LIST_LIMIT: i64 = 200;
70
71#[derive(Debug, Serialize)]
72struct DeleteResp {
73    deleted: bool,
74}
75
76#[derive(Debug, Serialize)]
77struct RunResp {
78    run_id: String,
79}
80
81pub fn register_drone_handlers(engine: &Arc<WshRpcEngine>, state: &AppState) {
82    let wstore = state.wstore.clone();
83    engine.register_handler(
84        COMMAND_LIST_DRONES,
85        Box::new(move |_data, _ctx| {
86            let wstore = wstore.clone();
87            Box::pin(async move {
88                let list = wstore
89                    .drone_list()
90                    .map_err(|e| format!("listdrones: {e}"))?;
91                Ok(Some(serde_json::to_value(&list).unwrap_or_default()))
92            })
93        }),
94    );
95
96    let wstore = state.wstore.clone();
97    engine.register_handler(
98        COMMAND_GET_DRONE,
99        Box::new(move |data, _ctx| {
100            let wstore = wstore.clone();
101            Box::pin(async move {
102                let cmd: GetDroneReq = serde_json::from_value(data)
103                    .map_err(|e| format!("getdrone: {e}"))?;
104                let row = wstore
105                    .drone_get(&cmd.id)
106                    .map_err(|e| format!("getdrone: {e}"))?;
107                Ok(Some(serde_json::to_value(&row).unwrap_or_default()))
108            })
109        }),
110    );
111
112    let wstore = state.wstore.clone();
113    let broker = state.broker.clone();
114    engine.register_handler(
115        COMMAND_UPSERT_DRONE,
116        Box::new(move |data, _ctx| {
117            let wstore = wstore.clone();
118            let broker = broker.clone();
119            Box::pin(async move {
120                let mut cmd: DroneDefinition = serde_json::from_value(data)
121                    .map_err(|e| format!("upsertdrone: {e}"))?;
122                let now = now_ms();
123                if cmd.created_at == 0 {
124                    cmd.created_at = now;
125                }
126                cmd.updated_at = now;
127                if cmd.id.is_empty() {
128                    cmd.id = uuid::Uuid::new_v4().to_string();
129                }
130                wstore
131                    .drone_upsert(&cmd)
132                    .map_err(|e| format!("upsertdrone: {e}"))?;
133                broker.publish(WaveEvent {
134                    event: "drones:changed".to_string(),
135                    scopes: vec![],
136                    sender: String::new(),
137                    persist: 0,
138                    data: None,
139                });
140                Ok(Some(serde_json::to_value(&cmd).unwrap_or_default()))
141            })
142        }),
143    );
144
145    let wstore = state.wstore.clone();
146    let broker = state.broker.clone();
147    engine.register_handler(
148        COMMAND_DELETE_DRONE,
149        Box::new(move |data, _ctx| {
150            let wstore = wstore.clone();
151            let broker = broker.clone();
152            Box::pin(async move {
153                let cmd: DeleteDroneReq = serde_json::from_value(data)
154                    .map_err(|e| format!("deletedrone: {e}"))?;
155                let deleted = wstore
156                    .drone_delete(&cmd.id)
157                    .map_err(|e| format!("deletedrone: {e}"))?;
158                if deleted {
159                    broker.publish(WaveEvent {
160                        event: "drones:changed".to_string(),
161                        scopes: vec![],
162                        sender: String::new(),
163                        persist: 0,
164                        data: None,
165                    });
166                }
167                Ok(Some(serde_json::to_value(&DeleteResp { deleted }).unwrap_or_default()))
168            })
169        }),
170    );
171
172    let wstore = state.wstore.clone();
173    let broker = state.broker.clone();
174    engine.register_handler(
175        COMMAND_RUN_DRONE,
176        Box::new(move |data, _ctx| {
177            let wstore = wstore.clone();
178            let broker = broker.clone();
179            Box::pin(async move {
180                let cmd: RunDroneReq = serde_json::from_value(data)
181                    .map_err(|e| format!("rundrone: {e}"))?;
182                let wf = wstore
183                    .drone_get(&cmd.drone_id)
184                    .map_err(|e| format!("rundrone: {e}"))?
185                    .ok_or_else(|| {
186                        format!("rundrone: drone {} not found", cmd.drone_id)
187                    })?;
188                let started_at = now_ms();
189                let mut handle = run_drone(wf.id.clone(), wf.graph.clone())
190                    .await
191                    .map_err(|e| format!("rundrone: {e}"))?;
192                let run_id = handle.run_id.clone();
193                let drone_id = wf.id.clone();
194
195                // Persist a `running` placeholder row synchronously
196                // BEFORE returning. Guarantees:
197                //   1. The frontend's `refreshRuns` (called right
198                //      after this RPC resolves) always sees the row,
199                //      no spawn-race against the drain (codex+reagent
200                //      P2 from earlier rounds).
201                //   2. The drain can run as a background spawn
202                //      again — so drones longer than the RPC
203                //      timeout (5s default) don't get their drain
204                //      truncated mid-flight (codex P1 v0.33.842).
205                //   3. Server-restart safety: an orphaned `running`
206                //      row signals "drain was interrupted" — a
207                //      future startup task can mark stale rows as
208                //      `interrupted` (cleaner than the prior
209                //      fire-and-forget that left no row at all).
210                let placeholder = DroneRun {
211                    id: run_id.clone(),
212                    drone_id: drone_id.clone(),
213                    status: RunStatus::Running.as_str().to_string(),
214                    started_at,
215                    ended_at: 0,
216                    block_states: HashMap::new(),
217                    output: String::new(),
218                    error: String::new(),
219                };
220                wstore
221                    .drone_run_insert(&placeholder)
222                    .map_err(|e| format!("rundrone placeholder: {e}"))?;
223
224                // Drain on a background task; on completion, UPDATE
225                // the placeholder row in place.
226                let wstore_for_drain = wstore.clone();
227                let broker_for_drain = broker.clone();
228                let run_id_for_drain = run_id.clone();
229                let drone_id_for_drain = drone_id.clone();
230                tokio::spawn(async move {
231                    let mut last_status = RunStatus::Running;
232                    let mut output = String::new();
233                    let mut error = String::new();
234                    while let Some(ev) = handle.events.recv().await {
235                        // For terminal events, persist the final row
236                        // BEFORE publishing the event. The frontend
237                        // subscription path refreshes the runs list on
238                        // RunDone / RunFailed, and if the publish
239                        // happens first the refresh sees a stale
240                        // `running` row (codex P2 on PR #843).
241                        let is_terminal = matches!(
242                            ev,
243                            RunEvent::RunDone { .. } | RunEvent::RunFailed { .. }
244                        );
245                        if is_terminal {
246                            match &ev {
247                                RunEvent::RunDone { output: o, .. } => {
248                                    last_status = RunStatus::Done;
249                                    // Engine already unwraps Response's
250                                    // `{ "value": ... }` wrapper.
251                                    output = match o {
252                                        serde_json::Value::String(s) => s.clone(),
253                                        other => {
254                                            serde_json::to_string(other).unwrap_or_default()
255                                        }
256                                    };
257                                }
258                                RunEvent::RunFailed { error: e, .. } => {
259                                    last_status = RunStatus::Failed;
260                                    error = e.clone();
261                                }
262                                _ => unreachable!(),
263                            }
264                            let states = handle.final_states.lock().await.clone();
265                            let row = DroneRun {
266                                id: run_id_for_drain.clone(),
267                                drone_id: drone_id_for_drain.clone(),
268                                status: last_status.as_str().to_string(),
269                                started_at,
270                                ended_at: now_ms(),
271                                block_states: states,
272                                output: output.clone(),
273                                error: error.clone(),
274                            };
275                            match wstore_for_drain.drone_run_update(&row) {
276                                Ok(0) => tracing::warn!(
277                                    run_id = %run_id_for_drain,
278                                    "drone_run_update: placeholder row missing (race?)"
279                                ),
280                                Ok(_) => {}
281                                Err(e) => tracing::warn!(
282                                    run_id = %run_id_for_drain,
283                                    error = %e,
284                                    "drone_run_update failed"
285                                ),
286                            }
287                        }
288                        broker_for_drain.publish(WaveEvent {
289                            event: format!("dronerun:{}", run_id_for_drain),
290                            scopes: vec![],
291                            sender: String::new(),
292                            persist: 0,
293                            data: Some(serde_json::to_value(&ev).unwrap_or_default()),
294                        });
295                    }
296                });
297
298                Ok(Some(serde_json::to_value(&RunResp { run_id }).unwrap_or_default()))
299            })
300        }),
301    );
302
303    let wstore = state.wstore.clone();
304    engine.register_handler(
305        COMMAND_LIST_DRONE_RUNS,
306        Box::new(move |data, _ctx| {
307            let wstore = wstore.clone();
308            Box::pin(async move {
309                let cmd: ListRunsReq = serde_json::from_value(data)
310                    .map_err(|e| format!("listdroneruns: {e}"))?;
311                let limit = cmd.limit.clamp(0, MAX_LIST_LIMIT);
312                let list = wstore
313                    .drone_runs_for(&cmd.drone_id, limit)
314                    .map_err(|e| format!("listdroneruns: {e}"))?;
315                Ok(Some(serde_json::to_value(&list).unwrap_or_default()))
316            })
317        }),
318    );
319}
320
321fn now_ms() -> i64 {
322    SystemTime::now()
323        .duration_since(UNIX_EPOCH)
324        .map(|d| d.as_millis() as i64)
325        .unwrap_or(0)
326}