diff --git a/src/conn.rs b/src/conn.rs index 46d04b5d..72c0d9ea 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -3,6 +3,7 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::ready; +use async_tungstenite::tungstenite::Message as WsMessage; use async_tungstenite::{tungstenite::protocol::WebSocketConfig, WebSocketStream}; use futures::stream::Stream; use futures::task::{Context, Poll}; @@ -132,21 +133,33 @@ impl Stream for Connection { pin.pending_flush = Some(call); } } + break; } // read from the ws match ready!(pin.ws.poll_next_unpin(cx)) { - Some(Ok(msg)) => match serde_json::from_slice::>(&msg.into_data()) { - Ok(msg) => { - tracing::trace!("Received {:?}", msg); - Poll::Ready(Some(Ok(msg))) - } - Err(err) => { - tracing::error!("Failed to deserialize WS response {}", err); - Poll::Ready(Some(Err(err.into()))) - } - }, + Some(Ok(WsMessage::Text(text))) => { + let ready = match serde_json::from_str::>(&text) { + Ok(msg) => { + tracing::trace!("Received {:?}", msg); + Ok(msg) + } + Err(err) => { + tracing::debug!(target: "chromiumoxide::conn::raw_ws::parse_errors", msg = text, "Failed to parse raw WS message"); + tracing::error!("Failed to deserialize WS response {}", err); + Err(err.into()) + } + }; + Poll::Ready(Some(ready)) + } + Some(Ok(WsMessage::Close(_))) => Poll::Ready(None), + // ignore ping and pong + Some(Ok(WsMessage::Ping(_))) | Some(Ok(WsMessage::Pong(_))) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + Some(Ok(msg)) => Poll::Ready(Some(Err(CdpError::UnexpectedWsMessage(msg)))), Some(Err(err)) => Poll::Ready(Some(Err(CdpError::Ws(err)))), None => { // ws connection closed diff --git a/src/error.rs b/src/error.rs index 99e45d4c..ce75bab4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,6 +4,7 @@ use std::process::ExitStatus; use std::time::Instant; use async_tungstenite::tungstenite; +use async_tungstenite::tungstenite::Message; use base64::DecodeError; use futures::channel::mpsc::SendError; use futures::channel::oneshot::Canceled; @@ -28,6 +29,8 @@ pub enum CdpError { Chrome(#[from] chromiumoxide_types::Error), #[error("Received no response from the chromium instance.")] NoResponse, + #[error("Received unexpected ws message: {0:?}")] + UnexpectedWsMessage(Message), #[error("{0}")] ChannelSendError(#[from] ChannelError), #[error("Browser process exited with status {0:?} before websocket URL could be resolved, stderr: {1:?}")]