From 0a117fb1f1a9c2a062c8a60d53f2eb00637392e8 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Tue, 7 Jan 2025 16:24:54 +0100 Subject: [PATCH] proxy: Parse Notification twice only for unknown topic (#10296) ## Problem We currently parse Notification twice even in the happy path. ## Summary of changes Use `#[serde(other)]` to catch unknown topics and defer the second parsing. --- proxy/src/redis/notifications.rs | 55 ++++++++++++-------------------- 1 file changed, 21 insertions(+), 34 deletions(-) diff --git a/proxy/src/redis/notifications.rs b/proxy/src/redis/notifications.rs index 4383d6be2c71..bf9d61ded3cf 100644 --- a/proxy/src/redis/notifications.rs +++ b/proxy/src/redis/notifications.rs @@ -37,7 +37,6 @@ struct NotificationHeader<'a> { #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] #[serde(tag = "topic", content = "data")] -// Message to contributors: Make sure to align these topic names with the list below. pub(crate) enum Notification { #[serde( rename = "/allowed_ips_updated", @@ -74,21 +73,9 @@ pub(crate) enum Notification { PasswordUpdate { password_update: PasswordUpdate }, #[serde(rename = "/cancel_session")] Cancel(CancelSession), -} -/// Returns true if the topic name given is a known topic that we can deserialize and action on. -/// Returns false otherwise. -fn known_topic(s: &str) -> bool { - // Message to contributors: Make sure to align these topic names with the enum above. - matches!( - s, - "/allowed_ips_updated" - | "/block_public_or_vpc_access_updated" - | "/allowed_vpc_endpoints_updated_for_org" - | "/allowed_vpc_endpoints_updated_for_projects" - | "/password_updated" - | "/cancel_session" - ) + #[serde(other, skip_serializing)] + UnknownTopic, } #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] @@ -178,32 +165,29 @@ impl MessageHandler { let payload: String = msg.get_payload()?; tracing::debug!(?payload, "received a message payload"); - // For better error handling, we first parse the payload to extract the topic. - // If there's a topic we don't support, we can handle that error more gracefully. - let header: NotificationHeader = match serde_json::from_str(&payload) { - Ok(msg) => msg, - Err(e) => { - Metrics::get().proxy.redis_errors_total.inc(RedisErrors { - channel: msg.get_channel_name(), - }); - tracing::error!("broken message: {e}"); + let msg: Notification = match serde_json::from_str(&payload) { + Ok(Notification::UnknownTopic) => { + match serde_json::from_str::(&payload) { + // don't update the metric for redis errors if it's just a topic we don't know about. + Ok(header) => tracing::warn!(topic = header.topic, "unknown topic"), + Err(e) => { + Metrics::get().proxy.redis_errors_total.inc(RedisErrors { + channel: msg.get_channel_name(), + }); + tracing::error!("broken message: {e}"); + } + }; return Ok(()); } - }; - - if !known_topic(header.topic) { - // don't update the metric for redis errors if it's just a topic we don't know about. - tracing::warn!(topic = header.topic, "unknown topic"); - return Ok(()); - } - - let msg: Notification = match serde_json::from_str(&payload) { Ok(msg) => msg, Err(e) => { Metrics::get().proxy.redis_errors_total.inc(RedisErrors { channel: msg.get_channel_name(), }); - tracing::error!(topic = header.topic, "broken message: {e}"); + match serde_json::from_str::(&payload) { + Ok(header) => tracing::error!(topic = header.topic, "broken message: {e}"), + Err(_) => tracing::error!("broken message: {e}"), + }; return Ok(()); } }; @@ -278,6 +262,8 @@ impl MessageHandler { invalidate_cache(cache, msg); }); } + + Notification::UnknownTopic => unreachable!(), } Ok(()) @@ -304,6 +290,7 @@ fn invalidate_cache(cache: Arc, msg: Notification) { Notification::AllowedVpcEndpointsUpdatedForProjects { .. } => { // https://github.com/neondatabase/neon/pull/10073 } + Notification::UnknownTopic => unreachable!(), } }