diff --git a/src/config.rs b/src/config.rs index f967eb89..a7aefef3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -140,7 +140,7 @@ pub enum ExchangeRateProvider { /// Kafka configuration. /// /// See [`Config`]'s [`kafka`](struct.Config.html#structfield.kafka). -#[derive(Deserialize)] +#[derive(Clone, Deserialize)] pub struct KafkaConfig(BTreeMap); impl Default for KafkaConfig { diff --git a/src/main.rs b/src/main.rs index 06eac060..640afc6a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -109,7 +109,8 @@ async fn main() { } None => Default::default(), }; - let indexer_blocklist = indexer_blocklist::Blocklist::spawn(conf.blocklist); + let indexer_blocklist = + indexer_blocklist::Blocklist::spawn(conf.blocklist, conf.kafka.clone().into()); let mut network = network::service::spawn( http_client.clone(), network_subgraph_client, diff --git a/src/network/indexer_blocklist.rs b/src/network/indexer_blocklist.rs index 5f6dbdce..ad1b8be9 100644 --- a/src/network/indexer_blocklist.rs +++ b/src/network/indexer_blocklist.rs @@ -1,5 +1,7 @@ use std::collections::{HashMap, HashSet}; +use futures::StreamExt; +use rdkafka::Message; use thegraph_core::{alloy::primitives::Address, DeploymentId, ProofOfIndexing}; use tokio::sync::watch; @@ -13,7 +15,7 @@ pub struct Blocklist { } impl Blocklist { - pub fn spawn(init: Vec) -> Self { + pub fn spawn(init: Vec, kafka_config: rdkafka::ClientConfig) -> Self { let (blocklist_tx, blocklist_rx) = watch::channel(Default::default()); let (poi_tx, poi_rx) = watch::channel(Default::default()); let (indexer_tx, indexer_rx) = watch::channel(Default::default()); @@ -26,7 +28,7 @@ impl Blocklist { actor.add_entry(entry); } tokio::spawn(async move { - actor.run().await; + actor.run(kafka_config.create().unwrap()).await; }); Self { blocklist: blocklist_rx, @@ -37,14 +39,33 @@ impl Blocklist { } struct Actor { - pub blocklist: watch::Sender>, - pub poi: watch::Sender>>, - pub indexer: watch::Sender>>, + blocklist: watch::Sender>, + poi: watch::Sender>>, + indexer: watch::Sender>>, } impl Actor { - async fn run(&mut self) { - todo!(); + async fn run(&mut self, consumer: rdkafka::consumer::StreamConsumer) { + let mut stream = consumer.stream(); + while let Some(msg) = stream.next().await { + let msg = match msg { + Ok(msg) => msg, + Err(blocklist_recv_error) => { + tracing::error!(%blocklist_recv_error); + continue; + } + }; + let entry: BlocklistEntry = match msg.payload().map(serde_json::from_slice) { + Some(Ok(entry)) => entry, + Some(Err(blocklist_deserialize_err)) => { + tracing::error!(%blocklist_deserialize_err); + continue; + } + None => continue, + }; + self.add_entry(entry); + } + tracing::error!("blocklist consumer stopped"); } fn add_entry(&mut self, entry: BlocklistEntry) {