agentmux_srv\server/
install_handlers.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! `install.start` / `install.cancel` RPC handlers.
5//!
6//! Phase α of `SPEC_AGENT_INSTALL_STAGE_2026_05_17.md`. Spawns
7//! `npm install <package>` for the requested provider, streams every
8//! line of stdout+stderr to `install_chunk` WPS events scoped to
9//! `install:<sessionId>`, and emits a terminal `{ op: "done", ok,
10//! error? }` event when the child exits (or the user cancels).
11//!
12//! Phase α scope:
//!  - npm-only install into the per-version layout at
//!    `<agentmux_home>/instances/v<version>/cli/<provider>/`.
15//!  - Plain piped stdio (no PTY) — npm doesn't isatty-gate its output
16//!    line-by-line, so pipes are fine here. PTY would be required for
17//!    interactive post-install steps (Phase δ).
//!  - One in-flight install per provider. `install.start` mints the
//!    session id and rejects a second concurrent install of the same
//!    provider, since both would share a single install directory.
//!  - Cancel fires the session's oneshot, which kills the npm child
//!    (`kill_on_drop` is only a backstop), then best-effort rm-rf's the
//!    provider's install directory so a retry starts clean.
22//!  - No verify / doctor / post-install steps yet — those land in
23//!    Phase β.
24
25use std::sync::Arc;
26
27use parking_lot::Mutex;
28use serde::Deserialize;
29use serde_json::json;
30
31use crate::backend::rpc::engine::WshRpcEngine;
32use crate::backend::wps::{Broker, WaveEvent};
33use crate::server::AppState;
34
/// RPC command: begin an npm install for a provider; replies `{ sessionId }`.
pub const COMMAND_INSTALL_START: &str = "install.start";
/// RPC command: abort an in-flight install session by id.
pub const COMMAND_INSTALL_CANCEL: &str = "install.cancel";
/// RPC command: report whether a provider's CLI shim is already installed.
pub const COMMAND_INSTALL_CHECK: &str = "install.check";
/// RPC command: look up system prerequisite tools on PATH.
pub const COMMAND_RESOLVE_PREREQS: &str = "resolve.prereqs";
39
40#[derive(Debug, Deserialize)]
41#[serde(rename_all = "camelCase")]
42struct InstallStartReq {
43    provider_id: String,
44    cli_command: String,
45    npm_package: String,
46    #[serde(default)]
47    pinned_version: String,
48}
49
50#[derive(Debug, Deserialize)]
51#[serde(rename_all = "camelCase")]
52struct InstallCancelReq {
53    session_id: String,
54}
55
56#[derive(Debug, Deserialize)]
57#[serde(rename_all = "camelCase")]
58struct InstallCheckReq {
59    provider_id: String,
60    cli_command: String,
61}
62
63/// Per-session abort handle so `install.cancel` can kill an in-flight
64/// install. Also tracks `active_providers` so concurrent sessions for
65/// the *same* provider directory are rejected — without that, cancel
66/// of one would `rm_rf` the shared dir mid-install for the other.
67/// `parking_lot::Mutex` since the engine is sync at the handler
68/// boundary.
69#[derive(Default)]
70pub struct InstallSessionRegistry {
71    sessions: Mutex<std::collections::HashMap<String, tokio::sync::oneshot::Sender<()>>>,
72    active_providers: Mutex<std::collections::HashSet<String>>,
73}
74
75impl InstallSessionRegistry {
76    pub fn new() -> Arc<Self> {
77        Arc::new(Self::default())
78    }
79
80    fn insert(&self, session_id: String, tx: tokio::sync::oneshot::Sender<()>) {
81        self.sessions.lock().insert(session_id, tx);
82    }
83
84    /// Try to claim a provider; returns false if another session is
85    /// already installing this provider.
86    fn try_claim_provider(&self, provider_id: &str) -> bool {
87        self.active_providers.lock().insert(provider_id.to_string())
88    }
89
90    fn release_provider(&self, provider_id: &str) {
91        self.active_providers.lock().remove(provider_id);
92    }
93
94    fn cancel(&self, session_id: &str) -> bool {
95        if let Some(tx) = self.sessions.lock().remove(session_id) {
96            let _ = tx.send(());
97            true
98        } else {
99            false
100        }
101    }
102
103    fn drop_session(&self, session_id: &str) {
104        self.sessions.lock().remove(session_id);
105    }
106}
107
/// Allowlist for provider ids, which become a path component of
/// `<agentmux_home>/instances/v<version>/cli/<provider>/`.
///
/// Accepts 1–64 ASCII letters, digits, `_` or `-`. No separators and no
/// dots, so the id cannot escape the `cli/` directory.
fn is_safe_provider_id(s: &str) -> bool {
    const MAX_LEN: usize = 64;
    let allowed = |c: char| c.is_ascii_alphanumeric() || matches!(c, '_' | '-');
    (1..=MAX_LEN).contains(&s.len()) && s.chars().all(allowed)
}
115
/// Allowlist for CLI command / tool names, which are joined into the
/// bin-resolution path and passed to PATH lookups.
///
/// Same alphabet as provider ids plus `.` (real CLIs use dots, e.g.
/// `eslint.cmd`), 1–64 bytes, and never containing the `..` sequence.
fn is_safe_cli_command(s: &str) -> bool {
    const MAX_LEN: usize = 64;
    if s.is_empty() || s.len() > MAX_LEN || s.contains("..") {
        return false;
    }
    s.chars()
        .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
}
125
126/// Canonical install directory for a provider —
127/// `<agentmux_home>/instances/v<version>/cli/<provider>/`. This is the
128/// same path the frontend uses to launch the agent
129/// (`agent-model.ts::resolveCliDir`), so the bin we drop in
130/// `node_modules/.bin/` is what the launch path will execute.
131/// Honors portable / installed mode + the `AGENTMUX_HOME_OVERRIDE`
132/// test override via `DataPaths::from_env()`.
133fn provider_install_dir(provider_id: &str) -> Option<std::path::PathBuf> {
134    let paths = agentmux_common::DataPaths::from_env()?;
135    let version = env!("CARGO_PKG_VERSION");
136    Some(
137        paths
138            .home_dir
139            .join("instances")
140            .join(format!("v{version}"))
141            .join("cli")
142            .join(provider_id),
143    )
144}
145
146/// Returns the path to the installed CLI binary if present in the
147/// per-version cache, else None. Used by `install.check`.
148/// Locate a system tool (e.g. `git`, `gh`) on PATH. Uses the platform
149/// equivalent of `which` and returns the resolved absolute path. None
150/// when the tool isn't on PATH or the lookup itself failed.
151///
152/// Used by `resolve.prereqs` to pre-launch-check whether a provider's
153/// system dependencies are installed. The probe is path-only — never
154/// executes the tool — so it's safe to call without side effects.
155/// See SPEC_PROVIDER_SYSTEM_PREREQS_2026_05_18.md.
156async fn resolve_tool_path(tool: &str) -> Option<String> {
157    let cmd = if cfg!(windows) { "where" } else { "which" };
158    let output = tokio::process::Command::new(cmd)
159        .arg(tool)
160        .output()
161        .await
162        .ok()?;
163    if !output.status.success() {
164        return None;
165    }
166    // `where` on Windows can return multiple lines; first is canonical.
167    let stdout = String::from_utf8_lossy(&output.stdout);
168    stdout.lines().next().map(|s| s.trim().to_string())
169        .filter(|s| !s.is_empty())
170}
171
172fn resolve_installed_bin(provider_id: &str, cli_command: &str) -> Option<std::path::PathBuf> {
173    let dir = provider_install_dir(provider_id)?;
174    let bin_dir = dir.join("node_modules").join(".bin");
175    let candidates: &[&str] = if cfg!(windows) {
176        &[".cmd", ".exe", ""]
177    } else {
178        &["", ".cmd"]
179    };
180    for suffix in candidates {
181        let p = bin_dir.join(format!("{cli_command}{suffix}"));
182        if p.is_file() {
183            return Some(p);
184        }
185    }
186    None
187}
188
189pub fn register_install_handlers(engine: &Arc<WshRpcEngine>, state: &AppState) {
190    let registry = state.install_sessions.clone();
191    let broker = state.broker.clone();
192    engine.register_handler(
193        COMMAND_INSTALL_START,
194        Box::new(move |data, _ctx| {
195            let registry = registry.clone();
196            let broker = broker.clone();
197            Box::pin(async move {
198                let req: InstallStartReq = serde_json::from_value(data)
199                    .map_err(|e| format!("install.start: {e}"))?;
200                if !is_safe_provider_id(&req.provider_id) {
201                    return Err(format!(
202                        "install.start: invalid provider id {:?} — must match [a-zA-Z0-9_-]+",
203                        req.provider_id
204                    ));
205                }
206                if !is_safe_cli_command(&req.cli_command) {
207                    return Err(format!(
208                        "install.start: invalid cli command {:?}",
209                        req.cli_command
210                    ));
211                }
212                if req.npm_package.is_empty() {
213                    return Err(format!(
214                        "install.start: provider {} has no npm_package — only npm-installable providers are supported in Phase α",
215                        req.provider_id
216                    ));
217                }
218                if !registry.try_claim_provider(&req.provider_id) {
219                    return Err(format!(
220                        "install.start: provider {} is already being installed in another session",
221                        req.provider_id
222                    ));
223                }
224                let session_id = format!("install-{}", uuid::Uuid::new_v4());
225                tracing::info!(
226                    session_id = %session_id,
227                    provider_id = %req.provider_id,
228                    npm_package = %req.npm_package,
229                    pinned_version = %req.pinned_version,
230                    "install.start"
231                );
232
233                let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel();
234                registry.insert(session_id.clone(), cancel_tx);
235
236                spawn_install_task(
237                    broker,
238                    registry,
239                    session_id.clone(),
240                    req.provider_id,
241                    req.cli_command,
242                    req.npm_package,
243                    req.pinned_version,
244                    cancel_rx,
245                );
246
247                Ok(Some(json!({ "sessionId": session_id })))
248            })
249        }),
250    );
251
252    engine.register_handler(
253        COMMAND_INSTALL_CHECK,
254        Box::new(move |data, _ctx| {
255            Box::pin(async move {
256                let req: InstallCheckReq = serde_json::from_value(data)
257                    .map_err(|e| format!("install.check: {e}"))?;
258                if !is_safe_provider_id(&req.provider_id) {
259                    return Err(format!(
260                        "install.check: invalid provider id {:?}",
261                        req.provider_id
262                    ));
263                }
264                if !is_safe_cli_command(&req.cli_command) {
265                    return Err(format!(
266                        "install.check: invalid cli command {:?}",
267                        req.cli_command
268                    ));
269                }
270                let installed = resolve_installed_bin(&req.provider_id, &req.cli_command).is_some();
271                Ok(Some(json!({ "installed": installed })))
272            })
273        }),
274    );
275
276    engine.register_handler(
277        COMMAND_RESOLVE_PREREQS,
278        Box::new(move |data, _ctx| {
279            Box::pin(async move {
280                #[derive(serde::Deserialize)]
281                #[serde(rename_all = "camelCase")]
282                struct Req { tools: Vec<String> }
283                let req: Req = serde_json::from_value(data)
284                    .map_err(|e| format!("resolve.prereqs: {e}"))?;
285                let mut results = Vec::with_capacity(req.tools.len());
286                for tool in &req.tools {
287                    if !is_safe_cli_command(tool) {
288                        return Err(format!(
289                            "resolve.prereqs: invalid tool name {:?}",
290                            tool
291                        ));
292                    }
293                    let path = resolve_tool_path(tool).await;
294                    results.push(json!({
295                        "tool": tool,
296                        "found": path.is_some(),
297                        "path": path,
298                    }));
299                }
300                Ok(Some(json!({ "results": results })))
301            })
302        }),
303    );
304
305    let registry = state.install_sessions.clone();
306    engine.register_handler(
307        COMMAND_INSTALL_CANCEL,
308        Box::new(move |data, _ctx| {
309            let registry = registry.clone();
310            Box::pin(async move {
311                let req: InstallCancelReq = serde_json::from_value(data)
312                    .map_err(|e| format!("install.cancel: {e}"))?;
313                let ok = registry.cancel(&req.session_id);
314                Ok(Some(json!({
315                    "success": ok,
316                    "error": if ok { serde_json::Value::Null } else {
317                        json!(format!("unknown or already-terminal session: {}", req.session_id))
318                    }
319                })))
320            })
321        }),
322    );
323}
324
325#[allow(clippy::too_many_arguments)]
326fn spawn_install_task(
327    broker: Arc<Broker>,
328    registry: Arc<InstallSessionRegistry>,
329    session_id: String,
330    provider_id: String,
331    cli_command: String,
332    npm_package: String,
333    pinned_version: String,
334    mut cancel_rx: tokio::sync::oneshot::Receiver<()>,
335) {
336    tokio::spawn(async move {
337        use std::process::Stdio;
338        use tokio::io::{AsyncBufReadExt, BufReader};
339        use tokio::process::Command;
340
341        let scope = format!("install:{}", session_id);
342        let emit_line = |broker: &Broker, line: String, stream: &'static str| {
343            let event = WaveEvent {
344                event: "install_chunk".to_string(),
345                scopes: vec![scope.clone()],
346                sender: String::new(),
347                persist: 1024,
348                data: Some(json!({
349                    "sessionId": session_id,
350                    "line": line,
351                    "stream": stream,
352                })),
353            };
354            broker.publish(event);
355        };
356        // Legacy emit path — the `error` field is a free-text string.
357        // New code paths should use `emit_done_typed` below to emit
358        // the wire-format `AgentMuxError` object so the frontend can
359        // render a friendly `<ErrorBanner />`.
360        let emit_done = |broker: &Broker, ok: bool, error: Option<String>| {
361            let event = WaveEvent {
362                event: "install_chunk".to_string(),
363                scopes: vec![scope.clone()],
364                sender: String::new(),
365                persist: 1024,
366                data: Some(json!({
367                    "sessionId": session_id,
368                    "op": "done",
369                    "ok": ok,
370                    "error": error,
371                })),
372            };
373            broker.publish(event);
374        };
375        let emit_done_typed = |broker: &Broker, err: agentmux_common::AgentMuxError| {
376            let event = WaveEvent {
377                event: "install_chunk".to_string(),
378                scopes: vec![scope.clone()],
379                sender: String::new(),
380                persist: 1024,
381                data: Some(json!({
382                    "sessionId": session_id,
383                    "op": "done",
384                    "ok": false,
385                    "error": err.to_wire(),
386                })),
387            };
388            broker.publish(event);
389        };
390
391        let provider_dir = match provider_install_dir(&provider_id) {
392            Some(p) => p,
393            None => {
394                emit_done(&broker, false, Some("cannot determine home directory".into()));
395                registry.drop_session(&session_id);
396                registry.release_provider(&provider_id);
397                return;
398            }
399        };
400        if let Err(e) = std::fs::create_dir_all(&provider_dir) {
401            // The disk-full / permission-denied / path-not-found cases
402            // route to the typed catalog so the frontend renders a
403            // friendly "Device out of space" message instead of the
404            // raw OS error. Other IO kinds fall through to Legacy.
405            let err = agentmux_common::AgentMuxError::from_io_with_path(
406                provider_dir.display().to_string(),
407                e,
408            );
409            emit_done_typed(&broker, err);
410            registry.drop_session(&session_id);
411            registry.release_provider(&provider_id);
412            return;
413        }
414        let provider_dir_str = provider_dir.to_string_lossy().to_string();
415
416        let pkg_arg = if pinned_version.is_empty() {
417            npm_package.clone()
418        } else {
419            format!("{}@{}", npm_package, pinned_version)
420        };
421
422        // `--progress=false` is unconditional: npm only renders the
423        // progress bar when both stdout and stderr are TTYs, and this
424        // task pipes both, so leaving progress at the default would
425        // produce no visible spinner anyway. `--loglevel=verbose` is
426        // also unconditional: the user's only signal of progress
427        // during long installs is the per-package fetch/extract
428        // chatter, so we always pay for the noise to gain the signal.
429        let npm_args: Vec<String> = vec![
430            "install".to_string(),
431            pkg_arg.clone(),
432            "--prefix".to_string(),
433            provider_dir_str.clone(),
434            "--no-audit".to_string(),
435            "--no-fund".to_string(),
436            "--progress=false".to_string(),
437            "--loglevel=verbose".to_string(),
438        ];
439
440        emit_line(
441            &broker,
442            format!("$ npm {}", npm_args.join(" ")),
443            "stdout",
444        );
445
446        let mut cmd = Command::new(if cfg!(windows) { "npm.cmd" } else { "npm" });
447        cmd.args(&npm_args);
448        cmd.stdin(Stdio::null())
449            .stdout(Stdio::piped())
450            .stderr(Stdio::piped())
451            .kill_on_drop(true);
452        #[cfg(windows)]
453        {
454            use std::os::windows::process::CommandExt;
455            cmd.creation_flags(0x08000000); // CREATE_NO_WINDOW
456        }
457
458        let mut child = match cmd.spawn() {
459            Ok(c) => c,
460            Err(e) => {
461                emit_done(&broker, false, Some(format!("spawn npm: {e}")));
462                registry.drop_session(&session_id);
463                registry.release_provider(&provider_id);
464                return;
465            }
466        };
467
468        let stdout = child.stdout.take().expect("piped");
469        let stderr = child.stderr.take().expect("piped");
470
471        let broker_out = broker.clone();
472        let session_out = session_id.clone();
473        let scope_out = scope.clone();
474        let stdout_task = tokio::spawn(async move {
475            let mut lines = BufReader::new(stdout).lines();
476            while let Ok(Some(line)) = lines.next_line().await {
477                let event = WaveEvent {
478                    event: "install_chunk".to_string(),
479                    scopes: vec![scope_out.clone()],
480                    sender: String::new(),
481                    persist: 1024,
482                    data: Some(json!({
483                        "sessionId": session_out,
484                        "line": line,
485                        "stream": "stdout",
486                    })),
487                };
488                broker_out.publish(event);
489            }
490        });
491
492        let broker_err = broker.clone();
493        let session_err = session_id.clone();
494        let scope_err = scope.clone();
495        let stderr_task = tokio::spawn(async move {
496            let mut lines = BufReader::new(stderr).lines();
497            while let Ok(Some(line)) = lines.next_line().await {
498                let event = WaveEvent {
499                    event: "install_chunk".to_string(),
500                    scopes: vec![scope_err.clone()],
501                    sender: String::new(),
502                    persist: 1024,
503                    data: Some(json!({
504                        "sessionId": session_err,
505                        "line": line,
506                        "stream": "stderr",
507                    })),
508                };
509                broker_err.publish(event);
510            }
511        });
512
513        tokio::select! {
514            wait = child.wait() => {
515                let _ = stdout_task.await;
516                let _ = stderr_task.await;
517                match wait {
518                    Ok(s) if s.success() => {
519                        // npm exit 0 ≠ "binary is on disk". Verify the
520                        // expected bin shim exists so a package/bin
521                        // rename or provider-config mismatch surfaces
522                        // as an install failure rather than a phantom
523                        // launch with a non-existent cmd path.
524                        if resolve_installed_bin(&provider_id, &cli_command).is_some() {
525                            emit_done(&broker, true, None);
526                        } else {
527                            emit_done(
528                                &broker,
529                                false,
530                                Some(format!(
531                                    "npm install reported success but {} not found in {}/node_modules/.bin/",
532                                    cli_command,
533                                    provider_dir.display()
534                                )),
535                            );
536                        }
537                    }
538                    Ok(s) => emit_done(&broker, false, Some(format!("npm exited {:?}", s.code()))),
539                    Err(e) => emit_done(&broker, false, Some(format!("wait: {e}"))),
540                }
541            }
542            _ = &mut cancel_rx => {
543                tracing::info!(session_id = %session_id, "install.cancel: killing child");
544                let _ = child.kill().await;
545                let _ = stdout_task.await;
546                let _ = stderr_task.await;
547                // Wipe the partial install so a retry doesn't inherit
548                // a half-written package-lock.json. Best-effort —
549                // logging only on failure.
550                if let Err(e) = std::fs::remove_dir_all(&provider_dir) {
551                    // Best-effort cleanup; tag the log with the typed
552                    // code so grouped support requests can grep by
553                    // `amx_code` rather than free-text matching.
554                    let mux = agentmux_common::AgentMuxError::from_io_with_path(
555                        provider_dir.display().to_string(),
556                        e,
557                    );
558                    tracing::warn!(
559                        target: "amx::error",
560                        session_id = %session_id,
561                        amx_code = %mux.code(),
562                        provider_dir = %provider_dir.display(),
563                        error = %mux,
564                        "install.cancel: remove partial dir failed"
565                    );
566                }
567                emit_done(&broker, false, Some("cancelled".into()));
568            }
569        }
570
571        registry.drop_session(&session_id);
572        registry.release_provider(&provider_id);
573    });
574}
575
#[cfg(test)]
mod tests {
    use super::{is_safe_cli_command, is_safe_provider_id};

