diff --git a/crates/freeze/src/datasets/erc20_transfers.rs b/crates/freeze/src/datasets/erc20_transfers.rs index a4081926..ee1c1444 100644 --- a/crates/freeze/src/datasets/erc20_transfers.rs +++ b/crates/freeze/src/datasets/erc20_transfers.rs @@ -21,7 +21,7 @@ pub struct Erc20Transfers { #[async_trait::async_trait] impl Dataset for Erc20Transfers { fn optional_parameters() -> Vec { - vec![Dim::Contract, Dim::Topic0, Dim::Topic1, Dim::Topic2] + vec![Dim::Contract, Dim::Topic0, Dim::Topic1, Dim::Topic2, Dim::FromAddress, Dim::ToAddress] } fn use_block_ranges() -> bool { @@ -34,9 +34,20 @@ impl CollectByBlock for Erc20Transfers { type Response = Vec; async fn extract(request: Params, source: Arc, _: Arc) -> R { - let topics = [Some(ValueOrArray::Value(Some(*EVENT_ERC20_TRANSFER))), None, None, None]; + let mut topics = [Some(ValueOrArray::Value(Some(*EVENT_ERC20_TRANSFER))), None, None, None]; + if let Some(from_address) = &request.from_address { + let mut v = vec![0u8; 12]; + v.append(&mut from_address.to_owned()); + topics[1] = Some(ValueOrArray::Value(Some(H256::from_slice(&v[..])))); + } + if let Some(to_address) = &request.to_address { + let mut v = vec![0u8; 12]; + v.append(&mut to_address.to_owned()); + topics[2] = Some(ValueOrArray::Value(Some(H256::from_slice(&v[..])))); + } let filter = Filter { topics, ..request.ethers_log_filter()? }; let logs = source.fetcher.get_logs(&filter).await?; + Ok(logs.into_iter().filter(|x| x.topics.len() == 3 && x.data.len() == 32).collect()) } diff --git a/crates/freeze/src/datasets/erc721_transfers.rs b/crates/freeze/src/datasets/erc721_transfers.rs index e56c392d..5c57cf6a 100644 --- a/crates/freeze/src/datasets/erc721_transfers.rs +++ b/crates/freeze/src/datasets/erc721_transfers.rs @@ -21,7 +21,7 @@ pub struct Erc721Transfers { #[async_trait::async_trait] impl Dataset for Erc721Transfers { fn optional_parameters() -> Vec { - vec![Dim::Contract] + vec![Dim::Contract, Dim::FromAddress, Dim::ToAddress] } fn use_block_ranges() -> bool { @@ -34,9 +34,21 @@ impl CollectByBlock for Erc721Transfers { type Response = Vec; async fn extract(request: Params, source: Arc, _: Arc) -> R { - let topics = [Some(ValueOrArray::Value(Some(*EVENT_ERC721_TRANSFER))), None, None, None]; + let mut topics = + [Some(ValueOrArray::Value(Some(*EVENT_ERC721_TRANSFER))), None, None, None]; + if let Some(from_address) = &request.from_address { + let mut v = vec![0u8; 12]; + v.append(&mut from_address.to_owned()); + topics[1] = Some(ValueOrArray::Value(Some(H256::from_slice(&v[..])))); + } + if let Some(to_address) = &request.to_address { + let mut v = vec![0u8; 12]; + v.append(&mut to_address.to_owned()); + topics[2] = Some(ValueOrArray::Value(Some(H256::from_slice(&v[..])))); + } let filter = Filter { topics, ..request.ethers_log_filter()? }; let logs = source.fetcher.get_logs(&filter).await?; + Ok(logs.into_iter().filter(|x| x.topics.len() == 4 && x.data.len() == 0).collect()) } diff --git a/crates/freeze/src/datasets/native_transfers.rs b/crates/freeze/src/datasets/native_transfers.rs index 1cdef0c3..53bbbd21 100644 --- a/crates/freeze/src/datasets/native_transfers.rs +++ b/crates/freeze/src/datasets/native_transfers.rs @@ -18,14 +18,19 @@ pub struct NativeTransfers { } #[async_trait::async_trait] -impl Dataset for NativeTransfers {} +impl Dataset for NativeTransfers { + fn optional_parameters() -> Vec { + vec![Dim::FromAddress, Dim::ToAddress] + } +} #[async_trait::async_trait] impl CollectByBlock for NativeTransfers { type Response = Vec; async fn extract(request: Params, source: Arc, _: Arc) -> R { - source.fetcher.trace_block(request.block_number()?.into()).await + let traces = source.fetcher.trace_block(request.block_number()?.into()).await?; + Ok(filter_traces_by_from_to_addresses(traces, &request.from_address, &request.to_address)) } fn transform(response: Self::Response, columns: &mut Self, query: &Arc) -> R<()> { @@ -40,7 +45,8 @@ impl CollectByTransaction for NativeTransfers { type Response = Vec; async fn extract(request: Params, source: Arc, _: Arc) -> R { - source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await + let traces = source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await?; + Ok(filter_traces_by_from_to_addresses(traces, &request.from_address, &request.to_address)) } fn transform(response: Self::Response, columns: &mut Self, query: &Arc) -> R<()> { diff --git a/crates/freeze/src/datasets/traces.rs b/crates/freeze/src/datasets/traces.rs index 05ecf80b..d619cdbc 100644 --- a/crates/freeze/src/datasets/traces.rs +++ b/crates/freeze/src/datasets/traces.rs @@ -31,14 +31,19 @@ pub struct Traces { } #[async_trait::async_trait] -impl Dataset for Traces {} +impl Dataset for Traces { + fn optional_parameters() -> Vec { + vec![Dim::FromAddress, Dim::ToAddress] + } +} #[async_trait::async_trait] impl CollectByBlock for Traces { type Response = Vec; async fn extract(request: Params, source: Arc, _: Arc) -> R { - source.fetcher.trace_block(request.block_number()?.into()).await + let traces = source.fetcher.trace_block(request.block_number()?.into()).await?; + Ok(filter_traces_by_from_to_addresses(traces, &request.from_address, &request.to_address)) } fn transform(response: Self::Response, columns: &mut Self, query: &Arc) -> R<()> { @@ -53,7 +58,8 @@ impl CollectByTransaction for Traces { type Response = Vec; async fn extract(request: Params, source: Arc, _: Arc) -> R { - source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await + let traces = source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await?; + Ok(filter_traces_by_from_to_addresses(traces, &request.from_address, &request.to_address)) } fn transform(response: Self::Response, columns: &mut Self, query: &Arc) -> R<()> { @@ -62,6 +68,43 @@ impl CollectByTransaction for Traces { process_traces(&traces, columns, &query.schemas) } } + +pub(crate) fn filter_traces_by_from_to_addresses( + traces: Vec, + from_address: &Option>, + to_address: &Option>, +) -> Vec { + // filter by from_address + let from_filter: Box bool + Send> = if let Some(from_address) = from_address { + Box::new(move |trace| { + let from = match &trace.action { + Action::Call(action) => action.from, + Action::Create(action) => action.from, + Action::Suicide(action) => action.address, + _ => return false, + }; + from.as_bytes() == from_address + }) + } else { + Box::new(|_| true) + }; + // filter by to_address + let to_filter: Box bool + Send> = if let Some(to_address) = to_address { + Box::new(move |trace| { + let to = match &trace.action { + Action::Call(action) => action.to, + Action::Suicide(action) => action.refund_address, + Action::Reward(action) => action.author, + _ => return false, + }; + to.as_bytes() == to_address + }) + } else { + Box::new(|_| true) + }; + traces.into_iter().filter(from_filter).filter(to_filter).collect() +} + /// process block into columns pub(crate) fn process_traces(traces: &[Trace], columns: &mut Traces, schemas: &Schemas) -> R<()> { let schema = schemas.get(&Datatype::Traces).ok_or(err("schema not provided"))?;