agentmux_srv\backend\blockcontroller/
session_stats.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Session metadata tracking for agent panes (Phase 1.4 — ultra-long-sessions).
5//!
6//! Tracks per-session stats as block metadata keys:
7//!   `session:start_ts_ms`    — Unix ms when the first output line arrived
8//!   `session:last_activity_ms` — Unix ms of most recent output line
9//!   `session:line_count`     — total output lines emitted this session
10//!   `session:token_estimate` — rough token count (chars / 4, cumulative)
11//!
12//! To avoid a `SetMeta` write on every output line (which can be very frequent),
13//! this module debounces flushes to at most once per second using a local
14//! `Instant`-based timestamp.  Accumulators live in `SessionStatsAccumulator`
15//! which each controller instance owns privately.
16
17use std::sync::Arc;
18use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
19
20use crate::backend::obj::MetaMapType;
21use crate::backend::storage::wstore::WaveStore;
22
23/// Keys used for session stats in block metadata.
24pub const META_SESSION_START_TS_MS: &str = "session:start_ts_ms";
25pub const META_SESSION_LAST_ACTIVITY_MS: &str = "session:last_activity_ms";
26pub const META_SESSION_LINE_COUNT: &str = "session:line_count";
27pub const META_SESSION_TOKEN_ESTIMATE: &str = "session:token_estimate";
28
29/// Debounce interval: at most one WaveStore write per second.
30const FLUSH_DEBOUNCE: Duration = Duration::from_secs(1);
31
32/// Returns the current Unix timestamp in milliseconds.
33fn now_ms() -> i64 {
34    SystemTime::now()
35        .duration_since(UNIX_EPOCH)
36        .unwrap_or_default()
37        .as_millis() as i64
38}
39
40/// In-memory accumulator for session stats.  One per controller instance.
41///
42/// All fields are plain integers — no locking needed because each controller
43/// calls `record_line` from its single async stdout-reader task only.
44pub struct SessionStatsAccumulator {
45    /// Block ID this accumulator belongs to.
46    block_id: String,
47    /// Unix ms when the first line was seen; 0 = not yet set.
48    start_ts_ms: i64,
49    /// Unix ms of the most-recently flushed line.
50    last_activity_ms: i64,
51    /// Total lines seen since session start.
52    line_count: u64,
53    /// Cumulative token estimate (chars / 4).
54    token_estimate: u64,
55    /// Wall-clock instant of the last flush; `None` = never flushed.
56    last_flush: Option<Instant>,
57}
58
59impl SessionStatsAccumulator {
60    /// Create a new accumulator for `block_id`.
61    pub fn new(block_id: String) -> Self {
62        Self {
63            block_id,
64            start_ts_ms: 0,
65            last_activity_ms: 0,
66            line_count: 0,
67            token_estimate: 0,
68            last_flush: None,
69        }
70    }
71
72    /// Record one output line of `line_len` bytes.
73    ///
74    /// Updates in-memory counters.  Flushes to the WaveStore if the debounce
75    /// interval has elapsed *or* if this is the very first line (so the
76    /// frontend sees `session:start_ts_ms` promptly).
77    pub fn record_line(&mut self, line_len: usize, wstore: &Option<Arc<WaveStore>>) {
78        let ts = now_ms();
79        let is_first = self.start_ts_ms == 0;
80
81        if is_first {
82            self.start_ts_ms = ts;
83        }
84        self.last_activity_ms = ts;
85        self.line_count += 1;
86        self.token_estimate += (line_len / 4) as u64;
87
88        // Flush immediately on first line; otherwise debounce.
89        let should_flush = is_first || match self.last_flush {
90            None => true,
91            Some(last) => last.elapsed() >= FLUSH_DEBOUNCE,
92        };
93
94        if should_flush {
95            if let Some(ref store) = wstore {
96                self.flush(store);
97            }
98        }
99    }
100
101    /// Force-flush all accumulated stats to the WaveStore right now.
102    ///
103    /// Called by `record_line` when the debounce window has elapsed.
104    fn flush(&mut self, wstore: &Arc<WaveStore>) {
105        let oref_str = format!("block:{}", self.block_id);
106        let mut meta_update = MetaMapType::new();
107
108        if self.start_ts_ms != 0 {
109            meta_update.insert(
110                META_SESSION_START_TS_MS.to_string(),
111                serde_json::json!(self.start_ts_ms),
112            );
113        }
114        meta_update.insert(
115            META_SESSION_LAST_ACTIVITY_MS.to_string(),
116            serde_json::json!(self.last_activity_ms),
117        );
118        meta_update.insert(
119            META_SESSION_LINE_COUNT.to_string(),
120            serde_json::json!(self.line_count),
121        );
122        meta_update.insert(
123            META_SESSION_TOKEN_ESTIMATE.to_string(),
124            serde_json::json!(self.token_estimate),
125        );
126
127        match crate::server::service::update_object_meta(wstore, &oref_str, &meta_update) {
128            Ok(()) => {
129                tracing::trace!(
130                    block_id = %self.block_id,
131                    line_count = self.line_count,
132                    token_estimate = self.token_estimate,
133                    "session stats flushed"
134                );
135            }
136            Err(e) => {
137                tracing::warn!(
138                    block_id = %self.block_id,
139                    error = %e,
140                    "failed to flush session stats"
141                );
142            }
143        }
144
145        self.last_flush = Some(Instant::now());
146    }
147}
148
149#[cfg(test)]
150mod tests {
151    use super::*;
152
153    #[test]
154    fn test_accumulator_first_line_sets_start_ts() {
155        let mut acc = SessionStatsAccumulator::new("blk-1".to_string());
156        // No wstore — flush is skipped but counters still update.
157        acc.record_line(100, &None);
158        assert_ne!(acc.start_ts_ms, 0);
159        assert_eq!(acc.line_count, 1);
160        assert_eq!(acc.token_estimate, 25); // 100 / 4
161    }
162
163    #[test]
164    fn test_accumulator_multiple_lines() {
165        let mut acc = SessionStatsAccumulator::new("blk-2".to_string());
166        acc.record_line(40, &None);
167        acc.record_line(80, &None);
168        acc.record_line(120, &None);
169        assert_eq!(acc.line_count, 3);
170        // 40/4 + 80/4 + 120/4 = 10 + 20 + 30 = 60
171        assert_eq!(acc.token_estimate, 60);
172    }
173
174    #[test]
175    fn test_accumulator_start_ts_not_reset_on_second_line() {
176        let mut acc = SessionStatsAccumulator::new("blk-3".to_string());
177        acc.record_line(10, &None);
178        let first_ts = acc.start_ts_ms;
179        acc.record_line(10, &None);
180        assert_eq!(acc.start_ts_ms, first_ts, "start_ts must not change after first line");
181    }
182
183    #[test]
184    fn test_debounce_constants() {
185        assert_eq!(FLUSH_DEBOUNCE, Duration::from_secs(1));
186    }
187}