agentmux_srv\backend/
agent_session.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Agent-anchored session zones (Option E, PR 1 of 2).
5//!
6//! A session zone is bound to the **agent definition** (`definition_id`),
7//! not the identity bundle the agent currently uses. Two agents that
8//! share an identity bundle keep separate session histories.
9//!
10//! ## Zone naming
11//!
12//! - Active:   `agent:<defId>:current`
13//! - Archived: `agent:<defId>:archive:<unix_ms>`
14//!
15//! Files inside a zone keep the existing shape:
16//! `output.state.json` (full UI snapshot) and `output` (raw NDJSON
17//! stream for crash-recovery replay).
18//!
19//! ## Migration (`migrate_block_zones_v1`)
20//!
21//! On first srv startup after this PR ships, the per-block zones (the
22//! pre-Option-E layout where each `blockId` owned its own zone) are
23//! back-filled into the new per-agent layout:
24//!
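//! 1. For each `db_block` row with `meta.view = "agent"`, a valid agent
//!    definition id (`meta.agentId`, or the legacy `meta["agent:id"]`),
//!    and a non-empty `output.state.json` in its block-keyed zone, copy
//!    the contents to `agent:<defId>:archive:<ts>`, where `<ts>` is the
//!    snapshot file's createdts (falling back to its modts, then to the
//!    current time) and is bumped by 1ms on collisions.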
28//! 2. For each definition, copy the most-recently-modified per-block
29//!    snapshot to `agent:<defId>:current`.
30//! 3. Write the marker file `<data_dir>/migration_agent_zones_v1.flag`.
31//!
32//! The migration is read-only against the existing per-block zones —
33//! GC of those is a later PR.
34//!
35//! See `docs/specs/SPEC_CONTINUATION_SESSION_PERSISTENCE_2026_05_23.md`.
36
37use std::collections::HashMap;
38use std::path::Path;
39use std::sync::Arc;
40use std::time::{SystemTime, UNIX_EPOCH};
41
42use crate::backend::obj::Block;
43use crate::backend::storage::filestore::{FileMeta, FileOpts, FileStore};
44use crate::backend::storage::wstore::WaveStore;
45
46// ---------------------------------------------------------------------------
47// File names within an agent session zone (mirrors per-block zone shape)
48// ---------------------------------------------------------------------------
49
/// File name, inside a session zone, of the full UI snapshot (JSON) that
/// the frontend reads when a pane mounts.
pub const SNAPSHOT_FILE: &str = "output.state.json";
/// File name, inside a session zone, of the raw NDJSON stream that is
/// replayed for crash recovery.
pub const OUTPUT_FILE: &str = "output";

/// Name of the marker file (placed under the data dir) recording that the
/// one-shot per-block → per-agent zone migration has already run.
pub const MIGRATION_MARKER_V1: &str = "migration_agent_zones_v1.flag";
57
58// ---------------------------------------------------------------------------
59// Zone helpers
60// ---------------------------------------------------------------------------
61
/// Reports whether `s` is a safe definition id: one or more ASCII letters,
/// digits, `-` or `_` (the class `[A-Za-z0-9_-]+`). The empty string is
/// rejected.
///
/// The id is spliced into FileStore zone names (`agent:<id>:…`) and may
/// arrive from the frontend over RPC. Anything outside this set — notably
/// `:`, the zone-name separator — could let a caller address arbitrary
/// zones, so it is refused. UUID-shaped ids always pass.
pub fn is_valid_definition_id(s: &str) -> bool {
    // Non-ASCII chars encode to bytes >= 0x80, which fail every check here,
    // so a byte-wise scan is equivalent to the char-wise one.
    let allowed = |b: u8| b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_');
    !s.is_empty() && s.bytes().all(allowed)
}
74
75/// `agent:<definition_id>:current`. Panics in debug if `definition_id`
76/// is invalid; callers should `validate_definition_id` first in release.
77pub fn agent_current_zone(definition_id: &str) -> String {
78    debug_assert!(
79        is_valid_definition_id(definition_id),
80        "agent_current_zone: invalid definition_id"
81    );
82    format!("agent:{}:current", definition_id)
83}
84
85/// `agent:<definition_id>:archive:<ts_ms>`.
86pub fn agent_archive_zone(definition_id: &str, ts_ms: u64) -> String {
87    debug_assert!(
88        is_valid_definition_id(definition_id),
89        "agent_archive_zone: invalid definition_id"
90    );
91    format!("agent:{}:archive:{}", definition_id, ts_ms)
92}
93
94/// Convenience: validate + build the current-zone string. Returns
95/// `Err` with a stable error prefix on bad input so RPC callers see a
96/// consistent message.
97pub fn validate_and_current(definition_id: &str) -> Result<String, String> {
98    if !is_valid_definition_id(definition_id) {
99        return Err(format!(
100            "INVALID_DEFINITION_ID: must match [A-Za-z0-9_-]+, got {:?}",
101            definition_id
102        ));
103    }
104    Ok(agent_current_zone(definition_id))
105}
106
/// Current wall-clock time in milliseconds since the Unix epoch.
/// A system clock set before 1970 yields 0 rather than an error.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
113
114// ---------------------------------------------------------------------------
115// FileStore operations on agent session zones
116// ---------------------------------------------------------------------------
117
118/// Write `content` to `output.state.json` in `agent:<defId>:current`.
119/// Idempotent — creates the file if missing, overwrites otherwise.
120pub fn write_session_state(
121    filestore: &FileStore,
122    definition_id: &str,
123    content: &[u8],
124) -> Result<(), String> {
125    let zone = validate_and_current(definition_id)?;
126    write_zone_file(filestore, &zone, SNAPSHOT_FILE, content)
127}
128
129/// Append `line` (with a trailing newline added if not present) to
130/// `output` in `agent:<defId>:current`. Creates the file if missing.
131pub fn append_session_output(
132    filestore: &FileStore,
133    definition_id: &str,
134    line: &str,
135) -> Result<u64, String> {
136    let zone = validate_and_current(definition_id)?;
137    // Normalize to NDJSON: each line ends with exactly one '\n'.
138    let mut buf = line.as_bytes().to_vec();
139    if !buf.ends_with(b"\n") {
140        buf.push(b'\n');
141    }
142    ensure_file(filestore, &zone, OUTPUT_FILE)?;
143    filestore
144        .append_data(&zone, OUTPUT_FILE, &buf)
145        .map_err(|e| format!("append_data: {e}"))?;
146    Ok(buf.len() as u64)
147}
148
149/// Read `output.state.json` from `agent:<defId>:current`. Returns
150/// `Ok((None, None))` when the zone doesn't exist — that's the
151/// "fresh agent, nothing to restore" path and is NOT an error.
152pub fn read_session_state(
153    filestore: &FileStore,
154    definition_id: &str,
155) -> Result<(Option<String>, Option<i64>), String> {
156    let zone = validate_and_current(definition_id)?;
157    let stat = filestore
158        .stat(&zone, SNAPSHOT_FILE)
159        .map_err(|e| format!("stat: {e}"))?;
160    let Some(file) = stat else {
161        return Ok((None, None));
162    };
163    let bytes = filestore
164        .read_file(&zone, SNAPSHOT_FILE)
165        .map_err(|e| format!("read_file: {e}"))?
166        .unwrap_or_default();
167    let content = String::from_utf8_lossy(&bytes).into_owned();
168    Ok((Some(content), Some(file.modts)))
169}
170
171/// Archive `agent:<defId>:current` to `agent:<defId>:archive:<now_ms>`.
172///
173/// Atomicity contract:
174/// - We write the archive zone first, then clear the current zone.
175/// - A crash between those two steps leaves both zones populated;
176///   replay-time behaviour is "current wins" so this is safe.
177/// - We never clear `:current` before the archive write has been
178///   acked by FileStore, so the archive-missing case can't happen
179///   without a FileStore I/O failure on the write itself (which is
180///   surfaced as `Err` and aborts the clear step).
181///
182/// Returns:
183/// - `Ok(Some((archive_zoneid, archived_at_ms)))` on successful
184///   archive (current zone had content).
185/// - `Ok(None)` when there was nothing to archive (no
186///   `output.state.json` in :current, OR it was zero-byte). The
187///   caller should treat this as "session was empty, nothing to do"
188///   — we explicitly do NOT create an empty archive zone.
189pub fn archive_session(
190    filestore: &FileStore,
191    definition_id: &str,
192) -> Result<Option<(String, i64)>, String> {
193    let current_zone = validate_and_current(definition_id)?;
194
195    // Determine whether there's anything worth archiving.
196    let state_stat = filestore
197        .stat(&current_zone, SNAPSHOT_FILE)
198        .map_err(|e| format!("stat current: {e}"))?;
199    let has_state = match &state_stat {
200        Some(f) => f.size > 0,
201        None => false,
202    };
203    let output_stat = filestore
204        .stat(&current_zone, OUTPUT_FILE)
205        .map_err(|e| format!("stat current output: {e}"))?;
206    let has_output = match &output_stat {
207        Some(f) => f.size > 0,
208        None => false,
209    };
210    if !has_state && !has_output {
211        return Ok(None);
212    }
213
214    let ts = now_ms();
215    let archive_zone = agent_archive_zone(definition_id, ts);
216
217    // Copy snapshot first (the canonical "history" file).
218    if has_state {
219        let snapshot_bytes = filestore
220            .read_file(&current_zone, SNAPSHOT_FILE)
221            .map_err(|e| format!("read current snapshot: {e}"))?
222            .unwrap_or_default();
223        write_zone_file(filestore, &archive_zone, SNAPSHOT_FILE, &snapshot_bytes)?;
224    }
225    // Copy NDJSON output if present.
226    if has_output {
227        let output_bytes = filestore
228            .read_file(&current_zone, OUTPUT_FILE)
229            .map_err(|e| format!("read current output: {e}"))?
230            .unwrap_or_default();
231        if !output_bytes.is_empty() {
232            write_zone_file(filestore, &archive_zone, OUTPUT_FILE, &output_bytes)?;
233        }
234    }
235
236    // Archive write succeeded. Now safe to clear the current zone.
237    if state_stat.is_some() {
238        if let Err(e) = filestore.delete_file(&current_zone, SNAPSHOT_FILE) {
239            tracing::warn!(
240                definition_id = %definition_id,
241                error = %e,
242                "agent_session: failed to clear current snapshot after archive (archive already persisted)"
243            );
244        }
245    }
246    if output_stat.is_some() {
247        if let Err(e) = filestore.delete_file(&current_zone, OUTPUT_FILE) {
248            tracing::warn!(
249                definition_id = %definition_id,
250                error = %e,
251                "agent_session: failed to clear current output after archive (archive already persisted)"
252            );
253        }
254    }
255
256    tracing::info!(
257        definition_id = %definition_id,
258        archive_zoneid = %archive_zone,
259        archived_at_ms = ts,
260        "agent_session: archived current session"
261    );
262
263    Ok(Some((archive_zone, ts as i64)))
264}
265
266/// List archive zones for `definition_id`, newest first.
267///
268/// Returns up to `limit` rows. `limit = 0` means "default 20"; caller
269/// must clamp upper bounds. Each row carries a small preview lifted
270/// from the archive's `output.state.json`.
271pub fn list_archives(
272    filestore: &FileStore,
273    definition_id: &str,
274    limit: usize,
275) -> Result<Vec<ArchiveSummary>, String> {
276    if !is_valid_definition_id(definition_id) {
277        return Err(format!(
278            "INVALID_DEFINITION_ID: must match [A-Za-z0-9_-]+, got {:?}",
279            definition_id
280        ));
281    }
282    let prefix = format!("agent:{}:archive:", definition_id);
283    let limit = if limit == 0 { 20 } else { limit.min(100) };
284
285    let all_zones = filestore
286        .get_all_zone_ids()
287        .map_err(|e| format!("get_all_zone_ids: {e}"))?;
288
289    let mut matches: Vec<(u64, String)> = Vec::new();
290    for zone in all_zones {
291        if let Some(suffix) = zone.strip_prefix(&prefix) {
292            if let Ok(ts) = suffix.parse::<u64>() {
293                matches.push((ts, zone));
294            }
295        }
296    }
297    // Newest first.
298    matches.sort_by(|a, b| b.0.cmp(&a.0));
299    matches.truncate(limit);
300
301    let mut rows = Vec::with_capacity(matches.len());
302    for (ts, zone) in matches {
303        let (preview, node_count) = read_archive_preview(filestore, &zone);
304        rows.push(ArchiveSummary {
305            archive_zoneid: zone,
306            archived_at_ms: ts as i64,
307            preview,
308            node_count,
309        });
310    }
311    Ok(rows)
312}
313
/// One row returned by `list_archives`. Its preview fields mirror the shape
/// of `RecentSessionRow`'s, so the frontend can reuse the same row
/// component.
#[derive(Clone, Debug)]
pub struct ArchiveSummary {
    /// Full zone id, `agent:<defId>:archive:<ts>`.
    pub archive_zoneid: String,
    /// The millisecond timestamp parsed from the zone id's `<ts>` suffix.
    pub archived_at_ms: i64,
    /// Whitespace-collapsed first user message; empty when unavailable.
    pub preview: String,
    /// Length of the snapshot's `nodes` array; 0 when unavailable.
    pub node_count: usize,
}
323
324// ---------------------------------------------------------------------------
325// Helpers
326// ---------------------------------------------------------------------------
327
328/// Ensure a file exists in `zone`. No-op when present.
329fn ensure_file(filestore: &FileStore, zone: &str, name: &str) -> Result<(), String> {
330    match filestore.stat(zone, name) {
331        Ok(Some(_)) => Ok(()),
332        Ok(None) => filestore
333            .make_file(zone, name, FileMeta::default(), FileOpts::default())
334            .map_err(|e| format!("make_file: {e}")),
335        Err(e) => Err(format!("stat: {e}")),
336    }
337}
338
339/// Write the entire contents of a file in `zone`. Creates the file if
340/// missing, otherwise replaces all parts atomically (FileStore single-tx).
341fn write_zone_file(
342    filestore: &FileStore,
343    zone: &str,
344    name: &str,
345    content: &[u8],
346) -> Result<(), String> {
347    use crate::backend::storage::StoreError;
348    match filestore.write_file(zone, name, content) {
349        Ok(()) => Ok(()),
350        Err(StoreError::NotFound) => {
351            filestore
352                .make_file(zone, name, FileMeta::default(), FileOpts::default())
353                .map_err(|e| format!("make_file: {e}"))?;
354            filestore
355                .write_file(zone, name, content)
356                .map_err(|e| format!("write_file: {e}"))
357        }
358        Err(e) => Err(format!("write_file: {e}")),
359    }
360}
361
362/// Pull a small preview + node_count out of an archive's
363/// `output.state.json`. Returns `("", 0)` on any error.
364///
365/// Mirrors the heuristics used by `read_session_preview` in
366/// `agent_handlers.rs` (skip the bootstrap "# Session Context" message
367/// when a later user_message exists; cap at 240 chars).
368fn read_archive_preview(filestore: &FileStore, zone: &str) -> (String, usize) {
369    let bytes = match filestore.read_file(zone, SNAPSHOT_FILE) {
370        Ok(Some(b)) => b,
371        _ => return (String::new(), 0),
372    };
373    if bytes.len() > 4 * 1024 * 1024 {
374        return (String::new(), 0);
375    }
376    let json: serde_json::Value = match serde_json::from_slice(&bytes) {
377        Ok(v) => v,
378        Err(_) => return (String::new(), 0),
379    };
380    let nodes = match json.get("nodes").and_then(|v| v.as_array()) {
381        Some(a) => a,
382        None => return (String::new(), 0),
383    };
384    let node_count = nodes.len();
385    let mut preview = String::new();
386    for node in nodes {
387        let ty = node.get("type").and_then(|v| v.as_str()).unwrap_or("");
388        if ty != "user_message" {
389            continue;
390        }
391        let msg = node
392            .get("message")
393            .and_then(|v| v.as_str())
394            .unwrap_or("")
395            .trim();
396        if msg.is_empty() {
397            continue;
398        }
399        if preview.is_empty() && msg.starts_with("# Session Context") {
400            preview = collapse_preview(msg);
401            continue;
402        }
403        preview = collapse_preview(msg);
404        break;
405    }
406    (preview, node_count)
407}
408
/// Collapses each run of whitespace in `s` to a single space and caps the
/// result at 240 chars.
///
/// Leading whitespace is dropped, and a trailing run becomes one space.
/// Once 240 chars have been emitted, `…` (U+2026) is appended if any input,
/// even just whitespace, remains.
///
/// The emitted-char count is tracked incrementally, so the cost is linear
/// in `s` rather than re-counting the buffer for every input char.
fn collapse_preview(s: &str) -> String {
    const MAX_CHARS: usize = 240;
    let mut buf = String::with_capacity(s.len().min(MAX_CHARS + 4));
    let mut emitted = 0usize;
    let mut prev_space = false;
    for ch in s.chars() {
        if emitted >= MAX_CHARS {
            buf.push('\u{2026}');
            return buf;
        }
        if ch.is_whitespace() {
            // `emitted > 0` drops leading whitespace.
            if !prev_space && emitted > 0 {
                buf.push(' ');
                emitted += 1;
                prev_space = true;
            }
        } else {
            buf.push(ch);
            emitted += 1;
            prev_space = false;
        }
    }
    buf
}
430
431// ---------------------------------------------------------------------------
432// One-time migration: per-block zones → per-agent zones
433// ---------------------------------------------------------------------------
434
/// Counters produced by `migrate_block_zones_v1` and logged at INFO when it
/// completes. All counters are zero when the migration is skipped.
#[derive(Clone, Debug, Default)]
pub struct MigrationStats {
    /// Agent blocks (`view = "agent"` with a valid definition id) examined.
    pub blocks_scanned: usize,
    /// Archive zones written from per-block snapshots.
    pub archives_written: usize,
    /// `agent:<defId>:current` zones populated from the newest snapshot.
    pub current_zones_seeded: usize,
    /// Blocks whose snapshot was missing or zero bytes.
    pub skipped_no_snapshot: usize,
    /// Operations that failed; each is logged and skipped, never fatal.
    pub failures: usize,
}
444
445/// One-shot migration of per-block agent session zones to per-agent
446/// zones. Gated by a marker file under `data_dir`; running twice is a
447/// no-op.
448///
449/// Failure mode: per-block errors are logged + counted; we do NOT
450/// abort startup. The marker file is written even on partial failure
451/// so we don't retry indefinitely — operators can delete the marker
452/// to force a re-run.
453pub fn migrate_block_zones_v1(
454    wstore: &Arc<WaveStore>,
455    filestore: &Arc<FileStore>,
456    data_dir: &Path,
457) -> MigrationStats {
458    let marker_path = data_dir.join(MIGRATION_MARKER_V1);
459    if marker_path.exists() {
460        tracing::debug!(
461            marker = %marker_path.display(),
462            "agent_session migration: marker present, skipping"
463        );
464        return MigrationStats::default();
465    }
466
467    let mut stats = MigrationStats::default();
468
469    let blocks: Vec<Block> = match wstore.get_all::<Block>() {
470        Ok(v) => v,
471        Err(e) => {
472            tracing::warn!(
473                error = %e,
474                "agent_session migration: wstore.get_all<Block> failed; skipping migration"
475            );
476            // Don't write the marker — let the next start retry.
477            return stats;
478        }
479    };
480
481    // Track the most-recently-modified block snapshot per definition_id.
482    // Value: (modts_ms, snapshot_bytes).
483    let mut per_def_latest: HashMap<String, (i64, Vec<u8>)> = HashMap::new();
484
485    for block in &blocks {
486        let view = block.meta.get("view").and_then(|v| v.as_str()).unwrap_or("");
487        if view != "agent" {
488            continue;
489        }
490        // The agent definition id is stored under either `agentId`
491        // (current shape, set by `agent.open` + frontend launch flow)
492        // or the legacy `agent:id`. Skip blocks without an id.
493        let def_id = block
494            .meta
495            .get("agentId")
496            .and_then(|v| v.as_str())
497            .or_else(|| block.meta.get("agent:id").and_then(|v| v.as_str()))
498            .unwrap_or("");
499        if !is_valid_definition_id(def_id) {
500            continue;
501        }
502        stats.blocks_scanned += 1;
503
504        // Read the per-block snapshot. Both missing and zero-byte are
505        // "skip" — no point archiving an empty snapshot.
506        let snapshot_stat = match filestore.stat(&block.oid, SNAPSHOT_FILE) {
507            Ok(Some(f)) => f,
508            Ok(None) => {
509                stats.skipped_no_snapshot += 1;
510                continue;
511            }
512            Err(e) => {
513                tracing::warn!(
514                    block_id = %block.oid,
515                    error = %e,
516                    "agent_session migration: stat failed; skipping"
517                );
518                stats.failures += 1;
519                continue;
520            }
521        };
522        if snapshot_stat.size == 0 {
523            stats.skipped_no_snapshot += 1;
524            continue;
525        }
526
527        let snapshot_bytes = match filestore.read_file(&block.oid, SNAPSHOT_FILE) {
528            Ok(Some(b)) => b,
529            _ => {
530                stats.failures += 1;
531                continue;
532            }
533        };
534
535        // 1) Backfill an archive zone keyed on the block snapshot's
536        //    createdts (closest available proxy for "when this
537        //    conversation started"). Falls back to modts when
538        //    createdts is missing/zero.
539        let mut archive_ts: u64 = if snapshot_stat.createdts > 0 {
540            snapshot_stat.createdts as u64
541        } else if snapshot_stat.modts > 0 {
542            snapshot_stat.modts as u64
543        } else {
544            now_ms()
545        };
546        // Avoid collisions when multiple block zones share the same
547        // createdts (test fixtures, second-precision rounding, etc.):
548        // bump the timestamp by 1ms until the archive zone is unique.
549        loop {
550            let candidate = agent_archive_zone(def_id, archive_ts);
551            let occupied = matches!(
552                filestore.stat(&candidate, SNAPSHOT_FILE),
553                Ok(Some(_))
554            );
555            if !occupied {
556                break;
557            }
558            archive_ts += 1;
559        }
560        let archive_zone = agent_archive_zone(def_id, archive_ts);
561        if let Err(e) = write_zone_file(filestore, &archive_zone, SNAPSHOT_FILE, &snapshot_bytes) {
562            tracing::warn!(
563                block_id = %block.oid,
564                definition_id = %def_id,
565                error = %e,
566                "agent_session migration: archive write failed"
567            );
568            stats.failures += 1;
569            continue;
570        }
571        stats.archives_written += 1;
572
573        // 2) Track the most-recently-modified per definition so we
574        //    can seed the `:current` zone after the scan.
575        let entry = per_def_latest
576            .entry(def_id.to_string())
577            .or_insert_with(|| (0, Vec::new()));
578        if snapshot_stat.modts > entry.0 {
579            *entry = (snapshot_stat.modts, snapshot_bytes);
580        }
581    }
582
583    // 3) Seed `:current` for each definition from its
584    //    most-recently-modified per-block snapshot. If a `:current`
585    //    zone is already populated (e.g. a partial prior migration
586    //    left it behind), skip — we don't want to overwrite live data.
587    for (def_id, (_modts, bytes)) in per_def_latest {
588        let current_zone = agent_current_zone(&def_id);
589        let already = matches!(
590            filestore.stat(&current_zone, SNAPSHOT_FILE),
591            Ok(Some(f)) if f.size > 0
592        );
593        if already {
594            continue;
595        }
596        match write_zone_file(filestore, &current_zone, SNAPSHOT_FILE, &bytes) {
597            Ok(()) => {
598                stats.current_zones_seeded += 1;
599            }
600            Err(e) => {
601                tracing::warn!(
602                    definition_id = %def_id,
603                    error = %e,
604                    "agent_session migration: current-zone seed failed"
605                );
606                stats.failures += 1;
607            }
608        }
609    }
610
611    // Write marker — even on partial failure (see doc comment).
612    if let Err(e) = std::fs::write(&marker_path, b"v1\n") {
613        tracing::warn!(
614            marker = %marker_path.display(),
615            error = %e,
616            "agent_session migration: marker write failed; migration may re-run on next startup"
617        );
618    }
619
620    tracing::info!(
621        blocks_scanned = stats.blocks_scanned,
622        archives_written = stats.archives_written,
623        current_zones_seeded = stats.current_zones_seeded,
624        skipped_no_snapshot = stats.skipped_no_snapshot,
625        failures = stats.failures,
626        "agent_session migration: complete"
627    );
628
629    stats
630}
631
632// ---------------------------------------------------------------------------
633// Two-tier picker — Phase 1 migration (seeded-def → user-agent promote)
634// ---------------------------------------------------------------------------
635
/// Legacy marker file name for the Phase 1 two-tier-picker migration
/// (`migrate_promote_template_sessions_v1`).
///
/// **Vestigial.** It once gated that migration as a one-shot. Since the
/// 2026-05-24 rework, the migration gates itself on a data invariant ("no
/// seeded definition owns a session zone") and runs on every startup,
/// doing nothing when the invariant already holds. It ignores this marker
/// entirely, so operators may delete any leftover file from an earlier
/// run. The constant, like the migration's `data_dir` parameter, is kept
/// only for API/import compatibility.
pub const TEMPLATE_PROMOTE_MARKER_V1: &str = "migration_template_promote_v1.flag";
648
/// Counters produced by `migrate_promote_template_sessions_v1` and logged
/// at INFO.
#[derive(Clone, Debug, Default)]
pub struct TemplatePromoteStats {
    /// Seeded template definitions found owning at least one session zone.
    pub templates_scanned: usize,
    /// Seeded templates promoted into a user-owned definition.
    pub templates_promoted: usize,
    /// Archive zones moved, summed over every promotion.
    pub archives_moved: usize,
    /// Instances repointed through `wstore.instance_repoint_definition`,
    /// summed over every promotion.
    pub instances_repointed: usize,
    /// Per-template failures; logged and counted, never fatal.
    pub failures: usize,
}
661
662/// Phase 1 two-tier picker migration: promote any seeded template that
663/// carries a session zone into a fresh user-owned definition, then move
664/// its `:current` + `:archive:*` zones onto the new definition_id.
665///
666/// Why this exists (Q1 = Option C in
667/// `docs/specs/SPEC_AGENT_PICKER_TWO_TIER_2026_05_24.md`):
668/// after the picker UI split, clicking a "template" card in the
669/// Templates section MUST create a new agent — not silently append to
670/// whatever session the user previously ran against that template
671/// directly (e.g. "Maks's conversation" living at `agent:claude:current`).
672/// Without this migration the template card would either reattach to
673/// the existing session (broken — wrong intent) or be effectively
674/// non-functional. The migration moves any such pre-existing session
675/// out of the template namespace onto a new user-owned definition so
676/// the template is pristine post-migration.
677///
678/// Algorithm:
679/// 1. List zone ids; partition the `agent:<id>:current` and
680///    `agent:<id>:archive:*` zones by definition id.
681/// 2. For each definition id with at least one zone, look up the
682///    matching `db_agent_definitions` row.
683///    - Skip if missing (zone refers to a deleted definition).
684///    - Skip if `is_seeded = 0` (already user-owned — no work).
685///    - Otherwise: clone the template into a new user definition
686///      (mirrors `agent_def_create_from_template` semantics).
687/// 3. Pick the new name: most-recently-active named instance's
688///    `instance_name` if any exists, else fall back to the template's
689///    own `name`.
690/// 4. Move every matching zone (`:current` + every `:archive:*`)
691///    from the old defId to the new defId via FileStore's existing
692///    write-then-delete pattern.
693/// 5. Repoint every `db_agent_instances` row that referenced the old
694///    defId to point at the new defId (preserves the
695///    `continueOfInstanceId` reattach flow).
696///
697/// Idempotency: the migration is **self-gated on the data invariant**
698/// ("no seeded def has a session zone"). It runs on every startup;
699/// when the invariant already holds the inner loop has zero
700/// iterations and returns the default stats in sub-ms. There used to
701/// be a marker-file gate (`TEMPLATE_PROMOTE_MARKER_V1`), but it
702/// produced an "early-marker" failure mode: a portable launched at v
703/// N had no seeded-def zones, set the marker, and on v N+1 startups
704/// (when seeded-def zones DID exist from prior real use) the marker
705/// caused the migration to skip. The 2026-05-24 rework dropped the
706/// marker check; this is safe because the seeded-def-with-zone
707/// invariant is detectable per-startup at constant cost. `data_dir`
708/// is retained for API compatibility.
709///
710/// Failure mode: per-template errors are logged + counted; we DO NOT
711/// abort startup. Errors that prevent a template from being promoted
712/// leave its zones in place; the next startup retries (no marker
713/// gate to block retry).
714pub fn migrate_promote_template_sessions_v1(
715    wstore: &Arc<WaveStore>,
716    filestore: &Arc<FileStore>,
717    _data_dir: &Path,
718) -> TemplatePromoteStats {
719
720    let mut stats = TemplatePromoteStats::default();
721
722    let all_zones = match filestore.get_all_zone_ids() {
723        Ok(v) => v,
724        Err(e) => {
725            tracing::warn!(
726                error = %e,
727                "template_promote migration: get_all_zone_ids failed; aborting (will retry next start)"
728            );
729            return stats;
730        }
731    };
732
733    // Group zone ids by definition id. A zone counts if it matches
734    // `agent:<id>:current` OR `agent:<id>:archive:<ts>`. Anything else
735    // (e.g. legacy per-block zones the prior migration didn't sweep)
736    // is ignored by this migration.
737    let mut per_def_zones: HashMap<String, Vec<String>> = HashMap::new();
738    for zone in &all_zones {
739        let rest = match zone.strip_prefix("agent:") {
740            Some(r) => r,
741            None => continue,
742        };
743        // `<defId>:current` or `<defId>:archive:<ts>`
744        let (def_id, tail) = match rest.split_once(':') {
745            Some(p) => p,
746            None => continue,
747        };
748        if !is_valid_definition_id(def_id) {
749            continue;
750        }
751        let is_current = tail == "current";
752        let is_archive = tail.starts_with("archive:");
753        if !is_current && !is_archive {
754            continue;
755        }
756        per_def_zones
757            .entry(def_id.to_string())
758            .or_default()
759            .push(zone.clone());
760    }
761
762    // Fetch all definitions ONCE so per-template lookups don't re-hit
763    // SQLite in a loop.
764    let defs = match wstore.agent_def_list() {
765        Ok(v) => v,
766        Err(e) => {
767            tracing::warn!(
768                error = %e,
769                "template_promote migration: agent_def_list failed; aborting (will retry next start)"
770            );
771            return stats;
772        }
773    };
774
775    for (old_def_id, zones) in per_def_zones {
776        // Look up the definition row this zone is bound to.
777        let template = match defs.iter().find(|d| d.id == old_def_id) {
778            Some(d) => d,
779            None => {
780                // Zone points at a deleted definition — leave it
781                // alone; a future GC pass can clean orphans.
782                continue;
783            }
784        };
785        // Only seeded templates need promotion. User-owned defs are
786        // already on the new model.
787        if template.is_seeded != 1 {
788            continue;
789        }
790        stats.templates_scanned += 1;
791
792        // Pick the new agent name: most-recently-active named instance
793        // for this template, else fall back to the template's own name.
794        // `instance_list_named` already filters to non-hidden + named
795        // rows + sorts by `started_at DESC`, so the first row is the
796        // pick.
797        // Include continuations: a user who clicked Maks today and
798        // resumed three times has only continuation rows for that
799        // definition; the head row is whatever they originally
800        // named the agent. Picking the most-recent continuation
801        // surfaces the same `instance_name` they used last.
802        let new_name = match wstore.instance_list_named(
803            1,
804            Some(&old_def_id),
805            /* include_continuations */ true,
806        ) {
807            Ok(rows) => rows
808                .into_iter()
809                .next()
810                .map(|i| i.instance_name)
811                .filter(|n| !n.is_empty())
812                .unwrap_or_else(|| template.name.clone()),
813            Err(e) => {
814                tracing::warn!(
815                    template_id = %old_def_id,
816                    error = %e,
817                    "template_promote migration: instance_list_named failed; using template name"
818                );
819                template.name.clone()
820            }
821        };
822
823        // Idempotency: the migration uses a DETERMINISTIC clone id
824        // (`template-promote-v1-<template_id>`) so every retry of
825        // every partial-failure scenario targets the same clone.
826        // Successful prior steps (zone moves, instance repoints)
827        // are reused; failed steps re-attempt against the same
828        // destination. There is no way to "fork" the migration
829        // into a different clone id, so the unbounded-duplicate
830        // failure modes from codex P1 rounds 1+2 cannot recur:
831        //
832        //   1. Insert def: idempotent via `SELECT WHERE id = ?1`
833        //      first; new row only on absence. PK uniqueness on
834        //      the deterministic id catches any race.
835        //   2. move_zone: write-then-delete; replay copies the
836        //      same content to the same destination (no-op when
837        //      already moved), retries the source delete.
838        //   3. instance_repoint_definition: UPDATE on rows whose
839        //      definition_id = old; rows already at new are a
840        //      no-op SET.
841        //
842        // The deterministic id also distinguishes the migration's
843        // own clone from any user-created "+ New from template"
844        // clone (which lives under a fresh UUID), so we never
845        // clobber a user's live session.
846        let promote_target_id =
847            format!("template-promote-v1-{}", template.id);
848        debug_assert!(
849            is_valid_definition_id(&promote_target_id),
850            "deterministic promote-target id must satisfy the zone-id charset"
851        );
852
853        let existing_target = match wstore.agent_def_get(&promote_target_id) {
854            Ok(Some(def)) => Some(def),
855            Ok(None) => None,
856            Err(e) => {
857                tracing::warn!(
858                    template_id = %old_def_id,
859                    promote_target_id = %promote_target_id,
860                    error = %e,
861                    "template_promote migration: agent_def_get failed; aborting this template"
862                );
863                stats.failures += 1;
864                continue;
865            }
866        };
867        let new_def = if let Some(existing) = existing_target {
868            tracing::info!(
869                template_id = %old_def_id,
870                promote_target_id = %promote_target_id,
871                "template_promote migration: reusing prior promote-target clone (idempotent retry)"
872            );
873            existing
874        } else {
875            // Clone the template into a new user-owned definition
876            // at the deterministic id. Field copies mirror
877            // `agent_def_create_from_template`.
878            let now = now_ms() as i64;
879            let mut new_def = crate::backend::storage::wstore::AgentDefinition {
880                id: promote_target_id.clone(),
881                slug: String::new(),
882                name: new_name.clone(),
883                icon: template.icon.clone(),
884                provider: template.provider.clone(),
885                description: template.description.clone(),
886                working_directory: String::new(),
887                shell: template.shell.clone(),
888                provider_flags: template.provider_flags.clone(),
889                auto_start: 0,
890                restart_on_crash: template.restart_on_crash,
891                idle_timeout_minutes: template.idle_timeout_minutes,
892                created_at: now,
893                agent_type: template.agent_type.clone(),
894                environment: template.environment.clone(),
895                agent_bus_id: String::new(),
896                is_seeded: 0,
897                accounts: String::new(),
898                parent_id: template.id.clone(),
899                branch_label: String::new(),
900                updated_at: now,
901                user_hidden: 0,
902            };
903            if let Err(e) = wstore.agent_def_insert(&mut new_def) {
904                tracing::warn!(
905                    template_id = %old_def_id,
906                    promote_target_id = %promote_target_id,
907                    error = %e,
908                    "template_promote migration: agent_def_insert failed; skipping this template"
909                );
910                stats.failures += 1;
911                continue;
912            }
913            new_def
914        };
915
916        // Move every matching zone (current + archives) onto the new
917        // definition id. Per-zone failures are logged but don't abort
918        // the whole template — best-effort.
919        let mut archives_for_this_def: usize = 0;
920        for old_zone in &zones {
921            // Build the new zone id by swapping the def-id segment.
922            // We know `old_zone` starts with `agent:<old_def_id>:`
923            // (per the bucketing above), so substring-replace is safe.
924            let suffix = match old_zone.strip_prefix(&format!("agent:{}:", old_def_id)) {
925                Some(s) => s,
926                None => continue,
927            };
928            let new_zone = format!("agent:{}:{}", new_def.id, suffix);
929            let is_archive = suffix.starts_with("archive:");
930
931            if let Err(e) = move_zone(filestore, old_zone, &new_zone) {
932                tracing::warn!(
933                    template_id = %old_def_id,
934                    old_zone = %old_zone,
935                    new_zone = %new_zone,
936                    error = %e,
937                    "template_promote migration: move_zone failed"
938                );
939                stats.failures += 1;
940                continue;
941            }
942            if is_archive {
943                archives_for_this_def += 1;
944            }
945        }
946
947        // Repoint any in-DB instances referencing this template at
948        // the new user-owned definition. Without this, the existing
949        // continueOfInstanceId reattach flow would still look up the
950        // template and pass through the un-promoted definition_id.
951        let repointed = match wstore.instance_repoint_definition(&old_def_id, &new_def.id) {
952            Ok(n) => n,
953            Err(e) => {
954                tracing::warn!(
955                    template_id = %old_def_id,
956                    new_definition_id = %new_def.id,
957                    error = %e,
958                    "template_promote migration: instance_repoint_definition failed"
959                );
960                stats.failures += 1;
961                0
962            }
963        };
964        stats.instances_repointed += repointed;
965        stats.archives_moved += archives_for_this_def;
966        stats.templates_promoted += 1;
967        tracing::info!(
968            template_id = %old_def_id,
969            template_name = %template.name,
970            new_definition_id = %new_def.id,
971            new_name = %new_def.name,
972            archives_moved = archives_for_this_def,
973            instances_repointed = repointed,
974            "template_promote migration: promoted template into user agent"
975        );
976    }
977
978    // Marker write removed in the 2026-05-24 self-idempotency rework
979    // (see doc comment above). The invariant "no seeded def carries a
980    // session zone" is checked on every startup; when it already holds
981    // this function is a sub-ms no-op.
982
983    tracing::info!(
984        templates_scanned = stats.templates_scanned,
985        templates_promoted = stats.templates_promoted,
986        archives_moved = stats.archives_moved,
987        instances_repointed = stats.instances_repointed,
988        failures = stats.failures,
989        "template_promote migration: complete"
990    );
991
992    stats
993}
994
/// Outcome chosen for one source file while `move_zone` reconciles it
/// against the destination zone. The R-numbers refer to the retry
/// scenarios enumerated inside `move_zone`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CopyAction {
    /// No same-named file exists at the destination yet, so the source
    /// copy is written there (R5: finish a partially-completed copy).
    Copy,
    /// The source has a strictly later `modts` than the destination, so
    /// the source bytes replace the destination's (R6: promote newer
    /// source content).
    Overwrite,
    /// The destination stays untouched: either its `modts` is strictly
    /// later (R4: the user kept working on the destination clone), or
    /// both sides share a `modts` and hold identical bytes.
    Preserve,
    /// Both sides share a `modts`; a byte comparison must decide.
    TieBreakByBytes,
    /// Both sides share a `modts` but their bytes differ, so neither can
    /// be declared canonical (R7). The destination is kept and the
    /// source is not deleted.
    Conflict,
}
1012
1013/// Move every file in `old_zone` to `new_zone`, preserving names + bytes.
1014/// Implemented as read-write-delete because FileStore doesn't expose a
1015/// native rename; the cost is bounded by the per-zone file count (1-2
1016/// in practice — `output.state.json` + `output`).
1017fn move_zone(
1018    filestore: &FileStore,
1019    old_zone: &str,
1020    new_zone: &str,
1021) -> Result<(), String> {
1022    let files = filestore
1023        .list_files(old_zone)
1024        .map_err(|e| format!("list_files: {e}"))?;
1025    if files.is_empty() {
1026        return Ok(());
1027    }
1028    // Per-file recency-aware copy (codex P1 rounds 4 + 5 + 6 on
1029    // PR #1017). Three retry shapes need to coexist on the same
1030    // retry path:
1031    //
1032    //   R4 — partial-failure, user continued on the destination
1033    //        clone (`:current` of the new def). Destination has
1034    //        NEWER bytes than source. Keep destination; drop
1035    //        source.
1036    //   R5 — partial-failure, prior `move_zone` wrote SOME of the
1037    //        destination files before crashing. Destination has
1038    //        only some files; the missing ones must be copied
1039    //        from source. Don't drop source until every source
1040    //        file has a counterpart at the destination.
1041    //   R6 — partial-failure, `instance_repoint_definition` was
1042    //        the step that failed. Instances still point at the
1043    //        seeded def, user continued — SOURCE bytes are newer
1044    //        than destination's stale copy. Source must NOT be
1045    //        dropped without first promoting its newer content
1046    //        to the destination.
1047    //
1048    // Resolve all three via a per-file recency-aware copy:
1049    //   - destination missing the file → COPY (R5).
1050    //   - destination has the file, src.modts ≤ dest.modts → keep
1051    //     destination, no copy (R4).
1052    //   - destination has the file, src.modts > dest.modts → copy
1053    //     source over destination (R6).
1054    // After the loop, every source file has a counterpart at the
1055    // destination; source can be safely deleted.
1056    //
1057    // `modts` ties (or zero on either side) are resolved in favor
1058    // of keeping the destination, matching the R4 semantics — the
1059    // common case for a clean first-time retry where both sides
1060    // hold identical bytes.
1061    let dest_meta: std::collections::HashMap<String, crate::backend::storage::filestore::WaveFile> = filestore
1062        .list_files(new_zone)
1063        .map_err(|e| format!("list_files (new): {e}"))?
1064        .into_iter()
1065        .map(|f| (f.name.clone(), f))
1066        .collect();
1067    let mut copied = 0usize;
1068    let mut overwritten = 0usize;
1069    let mut preserved = 0usize;
1070    let mut conflicts = 0usize;
1071    for f in &files {
1072        let dest = dest_meta.get(&f.name);
1073        let action = match dest {
1074            None => CopyAction::Copy, // R5: destination missing
1075            Some(d) if f.modts > d.modts => CopyAction::Overwrite, // R6
1076            Some(d) if d.modts > f.modts => CopyAction::Preserve, // R4
1077            Some(_) => CopyAction::TieBreakByBytes, // R7: equal modts
1078        };
1079        let resolved = match action {
1080            CopyAction::Copy | CopyAction::Overwrite => action,
1081            CopyAction::Preserve => action,
1082            CopyAction::Conflict => action, // unreachable from the matcher above; explicit for exhaustiveness
1083            CopyAction::TieBreakByBytes => {
1084                // R7 — equal modts (millisecond-granular filestore
1085                // can write source + destination within the same
1086                // ms on a real retry). Read both sides and
1087                // disambiguate by bytes.
1088                let src_bytes = filestore
1089                    .read_file(old_zone, &f.name)
1090                    .map_err(|e| format!("read_file {}: {e}", f.name))?
1091                    .unwrap_or_default();
1092                let dest_bytes = filestore
1093                    .read_file(new_zone, &f.name)
1094                    .map_err(|e| format!("read_file (dest) {}: {e}", f.name))?
1095                    .unwrap_or_default();
1096                if src_bytes == dest_bytes {
1097                    CopyAction::Preserve
1098                } else {
1099                    // Conflict: can't tell which side is canonical.
1100                    // Preserve destination (matches the round-4
1101                    // semantics — keep what the user might be
1102                    // looking at), but refuse to delete source so
1103                    // the operator (or a future GC pass that can
1104                    // compare timestamps at a higher resolution)
1105                    // can resolve. The post-loop missing-files
1106                    // check would still pass, so we signal the
1107                    // conflict via a separate counter.
1108                    CopyAction::Conflict
1109                }
1110            }
1111        };
1112        match resolved {
1113            CopyAction::Copy => {
1114                let bytes = filestore
1115                    .read_file(old_zone, &f.name)
1116                    .map_err(|e| format!("read_file {}: {e}", f.name))?
1117                    .unwrap_or_default();
1118                write_zone_file(filestore, new_zone, &f.name, &bytes)?;
1119                copied += 1;
1120            }
1121            CopyAction::Overwrite => {
1122                let bytes = filestore
1123                    .read_file(old_zone, &f.name)
1124                    .map_err(|e| format!("read_file {}: {e}", f.name))?
1125                    .unwrap_or_default();
1126                write_zone_file(filestore, new_zone, &f.name, &bytes)?;
1127                overwritten += 1;
1128            }
1129            CopyAction::Preserve => {
1130                preserved += 1;
1131            }
1132            CopyAction::Conflict => {
1133                conflicts += 1;
1134                tracing::warn!(
1135                    old_zone = %old_zone,
1136                    new_zone = %new_zone,
1137                    file = %f.name,
1138                    modts = f.modts,
1139                    "template_promote migration: same-ms conflict — bytes differ at equal modts; preserving destination + leaving source for manual recovery"
1140                );
1141            }
1142            CopyAction::TieBreakByBytes => unreachable!("resolved above"),
1143        }
1144    }
1145    if preserved > 0 || overwritten > 0 || conflicts > 0 {
1146        tracing::info!(
1147            old_zone = %old_zone,
1148            new_zone = %new_zone,
1149            copied,
1150            overwritten,
1151            preserved,
1152            conflicts,
1153            "template_promote migration: per-file move (R4 user-continuation, R5 partial-copy fill, R6 newer-source promotion, R7 same-ms conflict)"
1154        );
1155    }
1156    if conflicts > 0 {
1157        // R7: an equal-modts byte-diff was detected. We don't know
1158        // which side is canonical, so we preserve both: destination
1159        // keeps its content, source is left in place for operator
1160        // / GC recovery. Migration converges next run only if the
1161        // operator resolves the conflict externally.
1162        return Ok(());
1163    }
1164    // Verify every source file has a counterpart at the
1165    // destination before dropping source — protects against the
1166    // R5 partial-write case where write_zone_file silently leaves
1167    // a file absent at the destination despite returning Ok (no
1168    // current call path does so, but defending the invariant here
1169    // is cheap and future-proofs the helper).
1170    let post_dest: std::collections::HashSet<String> = filestore
1171        .list_files(new_zone)
1172        .map_err(|e| format!("list_files (new, post): {e}"))?
1173        .into_iter()
1174        .map(|f| f.name)
1175        .collect();
1176    let missing: Vec<&str> = files
1177        .iter()
1178        .map(|f| f.name.as_str())
1179        .filter(|n| !post_dest.contains(*n))
1180        .collect();
1181    if !missing.is_empty() {
1182        tracing::warn!(
1183            old_zone = %old_zone,
1184            new_zone = %new_zone,
1185            missing = ?missing,
1186            "template_promote migration: destination missing files post-copy; leaving source in place for retry"
1187        );
1188        return Ok(());
1189    }
1190    // Delete the source files only after every write has succeeded.
1191    // delete_zone wipes the whole zone in one transaction.
1192    if let Err(e) = filestore.delete_zone(old_zone) {
1193        // Source delete failure is non-fatal — the new zone has the
1194        // data; the old zone is now stale duplicate, GC concern.
1195        tracing::warn!(
1196            old_zone = %old_zone,
1197            error = %e,
1198            "template_promote migration: delete_zone failed after copy; source remains"
1199        );
1200    }
1201    Ok(())
1202}
1203
1204// ---------------------------------------------------------------------------
1205// Tests
1206// ---------------------------------------------------------------------------
1207
1208#[cfg(test)]
1209mod tests {
1210    use super::*;
1211    use crate::backend::obj::MetaMapType;
1212    use crate::backend::storage::filestore::FileStore;
1213    use crate::backend::storage::wstore::WaveStore;
1214    use std::sync::Arc;
1215    use tempfile::tempdir;
1216
1217    fn fresh_filestore() -> Arc<FileStore> {
1218        Arc::new(FileStore::open_in_memory().unwrap())
1219    }
1220
1221    #[test]
1222    fn zone_names_match_spec() {
1223        assert_eq!(
1224            agent_current_zone("def-abc"),
1225            "agent:def-abc:current"
1226        );
1227        assert_eq!(
1228            agent_archive_zone("def-abc", 1_700_000_000_000),
1229            "agent:def-abc:archive:1700000000000"
1230        );
1231    }
1232
1233    #[test]
1234    fn validate_definition_id_rejects_bad_input() {
1235        assert!(is_valid_definition_id("abc-123_DEF"));
1236        assert!(is_valid_definition_id("a"));
1237        assert!(!is_valid_definition_id(""));
1238        // Path-traversal / zone-injection attempts.
1239        assert!(!is_valid_definition_id("../etc"));
1240        assert!(!is_valid_definition_id("a:b"));
1241        assert!(!is_valid_definition_id("a/b"));
1242        assert!(!is_valid_definition_id("a b"));
1243        assert!(!is_valid_definition_id("a\x00b"));
1244        // Unicode rejected — keeps the zone-name surface ASCII.
1245        assert!(!is_valid_definition_id("café"));
1246    }
1247
1248    #[test]
1249    fn validate_and_current_surfaces_error_prefix() {
1250        let err = validate_and_current("../etc").unwrap_err();
1251        assert!(err.starts_with("INVALID_DEFINITION_ID:"));
1252    }
1253
1254    #[test]
1255    fn read_returns_none_when_zone_missing() {
1256        let fs = fresh_filestore();
1257        // No prior write — no zone exists.
1258        let (content, modts) = read_session_state(&fs, "def-fresh").unwrap();
1259        assert!(content.is_none(), "missing zone should NOT be an error");
1260        assert!(modts.is_none());
1261    }
1262
1263    #[test]
1264    fn read_rejects_invalid_definition_id() {
1265        let fs = fresh_filestore();
1266        let err = read_session_state(&fs, "../bad").unwrap_err();
1267        assert!(err.starts_with("INVALID_DEFINITION_ID:"));
1268    }
1269
1270    #[test]
1271    fn write_then_read_roundtrip() {
1272        let fs = fresh_filestore();
1273        let payload = r#"{"nodes":[{"type":"user_message","message":"hi"}]}"#;
1274        write_session_state(&fs, "def-a", payload.as_bytes()).unwrap();
1275        let (content, modts) = read_session_state(&fs, "def-a").unwrap();
1276        assert_eq!(content.as_deref(), Some(payload));
1277        assert!(modts.unwrap_or(0) > 0);
1278    }
1279
1280    #[test]
1281    fn write_is_idempotent_replaces_content() {
1282        let fs = fresh_filestore();
1283        write_session_state(&fs, "def-a", b"first").unwrap();
1284        write_session_state(&fs, "def-a", b"second").unwrap();
1285        let (content, _) = read_session_state(&fs, "def-a").unwrap();
1286        assert_eq!(content.as_deref(), Some("second"));
1287    }
1288
1289    #[test]
1290    fn append_output_grows_ndjson_file() {
1291        let fs = fresh_filestore();
1292        let n1 = append_session_output(&fs, "def-a", "line1").unwrap();
1293        let n2 = append_session_output(&fs, "def-a", "line2\n").unwrap();
1294        // Each line is normalized to end with '\n'.
1295        assert_eq!(n1, b"line1\n".len() as u64);
1296        assert_eq!(n2, b"line2\n".len() as u64);
1297        let zone = agent_current_zone("def-a");
1298        let bytes = fs.read_file(&zone, OUTPUT_FILE).unwrap().unwrap();
1299        assert_eq!(bytes, b"line1\nline2\n");
1300    }
1301
1302    #[test]
1303    fn archive_moves_content_and_clears_current() {
1304        let fs = fresh_filestore();
1305        let payload = br#"{"nodes":[{"type":"user_message","message":"x"}]}"#;
1306        write_session_state(&fs, "def-a", payload).unwrap();
1307        append_session_output(&fs, "def-a", "raw1").unwrap();
1308
1309        let result = archive_session(&fs, "def-a").unwrap();
1310        let (zone, ts) = result.expect("archive should have happened");
1311        assert!(zone.starts_with("agent:def-a:archive:"));
1312        assert!(ts > 0);
1313
1314        // Archive zone has the original snapshot.
1315        let archived = fs.read_file(&zone, SNAPSHOT_FILE).unwrap();
1316        assert_eq!(archived.as_deref(), Some(payload.as_slice()));
1317        // ...AND the NDJSON output.
1318        let archived_output = fs.read_file(&zone, OUTPUT_FILE).unwrap().unwrap();
1319        assert_eq!(archived_output, b"raw1\n");
1320
1321        // Current zone snapshot is gone.
1322        let current_zone = agent_current_zone("def-a");
1323        let still_there = fs.stat(&current_zone, SNAPSHOT_FILE).unwrap();
1324        assert!(still_there.is_none(), ":current snapshot must be cleared");
1325        let still_output = fs.stat(&current_zone, OUTPUT_FILE).unwrap();
1326        assert!(still_output.is_none(), ":current output must be cleared");
1327
1328        // Subsequent read returns None (fresh).
1329        let (content, _) = read_session_state(&fs, "def-a").unwrap();
1330        assert!(content.is_none());
1331    }
1332
1333    #[test]
1334    fn archive_on_empty_current_is_noop() {
1335        let fs = fresh_filestore();
1336        // Nothing was ever written.
1337        let result = archive_session(&fs, "def-empty").unwrap();
1338        assert!(result.is_none(), "archive on empty :current should no-op");
1339        // No archive zones should exist.
1340        let zones = fs.get_all_zone_ids().unwrap();
1341        assert!(
1342            !zones.iter().any(|z| z.contains(":archive:")),
1343            "no archive zone should have been created"
1344        );
1345    }
1346
1347    #[test]
1348    fn archive_on_zero_byte_state_is_noop() {
1349        let fs = fresh_filestore();
1350        // Touch the file but leave it empty.
1351        let zone = agent_current_zone("def-zero");
1352        fs.make_file(&zone, SNAPSHOT_FILE, FileMeta::default(), FileOpts::default())
1353            .unwrap();
1354        let result = archive_session(&fs, "def-zero").unwrap();
1355        assert!(result.is_none(), "zero-byte :current must NOT create archive");
1356    }
1357
1358    /// Critical scoping invariant: agents are independent, even when
1359    /// they share an identity bundle. Writing to AgentA must NOT
1360    /// expose any data to AgentB.
1361    #[test]
1362    fn two_agents_have_independent_zones() {
1363        let fs = fresh_filestore();
1364        write_session_state(&fs, "def-A", br#"{"nodes":[{"type":"user_message","message":"A"}]}"#)
1365            .unwrap();
1366
1367        // AgentB sees nothing.
1368        let (content_b, _) = read_session_state(&fs, "def-B").unwrap();
1369        assert!(content_b.is_none(), "AgentB must NOT see AgentA's data");
1370
1371        // AgentA still has its content.
1372        let (content_a, _) = read_session_state(&fs, "def-A").unwrap();
1373        assert!(content_a.unwrap().contains("\"A\""));
1374    }
1375
1376    #[test]
1377    fn list_archives_sorted_newest_first_with_previews() {
1378        let fs = fresh_filestore();
1379        // Seed three archive zones for the same def, varying timestamps.
1380        let make = |ts: u64, label: &str| {
1381            let zone = agent_archive_zone("def-a", ts);
1382            let payload = serde_json::json!({
1383                "nodes": [
1384                    {"type": "user_message", "message": label}
1385                ]
1386            });
1387            write_zone_file(&fs, &zone, SNAPSHOT_FILE, payload.to_string().as_bytes()).unwrap();
1388        };
1389        make(1_000, "old");
1390        make(3_000, "newest");
1391        make(2_000, "mid");
1392
1393        let rows = list_archives(&fs, "def-a", 0).unwrap();
1394        assert_eq!(rows.len(), 3);
1395        assert_eq!(rows[0].archived_at_ms, 3_000);
1396        assert_eq!(rows[0].preview, "newest");
1397        assert_eq!(rows[0].node_count, 1);
1398        assert_eq!(rows[1].archived_at_ms, 2_000);
1399        assert_eq!(rows[2].archived_at_ms, 1_000);
1400    }
1401
1402    #[test]
1403    fn list_archives_respects_limit() {
1404        let fs = fresh_filestore();
1405        for ts in 1..=5u64 {
1406            let zone = agent_archive_zone("def-a", ts);
1407            fs.make_file(&zone, SNAPSHOT_FILE, FileMeta::default(), FileOpts::default()).unwrap();
1408            fs.write_file(&zone, SNAPSHOT_FILE, b"{}").unwrap();
1409        }
1410        let rows = list_archives(&fs, "def-a", 2).unwrap();
1411        assert_eq!(rows.len(), 2);
1412    }
1413
1414    #[test]
1415    fn list_archives_rejects_bad_definition_id() {
1416        let fs = fresh_filestore();
1417        assert!(list_archives(&fs, "../bad", 0).is_err());
1418    }
1419
1420    // ---- Migration tests ----
1421
1422    fn open_temp_wstore(dir: &Path) -> Arc<WaveStore> {
1423        let path = dir.join("objects.db");
1424        Arc::new(WaveStore::open(&path).expect("open wstore"))
1425    }
1426
1427    fn insert_agent_block(wstore: &Arc<WaveStore>, def_id: &str) -> String {
1428        let oid = uuid::Uuid::new_v4().to_string();
1429        let mut meta = MetaMapType::new();
1430        meta.insert("view".to_string(), serde_json::json!("agent"));
1431        meta.insert("agentId".to_string(), serde_json::json!(def_id));
1432        let mut block = Block {
1433            oid: oid.clone(),
1434            parentoref: String::new(),
1435            version: 1,
1436            runtimeopts: None,
1437            stickers: None,
1438            meta,
1439            subblockids: None,
1440        };
1441        wstore.insert(&mut block).expect("insert block");
1442        oid
1443    }
1444
1445    fn seed_block_snapshot(filestore: &Arc<FileStore>, block_id: &str, body: &str) {
1446        filestore
1447            .make_file(block_id, SNAPSHOT_FILE, FileMeta::default(), FileOpts::default())
1448            .unwrap();
1449        filestore.write_file(block_id, SNAPSHOT_FILE, body.as_bytes()).unwrap();
1450    }
1451
    /// End-to-end check of the v1 back-fill. Three legacy per-block
    /// snapshots across two definitions are migrated, and the test asserts:
    /// - each snapshot becomes one archive;
    /// - the newest snapshot per definition seeds that definition's
    ///   `:current`;
    /// - the marker file is written;
    /// - the legacy block zones are left in place.
    #[test]
    fn migration_backfills_archives_and_seeds_current() {
        let dir = tempdir().unwrap();
        let wstore = open_temp_wstore(dir.path());
        let filestore = fresh_filestore();

        // Two blocks for the same definition. Block 2 is written later
        // → it should win the `:current` seed.
        let block1 = insert_agent_block(&wstore, "def-maks");
        seed_block_snapshot(
            &filestore,
            &block1,
            r#"{"nodes":[{"type":"user_message","message":"old"}]}"#,
        );
        // Sleep briefly so the second block's snapshot has a strictly
        // greater modts. FileStore stamps `Self::now_ms()` per write.
        // NOTE(review): presumably this also keeps the two def-maks
        // archive zone names (keyed by block-zone createdts) distinct,
        // which the `archives.len() == 2` assertion below relies on —
        // confirm.
        std::thread::sleep(std::time::Duration::from_millis(5));
        let block2 = insert_agent_block(&wstore, "def-maks");
        seed_block_snapshot(
            &filestore,
            &block2,
            r#"{"nodes":[{"type":"user_message","message":"newer"}]}"#,
        );

        // And one block for a different definition.
        let block_other = insert_agent_block(&wstore, "def-other");
        seed_block_snapshot(
            &filestore,
            &block_other,
            r#"{"nodes":[{"type":"user_message","message":"other"}]}"#,
        );

        // One archive per block (3), one `:current` per definition (2).
        let stats = migrate_block_zones_v1(&wstore, &filestore, dir.path());
        assert_eq!(stats.blocks_scanned, 3);
        assert_eq!(stats.archives_written, 3);
        assert_eq!(stats.current_zones_seeded, 2);
        assert_eq!(stats.failures, 0);

        // Marker file written.
        assert!(dir.path().join(MIGRATION_MARKER_V1).exists());

        // `:current` for def-maks must hold block2's content (the
        // most-recently-modified per-block snapshot).
        let (content, _) = read_session_state(&filestore, "def-maks").unwrap();
        assert!(content.unwrap().contains("newer"));

        // Both archives exist for def-maks.
        let archives = list_archives(&filestore, "def-maks", 0).unwrap();
        assert_eq!(archives.len(), 2);

        // Other def isolated.
        let (other, _) = read_session_state(&filestore, "def-other").unwrap();
        assert!(other.unwrap().contains("other"));
        let other_archives = list_archives(&filestore, "def-other", 0).unwrap();
        assert_eq!(other_archives.len(), 1);

        // Old block zones NOT deleted (GC is a later PR).
        let still_block1 = filestore.stat(&block1, SNAPSHOT_FILE).unwrap();
        assert!(still_block1.is_some(), "old block zone must remain");
    }
1512
1513    #[test]
1514    fn migration_is_idempotent() {
1515        let dir = tempdir().unwrap();
1516        let wstore = open_temp_wstore(dir.path());
1517        let filestore = fresh_filestore();
1518
1519        let block = insert_agent_block(&wstore, "def-a");
1520        seed_block_snapshot(
1521            &filestore,
1522            &block,
1523            r#"{"nodes":[{"type":"user_message","message":"x"}]}"#,
1524        );
1525
1526        let first = migrate_block_zones_v1(&wstore, &filestore, dir.path());
1527        assert_eq!(first.archives_written, 1);
1528        assert_eq!(first.current_zones_seeded, 1);
1529
1530        // Second run is gated by the marker.
1531        let second = migrate_block_zones_v1(&wstore, &filestore, dir.path());
1532        assert_eq!(second.blocks_scanned, 0);
1533        assert_eq!(second.archives_written, 0);
1534        assert_eq!(second.current_zones_seeded, 0);
1535    }
1536
1537    // ---- Two-tier picker Phase 1 migration tests ----
1538
1539    use crate::backend::storage::wstore::{AgentDefinition, AgentInstance, InstanceStatus};
1540
1541    fn insert_template(
1542        wstore: &Arc<WaveStore>,
1543        id: &str,
1544        name: &str,
1545        provider: &str,
1546    ) -> AgentDefinition {
1547        let mut def = AgentDefinition {
1548            id: id.to_string(),
1549            slug: String::new(),
1550            name: name.to_string(),
1551            icon: String::new(),
1552            provider: provider.to_string(),
1553            description: format!("{name} template"),
1554            working_directory: String::new(),
1555            shell: String::new(),
1556            provider_flags: String::new(),
1557            auto_start: 0,
1558            restart_on_crash: 0,
1559            idle_timeout_minutes: 0,
1560            created_at: 1_700_000_000_000,
1561            agent_type: "host".to_string(),
1562            environment: String::new(),
1563            agent_bus_id: String::new(),
1564            is_seeded: 1, // template
1565            accounts: String::new(),
1566            parent_id: String::new(),
1567            branch_label: String::new(),
1568            updated_at: 1_700_000_000_000,
1569            user_hidden: 0,
1570        };
1571        wstore.agent_def_insert(&mut def).unwrap();
1572        def
1573    }
1574
1575    fn insert_named_instance(
1576        wstore: &Arc<WaveStore>,
1577        id: &str,
1578        def_id: &str,
1579        instance_name: &str,
1580        started_at: i64,
1581    ) {
1582        let inst = AgentInstance {
1583            id: id.to_string(),
1584            definition_id: def_id.to_string(),
1585            parent_instance_id: String::new(),
1586            block_id: String::new(),
1587            session_id: String::new(),
1588            status: InstanceStatus::Running.as_str().to_string(),
1589            github_context: String::new(),
1590            started_at,
1591            ended_at: 0,
1592            created_at: started_at,
1593            identity_id: String::new(),
1594            memory_id: String::new(),
1595            instance_name: instance_name.to_string(),
1596            working_directory: String::new(),
1597            display_hidden: false,
1598        };
1599        wstore.instance_create(&inst).unwrap();
1600    }
1601
1602    #[test]
1603    fn template_promote_clones_template_and_moves_zones() {
1604        let dir = tempdir().unwrap();
1605        let wstore = open_temp_wstore(dir.path());
1606        let filestore = fresh_filestore();
1607
1608        // Seeded template "Claude Code" with a current session zone +
1609        // one archive zone (the pre-existing "Maks" conversation).
1610        let template = insert_template(&wstore, "tpl-claude", "Claude Code", "claude");
1611        insert_named_instance(&wstore, "inst-maks", &template.id, "Maks", 1_700_000_100_000);
1612        write_session_state(
1613            &filestore,
1614            &template.id,
1615            br#"{"nodes":[{"type":"user_message","message":"hi"}]}"#,
1616        )
1617        .unwrap();
1618        // Pre-existing archive (simulates a prior + New session).
1619        let archive_zone = agent_archive_zone(&template.id, 1_699_000_000_000);
1620        write_zone_file(&filestore, &archive_zone, SNAPSHOT_FILE, b"archived").unwrap();
1621
1622        let stats = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
1623        assert_eq!(stats.templates_scanned, 1);
1624        assert_eq!(stats.templates_promoted, 1);
1625        assert_eq!(stats.archives_moved, 1);
1626        assert_eq!(stats.instances_repointed, 1);
1627        assert_eq!(stats.failures, 0);
1628
1629        // Template's current zone is gone — no `agent:tpl-claude:current`.
1630        let stale_current = agent_current_zone(&template.id);
1631        let stale = filestore.list_files(&stale_current).unwrap();
1632        assert!(stale.is_empty(), "template current zone should be empty post-promote");
1633        // Template's archive zone is gone.
1634        let stale_archive = filestore.list_files(&archive_zone).unwrap();
1635        assert!(stale_archive.is_empty(), "template archive zone should be empty post-promote");
1636
1637        // Find the new user-owned definition. Use the most-recent
1638        // instance name ("Maks") as the new name per spec.
1639        let all = wstore.agent_def_list().unwrap();
1640        let new_def = all
1641            .iter()
1642            .find(|d| d.is_seeded == 0 && d.parent_id == template.id)
1643            .expect("a new user-owned definition should exist");
1644        assert_eq!(new_def.name, "Maks");
1645        assert_eq!(new_def.provider, "claude");
1646
1647        // Zones present on the NEW defId.
1648        let new_current = agent_current_zone(&new_def.id);
1649        let new_files = filestore.list_files(&new_current).unwrap();
1650        assert!(
1651            new_files.iter().any(|f| f.name == SNAPSHOT_FILE),
1652            "new current zone should have output.state.json"
1653        );
1654        let new_archive = agent_archive_zone(&new_def.id, 1_699_000_000_000);
1655        let new_archive_files = filestore.list_files(&new_archive).unwrap();
1656        assert!(
1657            new_archive_files.iter().any(|f| f.name == SNAPSHOT_FILE),
1658            "new archive zone should be populated"
1659        );
1660
1661        // Instance is repointed.
1662        let inst = wstore.instance_get("inst-maks").unwrap().unwrap();
1663        assert_eq!(
1664            inst.definition_id, new_def.id,
1665            "instance should now reference new user-agent def"
1666        );
1667
1668        // Template definition is still around (still seeded), but the
1669        // session it carried is gone.
1670        let still_seeded = all.iter().find(|d| d.id == template.id).unwrap();
1671        assert_eq!(still_seeded.is_seeded, 1);
1672
1673        // Marker file is intentionally NOT written under the
1674        // self-idempotency model (constant still exists for legacy
1675        // file compatibility — see the doc comment on
1676        // `TEMPLATE_PROMOTE_MARKER_V1`).
1677        assert!(!dir.path().join(TEMPLATE_PROMOTE_MARKER_V1).exists());
1678    }
1679
1680    #[test]
1681    fn template_promote_is_idempotent_on_second_run() {
1682        let dir = tempdir().unwrap();
1683        let wstore = open_temp_wstore(dir.path());
1684        let filestore = fresh_filestore();
1685
1686        let template = insert_template(&wstore, "tpl-claude", "Claude Code", "claude");
1687        write_session_state(&filestore, &template.id, br#"{"nodes":[]}"#).unwrap();
1688
1689        let first = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
1690        assert_eq!(first.templates_promoted, 1);
1691
1692        let second = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
1693        assert_eq!(second.templates_scanned, 0);
1694        assert_eq!(second.templates_promoted, 0);
1695        assert_eq!(second.archives_moved, 0);
1696        assert_eq!(second.instances_repointed, 0);
1697    }
1698
1699    #[test]
1700    fn template_promote_runs_when_seeded_def_grows_zone_after_first_run() {
1701        // Regression test for the 2026-05-24 "Maks not under My Agents"
1702        // failure mode. Under the old marker-file gate, this scenario
1703        // played out:
1704        //
1705        //   1. Portable v N starts: no seeded defs have session zones
1706        //      (fresh data dir). Migration runs, no-ops, writes marker.
1707        //   2. User clicks "Claude Code" template, has a real
1708        //      conversation. Session zone now lives at
1709        //      `agent:tpl-claude:current` (a seeded def carrying a
1710        //      session — invariant violation).
1711        //   3. Portable v N+1 starts. Marker present → migration
1712        //      skips. Seeded def keeps its session zone forever; the
1713        //      picker can't show the user's agent under My Agents
1714        //      because there is no user-clone definition.
1715        //
1716        // The self-idempotency rework dropped the marker gate and
1717        // re-runs the migration on every startup. This test simulates
1718        // that exact sequence and asserts the second run DOES promote.
1719        let dir = tempdir().unwrap();
1720        let wstore = open_temp_wstore(dir.path());
1721        let filestore = fresh_filestore();
1722
1723        // First startup: a seeded template with no session zone yet.
1724        // Migration finds nothing to do (templates_scanned == 0).
1725        let template = insert_template(&wstore, "tpl-claude", "Claude Code", "claude");
1726        let first = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
1727        assert_eq!(first.templates_scanned, 0);
1728        assert_eq!(first.templates_promoted, 0);
1729        // (Under the old marker-gated model the marker was written here.)
1730        assert!(!dir.path().join(TEMPLATE_PROMOTE_MARKER_V1).exists());
1731
1732        // Between startups: user opens a conversation on the seeded
1733        // template — invariant now violated.
1734        write_session_state(&filestore, &template.id, br#"{"nodes":[]}"#).unwrap();
1735
1736        // Second startup: under the OLD gate this would be a no-op
1737        // (marker still present). Under the new self-idempotent model
1738        // it MUST detect the invariant violation and promote.
1739        let second = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
1740        assert_eq!(second.templates_scanned, 1);
1741        assert_eq!(second.templates_promoted, 1);
1742        assert_eq!(second.failures, 0);
1743
1744        // User-owned definition exists post-promotion.
1745        let all = wstore.agent_def_list().unwrap();
1746        assert!(
1747            all.iter().any(|d| d.is_seeded == 0 && d.parent_id == template.id),
1748            "second-run promotion should create a user-owned def"
1749        );
1750
1751        // Third startup: invariant restored, migration no-ops cleanly.
1752        let third = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
1753        assert_eq!(third.templates_scanned, 0);
1754        assert_eq!(third.templates_promoted, 0);
1755    }
1756
1757    #[test]
1758    fn template_promote_does_not_reuse_clone_with_active_zone() {
1759        // Codex P1 round 2 on PR #1017: the reuse path must not
1760        // pick a user-clone whose own `agent:<clone_id>:current`
1761        // zone is populated — that clone was created by the user
1762        // through "+ New from template" and has a real conversation
1763        // in it. Reusing it would let `move_zone` overwrite the
1764        // user's live session with the seeded template's session.
1765        // The reuse target must be an empty-zone clone (partial-
1766        // failure shape) only.
1767        let dir = tempdir().unwrap();
1768        let wstore = open_temp_wstore(dir.path());
1769        let filestore = fresh_filestore();
1770
1771        let template = insert_template(&wstore, "tpl-claude", "Claude Code", "claude");
1772        // A pre-existing user-clone created via "+ New from
1773        // template" — it has its OWN active conversation in its
1774        // own zone.
1775        let now = now_ms() as i64;
1776        let mut user_clone = crate::backend::storage::wstore::AgentDefinition {
1777            id: "user-made-clone".to_string(),
1778            slug: String::new(),
1779            name: "MyAgent".to_string(),
1780            icon: template.icon.clone(),
1781            provider: template.provider.clone(),
1782            description: template.description.clone(),
1783            working_directory: String::new(),
1784            shell: template.shell.clone(),
1785            provider_flags: template.provider_flags.clone(),
1786            auto_start: 0,
1787            restart_on_crash: template.restart_on_crash,
1788            idle_timeout_minutes: template.idle_timeout_minutes,
1789            created_at: now - 2_000,
1790            agent_type: template.agent_type.clone(),
1791            environment: template.environment.clone(),
1792            agent_bus_id: String::new(),
1793            is_seeded: 0,
1794            accounts: String::new(),
1795            parent_id: template.id.clone(),
1796            branch_label: String::new(),
1797            updated_at: now - 2_000,
1798            user_hidden: 0,
1799        };
1800        wstore.agent_def_insert(&mut user_clone).unwrap();
1801        // The user's clone has its OWN active conversation.
1802        write_session_state(
1803            &filestore,
1804            &user_clone.id,
1805            br#"{"nodes":[{"type":"user_message","message":"mine"}]}"#,
1806        )
1807        .unwrap();
1808
1809        // Seeded template ALSO has a session zone (the invariant
1810        // violation we're recovering from).
1811        write_session_state(
1812            &filestore,
1813            &template.id,
1814            br#"{"nodes":[{"type":"user_message","message":"theirs"}]}"#,
1815        )
1816        .unwrap();
1817
1818        let stats = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
1819        assert_eq!(stats.templates_promoted, 1);
1820
1821        // The user's clone must NOT have been used as the promote
1822        // target — a fresh clone with a new id must have been
1823        // created instead, with its OWN promoted zone.
1824        let user_zone_files = filestore
1825            .list_files(&agent_current_zone(&user_clone.id))
1826            .unwrap();
1827        let user_snapshot = user_zone_files
1828            .iter()
1829            .find(|f| f.name == SNAPSHOT_FILE)
1830            .expect("user-clone's own zone snapshot must still exist");
1831        let user_bytes = filestore
1832            .read_file(&agent_current_zone(&user_clone.id), &user_snapshot.name)
1833            .unwrap()
1834            .unwrap_or_default();
1835        assert!(
1836            std::str::from_utf8(&user_bytes).unwrap().contains("mine"),
1837            "user-clone's existing conversation must NOT be overwritten by the seeded session"
1838        );
1839
1840        // A NEW clone (id != user-made-clone) must own the promoted
1841        // seeded session.
1842        let all = wstore.agent_def_list().unwrap();
1843        let new_clone = all
1844            .iter()
1845            .find(|d| d.is_seeded == 0 && d.parent_id == template.id && d.id != "user-made-clone")
1846            .expect("a NEW clone must have been created (not reusing the user's clone)");
1847        let new_zone_bytes = filestore
1848            .read_file(&agent_current_zone(&new_clone.id), SNAPSHOT_FILE)
1849            .unwrap()
1850            .unwrap_or_default();
1851        assert!(
1852            std::str::from_utf8(&new_zone_bytes).unwrap().contains("theirs"),
1853            "promoted session must land under the fresh clone's id"
1854        );
1855    }
1856
1857    #[test]
1858    fn template_promote_preserves_user_continuation_on_clone() {
1859        // Codex P1 round 4 on PR #1017: data-loss scenario.
1860        // Sequence:
1861        //   1. Run 1 copies seeded `:current` → clone `:current`
1862        //      OK, but `delete_zone` on the seeded source fails.
1863        //   2. User opens the clone, continues the conversation —
1864        //      the clone's `:current` now has NEWER content.
1865        //   3. Run 2 sees the invariant still violated and would
1866        //      re-copy the (older) seeded bytes onto the clone,
1867        //      rolling back the user's continuation.
1868        // The fix: `move_zone` detects a non-empty destination and
1869        // drops the stale source instead of copying.
1870        let dir = tempdir().unwrap();
1871        let wstore = open_temp_wstore(dir.path());
1872        let filestore = fresh_filestore();
1873
1874        let template = insert_template(&wstore, "tpl-claude", "Claude Code", "claude");
1875        // Prior partial run: deterministic-id clone def already
1876        // exists.
1877        let promote_target_id = format!("template-promote-v1-{}", template.id);
1878        let now = now_ms() as i64;
1879        let mut prior_target = crate::backend::storage::wstore::AgentDefinition {
1880            id: promote_target_id.clone(),
1881            slug: String::new(),
1882            name: "Claude Code".to_string(),
1883            icon: template.icon.clone(),
1884            provider: template.provider.clone(),
1885            description: template.description.clone(),
1886            working_directory: String::new(),
1887            shell: template.shell.clone(),
1888            provider_flags: template.provider_flags.clone(),
1889            auto_start: 0,
1890            restart_on_crash: template.restart_on_crash,
1891            idle_timeout_minutes: template.idle_timeout_minutes,
1892            created_at: now - 1_000,
1893            agent_type: template.agent_type.clone(),
1894            environment: template.environment.clone(),
1895            agent_bus_id: String::new(),
1896            is_seeded: 0,
1897            accounts: String::new(),
1898            parent_id: template.id.clone(),
1899            branch_label: String::new(),
1900            updated_at: now - 1_000,
1901            user_hidden: 0,
1902        };
1903        wstore.agent_def_insert(&mut prior_target).unwrap();
1904        // Seeded `:current` has the OLDER stale snapshot the prior
1905        // run's `delete_zone` failed to remove. Write it FIRST so
1906        // its modts is earlier than the clone's continuation.
1907        write_session_state(
1908            &filestore,
1909            &template.id,
1910            br#"{"nodes":[{"type":"user_message","message":"old-stale-seeded"}]}"#,
1911        )
1912        .unwrap();
1913        // Force a modts gap so the modts-aware copy rule picks
1914        // destination (R4 user-continuation). 10ms is reliable on
1915        // every platform we ship to.
1916        std::thread::sleep(std::time::Duration::from_millis(10));
1917        // Clone's `:current` has the user's NEWER continuation.
1918        write_session_state(
1919            &filestore,
1920            &promote_target_id,
1921            br#"{"nodes":[{"type":"user_message","message":"my-newer-message"}]}"#,
1922        )
1923        .unwrap();
1924
1925        let stats = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
1926        assert_eq!(stats.templates_promoted, 1);
1927
1928        // The user's newer content is INTACT on the clone.
1929        let clone_bytes = filestore
1930            .read_file(&agent_current_zone(&promote_target_id), SNAPSHOT_FILE)
1931            .unwrap()
1932            .unwrap_or_default();
1933        let clone_str = std::str::from_utf8(&clone_bytes).unwrap();
1934        assert!(
1935            clone_str.contains("my-newer-message"),
1936            "user's newer continuation must survive the partial-failure retry; got: {clone_str}"
1937        );
1938        assert!(
1939            !clone_str.contains("old-stale-seeded"),
1940            "stale seeded content must NOT overwrite user's newer continuation"
1941        );
1942
1943        // The seeded current zone is drained (source deleted).
1944        let seeded_files = filestore
1945            .list_files(&agent_current_zone(&template.id))
1946            .unwrap();
1947        assert!(
1948            seeded_files.is_empty(),
1949            "seeded current zone must be drained after the retry's safety drop"
1950        );
1951    }
1952
1953    #[test]
1954    fn template_promote_recovers_partial_copy_at_zone() {
1955        // Codex P1 round 5 on PR #1017: a prior `move_zone` that
1956        // wrote SOME destination files but failed before the rest
1957        // must not be mistaken for "fully migrated" — dropping the
1958        // source there would lose the unwritten files forever.
1959        //
1960        // Setup: seeded `:current` has both files (snapshot +
1961        // output stream); the clone's `:current` has only the
1962        // snapshot (the prior copy crashed before the second
1963        // file). After retry: clone has BOTH files; seeded zone
1964        // is drained.
1965        let dir = tempdir().unwrap();
1966        let wstore = open_temp_wstore(dir.path());
1967        let filestore = fresh_filestore();
1968
1969        let template = insert_template(&wstore, "tpl-claude", "Claude Code", "claude");
1970        // Prior partial run already created the deterministic-id
1971        // clone def.
1972        let promote_target_id = format!("template-promote-v1-{}", template.id);
1973        let now = now_ms() as i64;
1974        let mut prior_target = crate::backend::storage::wstore::AgentDefinition {
1975            id: promote_target_id.clone(),
1976            slug: String::new(),
1977            name: "Claude Code".to_string(),
1978            icon: template.icon.clone(),
1979            provider: template.provider.clone(),
1980            description: template.description.clone(),
1981            working_directory: String::new(),
1982            shell: template.shell.clone(),
1983            provider_flags: template.provider_flags.clone(),
1984            auto_start: 0,
1985            restart_on_crash: template.restart_on_crash,
1986            idle_timeout_minutes: template.idle_timeout_minutes,
1987            created_at: now - 1_000,
1988            agent_type: template.agent_type.clone(),
1989            environment: template.environment.clone(),
1990            agent_bus_id: String::new(),
1991            is_seeded: 0,
1992            accounts: String::new(),
1993            parent_id: template.id.clone(),
1994            branch_label: String::new(),
1995            updated_at: now - 1_000,
1996            user_hidden: 0,
1997        };
1998        wstore.agent_def_insert(&mut prior_target).unwrap();
1999
2000        // Seeded `:current` has BOTH files.
2001        let seeded_current = agent_current_zone(&template.id);
2002        write_zone_file(&filestore, &seeded_current, SNAPSHOT_FILE, b"seeded-snapshot").unwrap();
2003        write_zone_file(&filestore, &seeded_current, OUTPUT_FILE, b"seeded-output-stream").unwrap();
2004
2005        // Clone `:current` already has ONLY the snapshot (prior
2006        // copy got that far, then failed on OUTPUT_FILE).
2007        write_zone_file(
2008            &filestore,
2009            &agent_current_zone(&promote_target_id),
2010            SNAPSHOT_FILE,
2011            b"seeded-snapshot",
2012        )
2013        .unwrap();
2014
2015        let stats = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
2016        assert_eq!(stats.templates_promoted, 1);
2017
2018        // Clone now has BOTH files (snapshot preserved, output
2019        // copied over from the source).
2020        let clone_zone = agent_current_zone(&promote_target_id);
2021        let clone_files = filestore.list_files(&clone_zone).unwrap();
2022        let clone_names: std::collections::HashSet<String> =
2023            clone_files.iter().map(|f| f.name.clone()).collect();
2024        assert!(
2025            clone_names.contains(SNAPSHOT_FILE),
2026            "snapshot file must remain at destination"
2027        );
2028        assert!(
2029            clone_names.contains(OUTPUT_FILE),
2030            "output file must be copied over from source on retry (codex R5)"
2031        );
2032        let output_bytes = filestore
2033            .read_file(&clone_zone, OUTPUT_FILE)
2034            .unwrap()
2035            .unwrap_or_default();
2036        assert_eq!(
2037            output_bytes, b"seeded-output-stream",
2038            "the unwritten file from the partial copy must arrive intact"
2039        );
2040
2041        // Source is drained — every source file has a destination
2042        // counterpart now.
2043        let seeded_files = filestore.list_files(&seeded_current).unwrap();
2044        assert!(
2045            seeded_files.is_empty(),
2046            "seeded current zone must be drained after the complete copy"
2047        );
2048    }
2049
2050    #[test]
2051    fn template_promote_promotes_newer_source_over_stale_destination() {
2052        // Codex P1 round 6 on PR #1017: the inverse of R4. If the
2053        // prior run's `instance_repoint_definition` failed,
2054        // instances stay pointed at the SEEDED def — the user's
2055        // continuation lands in the SEEDED zone, not the clone.
2056        // On retry, the SEEDED side has newer bytes. The fix
2057        // promotes the newer source over the stale destination
2058        // (and resolves R4 the other way when destination is
2059        // newer instead).
2060        //
2061        // Test setup: write destination FIRST (older modts), then
2062        // source SECOND (newer modts). After retry: destination
2063        // has the source's bytes; source drained.
2064        let dir = tempdir().unwrap();
2065        let wstore = open_temp_wstore(dir.path());
2066        let filestore = fresh_filestore();
2067
2068        let template = insert_template(&wstore, "tpl-claude", "Claude Code", "claude");
2069        let promote_target_id = format!("template-promote-v1-{}", template.id);
2070        let now = now_ms() as i64;
2071        let mut prior_target = crate::backend::storage::wstore::AgentDefinition {
2072            id: promote_target_id.clone(),
2073            slug: String::new(),
2074            name: "Claude Code".to_string(),
2075            icon: template.icon.clone(),
2076            provider: template.provider.clone(),
2077            description: template.description.clone(),
2078            working_directory: String::new(),
2079            shell: template.shell.clone(),
2080            provider_flags: template.provider_flags.clone(),
2081            auto_start: 0,
2082            restart_on_crash: template.restart_on_crash,
2083            idle_timeout_minutes: template.idle_timeout_minutes,
2084            created_at: now - 1_000,
2085            agent_type: template.agent_type.clone(),
2086            environment: template.environment.clone(),
2087            agent_bus_id: String::new(),
2088            is_seeded: 0,
2089            accounts: String::new(),
2090            parent_id: template.id.clone(),
2091            branch_label: String::new(),
2092            updated_at: now - 1_000,
2093            user_hidden: 0,
2094        };
2095        wstore.agent_def_insert(&mut prior_target).unwrap();
2096
2097        // Destination has the prior copy (will become OLDER).
2098        let clone_zone = agent_current_zone(&promote_target_id);
2099        write_zone_file(&filestore, &clone_zone, SNAPSHOT_FILE, b"stale-old-copy").unwrap();
2100        // Sleep just long enough to push modts forward.
2101        // filestore's modts comes from system time; 10ms is enough
2102        // on every platform we ship to.
2103        std::thread::sleep(std::time::Duration::from_millis(10));
2104        // Seeded source has the user's newer continuation (the
2105        // instance_repoint failed in the prior run, so user kept
2106        // typing at the seeded def).
2107        let seeded_zone = agent_current_zone(&template.id);
2108        write_zone_file(&filestore, &seeded_zone, SNAPSHOT_FILE, b"user-newer-continuation").unwrap();
2109
2110        let stats = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
2111        assert_eq!(stats.templates_promoted, 1);
2112
2113        // Destination now carries the SOURCE's newer bytes.
2114        let clone_bytes = filestore
2115            .read_file(&clone_zone, SNAPSHOT_FILE)
2116            .unwrap()
2117            .unwrap_or_default();
2118        let clone_str = std::str::from_utf8(&clone_bytes).unwrap();
2119        assert!(
2120            clone_str.contains("user-newer-continuation"),
2121            "user's newer continuation must be promoted from seeded source to clone; got: {clone_str}"
2122        );
2123        assert!(
2124            !clone_str.contains("stale-old-copy"),
2125            "stale older destination bytes must be replaced by the newer source"
2126        );
2127
2128        // Source drained.
2129        let seeded_files = filestore.list_files(&seeded_zone).unwrap();
2130        assert!(seeded_files.is_empty(), "seeded zone drained after promotion");
2131    }
2132
2133    #[test]
2134    fn template_promote_uses_deterministic_clone_id() {
2135        // Every run of `migrate_promote_template_sessions_v1` for
2136        // the same template MUST produce a clone at the same
2137        // deterministic id (`template-promote-v1-<template_id>`).
2138        // This is the convergence invariant that makes retries
2139        // safe under any partial-failure mode without ever
2140        // splitting one logical agent across multiple clone ids.
2141        let dir = tempdir().unwrap();
2142        let wstore = open_temp_wstore(dir.path());
2143        let filestore = fresh_filestore();
2144
2145        let template = insert_template(&wstore, "tpl-claude", "Claude Code", "claude");
2146        write_session_state(
2147            &filestore,
2148            &template.id,
2149            br#"{"nodes":[{"type":"user_message","message":"hi"}]}"#,
2150        )
2151        .unwrap();
2152
2153        let stats = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
2154        assert_eq!(stats.templates_promoted, 1);
2155
2156        let expected_id = format!("template-promote-v1-{}", template.id);
2157        let clone = wstore.agent_def_get(&expected_id).unwrap();
2158        assert!(clone.is_some(), "promote target must be created at the deterministic id");
2159        assert_eq!(clone.unwrap().parent_id, template.id);
2160    }
2161
2162    #[test]
2163    fn template_promote_idempotent_under_partial_failure_at_archive_move() {
2164        // Codex P1 round 3 on PR #1017: when a prior run copies
2165        // the seeded `:current` zone successfully but leaves at
2166        // least one seeded zone behind (e.g. `move_zone` succeeds
2167        // for `:current` but the source delete fails OR a later
2168        // `:archive:*` move fails), the next startup re-enters
2169        // migration for that template. The deterministic clone id
2170        // means the retry hits the SAME clone — never splitting
2171        // history across clone ids. Reuses the existing clone def,
2172        // re-runs move_zone (idempotent: write replaces if newer,
2173        // delete is best-effort), and converges.
2174        let dir = tempdir().unwrap();
2175        let wstore = open_temp_wstore(dir.path());
2176        let filestore = fresh_filestore();
2177
2178        let template = insert_template(&wstore, "tpl-claude", "Claude Code", "claude");
2179        insert_named_instance(&wstore, "inst-maks", &template.id, "Maks", 1_700_000_100_000);
2180
2181        // Simulate the partial-failure state: prior run created
2182        // the deterministic-id clone, moved :current successfully
2183        // (clone has data), but failed to remove the seeded
2184        // :archive:* zone (still on the seeded id).
2185        let promote_target_id = format!("template-promote-v1-{}", template.id);
2186        let now = now_ms() as i64;
2187        let mut prior_target = crate::backend::storage::wstore::AgentDefinition {
2188            id: promote_target_id.clone(),
2189            slug: String::new(),
2190            name: "Maks".to_string(),
2191            icon: template.icon.clone(),
2192            provider: template.provider.clone(),
2193            description: template.description.clone(),
2194            working_directory: String::new(),
2195            shell: template.shell.clone(),
2196            provider_flags: template.provider_flags.clone(),
2197            auto_start: 0,
2198            restart_on_crash: template.restart_on_crash,
2199            idle_timeout_minutes: template.idle_timeout_minutes,
2200            created_at: now - 1_000,
2201            agent_type: template.agent_type.clone(),
2202            environment: template.environment.clone(),
2203            agent_bus_id: String::new(),
2204            is_seeded: 0,
2205            accounts: String::new(),
2206            parent_id: template.id.clone(),
2207            branch_label: String::new(),
2208            updated_at: now - 1_000,
2209            user_hidden: 0,
2210        };
2211        wstore.agent_def_insert(&mut prior_target).unwrap();
2212        // Realistic partial-failure shape: run 1 copied :current
2213        // successfully (dest and source have IDENTICAL bytes from
2214        // that copy), and run 1's archive-move failed (archive
2215        // still on the seeded side, never copied to the clone).
2216        // Use identical bytes for :current so the modts-aware
2217        // copy gate treats it as no-op (no conflict).
2218        let snapshot_bytes = b"snapshot-from-prior-run".as_slice();
2219        write_zone_file(&filestore, &agent_current_zone(&promote_target_id), SNAPSHOT_FILE, snapshot_bytes).unwrap();
2220        write_zone_file(&filestore, &agent_current_zone(&template.id), SNAPSHOT_FILE, snapshot_bytes).unwrap();
2221        let stale_archive = agent_archive_zone(&template.id, 1_699_000_000_000);
2222        write_zone_file(&filestore, &stale_archive, SNAPSHOT_FILE, b"old archive").unwrap();
2223
2224        // Pre-condition: exactly one user-clone DEF (the
2225        // deterministic-id one). Use the dedicated
2226        // `db_agent_definitions` scan (not `agent_def_list`, which
2227        // reads `db_agents` and surfaces template-instance
2228        // projection rows).
2229        let clones_pre = wstore.user_clone_defs_for_template(&template.id).unwrap();
2230        assert_eq!(clones_pre.len(), 1, "test setup: one prior clone at deterministic id");
2231        assert_eq!(clones_pre[0].id, promote_target_id);
2232
2233        let stats = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
2234        assert_eq!(stats.templates_scanned, 1);
2235        assert_eq!(stats.templates_promoted, 1);
2236
2237        // Still exactly one user-clone def — the retry reused the
2238        // deterministic-id clone instead of inserting another.
2239        let clones_post = wstore.user_clone_defs_for_template(&template.id).unwrap();
2240        assert_eq!(
2241            clones_post.len(),
2242            1,
2243            "deterministic-id reuse must not create a duplicate clone on partial-failure retry"
2244        );
2245        assert_eq!(clones_post[0].id, promote_target_id);
2246
2247        // Both seeded zones are now drained onto the clone.
2248        let seeded_current = filestore
2249            .list_files(&agent_current_zone(&template.id))
2250            .unwrap();
2251        assert!(
2252            seeded_current.is_empty(),
2253            "seeded current zone should be empty after the retry's successful move"
2254        );
2255        let seeded_archive_files = filestore.list_files(&stale_archive).unwrap();
2256        assert!(
2257            seeded_archive_files.is_empty(),
2258            "seeded archive zone should be empty after the retry's successful move"
2259        );
2260
2261        // Re-run after convergence — pure no-op.
2262        let stats2 = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
2263        assert_eq!(stats2.templates_scanned, 0);
2264        assert_eq!(stats2.templates_promoted, 0);
2265    }
2266
2267    #[test]
2268    fn template_promote_ignores_legacy_marker_file() {
2269        // Backward-compat: an existing v1 marker file from a portable
2270        // running pre-self-idempotency code must NOT prevent the
2271        // migration from running. The 2026-05-24 rework leaves any
2272        // existing marker file in place but doesn't read it.
2273        let dir = tempdir().unwrap();
2274        let wstore = open_temp_wstore(dir.path());
2275        let filestore = fresh_filestore();
2276
2277        // Place a vestigial marker as if a prior startup wrote one.
2278        std::fs::write(dir.path().join(TEMPLATE_PROMOTE_MARKER_V1), b"v1\n").unwrap();
2279
2280        // Now set up an invariant violation.
2281        let template = insert_template(&wstore, "tpl-claude", "Claude Code", "claude");
2282        write_session_state(&filestore, &template.id, br#"{"nodes":[]}"#).unwrap();
2283
2284        let stats = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
2285        // Must NOT skip — the legacy marker is ignored.
2286        assert_eq!(stats.templates_scanned, 1);
2287        assert_eq!(stats.templates_promoted, 1);
2288    }
2289
2290    #[test]
2291    fn template_promote_falls_back_to_template_name_when_no_named_instance() {
2292        let dir = tempdir().unwrap();
2293        let wstore = open_temp_wstore(dir.path());
2294        let filestore = fresh_filestore();
2295
2296        let template = insert_template(&wstore, "tpl-x", "Cursor", "cursor");
2297        write_session_state(&filestore, &template.id, br#"{"nodes":[]}"#).unwrap();
2298        // NO instances inserted.
2299
2300        let stats = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
2301        assert_eq!(stats.templates_promoted, 1);
2302
2303        let all = wstore.agent_def_list().unwrap();
2304        let new_def = all
2305            .iter()
2306            .find(|d| d.is_seeded == 0 && d.parent_id == template.id)
2307            .expect("should clone the template");
2308        // Falls back to template name when no named instance exists.
2309        assert_eq!(new_def.name, "Cursor");
2310    }
2311
2312    #[test]
2313    fn template_promote_skips_already_user_owned_definitions() {
2314        let dir = tempdir().unwrap();
2315        let wstore = open_temp_wstore(dir.path());
2316        let filestore = fresh_filestore();
2317
2318        // A user-owned definition (is_seeded = 0) with a session — the
2319        // migration should leave it alone.
2320        let mut user_def = AgentDefinition {
2321            id: "user-abc".to_string(),
2322            slug: String::new(),
2323            name: "My Agent".to_string(),
2324            icon: String::new(),
2325            provider: "claude".to_string(),
2326            description: String::new(),
2327            working_directory: String::new(),
2328            shell: String::new(),
2329            provider_flags: String::new(),
2330            auto_start: 0,
2331            restart_on_crash: 0,
2332            idle_timeout_minutes: 0,
2333            created_at: 1_700_000_000_000,
2334            agent_type: "host".to_string(),
2335            environment: String::new(),
2336            agent_bus_id: String::new(),
2337            is_seeded: 0,
2338            accounts: String::new(),
2339            parent_id: String::new(),
2340            branch_label: String::new(),
2341            updated_at: 1_700_000_000_000,
2342            user_hidden: 0,
2343        };
2344        wstore.agent_def_insert(&mut user_def).unwrap();
2345        write_session_state(&filestore, &user_def.id, br#"{"nodes":[]}"#).unwrap();
2346
2347        let stats = migrate_promote_template_sessions_v1(&wstore, &filestore, dir.path());
2348        assert_eq!(stats.templates_scanned, 0);
2349        assert_eq!(stats.templates_promoted, 0);
2350
2351        // Original definition untouched.
2352        let all = wstore.agent_def_list().unwrap();
2353        let still_there = all.iter().find(|d| d.id == "user-abc").unwrap();
2354        assert_eq!(still_there.is_seeded, 0);
2355
2356        // Session zone still present.
2357        let cur = agent_current_zone(&user_def.id);
2358        let files = filestore.list_files(&cur).unwrap();
2359        assert!(!files.is_empty());
2360    }
2361
2362    #[test]
2363    fn migration_skips_non_agent_and_empty_blocks() {
2364        let dir = tempdir().unwrap();
2365        let wstore = open_temp_wstore(dir.path());
2366        let filestore = fresh_filestore();
2367
2368        // A "term" block (not agent) — must be skipped.
2369        let term_oid = uuid::Uuid::new_v4().to_string();
2370        let mut term_meta = MetaMapType::new();
2371        term_meta.insert("view".to_string(), serde_json::json!("term"));
2372        let mut term = Block {
2373            oid: term_oid.clone(),
2374            parentoref: String::new(),
2375            version: 1,
2376            runtimeopts: None,
2377            stickers: None,
2378            meta: term_meta,
2379            subblockids: None,
2380        };
2381        wstore.insert(&mut term).unwrap();
2382        seed_block_snapshot(&filestore, &term_oid, r#"{"nodes":[]}"#);
2383
2384        // An agent block with NO snapshot — should count as skipped.
2385        let _empty = insert_agent_block(&wstore, "def-x");
2386
2387        let stats = migrate_block_zones_v1(&wstore, &filestore, dir.path());
2388        // Only the empty agent block is "scanned" (view == "agent");
2389        // the term block is filtered out before the counter.
2390        assert_eq!(stats.blocks_scanned, 1);
2391        assert_eq!(stats.skipped_no_snapshot, 1);
2392        assert_eq!(stats.archives_written, 0);
2393        assert_eq!(stats.current_zones_seeded, 0);
2394    }
2395}