agentmux_cef/
saga_dispatch.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase CPD-5 — host-side saga command dispatch + idempotency LRU.
5//
6// After CPD-1+2+3 merged, the launcher dispatches `IssueCmd::Host`
7// saga commands carrying `saga_id` over the launcher → host pipe.
8// CPD-3 made the wire live; this module makes host processing
9// idempotent so a launcher retry (e.g. after host pipe reconnect
10// drains the launcher's `pending_buffer`) does NOT re-execute the
11// same command — instead the host re-emits the same `Report*` reply
12// it produced the first time.
13//
14// **Why not just dedupe at the launcher?** The launcher already does
15// best-effort dedupe via single-flight per saga, but the buffer →
16// reconnect → drain path can legitimately re-deliver a command if
17// the host crashed mid-processing (the launcher has no way to know
18// whether the host saw the original send). Defense-in-depth: launcher
19// avoids resends when it can; host absorbs the rest with this LRU.
20//
21// **Key:** `(saga_id, CommandKind)`. `saga_id` alone is not enough —
22// a future feature could legitimately issue multiple distinct host
23// commands inside one saga (e.g. a saga that both reaps panes AND
24// drains the pool, each with a different `Command` variant). Keying
25// on `(saga_id, kind)` lets the LRU hold one entry per (saga, action
26// type) pair without collisions.
27//
28// **Bound:** 256 entries. Saga rate is human-driven (window opens /
29// closes); 256 covers far more than any realistic concurrent-saga
30// burst. Drop-oldest on overflow.
31//
32// **Test coverage:** see `#[cfg(test)] mod tests` at the bottom —
33// covers (a) duplicate command re-emits same Report without re-action,
34// (b) LRU eviction at the 257th distinct entry preserves recency
35// ordering.
36
37use std::collections::VecDeque;
38use std::sync::Arc;
39
40use agentmux_common::ipc::Command;
41use parking_lot::Mutex;
42use tokio::sync::mpsc::UnboundedSender;
43
/// Tag identifying which host-bound saga action a `Command` carries.
///
/// One variant exists per command the host consumes today; a new
/// host-bound saga command needs a new variant here and a matching
/// arm in `CommandKind::from_command`.
///
/// The tag is the second half of the idempotency key
/// `(saga_id, CommandKind)`: two different actions issued under the
/// same `saga_id` (for example a reap followed by a drain) occupy
/// separate cache slots instead of overwriting each other.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum CommandKind {
    /// Corresponds to `Command::SpawnPoolWindow`.
    SpawnPoolWindow,
    /// Corresponds to `Command::ReapPanes`.
    ReapPanes,
    /// Corresponds to `Command::DrainPoolIfLast`.
    DrainPoolIfLast,
}
59
60impl CommandKind {
61    /// Extract the discriminator from a host-bound saga `Command`.
62    /// Returns `None` for commands that aren't host-bound saga
63    /// payloads (e.g. `Report*` Commands flowing in the OTHER
64    /// direction, or `Register` / `Ping`).
65    pub fn from_command(cmd: &Command) -> Option<Self> {
66        match cmd {
67            Command::SpawnPoolWindow { .. } => Some(Self::SpawnPoolWindow),
68            Command::ReapPanes { .. } => Some(Self::ReapPanes),
69            Command::DrainPoolIfLast { .. } => Some(Self::DrainPoolIfLast),
70            _ => None,
71        }
72    }
73}
74
/// Default maximum number of `(saga_id, kind)` entries held in the
/// idempotency LRU; this is the capacity `with_default_cap` passes
/// to `SagaIdempotencyLru::new`.
///
/// Spec §3.7: bound 256 — far above any realistic concurrent saga
/// count. Once the bound is reached, inserting a new key drops the
/// oldest entry.
pub const SAGA_LRU_CAP: usize = 256;
79
80/// Idempotency LRU keyed by `(saga_id, CommandKind)`. Stores the
81/// resulting `Report*` Command so a duplicate dispatch re-emits the
82/// same reply payload without re-running the action.
83///
84/// Implementation: simple `VecDeque` of `(key, report)` pairs. Front
85/// is oldest, back is most-recent. Linear scan on lookup is fine at
86/// `cap=256`: the entire scan is a few microseconds and only happens
87/// once per saga command (a human-rate event).
88pub struct SagaIdempotencyLru {
89    entries: VecDeque<((u64, CommandKind), Command)>,
90    cap: usize,
91}
92
93impl SagaIdempotencyLru {
94    pub fn new(cap: usize) -> Self {
95        Self {
96            entries: VecDeque::with_capacity(cap),
97            cap,
98        }
99    }
100
101    pub fn with_default_cap() -> Self {
102        Self::new(SAGA_LRU_CAP)
103    }
104
105    /// Look up the cached Report for `(saga_id, kind)`. On hit,
106    /// promotes the entry to most-recent (back of the deque) and
107    /// returns a clone of the cached Report. On miss, returns None.
108    pub fn get(&mut self, saga_id: u64, kind: CommandKind) -> Option<Command> {
109        let key = (saga_id, kind);
110        let pos = self.entries.iter().position(|(k, _)| *k == key)?;
111        // Promote to most-recent: remove + push to back.
112        let (k, v) = self.entries.remove(pos)?;
113        let cloned = v.clone();
114        self.entries.push_back((k, v));
115        Some(cloned)
116    }
117
118    /// Cache the Report for `(saga_id, kind)`. If at capacity, drops
119    /// the oldest entry (front). If a duplicate key exists, updates
120    /// in place + promotes (defensive — caller usually hits via
121    /// `get` first; this branch covers a race where two duplicate
122    /// commands raced through `get` with both seeing miss).
123    pub fn insert(&mut self, saga_id: u64, kind: CommandKind, report: Command) {
124        let key = (saga_id, kind);
125        if let Some(pos) = self.entries.iter().position(|(k, _)| *k == key) {
126            // Defensive duplicate — replace + promote.
127            self.entries.remove(pos);
128        } else if self.entries.len() >= self.cap {
129            // Drop-oldest on overflow.
130            self.entries.pop_front();
131        }
132        self.entries.push_back((key, report));
133    }
134
135    /// Current number of entries (for tests + diagnostics).
136    pub fn len(&self) -> usize {
137        self.entries.len()
138    }
139
140    /// Return the saga_id of the oldest entry, or None if empty.
141    /// Used in tests to verify drop-oldest semantics.
142    #[cfg(test)]
143    pub fn oldest_saga_id(&self) -> Option<u64> {
144        self.entries.front().map(|(k, _)| k.0)
145    }
146}
147
148impl Default for SagaIdempotencyLru {
149    fn default() -> Self {
150        Self::with_default_cap()
151    }
152}
153
/// What `dispatch_host_command` did with a command. Lets the caller
/// log or count fresh executions separately from replays.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DispatchOutcome {
    /// Cache miss: the action ran, its Report was stored in the LRU
    /// and then sent.
    Fresh,
    /// Cache hit: the stored Report was sent again and the action
    /// was NOT re-run.
    Duplicate,
    /// The command carried `saga_id == 0` (the "no-saga" sentinel).
    /// The LRU is neither consulted nor updated; the action runs
    /// every time and the Report carries no saga id. Mostly
    /// defensive — the launcher's CPD-3 wiring always stamps a real
    /// saga_id, but a legacy / forward-compat sender could land here.
    NoSagaBypass,
    /// The command is not one of the host-bound saga payloads (for
    /// instance a `Report*` that should never travel launcher →
    /// host). A warning is logged; nothing runs and nothing is sent.
    Unrecognized,
}
176
/// The side-effecting half of saga dispatch, split out behind a trait
/// so tests can plug in a counting stub instead of driving real CEF
/// windows. The production implementation is `LiveActionRunner`.
pub trait SagaActionRunner: Send + Sync {
    /// Perform the `SpawnPoolWindow` action and return the label to
    /// place in the saga-correlated `ReportPoolWindowAdded` reply.
    ///
    /// In production the spawn is fire-and-forget: the real
    /// `window-pool-<uuid>` label arrives later through the organic
    /// `report_pool_window_added` path, and the value returned here
    /// is an empty sentinel meaning "spawn requested, pool will fill
    /// asynchronously". The launcher correlates by saga_id, not by
    /// this label.
    fn spawn_pool_window(&self) -> String;

