diff --git a/README.md b/README.md index e2e462ba..6ceb775f 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,8 @@ *`cryo` is an early WIP, please report bugs + feedback to the issue tracker* +*note that `cryo`'s default settings will slam a node too hard for use with 3rd party RPC providers. Instead, `--requests-per-second` and `--max-concurrent-requests` should be used to impose ratelimits. Such settings will be handled automatically in a future release*. + ## Example Usage use as `cryo [OPTIONS]` diff --git a/crates/freeze/src/datasets/blocks.rs b/crates/freeze/src/datasets/blocks.rs index dc493906..b3bb0091 100644 --- a/crates/freeze/src/datasets/blocks.rs +++ b/crates/freeze/src/datasets/blocks.rs @@ -101,7 +101,10 @@ async fn fetch_blocks( .map_err(CollectError::ProviderError); match tx.send(block).await { Ok(_) => {} - Err(tokio::sync::mpsc::error::SendError(_e)) => println!("send error"), + Err(tokio::sync::mpsc::error::SendError(_e)) => { + eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests"); + std::process::exit(1) + } } }); } @@ -145,52 +148,57 @@ async fn blocks_to_df( let mut base_fee_per_gas: Vec> = Vec::with_capacity(capacity); let mut n_rows = 0; - while let Some(Ok(Some(block))) = blocks.recv().await { - if let (Some(n), Some(h), Some(a)) = (block.number, block.hash, block.author) { - n_rows += 1; - - if include_hash { - hash.push(h.as_bytes().to_vec()); - } - if include_parent_hash { - parent_hash.push(block.parent_hash.as_bytes().to_vec()); - } - if include_author { - author.push(a.as_bytes().to_vec()); - } - if include_state_root { - state_root.push(block.state_root.as_bytes().to_vec()); - } - if include_transactions_root { - transactions_root.push(block.transactions_root.as_bytes().to_vec()); - } - if include_receipts_root { - receipts_root.push(block.receipts_root.as_bytes().to_vec()); - } - if include_number { - number.push(n.as_u32()) - } - if include_gas_used { - gas_used.push(block.gas_used.as_u32()); - } - if include_extra_data { - extra_data.push(block.extra_data.to_vec()); - } - if include_logs_bloom { - logs_bloom.push(block.logs_bloom.map(|x| x.0.to_vec())); - } - if include_timestamp { - timestamp.push(block.timestamp.as_u32()); - } - if include_total_difficulty { - total_difficulty.push(block.total_difficulty.map(|x| x.to_vec_u8())); - } - if include_size { - size.push(block.size.map(|x| x.as_u32())); - } - if include_base_fee_per_gas { - base_fee_per_gas.push(block.base_fee_per_gas.map(|value| value.as_u64())); - } + while let Some(message) = blocks.recv().await { + match message { + Ok(Some(block)) => { + if let (Some(n), Some(h), Some(a)) = (block.number, block.hash, block.author) { + n_rows += 1; + + if include_hash { + hash.push(h.as_bytes().to_vec()); + } + if include_parent_hash { + parent_hash.push(block.parent_hash.as_bytes().to_vec()); + } + if include_author { + author.push(a.as_bytes().to_vec()); + } + if include_state_root { + state_root.push(block.state_root.as_bytes().to_vec()); + } + if include_transactions_root { + transactions_root.push(block.transactions_root.as_bytes().to_vec()); + } + if include_receipts_root { + receipts_root.push(block.receipts_root.as_bytes().to_vec()); + } + if include_number { + number.push(n.as_u32()) + } + if include_gas_used { + gas_used.push(block.gas_used.as_u32()); + } + if include_extra_data { + extra_data.push(block.extra_data.to_vec()); + } + if include_logs_bloom { + logs_bloom.push(block.logs_bloom.map(|x| x.0.to_vec())); + } + if include_timestamp { + timestamp.push(block.timestamp.as_u32()); + } + if include_total_difficulty { + total_difficulty.push(block.total_difficulty.map(|x| x.to_vec_u8())); + } + if include_size { + size.push(block.size.map(|x| x.as_u32())); + } + if include_base_fee_per_gas { + base_fee_per_gas.push(block.base_fee_per_gas.map(|value| value.as_u64())); + } + } + } + _ => return Err(CollectError::TooManyRequestsError), } } diff --git a/crates/freeze/src/datasets/logs.rs b/crates/freeze/src/datasets/logs.rs index 5a33bd85..9885f95a 100644 --- a/crates/freeze/src/datasets/logs.rs +++ b/crates/freeze/src/datasets/logs.rs @@ -105,7 +105,10 @@ async fn fetch_logs( .map_err(CollectError::ProviderError); match tx.send(result).await { Ok(_) => {} - Err(tokio::sync::mpsc::error::SendError(_e)) => println!("send error"), + Err(tokio::sync::mpsc::error::SendError(_e)) => { + eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests"); + std::process::exit(1) + } } }); } @@ -129,60 +132,66 @@ async fn logs_to_df( let mut data: Vec> = Vec::new(); let mut n_rows = 0; - while let Some(Ok(logs)) = logs.recv().await { - for log in logs.iter() { - if let Some(true) = log.removed { - continue; - } - if let (Some(bn), Some(tx), Some(ti), Some(li)) = ( - log.block_number, - log.transaction_hash, - log.transaction_index, - log.log_index, - ) { - n_rows += 1; - address.push(log.address.as_bytes().to_vec()); - match log.topics.len() { - 0 => { - topic0.push(None); - topic1.push(None); - topic2.push(None); - topic3.push(None); - } - 1 => { - topic0.push(Some(log.topics[0].as_bytes().to_vec())); - topic1.push(None); - topic2.push(None); - topic3.push(None); - } - 2 => { - topic0.push(Some(log.topics[0].as_bytes().to_vec())); - topic1.push(Some(log.topics[1].as_bytes().to_vec())); - topic2.push(None); - topic3.push(None); - } - 3 => { - topic0.push(Some(log.topics[0].as_bytes().to_vec())); - topic1.push(Some(log.topics[1].as_bytes().to_vec())); - topic2.push(Some(log.topics[2].as_bytes().to_vec())); - topic3.push(None); - } - 4 => { - topic0.push(Some(log.topics[0].as_bytes().to_vec())); - topic1.push(Some(log.topics[1].as_bytes().to_vec())); - topic2.push(Some(log.topics[2].as_bytes().to_vec())); - topic3.push(Some(log.topics[3].as_bytes().to_vec())); + // while let Some(Ok(logs)) = logs.recv().await { + while let Some(message) = logs.recv().await { + match message { + Ok(logs) => { + for log in logs.iter() { + if let Some(true) = log.removed { + continue; } - _ => { - return Err(CollectError::InvalidNumberOfTopics); + if let (Some(bn), Some(tx), Some(ti), Some(li)) = ( + log.block_number, + log.transaction_hash, + log.transaction_index, + log.log_index, + ) { + n_rows += 1; + address.push(log.address.as_bytes().to_vec()); + match log.topics.len() { + 0 => { + topic0.push(None); + topic1.push(None); + topic2.push(None); + topic3.push(None); + } + 1 => { + topic0.push(Some(log.topics[0].as_bytes().to_vec())); + topic1.push(None); + topic2.push(None); + topic3.push(None); + } + 2 => { + topic0.push(Some(log.topics[0].as_bytes().to_vec())); + topic1.push(Some(log.topics[1].as_bytes().to_vec())); + topic2.push(None); + topic3.push(None); + } + 3 => { + topic0.push(Some(log.topics[0].as_bytes().to_vec())); + topic1.push(Some(log.topics[1].as_bytes().to_vec())); + topic2.push(Some(log.topics[2].as_bytes().to_vec())); + topic3.push(None); + } + 4 => { + topic0.push(Some(log.topics[0].as_bytes().to_vec())); + topic1.push(Some(log.topics[1].as_bytes().to_vec())); + topic2.push(Some(log.topics[2].as_bytes().to_vec())); + topic3.push(Some(log.topics[3].as_bytes().to_vec())); + } + _ => { + return Err(CollectError::InvalidNumberOfTopics); + } + } + data.push(log.data.clone().to_vec()); + block_number.push(bn.as_u32()); + transaction_hash.push(tx.as_bytes().to_vec()); + transaction_index.push(ti.as_u32()); + log_index.push(li.as_u32()); } } - data.push(log.data.clone().to_vec()); - block_number.push(bn.as_u32()); - transaction_hash.push(tx.as_bytes().to_vec()); - transaction_index.push(ti.as_u32()); - log_index.push(li.as_u32()); } + _ => return Err(CollectError::TooManyRequestsError), } } diff --git a/crates/freeze/src/datasets/state_diffs.rs b/crates/freeze/src/datasets/state_diffs.rs index 44bfd6af..c53779c8 100644 --- a/crates/freeze/src/datasets/state_diffs.rs +++ b/crates/freeze/src/datasets/state_diffs.rs @@ -96,7 +96,8 @@ async fn state_diffs_to_df( // storage let include_storage_block_number = included(schemas, Datatype::StorageDiffs, "block_number"); - let include_storage_transaction_index = included(schemas, Datatype::StorageDiffs, "transaction_index"); + let include_storage_transaction_index = + included(schemas, Datatype::StorageDiffs, "transaction_index"); let include_storage_transaction_hash = included(schemas, Datatype::StorageDiffs, "transaction_hash"); let include_storage_address = included(schemas, Datatype::StorageDiffs, "address"); @@ -113,7 +114,8 @@ async fn state_diffs_to_df( // balance let include_balance_block_number = included(schemas, Datatype::BalanceDiffs, "block_number"); - let include_balance_transaction_index = included(schemas, Datatype::BalanceDiffs, "transaction_index"); + let include_balance_transaction_index = + included(schemas, Datatype::BalanceDiffs, "transaction_index"); let include_balance_transaction_hash = included(schemas, Datatype::BalanceDiffs, "transaction_hash"); let include_balance_address = included(schemas, Datatype::BalanceDiffs, "address"); @@ -128,7 +130,8 @@ async fn state_diffs_to_df( // nonce let include_nonce_block_number = included(schemas, Datatype::NonceDiffs, "block_number"); - let include_nonce_transaction_index = included(schemas, Datatype::NonceDiffs, "transaction_index"); + let include_nonce_transaction_index = + included(schemas, Datatype::NonceDiffs, "transaction_index"); let include_nonce_transaction_hash = included(schemas, Datatype::NonceDiffs, "transaction_hash"); let include_nonce_address = included(schemas, Datatype::NonceDiffs, "address"); @@ -143,7 +146,8 @@ async fn state_diffs_to_df( // code let include_code_block_number = included(schemas, Datatype::CodeDiffs, "block_number"); - let include_code_transaction_index = included(schemas, Datatype::CodeDiffs, "transaction_index"); + let include_code_transaction_index = + included(schemas, Datatype::CodeDiffs, "transaction_index"); let include_code_transaction_hash = included(schemas, Datatype::CodeDiffs, "transaction_hash"); let include_code_address = included(schemas, Datatype::CodeDiffs, "address"); let include_code_from_value = included(schemas, Datatype::CodeDiffs, "from_value"); @@ -160,7 +164,9 @@ async fn state_diffs_to_df( match message { (block_num, Ok(blocks_traces)) => { for ts in blocks_traces.iter() { - if let (Some(tx), Some(StateDiff(state_diff))) = (ts.transaction_hash, &ts.state_diff) { + if let (Some(tx), Some(StateDiff(state_diff))) = + (ts.transaction_hash, &ts.state_diff) + { for (addr, addr_diff) in state_diff.iter() { n_rows += n_rows; @@ -233,7 +239,9 @@ async fn state_diffs_to_df( Diff::Same => (0u64, 0u64), Diff::Born(value) => (0u64, value.as_u64()), Diff::Died(value) => (value.as_u64(), 0u64), - Diff::Changed(ChangedType { from, to }) => (from.as_u64(), to.as_u64()), + Diff::Changed(ChangedType { from, to }) => { + (from.as_u64(), to.as_u64()) + } }; if include_nonce_block_number { nonce_block_number.push(block_num); @@ -262,9 +270,15 @@ async fn state_diffs_to_df( H256::zero().as_bytes().to_vec(), H256::zero().as_bytes().to_vec(), ), - Diff::Born(value) => (H256::zero().as_bytes().to_vec(), value.to_vec()), - Diff::Died(value) => (value.to_vec(), H256::zero().as_bytes().to_vec()), - Diff::Changed(ChangedType { from, to }) => (from.to_vec(), to.to_vec()), + Diff::Born(value) => { + (H256::zero().as_bytes().to_vec(), value.to_vec()) + } + Diff::Died(value) => { + (value.to_vec(), H256::zero().as_bytes().to_vec()) + } + Diff::Changed(ChangedType { from, to }) => { + (from.to_vec(), to.to_vec()) + } }; if include_code_block_number { code_block_number.push(block_num); @@ -289,7 +303,7 @@ async fn state_diffs_to_df( } } } - _ => { return Err(CollectError::TooManyRequestsError) } + _ => return Err(CollectError::TooManyRequestsError), } } diff --git a/crates/freeze/src/datasets/traces.rs b/crates/freeze/src/datasets/traces.rs index c9bb6022..0a3bfa66 100644 --- a/crates/freeze/src/datasets/traces.rs +++ b/crates/freeze/src/datasets/traces.rs @@ -83,7 +83,10 @@ impl Dataset for Traces { } fn default_sort(&self) -> Vec { - vec!["block_number".to_string(), "transaction_position".to_string()] + vec![ + "block_number".to_string(), + "transaction_position".to_string(), + ] } async fn collect_chunk( @@ -256,7 +259,8 @@ async fn traces_to_df( action_input.push(Some(a.input.to_vec())); } if include_action_call_type { - action_call_type.push(Some(action_call_type_to_string(&a.call_type))); + action_call_type + .push(Some(action_call_type_to_string(&a.call_type))); } if include_action_init { @@ -430,7 +434,7 @@ async fn traces_to_df( } } } - _ => { return Err(CollectError::TooManyRequestsError) } + _ => return Err(CollectError::TooManyRequestsError), } } diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index 0b68e771..19ab0bee 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -106,7 +106,10 @@ async fn fetch_blocks_and_transactions( .map_err(CollectError::ProviderError); match tx.send(block).await { Ok(_) => {} - Err(tokio::sync::mpsc::error::SendError(_e)) => println!("send error"), + Err(tokio::sync::mpsc::error::SendError(_e)) => { + eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests"); + std::process::exit(1) + } } }); } @@ -136,34 +139,40 @@ async fn txs_to_df( let mut chain_ids: Vec> = Vec::new(); let mut n_rows = 0; - while let Some(Ok(Some(block))) = rx.recv().await { - for tx in block.transactions.iter() { - n_rows += 1; - match tx.block_number { - Some(block_number) => block_numbers.push(Some(block_number.as_u64())), - None => block_numbers.push(None), - } - match tx.transaction_index { - Some(transaction_index) => { - transaction_indices.push(Some(transaction_index.as_u64())) + while let Some(message) = rx.recv().await { + match message { + Ok(Some(block)) => { + for tx in block.transactions.iter() { + n_rows += 1; + match tx.block_number { + Some(block_number) => block_numbers.push(Some(block_number.as_u64())), + None => block_numbers.push(None), + } + match tx.transaction_index { + Some(transaction_index) => { + transaction_indices.push(Some(transaction_index.as_u64())) + } + None => transaction_indices.push(None), + } + hashes.push(tx.hash.as_bytes().to_vec()); + from_addresses.push(tx.from.as_bytes().to_vec()); + match tx.to { + Some(to_address) => to_addresses.push(Some(to_address.as_bytes().to_vec())), + None => to_addresses.push(None), + } + nonces.push(tx.nonce.as_u64()); + values.push(tx.value.to_string()); + inputs.push(tx.input.to_vec()); + gas.push(tx.gas.as_u32()); + gas_price.push(tx.gas_price.map(|gas_price| gas_price.as_u64())); + transaction_type.push(tx.transaction_type.map(|value| value.as_u32())); + max_priority_fee_per_gas + .push(tx.max_priority_fee_per_gas.map(|value| value.as_u64())); + max_fee_per_gas.push(tx.max_fee_per_gas.map(|value| value.as_u64())); + chain_ids.push(tx.chain_id.map(|value| value.as_u64())); } - None => transaction_indices.push(None), - } - hashes.push(tx.hash.as_bytes().to_vec()); - from_addresses.push(tx.from.as_bytes().to_vec()); - match tx.to { - Some(to_address) => to_addresses.push(Some(to_address.as_bytes().to_vec())), - None => to_addresses.push(None), } - nonces.push(tx.nonce.as_u64()); - values.push(tx.value.to_string()); - inputs.push(tx.input.to_vec()); - gas.push(tx.gas.as_u32()); - gas_price.push(tx.gas_price.map(|gas_price| gas_price.as_u64())); - transaction_type.push(tx.transaction_type.map(|value| value.as_u32())); - max_priority_fee_per_gas.push(tx.max_priority_fee_per_gas.map(|value| value.as_u64())); - max_fee_per_gas.push(tx.max_fee_per_gas.map(|value| value.as_u64())); - chain_ids.push(tx.chain_id.map(|value| value.as_u64())); + _ => return Err(CollectError::TooManyRequestsError), } } diff --git a/crates/freeze/src/datasets/vm_traces.rs b/crates/freeze/src/datasets/vm_traces.rs index 6ed15c6a..fafe0640 100644 --- a/crates/freeze/src/datasets/vm_traces.rs +++ b/crates/freeze/src/datasets/vm_traces.rs @@ -119,11 +119,16 @@ async fn vm_traces_to_df( n_rows: 0, }; - while let Some((number, Ok(block_traces))) = rx.recv().await { - for (tx_pos, block_trace) in block_traces.into_iter().enumerate() { - if let Some(vm_trace) = block_trace.vm_trace { - add_ops(vm_trace, schema, &mut columns, number, tx_pos as u32) + while let Some(message) = rx.recv().await { + match message { + (number, Ok(block_traces)) => { + for (tx_pos, block_trace) in block_traces.into_iter().enumerate() { + if let Some(vm_trace) = block_trace.vm_trace { + add_ops(vm_trace, schema, &mut columns, number, tx_pos as u32) + } + } } + _ => return Err(CollectError::TooManyRequestsError), } } diff --git a/crates/freeze/src/freeze.rs b/crates/freeze/src/freeze.rs index 5e3b042c..321146c6 100644 --- a/crates/freeze/src/freeze.rs +++ b/crates/freeze/src/freeze.rs @@ -69,10 +69,11 @@ async fn freeze_chunk( } } Err(_e) => { + println!("chunk failed: {:?}", _e); return FreezeChunkSummary { skipped: false, errored: true, - } + }; } } } diff --git a/crates/freeze/src/types/error_types.rs b/crates/freeze/src/types/error_types.rs index cd534b7c..163f1aae 100644 --- a/crates/freeze/src/types/error_types.rs +++ b/crates/freeze/src/types/error_types.rs @@ -45,6 +45,10 @@ pub enum CollectError { /// Error related to bad schema #[error("Bad schema specified")] BadSchemaError, + + /// Error related to too many requests + #[error("try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests")] + TooManyRequestsError, } /// Error performing a chunk operation