agentmux_srv\server/identity_handlers.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Pre-launch OAuth flow RPC handlers.
5//!
6//! Five commands per `docs/specs/SPEC_PRE_LAUNCH_OAUTH_FLOW_2026_05_14.md` §7:
7//!
8//! * `auth.start` — `StartProviderAuth`
9//! * `auth.poll` — `PollProviderAuth`
10//! * `auth.submitcallback` — `SubmitAuthCallback`
11//! * `auth.cancel` — `CancelProviderAuth`
12//! * `auth.submitapikey` — `SubmitProviderApiKey`
13//!
14//! This module owns the RPC surface and dispatches into
15//! `crate::identity::auth_session::AuthSessionManager`. The actual
16//! provider-CLI spawn (which feeds stdout lines into the session
17//! via `record_line`) lands in a follow-up commit on this branch —
18//! these handlers ship the lifecycle plumbing and return clear
19//! "spawn not yet wired" errors for the callback/api-key paths so
20//! frontend (PR B) can integrate against the real shape.
21
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::time::{SystemTime, UNIX_EPOCH};
25
26use serde::{Deserialize, Serialize};
27
28use crate::backend::providers::get_provider;
29use crate::backend::rpc::engine::WshRpcEngine;
30use crate::backend::storage::wstore::{IdentityAccount, SecretRef, WaveStore};
31use crate::backend::wps::Broker;
32
33use super::AppState;
34
35pub const COMMAND_AUTH_START: &str = "auth.start";
36pub const COMMAND_AUTH_POLL: &str = "auth.poll";
37pub const COMMAND_AUTH_SUBMIT_CALLBACK: &str = "auth.submitcallback";
38pub const COMMAND_AUTH_CANCEL: &str = "auth.cancel";
39pub const COMMAND_AUTH_SUBMIT_API_KEY: &str = "auth.submitapikey";
40
41#[derive(Debug, Deserialize)]
42#[serde(rename_all = "camelCase")]
43struct StartProviderAuthReq {
44 provider_id: String,
45 /// Optional — when set, a successful auth adds an account to the
46 /// existing bundle. When None, a fresh bundle is created.
47 #[serde(default)]
48 into_bundle_id: Option<String>,
49 /// Resolved CLI path. The frontend calls `resolvecli` first to
50 /// install / locate the provider's CLI; the resulting path is
51 /// passed here. Keeps the provider table single-sourced in the
52 /// frontend.
53 cli_path: String,
54 /// e.g. `["auth", "login"]` from the provider definition's
55 /// `authLoginCommand` field.
56 auth_login_args: Vec<String>,
57 /// e.g. `["auth", "status", "--json"]` — used to confirm
58 /// authentication after the CLI emits its success line.
59 auth_check_args: Vec<String>,
60 /// Env vars to inject at spawn time. Per-provider auth
61 /// isolation env vars come here (CLAUDE_CONFIG_DIR, CODEX_HOME,
62 /// GEMINI_CLI_HOME, etc.).
63 #[serde(default)]
64 auth_env: std::collections::HashMap<String, String>,
65 /// Spawn the auth login subprocess under a PTY (instead of plain
66 /// piped stdio). Required by providers whose auth subcommand
67 /// checks `isatty()` and refuses to run otherwise (currently
68 /// OpenClaw's `models auth login --provider <id>`). The flag is
69 /// forwarded down to the CEF host's `run_cli_login` which picks
70 /// the PTY branch when set.
71 #[serde(default)]
72 requires_tty: bool,
73}
74
75#[derive(Debug, Deserialize)]
76#[serde(rename_all = "camelCase")]
77struct PollProviderAuthReq {
78 session_id: String,
79}
80
81#[derive(Debug, Deserialize)]
82#[serde(rename_all = "camelCase")]
83struct SubmitAuthCallbackReq {
84 session_id: String,
85 /// The redirect URL the user pasted back (browser-didn't-open
86 /// fallback). The handler writes it to the spawned CLI's stdin.
87 callback_url: String,
88}
89
90#[derive(Debug, Deserialize)]
91#[serde(rename_all = "camelCase")]
92struct CancelProviderAuthReq {
93 session_id: String,
94}
95
96#[derive(Debug, Deserialize)]
97#[serde(rename_all = "camelCase")]
98struct SubmitProviderApiKeyReq {
99 provider_id: String,
100 /// Optional — same semantics as StartProviderAuth.into_bundle_id.
101 #[serde(default)]
102 into_bundle_id: Option<String>,
103 /// The API key the user pasted. Will be validated by running the
104 /// provider's authCheckCommand against it, then persisted as a
105 /// `SecretRef::PlaintextDev` (PR 1 of the storage spec) or
106 /// encrypted variant (PR 2).
107 api_key: String,
108 account_name: String,
109}
110
111#[derive(Debug, Serialize)]
112#[serde(rename_all = "camelCase")]
113struct AckResp {
114 success: bool,
115 #[serde(skip_serializing_if = "Option::is_none")]
116 error: Option<String>,
117}
118
119pub fn register_identity_handlers(engine: &Arc<WshRpcEngine>, state: &AppState) {
120 let mgr = state.auth_session_manager.clone();
121 let wstore = state.wstore.clone();
122 let broker = state.broker.clone();
123 engine.register_handler(
124 COMMAND_AUTH_START,
125 Box::new(move |data, _ctx| {
126 let mgr = mgr.clone();
127 let wstore = wstore.clone();
128 let broker = broker.clone();
129 Box::pin(async move {
130 let req: StartProviderAuthReq = serde_json::from_value(data)
131 .map_err(|e| format!("auth.start: {e}"))?;
132 tracing::info!(
133 provider_id = %req.provider_id,
134 cli_path = %req.cli_path,
135 into_bundle_id = ?req.into_bundle_id,
136 "auth.start"
137 );
138 // OAuth Bundles PR C invariant — when an OAuth flow runs
139 // against a bundle, the CLI's OAuth tokens land INSIDE the
140 // bundle's per-provider config dir, NOT ambient at
141 // `~/.claude/`. Compute the dir from the registry
142 // (single source of truth — `auth_dir_name` per provider)
143 // and the per-bundle root from `DataPaths::identity_dir`.
144 // Mirror the dir into `auth_env` under the provider's
145 // `auth_config_dir_env_var` (e.g. `CLAUDE_CONFIG_DIR`).
146 // This overrides whatever the frontend computed via the
147 // legacy ambient `ensureAuthDir` path so a bundle-bound
148 // launch never accidentally writes to the global dir.
149 //
150 // When `into_bundle_id` is empty (ambient launch — no
151 // bundle context), keep the legacy `auth_env` exactly so
152 // the existing ambient flow keeps working.
153 //
154 // Errors (path resolve, mkdir) log + fall back to the
155 // legacy env — never abort `auth.start` over a per-bundle
156 // dir issue. Mirrors the `inject_identity_env` pattern.
157 let mut auth_env = req.auth_env;
158 let bundle_dir = compute_and_ensure_bundle_dir(
159 req.into_bundle_id.as_deref(),
160 &req.provider_id,
161 &mut auth_env,
162 );
163 let r = mgr.start_session(req.provider_id.clone(), req.into_bundle_id.clone());
164 spawn_auth_cli(
165 mgr,
166 wstore,
167 broker,
168 r.session_id.clone(),
169 req.provider_id,
170 req.into_bundle_id,
171 bundle_dir,
172 req.cli_path,
173 req.auth_login_args,
174 req.auth_check_args,
175 auth_env,
176 req.requires_tty,
177 );
178 Ok(Some(serde_json::to_value(&r).unwrap_or_default()))
179 })
180 }),
181 );
182
183 let mgr = state.auth_session_manager.clone();
184 engine.register_handler(
185 COMMAND_AUTH_POLL,
186 Box::new(move |data, _ctx| {
187 let mgr = mgr.clone();
188 Box::pin(async move {
189 let req: PollProviderAuthReq = serde_json::from_value(data)
190 .map_err(|e| format!("auth.poll: {e}"))?;
191 let snap = mgr
192 .poll_session(&req.session_id)
193 .ok_or_else(|| format!("auth.poll: unknown session {}", req.session_id))?;
194 Ok(Some(serde_json::to_value(&snap).unwrap_or_default()))
195 })
196 }),
197 );
198
199 let mgr = state.auth_session_manager.clone();
200 engine.register_handler(
201 COMMAND_AUTH_CANCEL,
202 Box::new(move |data, _ctx| {
203 let mgr = mgr.clone();
204 Box::pin(async move {
205 let req: CancelProviderAuthReq = serde_json::from_value(data)
206 .map_err(|e| format!("auth.cancel: {e}"))?;
207 let ok = mgr.cancel_session(&req.session_id);
208 Ok(Some(
209 serde_json::to_value(&AckResp {
210 success: ok,
211 error: if ok {
212 None
213 } else {
214 Some(format!("unknown or already-terminal session: {}", req.session_id))
215 },
216 })
217 .unwrap_or_default(),
218 ))
219 })
220 }),
221 );
222
223 let mgr = state.auth_session_manager.clone();
224 engine.register_handler(
225 COMMAND_AUTH_SUBMIT_CALLBACK,
226 Box::new(move |data, _ctx| {
227 let mgr = mgr.clone();
228 Box::pin(async move {
229 let req: SubmitAuthCallbackReq = serde_json::from_value(data)
230 .map_err(|e| format!("auth.submitcallback: {e}"))?;
231 let delivered = mgr
232 .send_to_stdin(&req.session_id, req.callback_url)
233 .await;
234 Ok(Some(
235 serde_json::to_value(&AckResp {
236 success: delivered,
237 error: if delivered {
238 None
239 } else {
240 Some(format!(
241 "no stdin sender for session {} (process exited or session unknown)",
242 req.session_id
243 ))
244 },
245 })
246 .unwrap_or_default(),
247 ))
248 })
249 }),
250 );
251
252 engine.register_handler(
253 COMMAND_AUTH_SUBMIT_API_KEY,
254 Box::new(move |data, _ctx| {
255 Box::pin(async move {
256 let _req: SubmitProviderApiKeyReq = serde_json::from_value(data)
257 .map_err(|e| format!("auth.submitapikey: {e}"))?;
258 // API-key path validates by running the provider's
259 // authCheckCommand with the key in the appropriate
260 // env var, then persists via wstore. That persistence
261 // is part of PR C (bundle auto-creation) per spec
262 // §10 — the validate-and-stash logic is here but
263 // wstore writes wait. For PR A we return an explicit
264 // error so frontend (PR B) sees a clear "not yet"
265 // signal while OAuth providers work end-to-end.
266 Err::<Option<serde_json::Value>, String>(
267 "auth.submitapikey: bundle persistence lands in PR C"
268 .to_string(),
269 )
270 })
271 }),
272 );
273}
274
275/// Spawn the provider's auth-login CLI and drive the session through
276/// to a terminal state. Background-only — returns immediately. The
277/// drain task feeds stdout+stderr lines into
278/// `AuthSessionManager::record_line`; on a login-success pattern OR
279/// child exit, runs the provider's authCheckCommand to confirm and
280/// transitions to Success or Failed.
281///
282/// On the success path (CLI exited cleanly + authCheckCommand passed),
283/// when `into_bundle_id` AND `bundle_dir` are both set, the function
284/// persists the OAuth binding into the bundle: a `SecretRef::OAuthConfigDir`
285/// account is upserted (status `valid`) and bound via
286/// `bundle_identity_bind`. This is the §4.5 OAuth-success invariant —
287/// after this point, future launches of any agent against the bundle
288/// resolve through `inject_identity_env`'s oauth-class dispatch (PR B)
289/// and reuse the same CLI-managed tokens inside `bundle_dir`.
290///
291/// On failure or when `into_bundle_id` is empty (ambient launch), the
292/// per-bundle binding step is skipped. The bundle row (if any was
293/// auto-created by the New Identity modal) stays — the user's next
294/// attempt reuses it.
295#[allow(clippy::too_many_arguments)]
296fn spawn_auth_cli(
297 mgr: Arc<crate::identity::auth_session::AuthSessionManager>,
298 wstore: Arc<WaveStore>,
299 broker: Arc<Broker>,
300 session_id: String,
301 provider_id: String,
302 into_bundle_id: Option<String>,
303 bundle_dir: Option<String>,
304 cli_path: String,
305 auth_login_args: Vec<String>,
306 auth_check_args: Vec<String>,
307 auth_env: std::collections::HashMap<String, String>,
308 requires_tty: bool,
309) {
310 use std::process::Stdio;
311 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
312 use tokio::process::Command;
313 use tokio::sync::mpsc;
314
315 // Channel for SubmitAuthCallback → CLI stdin forwarding.
316 // Buffer of 4 is enough — only one URL per session in normal use.
317 let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(4);
318
319 // PTY branch: providers like OpenClaw whose auth subcommand
320 // refuses to run when `isatty()==0`. Spawn via portable_pty so the
321 // child sees an interactive terminal, feed PTY output through the
322 // same record_line matcher the pipes path uses. Stdout/stderr are
323 // merged into the single PTY stream — record_line handles both as
324 // identical input.
325 if requires_tty {
326 spawn_auth_cli_pty(
327 mgr,
328 wstore,
329 broker,
330 session_id,
331 provider_id,
332 into_bundle_id,
333 bundle_dir,
334 cli_path,
335 auth_login_args,
336 auth_check_args,
337 auth_env,
338 stdin_tx,
339 stdin_rx,
340 );
341 return;
342 }
343
344 let mgr_for_task = mgr.clone();
345 let session_id_for_task = session_id.clone();
346 let cli_path_for_check = cli_path.clone();
347 let auth_check_args_for_check = auth_check_args.clone();
348 let auth_env_for_check = auth_env.clone();
349 let wstore_for_task = wstore.clone();
350 let broker_for_task = broker.clone();
351 let into_bundle_id_for_task = into_bundle_id.clone();
352 let bundle_dir_for_task = bundle_dir.clone();
353 let provider_id_for_task = provider_id.clone();
354
355 let handle = tokio::spawn(async move {
356 tracing::info!(
357 session_id = %session_id_for_task,
358 provider_id = %provider_id,
359 cli_path = %cli_path,
360 auth_login_args = ?auth_login_args,
361 "auth.spawn: launching provider CLI"
362 );
363
364 // Spawn the CLI. kill_on_drop guarantees cleanup if our
365 // task is aborted (cancel path).
366 let mut child = match Command::new(&cli_path)
367 .args(&auth_login_args)
368 .envs(&auth_env)
369 .stdin(Stdio::piped())
370 .stdout(Stdio::piped())
371 .stderr(Stdio::piped())
372 .kill_on_drop(true)
373 .spawn()
374 {
375 Ok(c) => {
376 tracing::info!(
377 session_id = %session_id_for_task,
378 pid = ?c.id(),
379 "auth.spawn: child started"
380 );
381 c
382 }
383 Err(e) => {
384 tracing::error!(
385 session_id = %session_id_for_task,
386 error = %e,
387 "auth.spawn: Command::spawn failed"
388 );
389 mgr_for_task.finish_failure(
390 &session_id_for_task,
391 format!("spawn `{cli_path}` failed: {e}"),
392 );
393 mgr_for_task.detach_process(&session_id_for_task);
394 return;
395 }
396 };
397
398 let stdout = child
399 .stdout
400 .take()
401 .expect("piped stdout");
402 let stderr = child
403 .stderr
404 .take()
405 .expect("piped stderr");
406 let mut stdin = child
407 .stdin
408 .take()
409 .expect("piped stdin");
410
411 // Stdin writer — forwards callback URLs (pasted back by the
412 // user when browser auto-open failed) into the CLI process.
413 let stdin_writer = tokio::spawn(async move {
414 while let Some(line) = stdin_rx.recv().await {
415 if stdin.write_all(line.as_bytes()).await.is_err() {
416 break;
417 }
418 if stdin.write_all(b"\n").await.is_err() {
419 break;
420 }
421 let _ = stdin.flush().await;
422 }
423 });
424
425 // Stdout drain — line-by-line into the session manager. When
426 // we see a login-success pattern, confirm via authCheckCommand
427 // and transition to Success. Don't break the loop — some CLIs
428 // continue printing after the success line.
429 let mgr_stdout = mgr_for_task.clone();
430 let sid_stdout = session_id_for_task.clone();
431 let cli_path_stdout = cli_path_for_check.clone();
432 let check_args_stdout = auth_check_args_for_check.clone();
433 let check_env_stdout = auth_env_for_check.clone();
434 let wstore_stdout = wstore_for_task.clone();
435 let broker_stdout = broker_for_task.clone();
436 let into_bundle_id_stdout = into_bundle_id_for_task.clone();
437 let bundle_dir_stdout = bundle_dir_for_task.clone();
438 let provider_id_stdout = provider_id_for_task.clone();
439 // Shared between drain + post-exit. The drain sets it after
440 // persisting on a LoginSuccess match; the post-exit transition
441 // block (below) checks it and skips its entire success path if
442 // already transitioned — without this guard the drain's
443 // persist + post-exit's persist both ran on every successful
444 // OAuth, producing orphan IdentityAccount rows (each `Uuid::new_v4`)
445 // and duplicate `identitybundlebindings:changed:<id>` publishes.
446 // Reagent P1 on #981.
447 let success_transitioned = Arc::new(AtomicBool::new(false));
448 let success_transitioned_drain = Arc::clone(&success_transitioned);
449 let stdout_drain = tokio::spawn(async move {
450 let mut lines = BufReader::new(stdout).lines();
451 while let Ok(Some(line)) = lines.next_line().await {
452 // NOT logging the raw line — OAuth providers print
453 // auth URLs / callback codes / device codes on stdout
454 // and even debug-level logs would persist those into
455 // ~/.agentmux/logs/. Length-only is enough for "the
456 // CLI emitted something" diagnostics. Reagent P2 on #847.
457 tracing::debug!(session_id = %sid_stdout, bytes = line.len(), "auth.spawn: stdout line");
458 let m = mgr_stdout.record_line(&sid_stdout, &line);
459 if !success_transitioned_drain.load(Ordering::Acquire)
460 && matches!(
461 m,
462 Some(crate::identity::auth_patterns::AuthPatternMatch::LoginSuccess { .. })
463 )
464 {
465 if confirm_authenticated(
466 &cli_path_stdout,
467 &check_args_stdout,
468 &check_env_stdout,
469 )
470 .await
471 {
472 // Persist the OAuthConfigDir binding into the
473 // bundle. When `into_bundle_id` is empty (no
474 // bundle context, e.g. ambient continuation) or
475 // `bundle_dir` failed to resolve at spawn, the
476 // helper skips persistence and the session still
477 // succeeds — the user just won't get bundle-
478 // backed token reuse next launch. Bundle id
479 // returned to the frontend is the real bundle id
480 // when persistence happened, or a synthetic
481 // placeholder otherwise so existing UI keeps
482 // its filter-on-prefix behaviour.
483 let bundle_id = persist_oauth_binding_or_synthetic(
484 &wstore_stdout,
485 &broker_stdout,
486 into_bundle_id_stdout.as_deref(),
487 &provider_id_stdout,
488 bundle_dir_stdout.as_deref(),
489 &sid_stdout,
490 );
491 mgr_stdout.finish_success(&sid_stdout, bundle_id);
492 success_transitioned_drain.store(true, Ordering::Release);
493 }
494 }
495 }
496 });
497
498 // Stderr drain — same matcher path. Some CLIs emit the OAuth
499 // URL on stderr (verbose logging style).
500 let mgr_stderr = mgr_for_task.clone();
501 let sid_stderr = session_id_for_task.clone();
502 let stderr_drain = tokio::spawn(async move {
503 let mut lines = BufReader::new(stderr).lines();
504 while let Ok(Some(line)) = lines.next_line().await {
505 // Same redaction rationale as stdout above.
506 tracing::debug!(session_id = %sid_stderr, bytes = line.len(), "auth.spawn: stderr line");
507 let _ = mgr_stderr.record_line(&sid_stderr, &line);
508 }
509 });
510
511 // Wait for the child to exit (or for our task to be aborted
512 // by cancel_session — in which case kill_on_drop handles the
513 // child).
514 let exit = child.wait().await;
515 tracing::info!(session_id = %session_id_for_task, exit = ?exit, "auth.spawn: child exited");
516
517 // Let stdout/stderr drains catch any final lines.
518 let _ = stdout_drain.await;
519 let _ = stderr_drain.await;
520 // Stdin writer exits when the tx side is dropped (after this
521 // task ends) — abort defensively in case it's blocked on a
522 // pending receive.
523 stdin_writer.abort();
524
525 // Final transition if we haven't already hit Success on the
526 // login-pattern path. Re-run authCheck because some CLIs exit
527 // cleanly without printing a "logged in as" line.
528 //
529 // Skip the entire block when the drain has already transitioned
530 // (LoginSuccess pattern matched + confirm_authenticated + persist
531 // ran). Without this guard, persist_oauth_binding_or_synthetic
532 // would fire a second time on every successful OAuth — a fresh
533 // IdentityAccount UUID would be upserted, `bundle_identity_bind`
534 // would repoint to the new account (orphaning the first), and
535 // the broker would re-publish the bindings-changed event.
536 // Reagent P1 on #981.
537 if !success_transitioned.load(Ordering::Acquire) {
538 match exit {
539 Ok(s) if s.success() => {
540 if confirm_authenticated(
541 &cli_path_for_check,
542 &auth_check_args_for_check,
543 &auth_env_for_check,
544 )
545 .await
546 {
547 let bundle_id = persist_oauth_binding_or_synthetic(
548 &wstore_for_task,
549 &broker_for_task,
550 into_bundle_id_for_task.as_deref(),
551 &provider_id_for_task,
552 bundle_dir_for_task.as_deref(),
553 &session_id_for_task,
554 );
555 mgr_for_task.finish_success(&session_id_for_task, bundle_id);
556 } else {
557 mgr_for_task.finish_failure(
558 &session_id_for_task,
559 "CLI exited 0 but authentication check failed".to_string(),
560 );
561 }
562 }
563 Ok(s) => {
564 mgr_for_task.finish_failure(
565 &session_id_for_task,
566 format!("CLI exited with status {s}"),
567 );
568 }
569 Err(e) => {
570 mgr_for_task.finish_failure(
571 &session_id_for_task,
572 format!("wait error: {e}"),
573 );
574 }
575 }
576 }
577
578 mgr_for_task.detach_process(&session_id_for_task);
579 });
580
581 mgr.attach_process(&session_id, handle, stdin_tx);
582}
583
584/// PTY-backed variant of spawn_auth_cli. Used for providers whose auth
585/// subcommand bails on `isatty()==0` (currently OpenClaw). Mirrors the
586/// pipes path's lifecycle: spawn → drain → confirm → finish_success or
587/// finish_failure → detach. Stdout+stderr are merged on the PTY side;
588/// `record_line` doesn't care which stream a line came from.
589///
590/// Same OAuth-success invariant as the pipes path — when
591/// `into_bundle_id` and `bundle_dir` are both set and `confirm_authenticated`
592/// returns true, persists `SecretRef::OAuthConfigDir` + binding before
593/// `finish_success`.
594#[allow(clippy::too_many_arguments)]
595fn spawn_auth_cli_pty(
596 mgr: Arc<crate::identity::auth_session::AuthSessionManager>,
597 wstore: Arc<WaveStore>,
598 broker: Arc<Broker>,
599 session_id: String,
600 provider_id: String,
601 into_bundle_id: Option<String>,
602 bundle_dir: Option<String>,
603 cli_path: String,
604 auth_login_args: Vec<String>,
605 auth_check_args: Vec<String>,
606 auth_env: std::collections::HashMap<String, String>,
607 stdin_tx: tokio::sync::mpsc::Sender<String>,
608 mut stdin_rx: tokio::sync::mpsc::Receiver<String>,
609) {
610 use portable_pty::{native_pty_system, CommandBuilder, PtySize};
611
612 let mgr_for_task = mgr.clone();
613 let session_id_for_task = session_id.clone();
614 let cli_path_for_check = cli_path.clone();
615 let auth_check_args_for_check = auth_check_args.clone();
616 let auth_env_for_check = auth_env.clone();
617 let wstore_for_task = wstore.clone();
618 let broker_for_task = broker.clone();
619 let into_bundle_id_for_task = into_bundle_id.clone();
620 let bundle_dir_for_task = bundle_dir.clone();
621 let provider_id_for_task = provider_id.clone();
622
623 tracing::info!(
624 session_id = %session_id,
625 provider_id = %provider_id,
626 cli_path = %cli_path,
627 auth_login_args = ?auth_login_args,
628 "auth.spawn (PTY): launching provider CLI"
629 );
630
631 // Allocate the PTY, build the command, spawn the child, and
632 // register the PID — all synchronously, BEFORE the async drain/
633 // wait task and BEFORE `attach_process`. That way no
634 // `cancel_session` racing with the spawn can find a registered
635 // handle but a missing PID; either the PID is registered first
636 // (cancel can kill), or the early-return path fired and there
637 // is no child to kill.
638 let pty_system = native_pty_system();
639 let pair = match pty_system.openpty(PtySize {
640 rows: 24,
641 cols: 80,
642 pixel_width: 0,
643 pixel_height: 0,
644 }) {
645 Ok(p) => p,
646 Err(e) => {
647 // PTY allocation failure means the provider's auth
648 // subprocess can't get the interactive TTY it requires
649 // (openclaw `models auth login`). Surface as the typed
650 // `AMX-AUTH-001` so the FailedBanner shows the friendly
651 // recovery hint instead of "openpty failed: <e>".
652 let mux = agentmux_common::AgentMuxError::AuthRequiresTty {
653 provider: provider_id.clone(),
654 };
655 tracing::error!(
656 target: "amx::error",
657 session_id = %session_id,
658 amx_code = %mux.code(),
659 error = %e,
660 "auth.spawn (PTY): openpty failed"
661 );
662 mgr.finish_failure(&session_id, mux.to_wire().to_string());
663 mgr.detach_process(&session_id);
664 return;
665 }
666 };
667
668 let mut cmd = CommandBuilder::new(&cli_path);
669 for a in &auth_login_args {
670 cmd.arg(a);
671 }
672 for (k, v) in &auth_env {
673 cmd.env(k, v);
674 }
675 if let Ok(cwd) = std::env::current_dir() {
676 cmd.cwd(cwd);
677 }
678
679 let child = match pair.slave.spawn_command(cmd) {
680 Ok(c) => c,
681 Err(e) => {
682 tracing::error!(
683 session_id = %session_id,
684 error = %e,
685 "auth.spawn (PTY): spawn_command failed"
686 );
687 mgr.finish_failure(&session_id, format!("PTY spawn `{cli_path}` failed: {e}"));
688 mgr.detach_process(&session_id);
689 return;
690 }
691 };
692
693 if let Some(pid) = child.process_id() {
694 mgr.attach_pty_pid(&session_id, pid);
695 }
696
697 let reader = match pair.master.try_clone_reader() {
698 Ok(r) => r,
699 Err(e) => {
700 tracing::error!(session_id = %session_id, error = %e, "auth.spawn (PTY): try_clone_reader failed");
701 mgr.finish_failure(&session_id, format!("PTY reader: {e}"));
702 mgr.detach_process(&session_id);
703 return;
704 }
705 };
706 let writer = match pair.master.take_writer() {
707 Ok(w) => w,
708 Err(e) => {
709 tracing::error!(session_id = %session_id, error = %e, "auth.spawn (PTY): take_writer failed");
710 mgr.finish_failure(&session_id, format!("PTY writer: {e}"));
711 mgr.detach_process(&session_id);
712 return;
713 }
714 };
715
716 let handle = tokio::spawn(async move {
717
718 // Stdin writer: forward callback URLs from the manager into
719 // the child's PTY input. portable_pty's master writer is sync;
720 // wrap each write in `block_in_place` so the blocking IO
721 // doesn't starve the tokio reactor when the PTY input buffer
722 // is full.
723 let stdin_writer_handle = tokio::spawn(async move {
724 let mut writer = writer;
725 while let Some(line) = stdin_rx.recv().await {
726 let res = tokio::task::block_in_place(|| {
727 use std::io::Write;
728 writer
729 .write_all(line.as_bytes())
730 .and_then(|_| writer.write_all(b"\n"))
731 .and_then(|_| writer.flush())
732 });
733 if res.is_err() {
734 break;
735 }
736 }
737 });
738
739 // Drain: synchronous line reader from PTY master, feeds into
740 // the same record_line matcher the pipes path uses. Runs in a
741 // blocking thread; sends parsed events through a oneshot when
742 // we hit a login-success pattern (so the async task can run
743 // confirm_authenticated without crossing the blocking thread).
744 let mgr_drain = mgr_for_task.clone();
745 let sid_drain = session_id_for_task.clone();
746 let cli_path_drain = cli_path_for_check.clone();
747 let check_args_drain = auth_check_args_for_check.clone();
748 let check_env_drain = auth_env_for_check.clone();
749 let wstore_drain = wstore_for_task.clone();
750 let broker_drain = broker_for_task.clone();
751 let into_bundle_id_drain = into_bundle_id_for_task.clone();
752 let bundle_dir_drain = bundle_dir_for_task.clone();
753 let provider_id_drain = provider_id_for_task.clone();
754 // Shared with the post-exit fallback block below — same pattern
755 // as the pipes path. Drain's detached `Handle::current().spawn`
756 // task does the persist + finish_success; the drain sets this
757 // atomic BEFORE returning so the outer's check sees it. The
758 // detached task may still be in flight when the outer checks
759 // (PTY drain returns on EOF, the persist task runs async) — that
760 // is fine because the OUTER would only re-persist if the atomic
761 // is false, which it never is once the drain matched the
762 // LoginSuccess pattern. Reagent P1 on #981.
763 let success_transitioned = Arc::new(AtomicBool::new(false));
764 let success_transitioned_drain = Arc::clone(&success_transitioned);
765 let drain_handle = tokio::task::spawn_blocking(move || {
766 use std::io::BufRead;
767 let mut reader = std::io::BufReader::new(reader);
768 let mut line = String::new();
769 // The detached confirm/persist task's JoinHandle, populated
770 // when the drain matches LoginSuccess. Returned to outer so
771 // it can await completion before its post-exit check —
772 // without that wait, outer would race the detached and
773 // either skip too soon (if it sets the atomic eagerly) or
774 // double-persist. Reagent P1 follow-up on #981.
775 let mut maybe_detached: Option<tokio::task::JoinHandle<()>> = None;
776 loop {
777 line.clear();
778 match reader.read_line(&mut line) {
779 Ok(0) => break, // EOF
780 Ok(_) => {
781 // Same redaction policy as the pipes path —
782 // OAuth URLs / codes can be in here.
783 tracing::debug!(session_id = %sid_drain, bytes = line.len(), "auth.spawn (PTY): line");
784 let m = mgr_drain.record_line(&sid_drain, &line);
785 if maybe_detached.is_none()
786 && matches!(
787 m,
788 Some(crate::identity::auth_patterns::AuthPatternMatch::LoginSuccess { .. })
789 )
790 {
791 // Hand off to async to call confirm_authenticated.
792 // The detached task OWNS the transition —
793 // it calls finish_success on confirm-OK and
794 // finish_failure on confirm-NOT-OK, AND
795 // sets the shared atomic in BOTH cases. The
796 // drain only captures the JoinHandle here
797 // and returns it; outer awaits both drain
798 // and the inner handle before its check, so
799 // the atomic is correctly settled by then.
800 let cli = cli_path_drain.clone();
801 let args = check_args_drain.clone();
802 let env = check_env_drain.clone();
803 let mgr2 = mgr_drain.clone();
804 let sid2 = sid_drain.clone();
805 let wstore2 = wstore_drain.clone();
806 let broker2 = broker_drain.clone();
807 let into_bundle_id2 = into_bundle_id_drain.clone();
808 let bundle_dir2 = bundle_dir_drain.clone();
809 let provider_id2 = provider_id_drain.clone();
810 let success_for_detached = Arc::clone(&success_transitioned_drain);
811 let handle = tokio::runtime::Handle::current().spawn(async move {
812 if confirm_authenticated(&cli, &args, &env).await {
813 let bundle_id = persist_oauth_binding_or_synthetic(
814 &wstore2,
815 &broker2,
816 into_bundle_id2.as_deref(),
817 &provider_id2,
818 bundle_dir2.as_deref(),
819 &sid2,
820 );
821 mgr2.finish_success(&sid2, bundle_id);
822 // Atomic set ONLY on confirm-success
823 // — mirrors the pipes path. On
824 // confirm-miss the detached does
825 // nothing (atomic stays false),
826 // outer's post-exit fallback gets
827 // another shot at confirm by the
828 // time creds are fully written.
829 // Outer awaits this handle first,
830 // so the atomic is correctly
831 // reflected by the time it checks.
832 // codex P1 follow-up on #981.
833 success_for_detached.store(true, Ordering::Release);
834 }
835 });
836 maybe_detached = Some(handle);
837 }
838 }
839 Err(_) => break,
840 }
841 }
842 maybe_detached
843 });
844
845 // Wait for the child in a blocking task. pair (master + slave)
846 // moves into the closure so its destructor runs AFTER
847 // child.wait() — ConPTY contract on Windows.
848 let wait_handle = tokio::task::spawn_blocking(move || {
849 let mut child = child;
850 let exit = child.wait();
851 drop(pair);
852 exit
853 });
854
855 let exit = wait_handle.await;
856 tracing::info!(session_id = %session_id_for_task, exit = ?exit, "auth.spawn (PTY): child exited");
857
858 // Await the drain itself, THEN its optional detached
859 // confirm/persist task. Both have to complete before the
860 // post-exit check — otherwise outer might see atomic=false
861 // (detached still confirming), run its fallback persist, AND
862 // the detached's persist also runs → the original double-
863 // persist race re-opens. Reagent P1 follow-up on #981.
864 let detached = drain_handle.await.ok().flatten();
865 if let Some(h) = detached {
866 let _ = h.await;
867 }
868 stdin_writer_handle.abort();
869
870 // Final transition fallback — some CLIs exit cleanly without
871 // emitting a login-success line that record_line recognizes.
872 //
873 // Skip the whole block when the drain already transitioned —
874 // same double-persist guard as the pipes path. Reagent P1 on
875 // #981.
876 if !success_transitioned.load(Ordering::Acquire) {
877 match exit {
878 Ok(Ok(s)) if s.success() => {
879 if confirm_authenticated(
880 &cli_path_for_check,
881 &auth_check_args_for_check,
882 &auth_env_for_check,
883 )
884 .await
885 {
886 let bundle_id = persist_oauth_binding_or_synthetic(
887 &wstore_for_task,
888 &broker_for_task,
889 into_bundle_id_for_task.as_deref(),
890 &provider_id_for_task,
891 bundle_dir_for_task.as_deref(),
892 &session_id_for_task,
893 );
894 mgr_for_task.finish_success(&session_id_for_task, bundle_id);
895 } else {
896 mgr_for_task.finish_failure(
897 &session_id_for_task,
898 "CLI exited cleanly but auth-check still failed".to_string(),
899 );
900 }
901 }
902 Ok(Ok(s)) => {
903 mgr_for_task.finish_failure(
904 &session_id_for_task,
905 format!("CLI exited with code {:?}", s.exit_code()),
906 );
907 }
908 Ok(Err(e)) => {
909 mgr_for_task.finish_failure(
910 &session_id_for_task,
911 format!("PTY wait error: {e}"),
912 );
913 }
914 Err(e) => {
915 mgr_for_task.finish_failure(
916 &session_id_for_task,
917 format!("PTY wait task join error: {e}"),
918 );
919 }
920 }
921 }
922
923 mgr_for_task.detach_process(&session_id_for_task);
924 });
925
926 mgr.attach_process(&session_id, handle, stdin_tx);
927}
928
929/// Compute the per-bundle OAuth config dir + ensure it exists +
930/// override the provider's `auth_config_dir_env_var` entry in `auth_env`.
931///
932/// Returns `Some(<absolute path string>)` when:
933/// - `into_bundle_id` is `Some` and non-empty AND
934/// - the provider is registered in the CLI provider registry AND
935/// - the provider declares an `auth_config_dir_env_var` (oauth-class
936/// providers — claude / codex / openclaw — per spec §4.3) AND
937/// - `DataPaths::from_env()` resolves AND
938/// - `create_dir_all` succeeds.
939///
940/// Otherwise returns `None` and leaves `auth_env` untouched. Callers
941/// must continue without per-bundle isolation (legacy ambient path,
942/// or skip the binding-persist step) — never abort the OAuth start.
943///
944/// Per `SPEC_OAUTH_IDENTITY_BUNDLES_2026_05_22.md` §4.5: the dir
945/// (and the env-var key) come from the CLI provider registry
946/// (`backend::providers::get_provider(id)`) so the resolver / spawn
947/// path / OAuth-start handler never drift on which env var redirects
948/// each CLI's config home. The single source of truth lives in
949/// `agentmux-srv/src/backend/providers.rs`.
950fn compute_and_ensure_bundle_dir(
951 into_bundle_id: Option<&str>,
952 provider_id: &str,
953 auth_env: &mut std::collections::HashMap<String, String>,
954) -> Option<String> {
955 let bundle_id = into_bundle_id.filter(|s| !s.is_empty())?;
956 // Gate on provider_class so api-key-class providers (which have a
957 // registry entry with a non-empty `auth_config_dir_env_var` —
958 // e.g. kimi's `KIMI_SHARE_DIR`) never go through the per-bundle
959 // OAuth-dir path. Only claude / codex / openclaw — the spec §4.3
960 // oauth-class providers — get the per-bundle override.
961 match crate::identity::resolver::provider_class(provider_id) {
962 Some(crate::identity::resolver::ProviderClass::OAuth { .. }) => {}
963 _ => return None,
964 }
965 let provider = match get_provider(provider_id) {
966 Some(p) => p,
967 None => {
968 // OAuth-class per provider_class but missing from the CLI
969 // registry — should be impossible (resolver reads the env
970 // var from the registry itself), but treat as a soft fail.
971 tracing::warn!(
972 target: "identity",
973 provider_id,
974 "auth.start: oauth-class provider not in registry — skipping per-bundle dir override"
975 );
976 return None;
977 }
978 };
979 // Empty env-var name → no isolation possible (oauth-class providers
980 // should never have this empty per spec, but belt-and-braces).
981 if provider.auth_config_dir_env_var.is_empty() {
982 return None;
983 }
984 let paths = match agentmux_common::DataPaths::from_env() {
985 Some(p) => p,
986 None => {
987 tracing::warn!(
988 target: "identity",
989 provider_id,
990 bundle_id,
991 "auth.start: DataPaths::from_env() returned None — skipping per-bundle dir override"
992 );
993 return None;
994 }
995 };
996 // identity_dir rejects unsafe path segments (empty / `.` / `..` /
997 // segment with `/`, `\`, drive-letter, …). bundle_id comes from
998 // the auth.start request body, so this guard prevents a crafted
999 // id from escaping the identities root.
1000 let bundle_root = match paths.identity_dir(bundle_id) {
1001 Some(p) => p,
1002 None => {
1003 tracing::warn!(
1004 target: "identity",
1005 provider_id,
1006 bundle_id,
1007 "auth.start: bundle_id is not a safe path segment — skipping per-bundle dir override"
1008 );
1009 return None;
1010 }
1011 };
1012 let dir = bundle_root.join(provider.auth_dir_name);
1013 if let Err(e) = std::fs::create_dir_all(&dir) {
1014 tracing::warn!(
1015 target: "identity",
1016 provider_id,
1017 bundle_id,
1018 error = %e,
1019 path = %dir.display(),
1020 "auth.start: failed to create per-bundle config dir — skipping override"
1021 );
1022 return None;
1023 }
1024 let dir_str = dir.to_string_lossy().to_string();
1025 // Override (or insert) the provider's config-dir env var. The
1026 // frontend may have computed the legacy ambient dir via
1027 // `ensureAuthDir(providerId)` and put it here under the same key;
1028 // we replace it with the per-bundle dir so the OAuth CLI writes
1029 // its tokens inside the bundle, not in the ambient version-config
1030 // dir.
1031 auth_env.insert(
1032 provider.auth_config_dir_env_var.to_string(),
1033 dir_str.clone(),
1034 );
1035 tracing::info!(
1036 target: "identity",
1037 provider_id,
1038 bundle_id,
1039 env_var = provider.auth_config_dir_env_var,
1040 dir = %dir.display(),
1041 "auth.start: per-bundle OAuth config dir wired"
1042 );
1043 Some(dir_str)
1044}
1045
1046/// On a successful OAuth handshake (CLI exited 0 + authCheckCommand
1047/// confirmed), persist the OAuth binding into the bundle and return
1048/// the real bundle id to use in the `Success` wire status.
1049///
1050/// "Persist the binding" =
1051/// 1. Upsert an `IdentityAccount` with:
1052/// - provider = `<provider_id>`
1053/// - kind = "oauth"
1054/// - secret_ref = `SecretRef::OAuthConfigDir { dir }`
1055/// - status = "valid"
1056/// 2. Bind it via `bundle_identity_bind(bundle_id, provider, account_id)`.
1057/// 3. Publish `identitybundlebindings:changed:<bundle_id>` so the
1058/// Launch modal (and any other open Identity pane) re-fetches
1059/// the new binding and the launch button enables without a
1060/// manual refresh.
1061///
1062/// Per-binding errors (account upsert, bind, broker publish) are
1063/// logged + downgraded — the success transition still fires with the
1064/// real bundle id when possible, and falls back to the legacy
1065/// `pending-bundle-for-<sid>` synthetic when not. The OAuth CLI's
1066/// tokens are already on disk inside `bundle_dir` either way; the
1067/// resolver layer (PR B) just won't find a binding to point at them.
1068/// Same shape as `inject_identity_env`'s "log + skip, never abort".
1069///
1070/// When `into_bundle_id` is empty or `bundle_dir` is `None`, returns
1071/// the synthetic placeholder unchanged — the legacy ambient path
1072/// (PR A behaviour) is preserved.
1073fn persist_oauth_binding_or_synthetic(
1074 wstore: &Arc<WaveStore>,
1075 broker: &Arc<Broker>,
1076 into_bundle_id: Option<&str>,
1077 provider_id: &str,
1078 bundle_dir: Option<&str>,
1079 session_id: &str,
1080) -> String {
1081 let synthetic = || format!("pending-bundle-for-{session_id}");
1082 let bundle_id = match into_bundle_id.filter(|s| !s.is_empty()) {
1083 Some(b) => b,
1084 None => return synthetic(),
1085 };
1086 let dir = match bundle_dir.filter(|s| !s.is_empty()) {
1087 Some(d) => d,
1088 None => {
1089 tracing::warn!(
1090 target: "identity",
1091 bundle_id,
1092 provider_id,
1093 "auth success: bundle_dir unresolved — skipping binding persist"
1094 );
1095 return synthetic();
1096 }
1097 };
1098 let now = SystemTime::now()
1099 .duration_since(UNIX_EPOCH)
1100 .map(|d| d.as_millis() as i64)
1101 .unwrap_or(0);
1102 // Re-OAuth (token expiry / Reconnect): load any existing binding
1103 // for this (bundle, provider) and reuse its `account_id` so the
1104 // upsert UPDATES the prior IdentityAccount in place instead of
1105 // creating a fresh UUID + orphaning the old row in
1106 // `db_identity_accounts`. Fresh UUID only on first bind. codex
1107 // P2 follow-up on #981.
1108 let account_id = wstore
1109 .bundle_identity_bindings(bundle_id)
1110 .ok()
1111 .into_iter()
1112 .flatten()
1113 .find(|b| b.provider == provider_id)
1114 .map(|b| b.account_id)
1115 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1116 let account = IdentityAccount {
1117 id: account_id.clone(),
1118 name: format!("{provider_id}-oauth"),
1119 provider: provider_id.to_string(),
1120 kind: "oauth".to_string(),
1121 display_name: String::new(),
1122 secret_ref: SecretRef::OAuthConfigDir { dir: dir.to_string() },
1123 context: serde_json::json!({}),
1124 // Per spec §4.4: a binding the user JUST OAuth'd into is `valid`
1125 // by definition — the token file was written within the past
1126 // few seconds. The resolver's expiry probe (PR D) refines this
1127 // on every spawn.
1128 status: crate::identity::resolver::oauth_status::VALID.to_string(),
1129 created_at: now,
1130 updated_at: now,
1131 };
1132 if let Err(e) = wstore.identity_upsert(&account) {
1133 tracing::warn!(
1134 target: "identity",
1135 bundle_id,
1136 provider_id,
1137 error = %e,
1138 "auth success: identity_upsert failed — falling back to synthetic bundle id"
1139 );
1140 return synthetic();
1141 }
1142 if let Err(e) = wstore.bundle_identity_bind(bundle_id, provider_id, &account_id) {
1143 tracing::warn!(
1144 target: "identity",
1145 bundle_id,
1146 provider_id,
1147 account_id,
1148 error = %e,
1149 "auth success: bundle_identity_bind failed — account row persisted but no binding"
1150 );
1151 return synthetic();
1152 }
1153 // Broker push so the frontend's `identitybundlebindings:changed:<id>`
1154 // listener (AgentLaunchModal createEffect) refetches bindings and
1155 // flips `hasMatchingBinding` → true without a manual reload.
1156 broker.publish(crate::backend::wps::WaveEvent {
1157 event: format!("identitybundlebindings:changed:{bundle_id}"),
1158 scopes: vec![],
1159 sender: String::new(),
1160 persist: 0,
1161 data: None,
1162 });
1163 tracing::info!(
1164 target: "identity",
1165 bundle_id,
1166 provider_id,
1167 account_id,
1168 dir,
1169 "auth success: OAuth binding persisted"
1170 );
1171 bundle_id.to_string()
1172}
1173
1174/// Run the provider's auth-check subcommand and return true if it
1175/// exits 0. Failure modes (binary missing, network error, etc.) are
1176/// all treated as "not authenticated" — the caller will then either
1177/// keep waiting (drain task loop) or transition to Failed (exit
1178/// fallback).
1179async fn confirm_authenticated(
1180 cli_path: &str,
1181 args: &[String],
1182 env: &std::collections::HashMap<String, String>,
1183) -> bool {
1184 use std::process::Stdio;
1185 use tokio::process::Command;
1186 match Command::new(cli_path)
1187 .args(args)
1188 .envs(env)
1189 .stdin(Stdio::null())
1190 .stdout(Stdio::null())
1191 .stderr(Stdio::null())
1192 .kill_on_drop(true)
1193 .status()
1194 .await
1195 {
1196 Ok(s) => s.success(),
1197 Err(_) => false,
1198 }
1199}
1200
1201#[cfg(test)]
1202mod tests {
1203 use super::*;
1204 use crate::backend::storage::wstore::Identity;
1205
1206 // Request shape parsing tests — verify the wire contract matches
1207 // what the frontend (PR B) will send. The end-to-end RPC-engine
1208 // invocation is covered at the integration level once the CLI
1209 // spawn lands; the underlying AuthSessionManager behaviour is
1210 // covered in `identity::auth_session::tests`.
1211
1212 #[test]
1213 fn start_req_parses_minimal() {
1214 let v = serde_json::json!({
1215 "providerId": "claude",
1216 "cliPath": "/usr/bin/claude",
1217 "authLoginArgs": ["login"],
1218 "authCheckArgs": ["whoami"]
1219 });
1220 let r: StartProviderAuthReq = serde_json::from_value(v).unwrap();
1221 assert_eq!(r.provider_id, "claude");
1222 assert_eq!(r.cli_path, "/usr/bin/claude");
1223 assert_eq!(r.auth_login_args, vec!["login"]);
1224 assert_eq!(r.auth_check_args, vec!["whoami"]);
1225 assert!(r.into_bundle_id.is_none());
1226 assert!(r.auth_env.is_empty());
1227 }
1228
1229 #[test]
1230 fn start_req_parses_with_bundle_id() {
1231 let v = serde_json::json!({
1232 "providerId": "codex",
1233 "cliPath": "/usr/bin/codex",
1234 "authLoginArgs": ["auth", "login"],
1235 "authCheckArgs": ["auth", "status"],
1236 "authEnv": { "FOO": "bar" },
1237 "intoBundleId": "bundle-1"
1238 });
1239 let r: StartProviderAuthReq = serde_json::from_value(v).unwrap();
1240 assert_eq!(r.provider_id, "codex");
1241 assert_eq!(r.into_bundle_id.as_deref(), Some("bundle-1"));
1242 assert_eq!(r.auth_env.get("FOO").map(String::as_str), Some("bar"));
1243 }
1244
1245 #[test]
1246 fn poll_req_round_trips() {
1247 let v = serde_json::json!({ "sessionId": "auth-xyz" });
1248 let r: PollProviderAuthReq = serde_json::from_value(v).unwrap();
1249 assert_eq!(r.session_id, "auth-xyz");
1250 }
1251
1252 #[test]
1253 fn submit_callback_req_round_trips() {
1254 let v = serde_json::json!({
1255 "sessionId": "auth-xyz",
1256 "callbackUrl": "https://example.com/cb?code=abc"
1257 });
1258 let r: SubmitAuthCallbackReq = serde_json::from_value(v).unwrap();
1259 assert_eq!(r.session_id, "auth-xyz");
1260 assert!(r.callback_url.contains("code=abc"));
1261 }
1262
1263 #[test]
1264 fn submit_api_key_req_round_trips() {
1265 let v = serde_json::json!({
1266 "providerId": "openclaw",
1267 "apiKey": "sk-test",
1268 "accountName": "my-key"
1269 });
1270 let r: SubmitProviderApiKeyReq = serde_json::from_value(v).unwrap();
1271 assert_eq!(r.provider_id, "openclaw");
1272 assert_eq!(r.api_key, "sk-test");
1273 assert_eq!(r.account_name, "my-key");
1274 assert!(r.into_bundle_id.is_none());
1275 }
1276
1277 #[test]
1278 fn ack_resp_omits_error_when_success() {
1279 let r = AckResp { success: true, error: None };
1280 let v = serde_json::to_value(&r).unwrap();
1281 assert_eq!(v, serde_json::json!({ "success": true }));
1282 }
1283
1284 #[test]
1285 fn ack_resp_includes_error_when_failure() {
1286 let r = AckResp { success: false, error: Some("boom".into()) };
1287 let v = serde_json::to_value(&r).unwrap();
1288 assert_eq!(v, serde_json::json!({ "success": false, "error": "boom" }));
1289 }
1290
1291 // ── OAuth Bundles PR C invariant ──────────────────────────────
1292 //
1293 // Round-trip: spawn-dir computation → persist OAuth binding →
1294 // bundle row carries a `SecretRef::OAuthConfigDir` pointing at
1295 // exactly that dir, with status = "valid".
1296
1297 #[test]
1298 fn persist_oauth_binding_round_trip() {
1299 // Per spec §4.5: on auth success against bundle <id> for
1300 // provider <P>, the handler must
1301 // 1. compute the dir = DataPaths::identity_dir(<id>).join(P.auth_dir_name)
1302 // 2. set the provider's auth_config_dir_env_var to that dir
1303 // in the spawn env (so the CLI writes tokens there, not
1304 // at ~/.<P>/),
1305 // 3. on confirm, upsert an `IdentityAccount` whose
1306 // `secret_ref` is `SecretRef::OAuthConfigDir { dir }`
1307 // and `status = "valid"`,
1308 // 4. bind it via `bundle_identity_bind`.
1309 //
1310 // Together those three facts let the next launch's
1311 // `inject_identity_env` find the OAuth account, read its
1312 // OAuthConfigDir pointer, and inject CLAUDE_CONFIG_DIR with
1313 // the same path — closing the bundle loop.
1314
1315 // Use a tempdir as the agentmux home so DataPaths resolves
1316 // without depending on the user's real ~/.agentmux. Local
1317 // mutex so two env-var-touching tests in this module can't
1318 // race (agentmux-common's TEST_ENV_LOCK is `pub(crate)`,
1319 // not reachable from here).
1320 static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
1321 let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
1322 let tmp = tempfile::TempDir::new().unwrap();
1323 std::env::set_var("AGENTMUX_HOME_OVERRIDE", tmp.path());
1324 // DataPaths::from_env() wants every AGENTMUX_*_DIR — easiest
1325 // to compute paths once and export them.
1326 let paths = agentmux_common::DataPaths::resolve(
1327 "0.0.0-test",
1328 &agentmux_common::RuntimeMode::Installed,
1329 )
1330 .unwrap();
1331 paths.ensure_dirs().unwrap();
1332 for (k, v) in paths.to_env_vars() {
1333 std::env::set_var(k, v);
1334 }
1335
1336 let wstore = Arc::new(WaveStore::open_in_memory().unwrap());
1337 let broker = Arc::new(crate::backend::wps::Broker::new());
1338
1339 // Seed the bundle that the OAuth flow targets (PR 1 #969
1340 // creates this row up-front when the user names the new
1341 // identity; we mirror that here).
1342 let bundle_id = "id-oauth-pr-c";
1343 let identity = Identity {
1344 id: bundle_id.to_string(),
1345 name: "Work".to_string(),
1346 description: String::new(),
1347 is_blank: false,
1348 created_at: 0,
1349 updated_at: 0,
1350 };
1351 wstore.bundle_identity_upsert(&identity).unwrap();
1352
1353 // Step 1+2: compute and ensure dir, inject env var.
1354 let mut auth_env: std::collections::HashMap<String, String> =
1355 std::collections::HashMap::new();
1356 // Simulate the frontend's legacy `ensureAuthDir` putting an
1357 // ambient dir in the env first — the function MUST override
1358 // it with the per-bundle dir.
1359 auth_env.insert("CLAUDE_CONFIG_DIR".to_string(), "/legacy/ambient/claude".to_string());
1360 let dir = compute_and_ensure_bundle_dir(
1361 Some(bundle_id),
1362 "claude",
1363 &mut auth_env,
1364 )
1365 .expect("oauth-class provider with bundle id must yield a dir");
1366
1367 // Dir is `DataPaths::identity_dir(<id>).join(<auth_dir_name>)`
1368 // — claude's `auth_dir_name` is "claude" per the registry.
1369 let expected = paths
1370 .identity_dir(bundle_id)
1371 .expect("test bundle_id is a safe segment")
1372 .join("claude");
1373 assert_eq!(
1374 std::path::Path::new(&dir),
1375 expected,
1376 "per-bundle dir must equal DataPaths::identity_dir(id).join(auth_dir_name)"
1377 );
1378 // create_dir_all happened.
1379 assert!(expected.is_dir(), "dir should be created idempotently");
1380 // Env var was OVERRIDDEN with the per-bundle dir — registry-
1381 // sourced key (CLAUDE_CONFIG_DIR for claude).
1382 assert_eq!(
1383 auth_env.get("CLAUDE_CONFIG_DIR").map(String::as_str),
1384 Some(dir.as_str()),
1385 "auth_config_dir_env_var must point at the per-bundle dir, NOT the legacy ambient one",
1386 );
1387
1388 // Step 3+4: simulate the post-confirm_authenticated() success
1389 // path. The function returns the REAL bundle id (not the
1390 // synthetic placeholder) and persists the account + binding.
1391 let returned = persist_oauth_binding_or_synthetic(
1392 &wstore,
1393 &broker,
1394 Some(bundle_id),
1395 "claude",
1396 Some(&dir),
1397 "auth-sess-test",
1398 );
1399 assert_eq!(
1400 returned, bundle_id,
1401 "success path must return the bundle id, not the synthetic placeholder"
1402 );
1403
1404 // Binding row exists for (bundle, claude).
1405 let bindings = wstore.bundle_identity_bindings(bundle_id).unwrap();
1406 assert_eq!(bindings.len(), 1, "exactly one binding after one auth success");
1407 assert_eq!(bindings[0].identity_id, bundle_id);
1408 assert_eq!(bindings[0].provider, "claude");
1409
1410 // Account row carries SecretRef::OAuthConfigDir { dir } —
1411 // the same dir we passed at spawn — and status = "valid".
1412 let acct = wstore
1413 .identity_get(&bindings[0].account_id)
1414 .unwrap()
1415 .expect("account row exists");
1416 assert_eq!(acct.provider, "claude");
1417 assert_eq!(acct.kind, "oauth");
1418 assert_eq!(acct.status, "valid");
1419 match acct.secret_ref {
1420 SecretRef::OAuthConfigDir { dir: persisted_dir } => {
1421 assert_eq!(
1422 persisted_dir, dir,
1423 "persisted OAuthConfigDir.dir must equal the spawn dir"
1424 );
1425 }
1426 other => panic!("expected OAuthConfigDir, got {other:?}"),
1427 }
1428
1429 // Cleanup env so other tests don't inherit our overrides.
1430 std::env::remove_var("AGENTMUX_HOME_OVERRIDE");
1431 for (k, _) in paths.to_env_vars() {
1432 std::env::remove_var(k);
1433 }
1434 }
1435
1436 #[test]
1437 fn compute_dir_skipped_for_empty_bundle_id() {
1438 // Ambient launch (empty into_bundle_id) — no per-bundle dir
1439 // override, legacy auth_env left intact.
1440 let mut env = std::collections::HashMap::new();
1441 env.insert("CLAUDE_CONFIG_DIR".to_string(), "/legacy/ambient".to_string());
1442 let dir = compute_and_ensure_bundle_dir(None, "claude", &mut env);
1443 assert!(dir.is_none());
1444 assert_eq!(
1445 env.get("CLAUDE_CONFIG_DIR").map(String::as_str),
1446 Some("/legacy/ambient"),
1447 "legacy ambient env must survive when no bundle id"
1448 );
1449
1450 let mut env = std::collections::HashMap::new();
1451 let dir = compute_and_ensure_bundle_dir(Some(""), "claude", &mut env);
1452 assert!(dir.is_none(), "empty string bundle id is the same as None");
1453 assert!(env.is_empty());
1454 }
1455
1456 #[test]
1457 fn compute_dir_skipped_for_api_key_provider() {
1458 // Api-key-class providers must NOT go through the OAuth
1459 // per-bundle dir override even when their registry entry
1460 // declares an `auth_config_dir_env_var` (kimi has
1461 // `KIMI_SHARE_DIR` so the registry test alone isn't enough —
1462 // the gate has to be provider_class).
1463 let mut env = std::collections::HashMap::new();
1464 let dir = compute_and_ensure_bundle_dir(Some("id-1"), "kimi", &mut env);
1465 assert!(dir.is_none(), "api-key provider class must skip the OAuth dir path");
1466 assert!(
1467 env.get("KIMI_SHARE_DIR").is_none(),
1468 "api-key providers must NEVER get their config-dir env var set by the OAuth-start path"
1469 );
1470 // Same for github (no registry entry at all).
1471 let mut env = std::collections::HashMap::new();
1472 let dir = compute_and_ensure_bundle_dir(Some("id-1"), "github", &mut env);
1473 assert!(dir.is_none());
1474 assert!(env.get("GITHUB_TOKEN").is_none());
1475 }
1476
1477 #[test]
1478 fn persist_returns_synthetic_when_no_bundle_id() {
1479 let wstore = Arc::new(WaveStore::open_in_memory().unwrap());
1480 let broker = Arc::new(crate::backend::wps::Broker::new());
1481 let r = persist_oauth_binding_or_synthetic(
1482 &wstore,
1483 &broker,
1484 None,
1485 "claude",
1486 Some("/some/dir"),
1487 "sess-x",
1488 );
1489 assert_eq!(r, "pending-bundle-for-sess-x");
1490 }
1491
1492 #[test]
1493 fn persist_returns_synthetic_when_no_bundle_dir() {
1494 let wstore = Arc::new(WaveStore::open_in_memory().unwrap());
1495 let broker = Arc::new(crate::backend::wps::Broker::new());
1496 let r = persist_oauth_binding_or_synthetic(
1497 &wstore,
1498 &broker,
1499 Some("id-1"),
1500 "claude",
1501 None,
1502 "sess-y",
1503 );
1504 assert_eq!(
1505 r, "pending-bundle-for-sess-y",
1506 "no bundle dir → no persistence → synthetic id"
1507 );
1508 }
1509}