Skip to content

Commit

Permalink
bitcoind_rpc.rs: now using a r2d2 connection pool since
Browse files Browse the repository at this point in the history
bitcoindcore_rpc::Client is not thread-safe.

SurmountSystems#5
  • Loading branch information
jbride committed Nov 1, 2024
1 parent a4a23eb commit ac0dfcb
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 97 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ tower-http = { version = "0.5.2", features = ["full"] }
# This causes rusqlite to compile its own private libsqlite3 and link it with your Rust code, instead of using /usr/lib/x86_64-linux-gnu/libsqlite3.so
rusqlite = { version = "0.32.1", features = ["bundled"] }
r2d2_sqlite = "0.25.0"
r2d2 = "0.8.10"
axum = "0.7"
rand = "0.8.5"
r2d2 = "0.8.10"
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ $ cargo test

Gabriel uses the following environment variables:

- API_SOCKET_URL
- WEB_SOCKET_URL
- set to a valid unix domain socket path to enable the HTTP API
- defaults to: 127.0.0.1:3000
- BITCOIND_RPC_URL
Expand Down
56 changes: 42 additions & 14 deletions src/bitcoind_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,43 @@ use bitcoin::{Address, Amount, BlockHash};
use bitcoin::hashes::sha256d::Hash;
use bitcoincore_rpc::json::{self, GetAddressInfoResult, ListUnspentResultEntry};
use bitcoincore_rpc::{Auth, Client, RpcApi};
use r2d2;

/// bitcoincore_rpc::Client is not threadsafe, so we need to manage connections in an r2d2 pool
/// Represents a connection to a Bitcoin Core RPC interface
#[derive(Debug)]
pub struct BitcoindRpcInfo {
rpc_client: Client,
rpc_pool: r2d2::Pool<BitcoindConnectionManager>,
}

#[derive(Debug)]
struct BitcoindConnectionManager {
url: String,
auth: Auth,
}

impl r2d2::ManageConnection for BitcoindConnectionManager {
type Connection = Client;
type Error = bitcoincore_rpc::Error;

fn connect(&self) -> Result<Client, Self::Error> {
Client::new(&self.url, self.auth.clone())
}

fn is_valid(&self, _: &mut Client) -> Result<(), Self::Error> {
// Optional: Add validation logic here
Ok(())
}

fn has_broken(&self, _: &mut Client) -> bool {
false
}
}

