agentmux_cef/saga_dispatch.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Phase CPD-5 — host-side saga command dispatch + idempotency LRU.
5//
6// After CPD-1+2+3 merged, the launcher dispatches `IssueCmd::Host`
7// saga commands carrying `saga_id` over the launcher → host pipe.
8// CPD-3 made the wire live; this module makes host processing
9// idempotent so a launcher retry (e.g. after host pipe reconnect
10// drains the launcher's `pending_buffer`) does NOT re-execute the
11// same command — instead the host re-emits the same `Report*` reply
12// it produced the first time.
13//
14// **Why not just dedupe at the launcher?** The launcher already does
15// best-effort dedupe via single-flight per saga, but the buffer →
16// reconnect → drain path can legitimately re-deliver a command if
17// the host crashed mid-processing (the launcher has no way to know
18// whether the host saw the original send). Defense-in-depth: launcher
19// avoids resends when it can; host absorbs the rest with this LRU.
20//
21// **Key:** `(saga_id, CommandKind)`. `saga_id` alone is not enough —
22// a future feature could legitimately issue multiple distinct host
23// commands inside one saga (e.g. a saga that both reaps panes AND
24// drains the pool, each with a different `Command` variant). Keying
25// on `(saga_id, kind)` lets the LRU hold one entry per (saga, action
26// type) pair without collisions.
27//
28// **Bound:** 256 entries. Saga rate is human-driven (window opens /
29// closes); 256 covers far more than any realistic concurrent-saga
30// burst. Drop-oldest on overflow.
31//
32// **Test coverage:** see `#[cfg(test)] mod tests` at the bottom —
33// covers (a) duplicate command re-emits same Report without re-action,
34// (b) LRU eviction at the 257th distinct entry preserves recency
35// ordering.
36
37use std::collections::VecDeque;
38use std::sync::Arc;
39
40use agentmux_common::ipc::Command;
41use parking_lot::Mutex;
42use tokio::sync::mpsc::UnboundedSender;
43
44/// Discriminator for the host-bound saga command variants. Tracks
45/// the three commands the host actually consumes today
46/// (`SpawnPoolWindow`, `ReapPanes`, `DrainPoolIfLast`); future host-
47/// bound commands add a variant here.
48///
49/// Used as half of the LRU key `(saga_id, CommandKind)` so the same
50/// `saga_id` can carry multiple distinct host actions (a saga that
51/// reaps panes AND drains pool, each with a different Command kind)
52/// without collision.
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
54pub enum CommandKind {
55 SpawnPoolWindow,
56 ReapPanes,
57 DrainPoolIfLast,
58}
59
60impl CommandKind {
61 /// Extract the discriminator from a host-bound saga `Command`.
62 /// Returns `None` for commands that aren't host-bound saga
63 /// payloads (e.g. `Report*` Commands flowing in the OTHER
64 /// direction, or `Register` / `Ping`).
65 pub fn from_command(cmd: &Command) -> Option<Self> {
66 match cmd {
67 Command::SpawnPoolWindow { .. } => Some(Self::SpawnPoolWindow),
68 Command::ReapPanes { .. } => Some(Self::ReapPanes),
69 Command::DrainPoolIfLast { .. } => Some(Self::DrainPoolIfLast),
70 _ => None,
71 }
72 }
73}
74
75/// Maximum number of `(saga_id, kind)` entries held in the LRU.
76/// Spec §3.7: bound 256 — far above any realistic concurrent saga
77/// count.
78pub const SAGA_LRU_CAP: usize = 256;
79
80/// Idempotency LRU keyed by `(saga_id, CommandKind)`. Stores the
81/// resulting `Report*` Command so a duplicate dispatch re-emits the
82/// same reply payload without re-running the action.
83///
84/// Implementation: simple `VecDeque` of `(key, report)` pairs. Front
85/// is oldest, back is most-recent. Linear scan on lookup is fine at
86/// `cap=256`: the entire scan is a few microseconds and only happens
87/// once per saga command (a human-rate event).
88pub struct SagaIdempotencyLru {
89 entries: VecDeque<((u64, CommandKind), Command)>,
90 cap: usize,
91}
92
93impl SagaIdempotencyLru {
94 pub fn new(cap: usize) -> Self {
95 Self {
96 entries: VecDeque::with_capacity(cap),
97 cap,
98 }
99 }
100
101 pub fn with_default_cap() -> Self {
102 Self::new(SAGA_LRU_CAP)
103 }
104
105 /// Look up the cached Report for `(saga_id, kind)`. On hit,
106 /// promotes the entry to most-recent (back of the deque) and
107 /// returns a clone of the cached Report. On miss, returns None.
108 pub fn get(&mut self, saga_id: u64, kind: CommandKind) -> Option<Command> {
109 let key = (saga_id, kind);
110 let pos = self.entries.iter().position(|(k, _)| *k == key)?;
111 // Promote to most-recent: remove + push to back.
112 let (k, v) = self.entries.remove(pos)?;
113 let cloned = v.clone();
114 self.entries.push_back((k, v));
115 Some(cloned)
116 }
117
118 /// Cache the Report for `(saga_id, kind)`. If at capacity, drops
119 /// the oldest entry (front). If a duplicate key exists, updates
120 /// in place + promotes (defensive — caller usually hits via
121 /// `get` first; this branch covers a race where two duplicate
122 /// commands raced through `get` with both seeing miss).
123 pub fn insert(&mut self, saga_id: u64, kind: CommandKind, report: Command) {
124 let key = (saga_id, kind);
125 if let Some(pos) = self.entries.iter().position(|(k, _)| *k == key) {
126 // Defensive duplicate — replace + promote.
127 self.entries.remove(pos);
128 } else if self.entries.len() >= self.cap {
129 // Drop-oldest on overflow.
130 self.entries.pop_front();
131 }
132 self.entries.push_back((key, report));
133 }
134
135 /// Current number of entries (for tests + diagnostics).
136 pub fn len(&self) -> usize {
137 self.entries.len()
138 }
139
140 /// Return the saga_id of the oldest entry, or None if empty.
141 /// Used in tests to verify drop-oldest semantics.
142 #[cfg(test)]
143 pub fn oldest_saga_id(&self) -> Option<u64> {
144 self.entries.front().map(|(k, _)| k.0)
145 }
146}
147
148impl Default for SagaIdempotencyLru {
149 fn default() -> Self {
150 Self::with_default_cap()
151 }
152}
153
/// What `dispatch_host_command` did with a command. Lets the caller
/// distinguish a fresh execution (action ran, Report cached and
/// sent) from a duplicate (Report replayed from the cache, action
/// NOT re-run), so it can log or count the two separately.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatchOutcome {
    /// First time this `(saga_id, kind)` was seen: the action was
    /// executed and its Report was cached and sent.
    Fresh,
    /// Already seen: the cached Report was sent again and the
    /// action was skipped.
    Duplicate,
    /// The command's `saga_id` was `0`, the "no-saga" sentinel, so
    /// the LRU was bypassed and the action ran unconditionally.
    /// Mostly a defensive branch — the launcher's CPD-3 wiring
    /// always stamps a real saga_id, but a stray legacy or
    /// forward-compat dispatch could still land here.
    NoSagaBypass,
    /// The command is not a known host-bound saga payload (e.g. a
    /// `Report*` echo that should never have been sent down to the
    /// host). A warning is logged; nothing runs and nothing is sent.
    Unrecognized,
}
176
/// The side-effecting half of saga dispatch, behind a trait so tests
/// can plug in a fake runner instead of spawning real CEF windows.
/// Production code uses `LiveActionRunner`, which calls into
/// `commands::window_pool` / the host close-path code.
pub trait SagaActionRunner: Send + Sync {
    /// Perform the `SpawnPoolWindow` action and return the label to
    /// put in the saga reply.
    ///
    /// In production the spawn is fire-and-forget on the UI thread:
    /// the real `window-pool-<uuid>` label is reported later through
    /// the organic `report_pool_window_added` path, and the value
    /// returned here is an empty/sentinel label meaning "spawn
    /// requested, pool will fill asynchronously". The launcher
    /// reducer correlates the follow-up `ReportPoolWindowAdded`
    /// event by saga_id rather than by this label.
    fn spawn_pool_window(&self) -> String;

