agentmux_srv\registry/
store.rs1use std::path::{Path, PathBuf};
13use std::sync::Mutex;
14
15use serde_json::Value;
16use thiserror::Error;
17
18use super::atomic::{rename_atomic, write_atomic};
19use super::schema::{validate, NamedAgentRecord, ValidationError, MAX_SUPPORTED_SCHEMA};
20
21#[derive(Debug, Error)]
22pub enum RegistryError {
23 #[error("io: {0}")]
24 Io(#[from] std::io::Error),
25 #[error("json: {0}")]
26 Json(#[from] serde_json::Error),
27 #[error("validation: {0}")]
28 Validation(#[from] ValidationError),
29}
30
/// File-backed registry of agent records rooted at a directory.
///
/// Records are stored as `<instance_id>.json` directly under `root`; retired
/// records are kept under `root/retired/`.
#[derive(Debug)]
pub struct Registry {
    /// Directory holding the active record files.
    root: PathBuf,
    /// Held by every mutating operation so writers within this process do not
    /// interleave.
    write_lock: Mutex<()>,
}
35
36impl Registry {
37 pub fn open(root: PathBuf) -> Result<Self, RegistryError> {
40 std::fs::create_dir_all(&root)?;
41 std::fs::create_dir_all(root.join("retired"))?;
42 Ok(Self {
43 root,
44 write_lock: Mutex::new(()),
45 })
46 }
47
48 pub fn root(&self) -> &Path {
49 &self.root
50 }
51
52 pub fn agents_root(&self) -> Option<&Path> {
59 self.root.parent()
60 }
61
62 fn active_path(&self, instance_id: &str) -> PathBuf {
63 self.root.join(format!("{instance_id}.json"))
64 }
65
66 fn retired_path(&self, instance_id: &str) -> PathBuf {
67 self.root.join("retired").join(format!("{instance_id}.json"))
68 }
69
70 pub fn upsert(&self, rec: &NamedAgentRecord) -> Result<(), RegistryError> {
80 let _g = self.write_lock.lock().unwrap_or_else(|e| e.into_inner());
81 let path = self.active_path(&rec.data.instance_id);
82 let bytes = match std::fs::read(&path) {
83 Ok(existing) => match merge_for_write(&existing, rec)? {
84 Some(b) => b,
85 None => return Ok(()),
86 },
87 Err(e) if e.kind() == std::io::ErrorKind::NotFound => to_pretty(rec)?,
88 Err(e) => return Err(e.into()),
89 };
90 write_atomic(&path, &bytes)?;
91 Ok(())
92 }
93
94 pub fn retire(&self, instance_id: &str) -> Result<(), RegistryError> {
97 let _g = self.write_lock.lock().unwrap_or_else(|e| e.into_inner());
98 let from = self.active_path(instance_id);
99 if !from.exists() {
100 return Ok(());
101 }
102 rename_atomic(&from, &self.retired_path(instance_id))?;
103 Ok(())
104 }
105
106 pub fn unretire(&self, instance_id: &str) -> Result<(), RegistryError> {
108 let _g = self.write_lock.lock().unwrap_or_else(|e| e.into_inner());
109 let from = self.retired_path(instance_id);
110 if !from.exists() {
111 return Ok(());
112 }
113 rename_atomic(&from, &self.active_path(instance_id))?;
114 Ok(())
115 }
116
117 pub fn hard_delete(&self, instance_id: &str) -> Result<(), RegistryError> {
120 let _g = self.write_lock.lock().unwrap_or_else(|e| e.into_inner());
121 for p in [self.active_path(instance_id), self.retired_path(instance_id)] {
122 match std::fs::remove_file(&p) {
123 Ok(()) => {}
124 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
125 Err(e) => return Err(e.into()),
126 }
127 }
128 Ok(())
129 }
130
131 pub fn exists(&self, instance_id: &str) -> bool {
136 self.active_path(instance_id).exists()
137 }
138
139 pub fn exists_anywhere(&self, instance_id: &str) -> bool {
144 self.active_path(instance_id).exists() || self.retired_path(instance_id).exists()
145 }
146
147 pub fn list_active(&self) -> Result<Vec<NamedAgentRecord>, RegistryError> {
151 let mut out = Vec::new();
152 for entry in std::fs::read_dir(&self.root)? {
153 let entry = entry?;
154 let path = entry.path();
155 if !path.is_file() {
156 continue;
157 }
158 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
159 continue;
160 };
161 if path.extension().and_then(|s| s.to_str()) != Some("json") {
162 continue;
163 }
164 match read_and_validate(&path, stem) {
165 Ok(rec) => out.push(rec),
166 Err(e) => {
167 tracing::warn!(
168 file = %path.display(),
169 error = %e,
170 "registry: skipping invalid record"
171 );
172 }
173 }
174 }
175 Ok(out)
176 }
177}
178
179fn read_and_validate(path: &Path, stem: &str) -> Result<NamedAgentRecord, RegistryError> {
180 let bytes = std::fs::read(path)?;
181 let rec: NamedAgentRecord = serde_json::from_slice(&bytes)?;
182 validate(stem, &rec)?;
183 Ok(rec)
184}
185
186fn to_pretty(rec: &NamedAgentRecord) -> Result<Vec<u8>, serde_json::Error> {
187 let mut bytes = serde_json::to_vec_pretty(rec)?;
188 bytes.push(b'\n');
189 Ok(bytes)
190}
191
192fn merge_for_write(
200 existing: &[u8],
201 rec: &NamedAgentRecord,
202) -> Result<Option<Vec<u8>>, RegistryError> {
203 let on_disk: Value = match serde_json::from_slice(existing) {
204 Ok(v) => v,
205 Err(e) => {
206 tracing::warn!(
207 error = %e,
208 "registry: existing file is unparseable JSON — refusing to overwrite (may be a newer schema)"
209 );
210 return Ok(None);
211 }
212 };
213 let on_disk_version = on_disk
218 .get("schema_version")
219 .and_then(|v| v.as_u64())
220 .and_then(|v| u32::try_from(v).ok());
221 match on_disk_version {
222 Some(v) if v > MAX_SUPPORTED_SCHEMA => {
223 tracing::warn!(
224 on_disk = v,
225 writer_max = MAX_SUPPORTED_SCHEMA,
226 "registry: on-disk schema_version > writer max — refusing downgrade"
227 );
228 return Ok(None);
229 }
230 Some(_) => {}
231 None => {
232 tracing::warn!(
233 "registry: existing file lacks schema_version — refusing to overwrite"
234 );
235 return Ok(None);
236 }
237 }
238 let mut merged = on_disk;
239 let updates = serde_json::to_value(rec)?;
240 merge_known(&mut merged, &updates);
241 let mut bytes = serde_json::to_vec_pretty(&merged)?;
242 bytes.push(b'\n');
243 Ok(Some(bytes))
244}
245
246fn merge_known(target: &mut Value, updates: &Value) {
250 let (Some(t), Some(u)) = (target.as_object_mut(), updates.as_object()) else {
251 *target = updates.clone();
252 return;
253 };
254 for (k, v) in u {
255 if k == "data" {
256 if let Some(t_data) = t.get_mut("data") {
257 merge_known(t_data, v);
258 continue;
259 }
260 }
261 t.insert(k.clone(), v.clone());
262 }
263}