agentmux_srv\agents\translator/
claude.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Claude Code stream-json → `AgentEvent` translator.
5//!
6//! Mirrors the frontend reference implementation at
7//! `frontend/app/view/agent/providers/claude-translator.ts` so the
8//! drone Agent block can consume the same stream without
9//! round-tripping through the renderer. Handled frames:
10//!
//! - `stream_event.content_block_start.content_block.type=text` —
//!   needs no bookkeeping; the `text_delta`s that follow are emitted
//!   as they arrive.
13//! - `stream_event.content_block_start.content_block.type=tool_use` —
14//!   starts an in-flight tool_use; subsequent `input_json_delta`s
15//!   accumulate; `content_block_stop` emits the finalized
16//!   `AgentEvent::ToolUse`.
17//! - `stream_event.content_block_delta.delta.type=text_delta` —
18//!   `AgentEvent::AssistantText` (also accumulated into the final
19//!   response).
20//! - `stream_event.content_block_delta.delta.type=input_json_delta` —
21//!   appends to the pending tool_use's partial_json.
22//! - `stream_event.content_block_stop` — flushes the pending
23//!   tool_use.
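//! - `user.message.content[].type=tool_result` — emits
//!   `AgentEvent::ToolResult` and records a `tool_result` transcript turn.
//! - `assistant` (frames flagged `partial: true` are skipped) — records
//!   an `assistant` transcript turn; emits no events, since the
//!   stream_event deltas already produced them.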
26//! - `result.cost_usd` + `result.usage` — emits
27//!   `AgentEvent::Cost` followed by `AgentEvent::Done`. The Done's
28//!   `response` is the explicit `result.result` field if present,
29//!   otherwise the accumulated text from streamed text_deltas.
30//!
31//! Unknown frame types and malformed shapes produce an empty `Vec`
32//! rather than panicking — the runner falls back to whatever the
33//! parallel raw-byte WPS path published, so an unfamiliar frame
34//! degrades gracefully.
35//!
36//! `thinking_delta` and `message_delta` / `message_stop` are
37//! discarded (the agent pane filters them too).
38
39use std::collections::HashMap;
40
41use serde_json::{Map, Value};
42
43use super::super::types::{AgentEvent, AgentTurn, TokenCounts};
44use super::Translator;
45
46#[derive(Debug, Default)]
47pub struct ClaudeTranslator {
48    /// In-flight tool_use between `content_block_start` (type=tool_use)
49    /// and `content_block_stop`. Cleared on stop.
50    pending_tool: Option<PendingToolUse>,
51    /// Map from `tool_use_id` to its tool name, populated when a
52    /// tool_use lands so a later `tool_result` can carry the name
53    /// for renderers. Phase 1.5 doesn't surface tool name on the
54    /// `ToolResult` event (the id is enough for downstream matching),
55    /// but the map is kept for future use.
56    #[allow(dead_code)]
57    tool_names: HashMap<String, String>,
58    /// Streamed text accumulated since the last assistant turn, used
59    /// as the `Done.response` if the `result` frame has no explicit
60    /// `result` text field.
61    accumulated_response: String,
62    /// Per-turn transcript built up as `assistant` / `user` frames land.
63    transcript: Vec<AgentTurn>,
64}
65
/// A `tool_use` content block whose arguments are still streaming in.
#[derive(Debug)]
struct PendingToolUse {
    /// The block's `id`, reported later as `ToolUse.tool_use_id`.
    id: String,
    /// Tool name taken from `content_block_start`.
    name: String,
    /// Concatenated `input_json_delta` chunks, parsed on `content_block_stop`.
    partial_json: String,
}
72
73impl ClaudeTranslator {
74    pub fn new() -> Self {
75        Self::default()
76    }
77
78    /// Reset all in-flight state. Intended for re-use of the same
79    /// translator across distinct runs (rare; usually each run gets
80    /// a fresh translator).
81    pub fn reset(&mut self) {
82        *self = Self::default();
83    }
84}
85
86impl Translator for ClaudeTranslator {
87    fn translate(&mut self, frame: Value) -> Vec<AgentEvent> {
88        let mut out = Vec::new();
89        let Some(frame_type) = frame.get("type").and_then(|v| v.as_str()) else {
90            return out;
91        };
92        match frame_type {
93            "stream_event" => handle_stream_event(self, &frame, &mut out),
94            "user" => handle_user_message(self, &frame, &mut out),
95            "assistant" => handle_assistant_message(self, &frame, &mut out),
96            "result" => handle_result(self, &frame, &mut out),
97            _ => {}
98        }
99        out
100    }
101}
102
103fn handle_stream_event(t: &mut ClaudeTranslator, frame: &Value, out: &mut Vec<AgentEvent>) {
104    let Some(event) = frame.get("event") else {
105        return;
106    };
107    let Some(ev_type) = event.get("type").and_then(|v| v.as_str()) else {
108        return;
109    };
110    match ev_type {
111        "content_block_start" => {
112            let Some(block) = event.get("content_block") else {
113                return;
114            };
115            if block.get("type").and_then(|v| v.as_str()) == Some("tool_use") {
116                let id = block.get("id").and_then(|v| v.as_str()).unwrap_or("").to_string();
117                let name = block.get("name").and_then(|v| v.as_str()).unwrap_or("").to_string();
118                t.tool_names.insert(id.clone(), name.clone());
119                t.pending_tool = Some(PendingToolUse {
120                    id,
121                    name,
122                    partial_json: String::new(),
123                });
124            }
125        }
126        "content_block_delta" => {
127            let Some(delta) = event.get("delta") else {
128                return;
129            };
130            match delta.get("type").and_then(|v| v.as_str()) {
131                Some("text_delta") => {
132                    if let Some(text) = delta.get("text").and_then(|v| v.as_str()) {
133                        t.accumulated_response.push_str(text);
134                        out.push(AgentEvent::AssistantText {
135                            delta: text.to_string(),
136                        });
137                    }
138                }
139                Some("input_json_delta") => {
140                    if let (Some(pending), Some(partial)) =
141                        (&mut t.pending_tool, delta.get("partial_json").and_then(|v| v.as_str()))
142                    {
143                        pending.partial_json.push_str(partial);
144                    }
145                }
146                _ => {} // thinking_delta and other variants — discard
147            }
148        }
149        "content_block_stop" => {
150            if let Some(pending) = t.pending_tool.take() {
151                // Anthropic's stream starts every tool_use with an
152                // implicit `input: {}` and only sends input_json_delta
153                // chunks when there ARE arguments. Treat an empty
154                // partial_json as the canonical no-arg object instead
155                // of falling through to the parse-failure path, which
156                // would emit Value::String("") and break downstream
157                // tool runners expecting an object. Codex P2 on PR #833.
158                let input: Value = if pending.partial_json.is_empty() {
159                    Value::Object(Map::new())
160                } else {
161                    // Non-empty: parse, fall back to the raw string
162                    // on failure so a malformed stream surfaces
163                    // SOMETHING rather than silently dropping.
164                    serde_json::from_str(&pending.partial_json)
165                        .unwrap_or(Value::String(pending.partial_json))
166                };
167                out.push(AgentEvent::ToolUse {
168                    tool_use_id: pending.id,
169                    tool: pending.name,
170                    input,
171                });
172            }
173        }
174        _ => {} // message_start, message_delta, message_stop — discard
175    }
176}
177
178fn handle_user_message(t: &mut ClaudeTranslator, frame: &Value, out: &mut Vec<AgentEvent>) {
179    let Some(content) = frame
180        .get("message")
181        .and_then(|m| m.get("content"))
182        .and_then(|c| c.as_array())
183    else {
184        return;
185    };
186    for block in content {
187        if block.get("type").and_then(|v| v.as_str()) != Some("tool_result") {
188            continue;
189        }
190        let tool_use_id = block
191            .get("tool_use_id")
192            .and_then(|v| v.as_str())
193            .unwrap_or("")
194            .to_string();
195        // `content` on a tool_result may be a string or an array of
196        // content parts — preserve whichever shape arrived.
197        let output = block.get("content").cloned().unwrap_or(Value::Null);
198        let is_error = block
199            .get("is_error")
200            .and_then(|v| v.as_bool())
201            .unwrap_or(false);
202        out.push(AgentEvent::ToolResult {
203            tool_use_id: tool_use_id.clone(),
204            output: output.clone(),
205            is_error,
206        });
207        // Also record the tool_result in the transcript so Done.transcript
208        // is the full ordered turn list (assistant turns + tool_result
209        // turns), not just the assistant side. Codex P2 on PR #833.
210        t.transcript.push(AgentTurn {
211            role: "tool_result".to_string(),
212            content: serde_json::json!({
213                "tool_use_id": tool_use_id,
214                "content": output,
215                "is_error": is_error,
216            }),
217            timestamp_ms: now_ms(),
218        });
219    }
220}
221
222fn handle_assistant_message(t: &mut ClaudeTranslator, frame: &Value, _out: &mut Vec<AgentEvent>) {
223    // Skip partial snapshots — when Claude is launched with
224    // --include-partial-messages it emits top-level `assistant`
225    // frames with `partial: true` for each streaming delta before
226    // the final consolidated turn. Recording every snapshot would
227    // produce duplicate transcript entries per turn. Frontend
228    // translator does the same skip
229    // (frontend/app/view/agent/providers/claude-translator.ts).
230    // Reagent P1 + codex P2 on PR #833.
231    if frame.get("partial").and_then(|v| v.as_bool()) == Some(true) {
232        return;
233    }
234    // Record the turn into the transcript — used as part of `Done`.
235    // Don't emit per-block events here; the streaming path already
236    // emitted them via stream_event deltas.
237    let Some(message) = frame.get("message") else {
238        return;
239    };
240    let content = message.get("content").cloned().unwrap_or(Value::Null);
241    t.transcript.push(AgentTurn {
242        role: "assistant".to_string(),
243        content,
244        timestamp_ms: now_ms(),
245    });
246}
247
248fn handle_result(t: &mut ClaudeTranslator, frame: &Value, out: &mut Vec<AgentEvent>) {
249    let cost_usd = frame
250        .get("cost_usd")
251        .and_then(|v| v.as_f64())
252        .unwrap_or(0.0);
253    let tokens = parse_usage(frame.get("usage"));
254    out.push(AgentEvent::Cost { cost_usd, tokens });
255
256    let response = frame
257        .get("result")
258        .and_then(|v| v.as_str())
259        .map(|s| s.to_string())
260        .unwrap_or_else(|| std::mem::take(&mut t.accumulated_response));
261    let transcript = std::mem::take(&mut t.transcript);
262    out.push(AgentEvent::Done {
263        response,
264        transcript,
265    });
266}
267
268fn parse_usage(usage: Option<&Value>) -> TokenCounts {
269    let Some(usage) = usage.and_then(|v| v.as_object()) else {
270        return TokenCounts::default();
271    };
272    let take = |m: &Map<String, Value>, k: &str| -> u64 {
273        m.get(k).and_then(|v| v.as_u64()).unwrap_or(0)
274    };
275    TokenCounts {
276        input: take(usage, "input_tokens"),
277        output: take(usage, "output_tokens"),
278        cache_creation: take(usage, "cache_creation_input_tokens"),
279        cache_read: take(usage, "cache_read_input_tokens"),
280    }
281}
282
/// Returns the current wall-clock time as milliseconds since the Unix
/// epoch, or 0 if the system clock reads earlier than the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
289
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Wraps a raw Anthropic streaming event in the CLI's `stream_event`
    /// envelope.
    fn stream_event(event: Value) -> Value {
        json!({ "type": "stream_event", "event": event })
    }

    fn text_delta(text: &str) -> Value {
        stream_event(json!({
            "type": "content_block_delta",
            "delta": { "type": "text_delta", "text": text }
        }))
    }

    fn tool_use_start(id: &str, name: &str) -> Value {
        stream_event(json!({
            "type": "content_block_start",
            "content_block": { "type": "tool_use", "id": id, "name": name }
        }))
    }

    fn input_json_delta(partial: &str) -> Value {
        stream_event(json!({
            "type": "content_block_delta",
            "delta": { "type": "input_json_delta", "partial_json": partial }
        }))
    }

    fn content_block_stop() -> Value {
        stream_event(json!({ "type": "content_block_stop" }))
    }

    /// A `user` frame carrying a single `tool_result` block.
    fn tool_result(tool_use_id: &str, content: &str, is_error: bool) -> Value {
        json!({
            "type": "user",
            "message": {
                "content": [{
                    "type": "tool_result",
                    "tool_use_id": tool_use_id,
                    "content": content,
                    "is_error": is_error
                }]
            }
        })
    }

    /// A consolidated (non-partial) `assistant` frame.
    fn assistant(content: Value) -> Value {
        json!({ "type": "assistant", "message": { "content": content } })
    }

    /// A `result` frame carrying only a cost.
    fn result_frame(cost_usd: f64) -> Value {
        json!({ "type": "result", "cost_usd": cost_usd })
    }

    /// Sends a zero-cost `result` frame and returns the transcript carried
    /// by the `Done` event (the second event emitted).
    fn finish_transcript(t: &mut ClaudeTranslator) -> Vec<AgentTurn> {
        let events = t.translate(result_frame(0.0));
        match events.into_iter().nth(1) {
            Some(AgentEvent::Done { transcript, .. }) => transcript,
            other => panic!("expected Done, got {other:?}"),
        }
    }

    #[test]
    fn text_delta_emits_assistant_text() {
        let events = ClaudeTranslator::new().translate(text_delta("hello"));
        assert_eq!(events.len(), 1);
        let AgentEvent::AssistantText { delta } = &events[0] else {
            panic!("expected AssistantText, got {:?}", events[0]);
        };
        assert_eq!(delta, "hello");
    }

    #[test]
    fn streaming_text_accumulates_for_done_response() {
        // Without an explicit `result.result`, `Done.response` must be the
        // concatenation of the streamed text_deltas.
        let mut t = ClaudeTranslator::new();
        for chunk in ["Hello ", "world"] {
            t.translate(text_delta(chunk));
        }
        let events = t.translate(result_frame(0.01));
        // Cost, then Done.
        assert_eq!(events.len(), 2);
        let AgentEvent::Done { response, .. } = &events[1] else {
            panic!("expected Done with accumulated text, got {:?}", events[1]);
        };
        assert_eq!(response, "Hello world");
    }

    #[test]
    fn tool_use_emits_only_on_content_block_stop() {
        let mut t = ClaudeTranslator::new();
        // The start and every argument chunk are buffered silently...
        for frame in [
            tool_use_start("t1", "Bash"),
            input_json_delta(r#"{"command":"#),
            input_json_delta(r#""ls"}"#),
        ] {
            assert!(t.translate(frame).is_empty());
        }
        // ...until the stop flushes the assembled call.
        let events = t.translate(content_block_stop());
        assert_eq!(events.len(), 1);
        let AgentEvent::ToolUse {
            tool_use_id,
            tool,
            input,
        } = &events[0]
        else {
            panic!("expected ToolUse, got {:?}", events[0]);
        };
        assert_eq!(tool_use_id, "t1");
        assert_eq!(tool, "Bash");
        assert_eq!(input, &json!({ "command": "ls" }));
    }

    #[test]
    fn no_arg_tool_emits_empty_object() {
        // No input_json_delta between start and stop (codex P2 on PR #833).
        // The input must be `{}`, not the Value::String("") the parse
        // fallback used to produce, which broke downstream tool runners.
        let mut t = ClaudeTranslator::new();
        t.translate(tool_use_start("t1", "Echo"));
        let events = t.translate(content_block_stop());
        assert_eq!(events.len(), 1);
        let AgentEvent::ToolUse { input, .. } = &events[0] else {
            panic!("expected ToolUse with empty object input, got {:?}", events[0]);
        };
        assert_eq!(input, &json!({}));
    }

    #[test]
    fn malformed_tool_input_falls_back_to_raw_string() {
        let mut t = ClaudeTranslator::new();
        t.translate(tool_use_start("t1", "Edit"));
        t.translate(input_json_delta(r#"{"this is bad json"#));
        let events = t.translate(content_block_stop());
        assert_eq!(events.len(), 1);
        let AgentEvent::ToolUse { input, .. } = &events[0] else {
            panic!("expected ToolUse fallback, got {:?}", events[0]);
        };
        // The unparsed text is passed through so the downstream consumer
        // sees SOMETHING.
        assert_eq!(input, &json!(r#"{"this is bad json"#));
    }

    #[test]
    fn user_tool_result_emits_event() {
        let mut t = ClaudeTranslator::new();
        let events = t.translate(tool_result("t3", "command output here", false));
        assert_eq!(events.len(), 1);
        let AgentEvent::ToolResult {
            tool_use_id,
            output,
            is_error,
        } = &events[0]
        else {
            panic!("expected ToolResult, got {:?}", events[0]);
        };
        assert_eq!(tool_use_id, "t3");
        assert_eq!(output, &json!("command output here"));
        assert!(!*is_error);
    }

    #[test]
    fn tool_result_is_error_propagates() {
        let mut t = ClaudeTranslator::new();
        let events = t.translate(tool_result("t4", "permission denied", true));
        let AgentEvent::ToolResult { is_error, .. } = &events[0] else {
            panic!("expected ToolResult with is_error, got {:?}", events[0]);
        };
        assert!(*is_error);
    }

    #[test]
    fn result_emits_cost_then_done_with_explicit_response() {
        let mut t = ClaudeTranslator::new();
        let events = t.translate(json!({
            "type": "result",
            "cost_usd": 0.0123,
            "usage": {
                "input_tokens": 100,
                "output_tokens": 50,
                "cache_creation_input_tokens": 0,
                "cache_read_input_tokens": 200
            },
            "result": "final answer text"
        }));
        assert_eq!(events.len(), 2);
        let AgentEvent::Cost { cost_usd, tokens } = &events[0] else {
            panic!("expected Cost first, got {:?}", events[0]);
        };
        assert_eq!(*cost_usd, 0.0123);
        assert_eq!(
            (tokens.input, tokens.output, tokens.cache_creation, tokens.cache_read),
            (100, 50, 0, 200)
        );
        let AgentEvent::Done { response, .. } = &events[1] else {
            panic!("expected Done second, got {:?}", events[1]);
        };
        assert_eq!(response, "final answer text");
    }

    #[test]
    fn assistant_message_added_to_transcript() {
        let mut t = ClaudeTranslator::new();
        // The assistant frame emits nothing itself, because the streaming
        // path already produced the per-block events. It only feeds the
        // transcript captured at Done.
        let events = t.translate(assistant(json!([{ "type": "text", "text": "hello" }])));
        assert!(events.is_empty());
        let transcript = finish_transcript(&mut t);
        assert_eq!(transcript.len(), 1);
        assert_eq!(transcript[0].role, "assistant");
    }

    #[test]
    fn unknown_frame_returns_empty() {
        let mut t = ClaudeTranslator::new();
        for frame in [
            json!({ "type": "system" }),
            stream_event(json!({ "type": "message_stop" })),
            stream_event(json!({ "type": "message_delta" })),
        ] {
            assert!(t.translate(frame).is_empty());
        }
    }

    #[test]
    fn malformed_or_missing_type_returns_empty() {
        let mut t = ClaudeTranslator::new();
        for frame in [json!({}), json!(null), json!("not an object"), json!(42)] {
            assert!(t.translate(frame).is_empty());
        }
    }

    #[test]
    fn skips_partial_assistant_snapshots() {
        // With --include-partial-messages Claude streams `partial: true`
        // assistant snapshots ahead of the consolidated turn. Recording
        // each one would duplicate transcript entries. Reagent P1 on PR #833.
        let mut t = ClaudeTranslator::new();
        for prefix in ["h", "he", "hello"] {
            t.translate(json!({
                "type": "assistant",
                "partial": true,
                "message": { "content": [{ "type": "text", "text": prefix }] }
            }));
        }
        // The consolidated turn (no `partial` flag).
        t.translate(assistant(json!([{ "type": "text", "text": "hello" }])));
        let transcript = finish_transcript(&mut t);
        assert_eq!(
            transcript.len(),
            1,
            "only the consolidated turn should land; got {transcript:?}"
        );
    }

    #[test]
    fn tool_result_recorded_in_transcript() {
        // Codex P2 on PR #833: Done.transcript must be the full ordered
        // turn list (assistant + tool_result), not just the assistant
        // side, because audit/replay needs the whole conversation.
        let mut t = ClaudeTranslator::new();
        t.translate(assistant(json!([{
            "type": "tool_use",
            "id": "t1",
            "name": "Bash",
            "input": { "command": "ls" }
        }])));
        t.translate(tool_result("t1", "output", false));
        t.translate(assistant(json!([{ "type": "text", "text": "done" }])));
        let transcript = finish_transcript(&mut t);
        assert_eq!(transcript.len(), 3, "got {transcript:?}");
        let roles: Vec<&str> = transcript.iter().map(|turn| turn.role.as_str()).collect();
        assert_eq!(roles, ["assistant", "tool_result", "assistant"]);
        // The tool_result turn carries the structured payload, not just
        // the bare output.
        let payload = &transcript[1].content;
        assert_eq!(payload["tool_use_id"], json!("t1"));
        assert_eq!(payload["content"], json!("output"));
        assert_eq!(payload["is_error"], json!(false));
    }

    #[test]
    fn reset_clears_in_flight_state() {
        let mut t = ClaudeTranslator::new();
        for frame in [
            tool_use_start("t5", "Edit"),
            input_json_delta(r#"{"x":1}"#),
            text_delta("partial response"),
        ] {
            t.translate(frame);
        }

        t.reset();

        // The pending tool_use is gone, so a stop has nothing to flush...
        assert!(t.translate(content_block_stop()).is_empty());
        // ...and the streamed text is gone too.
        let events = t.translate(result_frame(0.0));
        let AgentEvent::Done { response, .. } = &events[1] else {
            panic!("expected empty Done response, got {:?}", events[1]);
        };
        assert_eq!(response, "");
    }
}