agentmux_srv\registry/
store.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! `Registry` — file-per-agent CRUD on `<root>/<uuid>.json` with
5//! a sibling `retired/<uuid>.json` tombstone tree.
6//!
7//! Concurrency: cross-process safety comes from filesystem atomic
8//! rename. The internal `Mutex` only serializes partial-merge writes
9//! from threads inside the same `srv` so a v1 binary touching
10//! `last_launched_at_ms` doesn't race itself.
11
12use std::path::{Path, PathBuf};
13use std::sync::Mutex;
14
15use serde_json::Value;
16use thiserror::Error;
17
18use super::atomic::{rename_atomic, write_atomic};
19use super::schema::{validate, NamedAgentRecord, ValidationError, MAX_SUPPORTED_SCHEMA};
20
21#[derive(Debug, Error)]
22pub enum RegistryError {
23    #[error("io: {0}")]
24    Io(#[from] std::io::Error),
25    #[error("json: {0}")]
26    Json(#[from] serde_json::Error),
27    #[error("validation: {0}")]
28    Validation(#[from] ValidationError),
29}
30
/// Handle to an on-disk agent registry: one JSON file per agent
/// directly under `root`, plus a `retired/` subdirectory beneath it.
pub struct Registry {
    /// Directory holding the active `<instance_id>.json` files.
    root: PathBuf,
    /// Taken by every mutating method so writers inside this process
    /// run one at a time. It guards no data, hence the `()` payload.
    write_lock: Mutex<()>,
}
35
36impl Registry {
37    /// Open or create the registry rooted at `root`. Ensures the
38    /// active dir and `retired/` subdir both exist.
39    pub fn open(root: PathBuf) -> Result<Self, RegistryError> {
40        std::fs::create_dir_all(&root)?;
41        std::fs::create_dir_all(root.join("retired"))?;
42        Ok(Self {
43            root,
44            write_lock: Mutex::new(()),
45        })
46    }
47
48    pub fn root(&self) -> &Path {
49        &self.root
50    }
51
52    /// Resolved `<shared_home>/agents/` — one level above `root`.
53    /// Used by callers that need to express working-directory paths
54    /// as relative subpaths under the shared agents tree. Returns
55    /// `None` if the registry root has no parent (only happens in
56    /// pathological filesystem-root setups; production always nests
57    /// under `~/.agentmux/agents/registry`).
58    pub fn agents_root(&self) -> Option<&Path> {
59        self.root.parent()
60    }
61
62    fn active_path(&self, instance_id: &str) -> PathBuf {
63        self.root.join(format!("{instance_id}.json"))
64    }
65
66    fn retired_path(&self, instance_id: &str) -> PathBuf {
67        self.root.join("retired").join(format!("{instance_id}.json"))
68    }
69
70    /// Insert or update a record. If the file already exists, unknown
71    /// top-level + `data` fields are preserved (forward-compat with
72    /// future schemas that add columns this binary doesn't know).
73    ///
74    /// Forward-compat invariant (spec §6): never write a higher-schema
75    /// row into a lower schema, never overwrite a corrupt/unparseable
76    /// file. Both cases skip the mirror with a warning — the on-disk
77    /// file stays intact for the binary that authored it (or for ops
78    /// triage). Skipping is `Ok(())`: SQLite remains authoritative.
79    pub fn upsert(&self, rec: &NamedAgentRecord) -> Result<(), RegistryError> {
80        let _g = self.write_lock.lock().unwrap_or_else(|e| e.into_inner());
81        let path = self.active_path(&rec.data.instance_id);
82        let bytes = match std::fs::read(&path) {
83            Ok(existing) => match merge_for_write(&existing, rec)? {
84                Some(b) => b,
85                None => return Ok(()),
86            },
87            Err(e) if e.kind() == std::io::ErrorKind::NotFound => to_pretty(rec)?,
88            Err(e) => return Err(e.into()),
89        };
90        write_atomic(&path, &bytes)?;
91        Ok(())
92    }
93
94    /// Move record into `retired/` (soft delete — keeps the working
95    /// dir intact, drops it from the launch-modal dropdown). Idempotent.
96    pub fn retire(&self, instance_id: &str) -> Result<(), RegistryError> {
97        let _g = self.write_lock.lock().unwrap_or_else(|e| e.into_inner());
98        let from = self.active_path(instance_id);
99        if !from.exists() {
100            return Ok(());
101        }
102        rename_atomic(&from, &self.retired_path(instance_id))?;
103        Ok(())
104    }
105
106    /// Move record back from `retired/` to active. Idempotent.
107    pub fn unretire(&self, instance_id: &str) -> Result<(), RegistryError> {
108        let _g = self.write_lock.lock().unwrap_or_else(|e| e.into_inner());
109        let from = self.retired_path(instance_id);
110        if !from.exists() {
111            return Ok(());
112        }
113        rename_atomic(&from, &self.active_path(instance_id))?;
114        Ok(())
115    }
116
117    /// Hard-delete (drops both active and retired files). Mirrors
118    /// SQLite `instance_delete`.
119    pub fn hard_delete(&self, instance_id: &str) -> Result<(), RegistryError> {
120        let _g = self.write_lock.lock().unwrap_or_else(|e| e.into_inner());
121        for p in [self.active_path(instance_id), self.retired_path(instance_id)] {
122            match std::fs::remove_file(&p) {
123                Ok(()) => {}
124                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
125                Err(e) => return Err(e.into()),
126            }
127        }
128        Ok(())
129    }
130
131    /// Whether an active record exists for `instance_id`. Doesn't
132    /// validate — useful for migration idempotency checks. Use
133    /// [`Self::exists_anywhere`] when retired records should also
134    /// count (e.g. so migration doesn't resurrect a hidden agent).
135    pub fn exists(&self, instance_id: &str) -> bool {
136        self.active_path(instance_id).exists()
137    }
138
139    /// Whether a record exists in either active or retired. Used by
140    /// migration to skip already-tombstoned records (avoid
141    /// resurrecting a user's deliberate "Forget agent" via a
142    /// per-version SQLite row that still has `display_hidden = 0`).
143    pub fn exists_anywhere(&self, instance_id: &str) -> bool {
144        self.active_path(instance_id).exists() || self.retired_path(instance_id).exists()
145    }
146
147    /// Read every valid active record. Invalid files are skipped +
148    /// logged. PR A doesn't wire this into the RPC path; included so
149    /// PR B can swap reads over without further restructuring.
150    pub fn list_active(&self) -> Result<Vec<NamedAgentRecord>, RegistryError> {
151        let mut out = Vec::new();
152        for entry in std::fs::read_dir(&self.root)? {
153            let entry = entry?;
154            let path = entry.path();
155            if !path.is_file() {
156                continue;
157            }
158            let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
159                continue;
160            };
161            if path.extension().and_then(|s| s.to_str()) != Some("json") {
162                continue;
163            }
164            match read_and_validate(&path, stem) {
165                Ok(rec) => out.push(rec),
166                Err(e) => {
167                    tracing::warn!(
168                        file = %path.display(),
169                        error = %e,
170                        "registry: skipping invalid record"
171                    );
172                }
173            }
174        }
175        Ok(out)
176    }
177}
178
179fn read_and_validate(path: &Path, stem: &str) -> Result<NamedAgentRecord, RegistryError> {
180    let bytes = std::fs::read(path)?;
181    let rec: NamedAgentRecord = serde_json::from_slice(&bytes)?;
182    validate(stem, &rec)?;
183    Ok(rec)
184}
185
186fn to_pretty(rec: &NamedAgentRecord) -> Result<Vec<u8>, serde_json::Error> {
187    let mut bytes = serde_json::to_vec_pretty(rec)?;
188    bytes.push(b'\n');
189    Ok(bytes)
190}
191
192/// Merge an in-memory record into an on-disk file's raw JSON,
193/// preserving fields beyond this binary's struct shape.
194///
195/// Returns `Ok(None)` when the writer must refuse the merge (corrupt
196/// JSON, missing `schema_version`, or `schema_version` above
197/// `MAX_SUPPORTED_SCHEMA`). The caller treats `None` as a skip and
198/// leaves the on-disk file intact.
199fn merge_for_write(
200    existing: &[u8],
201    rec: &NamedAgentRecord,
202) -> Result<Option<Vec<u8>>, RegistryError> {
203    let on_disk: Value = match serde_json::from_slice(existing) {
204        Ok(v) => v,
205        Err(e) => {
206            tracing::warn!(
207                error = %e,
208                "registry: existing file is unparseable JSON — refusing to overwrite (may be a newer schema)"
209            );
210            return Ok(None);
211        }
212    };
213    // Use `try_from` so any number above `u32::MAX` is treated as
214    // unparseable rather than wrapping into the supported range. A
215    // wrap would let an oversized envelope downgrade-bypass the
216    // forward-compat guard below.
217    let on_disk_version = on_disk
218        .get("schema_version")
219        .and_then(|v| v.as_u64())
220        .and_then(|v| u32::try_from(v).ok());
221    match on_disk_version {
222        Some(v) if v > MAX_SUPPORTED_SCHEMA => {
223            tracing::warn!(
224                on_disk = v,
225                writer_max = MAX_SUPPORTED_SCHEMA,
226                "registry: on-disk schema_version > writer max — refusing downgrade"
227            );
228            return Ok(None);
229        }
230        Some(_) => {}
231        None => {
232            tracing::warn!(
233                "registry: existing file lacks schema_version — refusing to overwrite"
234            );
235            return Ok(None);
236        }
237    }
238    let mut merged = on_disk;
239    let updates = serde_json::to_value(rec)?;
240    merge_known(&mut merged, &updates);
241    let mut bytes = serde_json::to_vec_pretty(&merged)?;
242    bytes.push(b'\n');
243    Ok(Some(bytes))
244}
245
246/// Overwrite `target`'s top-level keys with `updates`' keys. Keys in
247/// `target` that aren't in `updates` are preserved. Recurses into
248/// the `data` sub-object so future fields inside `data` also survive.
249fn merge_known(target: &mut Value, updates: &Value) {
250    let (Some(t), Some(u)) = (target.as_object_mut(), updates.as_object()) else {
251        *target = updates.clone();
252        return;
253    };
254    for (k, v) in u {
255        if k == "data" {
256            if let Some(t_data) = t.get_mut("data") {
257                merge_known(t_data, v);
258                continue;
259            }
260        }
261        t.insert(k.clone(), v.clone());
262    }
263}