agentmux_srv\backend/
session_archive.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Session archival and cleanup (Phase 3.3 — ultra-long-sessions).
5//!
6//! Responsibilities:
7//!   1. Periodic sweep: archive sessions inactive for `inactive_days` days by
8//!      compressing their FileStore "output" file to `archive_dir/<block_id>.jsonl.gz`
9//!      and freeing the FileStore entry.
10//!   2. Storage cap: after archiving, prune oldest `.gz` files until total archive
11//!      disk usage is below `max_total_bytes` (default 2 GB).
12//!
13//! The archive/restore/export logic used by the sweep is the same as the
14//! RPC handlers in `server/app_api.rs` — both call `archive_session_output`
15//! and `read_session_output` from this module.
16
17use std::io::{Read, Write};
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20use std::time::{SystemTime, UNIX_EPOCH};
21
22use flate2::Compression;
23use flate2::read::GzDecoder;
24use flate2::write::GzEncoder;
25
26use crate::backend::blockcontroller::session_stats::{
27    META_SESSION_LAST_ACTIVITY_MS, META_SESSION_LINE_COUNT,
28};
29use crate::backend::obj::{Block, MetaMapType};
30use crate::backend::storage::filestore::{FileStore, FileMeta, FileOpts};
31use crate::backend::storage::wstore::WaveStore;
32
33// ---- Meta key constants for archival state ----
34
/// Block meta key holding the Unix timestamp (milliseconds) of the last archival.
pub const META_SESSION_ARCHIVED_AT: &str = "session:archived_at";
/// Block meta key holding the session output size in bytes, measured before compression.
pub const META_SESSION_ARCHIVED_BYTES: &str = "session:archived_bytes";
/// Block meta key holding the path of the `.jsonl.gz` archive file on disk.
pub const META_SESSION_ARCHIVE_PATH: &str = "session:archive_path";

