Skip to content

Commit

Permalink
scan single block. (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssuchichan authored Mar 19, 2024
1 parent f5b05c4 commit 9771354
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 43 deletions.
1 change: 0 additions & 1 deletion indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ axum-macros = "0.4.1"
env_logger = "0.11.2"
ethers = { version = "2.0", features = ["abigen", "legacy"] }
log = "0.4.20"
redis = { version = "0.25.0", features = ["tokio-comp","tokio-rustls-comp"] }
rustc-hex = "2.1.0"
serde = "1.0.197"
serde_json = "1.0.114"
Expand Down
10 changes: 2 additions & 8 deletions indexer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ use axum::response::{IntoResponse, Response};
pub enum IndexerError {
Custom(String),
DBError(sqlx::Error),
RdsError(redis::RedisError),
IOError(std::io::Error),
TomlDeError(toml::de::Error),
HexError(rustc_hex::FromHexError),
ParseUrlError(url::ParseError),
}

impl From<String> for IndexerError {
fn from(e: String) -> Self {
IndexerError::Custom(e)
}
}

impl From<url::ParseError> for IndexerError {
fn from(e: url::ParseError) -> Self {
IndexerError::ParseUrlError(e)
Expand Down Expand Up @@ -46,20 +47,13 @@ impl From<sqlx::Error> for IndexerError {
}
}

impl From<redis::RedisError> for IndexerError {
fn from(e: redis::RedisError) -> Self {
IndexerError::RdsError(e)
}
}

pub type Result<T> = core::result::Result<T, IndexerError>;

impl IntoResponse for IndexerError {
fn into_response(self) -> Response {
let err_msg = match self {
IndexerError::Custom(e) => e,
IndexerError::DBError(e) => e.to_string(),
IndexerError::RdsError(e) => e.to_string(),
IndexerError::IOError(e) => e.to_string(),
IndexerError::TomlDeError(e) => e.to_string(),
IndexerError::HexError(e) => e.to_string(),
Expand Down
11 changes: 7 additions & 4 deletions scanner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ const DEFAULT_RPC_RETRIES: usize = 3;
#[derive(Parser, Debug)]
struct Args {
/// Node RPC
#[arg(short, long)]
#[arg(long)]
pub node: String,
/// Pull single block
#[arg(long)]
pub single: bool,
/// Block height to start scanning
#[arg(short, long)]
#[arg(long)]
pub start: Option<u64>,
/// Interval of scanning in seconds
#[arg(short, long)]
#[arg(long)]
pub interval: Option<u64>,
}

Expand Down Expand Up @@ -65,7 +68,7 @@ async fn main() -> Result<()> {
info!("Starting syncing...");
let scanner = Scanner::new(DEFAULT_RPC_RETRIES, num_cpus::get(), rpc_url, storage)
.expect("failed to new scanner");
let _ = scanner.run(start, interval).await;
let _ = scanner.run(start, interval, args.single).await;

Ok(())
}
74 changes: 44 additions & 30 deletions scanner/src/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ impl RpcCaller {
while retries < self.retries {
match self.rpc.get_block_by_height(height).await {
Ok(block) => {
//info!("block_hash: {}, height: {}", block.block_id.hash, height);
self.process_block(block).await?;
return Ok(());
}
Expand Down Expand Up @@ -655,6 +654,14 @@ impl Scanner {
})
}

pub async fn single_scan(&self, height: u64) -> Result<()> {
info!("Syncing block: {}", height);
self.caller.get_block_retried(height).await?;
self.caller.storage.upsert_tip(height as i64).await?;
info!("Syncing block: {} complete", height);
Ok(())
}

pub async fn range_scan(&self, start: u64, end: u64) -> Result<u64> {
info!("Syncing [{},{}) ...", start, end);
let concurrency = self.caller.threads; //how many spawned.
Expand Down Expand Up @@ -701,39 +708,46 @@ impl Scanner {
Ok(succeed_cnt.load(Ordering::Acquire))
}

pub async fn run(&self, start: u64, interval: Duration) -> Result<()> {
let batch = (4 * self.caller.threads) as u64;
let mut height = start;

info!("Fast syncing...");
loop {
let cnt = self.range_scan(height, height + batch).await?;
if cnt == batch {
height += batch;
} else {
break;
}
}
info!("Fast syncing complete.");
loop {
if let Ok(h) = self.caller.storage.get_tip().await {
height = h as u64 + 1;
pub async fn run(&self, start: u64, interval: Duration, single: bool) -> Result<()> {
match single {
true => {
info!("Single syncing...");
self.single_scan(start).await
}

match self.caller.get_block_retried(height).await {
Ok(_) => {
info!("Get block {} succeed", height);
self.caller.storage.upsert_tip(height as i64).await?;
}
Err(ScannerError::BlockNotFound(height)) => {
error!("Block {} not found", height)
false => {
let mut height = start;
let batch = (4 * self.caller.threads) as u64;
info!("Fast syncing...");
loop {
let cnt = self.range_scan(height, height + batch).await?;
if cnt == batch {
height += batch;
} else {
break;
}
}
Err(e) => {
error!("Get block {} error: {:?}", height, e);
info!("Fast syncing complete.");
loop {
if let Ok(h) = self.caller.storage.get_tip().await {
height = h as u64 + 1;
}

match self.caller.get_block_retried(height).await {
Ok(_) => {
info!("Get block {} succeed", height);
self.caller.storage.upsert_tip(height as i64).await?;
}
Err(ScannerError::BlockNotFound(height)) => {
error!("Block {} not found", height)
}
Err(e) => {
error!("Get block {} error: {:?}", height, e);
}
}

tokio::time::sleep(interval).await;
}
}

tokio::time::sleep(interval).await;
}
}
}
Expand Down

0 comments on commit 9771354

Please sign in to comment.