Tx input #20

Merged · 3 commits · Aug 6, 2023

24 changes: 12 additions & 12 deletions crates/cli/src/args.rs
@@ -10,15 +10,17 @@ pub struct Args {
     pub datatype: Vec<String>,
 
     /// Block numbers, see syntax below
+    #[arg(short, long, allow_hyphen_values(true), help_heading = "Content Options")]
+    pub blocks: Option<Vec<String>>,
+
+    /// Transaction hashes, see syntax below
     #[arg(
         short,
         long,
-        default_value = "0:latest",
         allow_hyphen_values(true),
         help_heading = "Content Options",
-        num_args = 1..
+        num_args(1..),
     )]
-    pub blocks: Vec<String>,
+    pub txs: Option<Vec<String>>,
 
     /// Align block chunk boundaries to regular intervals,
     /// e.g. (1000, 2000, 3000) instead of (1106, 2106, 3106)
@@ -36,14 +38,6 @@ pub struct Args {
     )]
     pub reorg_buffer: u64,
 
-    // #[arg(
-    //     short,
-    //     long,
-    //     allow_hyphen_values(true),
-    //     help_heading = "Content Options",
-    //     help = "Select by data transaction instead of by block,\ncan be a list or a file, see
-    // syntax below", )]
-    // pub txs: Vec<String>,
     /// Columns to include alongside the default output,
     /// use `all` to include all available columns
     #[arg(short, long, value_name="COLS", num_args(0..), verbatim_doc_comment, help_heading="Content Options")]
@@ -203,6 +197,12 @@ fn get_after_str() -> &'static str {
 - omitting range start means 0 <white><bold>:700</bold></white> == <white><bold>0:700</bold></white>
 - minus on start means minus end <white><bold>-1000:7000</bold></white> == <white><bold>6000:7000</bold></white>
 - plus sign on end means plus start <white><bold>15M:+1000</bold></white> == <white><bold>15M:15.001K</bold></white>
+
+<white><bold>Transaction hash specification syntax</bold></white>
+- can use transaction hashes <white><bold>--txs TX_HASH1 TX_HASH2 TX_HASH3</bold></white>
+- can use a parquet file <white><bold>--txs ./path/to/file.parquet[:COLUMN_NAME]</bold></white>
+(default column name is <white><bold>transaction_hash</bold></white>)
+- can use multiple parquet files <white><bold>--txs ./path/to/ethereum__logs*.parquet</bold></white>
 "#
 )
 }
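
For illustration, two hedged invocations of the new flag (the dataset name and file path are placeholders, and hashes are written as raw unprefixed hex since parse_transactions below feeds them directly to hex::decode):

    cryo transactions --txs TX_HASH1 TX_HASH2
    cryo transactions --txs ./ethereum__txs.parquet
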
31 changes: 30 additions & 1 deletion crates/cli/src/parse/blocks.rs
@@ -10,21 +10,50 @@ pub(crate) async fn parse_blocks(
     args: &Args,
     provider: Arc<Provider<Http>>,
 ) -> Result<Vec<Chunk>, ParseError> {
-    let block_chunks = parse_block_inputs(&args.blocks, &provider).await?;
+    // parse inputs into BlockChunks
+    let block_chunks = match &args.blocks {
+        Some(inputs) => parse_block_inputs(inputs, &provider).await?,
+        None => return Err(ParseError::ParseError("could not parse block inputs".to_string())),
+    };
+
+    postprocess_block_chunks(block_chunks, args, provider).await
+}
+
+async fn postprocess_block_chunks(
+    block_chunks: Vec<BlockChunk>,
+    args: &Args,
+    provider: Arc<Provider<Http>>,
+) -> Result<Vec<Chunk>, ParseError> {
+    // align
     let block_chunks = if args.align {
         block_chunks.into_iter().filter_map(|x| x.align(args.chunk_size)).collect()
     } else {
         block_chunks
     };
+
+    // split block range into chunks
     let block_chunks = match args.n_chunks {
         Some(n_chunks) => block_chunks.subchunk_by_count(&n_chunks),
         None => block_chunks.subchunk_by_size(&args.chunk_size),
     };
+
+    // apply reorg buffer
     let block_chunks = apply_reorg_buffer(block_chunks, args.reorg_buffer, &provider).await?;
+
+    // put into Chunk enums
     let chunks: Vec<Chunk> = block_chunks.iter().map(|x| Chunk::Block(x.clone())).collect();
+
     Ok(chunks)
 }
+
+pub(crate) async fn get_default_block_chunks(
+    args: &Args,
+    provider: Arc<Provider<Http>>,
+) -> Result<Vec<Chunk>, ParseError> {
+    let block_chunks = parse_block_inputs(&vec!["0:latest".to_string()], &provider).await?;
+    postprocess_block_chunks(block_chunks, args, provider).await
+}
 
 /// parse block numbers to freeze
 async fn parse_block_inputs(
     inputs: &Vec<String>,
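
In postprocess_block_chunks above, chunks are aligned, split by count or size, and trimmed by the reorg buffer. A self-contained sketch of the size-based split on inclusive bounds (plain Rust, not cryo's BlockChunk API):

    // split an inclusive block range into fixed-size chunks, in the spirit of
    // subchunk_by_size (standalone illustration, not the crate's implementation)
    fn split_by_size(start: u64, end: u64, size: u64) -> Vec<(u64, u64)> {
        assert!(size > 0, "chunk size must be positive");
        let mut chunks = Vec::new();
        let mut lo = start;
        while lo <= end {
            let hi = end.min(lo.saturating_add(size - 1));
            chunks.push((lo, hi));
            match hi.checked_add(1) {
                Some(next) => lo = next,
                None => break, // reached u64::MAX
            }
        }
        chunks
    }

    // split_by_size(0, 2499, 1000) == [(0, 999), (1000, 1999), (2000, 2499)]
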
1 change: 1 addition & 0 deletions crates/cli/src/parse/mod.rs
@@ -3,6 +3,7 @@ mod blocks;
 mod file_output;
 mod query;
 mod source;
+mod transactions;
 
 pub use args::*;
 // use blocks::*;
11 changes: 9 additions & 2 deletions crates/cli/src/parse/query.rs
@@ -5,14 +5,21 @@ use hex::FromHex;

 use cryo_freeze::{ColumnEncoding, Datatype, FileFormat, MultiQuery, ParseError, RowFilter, Table};
 
-use super::{blocks, file_output};
+use super::{blocks, file_output, transactions};
 use crate::args::Args;
 
 pub(crate) async fn parse_query(
     args: &Args,
     provider: Arc<Provider<Http>>,
 ) -> Result<MultiQuery, ParseError> {
-    let chunks = blocks::parse_blocks(args, provider).await?;
+    let chunks = match (&args.blocks, &args.txs) {
+        (Some(_), None) => blocks::parse_blocks(args, provider).await?,
+        (None, Some(txs)) => transactions::parse_transactions(txs)?,
+        (None, None) => blocks::get_default_block_chunks(args, provider).await?,
+        (Some(_), Some(_)) => {
+            return Err(ParseError::ParseError("specify only one of --blocks or --txs".to_string()))
+        }
+    };
 
     // process schemas
     let schemas = parse_schemas(args)?;
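
The match above makes --blocks and --txs mutually exclusive and falls back to the full 0:latest range when neither is given. The same shape as a standalone sketch (placeholder types, not cryo's):

    fn dispatch(blocks: Option<&str>, txs: Option<&str>) -> Result<String, String> {
        match (blocks, txs) {
            (Some(b), None) => Ok(format!("parse blocks: {b}")),
            (None, Some(t)) => Ok(format!("parse txs: {t}")),
            (None, None) => Ok("parse blocks: 0:latest".to_string()),
            (Some(_), Some(_)) => Err("specify only one of --blocks or --txs".to_string()),
        }
    }
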
66 changes: 66 additions & 0 deletions crates/cli/src/parse/transactions.rs
@@ -0,0 +1,66 @@
+use cryo_freeze::{Chunk, ParseError, TransactionChunk};
+use polars::prelude::*;
+
+pub(crate) fn parse_transactions(txs: &[String]) -> Result<Vec<Chunk>, ParseError> {
+    let (files, hashes): (Vec<&String>, Vec<&String>) =
+        txs.iter().partition(|tx| std::path::Path::new(tx).exists());
+
+    let mut file_chunks = if !files.is_empty() {
+        let mut file_chunks = Vec::new();
+        for path in files {
+            let column = if path.contains(':') {
+                path.split(':')
+                    .last()
+                    .ok_or(ParseError::ParseError("could not parse txs path column".to_string()))?
+            } else {
+                "transaction_hash"
+            };
+            let tx_hashes = read_binary_column(path, column)
+                .map_err(|_e| ParseError::ParseError("could not read input".to_string()))?;
+            let chunk = TransactionChunk::Values(tx_hashes);
+            file_chunks.push(Chunk::Transaction(chunk));
+        }
+        file_chunks
+    } else {
+        Vec::new()
+    };
+
+    let hash_chunks = if !hashes.is_empty() {
+        let values: Result<Vec<Vec<u8>>, _> = hashes.iter().map(hex::decode).collect();
+        let values =
+            values.map_err(|_e| ParseError::ParseError("could not parse txs".to_string()))?;
+        let chunk = Chunk::Transaction(TransactionChunk::Values(values));
+        vec![chunk]
+    } else {
+        Vec::new()
+    };
+
+    file_chunks.extend(hash_chunks);
+    Ok(file_chunks)
+}
+
+fn read_binary_column(path: &str, column: &str) -> Result<Vec<Vec<u8>>, ParseError> {
+    let file = std::fs::File::open(path)
+        .map_err(|_e| ParseError::ParseError("could not open file path".to_string()))?;
+
+    let df = ParquetReader::new(file)
+        .with_columns(Some(vec![column.to_string()]))
+        .finish()
+        .map_err(|_e| ParseError::ParseError("could not read data from column".to_string()))?;
+
+    let series = df
+        .column(column)
+        .map_err(|_e| ParseError::ParseError("could not get column".to_string()))?;
+
+    let ca = series
+        .binary()
+        .map_err(|_e| ParseError::ParseError("could not convert to binary column".to_string()))?;
+
+    ca.into_iter()
+        .map(|value| {
+            value
+                .ok_or_else(|| ParseError::ParseError("transaction hash missing".to_string()))
+                .map(|data| data.into())
+        })
+        .collect()
+}
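
A hedged usage sketch of the new parser (hypothetical inputs; arguments that exist on disk become one parquet-backed chunk each, while every other argument is hex-decoded, so raw hashes carry no 0x prefix):

    let inputs = vec![
        "./ethereum__txs.parquet".to_string(), // read from the transaction_hash column
        "aa".repeat(32),                       // placeholder 32-byte hash as 64 hex chars
    ];
    let chunks = parse_transactions(&inputs)?; // files yield one chunk each; all raw
                                               // hashes share a single TransactionChunk

In read_binary_column, the final iterator chain collects an iterator of Result items into Result<Vec<Vec<u8>>, ParseError>, so a single null entry fails the whole read with "transaction hash missing" rather than silently dropping rows.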