// agentmux_srv/agents/runner.rs

// Copyright 2026, AgentMux Corp.
// SPDX-License-Identifier: Apache-2.0

//! Unified agent runner — one-shot Claude Code spawn for the
//! drone Agent block.
//!
//! Spawns `claude --print --output-format=stream-json` as a
//! non-interactive subprocess, drains its stdout through
//! `ClaudeTranslator`, forwards each `AgentEvent` on the caller's
//! `tx`, and resolves the handle's `final_result` with the
//! structured `AgentRunResult` once the stream emits `Done`.
//!
//! Headless and one-shot by design — the drone Agent block's
//! contract is "send task, wait for done, return result." The
//! interactive agent pane has its own PTY-based controller in
//! `blockcontroller/shell.rs`; that path is NOT routed through
//! this runner (see `docs/specs/SPEC_UNIFIED_AGENT_TYPES_2026_05_13.md`
//! §4.2 — what's shared is the translator + event shape, not the
//! spawn function).

use std::path::PathBuf;
use std::process::Stdio;

use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::{mpsc, oneshot};

use super::translator::claude::ClaudeTranslator;
use super::translator::Translator as _;
use super::types::{AgentEvent, AgentRef, AgentRunResult, AgentTask};

/// Environment variable that, when set, replaces the `claude` executable
/// the runner launches. Point it at an absolute path (or at an entirely
/// different binary) for tests or for installs that are not on `PATH`.
const ENV_CLAUDE_BIN: &str = "AGENTMUX_CLAUDE_BIN";

/// Program name launched when the `AGENTMUX_CLAUDE_BIN` override is not
/// in effect. Being a bare name, it is looked up on `PATH` at spawn time.
const DEFAULT_CLAUDE_BIN: &str = "claude";

