1use std::sync::Arc;
23
24use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
25use tokio::sync::Mutex;
26
27#[cfg(target_os = "windows")]
28use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions};
29
30use agentmux_common::ipc::{ClientKind, Command, ErrorCode, Event};
31
32use crate::reducer;
33use crate::state::State;
34
35#[derive(Debug)]
36pub struct ServerCtx {
37 pub srv_pid: u32,
38 pub srv_version: String,
39 pub state: Arc<Mutex<State>>,
40 pub events_tx: tokio::sync::broadcast::Sender<Event>,
41 pub event_log: Arc<crate::event_log::EventLog>,
42}
43
44#[cfg(target_os = "windows")]
45pub fn bind_first_pipe_instance(pipe_name: &str) -> std::io::Result<NamedPipeServer> {
46 ServerOptions::new()
47 .first_pipe_instance(true)
48 .create(pipe_name)
49}
50
51#[cfg(target_os = "windows")]
52pub fn run_srv_ipc_server(
53 pipe_name: String,
54 first: NamedPipeServer,
55 ctx: ServerCtx,
56) -> tokio::task::JoinHandle<()> {
57 tokio::spawn(async move {
58 let ctx = Arc::new(ctx);
59 tracing::info!(target: "srv-ipc", "[srv-ipc] server starting on {}", pipe_name);
60
61 let mut current = first;
62
63 loop {
64 if let Err(e) = current.connect().await {
65 tracing::warn!(target: "srv-ipc", "[srv-ipc] connect failed: {} — recreating instance", e);
66 current = match ServerOptions::new().create(&pipe_name) {
67 Ok(s) => s,
68 Err(create_err) => {
69 tracing::error!(target: "srv-ipc", "[srv-ipc] FATAL: failed to recreate pipe after connect error: {}", create_err);
70 return;
71 }
72 };
73 continue;
74 }
75
76 let accepted = current;
77 current = match ServerOptions::new().create(&pipe_name) {
78 Ok(s) => s,
79 Err(e) => {
80 tracing::error!(target: "srv-ipc", "[srv-ipc] FATAL: failed to create next pipe instance: {}", e);
81 tokio::spawn(handle_connection(accepted, Arc::clone(&ctx)));
82 return;
83 }
84 };
85
86 tokio::spawn(handle_connection(accepted, Arc::clone(&ctx)));
87 }
88 })
89}
90
91#[cfg(not(target_os = "windows"))]
92pub fn run_srv_ipc_server(
93 _pipe_name: String,
94 _ctx: ServerCtx,
95) -> tokio::task::JoinHandle<()> {
96 tokio::spawn(async {})
98}
99
100#[cfg(target_os = "windows")]
101async fn handle_connection(stream: NamedPipeServer, ctx: Arc<ServerCtx>) {
102 let (read_half, write_half) = tokio::io::split(stream);
103 let writer = Arc::new(Mutex::new(write_half));
104 let reader = BufReader::new(read_half);
105 let mut lines = reader.lines();
106
107 let mut registered_kind: Option<ClientKind> = None;
108 let mut registered_pid: Option<u32> = None;
109 let conn_id = NEXT_CONN_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
110
111 let fanout_handle = {
115 let writer = Arc::clone(&writer);
116 let mut events_rx = ctx.events_tx.subscribe();
117 tokio::spawn(async move {
118 loop {
119 match events_rx.recv().await {
120 Ok(event) => {
121 if send_event(&writer, event).await.is_err() {
124 return;
125 }
126 }
127 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
128 tracing::warn!(target: "srv-ipc", "[srv-ipc] conn_id={} lagged event bus, missed {} events", conn_id, n);
129 }
130 Err(tokio::sync::broadcast::error::RecvError::Closed) => return,
131 }
132 }
133 })
134 };
135
136 loop {
137 let line = match lines.next_line().await {
138 Ok(Some(l)) => l,
139 Ok(None) => {
140 tracing::info!(target: "srv-ipc", "[srv-ipc] connection conn_id={} closed (kind={:?}, pid={:?})", conn_id, registered_kind, registered_pid);
141 dispatch_synthetic_goodbye(&ctx, conn_id, registered_pid).await;
146 fanout_handle.abort();
147 return;
148 }
149 Err(e) => {
150 tracing::warn!(target: "srv-ipc", "[srv-ipc] read error conn_id={}: {}", conn_id, e);
151 dispatch_synthetic_goodbye(&ctx, conn_id, registered_pid).await;
152 fanout_handle.abort();
153 return;
154 }
155 };
156
157 if line.trim().is_empty() {
158 continue;
159 }
160
161 let cmd = match serde_json::from_str::<Command>(&line) {
162 Ok(c) => c,
163 Err(e) => {
164 let _ = send_event(
172 &writer,
173 Event::Error {
174 code: ErrorCode::InvalidCommand,
175 message: format!("parse failed: {}", e),
176 fatal: false,
177 version: 0,
178 },
179 )
180 .await;
181 continue;
182 }
183 };
184
185 if let Some(reply) = enforce_register_first(&cmd, ®istered_kind, &ctx).await {
187 let close = matches!(&reply, Event::Error { fatal: true, .. });
188 let _ = send_event(&writer, reply).await;
189 if close {
190 fanout_handle.abort();
191 return;
192 }
193 continue;
194 }
195 if let Command::Register { .. } = &cmd {
196 if registered_kind.is_some() {
197 let _ = send_event(
200 &writer,
201 Event::Error {
202 code: ErrorCode::AlreadyRegistered,
203 message: "Register sent twice on the same connection".into(),
204 fatal: false,
205 version: 0,
206 },
207 )
208 .await;
209 continue;
210 }
211 }
212
213 let pre_register = if let Command::Register { kind, pid, .. } = &cmd {
214 Some((*kind, *pid))
215 } else {
216 None
217 };
218
219 if let Command::GetEvents { since } = &cmd {
230 let v = {
237 let state = ctx.state.lock().await;
238 state.event_version
239 };
240 let replay = ctx.event_log.events_since(*since);
241 if ctx.event_log.replay_truncated(*since) {
242 tracing::warn!(target: "srv-ipc", "[srv-ipc] conn_id={} GetEvents since={} truncated", conn_id, since);
243 }
244 let _ = send_event(
245 &writer,
246 Event::EventList {
247 events: replay,
248 version: v,
249 },
250 )
251 .await;
252 continue;
253 }
254
255 let now_rfc3339 = chrono::Utc::now().to_rfc3339();
256 let events = {
257 let mut state = ctx.state.lock().await;
258 let rctx = reducer::Ctx {
259 now_rfc3339,
260 conn_id,
261 registered_pid,
262 };
263 reducer::update(&mut state, cmd.clone(), &rctx)
264 };
265
266 if let Some((kind, pid)) = pre_register {
267 let rejected = events
268 .iter()
269 .any(|e| matches!(e, Event::Error { code: ErrorCode::AlreadyRegistered, .. }));
270 if !rejected {
271 registered_kind = Some(kind);
272 registered_pid = Some(pid);
273 }
274 }
275
276 let goodbye = matches!(cmd, Command::Goodbye);
277 for event in events {
278 let event = patch_srv_identity(event, &ctx);
282 if !matches!(
287 event,
288 Event::Snapshot { .. }
289 | Event::EventList { .. }
290 | Event::SrvSnapshot { .. }
291 | Event::Error { .. }
292 ) {
293 ctx.event_log.append(event.clone());
294 }
295 let _ = ctx.events_tx.send(event);
296 }
297 if goodbye {
298 tracing::info!(target: "srv-ipc", "[srv-ipc] goodbye from conn_id={} kind={:?} pid={:?}", conn_id, registered_kind, registered_pid);
299 fanout_handle.abort();
300 return;
301 }
302 }
303}
304
305#[cfg(target_os = "windows")]
316async fn dispatch_synthetic_goodbye(
317 ctx: &Arc<ServerCtx>,
318 conn_id: u64,
319 registered_pid: Option<u32>,
320) {
321 let Some(pid) = registered_pid else {
322 return;
323 };
324 let now_rfc3339 = chrono::Utc::now().to_rfc3339();
325 let events = {
326 let mut state = ctx.state.lock().await;
327 let rctx = reducer::Ctx {
328 now_rfc3339,
329 conn_id,
330 registered_pid: Some(pid),
331 };
332 reducer::update(&mut state, Command::Goodbye, &rctx)
333 };
334 for event in events {
335 let event = patch_srv_identity(event, ctx);
336 if !matches!(
337 event,
338 Event::Snapshot { .. }
339 | Event::EventList { .. }
340 | Event::SrvSnapshot { .. }
341 | Event::Error { .. }
342 ) {
343 ctx.event_log.append(event.clone());
344 }
345 let _ = ctx.events_tx.send(event);
346 }
347}
348
349static NEXT_CONN_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
350
351#[cfg(target_os = "windows")]
353async fn enforce_register_first(
354 cmd: &Command,
355 registered_kind: &Option<ClientKind>,
356 ctx: &Arc<ServerCtx>,
357) -> Option<Event> {
358 if registered_kind.is_some() {
359 return None;
360 }
361 let (msg, fatal) = match cmd {
362 Command::Register { .. } => return None,
363 Command::Ping { .. } => ("Ping before Register".to_string(), false),
364 Command::GetSrvSnapshot => ("GetSrvSnapshot before Register".to_string(), false),
365 Command::GetEvents { .. } => ("GetEvents before Register".to_string(), false),
366 Command::Goodbye => ("Goodbye before Register".to_string(), true),
367 _ => ("Command before Register".to_string(), false),
372 };
373 let v = 0;
376 Some(Event::Error {
382 code: ErrorCode::NotRegistered,
383 message: msg,
384 fatal,
385 version: v,
386 })
387}
388
389fn patch_srv_identity(event: Event, ctx: &Arc<ServerCtx>) -> Event {
394 if let Event::Registered {
395 client_id,
396 launcher_pid,
397 launcher_version,
398 version,
399 } = event
400 {
401 if launcher_pid == 0 && launcher_version.is_empty() {
402 return Event::Registered {
403 client_id,
404 launcher_pid: ctx.srv_pid,
405 launcher_version: ctx.srv_version.clone(),
406 version,
407 };
408 }
409 return Event::Registered {
410 client_id,
411 launcher_pid,
412 launcher_version,
413 version,
414 };
415 }
416 event
417}
418
419#[cfg(target_os = "windows")]
420async fn send_event(
421 writer: &Arc<Mutex<tokio::io::WriteHalf<NamedPipeServer>>>,
422 event: Event,
423) -> std::io::Result<()> {
424 let mut buf = serde_json::to_vec(&event).map_err(|e| {
425 std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())
426 })?;
427 buf.push(b'\n');
428 let mut w = writer.lock().await;
429 w.write_all(&buf).await?;
430 w.flush().await?;
431 Ok(())
432}