From 3e7960de5c7f135ae1b18d9676bbcda9e9f4c6d9 Mon Sep 17 00:00:00 2001 From: Rob Date: Tue, 7 May 2024 15:59:10 -0400 Subject: [PATCH] assert topics were included in application layer --- Cargo.lock | 57 +++++++++++++++++++++++++ cdn-broker/src/tasks/broker/handler.rs | 25 ++++++----- cdn-broker/src/tasks/user/handler.rs | 27 +++++++++--- cdn-proto/Cargo.toml | 1 + cdn-proto/src/def.rs | 31 +++++++++++++- tests/src/tests/subscribe.rs | 58 +++++++++++++++++++++++++- 6 files changed, 181 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 99db97f..c662713 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -870,6 +870,7 @@ dependencies = [ "kanal", "lazy_static", "mnemonic", + "num_enum", "pem", "portpicker", "pprof", @@ -2430,6 +2431,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02339744ee7253741199f897151b38e72257d13802d4ee837285cc2990a90845" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "681030a937600a36906c185595136d26abfebb4aa9c65701cefcaf8578bb982b" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "object" version = "0.32.2" @@ -2705,6 +2727,15 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "proc-macro-crate" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" +dependencies = [ + "toml_edit", +] + [[package]] name = "proc-macro2" version = "1.0.81" @@ -3921,6 +3952,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "toml_datetime" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" + +[[package]] +name = "toml_edit" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" +dependencies = [ + "indexmap 2.2.6", + "toml_datetime", + "winnow", +] + [[package]] name = "tonic" version = "0.10.2" @@ -4484,6 +4532,15 @@ version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" +[[package]] +name = "winnow" +version = "0.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" +dependencies = [ + "memchr", +] + [[package]] name = "wyz" version = "0.5.1" diff --git a/cdn-broker/src/tasks/broker/handler.rs b/cdn-broker/src/tasks/broker/handler.rs index c2c9edb..061824a 100644 --- a/cdn-broker/src/tasks/broker/handler.rs +++ b/cdn-broker/src/tasks/broker/handler.rs @@ -5,7 +5,7 @@ use std::{sync::Arc, time::Duration}; use cdn_proto::{ authenticate_with_broker, bail, connection::{auth::broker::BrokerAuth, protocols::Connection as _, Bytes, UserPublicKey}, - def::{Connection, RunDef}, + def::{Connection, RunDef, Topic as _}, discovery::BrokerIdentifier, error::{Error, Result}, message::{Message, Topic}, @@ -131,20 +131,28 @@ impl Inner { // If we receive a broadcast message from a broker, we want to send it to all interested users Message::Broadcast(ref broadcast) => { - let topics = broadcast.topics.clone(); + // Get and prune the topics + let mut topics = broadcast.topics.clone(); + Def::Topic::prune(&mut topics)?; - self.handle_broadcast_message(topics, &raw_message, true); + self.handle_broadcast_message(&topics, &raw_message, true); } // If we receive a subscribe message from a broker, we add them as "interested" locally. - Message::Subscribe(subscribe) => { + Message::Subscribe(mut subscribe) => { + // Prune the topics + Def::Topic::prune(&mut subscribe)?; + self.connections .write() .subscribe_broker_to(broker_identifier, subscribe); } // If we receive a subscribe message from a broker, we remove them as "interested" locally. - Message::Unsubscribe(unsubscribe) => { + Message::Unsubscribe(mut unsubscribe) => { + // Prune the topics + Def::Topic::prune(&mut unsubscribe)?; + self.connections .write() .unsubscribe_broker_from(broker_identifier, &unsubscribe); @@ -212,18 +220,15 @@ impl Inner { /// This function handles broadcast messages from users and brokers. pub fn handle_broadcast_message( self: &Arc, - mut topics: Vec, + topics: &[Topic], message: &Bytes, to_users_only: bool, ) { - // Deduplicate topics - topics.dedup(); - // Get the list of actors interested in the topics let (interested_brokers, interested_users) = self .connections .read() - .get_interested_by_topic(&topics, to_users_only); + .get_interested_by_topic(&topics.to_vec(), to_users_only); // Debug log the broadcast debug!( diff --git a/cdn-broker/src/tasks/user/handler.rs b/cdn-broker/src/tasks/user/handler.rs index 844e48a..06e85fb 100644 --- a/cdn-broker/src/tasks/user/handler.rs +++ b/cdn-broker/src/tasks/user/handler.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use std::time::Duration; use cdn_proto::connection::{protocols::Connection as _, UserPublicKey}; -use cdn_proto::def::{Connection, RunDef}; +use cdn_proto::def::{Connection, RunDef, Topic as _}; use cdn_proto::error::{Error, Result}; use cdn_proto::{connection::auth::broker::BrokerAuth, message::Message, mnemonic}; use tokio::spawn; @@ -18,7 +18,8 @@ impl Inner { /// This function handles a user (public) connection. pub async fn handle_user_connection(self: Arc, connection: Connection) { // Verify (authenticate) the connection. Needs to happen within 5 seconds - let Ok(Ok((public_key, topics))) = timeout( + // TODO: make this stateless (e.g. separate subscribe message on connect) + let Ok(Ok((public_key, mut topics))) = timeout( Duration::from_secs(5), BrokerAuth::::verify_user( &connection, @@ -32,6 +33,12 @@ impl Inner { return; }; + // Prune the supplied topics + // + // We don't care about the error because we want to allow users + // to connect without subscribing to any topics. + let _ = Def::Topic::prune(&mut topics); + // Create a human-readable user identifier (by public key) let public_key = UserPublicKey::from(public_key); let user_identifier = mnemonic(&public_key); @@ -100,13 +107,18 @@ impl Inner { // If we get a broadcast message from a user, send it to both brokers and users. Message::Broadcast(ref broadcast) => { - let topics = broadcast.topics.clone(); + // Get and prune the topics + let mut topics = broadcast.topics.clone(); + Def::Topic::prune(&mut topics)?; - self.handle_broadcast_message(topics, &raw_message, false); + self.handle_broadcast_message(&topics, &raw_message, false); } // Subscribe messages from users will just update the state locally - Message::Subscribe(subscribe) => { + Message::Subscribe(mut subscribe) => { + // Prune the topics + Def::Topic::prune(&mut subscribe)?; + // TODO: add handle functions for this to make it easier to read self.connections .write() @@ -114,7 +126,10 @@ impl Inner { } // Unsubscribe messages from users will just update the state locally - Message::Unsubscribe(unsubscribe) => { + Message::Unsubscribe(mut unsubscribe) => { + // Prune the topics + Def::Topic::prune(&mut unsubscribe)?; + self.connections .write() .unsubscribe_user_from(public_key, &unsubscribe); diff --git a/cdn-proto/Cargo.toml b/cdn-proto/Cargo.toml index 706523d..300e569 100644 --- a/cdn-proto/Cargo.toml +++ b/cdn-proto/Cargo.toml @@ -64,3 +64,4 @@ rkyv.workspace = true mnemonic = "1" rcgen.workspace = true derivative.workspace = true +num_enum = "0.7" \ No newline at end of file diff --git a/cdn-proto/src/def.rs b/cdn-proto/src/def.rs index 2f42997..2cddd3c 100644 --- a/cdn-proto/src/def.rs +++ b/cdn-proto/src/def.rs @@ -1,6 +1,7 @@ //! Compile-time run configuration for all CDN components. use jf_primitives::signatures::bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS; +use num_enum::{IntoPrimitive, TryFromPrimitive}; use crate::connection::middleware::{ Middleware as MiddlewareType, NoMiddleware, TrustedMiddleware, UntrustedMiddleware, @@ -10,19 +11,45 @@ use crate::connection::protocols::{quic::Quic, tcp::Tcp, Protocol as ProtocolTyp use crate::crypto::signature::SignatureScheme; use crate::discovery::embedded::Embedded; use crate::discovery::{redis::Redis, DiscoveryClient}; +use crate::error::{Error, Result}; -/// The test topics for the CDN. +/// An implementation of `Topic` for testing purposes. #[repr(u8)] +#[derive(IntoPrimitive, TryFromPrimitive, Clone, PartialEq, Eq)] pub enum TestTopic { Global = 0, DA = 1, } +/// Defines the topic type for CDN messages +pub trait Topic: Into + TryFrom + Clone + Send + Sync { + /// Prunes the topics to only include valid topics. + /// + /// # Errors + /// - If no valid topics are supplied + fn prune(topics: &mut Vec) -> Result<()> { + // Deduplicate the topics + topics.dedup(); + + // Retain only the topics that can be converted to the desired type + topics.retain(|topic| Self::try_from(*topic).is_ok()); + + // Make sure we have at least one topic + if topics.is_empty() { + Err(Error::Parse("supplied no valid topics".to_string())) + } else { + Ok(()) + } + } +} +impl Topic for TestTopic {} + /// This trait defines the run configuration for all CDN components. pub trait RunDef: 'static { type Broker: ConnectionDef; type User: ConnectionDef; type DiscoveryClientType: DiscoveryClient; + type Topic: Topic; } /// This trait defines the connection configuration for a single CDN component. @@ -39,6 +66,7 @@ impl RunDef for ProductionRunDef { type Broker = ProductionBrokerConnection; type User = ProductionUserConnection; type DiscoveryClientType = Redis; + type Topic = TestTopic; } /// The production broker connection configuration. @@ -77,6 +105,7 @@ impl RunDef for TestingRunDef { type Broker = TestingConnection; type User = TestingConnection; type DiscoveryClientType = Embedded; + type Topic = TestTopic; } /// The testing connection configuration. diff --git a/tests/src/tests/subscribe.rs b/tests/src/tests/subscribe.rs index f7ce571..1ca6566 100644 --- a/tests/src/tests/subscribe.rs +++ b/tests/src/tests/subscribe.rs @@ -4,7 +4,7 @@ use cdn_proto::{ def::TestTopic, message::{Broadcast, Message}, }; -use tokio::time::timeout; +use tokio::time::{sleep, timeout}; use super::*; @@ -107,3 +107,59 @@ async fn test_subscribe() { .await .is_err()); } + +/// Test that subscribing to an invalid topic kills the connection. +#[tokio::test] +async fn test_invalid_subscribe() { + // Get a temporary path for the discovery endpoint + let discovery_endpoint = get_temp_db_path(); + + // Create and start a new broker + new_broker(0, "8098", "8099", &discovery_endpoint).await; + + // Create and start a new marshal + new_marshal("8100", &discovery_endpoint).await; + + // Create and get the handle to a new client subscribed to an invalid topic + let client = new_client(0, vec![99], "8100"); + + // Ensure the connection is open + let Ok(()) = timeout(Duration::from_secs(1), client.ensure_initialized()).await else { + panic!("client failed to connect"); + }; + + // Subscribe to an invalid topic + let _ = client.subscribe(vec![99]).await; + + // Sleep for a bit to allow the client to disconnect + sleep(Duration::from_millis(50)).await; + + // Assert we can't send a message (as we are disconnected) + assert!( + client + .send_broadcast_message(vec![1], b"hello invalid".to_vec()) + .await + .is_err(), + "sent message but should've been disconnected" + ); + + // Reinitialize the connection + let Ok(()) = timeout(Duration::from_secs(4), client.ensure_initialized()).await else { + panic!("client failed to connect"); + }; + + // Unsubscribe from the invalid topic + let _ = client.unsubscribe(vec![99]).await; + + // Sleep for a bit to allow the client to disconnect + sleep(Duration::from_millis(50)).await; + + // Assert we can't send a message (as we are disconnected) + assert!( + client + .send_broadcast_message(vec![1], b"hello invalid".to_vec()) + .await + .is_err(), + "sent message but should've been disconnected" + ); +}