diff --git a/src/conn.rs b/src/conn.rs index 72c0d9e..78d6c4d 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -118,52 +118,56 @@ impl Stream for Connection { let pin = self.get_mut(); loop { - // queue in the next message if not currently flushing - if let Err(err) = pin.start_send_next(cx) { - return Poll::Ready(Some(Err(err))); - } - - // send the message - if let Some(call) = pin.pending_flush.take() { - if pin.ws.poll_ready_unpin(cx).is_ready() { - pin.needs_flush = true; - // try another flush - continue; - } else { - pin.pending_flush = Some(call); + loop { + // queue in the next message if not currently flushing + if let Err(err) = pin.start_send_next(cx) { + return Poll::Ready(Some(Err(err))); } - } - break; - } - - // read from the ws - match ready!(pin.ws.poll_next_unpin(cx)) { - Some(Ok(WsMessage::Text(text))) => { - let ready = match serde_json::from_str::>(&text) { - Ok(msg) => { - tracing::trace!("Received {:?}", msg); - Ok(msg) - } - Err(err) => { - tracing::debug!(target: "chromiumoxide::conn::raw_ws::parse_errors", msg = text, "Failed to parse raw WS message"); - tracing::error!("Failed to deserialize WS response {}", err); - Err(err.into()) + // send the message + if let Some(call) = pin.pending_flush.take() { + if pin.ws.poll_ready_unpin(cx).is_ready() { + pin.needs_flush = true; + // try another flush + continue; + } else { + pin.pending_flush = Some(call); } - }; - Poll::Ready(Some(ready)) - } - Some(Ok(WsMessage::Close(_))) => Poll::Ready(None), - // ignore ping and pong - Some(Ok(WsMessage::Ping(_))) | Some(Ok(WsMessage::Pong(_))) => { - cx.waker().wake_by_ref(); - Poll::Pending + } + + break; } - Some(Ok(msg)) => Poll::Ready(Some(Err(CdpError::UnexpectedWsMessage(msg)))), - Some(Err(err)) => Poll::Ready(Some(Err(CdpError::Ws(err)))), - None => { - // ws connection closed - Poll::Ready(None) + + // read from the ws + match ready!(pin.ws.poll_next_unpin(cx)) { + Some(Ok(WsMessage::Text(text))) => { + let ready = match serde_json::from_str::>(&text) { + Ok(msg) => { + tracing::trace!("Received {:?}", msg); + Ok(msg) + } + Err(err) => { + tracing::debug!(target: "chromiumoxide::conn::raw_ws::parse_errors", msg = text, "Failed to parse raw WS message"); + tracing::error!("Failed to deserialize WS response {}", err); + // Go to the next iteration and try reading the next message + // in the hopes we can reconver and continue working. + continue; + } + }; + return Poll::Ready(Some(ready)); + } + Some(Ok(WsMessage::Close(_))) => return Poll::Ready(None), + // ignore ping and pong + Some(Ok(WsMessage::Ping(_))) | Some(Ok(WsMessage::Pong(_))) => { + cx.waker().wake_by_ref(); + return Poll::Pending; + } + Some(Ok(msg)) => return Poll::Ready(Some(Err(CdpError::UnexpectedWsMessage(msg)))), + Some(Err(err)) => return Poll::Ready(Some(Err(CdpError::Ws(err)))), + None => { + // ws connection closed + return Poll::Ready(None); + } } } }