1use std::sync::Arc;
26
27use parking_lot::Mutex;
28use serde::Deserialize;
29use serde_json::json;
30
31use crate::backend::rpc::engine::WshRpcEngine;
32use crate::backend::wps::{Broker, WaveEvent};
33use crate::server::AppState;
34
/// RPC command name for the handler that starts an npm-based provider install.
pub const COMMAND_INSTALL_START: &str = "install.start";
/// RPC command name for the handler that cancels a running install session.
pub const COMMAND_INSTALL_CANCEL: &str = "install.cancel";
/// RPC command name for the handler that reports whether a provider's CLI
/// binary is present in its install directory.
pub const COMMAND_INSTALL_CHECK: &str = "install.check";
/// RPC command name for the handler that looks up prerequisite tools with
/// `which` / `where`.
pub const COMMAND_RESOLVE_PREREQS: &str = "resolve.prereqs";
39
/// Payload of the `install.start` command (camelCase keys on the wire).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct InstallStartReq {
    /// Provider identifier; validated with `is_safe_provider_id` and used as
    /// the last component of the install directory.
    provider_id: String,
    /// Name of the binary expected under `node_modules/.bin` after the
    /// install; validated with `is_safe_cli_command`.
    cli_command: String,
    /// Package passed to `npm install`; the handler rejects an empty value.
    npm_package: String,
    /// Optional version appended as `@<version>` to the package argument.
    /// Defaults to the empty string when the key is absent, which means
    /// "no pin".
    #[serde(default)]
    pinned_version: String,
}
49
/// Payload of the `install.cancel` command (camelCase keys on the wire).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct InstallCancelReq {
    /// Session id previously returned by `install.start`.
    session_id: String,
}
55
/// Payload of the `install.check` command (camelCase keys on the wire).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct InstallCheckReq {
    /// Provider whose install directory is inspected.
    provider_id: String,
    /// Binary name looked up under that provider's `node_modules/.bin`.
    cli_command: String,
}
62
/// Bookkeeping for in-flight installs.
#[derive(Default)]
pub struct InstallSessionRegistry {
    /// Session id -> one-shot sender used to signal cancellation to the
    /// install task that owns the matching receiver.
    sessions: Mutex<std::collections::HashMap<String, tokio::sync::oneshot::Sender<()>>>,
    /// Provider ids that currently have an install running; used to refuse a
    /// second concurrent install of the same provider.
    active_providers: Mutex<std::collections::HashSet<String>>,
}
74
75impl InstallSessionRegistry {
76 pub fn new() -> Arc<Self> {
77 Arc::new(Self::default())
78 }
79
80 fn insert(&self, session_id: String, tx: tokio::sync::oneshot::Sender<()>) {
81 self.sessions.lock().insert(session_id, tx);
82 }
83
84 fn try_claim_provider(&self, provider_id: &str) -> bool {
87 self.active_providers.lock().insert(provider_id.to_string())
88 }
89
90 fn release_provider(&self, provider_id: &str) {
91 self.active_providers.lock().remove(provider_id);
92 }
93
94 fn cancel(&self, session_id: &str) -> bool {
95 if let Some(tx) = self.sessions.lock().remove(session_id) {
96 let _ = tx.send(());
97 true
98 } else {
99 false
100 }
101 }
102
103 fn drop_session(&self, session_id: &str) {
104 self.sessions.lock().remove(session_id);
105 }
106}
107
/// Returns `true` when `s` is 1 to 64 bytes long and made up only of ASCII
/// letters, ASCII digits, `_` and `-`.
///
/// Path separators, dots, whitespace and control characters are therefore all
/// rejected.
fn is_safe_provider_id(s: &str) -> bool {
    if s.is_empty() || s.len() > 64 {
        return false;
    }
    // Every accepted character is ASCII, so a byte scan is equivalent to a
    // char scan: any byte of a multi-byte sequence fails the test.
    s.bytes()
        .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'_' | b'-'))
}
115
/// Returns `true` when `s` is acceptable as a bare CLI command name.
///
/// A name is accepted when it:
/// - is 1 to 64 bytes long,
/// - does not start with `-`,
/// - consists only of ASCII letters, ASCII digits, `_`, `-` and `.`,
/// - does not contain `..`.
///
/// The leading-`-` rule exists because validated names are passed verbatim as
/// an argument to `which` / `where`; a name such as `--version` would be
/// parsed as an option instead of a command to look up.
fn is_safe_cli_command(s: &str) -> bool {
    !s.is_empty()
        && s.len() <= 64
        && !s.starts_with('-')
        && s.chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
        // `.` is allowed for names like `kimi.cmd`, but never as a parent-dir
        // component.
        && !s.contains("..")
}
125
126fn provider_install_dir(provider_id: &str) -> Option<std::path::PathBuf> {
134 let paths = agentmux_common::DataPaths::from_env()?;
135 let version = env!("CARGO_PKG_VERSION");
136 Some(
137 paths
138 .home_dir
139 .join("instances")
140 .join(format!("v{version}"))
141 .join("cli")
142 .join(provider_id),
143 )
144}
145
146async fn resolve_tool_path(tool: &str) -> Option<String> {
157 let cmd = if cfg!(windows) { "where" } else { "which" };
158 let output = tokio::process::Command::new(cmd)
159 .arg(tool)
160 .output()
161 .await
162 .ok()?;
163 if !output.status.success() {
164 return None;
165 }
166 let stdout = String::from_utf8_lossy(&output.stdout);
168 stdout.lines().next().map(|s| s.trim().to_string())
169 .filter(|s| !s.is_empty())
170}
171
172fn resolve_installed_bin(provider_id: &str, cli_command: &str) -> Option<std::path::PathBuf> {
173 let dir = provider_install_dir(provider_id)?;
174 let bin_dir = dir.join("node_modules").join(".bin");
175 let candidates: &[&str] = if cfg!(windows) {
176 &[".cmd", ".exe", ""]
177 } else {
178 &["", ".cmd"]
179 };
180 for suffix in candidates {
181 let p = bin_dir.join(format!("{cli_command}{suffix}"));
182 if p.is_file() {
183 return Some(p);
184 }
185 }
186 None
187}
188
/// Registers the four install-related RPC handlers on `engine`:
/// `install.start`, `install.check`, `resolve.prereqs` and `install.cancel`.
///
/// The handlers share `state.install_sessions` (session registry) and, for
/// `install.start`, `state.broker` (used to publish install output events).
/// Every handler reports failures as a message prefixed with its command name.
pub fn register_install_handlers(engine: &Arc<WshRpcEngine>, state: &AppState) {
    let registry = state.install_sessions.clone();
    let broker = state.broker.clone();

    // install.start: validate the request, claim the provider, then run the
    // install in a background task and return the new session id immediately.
    engine.register_handler(
        COMMAND_INSTALL_START,
        Box::new(move |data, _ctx| {
            // Clone per call: the handler closure can be invoked many times.
            let registry = registry.clone();
            let broker = broker.clone();
            Box::pin(async move {
                let req: InstallStartReq = serde_json::from_value(data)
                    .map_err(|e| format!("install.start: {e}"))?;
                // Both values end up in filesystem paths, so they are checked
                // before anything touches the disk.
                if !is_safe_provider_id(&req.provider_id) {
                    return Err(format!(
                        "install.start: invalid provider id {:?} — must match [a-zA-Z0-9_-]+",
                        req.provider_id
                    ));
                }
                if !is_safe_cli_command(&req.cli_command) {
                    return Err(format!(
                        "install.start: invalid cli command {:?}",
                        req.cli_command
                    ));
                }
                if req.npm_package.is_empty() {
                    return Err(format!(
                        "install.start: provider {} has no npm_package — only npm-installable providers are supported in Phase α",
                        req.provider_id
                    ));
                }
                // Only one install per provider at a time. The claim is
                // released by the install task when it finishes.
                if !registry.try_claim_provider(&req.provider_id) {
                    return Err(format!(
                        "install.start: provider {} is already being installed in another session",
                        req.provider_id
                    ));
                }
                let session_id = format!("install-{}", uuid::Uuid::new_v4());
                tracing::info!(
                    session_id = %session_id,
                    provider_id = %req.provider_id,
                    npm_package = %req.npm_package,
                    pinned_version = %req.pinned_version,
                    "install.start"
                );

                // The sender is stored in the registry for `install.cancel`;
                // the receiver goes to the install task.
                let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel();
                registry.insert(session_id.clone(), cancel_tx);

                spawn_install_task(
                    broker,
                    registry,
                    session_id.clone(),
                    req.provider_id,
                    req.cli_command,
                    req.npm_package,
                    req.pinned_version,
                    cancel_rx,
                );

                Ok(Some(json!({ "sessionId": session_id })))
            })
        }),
    );

    // install.check: report whether the provider's CLI binary exists in its
    // install directory. Responds with `{ "installed": bool }`.
    engine.register_handler(
        COMMAND_INSTALL_CHECK,
        Box::new(move |data, _ctx| {
            Box::pin(async move {
                let req: InstallCheckReq = serde_json::from_value(data)
                    .map_err(|e| format!("install.check: {e}"))?;
                if !is_safe_provider_id(&req.provider_id) {
                    return Err(format!(
                        "install.check: invalid provider id {:?}",
                        req.provider_id
                    ));
                }
                if !is_safe_cli_command(&req.cli_command) {
                    return Err(format!(
                        "install.check: invalid cli command {:?}",
                        req.cli_command
                    ));
                }
                let installed = resolve_installed_bin(&req.provider_id, &req.cli_command).is_some();
                Ok(Some(json!({ "installed": installed })))
            })
        }),
    );

    // resolve.prereqs: look up each requested tool, one after another.
    // Responds with `{ "results": [{ tool, found, path }, ...] }` in request
    // order; a single invalid tool name fails the whole request.
    engine.register_handler(
        COMMAND_RESOLVE_PREREQS,
        Box::new(move |data, _ctx| {
            Box::pin(async move {
                #[derive(serde::Deserialize)]
                #[serde(rename_all = "camelCase")]
                struct Req { tools: Vec<String> }
                let req: Req = serde_json::from_value(data)
                    .map_err(|e| format!("resolve.prereqs: {e}"))?;
                let mut results = Vec::with_capacity(req.tools.len());
                for tool in &req.tools {
                    // The name is passed as an argument to an external
                    // command, so it is validated first.
                    if !is_safe_cli_command(tool) {
                        return Err(format!(
                            "resolve.prereqs: invalid tool name {:?}",
                            tool
                        ));
                    }
                    let path = resolve_tool_path(tool).await;
                    results.push(json!({
                        "tool": tool,
                        "found": path.is_some(),
                        "path": path,
                    }));
                }
                Ok(Some(json!({ "results": results })))
            })
        }),
    );

    // install.cancel: signal the session's task to stop. An unknown session
    // is reported in the response body (`success: false`), not as an RPC
    // error.
    let registry = state.install_sessions.clone();
    engine.register_handler(
        COMMAND_INSTALL_CANCEL,
        Box::new(move |data, _ctx| {
            let registry = registry.clone();
            Box::pin(async move {
                let req: InstallCancelReq = serde_json::from_value(data)
                    .map_err(|e| format!("install.cancel: {e}"))?;
                let ok = registry.cancel(&req.session_id);
                Ok(Some(json!({
                    "success": ok,
                    "error": if ok { serde_json::Value::Null } else {
                        json!(format!("unknown or already-terminal session: {}", req.session_id))
                    }
                })))
            })
        }),
    );
}
324
325#[allow(clippy::too_many_arguments)]
326fn spawn_install_task(
327 broker: Arc<Broker>,
328 registry: Arc<InstallSessionRegistry>,
329 session_id: String,
330 provider_id: String,
331 cli_command: String,
332 npm_package: String,
333 pinned_version: String,
334 mut cancel_rx: tokio::sync::oneshot::Receiver<()>,
335) {
336 tokio::spawn(async move {
337 use std::process::Stdio;
338 use tokio::io::{AsyncBufReadExt, BufReader};
339 use tokio::process::Command;
340
341 let scope = format!("install:{}", session_id);
342 let emit_line = |broker: &Broker, line: String, stream: &'static str| {
343 let event = WaveEvent {
344 event: "install_chunk".to_string(),
345 scopes: vec![scope.clone()],
346 sender: String::new(),
347 persist: 1024,
348 data: Some(json!({
349 "sessionId": session_id,
350 "line": line,
351 "stream": stream,
352 })),
353 };
354 broker.publish(event);
355 };
356 let emit_done = |broker: &Broker, ok: bool, error: Option<String>| {
361 let event = WaveEvent {
362 event: "install_chunk".to_string(),
363 scopes: vec![scope.clone()],
364 sender: String::new(),
365 persist: 1024,
366 data: Some(json!({
367 "sessionId": session_id,
368 "op": "done",
369 "ok": ok,
370 "error": error,
371 })),
372 };
373 broker.publish(event);
374 };
375 let emit_done_typed = |broker: &Broker, err: agentmux_common::AgentMuxError| {
376 let event = WaveEvent {
377 event: "install_chunk".to_string(),
378 scopes: vec![scope.clone()],
379 sender: String::new(),
380 persist: 1024,
381 data: Some(json!({
382 "sessionId": session_id,
383 "op": "done",
384 "ok": false,
385 "error": err.to_wire(),
386 })),
387 };
388 broker.publish(event);
389 };
390
391 let provider_dir = match provider_install_dir(&provider_id) {
392 Some(p) => p,
393 None => {
394 emit_done(&broker, false, Some("cannot determine home directory".into()));
395 registry.drop_session(&session_id);
396 registry.release_provider(&provider_id);
397 return;
398 }
399 };
400 if let Err(e) = std::fs::create_dir_all(&provider_dir) {
401 let err = agentmux_common::AgentMuxError::from_io_with_path(
406 provider_dir.display().to_string(),
407 e,
408 );
409 emit_done_typed(&broker, err);
410 registry.drop_session(&session_id);
411 registry.release_provider(&provider_id);
412 return;
413 }
414 let provider_dir_str = provider_dir.to_string_lossy().to_string();
415
416 let pkg_arg = if pinned_version.is_empty() {
417 npm_package.clone()
418 } else {
419 format!("{}@{}", npm_package, pinned_version)
420 };
421
422 let npm_args: Vec<String> = vec![
430 "install".to_string(),
431 pkg_arg.clone(),
432 "--prefix".to_string(),
433 provider_dir_str.clone(),
434 "--no-audit".to_string(),
435 "--no-fund".to_string(),
436 "--progress=false".to_string(),
437 "--loglevel=verbose".to_string(),
438 ];
439
440 emit_line(
441 &broker,
442 format!("$ npm {}", npm_args.join(" ")),
443 "stdout",
444 );
445
446 let mut cmd = Command::new(if cfg!(windows) { "npm.cmd" } else { "npm" });
447 cmd.args(&npm_args);
448 cmd.stdin(Stdio::null())
449 .stdout(Stdio::piped())
450 .stderr(Stdio::piped())
451 .kill_on_drop(true);
452 #[cfg(windows)]
453 {
454 use std::os::windows::process::CommandExt;
455 cmd.creation_flags(0x08000000); }
457
458 let mut child = match cmd.spawn() {
459 Ok(c) => c,
460 Err(e) => {
461 emit_done(&broker, false, Some(format!("spawn npm: {e}")));
462 registry.drop_session(&session_id);
463 registry.release_provider(&provider_id);
464 return;
465 }
466 };
467
468 let stdout = child.stdout.take().expect("piped");
469 let stderr = child.stderr.take().expect("piped");
470
471 let broker_out = broker.clone();
472 let session_out = session_id.clone();
473 let scope_out = scope.clone();
474 let stdout_task = tokio::spawn(async move {
475 let mut lines = BufReader::new(stdout).lines();
476 while let Ok(Some(line)) = lines.next_line().await {
477 let event = WaveEvent {
478 event: "install_chunk".to_string(),
479 scopes: vec![scope_out.clone()],
480 sender: String::new(),
481 persist: 1024,
482 data: Some(json!({
483 "sessionId": session_out,
484 "line": line,
485 "stream": "stdout",
486 })),
487 };
488 broker_out.publish(event);
489 }
490 });
491
492 let broker_err = broker.clone();
493 let session_err = session_id.clone();
494 let scope_err = scope.clone();
495 let stderr_task = tokio::spawn(async move {
496 let mut lines = BufReader::new(stderr).lines();
497 while let Ok(Some(line)) = lines.next_line().await {
498 let event = WaveEvent {
499 event: "install_chunk".to_string(),
500 scopes: vec![scope_err.clone()],
501 sender: String::new(),
502 persist: 1024,
503 data: Some(json!({
504 "sessionId": session_err,
505 "line": line,
506 "stream": "stderr",
507 })),
508 };
509 broker_err.publish(event);
510 }
511 });
512
513 tokio::select! {
514 wait = child.wait() => {
515 let _ = stdout_task.await;
516 let _ = stderr_task.await;
517 match wait {
518 Ok(s) if s.success() => {
519 if resolve_installed_bin(&provider_id, &cli_command).is_some() {
525 emit_done(&broker, true, None);
526 } else {
527 emit_done(
528 &broker,
529 false,
530 Some(format!(
531 "npm install reported success but {} not found in {}/node_modules/.bin/",
532 cli_command,
533 provider_dir.display()
534 )),
535 );
536 }
537 }
538 Ok(s) => emit_done(&broker, false, Some(format!("npm exited {:?}", s.code()))),
539 Err(e) => emit_done(&broker, false, Some(format!("wait: {e}"))),
540 }
541 }
542 _ = &mut cancel_rx => {
543 tracing::info!(session_id = %session_id, "install.cancel: killing child");
544 let _ = child.kill().await;
545 let _ = stdout_task.await;
546 let _ = stderr_task.await;
547 if let Err(e) = std::fs::remove_dir_all(&provider_dir) {
551 let mux = agentmux_common::AgentMuxError::from_io_with_path(
555 provider_dir.display().to_string(),
556 e,
557 );
558 tracing::warn!(
559 target: "amx::error",
560 session_id = %session_id,
561 amx_code = %mux.code(),
562 provider_dir = %provider_dir.display(),
563 error = %mux,
564 "install.cancel: remove partial dir failed"
565 );
566 }
567 emit_done(&broker, false, Some("cancelled".into()));
568 }
569 }
570
571 registry.drop_session(&session_id);
572 registry.release_provider(&provider_id);
573 });
574}
575
/// Unit tests for the two input validators. Both guard values that are later
/// joined into filesystem paths.
#[cfg(test)]
mod tests {
    use super::{is_safe_cli_command, is_safe_provider_id};

