agentmux_srv\backend\reactive/
registry.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! File-based cross-instance agent registry.
5//!
6//! Each AgentMux instance writes agent registrations to
7//! `{data_dir}/agents/{agent_id}.json`. When a local inject fails with
8//! "agent not found", the inject handler looks up this registry and
9//! HTTP-forwards the request to the owning instance.
10//!
11//! Lifecycle:
12//! - Register: write file (on HTTP register endpoint + shell auto-register)
13//! - Unregister: delete file (on HTTP unregister endpoint + process exit)
14//! - Cleanup: TTL-based removal of stale files at startup
15
16use serde::{Deserialize, Serialize};
17use std::path::{Path, PathBuf};
18use std::sync::OnceLock;
19
20use super::now_unix_millis;
21
22/// One entry per registered agent in the shared data dir.
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct AgentEntry {
25    pub agent_id: String,
26    /// Local HTTP URL of the owning AgentMux instance (e.g. http://127.0.0.1:PORT).
27    pub local_url: String,
28    pub block_id: String,
29    /// OS PID of the owning agentmux-srv process.
30    pub pid: u32,
31    /// Unix milliseconds of last update.
32    pub updated_at: u64,
33    /// Per-launch auth key of the owning instance. Required by peers
34    /// performing cross-instance HTTP forward of `/agentmux/reactive/inject`
35    /// after the route moved under auth_middleware. Optional in the
36    /// struct (serde default) so older on-disk entries still deserialize;
37    /// a missing or empty value means a forward to this entry will be
38    /// rejected by the peer's auth layer (graceful — falls back to cloud
39    /// agentbus).
40    #[serde(default)]
41    pub auth_key: String,
42}
43
/// Auth key of the AgentMux instance running in this process.
///
/// According to the surrounding design notes, `main.rs` sets it once
/// after `Config::from_env_and_args` has read `AGENTMUX_AUTH_KEY` and
/// removed it from the env. The registry write path copies this value
/// into `AgentEntry::auth_key` so peers can authenticate cross-instance
/// inject forwards.
///
/// Being a `OnceLock`, it may be pre-set from tests or from the
/// `register` HTTP handler (which has `state.auth_key`) without harm:
/// only the first `set` takes effect.
static LOCAL_AUTH_KEY: OnceLock<String> = OnceLock::new();

/// Record this process's auth key. Safe to call repeatedly — every call
/// after the first successful one is a no-op.
pub fn init_local_auth_key(key: impl Into<String>) {
    let owned: String = key.into();
    // `set` hands the value back as `Err` when the cell is already
    // populated; dropping it is what makes "first call wins" hold.
    LOCAL_AUTH_KEY.set(owned).ok();
}

/// Current local auth key, or the empty string if none was initialised.
fn local_auth_key() -> &'static str {
    match LOCAL_AUTH_KEY.get() {
        Some(key) => key.as_str(),
        None => "",
    }
}
63
/// Directory under `data_dir` that holds the per-agent registry files.
fn agents_dir(data_dir: &Path) -> PathBuf {
    let mut dir = data_dir.to_path_buf();
    dir.push("agents");
    dir
}
67
68fn agent_path(data_dir: &Path, agent_id: &str) -> PathBuf {
69    // Sanitize: only allow alphanumeric, dash, underscore to prevent path traversal.
70    let safe: String = agent_id
71        .chars()
72        .map(|c| if c.is_alphanumeric() || c == '-' || c == '_' { c } else { '_' })
73        .collect();
74    agents_dir(data_dir).join(format!("{}.json", safe))
75}
76
77/// Write (create or update) an agent entry in the shared registry.
78///
79/// The entry includes the writing instance's auth_key (from
80/// `LOCAL_AUTH_KEY`) so a peer performing an HTTP forward of a missed
81/// inject can authenticate. On Unix the file is created with mode 0600
82/// **at open time** (not write-then-chmod, which would briefly expose
83/// the file at the default umask — same security boundary as the
84/// existing `authkey.dev` file). On Windows, default ACLs inherit
85/// user-only on user-owned directories.
86pub fn write(data_dir: &Path, agent_id: &str, local_url: &str, block_id: &str) {
87    let dir = agents_dir(data_dir);
88    let _ = std::fs::create_dir_all(&dir);
89    let entry = AgentEntry {
90        agent_id: agent_id.to_string(),
91        local_url: local_url.to_string(),
92        block_id: block_id.to_string(),
93        pid: std::process::id(),
94        updated_at: now_unix_millis(),
95        auth_key: local_auth_key().to_string(),
96    };
97    let path = agent_path(data_dir, agent_id);
98    let Ok(json) = serde_json::to_string(&entry) else { return };
99
100    let mut opts = std::fs::OpenOptions::new();
101    opts.write(true).create(true).truncate(true);
102    #[cfg(unix)]
103    {
104        use std::os::unix::fs::OpenOptionsExt;
105        opts.mode(0o600);
106    }
107    if let Ok(mut f) = opts.open(&path) {
108        use std::io::Write;
109        let _ = f.write_all(json.as_bytes());
110    }
111}
112
113/// Remove an agent entry from the shared registry.
114pub fn remove(data_dir: &Path, agent_id: &str) {
115    let _ = std::fs::remove_file(agent_path(data_dir, agent_id));
116}
117
118/// Look up an agent entry. Returns None if not found or file is malformed.
119pub fn lookup(data_dir: &Path, agent_id: &str) -> Option<AgentEntry> {
120    let content = std::fs::read_to_string(agent_path(data_dir, agent_id)).ok()?;
121    serde_json::from_str(&content).ok()
122}
123
124/// Remove stale entries at startup.
125///
126/// An entry is considered stale if `updated_at` is older than `max_age_ms`.
127/// The default is 4 hours — well beyond any reasonable agent session.
128/// Entries are also removed if their JSON is malformed.
129pub fn cleanup_stale(data_dir: &Path, max_age_ms: u64) {
130    let dir = agents_dir(data_dir);
131    let Ok(entries) = std::fs::read_dir(&dir) else { return };
132    let cutoff = now_unix_millis().saturating_sub(max_age_ms);
133    for entry in entries.flatten() {
134        let path = entry.path();
135        if path.extension().and_then(|e| e.to_str()) != Some("json") {
136            continue;
137        }
138        let Ok(content) = std::fs::read_to_string(&path) else {
139            let _ = std::fs::remove_file(&path);
140            continue;
141        };
142        match serde_json::from_str::<AgentEntry>(&content) {
143            Ok(agent) if agent.updated_at >= cutoff => {} // still fresh
144            _ => {
145                let _ = std::fs::remove_file(&path);
146            }
147        }
148    }
149}