From c15d927c181febd631e56915c5949059223f7008 Mon Sep 17 00:00:00 2001 From: Marcus Griep Date: Wed, 26 Jun 2024 12:29:03 -0600 Subject: [PATCH] feat: add debug spans for decoding requests Closes: #1759 --- tonic/src/codec/compression.rs | 7 ++-- tonic/src/codec/decode.rs | 73 ++++++++++++++++++++++++++++------ 2 files changed, 63 insertions(+), 17 deletions(-) diff --git a/tonic/src/codec/compression.rs b/tonic/src/codec/compression.rs index 16150b201..b62d92dce 100644 --- a/tonic/src/codec/compression.rs +++ b/tonic/src/codec/compression.rs @@ -158,8 +158,7 @@ impl CompressionEncoding { } #[allow(missing_docs)] - #[cfg(any(feature = "gzip", feature = "zstd"))] - pub(crate) fn as_str(&self) -> &'static str { + pub(crate) const fn as_str(&self) -> &'static str { match self { #[cfg(feature = "gzip")] CompressionEncoding::Gzip => "gzip", @@ -169,11 +168,11 @@ impl CompressionEncoding { } #[cfg(any(feature = "gzip", feature = "zstd"))] - pub(crate) fn into_header_value(self) -> http::HeaderValue { + pub(crate) const fn into_header_value(self) -> http::HeaderValue { http::HeaderValue::from_static(self.as_str()) } - pub(crate) fn encodings() -> &'static [Self] { + pub(crate) const fn encodings() -> &'static [Self] { &[ #[cfg(feature = "gzip")] CompressionEncoding::Gzip, diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 742660e31..3dd27f3cb 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -38,12 +38,48 @@ impl Unpin for Streaming {} #[derive(Debug, Clone)] enum State { - ReadHeader, + ReadHeader { + span: tracing::Span, + }, ReadBody { + span: tracing::Span, compression: Option, len: usize, }, - Error(Status), + Error(Box), +} + +impl State { + fn read_header() -> Self { + let span = tracing::debug_span!( + "read_header", + compression = tracing::field::Empty, + body.bytes = tracing::field::Empty, + ); + Self::ReadHeader { span } + } + + fn read_body(compression: Option, len: usize) -> Self { + let span = tracing::debug_span!( + "read_body", + compression = compression.map(|c| c.as_str()), + compressed.bytes = compression.is_some().then_some(len), + uncompressed.bytes = compression.is_none().then_some(len), + ); + Self::ReadBody { + span, + compression, + len, + } + } + + fn span(&self) -> Option<&tracing::Span> { + match self { + Self::ReadHeader { span } => Some(span), + Self::ReadBody { span, .. } => Some(span), + Self::Error(_) => None, + } + } } #[derive(Debug, PartialEq, Eq)] @@ -125,7 +161,7 @@ impl Streaming { .map_frame(|frame| frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))) .map_err(|err| Status::map_error(err.into())) .boxed_unsync(), - state: State::ReadHeader, + state: State::read_header(), direction, buf: BytesMut::with_capacity(buffer_size), trailers: None, @@ -142,7 +178,8 @@ impl StreamingInner { &mut self, buffer_settings: BufferSettings, ) -> Result>, Status> { - if let State::ReadHeader = self.state { + if let State::ReadHeader { span } = &self.state { + let _guard = span.enter(); if self.buf.remaining() < HEADER_SIZE { return Ok(None); } @@ -151,7 +188,8 @@ impl StreamingInner { 0 => None, 1 => { { - if self.encoding.is_some() { + if let Some(ce) = self.encoding { + span.record("compression", ce.as_str()); self.encoding } else { // https://grpc.github.io/grpc/core/md_doc_compression.html @@ -177,6 +215,7 @@ impl StreamingInner { }; let len = self.buf.get_u32() as usize; + span.record("body.bytes", len); let limit = self .max_message_size .unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE); @@ -191,14 +230,19 @@ impl StreamingInner { } self.buf.reserve(len); + drop(_guard); - self.state = State::ReadBody { - compression: compression_encoding, - len, - } + self.state = State::read_body(compression_encoding, len) } - if let State::ReadBody { len, compression } = self.state { + if let State::ReadBody { + len, + span, + compression, + } = &self.state + { + let (len, compression) = (*len, *compression); + let _guard = span.enter(); // if we haven't read enough of the message then return and keep // reading if self.buf.remaining() < len || self.buf.len() < len { @@ -228,6 +272,7 @@ impl StreamingInner { return Err(Status::new(Code::Internal, message)); } let decompressed_len = self.decompress_buf.len(); + span.record("uncompressed.bytes", decompressed_len); DecodeBuf::new(&mut self.decompress_buf, decompressed_len) } else { DecodeBuf::new(&mut self.buf, len) @@ -241,6 +286,7 @@ impl StreamingInner { // Returns Some(()) if data was found or None if the loop in `poll_next` should break fn poll_frame(&mut self, cx: &mut Context<'_>) -> Poll, Status>> { + let _guard = self.state.span().map(|s| s.enter()); let chunk = match ready!(Pin::new(&mut self.body).poll_frame(cx)) { Some(Ok(d)) => Some(d), Some(Err(status)) => { @@ -248,7 +294,8 @@ impl StreamingInner { return Poll::Ready(Ok(None)); } - let _ = std::mem::replace(&mut self.state, State::Error(status.clone())); + drop(_guard); + let _ = std::mem::replace(&mut self.state, State::Error(Box::new(status.clone()))); debug!("decoder inner stream error: {:?}", status); return Poll::Ready(Err(status)); } @@ -378,7 +425,7 @@ impl Streaming { match self.inner.decode_chunk(self.decoder.buffer_settings())? { Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? { Some(msg) => { - self.inner.state = State::ReadHeader; + self.inner.state = State::read_header(); Ok(Some(msg)) } None => Ok(None), @@ -394,7 +441,7 @@ impl Stream for Streaming { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if let State::Error(status) = &self.inner.state { - return Poll::Ready(Some(Err(status.clone()))); + return Poll::Ready(Some(Err(*status.clone()))); } if let Some(item) = self.decode_chunk()? {