Skip to content

Commit

Permalink
proxy: Parse Notification twice only for unknown topic (#10296)
Browse files Browse the repository at this point in the history
## Problem

We currently parse Notification twice even in the happy path.

## Summary of changes

Use `#[serde(other)]` to catch unknown topics and defer the second
parsing.
  • Loading branch information
cloneable authored Jan 7, 2025
1 parent 4aa9786 commit 0a117fb
Showing 1 changed file with 21 additions and 34 deletions.
55 changes: 21 additions & 34 deletions proxy/src/redis/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ struct NotificationHeader<'a> {

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(tag = "topic", content = "data")]
// Message to contributors: Make sure to align these topic names with the list below.
pub(crate) enum Notification {
#[serde(
rename = "/allowed_ips_updated",
Expand Down Expand Up @@ -74,21 +73,9 @@ pub(crate) enum Notification {
PasswordUpdate { password_update: PasswordUpdate },
#[serde(rename = "/cancel_session")]
Cancel(CancelSession),
}

/// Returns true if the topic name given is a known topic that we can deserialize and action on.
/// Returns false otherwise.
fn known_topic(s: &str) -> bool {
// Message to contributors: Make sure to align these topic names with the enum above.
matches!(
s,
"/allowed_ips_updated"
| "/block_public_or_vpc_access_updated"
| "/allowed_vpc_endpoints_updated_for_org"
| "/allowed_vpc_endpoints_updated_for_projects"
| "/password_updated"
| "/cancel_session"
)
#[serde(other, skip_serializing)]
UnknownTopic,
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
Expand Down Expand Up @@ -178,32 +165,29 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
let payload: String = msg.get_payload()?;
tracing::debug!(?payload, "received a message payload");

// For better error handling, we first parse the payload to extract the topic.
// If there's a topic we don't support, we can handle that error more gracefully.
let header: NotificationHeader = match serde_json::from_str(&payload) {
Ok(msg) => msg,
Err(e) => {
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
channel: msg.get_channel_name(),
});
tracing::error!("broken message: {e}");
let msg: Notification = match serde_json::from_str(&payload) {
Ok(Notification::UnknownTopic) => {
match serde_json::from_str::<NotificationHeader>(&payload) {
// don't update the metric for redis errors if it's just a topic we don't know about.
Ok(header) => tracing::warn!(topic = header.topic, "unknown topic"),
Err(e) => {
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
channel: msg.get_channel_name(),
});
tracing::error!("broken message: {e}");
}
};
return Ok(());
}
};

if !known_topic(header.topic) {
// don't update the metric for redis errors if it's just a topic we don't know about.
tracing::warn!(topic = header.topic, "unknown topic");
return Ok(());
}

let msg: Notification = match serde_json::from_str(&payload) {
Ok(msg) => msg,
Err(e) => {
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
channel: msg.get_channel_name(),
});
tracing::error!(topic = header.topic, "broken message: {e}");
match serde_json::from_str::<NotificationHeader>(&payload) {
Ok(header) => tracing::error!(topic = header.topic, "broken message: {e}"),
Err(_) => tracing::error!("broken message: {e}"),
};
return Ok(());
}
};
Expand Down Expand Up @@ -278,6 +262,8 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
invalidate_cache(cache, msg);
});
}

Notification::UnknownTopic => unreachable!(),
}

Ok(())
Expand All @@ -304,6 +290,7 @@ fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
Notification::AllowedVpcEndpointsUpdatedForProjects { .. } => {
// https://github.com/neondatabase/neon/pull/10073
}
Notification::UnknownTopic => unreachable!(),
}
}

Expand Down

1 comment on commit 0a117fb

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7377 tests run: 7014 passed, 1 failed, 362 skipped (full report)


Failures on Postgres 16

  • test_storage_controller_many_tenants[github-actions-selfhosted]: release-x86-64
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_storage_controller_many_tenants[release-pg16-github-actions-selfhosted]"
Flaky tests (3)

Postgres 17

Postgres 15

Code coverage* (full report)

  • functions: 31.2% (8411 of 26961 functions)
  • lines: 48.0% (66808 of 139225 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
0a117fb at 2025-01-07T17:38:06.914Z :recycle:

Please sign in to comment.