38/// Handle returned by `run_agent`. The caller already holds the
39/// `mpsc::UnboundedReceiver<AgentEvent>` they paired with the `tx`
40/// passed into `run_agent`; this handle adds the structured terminal
41/// value via `final_result` (drone Agent block's downstream
42/// output) and the `instance_id` of the backing `db_agent_instances`
43/// row.
44///
45/// Dropping the caller's receiver implicitly cancels the run only if
46/// the runner observes the send error — Phase 2 adds an explicit
47/// `AbortHandle`.
48pub struct AgentRunHandle {
49    pub instance_id: String,
50    pub final_result: oneshot::Receiver<Result<AgentRunResult, String>>,
51}
52
53/// Error returned by the runner.
54#[derive(Debug, thiserror::Error)]
55pub enum AgentError {
56    #[error("agent runner: invalid AgentRef: {0}")]
57    InvalidRef(String),
58    #[error("agent runner: spawn failed: {0}")]
59    Spawn(String),
60}
61
62/// Spawn `claude --print --output-format=stream-json` per the given
63/// `AgentTask` and `AgentRef`, drain its stdout through the shared
64/// `ClaudeTranslator`, forward each `AgentEvent` on `tx`, and
65/// resolve the returned handle's `final_result` with an
66/// `AgentRunResult` constructed from the terminal Cost + Done events.
67///
68/// Working directory resolution:
69///   - `agent_ref.working_directory` if non-empty
70///   - else the current process working directory
71///
72/// Identity / memory bundle resolution and named-agent continuation
73/// are NOT plumbed in Phase 1.5 PR 2 — the drone Agent block
74/// always spawns fresh (per spec §8 "drone runs always allocate
75/// fresh instance_name"). The bundles can be added in a follow-up
76/// once the drone inspector (PR 3) needs to surface them.
77pub async fn run_agent(
78    agent_ref: AgentRef,
79    task: AgentTask,
80    tx: mpsc::UnboundedSender<AgentEvent>,
81) -> Result<AgentRunHandle, AgentError> {
82    let bin = std::env::var(ENV_CLAUDE_BIN)
83        .unwrap_or_else(|_| DEFAULT_CLAUDE_BIN.to_string());
84    run_agent_with_bin(&bin, agent_ref, task, tx).await
85}
86
87/// Internal entry point — same as `run_agent` but takes the `claude`
88/// binary path explicitly. Lets tests inject a known-nonexistent
89/// path to exercise the spawn-failure path without touching env vars
90/// (Rust 1.81+ flags `std::env::set_var` as unsound under concurrent
91/// test execution). The public `run_agent` is a thin shim that
92/// resolves the binary from `$AGENTMUX_CLAUDE_BIN` or the default.
93pub(crate) async fn run_agent_with_bin(
94    bin: &str,
95    agent_ref: AgentRef,
96    task: AgentTask,
97    tx: mpsc::UnboundedSender<AgentEvent>,
98) -> Result<AgentRunHandle, AgentError> {
99    let working_dir = if agent_ref.working_directory.is_empty() {
100        std::env::current_dir()
101            .map_err(|e| AgentError::Spawn(format!("cwd: {e}")))?
102    } else {
103        PathBuf::from(&agent_ref.working_directory)
104    };
105
106    // `claude --print` runs in non-interactive mode and exits when
107    // done. `--output-format=stream-json` emits one JSON object per
108    // line, the format ClaudeTranslator consumes. `--verbose` is
109    // required alongside stream-json (the CLI rejects stream-json
110    // without it). `--include-partial-messages` gives us the
111    // streaming text_deltas — the translator skips the resulting
112    // `partial: true` snapshots when building the transcript.
113    let mut cmd = Command::new(bin);
114    cmd.arg("--print")
115        .arg("--output-format=stream-json")
116        .arg("--verbose")
117        .arg("--include-partial-messages");
118    // Forward the configured turn cap so the CLI enforces it.
119    // Previously stored on AgentTask but never passed to the
120    // subprocess — silently ignored. Reagent P1 + codex P2 on
121    // PR #834.
122    if let Some(n) = task.max_turns {
123        cmd.arg("--max-turns").arg(n.to_string());
124    }
125    let mut child = cmd
126        .arg(&task.prompt)
127        .current_dir(&working_dir)
128        .stdin(Stdio::null())
129        .stdout(Stdio::piped())
130        .stderr(Stdio::piped())
131        .kill_on_drop(true)
132        .spawn()
133        .map_err(|e| AgentError::Spawn(format!("spawn `{bin}`: {e}")))?;
134
135    let stdout = child
136        .stdout
137        .take()
138        .ok_or_else(|| AgentError::Spawn("claude stdout pipe missing".to_string()))?;
139    let stderr = child
140        .stderr
141        .take()
142        .ok_or_else(|| AgentError::Spawn("claude stderr pipe missing".to_string()))?;
143
144    let instance_id = format!("drone-agent-{}", uuid::Uuid::new_v4());
145    let (result_tx, result_rx) = oneshot::channel();
146
147    // Drain stderr to EOF in the background so the child's pipe
148    // never fills (a half-drained pipe causes the child to block
149    // on stderr writes, which can stall the whole run). Capped at
150    // STDERR_CAP bytes — beyond that, additional bytes are read
151    // and dropped on the floor so the child can keep writing.
152    // Phase 2 surfaces stderr as a `dronerun:<id>` diagnostic
153    // event; for now it's captured locally and discarded.
154    // Reagent P2 on PR #834.
155    tokio::spawn(async move {
156        const STDERR_CAP: usize = 64 * 1024;
157        let mut buf = Vec::with_capacity(4096);
158        let mut reader = BufReader::new(stderr);
159        let mut sink = [0u8; 4096];
160        // Read until EOF, capping the kept prefix at STDERR_CAP.
161        loop {
162            if buf.len() < STDERR_CAP {
163                let space = STDERR_CAP - buf.len();
164                let take = space.min(sink.len());
165                match reader.read(&mut sink[..take]).await {
166                    Ok(0) => break,
167                    Ok(n) => buf.extend_from_slice(&sink[..n]),
168                    Err(_) => break,
169                }
170            } else {
171                // Beyond cap — keep draining but discard.
172                match reader.read(&mut sink).await {
173                    Ok(0) => break,
174                    Ok(_) => {}
175                    Err(_) => break,
176                }
177            }
178        }
179        // buf currently dropped; Phase 2 will plumb it to the broker.
180        let _ = buf;
181    });
182
183    tokio::spawn(async move {
184        let result = drain_and_collect(stdout, &tx, &mut child).await;
185        let _ = result_tx.send(result);
186    });
187
188    Ok(AgentRunHandle {
189        instance_id,
190        final_result: result_rx,
191    })
192}
193
194/// Drain `stdout` line-by-line through `ClaudeTranslator`, forward
195/// every emitted event on `tx`, accumulate the terminal `Cost` /
196/// `Done` event payloads into an `AgentRunResult`, wait for the
197/// child to exit, and return the result.
198///
199/// Split out from `run_agent` so it can be unit-tested against
200/// in-memory readers without spawning a real subprocess. Used by
201/// the integration test below as `drain_async_reader_for_test`.
202async fn drain_and_collect(
203    stdout: tokio::process::ChildStdout,
204    tx: &mpsc::UnboundedSender<AgentEvent>,
205    child: &mut tokio::process::Child,
206) -> Result<AgentRunResult, String> {
207    let result = drain_async_reader(BufReader::new(stdout), tx).await;
208
209    // Wait for child to exit so the OS reaps it cleanly.
210    let exit = child.wait().await.map_err(|e| format!("wait: {e}"))?;
211
212    match result {
213        Ok(mut accumulated) if exit.success() => {
214            // If the stream never emitted Done (e.g. claude died
215            // mid-run), surface a synthetic error rather than
216            // returning empty defaults silently.
217            if accumulated.response.is_empty() && accumulated.transcript.is_empty() {
218                return Err("claude exited 0 but stream produced no Done event".to_string());
219            }
220            accumulated.transcript.shrink_to_fit();
221            Ok(accumulated)
222        }
223        Ok(_) => Err(format!(
224            "claude exited with status {exit} but stream emitted no error"
225        )),
226        Err(e) => Err(e),
227    }
228}
229
230/// Drain an arbitrary async reader of newline-delimited stream-json
231/// frames, forward every emitted `AgentEvent` on `tx`, and return
232/// an accumulator capturing the terminal `Cost` and `Done` values.
233///
234/// Pure async helper — no subprocess, no broker. Unit-tested with
235/// `tokio::io::duplex` in-memory pipes.
236pub(crate) async fn drain_async_reader<R: tokio::io::AsyncBufRead + Unpin>(
237    mut reader: R,
238    tx: &mpsc::UnboundedSender<AgentEvent>,
239) -> Result<AgentRunResult, String> {
240    let mut translator = ClaudeTranslator::new();
241    let mut accumulated = AgentRunResult::default();
242    let mut line = String::new();
243    loop {
244        line.clear();
245        let n = reader
246            .read_line(&mut line)
247            .await
248            .map_err(|e| format!("stdout read: {e}"))?;
249        if n == 0 {
250            break; // EOF
251        }
252        let trimmed = line.trim_end_matches(['\n', '\r']);
253        if !trimmed.starts_with('{') {
254            continue;
255        }
256        let Ok(frame) = serde_json::from_str::<serde_json::Value>(trimmed) else {
257            continue;
258        };
259        for event in translator.translate(frame) {
260            // Capture terminal values before forwarding so a closed
261            // receiver doesn't lose the accumulated result.
262            match &event {
263                AgentEvent::Cost { cost_usd, tokens } => {
264                    accumulated.cost_usd = *cost_usd;
265                    accumulated.tokens = tokens.clone();
266                }
267                AgentEvent::Done {
268                    response,
269                    transcript,
270                } => {
271                    accumulated.response = response.clone();
272                    accumulated.transcript = transcript.clone();
273                }
274                _ => {}
275            }
276            // Forward — if the receiver is dropped, just stop
277            // sending (the drain still continues to capture the
278            // accumulated result).
279            let _ = tx.send(event);
280        }
281    }
282    Ok(accumulated)
283}
284
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::fmt::Write as _;
    use tokio::io::AsyncWriteExt;

    /// Synthesize the stream-json output of a short, successful claude
    /// run. It contains one `text_delta` frame per character of
    /// `prompt_reply`, then the complete assistant message, then a
    /// `result` frame carrying `cost` and fixed usage (10 in / 5 out).
    /// Every frame is newline-terminated.
    fn synthetic_stream(prompt_reply: &str, cost: f64) -> Vec<u8> {
        let mut out = String::new();
        for ch in prompt_reply.chars() {
            writeln!(
                out,
                r#"{{"type":"stream_event","event":{{"type":"content_block_delta","delta":{{"type":"text_delta","text":"{ch}"}}}}}}"#
            )
            .unwrap();
        }
        writeln!(
            out,
            r#"{{"type":"assistant","message":{{"content":[{{"type":"text","text":"{prompt_reply}"}}]}}}}"#
        )
        .unwrap();
        writeln!(
            out,
            r#"{{"type":"result","cost_usd":{cost},"usage":{{"input_tokens":10,"output_tokens":5,"cache_creation_input_tokens":0,"cache_read_input_tokens":0}},"result":"{prompt_reply}"}}"#
        )
        .unwrap();
        out.into_bytes()
    }

    /// Feed `bytes` through an in-memory duplex pipe from a spawned
    /// writer task, `chunk` bytes per write, then shut down the write
    /// side so the reader observes EOF. Must be called from inside a
    /// Tokio runtime, since it spawns the writer.
    fn pipe_in_chunks(bytes: Vec<u8>, chunk: usize) -> BufReader<tokio::io::DuplexStream> {
        let (mut writer, reader) = tokio::io::duplex(4096);
        tokio::spawn(async move {
            for piece in bytes.chunks(chunk) {
                writer.write_all(piece).await.unwrap();
            }
            writer.shutdown().await.unwrap();
        });
        BufReader::new(reader)
    }

    /// `pipe_in_chunks` with the whole payload written at once (an empty
    /// payload produces no writes, just the shutdown).
    fn pipe(bytes: Vec<u8>) -> BufReader<tokio::io::DuplexStream> {
        pipe_in_chunks(bytes, usize::MAX)
    }

    #[tokio::test]
    async fn drain_async_reader_accumulates_cost_and_done() {
        let reader = pipe(synthetic_stream("hello", 0.001));
        let (tx, mut rx) = mpsc::unbounded_channel();
        let result = drain_async_reader(reader, &tx).await.expect("drain ok");

        assert_eq!(result.response, "hello");
        assert_eq!(result.cost_usd, 0.001);
        assert_eq!(result.tokens.input, 10);
        assert_eq!(result.tokens.output, 5);
        // Exactly one transcript entry: the assistant turn.
        assert_eq!(result.transcript.len(), 1);

        // Forwarded: 5 AssistantText (one per char), then Cost, then Done.
        drop(tx);
        let mut evs = Vec::new();
        while let Some(ev) = rx.recv().await {
            evs.push(ev);
        }
        assert_eq!(evs.len(), 7, "got events: {evs:?}");
        match &evs[evs.len() - 1] {
            AgentEvent::Done { .. } => {}
            other => panic!("expected last event Done, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn drain_async_reader_skips_non_json_lines() {
        // `claude --verbose` can occasionally print informational,
        // non-stream-json lines on stdout; they must not break the
        // drain. A trailing blank line is thrown in for good measure.
        let mut bytes = b"Reading config...\n".to_vec();
        bytes.extend(synthetic_stream("ok", 0.0));
        bytes.push(b'\n');

        let (tx, _rx) = mpsc::unbounded_channel();
        let result = drain_async_reader(pipe(bytes), &tx)
            .await
            .expect("drain ok");
        assert_eq!(result.response, "ok");
    }

    #[tokio::test]
    async fn drain_async_reader_returns_empty_on_no_stream() {
        // Writer closes immediately without producing any output.
        let (tx, _rx) = mpsc::unbounded_channel();
        let result = drain_async_reader(pipe(Vec::new()), &tx)
            .await
            .expect("drain ok");
        // The drain itself succeeds with a default-empty result. Turning
        // "no Done event" into an error is the caller's job
        // (drain_and_collect), because it depends on the exit status.
        assert!(result.response.is_empty());
        assert_eq!(result.cost_usd, 0.0);
    }

    #[tokio::test]
    async fn drain_async_reader_handles_multi_line_chunks() {
        // Frames delivered in 7-byte slices must be reassembled into
        // whole lines. This guards against a future switch to a reader
        // that assumes one write == one frame.
        let reader = pipe_in_chunks(synthetic_stream("multi", 0.01), 7);
        let (tx, _rx) = mpsc::unbounded_channel();
        let result = drain_async_reader(reader, &tx).await.expect("drain ok");
        assert_eq!(result.response, "multi");
    }

    #[tokio::test]
    async fn drain_handles_malformed_json_gracefully() {
        // Broken JSON and an unknown frame type precede a valid run.
        let mut bytes = b"{this is not valid json\n{\"type\":\"unknown\"}\n".to_vec();
        bytes.extend(synthetic_stream("recovered", 0.0));

        let (tx, _rx) = mpsc::unbounded_channel();
        let result = drain_async_reader(pipe(bytes), &tx)
            .await
            .expect("drain ok");
        assert_eq!(result.response, "recovered");
    }

    #[tokio::test]
    #[ignore = "requires `claude` CLI on PATH; run manually for end-to-end"]
    async fn run_agent_end_to_end_with_real_claude() {
        // Manual smoke: AGENTMUX_CLAUDE_BIN=/path/to/claude
        // cargo test -p agentmux-srv -- --ignored
        //     run_agent_end_to_end_with_real_claude
        let task = AgentTask {
            prompt: "What is 2+2? Respond with just the number.".to_string(),
            context: serde_json::Map::new(),
            max_turns: None,
        };
        let (tx, mut rx) = mpsc::unbounded_channel();
        let handle = run_agent(AgentRef::default(), task, tx)
            .await
            .expect("spawn ok");

        // Drain every streamed event until the sender side closes.
        while rx.recv().await.is_some() {}

        let result = handle
            .final_result
            .await
            .expect("oneshot ok")
            .expect("agent run ok");
        assert!(result.response.contains('4'), "got: {}", result.response);
        assert!(result.cost_usd > 0.0);
    }

    #[tokio::test]
    async fn run_agent_with_bin_surfaces_spawn_failure() {
        // A binary path that cannot exist makes the spawn fail
        // deterministically. It is injected directly rather than via
        // AGENTMUX_CLAUDE_BIN, because mutating the process environment
        // from concurrently running tests is racy.
        let task = AgentTask {
            prompt: "hi".to_string(),
            context: serde_json::Map::new(),
            max_turns: None,
        };
        let (tx, _rx) = mpsc::unbounded_channel();
        let outcome = run_agent_with_bin(
            "/definitely/does/not/exist/claude-xyz-test",
            AgentRef::default(),
            task,
            tx,
        )
        .await;
        match outcome {
            Err(AgentError::Spawn(msg)) => {
                assert!(
                    msg.contains("spawn") || msg.contains("does/not/exist"),
                    "spawn error message should reference the failure; got: {msg}"
                );
            }
            Err(other) => panic!("expected Spawn error, got: {other}"),
            Ok(_) => panic!("expected Spawn error, got Ok(handle)"),
        }
    }

    #[test]
    fn agent_task_max_turns_field_round_trips() {
        // Reagent P1 + codex P2 on PR #834: `max_turns` is forwarded to
        // the subprocess as `--max-turns N`. That CLI wiring has no
        // automated coverage here: asserting it needs a spawn, and the
        // ignored end-to-end test runs with `max_turns: None`. What this
        // test pins is that the field survives AgentTask's serde
        // round-trip under the `maxTurns` key.
        let task = AgentTask {
            prompt: "x".into(),
            context: serde_json::Map::new(),
            max_turns: Some(7),
        };
        let value = serde_json::to_value(&task).unwrap();
        assert_eq!(value["maxTurns"], json!(7));
        let decoded: AgentTask = serde_json::from_value(value).unwrap();
        assert_eq!(decoded.max_turns, Some(7));
    }

    #[test]
    fn agent_run_result_has_sensible_defaults() {
        let defaults = AgentRunResult::default();
        assert_eq!(defaults.response, "");
        assert_eq!(defaults.cost_usd, 0.0);
        assert!(defaults.transcript.is_empty());
        // `json!` on an expression serializes it and panics on failure.
        let _ = json!(defaults);
    }
}