From 97713544814e8eae37678826aece41dc9cc4a4d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E7=AE=80=E8=88=9F?= Date: Tue, 19 Mar 2024 11:21:00 +0800 Subject: [PATCH] scan single block. (#1) --- indexer/Cargo.toml | 1 - indexer/src/error.rs | 10 ++---- scanner/src/main.rs | 11 ++++--- scanner/src/scanner.rs | 74 +++++++++++++++++++++++++----------------- 4 files changed, 53 insertions(+), 43 deletions(-) diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index f708cd0..41d4834 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -12,7 +12,6 @@ axum-macros = "0.4.1" env_logger = "0.11.2" ethers = { version = "2.0", features = ["abigen", "legacy"] } log = "0.4.20" -redis = { version = "0.25.0", features = ["tokio-comp","tokio-rustls-comp"] } rustc-hex = "2.1.0" serde = "1.0.197" serde_json = "1.0.114" diff --git a/indexer/src/error.rs b/indexer/src/error.rs index 603e48a..b31843b 100644 --- a/indexer/src/error.rs +++ b/indexer/src/error.rs @@ -5,17 +5,18 @@ use axum::response::{IntoResponse, Response}; pub enum IndexerError { Custom(String), DBError(sqlx::Error), - RdsError(redis::RedisError), IOError(std::io::Error), TomlDeError(toml::de::Error), HexError(rustc_hex::FromHexError), ParseUrlError(url::ParseError), } + impl From for IndexerError { fn from(e: String) -> Self { IndexerError::Custom(e) } } + impl From for IndexerError { fn from(e: url::ParseError) -> Self { IndexerError::ParseUrlError(e) @@ -46,12 +47,6 @@ impl From for IndexerError { } } -impl From for IndexerError { - fn from(e: redis::RedisError) -> Self { - IndexerError::RdsError(e) - } -} - pub type Result = core::result::Result; impl IntoResponse for IndexerError { @@ -59,7 +54,6 @@ impl IntoResponse for IndexerError { let err_msg = match self { IndexerError::Custom(e) => e, IndexerError::DBError(e) => e.to_string(), - IndexerError::RdsError(e) => e.to_string(), IndexerError::IOError(e) => e.to_string(), IndexerError::TomlDeError(e) => e.to_string(), IndexerError::HexError(e) => e.to_string(), diff --git a/scanner/src/main.rs b/scanner/src/main.rs index 0d5f91a..af098a8 100644 --- a/scanner/src/main.rs +++ b/scanner/src/main.rs @@ -22,13 +22,16 @@ const DEFAULT_RPC_RETRIES: usize = 3; #[derive(Parser, Debug)] struct Args { /// Node RPC - #[arg(short, long)] + #[arg(long)] pub node: String, + /// Pull single block + #[arg(long)] + pub single: bool, /// Block height to start scanning - #[arg(short, long)] + #[arg(long)] pub start: Option, /// Interval of scanning in seconds - #[arg(short, long)] + #[arg(long)] pub interval: Option, } @@ -65,7 +68,7 @@ async fn main() -> Result<()> { info!("Starting syncing..."); let scanner = Scanner::new(DEFAULT_RPC_RETRIES, num_cpus::get(), rpc_url, storage) .expect("failed to new scanner"); - let _ = scanner.run(start, interval).await; + let _ = scanner.run(start, interval, args.single).await; Ok(()) } diff --git a/scanner/src/scanner.rs b/scanner/src/scanner.rs index e7c5034..ef5abd2 100644 --- a/scanner/src/scanner.rs +++ b/scanner/src/scanner.rs @@ -102,7 +102,6 @@ impl RpcCaller { while retries < self.retries { match self.rpc.get_block_by_height(height).await { Ok(block) => { - //info!("block_hash: {}, height: {}", block.block_id.hash, height); self.process_block(block).await?; return Ok(()); } @@ -655,6 +654,14 @@ impl Scanner { }) } + pub async fn single_scan(&self, height: u64) -> Result<()> { + info!("Syncing block: {}", height); + self.caller.get_block_retried(height).await?; + self.caller.storage.upsert_tip(height as i64).await?; + info!("Syncing block: {} complete", height); + Ok(()) + } + pub async fn range_scan(&self, start: u64, end: u64) -> Result { info!("Syncing [{},{}) ...", start, end); let concurrency = self.caller.threads; //how many spawned. @@ -701,39 +708,46 @@ impl Scanner { Ok(succeed_cnt.load(Ordering::Acquire)) } - pub async fn run(&self, start: u64, interval: Duration) -> Result<()> { - let batch = (4 * self.caller.threads) as u64; - let mut height = start; - - info!("Fast syncing..."); - loop { - let cnt = self.range_scan(height, height + batch).await?; - if cnt == batch { - height += batch; - } else { - break; - } - } - info!("Fast syncing complete."); - loop { - if let Ok(h) = self.caller.storage.get_tip().await { - height = h as u64 + 1; + pub async fn run(&self, start: u64, interval: Duration, single: bool) -> Result<()> { + match single { + true => { + info!("Single syncing..."); + self.single_scan(start).await } - - match self.caller.get_block_retried(height).await { - Ok(_) => { - info!("Get block {} succeed", height); - self.caller.storage.upsert_tip(height as i64).await?; - } - Err(ScannerError::BlockNotFound(height)) => { - error!("Block {} not found", height) + false => { + let mut height = start; + let batch = (4 * self.caller.threads) as u64; + info!("Fast syncing..."); + loop { + let cnt = self.range_scan(height, height + batch).await?; + if cnt == batch { + height += batch; + } else { + break; + } } - Err(e) => { - error!("Get block {} error: {:?}", height, e); + info!("Fast syncing complete."); + loop { + if let Ok(h) = self.caller.storage.get_tip().await { + height = h as u64 + 1; + } + + match self.caller.get_block_retried(height).await { + Ok(_) => { + info!("Get block {} succeed", height); + self.caller.storage.upsert_tip(height as i64).await?; + } + Err(ScannerError::BlockNotFound(height)) => { + error!("Block {} not found", height) + } + Err(e) => { + error!("Get block {} error: {:?}", height, e); + } + } + + tokio::time::sleep(interval).await; } } - - tokio::time::sleep(interval).await; } } }