From 370f1817e7e90af663355089d8afb8b6debcbc14 Mon Sep 17 00:00:00 2001 From: Radu Woinaroski <5281987+RaduW@users.noreply.github.com> Date: Fri, 6 Aug 2021 11:01:27 +0200 Subject: [PATCH] ref(server): Introduce trait for upstream requests (#1037) Introduce a better way to handle various Upstream Requests (regardless of if they go to Relay Authenticated endpoints or not). Generally cleans up the handling of futures that end up generating http requests to the downstream server in preparation for removing long running futures. --- relay-server/src/actors/envelopes.rs | 127 +++-- relay-server/src/actors/upstream.rs | 514 +++++++++---------- relay-server/src/endpoints/forward.rs | 163 ++++-- relay-server/src/extractors/forwarded_for.rs | 1 + relay-server/src/http.rs | 25 +- 5 files changed, 467 insertions(+), 363 deletions(-) diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index ee061ebf87..9fd1cc8c3b 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::cell::RefCell; use std::collections::BTreeMap; use std::rc::Rc; @@ -6,13 +7,14 @@ use std::time::{Duration, Instant}; use std::{fmt, net}; use actix::prelude::*; +use actix_web::http::Method; use chrono::{DateTime, Duration as SignedDuration, Utc}; use failure::Fail; -use futures::{future, prelude::*}; +use futures::{future, prelude::*, sync::oneshot}; use serde_json::Value as SerdeValue; use relay_common::{clone, metric, ProjectId, ProjectKey, UnixTimestamp}; -use relay_config::{Config, RelayMode}; +use relay_config::{Config, HttpEncoding, RelayMode}; use relay_general::pii::{PiiAttachmentsProcessor, PiiProcessor}; use relay_general::processor::{process_value, ProcessingState}; use relay_general::protocol::{ @@ -33,10 +35,10 @@ use crate::actors::project_cache::{ CheckEnvelope, GetProjectState, InsertMetrics, MergeBuckets, ProjectCache, ProjectError, UpdateRateLimits, }; -use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequestError}; +use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequest, UpstreamRequestError}; use crate::envelope::{self, AttachmentType, ContentType, Envelope, Item, ItemType}; use crate::extractors::{PartialDsn, RequestMeta}; -use crate::http::{HttpError, RequestBuilder}; +use crate::http::{HttpError, Request, RequestBuilder, Response}; use crate::metrics::{RelayCounters, RelayHistograms, RelaySets, RelayTimers}; use crate::service::ServerError; use crate::utils::{ @@ -1733,6 +1735,7 @@ impl Handler for EnvelopeProcessor { /// Error returned from [`EnvelopeManager::send_envelope`]. #[derive(Debug)] enum SendEnvelopeError { + #[cfg(feature = "processing")] ScheduleFailed, #[cfg(feature = "processing")] StoreFailed(StoreError), @@ -1743,6 +1746,73 @@ enum SendEnvelopeError { /// Either a captured envelope or an error that occured during processing. pub type CapturedEnvelope = Result; +#[derive(Debug)] +struct SendEnvelope { + envelope: Envelope, + scoping: Scoping, + http_encoding: HttpEncoding, + response_sender: Option>>, + project_key: ProjectKey, +} + +impl UpstreamRequest for SendEnvelope { + fn method(&self) -> Method { + Method::POST + } + + fn path(&self) -> Cow<'_, str> { + format!("/api/{}/envelope/", self.scoping.project_id).into() + } + + fn build(&mut self, mut builder: RequestBuilder) -> Result { + // Override the `sent_at` timestamp. Since the envelope went through basic + // normalization, all timestamps have been corrected. We propagate the new + // `sent_at` to allow the next Relay to double-check this timestamp and + // potentially apply correction again. This is done as close to sending as + // possible so that we avoid internal delays. + self.envelope.set_sent_at(Utc::now()); + + let meta = self.envelope.meta(); + + builder + .content_encoding(self.http_encoding) + .header_opt("Origin", meta.origin().map(|url| url.as_str())) + .header_opt("User-Agent", meta.user_agent()) + .header("X-Sentry-Auth", meta.auth_header()) + .header("X-Forwarded-For", meta.forwarded_for()) + .header("Content-Type", envelope::CONTENT_TYPE); + + let body = self.envelope.to_vec().map_err(HttpError::custom)?; + builder.body(body) + } + + fn respond( + &mut self, + result: Result, + ) -> ResponseFuture<(), ()> { + let sender = self.response_sender.take(); + + match result { + Ok(response) => { + let future = response + .consume() + .map_err(UpstreamRequestError::Http) + .map(|_| ()) + .then(move |body_result| { + sender.map(|sender| sender.send(body_result).ok()); + Ok(()) + }); + + Box::new(future) + } + Err(error) => { + sender.map(|sender| sender.send(Err(error))); + Box::new(future::err(())) + } + } + } +} + pub struct EnvelopeManager { config: Arc, active_envelopes: u32, @@ -1779,7 +1849,7 @@ impl EnvelopeManager { fn send_envelope( &mut self, project_key: ProjectKey, - mut envelope: Envelope, + envelope: Envelope, scoping: Scoping, #[allow(unused_variables)] start_time: Instant, ) -> ResponseFuture<(), SendEnvelopeError> { @@ -1815,43 +1885,19 @@ impl EnvelopeManager { } relay_log::trace!("sending envelope to sentry endpoint"); - let http_encoding = self.config.http_encoding(); - let request = SendRequest::post(format!("/api/{}/envelope/", scoping.project_id)).build( - move |mut builder: RequestBuilder| { - // Override the `sent_at` timestamp. Since the envelope went through basic - // normalization, all timestamps have been corrected. We propagate the new - // `sent_at` to allow the next Relay to double-check this timestamp and - // potentially apply correction again. This is done as close to sending as - // possible so that we avoid internal delays. - envelope.set_sent_at(Utc::now()); - - let meta = envelope.meta(); - - if let Some(origin) = meta.origin() { - builder.header("Origin", origin.as_str()); - } - - if let Some(user_agent) = meta.user_agent() { - builder.header("User-Agent", user_agent); - } - - builder - .content_encoding(http_encoding) - .header("X-Sentry-Auth", meta.auth_header()) - .header("X-Forwarded-For", meta.forwarded_for()) - .header("Content-Type", envelope::CONTENT_TYPE); - - let body = envelope - .to_vec() - .map_err(|e| UpstreamRequestError::Http(HttpError::custom(e)))?; + let (tx, rx) = oneshot::channel(); + let request = SendEnvelope { + envelope, + scoping, + http_encoding: self.config.http_encoding(), + response_sender: Some(tx), + project_key, + }; - builder.body(body).map_err(UpstreamRequestError::Http) - }, - ); + UpstreamRelay::from_registry().do_send(SendRequest(request)); - let future = UpstreamRelay::from_registry() - .send(request) - .map_err(|_| SendEnvelopeError::ScheduleFailed) + let future = rx + .map_err(|_| SendEnvelopeError::SendFailed(UpstreamRequestError::ChannelClosed)) .and_then(move |result| { if let Err(UpstreamRequestError::RateLimited(upstream_limits)) = result { let limits = upstream_limits.scope(&scoping); @@ -2173,6 +2219,7 @@ impl Handler for EnvelopeManager { let outcome = Outcome::Invalid(DiscardReason::Internal); match error { + #[cfg(feature = "processing")] SendEnvelopeError::ScheduleFailed => { envelope_context.send_outcomes(outcome); ProcessingError::ScheduleFailed diff --git a/relay-server/src/actors/upstream.rs b/relay-server/src/actors/upstream.rs index cb3472752b..66dfc78a2a 100644 --- a/relay-server/src/actors/upstream.rs +++ b/relay-server/src/actors/upstream.rs @@ -36,7 +36,7 @@ use serde::ser::Serialize; use relay_auth::{RegisterChallenge, RegisterRequest, RegisterResponse, Registration}; use relay_common::{metric, tryf, RetryBackoff}; use relay_config::{Config, RelayMode}; -use relay_log::LogError; +use relay_log::{self, LogError}; use relay_quotas::{ DataCategories, QuotaScope, RateLimit, RateLimitScope, RateLimits, RetryAfter, Scoping, }; @@ -254,51 +254,48 @@ where } } -/// Upstream request objects queued inside the `Upstream` actor. +/// Request objects queued inside the [`Upstream`] actor. /// -/// The objects are transformed int HTTP requests, and sent to upstream as HTTP connections +/// The objects are transformed into HTTP requests, and sent to upstream as HTTP connections /// become available. -struct UpstreamRequest { - config: UpstreamRequestConfig, - /// One-shot channel to be notified when the request is done. - /// - /// The request is either successful or it has failed but we are not going to retry it. - response_sender: oneshot::Sender>, - /// Http method. - method: Method, - /// Request URL. - path: String, - /// Request build function. - build: Box, +struct EnqueuedRequest { + /// The request's trait object to create and handle the HTTP request. + request: Box, /// Number of times this request was already sent previous_retries: u32, - /// When the last sending attempt started - send_start: Option, } -impl UpstreamRequest { - pub fn route_name(&self) -> &'static str { - if self.path.contains("/outcomes/") { + +impl EnqueuedRequest { + fn new(request: impl UpstreamRequest + 'static) -> Self { + Self { + request: Box::new(request), + previous_retries: 0, + } + } + + fn route_name(&self) -> &'static str { + if self.request.path().contains("/outcomes/") { "outcomes" - } else if self.path.contains("/envelope/") { + } else if self.request.path().contains("/envelope/") { "envelope" - } else if self.path.contains("/projectids/") { + } else if self.request.path().contains("/projectids/") { "project_ids" - } else if self.path.contains("/projectconfigs/") { + } else if self.request.path().contains("/projectconfigs/") { "project_configs" - } else if self.path.contains("/publickeys/") { + } else if self.request.path().contains("/publickeys/") { "public_keys" - } else if self.path.contains("/challenge/") { + } else if self.request.path().contains("/challenge/") { "challenge" - } else if self.path.contains("/response/") { + } else if self.request.path().contains("/response/") { "response" - } else if self.path.contains("/live/") { + } else if self.request.path().contains("/live/") { "check_live" } else { "unknown" } } - pub fn retries_bucket(&self) -> &'static str { + fn retries_bucket(&self) -> &'static str { match self.previous_retries { 0 => "0", 1 => "1", @@ -378,8 +375,8 @@ pub struct UpstreamRelay { first_error: Option, max_inflight_requests: usize, num_inflight_requests: usize, - high_prio_requests: VecDeque, - low_prio_requests: VecDeque, + high_prio_requests: VecDeque, + low_prio_requests: VecDeque, config: Arc, reqwest_client: reqwest::Client, /// "reqwest runtime" as this tokio runtime is currently only spawned such that reqwest can @@ -500,13 +497,17 @@ impl UpstreamRelay { self.outage_backoff.reset(); } - fn upstream_connection_check(&mut self, ctx: &mut Context) { + fn schedule_connection_check(&mut self, ctx: &mut Context) { let next_backoff = self.outage_backoff.next_backoff(); relay_log::warn!( "Network outage, scheduling another check in {:?}", next_backoff ); - ctx.notify_later(CheckUpstreamConnection, next_backoff); + + ctx.run_later(next_backoff, |slf, ctx| { + let request = EnqueuedRequest::new(GetHealthcheck); + slf.enqueue(request, ctx, EnqueuePosition::Front); + }); } /// Records an occurrence of a network error. @@ -523,37 +524,44 @@ impl UpstreamRelay { } if !self.outage_backoff.started() { - self.upstream_connection_check(ctx); + self.schedule_connection_check(ctx); } } - fn send_request(&mut self, mut request: UpstreamRequest, ctx: &mut Context) { + fn send_request(&mut self, mut request: EnqueuedRequest, ctx: &mut Context) { let uri = self .config .upstream_descriptor() - .get_url(request.path.as_ref()); + .get_url(request.request.path().as_ref()); let host_header = self .config .http_host_header() .unwrap_or_else(|| self.config.upstream_descriptor().host()); - let method = reqwest::Method::from_bytes(request.method.as_ref().as_bytes()).unwrap(); + let method = + reqwest::Method::from_bytes(request.request.method().as_ref().as_bytes()).unwrap(); + let builder = self.reqwest_client.request(method, uri); let mut builder = RequestBuilder::reqwest(builder); builder.header("Host", host_header.as_bytes()); - if request.config.set_relay_id { + if request.request.set_relay_id() { if let Some(credentials) = self.config.credentials() { - builder.header("X-Sentry-Relay-Id", credentials.id.to_string().as_bytes()); + builder.header("X-Sentry-Relay-Id", credentials.id.to_string()); } } //try to build a ClientRequest - let client_request = match request.build.build_request(builder) { + let client_request = match request.request.build(builder) { Err(e) => { - request.response_sender.send(Err(e)).ok(); + request + .request + .respond(Err(UpstreamRequestError::Http(e))) + .into_actor(self) + .spawn(ctx); + return; } Ok(client_request) => client_request, @@ -562,13 +570,13 @@ impl UpstreamRelay { // we are about to send a HTTP message keep track of requests in flight self.num_inflight_requests += 1; - let intercept_status_errors = request.config.intercept_status_errors; - - request.send_start = Some(Instant::now()); - + let intercept_status_errors = request.request.intercept_status_errors(); + let send_start = Instant::now(); let client = self.reqwest_client.clone(); + let max_response_size = self.config.max_api_payload_size(); let (tx, rx) = oneshot::channel(); + self.reqwest_runtime.spawn(async move { let res = client .execute(client_request.0) @@ -582,16 +590,14 @@ impl UpstreamRelay { .flatten() .map(Response); - let max_response_size = self.config.max_api_payload_size(); - future .track(ctx.address().recipient()) .and_then(move |response| { handle_response(response, intercept_status_errors, max_response_size) }) .into_actor(self) - .then(|send_result, slf, ctx| { - slf.handle_http_response(request, send_result, ctx); + .then(move |send_result, slf, ctx| { + slf.handle_http_response(send_start, request, send_result, ctx); fut::ok(()) }) .spawn(ctx); @@ -599,7 +605,8 @@ impl UpstreamRelay { /// Adds a metric for the upstream request. fn meter_result( - request: &UpstreamRequest, + send_start: Instant, + request: &EnqueuedRequest, send_result: &Result, ) { let sc; @@ -635,15 +642,13 @@ impl UpstreamRelay { } }; - if let Some(send_start) = request.send_start { - metric!( - timer(RelayTimers::UpstreamRequestsDuration) = send_start.elapsed(), - result = result, - status_code = status_code, - route = request.route_name(), - retries = request.retries_bucket(), - ) - } + metric!( + timer(RelayTimers::UpstreamRequestsDuration) = send_start.elapsed(), + result = result, + status_code = status_code, + route = request.route_name(), + retries = request.retries_bucket(), + ); metric!( histogram(RelayHistograms::UpstreamRetries) = request.previous_retries.into(), @@ -661,15 +666,16 @@ impl UpstreamRelay { /// 4. Otherwise, ensure an authentication request is scheduled. fn handle_http_response( &mut self, - mut request: UpstreamRequest, + send_start: Instant, + mut request: EnqueuedRequest, send_result: Result, ctx: &mut Context, ) { - UpstreamRelay::meter_result(&request, &send_result); + UpstreamRelay::meter_result(send_start, &request, &send_result); if matches!(send_result, Err(ref err) if err.is_network_error()) { self.handle_network_error(ctx); - if request.config.retry { + if request.request.retry() { request.previous_retries += 1; return self.enqueue(request, ctx, EnqueuePosition::Back); } @@ -679,18 +685,22 @@ impl UpstreamRelay { self.reset_network_error(); } - request.response_sender.send(send_result).ok(); + request + .request + .respond(send_result) + .into_actor(self) + .spawn(ctx); } /// Enqueues a request and ensures that the message queue advances. fn enqueue( &mut self, - request: UpstreamRequest, + request: EnqueuedRequest, ctx: &mut Context, position: EnqueuePosition, ) { - let name = request.config.priority.name(); - let queue = match request.config.priority { + let name = request.request.priority().name(); + let queue = match request.request.priority() { // Immediate is special and bypasses the queue. Directly send the request and return // the response channel rather than waiting for `PumpHttpMessageQueue`. RequestPriority::Immediate => return self.send_request(request, ctx), @@ -711,81 +721,31 @@ impl UpstreamRelay { ctx.notify(PumpHttpMessageQueue); } - fn enqueue_request( - &mut self, - config: UpstreamRequestConfig, - method: Method, - path: P, - build: F, - ctx: &mut Context, - ) -> ResponseFuture - where - F: RequestBuilderTransformer, - P: AsRef, - { - let (tx, rx) = oneshot::channel::>(); - - let request = UpstreamRequest { - config, - method, - path: path.as_ref().to_owned(), - response_sender: tx, - build: Box::new(build), - previous_retries: 0, - send_start: None, - }; - - self.enqueue(request, ctx, EnqueuePosition::Front); - - let future = rx - // map errors caused by the oneshot channel being closed (unlikely) - .map_err(|_| UpstreamRequestError::ChannelClosed) - // unwrap the result (this is how we transport the http failure through the channel) - .and_then(|result| result); - - Box::new(future) - } - - fn enqueue_query( + fn enqueue_query( &mut self, query: Q, ctx: &mut Context, ) -> ResponseFuture { - let method = query.method(); - let path = query.path(); - let config = UpstreamRequestConfig { - retry: Q::retry(), - priority: Q::priority(), - intercept_status_errors: true, - set_relay_id: true, - }; - let credentials = tryf!(self .config .credentials() .ok_or(UpstreamRequestError::NoCredentials)); - let (json, signature) = credentials.secret_key.pack(query); - let json = Arc::new(json); + let (json, signature) = credentials.secret_key.pack(&query); + let (tx, rx) = oneshot::channel(); - let max_response_size = self.config.max_api_payload_size(); + let request = EnqueuedRequest::new(UpstreamQueryRequest { + query, + body: json, + signature, + max_response_size: self.config.max_api_payload_size(), + sender: Some(tx), + }); - let future = self - .enqueue_request( - config, - method, - path, - move |mut builder: RequestBuilder| { - builder.header("X-Sentry-Relay-Signature", signature.as_str().as_bytes()); - builder.header(header::CONTENT_TYPE, b"application/json"); - builder.body(&*json).map_err(UpstreamRequestError::Http) - }, - ctx, - ) - .and_then(move |r| { - r.json(max_response_size) - .map_err(UpstreamRequestError::Http) - }); + self.enqueue(request, ctx, EnqueuePosition::Front); + let future = rx + .map_err(|_| UpstreamRequestError::ChannelClosed) + .flatten(); Box::new(future) } @@ -989,47 +949,72 @@ impl Handler for UpstreamRelay { } } -/// Checks the status of the network connection with the upstream server -struct CheckUpstreamConnection; +struct ScheduleConnectionCheck; -impl Message for CheckUpstreamConnection { +impl Message for ScheduleConnectionCheck { type Result = (); } -impl Handler for UpstreamRelay { +impl Handler for UpstreamRelay { type Result = (); - fn handle(&mut self, _msg: CheckUpstreamConnection, ctx: &mut Self::Context) -> Self::Result { - self.enqueue_request( - UpstreamRequestConfig { - priority: RequestPriority::Immediate, - retry: false, - intercept_status_errors: true, - set_relay_id: true, - }, - Method::GET, - "/api/0/relays/live/", - |builder: RequestBuilder| builder.finish().map_err(UpstreamRequestError::Http), - ctx, - ) - .and_then(|client_response| { - // consume response bodies to ensure the connection remains usable. - client_response - .consume() - .map_err(UpstreamRequestError::Http) - }) - .into_actor(self) - .then(|result, slf, ctx| { + fn handle(&mut self, _msg: ScheduleConnectionCheck, ctx: &mut Self::Context) -> Self::Result { + self.schedule_connection_check(ctx); + } +} + +/// Checks the status of the network connection with the upstream server +struct GetHealthcheck; + +impl UpstreamRequest for GetHealthcheck { + fn method(&self) -> Method { + Method::GET + } + + fn path(&self) -> Cow<'_, str> { + Cow::Borrowed("/api/0/relays/live/") + } + + fn retry(&self) -> bool { + false + } + + fn priority(&self) -> RequestPriority { + RequestPriority::Immediate + } + + fn set_relay_id(&self) -> bool { + true + } + + fn intercept_status_errors(&self) -> bool { + true + } + + fn build(&mut self, builder: RequestBuilder) -> Result { + builder.finish() + } + + fn respond( + &mut self, + result: Result, + ) -> ResponseFuture<(), ()> { + let future: ResponseFuture<_, _> = match result { + Ok(response) => Box::new(response.consume().map_err(UpstreamRequestError::Http)), + Err(err) => Box::new(future::err(err)), + }; + + Box::new(future.then(|result| { if matches!(result, Err(err) if err.is_network_error()) { // still network error, schedule another attempt - slf.upstream_connection_check(ctx); + UpstreamRelay::from_registry().do_send(ScheduleConnectionCheck); } else { // resume normal messages - ctx.notify(PumpHttpMessageQueue); + UpstreamRelay::from_registry().do_send(PumpHttpMessageQueue); } - fut::ok(()) - }) - .spawn(ctx); + + Ok(()) + })) } } @@ -1064,137 +1049,64 @@ where } } -pub struct SendRequest { - method: Method, - path: String, - builder: B, - transformer: R, - config: UpstreamRequestConfig, -} - -struct UpstreamRequestConfig { - /// Queueing priority for the request. - priority: RequestPriority, - /// Should the request be retried in case of network error. - retry: bool, - /// Should 429s be honored within the upstream. - intercept_status_errors: bool, - /// Should the x-sentry-relay-id header be added. - set_relay_id: bool, -} +/// Represents an HTTP request to be sent by the Upstream actor. +pub trait UpstreamRequest: Send { + /// The HTTP method of the request. + fn method(&self) -> Method; -impl SendRequest { - pub fn new>(method: Method, path: S) -> Self { - SendRequest { - method, - path: path.into(), - builder: (), - transformer: (), - config: UpstreamRequestConfig { - priority: RequestPriority::Low, - retry: true, - intercept_status_errors: true, - set_relay_id: true, - }, - } - } + /// The path relative to the upstream. + fn path(&self) -> Cow<'_, str>; - pub fn post>(path: S) -> Self { - Self::new(Method::POST, path) + /// Whether this request should retry on network errors. + fn retry(&self) -> bool { + true } -} -impl SendRequest -where - B: RequestBuilderTransformer, - T: ResponseTransformer, -{ - pub fn build(self, builder: F) -> SendRequest - where - F: RequestBuilderTransformer, - { - SendRequest { - method: self.method, - path: self.path, - builder, - transformer: self.transformer, - config: self.config, - } + /// The queueing priority of the request. Defaults to `Low`. + fn priority(&self) -> RequestPriority { + RequestPriority::Low } - #[inline] - pub fn retry(mut self, should_retry: bool) -> Self { - self.config.retry = should_retry; - self + /// True if normal error processing should occur, false if + /// errors from the upstream should not be processed and returned as is + /// in the response. + fn intercept_status_errors(&self) -> bool { + true } - #[inline] - pub fn intercept_status_errors(mut self, should_intercept_status_errors: bool) -> Self { - self.config.intercept_status_errors = should_intercept_status_errors; - self + /// If set to True it will add the X-Sentry-Relay-Id header to the request + /// + /// This should be done (only) for calls to endpoints that use Relay authentication. + fn set_relay_id(&self) -> bool { + true } - #[inline] - pub fn set_relay_id(mut self, should_set_relay_id: bool) -> Self { - self.config.set_relay_id = should_set_relay_id; - self - } + /// Called whenever the request will be send over HTTP (possible multiple times) + fn build(&mut self, builder: RequestBuilder) -> Result; - #[allow(dead_code)] - pub fn transform(self, callback: F) -> SendRequest - where - F: ResponseTransformer, - { - SendRequest { - method: self.method, - path: self.path, - builder: self.builder, - transformer: callback, - config: self.config, - } - } + /// Called when the HTTP request completes, either with success or an error that will not + /// be retried. + fn respond(&mut self, result: Result) + -> ResponseFuture<(), ()>; } -impl Message for SendRequest +pub struct SendRequest(pub T); + +impl Message for SendRequest where - B: RequestBuilderTransformer, - R: ResponseTransformer, + T: UpstreamRequest, { - type Result = Result<::Item, ::Error>; + type Result = (); } -// impl Message for SendRequest { -// type Result = Result<(), UpstreamRequestError>; -// } - -/// SendRequest messages represent external messages that need to be sent to the upstream server -/// and do not use Relay authentication. -/// -/// The handler adds the message to one of the message queues. -impl Handler> for UpstreamRelay +impl Handler> for UpstreamRelay where - B: RequestBuilderTransformer, - R: ResponseTransformer, - ::Item: Send + 'static, - ::Error: From + Send + 'static, + T: UpstreamRequest + 'static, { - type Result = ResponseFuture<::Item, ::Error>; - - fn handle(&mut self, message: SendRequest, ctx: &mut Self::Context) -> Self::Result { - let SendRequest { - method, - path, - builder, - transformer, - config, - } = message; - - let future = self - .enqueue_request(config, method, path, builder, ctx) - .from_err() - .and_then(move |r| transformer.transform_response(r)); + type Result = (); - Box::new(future) + fn handle(&mut self, msg: SendRequest, ctx: &mut Self::Context) -> Self::Result { + self.enqueue(EnqueuedRequest::new(msg.0), ctx, EnqueuePosition::Front); } } @@ -1223,7 +1135,7 @@ impl Handler for UpstreamRelay { } } -pub trait UpstreamQuery: Serialize { +pub trait UpstreamQuery: Serialize + Send + 'static { type Response: DeserializeOwned + 'static + Send; /// The HTTP method of the request. @@ -1247,6 +1159,66 @@ impl Message for SendQuery { type Result = Result; } +struct UpstreamQueryRequest { + query: T, + body: Vec, + signature: String, + max_response_size: usize, + sender: Option>>, +} + +impl UpstreamRequest for UpstreamQueryRequest { + fn method(&self) -> Method { + self.query.method() + } + + fn path(&self) -> Cow<'_, str> { + self.query.path() + } + + fn build(&mut self, mut builder: RequestBuilder) -> Result { + builder.header( + "X-Sentry-Relay-Signature", + self.signature.as_str().as_bytes(), + ); + builder.header(header::CONTENT_TYPE, b"application/json"); + builder.body(&self.body) + } + + fn retry(&self) -> bool { + T::retry() + } + + fn priority(&self) -> RequestPriority { + T::priority() + } + + fn respond( + &mut self, + result: Result, + ) -> ResponseFuture<(), ()> { + let sender = self.sender.take(); + + match result { + Ok(response) => { + let future = response + .json(self.max_response_size) + .map_err(UpstreamRequestError::Http) + .then(|result| { + sender.map(|sender| sender.send(result)); + Ok(()) + }); + + Box::new(future) + } + Err(error) => { + sender.map(|sender| sender.send(Err(error))); + Box::new(future::err(())) + } + } + } +} + /// SendQuery messages represent messages that need to be sent to the upstream server /// and use Relay authentication. /// diff --git a/relay-server/src/endpoints/forward.rs b/relay-server/src/endpoints/forward.rs index c4d8139274..beb6a6c632 100644 --- a/relay-server/src/endpoints/forward.rs +++ b/relay-server/src/endpoints/forward.rs @@ -3,19 +3,26 @@ //! This endpoint will issue a client request to the upstream and append relay's own headers //! (`X-Forwarded-For` and `Sentry-Relay-Id`). The response is then streamed back to the origin. +use std::borrow::Cow; +use std::fmt; + use ::actix::prelude::*; use actix_web::error::ResponseError; +use actix_web::http::header::HeaderValue; use actix_web::http::{header, header::HeaderName, uri::PathAndQuery, StatusCode}; +use actix_web::http::{HeaderMap, Method}; use actix_web::{AsyncResponder, Error, HttpMessage, HttpRequest, HttpResponse}; +use bytes::Bytes; use failure::Fail; -use futures::prelude::*; +use futures::{future, prelude::*, sync::oneshot}; + use lazy_static::lazy_static; use relay_common::GlobMatcher; use relay_config::Config; use relay_log::LogError; -use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequestError}; +use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequest, UpstreamRequestError}; use crate::body::ForwardBody; use crate::endpoints::statics; use crate::extractors::ForwardedFor; @@ -119,6 +126,97 @@ fn get_limit_for_path(path: &str, config: &Config) -> usize { } } +type Headers = Vec<(String, Vec)>; +type ForwardResponse = (StatusCode, Headers, Vec); + +struct ForwardRequest { + method: Method, + path: String, + headers: HeaderMap, + forwarded_for: ForwardedFor, + data: Bytes, + max_response_size: usize, + sender: Option>>, +} + +impl fmt::Debug for ForwardRequest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ForwardRequest") + .field("method", &self.method) + .field("path", &self.path) + .finish() + } +} + +impl UpstreamRequest for ForwardRequest { + fn method(&self) -> Method { + self.method.clone() + } + + fn path(&self) -> Cow<'_, str> { + self.path.as_str().into() + } + + fn retry(&self) -> bool { + false + } + + fn intercept_status_errors(&self) -> bool { + false + } + + fn set_relay_id(&self) -> bool { + false + } + + fn build(&mut self, mut builder: RequestBuilder) -> Result { + for (key, value) in &self.headers { + // Since there is no API in actix-web to access the raw, not-yet-decompressed stream, we + // must not forward the content-encoding header, as the actix http client will do its own + // content encoding. Also remove content-length because it's likely wrong. + if HOP_BY_HOP_HEADERS.iter().any(|x| x == key) + || IGNORED_REQUEST_HEADERS.iter().any(|x| x == key) + { + continue; + } + + builder.header(key, value); + } + + builder.header("X-Forwarded-For", self.forwarded_for.as_ref()); + builder.body(&self.data) + } + + fn respond( + &mut self, + result: Result, + ) -> ResponseFuture<(), ()> { + let sender = self.sender.take(); + + match result { + Ok(response) => { + let status = StatusCode::from_u16(response.status().as_u16()).unwrap(); + let headers = response.clone_headers(); + + let future = response + .bytes(self.max_response_size) + .and_then(move |body| Ok((status, headers, body))) + .map_err(UpstreamRequestError::Http) + .then(|result| { + sender.map(|sender| sender.send(result)); + Ok(()) + }); + + Box::new(future) + } + Err(e) => { + sender.map(|sender| sender.send(Err(e))); + Box::new(future::err(())) + } + } + } +} + /// Implementation of the forward endpoint. /// /// This endpoint will create a proxy request to the upstream for every incoming request and stream @@ -145,49 +243,26 @@ pub fn forward_upstream( ForwardBody::new(request, limit) .map_err(Error::from) .and_then(move |data| { - let forward_request = SendRequest::new(method, path_and_query) - .retry(false) - .intercept_status_errors(false) - .set_relay_id(false) - .build(move |mut builder: RequestBuilder| { - for (key, value) in &headers { - // Since there is no API in actix-web to access the raw, not-yet-decompressed stream, we - // must not forward the content-encoding header, as the actix http client will do its own - // content encoding. Also remove content-length because it's likely wrong. - if HOP_BY_HOP_HEADERS.iter().any(|x| x == key) - || IGNORED_REQUEST_HEADERS.iter().any(|x| x == key) - { - continue; - } - - builder.header(key, value); - } - - builder.header("X-Forwarded-For", forwarded_for.as_ref()); - builder.body(&data).map_err(UpstreamRequestError::Http) - }) - .transform(move |response: Response| { - let status = response.status(); - let headers = response.clone_headers(); - response - .bytes(max_response_size) - .and_then(move |body| Ok((status, headers, body))) - .map_err(UpstreamRequestError::Http) - }); - - UpstreamRelay::from_registry() - .send(forward_request) - .map_err(|_| { - Error::from(ForwardedUpstreamRequestError( - UpstreamRequestError::ChannelClosed, - )) - }) - }) - .and_then(move |result: Result<_, UpstreamRequestError>| { - let (status, headers, body) = result.map_err(ForwardedUpstreamRequestError::from)?; - let actix_code = StatusCode::from_u16(status.as_u16()).unwrap(); - let mut forwarded_response = HttpResponse::build(actix_code); + let (tx, rx) = oneshot::channel(); + + let forward_request = ForwardRequest { + method, + path: path_and_query, + headers, + forwarded_for, + data, + max_response_size, + sender: Some(tx), + }; + UpstreamRelay::from_registry().do_send(SendRequest(forward_request)); + + rx.map_err(|_| UpstreamRequestError::ChannelClosed) + .flatten() + .map_err(|e| Error::from(ForwardedUpstreamRequestError::from(e))) + }) + .and_then(move |(status, headers, body)| { + let mut forwarded_response = HttpResponse::build(status); let mut has_content_type = false; for (key, value) in headers { diff --git a/relay-server/src/extractors/forwarded_for.rs b/relay-server/src/extractors/forwarded_for.rs index fd173d627b..9ea0756fbd 100644 --- a/relay-server/src/extractors/forwarded_for.rs +++ b/relay-server/src/extractors/forwarded_for.rs @@ -1,6 +1,7 @@ use actix_web::http::header; use actix_web::{Error, FromRequest, HttpMessage, HttpRequest}; +#[derive(Debug)] pub struct ForwardedFor(String); impl ForwardedFor { diff --git a/relay-server/src/http.rs b/relay-server/src/http.rs index fc252bbf0b..1d84ed3abf 100644 --- a/relay-server/src/http.rs +++ b/relay-server/src/http.rs @@ -103,18 +103,27 @@ impl RequestBuilder { self } + /// Add an optional header, not replacing existing ones. + /// + /// If the value is `Some`, the header is added. If the value is `None`, headers are not + /// changed. + pub fn header_opt( + &mut self, + key: impl AsRef, + value: Option>, + ) -> &mut Self { + if let Some(value) = value { + take_mut::take(&mut self.builder, |b| { + b.header(key.as_ref(), value.as_ref()) + }); + } + self + } + pub fn body(mut self, body: B) -> Result where B: AsRef<[u8]>, { - // actix-web's Binary is used as argument here because the type can be constructed from - // almost anything and then the actix-web codepath is minimally affected. - // - // Still it's not perfect as in the identity-encoding path we have some unnecessary copying - // that is just to get around type conflicts. We cannot use Bytes here because we have a - // version split between actix-web's Bytes dependency and reqwest's Bytes dependency. A - // real zero-copy abstraction over both would force us to downgrade reqwest to a version - // that uses Bytes 0.4. self.builder = match self.http_encoding { HttpEncoding::Identity => self.builder.body(body.as_ref().to_vec()), HttpEncoding::Deflate => {