Skip to content

Commit

Permalink
feat: Add database tracking and report for Push Reliability (#769)
Browse files Browse the repository at this point in the history
This PR introduces tracking throughput for the database.

It also introduces the PushReliability reporting skeleton. This will be
fleshed out with full reporting later.

Closes: #[SYNC-4324](https://mozilla-hub.atlassian.net/browse/SYNC-4324)

[SYNC-4324]:
https://mozilla-hub.atlassian.net/browse/SYNC-4324?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
  • Loading branch information
jrconlin authored Oct 21, 2024
1 parent 8fff7ab commit e95063c
Show file tree
Hide file tree
Showing 19 changed files with 184 additions and 74 deletions.
5 changes: 5 additions & 0 deletions autoconnect/autoconnect-common/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,15 @@ impl FromStr for ClientMessage {
}
}

/// Returned ACKnowledgement of the received message by the User Agent.
/// This is the payload for the `messageType:ack` packet.
///
#[derive(Debug, Deserialize)]
pub struct ClientAck {
// The channel_id which received messages
#[serde(rename = "channelID")]
pub channel_id: Uuid,
// The corresponding version number for the message.
pub version: String,
}

Expand Down
1 change: 1 addition & 0 deletions autoconnect/autoconnect-settings/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl AppState {
db_settings: settings.db_settings.clone(),
};
let storage_type = StorageType::from_dsn(&db_settings.dsn);

