1use std::path::Path;
43
44use rusqlite::{params, Connection};
45use tracing::{info, warn};
46
47use super::error::StoreError;
48
49pub const CONSOLIDATE_MARKER: &str = "migration_agents_consolidate_v1.flag";
52
53#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
55pub struct ConsolidateStats {
56 pub templates_inserted: usize,
57 pub user_defs_inserted: usize,
58 pub instances_as_clone_inserted: usize,
59 pub instances_folded_into_def: usize,
60 pub instances_skipped_continuation: usize,
61 pub instances_skipped_no_definition: usize,
62 pub instances_collision_warned: usize,
63 pub already_done: bool,
64}
65
66pub fn run_consolidate_migration(
76 conn: &mut Connection,
77 data_dir: Option<&Path>,
78) -> Result<ConsolidateStats, StoreError> {
79 if let Some(dir) = data_dir {
81 let marker = dir.join(CONSOLIDATE_MARKER);
82 if marker.exists() {
83 return Ok(ConsolidateStats {
84 already_done: true,
85 ..Default::default()
86 });
87 }
88 }
89
90 let tx = conn.transaction()?;
95
96 let mut stats = ConsolidateStats::default();
97
98 {
100 let mut def_stmt = tx.prepare(
101 "SELECT id, name, icon, provider, description,
102 working_directory, shell, provider_flags, auto_start,
103 restart_on_crash, idle_timeout_minutes, created_at,
104 agent_type, environment, agent_bus_id, is_seeded,
105 accounts, parent_id, branch_label, updated_at, slug
106 FROM db_agent_definitions",
107 )?;
108 let rows = def_stmt.query_map([], |row| {
109 Ok(DefRow {
110 id: row.get(0)?,
111 name: row.get(1)?,
112 icon: row.get(2)?,
113 provider: row.get(3)?,
114 description: row.get(4)?,
115 working_directory: row.get(5)?,
116 shell: row.get(6)?,
117 provider_flags: row.get(7)?,
118 auto_start: row.get(8)?,
119 restart_on_crash: row.get(9)?,
120 idle_timeout_minutes: row.get(10)?,
121 created_at: row.get(11)?,
122 agent_type: row.get(12)?,
123 environment: row.get(13)?,
124 agent_bus_id: row.get(14)?,
125 is_seeded: row.get(15)?,
126 accounts: row.get(16)?,
127 parent_id: row.get(17)?,
128 branch_label: row.get(18)?,
129 updated_at: row.get(19)?,
130 slug: row.get(20)?,
131 })
132 })?;
133 let mut defs: Vec<DefRow> = Vec::new();
134 for r in rows {
135 defs.push(r?);
136 }
137 drop(def_stmt);
138 for def in &defs {
139 let is_template = if def.is_seeded == 1 { 1_i64 } else { 0_i64 };
140 let parent_template_id = if def.is_seeded == 1 {
141 String::new()
142 } else {
143 def.parent_id.clone()
144 };
145 tx.execute(
150 "INSERT OR REPLACE INTO db_agents (
151 id, name, icon, description,
152 is_template, parent_template_id,
153 provider, provider_flags, shell, environment,
154 agent_type, agent_bus_id, accounts,
155 auto_start, restart_on_crash, idle_timeout_minutes,
156 slug, branch_label,
157 identity_id, memory_id, working_directory, github_context,
158 instance_name,
159 created_at, updated_at, is_seeded, user_hidden
160 ) VALUES (
161 ?1, ?2, ?3, ?4,
162 ?5, ?6,
163 ?7, ?8, ?9, ?10,
164 ?11, ?12, ?13,
165 ?14, ?15, ?16,
166 ?17, ?18,
167 '', '', '', '',
168 '',
169 ?19, ?20, ?21, 0
170 )",
171 params![
172 def.id,
173 def.name,
174 def.icon,
175 def.description,
176 is_template,
177 parent_template_id,
178 def.provider,
179 def.provider_flags,
180 def.shell,
181 def.environment,
182 def.agent_type,
183 def.agent_bus_id,
184 def.accounts,
185 def.auto_start,
186 def.restart_on_crash,
187 def.idle_timeout_minutes,
188 def.slug,
189 def.branch_label,
190 def.created_at,
191 def.updated_at,
192 def.is_seeded,
193 ],
194 )?;
195 if def.is_seeded == 1 {
196 stats.templates_inserted += 1;
197 } else {
198 stats.user_defs_inserted += 1;
199 }
200 }
201 }
202
203 let inst_rows: Vec<InstanceRow> = {
208 let mut stmt = tx.prepare(
209 "SELECT i.id, i.definition_id, i.parent_instance_id,
210 i.instance_name, i.identity_id, i.memory_id,
211 i.working_directory, i.github_context,
212 i.created_at, i.display_hidden,
213 d.is_seeded, d.name, d.icon, d.description,
214 d.provider, d.provider_flags, d.shell, d.environment,
215 d.agent_type, d.agent_bus_id, d.accounts,
216 d.auto_start, d.restart_on_crash, d.idle_timeout_minutes,
217 d.slug, d.branch_label
218 FROM db_agent_instances i
219 LEFT JOIN db_agent_definitions d ON d.id = i.definition_id
220 ORDER BY i.created_at DESC",
221 )?;
222 let iter = stmt.query_map([], |row| {
223 Ok(InstanceRow {
224 id: row.get(0)?,
225 definition_id: row.get(1)?,
226 parent_instance_id: row.get(2)?,
227 instance_name: row.get(3)?,
228 identity_id: row.get(4)?,
229 memory_id: row.get(5)?,
230 working_directory: row.get(6)?,
231 github_context: row.get(7)?,
232 created_at: row.get(8)?,
233 display_hidden: row.get::<_, i64>(9)? != 0,
234 def_is_seeded: row.get::<_, Option<i64>>(10)?.unwrap_or(0),
235 def_name: row.get::<_, Option<String>>(11)?.unwrap_or_default(),
236 def_icon: row.get::<_, Option<String>>(12)?.unwrap_or_default(),
237 def_description: row.get::<_, Option<String>>(13)?.unwrap_or_default(),
238 def_provider: row.get::<_, Option<String>>(14)?.unwrap_or_default(),
239 def_provider_flags: row.get::<_, Option<String>>(15)?.unwrap_or_default(),
240 def_shell: row.get::<_, Option<String>>(16)?.unwrap_or_default(),
241 def_environment: row.get::<_, Option<String>>(17)?.unwrap_or_default(),
242 def_agent_type: row
243 .get::<_, Option<String>>(18)?
244 .unwrap_or_else(|| "standalone".to_string()),
245 def_agent_bus_id: row.get::<_, Option<String>>(19)?.unwrap_or_default(),
246 def_accounts: row.get::<_, Option<String>>(20)?.unwrap_or_default(),
247 def_auto_start: row.get::<_, Option<i64>>(21)?.unwrap_or(0),
248 def_restart_on_crash: row.get::<_, Option<i64>>(22)?.unwrap_or(0),
249 def_idle_timeout_minutes: row.get::<_, Option<i64>>(23)?.unwrap_or(0),
250 def_slug: row.get::<_, Option<String>>(24)?.unwrap_or_default(),
251 def_branch_label: row.get::<_, Option<String>>(25)?.unwrap_or_default(),
252 def_present: row.get::<_, Option<i64>>(10)?.is_some(),
253 })
254 })?;
255 let mut out = Vec::new();
256 for r in iter {
257 out.push(r?);
258 }
259 out
260 };
261
262 let mut folded: std::collections::HashSet<String> = std::collections::HashSet::new();
266
267 for inst in &inst_rows {
268 if !inst.parent_instance_id.is_empty() {
269 stats.instances_skipped_continuation += 1;
270 continue;
271 }
272 if !inst.def_present {
273 stats.instances_skipped_no_definition += 1;
277 warn!(
278 instance_id = %inst.id,
279 definition_id = %inst.definition_id,
280 "agents_consolidate: instance has no definition; skipping",
281 );
282 continue;
283 }
284 if inst.def_is_seeded == 1 {
285 let name = if inst.instance_name.is_empty() {
288 inst.def_name.clone()
289 } else {
290 inst.instance_name.clone()
291 };
292 tx.execute(
293 "INSERT OR REPLACE INTO db_agents (
294 id, name, icon, description,
295 is_template, parent_template_id,
296 provider, provider_flags, shell, environment,
297 agent_type, agent_bus_id, accounts,
298 auto_start, restart_on_crash, idle_timeout_minutes,
299 slug, branch_label,
300 identity_id, memory_id, working_directory, github_context,
301 instance_name,
302 created_at, updated_at, is_seeded, user_hidden
303 ) VALUES (
304 ?1, ?2, ?3, ?4,
305 0, ?5,
306 ?6, ?7, ?8, ?9,
307 ?10, ?11, ?12,
308 ?13, ?14, ?15,
309 ?16, ?17,
310 ?18, ?19, ?20, ?21,
311 ?22,
312 ?23, ?23, 0, ?24
313 )",
314 params![
315 inst.id,
316 name,
317 inst.def_icon,
318 inst.def_description,
319 inst.definition_id,
320 inst.def_provider,
321 inst.def_provider_flags,
322 inst.def_shell,
323 inst.def_environment,
324 inst.def_agent_type,
325 inst.def_agent_bus_id,
326 inst.def_accounts,
327 inst.def_auto_start,
328 inst.def_restart_on_crash,
329 inst.def_idle_timeout_minutes,
330 inst.def_slug,
331 inst.def_branch_label,
332 inst.identity_id,
333 inst.memory_id,
334 inst.working_directory,
335 inst.github_context,
336 inst.instance_name,
337 inst.created_at,
338 if inst.display_hidden { 1_i64 } else { 0_i64 },
339 ],
340 )?;
341 stats.instances_as_clone_inserted += 1;
342 } else {
343 if folded.contains(&inst.definition_id) {
348 stats.instances_collision_warned += 1;
349 warn!(
350 instance_id = %inst.id,
351 definition_id = %inst.definition_id,
352 "agents_consolidate: multiple instances on one user-cloned def; keeping most-recent bindings",
353 );
354 continue;
355 }
356 let name = if inst.instance_name.is_empty() {
357 inst.def_name.clone()
358 } else {
359 inst.instance_name.clone()
360 };
361 tx.execute(
362 "UPDATE db_agents SET
363 name = ?1,
364 identity_id = ?2,
365 memory_id = ?3,
366 working_directory = ?4,
367 github_context = ?5,
368 instance_name = ?6,
369 user_hidden = ?7
370 WHERE id = ?8 AND is_template = 0",
371 params![
372 name,
373 inst.identity_id,
374 inst.memory_id,
375 inst.working_directory,
376 inst.github_context,
377 inst.instance_name,
378 if inst.display_hidden { 1_i64 } else { 0_i64 },
379 inst.definition_id,
380 ],
381 )?;
382 folded.insert(inst.definition_id.clone());
383 stats.instances_folded_into_def += 1;
384 }
385 }
386
387 tx.commit()?;
388
389 if let Some(dir) = data_dir {
392 let marker = dir.join(CONSOLIDATE_MARKER);
393 if let Err(e) = std::fs::write(&marker, b"phase3a") {
394 warn!(
398 error = %e,
399 marker = %marker.display(),
400 "agents_consolidate: failed to write marker; next startup will redo backfill",
401 );
402 }
403 }
404
405 info!(
406 templates_inserted = stats.templates_inserted,
407 user_defs_inserted = stats.user_defs_inserted,
408 instances_as_clone_inserted = stats.instances_as_clone_inserted,
409 instances_folded_into_def = stats.instances_folded_into_def,
410 instances_skipped_continuation = stats.instances_skipped_continuation,
411 instances_skipped_no_definition = stats.instances_skipped_no_definition,
412 instances_collision_warned = stats.instances_collision_warned,
413 "agents_consolidate: backfill completed",
414 );
415 Ok(stats)
416}
417
418struct DefRow {
421 id: String,
422 name: String,
423 icon: String,
424 provider: String,
425 description: String,
426 working_directory: String,
427 shell: String,
428 provider_flags: String,
429 auto_start: i64,
430 restart_on_crash: i64,
431 idle_timeout_minutes: i64,
432 created_at: i64,
433 agent_type: String,
434 environment: String,
435 agent_bus_id: String,
436 is_seeded: i64,
437 accounts: String,
438 parent_id: String,
439 branch_label: String,
440 updated_at: i64,
441 slug: String,
442}
443
444struct InstanceRow {
447 id: String,
448 definition_id: String,
449 parent_instance_id: String,
450 instance_name: String,
451 identity_id: String,
452 memory_id: String,
453 working_directory: String,
454 github_context: String,
455 created_at: i64,
456 display_hidden: bool,
457 def_present: bool,
458 def_is_seeded: i64,
459 def_name: String,
460 def_icon: String,
461 def_description: String,
462 def_provider: String,
463 def_provider_flags: String,
464 def_shell: String,
465 def_environment: String,
466 def_agent_type: String,
467 def_agent_bus_id: String,
468 def_accounts: String,
469 def_auto_start: i64,
470 def_restart_on_crash: i64,
471 def_idle_timeout_minutes: i64,
472 def_slug: String,
473 def_branch_label: String,
474}
475
476#[cfg(test)]
477mod tests {
478 use super::*;
479 use crate::backend::storage::migrations::run_object_schema;
480
481 fn fresh_conn() -> Connection {
482 let conn = Connection::open_in_memory().unwrap();
483 conn.execute_batch("PRAGMA foreign_keys=ON;").unwrap();
484 run_object_schema(&conn).unwrap();
485 conn
486 }
487
488 fn insert_def(
489 conn: &Connection,
490 id: &str,
491 name: &str,
492 is_seeded: i64,
493 parent_id: &str,
494 ) {
495 conn.execute(
496 "INSERT INTO db_agent_definitions
497 (id, slug, name, icon, provider, description, working_directory, shell,
498 provider_flags, auto_start, restart_on_crash, idle_timeout_minutes,
499 created_at, agent_type, environment, agent_bus_id, is_seeded, accounts,
500 parent_id, branch_label, updated_at)
501 VALUES (?1, ?2, ?3, '✦', 'claude', 'desc', '', 'bash',
502 '', 0, 0, 0,
503 ?4, 'standalone', '', '', ?5, '',
504 ?6, '', ?4)",
505 params![id, id, name, 1000_i64, is_seeded, parent_id],
506 )
507 .unwrap();
508 }
509
510 fn insert_instance(
511 conn: &Connection,
512 id: &str,
513 definition_id: &str,
514 instance_name: &str,
515 identity_id: &str,
516 memory_id: &str,
517 working_directory: &str,
518 created_at: i64,
519 display_hidden: bool,
520 ) {
521 conn.execute(
522 "INSERT INTO db_agent_instances
523 (id, definition_id, parent_instance_id, block_id, session_id, status,
524 github_context, started_at, ended_at, created_at, identity_id, memory_id,
525 instance_name, working_directory, display_hidden)
526 VALUES (?1, ?2, '', '', '', 'running', '', ?3, 0, ?3, ?4, ?5, ?6, ?7, ?8)",
527 params![
528 id,
529 definition_id,
530 created_at,
531 identity_id,
532 memory_id,
533 instance_name,
534 working_directory,
535 if display_hidden { 1_i64 } else { 0_i64 },
536 ],
537 )
538 .unwrap();
539 }
540
541 fn count_agents(conn: &Connection, where_clause: &str) -> i64 {
542 let sql = format!("SELECT COUNT(*) FROM db_agents WHERE {where_clause}");
543 conn.query_row(&sql, [], |row| row.get(0)).unwrap()
544 }
545
546 #[test]
547 fn round_trip_template_user_clone_and_instance_lifecycle() {
548 let mut conn = fresh_conn();
549
550 insert_def(&conn, "tpl-1", "Coder", 1, "");
556 insert_def(&conn, "tpl-2", "Reviewer", 1, "");
557 insert_def(&conn, "def-u1", "Maks-the-Coder", 0, "tpl-1");
558 insert_instance(&conn, "inst-A", "tpl-1", "Maks", "id-1", "mem-1", "/wd/a", 100, false);
559 insert_instance(&conn, "inst-B", "tpl-2", "", "", "", "/wd/b", 200, false);
560 insert_instance(&conn, "inst-C", "def-u1", "Custom", "id-2", "mem-2", "/wd/c", 300, false);
561
562 let stats = run_consolidate_migration(&mut conn, None).unwrap();
563 assert_eq!(stats.templates_inserted, 2);
564 assert_eq!(stats.user_defs_inserted, 1);
565 assert_eq!(stats.instances_as_clone_inserted, 2);
566 assert_eq!(stats.instances_folded_into_def, 1);
567 assert_eq!(stats.instances_skipped_continuation, 0);
568
569 assert_eq!(count_agents(&conn, "is_template = 1"), 2);
571 for tpl in &["tpl-1", "tpl-2"] {
572 let parent: String = conn
573 .query_row(
574 "SELECT parent_template_id FROM db_agents WHERE id = ?1",
575 params![tpl],
576 |r| r.get(0),
577 )
578 .unwrap();
579 assert_eq!(parent, "");
580 }
581
582 assert_eq!(count_agents(&conn, "is_template = 0"), 3);
584
585 let (parent, name, identity): (String, String, String) = conn
587 .query_row(
588 "SELECT parent_template_id, name, identity_id FROM db_agents WHERE id = 'inst-A'",
589 [],
590 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
591 )
592 .unwrap();
593 assert_eq!(parent, "tpl-1");
594 assert_eq!(name, "Maks");
595 assert_eq!(identity, "id-1");
596
597 let name: String = conn
599 .query_row(
600 "SELECT name FROM db_agents WHERE id = 'inst-B'",
601 [],
602 |r| r.get(0),
603 )
604 .unwrap();
605 assert_eq!(name, "Reviewer");
606
607 let (name, identity, memory): (String, String, String) = conn
609 .query_row(
610 "SELECT name, identity_id, memory_id FROM db_agents WHERE id = 'def-u1'",
611 [],
612 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
613 )
614 .unwrap();
615 assert_eq!(name, "Custom"); assert_eq!(identity, "id-2");
617 assert_eq!(memory, "mem-2");
618 }
619
620 #[test]
621 fn multiple_instances_on_one_user_clone_keeps_most_recent_bindings() {
622 let mut conn = fresh_conn();
623 insert_def(&conn, "tpl-1", "Coder", 1, "");
624 insert_def(&conn, "def-u1", "User Coder", 0, "tpl-1");
625 insert_instance(&conn, "inst-old", "def-u1", "Old", "id-OLD", "mem-OLD", "/wd/old", 100, false);
627 insert_instance(&conn, "inst-new", "def-u1", "New", "id-RECENT", "mem-RECENT", "/wd/new", 999, false);
628
629 let stats = run_consolidate_migration(&mut conn, None).unwrap();
630 assert_eq!(stats.instances_collision_warned, 1);
631
632 let identity: String = conn
633 .query_row(
634 "SELECT identity_id FROM db_agents WHERE id = 'def-u1'",
635 [],
636 |r| r.get(0),
637 )
638 .unwrap();
639 assert_eq!(identity, "id-RECENT", "most-recent instance's bindings must win");
640 }
641
642 #[test]
643 fn marker_short_circuits_second_run() {
644 let mut conn = fresh_conn();
645 insert_def(&conn, "tpl-1", "Coder", 1, "");
646 let tmp = tempfile::tempdir().unwrap();
647
648 let stats1 = run_consolidate_migration(&mut conn, Some(tmp.path())).unwrap();
649 assert_eq!(stats1.templates_inserted, 1);
650 assert!(!stats1.already_done);
651 assert!(tmp.path().join(CONSOLIDATE_MARKER).exists());
652
653 insert_def(&conn, "tpl-2", "Reviewer", 1, "");
656 let stats2 = run_consolidate_migration(&mut conn, Some(tmp.path())).unwrap();
657 assert!(stats2.already_done);
658 assert_eq!(stats2.templates_inserted, 0);
659 assert_eq!(count_agents(&conn, "is_template = 1"), 1);
662 }
663
664 #[test]
665 fn skips_continuation_rows() {
666 let mut conn = fresh_conn();
667 insert_def(&conn, "tpl-1", "Coder", 1, "");
668 insert_instance(&conn, "inst-A", "tpl-1", "Original", "", "", "/wd/a", 100, false);
669 conn.execute(
671 "INSERT INTO db_agent_instances
672 (id, definition_id, parent_instance_id, block_id, session_id, status,
673 github_context, started_at, ended_at, created_at, identity_id, memory_id,
674 instance_name, working_directory, display_hidden)
675 VALUES ('inst-cont', 'tpl-1', 'inst-A', '', '', 'running', '', 200, 0, 200,
676 '', '', 'Original', '/wd/a', 0)",
677 [],
678 )
679 .unwrap();
680
681 let stats = run_consolidate_migration(&mut conn, None).unwrap();
682 assert_eq!(stats.instances_skipped_continuation, 1);
683 assert_eq!(count_agents(&conn, "1 = 1"), 2);
685 assert_eq!(count_agents(&conn, "id = 'inst-cont'"), 0);
686 }
687
688 #[test]
689 fn empty_database_is_clean_noop() {
690 let mut conn = fresh_conn();
691 let stats = run_consolidate_migration(&mut conn, None).unwrap();
692 assert_eq!(stats, ConsolidateStats::default());
693 assert_eq!(count_agents(&conn, "1 = 1"), 0);
694 }
695
696 #[test]
697 fn preserves_hidden_flag_into_user_hidden() {
698 let mut conn = fresh_conn();
699 insert_def(&conn, "tpl-1", "Coder", 1, "");
700 insert_instance(&conn, "inst-H", "tpl-1", "Hidden", "", "", "/wd/h", 100, true);
701 run_consolidate_migration(&mut conn, None).unwrap();
702 let hidden: i64 = conn
703 .query_row(
704 "SELECT user_hidden FROM db_agents WHERE id = 'inst-H'",
705 [],
706 |r| r.get(0),
707 )
708 .unwrap();
709 assert_eq!(hidden, 1);
710 }
711}