impl BitcoindRpcInfo {
pub fn new() -> Result<Self> {
let url = env::var("BITCOIND_RPC_URL").map_err(|e| anyhow!("Missing BITCOIND_RPC_URL environment variable: {}", e))?;
let url = env::var("BITCOIND_RPC_URL")
.map_err(|e| anyhow!("Missing BITCOIND_RPC_URL environment variable: {}", e))?;

let auth = match env::var("BITCOIND_RPC_COOKIE_PATH") {
Ok(cookiefile) => Auth::CookieFile(cookiefile.into()),
Expand All @@ -27,25 +54,28 @@ impl BitcoindRpcInfo {
Auth::UserPass(user, pass)
}
};

Client::new(&url, auth)
.map(|rpc_client| BitcoindRpcInfo { rpc_client })
.map_err(|e| anyhow!("Failed to create RPC client: {}", e))

let manager = BitcoindConnectionManager { url, auth };
let pool = r2d2::Pool::builder()
.max_size(10) // Adjust pool size as needed
.build(manager)
.map_err(|e| anyhow!("Failed to create connection pool: {}", e))?;

Ok(BitcoindRpcInfo { rpc_pool: pool })
}

pub fn get_bitcoind_info_for_test_p2pk(
&self,
output_amount_btc: f64,
) -> Result<(ListUnspentResultEntry, GetAddressInfoResult, Address, Amount)> {
// Get network relay fee
let network_relay_fee = self.rpc_client.get_network_info()
let network_relay_fee = self.rpc_pool.get()?.get_network_info()
.map_err(|e| anyhow!("Failed to get network info: {}", e))?
.relay_fee;
let output_tx_total = network_relay_fee.to_btc() + output_amount_btc;

// Find suitable UTXO
let unspent_vec = self.rpc_client
.list_unspent(Some(3), None, None, None, None)
let unspent_vec = self.rpc_pool.get()?.list_unspent(Some(3), None, None, None, None)
.map_err(|e| anyhow!("Failed to list unspent transactions: {}", e))?;

let unspent_tx = unspent_vec
Expand All @@ -58,13 +88,11 @@ impl BitcoindRpcInfo {
.ok_or_else(|| anyhow!("UTXO has no address"))?
.assume_checked();

let input_utxo_address_info = self.rpc_client
.get_address_info(&input_utxo_address)
let input_utxo_address_info = self.rpc_pool.get()?.get_address_info(&input_utxo_address)
.map_err(|e| anyhow!("Failed to get address info: {}", e))?;

// Get change address
let change_addr = self.rpc_client
.get_raw_change_address(Some(json::AddressType::Bech32))
let change_addr = self.rpc_pool.get()?.get_raw_change_address(Some(json::AddressType::Bech32))
.map_err(|e| anyhow!("Failed to get change address: {}", e))?
.assume_checked();

Expand All @@ -73,7 +101,7 @@ impl BitcoindRpcInfo {

pub fn get_block_height(&self, sha256d_hash: &Hash) -> Result<usize> {
let hash = BlockHash::from_raw_hash(*sha256d_hash);
let block_header = self.rpc_client.get_block_header_info(&hash)?;
let block_header = self.rpc_pool.get()?.get_block_header_info(&hash)?;
Ok(block_header.height)
}
}
165 changes: 84 additions & 81 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use block::{
};
use clap::{Parser, Subcommand};
use nom::AsBytes;
use persistence::SQLitePersistence;
use tokio::sync::broadcast;
use zeromq::{Socket, SocketRecv};

Expand All @@ -27,7 +26,7 @@ use indicatif::ProgressBar;

use axum::routing::get;
use std::net::SocketAddr;
use tower_http::services::{ServeDir, ServeFile};
use tower_http::services::ServeDir;
use tower_http::trace::TraceLayer;
use tower::ServiceBuilder;

Expand Down Expand Up @@ -102,17 +101,12 @@ async fn main() -> Result<()> {
match &cli.command {
Commands::BlockFileEval(args) => run_block_file_eval(args),
Commands::Index(args) => run_index(args),
Commands::BlockAsyncEval(args) => evaluate_blocks_continuously(args).await,
Commands::Graph(args) => run_graph(args),
Commands::GenerateP2PKTx(args) => generate_p2pk_tx(args),
Commands::BlockAsyncEval(args) => run_async_block_eval_listener(args).await,
}
}

fn generate_p2pk_tx(args: &GenerateP2PKTxArgs) -> Result<()> {
let to_amount = Amount::from_str(&args.output_amount_btc)?;
let e_master_key = &args.extended_master_private_key;
p2pktx::generate_p2pk_tx(e_master_key, to_amount)
}

fn run_block_file_eval(args: &BlockFileEvalArgs) -> Result<()> {
// Maps previous block hash to next merkle root
Expand Down Expand Up @@ -178,8 +172,73 @@ fn run_block_file_eval(args: &BlockFileEvalArgs) -> Result<()> {
Ok(())
}

async fn run_async_block_eval_listener(args: &BlockAsyncEvalArgs) -> Result<()> {
fn run_index(args: &IndexArgs) -> Result<()> {
// Maps previous block hash to next merkle root
let header_map: HeaderMap = Default::default();
// Maps txid to tx value
let tx_map: TxMap = Default::default();
// Maps header hash to result Record
let result_map: ResultMap = Default::default();

if let Err(e) = process_blocks_in_parallel(&args.input, &result_map, &tx_map, &header_map) {
eprintln!("Failed to process blocks: {:?}", e);
}
let mut out: Vec<String> = vec![];
let mut last_block_hash: [u8; 32] =
hex::decode("4860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000")
.unwrap()
.try_into()
.expect("slice with incorrect length"); // Genesis block
let mut height = 0;
let mut p2pk_addresses = 0;
let mut p2pk_coins = 0.0;
while let Some(next_block_hash) = header_map.read().unwrap().get(&last_block_hash) {
// println!("Next block hash: {:?}", hex::encode(next_block_hash.1));
let result_map_read = result_map.read().unwrap();
let record = result_map_read.get(next_block_hash);
if let Some(record) = record {
let Record {
date,
p2pk_addresses_added,
p2pk_sats_added,
p2pk_addresses_spent,
p2pk_sats_spent,
} = &record;
p2pk_addresses += p2pk_addresses_added;
p2pk_addresses -= p2pk_addresses_spent;
p2pk_coins += p2pk_sats_added.to_owned() as f64 / 100_000_000.0;
p2pk_coins -= p2pk_sats_spent.to_owned() as f64 / 100_000_000.0;
out.push(format!("{height},{date},{p2pk_addresses},{p2pk_coins}"));
}
height += 1;
last_block_hash = *next_block_hash;
}

println!("Last block hash: {:?}", hex::encode(last_block_hash));
println!("Height: {}", height);

let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&args.output)?;

// When writing back to the file, ensure we start from the beginning
file.seek(std::io::SeekFrom::Start(0))?;
file.set_len(0)?; // Truncate the file

file.write_all(HEADER.as_bytes())?;
for line in &out {
writeln!(file, "{}", line)?;
}

Ok(())
}

async fn evaluate_blocks_continuously(args: &BlockAsyncEvalArgs) -> Result<()> {

// Create a SQLite persistence instance with a connection pool
let sqlite_persistence = persistence::SQLitePersistence::new()?;

// Create a broadcast channel for SSE events
Expand All @@ -192,27 +251,29 @@ async fn run_async_block_eval_listener(args: &BlockAsyncEvalArgs) -> Result<()>
tx: tx
});

// Clone the Arc for the ZMQ listener to use
// Clone the Arc for the ZMQ listener to use (later in this function)
let zmq_state = Arc::clone(&app_state);

let app = axum::Router::new()
// Define routes for REST API, SSE stream, and React frontend
let web_app_router = axum::Router::new()
.route("/api/aggregates", get(api::get_aggregates))
.route("/api/block/hash/:hash", get(api::get_block_by_hash))
.route("/api/block/height/:height", get(api::get_block_by_height))
.route("/api/blocks/stream", get(api::stream_blocks))
.nest_service("/", ServeDir::new("web/build"))
.with_state(app_state);

let addr: SocketAddr = env::var("API_SOCKET_ADDR")
// Determine socket that web_app will bind to
let web_addr: SocketAddr = env::var("WEB_SOCKET_ADDR")
.unwrap_or_else(|_| "127.0.0.1:3000".to_string())
.parse()
.expect("Failed to parse API_ADDR");
println!("REST API listening on {}", addr);
println!("REST API listening on {}", web_addr);

// Spawn the server in the background
// Spawn the web app server in the background
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
axum::serve(listener, app.into_make_service())
let listener = tokio::net::TcpListener::bind(web_addr).await.unwrap();
axum::serve(listener, web_app_router.into_make_service())
.await
.unwrap();
});
Expand Down Expand Up @@ -254,7 +315,6 @@ async fn run_async_block_eval_listener(args: &BlockAsyncEvalArgs) -> Result<()>

let bitcoind_info = BitcoindRpcInfo::new()?;


loop {
let zmq_message = socket.recv().await?;

Expand Down Expand Up @@ -298,76 +358,19 @@ async fn run_async_block_eval_listener(args: &BlockAsyncEvalArgs) -> Result<()>
}
}