    /// Perform the `ReapPanes` action for the window named `label`.
    ///
    /// Per the current architecture the host's `on_before_close`
    /// already reaps panes when a window closes, so this is a
    /// best-effort acknowledgement; the authoritative signal is the
    /// organic `Event::PanesReaped` (via `report_panes_reaped` in
    /// `client.rs`). The reply built around this call exists so the
    /// saga's `expected_saga_id` filter has something to match.
    fn reap_panes(&self, label: &str);

    /// Perform the `DrainPoolIfLast` action for the window named
    /// `label`.
    ///
    /// Returns `true` when the host considers `label` the last
    /// user-visible window, i.e. a pool drain WOULD be triggered.
    /// As with `reap_panes`, `on_before_close` makes this decision
    /// inline already; the saga command is a re-issue / confirmation
    /// channel.
    fn drain_pool_if_last(&self, label: &str) -> bool;
}
212
213/// Dispatch a host-bound saga `Command`: check the LRU, run the
214/// action if not cached, build the corresponding `Report*`,
215/// cache it, and send via `reply_tx`. Returns the outcome so callers
216/// can log / count fresh vs. duplicate dispatch.
217///
218/// `lru` is a shared `Arc<Mutex<...>>` so the read-loop task and
219/// any future direct-dispatch path share a single cache.
220pub fn dispatch_host_command<R: SagaActionRunner>(
221    cmd: &Command,
222    runner: &R,
223    lru: &Arc<Mutex<SagaIdempotencyLru>>,
224    reply_tx: &UnboundedSender<Command>,
225) -> DispatchOutcome {
226    let kind = match CommandKind::from_command(cmd) {
227        Some(k) => k,
228        None => {
229            tracing::warn!(
230                "[saga-dispatch] received non-host-bound command on host pipe: {:?}",
231                cmd
232            );
233            return DispatchOutcome::Unrecognized;
234        }
235    };
236
237    // Extract saga_id from the command. For each variant the field
238    // name is `saga_id: u64`. `0` is the "no saga" sentinel per
239    // CPD-1 spec — bypass the LRU in that case (the action just
240    // runs without dedupe; useful for legacy / forward-compat
241    // launchers that stamp `0`).
242    let saga_id = match cmd {
243        Command::SpawnPoolWindow { saga_id } => *saga_id,
244        Command::ReapPanes { saga_id, .. } => *saga_id,
245        Command::DrainPoolIfLast { saga_id, .. } => *saga_id,
246        _ => unreachable!("kind matched but variant didn't — schema drift"),
247    };
248
249    if saga_id == 0 {
250        // Legacy / no-saga path: run action, build Report with
251        // saga_id = None (organic), send. No LRU touch.
252        let report = build_and_run_report(cmd, kind, runner, None);
253        let _ = reply_tx.send(report);
254        return DispatchOutcome::NoSagaBypass;
255    }
256
257    // Hot path — saga_id present. Check LRU.
258    {
259        let mut guard = lru.lock();
260        if let Some(cached_report) = guard.get(saga_id, kind) {
261            tracing::info!(
262                "[saga-dispatch] duplicate saga command (saga_id={}, kind={:?}) — re-emitting cached report",
263                saga_id, kind
264            );
265            // Send the cached report; do NOT re-run the action.
266            let _ = reply_tx.send(cached_report);
267            return DispatchOutcome::Duplicate;
268        }
269    }
270
271    // Miss — run action, build Report, cache, send. We hold no
272    // lock across the action call (action may post UI tasks /
273    // touch CEF state).
274    let report = build_and_run_report(cmd, kind, runner, Some(saga_id));
275    {
276        let mut guard = lru.lock();
277        guard.insert(saga_id, kind, report.clone());
278    }
279    let _ = reply_tx.send(report);
280    DispatchOutcome::Fresh
281}
282
283/// Run the action for `cmd` and synthesize the corresponding
284/// `Report*` Command. `saga_id_for_report` is what the Report's
285/// echo field carries — `Some(N)` when the dispatch was saga-driven,
286/// `None` for the no-saga bypass path.
287fn build_and_run_report<R: SagaActionRunner>(
288    cmd: &Command,
289    kind: CommandKind,
290    runner: &R,
291    saga_id_for_report: Option<u64>,
292) -> Command {
293    match (cmd, kind) {
294        (Command::SpawnPoolWindow { .. }, CommandKind::SpawnPoolWindow) => {
295            let label = runner.spawn_pool_window();
296            Command::ReportPoolWindowAdded {
297                label,
298                saga_id: saga_id_for_report,
299            }
300        }
301        (Command::ReapPanes { label, .. }, CommandKind::ReapPanes) => {
302            runner.reap_panes(label);
303            Command::ReportPanesReaped {
304                label: label.clone(),
305                saga_id: saga_id_for_report,
306            }
307        }
308        (Command::DrainPoolIfLast { label, .. }, CommandKind::DrainPoolIfLast) => {
309            let was_last = runner.drain_pool_if_last(label);
310            Command::ReportPoolDrainDecision {
311                label: label.clone(),
312                was_last,
313                saga_id: saga_id_for_report,
314            }
315        }
316        _ => unreachable!("kind/cmd mismatch — guarded by from_command()"),
317    }
318}
319
320// ── Production action runner ──────────────────────────────────────
321//
322// Wraps real host code paths. Kept thin so the test runner can
323// substitute deterministic stubs without pulling in CEF.
324
/// Production `SagaActionRunner`: forwards saga actions to the real
/// host code paths using the shared application state.
#[cfg(target_os = "windows")]
pub struct LiveActionRunner {
    /// Shared host application state handed to the window-pool
    /// spawn call and queried for the user-visibility snapshot.
    pub state: Arc<crate::state::AppState>,
}

