From e22ca2b1b1fcafcfbb3d8a41b531546c4feb9aa7 Mon Sep 17 00:00:00 2001 From: CEbbinghaus Date: Mon, 9 Sep 2024 23:05:30 +1000 Subject: [PATCH] Fixed whitespace and added more hearbeats --- backend/src/ws.rs | 217 +++++++++++++++++++++++----------------------- 1 file changed, 110 insertions(+), 107 deletions(-) diff --git a/backend/src/ws.rs b/backend/src/ws.rs index 5d2ac6f..74e504e 100644 --- a/backend/src/ws.rs +++ b/backend/src/ws.rs @@ -20,120 +20,123 @@ const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); #[get("/ws-test")] async fn ws(req: HttpRequest, body: web::Payload, sender: web::Data>) -> Result { - let (response, session, msg_stream) = actix_ws::handle(&req, body)?; + let (response, session, msg_stream) = actix_ws::handle(&req, body)?; - actix_rt::spawn(async move { - handle_ws(session, msg_stream, sender.into_inner()).await; - }); + actix_rt::spawn(async move { + handle_ws(session, msg_stream, sender.into_inner()).await; + }); - Ok(response) + Ok(response) } /// Broadcast text & binary messages received from a client, respond to ping messages, and monitor /// connection health to detect network issues and free up resources. pub async fn handle_ws( - mut session: actix_ws::Session, - mut msg_stream: actix_ws::MessageStream, - channel: Arc>, + mut session: actix_ws::Session, + mut msg_stream: actix_ws::MessageStream, + channel: Arc>, ) { - info!("connected"); - - let mut last_heartbeat = Instant::now(); - let mut interval = interval(HEARTBEAT_INTERVAL); - - let mut reciever = channel.subscribe(); - - let reason = loop { - // waits for either `msg_stream` to receive a message from the client, the broadcast channel - // to send a message, or the heartbeat interval timer to tick, yielding the value of - // whichever one is ready first - select! { - broadcast_msg = reciever.recv() => { - let msg = match broadcast_msg { - Ok(msg) => msg, - Err(error::RecvError::Closed) => break None, - Err(error::RecvError::Lagged(_)) => continue, - }; - - let res = session.text(Event::new(msg)).await; - - if let Err(err) = res { - error!("{err}"); - break None; - } - } - - // heartbeat interval ticked - _tick = interval.tick() => { - // if no heartbeat ping/pong received recently, close the connection - if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { - info!( - "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting" - ); - - break None; - } - - // send heartbeat ping - let _ = session.ping(b"").await; - }, - - msg = msg_stream.next() => { - let msg = match msg { - // received message from WebSocket client - Some(Ok(msg)) => msg, - - // client WebSocket stream error - Some(Err(err)) => { - error!("{err}"); - break None; - } - - // client WebSocket stream ended - None => break None - }; - - debug!("msg: {msg:?}"); - - match msg { - Message::Text(data) => { - let event = ParsedEvent::parse(&data); - info!(event = ?event, "Recieved Event"); - let _ = channel.send(CardEvent::Updated); - // drop client's text messages - } - - Message::Binary(_) => { - // drop client's binary messages - } - - Message::Close(reason) => { - break reason; - } - - Message::Ping(bytes) => { - last_heartbeat = Instant::now(); - let _ = session.pong(&bytes).await; - } - - Message::Pong(_) => { - last_heartbeat = Instant::now(); - } - - Message::Continuation(_) => { - warn!("no support for continuation frames"); - } - - // no-op; ignore - Message::Nop => {} - }; - } - } - }; - - // attempt to close connection gracefully - let _ = session.close(reason).await; - - info!("disconnected"); + info!("connected"); + + let mut last_heartbeat = Instant::now(); + let mut interval = interval(HEARTBEAT_INTERVAL); + + let mut reciever = channel.subscribe(); + + let reason = loop { + // waits for either `msg_stream` to receive a message from the client, the broadcast channel + // to send a message, or the heartbeat interval timer to tick, yielding the value of + // whichever one is ready first + select! { + broadcast_msg = reciever.recv() => { + let msg = match broadcast_msg { + Ok(msg) => msg, + Err(error::RecvError::Closed) => break None, + Err(error::RecvError::Lagged(_)) => continue, + }; + + let res = session.text(Event::new(msg)).await; + + if let Err(err) = res { + error!("{err}"); + break None; + } + } + + // heartbeat interval ticked + _tick = interval.tick() => { + // if no heartbeat ping/pong received recently, close the connection + if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { + info!( + "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting" + ); + + break None; + } + + // send heartbeat ping + let _ = session.ping(b"").await; + }, + + msg = msg_stream.next() => { + let msg = match msg { + // received message from WebSocket client + Some(Ok(msg)) => msg, + + // client WebSocket stream error + Some(Err(err)) => { + error!("{err}"); + break None; + } + + // client WebSocket stream ended + None => break None + }; + + debug!("msg: {msg:?}"); + + match msg { + Message::Text(data) => { + last_heartbeat = Instant::now(); + let event = ParsedEvent::parse(&data); + info!(event = ?event, "received Event"); + let _ = channel.send(CardEvent::Updated); + // drop client's text messages + } + + Message::Binary(_) => { + last_heartbeat = Instant::now(); + // drop client's binary messages + warn!("received binary message; dropping"); + } + + Message::Close(reason) => { + break reason; + } + + Message::Ping(bytes) => { + last_heartbeat = Instant::now(); + let _ = session.pong(&bytes).await; + } + + Message::Pong(_) => { + last_heartbeat = Instant::now(); + } + + Message::Continuation(_) => { + warn!("no support for continuation frames"); + } + + // no-op; ignore + Message::Nop => {} + }; + } + } + }; + + // attempt to close connection gracefully + let _ = session.close(reason).await; + + info!("disconnected"); }