agentmux_srv\backend\blockcontroller/
session_recovery.rs1use std::sync::Arc;
27
28use crate::backend::obj::{Block, MetaMapType};
29use crate::backend::storage::wstore::WaveStore;
30
/// Block meta key holding the PID recorded for a block's active session.
///
/// Written by `mark_active_pid`, and overwritten with JSON null by
/// `clear_active_pid` and `scan_orphans`.
pub const META_SESSION_ACTIVE_PID: &str = "session:active_pid";
/// Block meta key flagging that a block's session was interrupted.
///
/// Set to `true` by `scan_orphans` and overwritten with JSON null by
/// `mark_active_pid`.
pub const META_SESSION_WAS_INTERRUPTED: &str = "session:was_interrupted";
35
36pub fn mark_active_pid(wstore: &Arc<WaveStore>, block_id: &str, pid: u32) {
39 let mut meta = MetaMapType::new();
40 meta.insert(META_SESSION_ACTIVE_PID.to_string(), serde_json::json!(pid));
41 meta.insert(META_SESSION_WAS_INTERRUPTED.to_string(), serde_json::Value::Null);
43 let oref_str = format!("block:{}", block_id);
44 if let Err(e) = crate::server::service::update_object_meta(wstore, &oref_str, &meta) {
45 tracing::warn!(block_id = %block_id, error = %e, "session_recovery: failed to mark active pid");
46 }
47}
48
49pub fn clear_active_pid(wstore: &Arc<WaveStore>, block_id: &str) {
51 let mut meta = MetaMapType::new();
52 meta.insert(META_SESSION_ACTIVE_PID.to_string(), serde_json::Value::Null);
53 let oref_str = format!("block:{}", block_id);
54 if let Err(e) = crate::server::service::update_object_meta(wstore, &oref_str, &meta) {
55 tracing::warn!(block_id = %block_id, error = %e, "session_recovery: failed to clear active pid");
56 }
57}
58
59pub fn scan_orphans(wstore: &Arc<WaveStore>) -> u32 {
64 let all_blocks = match wstore.get_all::<Block>() {
65 Ok(blocks) => blocks,
66 Err(e) => {
67 tracing::warn!(error = %e, "session_recovery: scan_orphans get_all failed");
68 return 0;
69 }
70 };
71
72 let mut count = 0u32;
73 for block in &all_blocks {
74 let view = block.meta.get("view").and_then(|v| v.as_str()).unwrap_or("");
76 if view != "agent" {
77 continue;
78 }
79
80 let pid = block
82 .meta
83 .get(META_SESSION_ACTIVE_PID)
84 .and_then(|v| v.as_u64())
85 .unwrap_or(0);
86 if pid == 0 {
87 continue;
88 }
89
90 let mut update = MetaMapType::new();
92 update.insert(META_SESSION_ACTIVE_PID.to_string(), serde_json::Value::Null);
93 update.insert(
94 META_SESSION_WAS_INTERRUPTED.to_string(),
95 serde_json::json!(true),
96 );
97 let oref_str = format!("block:{}", block.oid);
98 match crate::server::service::update_object_meta(wstore, &oref_str, &update) {
99 Ok(()) => {
100 count += 1;
101 tracing::info!(
102 block_id = %block.oid,
103 stale_pid = pid,
104 "session_recovery: marked orphaned session as interrupted"
105 );
106 }
107 Err(e) => {
108 tracing::warn!(
109 block_id = %block.oid,
110 error = %e,
111 "session_recovery: failed to mark orphan"
112 );
113 }
114 }
115 }
116
117 if count > 0 {
118 tracing::info!(count = count, "session_recovery: scan_orphans complete");
119 }
120 count
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::obj::Block;

    /// Builds a minimal block fixture with the given `view` meta value and,
    /// when `active_pid` is `Some`, an active-PID meta entry.
    fn fixture_block(oid: &str, view: &str, active_pid: Option<u32>) -> Block {
        let mut meta = MetaMapType::new();
        meta.insert("view".to_string(), serde_json::json!(view));
        if let Some(pid) = active_pid {
            meta.insert(META_SESSION_ACTIVE_PID.to_string(), serde_json::json!(pid));
        }
        Block {
            oid: oid.to_string(),
            parentoref: String::new(),
            version: 1,
            runtimeopts: None,
            stickers: None,
            meta,
            subblockids: None,
        }
    }

    /// An agent block with a stored PID is flagged as interrupted, while an
    /// agent block without a PID and a non-agent block with a PID are left
    /// untouched.
    #[test]
    fn test_scan_orphans_transfers_flag() {
        let tmp = tempfile::tempdir().unwrap();
        let wstore = Arc::new(WaveStore::open(&tmp.path().join("objects.db")).unwrap());

        let orphan_id = "11111111-1111-1111-1111-111111111111";
        let clean_id = "22222222-2222-2222-2222-222222222222";
        let term_id = "33333333-3333-3333-3333-333333333333";

        // Agent block with a leftover PID: the only one expected to be flagged.
        let mut orphan = fixture_block(orphan_id, "agent", Some(12345));
        wstore.insert(&mut orphan).unwrap();

        // Agent block without a PID.
        let mut clean = fixture_block(clean_id, "agent", None);
        wstore.insert(&mut clean).unwrap();

        // Non-agent block that happens to carry a PID.
        let mut nonagent = fixture_block(term_id, "term", Some(67890));
        wstore.insert(&mut nonagent).unwrap();

        let count = scan_orphans(&wstore);
        assert_eq!(count, 1, "exactly one agent orphan should be flagged");

        let after: Block = wstore.get(orphan_id).unwrap().unwrap();
        let remaining_pid = after
            .meta
            .get(META_SESSION_ACTIVE_PID)
            .and_then(|v| v.as_u64())
            .unwrap_or(0);
        assert!(remaining_pid == 0, "active_pid should be cleared");
        let interrupted = after
            .meta
            .get(META_SESSION_WAS_INTERRUPTED)
            .and_then(|v| v.as_bool());
        assert_eq!(interrupted, Some(true), "was_interrupted should be true");

        let clean_after: Block = wstore.get(clean_id).unwrap().unwrap();
        assert!(clean_after.meta.get(META_SESSION_WAS_INTERRUPTED).is_none());

        let term_after: Block = wstore.get(term_id).unwrap().unwrap();
        let term_pid = term_after
            .meta
            .get(META_SESSION_ACTIVE_PID)
            .and_then(|v| v.as_u64());
        assert_eq!(term_pid, Some(67890));
    }
}