diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index ee061ebf87..9fd1cc8c3b 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::cell::RefCell; use std::collections::BTreeMap; use std::rc::Rc; @@ -6,13 +7,14 @@ use std::time::{Duration, Instant}; use std::{fmt, net}; use actix::prelude::*; +use actix_web::http::Method; use chrono::{DateTime, Duration as SignedDuration, Utc}; use failure::Fail; -use futures::{future, prelude::*}; +use futures::{future, prelude::*, sync::oneshot}; use serde_json::Value as SerdeValue; use relay_common::{clone, metric, ProjectId, ProjectKey, UnixTimestamp}; -use relay_config::{Config, RelayMode}; +use relay_config::{Config, HttpEncoding, RelayMode}; use relay_general::pii::{PiiAttachmentsProcessor, PiiProcessor}; use relay_general::processor::{process_value, ProcessingState}; use relay_general::protocol::{ @@ -33,10 +35,10 @@ use crate::actors::project_cache::{ CheckEnvelope, GetProjectState, InsertMetrics, MergeBuckets, ProjectCache, ProjectError, UpdateRateLimits, }; -use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequestError}; +use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequest, UpstreamRequestError}; use crate::envelope::{self, AttachmentType, ContentType, Envelope, Item, ItemType}; use crate::extractors::{PartialDsn, RequestMeta}; -use crate::http::{HttpError, RequestBuilder}; +use crate::http::{HttpError, Request, RequestBuilder, Response}; use crate::metrics::{RelayCounters, RelayHistograms, RelaySets, RelayTimers}; use crate::service::ServerError; use crate::utils::{ @@ -1733,6 +1735,7 @@ impl Handler for EnvelopeProcessor { /// Error returned from [`EnvelopeManager::send_envelope`]. #[derive(Debug)] enum SendEnvelopeError { + #[cfg(feature = "processing")] ScheduleFailed, #[cfg(feature = "processing")] StoreFailed(StoreError), @@ -1743,6 +1746,73 @@ enum SendEnvelopeError { /// Either a captured envelope or an error that occured during processing. pub type CapturedEnvelope = Result; +#[derive(Debug)] +struct SendEnvelope { + envelope: Envelope, + scoping: Scoping, + http_encoding: HttpEncoding, + response_sender: Option>>, + project_key: ProjectKey, +} + +impl UpstreamRequest for SendEnvelope { + fn method(&self) -> Method { + Method::POST + } + + fn path(&self) -> Cow<'_, str> { + format!("/api/{}/envelope/", self.scoping.project_id).into() + } + + fn build(&mut self, mut builder: RequestBuilder) -> Result { + // Override the `sent_at` timestamp. Since the envelope went through basic + // normalization, all timestamps have been corrected. We propagate the new + // `sent_at` to allow the next Relay to double-check this timestamp and + // potentially apply correction again. This is done as close to sending as + // possible so that we avoid internal delays. + self.envelope.set_sent_at(Utc::now()); + + let meta = self.envelope.meta(); + + builder + .content_encoding(self.http_encoding) + .header_opt("Origin", meta.origin().map(|url| url.as_str())) + .header_opt("User-Agent", meta.user_agent()) + .header("X-Sentry-Auth", meta.auth_header()) + .header("X-Forwarded-For", meta.forwarded_for()) + .header("Content-Type", envelope::CONTENT_TYPE); + + let body = self.envelope.to_vec().map_err(HttpError::custom)?; + builder.body(body) + } + + fn respond( + &mut self, + result: Result, + ) -> ResponseFuture<(), ()> { + let sender = self.response_sender.take(); + + match result { + Ok(response) => { + let future = response + .consume() + .map_err(UpstreamRequestError::Http) + .map(|_| ()) + .then(move |body_result| { + sender.map(|sender| sender.send(body_result).ok()); + Ok(()) + }); + + Box::new(future) + } + Err(error) => { + sender.map(|sender| sender.send(Err(error))); + Box::new(future::err(())) + } + } + } +} + pub struct EnvelopeManager { config: Arc, active_envelopes: u32, @@ -1779,7 +1849,7 @@ impl EnvelopeManager { fn send_envelope( &mut self, project_key: ProjectKey, - mut envelope: Envelope, + envelope: Envelope, scoping: Scoping, #[allow(unused_variables)] start_time: Instant, ) -> ResponseFuture<(), SendEnvelopeError> { @@ -1815,43 +1885,19 @@ impl EnvelopeManager { } relay_log::trace!("sending envelope to sentry endpoint"); - let http_encoding = self.config.http_encoding(); - let request = SendRequest::post(format!("/api/{}/envelope/", scoping.project_id)).build( - move |mut builder: RequestBuilder| { - // Override the `sent_at` timestamp. Since the envelope went through basic - // normalization, all timestamps have been corrected. We propagate the new - // `sent_at` to allow the next Relay to double-check this timestamp and - // potentially apply correction again. This is done as close to sending as - // possible so that we avoid internal delays. - envelope.set_sent_at(Utc::now()); - - let meta = envelope.meta(); - - if let Some(origin) = meta.origin() { - builder.header("Origin", origin.as_str()); - } - - if let Some(user_agent) = meta.user_agent() { - builder.header("User-Agent", user_agent); - } - - builder - .content_encoding(http_encoding) - .header("X-Sentry-Auth", meta.auth_header()) - .header("X-Forwarded-For", meta.forwarded_for()) - .header("Content-Type", envelope::CONTENT_TYPE); - - let body = envelope - .to_vec() - .map_err(|e| UpstreamRequestError::Http(HttpError::custom(e)))?; + let (tx, rx) = oneshot::channel(); + let request = SendEnvelope { + envelope, + scoping, + http_encoding: self.config.http_encoding(), + response_sender: Some(tx), + project_key, + }; - builder.body(body).map_err(UpstreamRequestError::Http) - }, - ); + UpstreamRelay::from_registry().do_send(SendRequest(request)); - let future = UpstreamRelay::from_registry() - .send(request) - .map_err(|_| SendEnvelopeError::ScheduleFailed) + let future = rx + .map_err(|_| SendEnvelopeError::SendFailed(UpstreamRequestError::ChannelClosed)) .and_then(move |result| { if let Err(UpstreamRequestError::RateLimited(upstream_limits)) = result { let limits = upstream_limits.scope(&scoping); @@ -2173,6 +2219,7 @@ impl Handler for EnvelopeManager { let outcome = Outcome::Invalid(DiscardReason::Internal); match error { + #[cfg(feature = "processing")] SendEnvelopeError::ScheduleFailed => { envelope_context.send_outcomes(outcome); ProcessingError::ScheduleFailed diff --git a/relay-server/src/actors/upstream.rs b/relay-server/src/actors/upstream.rs index cb3472752b..66dfc78a2a 100644 --- a/relay-server/src/actors/upstream.rs +++ b/relay-server/src/actors/upstream.rs @@ -36,7 +36,7 @@ use serde::ser::Serialize; use relay_auth::{RegisterChallenge, RegisterRequest, RegisterResponse, Registration}; use relay_common::{metric, tryf, RetryBackoff}; use relay_config::{Config, RelayMode}; -use relay_log::LogError; +use relay_log::{self, LogError}; use relay_quotas::{ DataCategories, QuotaScope, RateLimit, RateLimitScope, RateLimits, RetryAfter, Scoping, }; @@ -254,51 +254,48 @@ where } } -/// Upstream request objects queued inside the `Upstream` actor. +/// Request objects queued inside the [`Upstream`] actor. /// -/// The objects are transformed int HTTP requests, and sent to upstream as HTTP connections +/// The objects are transformed into HTTP requests, and sent to upstream as HTTP connections /// become available. -struct UpstreamRequest { - config: UpstreamRequestConfig, - /// One-shot channel to be notified when the request is done. - /// - /// The request is either successful or it has failed but we are not going to retry it. - response_sender: oneshot::Sender>, - /// Http method. - method: Method, - /// Request URL. - path: String, - /// Request build function. - build: Box, +struct EnqueuedRequest { + /// The request's trait object to create and handle the HTTP request. + request: Box, /// Number of times this request was already sent previous_retries: u32, - /// When the last sending attempt started - send_start: Option, } -impl UpstreamRequest { - pub fn route_name(&self) -> &'static str { - if self.path.contains("/outcomes/") { + +impl EnqueuedRequest { + fn new(request: impl UpstreamRequest + 'static) -> Self { + Self { + request: Box::new(request), + previous_retries: 0, + } + } + + fn route_name(&self) -> &'static str { + if self.request.path().contains("/outcomes/") { "outcomes" - } else if self.path.contains("/envelope/") { + } else if self.request.path().contains("/envelope/") { "envelope" - } else if self.path.contains("/projectids/") { + } else if self.request.path().contains("/projectids/") { "project_ids" - } else if self.path.contains("/projectconfigs/") { + } else if self.request.path().contains("/projectconfigs/") { "project_configs" - } else if self.path.contains("/publickeys/") { + } else if self.request.path().contains("/publickeys/") { "public_keys" - } else if self.path.contains("/challenge/") { + } else if self.request.path().contains("/challenge/") { "challenge" - } else if self.path.contains("/response/") { + } else if self.request.path().contains("/response/") { "response" - } else if self.path.contains("/live/") { + } else if self.request.path().contains("/live/") { "check_live" } else { "unknown" } } - pub fn retries_bucket(&self) -> &'static str { + fn retries_bucket(&self) -> &'static str { match self.previous_retries { 0 => "0", 1 => "1", @@ -378,8 +375,8 @@ pub struct UpstreamRelay { first_error: Option, max_inflight_requests: usize, num_inflight_requests: usize, - high_prio_requests: VecDeque, - low_prio_requests: VecDeque, + high_prio_requests: VecDeque, + low_prio_requests: VecDeque, config: Arc, reqwest_client: reqwest::Client, /// "reqwest runtime" as this tokio runtime is currently only spawned such that reqwest can @@ -500,13 +497,17 @@ impl UpstreamRelay { self.outage_backoff.reset(); } - fn upstream_connection_check(&mut self, ctx: &mut Context) { + fn schedule_connection_check(&mut self, ctx: &mut Context) { let next_backoff = self.outage_backoff.next_backoff(); relay_log::warn!( "Network outage, scheduling another check in {:?}", next_backoff ); - ctx.notify_later(CheckUpstreamConnection, next_backoff); + + ctx.run_later(next_backoff, |slf, ctx| { + let request = EnqueuedRequest::new(GetHealthcheck); + slf.enqueue(request, ctx, EnqueuePosition::Front); + }); } /// Records an occurrence of a network error. @@ -523,37 +524,44 @@ impl UpstreamRelay { } if !self.outage_backoff.started() { - self.upstream_connection_check(ctx); + self.schedule_connection_check(ctx); } } - fn send_request(&mut self, mut request: UpstreamRequest, ctx: &mut Context) { + fn send_request(&mut self, mut request: EnqueuedRequest, ctx: &mut Context) { let uri = self .config .upstream_descriptor() - .get_url(request.path.as_ref()); + .get_url(request.request.path().as_ref()); let host_header = self .config .http_host_header() .unwrap_or_else(|| self.config.upstream_descriptor().host()); - let method = reqwest::Method::from_bytes(request.method.as_ref().as_bytes()).unwrap(); + let method = + reqwest::Method::from_bytes(request.request.method().as_ref().as_bytes()).unwrap(); + let builder = self.reqwest_client.request(method, uri); let mut builder = RequestBuilder::reqwest(builder); builder.header("Host", host_header.as_bytes()); - if request.config.set_relay_id { + if request.request.set_relay_id() { if let Some(credentials) = self.config.credentials() { - builder.header("X-Sentry-Relay-Id", credentials.id.to_string().as_bytes()); + builder.header("X-Sentry-Relay-Id", credentials.id.to_string()); } } //try to build a ClientRequest - let client_request = match request.build.build_request(builder) { + let client_request = match request.request.build(builder) { Err(e) => { - request.response_sender.send(Err(e)).ok(); + request + .request + .respond(Err(UpstreamRequestError::Http(e))) + .into_actor(self) + .spawn(ctx); + return; } Ok(client_request) => client_request, @@ -562,13 +570,13 @@ impl UpstreamRelay { // we are about to send a HTTP message keep track of requests in flight self.num_inflight_requests += 1; - let intercept_status_errors = request.config.intercept_status_errors; - - request.send_start = Some(Instant::now()); - + let intercept_status_errors = request.request.intercept_status_errors(); + let send_start = Instant::now(); let client = self.reqwest_client.clone(); + let max_response_size = self.config.max_api_payload_size(); let (tx, rx) = oneshot::channel(); + self.reqwest_runtime.spawn(async move { let res = client .execute(client_request.0) @@ -582,16 +590,14 @@ impl UpstreamRelay { .flatten() .map(Response); - let max_response_size = self.config.max_api_payload_size(); - future .track(ctx.address().recipient()) .and_then(move |response| { handle_response(response, intercept_status_errors, max_response_size) }) .into_actor(self) - .then(|send_result, slf, ctx| { - slf.handle_http_response(request, send_result, ctx); + .then(move |send_result, slf, ctx| { + slf.handle_http_response(send_start, request, send_result, ctx); fut::ok(()) }) .spawn(ctx); @@ -599,7 +605,8 @@ impl UpstreamRelay { /// Adds a metric for the upstream request. fn meter_result( - request: &UpstreamRequest, + send_start: Instant, + request: &EnqueuedRequest, send_result: &Result, ) { let sc; @@ -635,15 +642,13 @@ impl UpstreamRelay { } }; - if let Some(send_start) = request.send_start { - metric!( - timer(RelayTimers::UpstreamRequestsDuration) = send_start.elapsed(), - result = result, - status_code = status_code, - route = request.route_name(), - retries = request.retries_bucket(), - ) - } + metric!( + timer(RelayTimers::UpstreamRequestsDuration) = send_start.elapsed(), + result = result, + status_code = status_code, + route = request.route_name(), + retries = request.retries_bucket(), + ); metric!( histogram(RelayHistograms::UpstreamRetries) = request.previous_retries.into(), @@ -661,15 +666,16 @@ impl UpstreamRelay { /// 4. Otherwise, ensure an authentication request is scheduled. fn handle_http_response( &mut self, - mut request: UpstreamRequest, + send_start: Instant, + mut request: EnqueuedRequest, send_result: Result, ctx: &mut Context, ) { - UpstreamRelay::meter_result(&request, &send_result); + UpstreamRelay::meter_result(send_start, &request, &send_result); if matches!(send_result, Err(ref err) if err.is_network_error()) { self.handle_network_error(ctx); - if request.config.retry { + if request.request.retry() { request.previous_retries += 1; return self.enqueue(request, ctx, EnqueuePosition::Back); } @@ -679,18 +685,22 @@ impl UpstreamRelay { self.reset_network_error(); } - request.response_sender.send(send_result).ok(); + request + .request + .respond(send_result) + .into_actor(self) + .spawn(ctx); } /// Enqueues a request and ensures that the message queue advances. fn enqueue( &mut self, - request: UpstreamRequest, + request: EnqueuedRequest, ctx: &mut Context, position: EnqueuePosition, ) { - let name = request.config.priority.name(); - let queue = match request.config.priority { + let name = request.request.priority().name(); + let queue = match request.request.priority() { // Immediate is special and bypasses the queue. Directly send the request and return // the response channel rather than waiting for `PumpHttpMessageQueue`. RequestPriority::Immediate => return self.send_request(request, ctx), @@ -711,81 +721,31 @@ impl UpstreamRelay { ctx.notify(PumpHttpMessageQueue); } - fn enqueue_request( - &mut self, - config: UpstreamRequestConfig, - method: Method, - path: P, - build: F, - ctx: &mut Context, - ) -> ResponseFuture - where - F: RequestBuilderTransformer, - P: AsRef, - { - let (tx, rx) = oneshot::channel::>(); - - let request = UpstreamRequest { - config, - method, - path: path.as_ref().to_owned(), - response_sender: tx, - build: Box::new(build), - previous_retries: 0, - send_start: None, - }; - - self.enqueue(request, ctx, EnqueuePosition::Front); - - let future = rx - // map errors caused by the oneshot channel being closed (unlikely) - .map_err(|_| UpstreamRequestError::ChannelClosed) - // unwrap the result (this is how we transport the http failure through the channel) - .and_then(|result| result); - - Box::new(future) - } - - fn enqueue_query( + fn enqueue_query( &mut self, query: Q, ctx: &mut Context, ) -> ResponseFuture { - let method = query.method(); - let path = query.path(); - let config = UpstreamRequestConfig { - retry: Q::retry(), - priority: Q::priority(), - intercept_status_errors: true, - set_relay_id: true, - }; - let credentials = tryf!(self .config .credentials() .ok_or(UpstreamRequestError::NoCredentials)); - let (json, signature) = credentials.secret_key.pack(query); - let json = Arc::new(json); + let (json, signature) = credentials.secret_key.pack(&query); + let (tx, rx) = oneshot::channel(); - let max_response_size = self.config.max_api_payload_size(); + let request = EnqueuedRequest::new(UpstreamQueryRequest { + query, + body: json, + signature, + max_response_size: self.config.max_api_payload_size(), + sender: Some(tx), + }); - let future = self - .enqueue_request( - config, - method, - path, - move |mut builder: RequestBuilder| { - builder.header("X-Sentry-Relay-Signature", signature.as_str().as_bytes()); - builder.header(header::CONTENT_TYPE, b"application/json"); - builder.body(&*json).map_err(UpstreamRequestError::Http) - }, - ctx, - ) - .and_then(move |r| { - r.json(max_response_size) - .map_err(UpstreamRequestError::Http) - }); + self.enqueue(request, ctx, EnqueuePosition::Front); + let future = rx + .map_err(|_| UpstreamRequestError::ChannelClosed) + .flatten(); Box::new(future) } @@ -989,47 +949,72 @@ impl Handler for UpstreamRelay { } } -/// Checks the status of the network connection with the upstream server -struct CheckUpstreamConnection; +struct ScheduleConnectionCheck; -impl Message for CheckUpstreamConnection { +impl Message for ScheduleConnectionCheck { type Result = (); } -impl Handler for UpstreamRelay { +impl Handler for UpstreamRelay { type Result = (); - fn handle(&mut self, _msg: CheckUpstreamConnection, ctx: &mut Self::Context) -> Self::Result { - self.enqueue_request( - UpstreamRequestConfig { - priority: RequestPriority::Immediate, - retry: false, - intercept_status_errors: true, - set_relay_id: true, - }, - Method::GET, - "/api/0/relays/live/", - |builder: RequestBuilder| builder.finish().map_err(UpstreamRequestError::Http), - ctx, - ) - .and_then(|client_response| { - // consume response bodies to ensure the connection remains usable. - client_response - .consume() - .map_err(UpstreamRequestError::Http) - }) - .into_actor(self) - .then(|result, slf, ctx| { + fn handle(&mut self, _msg: ScheduleConnectionCheck, ctx: &mut Self::Context) -> Self::Result { + self.schedule_connection_check(ctx); + } +} + +/// Checks the status of the network connection with the upstream server +struct GetHealthcheck; + +impl UpstreamRequest for GetHealthcheck { + fn method(&self) -> Method { + Method::GET + } + + fn path(&self) -> Cow<'_, str> { + Cow::Borrowed("/api/0/relays/live/") + } + + fn retry(&self) -> bool { + false + } + + fn priority(&self) -> RequestPriority { + RequestPriority::Immediate + } + + fn set_relay_id(&self) -> bool { + true + } + + fn intercept_status_errors(&self) -> bool { + true + } + + fn build(&mut self, builder: RequestBuilder) -> Result { + builder.finish() + } + + fn respond( + &mut self, + result: Result, + ) -> ResponseFuture<(), ()> { + let future: ResponseFuture<_, _> = match result { + Ok(response) => Box::new(response.consume().map_err(UpstreamRequestError::Http)), + Err(err) => Box::new(future::err(err)), + }; + + Box::new(future.then(|result| { if matches!(result, Err(err) if err.is_network_error()) { // still network error, schedule another attempt - slf.upstream_connection_check(ctx); + UpstreamRelay::from_registry().do_send(ScheduleConnectionCheck); } else { // resume normal messages - ctx.notify(PumpHttpMessageQueue); + UpstreamRelay::from_registry().do_send(PumpHttpMessageQueue); } - fut::ok(()) - }) - .spawn(ctx); + + Ok(()) + })) } } @@ -1064,137 +1049,64 @@ where } } -pub struct SendRequest { - method: Method, - path: String, - builder: B, - transformer: R, - config: UpstreamRequestConfig, -} - -struct UpstreamRequestConfig { - /// Queueing priority for the request. - priority: RequestPriority, - /// Should the request be retried in case of network error. - retry: bool, - /// Should 429s be honored within the upstream. - intercept_status_errors: bool, - /// Should the x-sentry-relay-id header be added. - set_relay_id: bool, -} +/// Represents an HTTP request to be sent by the Upstream actor. +pub trait UpstreamRequest: Send { + /// The HTTP method of the request. + fn method(&self) -> Method; -impl SendRequest { - pub fn new>(method: Method, path: S) -> Self { - SendRequest { - method, - path: path.into(), - builder: (), - transformer: (), - config: UpstreamRequestConfig { - priority: RequestPriority::Low, - retry: true, - intercept_status_errors: true, - set_relay_id: true, - }, - } - } + /// The path relative to the upstream. + fn path(&self) -> Cow<'_, str>; - pub fn post>(path: S) -> Self { - Self::new(Method::POST, path) + /// Whether this request should retry on network errors. + fn retry(&self) -> bool { + true } -} -impl SendRequest -where - B: RequestBuilderTransformer, - T: ResponseTransformer, -{ - pub fn build(self, builder: F) -> SendRequest - where - F: RequestBuilderTransformer, - { - SendRequest { - method: self.method, - path: self.path, - builder, - transformer: self.transformer, - config: self.config, - } + /// The queueing priority of the request. Defaults to `Low`. + fn priority(&self) -> RequestPriority { + RequestPriority::Low } - #[inline] - pub fn retry(mut self, should_retry: bool) -> Self { - self.config.retry = should_retry; - self + /// True if normal error processing should occur, false if + /// errors from the upstream should not be processed and returned as is + /// in the response. + fn intercept_status_errors(&self) -> bool { + true } - #[inline] - pub fn intercept_status_errors(mut self, should_intercept_status_errors: bool) -> Self { - self.config.intercept_status_errors = should_intercept_status_errors; - self + /// If set to True it will add the X-Sentry-Relay-Id header to the request + /// + /// This should be done (only) for calls to endpoints that use Relay authentication. + fn set_relay_id(&self) -> bool { + true } - #[inline] - pub fn set_relay_id(mut self, should_set_relay_id: bool) -> Self { - self.config.set_relay_id = should_set_relay_id; - self - } + /// Called whenever the request will be send over HTTP (possible multiple times) + fn build(&mut self, builder: RequestBuilder) -> Result; - #[allow(dead_code)] - pub fn transform(self, callback: F) -> SendRequest - where - F: ResponseTransformer, - { - SendRequest { - method: self.method, - path: self.path, - builder: self.builder, - transformer: callback, - config: self.config, - } - } + /// Called when the HTTP request completes, either with success or an error that will not + /// be retried. + fn respond(&mut self, result: Result) + -> ResponseFuture<(), ()>; } -impl Message for SendRequest +pub struct SendRequest(pub T); + +impl Message for SendRequest where - B: RequestBuilderTransformer, - R: ResponseTransformer, + T: UpstreamRequest, { - type Result = Result<::Item, ::Error>; + type Result = (); } -// impl Message for SendRequest { -// type Result = Result<(), UpstreamRequestError>; -// } - -/// SendRequest messages represent external messages that need to be sent to the upstream server -/// and do not use Relay authentication. -/// -/// The handler adds the message to one of the message queues. -impl Handler> for UpstreamRelay +impl Handler> for UpstreamRelay where - B: RequestBuilderTransformer, - R: ResponseTransformer, - ::Item: Send + 'static, - ::Error: From + Send + 'static, + T: UpstreamRequest + 'static, { - type Result = ResponseFuture<::Item, ::Error>; - - fn handle(&mut self, message: SendRequest, ctx: &mut Self::Context) -> Self::Result { - let SendRequest { - method, - path, - builder, - transformer, - config, - } = message; - - let future = self - .enqueue_request(config, method, path, builder, ctx) - .from_err() - .and_then(move |r| transformer.transform_response(r)); + type Result = (); - Box::new(future) + fn handle(&mut self, msg: SendRequest, ctx: &mut Self::Context) -> Self::Result { + self.enqueue(EnqueuedRequest::new(msg.0), ctx, EnqueuePosition::Front); } } @@ -1223,7 +1135,7 @@ impl Handler for UpstreamRelay { } } -pub trait UpstreamQuery: Serialize { +pub trait UpstreamQuery: Serialize + Send + 'static { type Response: DeserializeOwned + 'static + Send; /// The HTTP method of the request. @@ -1247,6 +1159,66 @@ impl Message for SendQuery { type Result = Result; } +struct UpstreamQueryRequest { + query: T, + body: Vec, + signature: String, + max_response_size: usize, + sender: Option>>, +} + +impl UpstreamRequest for UpstreamQueryRequest { + fn method(&self) -> Method { + self.query.method() + } + + fn path(&self) -> Cow<'_, str> { + self.query.path() + } + + fn build(&mut self, mut builder: RequestBuilder) -> Result { + builder.header( + "X-Sentry-Relay-Signature", + self.signature.as_str().as_bytes(), + ); + builder.header(header::CONTENT_TYPE, b"application/json"); + builder.body(&self.body) + } + + fn retry(&self) -> bool { + T::retry() + } + + fn priority(&self) -> RequestPriority { + T::priority() + } + + fn respond( + &mut self, + result: Result, + ) -> ResponseFuture<(), ()> { + let sender = self.sender.take(); + + match result { + Ok(response) => { + let future = response + .json(self.max_response_size) + .map_err(UpstreamRequestError::Http) + .then(|result| { + sender.map(|sender| sender.send(result)); + Ok(()) + }); + + Box::new(future) + } + Err(error) => { + sender.map(|sender| sender.send(Err(error))); + Box::new(future::err(())) + } + } + } +} + /// SendQuery messages represent messages that need to be sent to the upstream server /// and use Relay authentication. /// diff --git a/relay-server/src/endpoints/forward.rs b/relay-server/src/endpoints/forward.rs index c4d8139274..beb6a6c632 100644 --- a/relay-server/src/endpoints/forward.rs +++ b/relay-server/src/endpoints/forward.rs @@ -3,19 +3,26 @@ //! This endpoint will issue a client request to the upstream and append relay's own headers //! (`X-Forwarded-For` and `Sentry-Relay-Id`). The response is then streamed back to the origin. +use std::borrow::Cow; +use std::fmt; + use ::actix::prelude::*; use actix_web::error::ResponseError; +use actix_web::http::header::HeaderValue; use actix_web::http::{header, header::HeaderName, uri::PathAndQuery, StatusCode}; +use actix_web::http::{HeaderMap, Method}; use actix_web::{AsyncResponder, Error, HttpMessage, HttpRequest, HttpResponse}; +use bytes::Bytes; use failure::Fail; -use futures::prelude::*; +use futures::{future, prelude::*, sync::oneshot}; + use lazy_static::lazy_static; use relay_common::GlobMatcher; use relay_config::Config; use relay_log::LogError; -use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequestError}; +use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequest, UpstreamRequestError}; use crate::body::ForwardBody; use crate::endpoints::statics; use crate::extractors::ForwardedFor; @@ -119,6 +126,97 @@ fn get_limit_for_path(path: &str, config: &Config) -> usize { } } +type Headers = Vec<(String, Vec)>; +type ForwardResponse = (StatusCode, Headers, Vec); + +struct ForwardRequest { + method: Method, + path: String, + headers: HeaderMap, + forwarded_for: ForwardedFor, + data: Bytes, + max_response_size: usize, + sender: Option>>, +} + +impl fmt::Debug for ForwardRequest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ForwardRequest") + .field("method", &self.method) + .field("path", &self.path) + .finish() + } +} + +impl UpstreamRequest for ForwardRequest { + fn method(&self) -> Method { + self.method.clone() + } + + fn path(&self) -> Cow<'_, str> { + self.path.as_str().into() + } + + fn retry(&self) -> bool { + false + } + + fn intercept_status_errors(&self) -> bool { + false + } + + fn set_relay_id(&self) -> bool { + false + } + + fn build(&mut self, mut builder: RequestBuilder) -> Result { + for (key, value) in &self.headers { + // Since there is no API in actix-web to access the raw, not-yet-decompressed stream, we + // must not forward the content-encoding header, as the actix http client will do its own + // content encoding. Also remove content-length because it's likely wrong. + if HOP_BY_HOP_HEADERS.iter().any(|x| x == key) + || IGNORED_REQUEST_HEADERS.iter().any(|x| x == key) + { + continue; + } + + builder.header(key, value); + } + + builder.header("X-Forwarded-For", self.forwarded_for.as_ref()); + builder.body(&self.data) + } + + fn respond( + &mut self, + result: Result, + ) -> ResponseFuture<(), ()> { + let sender = self.sender.take(); + + match result { + Ok(response) => { + let status = StatusCode::from_u16(response.status().as_u16()).unwrap(); + let headers = response.clone_headers(); + + let future = response + .bytes(self.max_response_size) + .and_then(move |body| Ok((status, headers, body))) + .map_err(UpstreamRequestError::Http) + .then(|result| { + sender.map(|sender| sender.send(result)); + Ok(()) + }); + + Box::new(future) + } + Err(e) => { + sender.map(|sender| sender.send(Err(e))); + Box::new(future::err(())) + } + } + } +} + /// Implementation of the forward endpoint. /// /// This endpoint will create a proxy request to the upstream for every incoming request and stream @@ -145,49 +243,26 @@ pub fn forward_upstream( ForwardBody::new(request, limit) .map_err(Error::from) .and_then(move |data| { - let forward_request = SendRequest::new(method, path_and_query) - .retry(false) - .intercept_status_errors(false) - .set_relay_id(false) - .build(move |mut builder: RequestBuilder| { - for (key, value) in &headers { - // Since there is no API in actix-web to access the raw, not-yet-decompressed stream, we - // must not forward the content-encoding header, as the actix http client will do its own - // content encoding. Also remove content-length because it's likely wrong. - if HOP_BY_HOP_HEADERS.iter().any(|x| x == key) - || IGNORED_REQUEST_HEADERS.iter().any(|x| x == key) - { - continue; - } - - builder.header(key, value); - } - - builder.header("X-Forwarded-For", forwarded_for.as_ref()); - builder.body(&data).map_err(UpstreamRequestError::Http) - }) - .transform(move |response: Response| { - let status = response.status(); - let headers = response.clone_headers(); - response - .bytes(max_response_size) - .and_then(move |body| Ok((status, headers, body))) - .map_err(UpstreamRequestError::Http) - }); - - UpstreamRelay::from_registry() - .send(forward_request) - .map_err(|_| { - Error::from(ForwardedUpstreamRequestError( - UpstreamRequestError::ChannelClosed, - )) - }) - }) - .and_then(move |result: Result<_, UpstreamRequestError>| { - let (status, headers, body) = result.map_err(ForwardedUpstreamRequestError::from)?; - let actix_code = StatusCode::from_u16(status.as_u16()).unwrap(); - let mut forwarded_response = HttpResponse::build(actix_code); + let (tx, rx) = oneshot::channel(); + + let forward_request = ForwardRequest { + method, + path: path_and_query, + headers, + forwarded_for, + data, + max_response_size, + sender: Some(tx), + }; + UpstreamRelay::from_registry().do_send(SendRequest(forward_request)); + + rx.map_err(|_| UpstreamRequestError::ChannelClosed) + .flatten() + .map_err(|e| Error::from(ForwardedUpstreamRequestError::from(e))) + }) + .and_then(move |(status, headers, body)| { + let mut forwarded_response = HttpResponse::build(status); let mut has_content_type = false; for (key, value) in headers { diff --git a/relay-server/src/extractors/forwarded_for.rs b/relay-server/src/extractors/forwarded_for.rs index fd173d627b..9ea0756fbd 100644 --- a/relay-server/src/extractors/forwarded_for.rs +++ b/relay-server/src/extractors/forwarded_for.rs @@ -1,6 +1,7 @@ use actix_web::http::header; use actix_web::{Error, FromRequest, HttpMessage, HttpRequest}; +#[derive(Debug)] pub struct ForwardedFor(String); impl ForwardedFor { diff --git a/relay-server/src/http.rs b/relay-server/src/http.rs index fc252bbf0b..1d84ed3abf 100644 --- a/relay-server/src/http.rs +++ b/relay-server/src/http.rs @@ -103,18 +103,27 @@ impl RequestBuilder { self } + /// Add an optional header, not replacing existing ones. + /// + /// If the value is `Some`, the header is added. If the value is `None`, headers are not + /// changed. + pub fn header_opt( + &mut self, + key: impl AsRef, + value: Option>, + ) -> &mut Self { + if let Some(value) = value { + take_mut::take(&mut self.builder, |b| { + b.header(key.as_ref(), value.as_ref()) + }); + } + self + } + pub fn body(mut self, body: B) -> Result where B: AsRef<[u8]>, { - // actix-web's Binary is used as argument here because the type can be constructed from - // almost anything and then the actix-web codepath is minimally affected. - // - // Still it's not perfect as in the identity-encoding path we have some unnecessary copying - // that is just to get around type conflicts. We cannot use Bytes here because we have a - // version split between actix-web's Bytes dependency and reqwest's Bytes dependency. A - // real zero-copy abstraction over both would force us to downgrade reqwest to a version - // that uses Bytes 0.4. self.builder = match self.http_encoding { HttpEncoding::Identity => self.builder.body(body.as_ref().to_vec()), HttpEncoding::Deflate => {