    /// Letters, digits, `_` and `-` are valid in provider ids.
    #[test]
    fn safe_provider_ids_accepted() {
        for id in ["claude", "claude-code", "open_claw", "Codex42"] {
            assert!(is_safe_provider_id(id), "{id} should be accepted");
        }
    }

    /// Empty input, path separators, dots, whitespace, NUL and over-long
    /// (65-byte) ids are all rejected.
    #[test]
    fn unsafe_provider_ids_rejected() {
        for id in [
            "",
            "../escape",
            "a/b",
            "a\\b",
            "a b",
            ".",
            "..",
            "a..b",
            "a/../b",
            "\0null",
            &"x".repeat(65),
        ] {
            assert!(!is_safe_provider_id(id), "{id:?} should be rejected");
        }
    }

    /// CLI command names additionally allow single dots (e.g. `kimi.cmd`).
    #[test]
    fn safe_cli_commands_accepted() {
        for cmd in ["claude", "claude-code", "kimi.cmd", "agentmux-srv", "open_claw"] {
            assert!(is_safe_cli_command(cmd), "{cmd} should be accepted");
        }
    }

    /// Empty input, path separators, `..` anywhere, whitespace, NUL and
    /// over-long (65-byte) names are all rejected.
    #[test]
    fn unsafe_cli_commands_rejected() {
        for cmd in [
            "",
            "../etc/passwd",
            "../../etc/passwd",
            "a/b",
            "a\\b",
            "a b",
            "..",
            "a..b",
            "a/../b",
            "\0null",
            &"x".repeat(65),
        ] {
            assert!(!is_safe_cli_command(cmd), "{cmd:?} should be rejected");
        }
    }
}