agentmux_srv\backend/
subagent_watcher.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Subagent watcher: monitors Claude Code session directories for subagent
5//! JSONL files and broadcasts activity events to WebSocket clients.
6//!
7//! Claude Code spawns "subagents" via the Task tool. Each subagent writes its
8//! conversation to a JSONL file under:
9//!   `<claude-config>/projects/<encoded-workspace>/subagents/agent-<id>.jsonl`
10//!
11//! This module watches those directories and emits:
12//!   - `subagent:spawned`   — new subagent JSONL file detected
13//!   - `subagent:activity`  — new events appended to a subagent file
14//!   - `subagent:completed` — subagent finished (result event seen)
15
16use std::collections::HashMap;
17use std::io::{BufRead, BufReader, Seek, SeekFrom};
18use std::path::{Path, PathBuf};
19use std::sync::{Arc, Mutex};
20use std::time::Duration;
21
22use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use tokio::sync::mpsc;
26
27use super::eventbus::{EventBus, WSEventType, WS_EVENT_RPC};
28
29// ── Public types ──────────────────────────────────────────────────────────
30
/// Summary of one tracked subagent, as returned by listings and embedded in
/// broadcast events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubagentInfo {
    /// Subagent id taken from the JSONL filename (`agent-<id>.jsonl`).
    pub agent_id: String,
    /// Slug from the file's metadata record; empty until one has been read.
    /// Falls back to the record's `agentId` when no `slug` string is present.
    pub slug: String,
    /// Path of the JSONL file (lossy UTF-8 conversion of the OS path).
    pub jsonl_path: String,
    /// Id of the agent whose directory this subagent file was found under.
    pub parent_agent: String,
    /// Name of the directory two levels above the JSONL file, or `"unknown"`.
    pub session_id: String,
    /// Timestamp of the most recent parsed event; initialised to the current
    /// time in milliseconds when the subagent is first seen.
    pub last_event_at: u64,
    /// Whether the subagent is still running or has been marked completed.
    pub status: SubagentStatus,
    /// Number of events parsed from the file so far.
    pub event_count: usize,
    /// Model name from the metadata record, when that record has a string `model`.
    pub model: Option<String>,
}
43
/// Lifecycle state of a tracked subagent.
///
/// Serialized in lowercase (`"active"`, `"completed"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum SubagentStatus {
    /// Initial state assigned when the subagent file is first seen.
    Active,
    /// Set once the completion marker event has been observed.
    Completed,
}
50
/// One parsed record from a subagent JSONL file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubagentEvent {
    /// Value of the record's `agentId` field, or an empty string when absent.
    pub agent_id: String,
    /// Parsed payload of the record.
    pub event_type: SubagentEventType,
    /// The record's `timestamp` field when it is an unsigned integer; otherwise
    /// the wall-clock time (milliseconds since the Unix epoch) at parse time.
    pub timestamp: u64,
}
57
/// Payload of a [`SubagentEvent`].
///
/// Serialized as an internally tagged enum: the variant name appears in a
/// `type` field in snake_case (e.g. `"tool_use"`) alongside the variant fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SubagentEventType {
    /// Text produced from `assistant` records and from `result` records.
    Text { content: String },
    /// A tool invocation: the tool name and a shortened rendering of its input.
    ToolUse { name: String, input_summary: String },
    /// Outcome of a tool call: error flag and a shortened preview of the output.
    ToolResult { is_error: bool, preview: String },
    /// Progress output reported by the subagent.
    Progress { output: String },
}
66
67// ── Internal state ────────────────────────────────────────────────────────
68
/// Per-session bookkeeping: all subagents seen under one session key.
struct SessionWatch {
    /// Subagent state keyed by subagent id (the `<id>` in `agent-<id>.jsonl`).
    subagents: HashMap<String, SubagentState>,
}
72
/// Tracking state for a single subagent JSONL file.
struct SubagentState {
    /// Current summary; cloned out for listings and broadcasts.
    info: SubagentInfo,
    /// Byte position in the JSONL file from which the next read resumes.
    file_offset: u64,
    /// Every event parsed so far, in file order (nothing in this file trims it).
    events: Vec<SubagentEvent>,
}
78
/// A registered parent agent together with the filesystem watcher serving it.
///
/// `dead_code` is allowed because `config_dir` and `_watcher` are stored but
/// never read in this file; the watcher is held only so that it stays alive.
#[allow(dead_code)]
struct WatchedAgent {
    /// Id of the registered agent; used to detect duplicate registrations.
    agent_id: String,
    /// Config directory that was supplied when the agent was registered.
    config_dir: PathBuf,
    /// Filesystem watcher handle, kept here to keep the watcher alive.
    _watcher: RecommendedWatcher,
}
85
86// ── SubagentWatcher ───────────────────────────────────────────────────────
87
/// Tracks subagent JSONL files for registered agents and publishes their
/// activity on the event bus.
pub struct SubagentWatcher {
    /// Bus used to broadcast `subagent:*` events.
    event_bus: Arc<EventBus>,
    /// Tracked subagents grouped by session key.
    sessions: Mutex<HashMap<String, SessionWatch>>,
    /// One entry per agent registered through `watch_agent`; each entry owns
    /// the filesystem watcher created for that agent.
    watched_agents: Mutex<Vec<WatchedAgent>>,
}
93
94impl SubagentWatcher {
95    pub fn new(event_bus: Arc<EventBus>) -> Self {
96        Self {
97            event_bus,
98            sessions: Mutex::new(HashMap::new()),
99            watched_agents: Mutex::new(Vec::new()),
100        }
101    }
102
103    /// Create a new SubagentWatcher and return it wrapped in Arc.
104    pub fn spawn(event_bus: Arc<EventBus>) -> Arc<Self> {
105        let watcher = Arc::new(Self::new(event_bus));
106        tracing::info!("subagent watcher initialized");
107        watcher
108    }
109
110    /// Start watching a Claude Code agent's session directory for subagent files.
111    /// Spawns a background tokio task for debounced file event processing.
112    pub fn watch_agent(self: &Arc<Self>, agent_id: &str, config_dir: PathBuf) {
113        // Derive the projects directory where Claude stores session data
114        let projects_dir = config_dir.join("projects");
115        if !projects_dir.exists() {
116            tracing::debug!(
117                agent = %agent_id,
118                dir = %projects_dir.display(),
119                "projects dir does not exist yet, will watch when created"
120            );
121        }
122
123        // Check if already watching this agent
124        {
125            let watched = self.watched_agents.lock().unwrap();
126            if watched.iter().any(|w| w.agent_id == agent_id) {
127                tracing::debug!(agent = %agent_id, "already watching this agent");
128                return;
129            }
130        }
131
132        let (tx, mut rx) = mpsc::unbounded_channel::<PathBuf>();
133
134        // Set up filesystem watcher
135        let tx_clone = tx.clone();
136        let watched_dir = if projects_dir.exists() {
137            projects_dir.clone()
138        } else {
139            // Watch parent (config_dir) until projects/ appears
140            config_dir.clone()
141        };
142
143        let mut watcher = match notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
144            match res {
145                Ok(event) => {
146                    let dominated = matches!(
147                        event.kind,
148                        EventKind::Modify(_) | EventKind::Create(_)
149                    );
150                    if dominated {
151                        for path in event.paths {
152                            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
153                                if name.starts_with("agent-") && name.ends_with(".jsonl") {
154                                    let _ = tx_clone.send(path);
155                                }
156                            }
157                        }
158                    }
159                }
160                Err(e) => {
161                    tracing::warn!(error = %e, "subagent filesystem watcher error");
162                }
163            }
164        }) {
165            Ok(w) => w,
166            Err(e) => {
167                tracing::warn!(
168                    agent = %agent_id,
169                    error = %e,
170                    "failed to create subagent file watcher"
171                );
172                return;
173            }
174        };
175
176        if let Err(e) = watcher.watch(&watched_dir, RecursiveMode::Recursive) {
177            tracing::warn!(
178                agent = %agent_id,
179                dir = %watched_dir.display(),
180                error = %e,
181                "failed to watch directory for subagents"
182            );
183            return;
184        }
185
186        tracing::info!(
187            agent = %agent_id,
188            dir = %watched_dir.display(),
189            "watching for subagent JSONL files"
190        );
191
192        // Store the watcher handle to keep it alive
193        {
194            let mut watched = self.watched_agents.lock().unwrap();
195            watched.push(WatchedAgent {
196                agent_id: agent_id.to_string(),
197                config_dir: config_dir.clone(),
198                _watcher: watcher,
199            });
200        }
201
202        // Scan for any existing subagent files
203        self.scan_existing_subagents(agent_id, &projects_dir);
204
205        // Spawn async task to process file change notifications
206        let self_clone = Arc::clone(self);
207        let parent_agent = agent_id.to_string();
208        tokio::spawn(async move {
209            loop {
210                let path = match rx.recv().await {
211                    Some(p) => p,
212                    None => {
213                        tracing::info!(
214                            agent = %parent_agent,
215                            "subagent watcher channel closed"
216                        );
217                        break;
218                    }
219                };
220
221                // Debounce: drain additional events within 200ms
222                tokio::time::sleep(Duration::from_millis(200)).await;
223                let mut paths = vec![path];
224                while let Ok(p) = rx.try_recv() {
225                    if !paths.contains(&p) {
226                        paths.push(p);
227                    }
228                }
229
230                for changed_path in paths {
231                    self_clone.process_jsonl_change(&parent_agent, &changed_path);
232                }
233            }
234        });
235    }
236
237    /// List all subagents across all sessions (sync — safe to call from RPC dispatch).
238    pub fn list_active(&self) -> Vec<SubagentInfo> {
239        let sessions = self.sessions.lock().unwrap();
240        let mut result = Vec::new();
241        for session in sessions.values() {
242            for state in session.subagents.values() {
243                result.push(state.info.clone());
244            }
245        }
246        result.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
247        result
248    }
249
250    /// Get recent events for a specific subagent (sync — safe to call from RPC dispatch).
251    pub fn get_history(&self, agent_id: &str, limit: usize) -> Vec<SubagentEvent> {
252        let sessions = self.sessions.lock().unwrap();
253        for session in sessions.values() {
254            if let Some(state) = session.subagents.get(agent_id) {
255                let events = &state.events;
256                let start = events.len().saturating_sub(limit);
257                return events[start..].to_vec();
258            }
259        }
260        Vec::new()
261    }
262
263    // ── Internal methods ──────────────────────────────────────────────────
264
265    /// Scan for existing subagent JSONL files in a projects directory.
266    fn scan_existing_subagents(&self, parent_agent: &str, projects_dir: &Path) {
267        if !projects_dir.exists() {
268            return;
269        }
270
271        let walker = match std::fs::read_dir(projects_dir) {
272            Ok(w) => w,
273            Err(_) => return,
274        };
275
276        for entry in walker.flatten() {
277            let subagents_dir = entry.path().join("subagents");
278            if subagents_dir.is_dir() {
279                if let Ok(files) = std::fs::read_dir(&subagents_dir) {
280                    for file in files.flatten() {
281                        let path = file.path();
282                        if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
283                            if name.starts_with("agent-") && name.ends_with(".jsonl") {
284                                self.process_jsonl_change(parent_agent, &path);
285                            }
286                        }
287                    }
288                }
289            }
290        }
291    }
292
293    /// Process a changed/new JSONL subagent file. Reads new lines, updates state,
294    /// and broadcasts events via EventBus.
295    fn process_jsonl_change(&self, parent_agent: &str, jsonl_path: &Path) {
296        // Extract agent ID from filename: agent-<id>.jsonl
297        let agent_id = match jsonl_path
298            .file_stem()
299            .and_then(|s| s.to_str())
300            .and_then(|s| s.strip_prefix("agent-"))
301        {
302            Some(id) => id.to_string(),
303            None => return,
304        };
305
306        // Derive session_id from the parent directory structure
307        let session_id = jsonl_path
308            .parent() // subagents/
309            .and_then(|p| p.parent()) // project-encoded-dir/
310            .and_then(|p| p.file_name())
311            .and_then(|n| n.to_str())
312            .unwrap_or("unknown")
313            .to_string();
314
315        // Read the current offset before locking (so file I/O is outside the lock)
316        let current_offset = {
317            let sessions = self.sessions.lock().unwrap();
318            sessions
319                .get(&session_id)
320                .and_then(|s| s.subagents.get(&agent_id))
321                .map(|s| s.file_offset)
322                .unwrap_or(0)
323        };
324
325        // Do file I/O outside the mutex lock
326        let (new_events, new_offset, meta) = match read_jsonl_from_offset(jsonl_path, current_offset) {
327            Ok(result) => result,
328            Err(e) => {
329                tracing::debug!(
330                    agent_id = %agent_id,
331                    error = %e,
332                    "failed to read subagent JSONL"
333                );
334                return;
335            }
336        };
337
338        // Now lock and update state
339        let (is_new, info_snapshot, completed) = {
340            let mut sessions = self.sessions.lock().unwrap();
341            let session = sessions
342                .entry(session_id.clone())
343                .or_insert_with(|| SessionWatch {
344                    subagents: HashMap::new(),
345                });
346
347            let is_new = !session.subagents.contains_key(&agent_id);
348            let state = session.subagents.entry(agent_id.clone()).or_insert_with(|| {
349                SubagentState {
350                    info: SubagentInfo {
351                        agent_id: agent_id.clone(),
352                        slug: String::new(),
353                        jsonl_path: jsonl_path.to_string_lossy().to_string(),
354                        parent_agent: parent_agent.to_string(),
355                        session_id: session_id.clone(),
356                        last_event_at: now_millis(),
357                        status: SubagentStatus::Active,
358                        event_count: 0,
359                        model: None,
360                    },
361                    file_offset: 0,
362                    events: Vec::new(),
363                }
364            });
365
366            state.file_offset = new_offset;
367
368            // Update metadata from first line if we got it
369            if let Some(m) = meta {
370                if !m.slug.is_empty() {
371                    state.info.slug = m.slug;
372                }
373                if let Some(model) = m.model {
374                    state.info.model = Some(model);
375                }
376            }
377
378            if new_events.is_empty() && !is_new {
379                return;
380            }
381
382            // Process events
383            let mut completed = false;
384            for event in &new_events {
385                state.info.event_count += 1;
386                state.info.last_event_at = event.timestamp;
387                state.events.push(event.clone());
388            }
389
390            // Check last event for result type (completion)
391            if let Some(last) = new_events.last() {
392                if matches!(&last.event_type, SubagentEventType::Text { content } if content == "Subagent completed") {
393                    completed = true;
394                    state.info.status = SubagentStatus::Completed;
395                }
396            }
397
398            let info_snapshot = state.info.clone();
399            (is_new, info_snapshot, completed)
400        };
401        // Mutex released here — broadcast outside the lock
402
403        if is_new {
404            let spawned_event = WSEventType {
405                eventtype: WS_EVENT_RPC.to_string(),
406                oref: String::new(),
407                data: Some(json!({
408                    "command": "eventrecv",
409                    "data": {
410                        "event": "subagent:spawned",
411                        "data": {
412                            "agentId": info_snapshot.agent_id,
413                            "slug": info_snapshot.slug,
414                            "parentAgent": parent_agent,
415                            "sessionId": session_id,
416                            "model": info_snapshot.model,
417                        }
418                    }
419                })),
420            };
421            self.event_bus.broadcast_event(&spawned_event);
422            tracing::info!(
423                agent_id = %agent_id,
424                slug = %info_snapshot.slug,
425                parent = %parent_agent,
426                "subagent spawned"
427            );
428        }
429
430        if !new_events.is_empty() {
431            let activity_event = WSEventType {
432                eventtype: WS_EVENT_RPC.to_string(),
433                oref: String::new(),
434                data: Some(json!({
435                    "command": "eventrecv",
436                    "data": {
437                        "event": "subagent:activity",
438                        "data": {
439                            "agentId": agent_id,
440                            "parentAgent": parent_agent,
441                            "newEvents": new_events.len(),
442                            "totalEvents": info_snapshot.event_count,
443                            "events": new_events,
444                        }
445                    }
446                })),
447            };
448            self.event_bus.broadcast_event(&activity_event);
449        }
450
451        if completed {
452            let completed_event = WSEventType {
453                eventtype: WS_EVENT_RPC.to_string(),
454                oref: String::new(),
455                data: Some(json!({
456                    "command": "eventrecv",
457                    "data": {
458                        "event": "subagent:completed",
459                        "data": {
460                            "agentId": agent_id,
461                            "parentAgent": parent_agent,
462                            "totalEvents": info_snapshot.event_count,
463                        }
464                    }
465                })),
466            };
467            self.event_bus.broadcast_event(&completed_event);
468            tracing::info!(
469                agent_id = %agent_id,
470                total_events = info_snapshot.event_count,
471                "subagent completed"
472            );
473        }
474    }
475}
476
477// ── JSONL parsing ─────────────────────────────────────────────────────────
478
/// Metadata extracted from the first JSONL line (the subagent init record).
struct JsonlMeta {
    /// The record's `slug` string, or its `agentId` when no `slug` string exists.
    slug: String,
    /// The record's `model` string, when present.
    model: Option<String>,
}
484
485/// Read a JSONL file from a byte offset, parsing new subagent events.
486/// Returns (events, new_offset, optional_meta).
487fn read_jsonl_from_offset(
488    path: &Path,
489    offset: u64,
490) -> Result<(Vec<SubagentEvent>, u64, Option<JsonlMeta>), String> {
491    let file = std::fs::File::open(path).map_err(|e| format!("open: {e}"))?;
492    let file_len = file.metadata().map_err(|e| format!("metadata: {e}"))?.len();
493
494    if file_len <= offset {
495        return Ok((Vec::new(), offset, None));
496    }
497
498    let mut reader = BufReader::new(file);
499    reader
500        .seek(SeekFrom::Start(offset))
501        .map_err(|e| format!("seek: {e}"))?;
502
503    let mut events = Vec::new();
504    let mut meta = None;
505    let mut current_offset = offset;
506
507    for line_result in reader.lines() {
508        let line = match line_result {
509            Ok(l) => l,
510            Err(_) => break,
511        };
512        current_offset += line.len() as u64 + 1; // +1 for newline
513
514        if line.trim().is_empty() {
515            continue;
516        }
517
518        let value: serde_json::Value = match serde_json::from_str(&line) {
519            Ok(v) => v,
520            Err(_) => continue,
521        };
522
523        // Extract metadata from init/config lines
524        if offset == 0 && meta.is_none() {
525            if let Some(slug) = value.get("slug").and_then(|v| v.as_str()) {
526                meta = Some(JsonlMeta {
527                    slug: slug.to_string(),
528                    model: value
529                        .get("model")
530                        .and_then(|v| v.as_str())
531                        .map(|s| s.to_string()),
532                });
533            }
534            if meta.is_none() {
535                if let Some(agent_id) = value.get("agentId").and_then(|v| v.as_str()) {
536                    meta = Some(JsonlMeta {
537                        slug: value
538                            .get("slug")
539                            .and_then(|v| v.as_str())
540                            .unwrap_or(agent_id)
541                            .to_string(),
542                        model: value
543                            .get("model")
544                            .and_then(|v| v.as_str())
545                            .map(|s| s.to_string()),
546                    });
547                }
548            }
549        }
550
551        let timestamp = value
552            .get("timestamp")
553            .and_then(|v| v.as_u64())
554            .unwrap_or_else(now_millis);
555
556        let event_type = parse_event_type(&value);
557        if let Some(et) = event_type {
558            let line_agent_id = value
559                .get("agentId")
560                .and_then(|v| v.as_str())
561                .unwrap_or("")
562                .to_string();
563
564            events.push(SubagentEvent {
565                agent_id: line_agent_id,
566                event_type: et,
567                timestamp,
568            });
569        }
570    }
571
572    Ok((events, current_offset, meta))
573}
574
575/// Parse a JSONL line into a SubagentEventType based on the `type` field.
576fn parse_event_type(value: &serde_json::Value) -> Option<SubagentEventType> {
577    let event_type = value.get("type").and_then(|v| v.as_str())?;
578
579    match event_type {
580        "assistant" => {
581            let content = value
582                .get("message")
583                .and_then(|m| m.get("content"))
584                .and_then(|c| {
585                    if let Some(arr) = c.as_array() {
586                        let texts: Vec<&str> = arr
587                            .iter()
588                            .filter_map(|block| {
589                                if block.get("type").and_then(|t| t.as_str()) == Some("text") {
590                                    block.get("text").and_then(|t| t.as_str())
591                                } else {
592                                    None
593                                }
594                            })
595                            .collect();
596                        if texts.is_empty() {
597                            None
598                        } else {
599                            Some(texts.join("\n"))
600                        }
601                    } else {
602                        c.as_str().map(|s| s.to_string())
603                    }
604                })
605                .unwrap_or_default();
606            Some(SubagentEventType::Text { content })
607        }
608        "tool_use" => {
609            let name = value
610                .get("name")
611                .or_else(|| value.get("tool_name"))
612                .and_then(|v| v.as_str())
613                .unwrap_or("unknown")
614                .to_string();
615            let input_summary = value
616                .get("input")
617                .map(|v| {
618                    let s = v.to_string();
619                    if s.len() > 200 {
620                        let end = s.char_indices().nth(200).map_or(s.len(), |(i, _)| i);
621                        format!("{}...", &s[..end])
622                    } else {
623                        s
624                    }
625                })
626                .unwrap_or_default();
627            Some(SubagentEventType::ToolUse {
628                name,
629                input_summary,
630            })
631        }
632        "tool_result" => {
633            let is_error = value
634                .get("is_error")
635                .and_then(|v| v.as_bool())
636                .unwrap_or(false);
637            let preview = value
638                .get("content")
639                .or_else(|| value.get("output"))
640                .map(|v| {
641                    let s = if let Some(s) = v.as_str() {
642                        s.to_string()
643                    } else {
644                        v.to_string()
645                    };
646                    if s.len() > 500 {
647                        let end = s.char_indices().nth(500).map_or(s.len(), |(i, _)| i);
648                        format!("{}...", &s[..end])
649                    } else {
650                        s
651                    }
652                })
653                .unwrap_or_default();
654            Some(SubagentEventType::ToolResult { is_error, preview })
655        }
656        "progress" => {
657            let output = value
658                .get("output")
659                .or_else(|| value.get("content"))
660                .and_then(|v| v.as_str())
661                .unwrap_or("")
662                .to_string();
663            Some(SubagentEventType::Progress { output })
664        }
665        "result" => {
666            let content = value
667                .get("result")
668                .or_else(|| value.get("content"))
669                .map(|v| {
670                    if let Some(s) = v.as_str() {
671                        s.to_string()
672                    } else {
673                        v.to_string()
674                    }
675                })
676                .unwrap_or_else(|| "Subagent completed".to_string());
677            Some(SubagentEventType::Text { content })
678        }
679        _ => None,
680    }
681}
682
/// Milliseconds since the Unix epoch according to the system clock.
///
/// Yields 0 if the clock reports a time before the epoch.
fn now_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
689
690// ── Utility: encode workspace path like Claude Code does ──────────────────
691
/// Encode a workspace path the same way Claude Code does for its projects dir.
///
/// Every `\` and `/` becomes `-`, every `:` is dropped, and all other
/// characters are kept as they are.
#[allow(dead_code)]
pub fn encode_workspace_path(workspace_path: &str) -> String {
    let mut encoded = String::with_capacity(workspace_path.len());
    for ch in workspace_path.chars() {
        match ch {
            '\\' | '/' => encoded.push('-'),
            ':' => {}
            other => encoded.push(other),
        }
    }
    encoded
}
700
701/// Derive the Claude Code config directory for a host agent.
702pub fn derive_claude_config_dir(agent_id: &str) -> Option<PathBuf> {
703    let home = dirs::home_dir()?;
704    let config_dir = home
705        .join(".config")
706        .join(format!("claude-{}", agent_id.to_lowercase()));
707    Some(config_dir)
708}