agentmux_srv\drone/
storage.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! WaveStore extension methods for `db_drone_definitions` +
5//! `db_drone_runs`. Lives in the drone module because the table
6//! schema is local to this feature; if Drone ever ships separately
7//! we can pull these methods out into a wstore module without changing
8//! call sites.
9
10use rusqlite::params;
11
12use crate::backend::storage::error::StoreError;
13use crate::backend::storage::wstore::WaveStore;
14
15use super::types::{DroneDefinition, DroneRun};
16
17pub trait DroneStore {
18    fn drone_list(&self) -> Result<Vec<DroneDefinition>, StoreError>;
19    fn drone_get(&self, id: &str) -> Result<Option<DroneDefinition>, StoreError>;
20    fn drone_upsert(&self, wf: &DroneDefinition) -> Result<(), StoreError>;
21    fn drone_delete(&self, id: &str) -> Result<bool, StoreError>;
22    fn drone_run_insert(&self, run: &DroneRun) -> Result<(), StoreError>;
23    /// Update an existing run row in place. Returns rows affected
24    /// (0 if no row matched `run.id`). Used to flip a placeholder
25    /// `running` row into its terminal state once the drain task
26    /// completes, so the row exists from the moment the RPC returns.
27    fn drone_run_update(&self, run: &DroneRun) -> Result<usize, StoreError>;
28    fn drone_runs_for(
29        &self,
30        drone_id: &str,
31        limit: i64,
32    ) -> Result<Vec<DroneRun>, StoreError>;
33}
34
35impl DroneStore for WaveStore {
36    fn drone_list(&self) -> Result<Vec<DroneDefinition>, StoreError> {
37        let conn = self.conn().lock().unwrap();
38        let mut stmt = conn.prepare(
39            "SELECT id, name, description, graph, viewport, created_at, updated_at
40             FROM db_drone_definitions
41             ORDER BY updated_at DESC",
42        )?;
43        let iter = stmt.query_map([], |row| {
44            let graph_s: String = row.get(3)?;
45            let viewport_s: String = row.get(4)?;
46            Ok(DroneDefinition {
47                id: row.get(0)?,
48                name: row.get(1)?,
49                description: row.get(2)?,
50                graph: serde_json::from_str(&graph_s).unwrap_or_default(),
51                viewport: serde_json::from_str(&viewport_s).unwrap_or_default(),
52                created_at: row.get(5)?,
53                updated_at: row.get(6)?,
54            })
55        })?;
56        let mut out = Vec::new();
57        for r in iter {
58            out.push(r?);
59        }
60        Ok(out)
61    }
62
63    fn drone_get(&self, id: &str) -> Result<Option<DroneDefinition>, StoreError> {
64        let conn = self.conn().lock().unwrap();
65        let mut stmt = conn.prepare(
66            "SELECT id, name, description, graph, viewport, created_at, updated_at
67             FROM db_drone_definitions WHERE id = ?1",
68        )?;
69        let result = stmt.query_row(params![id], |row| {
70            let graph_s: String = row.get(3)?;
71            let viewport_s: String = row.get(4)?;
72            Ok(DroneDefinition {
73                id: row.get(0)?,
74                name: row.get(1)?,
75                description: row.get(2)?,
76                graph: serde_json::from_str(&graph_s).unwrap_or_default(),
77                viewport: serde_json::from_str(&viewport_s).unwrap_or_default(),
78                created_at: row.get(5)?,
79                updated_at: row.get(6)?,
80            })
81        });
82        match result {
83            Ok(w) => Ok(Some(w)),
84            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
85            Err(e) => Err(e.into()),
86        }
87    }
88
89    fn drone_upsert(&self, wf: &DroneDefinition) -> Result<(), StoreError> {
90        let conn = self.conn().lock().unwrap();
91        let graph = serde_json::to_string(&wf.graph).unwrap_or_else(|_| "{}".to_string());
92        let viewport = serde_json::to_string(&wf.viewport).unwrap_or_else(|_| "{}".to_string());
93        conn.execute(
94            "INSERT INTO db_drone_definitions
95                (id, name, description, graph, viewport, created_at, updated_at)
96             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
97             ON CONFLICT(id) DO UPDATE SET
98                name = excluded.name,
99                description = excluded.description,
100                graph = excluded.graph,
101                viewport = excluded.viewport,
102                updated_at = excluded.updated_at",
103            params![
104                wf.id,
105                wf.name,
106                wf.description,
107                graph,
108                viewport,
109                wf.created_at,
110                wf.updated_at,
111            ],
112        )?;
113        Ok(())
114    }
115
116    fn drone_delete(&self, id: &str) -> Result<bool, StoreError> {
117        let conn = self.conn().lock().unwrap();
118        let rows =
119            conn.execute("DELETE FROM db_drone_definitions WHERE id = ?1", params![id])?;
120        Ok(rows > 0)
121    }
122
123    fn drone_run_insert(&self, run: &DroneRun) -> Result<(), StoreError> {
124        let conn = self.conn().lock().unwrap();
125        // Plain INSERT — the run-history table is append-only by
126        // design (one row per RunDrone invocation). Switching from
127        // INSERT OR REPLACE means a duplicate run_id fails loudly
128        // rather than silently overwriting a historical record (kimi
129        // P1 on PR #755). run_id is a fresh UUID per invocation so
130        // collisions in normal flow are vanishingly unlikely; the
131        // loud failure is the point — anything that does collide is
132        // a real bug worth surfacing.
133        //
134        // Status transitions (running → done/failed) happen via
135        // `drone_run_update`, NOT a second INSERT. See codex P1
136        // on PR #755 v0.33.842 (RPC timeout truncating the drain).
137        let block_states =
138            serde_json::to_string(&run.block_states).unwrap_or_else(|_| "{}".to_string());
139        conn.execute(
140            "INSERT INTO db_drone_runs
141                (id, drone_id, status, started_at, ended_at, block_states, output, error)
142             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
143            params![
144                run.id,
145                run.drone_id,
146                run.status,
147                run.started_at,
148                run.ended_at,
149                block_states,
150                run.output,
151                run.error,
152            ],
153        )?;
154        Ok(())
155    }
156
157    fn drone_run_update(&self, run: &DroneRun) -> Result<usize, StoreError> {
158        let conn = self.conn().lock().unwrap();
159        let block_states =
160            serde_json::to_string(&run.block_states).unwrap_or_else(|_| "{}".to_string());
161        let rows = conn.execute(
162            "UPDATE db_drone_runs SET
163                status      = ?2,
164                ended_at    = ?3,
165                block_states= ?4,
166                output      = ?5,
167                error       = ?6
168             WHERE id = ?1",
169            params![
170                run.id,
171                run.status,
172                run.ended_at,
173                block_states,
174                run.output,
175                run.error,
176            ],
177        )?;
178        Ok(rows)
179    }
180
181    fn drone_runs_for(
182        &self,
183        drone_id: &str,
184        limit: i64,
185    ) -> Result<Vec<DroneRun>, StoreError> {
186        let conn = self.conn().lock().unwrap();
187        let mut stmt = conn.prepare(
188            "SELECT id, drone_id, status, started_at, ended_at, block_states, output, error
189             FROM db_drone_runs
190             WHERE drone_id = ?1
191             ORDER BY started_at DESC
192             LIMIT ?2",
193        )?;
194        let iter = stmt.query_map(params![drone_id, limit], |row| {
195            let block_states_s: String = row.get(5)?;
196            Ok(DroneRun {
197                id: row.get(0)?,
198                drone_id: row.get(1)?,
199                status: row.get(2)?,
200                started_at: row.get(3)?,
201                ended_at: row.get(4)?,
202                block_states: serde_json::from_str(&block_states_s).unwrap_or_default(),
203                output: row.get(6)?,
204                error: row.get(7)?,
205            })
206        })?;
207        let mut out = Vec::new();
208        for r in iter {
209            out.push(r?);
210        }
211        Ok(out)
212    }
213}