Skip to content

Commit

Permalink
feat: remove notification record struct (#827)
Browse files Browse the repository at this point in the history
feat: remove notification record struct
  • Loading branch information
taddes authored Jan 29, 2025
1 parent b5224c5 commit 2aaf9f2
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 172 deletions.
5 changes: 3 additions & 2 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ use protobuf::RepeatedField;
use serde_json::{from_str, json};
use uuid::Uuid;

use crate::db::RangeKey;
use crate::db::{
client::{DbClient, FetchMessageResponse},
error::{DbError, DbResult},
DbSettings, Notification, NotificationRecord, User, MAX_ROUTER_TTL, USER_RECORD_VERSION,
DbSettings, Notification, User, MAX_ROUTER_TTL, USER_RECORD_VERSION,
};

pub use self::metadata::MetadataBuilder;
Expand Down Expand Up @@ -733,7 +734,7 @@ impl BigTableClientImpl {
None,
));
};
let range_key = NotificationRecord::parse_chidmessageid(chidmessageid).map_err(|e| {
let range_key = RangeKey::parse_chidmessageid(chidmessageid).map_err(|e| {
DbError::Integrity(
format!("rows_to_notification expected chidmessageid: {e}"),
None,
Expand Down
166 changes: 3 additions & 163 deletions autopush-common/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@
/// functions. Each of the data stores are VERY
/// different, although the requested functions
/// are fairly simple.
use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::result::Result as StdResult;

use derive_builder::Builder;
use lazy_static::lazy_static;
use regex::RegexSet;
use serde::Serializer;
use serde_derive::{Deserialize, Serialize};
use uuid::Uuid;
Expand All @@ -32,11 +29,10 @@ pub mod mock;

pub use reporter::spawn_pool_periodic_reporter;

use crate::errors::{ApcErrorKind, Result};
use crate::notification::{Notification, STANDARD_NOTIFICATION_PREFIX, TOPIC_NOTIFICATION_PREFIX};
use crate::util::timing::{ms_since_epoch, sec_since_epoch};
use crate::notification::Notification;
use crate::util::timing::ms_since_epoch;
use crate::{MAX_NOTIFICATION_TTL, MAX_ROUTER_TTL};
use models::{NotificationHeaders, RangeKey};
use models::RangeKey;

pub const USER_RECORD_VERSION: u64 = 1;

Expand Down Expand Up @@ -203,162 +199,6 @@ impl User {
}
}

/// A stored Notification record. This is a notification that is to be stored
/// until the User Agent reconnects. These are then converted to publishable
/// [crate::db::Notification] records.
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
pub struct NotificationRecord {
/// The UserAgent Identifier (UAID)
#[serde(serialize_with = "uuid_serializer")]
uaid: Uuid,
// Format:
// Topic Messages:
// {TOPIC_NOTIFICATION_PREFIX}:{channel id}:{topic}
// New Messages:
// {STANDARD_NOTIFICATION_PREFIX}:{timestamp int in microseconds}:{channel id}
chidmessageid: String,
/// Magic entry stored in the first Message record that indicates the highest
/// non-topic timestamp we've read into
#[serde(skip_serializing_if = "Option::is_none")]
pub current_timestamp: Option<u64>,
/// Magic entry stored in the first Message record that indicates the valid
/// channel id's
#[serde(skip_serializing)]
pub chids: Option<HashSet<String>>,
/// Time in seconds from epoch
#[serde(skip_serializing_if = "Option::is_none")]
timestamp: Option<u64>,
/// Expiration timestamp
expiry: u64,
/// TTL value provided by application server for the message
#[serde(skip_serializing_if = "Option::is_none")]
ttl: Option<u64>,
/// The message data
#[serde(skip_serializing_if = "Option::is_none")]
data: Option<String>,
/// Selected, associated message headers. These can contain additional
/// decryption information for the UserAgent.
#[serde(skip_serializing_if = "Option::is_none")]
headers: Option<NotificationHeaders>,
/// This is the acknowledgement-id used for clients to ack that they have received the
/// message. Autoendpoint refers to this as a message_id. Endpoints generate this
/// value before sending it to storage or a connection node.
#[serde(skip_serializing_if = "Option::is_none")]
updateid: Option<String>,
/// Internal Push Reliability tracking id. (Applied only to subscription updates generated
/// by Mozilla owned and consumed messages, like SendTab updates.)
#[serde(skip_serializing_if = "Option::is_none")]
reliability_id: Option<String>,
}

impl NotificationRecord {
/// read the custom sort_key and convert it into something the database can use.
pub(crate) fn parse_chidmessageid(key: &str) -> Result<RangeKey> {
lazy_static! {
static ref RE: RegexSet = RegexSet::new([
format!("^{}:\\S+:\\S+$", TOPIC_NOTIFICATION_PREFIX).as_str(),
format!("^{}:\\d+:\\S+$", STANDARD_NOTIFICATION_PREFIX).as_str(),
"^\\S{3,}:\\S+$"
])
.unwrap();
}
if !RE.is_match(key) {
return Err(ApcErrorKind::GeneralError("Invalid chidmessageid".into()).into());
}

let v: Vec<&str> = key.split(':').collect();
match v[0] {
// This is a topic message (There Can Only Be One. <guitar riff>)
"01" => {
if v.len() != 3 {
return Err(ApcErrorKind::GeneralError("Invalid topic key".into()).into());
}
let (channel_id, topic) = (v[1], v[2]);
let channel_id = Uuid::parse_str(channel_id)?;
Ok(RangeKey {
channel_id,
topic: Some(topic.to_string()),
sortkey_timestamp: None,
legacy_version: None,
})
}
// A "normal" pending message.
"02" => {
if v.len() != 3 {
return Err(ApcErrorKind::GeneralError("Invalid topic key".into()).into());
}
let (sortkey, channel_id) = (v[1], v[2]);
let channel_id = Uuid::parse_str(channel_id)?;
Ok(RangeKey {
channel_id,
topic: None,
sortkey_timestamp: Some(sortkey.parse()?),
legacy_version: None,
})
}
// Ok, that's odd, but try to make some sense of it.
// (This is a bit of legacy code that we should be
// able to drop.)
_ => {
if v.len() != 2 {
return Err(ApcErrorKind::GeneralError("Invalid topic key".into()).into());
}
let (channel_id, legacy_version) = (v[0], v[1]);
let channel_id = Uuid::parse_str(channel_id)?;
Ok(RangeKey {
channel_id,
topic: None,
sortkey_timestamp: None,
legacy_version: Some(legacy_version.to_string()),
})
}
}
}

/// Convert the stored notifications into publishable notifications
pub fn into_notif(self) -> Result<Notification> {
let key = Self::parse_chidmessageid(&self.chidmessageid)?;
let version = key
.legacy_version
.or(self.updateid)
.ok_or(ApcErrorKind::GeneralError(
"No valid updateid/version found".into(),
))?;

Ok(Notification {
channel_id: key.channel_id,
version,
ttl: self.ttl.unwrap_or(0),
timestamp: self
.timestamp
.ok_or("No timestamp found")
.map_err(|e| ApcErrorKind::GeneralError(e.to_string()))?,
topic: key.topic,
data: self.data,
headers: self.headers.map(|m| m.into()),
sortkey_timestamp: key.sortkey_timestamp,
reliability_id: None,
#[cfg(feature = "reliable_report")]
reliable_state: None,
})
}

/// Convert from a publishable Notification to a stored notification
pub fn from_notif(uaid: &Uuid, val: Notification) -> Self {
Self {
uaid: *uaid,
chidmessageid: val.chidmessageid(),
timestamp: Some(val.timestamp),
expiry: sec_since_epoch() + min(val.ttl, MAX_NOTIFICATION_TTL),
ttl: Some(val.ttl),
data: val.data,
headers: val.headers.map(|h| h.into()),
updateid: Some(val.version),
..Default::default()
}
}
}

#[cfg(test)]
mod tests {
use super::{User, USER_RECORD_VERSION};
Expand Down
78 changes: 74 additions & 4 deletions autopush-common/src/db/models.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use lazy_static::lazy_static;
use regex::RegexSet;
use std::collections::HashMap;

use crate::errors::{ApcErrorKind, Result};
use crate::notification::{STANDARD_NOTIFICATION_PREFIX, TOPIC_NOTIFICATION_PREFIX};
use serde_derive::{Deserialize, Serialize};
use uuid::Uuid;

Expand Down Expand Up @@ -54,20 +58,86 @@ pub(crate) struct RangeKey {
/// The encoded sortkey and timestamp
pub(crate) sortkey_timestamp: Option<u64>,
/// Which version of this message are we handling
#[allow(unused)]
pub(crate) legacy_version: Option<String>,
}

impl RangeKey {
/// read the custom sort_key and convert it into something the database can use.
pub(crate) fn parse_chidmessageid(key: &str) -> Result<RangeKey> {
lazy_static! {
static ref RE: RegexSet = RegexSet::new([
format!("^{}:\\S+:\\S+$", TOPIC_NOTIFICATION_PREFIX).as_str(),
format!("^{}:\\d+:\\S+$", STANDARD_NOTIFICATION_PREFIX).as_str(),
"^\\S{3,}:\\S+$"
])
.unwrap();
}
if !RE.is_match(key) {
return Err(ApcErrorKind::GeneralError("Invalid chidmessageid".into()).into());
}

let v: Vec<&str> = key.split(':').collect();
match v[0] {
// This is a topic message (There Can Only Be One. <guitar riff>)
"01" => {
if v.len() != 3 {
return Err(ApcErrorKind::GeneralError("Invalid topic key".into()).into());
}
let (channel_id, topic) = (v[1], v[2]);
let channel_id = Uuid::parse_str(channel_id)?;
Ok(RangeKey {
channel_id,
topic: Some(topic.to_string()),
sortkey_timestamp: None,
legacy_version: None,
})
}
// A "normal" pending message.
"02" => {
if v.len() != 3 {
return Err(ApcErrorKind::GeneralError("Invalid topic key".into()).into());
}
let (sortkey, channel_id) = (v[1], v[2]);
let channel_id = Uuid::parse_str(channel_id)?;
Ok(RangeKey {
channel_id,
topic: None,
sortkey_timestamp: Some(sortkey.parse()?),
legacy_version: None,
})
}
// Ok, that's odd, but try to make some sense of it.
// (This is a bit of legacy code that we should be
// able to drop.)
_ => {
if v.len() != 2 {
return Err(ApcErrorKind::GeneralError("Invalid topic key".into()).into());
}
let (channel_id, legacy_version) = (v[0], v[1]);
let channel_id = Uuid::parse_str(channel_id)?;
Ok(RangeKey {
channel_id,
topic: None,
sortkey_timestamp: None,
legacy_version: Some(legacy_version.to_string()),
})
}
}
}
}

#[cfg(test)]
mod tests {
use crate::db::NotificationRecord;
use crate::db::RangeKey;
use crate::util::us_since_epoch;
use uuid::Uuid;

#[test]
fn test_parse_sort_key_ver1() {
let chid = Uuid::new_v4();
let chidmessageid = format!("01:{}:mytopic", chid.hyphenated());
let key = NotificationRecord::parse_chidmessageid(&chidmessageid).unwrap();
let key = RangeKey::parse_chidmessageid(&chidmessageid).unwrap();
assert_eq!(key.topic, Some("mytopic".to_string()));
assert_eq!(key.channel_id, chid);
assert_eq!(key.sortkey_timestamp, None);
Expand All @@ -78,7 +148,7 @@ mod tests {
let chid = Uuid::new_v4();
let sortkey_timestamp = us_since_epoch();
let chidmessageid = format!("02:{}:{}", sortkey_timestamp, chid.hyphenated());
let key = NotificationRecord::parse_chidmessageid(&chidmessageid).unwrap();
let key = RangeKey::parse_chidmessageid(&chidmessageid).unwrap();
assert_eq!(key.topic, None);
assert_eq!(key.channel_id, chid);
assert_eq!(key.sortkey_timestamp, Some(sortkey_timestamp));
Expand All @@ -87,7 +157,7 @@ mod tests {
#[test]
fn test_parse_sort_key_bad_values() {
for val in &["02j3i2o", "03:ffas:wef", "01::mytopic", "02:oops:ohnoes"] {
let key = NotificationRecord::parse_chidmessageid(val);
let key = RangeKey::parse_chidmessageid(val);
assert!(key.is_err());
}
}
Expand Down
4 changes: 1 addition & 3 deletions autopush-common/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use crate::util::ms_since_epoch;

#[derive(Serialize, Default, Deserialize, Clone, Debug)]
/// A Publishable Notification record. This is a notification that is either
/// received from a third party or is outbound to a UserAgent. If the
/// UserAgent is not currently available, it may be stored as a
/// [crate::db::NotificationRecord]
/// received from a third party or is outbound to a UserAgent.
pub struct Notification {
#[serde(rename = "channelID")]
pub channel_id: Uuid,
Expand Down

0 comments on commit 2aaf9f2

Please sign in to comment.