    /// Perform the `ReapPanes` action for the window named `label`.
    ///
    /// In the current architecture the host's `on_before_close`
    /// already drains panes synchronously when a window closes, so
    /// this is a best-effort acknowledgement. The real signal is the
    /// organic `Event::PanesReaped` (via `report_panes_reaped` in
    /// `client.rs`); the Report built around this call is a
    /// saga-correlated echo so the saga's `expected_saga_id` filter
    /// matches.
    fn reap_panes(&self, label: &str);

    /// Perform the `DrainPoolIfLast` action for the window named
    /// `label`.
    ///
    /// Returns `true` when the host considers `label` to have been
    /// the last user-visible window, i.e. a drain WOULD be
    /// triggered. As with `reap_panes`, the host's
    /// `on_before_close` already makes this decision inline; the
    /// saga command path is a re-issue / confirmation channel.
    fn drain_pool_if_last(&self, label: &str) -> bool;
}
212
213/// Dispatch a host-bound saga `Command`: check the LRU, run the
214/// action if not cached, build the corresponding `Report*`,
215/// cache it, and send via `reply_tx`. Returns the outcome so callers
216/// can log / count fresh vs. duplicate dispatch.
217///
218/// `lru` is a shared `Arc<Mutex<...>>` so the read-loop task and
219/// any future direct-dispatch path share a single cache.
220pub fn dispatch_host_command<R: SagaActionRunner>(
221 cmd: &Command,
222 runner: &R,
223 lru: &Arc<Mutex<SagaIdempotencyLru>>,
224 reply_tx: &UnboundedSender<Command>,
225) -> DispatchOutcome {
226 let kind = match CommandKind::from_command(cmd) {
227 Some(k) => k,
228 None => {
229 tracing::warn!(
230 "[saga-dispatch] received non-host-bound command on host pipe: {:?}",
231 cmd
232 );
233 return DispatchOutcome::Unrecognized;
234 }
235 };
236
237 // Extract saga_id from the command. For each variant the field
238 // name is `saga_id: u64`. `0` is the "no saga" sentinel per
239 // CPD-1 spec — bypass the LRU in that case (the action just
240 // runs without dedupe; useful for legacy / forward-compat
241 // launchers that stamp `0`).
242 let saga_id = match cmd {
243 Command::SpawnPoolWindow { saga_id } => *saga_id,
244 Command::ReapPanes { saga_id, .. } => *saga_id,
245 Command::DrainPoolIfLast { saga_id, .. } => *saga_id,
246 _ => unreachable!("kind matched but variant didn't — schema drift"),
247 };
248
249 if saga_id == 0 {
250 // Legacy / no-saga path: run action, build Report with
251 // saga_id = None (organic), send. No LRU touch.
252 let report = build_and_run_report(cmd, kind, runner, None);
253 let _ = reply_tx.send(report);
254 return DispatchOutcome::NoSagaBypass;
255 }
256
257 // Hot path — saga_id present. Check LRU.
258 {
259 let mut guard = lru.lock();
260 if let Some(cached_report) = guard.get(saga_id, kind) {
261 tracing::info!(
262 "[saga-dispatch] duplicate saga command (saga_id={}, kind={:?}) — re-emitting cached report",
263 saga_id, kind
264 );
265 // Send the cached report; do NOT re-run the action.
266 let _ = reply_tx.send(cached_report);
267 return DispatchOutcome::Duplicate;
268 }
269 }
270
271 // Miss — run action, build Report, cache, send. We hold no
272 // lock across the action call (action may post UI tasks /
273 // touch CEF state).
274 let report = build_and_run_report(cmd, kind, runner, Some(saga_id));
275 {
276 let mut guard = lru.lock();
277 guard.insert(saga_id, kind, report.clone());
278 }
279 let _ = reply_tx.send(report);
280 DispatchOutcome::Fresh
281}
282
283/// Run the action for `cmd` and synthesize the corresponding
284/// `Report*` Command. `saga_id_for_report` is what the Report's
285/// echo field carries — `Some(N)` when the dispatch was saga-driven,
286/// `None` for the no-saga bypass path.
287fn build_and_run_report<R: SagaActionRunner>(
288 cmd: &Command,
289 kind: CommandKind,
290 runner: &R,
291 saga_id_for_report: Option<u64>,
292) -> Command {
293 match (cmd, kind) {
294 (Command::SpawnPoolWindow { .. }, CommandKind::SpawnPoolWindow) => {
295 let label = runner.spawn_pool_window();
296 Command::ReportPoolWindowAdded {
297 label,
298 saga_id: saga_id_for_report,
299 }
300 }
301 (Command::ReapPanes { label, .. }, CommandKind::ReapPanes) => {
302 runner.reap_panes(label);
303 Command::ReportPanesReaped {
304 label: label.clone(),
305 saga_id: saga_id_for_report,
306 }
307 }
308 (Command::DrainPoolIfLast { label, .. }, CommandKind::DrainPoolIfLast) => {
309 let was_last = runner.drain_pool_if_last(label);
310 Command::ReportPoolDrainDecision {
311 label: label.clone(),
312 was_last,
313 saga_id: saga_id_for_report,
314 }
315 }
316 _ => unreachable!("kind/cmd mismatch — guarded by from_command()"),
317 }
318}
319
320// ── Production action runner ──────────────────────────────────────
321//
322// Wraps real host code paths. Kept thin so the test runner can
323// substitute deterministic stubs without pulling in CEF.
324
/// Production `SagaActionRunner`: forwards saga actions to the real
/// host code paths using the shared application state.
#[cfg(target_os = "windows")]
pub struct LiveActionRunner {
    /// Shared host state the actions operate on.
    pub state: Arc<crate::state::AppState>,
}

