From 772b4811d6defdad91f977bf75c340f81e203b48 Mon Sep 17 00:00:00 2001
From: Theo Butler
Date: Thu, 12 Dec 2024 11:57:10 -0500
Subject: [PATCH] add blocklist consumer

---
 src/config.rs                    |   4 +-
 src/main.rs                      |   4 +-
 src/network/indexer_blocklist.rs | 150 +++++++++++++++++++++++++++++--
 3 files changed, 147 insertions(+), 11 deletions(-)

diff --git a/src/config.rs b/src/config.rs
index f967eb89..d129dd50 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -110,8 +110,10 @@ pub enum BlocklistEntry {
 #[derive(Clone, Deserialize, Serialize)]
 pub struct BlocklistInfo {
     /// Example query (should be minimal to reproduce bad response)
+    #[serde(default, skip_serializing_if = "Option::is_none")]
     query: Option<String>,
     /// Bad query response, from the above query executed on indexers with this blocked PoI
+    #[serde(default, skip_serializing_if = "Option::is_none")]
     bad_query_response: Option<String>,
 }

@@ -140,7 +142,7 @@ pub enum ExchangeRateProvider {
 /// Kafka configuration.
 ///
 /// See [`Config`]'s [`kafka`](struct.Config.html#structfield.kafka).
-#[derive(Deserialize)]
+#[derive(Clone, Deserialize)]
 pub struct KafkaConfig(BTreeMap<String, String>);

 impl Default for KafkaConfig {
diff --git a/src/main.rs b/src/main.rs
index 06eac060..f6679748 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -109,7 +109,8 @@ async fn main() {
         }
         None => Default::default(),
     };
-    let indexer_blocklist = indexer_blocklist::Blocklist::spawn(conf.blocklist);
+    let indexer_blocklist =
+        indexer_blocklist::Blocklist::spawn(conf.blocklist, conf.kafka.clone().into());
     let mut network = network::service::spawn(
         http_client.clone(),
         network_subgraph_client,
@@ -127,7 +128,6 @@
         conf.receipts.verifier,
     )));

-    // Initialize the auth service
     let auth_service =
         init_auth_service(http_client.clone(), conf.api_keys, conf.payment_required).await;

diff --git a/src/network/indexer_blocklist.rs b/src/network/indexer_blocklist.rs
index 5f6dbdce..274cedb7 100644
--- a/src/network/indexer_blocklist.rs
+++ b/src/network/indexer_blocklist.rs
@@ -1,5 +1,15 @@
-use std::collections::{HashMap, HashSet};
+use std::{
+    collections::{HashMap, HashSet},
+    time::Duration,
+};

+use anyhow::{anyhow, Context as _};
+use futures::StreamExt as _;
+use rand::{thread_rng, RngCore as _};
+use rdkafka::{
+    consumer::{Consumer as _, StreamConsumer},
+    Message, TopicPartitionList,
+};
 use thegraph_core::{alloy::primitives::Address, DeploymentId, ProofOfIndexing};
 use tokio::sync::watch;

@@ -13,7 +23,7 @@ pub struct Blocklist {
 }

 impl Blocklist {
-    pub fn spawn(init: Vec<BlocklistEntry>) -> Self {
+    pub fn spawn(init: Vec<BlocklistEntry>, kafka_config: rdkafka::ClientConfig) -> Self {
         let (blocklist_tx, blocklist_rx) = watch::channel(Default::default());
         let (poi_tx, poi_rx) = watch::channel(Default::default());
         let (indexer_tx, indexer_rx) = watch::channel(Default::default());
@@ -26,7 +36,7 @@
             actor.add_entry(entry);
         }
         tokio::spawn(async move {
-            actor.run().await;
+            actor.run(kafka_config).await;
         });
         Self {
             blocklist: blocklist_rx,
@@ -37,14 +47,55 @@
 }

 struct Actor {
-    pub blocklist: watch::Sender<Vec<BlocklistEntry>>,
-    pub poi: watch::Sender<HashMap<DeploymentId, Vec<(u64, ProofOfIndexing)>>>,
-    pub indexer: watch::Sender<HashMap<Address, HashSet<DeploymentId>>>,
+    blocklist: watch::Sender<Vec<BlocklistEntry>>,
+    poi: watch::Sender<HashMap<DeploymentId, Vec<(u64, ProofOfIndexing)>>>,
+    indexer: watch::Sender<HashMap<Address, HashSet<DeploymentId>>>,
 }

 impl Actor {
-    async fn run(&mut self) {
-        todo!();
+    async fn run(&mut self, kafka_config: rdkafka::ClientConfig) {
+        let consumer = match create_consumer(kafka_config).await {
+            Ok(consumer) => consumer,
+            Err(blocklist_err) => {
+                tracing::error!(%blocklist_err);
+                return;
+            }
+        };
+
+        let mut records: HashMap<String, BlocklistEntry> = Default::default();
+        let mut stream = consumer.stream();
+        while let Some(msg) = stream.next().await {
+            let msg = match msg {
+                Ok(msg) => msg,
+                Err(blocklist_recv_error) => {
+                    tracing::error!(%blocklist_recv_error);
+                    continue;
+                }
+            };
+            let key = match msg.key_view::<str>() {
+                Some(Ok(key)) => key,
+                result => {
+                    tracing::error!("invalid key: {result:?}");
+                    continue;
+                }
+            };
+            match msg.payload().map(serde_json::from_slice::<BlocklistEntry>) {
+                Some(Ok(entry)) => {
+                    records.insert(key.to_string(), entry.clone());
+                    self.add_entry(entry);
+                }
+                None => {
+                    let entry = records.remove(key);
+                    if let Some(entry) = entry {
+                        self.remove_entry(&entry);
+                    }
+                }
+                Some(Err(blocklist_deserialize_err)) => {
+                    tracing::error!(%blocklist_deserialize_err);
+                }
+            };
+        }
+        tracing::error!("blocklist consumer stopped");
     }

     fn add_entry(&mut self, entry: BlocklistEntry) {
@@ -75,4 +126,87 @@ impl Actor {
         self.blocklist
             .send_modify(move |blocklist| blocklist.push(entry));
     }
+
+    fn remove_entry(&mut self, entry: &BlocklistEntry) {
+        match entry {
+            BlocklistEntry::Poi {
+                deployment,
+                block,
+                public_poi,
+                ..
+            } => {
+                self.poi.send_modify(|blocklist| {
+                    if let Some(entry) = blocklist.get_mut(deployment) {
+                        entry.retain(|value| &(*block, (*public_poi).into()) != value);
+                    }
+                });
+            }
+            BlocklistEntry::Other {
+                deployment,
+                indexer,
+                ..
+            } => {
+                self.indexer.send_modify(|blocklist| {
+                    if let Some(entry) = blocklist.get_mut(indexer) {
+                        entry.remove(deployment);
+                    }
+                });
+            }
+        };
+        fn matching(a: &BlocklistEntry, b: &BlocklistEntry) -> bool {
+            match (a, b) {
+                (
+                    BlocklistEntry::Poi {
+                        deployment,
+                        public_poi,
+                        block,
+                        info: _,
+                    },
+                    BlocklistEntry::Poi {
+                        deployment: deployment_,
+                        public_poi: public_poi_,
+                        block: block_,
+                        info: _,
+                    },
+                ) => {
+                    (deployment == deployment_) && (public_poi == public_poi_) && (block == block_)
+                }
+                (
+                    BlocklistEntry::Other {
+                        indexer,
+                        deployment,
+                        info: _,
+                    },
+                    BlocklistEntry::Other {
+                        indexer: indexer_,
+                        deployment: deployment_,
+                        info: _,
+                    },
+                ) => (indexer == indexer_) && (deployment == deployment_),
+                _ => false,
+            }
+        }
+        self.blocklist
+            .send_modify(|blocklist| blocklist.retain(|value| !matching(entry, value)));
+    }
+}
+
+async fn create_consumer(
+    mut kafka_config: rdkafka::ClientConfig,
+) -> anyhow::Result<StreamConsumer> {
+    let topic = "gateway_blocklist";
+    let group_id = format!("gateway-{:x}", thread_rng().next_u64());
+    let consumer: StreamConsumer = kafka_config.set("group.id", group_id).create()?;
+    let metadata = consumer
+        .fetch_metadata(Some(topic), Duration::from_secs(30))
+        .with_context(|| anyhow!("fetch {topic} metadata"))?;
+    anyhow::ensure!(!metadata.topics().is_empty());
+    let topic_info = &metadata.topics()[0];
+    let mut assignment = TopicPartitionList::new();
+    for partition in topic_info.partitions() {
+        assignment.add_partition_offset(topic, partition.id(), rdkafka::Offset::Beginning)?;
+    }
+    tracing::debug!(?assignment);
+    consumer.assign(&assignment)?;
+    Ok(consumer)
 }
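
Note (outside the patch): the consumer treats gateway_blocklist as a keyed log. A record whose
payload is a JSON-serialized BlocklistEntry adds the entry under its message key, and a later
record with the same key and an empty payload acts as a tombstone that removes it. The sketch
below shows what a matching producer might look like; the broker address, message key, and JSON
payload are illustrative assumptions, since the exact wire shape of BlocklistEntry depends on
its serde representation in src/config.rs.

use std::time::Duration;

use rdkafka::{
    producer::{FutureProducer, FutureRecord},
    ClientConfig,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Assumed broker address; a real producer would share the gateway's
    // `kafka` config section rather than hardcoding it.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;

    // Add an entry: JSON-serialized `BlocklistEntry` payload under a stable
    // key so it can be tombstoned later. Key and payload are placeholders.
    producer
        .send(
            FutureRecord::to("gateway_blocklist")
                .key("example-entry-key")
                .payload(r#"{"deployment":"Qm...","indexer":"0x...","info":{}}"#),
            Duration::from_secs(10),
        )
        .await
        .map_err(|(err, _msg)| err)?;

    // Remove it: same key, no payload. The consumer's `None` payload arm looks
    // the key up in its `records` map and calls `remove_entry`.
    producer
        .send(
            FutureRecord::<str, [u8]>::to("gateway_blocklist").key("example-entry-key"),
            Duration::from_secs(10),
        )
        .await
        .map_err(|(err, _msg)| err)?;
    Ok(())
}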
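One design consequence worth noting: create_consumer assigns all partitions from
Offset::Beginning under a freshly randomized group.id, so each gateway instance replays the
full topic on startup and rebuilds its blocklist state in memory instead of relying on
committed consumer-group offsets.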