agentmux_srv\backend\blockcontroller/
session_recovery.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Session recovery after unclean shutdown (Phase 4.2 — ultra-long-sessions).
5//!
6//! When a persistent agent subprocess is running and the server is killed
7//! (crash, OS reboot, task-kill), the subprocess dies with it but the block's
8//! session history is preserved in FileStore. On next startup, we want to:
9//!
10//!   1. Detect that a session was running when the server died, so we can
11//!      surface it to the user as "this session was interrupted".
12//!   2. Let the user resume: the persistent controller already supports
13//!      `--resume <session_id>` on the next input, so the recovery UX is
14//!      simply "click resume, type your next message, get picked up mid-flight".
15//!
//! The mechanism is a single meta key, `session:active_pid`, holding the PID
//! of the running subprocess (0 or missing means "no process"). It is set on
//! subprocess spawn and cleared whenever the subprocess exits (gracefully or
//! killed). If an agent block still has a non-zero PID recorded when the
//! server boots, that PID belongs to the previous server run, so we clear it
//! and set `session:was_interrupted = true` instead.
21//!
//! `session:was_interrupted` is a frontend-only signal — the backend never
//! reads it. The frontend `AgentControlBar` renders a banner when it's set,
//! and `service:update_object_meta` clears it when the user dismisses the
//! banner. `mark_active_pid` also clears it whenever a new subprocess spawns.
25
26use std::sync::Arc;
27
28use crate::backend::obj::{Block, MetaMapType};
29use crate::backend::storage::wstore::WaveStore;
30
/// Block meta key holding the PID of the block's running agent subprocess.
/// A value of `0`, a null, or an absent key all mean "no subprocess".
pub const META_SESSION_ACTIVE_PID: &str = "session:active_pid";

/// Block meta key that the startup scan sets to `true` when it finds a
/// leftover `session:active_pid`. It is nulled again by `mark_active_pid`
/// as soon as a new subprocess is spawned for the block.
pub const META_SESSION_WAS_INTERRUPTED: &str = "session:was_interrupted";
35
36/// Record that a subprocess with `pid` has been spawned for `block_id`.
37/// Best-effort — logs on failure but never panics.
38pub fn mark_active_pid(wstore: &Arc<WaveStore>, block_id: &str, pid: u32) {
39    let mut meta = MetaMapType::new();
40    meta.insert(META_SESSION_ACTIVE_PID.to_string(), serde_json::json!(pid));
41    // Clear any stale interrupt flag — we're running again.
42    meta.insert(META_SESSION_WAS_INTERRUPTED.to_string(), serde_json::Value::Null);
43    let oref_str = format!("block:{}", block_id);
44    if let Err(e) = crate::server::service::update_object_meta(wstore, &oref_str, &meta) {
45        tracing::warn!(block_id = %block_id, error = %e, "session_recovery: failed to mark active pid");
46    }
47}
48
49/// Clear `session:active_pid` — called when the subprocess exits for any reason.
50pub fn clear_active_pid(wstore: &Arc<WaveStore>, block_id: &str) {
51    let mut meta = MetaMapType::new();
52    meta.insert(META_SESSION_ACTIVE_PID.to_string(), serde_json::Value::Null);
53    let oref_str = format!("block:{}", block_id);
54    if let Err(e) = crate::server::service::update_object_meta(wstore, &oref_str, &meta) {
55        tracing::warn!(block_id = %block_id, error = %e, "session_recovery: failed to clear active pid");
56    }
57}
58
59/// Scan all blocks at server startup. For any agent block that still has
60/// `session:active_pid` set, transfer the flag to `session:was_interrupted`.
61///
62/// Returns the number of orphaned sessions found.
63pub fn scan_orphans(wstore: &Arc<WaveStore>) -> u32 {
64    let all_blocks = match wstore.get_all::<Block>() {
65        Ok(blocks) => blocks,
66        Err(e) => {
67            tracing::warn!(error = %e, "session_recovery: scan_orphans get_all failed");
68            return 0;
69        }
70    };
71
72    let mut count = 0u32;
73    for block in &all_blocks {
74        // Only agent panes
75        let view = block.meta.get("view").and_then(|v| v.as_str()).unwrap_or("");
76        if view != "agent" {
77            continue;
78        }
79
80        // Check for a stale active_pid
81        let pid = block
82            .meta
83            .get(META_SESSION_ACTIVE_PID)
84            .and_then(|v| v.as_u64())
85            .unwrap_or(0);
86        if pid == 0 {
87            continue;
88        }
89
90        // Transfer: clear active_pid, set was_interrupted
91        let mut update = MetaMapType::new();
92        update.insert(META_SESSION_ACTIVE_PID.to_string(), serde_json::Value::Null);
93        update.insert(
94            META_SESSION_WAS_INTERRUPTED.to_string(),
95            serde_json::json!(true),
96        );
97        let oref_str = format!("block:{}", block.oid);
98        match crate::server::service::update_object_meta(wstore, &oref_str, &update) {
99            Ok(()) => {
100                count += 1;
101                tracing::info!(
102                    block_id = %block.oid,
103                    stale_pid = pid,
104                    "session_recovery: marked orphaned session as interrupted"
105                );
106            }
107            Err(e) => {
108                tracing::warn!(
109                    block_id = %block.oid,
110                    error = %e,
111                    "session_recovery: failed to mark orphan"
112                );
113            }
114        }
115    }
116
117    if count > 0 {
118        tracing::info!(count = count, "session_recovery: scan_orphans complete");
119    }
120    count
121}
122
#[cfg(test)]
mod tests {
    // `super::*` already brings in `Arc`, `Block`, `MetaMapType` and
    // `WaveStore` via the parent's imports, so no explicit re-import is needed.
    use super::*;

