Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fallback to multiple eth_getTransactionReceipt calls if eth_getBlockReceipts is not present #94

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Standard types across tables:
|dataset|blocks per request|results per block|method|
|-|-|-|-|
|Blocks|1|1|`eth_getBlockByNumber`|
|Transactions|1|multiple|`eth_getBlockByNumber`|
|Transactions|1|multiple|`eth_getBlockByNumber`, `eth_getBlockReceipts`, `eth_getTransactionReceipt`|
|Logs|multiple|multiple|`eth_getLogs`|
|Contracts|1|multiple|`trace_block`|
|Traces|1|multiple|`trace_block`|
Expand Down
2 changes: 1 addition & 1 deletion crates/freeze/src/datasets/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl CollectByBlock for Transactions {
.ok_or(CollectError::CollectError("block not found".to_string()))?;
let schema = query.schemas.get_schema(&Datatype::Transactions)?;
let receipt = if schema.has_column("gas_used") | schema.has_column("success") {
Some(source.fetcher.get_block_receipts(request.block_number()?).await?)
Some(source.get_tx_receipts_in_block(&block).await?)
} else {
None
};
Expand Down
108 changes: 47 additions & 61 deletions crates/freeze/src/types/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use governor::{
middleware::NoOpMiddleware,
state::{direct::NotKeyed, InMemoryState},
};
use tokio::sync::{AcquireError, Semaphore, SemaphorePermit};
use tokio::{
sync::{AcquireError, Semaphore, SemaphorePermit},
task,
};

use crate::CollectError;

Expand All @@ -30,6 +33,46 @@ pub struct Source {
pub labels: SourceLabels,
}

impl Source {
/// Returns all receipts for a block.
/// Tries to use `eth_getBlockReceipts` first, and falls back to `eth_getTransactionReceipt`
pub async fn get_tx_receipts_in_block(
&self,
block: &Block<Transaction>,
) -> Result<Vec<TransactionReceipt>> {
let block_number =
block.number.ok_or(CollectError::CollectError("no block number".to_string()))?.as_u64();
if let Ok(receipts) = self.fetcher.get_block_receipts(block_number).await {
return Ok(receipts)
}

// fallback to `eth_getTransactionReceipt`
let mut tasks = Vec::new();
for tx in &block.transactions {
let tx_hash = tx.hash;
let fetcher = self.fetcher.clone();
let task = task::spawn(async move {
match fetcher.get_transaction_receipt(tx_hash).await? {
Some(receipt) => Ok(receipt),
None => {
Err(CollectError::CollectError("could not find tx receipt".to_string()))
}
}
});
tasks.push(task);
}
let mut receipts = Vec::new();
for task in tasks {
match task.await {
Ok(receipt) => receipts.push(receipt?),
Err(e) => return Err(CollectError::TaskFailed(e)),
}
}

Ok(receipts)
}
}

/// source labels (non-functional)
#[derive(Clone)]
pub struct SourceLabels {
Expand Down Expand Up @@ -185,6 +228,9 @@ impl<P: JsonRpcClient> Fetcher<P> {
}

/// Returns all receipts for a block.
/// Note that this uses the `eth_getBlockReceipts` method which is not supported by all nodes.
/// Consider using `FetcherExt::get_tx_receipts_in_block` which takes a block, and falls back to
/// `eth_getTransactionReceipt` if `eth_getBlockReceipts` is not supported.
pub async fn get_block_receipts(&self, block_num: u64) -> Result<Vec<TransactionReceipt>> {
let _permit = self.permit_request().await;
Self::map_err(self.provider.get_block_receipts(block_num).await)
Expand Down Expand Up @@ -551,7 +597,6 @@ impl<P: JsonRpcClient> Fetcher<P> {

use crate::err;
use std::collections::BTreeMap;
use tokio::task;

fn parse_geth_diff_object(
map: ethers::utils::__serde_json::Map<String, ethers::utils::__serde_json::Value>,
Expand All @@ -564,62 +609,3 @@ fn parse_geth_diff_object(

Ok(DiffMode { pre, post })
}

impl Source {
/// get gas used by transactions in block
pub async fn get_txs_gas_used(&self, block: &Block<Transaction>) -> Result<Vec<u64>> {
match get_txs_gas_used_per_block(block, self.fetcher.clone()).await {
Ok(value) => Ok(value),
Err(_) => get_txs_gas_used_per_tx(block, self.fetcher.clone()).await,
}
}
}

async fn get_txs_gas_used_per_block<P: JsonRpcClient>(
block: &Block<Transaction>,
fetcher: Arc<Fetcher<P>>,
) -> Result<Vec<u64>> {
// let fetcher = Arc::new(fetcher);
let block_number = match block.number {
Some(number) => number,
None => return Err(CollectError::CollectError("no block number".to_string())),
};
let receipts = fetcher.get_block_receipts(block_number.as_u64()).await?;
let mut gas_used: Vec<u64> = Vec::new();
for receipt in receipts {
match receipt.gas_used {
Some(value) => gas_used.push(value.as_u64()),
None => return Err(CollectError::CollectError("no gas_used for tx".to_string())),
}
}
Ok(gas_used)
}

async fn get_txs_gas_used_per_tx<P: JsonRpcClient + 'static>(
block: &Block<Transaction>,
fetcher: Arc<Fetcher<P>>,
) -> Result<Vec<u64>> {
// let fetcher = Arc::new(*fetcher.clone());
let mut tasks = Vec::new();
for tx in &block.transactions {
let tx_clone = tx.hash;
let fetcher = fetcher.clone();
let task = task::spawn(async move {
match fetcher.get_transaction_receipt(tx_clone).await? {
Some(receipt) => Ok(receipt.gas_used),
None => Err(CollectError::CollectError("could not find tx receipt".to_string())),
}
});
tasks.push(task);
}

let mut gas_used: Vec<u64> = Vec::new();
for task in tasks {
match task.await {
Ok(Ok(Some(value))) => gas_used.push(value.as_u64()),
_ => return Err(CollectError::CollectError("gas_used not available from node".into())),
}
}

Ok(gas_used)
}