Skip to content

Commit

Permalink
feat(operator): reduce number of rpc requests (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
ratankaliani authored Jan 30, 2025
1 parent 2aecf55 commit 5224e74
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 26 deletions.
2 changes: 2 additions & 0 deletions script/bin/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,11 @@ impl SP1BlobstreamOperator {
let rpc_client = TendermintRPCClient::default();
let mut stdin = SP1Stdin::new();

info!("Fetching inputs for proof.");
let inputs = rpc_client
.fetch_input_for_blobstream_proof(trusted_block, target_block)
.await;
info!("Inputs fetched for proof.");

// Simulate the step from the trusted block to the target block.
let verdict =
Expand Down
69 changes: 43 additions & 26 deletions script/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
#![allow(dead_code)]
use crate::types::*;
use alloy::primitives::B256;
use futures::{stream, StreamExt};
use log::debug;
use reqwest::Client;
use sp1_blobstream_primitives::types::ProofInputs;
use std::sync::Arc;
use std::{collections::HashMap, env, error::Error};
use subtle_encoding::hex;
use tendermint::block::Commit;
use tendermint::block::{Commit, Header};
use tendermint::validator::Set as TendermintValidatorSet;
use tendermint::Block;
use tendermint::{
block::signed_header::SignedHeader,
node::Id,
Expand All @@ -31,6 +33,9 @@ impl Default for TendermintRPCClient {
/// The default timeout for Tendermint RPC requests in seconds.
const DEFAULT_TENDERMINT_RPC_TIMEOUT_SECS: u64 = 20;

/// The default concurrency for Tendermint RPC requests.
const DEFAULT_TENDERMINT_RPC_CONCURRENCY: usize = 100;

impl TendermintRPCClient {
pub fn new(url: String) -> Self {
let client = Client::builder()
Expand All @@ -50,18 +55,16 @@ impl TendermintRPCClient {
trusted_block_height: u64,
target_block_height: u64,
) -> ProofInputs {
let light_blocks = self
.fetch_light_blocks_in_range(trusted_block_height, target_block_height)
let (trusted_light_block, target_light_block) = self
.get_light_blocks(trusted_block_height, target_block_height)
.await;
let headers = self
.get_headers_in_range(trusted_block_height + 1, target_block_height - 1)
.await;

let mut headers = Vec::new();
for light_block in &light_blocks[1..light_blocks.len() - 1] {
headers.push(light_block.signed_header.header.clone());
}

ProofInputs {
trusted_light_block: light_blocks[0].clone(),
target_light_block: light_blocks[light_blocks.len() - 1].clone(),
trusted_light_block,
target_light_block,
headers,
}
}
Expand Down Expand Up @@ -97,27 +100,16 @@ impl TendermintRPCClient {
end_height: u64,
) -> Vec<LightBlock> {
let peer_id = self.fetch_peer_id().await.unwrap();
let batch_size = 25;
let mut blocks = Vec::new();
debug!(
"Fetching light blocks in range: {} to {}",
start_height, end_height
);

for batch_start in (start_height..=end_height).step_by(batch_size) {
let batch_end = std::cmp::min(batch_start + (batch_size as u64) - 1, end_height);
let mut handles = Vec::new();

for height in batch_start..=batch_end {
let fetch_light_block =
async move { self.fetch_light_block(height, peer_id).await.unwrap() };
handles.push(fetch_light_block);
}

// Join all the futures in the current batch
let batch_blocks = futures::future::join_all(handles).await;
blocks.extend(batch_blocks);
}
let blocks = stream::iter(start_height..=end_height)
.map(|height| async move { self.fetch_light_block(height, peer_id).await.unwrap() })
.buffered(DEFAULT_TENDERMINT_RPC_CONCURRENCY)
.collect::<Vec<_>>()
.await;

debug!("Finished fetching light blocks!");
blocks
Expand All @@ -142,6 +134,24 @@ impl TendermintRPCClient {
(trusted_light_block, target_light_block)
}

/// Retrieves the block from the Tendermint node.
pub async fn get_block(&self, height: u64) -> Block {
let block = self.fetch_block_by_height(height).await.unwrap();
block.result.block
}

/// Retrieves the headers for the given range of block heights. Inclusive of start and end.
pub async fn get_headers_in_range(&self, start_height: u64, end_height: u64) -> Vec<Header> {
let mut headers = Vec::new();
let headers_stream = stream::iter(start_height..=end_height)
.map(|height| async move { self.get_block(height).await.header })
.buffered(DEFAULT_TENDERMINT_RPC_CONCURRENCY)
.collect::<Vec<_>>()
.await;
headers.extend(headers_stream);
headers
}

/// Retrieves the latest block height from the Tendermint node.
pub async fn get_latest_block_height(&self) -> u64 {
let latest_commit = self.fetch_latest_commit().await.unwrap();
Expand Down Expand Up @@ -226,6 +236,13 @@ impl TendermintRPCClient {
.unwrap()
}

/// Fetches the block by its height.
async fn fetch_block_by_height(&self, height: u64) -> Result<BlockResponse, Box<dyn Error>> {
let url = format!("{}/block?height={}", self.url, height);
let response: BlockResponse = self.client.get(url).send().await?.json().await?;
Ok(response)
}

/// Fetches the latest commit from the Tendermint node.
async fn fetch_latest_commit(&self) -> Result<CommitResponse, Box<dyn Error>> {
let url = format!("{}/commit", self.url);
Expand Down

0 comments on commit 5224e74

Please sign in to comment.