1use std::io::{Read, Write};
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20use std::time::{SystemTime, UNIX_EPOCH};
21
22use flate2::Compression;
23use flate2::read::GzDecoder;
24use flate2::write::GzEncoder;
25
26use crate::backend::blockcontroller::session_stats::{
27 META_SESSION_LAST_ACTIVITY_MS, META_SESSION_LINE_COUNT,
28};
29use crate::backend::obj::{Block, MetaMapType};
30use crate::backend::storage::filestore::{FileStore, FileMeta, FileOpts};
31use crate::backend::storage::wstore::WaveStore;
32
/// Block meta key holding the time (milliseconds since the Unix epoch) at which
/// the session output was archived.
pub const META_SESSION_ARCHIVED_AT: &str = "session:archived_at";
/// Block meta key holding the number of raw (uncompressed) output bytes archived.
pub const META_SESSION_ARCHIVED_BYTES: &str = "session:archived_bytes";
/// Block meta key holding the filesystem path of the gzip archive file.
pub const META_SESSION_ARCHIVE_PATH: &str = "session:archive_path";

/// Name of the filestore entry that holds a block's session output.
const OUTPUT_FILENAME: &str = "output";
44
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        // Clock set before the epoch: treated the same as a zero duration.
        Err(_) => 0,
    }
}
55
56fn write_gz(data: &[u8], dest_path: &Path) -> Result<(), String> {
58 let file = std::fs::File::create(dest_path)
59 .map_err(|e| format!("create archive file {}: {e}", dest_path.display()))?;
60 let mut encoder = GzEncoder::new(file, Compression::default());
61 encoder.write_all(data)
62 .map_err(|e| format!("compress: {e}"))?;
63 encoder.finish()
64 .map_err(|e| format!("gz finish: {e}"))?;
65 Ok(())
66}
67
68fn read_gz(src_path: &Path) -> Result<Vec<u8>, String> {
70 let file = std::fs::File::open(src_path)
71 .map_err(|e| format!("open archive {}: {e}", src_path.display()))?;
72 let mut decoder = GzDecoder::new(file);
73 let mut out = Vec::new();
74 decoder.read_to_end(&mut out)
75 .map_err(|e| format!("decompress: {e}"))?;
76 Ok(out)
77}
78
/// Creates `archive_dir` together with any missing parent directories.
///
/// Succeeds when the directory already exists.
///
/// # Errors
/// Returns a message containing the path and the underlying I/O error.
fn ensure_archive_dir(archive_dir: &Path) -> Result<(), String> {
    match std::fs::create_dir_all(archive_dir) {
        Ok(()) => Ok(()),
        Err(e) => Err(format!("create archive dir {}: {e}", archive_dir.display())),
    }
}
84
85pub fn archive_session_output(
98 wstore: &Arc<WaveStore>,
99 filestore: &Arc<FileStore>,
100 block_id: &str,
101 archive_dir: &Path,
102) -> Result<(u64, i64), String> {
103 let raw_bytes = match filestore.read_file(block_id, OUTPUT_FILENAME) {
105 Ok(Some(b)) if !b.is_empty() => b,
106 Ok(_) => {
107 return Ok((0, now_ms()));
109 }
110 Err(e) => return Err(format!("filestore read: {e}")),
111 };
112
113 let archived_bytes = raw_bytes.len() as u64;
114
115 ensure_archive_dir(archive_dir)?;
116
117 let archive_path = archive_dir.join(format!("{}.jsonl.gz", block_id));
118 write_gz(&raw_bytes, &archive_path)?;
119
120 let archived_at = now_ms();
121
122 let mut meta = MetaMapType::new();
127 meta.insert(META_SESSION_ARCHIVED_AT.to_string(), serde_json::json!(archived_at));
128 meta.insert(META_SESSION_ARCHIVED_BYTES.to_string(), serde_json::json!(archived_bytes));
129 meta.insert(
130 META_SESSION_ARCHIVE_PATH.to_string(),
131 serde_json::json!(archive_path.to_string_lossy().as_ref()),
132 );
133
134 let oref_str = format!("block:{}", block_id);
135 if let Err(e) = crate::server::service::update_object_meta(wstore, &oref_str, &meta) {
136 let _ = std::fs::remove_file(&archive_path);
138 return Err(format!("update_object_meta: {e}"));
139 }
140
141 if let Err(e) = filestore.delete_file(block_id, OUTPUT_FILENAME) {
143 tracing::warn!(
144 block_id = %block_id,
145 error = %e,
146 "session_archive: failed to delete filestore entry after archiving (meta already updated)"
147 );
148 }
149
150 tracing::info!(
151 block_id = %block_id,
152 archived_bytes = archived_bytes,
153 archive_path = %archive_path.display(),
154 "session archived"
155 );
156
157 Ok((archived_bytes, archived_at))
158}
159
160pub fn restore_session_output(
169 wstore: &Arc<WaveStore>,
170 filestore: &Arc<FileStore>,
171 block_id: &str,
172) -> Result<u64, String> {
173 let block: Block = wstore
175 .get(block_id)
176 .map_err(|e| format!("wstore.get: {e}"))?
177 .ok_or_else(|| format!("BLOCK_NOT_FOUND: {}", block_id))?;
178
179 let archive_path_str = block
180 .meta
181 .get(META_SESSION_ARCHIVE_PATH)
182 .and_then(|v| v.as_str())
183 .ok_or_else(|| format!("session:archive_path not set for block {}", block_id))?
184 .to_string();
185
186 let archive_path = PathBuf::from(&archive_path_str);
187 let raw_bytes = read_gz(&archive_path)?;
188 let restored_bytes = raw_bytes.len() as u64;
189
190 let _ = filestore.delete_file(block_id, OUTPUT_FILENAME); filestore
193 .make_file(block_id, OUTPUT_FILENAME, FileMeta::default(), FileOpts::default())
194 .map_err(|e| format!("make_file: {e}"))?;
195 filestore
196 .append_data(block_id, OUTPUT_FILENAME, &raw_bytes)
197 .map_err(|e| format!("append_data: {e}"))?;
198
199 let mut meta = MetaMapType::new();
201 meta.insert(META_SESSION_ARCHIVED_AT.to_string(), serde_json::Value::Null);
202 meta.insert(META_SESSION_ARCHIVED_BYTES.to_string(), serde_json::Value::Null);
203 meta.insert(META_SESSION_ARCHIVE_PATH.to_string(), serde_json::Value::Null);
204
205 let oref_str = format!("block:{}", block_id);
206 crate::server::service::update_object_meta(wstore, &oref_str, &meta)
207 .map_err(|e| format!("update_object_meta: {e}"))?;
208
209 tracing::info!(
210 block_id = %block_id,
211 restored_bytes = restored_bytes,
212 archive_path = %archive_path_str,
213 "session restored"
214 );
215
216 Ok(restored_bytes)
217}
218
219pub fn read_session_output(
226 wstore: &Arc<WaveStore>,
227 filestore: &Arc<FileStore>,
228 block_id: &str,
229) -> Result<(Vec<u8>, u64), String> {
230 let block: Block = wstore
232 .get(block_id)
233 .map_err(|e| format!("wstore.get: {e}"))?
234 .ok_or_else(|| format!("BLOCK_NOT_FOUND: {}", block_id))?;
235
236 let is_archived = block
237 .meta
238 .get(META_SESSION_ARCHIVED_AT)
239 .and_then(|v| v.as_i64())
240 .map(|v| v > 0)
241 .unwrap_or(false);
242
243 let raw_bytes = if is_archived {
244 let archive_path_str = block
245 .meta
246 .get(META_SESSION_ARCHIVE_PATH)
247 .and_then(|v| v.as_str())
248 .ok_or_else(|| "session:archive_path not set".to_string())?
249 .to_string();
250 read_gz(Path::new(&archive_path_str))?
251 } else {
252 filestore
253 .read_file(block_id, OUTPUT_FILENAME)
254 .map_err(|e| format!("filestore read: {e}"))?
255 .unwrap_or_default()
256 };
257
258 let line_count = bytecount_lines(&raw_bytes);
259 Ok((raw_bytes, line_count))
260}
261
/// Counts the lines in `data` that contain at least one non-whitespace character.
///
/// The bytes are decoded as UTF-8 with invalid sequences replaced, then split
/// with `str::lines` (so both `\n` and `\r\n` terminate a line).
fn bytecount_lines(data: &[u8]) -> u64 {
    let decoded = String::from_utf8_lossy(data);
    let mut non_blank = 0u64;
    for line in decoded.lines() {
        // A line counts as soon as it has any character that is not whitespace.
        if line.chars().any(|c| !c.is_whitespace()) {
            non_blank += 1;
        }
    }
    non_blank
}
267
268pub fn default_archive_dir() -> Option<PathBuf> {
274 dirs::home_dir().map(|h| h.join(".agentmux").join("archives"))
275}
276
/// Archives the output of inactive agent sessions and keeps the archive
/// directory within a size budget.
pub struct SessionArchiver {
    /// Object store used to list blocks and update their meta.
    wstore: Arc<WaveStore>,
    /// Filestore that holds each block's live session output.
    filestore: Arc<FileStore>,
    /// A session is archived once its last activity is at least this many days old.
    pub inactive_days: u64,
    /// Size budget for the `.gz` files in `archive_dir`; the oldest files are
    /// pruned while the combined size exceeds it.
    pub max_total_bytes: u64,
    /// Directory where archives are written and from which they are pruned.
    pub archive_dir: PathBuf,
}
292
/// Counters reported by one `SessionArchiver::sweep` pass.
#[derive(Debug, Clone, Default)]
pub struct SessionArchiverStats {
    /// Number of sessions whose archive call succeeded during the pass.
    pub archived_count: u32,
    /// Number of archive files removed by pruning.
    pub pruned_count: u32,
    /// Sum of the raw bytes archived plus the on-disk sizes of pruned archive files.
    pub bytes_freed: u64,
}
300
301impl SessionArchiver {
302 pub fn new(
303 wstore: Arc<WaveStore>,
304 filestore: Arc<FileStore>,
305 inactive_days: u64,
306 max_total_bytes: u64,
307 archive_dir: PathBuf,
308 ) -> Self {
309 Self { wstore, filestore, inactive_days, max_total_bytes, archive_dir }
310 }
311
312 pub async fn sweep(&self) -> Result<SessionArchiverStats, String> {
317 let mut stats = SessionArchiverStats::default();
318 let now = now_ms();
319 let cutoff_ms = self.inactive_days as i64 * 86_400_000;
320
321 let all_blocks: Vec<Block> = self
323 .wstore
324 .get_all::<Block>()
325 .map_err(|e| format!("get_all blocks: {e}"))?;
326
327 for block in &all_blocks {
328 let view = block.meta.get("view").and_then(|v| v.as_str()).unwrap_or("");
330 if view != "agent" {
331 continue;
332 }
333
334 let archived_at = block
336 .meta
337 .get(META_SESSION_ARCHIVED_AT)
338 .and_then(|v| v.as_i64())
339 .unwrap_or(0);
340 if archived_at > 0 {
341 continue;
342 }
343
344 let last_activity = match block
346 .meta
347 .get(META_SESSION_LAST_ACTIVITY_MS)
348 .and_then(|v| v.as_i64())
349 {
350 Some(v) if v > 0 => v,
351 _ => continue,
352 };
353
354 let line_count = block
356 .meta
357 .get(META_SESSION_LINE_COUNT)
358 .and_then(|v| v.as_u64())
359 .unwrap_or(0);
360 if line_count == 0 {
361 continue;
362 }
363
364 if now - last_activity < cutoff_ms {
366 continue;
367 }
368
369 match archive_session_output(
371 &self.wstore,
372 &self.filestore,
373 &block.oid,
374 &self.archive_dir,
375 ) {
376 Ok((bytes, _)) => {
377 stats.archived_count += 1;
378 stats.bytes_freed += bytes;
379 tracing::info!(
380 block_id = %block.oid,
381 bytes_freed = bytes,
382 "session archiver: auto-archived inactive session"
383 );
384 }
385 Err(e) => {
386 tracing::warn!(
387 block_id = %block.oid,
388 error = %e,
389 "session archiver: failed to archive session"
390 );
391 }
392 }
393 }
394
395 stats.pruned_count = self.prune_archives(&mut stats.bytes_freed)?;
397
398 Ok(stats)
399 }
400
401 fn prune_archives(&self, bytes_freed: &mut u64) -> Result<u32, String> {
404 let Ok(entries) = std::fs::read_dir(&self.archive_dir) else {
405 return Ok(0);
406 };
407
408 let mut files: Vec<(std::time::SystemTime, PathBuf, u64)> = entries
410 .flatten()
411 .filter(|e| {
412 e.path()
413 .extension()
414 .map(|x| x == "gz")
415 .unwrap_or(false)
416 })
417 .filter_map(|e| {
418 let meta = e.metadata().ok()?;
419 let mtime = meta.modified().ok()?;
420 let size = meta.len();
421 Some((mtime, e.path(), size))
422 })
423 .collect();
424
425 files.sort_by_key(|(mtime, _, _)| *mtime);
427
428 let total: u64 = files.iter().map(|(_, _, s)| s).sum();
429 if total <= self.max_total_bytes {
430 return Ok(0);
431 }
432
433 let mut remaining = total;
434 let mut pruned = 0u32;
435
436 for (_, path, size) in &files {
437 if remaining <= self.max_total_bytes {
438 break;
439 }
440 if let Err(e) = std::fs::remove_file(path) {
441 tracing::warn!(path = %path.display(), error = %e, "session archiver: failed to prune archive");
442 continue;
443 }
444 remaining = remaining.saturating_sub(*size);
445 *bytes_freed += size;
446 pruned += 1;
447 tracing::info!(
448 path = %path.display(),
449 size_freed = size,
450 "session archiver: pruned archive"
451 );
452 }
453
454 Ok(pruned)
455 }
456}
457
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use crate::backend::storage::filestore::FileStore;
    use crate::backend::storage::wstore::WaveStore;

    /// End-to-end check of `sweep`: an agent block inactive for ten days with
    /// non-empty output must be archived to a `.gz` file, removed from the
    /// filestore, and marked with `session:archived_at`.
    #[tokio::test]
    async fn test_sweep_archives_inactive_session() {
        // The temp dir doubles as the archive directory.
        let tmp_dir = tempfile::tempdir().expect("tempdir");
        let archive_dir = tmp_dir.path().to_path_buf();

        // Seed the filestore with two lines of output for a fresh block id.
        let filestore = Arc::new(
            FileStore::open_in_memory().expect("filestore"),
        );
        let block_id_owned = uuid::Uuid::new_v4().to_string();
        let block_id = block_id_owned.as_str();
        filestore
            .make_file(block_id, OUTPUT_FILENAME, FileMeta::default(), FileOpts::default())
            .expect("make_file");
        filestore
            .append_data(block_id, OUTPUT_FILENAME, b"line1\nline2\n")
            .expect("append_data");

        // Object store backed by a database file inside the temp dir.
        let db_dir = tmp_dir.path().join("wdb");
        std::fs::create_dir_all(&db_dir).unwrap();
        let wstore = Arc::new(
            WaveStore::open(&db_dir.join("objects.db")).expect("wstore"),
        );

        // Meta that satisfies every sweep filter: agent view, last activity ten
        // days ago, and a non-zero line count.
        use crate::backend::obj::{Block, MetaMapType};
        let mut meta = MetaMapType::new();
        meta.insert("view".to_string(), serde_json::json!("agent"));
        let ten_days_ago = now_ms() - 10 * 86_400_000;
        meta.insert(
            META_SESSION_LAST_ACTIVITY_MS.to_string(),
            serde_json::json!(ten_days_ago),
        );
        meta.insert(META_SESSION_LINE_COUNT.to_string(), serde_json::json!(2u64));

        let mut block = Block {
            oid: block_id.to_string(),
            parentoref: String::new(),
            version: 1,
            runtimeopts: None,
            stickers: None,
            meta,
            subblockids: None,
        };
        wstore.insert(&mut block).expect("wstore insert");

        let archiver = SessionArchiver::new(
            wstore.clone(),
            filestore.clone(),
            // inactive_days
            1,
            // max_total_bytes (2 GiB), large enough that nothing is pruned
            2 * 1024 * 1024 * 1024,
            archive_dir.clone(),
        );

        let stats = archiver.sweep().await.expect("sweep");
        assert_eq!(stats.archived_count, 1, "should archive 1 session");
        assert!(stats.bytes_freed > 0, "should free bytes");

        // The live output must be gone from the filestore.
        let remaining = filestore.stat(block_id, OUTPUT_FILENAME).unwrap();
        assert!(remaining.is_none(), "filestore entry should be deleted after archive");

        // The archive file must exist under the expected name.
        let gz_path = archive_dir.join(format!("{}.jsonl.gz", block_id));
        assert!(gz_path.exists(), ".gz file should exist");

        // The block meta must record when it was archived.
        let updated_block: Option<Block> = wstore.get(block_id).unwrap();
        let updated_block = updated_block.expect("block still in store");
        let archived_at = updated_block
            .meta
            .get(META_SESSION_ARCHIVED_AT)
            .and_then(|v| v.as_i64())
            .unwrap_or(0);
        assert!(archived_at > 0, "session:archived_at should be set");
    }

    /// Blank and whitespace-only lines are not counted.
    #[test]
    fn test_bytecount_lines() {
        assert_eq!(bytecount_lines(b""), 0);
        assert_eq!(bytecount_lines(b"line1\n"), 1);
        assert_eq!(bytecount_lines(b"line1\nline2\n"), 2);
        assert_eq!(bytecount_lines(b" \n\nline3\n"), 1);
    }

    /// Data written with `write_gz` must come back unchanged from `read_gz`.
    #[test]
    fn test_gz_roundtrip() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let data = b"hello compressed world\nline2\n";
        write_gz(data, tmp.path()).unwrap();
        let out = read_gz(tmp.path()).unwrap();
        assert_eq!(out, data);
    }
}