forked from marktoda/uniswapx-artemis
-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathblock_collector.rs
60 lines (52 loc) · 1.82 KB
/
block_collector.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
use alloy::{
network::AnyNetwork,
primitives::{BlockHash, BlockNumber, BlockTimestamp},
providers::{DynProvider, Provider},
};
use anyhow::Result;
use artemis_core::types::{Collector, CollectorStream};
use async_trait::async_trait;
use std::sync::Arc;
use tokio_stream::StreamExt;
use tracing::{error, info};
/// A collector that listens for new blocks, and generates a stream of
/// [events](NewBlock) which contain the block hash, number, and timestamp.
pub struct BlockCollector {
/// Shared provider handle used to read the current block number and to
/// open the new-block subscription.
provider: Arc<DynProvider<AnyNetwork>>,
}
/// A new block event, containing the block hash, number, and timestamp.
#[derive(Debug, Clone)]
pub struct NewBlock {
/// Hash of the block, copied from the received header.
pub hash: BlockHash,
/// Number of the block, copied from the received header.
pub number: BlockNumber,
/// Timestamp of the block, copied from the received header.
/// NOTE(review): presumably Unix seconds — confirm against the provider's header type.
pub timestamp: BlockTimestamp,
}
impl BlockCollector {
pub fn new(provider: Arc<DynProvider<AnyNetwork>>) -> Self {
Self { provider }
}
}
/// Implementation of the [`Collector`] trait for [`BlockCollector`].
///
/// New blocks are delivered through the provider's `subscribe_blocks`
/// subscription (push-based, not polling); every received header is
/// converted into a [`NewBlock`] event.
#[async_trait]
impl Collector<NewBlock> for BlockCollector {
    /// Opens a new-block subscription and exposes it as a stream of
    /// [`NewBlock`] events.
    ///
    /// # Errors
    ///
    /// Returns an error if the current block number cannot be fetched, or if
    /// the block subscription cannot be established.
    async fn get_event_stream(&self) -> Result<CollectorStream<'_, NewBlock>> {
        // The current head is fetched only so the starting point can be logged;
        // it is not used to filter or backfill the stream below.
        let start_block = match self.provider.get_block_number().await {
            Ok(num) => num,
            Err(e) => {
                error!("Failed to get initial block number: {}", e);
                return Err(e.into());
            }
        };
        info!("Starting BlockCollector from block number: {}", start_block);

        // Subscribe through the shared provider directly: the resulting stream
        // is built from the subscription alone, so no extra `Arc` clone is needed.
        // NOTE(review): alloy documents `subscribe_blocks` as available only on
        // pubsub transports (e.g. WebSocket/IPC) — confirm the provider is built on one.
        let sub = self.provider.subscribe_blocks().await?;
        let stream = sub.into_stream().map(|header| NewBlock {
            hash: header.hash,
            number: header.number,
            timestamp: header.timestamp,
        });
        Ok(Box::pin(stream))
    }
}