    #[test]
    fn safe_provider_ids_accepted() {
        // Exactly at the 64-byte limit: pins the inclusive bound.
        let max_len = "x".repeat(64);
        for id in ["claude", "claude-code", "open_claw", "Codex42", "a", max_len.as_str()] {
            assert!(is_safe_provider_id(id), "{id} should be accepted");
        }
    }

    #[test]
    fn unsafe_provider_ids_rejected() {
        for id in [
            "",
            "../escape",
            "a/b",
            "a\\b",
            "a b",
            ".",
            "..",
            "a..b",
            "a.b",
            "a/../b",
            "\0null",
            "a\nb",
            "\u{e9}",
            &"x".repeat(65),
        ] {
            assert!(!is_safe_provider_id(id), "{id:?} should be rejected");
        }
    }

    #[test]
    fn safe_cli_commands_accepted() {
        // Exactly at the 64-byte limit: pins the inclusive bound.
        let max_len = "x".repeat(64);
        for cmd in [
            "claude",
            "claude-code",
            "kimi.cmd",
            "eslint.cmd",
            "agentmux-srv",
            "open_claw",
            max_len.as_str(),
        ] {
            assert!(is_safe_cli_command(cmd), "{cmd} should be accepted");
        }
    }

    #[test]
    fn unsafe_cli_commands_rejected() {
        for cmd in [
            "",
            "../etc/passwd",
            "../../etc/passwd",
            "a/b",
            "a\\b",
            "a b",
            "..",
            "a..b",
            "a/../b",
            "\0null",
            "a\nb",
            "\u{e9}",
            &"x".repeat(65),
        ] {
            assert!(!is_safe_cli_command(cmd), "{cmd:?} should be rejected");
        }
    }
}