Skip to content

Commit

Permalink
ref(server): Introduce trait for upstream requests (getsentry#1037)
Browse files Browse the repository at this point in the history
Introduce a better way to handle various Upstream Requests (regardless of if they go to Relay Authenticated endpoints or not).

Generally cleans up the handling of futures that end up generating http requests to the downstream server in preparation for removing long running futures.
  • Loading branch information
RaduW authored Aug 6, 2021
1 parent 7bf6b03 commit 370f181
Show file tree
Hide file tree
Showing 5 changed files with 467 additions and 363 deletions.
127 changes: 87 additions & 40 deletions relay-server/src/actors/envelopes.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
Expand All @@ -6,13 +7,14 @@ use std::time::{Duration, Instant};
use std::{fmt, net};

use actix::prelude::*;
use actix_web::http::Method;
use chrono::{DateTime, Duration as SignedDuration, Utc};
use failure::Fail;
use futures::{future, prelude::*};
use futures::{future, prelude::*, sync::oneshot};
use serde_json::Value as SerdeValue;

use relay_common::{clone, metric, ProjectId, ProjectKey, UnixTimestamp};
use relay_config::{Config, RelayMode};
use relay_config::{Config, HttpEncoding, RelayMode};
use relay_general::pii::{PiiAttachmentsProcessor, PiiProcessor};
use relay_general::processor::{process_value, ProcessingState};
use relay_general::protocol::{
Expand All @@ -33,10 +35,10 @@ use crate::actors::project_cache::{
CheckEnvelope, GetProjectState, InsertMetrics, MergeBuckets, ProjectCache, ProjectError,
UpdateRateLimits,
};
use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequestError};
use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequest, UpstreamRequestError};
use crate::envelope::{self, AttachmentType, ContentType, Envelope, Item, ItemType};
use crate::extractors::{PartialDsn, RequestMeta};
use crate::http::{HttpError, RequestBuilder};
use crate::http::{HttpError, Request, RequestBuilder, Response};
use crate::metrics::{RelayCounters, RelayHistograms, RelaySets, RelayTimers};
use crate::service::ServerError;
use crate::utils::{
Expand Down Expand Up @@ -1733,6 +1735,7 @@ impl Handler<ProcessMetrics> for EnvelopeProcessor {
/// Error returned from [`EnvelopeManager::send_envelope`].
#[derive(Debug)]
enum SendEnvelopeError {
#[cfg(feature = "processing")]
ScheduleFailed,
#[cfg(feature = "processing")]
StoreFailed(StoreError),
Expand All @@ -1743,6 +1746,73 @@ enum SendEnvelopeError {
/// Either a captured envelope or an error that occured during processing.
pub type CapturedEnvelope = Result<Envelope, String>;

#[derive(Debug)]
struct SendEnvelope {
envelope: Envelope,
scoping: Scoping,
http_encoding: HttpEncoding,
response_sender: Option<oneshot::Sender<Result<(), UpstreamRequestError>>>,
project_key: ProjectKey,
}

impl UpstreamRequest for SendEnvelope {
fn method(&self) -> Method {
Method::POST
}

fn path(&self) -> Cow<'_, str> {
format!("/api/{}/envelope/", self.scoping.project_id).into()
}

fn build(&mut self, mut builder: RequestBuilder) -> Result<Request, HttpError> {
// Override the `sent_at` timestamp. Since the envelope went through basic
// normalization, all timestamps have been corrected. We propagate the new
// `sent_at` to allow the next Relay to double-check this timestamp and
// potentially apply correction again. This is done as close to sending as
// possible so that we avoid internal delays.
self.envelope.set_sent_at(Utc::now());

let meta = self.envelope.meta();

builder
.content_encoding(self.http_encoding)
.header_opt("Origin", meta.origin().map(|url| url.as_str()))
.header_opt("User-Agent", meta.user_agent())
.header("X-Sentry-Auth", meta.auth_header())
.header("X-Forwarded-For", meta.forwarded_for())
.header("Content-Type", envelope::CONTENT_TYPE);

let body = self.envelope.to_vec().map_err(HttpError::custom)?;
builder.body(body)
}

fn respond(
&mut self,
result: Result<Response, UpstreamRequestError>,
) -> ResponseFuture<(), ()> {
let sender = self.response_sender.take();

match result {
Ok(response) => {
let future = response
.consume()
.map_err(UpstreamRequestError::Http)
.map(|_| ())
.then(move |body_result| {
sender.map(|sender| sender.send(body_result).ok());
Ok(())
});

Box::new(future)
}
Err(error) => {
sender.map(|sender| sender.send(Err(error)));
Box::new(future::err(()))
}
}
}
}

pub struct EnvelopeManager {
config: Arc<Config>,
active_envelopes: u32,
Expand Down Expand Up @@ -1779,7 +1849,7 @@ impl EnvelopeManager {
fn send_envelope(
&mut self,
project_key: ProjectKey,
mut envelope: Envelope,
envelope: Envelope,
scoping: Scoping,
#[allow(unused_variables)] start_time: Instant,
) -> ResponseFuture<(), SendEnvelopeError> {
Expand Down Expand Up @@ -1815,43 +1885,19 @@ impl EnvelopeManager {
}

relay_log::trace!("sending envelope to sentry endpoint");
let http_encoding = self.config.http_encoding();
let request = SendRequest::post(format!("/api/{}/envelope/", scoping.project_id)).build(
move |mut builder: RequestBuilder| {
// Override the `sent_at` timestamp. Since the envelope went through basic
// normalization, all timestamps have been corrected. We propagate the new
// `sent_at` to allow the next Relay to double-check this timestamp and
// potentially apply correction again. This is done as close to sending as
// possible so that we avoid internal delays.
envelope.set_sent_at(Utc::now());

let meta = envelope.meta();

if let Some(origin) = meta.origin() {
builder.header("Origin", origin.as_str());
}

if let Some(user_agent) = meta.user_agent() {
builder.header("User-Agent", user_agent);
}

builder
.content_encoding(http_encoding)
.header("X-Sentry-Auth", meta.auth_header())
.header("X-Forwarded-For", meta.forwarded_for())
.header("Content-Type", envelope::CONTENT_TYPE);

let body = envelope
.to_vec()
.map_err(|e| UpstreamRequestError::Http(HttpError::custom(e)))?;
let (tx, rx) = oneshot::channel();
let request = SendEnvelope {
envelope,
scoping,
http_encoding: self.config.http_encoding(),
response_sender: Some(tx),
project_key,
};

builder.body(body).map_err(UpstreamRequestError::Http)
},
);
UpstreamRelay::from_registry().do_send(SendRequest(request));

let future = UpstreamRelay::from_registry()
.send(request)
.map_err(|_| SendEnvelopeError::ScheduleFailed)
let future = rx
.map_err(|_| SendEnvelopeError::SendFailed(UpstreamRequestError::ChannelClosed))
.and_then(move |result| {
if let Err(UpstreamRequestError::RateLimited(upstream_limits)) = result {
let limits = upstream_limits.scope(&scoping);
Expand Down Expand Up @@ -2173,6 +2219,7 @@ impl Handler<HandleEnvelope> for EnvelopeManager {
let outcome = Outcome::Invalid(DiscardReason::Internal);

match error {
#[cfg(feature = "processing")]
SendEnvelopeError::ScheduleFailed => {
envelope_context.send_outcomes(outcome);
ProcessingError::ScheduleFailed
Expand Down
Loading

0 comments on commit 370f181

Please sign in to comment.