Skip to content

Commit

Permalink
replace Schema struct with Table struct
Browse files (browse the repository at this point in the history)
  • Loading branch information
sslivkoff committed Jul 5, 2023
1 parent eac1419 commit 963ea62
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 107 deletions.
9 changes: 5 additions & 4 deletions crates/cli/src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use cryo_freeze::Datatype;
use cryo_freeze::FileFormat;
use cryo_freeze::FreezeOpts;
use cryo_freeze::LogOpts;
use cryo_freeze::Schema;
use cryo_freeze::Table;

use crate::args::Args;

Expand Down Expand Up @@ -114,14 +114,15 @@ pub async fn parse_opts() -> Result<FreezeOpts> {
let (max_concurrent_chunks, max_concurrent_blocks) = parse_concurrency_args(&args)?;

// process schemas
let schemas: Result<HashMap<Datatype, Schema>, eyre::Report> = datatypes
let schemas: Result<HashMap<Datatype, Table>, eyre::Report> = datatypes
.iter()
.map(|datatype| {
datatype
.get_schema(
.table_schema(
&binary_column_format,
&args.include_columns,
&args.exclude_columns,
None,
)
.map(|schema| (*datatype, schema))
.wrap_err_with(|| format!("Failed to get schema for datatype: {:?}", datatype))
Expand Down Expand Up @@ -300,7 +301,7 @@ fn parse_compression(input: &Vec<String>) -> Result<ParquetCompression> {

fn parse_sort(
raw_sort: &Vec<String>,
schemas: &HashMap<Datatype, Schema>,
schemas: &HashMap<Datatype, Table>,
) -> Result<HashMap<Datatype, Vec<String>>, eyre::Report> {
if raw_sort.is_empty() {
Ok(HashMap::from_iter(schemas.iter().map(
Expand Down
16 changes: 9 additions & 7 deletions crates/cli/src/summaries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use cryo_freeze::ChunkAgg;
use cryo_freeze::Datatype;
use cryo_freeze::FreezeOpts;
use cryo_freeze::FreezeSummary;
use cryo_freeze::Schema;
use cryo_freeze::Table;

const TITLE_R: u8 = 0;
const TITLE_G: u8 = 225;
Expand Down Expand Up @@ -78,19 +78,21 @@ pub fn print_cryo_summary(opts: &FreezeOpts) {
print_schemas(&opts.schemas, opts);
}

pub fn print_schemas(schemas: &HashMap<Datatype, Schema>, opts: &FreezeOpts) {
pub fn print_schemas(schemas: &HashMap<Datatype, Table>, opts: &FreezeOpts) {
schemas.iter().for_each(|(name, schema)| {
println!();
println!();
print_schema(name, schema, opts.sort.get(name))
print_schema(name, &schema.clone(), opts.sort.get(name))
})
}

pub fn print_schema(name: &Datatype, schema: &Schema, sort: Option<&Vec<String>>) {
pub fn print_schema(name: &Datatype, schema: &Table, sort: Option<&Vec<String>>) {
print_header("schema for ".to_string() + name.dataset().name());
schema.iter().for_each(|(name, column_type)| {
print_bullet(name, column_type.as_str());
});
for column in schema.columns() {
if let Some(column_type) = schema.column_type(column) {
print_bullet(column, column_type.as_str());
}
}
println!();
if let Some(sort_cols) = sort {
println!(
Expand Down
6 changes: 3 additions & 3 deletions crates/freeze/src/dataframes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#[macro_export]
macro_rules! with_series {
($all_series:expr, $name:expr, $value:expr, $schema:expr) => {
if $schema.contains_key($name) {
if $schema.has_column($name) {
$all_series.push(Series::new($name, $value));
}
};
Expand All @@ -12,8 +12,8 @@ macro_rules! with_series {
#[macro_export]
macro_rules! with_series_binary {
($all_series:expr, $name:expr, $value:expr, $schema:expr) => {
if $schema.contains_key($name) {
if let Some(ColumnType::Hex) = $schema.get($name) {
if $schema.has_column($name) {
if let Some(ColumnType::Hex) = $schema.column_type($name) {
$all_series.push(Series::new($name, $value.to_vec_hex()));
} else {
$all_series.push(Series::new($name, $value));
Expand Down
32 changes: 16 additions & 16 deletions crates/freeze/src/datasets/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::types::Dataset;
use crate::types::Datatype;
use crate::types::FetchOpts;
use crate::types::FreezeOpts;
use crate::types::Schema;
use crate::types::Table;
use crate::with_series;
use crate::with_series_binary;

Expand Down Expand Up @@ -108,22 +108,22 @@ async fn fetch_blocks(

async fn blocks_to_df(
mut blocks: mpsc::Receiver<Result<Option<Block<TxHash>>, CollectError>>,
schema: &Schema,
schema: &Table,
) -> Result<DataFrame, CollectError> {
let include_hash = schema.contains_key("hash");
let include_parent_hash = schema.contains_key("parent_hash");
let include_author = schema.contains_key("author");
let include_state_root = schema.contains_key("state_root");
let include_transactions_root = schema.contains_key("transactions_root");
let include_receipts_root = schema.contains_key("receipts_root");
let include_number = schema.contains_key("number");
let include_gas_used = schema.contains_key("gas_used");
let include_extra_data = schema.contains_key("extra_data");
let include_logs_bloom = schema.contains_key("logs_bloom");
let include_timestamp = schema.contains_key("timestamp");
let include_total_difficulty = schema.contains_key("total_difficulty");
let include_size = schema.contains_key("size");
let include_base_fee_per_gas = schema.contains_key("base_fee_per_gas");
let include_hash = schema.has_column("hash");
let include_parent_hash = schema.has_column("parent_hash");
let include_author = schema.has_column("author");
let include_state_root = schema.has_column("state_root");
let include_transactions_root = schema.has_column("transactions_root");
let include_receipts_root = schema.has_column("receipts_root");
let include_number = schema.has_column("number");
let include_gas_used = schema.has_column("gas_used");
let include_extra_data = schema.has_column("extra_data");
let include_logs_bloom = schema.has_column("logs_bloom");
let include_timestamp = schema.has_column("timestamp");
let include_total_difficulty = schema.has_column("total_difficulty");
let include_size = schema.has_column("size");
let include_base_fee_per_gas = schema.has_column("base_fee_per_gas");

let capacity = 0;
let mut hash: Vec<Vec<u8>> = Vec::with_capacity(capacity);
Expand Down
4 changes: 2 additions & 2 deletions crates/freeze/src/datasets/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::types::FetchOpts;
use crate::types::FreezeOpts;
use crate::types::LogOpts;
use crate::types::Logs;
use crate::types::Schema;
use crate::types::Table;

#[async_trait::async_trait]
impl Dataset for Logs {
Expand Down Expand Up @@ -109,7 +109,7 @@ async fn fetch_logs(

async fn logs_to_df(
mut logs: mpsc::Receiver<Result<Vec<Log>, CollectError>>,
_schema: &Schema,
_schema: &Table,
) -> Result<DataFrame, CollectError> {
let mut address: Vec<Vec<u8>> = Vec::new();
let mut topic0: Vec<Option<Vec<u8>>> = Vec::new();
Expand Down
8 changes: 4 additions & 4 deletions crates/freeze/src/datasets/state_diffs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::types::CollectError;
use crate::types::Datatype;
use crate::types::FetchOpts;
use crate::types::FreezeOpts;
use crate::types::Schema;
use crate::types::Table;

pub(crate) async fn collect_single(
datatype: &Datatype,
Expand Down Expand Up @@ -76,7 +76,7 @@ pub(crate) async fn fetch_state_diffs(

async fn state_diffs_to_df(
mut rx: mpsc::Receiver<(u64, Result<Vec<BlockTrace>, CollectError>)>,
schemas: &HashMap<Datatype, Schema>,
schemas: &HashMap<Datatype, Table>,
) -> Result<HashMap<Datatype, DataFrame>, PolarsError> {
let include_storage = schemas.contains_key(&Datatype::StorageDiffs);
let include_balance = schemas.contains_key(&Datatype::BalanceDiffs);
Expand Down Expand Up @@ -354,12 +354,12 @@ async fn state_diffs_to_df(
}

fn included(
schemas: &HashMap<Datatype, Schema>,
schemas: &HashMap<Datatype, Table>,
datatype: Datatype,
column_name: &'static str,
) -> bool {
if let Some(schema) = schemas.get(&datatype) {
schema.contains_key(column_name)
schema.has_column(column_name)
} else {
false
}
Expand Down
44 changes: 22 additions & 22 deletions crates/freeze/src/datasets/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::types::Dataset;
use crate::types::Datatype;
use crate::types::FetchOpts;
use crate::types::FreezeOpts;
use crate::types::Schema;
use crate::types::Table;
use crate::types::Traces;

#[async_trait::async_trait]
Expand Down Expand Up @@ -150,28 +150,28 @@ fn action_call_type_to_string(action_call_type: &CallType) -> String {

async fn traces_to_df(
mut rx: mpsc::Receiver<Result<Vec<Trace>, CollectError>>,
schema: &Schema,
schema: &Table,
) -> Result<DataFrame, CollectError> {
let include_action_from = schema.contains_key("action_from");
let include_action_to = schema.contains_key("action_to");
let include_action_value = schema.contains_key("action_value");
let include_action_gas = schema.contains_key("action_gas");
let include_action_input = schema.contains_key("action_input");
let include_action_call_type = schema.contains_key("action_call_type");
let include_action_init = schema.contains_key("action_init");
let include_action_reward_type = schema.contains_key("action_reward_type");
let include_action_type = schema.contains_key("action_type");
let include_result_gas_used = schema.contains_key("result_gas_used");
let include_result_output = schema.contains_key("result_output");
let include_result_code = schema.contains_key("result_code");
let include_result_address = schema.contains_key("result_address");
let include_trace_address = schema.contains_key("trace_address");
let include_subtraces = schema.contains_key("subtraces");
let include_transaction_position = schema.contains_key("transaction_position");
let include_transaction_hash = schema.contains_key("transaction_hash");
let include_block_number = schema.contains_key("block_number");
let include_block_hash = schema.contains_key("block_hash");
let include_error = schema.contains_key("error");
let include_action_from = schema.has_column("action_from");
let include_action_to = schema.has_column("action_to");
let include_action_value = schema.has_column("action_value");
let include_action_gas = schema.has_column("action_gas");
let include_action_input = schema.has_column("action_input");
let include_action_call_type = schema.has_column("action_call_type");
let include_action_init = schema.has_column("action_init");
let include_action_reward_type = schema.has_column("action_reward_type");
let include_action_type = schema.has_column("action_type");
let include_result_gas_used = schema.has_column("result_gas_used");
let include_result_output = schema.has_column("result_output");
let include_result_code = schema.has_column("result_code");
let include_result_address = schema.has_column("result_address");
let include_trace_address = schema.has_column("trace_address");
let include_subtraces = schema.has_column("subtraces");
let include_transaction_position = schema.has_column("transaction_position");
let include_transaction_hash = schema.has_column("transaction_hash");
let include_block_number = schema.has_column("block_number");
let include_block_hash = schema.has_column("block_hash");
let include_error = schema.has_column("error");

let capacity = 0;
let mut action_from: Vec<Option<Vec<u8>>> = Vec::with_capacity(capacity);
Expand Down
Loading

0 comments on commit 963ea62

Please sign in to comment.