    /// Build a minimal `Block` whose meta holds `view` and, when `active_pid`
    /// is `Some`, a `session:active_pid` entry. All other fields are empty.
    fn make_block(oid: &str, view: &str, active_pid: Option<u32>) -> Block {
        let mut meta = MetaMapType::new();
        meta.insert("view".to_string(), serde_json::json!(view));
        if let Some(pid) = active_pid {
            meta.insert(META_SESSION_ACTIVE_PID.to_string(), serde_json::json!(pid));
        }
        Block {
            oid: oid.to_string(),
            parentoref: String::new(),
            version: 1,
            runtimeopts: None,
            stickers: None,
            meta,
            subblockids: None,
        }
    }

    /// Verify that scan_orphans transfers active_pid → was_interrupted on
    /// agent blocks, leaves non-agent blocks and blocks without active_pid
    /// untouched, and finds nothing on a second pass.
    #[test]
    fn test_scan_orphans_transfers_flag() {
        let tmp = tempfile::tempdir().unwrap();
        let wstore = Arc::new(WaveStore::open(&tmp.path().join("objects.db")).unwrap());

        // The ORef parser requires valid UUIDs, so use fixed UUID-shaped ids.
        let orphan_id = "11111111-1111-1111-1111-111111111111";
        let clean_id = "22222222-2222-2222-2222-222222222222";
        let term_id = "33333333-3333-3333-3333-333333333333";

        // Agent block with stale active_pid — should be flagged.
        let mut orphan = make_block(orphan_id, "agent", Some(12345));
        wstore.insert(&mut orphan).unwrap();

        // Agent block without active_pid — should be left alone.
        let mut clean = make_block(clean_id, "agent", None);
        wstore.insert(&mut clean).unwrap();

        // Non-agent block with active_pid (shouldn't exist in practice, but
        // sanity check the view filter) — should be left alone.
        let mut nonagent = make_block(term_id, "term", Some(67890));
        wstore.insert(&mut nonagent).unwrap();

        let count = scan_orphans(&wstore);
        assert_eq!(count, 1, "exactly one agent orphan should be flagged");

        // Orphan: active_pid cleared, was_interrupted set.
        let after: Block = wstore.get(orphan_id).unwrap().unwrap();
        assert!(
            after.meta.get(META_SESSION_ACTIVE_PID).and_then(|v| v.as_u64()).unwrap_or(0) == 0,
            "active_pid should be cleared"
        );
        assert_eq!(
            after.meta.get(META_SESSION_WAS_INTERRUPTED).and_then(|v| v.as_bool()),
            Some(true),
            "was_interrupted should be true"
        );

        // Clean block untouched.
        let clean_after: Block = wstore.get(clean_id).unwrap().unwrap();
        assert!(clean_after.meta.get(META_SESSION_WAS_INTERRUPTED).is_none());

        // Non-agent block untouched: still has active_pid, never flagged.
        let term_after: Block = wstore.get(term_id).unwrap().unwrap();
        assert_eq!(
            term_after.meta.get(META_SESSION_ACTIVE_PID).and_then(|v| v.as_u64()),
            Some(67890),
        );
        assert!(term_after.meta.get(META_SESSION_WAS_INTERRUPTED).is_none());

        // A second scan is a no-op: the orphan's PID is now null (read as 0)
        // and the non-agent block is still excluded by the view filter.
        assert_eq!(scan_orphans(&wstore), 0, "second scan should find no orphans");
    }
}