1use std::collections::HashMap;
9use std::path::Path;
10
11use rusqlite::{Connection, OpenFlags};
12
13use super::schema::{NamedAgentRecord, NamedAgentRecordV1, MAX_SUPPORTED_SCHEMA};
14use super::store::{Registry, RegistryError};
15
/// Counters describing one invocation of [`migrate_from_sqlite_once`].
#[derive(Clone, Copy, Debug, Default)]
pub struct MigrateStats {
    /// Version directories that contained a `data/db/objects.db` file.
    pub versions_scanned: usize,
    /// Rows returned by the readable per-version databases, before dedup.
    pub rows_seen: usize,
    /// Records successfully upserted into the registry.
    pub records_written: usize,
    /// Ids skipped because the registry already knew them.
    pub records_skipped_existing: usize,
    /// Rows that could not be turned into a record, or whose upsert failed.
    pub records_skipped_unmappable: usize,
    /// `true` when the marker already existed or no database read failed
    /// during this run.
    pub complete: bool,
}
30
/// File name of the marker written under the registry root; when that file
/// exists, `migrate_from_sqlite_once` returns immediately without scanning.
const MARKER: &str = ".migrated_from_sqlite";
35
36pub fn migrate_from_sqlite_once(
45 shared_home: &Path,
46 registry: &Registry,
47) -> Result<MigrateStats, RegistryError> {
48 let marker_path = registry.root().join(MARKER);
49 if marker_path.exists() {
50 return Ok(MigrateStats {
53 complete: true,
54 ..MigrateStats::default()
55 });
56 }
57
58 let mut stats = MigrateStats::default();
59 let agents_root = registry.agents_root().ok_or_else(|| {
60 RegistryError::Io(std::io::Error::new(
61 std::io::ErrorKind::InvalidInput,
62 "registry root has no parent",
63 ))
64 })?;
65
66 let versions_root = shared_home.join("versions");
67 if !versions_root.is_dir() {
68 stats.complete = true;
69 write_marker(&marker_path, &stats)?;
70 return Ok(stats);
71 }
72
73 let mut latest_by_id: HashMap<String, RowSnapshot> = HashMap::new();
74 let mut any_db_failed = false;
79
80 for entry in std::fs::read_dir(&versions_root)? {
81 let v_dir = entry?.path();
82 if !v_dir.is_dir() {
83 continue;
84 }
85 let db_path = v_dir.join("data").join("db").join("objects.db");
86 if !db_path.is_file() {
87 continue;
88 }
89 stats.versions_scanned += 1;
90
91 match read_named_rows(&db_path) {
92 Ok(rows) => {
93 for row in rows {
94 stats.rows_seen += 1;
95 let key = row.id.clone();
96 match latest_by_id.get_mut(&key) {
97 Some(existing) if existing.started_at >= row.started_at => {
98 existing.display_hidden =
102 existing.display_hidden || row.display_hidden;
103 }
104 Some(existing) => {
105 let merged_hidden =
106 existing.display_hidden || row.display_hidden;
107 *existing = row;
108 existing.display_hidden = merged_hidden;
109 }
110 None => {
111 latest_by_id.insert(key, row);
112 }
113 }
114 }
115 }
116 Err(e) => {
117 tracing::warn!(
118 db = %db_path.display(),
119 error = %e,
120 "registry-migrate: per-version DB unreadable — will retry on next launch"
121 );
122 any_db_failed = true;
123 }
124 }
125 }
126
127 for (id, row) in latest_by_id {
128 if registry.exists_anywhere(&id) {
133 stats.records_skipped_existing += 1;
134 continue;
135 }
136 let display_hidden = row.display_hidden;
137 let Some(rec) = row_to_record(&row, agents_root) else {
138 stats.records_skipped_unmappable += 1;
139 continue;
140 };
141 if let Err(e) = registry.upsert(&rec) {
142 tracing::warn!(
143 instance_id = %id,
144 error = %e,
145 "registry-migrate: upsert failed"
146 );
147 stats.records_skipped_unmappable += 1;
148 continue;
149 }
150 if display_hidden {
155 if let Err(e) = registry.retire(&id) {
156 tracing::warn!(
157 instance_id = %id,
158 error = %e,
159 "registry-migrate: failed to retire migrated tombstone — record may surface as active"
160 );
161 }
162 }
163 stats.records_written += 1;
164 }
165
166 stats.complete = !any_db_failed;
172 if stats.complete {
173 write_marker(&marker_path, &stats)?;
174 } else {
175 tracing::info!(
176 "registry-migrate: deferring marker write; one or more per-version DBs were unreadable and will be retried next launch"
177 );
178 }
179 Ok(stats)
180}
181
182fn write_marker(path: &Path, stats: &MigrateStats) -> std::io::Result<()> {
183 let now = chrono::Utc::now().to_rfc3339();
184 let body = format!(
185 "migrated_at: {now}\n\
186 versions_scanned: {}\n\
187 rows_seen: {}\n\
188 records_written: {}\n\
189 records_skipped_existing: {}\n\
190 records_skipped_unmappable: {}\n",
191 stats.versions_scanned,
192 stats.rows_seen,
193 stats.records_written,
194 stats.records_skipped_existing,
195 stats.records_skipped_unmappable,
196 );
197 std::fs::write(path, body)
198}
199
/// One row read from a legacy `db_agent_instances` table, held in memory
/// while rows from several version databases are deduplicated.
struct RowSnapshot {
    /// Instance id; used as the dedup key and as the record's `instance_id`.
    id: String,
    instance_name: String,
    definition_id: String,
    /// Empty string is mapped to `None` when the record is built.
    identity_id: String,
    /// Empty string is mapped to `None` when the record is built.
    memory_id: String,
    /// Path as stored in the database; must lie under the agents root to be
    /// migrated.
    working_directory: String,
    /// Copied into the record's `last_launched_at_ms`; also decides which
    /// duplicate row wins.
    started_at: i64,
    /// Copied into the record's `created_at_ms`.
    created_at: i64,
    /// `true` when the column held a non-zero integer.
    display_hidden: bool,
}
211
212fn is_missing_column_or_table(e: &rusqlite::Error) -> bool {
224 let msg = match e {
225 rusqlite::Error::SqliteFailure(_, Some(msg)) => msg.as_str(),
226 rusqlite::Error::SqlInputError { msg, .. } => msg.as_str(),
227 _ => return false,
228 };
229 msg.starts_with("no such column") || msg.starts_with("no such table")
230}
231
232fn read_named_rows(db_path: &Path) -> Result<Vec<RowSnapshot>, rusqlite::Error> {
233 let conn = Connection::open_with_flags(
234 db_path,
235 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
236 )?;
237 let mut stmt = match conn.prepare(
245 "SELECT id, instance_name, definition_id, identity_id, memory_id,
246 working_directory, started_at, created_at, display_hidden
247 FROM db_agent_instances
248 WHERE instance_name <> ''
249 AND parent_instance_id = ''",
250 ) {
251 Ok(s) => s,
252 Err(e) if is_missing_column_or_table(&e) => return Ok(Vec::new()),
253 Err(e) => return Err(e),
254 };
255 let iter = stmt.query_map([], |row| {
256 Ok(RowSnapshot {
257 id: row.get(0)?,
258 instance_name: row.get(1)?,
259 definition_id: row.get(2)?,
260 identity_id: row.get(3)?,
261 memory_id: row.get(4)?,
262 working_directory: row.get(5)?,
263 started_at: row.get(6)?,
264 created_at: row.get(7)?,
265 display_hidden: row.get::<_, i64>(8)? != 0,
266 })
267 })?;
268 iter.collect()
269}
270
271fn row_to_record(row: &RowSnapshot, agents_root: &Path) -> Option<NamedAgentRecord> {
272 let abs = std::path::Path::new(&row.working_directory);
273 let rel = abs.strip_prefix(agents_root).ok()?;
274 let rel_str = rel.to_string_lossy().to_string();
275 if rel_str.is_empty() || rel_str == "." {
276 return None;
277 }
278 Some(NamedAgentRecord {
279 schema_version: MAX_SUPPORTED_SCHEMA,
280 data: NamedAgentRecordV1 {
281 instance_id: row.id.clone(),
282 instance_name: row.instance_name.clone(),
283 definition_id: row.definition_id.clone(),
284 identity_id: empty_to_none(&row.identity_id),
285 memory_id: empty_to_none(&row.memory_id),
286 working_dir: rel_str,
287 created_at_ms: row.created_at,
288 last_launched_at_ms: row.started_at,
289 created_by_version: "(legacy)".to_string(),
293 last_launched_by_version: "(legacy)".to_string(),
294 },
295 })
296}
297
/// Maps the empty string to `None` and any other string to an owned `Some`.
fn empty_to_none(s: &str) -> Option<String> {
    (!s.is_empty()).then(|| s.to_string())
}
305
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::params;

    /// Creates `<version_dir>/data/db/objects.db` holding the given rows
    /// `(id, instance_name, started_at, working_directory)`, none hidden.
    fn make_version_db(version_dir: &Path, rows: &[(&str, &str, i64, &str)]) {
        let rows: Vec<_> = rows.iter().map(|(a, b, c, d)| (*a, *b, *c, *d, false)).collect();
        make_version_db_with_hidden(version_dir, &rows);
    }

    /// Same as [`make_version_db`] with an explicit `display_hidden` flag as
    /// the fifth tuple element. `created_at` is set equal to `started_at`.
    fn make_version_db_with_hidden(
        version_dir: &Path,
        rows: &[(&str, &str, i64, &str, bool)],
    ) {
        let db_path = version_dir.join("data").join("db");
        std::fs::create_dir_all(&db_path).unwrap();
        let db_path = db_path.join("objects.db");
        let conn = Connection::open(&db_path).unwrap();
        conn.execute_batch(
            "CREATE TABLE db_agent_instances (
                id TEXT PRIMARY KEY,
                definition_id TEXT NOT NULL DEFAULT '',
                parent_instance_id TEXT NOT NULL DEFAULT '',
                block_id TEXT NOT NULL DEFAULT '',
                session_id TEXT NOT NULL DEFAULT '',
                status TEXT NOT NULL DEFAULT 'running',
                github_context TEXT NOT NULL DEFAULT '',
                started_at INTEGER NOT NULL DEFAULT 0,
                ended_at INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL DEFAULT 0,
                identity_id TEXT NOT NULL DEFAULT '',
                memory_id TEXT NOT NULL DEFAULT '',
                instance_name TEXT NOT NULL DEFAULT '',
                working_directory TEXT NOT NULL DEFAULT '',
                display_hidden INTEGER NOT NULL DEFAULT 0
            );",
        )
        .unwrap();
        for (id, name, started_at, working_directory, hidden) in rows {
            conn.execute(
                "INSERT INTO db_agent_instances
                    (id, definition_id, instance_name, working_directory, started_at, created_at, display_hidden)
                 VALUES (?1, 'claude-code', ?2, ?3, ?4, ?4, ?5)",
                params![id, name, working_directory, started_at, if *hidden { 1_i64 } else { 0_i64 }],
            )
            .unwrap();
        }
    }

    /// Returns a temp dir acting as the shared home plus a registry opened at
    /// `<home>/agents/registry`.
    fn fresh_home() -> (tempfile::TempDir, Registry) {
        let home = tempfile::tempdir().unwrap();
        let reg = Registry::open(home.path().join("agents").join("registry")).unwrap();
        (home, reg)
    }

    #[test]
    fn migrate_with_no_versions_dir_writes_marker_and_no_rows() {
        let (home, reg) = fresh_home();
        let stats = migrate_from_sqlite_once(home.path(), &reg).unwrap();
        assert_eq!(stats.versions_scanned, 0);
        assert_eq!(stats.records_written, 0);
        assert!(reg.root().join(MARKER).exists());
    }

    #[test]
    fn migrate_is_idempotent() {
        let (home, reg) = fresh_home();
        // First run has no versions dir, so it writes the marker.
        migrate_from_sqlite_once(home.path(), &reg).unwrap();
        let agents_root = home.path().join("agents");
        // A database appearing after the marker must be ignored.
        let v_dir = home.path().join("versions").join("0.33.821");
        let wd = agents_root.join("demo-1");
        std::fs::create_dir_all(&wd).unwrap();
        make_version_db(
            &v_dir,
            &[("inst-1", "demo", 100, &wd.to_string_lossy())],
        );
        let stats = migrate_from_sqlite_once(home.path(), &reg).unwrap();
        assert_eq!(
            stats.records_written, 0,
            "marker must short-circuit subsequent runs"
        );
        assert!(reg.list_active().unwrap().is_empty());
    }

    #[test]
    fn migrate_writes_one_record_per_unique_id() {
        let (home, reg) = fresh_home();
        let agents_root = home.path().join("agents");
        std::fs::create_dir_all(&agents_root).unwrap();
        let wd_a = agents_root.join("demo-a");
        let wd_b = agents_root.join("demo-b");
        std::fs::create_dir_all(&wd_a).unwrap();
        std::fs::create_dir_all(&wd_b).unwrap();
        let v_dir = home.path().join("versions").join("0.33.821");
        make_version_db(
            &v_dir,
            &[
                ("inst-a", "demoA", 100, &wd_a.to_string_lossy()),
                ("inst-b", "demoB", 200, &wd_b.to_string_lossy()),
            ],
        );

        let stats = migrate_from_sqlite_once(home.path(), &reg).unwrap();
        assert_eq!(stats.rows_seen, 2);
        assert_eq!(stats.records_written, 2);
        assert_eq!(reg.list_active().unwrap().len(), 2);
    }

    #[test]
    fn migrate_picks_latest_started_at_on_dedup() {
        let (home, reg) = fresh_home();
        let agents_root = home.path().join("agents");
        std::fs::create_dir_all(&agents_root).unwrap();
        let wd = agents_root.join("demo");
        std::fs::create_dir_all(&wd).unwrap();
        // Same id in two version databases with different started_at values.
        let v1 = home.path().join("versions").join("0.33.800");
        let v2 = home.path().join("versions").join("0.33.821");
        make_version_db(&v1, &[("inst-1", "demo", 100, &wd.to_string_lossy())]);
        make_version_db(&v2, &[("inst-1", "demo", 200, &wd.to_string_lossy())]);

        let stats = migrate_from_sqlite_once(home.path(), &reg).unwrap();
        assert_eq!(stats.rows_seen, 2);
        assert_eq!(stats.records_written, 1);
        let recs = reg.list_active().unwrap();
        assert_eq!(recs.len(), 1);
        assert_eq!(recs[0].data.last_launched_at_ms, 200);
    }

    #[test]
    fn migrate_skips_when_registry_already_has_record() {
        let (home, reg) = fresh_home();
        let agents_root = home.path().join("agents");
        std::fs::create_dir_all(&agents_root).unwrap();
        let wd = agents_root.join("demo");
        std::fs::create_dir_all(&wd).unwrap();
        // Registry already holds this id under a different name.
        reg.upsert(&NamedAgentRecord {
            schema_version: MAX_SUPPORTED_SCHEMA,
            data: NamedAgentRecordV1 {
                instance_id: "inst-1".to_string(),
                instance_name: "preexisting".to_string(),
                definition_id: "claude-code".to_string(),
                identity_id: None,
                memory_id: None,
                working_dir: "demo".to_string(),
                created_at_ms: 50,
                last_launched_at_ms: 500,
                created_by_version: "0.33.823".to_string(),
                last_launched_by_version: "0.33.823".to_string(),
            },
        })
        .unwrap();
        let v_dir = home.path().join("versions").join("0.33.821");
        make_version_db(&v_dir, &[("inst-1", "legacyname", 100, &wd.to_string_lossy())]);

        let stats = migrate_from_sqlite_once(home.path(), &reg).unwrap();
        assert_eq!(stats.records_skipped_existing, 1);
        assert_eq!(stats.records_written, 0);
        // The pre-existing record must be left untouched.
        let recs = reg.list_active().unwrap();
        assert_eq!(recs.len(), 1);
        assert_eq!(recs[0].data.instance_name, "preexisting");
    }

    #[test]
    fn migrate_skips_when_record_is_retired() {
        let (home, reg) = fresh_home();
        let agents_root = home.path().join("agents");
        std::fs::create_dir_all(&agents_root).unwrap();
        let wd = agents_root.join("demo");
        std::fs::create_dir_all(&wd).unwrap();

        // Insert and immediately retire the record before migrating.
        let retired_record = NamedAgentRecord {
            schema_version: MAX_SUPPORTED_SCHEMA,
            data: NamedAgentRecordV1 {
                instance_id: "inst-1".to_string(),
                instance_name: "demo".to_string(),
                definition_id: "claude-code".to_string(),
                identity_id: None,
                memory_id: None,
                working_dir: "demo".to_string(),
                created_at_ms: 50,
                last_launched_at_ms: 50,
                created_by_version: "0.33.823".to_string(),
                last_launched_by_version: "0.33.823".to_string(),
            },
        };
        reg.upsert(&retired_record).unwrap();
        reg.retire("inst-1").unwrap();
        assert!(reg.list_active().unwrap().is_empty());
        assert!(reg.exists_anywhere("inst-1"));

        let v_dir = home.path().join("versions").join("0.33.821");
        make_version_db(&v_dir, &[("inst-1", "demo", 100, &wd.to_string_lossy())]);

        let stats = migrate_from_sqlite_once(home.path(), &reg).unwrap();
        assert_eq!(stats.rows_seen, 1);
        assert_eq!(stats.records_skipped_existing, 1);
        assert_eq!(stats.records_written, 0);
        // The legacy row must not bring the retired record back.
        assert!(reg.list_active().unwrap().is_empty());
        assert!(reg.root().join("retired").join("inst-1.json").exists());
    }

    #[test]
    fn migrate_silently_skips_pre_v8_schema() {
        let (home, reg) = fresh_home();
        let v_dir = home.path().join("versions").join("0.33.643");
        let db_dir = v_dir.join("data").join("db");
        std::fs::create_dir_all(&db_dir).unwrap();
        let db_path = db_dir.join("objects.db");
        let conn = Connection::open(&db_path).unwrap();
        // Table exists but lacks the columns the migration query selects.
        conn.execute_batch(
            "CREATE TABLE db_agent_instances (
                id TEXT PRIMARY KEY,
                definition_id TEXT NOT NULL DEFAULT '',
                parent_instance_id TEXT NOT NULL DEFAULT '',
                started_at INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL DEFAULT 0
            );",
        )
        .unwrap();
        drop(conn);

        let stats = migrate_from_sqlite_once(home.path(), &reg).unwrap();
        assert_eq!(stats.versions_scanned, 1);
        assert_eq!(stats.rows_seen, 0);
        assert!(stats.complete, "pre-v8 schema must not block the marker");
        assert!(reg.root().join(MARKER).exists());
    }

    #[test]
    fn migrate_writes_legacy_hidden_row_as_tombstone() {
        let (home, reg) = fresh_home();
        let agents_root = home.path().join("agents");
        std::fs::create_dir_all(&agents_root).unwrap();
        let wd = agents_root.join("forgotten");
        std::fs::create_dir_all(&wd).unwrap();
        let v_dir = home.path().join("versions").join("0.33.821");
        make_version_db_with_hidden(
            &v_dir,
            &[("inst-1", "forgotten", 100, &wd.to_string_lossy(), true)],
        );

        let stats = migrate_from_sqlite_once(home.path(), &reg).unwrap();
        assert_eq!(stats.records_written, 1);
        assert!(reg.list_active().unwrap().is_empty(),
            "hidden legacy row must NOT appear active");
        assert!(reg.root().join("retired").join("inst-1.json").exists(),
            "hidden legacy row must be migrated as retired tombstone");
    }

    #[test]
    fn migrate_preserves_forget_intent_across_versions() {
        let (home, reg) = fresh_home();
        let agents_root = home.path().join("agents");
        std::fs::create_dir_all(&agents_root).unwrap();
        let wd = agents_root.join("toggled");
        std::fs::create_dir_all(&wd).unwrap();
        let v1 = home.path().join("versions").join("0.33.800");
        let v2 = home.path().join("versions").join("0.33.821");
        // Visible in one version database, hidden in the other.
        make_version_db_with_hidden(
            &v1,
            &[("inst-1", "toggled", 100, &wd.to_string_lossy(), false)],
        );
        make_version_db_with_hidden(
            &v2,
            &[("inst-1", "toggled", 200, &wd.to_string_lossy(), true)],
        );

        let stats = migrate_from_sqlite_once(home.path(), &reg).unwrap();
        assert_eq!(stats.records_written, 1);
        assert!(reg.list_active().unwrap().is_empty(),
            "hidden intent in any version must propagate to registry tombstone");
        assert!(reg.root().join("retired").join("inst-1.json").exists());
    }

    #[test]
    fn migrate_defers_marker_on_unreadable_db() {
        let (home, reg) = fresh_home();
        let agents_root = home.path().join("agents");
        std::fs::create_dir_all(&agents_root).unwrap();
        let wd = agents_root.join("demo");
        std::fs::create_dir_all(&wd).unwrap();
        let good_v = home.path().join("versions").join("0.33.821");
        make_version_db(&good_v, &[("inst-good", "demo", 100, &wd.to_string_lossy())]);
        // A second version whose objects.db is not a SQLite file.
        let bad_v = home.path().join("versions").join("0.33.800");
        let bad_db_dir = bad_v.join("data").join("db");
        std::fs::create_dir_all(&bad_db_dir).unwrap();
        std::fs::write(bad_db_dir.join("objects.db"), b"not actually sqlite").unwrap();

        let stats = migrate_from_sqlite_once(home.path(), &reg).unwrap();
        assert_eq!(stats.records_written, 1, "good DB still migrated");
        assert!(
            !reg.root().join(MARKER).exists(),
            "marker MUST NOT be written when any DB was unreadable"
        );

        // Replace the broken file with a valid database and run again.
        std::fs::remove_file(bad_db_dir.join("objects.db")).unwrap();
        make_version_db(&bad_v, &[("inst-other", "demo", 50, &wd.to_string_lossy())]);
        let stats2 = migrate_from_sqlite_once(home.path(), &reg).unwrap();
        assert!(stats2.complete, "complete flag set on clean retry");
        assert!(
            reg.root().join(MARKER).exists(),
            "marker written on the retry once all DBs read successfully"
        );
        // `inst-good` was written by the first run; only `inst-other` is new.
        assert_eq!(stats2.records_skipped_existing, 1);
        assert_eq!(stats2.records_written, 1);
    }

    #[test]
    fn migrate_skips_unmappable_working_dirs() {
        let (home, reg) = fresh_home();
        let v_dir = home.path().join("versions").join("0.33.821");
        // Working directory outside `<home>/agents`.
        let outside = home.path().join("not_under_agents").join("foo");
        make_version_db(&v_dir, &[("inst-x", "demo", 100, &outside.to_string_lossy())]);

        let stats = migrate_from_sqlite_once(home.path(), &reg).unwrap();
        assert_eq!(stats.rows_seen, 1);
        assert_eq!(stats.records_skipped_unmappable, 1);
        assert_eq!(stats.records_written, 0);
    }

    #[test]
    fn migrate_tolerates_missing_or_corrupt_dbs() {
        let (home, reg) = fresh_home();
        // One version dir with no database at all...
        std::fs::create_dir_all(home.path().join("versions").join("0.33.700")).unwrap();
        // ...and one whose objects.db is garbage.
        let bad_v = home.path().join("versions").join("0.33.701");
        let db_dir = bad_v.join("data").join("db");
        std::fs::create_dir_all(&db_dir).unwrap();
        std::fs::write(db_dir.join("objects.db"), b"not a sqlite file").unwrap();

        let stats = migrate_from_sqlite_once(home.path(), &reg).unwrap();
        assert_eq!(stats.versions_scanned, 1);
        assert_eq!(stats.records_written, 0);
        assert!(
            !reg.root().join(MARKER).exists(),
            "marker deferred on unreadable DB"
        );
    }
}