agentmux_cef\browser_api/
cdp.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Minimal Chrome DevTools Protocol (CDP) WebSocket client.
5//!
6//! One session = one WebSocket to `ws://127.0.0.1:<debug_port>/devtools/page/<target>`.
7//! Per-request construction in Phase 1 (see PLAN §Phase 5 for pooling).
8//!
9//! CDP protocol: every client request is `{id, method, params}`; every
10//! server reply with matching `id` is `{id, result}` or `{id, error}`.
11//! Events (no id) are ignored here — we don't subscribe to any in
12//! Phase 1 since every method we use is request/response.
13
14use std::time::Duration;
15
16use futures_util::{SinkExt, StreamExt};
17use serde_json::{json, Value};
18use tokio::net::TcpStream;
19use tokio_tungstenite::{
20    connect_async,
21    tungstenite::protocol::Message,
22    MaybeTlsStream, WebSocketStream,
23};
24
25pub type CdpError = String;
26
27pub struct CdpSession {
28    ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
29    next_id: u64,
30}
31
32impl CdpSession {
33    /// Open a CDP session against `ws://127.0.0.1:<port>/devtools/page/<target>`.
34    pub async fn connect(ws_url: &str) -> Result<Self, CdpError> {
35        let (ws, _resp) = connect_async(ws_url)
36            .await
37            .map_err(|e| format!("CDP connect {ws_url}: {e}"))?;
38        Ok(Self { ws, next_id: 1 })
39    }
40
41    /// Send `{id, method, params}` and wait for the reply with the
42    /// same id. Events (no id) and unrelated replies are silently
43    /// discarded — v1 doesn't multiplex.
44    pub async fn call(
45        &mut self,
46        method: &str,
47        params: Value,
48    ) -> Result<Value, CdpError> {
49        let id = self.next_id;
50        self.next_id += 1;
51
52        let req = json!({ "id": id, "method": method, "params": params });
53        let msg = Message::Text(req.to_string().into());
54        self.ws
55            .send(msg)
56            .await
57            .map_err(|e| format!("CDP send {method}: {e}"))?;
58
59        // Wait up to 10s for the matching reply. CDP replies are
60        // usually <100 ms on localhost; the generous bound is a
61        // safety net for pages stalled on JS execution.
62        let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
63        loop {
64            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
65            if remaining.is_zero() {
66                return Err(format!("CDP timeout waiting for reply to {method}"));
67            }
68            let next = tokio::time::timeout(remaining, self.ws.next()).await;
69            let frame = match next {
70                Ok(Some(Ok(f))) => f,
71                Ok(Some(Err(e))) => return Err(format!("CDP ws error: {e}")),
72                Ok(None) => return Err("CDP ws closed".to_string()),
73                Err(_) => return Err(format!("CDP timeout waiting for reply to {method}")),
74            };
75            let text = match frame {
76                Message::Text(t) => t,
77                Message::Binary(_) | Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => continue,
78                Message::Close(_) => return Err("CDP ws closed by peer".to_string()),
79            };
80
81            let msg: Value = serde_json::from_str(&text)
82                .map_err(|e| format!("CDP parse: {e}"))?;
83            // Events have no top-level "id". Skip them.
84            let Some(msg_id) = msg.get("id").and_then(|v| v.as_u64()) else {
85                continue;
86            };
87            if msg_id != id {
88                // Interleaved reply to a previous call — unexpected in
89                // v1 since we don't pipeline, but skip it to be safe.
90                continue;
91            }
92            if let Some(err) = msg.get("error") {
93                return Err(format!(
94                    "CDP {method} error: {}",
95                    err.get("message")
96                        .and_then(|v| v.as_str())
97                        .unwrap_or("unknown"),
98                ));
99            }
100            return Ok(msg.get("result").cloned().unwrap_or(Value::Null));
101        }
102    }
103
104    pub async fn close(mut self) {
105        let _ = self.ws.close(None).await;
106    }
107}