#[cfg(target_os = "windows")]
impl SagaActionRunner for LiveActionRunner {
    /// Request a pool-window spawn and return an empty label.
    fn spawn_pool_window(&self) -> String {
        // Fire the real spawn. The host's existing
        // `report_pool_window_added` organic path will report the
        // freshly minted label; the saga-correlated reply this
        // dispatcher emits carries an empty label as a sentinel —
        // the launcher reducer matches by saga_id, not label.
        crate::commands::window_pool::spawn_pool_window(&self.state);
        String::new()
    }

    /// Acknowledge a `ReapPanes` request by logging it; no panes are
    /// reaped here.
    fn reap_panes(&self, label: &str) {
        // Host's `on_before_close` is the canonical pane-reaper.
        // A saga-issued `ReapPanes` for a window whose close is
        // already in flight is a redundant nudge — log and rely
        // on the organic `report_panes_reaped` for the real signal.
        // Future expansion (forced-close-from-saga) can hook here.
        tracing::info!(
            "[saga-dispatch] ReapPanes saga command for label={} — acknowledged (host close-path is canonical reaper)",
            label
        );
    }

    /// Report whether `label` is (or was) the last user-visible
    /// window.
    ///
    /// Returns `true` when no user-visible window remains, or when
    /// exactly one remains and it is `label` itself. A `label` that
    /// is not user-visible (a pool window or a `browser-pane-*`
    /// entry) never counts as "the last one" while another
    /// user-visible window is still open.
    fn drain_pool_if_last(&self, label: &str) -> bool {
        // Compute the same condition `on_before_close` uses to
        // decide drain. MUST mirror that gate exactly: same pool
        // inventory (unpromoted ∪ queue), same atomic-snapshot
        // discipline. A two-lock variant or unpromoted-only check
        // here lets a queued pool window inflate the user-visible
        // count and suppress the drain when the user actually did
        // close their last visible window.
        let (pool_inventory, browsers) = self.state.user_visibility_snapshot();
        let user_visible: Vec<String> = browsers
            .into_iter()
            .map(|(l, _)| l)
            .filter(|k| !pool_inventory.contains(k.as_str()) && !k.starts_with("browser-pane-"))
            .collect();
        // At saga-dispatch time the close hasn't happened yet (or has
        // just happened), so both "nothing left" and "only the
        // closing window left" mean "last". The remaining window is
        // compared against `label` within the user-visible set only;
        // checking against every browser label would report "last"
        // for a pool/pane label while a different user window is
        // still open.
        match user_visible.as_slice() {
            [] => true,
            [only] => only.as_str() == label,
            _ => false,
        }
    }
}
377
378// ── Tests ─────────────────────────────────────────────────────────
379
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::sync::mpsc;

    /// Test runner: counts how many times each action runs so
    /// duplicate-dispatch tests can verify idempotency.
    struct CountingRunner {
        spawn_calls: AtomicUsize,
        reap_calls: AtomicUsize,
        drain_calls: AtomicUsize,
        /// Value `drain_pool_if_last` returns on every call.
        drain_returns_was_last: bool,
    }

    impl CountingRunner {
        /// All counters at zero; `drain_pool_if_last` answers `false`.
        fn new() -> Self {
            Self {
                spawn_calls: AtomicUsize::new(0),
                reap_calls: AtomicUsize::new(0),
                drain_calls: AtomicUsize::new(0),
                drain_returns_was_last: false,
            }
        }
    }

    impl SagaActionRunner for CountingRunner {
        fn spawn_pool_window(&self) -> String {
            // Label embeds the pre-increment call index so a
            // re-executed spawn would yield a different label.
            let n = self.spawn_calls.fetch_add(1, Ordering::SeqCst);
            format!("window-pool-test-{}", n)
        }

        fn reap_panes(&self, _label: &str) {
            self.reap_calls.fetch_add(1, Ordering::SeqCst);
        }

        fn drain_pool_if_last(&self, _label: &str) -> bool {
            self.drain_calls.fetch_add(1, Ordering::SeqCst);
            self.drain_returns_was_last
        }
    }

    /// Collect every reply currently queued on `rx` without blocking.
    fn drain_replies(rx: &mut mpsc::UnboundedReceiver<Command>) -> Vec<Command> {
        let mut out = Vec::new();
        while let Ok(cmd) = rx.try_recv() {
            out.push(cmd);
        }
        out
    }

    #[test]
    fn duplicate_spawn_pool_window_does_not_respawn_but_reemits_report() {
        let runner = CountingRunner::new();
        let lru = Arc::new(Mutex::new(SagaIdempotencyLru::with_default_cap()));
        let (tx, mut rx) = mpsc::unbounded_channel();

        let cmd = Command::SpawnPoolWindow { saga_id: 42 };

        // First dispatch — fresh.
        let outcome1 = dispatch_host_command(&cmd, &runner, &lru, &tx);
        assert_eq!(outcome1, DispatchOutcome::Fresh);
        assert_eq!(runner.spawn_calls.load(Ordering::SeqCst), 1);

        // Second dispatch with same saga_id — duplicate; should
        // NOT re-spawn, but SHOULD re-emit the same Report.
        let outcome2 = dispatch_host_command(&cmd, &runner, &lru, &tx);
        assert_eq!(outcome2, DispatchOutcome::Duplicate);
        assert_eq!(
            runner.spawn_calls.load(Ordering::SeqCst),
            1,
            "spawn must not re-execute on duplicate"
        );

        let replies = drain_replies(&mut rx);
        assert_eq!(replies.len(), 2, "expected one reply per dispatch");
        // Both replies must serialize to byte-identical JSON
        // (Command itself isn't PartialEq because it carries
        // some non-comparable nested types).
        let j0 = serde_json::to_string(&replies[0]).unwrap();
        let j1 = serde_json::to_string(&replies[1]).unwrap();
        assert_eq!(j0, j1, "duplicate dispatch must re-emit identical Report");
        match &replies[0] {
            Command::ReportPoolWindowAdded { label, saga_id } => {
                assert_eq!(label, "window-pool-test-0");
                assert_eq!(*saga_id, Some(42));
            }
            other => panic!("unexpected reply: {:?}", other),
        }
    }

    #[test]
    fn duplicate_reap_panes_does_not_rerun_but_reemits_report() {
        let runner = CountingRunner::new();
        let lru = Arc::new(Mutex::new(SagaIdempotencyLru::with_default_cap()));
        let (tx, mut rx) = mpsc::unbounded_channel();

        let cmd = Command::ReapPanes {
            label: "win-1".to_string(),
            saga_id: 7,
        };

        assert_eq!(
            dispatch_host_command(&cmd, &runner, &lru, &tx),
            DispatchOutcome::Fresh
        );
        assert_eq!(
            dispatch_host_command(&cmd, &runner, &lru, &tx),
            DispatchOutcome::Duplicate
        );
        assert_eq!(runner.reap_calls.load(Ordering::SeqCst), 1);

        let replies = drain_replies(&mut rx);
        assert_eq!(replies.len(), 2);
        let j0 = serde_json::to_string(&replies[0]).unwrap();
        let j1 = serde_json::to_string(&replies[1]).unwrap();
        assert_eq!(j0, j1);
    }

    #[test]
    fn duplicate_drain_pool_if_last_reuses_was_last_decision() {
        let mut runner = CountingRunner::new();
        runner.drain_returns_was_last = true;
        let lru = Arc::new(Mutex::new(SagaIdempotencyLru::with_default_cap()));
        let (tx, mut rx) = mpsc::unbounded_channel();

        let cmd = Command::DrainPoolIfLast {
            label: "win-1".to_string(),
            saga_id: 99,
        };

        assert_eq!(
            dispatch_host_command(&cmd, &runner, &lru, &tx),
            DispatchOutcome::Fresh
        );

        // Flip what the runner WOULD answer now. If the duplicate
        // dispatch recomputed instead of replaying the cache, the
        // second reply would carry `was_last == false`.
        runner.drain_returns_was_last = false;

        assert_eq!(
            dispatch_host_command(&cmd, &runner, &lru, &tx),
            DispatchOutcome::Duplicate
        );
        assert_eq!(runner.drain_calls.load(Ordering::SeqCst), 1);

        let replies = drain_replies(&mut rx);
        assert_eq!(replies.len(), 2);
        // The second reply must echo the decision captured at the
        // first call — the LRU MUST hold the original decision even
        // though the runner's state changed in between.
        match (&replies[0], &replies[1]) {
            (
                Command::ReportPoolDrainDecision {
                    was_last: a,
                    saga_id: id_a,
                    ..
                },
                Command::ReportPoolDrainDecision {
                    was_last: b,
                    saga_id: id_b,
                    ..
                },
            ) => {
                assert_eq!(a, b);
                assert!(*a, "cached decision must be the original `true`");
                assert_eq!(id_a, id_b);
                assert_eq!(*id_a, Some(99));
            }
            other => panic!("unexpected reply pair: {:?}", other),
        }
    }

    #[test]
    fn lru_evicts_oldest_at_capacity() {
        // Use a small cap to keep test runtime cheap.
        let cap = 4;
        let mut lru = SagaIdempotencyLru::new(cap);
        for i in 0..cap as u64 {
            lru.insert(
                i,
                CommandKind::SpawnPoolWindow,
                Command::ReportPoolWindowAdded {
                    label: format!("w{}", i),
                    saga_id: Some(i),
                },
            );
        }
        assert_eq!(lru.len(), cap);
        assert_eq!(lru.oldest_saga_id(), Some(0));

        // Insert one more — saga_id=0 should be evicted (oldest).
        lru.insert(
            cap as u64,
            CommandKind::SpawnPoolWindow,
            Command::ReportPoolWindowAdded {
                label: format!("w{}", cap),
                saga_id: Some(cap as u64),
            },
        );
        assert_eq!(lru.len(), cap);
        assert_eq!(lru.oldest_saga_id(), Some(1));
        assert!(lru.get(0, CommandKind::SpawnPoolWindow).is_none());
        assert!(lru.get(cap as u64, CommandKind::SpawnPoolWindow).is_some());
    }

    #[test]
    fn lru_eviction_at_257th_distinct_command() {
        // Per spec §3.7: bound 256.
        let runner = CountingRunner::new();
        let lru = Arc::new(Mutex::new(SagaIdempotencyLru::with_default_cap()));
        let (tx, _rx) = mpsc::unbounded_channel();

        // Fill to capacity (256 distinct saga_ids).
        for i in 1..=SAGA_LRU_CAP as u64 {
            let cmd = Command::SpawnPoolWindow { saga_id: i };
            let outcome = dispatch_host_command(&cmd, &runner, &lru, &tx);
            assert_eq!(outcome, DispatchOutcome::Fresh);
        }
        assert_eq!(lru.lock().len(), SAGA_LRU_CAP);

        // 257th distinct command — accepted; oldest (saga_id=1) evicted.
        let cmd = Command::SpawnPoolWindow {
            saga_id: SAGA_LRU_CAP as u64 + 1,
        };
        let outcome = dispatch_host_command(&cmd, &runner, &lru, &tx);
        assert_eq!(outcome, DispatchOutcome::Fresh);
        assert_eq!(lru.lock().len(), SAGA_LRU_CAP);

        // Verify saga_id=1 is gone (replaying it now would be a
        // fresh call, not a duplicate).
        let replay = Command::SpawnPoolWindow { saga_id: 1 };
        let outcome = dispatch_host_command(&replay, &runner, &lru, &tx);
        assert_eq!(
            outcome,
            DispatchOutcome::Fresh,
            "evicted entry must NOT be served from cache"
        );

        // Verify saga_id=257 is still cached (recent).
        let replay = Command::SpawnPoolWindow {
            saga_id: SAGA_LRU_CAP as u64 + 1,
        };
        let outcome = dispatch_host_command(&replay, &runner, &lru, &tx);
        assert_eq!(outcome, DispatchOutcome::Duplicate);
    }

    #[test]
    fn distinct_kinds_with_same_saga_id_dont_collide() {
        let runner = CountingRunner::new();
        let lru = Arc::new(Mutex::new(SagaIdempotencyLru::with_default_cap()));
        let (tx, _rx) = mpsc::unbounded_channel();

        // Same saga_id, different kinds — both should be Fresh.
        let spawn = Command::SpawnPoolWindow { saga_id: 5 };
        let reap = Command::ReapPanes {
            label: "x".to_string(),
            saga_id: 5,
        };
        assert_eq!(
            dispatch_host_command(&spawn, &runner, &lru, &tx),
            DispatchOutcome::Fresh
        );
        assert_eq!(
            dispatch_host_command(&reap, &runner, &lru, &tx),
            DispatchOutcome::Fresh
        );

        // Re-issue both — both Duplicate.
        assert_eq!(
            dispatch_host_command(&spawn, &runner, &lru, &tx),
            DispatchOutcome::Duplicate
        );
        assert_eq!(
            dispatch_host_command(&reap, &runner, &lru, &tx),
            DispatchOutcome::Duplicate
        );
    }

    #[test]
    fn saga_id_zero_bypasses_lru() {
        let runner = CountingRunner::new();
        let lru = Arc::new(Mutex::new(SagaIdempotencyLru::with_default_cap()));
        let (tx, mut rx) = mpsc::unbounded_channel();

        // saga_id=0 is the "no saga" sentinel; LRU must not track.
        let cmd = Command::SpawnPoolWindow { saga_id: 0 };
        for _ in 0..3 {
            let outcome = dispatch_host_command(&cmd, &runner, &lru, &tx);
            assert_eq!(outcome, DispatchOutcome::NoSagaBypass);
        }
        assert_eq!(runner.spawn_calls.load(Ordering::SeqCst), 3);
        assert_eq!(lru.lock().len(), 0);

        let replies = drain_replies(&mut rx);
        assert_eq!(replies.len(), 3);
        for r in &replies {
            match r {
                Command::ReportPoolWindowAdded { saga_id, .. } => {
                    assert_eq!(*saga_id, None, "no-saga bypass must report None");
                }
                _ => panic!("unexpected reply"),
            }
        }
    }

    #[test]
    fn unrecognized_command_is_logged_and_dropped() {
        let runner = CountingRunner::new();
        let lru = Arc::new(Mutex::new(SagaIdempotencyLru::with_default_cap()));
        let (tx, mut rx) = mpsc::unbounded_channel();

        // `Report*` Commands only ever flow host → launcher. One
        // arriving at the host is flowing the wrong way — verify we
        // neither act on it nor reply to it.
        let bogus = Command::ReportPanesReaped {
            label: "x".to_string(),
            saga_id: Some(1),
        };
        let outcome = dispatch_host_command(&bogus, &runner, &lru, &tx);
        assert_eq!(outcome, DispatchOutcome::Unrecognized);
        assert_eq!(runner.reap_calls.load(Ordering::SeqCst), 0);
        assert!(drain_replies(&mut rx).is_empty());
    }

    #[test]
    fn lru_get_promotes_to_most_recent() {
        let mut lru = SagaIdempotencyLru::new(3);
        for i in 1..=3u64 {
            lru.insert(
                i,
                CommandKind::SpawnPoolWindow,
                Command::ReportPoolWindowAdded {
                    label: format!("w{}", i),
                    saga_id: Some(i),
                },
            );
        }
        assert_eq!(lru.oldest_saga_id(), Some(1));
        // Touch saga_id=1 — it should move to most-recent.
        let _ = lru.get(1, CommandKind::SpawnPoolWindow);
        assert_eq!(lru.oldest_saga_id(), Some(2));
        // Insert a new entry; saga_id=2 (now oldest) gets evicted,
        // not saga_id=1 (which we just touched).
        lru.insert(
            4,
            CommandKind::SpawnPoolWindow,
            Command::ReportPoolWindowAdded {
                label: "w4".to_string(),
                saga_id: Some(4),
            },
        );
        assert_eq!(lru.oldest_saga_id(), Some(3));
        assert!(lru.get(2, CommandKind::SpawnPoolWindow).is_none());
        assert!(lru.get(1, CommandKind::SpawnPoolWindow).is_some());
    }
}