1use std::collections::HashMap;
22use std::sync::Arc;
23use std::time::{SystemTime, UNIX_EPOCH};
24
25use serde::{Deserialize, Serialize};
26
27use crate::backend::rpc_types::{
28 COMMAND_DELETE_DRONE, COMMAND_GET_DRONE, COMMAND_LIST_DRONES,
29 COMMAND_LIST_DRONE_RUNS, COMMAND_RUN_DRONE, COMMAND_UPSERT_DRONE,
30};
31use crate::backend::wps::WaveEvent;
32use crate::server::AppState;
33use crate::backend::rpc::engine::WshRpcEngine;
34use crate::drone::executor::{run_drone, RunEvent};
35use crate::drone::storage::DroneStore;
36use crate::drone::types::{RunStatus, DroneDefinition, DroneRun};
37
38#[derive(Debug, Deserialize)]
39struct GetDroneReq {
40 id: String,
41}
42
43#[derive(Debug, Deserialize)]
44struct DeleteDroneReq {
45 id: String,
46}
47
48#[derive(Debug, Deserialize)]
49struct RunDroneReq {
50 drone_id: String,
51}
52
53#[derive(Debug, Deserialize)]
54struct ListRunsReq {
55 drone_id: String,
56 #[serde(default = "default_limit")]
57 limit: i64,
58}
59
/// Serde default for `ListRunsReq::limit`: the number of runs returned when
/// the caller does not specify a limit.
fn default_limit() -> i64 {
    const DEFAULT_RUN_LIMIT: i64 = 50;
    DEFAULT_RUN_LIMIT
}
63
/// Upper bound applied to the `limit` of a list-drone-runs request; the
/// handler clamps the requested value into `0..=MAX_LIST_LIMIT`.
const MAX_LIST_LIMIT: i64 = 200;
70
71#[derive(Debug, Serialize)]
72struct DeleteResp {
73 deleted: bool,
74}
75
76#[derive(Debug, Serialize)]
77struct RunResp {
78 run_id: String,
79}
80
81pub fn register_drone_handlers(engine: &Arc<WshRpcEngine>, state: &AppState) {
82 let wstore = state.wstore.clone();
83 engine.register_handler(
84 COMMAND_LIST_DRONES,
85 Box::new(move |_data, _ctx| {
86 let wstore = wstore.clone();
87 Box::pin(async move {
88 let list = wstore
89 .drone_list()
90 .map_err(|e| format!("listdrones: {e}"))?;
91 Ok(Some(serde_json::to_value(&list).unwrap_or_default()))
92 })
93 }),
94 );
95
96 let wstore = state.wstore.clone();
97 engine.register_handler(
98 COMMAND_GET_DRONE,
99 Box::new(move |data, _ctx| {
100 let wstore = wstore.clone();
101 Box::pin(async move {
102 let cmd: GetDroneReq = serde_json::from_value(data)
103 .map_err(|e| format!("getdrone: {e}"))?;
104 let row = wstore
105 .drone_get(&cmd.id)
106 .map_err(|e| format!("getdrone: {e}"))?;
107 Ok(Some(serde_json::to_value(&row).unwrap_or_default()))
108 })
109 }),
110 );
111
112 let wstore = state.wstore.clone();
113 let broker = state.broker.clone();
114 engine.register_handler(
115 COMMAND_UPSERT_DRONE,
116 Box::new(move |data, _ctx| {
117 let wstore = wstore.clone();
118 let broker = broker.clone();
119 Box::pin(async move {
120 let mut cmd: DroneDefinition = serde_json::from_value(data)
121 .map_err(|e| format!("upsertdrone: {e}"))?;
122 let now = now_ms();
123 if cmd.created_at == 0 {
124 cmd.created_at = now;
125 }
126 cmd.updated_at = now;
127 if cmd.id.is_empty() {
128 cmd.id = uuid::Uuid::new_v4().to_string();
129 }
130 wstore
131 .drone_upsert(&cmd)
132 .map_err(|e| format!("upsertdrone: {e}"))?;
133 broker.publish(WaveEvent {
134 event: "drones:changed".to_string(),
135 scopes: vec![],
136 sender: String::new(),
137 persist: 0,
138 data: None,
139 });
140 Ok(Some(serde_json::to_value(&cmd).unwrap_or_default()))
141 })
142 }),
143 );
144
145 let wstore = state.wstore.clone();
146 let broker = state.broker.clone();
147 engine.register_handler(
148 COMMAND_DELETE_DRONE,
149 Box::new(move |data, _ctx| {
150 let wstore = wstore.clone();
151 let broker = broker.clone();
152 Box::pin(async move {
153 let cmd: DeleteDroneReq = serde_json::from_value(data)
154 .map_err(|e| format!("deletedrone: {e}"))?;
155 let deleted = wstore
156 .drone_delete(&cmd.id)
157 .map_err(|e| format!("deletedrone: {e}"))?;
158 if deleted {
159 broker.publish(WaveEvent {
160 event: "drones:changed".to_string(),
161 scopes: vec![],
162 sender: String::new(),
163 persist: 0,
164 data: None,
165 });
166 }
167 Ok(Some(serde_json::to_value(&DeleteResp { deleted }).unwrap_or_default()))
168 })
169 }),
170 );
171
172 let wstore = state.wstore.clone();
173 let broker = state.broker.clone();
174 engine.register_handler(
175 COMMAND_RUN_DRONE,
176 Box::new(move |data, _ctx| {
177 let wstore = wstore.clone();
178 let broker = broker.clone();
179 Box::pin(async move {
180 let cmd: RunDroneReq = serde_json::from_value(data)
181 .map_err(|e| format!("rundrone: {e}"))?;
182 let wf = wstore
183 .drone_get(&cmd.drone_id)
184 .map_err(|e| format!("rundrone: {e}"))?
185 .ok_or_else(|| {
186 format!("rundrone: drone {} not found", cmd.drone_id)
187 })?;
188 let started_at = now_ms();
189 let mut handle = run_drone(wf.id.clone(), wf.graph.clone())
190 .await
191 .map_err(|e| format!("rundrone: {e}"))?;
192 let run_id = handle.run_id.clone();
193 let drone_id = wf.id.clone();
194
195 let placeholder = DroneRun {
211 id: run_id.clone(),
212 drone_id: drone_id.clone(),
213 status: RunStatus::Running.as_str().to_string(),
214 started_at,
215 ended_at: 0,
216 block_states: HashMap::new(),
217 output: String::new(),
218 error: String::new(),
219 };
220 wstore
221 .drone_run_insert(&placeholder)
222 .map_err(|e| format!("rundrone placeholder: {e}"))?;
223
224 let wstore_for_drain = wstore.clone();
227 let broker_for_drain = broker.clone();
228 let run_id_for_drain = run_id.clone();
229 let drone_id_for_drain = drone_id.clone();
230 tokio::spawn(async move {
231 let mut last_status = RunStatus::Running;
232 let mut output = String::new();
233 let mut error = String::new();
234 while let Some(ev) = handle.events.recv().await {
235 let is_terminal = matches!(
242 ev,
243 RunEvent::RunDone { .. } | RunEvent::RunFailed { .. }
244 );
245 if is_terminal {
246 match &ev {
247 RunEvent::RunDone { output: o, .. } => {
248 last_status = RunStatus::Done;
249 output = match o {
252 serde_json::Value::String(s) => s.clone(),
253 other => {
254 serde_json::to_string(other).unwrap_or_default()
255 }
256 };
257 }
258 RunEvent::RunFailed { error: e, .. } => {
259 last_status = RunStatus::Failed;
260 error = e.clone();
261 }
262 _ => unreachable!(),
263 }
264 let states = handle.final_states.lock().await.clone();
265 let row = DroneRun {
266 id: run_id_for_drain.clone(),
267 drone_id: drone_id_for_drain.clone(),
268 status: last_status.as_str().to_string(),
269 started_at,
270 ended_at: now_ms(),
271 block_states: states,
272 output: output.clone(),
273 error: error.clone(),
274 };
275 match wstore_for_drain.drone_run_update(&row) {
276 Ok(0) => tracing::warn!(
277 run_id = %run_id_for_drain,
278 "drone_run_update: placeholder row missing (race?)"
279 ),
280 Ok(_) => {}
281 Err(e) => tracing::warn!(
282 run_id = %run_id_for_drain,
283 error = %e,
284 "drone_run_update failed"
285 ),
286 }
287 }
288 broker_for_drain.publish(WaveEvent {
289 event: format!("dronerun:{}", run_id_for_drain),
290 scopes: vec![],
291 sender: String::new(),
292 persist: 0,
293 data: Some(serde_json::to_value(&ev).unwrap_or_default()),
294 });
295 }
296 });
297
298 Ok(Some(serde_json::to_value(&RunResp { run_id }).unwrap_or_default()))
299 })
300 }),
301 );
302
303 let wstore = state.wstore.clone();
304 engine.register_handler(
305 COMMAND_LIST_DRONE_RUNS,
306 Box::new(move |data, _ctx| {
307 let wstore = wstore.clone();
308 Box::pin(async move {
309 let cmd: ListRunsReq = serde_json::from_value(data)
310 .map_err(|e| format!("listdroneruns: {e}"))?;
311 let limit = cmd.limit.clamp(0, MAX_LIST_LIMIT);
312 let list = wstore
313 .drone_runs_for(&cmd.drone_id, limit)
314 .map_err(|e| format!("listdroneruns: {e}"))?;
315 Ok(Some(serde_json::to_value(&list).unwrap_or_default()))
316 })
317 }),
318 );
319}
320
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}