/// Name of the FileStore file in which a block's session output is kept.
const OUTPUT_FILENAME: &str = "output";
44
45// ---------------------------------------------------------------------------
46// Shared helpers — used by both the RPC handlers and the sweep
47// ---------------------------------------------------------------------------
48
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time earlier than the epoch.
fn now_ms() -> i64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => std::time::Duration::default(),
    };
    since_epoch.as_millis() as i64
}
55
56/// Compress `data` with gzip and write to `dest_path`.
57fn write_gz(data: &[u8], dest_path: &Path) -> Result<(), String> {
58    let file = std::fs::File::create(dest_path)
59        .map_err(|e| format!("create archive file {}: {e}", dest_path.display()))?;
60    let mut encoder = GzEncoder::new(file, Compression::default());
61    encoder.write_all(data)
62        .map_err(|e| format!("compress: {e}"))?;
63    encoder.finish()
64        .map_err(|e| format!("gz finish: {e}"))?;
65    Ok(())
66}
67
68/// Decompress a `.gz` file and return raw bytes.
69fn read_gz(src_path: &Path) -> Result<Vec<u8>, String> {
70    let file = std::fs::File::open(src_path)
71        .map_err(|e| format!("open archive {}: {e}", src_path.display()))?;
72    let mut decoder = GzDecoder::new(file);
73    let mut out = Vec::new();
74    decoder.read_to_end(&mut out)
75        .map_err(|e| format!("decompress: {e}"))?;
76    Ok(out)
77}
78
/// Create `archive_dir` together with any missing parent directories.
///
/// Succeeds without doing anything when the directory already exists.
///
/// # Errors
/// Returns a message naming the directory when it cannot be created.
fn ensure_archive_dir(archive_dir: &Path) -> Result<(), String> {
    match std::fs::create_dir_all(archive_dir) {
        Ok(()) => Ok(()),
        Err(e) => Err(format!("create archive dir {}: {e}", archive_dir.display())),
    }
}
84
85// ---------------------------------------------------------------------------
86// archive_session_output
87// ---------------------------------------------------------------------------
88
89/// Archive the FileStore "output" file for `block_id`:
90///   1. Read bytes from FileStore.
91///   2. Compress to `archive_dir/<block_id>.jsonl.gz`.
92///   3. Delete the FileStore entry to reclaim SQLite space.
93///   4. Write archive meta keys to the block.
94///
95/// Returns `(archived_bytes, archived_at_ms)`.
96/// If the FileStore entry is missing or empty, returns `(0, now_ms)` — no-op.
97pub fn archive_session_output(
98    wstore: &Arc<WaveStore>,
99    filestore: &Arc<FileStore>,
100    block_id: &str,
101    archive_dir: &Path,
102) -> Result<(u64, i64), String> {
103    // Read existing data from FileStore
104    let raw_bytes = match filestore.read_file(block_id, OUTPUT_FILENAME) {
105        Ok(Some(b)) if !b.is_empty() => b,
106        Ok(_) => {
107            // Nothing to archive
108            return Ok((0, now_ms()));
109        }
110        Err(e) => return Err(format!("filestore read: {e}")),
111    };
112
113    let archived_bytes = raw_bytes.len() as u64;
114
115    ensure_archive_dir(archive_dir)?;
116
117    let archive_path = archive_dir.join(format!("{}.jsonl.gz", block_id));
118    write_gz(&raw_bytes, &archive_path)?;
119
120    let archived_at = now_ms();
121
122    // Write archive metadata BEFORE deleting the FileStore entry. If the meta
123    // update fails, the session data is still retrievable from FileStore and
124    // the orphaned .gz can be cleaned up by the next sweep. Deleting first
125    // would orphan the session if the meta write then failed.
126    let mut meta = MetaMapType::new();
127    meta.insert(META_SESSION_ARCHIVED_AT.to_string(), serde_json::json!(archived_at));
128    meta.insert(META_SESSION_ARCHIVED_BYTES.to_string(), serde_json::json!(archived_bytes));
129    meta.insert(
130        META_SESSION_ARCHIVE_PATH.to_string(),
131        serde_json::json!(archive_path.to_string_lossy().as_ref()),
132    );
133
134    let oref_str = format!("block:{}", block_id);
135    if let Err(e) = crate::server::service::update_object_meta(wstore, &oref_str, &meta) {
136        // Roll back the archive file so we don't leak disk on retry
137        let _ = std::fs::remove_file(&archive_path);
138        return Err(format!("update_object_meta: {e}"));
139    }
140
141    // Meta is now persisted; safe to reclaim the FileStore entry
142    if let Err(e) = filestore.delete_file(block_id, OUTPUT_FILENAME) {
143        tracing::warn!(
144            block_id = %block_id,
145            error = %e,
146            "session_archive: failed to delete filestore entry after archiving (meta already updated)"
147        );
148    }
149
150    tracing::info!(
151        block_id = %block_id,
152        archived_bytes = archived_bytes,
153        archive_path = %archive_path.display(),
154        "session archived"
155    );
156
157    Ok((archived_bytes, archived_at))
158}
159
160// ---------------------------------------------------------------------------
161// restore_session_output
162// ---------------------------------------------------------------------------
163
164/// Restore an archived session back into FileStore:
165///   1. Read `session:archive_path` from block meta.
166///   2. Decompress and write bytes back via `make_file` + `append_data`.
167///   3. Clear archive meta keys (keep the .gz file as backup).
168pub fn restore_session_output(
169    wstore: &Arc<WaveStore>,
170    filestore: &Arc<FileStore>,
171    block_id: &str,
172) -> Result<u64, String> {
173    // Read archive path from block meta
174    let block: Block = wstore
175        .get(block_id)
176        .map_err(|e| format!("wstore.get: {e}"))?
177        .ok_or_else(|| format!("BLOCK_NOT_FOUND: {}", block_id))?;
178
179    let archive_path_str = block
180        .meta
181        .get(META_SESSION_ARCHIVE_PATH)
182        .and_then(|v| v.as_str())
183        .ok_or_else(|| format!("session:archive_path not set for block {}", block_id))?
184        .to_string();
185
186    let archive_path = PathBuf::from(&archive_path_str);
187    let raw_bytes = read_gz(&archive_path)?;
188    let restored_bytes = raw_bytes.len() as u64;
189
190    // Recreate the FileStore entry (may already be gone after archival)
191    let _ = filestore.delete_file(block_id, OUTPUT_FILENAME); // ignore "not found"
192    filestore
193        .make_file(block_id, OUTPUT_FILENAME, FileMeta::default(), FileOpts::default())
194        .map_err(|e| format!("make_file: {e}"))?;
195    filestore
196        .append_data(block_id, OUTPUT_FILENAME, &raw_bytes)
197        .map_err(|e| format!("append_data: {e}"))?;
198
199    // Clear archive meta keys (keep the .gz as backup — don't delete it)
200    let mut meta = MetaMapType::new();
201    meta.insert(META_SESSION_ARCHIVED_AT.to_string(), serde_json::Value::Null);
202    meta.insert(META_SESSION_ARCHIVED_BYTES.to_string(), serde_json::Value::Null);
203    meta.insert(META_SESSION_ARCHIVE_PATH.to_string(), serde_json::Value::Null);
204
205    let oref_str = format!("block:{}", block_id);
206    crate::server::service::update_object_meta(wstore, &oref_str, &meta)
207        .map_err(|e| format!("update_object_meta: {e}"))?;
208
209    tracing::info!(
210        block_id = %block_id,
211        restored_bytes = restored_bytes,
212        archive_path = %archive_path_str,
213        "session restored"
214    );
215
216    Ok(restored_bytes)
217}
218
219// ---------------------------------------------------------------------------
220// read_session_output — used by session:export
221// ---------------------------------------------------------------------------
222
223/// Read the raw session output bytes, whether from FileStore (live) or archive.
224/// Returns `(bytes, line_count)`.
225pub fn read_session_output(
226    wstore: &Arc<WaveStore>,
227    filestore: &Arc<FileStore>,
228    block_id: &str,
229) -> Result<(Vec<u8>, u64), String> {
230    // Check if session is archived
231    let block: Block = wstore
232        .get(block_id)
233        .map_err(|e| format!("wstore.get: {e}"))?
234        .ok_or_else(|| format!("BLOCK_NOT_FOUND: {}", block_id))?;
235
236    let is_archived = block
237        .meta
238        .get(META_SESSION_ARCHIVED_AT)
239        .and_then(|v| v.as_i64())
240        .map(|v| v > 0)
241        .unwrap_or(false);
242
243    let raw_bytes = if is_archived {
244        let archive_path_str = block
245            .meta
246            .get(META_SESSION_ARCHIVE_PATH)
247            .and_then(|v| v.as_str())
248            .ok_or_else(|| "session:archive_path not set".to_string())?
249            .to_string();
250        read_gz(Path::new(&archive_path_str))?
251    } else {
252        filestore
253            .read_file(block_id, OUTPUT_FILENAME)
254            .map_err(|e| format!("filestore read: {e}"))?
255            .unwrap_or_default()
256    };
257
258    let line_count = bytecount_lines(&raw_bytes);
259    Ok((raw_bytes, line_count))
260}
261
/// Number of lines in `data` that contain something other than whitespace.
///
/// The buffer is decoded as UTF-8 with invalid sequences replaced, so
/// malformed input never causes a failure.
fn bytecount_lines(data: &[u8]) -> u64 {
    let decoded = String::from_utf8_lossy(data);
    let mut non_blank = 0usize;
    for line in decoded.lines() {
        if !line.trim().is_empty() {
            non_blank += 1;
        }
    }
    non_blank as u64
}
267
268// ---------------------------------------------------------------------------
269// Default archive directory
270// ---------------------------------------------------------------------------
271
272/// Returns `~/.agentmux/archives/`, or `None` if the home directory cannot be determined.
273pub fn default_archive_dir() -> Option<PathBuf> {
274    dirs::home_dir().map(|h| h.join(".agentmux").join("archives"))
275}
276
277// ---------------------------------------------------------------------------
278// SessionArchiver — periodic sweep
279// ---------------------------------------------------------------------------
280
281/// Periodic session archival + storage cap enforcement.
282pub struct SessionArchiver {
283    wstore: Arc<WaveStore>,
284    filestore: Arc<FileStore>,
285    /// Sessions with no activity for this many days get archived.
286    pub inactive_days: u64,
287    /// Maximum total bytes across all `.jsonl.gz` archives.
288    pub max_total_bytes: u64,
289    /// Directory where `.jsonl.gz` files are written.
290    pub archive_dir: PathBuf,
291}
292
/// Counters produced by one sweep run.
#[derive(Debug, Clone, Default)]
pub struct SessionArchiverStats {
    /// Sessions archived during the sweep.
    pub archived_count: u32,
    /// Archive files deleted to honour the storage cap.
    pub pruned_count: u32,
    /// Uncompressed bytes of the archived sessions plus the on-disk size of
    /// every pruned archive file.
    pub bytes_freed: u64,
}
300
301impl SessionArchiver {
302    pub fn new(
303        wstore: Arc<WaveStore>,
304        filestore: Arc<FileStore>,
305        inactive_days: u64,
306        max_total_bytes: u64,
307        archive_dir: PathBuf,
308    ) -> Self {
309        Self { wstore, filestore, inactive_days, max_total_bytes, archive_dir }
310    }
311
312    /// Run one sweep:
313    ///   1. Find all agent blocks inactive for `inactive_days`.
314    ///   2. Archive each one (compress → delete FileStore).
315    ///   3. Prune oldest archives if total disk usage exceeds `max_total_bytes`.
316    pub async fn sweep(&self) -> Result<SessionArchiverStats, String> {
317        let mut stats = SessionArchiverStats::default();
318        let now = now_ms();
319        let cutoff_ms = self.inactive_days as i64 * 86_400_000;
320
321        // Collect all blocks
322        let all_blocks: Vec<Block> = self
323            .wstore
324            .get_all::<Block>()
325            .map_err(|e| format!("get_all blocks: {e}"))?;
326
327        for block in &all_blocks {
328            // Only agent panes
329            let view = block.meta.get("view").and_then(|v| v.as_str()).unwrap_or("");
330            if view != "agent" {
331                continue;
332            }
333
334            // Skip if already archived
335            let archived_at = block
336                .meta
337                .get(META_SESSION_ARCHIVED_AT)
338                .and_then(|v| v.as_i64())
339                .unwrap_or(0);
340            if archived_at > 0 {
341                continue;
342            }
343
344            // Skip if session:last_activity_ms is missing (fresh session)
345            let last_activity = match block
346                .meta
347                .get(META_SESSION_LAST_ACTIVITY_MS)
348                .and_then(|v| v.as_i64())
349            {
350                Some(v) if v > 0 => v,
351                _ => continue,
352            };
353
354            // Skip if session has no lines
355            let line_count = block
356                .meta
357                .get(META_SESSION_LINE_COUNT)
358                .and_then(|v| v.as_u64())
359                .unwrap_or(0);
360            if line_count == 0 {
361                continue;
362            }
363
364            // Check inactivity threshold
365            if now - last_activity < cutoff_ms {
366                continue;
367            }
368
369            // Archive it
370            match archive_session_output(
371                &self.wstore,
372                &self.filestore,
373                &block.oid,
374                &self.archive_dir,
375            ) {
376                Ok((bytes, _)) => {
377                    stats.archived_count += 1;
378                    stats.bytes_freed += bytes;
379                    tracing::info!(
380                        block_id = %block.oid,
381                        bytes_freed = bytes,
382                        "session archiver: auto-archived inactive session"
383                    );
384                }
385                Err(e) => {
386                    tracing::warn!(
387                        block_id = %block.oid,
388                        error = %e,
389                        "session archiver: failed to archive session"
390                    );
391                }
392            }
393        }
394
395        // Prune oldest archives if over the storage cap
396        stats.pruned_count = self.prune_archives(&mut stats.bytes_freed)?;
397
398        Ok(stats)
399    }
400
401    /// Delete oldest `.jsonl.gz` files until total size is under `max_total_bytes`.
402    /// Returns number of files deleted.
403    fn prune_archives(&self, bytes_freed: &mut u64) -> Result<u32, String> {
404        let Ok(entries) = std::fs::read_dir(&self.archive_dir) else {
405            return Ok(0);
406        };
407
408        // Collect (mtime, path, size) for all .jsonl.gz files
409        let mut files: Vec<(std::time::SystemTime, PathBuf, u64)> = entries
410            .flatten()
411            .filter(|e| {
412                e.path()
413                    .extension()
414                    .map(|x| x == "gz")
415                    .unwrap_or(false)
416            })
417            .filter_map(|e| {
418                let meta = e.metadata().ok()?;
419                let mtime = meta.modified().ok()?;
420                let size = meta.len();
421                Some((mtime, e.path(), size))
422            })
423            .collect();
424
425        // Sort oldest-first
426        files.sort_by_key(|(mtime, _, _)| *mtime);
427
428        let total: u64 = files.iter().map(|(_, _, s)| s).sum();
429        if total <= self.max_total_bytes {
430            return Ok(0);
431        }
432
433        let mut remaining = total;
434        let mut pruned = 0u32;
435
436        for (_, path, size) in &files {
437            if remaining <= self.max_total_bytes {
438                break;
439            }
440            if let Err(e) = std::fs::remove_file(path) {
441                tracing::warn!(path = %path.display(), error = %e, "session archiver: failed to prune archive");
442                continue;
443            }
444            remaining = remaining.saturating_sub(*size);
445            *bytes_freed += size;
446            pruned += 1;
447            tracing::info!(
448                path = %path.display(),
449                size_freed = size,
450                "session archiver: pruned archive"
451            );
452        }
453
454        Ok(pruned)
455    }
456}
457
458// ---------------------------------------------------------------------------
459// Tests
460// ---------------------------------------------------------------------------
461
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use crate::backend::storage::filestore::FileStore;
    use crate::backend::storage::wstore::WaveStore;

    /// A block whose last activity is older than the threshold must be
    /// archived by the sweeper.
    ///
    /// The FileStore is in-memory; the WaveStore database and the archive
    /// output both live in a temporary directory on disk.
    #[tokio::test]
    async fn test_sweep_archives_inactive_session() {
        use crate::backend::obj::{Block, MetaMapType};

        // Temp dir that receives the archives (and the object database).
        let tmp_dir = tempfile::tempdir().expect("tempdir");
        let archive_dir = tmp_dir.path().to_path_buf();

        // The block ID must be a valid UUID — `archive_session_output` calls
        // `update_object_meta`, which parses the ORef
        // `format!("block:{}", block_id)` via `ORef::parse` and rejects
        // non-UUID oids. An earlier "blk-sweep-test" string failed silently
        // inside the sweep's logged-but-suppressed error path.
        let block_id_owned = uuid::Uuid::new_v4().to_string();
        let block_id = block_id_owned.as_str();

        // In-memory FileStore seeded with a fake "output" file for the block.
        let filestore = Arc::new(FileStore::open_in_memory().expect("filestore"));
        filestore
            .make_file(block_id, OUTPUT_FILENAME, FileMeta::default(), FileOpts::default())
            .expect("make_file");
        filestore
            .append_data(block_id, OUTPUT_FILENAME, b"line1\nline2\n")
            .expect("append_data");

        // WaveStore backed by a database file inside the temp dir.
        let db_dir = tmp_dir.path().join("wdb");
        std::fs::create_dir_all(&db_dir).unwrap();
        let wstore = Arc::new(
            WaveStore::open(&db_dir.join("objects.db")).expect("wstore"),
        );

        // Fake agent block: last activity 10 days ago (well past the 1-day
        // threshold used below) and two recorded lines.
        let ten_days_ago = now_ms() - 10 * 86_400_000;
        let mut meta = MetaMapType::new();
        meta.insert("view".to_string(), serde_json::json!("agent"));
        meta.insert(
            META_SESSION_LAST_ACTIVITY_MS.to_string(),
            serde_json::json!(ten_days_ago),
        );
        meta.insert(META_SESSION_LINE_COUNT.to_string(), serde_json::json!(2u64));

        let mut block = Block {
            oid: block_id.to_string(),
            parentoref: String::new(),
            version: 1,
            runtimeopts: None,
            stickers: None,
            meta,
            subblockids: None,
        };
        wstore.insert(&mut block).expect("wstore insert");

        // Archiver with a 1-day inactivity threshold and a 2 GiB storage cap.
        let archiver = SessionArchiver::new(
            wstore.clone(),
            filestore.clone(),
            1,
            2 * 1024 * 1024 * 1024,
            archive_dir.clone(),
        );

        let stats = archiver.sweep().await.expect("sweep");
        assert_eq!(stats.archived_count, 1, "should archive 1 session");
        assert!(stats.bytes_freed > 0, "should free bytes");

        // The FileStore entry must be gone.
        let remaining = filestore.stat(block_id, OUTPUT_FILENAME).unwrap();
        assert!(remaining.is_none(), "filestore entry should be deleted after archive");

        // The .gz file must exist.
        let gz_path = archive_dir.join(format!("{}.jsonl.gz", block_id));
        assert!(gz_path.exists(), ".gz file should exist");

        // The block meta must record the archival time.
        let updated_block: Option<Block> = wstore.get(block_id).unwrap();
        let updated_block = updated_block.expect("block still in store");
        let archived_at = updated_block
            .meta
            .get(META_SESSION_ARCHIVED_AT)
            .and_then(|v| v.as_i64())
            .unwrap_or(0);
        assert!(archived_at > 0, "session:archived_at should be set");
    }

    /// Blank and whitespace-only lines are excluded from the count.
    #[test]
    fn test_bytecount_lines() {
        assert_eq!(bytecount_lines(b""), 0);
        assert_eq!(bytecount_lines(b"line1\n"), 1);
        assert_eq!(bytecount_lines(b"line1\nline2\n"), 2);
        assert_eq!(bytecount_lines(b"  \n\nline3\n"), 1); // blanks excluded
    }

    /// Data written by `write_gz` comes back unchanged through `read_gz`.
    #[test]
    fn test_gz_roundtrip() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let data = b"hello compressed world\nline2\n";
        write_gz(data, tmp.path()).unwrap();
        let out = read_gz(tmp.path()).unwrap();
        assert_eq!(out, data);
    }
}