fn run_index(args: &IndexArgs) -> Result<()> {
// Maps previous block hash to next merkle root
let header_map: HeaderMap = Default::default();
// Maps txid to tx value
let tx_map: TxMap = Default::default();
// Maps header hash to result Record
let result_map: ResultMap = Default::default();

if let Err(e) = process_blocks_in_parallel(&args.input, &result_map, &tx_map, &header_map) {
eprintln!("Failed to process blocks: {:?}", e);
}
let mut out: Vec<String> = vec![];
let mut last_block_hash: [u8; 32] =
hex::decode("4860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000")
.unwrap()
.try_into()
.expect("slice with incorrect length"); // Genesis block
let mut height = 0;
let mut p2pk_addresses = 0;
let mut p2pk_coins = 0.0;
while let Some(next_block_hash) = header_map.read().unwrap().get(&last_block_hash) {
// println!("Next block hash: {:?}", hex::encode(next_block_hash.1));
let result_map_read = result_map.read().unwrap();
let record = result_map_read.get(next_block_hash);
if let Some(record) = record {
let Record {
date,
p2pk_addresses_added,
p2pk_sats_added,
p2pk_addresses_spent,
p2pk_sats_spent,
} = &record;
p2pk_addresses += p2pk_addresses_added;
p2pk_addresses -= p2pk_addresses_spent;
p2pk_coins += p2pk_sats_added.to_owned() as f64 / 100_000_000.0;
p2pk_coins -= p2pk_sats_spent.to_owned() as f64 / 100_000_000.0;
out.push(format!("{height},{date},{p2pk_addresses},{p2pk_coins}"));
}
height += 1;
last_block_hash = *next_block_hash;
}

println!("Last block hash: {:?}", hex::encode(last_block_hash));
println!("Height: {}", height);

let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&args.output)?;

// When writing back to the file, ensure we start from the beginning
file.seek(std::io::SeekFrom::Start(0))?;
file.set_len(0)?; // Truncate the file

file.write_all(HEADER.as_bytes())?;
for line in &out {
writeln!(file, "{}", line)?;
}

Ok(())
}

fn run_graph(_args: &GraphArgs) -> Result<()> {
// TODO: Implement graph functionality
println!("Graph functionality not yet implemented");
Ok(())
}


fn generate_p2pk_tx(args: &GenerateP2PKTxArgs) -> Result<()> {
let to_amount = Amount::from_str(&args.output_amount_btc)?;
let e_master_key = &args.extended_master_private_key;
p2pktx::generate_p2pk_tx(e_master_key, to_amount)
}

fn append_single_block_result_to_file(
mut file: &File,
block_aggregate: &BlockAggregateOutput
Expand Down

0 comments on commit ac0dfcb

Please sign in to comment.