Skip to content

Commit

Permalink
migrate: ethers to alloy
Browse files Browse the repository at this point in the history
  • Loading branch information
YadominJinta committed Jul 3, 2024
1 parent c3a2f1d commit 415e4c7
Show file tree
Hide file tree
Showing 57 changed files with 2,506 additions and 2,360 deletions.
2,569 changes: 1,239 additions & 1,330 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 9 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ cryo_cli = { version = "0.3.2", path = "./crates/cli" }
cryo_freeze = { version = "0.3.2", path = "./crates/freeze" }
cryo_to_df = { version = "0.3.2", path = "./crates/to_df" }

alloy = { version = "0.1", features = [
"full",
"rpc-types-trace",
"provider-ws",
"provider-ipc",
"provider-debug-api",
"provider-trace-api",
"transport-ipc-mock",
] }
anstyle = "1.0.4"
async-trait = "0.1.74"
chrono = { version = "0.4.31", features = ["serde"] }
Expand All @@ -29,8 +38,6 @@ clap_cryo = { version = "4.3.21-cryo", features = [
] }
colored = "2.0.4"
color-print = "0.3.5"
ethers = { version = "2.0.10", features = ["rustls", "ws", "ipc"] }
ethers-core = "2.0.10"
eyre = "0.6.8"
futures = "0.3.29"
governor = "0.6.0"
Expand Down
2 changes: 1 addition & 1 deletion crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ path = "src/main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
alloy = { workspace = true }
anstyle = { workspace = true }
clap_cryo = { workspace = true }
color-print = { workspace = true }
colored = { workspace = true }
cryo_freeze = { workspace = true }
ethers = { workspace = true }
eyre = { workspace = true }
governor = { workspace = true }
hex = { workspace = true }
Expand Down
100 changes: 73 additions & 27 deletions crates/cli/src/parse/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ async fn parse_block_inputs(
}
}