#[allow(unused)]
let db: Box<dyn DbClient> = match storage_type {
#[cfg(feature = "bigtable")]
Expand Down
2 changes: 2 additions & 0 deletions autoconnect/autoconnect-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ ctor.workspace = true
tokio.workspace = true

autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
2 changes: 2 additions & 0 deletions autoconnect/autoconnect-ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ async-stream = "0.3"
ctor.workspace = true

autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
2 changes: 2 additions & 0 deletions autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ tokio.workspace = true
serde_json.workspace = true

autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ impl WebPushClient {
// Get the stored notification record.
let n = &self.ack_state.unacked_stored_notifs[pos];
debug!("✅ Ack notif: {:?}", &n);
// TODO: Record "ack'd" reliability_id, if present.
// Only force delete Topic messages, since they don't have a timestamp.
// Other messages persist in the database, to be, eventually, cleaned up by their
// TTL. We will need to update the `CurrentTimestamp` field for the channel
Expand Down
20 changes: 16 additions & 4 deletions autoendpoint/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ pub enum ApiErrorKind {
#[error(transparent)]
Jwt(#[from] jsonwebtoken::errors::Error),

#[error(transparent)]
Serde(#[from] serde_json::Error),

#[error(transparent)]
ReqwestError(#[from] reqwest::Error),

#[error("Error while validating token")]
TokenHashValidation(#[source] openssl::error::ErrorStack),

Expand Down Expand Up @@ -143,6 +149,7 @@ impl ApiErrorKind {

ApiErrorKind::VapidError(_)
| ApiErrorKind::Jwt(_)
| ApiErrorKind::Serde(_)
| ApiErrorKind::TokenHashValidation(_)
| ApiErrorKind::InvalidAuthentication
| ApiErrorKind::InvalidLocalAuth(_) => StatusCode::UNAUTHORIZED,
Expand All @@ -161,7 +168,8 @@ impl ApiErrorKind {
| ApiErrorKind::Io(_)
| ApiErrorKind::Metrics(_)
| ApiErrorKind::EndpointUrl(_)
| ApiErrorKind::RegistrationSecretHash(_) => StatusCode::INTERNAL_SERVER_ERROR,
| ApiErrorKind::RegistrationSecretHash(_)
| ApiErrorKind::ReqwestError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand All @@ -179,7 +187,7 @@ impl ApiErrorKind {
ApiErrorKind::InvalidMessageId => "invalid_message_id",

ApiErrorKind::VapidError(_) => "vapid_error",
ApiErrorKind::Jwt(_) => "jwt",
ApiErrorKind::Jwt(_) | ApiErrorKind::Serde(_) => "jwt",
ApiErrorKind::TokenHashValidation(_) => "token_hash_validation",
ApiErrorKind::InvalidAuthentication => "invalid_authentication",
ApiErrorKind::InvalidLocalAuth(_) => "invalid_local_auth",
Expand All @@ -199,6 +207,7 @@ impl ApiErrorKind {
ApiErrorKind::Conditional(_) => "conditional",
ApiErrorKind::EndpointUrl(e) => return e.metric_label(),
ApiErrorKind::RegistrationSecretHash(_) => "registration_secret_hash",
ApiErrorKind::ReqwestError(_) => "reqwest",
})
}

Expand All @@ -221,7 +230,8 @@ impl ApiErrorKind {
// Ignore oversized payload.
ApiErrorKind::PayloadError(_) |
ApiErrorKind::Validation(_) |
ApiErrorKind::Conditional(_) => false,
ApiErrorKind::Conditional(_) |
ApiErrorKind::ReqwestError(_) => false,
_ => true,
}
}
Expand Down Expand Up @@ -251,6 +261,7 @@ impl ApiErrorKind {
ApiErrorKind::VapidError(_)
| ApiErrorKind::TokenHashValidation(_)
| ApiErrorKind::Jwt(_)
| ApiErrorKind::Serde(_)
| ApiErrorKind::InvalidAuthentication
| ApiErrorKind::InvalidLocalAuth(_) => Some(109),

Expand All @@ -269,7 +280,8 @@ impl ApiErrorKind {
| ApiErrorKind::InvalidRouterToken
| ApiErrorKind::RegistrationSecretHash(_)
| ApiErrorKind::EndpointUrl(_)
| ApiErrorKind::InvalidMessageId => None,
| ApiErrorKind::InvalidMessageId
| ApiErrorKind::ReqwestError(_) => None,
}
}
}
Expand Down
24 changes: 14 additions & 10 deletions autoendpoint/src/extractors/notification.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error::{ApiError, ApiErrorKind};
use crate::error::{ApiError, ApiErrorKind, ApiResult};
use crate::extractors::{
message_id::MessageId, notification_headers::NotificationHeaders, subscription::Subscription,
};
Expand Down Expand Up @@ -103,6 +103,7 @@ impl From<Notification> for autopush_common::notification::Notification {
timestamp: notification.timestamp,
data: notification.data,
sortkey_timestamp,
reliability_id: notification.subscription.reliability_id,
headers: {
let headers: HashMap<String, String> = notification.headers.into();
if headers.is_empty() {
Expand Down Expand Up @@ -160,25 +161,28 @@ impl Notification {
/// fields are still required when delivering to the connection server, so
/// we can't simply convert this notification type to that one and serialize
/// via serde.
pub fn serialize_for_delivery(&self) -> HashMap<&'static str, serde_json::Value> {
pub fn serialize_for_delivery(&self) -> ApiResult<HashMap<&'static str, serde_json::Value>> {
let mut map = HashMap::new();

map.insert(
"channelID",
serde_json::to_value(self.subscription.channel_id).unwrap(),
serde_json::to_value(self.subscription.channel_id)?,
);
map.insert("version", serde_json::to_value(&self.message_id).unwrap());
map.insert("ttl", serde_json::to_value(self.headers.ttl).unwrap());
map.insert("topic", serde_json::to_value(&self.headers.topic).unwrap());
map.insert("timestamp", serde_json::to_value(self.timestamp).unwrap());
map.insert("version", serde_json::to_value(&self.message_id)?);
map.insert("ttl", serde_json::to_value(self.headers.ttl)?);
map.insert("topic", serde_json::to_value(&self.headers.topic)?);
map.insert("timestamp", serde_json::to_value(self.timestamp)?);
if let Some(reliability_id) = &self.subscription.reliability_id {
map.insert("reliability_id", serde_json::to_value(reliability_id)?);
}

if let Some(data) = &self.data {
map.insert("data", serde_json::to_value(data).unwrap());
map.insert("data", serde_json::to_value(data)?);

let headers: HashMap<_, _> = self.headers.clone().into();
map.insert("headers", serde_json::to_value(headers).unwrap());
map.insert("headers", serde_json::to_value(headers)?);
}

map
Ok(map)
}
}
19 changes: 9 additions & 10 deletions autoendpoint/src/extractors/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct Subscription {
/// (This should ONLY be applied for messages that match known
/// Mozilla provided VAPID public keys.)
///
pub tracking_id: Option<String>,
pub reliability_id: Option<String>,
}

impl FromRequest for Subscription {
Expand Down Expand Up @@ -73,11 +73,13 @@ impl FromRequest for Subscription {
.transpose()?;

trace!("raw vapid: {:?}", &vapid);
let trackable = if let Some(vapid) = &vapid {
app_state.reliability.is_trackable(vapid)
} else {
false
};
let reliability_id: Option<String> = vapid.as_ref().and_then(|v| {
app_state
.vapid_tracker
.is_trackable(v)
.then(|| app_state.vapid_tracker.get_id(req.headers()))
});
debug!("🔍 Assigning Reliability ID: {reliability_id:?}");

// Capturing the vapid sub right now will cause too much cardinality. Instead,
// let's just capture if we have a valid VAPID, as well as what sort of bad sub
Expand Down Expand Up @@ -132,14 +134,11 @@ impl FromRequest for Subscription {
.incr(&format!("updates.vapid.draft{:02}", vapid.vapid.version()))?;
}

let tracking_id =
trackable.then(|| app_state.reliability.get_tracking_id(req.headers()));

Ok(Subscription {
user,
channel_id,
vapid,
tracking_id,
reliability_id,
})
}
.boxed_local()
Expand Down
8 changes: 7 additions & 1 deletion autoendpoint/src/routers/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ pub fn build_message_data(notification: &Notification) -> ApiResult<HashMap<&'st
message_data.insert_opt("enc", notification.headers.encryption.as_ref());
message_data.insert_opt("cryptokey", notification.headers.crypto_key.as_ref());
message_data.insert_opt("enckey", notification.headers.encryption_key.as_ref());
// Report the data to the UA. How this value is reported back is still a work in progress.
trace!(
"🔍 Sending Reliability ID: {:?}",
notification.subscription.reliability_id
);
message_data.insert_opt("rid", notification.subscription.reliability_id.as_ref());
}

Ok(message_data)
Expand Down Expand Up @@ -239,7 +245,7 @@ pub mod tests {
user,
channel_id: channel_id(),
vapid: None,
tracking_id: None,
reliability_id: None,
},
headers: NotificationHeaders {
ttl: 0,
Expand Down
18 changes: 10 additions & 8 deletions autoendpoint/src/routers/webpush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ impl Router for WebPushRouter {
);
}
Err(error) => {
if error.is_timeout() {
self.metrics.incr("error.node.timeout")?;
if let ApiErrorKind::ReqwestError(error) = &error.kind {
if error.is_timeout() {
self.metrics.incr("error.node.timeout")?;
};
if error.is_connect() {
self.metrics.incr("error.node.connect")?;
};
};
if error.is_connect() {
self.metrics.incr("error.node.connect")?;
}
debug!("✉ Error while sending webpush notification: {}", error);
self.remove_node_id(user, node_id).await?
}
Expand Down Expand Up @@ -177,11 +179,11 @@ impl WebPushRouter {
&self,
notification: &Notification,
node_id: &str,
) -> Result<Response, reqwest::Error> {
) -> ApiResult<Response> {
let url = format!("{}/push/{}", node_id, notification.subscription.user.uaid);
let notification = notification.serialize_for_delivery();
let notification = notification.serialize_for_delivery()?;

self.http.put(&url).json(&notification).send().await
Ok(self.http.put(&url).json(&notification).send().await?)
}

/// Notify the node to check for notifications for the user
Expand Down
6 changes: 3 additions & 3 deletions autoendpoint/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct AppState {
pub apns_router: Arc<ApnsRouter>,
#[cfg(feature = "stub")]
pub stub_router: Arc<StubRouter>,
pub reliability: Arc<VapidTracker>,
pub vapid_tracker: Arc<VapidTracker>,
}

pub struct Server;
Expand Down Expand Up @@ -109,7 +109,7 @@ impl Server {
)
.await?,
);
let reliability = Arc::new(VapidTracker(settings.tracking_keys()));
let vapid_tracker = Arc::new(VapidTracker(settings.tracking_keys()));
#[cfg(feature = "stub")]
let stub_router = Arc::new(StubRouter::new(settings.stub.clone())?);
let app_state = AppState {
Expand All @@ -122,7 +122,7 @@ impl Server {
apns_router,
#[cfg(feature = "stub")]
stub_router,
reliability,
vapid_tracker,
};

spawn_pool_periodic_reporter(
Expand Down
31 changes: 21 additions & 10 deletions autoendpoint/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,11 @@ impl Settings {
// public key, but that may not always be true.
pub fn tracking_keys(&self) -> Vec<String> {
let keys = &self.tracking_keys.replace(['"', ' '], "");
Self::read_list_from_str(keys, "Invalid AUTOEND_TRACKING_KEYS")
.map(|v| v.to_owned())
.collect()
let result = Self::read_list_from_str(keys, "Invalid AUTOEND_TRACKING_KEYS")
.map(|v| v.to_owned().replace("=", ""))
.collect();
trace!("🔍 tracking_keys: {result:?}");
result
}

/// Get the URL for this endpoint server
Expand All @@ -193,11 +195,20 @@ impl VapidTracker {
pub fn is_trackable(&self, vapid: &VapidHeaderWithKey) -> bool {
// ideally, [Settings.with_env_and_config_file()] does the work of pre-populating
// the Settings.tracking_vapid_pubs cache, but we can't rely on that.
self.0.contains(&vapid.public_key)
let key = vapid.public_key.replace('=', "");
let result = self.0.contains(&key);
debug!("🔍 Checking {key} {}", {
if result {
"Match!"
} else {
"no match"
}
});
result
}

/// Extract the message Id from the headers (if present), otherwise just make one up.
pub fn get_tracking_id(&self, headers: &HeaderMap) -> String {
pub fn get_id(&self, headers: &HeaderMap) -> String {
headers
.get("X-MessageId")
.and_then(|v|
Expand Down Expand Up @@ -304,7 +315,7 @@ mod tests {
#[test]
fn test_tracking_keys() -> ApiResult<()> {
let settings = Settings{
tracking_keys: r#"["BLMymkOqvT6OZ1o9etCqV4jGPkvOXNz5FdBjsAR9zR5oeCV1x5CBKuSLTlHon-H_boHTzMtMoNHsAGDlDB6X7vI"]"#.to_owned(),
tracking_keys: r#"["BLMymkOqvT6OZ1o9etCqV4jGPkvOXNz5FdBjsAR9zR5oeCV1x5CBKuSLTlHon-H_boHTzMtMoNHsAGDlDB6X7"]"#.to_owned(),
..Default::default()
};

Expand All @@ -314,7 +325,7 @@ mod tests {
token: "".to_owned(),
version_data: crate::headers::vapid::VapidVersionData::Version1,
},
public_key: "BLMymkOqvT6OZ1o9etCqV4jGPkvOXNz5FdBjsAR9zR5oeCV1x5CBKuSLTlHon-H_boHTzMtMoNHsAGDlDB6X7vI".to_owned()
public_key: "BLMymkOqvT6OZ1o9etCqV4jGPkvOXNz5FdBjsAR9zR5oeCV1x5CBKuSLTlHon-H_boHTzMtMoNHsAGDlDB6X7==".to_owned()
};

let key_set = settings.tracking_keys();
Expand All @@ -327,20 +338,20 @@ mod tests {
}

#[test]
fn test_tracking_id() -> ApiResult<()> {
fn test_reliability_id() -> ApiResult<()> {
let mut headers = HeaderMap::new();
let keys = Vec::new();
let reliability = VapidTracker(keys);

let key = reliability.get_tracking_id(&headers);
let key = reliability.get_id(&headers);
assert!(!key.is_empty());

headers.insert(
HeaderName::from_lowercase(b"x-messageid").unwrap(),
HeaderValue::from_static("123foobar456"),
);

let key = reliability.get_tracking_id(&headers);
let key = reliability.get_id(&headers);
assert_eq!(key, "123foobar456".to_owned());

Ok(())
Expand Down
Loading

0 comments on commit e95063c

Please sign in to comment.