#[cfg(target_os = "windows")]
impl SagaActionRunner for LiveActionRunner {
    /// Request a real pool-window spawn and return an empty label.
    fn spawn_pool_window(&self) -> String {
        // Fire the real spawn. The host's existing
        // `report_pool_window_added` organic path will report the
        // freshly minted label; the saga-correlated reply this
        // dispatcher emits carries an empty label as a sentinel —
        // the launcher reducer matches by saga_id, not label.
        crate::commands::window_pool::spawn_pool_window(&self.state);
        String::new()
    }

    /// Acknowledge a `ReapPanes` request; currently only logs.
    fn reap_panes(&self, label: &str) {
        // Host's `on_before_close` is the canonical pane-reaper.
        // A saga-issued `ReapPanes` for a window whose close is
        // already in flight is a redundant nudge — log and rely
        // on the organic `report_panes_reaped` for the real signal.
        // Future expansion (forced-close-from-saga) can hook here.
        tracing::info!(
            "[saga-dispatch] ReapPanes saga command for label={} — acknowledged (host close-path is canonical reaper)",
            label
        );
    }

    /// Report whether `label` is (or was) the last user-visible
    /// window: `true` exactly when no user-visible window other than
    /// `label` exists in the current snapshot.
    fn drain_pool_if_last(&self, label: &str) -> bool {
        // Compute the same condition `on_before_close` uses to
        // decide drain. MUST mirror that gate exactly: same pool
        // inventory (unpromoted ∪ queue), same atomic-snapshot
        // discipline. A two-lock variant or unpromoted-only check
        // here lets a queued pool window inflate the user-window
        // count and suppress the drain when the user actually did
        // close their last visible window.
        // NOTE(review): `on_before_close` is not visible from this
        // file — confirm it also excludes the closing window from
        // the user-visible count.
        let (pool_inventory, browsers) = self.state.user_visibility_snapshot();

        // "User-visible" = not a pool window and not a browser pane.
        // At saga-dispatch time the close may or may not have
        // happened yet, so the closing window itself is ignored: the
        // answer is "last" when nothing user-visible remains besides
        // `label`. Only user-visible labels are compared against
        // `label`, so a pool/pane label that merely exists in the
        // browser set cannot make a remaining user window look like
        // the one being closed.
        let other_user_window_remains = browsers.into_iter().map(|(l, _)| l).any(|l| {
            l.as_str() != label
                && !pool_inventory.contains(l.as_str())
                && !l.starts_with("browser-pane-")
        });
        !other_user_window_remains
    }
}
377
378// ── Tests ─────────────────────────────────────────────────────────
379
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use tokio::sync::mpsc;

    /// Test runner: counts how many times each action runs so
    /// duplicate-dispatch tests can verify idempotency.
    struct CountingRunner {
        spawn_calls: AtomicUsize,
        reap_calls: AtomicUsize,
        drain_calls: AtomicUsize,
        /// Value `drain_pool_if_last` returns. Atomic so a test can
        /// flip it between dispatches through a shared reference.
        drain_returns_was_last: AtomicBool,
    }

    impl CountingRunner {
        fn new() -> Self {
            Self {
                spawn_calls: AtomicUsize::new(0),
                reap_calls: AtomicUsize::new(0),
                drain_calls: AtomicUsize::new(0),
                drain_returns_was_last: AtomicBool::new(false),
            }
        }
    }

    impl SagaActionRunner for CountingRunner {
        fn spawn_pool_window(&self) -> String {
            let n = self.spawn_calls.fetch_add(1, Ordering::SeqCst);
            format!("window-pool-test-{}", n)
        }

        fn reap_panes(&self, _label: &str) {
            self.reap_calls.fetch_add(1, Ordering::SeqCst);
        }

        fn drain_pool_if_last(&self, _label: &str) -> bool {
            self.drain_calls.fetch_add(1, Ordering::SeqCst);
            self.drain_returns_was_last.load(Ordering::SeqCst)
        }
    }

    /// Collect every reply currently queued on `rx` without blocking.
    fn drain_replies(rx: &mut mpsc::UnboundedReceiver<Command>) -> Vec<Command> {
        let mut out = Vec::new();
        while let Ok(cmd) = rx.try_recv() {
            out.push(cmd);
        }
        out
    }

    #[test]
    fn duplicate_spawn_pool_window_does_not_respawn_but_reemits_report() {
        let runner = CountingRunner::new();
        let lru = Arc::new(Mutex::new(SagaIdempotencyLru::with_default_cap()));
        let (tx, mut rx) = mpsc::unbounded_channel();

        let cmd = Command::SpawnPoolWindow { saga_id: 42 };

        // First dispatch — fresh.
        let outcome1 = dispatch_host_command(&cmd, &runner, &lru, &tx);
        assert_eq!(outcome1, DispatchOutcome::Fresh);
        assert_eq!(runner.spawn_calls.load(Ordering::SeqCst), 1);

        // Second dispatch with same saga_id — duplicate; should
        // NOT re-spawn, but SHOULD re-emit the same Report.
        let outcome2 = dispatch_host_command(&cmd, &runner, &lru, &tx);
        assert_eq!(outcome2, DispatchOutcome::Duplicate);
        assert_eq!(
            runner.spawn_calls.load(Ordering::SeqCst),
            1,
            "spawn must not re-execute on duplicate"
        );

        let replies = drain_replies(&mut rx);
        assert_eq!(replies.len(), 2, "expected one reply per dispatch");
        // Both replies must serialize to byte-identical JSON
        // (Command itself isn't PartialEq because it carries
        // some non-comparable nested types).
        let j0 = serde_json::to_string(&replies[0]).unwrap();
        let j1 = serde_json::to_string(&replies[1]).unwrap();
        assert_eq!(j0, j1, "duplicate dispatch must re-emit identical Report");
        match &replies[0] {
            Command::ReportPoolWindowAdded { label, saga_id } => {
                assert_eq!(label, "window-pool-test-0");
                assert_eq!(*saga_id, Some(42));
            }
            other => panic!("unexpected reply: {:?}", other),
        }
    }

    #[test]
    fn duplicate_reap_panes_does_not_rerun_but_reemits_report() {
        let runner = CountingRunner::new();
        let lru = Arc::new(Mutex::new(SagaIdempotencyLru::with_default_cap()));
        let (tx, mut rx) = mpsc::unbounded_channel();

        let cmd = Command::ReapPanes {
            label: "win-1".to_string(),
            saga_id: 7,
        };

        assert_eq!(
            dispatch_host_command(&cmd, &runner, &lru, &tx),
            DispatchOutcome::Fresh
        );
        assert_eq!(
            dispatch_host_command(&cmd, &runner, &lru, &tx),
            DispatchOutcome::Duplicate
        );
        assert_eq!(runner.reap_calls.load(Ordering::SeqCst), 1);

        let replies = drain_replies(&mut rx);
        assert_eq!(replies.len(), 2);
        let j0 = serde_json::to_string(&replies[0]).unwrap();
        let j1 = serde_json::to_string(&replies[1]).unwrap();
        assert_eq!(j0, j1);
    }

    #[test]
    fn duplicate_drain_pool_if_last_reuses_was_last_decision() {
        let runner = CountingRunner::new();
        runner.drain_returns_was_last.store(true, Ordering::SeqCst);
        let lru = Arc::new(Mutex::new(SagaIdempotencyLru::with_default_cap()));
        let (tx, mut rx) = mpsc::unbounded_channel();

        let cmd = Command::DrainPoolIfLast {
            label: "win-1".to_string(),
            saga_id: 99,
        };

        assert_eq!(
            dispatch_host_command(&cmd, &runner, &lru, &tx),
            DispatchOutcome::Fresh
        );

        // Change what the runner WOULD answer if it were asked
        // again. The duplicate below must not observe this: the LRU
        // has to replay the decision captured on the first call.
        runner.drain_returns_was_last.store(false, Ordering::SeqCst);

        assert_eq!(
            dispatch_host_command(&cmd, &runner, &lru, &tx),
            DispatchOutcome::Duplicate
        );
        assert_eq!(runner.drain_calls.load(Ordering::SeqCst), 1);

        let replies = drain_replies(&mut rx);
        assert_eq!(replies.len(), 2);
        match (&replies[0], &replies[1]) {
            (
                Command::ReportPoolDrainDecision {
                    was_last: a,
                    saga_id: id_a,
                    ..
                },
                Command::ReportPoolDrainDecision {
                    was_last: b,
                    saga_id: id_b,
                    ..
                },
            ) => {
                assert!(*a, "first reply carries the original decision");
                assert!(*b, "duplicate must replay the cached decision");
                assert_eq!(id_a, id_b);
                assert_eq!(*id_a, Some(99));
            }
            other => panic!("unexpected reply pair: {:?}", other),
        }
    }

    #[test]
    fn lru_evicts_oldest_at_capacity() {
        // Use a small cap to keep test runtime cheap.
        let cap = 4;
        let mut lru = SagaIdempotencyLru::new(cap);
        for i in 0..cap as u64 {
            lru.insert(
                i,
                CommandKind::SpawnPoolWindow,
                Command::ReportPoolWindowAdded {
                    label: format!("w{}", i),
                    saga_id: Some(i),
                },
            );
        }
        assert_eq!(lru.len(), cap);
        assert_eq!(lru.oldest_saga_id(), Some(0));

        // Insert one more — saga_id=0 should be evicted (oldest).
        lru.insert(
            cap as u64,
            CommandKind::SpawnPoolWindow,
            Command::ReportPoolWindowAdded {
                label: format!("w{}", cap),
                saga_id: Some(cap as u64),
            },
        );
        assert_eq!(lru.len(), cap);
        assert_eq!(lru.oldest_saga_id(), Some(1));
        assert!(lru.get(0, CommandKind::SpawnPoolWindow).is_none());
        assert!(lru.get(cap as u64, CommandKind::SpawnPoolWindow).is_some());
    }

    #[test]
    fn lru_eviction_at_257th_distinct_command() {
        // Per spec §3.7: bound 256.
        let runner = CountingRunner::new();
        let lru = Arc::new(Mutex::new(SagaIdempotencyLru::with_default_cap()));
        // Keep the receiver alive so sends succeed; replies are not
        // inspected in this test.
        let (tx, _rx) = mpsc::unbounded_channel();

        // Fill to capacity (256 distinct saga_ids).
        for i in 1..=SAGA_LRU_CAP as u64 {
            let cmd = Command::SpawnPoolWindow { saga_id: i };
            let outcome = dispatch_host_command(&cmd, &runner, &lru, &tx);
            assert_eq!(outcome, DispatchOutcome::Fresh);
        }
        assert_eq!(lru.lock().len(), SAGA_LRU_CAP);

        // 257th distinct command — accepted; oldest (saga_id=1) evicted.
        let cmd = Command::SpawnPoolWindow {
            saga_id: SAGA_LRU_CAP as u64 + 1,
        };
        let outcome = dispatch_host_command(&cmd, &runner, &lru, &tx);
        assert_eq!(outcome, DispatchOutcome::Fresh);
        assert_eq!(lru.lock().len(), SAGA_LRU_CAP);

        // Verify saga_id=1 is gone (replaying it now would be a
        // fresh call, not a duplicate).
        let replay = Command::SpawnPoolWindow { saga_id: 1 };
        let outcome = dispatch_host_command(&replay, &runner, &lru, &tx);
        assert_eq!(
            outcome,
            DispatchOutcome::Fresh,
            "evicted entry must NOT be served from cache"
        );

        // Verify saga_id=257 is still cached (recent).
        let replay = Command::SpawnPoolWindow {
            saga_id: SAGA_LRU_CAP as u64 + 1,
        };
        let outcome = dispatch_host_command(&replay, &runner, &lru, &tx);
        assert_eq!(outcome, DispatchOutcome::Duplicate);
    }

    #[test]
    fn distinct_kinds_with_same_saga_id_dont_collide() {
        let runner = CountingRunner::new();
        let lru = Arc::new(Mutex::new(SagaIdempotencyLru::with_default_cap()));
        let (tx, _rx) = mpsc::unbounded_channel();

        // Same saga_id, different kinds — both should be Fresh.
        let spawn = Command::SpawnPoolWindow { saga_id: 5 };
        let reap = Command::ReapPanes {
            label: "x".to_string(),
            saga_id: 5,
        };
        assert_eq!(
            dispatch_host_command(&spawn, &runner, &lru, &tx),
            DispatchOutcome::Fresh
        );
        assert_eq!(
            dispatch_host_command(&reap, &runner, &lru, &tx),
            DispatchOutcome::Fresh
        );

        // Re-issue both — both Duplicate.
        assert_eq!(
            dispatch_host_command(&spawn, &runner, &lru, &tx),
            DispatchOutcome::Duplicate
        );
        assert_eq!(
            dispatch_host_command(&reap, &runner, &lru, &tx),
            DispatchOutcome::Duplicate
        );
    }

    #[test]
    fn saga_id_zero_bypasses_lru() {
        let runner = CountingRunner::new();
        let lru = Arc::new(Mutex::new(SagaIdempotencyLru::with_default_cap()));
        let (tx, mut rx) = mpsc::unbounded_channel();

        // saga_id=0 is the "no saga" sentinel; LRU must not track.
        let cmd = Command::SpawnPoolWindow { saga_id: 0 };
        for _ in 0..3 {
            let outcome = dispatch_host_command(&cmd, &runner, &lru, &tx);
            assert_eq!(outcome, DispatchOutcome::NoSagaBypass);
        }
        assert_eq!(runner.spawn_calls.load(Ordering::SeqCst), 3);
        assert_eq!(lru.lock().len(), 0);

        let replies = drain_replies(&mut rx);
        assert_eq!(replies.len(), 3);
        for r in &replies {
            match r {
                Command::ReportPoolWindowAdded { saga_id, .. } => {
                    assert_eq!(*saga_id, None, "no-saga bypass must report None");
                }
                other => panic!("unexpected reply: {:?}", other),
            }
        }
    }

    #[test]
    fn unrecognized_command_is_logged_and_dropped() {
        let runner = CountingRunner::new();
        let lru = Arc::new(Mutex::new(SagaIdempotencyLru::with_default_cap()));
        let (tx, mut rx) = mpsc::unbounded_channel();

        // `Report*` Commands only ever travel host → launcher. If
        // one shows up on the host's inbound pipe, the dispatcher
        // must neither act on it nor reply to it.
        let bogus = Command::ReportPanesReaped {
            label: "x".to_string(),
            saga_id: Some(1),
        };
        let outcome = dispatch_host_command(&bogus, &runner, &lru, &tx);
        assert_eq!(outcome, DispatchOutcome::Unrecognized);
        assert_eq!(runner.reap_calls.load(Ordering::SeqCst), 0);
        assert!(drain_replies(&mut rx).is_empty());
    }

    #[test]
    fn lru_get_promotes_to_most_recent() {
        let mut lru = SagaIdempotencyLru::new(3);
        for i in 1..=3u64 {
            lru.insert(
                i,
                CommandKind::SpawnPoolWindow,
                Command::ReportPoolWindowAdded {
                    label: format!("w{}", i),
                    saga_id: Some(i),
                },
            );
        }
        assert_eq!(lru.oldest_saga_id(), Some(1));
        // Touch saga_id=1 — it should move to most-recent.
        let _ = lru.get(1, CommandKind::SpawnPoolWindow);
        assert_eq!(lru.oldest_saga_id(), Some(2));
        // Insert a new entry; saga_id=2 (now oldest) gets evicted,
        // not saga_id=1 (which we just touched).
        lru.insert(
            4,
            CommandKind::SpawnPoolWindow,
            Command::ReportPoolWindowAdded {
                label: "w4".to_string(),
                saga_id: Some(4),
            },
        );
        assert_eq!(lru.oldest_saga_id(), Some(3));
        assert!(lru.get(2, CommandKind::SpawnPoolWindow).is_none());
        assert!(lru.get(1, CommandKind::SpawnPoolWindow).is_some());
    }
}