agentmux_srv\server/
identity_handlers.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Pre-launch OAuth flow RPC handlers.
5//!
6//! Five commands per `docs/specs/SPEC_PRE_LAUNCH_OAUTH_FLOW_2026_05_14.md` §7:
7//!
8//!   * `auth.start`           — `StartProviderAuth`
9//!   * `auth.poll`            — `PollProviderAuth`
10//!   * `auth.submitcallback`  — `SubmitAuthCallback`
11//!   * `auth.cancel`          — `CancelProviderAuth`
12//!   * `auth.submitapikey`    — `SubmitProviderApiKey`
13//!
//! This module owns the RPC surface and dispatches into
//! `crate::identity::auth_session::AuthSessionManager`. `auth.start`
//! spawns the provider CLI in the background (`spawn_auth_cli`, or
//! `spawn_auth_cli_pty` for providers that require a TTY) and feeds
//! its output lines into the session via `record_line`;
//! `auth.submitcallback` forwards the pasted redirect URL to the
//! CLI's stdin. Only `auth.submitapikey` is still a stub — it returns
//! an explicit "bundle persistence lands in PR C" error so the
//! frontend (PR B) can integrate against the real shape.
21
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::time::{SystemTime, UNIX_EPOCH};
25
26use serde::{Deserialize, Serialize};
27
28use crate::backend::providers::get_provider;
29use crate::backend::rpc::engine::WshRpcEngine;
30use crate::backend::storage::wstore::{IdentityAccount, SecretRef, WaveStore};
31use crate::backend::wps::Broker;
32
33use super::AppState;
34
/// RPC command name for `StartProviderAuth`: begin a provider auth session.
pub const COMMAND_AUTH_START: &str = "auth.start";
/// RPC command name for `PollProviderAuth`: fetch a session snapshot.
pub const COMMAND_AUTH_POLL: &str = "auth.poll";
/// RPC command name for `SubmitAuthCallback`: forward a pasted redirect
/// URL to the running provider CLI.
pub const COMMAND_AUTH_SUBMIT_CALLBACK: &str = "auth.submitcallback";
/// RPC command name for `CancelProviderAuth`: cancel an in-flight session.
pub const COMMAND_AUTH_CANCEL: &str = "auth.cancel";
/// RPC command name for `SubmitProviderApiKey`: submit a pasted API key.
pub const COMMAND_AUTH_SUBMIT_API_KEY: &str = "auth.submitapikey";
40
41#[derive(Debug, Deserialize)]
42#[serde(rename_all = "camelCase")]
43struct StartProviderAuthReq {
44    provider_id: String,
45    /// Optional — when set, a successful auth adds an account to the
46    /// existing bundle. When None, a fresh bundle is created.
47    #[serde(default)]
48    into_bundle_id: Option<String>,
49    /// Resolved CLI path. The frontend calls `resolvecli` first to
50    /// install / locate the provider's CLI; the resulting path is
51    /// passed here. Keeps the provider table single-sourced in the
52    /// frontend.
53    cli_path: String,
54    /// e.g. `["auth", "login"]` from the provider definition's
55    /// `authLoginCommand` field.
56    auth_login_args: Vec<String>,
57    /// e.g. `["auth", "status", "--json"]` — used to confirm
58    /// authentication after the CLI emits its success line.
59    auth_check_args: Vec<String>,
60    /// Env vars to inject at spawn time. Per-provider auth
61    /// isolation env vars come here (CLAUDE_CONFIG_DIR, CODEX_HOME,
62    /// GEMINI_CLI_HOME, etc.).
63    #[serde(default)]
64    auth_env: std::collections::HashMap<String, String>,
65    /// Spawn the auth login subprocess under a PTY (instead of plain
66    /// piped stdio). Required by providers whose auth subcommand
67    /// checks `isatty()` and refuses to run otherwise (currently
68    /// OpenClaw's `models auth login --provider <id>`). The flag is
69    /// forwarded down to the CEF host's `run_cli_login` which picks
70    /// the PTY branch when set.
71    #[serde(default)]
72    requires_tty: bool,
73}
74
75#[derive(Debug, Deserialize)]
76#[serde(rename_all = "camelCase")]
77struct PollProviderAuthReq {
78    session_id: String,
79}
80
81#[derive(Debug, Deserialize)]
82#[serde(rename_all = "camelCase")]
83struct SubmitAuthCallbackReq {
84    session_id: String,
85    /// The redirect URL the user pasted back (browser-didn't-open
86    /// fallback). The handler writes it to the spawned CLI's stdin.
87    callback_url: String,
88}
89
90#[derive(Debug, Deserialize)]
91#[serde(rename_all = "camelCase")]
92struct CancelProviderAuthReq {
93    session_id: String,
94}
95
96#[derive(Debug, Deserialize)]
97#[serde(rename_all = "camelCase")]
98struct SubmitProviderApiKeyReq {
99    provider_id: String,
100    /// Optional — same semantics as StartProviderAuth.into_bundle_id.
101    #[serde(default)]
102    into_bundle_id: Option<String>,
103    /// The API key the user pasted. Will be validated by running the
104    /// provider's authCheckCommand against it, then persisted as a
105    /// `SecretRef::PlaintextDev` (PR 1 of the storage spec) or
106    /// encrypted variant (PR 2).
107    api_key: String,
108    account_name: String,
109}
110
111#[derive(Debug, Serialize)]
112#[serde(rename_all = "camelCase")]
113struct AckResp {
114    success: bool,
115    #[serde(skip_serializing_if = "Option::is_none")]
116    error: Option<String>,
117}
118
119pub fn register_identity_handlers(engine: &Arc<WshRpcEngine>, state: &AppState) {
120    let mgr = state.auth_session_manager.clone();
121    let wstore = state.wstore.clone();
122    let broker = state.broker.clone();
123    engine.register_handler(
124        COMMAND_AUTH_START,
125        Box::new(move |data, _ctx| {
126            let mgr = mgr.clone();
127            let wstore = wstore.clone();
128            let broker = broker.clone();
129            Box::pin(async move {
130                let req: StartProviderAuthReq = serde_json::from_value(data)
131                    .map_err(|e| format!("auth.start: {e}"))?;
132                tracing::info!(
133                    provider_id = %req.provider_id,
134                    cli_path = %req.cli_path,
135                    into_bundle_id = ?req.into_bundle_id,
136                    "auth.start"
137                );
138                // OAuth Bundles PR C invariant — when an OAuth flow runs
139                // against a bundle, the CLI's OAuth tokens land INSIDE the
140                // bundle's per-provider config dir, NOT ambient at
141                // `~/.claude/`. Compute the dir from the registry
142                // (single source of truth — `auth_dir_name` per provider)
143                // and the per-bundle root from `DataPaths::identity_dir`.
144                // Mirror the dir into `auth_env` under the provider's
145                // `auth_config_dir_env_var` (e.g. `CLAUDE_CONFIG_DIR`).
146                // This overrides whatever the frontend computed via the
147                // legacy ambient `ensureAuthDir` path so a bundle-bound
148                // launch never accidentally writes to the global dir.
149                //
150                // When `into_bundle_id` is empty (ambient launch — no
151                // bundle context), keep the legacy `auth_env` exactly so
152                // the existing ambient flow keeps working.
153                //
154                // Errors (path resolve, mkdir) log + fall back to the
155                // legacy env — never abort `auth.start` over a per-bundle
156                // dir issue. Mirrors the `inject_identity_env` pattern.
157                let mut auth_env = req.auth_env;
158                let bundle_dir = compute_and_ensure_bundle_dir(
159                    req.into_bundle_id.as_deref(),
160                    &req.provider_id,
161                    &mut auth_env,
162                );
163                let r = mgr.start_session(req.provider_id.clone(), req.into_bundle_id.clone());
164                spawn_auth_cli(
165                    mgr,
166                    wstore,
167                    broker,
168                    r.session_id.clone(),
169                    req.provider_id,
170                    req.into_bundle_id,
171                    bundle_dir,
172                    req.cli_path,
173                    req.auth_login_args,
174                    req.auth_check_args,
175                    auth_env,
176                    req.requires_tty,
177                );
178                Ok(Some(serde_json::to_value(&r).unwrap_or_default()))
179            })
180        }),
181    );
182
183    let mgr = state.auth_session_manager.clone();
184    engine.register_handler(
185        COMMAND_AUTH_POLL,
186        Box::new(move |data, _ctx| {
187            let mgr = mgr.clone();
188            Box::pin(async move {
189                let req: PollProviderAuthReq = serde_json::from_value(data)
190                    .map_err(|e| format!("auth.poll: {e}"))?;
191                let snap = mgr
192                    .poll_session(&req.session_id)
193                    .ok_or_else(|| format!("auth.poll: unknown session {}", req.session_id))?;
194                Ok(Some(serde_json::to_value(&snap).unwrap_or_default()))
195            })
196        }),
197    );
198
199    let mgr = state.auth_session_manager.clone();
200    engine.register_handler(
201        COMMAND_AUTH_CANCEL,
202        Box::new(move |data, _ctx| {
203            let mgr = mgr.clone();
204            Box::pin(async move {
205                let req: CancelProviderAuthReq = serde_json::from_value(data)
206                    .map_err(|e| format!("auth.cancel: {e}"))?;
207                let ok = mgr.cancel_session(&req.session_id);
208                Ok(Some(
209                    serde_json::to_value(&AckResp {
210                        success: ok,
211                        error: if ok {
212                            None
213                        } else {
214                            Some(format!("unknown or already-terminal session: {}", req.session_id))
215                        },
216                    })
217                    .unwrap_or_default(),
218                ))
219            })
220        }),
221    );
222
223    let mgr = state.auth_session_manager.clone();
224    engine.register_handler(
225        COMMAND_AUTH_SUBMIT_CALLBACK,
226        Box::new(move |data, _ctx| {
227            let mgr = mgr.clone();
228            Box::pin(async move {
229                let req: SubmitAuthCallbackReq = serde_json::from_value(data)
230                    .map_err(|e| format!("auth.submitcallback: {e}"))?;
231                let delivered = mgr
232                    .send_to_stdin(&req.session_id, req.callback_url)
233                    .await;
234                Ok(Some(
235                    serde_json::to_value(&AckResp {
236                        success: delivered,
237                        error: if delivered {
238                            None
239                        } else {
240                            Some(format!(
241                                "no stdin sender for session {} (process exited or session unknown)",
242                                req.session_id
243                            ))
244                        },
245                    })
246                    .unwrap_or_default(),
247                ))
248            })
249        }),
250    );
251
252    engine.register_handler(
253        COMMAND_AUTH_SUBMIT_API_KEY,
254        Box::new(move |data, _ctx| {
255            Box::pin(async move {
256                let _req: SubmitProviderApiKeyReq = serde_json::from_value(data)
257                    .map_err(|e| format!("auth.submitapikey: {e}"))?;
258                // API-key path validates by running the provider's
259                // authCheckCommand with the key in the appropriate
260                // env var, then persists via wstore. That persistence
261                // is part of PR C (bundle auto-creation) per spec
262                // §10 — the validate-and-stash logic is here but
263                // wstore writes wait. For PR A we return an explicit
264                // error so frontend (PR B) sees a clear "not yet"
265                // signal while OAuth providers work end-to-end.
266                Err::<Option<serde_json::Value>, String>(
267                    "auth.submitapikey: bundle persistence lands in PR C"
268                        .to_string(),
269                )
270            })
271        }),
272    );
273}
274
275/// Spawn the provider's auth-login CLI and drive the session through
276/// to a terminal state. Background-only — returns immediately. The
277/// drain task feeds stdout+stderr lines into
278/// `AuthSessionManager::record_line`; on a login-success pattern OR
279/// child exit, runs the provider's authCheckCommand to confirm and
280/// transitions to Success or Failed.
281///
282/// On the success path (CLI exited cleanly + authCheckCommand passed),
283/// when `into_bundle_id` AND `bundle_dir` are both set, the function
284/// persists the OAuth binding into the bundle: a `SecretRef::OAuthConfigDir`
285/// account is upserted (status `valid`) and bound via
286/// `bundle_identity_bind`. This is the §4.5 OAuth-success invariant —
287/// after this point, future launches of any agent against the bundle
288/// resolve through `inject_identity_env`'s oauth-class dispatch (PR B)
289/// and reuse the same CLI-managed tokens inside `bundle_dir`.
290///
291/// On failure or when `into_bundle_id` is empty (ambient launch), the
292/// per-bundle binding step is skipped. The bundle row (if any was
293/// auto-created by the New Identity modal) stays — the user's next
294/// attempt reuses it.
295#[allow(clippy::too_many_arguments)]
296fn spawn_auth_cli(
297    mgr: Arc<crate::identity::auth_session::AuthSessionManager>,
298    wstore: Arc<WaveStore>,
299    broker: Arc<Broker>,
300    session_id: String,
301    provider_id: String,
302    into_bundle_id: Option<String>,
303    bundle_dir: Option<String>,
304    cli_path: String,
305    auth_login_args: Vec<String>,
306    auth_check_args: Vec<String>,
307    auth_env: std::collections::HashMap<String, String>,
308    requires_tty: bool,
309) {
310    use std::process::Stdio;
311    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
312    use tokio::process::Command;
313    use tokio::sync::mpsc;
314
315    // Channel for SubmitAuthCallback → CLI stdin forwarding.
316    // Buffer of 4 is enough — only one URL per session in normal use.
317    let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(4);
318
319    // PTY branch: providers like OpenClaw whose auth subcommand
320    // refuses to run when `isatty()==0`. Spawn via portable_pty so the
321    // child sees an interactive terminal, feed PTY output through the
322    // same record_line matcher the pipes path uses. Stdout/stderr are
323    // merged into the single PTY stream — record_line handles both as
324    // identical input.
325    if requires_tty {
326        spawn_auth_cli_pty(
327            mgr,
328            wstore,
329            broker,
330            session_id,
331            provider_id,
332            into_bundle_id,
333            bundle_dir,
334            cli_path,
335            auth_login_args,
336            auth_check_args,
337            auth_env,
338            stdin_tx,
339            stdin_rx,
340        );
341        return;
342    }
343
344    let mgr_for_task = mgr.clone();
345    let session_id_for_task = session_id.clone();
346    let cli_path_for_check = cli_path.clone();
347    let auth_check_args_for_check = auth_check_args.clone();
348    let auth_env_for_check = auth_env.clone();
349    let wstore_for_task = wstore.clone();
350    let broker_for_task = broker.clone();
351    let into_bundle_id_for_task = into_bundle_id.clone();
352    let bundle_dir_for_task = bundle_dir.clone();
353    let provider_id_for_task = provider_id.clone();
354
355    let handle = tokio::spawn(async move {
356        tracing::info!(
357            session_id = %session_id_for_task,
358            provider_id = %provider_id,
359            cli_path = %cli_path,
360            auth_login_args = ?auth_login_args,
361            "auth.spawn: launching provider CLI"
362        );
363
364        // Spawn the CLI. kill_on_drop guarantees cleanup if our
365        // task is aborted (cancel path).
366        let mut child = match Command::new(&cli_path)
367            .args(&auth_login_args)
368            .envs(&auth_env)
369            .stdin(Stdio::piped())
370            .stdout(Stdio::piped())
371            .stderr(Stdio::piped())
372            .kill_on_drop(true)
373            .spawn()
374        {
375            Ok(c) => {
376                tracing::info!(
377                    session_id = %session_id_for_task,
378                    pid = ?c.id(),
379                    "auth.spawn: child started"
380                );
381                c
382            }
383            Err(e) => {
384                tracing::error!(
385                    session_id = %session_id_for_task,
386                    error = %e,
387                    "auth.spawn: Command::spawn failed"
388                );
389                mgr_for_task.finish_failure(
390                    &session_id_for_task,
391                    format!("spawn `{cli_path}` failed: {e}"),
392                );
393                mgr_for_task.detach_process(&session_id_for_task);
394                return;
395            }
396        };
397
398        let stdout = child
399            .stdout
400            .take()
401            .expect("piped stdout");
402        let stderr = child
403            .stderr
404            .take()
405            .expect("piped stderr");
406        let mut stdin = child
407            .stdin
408            .take()
409            .expect("piped stdin");
410
411        // Stdin writer — forwards callback URLs (pasted back by the
412        // user when browser auto-open failed) into the CLI process.
413        let stdin_writer = tokio::spawn(async move {
414            while let Some(line) = stdin_rx.recv().await {
415                if stdin.write_all(line.as_bytes()).await.is_err() {
416                    break;
417                }
418                if stdin.write_all(b"\n").await.is_err() {
419                    break;
420                }
421                let _ = stdin.flush().await;
422            }
423        });
424
425        // Stdout drain — line-by-line into the session manager. When
426        // we see a login-success pattern, confirm via authCheckCommand
427        // and transition to Success. Don't break the loop — some CLIs
428        // continue printing after the success line.
429        let mgr_stdout = mgr_for_task.clone();
430        let sid_stdout = session_id_for_task.clone();
431        let cli_path_stdout = cli_path_for_check.clone();
432        let check_args_stdout = auth_check_args_for_check.clone();
433        let check_env_stdout = auth_env_for_check.clone();
434        let wstore_stdout = wstore_for_task.clone();
435        let broker_stdout = broker_for_task.clone();
436        let into_bundle_id_stdout = into_bundle_id_for_task.clone();
437        let bundle_dir_stdout = bundle_dir_for_task.clone();
438        let provider_id_stdout = provider_id_for_task.clone();
439        // Shared between drain + post-exit. The drain sets it after
440        // persisting on a LoginSuccess match; the post-exit transition
441        // block (below) checks it and skips its entire success path if
442        // already transitioned — without this guard the drain's
443        // persist + post-exit's persist both ran on every successful
444        // OAuth, producing orphan IdentityAccount rows (each `Uuid::new_v4`)
445        // and duplicate `identitybundlebindings:changed:<id>` publishes.
446        // Reagent P1 on #981.
447        let success_transitioned = Arc::new(AtomicBool::new(false));
448        let success_transitioned_drain = Arc::clone(&success_transitioned);
449        let stdout_drain = tokio::spawn(async move {
450            let mut lines = BufReader::new(stdout).lines();
451            while let Ok(Some(line)) = lines.next_line().await {
452                // NOT logging the raw line — OAuth providers print
453                // auth URLs / callback codes / device codes on stdout
454                // and even debug-level logs would persist those into
455                // ~/.agentmux/logs/. Length-only is enough for "the
456                // CLI emitted something" diagnostics. Reagent P2 on #847.
457                tracing::debug!(session_id = %sid_stdout, bytes = line.len(), "auth.spawn: stdout line");
458                let m = mgr_stdout.record_line(&sid_stdout, &line);
459                if !success_transitioned_drain.load(Ordering::Acquire)
460                    && matches!(
461                        m,
462                        Some(crate::identity::auth_patterns::AuthPatternMatch::LoginSuccess { .. })
463                    )
464                {
465                    if confirm_authenticated(
466                        &cli_path_stdout,
467                        &check_args_stdout,
468                        &check_env_stdout,
469                    )
470                    .await
471                    {
472                        // Persist the OAuthConfigDir binding into the
473                        // bundle. When `into_bundle_id` is empty (no
474                        // bundle context, e.g. ambient continuation) or
475                        // `bundle_dir` failed to resolve at spawn, the
476                        // helper skips persistence and the session still
477                        // succeeds — the user just won't get bundle-
478                        // backed token reuse next launch. Bundle id
479                        // returned to the frontend is the real bundle id
480                        // when persistence happened, or a synthetic
481                        // placeholder otherwise so existing UI keeps
482                        // its filter-on-prefix behaviour.
483                        let bundle_id = persist_oauth_binding_or_synthetic(
484                            &wstore_stdout,
485                            &broker_stdout,
486                            into_bundle_id_stdout.as_deref(),
487                            &provider_id_stdout,
488                            bundle_dir_stdout.as_deref(),
489                            &sid_stdout,
490                        );
491                        mgr_stdout.finish_success(&sid_stdout, bundle_id);
492                        success_transitioned_drain.store(true, Ordering::Release);
493                    }
494                }
495            }
496        });
497
498        // Stderr drain — same matcher path. Some CLIs emit the OAuth
499        // URL on stderr (verbose logging style).
500        let mgr_stderr = mgr_for_task.clone();
501        let sid_stderr = session_id_for_task.clone();
502        let stderr_drain = tokio::spawn(async move {
503            let mut lines = BufReader::new(stderr).lines();
504            while let Ok(Some(line)) = lines.next_line().await {
505                // Same redaction rationale as stdout above.
506                tracing::debug!(session_id = %sid_stderr, bytes = line.len(), "auth.spawn: stderr line");
507                let _ = mgr_stderr.record_line(&sid_stderr, &line);
508            }
509        });
510
511        // Wait for the child to exit (or for our task to be aborted
512        // by cancel_session — in which case kill_on_drop handles the
513        // child).
514        let exit = child.wait().await;
515        tracing::info!(session_id = %session_id_for_task, exit = ?exit, "auth.spawn: child exited");
516
517        // Let stdout/stderr drains catch any final lines.
518        let _ = stdout_drain.await;
519        let _ = stderr_drain.await;
520        // Stdin writer exits when the tx side is dropped (after this
521        // task ends) — abort defensively in case it's blocked on a
522        // pending receive.
523        stdin_writer.abort();
524
525        // Final transition if we haven't already hit Success on the
526        // login-pattern path. Re-run authCheck because some CLIs exit
527        // cleanly without printing a "logged in as" line.
528        //
529        // Skip the entire block when the drain has already transitioned
530        // (LoginSuccess pattern matched + confirm_authenticated + persist
531        // ran). Without this guard, persist_oauth_binding_or_synthetic
532        // would fire a second time on every successful OAuth — a fresh
533        // IdentityAccount UUID would be upserted, `bundle_identity_bind`
534        // would repoint to the new account (orphaning the first), and
535        // the broker would re-publish the bindings-changed event.
536        // Reagent P1 on #981.
537        if !success_transitioned.load(Ordering::Acquire) {
538            match exit {
539                Ok(s) if s.success() => {
540                    if confirm_authenticated(
541                        &cli_path_for_check,
542                        &auth_check_args_for_check,
543                        &auth_env_for_check,
544                    )
545                    .await
546                    {
547                        let bundle_id = persist_oauth_binding_or_synthetic(
548                            &wstore_for_task,
549                            &broker_for_task,
550                            into_bundle_id_for_task.as_deref(),
551                            &provider_id_for_task,
552                            bundle_dir_for_task.as_deref(),
553                            &session_id_for_task,
554                        );
555                        mgr_for_task.finish_success(&session_id_for_task, bundle_id);
556                    } else {
557                        mgr_for_task.finish_failure(
558                            &session_id_for_task,
559                            "CLI exited 0 but authentication check failed".to_string(),
560                        );
561                    }
562                }
563                Ok(s) => {
564                    mgr_for_task.finish_failure(
565                        &session_id_for_task,
566                        format!("CLI exited with status {s}"),
567                    );
568                }
569                Err(e) => {
570                    mgr_for_task.finish_failure(
571                        &session_id_for_task,
572                        format!("wait error: {e}"),
573                    );
574                }
575            }
576        }
577
578        mgr_for_task.detach_process(&session_id_for_task);
579    });
580
581    mgr.attach_process(&session_id, handle, stdin_tx);
582}
583
584/// PTY-backed variant of spawn_auth_cli. Used for providers whose auth
585/// subcommand bails on `isatty()==0` (currently OpenClaw). Mirrors the
586/// pipes path's lifecycle: spawn → drain → confirm → finish_success or
587/// finish_failure → detach. Stdout+stderr are merged on the PTY side;
588/// `record_line` doesn't care which stream a line came from.
589///
590/// Same OAuth-success invariant as the pipes path — when
591/// `into_bundle_id` and `bundle_dir` are both set and `confirm_authenticated`
592/// returns true, persists `SecretRef::OAuthConfigDir` + binding before
593/// `finish_success`.
594#[allow(clippy::too_many_arguments)]
595fn spawn_auth_cli_pty(
596    mgr: Arc<crate::identity::auth_session::AuthSessionManager>,
597    wstore: Arc<WaveStore>,
598    broker: Arc<Broker>,
599    session_id: String,
600    provider_id: String,
601    into_bundle_id: Option<String>,
602    bundle_dir: Option<String>,
603    cli_path: String,
604    auth_login_args: Vec<String>,
605    auth_check_args: Vec<String>,
606    auth_env: std::collections::HashMap<String, String>,
607    stdin_tx: tokio::sync::mpsc::Sender<String>,
608    mut stdin_rx: tokio::sync::mpsc::Receiver<String>,
609) {
610    use portable_pty::{native_pty_system, CommandBuilder, PtySize};
611
612    let mgr_for_task = mgr.clone();
613    let session_id_for_task = session_id.clone();
614    let cli_path_for_check = cli_path.clone();
615    let auth_check_args_for_check = auth_check_args.clone();
616    let auth_env_for_check = auth_env.clone();
617    let wstore_for_task = wstore.clone();
618    let broker_for_task = broker.clone();
619    let into_bundle_id_for_task = into_bundle_id.clone();
620    let bundle_dir_for_task = bundle_dir.clone();
621    let provider_id_for_task = provider_id.clone();
622
623    tracing::info!(
624        session_id = %session_id,
625        provider_id = %provider_id,
626        cli_path = %cli_path,
627        auth_login_args = ?auth_login_args,
628        "auth.spawn (PTY): launching provider CLI"
629    );
630
631    // Allocate the PTY, build the command, spawn the child, and
632    // register the PID — all synchronously, BEFORE the async drain/
633    // wait task and BEFORE `attach_process`. That way no
634    // `cancel_session` racing with the spawn can find a registered
635    // handle but a missing PID; either the PID is registered first
636    // (cancel can kill), or the early-return path fired and there
637    // is no child to kill.
638    let pty_system = native_pty_system();
639    let pair = match pty_system.openpty(PtySize {
640        rows: 24,
641        cols: 80,
642        pixel_width: 0,
643        pixel_height: 0,
644    }) {
645        Ok(p) => p,
646        Err(e) => {
647            // PTY allocation failure means the provider's auth
648            // subprocess can't get the interactive TTY it requires
649            // (openclaw `models auth login`). Surface as the typed
650            // `AMX-AUTH-001` so the FailedBanner shows the friendly
651            // recovery hint instead of "openpty failed: <e>".
652            let mux = agentmux_common::AgentMuxError::AuthRequiresTty {
653                provider: provider_id.clone(),
654            };
655            tracing::error!(
656                target: "amx::error",
657                session_id = %session_id,
658                amx_code = %mux.code(),
659                error = %e,
660                "auth.spawn (PTY): openpty failed"
661            );
662            mgr.finish_failure(&session_id, mux.to_wire().to_string());
663            mgr.detach_process(&session_id);
664            return;
665        }
666    };
667
668    let mut cmd = CommandBuilder::new(&cli_path);
669    for a in &auth_login_args {
670        cmd.arg(a);
671    }
672    for (k, v) in &auth_env {
673        cmd.env(k, v);
674    }
675    if let Ok(cwd) = std::env::current_dir() {
676        cmd.cwd(cwd);
677    }
678
679    let child = match pair.slave.spawn_command(cmd) {
680        Ok(c) => c,
681        Err(e) => {
682            tracing::error!(
683                session_id = %session_id,
684                error = %e,
685                "auth.spawn (PTY): spawn_command failed"
686            );
687            mgr.finish_failure(&session_id, format!("PTY spawn `{cli_path}` failed: {e}"));
688            mgr.detach_process(&session_id);
689            return;
690        }
691    };
692
693    if let Some(pid) = child.process_id() {
694        mgr.attach_pty_pid(&session_id, pid);
695    }
696
697    let reader = match pair.master.try_clone_reader() {
698        Ok(r) => r,
699        Err(e) => {
700            tracing::error!(session_id = %session_id, error = %e, "auth.spawn (PTY): try_clone_reader failed");
701            mgr.finish_failure(&session_id, format!("PTY reader: {e}"));
702            mgr.detach_process(&session_id);
703            return;
704        }
705    };
706    let writer = match pair.master.take_writer() {
707        Ok(w) => w,
708        Err(e) => {
709            tracing::error!(session_id = %session_id, error = %e, "auth.spawn (PTY): take_writer failed");
710            mgr.finish_failure(&session_id, format!("PTY writer: {e}"));
711            mgr.detach_process(&session_id);
712            return;
713        }
714    };
715
716    let handle = tokio::spawn(async move {
717
718        // Stdin writer: forward callback URLs from the manager into
719        // the child's PTY input. portable_pty's master writer is sync;
720        // wrap each write in `block_in_place` so the blocking IO
721        // doesn't starve the tokio reactor when the PTY input buffer
722        // is full.
723        let stdin_writer_handle = tokio::spawn(async move {
724            let mut writer = writer;
725            while let Some(line) = stdin_rx.recv().await {
726                let res = tokio::task::block_in_place(|| {
727                    use std::io::Write;
728                    writer
729                        .write_all(line.as_bytes())
730                        .and_then(|_| writer.write_all(b"\n"))
731                        .and_then(|_| writer.flush())
732                });
733                if res.is_err() {
734                    break;
735                }
736            }
737        });
738
        // Drain: synchronous line reader from PTY master, feeds into
        // the same record_line matcher the pipes path uses. Runs in a
        // blocking thread; on the first login-success match it spawns
        // an async confirm/persist task on the runtime and returns that
        // task's JoinHandle, so confirm_authenticated never runs on the
        // blocking thread itself.
744        let mgr_drain = mgr_for_task.clone();
745        let sid_drain = session_id_for_task.clone();
746        let cli_path_drain = cli_path_for_check.clone();
747        let check_args_drain = auth_check_args_for_check.clone();
748        let check_env_drain = auth_env_for_check.clone();
749        let wstore_drain = wstore_for_task.clone();
750        let broker_drain = broker_for_task.clone();
751        let into_bundle_id_drain = into_bundle_id_for_task.clone();
752        let bundle_dir_drain = bundle_dir_for_task.clone();
753        let provider_id_drain = provider_id_for_task.clone();
        // Shared with the post-exit fallback block below — same pattern
        // as the pipes path. The drain's detached
        // `Handle::current().spawn` task does the persist +
        // finish_success and sets this atomic ONLY when
        // confirm_authenticated succeeds. The outer task awaits both
        // the drain and that detached task before loading the atomic,
        // so the fallback re-runs confirm/persist only when the
        // detached task did not transition the session. Reagent P1 on
        // #981.
763        let success_transitioned = Arc::new(AtomicBool::new(false));
764        let success_transitioned_drain = Arc::clone(&success_transitioned);
765        let drain_handle = tokio::task::spawn_blocking(move || {
766            use std::io::BufRead;
767            let mut reader = std::io::BufReader::new(reader);
768            let mut line = String::new();
769            // The detached confirm/persist task's JoinHandle, populated
770            // when the drain matches LoginSuccess. Returned to outer so
771            // it can await completion before its post-exit check —
772            // without that wait, outer would race the detached and
773            // either skip too soon (if it sets the atomic eagerly) or
774            // double-persist. Reagent P1 follow-up on #981.
775            let mut maybe_detached: Option<tokio::task::JoinHandle<()>> = None;
776            loop {
777                line.clear();
778                match reader.read_line(&mut line) {
779                    Ok(0) => break, // EOF
780                    Ok(_) => {
781                        // Same redaction policy as the pipes path —
782                        // OAuth URLs / codes can be in here.
783                        tracing::debug!(session_id = %sid_drain, bytes = line.len(), "auth.spawn (PTY): line");
784                        let m = mgr_drain.record_line(&sid_drain, &line);
785                        if maybe_detached.is_none()
786                            && matches!(
787                                m,
788                                Some(crate::identity::auth_patterns::AuthPatternMatch::LoginSuccess { .. })
789                            )
790                        {
                            // Hand off to async to call confirm_authenticated.
                            // The detached task OWNS the success
                            // transition — on confirm-OK it persists the
                            // binding, calls finish_success, and sets the
                            // shared atomic. On confirm-NOT-OK it does
                            // nothing, leaving the atomic false so the
                            // outer post-exit fallback decides. The drain
                            // only captures the JoinHandle here and
                            // returns it; outer awaits both drain and the
                            // inner handle before its check, so the
                            // atomic is correctly settled by then.
800                            let cli = cli_path_drain.clone();
801                            let args = check_args_drain.clone();
802                            let env = check_env_drain.clone();
803                            let mgr2 = mgr_drain.clone();
804                            let sid2 = sid_drain.clone();
805                            let wstore2 = wstore_drain.clone();
806                            let broker2 = broker_drain.clone();
807                            let into_bundle_id2 = into_bundle_id_drain.clone();
808                            let bundle_dir2 = bundle_dir_drain.clone();
809                            let provider_id2 = provider_id_drain.clone();
810                            let success_for_detached = Arc::clone(&success_transitioned_drain);
811                            let handle = tokio::runtime::Handle::current().spawn(async move {
812                                if confirm_authenticated(&cli, &args, &env).await {
813                                    let bundle_id = persist_oauth_binding_or_synthetic(
814                                        &wstore2,
815                                        &broker2,
816                                        into_bundle_id2.as_deref(),
817                                        &provider_id2,
818                                        bundle_dir2.as_deref(),
819                                        &sid2,
820                                    );
821                                    mgr2.finish_success(&sid2, bundle_id);
822                                    // Atomic set ONLY on confirm-success
823                                    // — mirrors the pipes path. On
824                                    // confirm-miss the detached does
825                                    // nothing (atomic stays false),
826                                    // outer's post-exit fallback gets
827                                    // another shot at confirm by the
828                                    // time creds are fully written.
829                                    // Outer awaits this handle first,
830                                    // so the atomic is correctly
831                                    // reflected by the time it checks.
832                                    // codex P1 follow-up on #981.
833                                    success_for_detached.store(true, Ordering::Release);
834                                }
835                            });
836                            maybe_detached = Some(handle);
837                        }
838                    }
839                    Err(_) => break,
840                }
841            }
842            maybe_detached
843        });
844
845        // Wait for the child in a blocking task. pair (master + slave)
846        // moves into the closure so its destructor runs AFTER
847        // child.wait() — ConPTY contract on Windows.
848        let wait_handle = tokio::task::spawn_blocking(move || {
849            let mut child = child;
850            let exit = child.wait();
851            drop(pair);
852            exit
853        });
854
855        let exit = wait_handle.await;
856        tracing::info!(session_id = %session_id_for_task, exit = ?exit, "auth.spawn (PTY): child exited");
857
858        // Await the drain itself, THEN its optional detached
859        // confirm/persist task. Both have to complete before the
860        // post-exit check — otherwise outer might see atomic=false
861        // (detached still confirming), run its fallback persist, AND
862        // the detached's persist also runs → the original double-
863        // persist race re-opens. Reagent P1 follow-up on #981.
864        let detached = drain_handle.await.ok().flatten();
865        if let Some(h) = detached {
866            let _ = h.await;
867        }
868        stdin_writer_handle.abort();
869
870        // Final transition fallback — some CLIs exit cleanly without
871        // emitting a login-success line that record_line recognizes.
872        //
873        // Skip the whole block when the drain already transitioned —
874        // same double-persist guard as the pipes path. Reagent P1 on
875        // #981.
876        if !success_transitioned.load(Ordering::Acquire) {
877            match exit {
878                Ok(Ok(s)) if s.success() => {
879                    if confirm_authenticated(
880                        &cli_path_for_check,
881                        &auth_check_args_for_check,
882                        &auth_env_for_check,
883                    )
884                    .await
885                    {
886                        let bundle_id = persist_oauth_binding_or_synthetic(
887                            &wstore_for_task,
888                            &broker_for_task,
889                            into_bundle_id_for_task.as_deref(),
890                            &provider_id_for_task,
891                            bundle_dir_for_task.as_deref(),
892                            &session_id_for_task,
893                        );
894                        mgr_for_task.finish_success(&session_id_for_task, bundle_id);
895                    } else {
896                        mgr_for_task.finish_failure(
897                            &session_id_for_task,
898                            "CLI exited cleanly but auth-check still failed".to_string(),
899                        );
900                    }
901                }
902                Ok(Ok(s)) => {
903                    mgr_for_task.finish_failure(
904                        &session_id_for_task,
905                        format!("CLI exited with code {:?}", s.exit_code()),
906                    );
907                }
908                Ok(Err(e)) => {
909                    mgr_for_task.finish_failure(
910                        &session_id_for_task,
911                        format!("PTY wait error: {e}"),
912                    );
913                }
914                Err(e) => {
915                    mgr_for_task.finish_failure(
916                        &session_id_for_task,
917                        format!("PTY wait task join error: {e}"),
918                    );
919                }
920            }
921        }
922
923        mgr_for_task.detach_process(&session_id_for_task);
924    });
925
926    mgr.attach_process(&session_id, handle, stdin_tx);
927}
928
929/// Compute the per-bundle OAuth config dir + ensure it exists +
930/// override the provider's `auth_config_dir_env_var` entry in `auth_env`.
931///
932/// Returns `Some(<absolute path string>)` when:
933///   - `into_bundle_id` is `Some` and non-empty AND
934///   - the provider is registered in the CLI provider registry AND
935///   - the provider declares an `auth_config_dir_env_var` (oauth-class
936///     providers — claude / codex / openclaw — per spec §4.3) AND
937///   - `DataPaths::from_env()` resolves AND
938///   - `create_dir_all` succeeds.
939///
940/// Otherwise returns `None` and leaves `auth_env` untouched. Callers
941/// must continue without per-bundle isolation (legacy ambient path,
942/// or skip the binding-persist step) — never abort the OAuth start.
943///
944/// Per `SPEC_OAUTH_IDENTITY_BUNDLES_2026_05_22.md` §4.5: the dir
945/// (and the env-var key) come from the CLI provider registry
946/// (`backend::providers::get_provider(id)`) so the resolver / spawn
947/// path / OAuth-start handler never drift on which env var redirects
948/// each CLI's config home. The single source of truth lives in
949/// `agentmux-srv/src/backend/providers.rs`.
950fn compute_and_ensure_bundle_dir(
951    into_bundle_id: Option<&str>,
952    provider_id: &str,
953    auth_env: &mut std::collections::HashMap<String, String>,
954) -> Option<String> {
955    let bundle_id = into_bundle_id.filter(|s| !s.is_empty())?;
956    // Gate on provider_class so api-key-class providers (which have a
957    // registry entry with a non-empty `auth_config_dir_env_var` —
958    // e.g. kimi's `KIMI_SHARE_DIR`) never go through the per-bundle
959    // OAuth-dir path. Only claude / codex / openclaw — the spec §4.3
960    // oauth-class providers — get the per-bundle override.
961    match crate::identity::resolver::provider_class(provider_id) {
962        Some(crate::identity::resolver::ProviderClass::OAuth { .. }) => {}
963        _ => return None,
964    }
965    let provider = match get_provider(provider_id) {
966        Some(p) => p,
967        None => {
968            // OAuth-class per provider_class but missing from the CLI
969            // registry — should be impossible (resolver reads the env
970            // var from the registry itself), but treat as a soft fail.
971            tracing::warn!(
972                target: "identity",
973                provider_id,
974                "auth.start: oauth-class provider not in registry — skipping per-bundle dir override"
975            );
976            return None;
977        }
978    };
979    // Empty env-var name → no isolation possible (oauth-class providers
980    // should never have this empty per spec, but belt-and-braces).
981    if provider.auth_config_dir_env_var.is_empty() {
982        return None;
983    }
984    let paths = match agentmux_common::DataPaths::from_env() {
985        Some(p) => p,
986        None => {
987            tracing::warn!(
988                target: "identity",
989                provider_id,
990                bundle_id,
991                "auth.start: DataPaths::from_env() returned None — skipping per-bundle dir override"
992            );
993            return None;
994        }
995    };
996    // identity_dir rejects unsafe path segments (empty / `.` / `..` /
997    // segment with `/`, `\`, drive-letter, …). bundle_id comes from
998    // the auth.start request body, so this guard prevents a crafted
999    // id from escaping the identities root.
1000    let bundle_root = match paths.identity_dir(bundle_id) {
1001        Some(p) => p,
1002        None => {
1003            tracing::warn!(
1004                target: "identity",
1005                provider_id,
1006                bundle_id,
1007                "auth.start: bundle_id is not a safe path segment — skipping per-bundle dir override"
1008            );
1009            return None;
1010        }
1011    };
1012    let dir = bundle_root.join(provider.auth_dir_name);
1013    if let Err(e) = std::fs::create_dir_all(&dir) {
1014        tracing::warn!(
1015            target: "identity",
1016            provider_id,
1017            bundle_id,
1018            error = %e,
1019            path = %dir.display(),
1020            "auth.start: failed to create per-bundle config dir — skipping override"
1021        );
1022        return None;
1023    }
1024    let dir_str = dir.to_string_lossy().to_string();
1025    // Override (or insert) the provider's config-dir env var. The
1026    // frontend may have computed the legacy ambient dir via
1027    // `ensureAuthDir(providerId)` and put it here under the same key;
1028    // we replace it with the per-bundle dir so the OAuth CLI writes
1029    // its tokens inside the bundle, not in the ambient version-config
1030    // dir.
1031    auth_env.insert(
1032        provider.auth_config_dir_env_var.to_string(),
1033        dir_str.clone(),
1034    );
1035    tracing::info!(
1036        target: "identity",
1037        provider_id,
1038        bundle_id,
1039        env_var = provider.auth_config_dir_env_var,
1040        dir = %dir.display(),
1041        "auth.start: per-bundle OAuth config dir wired"
1042    );
1043    Some(dir_str)
1044}
1045
1046/// On a successful OAuth handshake (CLI exited 0 + authCheckCommand
1047/// confirmed), persist the OAuth binding into the bundle and return
1048/// the real bundle id to use in the `Success` wire status.
1049///
1050/// "Persist the binding" =
1051///   1. Upsert an `IdentityAccount` with:
1052///        - provider = `<provider_id>`
1053///        - kind = "oauth"
1054///        - secret_ref = `SecretRef::OAuthConfigDir { dir }`
1055///        - status = "valid"
1056///   2. Bind it via `bundle_identity_bind(bundle_id, provider, account_id)`.
1057///   3. Publish `identitybundlebindings:changed:<bundle_id>` so the
1058///      Launch modal (and any other open Identity pane) re-fetches
1059///      the new binding and the launch button enables without a
1060///      manual refresh.
1061///
1062/// Per-binding errors (account upsert, bind, broker publish) are
1063/// logged + downgraded — the success transition still fires with the
1064/// real bundle id when possible, and falls back to the legacy
1065/// `pending-bundle-for-<sid>` synthetic when not. The OAuth CLI's
1066/// tokens are already on disk inside `bundle_dir` either way; the
1067/// resolver layer (PR B) just won't find a binding to point at them.
1068/// Same shape as `inject_identity_env`'s "log + skip, never abort".
1069///
1070/// When `into_bundle_id` is empty or `bundle_dir` is `None`, returns
1071/// the synthetic placeholder unchanged — the legacy ambient path
1072/// (PR A behaviour) is preserved.
1073fn persist_oauth_binding_or_synthetic(
1074    wstore: &Arc<WaveStore>,
1075    broker: &Arc<Broker>,
1076    into_bundle_id: Option<&str>,
1077    provider_id: &str,
1078    bundle_dir: Option<&str>,
1079    session_id: &str,
1080) -> String {
1081    let synthetic = || format!("pending-bundle-for-{session_id}");
1082    let bundle_id = match into_bundle_id.filter(|s| !s.is_empty()) {
1083        Some(b) => b,
1084        None => return synthetic(),
1085    };
1086    let dir = match bundle_dir.filter(|s| !s.is_empty()) {
1087        Some(d) => d,
1088        None => {
1089            tracing::warn!(
1090                target: "identity",
1091                bundle_id,
1092                provider_id,
1093                "auth success: bundle_dir unresolved — skipping binding persist"
1094            );
1095            return synthetic();
1096        }
1097    };
1098    let now = SystemTime::now()
1099        .duration_since(UNIX_EPOCH)
1100        .map(|d| d.as_millis() as i64)
1101        .unwrap_or(0);
1102    // Re-OAuth (token expiry / Reconnect): load any existing binding
1103    // for this (bundle, provider) and reuse its `account_id` so the
1104    // upsert UPDATES the prior IdentityAccount in place instead of
1105    // creating a fresh UUID + orphaning the old row in
1106    // `db_identity_accounts`. Fresh UUID only on first bind. codex
1107    // P2 follow-up on #981.
1108    let account_id = wstore
1109        .bundle_identity_bindings(bundle_id)
1110        .ok()
1111        .into_iter()
1112        .flatten()
1113        .find(|b| b.provider == provider_id)
1114        .map(|b| b.account_id)
1115        .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1116    let account = IdentityAccount {
1117        id: account_id.clone(),
1118        name: format!("{provider_id}-oauth"),
1119        provider: provider_id.to_string(),
1120        kind: "oauth".to_string(),
1121        display_name: String::new(),
1122        secret_ref: SecretRef::OAuthConfigDir { dir: dir.to_string() },
1123        context: serde_json::json!({}),
1124        // Per spec §4.4: a binding the user JUST OAuth'd into is `valid`
1125        // by definition — the token file was written within the past
1126        // few seconds. The resolver's expiry probe (PR D) refines this
1127        // on every spawn.
1128        status: crate::identity::resolver::oauth_status::VALID.to_string(),
1129        created_at: now,
1130        updated_at: now,
1131    };
1132    if let Err(e) = wstore.identity_upsert(&account) {
1133        tracing::warn!(
1134            target: "identity",
1135            bundle_id,
1136            provider_id,
1137            error = %e,
1138            "auth success: identity_upsert failed — falling back to synthetic bundle id"
1139        );
1140        return synthetic();
1141    }
1142    if let Err(e) = wstore.bundle_identity_bind(bundle_id, provider_id, &account_id) {
1143        tracing::warn!(
1144            target: "identity",
1145            bundle_id,
1146            provider_id,
1147            account_id,
1148            error = %e,
1149            "auth success: bundle_identity_bind failed — account row persisted but no binding"
1150        );
1151        return synthetic();
1152    }
1153    // Broker push so the frontend's `identitybundlebindings:changed:<id>`
1154    // listener (AgentLaunchModal createEffect) refetches bindings and
1155    // flips `hasMatchingBinding` → true without a manual reload.
1156    broker.publish(crate::backend::wps::WaveEvent {
1157        event: format!("identitybundlebindings:changed:{bundle_id}"),
1158        scopes: vec![],
1159        sender: String::new(),
1160        persist: 0,
1161        data: None,
1162    });
1163    tracing::info!(
1164        target: "identity",
1165        bundle_id,
1166        provider_id,
1167        account_id,
1168        dir,
1169        "auth success: OAuth binding persisted"
1170    );
1171    bundle_id.to_string()
1172}
1173
1174/// Run the provider's auth-check subcommand and return true if it
1175/// exits 0. Failure modes (binary missing, network error, etc.) are
1176/// all treated as "not authenticated" — the caller will then either
1177/// keep waiting (drain task loop) or transition to Failed (exit
1178/// fallback).
1179async fn confirm_authenticated(
1180    cli_path: &str,
1181    args: &[String],
1182    env: &std::collections::HashMap<String, String>,
1183) -> bool {
1184    use std::process::Stdio;
1185    use tokio::process::Command;
1186    match Command::new(cli_path)
1187        .args(args)
1188        .envs(env)
1189        .stdin(Stdio::null())
1190        .stdout(Stdio::null())
1191        .stderr(Stdio::null())
1192        .kill_on_drop(true)
1193        .status()
1194        .await
1195    {
1196        Ok(s) => s.success(),
1197        Err(_) => false,
1198    }
1199}
1200
1201#[cfg(test)]
1202mod tests {
1203    use super::*;
1204    use crate::backend::storage::wstore::Identity;
1205
1206    // Request shape parsing tests — verify the wire contract matches
1207    // what the frontend (PR B) will send. The end-to-end RPC-engine
1208    // invocation is covered at the integration level once the CLI
1209    // spawn lands; the underlying AuthSessionManager behaviour is
1210    // covered in `identity::auth_session::tests`.
1211
1212    #[test]
1213    fn start_req_parses_minimal() {
1214        let v = serde_json::json!({
1215            "providerId": "claude",
1216            "cliPath": "/usr/bin/claude",
1217            "authLoginArgs": ["login"],
1218            "authCheckArgs": ["whoami"]
1219        });
1220        let r: StartProviderAuthReq = serde_json::from_value(v).unwrap();
1221        assert_eq!(r.provider_id, "claude");
1222        assert_eq!(r.cli_path, "/usr/bin/claude");
1223        assert_eq!(r.auth_login_args, vec!["login"]);
1224        assert_eq!(r.auth_check_args, vec!["whoami"]);
1225        assert!(r.into_bundle_id.is_none());
1226        assert!(r.auth_env.is_empty());
1227    }
1228
1229    #[test]
1230    fn start_req_parses_with_bundle_id() {
1231        let v = serde_json::json!({
1232            "providerId": "codex",
1233            "cliPath": "/usr/bin/codex",
1234            "authLoginArgs": ["auth", "login"],
1235            "authCheckArgs": ["auth", "status"],
1236            "authEnv": { "FOO": "bar" },
1237            "intoBundleId": "bundle-1"
1238        });
1239        let r: StartProviderAuthReq = serde_json::from_value(v).unwrap();
1240        assert_eq!(r.provider_id, "codex");
1241        assert_eq!(r.into_bundle_id.as_deref(), Some("bundle-1"));
1242        assert_eq!(r.auth_env.get("FOO").map(String::as_str), Some("bar"));
1243    }
1244
1245    #[test]
1246    fn poll_req_round_trips() {
1247        let v = serde_json::json!({ "sessionId": "auth-xyz" });
1248        let r: PollProviderAuthReq = serde_json::from_value(v).unwrap();
1249        assert_eq!(r.session_id, "auth-xyz");
1250    }
1251
1252    #[test]
1253    fn submit_callback_req_round_trips() {
1254        let v = serde_json::json!({
1255            "sessionId": "auth-xyz",
1256            "callbackUrl": "https://example.com/cb?code=abc"
1257        });
1258        let r: SubmitAuthCallbackReq = serde_json::from_value(v).unwrap();
1259        assert_eq!(r.session_id, "auth-xyz");
1260        assert!(r.callback_url.contains("code=abc"));
1261    }
1262
1263    #[test]
1264    fn submit_api_key_req_round_trips() {
1265        let v = serde_json::json!({
1266            "providerId": "openclaw",
1267            "apiKey": "sk-test",
1268            "accountName": "my-key"
1269        });
1270        let r: SubmitProviderApiKeyReq = serde_json::from_value(v).unwrap();
1271        assert_eq!(r.provider_id, "openclaw");
1272        assert_eq!(r.api_key, "sk-test");
1273        assert_eq!(r.account_name, "my-key");
1274        assert!(r.into_bundle_id.is_none());
1275    }
1276
1277    #[test]
1278    fn ack_resp_omits_error_when_success() {
1279        let r = AckResp { success: true, error: None };
1280        let v = serde_json::to_value(&r).unwrap();
1281        assert_eq!(v, serde_json::json!({ "success": true }));
1282    }
1283
1284    #[test]
1285    fn ack_resp_includes_error_when_failure() {
1286        let r = AckResp { success: false, error: Some("boom".into()) };
1287        let v = serde_json::to_value(&r).unwrap();
1288        assert_eq!(v, serde_json::json!({ "success": false, "error": "boom" }));
1289    }
1290
1291    // ── OAuth Bundles PR C invariant ──────────────────────────────
1292    //
1293    // Round-trip: spawn-dir computation → persist OAuth binding →
1294    // bundle row carries a `SecretRef::OAuthConfigDir` pointing at
1295    // exactly that dir, with status = "valid".
1296
1297    #[test]
1298    fn persist_oauth_binding_round_trip() {
1299        // Per spec §4.5: on auth success against bundle <id> for
1300        // provider <P>, the handler must
1301        //   1. compute the dir = DataPaths::identity_dir(<id>).join(P.auth_dir_name)
1302        //   2. set the provider's auth_config_dir_env_var to that dir
1303        //      in the spawn env (so the CLI writes tokens there, not
1304        //      at ~/.<P>/),
1305        //   3. on confirm, upsert an `IdentityAccount` whose
1306        //      `secret_ref` is `SecretRef::OAuthConfigDir { dir }`
1307        //      and `status = "valid"`,
1308        //   4. bind it via `bundle_identity_bind`.
1309        //
1310        // Together those three facts let the next launch's
1311        // `inject_identity_env` find the OAuth account, read its
1312        // OAuthConfigDir pointer, and inject CLAUDE_CONFIG_DIR with
1313        // the same path — closing the bundle loop.
1314
1315        // Use a tempdir as the agentmux home so DataPaths resolves
1316        // without depending on the user's real ~/.agentmux. Local
1317        // mutex so two env-var-touching tests in this module can't
1318        // race (agentmux-common's TEST_ENV_LOCK is `pub(crate)`,
1319        // not reachable from here).
1320        static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
1321        let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
1322        let tmp = tempfile::TempDir::new().unwrap();
1323        std::env::set_var("AGENTMUX_HOME_OVERRIDE", tmp.path());
1324        // DataPaths::from_env() wants every AGENTMUX_*_DIR — easiest
1325        // to compute paths once and export them.
1326        let paths = agentmux_common::DataPaths::resolve(
1327            "0.0.0-test",
1328            &agentmux_common::RuntimeMode::Installed,
1329        )
1330        .unwrap();
1331        paths.ensure_dirs().unwrap();
1332        for (k, v) in paths.to_env_vars() {
1333            std::env::set_var(k, v);
1334        }
1335
1336        let wstore = Arc::new(WaveStore::open_in_memory().unwrap());
1337        let broker = Arc::new(crate::backend::wps::Broker::new());
1338
1339        // Seed the bundle that the OAuth flow targets (PR 1 #969
1340        // creates this row up-front when the user names the new
1341        // identity; we mirror that here).
1342        let bundle_id = "id-oauth-pr-c";
1343        let identity = Identity {
1344            id: bundle_id.to_string(),
1345            name: "Work".to_string(),
1346            description: String::new(),
1347            is_blank: false,
1348            created_at: 0,
1349            updated_at: 0,
1350        };
1351        wstore.bundle_identity_upsert(&identity).unwrap();
1352
1353        // Step 1+2: compute and ensure dir, inject env var.
1354        let mut auth_env: std::collections::HashMap<String, String> =
1355            std::collections::HashMap::new();
1356        // Simulate the frontend's legacy `ensureAuthDir` putting an
1357        // ambient dir in the env first — the function MUST override
1358        // it with the per-bundle dir.
1359        auth_env.insert("CLAUDE_CONFIG_DIR".to_string(), "/legacy/ambient/claude".to_string());
1360        let dir = compute_and_ensure_bundle_dir(
1361            Some(bundle_id),
1362            "claude",
1363            &mut auth_env,
1364        )
1365        .expect("oauth-class provider with bundle id must yield a dir");
1366
1367        // Dir is `DataPaths::identity_dir(<id>).join(<auth_dir_name>)`
1368        // — claude's `auth_dir_name` is "claude" per the registry.
1369        let expected = paths
1370            .identity_dir(bundle_id)
1371            .expect("test bundle_id is a safe segment")
1372            .join("claude");
1373        assert_eq!(
1374            std::path::Path::new(&dir),
1375            expected,
1376            "per-bundle dir must equal DataPaths::identity_dir(id).join(auth_dir_name)"
1377        );
1378        // create_dir_all happened.
1379        assert!(expected.is_dir(), "dir should be created idempotently");
1380        // Env var was OVERRIDDEN with the per-bundle dir — registry-
1381        // sourced key (CLAUDE_CONFIG_DIR for claude).
1382        assert_eq!(
1383            auth_env.get("CLAUDE_CONFIG_DIR").map(String::as_str),
1384            Some(dir.as_str()),
1385            "auth_config_dir_env_var must point at the per-bundle dir, NOT the legacy ambient one",
1386        );
1387
1388        // Step 3+4: simulate the post-confirm_authenticated() success
1389        // path. The function returns the REAL bundle id (not the
1390        // synthetic placeholder) and persists the account + binding.
1391        let returned = persist_oauth_binding_or_synthetic(
1392            &wstore,
1393            &broker,
1394            Some(bundle_id),
1395            "claude",
1396            Some(&dir),
1397            "auth-sess-test",
1398        );
1399        assert_eq!(
1400            returned, bundle_id,
1401            "success path must return the bundle id, not the synthetic placeholder"
1402        );
1403
1404        // Binding row exists for (bundle, claude).
1405        let bindings = wstore.bundle_identity_bindings(bundle_id).unwrap();
1406        assert_eq!(bindings.len(), 1, "exactly one binding after one auth success");
1407        assert_eq!(bindings[0].identity_id, bundle_id);
1408        assert_eq!(bindings[0].provider, "claude");
1409
1410        // Account row carries SecretRef::OAuthConfigDir { dir } —
1411        // the same dir we passed at spawn — and status = "valid".
1412        let acct = wstore
1413            .identity_get(&bindings[0].account_id)
1414            .unwrap()
1415            .expect("account row exists");
1416        assert_eq!(acct.provider, "claude");
1417        assert_eq!(acct.kind, "oauth");
1418        assert_eq!(acct.status, "valid");
1419        match acct.secret_ref {
1420            SecretRef::OAuthConfigDir { dir: persisted_dir } => {
1421                assert_eq!(
1422                    persisted_dir, dir,
1423                    "persisted OAuthConfigDir.dir must equal the spawn dir"
1424                );
1425            }
1426            other => panic!("expected OAuthConfigDir, got {other:?}"),
1427        }
1428
1429        // Cleanup env so other tests don't inherit our overrides.
1430        std::env::remove_var("AGENTMUX_HOME_OVERRIDE");
1431        for (k, _) in paths.to_env_vars() {
1432            std::env::remove_var(k);
1433        }
1434    }
1435
1436    #[test]
1437    fn compute_dir_skipped_for_empty_bundle_id() {
1438        // Ambient launch (empty into_bundle_id) — no per-bundle dir
1439        // override, legacy auth_env left intact.
1440        let mut env = std::collections::HashMap::new();
1441        env.insert("CLAUDE_CONFIG_DIR".to_string(), "/legacy/ambient".to_string());
1442        let dir = compute_and_ensure_bundle_dir(None, "claude", &mut env);
1443        assert!(dir.is_none());
1444        assert_eq!(
1445            env.get("CLAUDE_CONFIG_DIR").map(String::as_str),
1446            Some("/legacy/ambient"),
1447            "legacy ambient env must survive when no bundle id"
1448        );
1449
1450        let mut env = std::collections::HashMap::new();
1451        let dir = compute_and_ensure_bundle_dir(Some(""), "claude", &mut env);
1452        assert!(dir.is_none(), "empty string bundle id is the same as None");
1453        assert!(env.is_empty());
1454    }
1455
1456    #[test]
1457    fn compute_dir_skipped_for_api_key_provider() {
1458        // Api-key-class providers must NOT go through the OAuth
1459        // per-bundle dir override even when their registry entry
1460        // declares an `auth_config_dir_env_var` (kimi has
1461        // `KIMI_SHARE_DIR` so the registry test alone isn't enough —
1462        // the gate has to be provider_class).
1463        let mut env = std::collections::HashMap::new();
1464        let dir = compute_and_ensure_bundle_dir(Some("id-1"), "kimi", &mut env);
1465        assert!(dir.is_none(), "api-key provider class must skip the OAuth dir path");
1466        assert!(
1467            env.get("KIMI_SHARE_DIR").is_none(),
1468            "api-key providers must NEVER get their config-dir env var set by the OAuth-start path"
1469        );
1470        // Same for github (no registry entry at all).
1471        let mut env = std::collections::HashMap::new();
1472        let dir = compute_and_ensure_bundle_dir(Some("id-1"), "github", &mut env);
1473        assert!(dir.is_none());
1474        assert!(env.get("GITHUB_TOKEN").is_none());
1475    }
1476
1477    #[test]
1478    fn persist_returns_synthetic_when_no_bundle_id() {
1479        let wstore = Arc::new(WaveStore::open_in_memory().unwrap());
1480        let broker = Arc::new(crate::backend::wps::Broker::new());
1481        let r = persist_oauth_binding_or_synthetic(
1482            &wstore,
1483            &broker,
1484            None,
1485            "claude",
1486            Some("/some/dir"),
1487            "sess-x",
1488        );
1489        assert_eq!(r, "pending-bundle-for-sess-x");
1490    }
1491
1492    #[test]
1493    fn persist_returns_synthetic_when_no_bundle_dir() {
1494        let wstore = Arc::new(WaveStore::open_in_memory().unwrap());
1495        let broker = Arc::new(crate::backend::wps::Broker::new());
1496        let r = persist_oauth_binding_or_synthetic(
1497            &wstore,
1498            &broker,
1499            Some("id-1"),
1500            "claude",
1501            None,
1502            "sess-y",
1503        );
1504        assert_eq!(
1505            r, "pending-bundle-for-sess-y",
1506            "no bundle dir → no persistence → synthetic id"
1507        );
1508    }
1509}