1use std::collections::HashMap;
17use std::io::{BufRead, BufReader, Seek, SeekFrom};
18use std::path::{Path, PathBuf};
19use std::sync::{Arc, Mutex};
20use std::time::Duration;
21
22use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use tokio::sync::mpsc;
26
27use super::eventbus::{EventBus, WSEventType, WS_EVENT_RPC};
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct SubagentInfo {
33 pub agent_id: String,
34 pub slug: String,
35 pub jsonl_path: String,
36 pub parent_agent: String,
37 pub session_id: String,
38 pub last_event_at: u64,
39 pub status: SubagentStatus,
40 pub event_count: usize,
41 pub model: Option<String>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
45#[serde(rename_all = "lowercase")]
46pub enum SubagentStatus {
47 Active,
48 Completed,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct SubagentEvent {
53 pub agent_id: String,
54 pub event_type: SubagentEventType,
55 pub timestamp: u64,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
59#[serde(tag = "type", rename_all = "snake_case")]
60pub enum SubagentEventType {
61 Text { content: String },
62 ToolUse { name: String, input_summary: String },
63 ToolResult { is_error: bool, preview: String },
64 Progress { output: String },
65}
66
67struct SessionWatch {
70 subagents: HashMap<String, SubagentState>,
71}
72
73struct SubagentState {
74 info: SubagentInfo,
75 file_offset: u64,
76 events: Vec<SubagentEvent>,
77}
78
79#[allow(dead_code)]
80struct WatchedAgent {
81 agent_id: String,
82 config_dir: PathBuf,
83 _watcher: RecommendedWatcher,
84}
85
86pub struct SubagentWatcher {
89 event_bus: Arc<EventBus>,
90 sessions: Mutex<HashMap<String, SessionWatch>>,
91 watched_agents: Mutex<Vec<WatchedAgent>>,
92}
93
94impl SubagentWatcher {
95 pub fn new(event_bus: Arc<EventBus>) -> Self {
96 Self {
97 event_bus,
98 sessions: Mutex::new(HashMap::new()),
99 watched_agents: Mutex::new(Vec::new()),
100 }
101 }
102
103 pub fn spawn(event_bus: Arc<EventBus>) -> Arc<Self> {
105 let watcher = Arc::new(Self::new(event_bus));
106 tracing::info!("subagent watcher initialized");
107 watcher
108 }
109
110 pub fn watch_agent(self: &Arc<Self>, agent_id: &str, config_dir: PathBuf) {
113 let projects_dir = config_dir.join("projects");
115 if !projects_dir.exists() {
116 tracing::debug!(
117 agent = %agent_id,
118 dir = %projects_dir.display(),
119 "projects dir does not exist yet, will watch when created"
120 );
121 }
122
123 {
125 let watched = self.watched_agents.lock().unwrap();
126 if watched.iter().any(|w| w.agent_id == agent_id) {
127 tracing::debug!(agent = %agent_id, "already watching this agent");
128 return;
129 }
130 }
131
132 let (tx, mut rx) = mpsc::unbounded_channel::<PathBuf>();
133
134 let tx_clone = tx.clone();
136 let watched_dir = if projects_dir.exists() {
137 projects_dir.clone()
138 } else {
139 config_dir.clone()
141 };
142
143 let mut watcher = match notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
144 match res {
145 Ok(event) => {
146 let dominated = matches!(
147 event.kind,
148 EventKind::Modify(_) | EventKind::Create(_)
149 );
150 if dominated {
151 for path in event.paths {
152 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
153 if name.starts_with("agent-") && name.ends_with(".jsonl") {
154 let _ = tx_clone.send(path);
155 }
156 }
157 }
158 }
159 }
160 Err(e) => {
161 tracing::warn!(error = %e, "subagent filesystem watcher error");
162 }
163 }
164 }) {
165 Ok(w) => w,
166 Err(e) => {
167 tracing::warn!(
168 agent = %agent_id,
169 error = %e,
170 "failed to create subagent file watcher"
171 );
172 return;
173 }
174 };
175
176 if let Err(e) = watcher.watch(&watched_dir, RecursiveMode::Recursive) {
177 tracing::warn!(
178 agent = %agent_id,
179 dir = %watched_dir.display(),
180 error = %e,
181 "failed to watch directory for subagents"
182 );
183 return;
184 }
185
186 tracing::info!(
187 agent = %agent_id,
188 dir = %watched_dir.display(),
189 "watching for subagent JSONL files"
190 );
191
192 {
194 let mut watched = self.watched_agents.lock().unwrap();
195 watched.push(WatchedAgent {
196 agent_id: agent_id.to_string(),
197 config_dir: config_dir.clone(),
198 _watcher: watcher,
199 });
200 }
201
202 self.scan_existing_subagents(agent_id, &projects_dir);
204
205 let self_clone = Arc::clone(self);
207 let parent_agent = agent_id.to_string();
208 tokio::spawn(async move {
209 loop {
210 let path = match rx.recv().await {
211 Some(p) => p,
212 None => {
213 tracing::info!(
214 agent = %parent_agent,
215 "subagent watcher channel closed"
216 );
217 break;
218 }
219 };
220
221 tokio::time::sleep(Duration::from_millis(200)).await;
223 let mut paths = vec![path];
224 while let Ok(p) = rx.try_recv() {
225 if !paths.contains(&p) {
226 paths.push(p);
227 }
228 }
229
230 for changed_path in paths {
231 self_clone.process_jsonl_change(&parent_agent, &changed_path);
232 }
233 }
234 });
235 }
236
237 pub fn list_active(&self) -> Vec<SubagentInfo> {
239 let sessions = self.sessions.lock().unwrap();
240 let mut result = Vec::new();
241 for session in sessions.values() {
242 for state in session.subagents.values() {
243 result.push(state.info.clone());
244 }
245 }
246 result.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
247 result
248 }
249
250 pub fn get_history(&self, agent_id: &str, limit: usize) -> Vec<SubagentEvent> {
252 let sessions = self.sessions.lock().unwrap();
253 for session in sessions.values() {
254 if let Some(state) = session.subagents.get(agent_id) {
255 let events = &state.events;
256 let start = events.len().saturating_sub(limit);
257 return events[start..].to_vec();
258 }
259 }
260 Vec::new()
261 }
262
263 fn scan_existing_subagents(&self, parent_agent: &str, projects_dir: &Path) {
267 if !projects_dir.exists() {
268 return;
269 }
270
271 let walker = match std::fs::read_dir(projects_dir) {
272 Ok(w) => w,
273 Err(_) => return,
274 };
275
276 for entry in walker.flatten() {
277 let subagents_dir = entry.path().join("subagents");
278 if subagents_dir.is_dir() {
279 if let Ok(files) = std::fs::read_dir(&subagents_dir) {
280 for file in files.flatten() {
281 let path = file.path();
282 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
283 if name.starts_with("agent-") && name.ends_with(".jsonl") {
284 self.process_jsonl_change(parent_agent, &path);
285 }
286 }
287 }
288 }
289 }
290 }
291 }
292
293 fn process_jsonl_change(&self, parent_agent: &str, jsonl_path: &Path) {
296 let agent_id = match jsonl_path
298 .file_stem()
299 .and_then(|s| s.to_str())
300 .and_then(|s| s.strip_prefix("agent-"))
301 {
302 Some(id) => id.to_string(),
303 None => return,
304 };
305
306 let session_id = jsonl_path
308 .parent() .and_then(|p| p.parent()) .and_then(|p| p.file_name())
311 .and_then(|n| n.to_str())
312 .unwrap_or("unknown")
313 .to_string();
314
315 let current_offset = {
317 let sessions = self.sessions.lock().unwrap();
318 sessions
319 .get(&session_id)
320 .and_then(|s| s.subagents.get(&agent_id))
321 .map(|s| s.file_offset)
322 .unwrap_or(0)
323 };
324
325 let (new_events, new_offset, meta) = match read_jsonl_from_offset(jsonl_path, current_offset) {
327 Ok(result) => result,
328 Err(e) => {
329 tracing::debug!(
330 agent_id = %agent_id,
331 error = %e,
332 "failed to read subagent JSONL"
333 );
334 return;
335 }
336 };
337
338 let (is_new, info_snapshot, completed) = {
340 let mut sessions = self.sessions.lock().unwrap();
341 let session = sessions
342 .entry(session_id.clone())
343 .or_insert_with(|| SessionWatch {
344 subagents: HashMap::new(),
345 });
346
347 let is_new = !session.subagents.contains_key(&agent_id);
348 let state = session.subagents.entry(agent_id.clone()).or_insert_with(|| {
349 SubagentState {
350 info: SubagentInfo {
351 agent_id: agent_id.clone(),
352 slug: String::new(),
353 jsonl_path: jsonl_path.to_string_lossy().to_string(),
354 parent_agent: parent_agent.to_string(),
355 session_id: session_id.clone(),
356 last_event_at: now_millis(),
357 status: SubagentStatus::Active,
358 event_count: 0,
359 model: None,
360 },
361 file_offset: 0,
362 events: Vec::new(),
363 }
364 });
365
366 state.file_offset = new_offset;
367
368 if let Some(m) = meta {
370 if !m.slug.is_empty() {
371 state.info.slug = m.slug;
372 }
373 if let Some(model) = m.model {
374 state.info.model = Some(model);
375 }
376 }
377
378 if new_events.is_empty() && !is_new {
379 return;
380 }
381
382 let mut completed = false;
384 for event in &new_events {
385 state.info.event_count += 1;
386 state.info.last_event_at = event.timestamp;
387 state.events.push(event.clone());
388 }
389
390 if let Some(last) = new_events.last() {
392 if matches!(&last.event_type, SubagentEventType::Text { content } if content == "Subagent completed") {
393 completed = true;
394 state.info.status = SubagentStatus::Completed;
395 }
396 }
397
398 let info_snapshot = state.info.clone();
399 (is_new, info_snapshot, completed)
400 };
401 if is_new {
404 let spawned_event = WSEventType {
405 eventtype: WS_EVENT_RPC.to_string(),
406 oref: String::new(),
407 data: Some(json!({
408 "command": "eventrecv",
409 "data": {
410 "event": "subagent:spawned",
411 "data": {
412 "agentId": info_snapshot.agent_id,
413 "slug": info_snapshot.slug,
414 "parentAgent": parent_agent,
415 "sessionId": session_id,
416 "model": info_snapshot.model,
417 }
418 }
419 })),
420 };
421 self.event_bus.broadcast_event(&spawned_event);
422 tracing::info!(
423 agent_id = %agent_id,
424 slug = %info_snapshot.slug,
425 parent = %parent_agent,
426 "subagent spawned"
427 );
428 }
429
430 if !new_events.is_empty() {
431 let activity_event = WSEventType {
432 eventtype: WS_EVENT_RPC.to_string(),
433 oref: String::new(),
434 data: Some(json!({
435 "command": "eventrecv",
436 "data": {
437 "event": "subagent:activity",
438 "data": {
439 "agentId": agent_id,
440 "parentAgent": parent_agent,
441 "newEvents": new_events.len(),
442 "totalEvents": info_snapshot.event_count,
443 "events": new_events,
444 }
445 }
446 })),
447 };
448 self.event_bus.broadcast_event(&activity_event);
449 }
450
451 if completed {
452 let completed_event = WSEventType {
453 eventtype: WS_EVENT_RPC.to_string(),
454 oref: String::new(),
455 data: Some(json!({
456 "command": "eventrecv",
457 "data": {
458 "event": "subagent:completed",
459 "data": {
460 "agentId": agent_id,
461 "parentAgent": parent_agent,
462 "totalEvents": info_snapshot.event_count,
463 }
464 }
465 })),
466 };
467 self.event_bus.broadcast_event(&completed_event);
468 tracing::info!(
469 agent_id = %agent_id,
470 total_events = info_snapshot.event_count,
471 "subagent completed"
472 );
473 }
474 }
475}
476
477struct JsonlMeta {
481 slug: String,
482 model: Option<String>,
483}
484
485fn read_jsonl_from_offset(
488 path: &Path,
489 offset: u64,
490) -> Result<(Vec<SubagentEvent>, u64, Option<JsonlMeta>), String> {
491 let file = std::fs::File::open(path).map_err(|e| format!("open: {e}"))?;
492 let file_len = file.metadata().map_err(|e| format!("metadata: {e}"))?.len();
493
494 if file_len <= offset {
495 return Ok((Vec::new(), offset, None));
496 }
497
498 let mut reader = BufReader::new(file);
499 reader
500 .seek(SeekFrom::Start(offset))
501 .map_err(|e| format!("seek: {e}"))?;
502
503 let mut events = Vec::new();
504 let mut meta = None;
505 let mut current_offset = offset;
506
507 for line_result in reader.lines() {
508 let line = match line_result {
509 Ok(l) => l,
510 Err(_) => break,
511 };
512 current_offset += line.len() as u64 + 1; if line.trim().is_empty() {
515 continue;
516 }
517
518 let value: serde_json::Value = match serde_json::from_str(&line) {
519 Ok(v) => v,
520 Err(_) => continue,
521 };
522
523 if offset == 0 && meta.is_none() {
525 if let Some(slug) = value.get("slug").and_then(|v| v.as_str()) {
526 meta = Some(JsonlMeta {
527 slug: slug.to_string(),
528 model: value
529 .get("model")
530 .and_then(|v| v.as_str())
531 .map(|s| s.to_string()),
532 });
533 }
534 if meta.is_none() {
535 if let Some(agent_id) = value.get("agentId").and_then(|v| v.as_str()) {
536 meta = Some(JsonlMeta {
537 slug: value
538 .get("slug")
539 .and_then(|v| v.as_str())
540 .unwrap_or(agent_id)
541 .to_string(),
542 model: value
543 .get("model")
544 .and_then(|v| v.as_str())
545 .map(|s| s.to_string()),
546 });
547 }
548 }
549 }
550
551 let timestamp = value
552 .get("timestamp")
553 .and_then(|v| v.as_u64())
554 .unwrap_or_else(now_millis);
555
556 let event_type = parse_event_type(&value);
557 if let Some(et) = event_type {
558 let line_agent_id = value
559 .get("agentId")
560 .and_then(|v| v.as_str())
561 .unwrap_or("")
562 .to_string();
563
564 events.push(SubagentEvent {
565 agent_id: line_agent_id,
566 event_type: et,
567 timestamp,
568 });
569 }
570 }
571
572 Ok((events, current_offset, meta))
573}
574
575fn parse_event_type(value: &serde_json::Value) -> Option<SubagentEventType> {
577 let event_type = value.get("type").and_then(|v| v.as_str())?;
578
579 match event_type {
580 "assistant" => {
581 let content = value
582 .get("message")
583 .and_then(|m| m.get("content"))
584 .and_then(|c| {
585 if let Some(arr) = c.as_array() {
586 let texts: Vec<&str> = arr
587 .iter()
588 .filter_map(|block| {
589 if block.get("type").and_then(|t| t.as_str()) == Some("text") {
590 block.get("text").and_then(|t| t.as_str())
591 } else {
592 None
593 }
594 })
595 .collect();
596 if texts.is_empty() {
597 None
598 } else {
599 Some(texts.join("\n"))
600 }
601 } else {
602 c.as_str().map(|s| s.to_string())
603 }
604 })
605 .unwrap_or_default();
606 Some(SubagentEventType::Text { content })
607 }
608 "tool_use" => {
609 let name = value
610 .get("name")
611 .or_else(|| value.get("tool_name"))
612 .and_then(|v| v.as_str())
613 .unwrap_or("unknown")
614 .to_string();
615 let input_summary = value
616 .get("input")
617 .map(|v| {
618 let s = v.to_string();
619 if s.len() > 200 {
620 let end = s.char_indices().nth(200).map_or(s.len(), |(i, _)| i);
621 format!("{}...", &s[..end])
622 } else {
623 s
624 }
625 })
626 .unwrap_or_default();
627 Some(SubagentEventType::ToolUse {
628 name,
629 input_summary,
630 })
631 }
632 "tool_result" => {
633 let is_error = value
634 .get("is_error")
635 .and_then(|v| v.as_bool())
636 .unwrap_or(false);
637 let preview = value
638 .get("content")
639 .or_else(|| value.get("output"))
640 .map(|v| {
641 let s = if let Some(s) = v.as_str() {
642 s.to_string()
643 } else {
644 v.to_string()
645 };
646 if s.len() > 500 {
647 let end = s.char_indices().nth(500).map_or(s.len(), |(i, _)| i);
648 format!("{}...", &s[..end])
649 } else {
650 s
651 }
652 })
653 .unwrap_or_default();
654 Some(SubagentEventType::ToolResult { is_error, preview })
655 }
656 "progress" => {
657 let output = value
658 .get("output")
659 .or_else(|| value.get("content"))
660 .and_then(|v| v.as_str())
661 .unwrap_or("")
662 .to_string();
663 Some(SubagentEventType::Progress { output })
664 }
665 "result" => {
666 let content = value
667 .get("result")
668 .or_else(|| value.get("content"))
669 .map(|v| {
670 if let Some(s) = v.as_str() {
671 s.to_string()
672 } else {
673 v.to_string()
674 }
675 })
676 .unwrap_or_else(|| "Subagent completed".to_string());
677 Some(SubagentEventType::Text { content })
678 }
679 _ => None,
680 }
681}
682
683fn now_millis() -> u64 {
684 std::time::SystemTime::now()
685 .duration_since(std::time::UNIX_EPOCH)
686 .map(|d| d.as_millis() as u64)
687 .unwrap_or(0)
688}
689
690#[allow(dead_code)]
694pub fn encode_workspace_path(workspace_path: &str) -> String {
695 workspace_path
696 .replace('\\', "-")
697 .replace('/', "-")
698 .replace(':', "")
699}
700
701pub fn derive_claude_config_dir(agent_id: &str) -> Option<PathBuf> {
703 let home = dirs::home_dir()?;
704 let config_dir = home
705 .join(".config")
706 .join(format!("claude-{}", agent_id.to_lowercase()));
707 Some(config_dir)
708}