Skip to content

Commit

Permalink
support websocket ws:// and wss:// rpc providers (#161)
Browse files Browse the repository at this point in the history
* support multiple provider types

* add websocket support
  • Loading branch information
sslivkoff authored Dec 31, 2023
1 parent 063c34d commit b2221f9
Show file tree
Hide file tree
Showing 45 changed files with 433 additions and 316 deletions.
2 changes: 1 addition & 1 deletion crates/cli/src/parse/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub async fn parse_args(
args: &Args,
) -> Result<(Query, Source, FileOutput, ExecutionEnv), ParseError> {
let source = source::parse_source(args).await?;
let query = query::parse_query(args, Arc::clone(&source.fetcher)).await?;
let query = query::parse_query(args, Arc::new(source.clone())).await?;
let sink = file_output::parse_file_output(args, &source)?;
let env = execution::parse_execution_env(args, query.n_tasks() as u64)?;
Ok((query, source, sink, env))
Expand Down
182 changes: 112 additions & 70 deletions crates/cli/src/parse/blocks.rs

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions crates/cli/src/parse/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,27 @@ use super::{
};
use crate::args::Args;
use cryo_freeze::{
AddressChunk, CallDataChunk, Datatype, Dim, Fetcher, ParseError, Partition, PartitionLabels,
SlotChunk, Table, TimeDimension, TopicChunk, TransactionChunk,
AddressChunk, CallDataChunk, Datatype, Dim, ParseError, Partition, PartitionLabels, SlotChunk,
Source, Table, TimeDimension, TopicChunk, TransactionChunk,
};
use ethers::prelude::*;
use rand::{seq::SliceRandom, thread_rng};
use std::{collections::HashMap, str::FromStr, sync::Arc};

type ChunkLabels = Vec<Option<String>>;

pub(crate) async fn parse_partitions<P: JsonRpcClient>(
pub(crate) async fn parse_partitions(
args: &Args,
fetcher: Arc<Fetcher<P>>,
source: Arc<Source>,
schemas: &HashMap<Datatype, Table>,
) -> Result<(Vec<Partition>, Vec<Dim>, TimeDimension), ParseError> {
// TODO: if wanting to chunk these non-block dimensions, do it in parse_binary_arg()
// TODO: map from args to dim is not exhaustive

// parse chunk data
let (block_number_labels, block_numbers) = blocks::parse_blocks(args, fetcher.clone()).await?;
let (block_number_labels, block_numbers) = blocks::parse_blocks(args, source.clone()).await?;
let (block_number_labels, block_numbers) = if block_numbers.is_none() {
timestamps::parse_timestamps(args, fetcher.clone()).await?
timestamps::parse_timestamps(args, source.clone()).await?
} else {
(block_number_labels, block_numbers)
};
Expand All @@ -46,7 +46,7 @@ pub(crate) async fn parse_partitions<P: JsonRpcClient>(

// set default blocks
let block_numbers = if block_numbers.is_none() && transactions.is_none() {
Some(blocks::get_default_block_chunks(args, fetcher, schemas).await?)
Some(blocks::get_default_block_chunks(args, source, schemas).await?)
} else {
block_numbers
};
Expand Down
10 changes: 3 additions & 7 deletions crates/cli/src/parse/query.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use super::{parse_schemas, partitions};
use crate::args::Args;
use cryo_freeze::{Dim, Fetcher, ParseError, Query, QueryLabels, Schemas};
use ethers::prelude::*;
use cryo_freeze::{Dim, ParseError, Query, QueryLabels, Schemas, Source};
use std::sync::Arc;

/// parse Query struct from cli Args
pub async fn parse_query<P: JsonRpcClient>(
args: &Args,
fetcher: Arc<Fetcher<P>>,
) -> Result<Query, ParseError> {
pub async fn parse_query(args: &Args, source: Arc<Source>) -> Result<Query, ParseError> {
let (datatypes, schemas) = parse_schemas(args)?;

let arg_aliases = find_arg_aliases(args, &schemas);
Expand All @@ -17,7 +13,7 @@ pub async fn parse_query<P: JsonRpcClient>(
let args = new_args.as_ref().unwrap_or(args);

let (partitions, partitioned_by, time_dimension) =
partitions::parse_partitions(args, fetcher, &schemas).await?;
partitions::parse_partitions(args, source, &schemas).await?;
let datatypes = cryo_freeze::cluster_datatypes(datatypes);
let labels = QueryLabels { align: args.align, reorg_buffer: args.reorg_buffer };
Ok(Query {
Expand Down
33 changes: 24 additions & 9 deletions crates/cli/src/parse/source.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::env;

use crate::args::Args;
use cryo_freeze::{Fetcher, ParseError, Source, SourceLabels};
use cryo_freeze::{sources::ProviderWrapper, ParseError, Source, SourceLabels};
use ethers::prelude::*;
use governor::{Quota, RateLimiter};
use polars::prelude::*;
Expand All @@ -10,10 +10,24 @@ use std::num::NonZeroU32;
pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
// parse network info
let rpc_url = parse_rpc_url(args)?;
let provider =
Provider::<RetryClient<Http>>::new_client(&rpc_url, args.max_retries, args.initial_backoff)
.map_err(|_e| ParseError::ParseError("could not connect to provider".to_string()))?;
let chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64();
// Select the provider transport from the URL scheme: http(s) gets a retrying
// HTTP client, ws(s) gets a websocket connection; anything else is rejected.
// Both branches fetch the chain id, which doubles as a connectivity check.
let (provider, chain_id): (ProviderWrapper, u64) = if rpc_url.starts_with("http") {
    let provider = Provider::<RetryClient<Http>>::new_client(
        &rpc_url,
        args.max_retries,
        args.initial_backoff,
    )
    .map_err(|_e| ParseError::ParseError("could not connect to provider".to_string()))?;
    let chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64();
    (provider.into(), chain_id)
} else if rpc_url.starts_with("ws") {
    // fix: error previously said "HTTP Provider" (copy-paste) — this is the websocket branch
    let provider = Provider::<Ws>::connect(&rpc_url).await.map_err(|_| {
        ParseError::ParseError("could not instantiate WS Provider".to_string())
    })?;
    let chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64();
    (provider.into(), chain_id)
} else {
    return Err(ParseError::ParseError(format!("invalid rpc url: {}", rpc_url)));
};

let rate_limiter = match args.requests_per_second {
Some(rate_limit) => match (NonZeroU32::new(1), NonZeroU32::new(rate_limit)) {
Expand All @@ -35,15 +49,16 @@ pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
};

let semaphore = tokio::sync::Semaphore::new(max_concurrent_requests as usize);
let semaphore = Some(semaphore);
let semaphore = Arc::new(Some(semaphore));

let fetcher = Fetcher { provider, semaphore, rate_limiter };
let output = Source {
fetcher: Arc::new(fetcher),
chain_id,
inner_request_size: args.inner_request_size,
max_concurrent_chunks,
semaphore,
rate_limiter: rate_limiter.into(),
rpc_url,
provider,
labels: SourceLabels {
max_concurrent_requests: args.requests_per_second.map(|x| x as u64),
max_requests_per_second: args.requests_per_second.map(|x| x as u64),
Expand All @@ -66,7 +81,7 @@ pub(crate) fn parse_rpc_url(args: &Args) -> Result<String, ParseError> {
}
},
};
if !url.starts_with("http") {
// Default to http:// when the URL has no recognized scheme (http/https/ws/wss).
// Use short-circuiting `&&` rather than bitwise `&` — idiomatic for bools.
if !url.starts_with("http") && !url.starts_with("ws") {
    url = "http://".to_string() + url.as_str();
};
Ok(url)
Expand Down
Loading

0 comments on commit b2221f9

Please sign in to comment.