Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed the error during dry run #2365

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).


### Fixed
- [2365](https://github.com/FuelLabs/fuel-core/pull/2365): Fixed the error during dry run in the case of race condition.
- [2366](https://github.com/FuelLabs/fuel-core/pull/2366): The `importer_gas_price_for_block` metric is properly collected.
- [2369](https://github.com/FuelLabs/fuel-core/pull/2369): The `transaction_insertion_time_in_thread_pool_milliseconds` metric is properly collected.
- [2413](https://github.com/FuelLabs/fuel-core/issues/2413): block production immediately errors if unable to lock the mutex.
Expand Down
2 changes: 0 additions & 2 deletions crates/services/consensus_module/poa/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,6 @@ where
consensus: seal,
};

block.entity.header().time();

Comment on lines -362 to -363
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch

// Import the sealed block
self.block_importer
.commit_result(Uncommitted::new(
Expand Down
81 changes: 51 additions & 30 deletions crates/services/producer/src/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,20 @@ where

let da_height = predefined_block.header().application().da_height;

let view = self.view_provider.latest_view()?;

let header_to_produce =
self.new_header_with_da_height(height, block_time, da_height)?;
self.new_header_with_da_height(height, block_time, da_height, &view)?;

let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;

if header_to_produce.height() <= &latest_height {
return Err(Error::BlockHeightShouldBeHigherThanPrevious {
height,
previous_block: latest_height,
}
.into())
}

let maybe_mint_tx = transactions_source.pop();
let mint_tx =
Expand Down Expand Up @@ -196,10 +208,22 @@ where

let source = tx_source(gas_price, height).await?;

let view = self.view_provider.latest_view()?;

let header = self
.new_header_with_new_da_height(height, block_time)
.new_header_with_new_da_height(height, block_time, &view)
.await?;

let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;

if header.height() <= &latest_height {
return Err(Error::BlockHeightShouldBeHigherThanPrevious {
height,
previous_block: latest_height,
}
.into())
}

let component = Components {
header_to_produce: header,
transactions_source: source,
Expand Down Expand Up @@ -309,7 +333,7 @@ where
.unwrap_or(Tai64::UNIX_EPOCH)
});

let header = self.new_header(simulated_height, simulated_time)?;
let header = self.new_header(simulated_height, simulated_time, &view)?;

let gas_price = if let Some(inner) = gas_price {
inner
Expand Down Expand Up @@ -367,8 +391,9 @@ where
&self,
height: BlockHeight,
block_time: Tai64,
view: &ViewProvider::LatestView,
) -> anyhow::Result<PartialBlockHeader> {
let mut block_header = self.new_header(height, block_time)?;
let mut block_header = self.new_header(height, block_time, view)?;
let previous_da_height = block_header.da_height;
let gas_limit = self
.consensus_parameters_provider
Expand All @@ -390,11 +415,13 @@ where
height: BlockHeight,
block_time: Tai64,
da_height: DaBlockHeight,
view: &ViewProvider::LatestView,
) -> anyhow::Result<PartialBlockHeader> {
let mut block_header = self.new_header(height, block_time)?;
let mut block_header = self.new_header(height, block_time, view)?;
block_header.application.da_height = da_height;
Ok(block_header)
}

async fn select_new_da_height(
&self,
gas_limit: u64,
Expand Down Expand Up @@ -447,9 +474,9 @@ where
&self,
height: BlockHeight,
block_time: Tai64,
view: &ViewProvider::LatestView,
) -> anyhow::Result<PartialBlockHeader> {
let view = self.view_provider.latest_view()?;
let previous_block_info = self.previous_block_info(height, &view)?;
let previous_block_info = self.previous_block_info(height, view)?;
let consensus_parameters_version = view.latest_consensus_parameters_version()?;
let state_transition_bytecode_version =
view.latest_state_transition_bytecode_version()?;
Expand All @@ -475,29 +502,23 @@ where
height: BlockHeight,
view: &ViewProvider::LatestView,
) -> anyhow::Result<PreviousBlockInfo> {
let latest_height = self
.view_provider
.latest_view()?
.latest_height()
.ok_or(Error::NoGenesisBlock)?;
// block 0 is reserved for genesis
if height <= latest_height {
Err(Error::BlockHeightShouldBeHigherThanPrevious {
height,
previous_block: latest_height,
}
.into())
} else {
// get info from previous block height
let prev_height = height.pred().expect("We checked the height above");
let previous_block = view.get_block(&prev_height)?;
let prev_root = view.block_header_merkle_root(&prev_height)?;

Ok(PreviousBlockInfo {
prev_root,
da_height: previous_block.header().da_height,
})
}
let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;

// get info from previous block height
let prev_height =
height
.pred()
.ok_or(Error::BlockHeightShouldBeHigherThanPrevious {
height: 0u32.into(),
previous_block: latest_height,
})?;
let previous_block = view.get_block(&prev_height)?;
let prev_root = view.block_header_merkle_root(&prev_height)?;

Ok(PreviousBlockInfo {
prev_root,
da_height: previous_block.header().da_height,
})
}
}

Expand Down
44 changes: 23 additions & 21 deletions crates/services/producer/src/block_producer/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,27 +649,33 @@ mod dry_run {
}

#[tokio::test]
async fn dry_run__errors_early_if_height_is_lower_than_chain_tip() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a quick scan but couldn't see a test for this on the produce or produce_predefined. Maybe I missed it. Do you think we need that coverage?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have test case for BlockHeightShouldBeHigherThanPrevious error.

image

But after introduction of the state rewind feature, I don't think that BlockHeightShouldBeHigherThanPrevious error makes a lot of sense. Basically importer already is responsible and upgradable executor do this kind of check inside, some maybe we can just remove it from the block producer

// Given
let last_block_height = BlockHeight::new(42);

async fn dry_run__success_when_height_is_the_same_as_chain_height() {
let executor = MockExecutorWithCapture::default();
let ctx = TestContextBuilder::new()
.with_prev_height(last_block_height)
.build_with_executor(executor.clone());
let ctx = TestContext::default_from_executor(executor);
let producer = ctx.producer();

// When
let _ = ctx
.producer()
.dry_run(vec![], last_block_height.pred(), None, None, None)
const SAME_HEIGHT: u32 = 1;

// Given
let block = producer
.produce_and_execute_block_txpool(SAME_HEIGHT.into(), Tai64::now())
.await
.expect_err("expected failure");
.unwrap();
producer.view_provider.blocks.lock().unwrap().insert(
SAME_HEIGHT.into(),
block.result().block.clone().compress(&Default::default()),
);

// When
let result = producer
.dry_run(vec![], Some(SAME_HEIGHT.into()), None, None, None)
.await;

// Then
assert!(executor.has_no_captured_block_timestamp());
assert!(result.is_ok(), "{:?}", result);
}

impl MockExecutorWithCapture<Transaction> {
impl MockExecutorWithCapture {
fn captured_block_timestamp(&self) -> Tai64 {
*self
.captured
Expand All @@ -680,10 +686,6 @@ mod dry_run {
.header_to_produce
.time()
}

fn has_no_captured_block_timestamp(&self) -> bool {
self.captured.lock().unwrap().is_none()
}
}
}

Expand Down Expand Up @@ -722,10 +724,10 @@ prop_compose! {
}

#[allow(clippy::arithmetic_side_effects)]
fn ctx_for_block<Tx>(
fn ctx_for_block(
block: &Block,
executor: MockExecutorWithCapture<Tx>,
) -> TestContext<MockExecutorWithCapture<Tx>> {
executor: MockExecutorWithCapture,
) -> TestContext<MockExecutorWithCapture> {
let prev_height = block.header().height().pred().unwrap();
let prev_da_height = block.header().da_height.as_u64() - 1;
TestContextBuilder::new()
Expand Down
62 changes: 13 additions & 49 deletions crates/services/producer/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use fuel_core_types::{
TransactionExecutionStatus,
UncommittedResult,
},
txpool::ArcPoolTx,
},
};
use std::{
Expand Down Expand Up @@ -88,10 +87,10 @@ impl Relayer for MockRelayer {
}

#[derive(Default)]
pub struct MockTxPool(pub Vec<ArcPoolTx>);
pub struct MockTxPool(pub Vec<Transaction>);

impl TxPool for MockTxPool {
type TxSource = Vec<ArcPoolTx>;
type TxSource = Vec<Transaction>;

async fn get_source(&self, _: u64, _: BlockHeight) -> anyhow::Result<Self::TxSource> {
Ok(self.0.clone())
Expand All @@ -113,23 +112,7 @@ impl AsRef<MockDb> for MockDb {
}
}

fn arc_pool_tx_comp_to_block(component: &Components<Vec<ArcPoolTx>>) -> Block {
let transactions = component
.transactions_source
.clone()
.into_iter()
.map(|tx| tx.as_ref().into())
.collect();
Block::new(
component.header_to_produce,
transactions,
&[],
Default::default(),
)
.unwrap()
}

fn tx_comp_to_block(component: &Components<Vec<Transaction>>) -> Block {
fn arc_pool_tx_comp_to_block(component: &Components<Vec<Transaction>>) -> Block {
let transactions = component.transactions_source.clone();
Block::new(
component.header_to_produce,
Expand All @@ -140,10 +123,10 @@ fn tx_comp_to_block(component: &Components<Vec<Transaction>>) -> Block {
.unwrap()
}

impl BlockProducer<Vec<ArcPoolTx>> for MockExecutor {
impl BlockProducer<Vec<Transaction>> for MockExecutor {
fn produce_without_commit(
&self,
component: Components<Vec<ArcPoolTx>>,
component: Components<Vec<Transaction>>,
) -> ExecutorResult<UncommittedResult<Changes>> {
let block = arc_pool_tx_comp_to_block(&component);
// simulate executor inserting a block
Expand All @@ -166,10 +149,10 @@ impl BlockProducer<Vec<ArcPoolTx>> for MockExecutor {

pub struct FailingMockExecutor(pub Mutex<Option<ExecutorError>>);

impl BlockProducer<Vec<ArcPoolTx>> for FailingMockExecutor {
impl BlockProducer<Vec<Transaction>> for FailingMockExecutor {
fn produce_without_commit(
&self,
component: Components<Vec<ArcPoolTx>>,
component: Components<Vec<Transaction>>,
) -> ExecutorResult<UncommittedResult<Changes>> {
// simulate an execution failure
let mut err = self.0.lock().unwrap();
Expand All @@ -191,35 +174,16 @@ impl BlockProducer<Vec<ArcPoolTx>> for FailingMockExecutor {
}

#[derive(Clone)]
pub struct MockExecutorWithCapture<Tx> {
pub captured: Arc<Mutex<Option<Components<Vec<Tx>>>>>,
pub struct MockExecutorWithCapture {
pub captured: Arc<Mutex<Option<Components<Vec<Transaction>>>>>,
}

impl BlockProducer<Vec<ArcPoolTx>> for MockExecutorWithCapture<ArcPoolTx> {
fn produce_without_commit(
&self,
component: Components<Vec<ArcPoolTx>>,
) -> ExecutorResult<UncommittedResult<Changes>> {
let block = arc_pool_tx_comp_to_block(&component);
*self.captured.lock().unwrap() = Some(component);
Ok(UncommittedResult::new(
ExecutionResult {
block,
skipped_transactions: vec![],
tx_status: vec![],
events: vec![],
},
Default::default(),
))
}
}

impl BlockProducer<Vec<Transaction>> for MockExecutorWithCapture<Transaction> {
impl BlockProducer<Vec<Transaction>> for MockExecutorWithCapture {
fn produce_without_commit(
&self,
component: Components<Vec<Transaction>>,
) -> ExecutorResult<UncommittedResult<Changes>> {
let block = tx_comp_to_block(&component);
let block = arc_pool_tx_comp_to_block(&component);
*self.captured.lock().unwrap() = Some(component);
Ok(UncommittedResult::new(
ExecutionResult {
Expand All @@ -233,7 +197,7 @@ impl BlockProducer<Vec<Transaction>> for MockExecutorWithCapture<Transaction> {
}
}

impl DryRunner for MockExecutorWithCapture<Transaction> {
impl DryRunner for MockExecutorWithCapture {
fn dry_run(
&self,
block: Components<Vec<Transaction>>,
Expand All @@ -245,7 +209,7 @@ impl DryRunner for MockExecutorWithCapture<Transaction> {
}
}

impl<Tx> Default for MockExecutorWithCapture<Tx> {
impl Default for MockExecutorWithCapture {
fn default() -> Self {
Self {
captured: Arc::new(Mutex::new(None)),
Expand Down
Loading