#[derive(Clone, Debug)]
enum RangePosition {
First,
Last,
Expand Down Expand Up @@ -322,15 +323,14 @@ async fn parse_block_number(
source: Arc<Source>,
) -> Result<u64, ParseError> {
match (block_ref, range_position) {
("latest", _) => source.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| {
("latest", _) => source.get_block_number().await.map_err(|_e| {
ParseError::ParseError("Error retrieving latest block number".to_string())
}),
("", RangePosition::First) => Ok(0),
("", RangePosition::Last) => {
source.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| {
ParseError::ParseError("Error retrieving last block number".to_string())
})
}
("", RangePosition::Last) => source
.get_block_number()
.await
.map_err(|_e| ParseError::ParseError("Error retrieving last block number".to_string())),
("", RangePosition::None) => Err(ParseError::ParseError("invalid input".to_string())),
_ if block_ref.ends_with('B') | block_ref.ends_with('b') => {
let s = &block_ref[..block_ref.len() - 1];
Expand Down Expand Up @@ -366,7 +366,7 @@ async fn apply_reorg_buffer(
0 => Ok(block_chunks),
reorg_filter => {
let latest_block = match source.get_block_number().await {
Ok(result) => result.as_u64(),
Ok(result) => result,
Err(_e) => {
return Err(ParseError::ParseError("reorg buffer parse error".to_string()))
}
Expand All @@ -387,24 +387,34 @@ pub(crate) async fn get_latest_block_number(source: Arc<Source>) -> Result<u64,
source
.get_block_number()
.await
.map(|n| n.as_u64())
.map_err(|_e| ParseError::ParseError("Error retrieving latest block number".to_string()))
}

#[cfg(test)]
mod tests {
use std::path::PathBuf;

use alloy::{
providers::{IpcConnect, ProviderBuilder},
transports::ipc::MockIpcServer,
};

use super::*;
use ethers::prelude::*;

#[derive(Clone, Debug)]
enum BlockTokenTest<'a> {
WithoutMock((&'a str, BlockChunk)), // Token | Expected
WithMock((&'a str, BlockChunk, u64)), // Token | Expected | Mock Block Response
}

async fn block_token_test_helper(tests: Vec<(BlockTokenTest<'_>, bool)>) {
let (provider, mock) = Provider::mocked();
async fn block_token_test_helper(
tests: Vec<(BlockTokenTest<'_>, bool)>,
mock_ipc_path: PathBuf,
) {
let ipc = IpcConnect::new(mock_ipc_path);
let provider = ProviderBuilder::new().on_ipc(ipc).await.unwrap().boxed();
let source = Source {
provider: provider.into(),
provider,
semaphore: Arc::new(None),
rate_limiter: Arc::new(None),
chain_id: 1,
Expand All @@ -416,8 +426,7 @@ mod tests {
let source = Arc::new(source);
for (test, res) in tests {
match test {
BlockTokenTest::WithMock((token, expected, latest)) => {
mock.push(U64::from(latest)).unwrap();
BlockTokenTest::WithMock((token, expected, _latest)) => {
assert_eq!(
block_token_test_executor(token, expected, source.clone()).await,
res
Expand Down Expand Up @@ -458,15 +467,20 @@ mod tests {
}
}

#[derive(Clone, Debug)]
enum BlockInputTest<'a> {
WithoutMock((&'a String, Vec<BlockChunk>)), // Token | Expected
WithMock((&'a String, Vec<BlockChunk>, u64)), // Token | Expected | Mock Block Response
}

async fn block_input_test_helper(tests: Vec<(BlockInputTest<'_>, bool)>) {
let (provider, mock) = Provider::mocked();
async fn block_input_test_helper(
tests: Vec<(BlockInputTest<'_>, bool)>,
mock_ipc_path: PathBuf,
) {
let ipc = IpcConnect::new(mock_ipc_path);
let provider = ProviderBuilder::new().on_ipc(ipc).await.unwrap().boxed();
let source = Arc::new(Source {
provider: provider.into(),
provider,
chain_id: 1,
rpc_url: "".to_string(),
inner_request_size: 1,
Expand All @@ -477,8 +491,7 @@ mod tests {
});
for (test, res) in tests {
match test {
BlockInputTest::WithMock((inputs, expected, latest)) => {
mock.push(U64::from(latest)).unwrap();
BlockInputTest::WithMock((inputs, expected, _latest)) => {
assert_eq!(
block_input_test_executor(inputs, expected, source.clone()).await,
res
Expand Down Expand Up @@ -531,15 +544,20 @@ mod tests {
true
}

#[derive(Clone, Debug)]
enum BlockNumberTest<'a> {
WithoutMock((&'a str, RangePosition, u64)),
WithMock((&'a str, RangePosition, u64, u64)),
}

async fn block_number_test_helper(tests: Vec<(BlockNumberTest<'_>, bool)>) {
let (provider, mock) = Provider::mocked();
async fn block_number_test_helper(
tests: Vec<(BlockNumberTest<'_>, bool)>,
mock_ipc_path: PathBuf,
) {
let provider =
ProviderBuilder::new().on_ipc(IpcConnect::new(mock_ipc_path)).await.unwrap().boxed();
let source = Source {
provider: provider.into(),
provider,
semaphore: Arc::new(None),
rate_limiter: Arc::new(None),
chain_id: 1,
Expand All @@ -551,8 +569,7 @@ mod tests {
let source = Arc::new(source);
for (test, res) in tests {
match test {
BlockNumberTest::WithMock((block_ref, range_position, expected, latest)) => {
mock.push(U64::from(latest)).unwrap();
BlockNumberTest::WithMock((block_ref, range_position, expected, _latest)) => {
assert_eq!(
block_number_test_executor(
block_ref,
Expand Down Expand Up @@ -604,7 +621,18 @@ mod tests {
// Number type
(BlockTokenTest::WithoutMock((r"1", BlockChunk::Numbers(vec![1]))), true), /* Single block */
];
block_token_test_helper(tests).await;
let mut mock_server = MockIpcServer::new();
let mock_ipc_path = mock_server.path().clone();
for (test, _) in tests.clone().into_iter() {
match test {
BlockTokenTest::WithoutMock(_) => {}
BlockTokenTest::WithMock((_, _, mock_response)) => {
mock_server.add_reply(mock_response)
}
}
}
mock_server.spawn().await;
block_token_test_helper(tests, mock_ipc_path).await;
}

#[tokio::test]
Expand Down Expand Up @@ -648,7 +676,16 @@ mod tests {
true,
), // Multi input complex
];
block_input_test_helper(tests).await;
let mut mock_server = MockIpcServer::new();
let mock_ipc_path = mock_server.path().clone();
for (test, _) in tests.clone() {
match test {
BlockInputTest::WithMock((_, _, expected)) => mock_server.add_reply(expected),
BlockInputTest::WithoutMock(_) => {}
}
}
mock_server.spawn().await;
block_input_test_helper(tests, mock_ipc_path).await;
}

#[tokio::test]
Expand All @@ -666,6 +703,15 @@ mod tests {
(BlockNumberTest::WithoutMock((r"1m", RangePosition::None, 1000000)), true), // m
(BlockNumberTest::WithoutMock((r"1k", RangePosition::None, 1000)), true), // k
];
block_number_test_helper(tests).await;
let mut mock_server = MockIpcServer::new();
let mock_ipc_path = mock_server.path().clone();
for (test, _) in tests.clone() {
match test {
BlockNumberTest::WithMock((_, _, _, expected)) => mock_server.add_reply(expected),
BlockNumberTest::WithoutMock(_) => {}
}
}
mock_server.spawn().await;
block_number_test_helper(tests, mock_ipc_path).await;
}
}
1 change: 0 additions & 1 deletion crates/cli/src/parse/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use cryo_freeze::{
AddressChunk, CallDataChunk, Datatype, Dim, ParseError, Partition, PartitionLabels, SlotChunk,
Source, Table, TimeDimension, TopicChunk, TransactionChunk,
};
use ethers::prelude::*;
use rand::{seq::SliceRandom, thread_rng};
use std::{collections::HashMap, str::FromStr, sync::Arc};

Expand Down
40 changes: 19 additions & 21 deletions crates/cli/src/parse/source.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,34 @@
use std::env;

use crate::args::Args;
use cryo_freeze::{sources::ProviderWrapper, ParseError, Source, SourceLabels};
use ethers::prelude::*;
use alloy::{
providers::{IpcConnect, Provider, ProviderBuilder, RootProvider, WsConnect},
transports::{http::reqwest::Url, BoxTransport},
};
use cryo_freeze::{ParseError, Source, SourceLabels};
use governor::{Quota, RateLimiter};
use polars::prelude::*;
use std::num::NonZeroU32;

pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
// parse network info
let rpc_url = parse_rpc_url(args)?;
let (provider, chain_id): (ProviderWrapper, u64) = if rpc_url.starts_with("http") {
let provider = Provider::<RetryClient<Http>>::new_client(
&rpc_url,
args.max_retries,
args.initial_backoff,
)
.map_err(|_e| ParseError::ParseError("could not connect to provider".to_string()))?;
let chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64();
(provider.into(), chain_id)
let (provider, chain_id): (RootProvider<BoxTransport>, u64) = if rpc_url.starts_with("http") {
let url: Url = rpc_url.clone().parse().map_err(ParseError::ParseUrlError)?;
let provider = ProviderBuilder::new().on_http(url);
let chain_id = provider.get_chain_id().await.map_err(ParseError::ProviderError)?;
(provider.boxed(), chain_id)
} else if rpc_url.starts_with("ws") {
let provider = Provider::<Ws>::connect(&rpc_url).await.map_err(|_| {
ParseError::ParseError("could not instantiate HTTP Provider".to_string())
})?;
let chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64();
(provider.into(), chain_id)
let ws = WsConnect::new(rpc_url.clone());
let provider = ProviderBuilder::new().on_ws(ws).await.map_err(ParseError::ProviderError)?;
let chain_id = provider.get_chain_id().await.map_err(ParseError::ProviderError)?;
(provider.boxed(), chain_id)
} else if rpc_url.ends_with(".ipc") {
let provider: Provider<Ipc> = Provider::connect_ipc(&rpc_url).await.map_err(|_| {
ParseError::ParseError("could not instantiate HTTP Provider".to_string())
})?;
let chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64();
(provider.into(), chain_id)
let ipc = IpcConnect::new(rpc_url.clone());
let provider =
ProviderBuilder::new().on_ipc(ipc).await.map_err(ParseError::ProviderError)?;
let chain_id = provider.get_chain_id().await.map_err(ParseError::ProviderError)?;
(provider.boxed(), chain_id)
} else {
return Err(ParseError::ParseError(format!("invalid rpc url: {}", rpc_url)));
};
Expand Down
Loading

0 comments on commit 415e4c7

Please sign in to comment.