Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http/retry): model PeekTrailersBody<B> with Frame<T> #3559

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions linkerd/http/retry/src/compat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//! Compatibility utilities for upgrading to http-body 1.0.

use http_body::Body;

pub(crate) use self::frame::Frame;

mod frame;

#[derive(Debug)]
pub(crate) struct ForwardCompatibleBody<B> {
inner: B,
data_finished: bool,
trailers_finished: bool,
}

// === impl ForwardCompatibleBody ===

impl<B: Body> ForwardCompatibleBody<B> {
pub(crate) fn new(body: B) -> Self {
if body.is_end_stream() {
Self {
inner: body,
data_finished: true,
trailers_finished: true,
}
} else {
Self {
inner: body,
data_finished: false,
trailers_finished: false,
}
}
}

pub(crate) fn into_inner(self) -> B {
self.inner
}

/// Returns a future that resolves to the next frame.
pub(crate) fn frame(&mut self) -> combinators::Frame<'_, B> {
combinators::Frame(self)
}
cratelyn marked this conversation as resolved.
Show resolved Hide resolved
}

/// Future that resolves to the next frame from a `Body`.
///
/// NB: This is a vendored stand-in for [`Frame<'a, T>`][frame], and and can be replaced once
/// we upgrade from http-body 0.4 to 1.0. This file was vendored, and subsequently adapted to this
/// project, at commit 86fdf00.
cratelyn marked this conversation as resolved.
Show resolved Hide resolved
///
/// See linkerd/linkerd2#8733 for more information.
///
/// [frame]: https://docs.rs/http-body-util/0.1.2/http_body_util/combinators/struct.Frame.html
mod combinators {
use core::future::Future;
use core::pin::Pin;
use core::task;
use http_body::Body;
use std::ops::Not;
use std::task::ready;

use super::ForwardCompatibleBody;

#[must_use = "futures don't do anything unless polled"]
#[derive(Debug)]
/// Future that resolves to the next frame from a [`Body`].
pub struct Frame<'a, T>(pub(super) &'a mut super::ForwardCompatibleBody<T>);

impl<T: Body + Unpin> Future for Frame<'_, T> {
type Output = Option<Result<super::Frame<T::Data>, T::Error>>;

fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
let Self(ForwardCompatibleBody {
inner,
data_finished,
trailers_finished,
}) = self.get_mut();
let mut pinned = Pin::new(inner);

// We have already yielded the trailers, the body is done.
if *trailers_finished {
return task::Poll::Ready(None);
}

// We are still yielding data frames.
if data_finished.not() {
match ready!(pinned.as_mut().poll_data(ctx)) {
Some(Ok(data)) => {
// We yielded a frame.
return task::Poll::Ready(Some(Ok(super::Frame::data(data))));
}
Some(Err(error)) => {
// If we encountered an error, we are finished.
*data_finished = true;
*trailers_finished = true;
return task::Poll::Ready(Some(Err(error)));
}
None => {
// We are done yielding data frames. Mark the corresponding flag, and fall
// through to poll the trailers...
*data_finished = true;
}
};
}

// We have yielded all of the data frames but have not yielded the trailers.
let trailers = ready!(pinned.poll_trailers(ctx));
*trailers_finished = true;
let trailers = trailers
.transpose()
.map(|res| res.map(super::Frame::trailers));
task::Poll::Ready(trailers)
}
}
}
131 changes: 131 additions & 0 deletions linkerd/http/retry/src/compat/frame.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#![allow(unused, reason = "this code is vendored from `http-body v1.0.1")]

//! A frame of any kind related to an HTTP stream (body).
//!
//! NB: This is a vendored stand-in for [`Frame<T>`][frame], and and can be replaced once
//! we upgrade from http-body 0.4 to 1.0. This file was vendored at commit 86fdf00.
//!
//! See linkerd/linkerd2#8733 for more information.
//!
//! [frame]: https://docs.rs/http-body/1.0.1/http_body/struct.Frame.html>

use http::HeaderMap;

/// A frame of any kind related to an HTTP stream (body).
#[derive(Debug)]
pub struct Frame<T> {
kind: Kind<T>,
}

#[derive(Debug)]
enum Kind<T> {
// The first two variants are "inlined" since they are undoubtedly
// the most common. This saves us from having to allocate a
// boxed trait object for them.
Data(T),
Trailers(HeaderMap),
//Unknown(Box<dyn Frameish>),
}

impl<T> Frame<T> {
/// Create a DATA frame with the provided `Buf`.
pub fn data(buf: T) -> Self {
Self {
kind: Kind::Data(buf),
}
}

/// Create a trailers frame.
pub fn trailers(map: HeaderMap) -> Self {
Self {
kind: Kind::Trailers(map),
}
}

/// Maps this frame's data to a different type.
pub fn map_data<F, D>(self, f: F) -> Frame<D>
where
F: FnOnce(T) -> D,
{
match self.kind {
Kind::Data(data) => Frame {
kind: Kind::Data(f(data)),
},
Kind::Trailers(trailers) => Frame {
kind: Kind::Trailers(trailers),
},
}
}

/// Returns whether this is a DATA frame.
pub fn is_data(&self) -> bool {
matches!(self.kind, Kind::Data(..))
}

/// Consumes self into the buf of the DATA frame.
///
/// Returns an [`Err`] containing the original [`Frame`] when frame is not a DATA frame.
/// `Frame::is_data` can also be used to determine if the frame is a DATA frame.
pub fn into_data(self) -> Result<T, Self> {
match self.kind {
Kind::Data(data) => Ok(data),
_ => Err(self),
}
}

/// If this is a DATA frame, returns a reference to it.
///
/// Returns `None` if not a DATA frame.
pub fn data_ref(&self) -> Option<&T> {
match self.kind {
Kind::Data(ref data) => Some(data),
_ => None,
}
}

/// If this is a DATA frame, returns a mutable reference to it.
///
/// Returns `None` if not a DATA frame.
pub fn data_mut(&mut self) -> Option<&mut T> {
match self.kind {
Kind::Data(ref mut data) => Some(data),
_ => None,
}
}

/// Returns whether this is a trailers frame.
pub fn is_trailers(&self) -> bool {
matches!(self.kind, Kind::Trailers(..))
}

/// Consumes self into the buf of the trailers frame.
///
/// Returns an [`Err`] containing the original [`Frame`] when frame is not a trailers frame.
/// `Frame::is_trailers` can also be used to determine if the frame is a trailers frame.
pub fn into_trailers(self) -> Result<HeaderMap, Self> {
match self.kind {
Kind::Trailers(trailers) => Ok(trailers),
_ => Err(self),
}
}

/// If this is a trailers frame, returns a reference to it.
///
/// Returns `None` if not a trailers frame.
pub fn trailers_ref(&self) -> Option<&HeaderMap> {
match self.kind {
Kind::Trailers(ref trailers) => Some(trailers),
_ => None,
}
}

/// If this is a trailers frame, returns a mutable reference to it.
///
/// Returns `None` if not a trailers frame.
pub fn trailers_mut(&mut self) -> Option<&mut HeaderMap> {
match self.kind {
Kind::Trailers(ref mut trailers) => Some(trailers),
_ => None,
}
}
}
2 changes: 2 additions & 0 deletions linkerd/http/retry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
pub mod peek_trailers;
pub mod replay;

mod compat;

pub use self::{peek_trailers::PeekTrailersBody, replay::ReplayBody};
pub use tower::retry::budget::Budget;

Expand Down
Loading
Loading