From 8d23543ebf222a2726948cc759b7168b8a68bf57 Mon Sep 17 00:00:00 2001 From: maskpp Date: Wed, 23 Oct 2024 10:00:38 +0800 Subject: [PATCH 1/5] fix bug about txPoolContentWithMinTip api --- Cargo.lock | 6 +- crates/ethereum/evm/src/execute.rs | 2 +- crates/evm/Cargo.toml | 1 + crates/evm/src/execute.rs | 58 +++++--- crates/taiko/consensus/proposer/Cargo.toml | 3 - crates/taiko/consensus/proposer/src/lib.rs | 97 ++----------- crates/taiko/consensus/proposer/src/task.rs | 100 ++++--------- crates/taiko/evm/Cargo.toml | 2 + crates/taiko/evm/src/execute.rs | 147 +++++++++++++++++--- crates/taiko/payload/builder/src/builder.rs | 2 +- crates/taiko/payload/builder/src/lib.rs | 3 - 11 files changed, 219 insertions(+), 202 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e84cabf85de..82054e4559c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7203,6 +7203,7 @@ dependencies = [ name = "reth-evm" version = "1.0.0" dependencies = [ + "alloy-rpc-types-eth", "auto_impl", "futures-util", "parking_lot 0.12.3", @@ -9793,6 +9794,7 @@ version = "1.0.0" dependencies = [ "alloy-eips", "alloy-sol-types", + "flate2", "reth-chainspec", "reth-consensus", "reth-evm", @@ -9800,6 +9802,7 @@ dependencies = [ "reth-primitives", "reth-prune-types", "reth-revm", + "reth-rpc-types-compat", "reth-testing-utils", "revm-primitives", "secp256k1", @@ -9880,7 +9883,6 @@ name = "taiko-reth-proposer-consensus" version = "1.0.0" dependencies = [ "alloy-rlp", - "flate2", "futures-util", "reth-chainspec", "reth-consensus", @@ -9890,8 +9892,6 @@ dependencies = [ "reth-primitives", "reth-provider", "reth-revm", - "reth-rpc-types", - "reth-rpc-types-compat", "reth-transaction-pool", "tokio", "tracing", diff --git a/crates/ethereum/evm/src/execute.rs b/crates/ethereum/evm/src/execute.rs index 00256e5bfb77..83acf649ce20 100644 --- a/crates/ethereum/evm/src/execute.rs +++ b/crates/ethereum/evm/src/execute.rs @@ -379,7 +379,7 @@ where // NOTE: we need to merge keep the reverts for the bundle retention self.state.merge_transitions(BundleRetention::Reverts); - Ok(BlockExecutionOutput { state: self.state.take_bundle(), receipts, requests, gas_used }) + Ok(BlockExecutionOutput { state: self.state.take_bundle(), receipts, requests, gas_used, target_list: vec![] }) } } diff --git a/crates/evm/Cargo.toml b/crates/evm/Cargo.toml index 7fa52726648c..a425a569ee20 100644 --- a/crates/evm/Cargo.toml +++ b/crates/evm/Cargo.toml @@ -19,6 +19,7 @@ revm-primitives.workspace = true reth-prune-types.workspace = true reth-storage-errors.workspace = true reth-execution-types.workspace = true +alloy-rpc-types-eth.workspace = true revm.workspace = true diff --git a/crates/evm/src/execute.rs b/crates/evm/src/execute.rs index 5ede6c78ff8e..26757af117af 100644 --- a/crates/evm/src/execute.rs +++ b/crates/evm/src/execute.rs @@ -5,6 +5,7 @@ use reth_primitives::{BlockNumber, BlockWithSenders, Receipt, Request, U256}; use reth_prune_types::PruneModes; use revm::db::BundleState; use revm_primitives::db::Database; +use alloy_rpc_types_eth::transaction::Transaction; #[cfg(not(feature = "std"))] use alloc::vec::Vec; @@ -55,7 +56,7 @@ pub trait BatchExecutor { /// for each input. fn execute_and_verify_many<'a, I>(&mut self, inputs: I) -> Result<(), Self::Error> where - I: IntoIterator>, + I: IntoIterator>, { for input in inputs { self.execute_and_verify_one(input)?; @@ -69,7 +70,7 @@ pub trait BatchExecutor { /// and [`BatchExecutor::finalize`]. 
fn execute_and_verify_batch<'a, I>(mut self, batch: I) -> Result where - I: IntoIterator>, + I: IntoIterator>, Self: Sized, { self.execute_and_verify_many(batch)?; @@ -90,6 +91,17 @@ pub trait BatchExecutor { fn size_hint(&self) -> Option; } +/// Result of the trigger +#[derive(Debug, Clone)] +pub struct TaskResult { + /// Transactions + pub txs: Vec, + /// Estimated gas used + pub estimated_gas_used: u64, + /// Bytes length + pub bytes_length: u64, +} + /// The output of an ethereum block. /// /// Contains the state changes, transaction receipts, and total gas used in the block. @@ -105,6 +117,8 @@ pub struct BlockExecutionOutput { pub requests: Vec, /// The total gas used by the block. pub gas_used: u64, + /// The target list. + pub target_list: Vec, } /// A helper type for ethereum block inputs that consists of a block and the total difficulty. @@ -118,12 +132,18 @@ pub struct BlockExecutionInput<'a, Block> { pub enable_anchor: bool, /// Enable skip invalid transaction. pub enable_skip: bool, + /// Enable build transaction lists. + pub enable_build: bool, + /// Max compressed bytes. + pub max_bytes_per_tx_list: u64, + /// Max length of transactions list. + pub max_transactions_lists: u64, } impl<'a, Block> BlockExecutionInput<'a, Block> { /// Creates a new input. pub fn new(block: &'a mut Block, total_difficulty: U256) -> Self { - Self { block, total_difficulty, enable_anchor: true, enable_skip: true } + Self { block, total_difficulty, enable_anchor: true, enable_skip: true, enable_build: false, max_bytes_per_tx_list: 0, max_transactions_lists: 0 } } } @@ -135,7 +155,7 @@ impl<'a, Block> From<(&'a mut Block, U256)> for BlockExecutionInput<'a, Block> { impl<'a, Block> From<(&'a mut Block, U256, bool)> for BlockExecutionInput<'a, Block> { fn from((block, total_difficulty, enable_anchor): (&'a mut Block, U256, bool)) -> Self { - Self { block, total_difficulty, enable_anchor, enable_skip: true } + Self { block, total_difficulty, enable_anchor, enable_skip: true, enable_build: false, max_bytes_per_tx_list: 0, max_transactions_lists: 0 } } } @@ -143,7 +163,7 @@ impl<'a, Block> From<(&'a mut Block, U256, bool, bool)> for BlockExecutionInput< fn from( (block, total_difficulty, enable_anchor, enable_skip): (&'a mut Block, U256, bool, bool), ) -> Self { - Self { block, total_difficulty, enable_anchor, enable_skip } + Self { block, total_difficulty, enable_anchor, enable_skip, enable_build: false, max_bytes_per_tx_list: 0, max_transactions_lists: 0 } } } @@ -160,19 +180,19 @@ pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static { /// /// It is not expected to validate the state trie root, this must be done by the caller using /// the returned state. - type Executor>: for<'a> Executor< + type Executor>: for<'a> Executor< DB, - Input<'a> = BlockExecutionInput<'a, BlockWithSenders>, - Output = BlockExecutionOutput, - Error = BlockExecutionError, + Input<'a>=BlockExecutionInput<'a, BlockWithSenders>, + Output=BlockExecutionOutput, + Error=BlockExecutionError, >; /// An executor that can execute a batch of blocks given a database. - type BatchExecutor>: for<'a> BatchExecutor< + type BatchExecutor>: for<'a> BatchExecutor< DB, - Input<'a> = BlockExecutionInput<'a, BlockWithSenders>, - Output = ExecutionOutcome, - Error = BlockExecutionError, + Input<'a>=BlockExecutionInput<'a, BlockWithSenders>, + Output=ExecutionOutcome, + Error=BlockExecutionError, >; /// Creates a new executor for single block execution. 
@@ -180,7 +200,7 @@ pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static { /// This is used to execute a single block and get the changed state. fn executor(&self, db: DB) -> Self::Executor where - DB: Database; + DB: Database; /// Creates a new batch executor with the given database and pruning modes. /// @@ -191,7 +211,7 @@ pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static { /// execution. fn batch_executor(&self, db: DB, prune_modes: PruneModes) -> Self::BatchExecutor where - DB: Database; + DB: Database; } #[cfg(test)] @@ -205,19 +225,19 @@ mod tests { struct TestExecutorProvider; impl BlockExecutorProvider for TestExecutorProvider { - type Executor> = TestExecutor; - type BatchExecutor> = TestExecutor; + type Executor> = TestExecutor; + type BatchExecutor> = TestExecutor; fn executor(&self, _db: DB) -> Self::Executor where - DB: Database, + DB: Database, { TestExecutor(PhantomData) } fn batch_executor(&self, _db: DB, _prune_modes: PruneModes) -> Self::BatchExecutor where - DB: Database, + DB: Database, { TestExecutor(PhantomData) } diff --git a/crates/taiko/consensus/proposer/Cargo.toml b/crates/taiko/consensus/proposer/Cargo.toml index 18602236d600..628876beaf51 100644 --- a/crates/taiko/consensus/proposer/Cargo.toml +++ b/crates/taiko/consensus/proposer/Cargo.toml @@ -21,8 +21,6 @@ reth-revm.workspace = true reth-transaction-pool.workspace = true reth-evm.workspace = true reth-consensus.workspace = true -reth-rpc-types.workspace = true -reth-rpc-types-compat.workspace = true reth-errors.workspace = true @@ -32,5 +30,4 @@ tokio = { workspace = true, features = ["sync", "time"] } tracing.workspace = true # misc -flate2.workspace = true alloy-rlp.workspace = true diff --git a/crates/taiko/consensus/proposer/src/lib.rs b/crates/taiko/consensus/proposer/src/lib.rs index 7476607876f8..8a77fa9fce4e 100644 --- a/crates/taiko/consensus/proposer/src/lib.rs +++ b/crates/taiko/consensus/proposer/src/lib.rs @@ -12,26 +12,20 @@ html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" )] -#![cfg_attr(not(test), warn(unused_crate_dependencies))] -#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -use flate2::write::ZlibEncoder; -use flate2::Compression; use reth_chainspec::ChainSpec; use reth_consensus::{Consensus, ConsensusError, PostExecutionInput}; use reth_errors::RethError; use reth_execution_errors::{BlockExecutionError, BlockValidationError}; use reth_primitives::{ - eip4844::calculate_excess_blob_gas, proofs, transaction::TransactionSignedList, Address, Block, + eip4844::calculate_excess_blob_gas, proofs, Address, Block, BlockWithSenders, Header, Requests, SealedBlock, SealedHeader, TransactionSigned, Withdrawals, U256, }; use reth_provider::{BlockReaderIdExt, StateProviderFactory}; use reth_revm::database::StateProviderDatabase; -use reth_rpc_types::Transaction; use reth_transaction_pool::TransactionPool; use std::{ - io::{self, Write}, sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; @@ -42,7 +36,7 @@ mod client; mod task; pub use crate::client::ProposerClient; -use reth_evm::execute::{BlockExecutionOutput, BlockExecutorProvider, Executor}; +use reth_evm::execute::{BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor, TaskResult}; pub use task::ProposerTask; /// A consensus implementation intended for local development and testing purposes. 
@@ -157,17 +151,6 @@ pub struct TaskArgs { tx: oneshot::Sender, RethError>>, } -/// Result of the trigger -#[derive(Debug)] -pub struct TaskResult { - /// Transactions - pub txs: Vec, - /// Estimated gas used - pub estimated_gas_used: u64, - /// Bytes length - pub bytes_length: u64, -} - #[derive(Debug, Clone, Default)] struct Storage; @@ -303,70 +286,18 @@ impl Storage { ); // execute the block - let BlockExecutionOutput { receipts, .. } = - executor.executor(&mut db).execute((&mut block, U256::ZERO, false).into())?; - let Block { body, .. } = block.block; - - debug!(target: "taiko::proposer", transactions=?body, "after executing transactions"); - - let mut tx_lists = vec![]; - let mut chunk_start = 0; - let mut last_compressed_buf = None; - let mut gas_used_start = 0; - for idx in 0..body.len() { - if let Some((txs_range, estimated_gas_used, compressed_buf)) = { - let compressed_buf = encode_and_compress_tx_list(&body[chunk_start..=idx]) - .map_err(BlockExecutionError::other)?; - - if compressed_buf.len() > max_bytes_per_tx_list as usize { - // the first transaction in chunk is too large, so we need to skip it - if idx == chunk_start { - gas_used_start = receipts[idx].cumulative_gas_used; - chunk_start += 1; - // the first transaction in chunk is too large, so we need to skip it - None - } else { - // current chunk reaches the max_transactions_lists or max_bytes_per_tx_list - // and use previous transaction's data - let estimated_gas_used = - receipts[idx - 1].cumulative_gas_used - gas_used_start; - gas_used_start = receipts[idx - 1].cumulative_gas_used; - let range = chunk_start..idx; - chunk_start = idx; - Some((range, estimated_gas_used, last_compressed_buf.clone())) - } - } - // reach the limitation of max_transactions_lists or max_bytes_per_tx_list - else if idx - chunk_start + 1 == max_transactions_lists as usize { - let estimated_gas_used = receipts[idx].cumulative_gas_used - gas_used_start; - gas_used_start = receipts[idx].cumulative_gas_used; - let range = chunk_start..idx + 1; - chunk_start = idx + 1; - Some((range, estimated_gas_used, Some(compressed_buf))) - } else { - last_compressed_buf = Some(compressed_buf); - None - } - } { - tx_lists.push(TaskResult { - txs: body[txs_range] - .iter() - .cloned() - .map(|tx| reth_rpc_types_compat::transaction::from_signed(tx).unwrap()) - .collect(), - estimated_gas_used, - bytes_length: compressed_buf.map_or(0, |b| b.len() as u64), - }); - } - } + let block_input = BlockExecutionInput { + block: &mut block, + total_difficulty: U256::ZERO, + enable_anchor: false, + enable_skip: false, + enable_build: true, + max_bytes_per_tx_list, + max_transactions_lists, + }; + let BlockExecutionOutput { target_list, .. 
} = + executor.executor(&mut db).execute(block_input.into())?; - Ok(tx_lists) + Ok(target_list) } } - -fn encode_and_compress_tx_list(txs: &[TransactionSigned]) -> io::Result> { - let encoded_buf = alloy_rlp::encode(TransactionSignedList(txs)); - let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default()); - encoder.write_all(&encoded_buf)?; - encoder.finish() -} diff --git a/crates/taiko/consensus/proposer/src/task.rs b/crates/taiko/consensus/proposer/src/task.rs index 759f1046a189..ba7aa5fda996 100644 --- a/crates/taiko/consensus/proposer/src/task.rs +++ b/crates/taiko/consensus/proposer/src/task.rs @@ -1,12 +1,10 @@ use crate::{Storage, TaskArgs}; -use futures_util::{future::BoxFuture, FutureExt}; use reth_chainspec::ChainSpec; use reth_evm::execute::BlockExecutorProvider; use reth_primitives::IntoRecoveredTransaction; use reth_provider::{BlockReaderIdExt, StateProviderFactory}; -use reth_transaction_pool::{TransactionPool, ValidPoolTransaction}; +use reth_transaction_pool::{TransactionPool}; use std::{ - collections::VecDeque, future::Future, pin::Pin, sync::Arc, @@ -21,16 +19,8 @@ pub struct ProposerTask { chain_spec: Arc, /// The client used to interact with the state provider: Provider, - /// Single active future that inserts a new block into `storage` - insert_task: Option>, /// Pool where transactions are stored pool: Pool, - /// backlog of sets of transactions ready to be mined - #[allow(clippy::type_complexity)] - queued: VecDeque<( - TaskArgs, - Vec::Transaction>>>, - )>, /// The type used for block execution block_executor: Executor, trigger_args_rx: UnboundedReceiver, @@ -51,9 +41,7 @@ impl ProposerTask Some(args), + match this.trigger_args_rx.poll_recv(cx) { + Poll::Pending => return Poll::Pending, Poll::Ready(None) => return Poll::Ready(()), - _ => None, - } { - let mut best_txs = this.pool.best_transactions(); - best_txs.skip_blobs(); - debug!(target: "taiko::proposer", txs = ?best_txs.size_hint(), "Proposer get best transactions"); - let (mut local_txs, remote_txs): (Vec<_>, Vec<_>) = best_txs - .filter(|tx| { - tx.effective_tip_per_gas(trigger_args.base_fee) - .map_or(false, |tip| tip >= trigger_args.min_tip as u128) - }) - .partition(|tx| { - trigger_args - .local_accounts - .as_ref() - .map(|local_accounts| local_accounts.contains(&tx.sender())) - .unwrap_or_default() - }); - local_txs.extend(remote_txs); - debug!(target: "taiko::proposer", txs = ?local_txs.len(), "Proposer filter best transactions"); - // miner returned a set of transaction that we feed to the producer - this.queued.push_back((trigger_args, local_txs)); - }; + Poll::Ready(Some(args)) => { + let mut best_txs = this.pool.best_transactions(); + best_txs.skip_blobs(); + debug!(target: "taiko::proposer", txs = ?best_txs.size_hint(), "Proposer get best transactions"); + let (mut local_txs, remote_txs): (Vec<_>, Vec<_>) = best_txs + .filter(|tx| { + tx.effective_tip_per_gas(args.base_fee) + .map_or(false, |tip| tip >= args.min_tip as u128) + }) + .partition(|tx| { + args + .local_accounts + .as_ref() + .map(|local_accounts| local_accounts.contains(&tx.sender())) + .unwrap_or_default() + }); + local_txs.extend(remote_txs); + debug!(target: "taiko::proposer", txs = ?local_txs.len(), "Proposer filter best transactions"); - if this.insert_task.is_none() { - if this.queued.is_empty() { - // nothing to insert - break; - } - - // ready to queue in new insert task; - let (trigger_args, txs) = this.queued.pop_front().expect("not empty"); + let client = this.provider.clone(); + let chain_spec = 
Arc::clone(&this.chain_spec); + let executor = this.block_executor.clone(); - let client = this.provider.clone(); - let chain_spec = Arc::clone(&this.chain_spec); - let pool = this.pool.clone(); - let executor = this.block_executor.clone(); - - // Create the mining future that creates a block, notifies the engine that drives - // the pipeline - this.insert_task = Some(Box::pin(async move { - let txs: Vec<_> = txs + let txs: Vec<_> = local_txs .into_iter() .map(|tx| tx.to_recovered_transaction().into_signed()) .collect(); @@ -131,7 +102,8 @@ where max_transactions_lists, base_fee, .. - } = trigger_args; + } = args; + let res = Storage::build_and_execute( txs.clone(), ommers, @@ -145,30 +117,16 @@ where base_fee, ); if res.is_ok() { - // clear all transactions from pool - pool.remove_transactions(txs.iter().map(|tx| tx.hash()).collect()); - } - let _ = tx.send(res); - })); - } - - if let Some(mut fut) = this.insert_task.take() { - match fut.poll_unpin(cx) { - Poll::Ready(_) => {} - Poll::Pending => { - this.insert_task = Some(fut); - break; + let _ = tx.send(res); } } } } - - Poll::Pending } } impl std::fmt::Debug - for ProposerTask +for ProposerTask { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("MiningTask").finish_non_exhaustive() diff --git a/crates/taiko/evm/Cargo.toml b/crates/taiko/evm/Cargo.toml index 92fbc1b74409..f48b1343c713 100644 --- a/crates/taiko/evm/Cargo.toml +++ b/crates/taiko/evm/Cargo.toml @@ -19,6 +19,7 @@ reth-revm = { workspace = true, features = ["taiko"] } reth-prune-types.workspace = true reth-execution-types.workspace = true reth-consensus.workspace = true +reth-rpc-types-compat.workspace = true # Taiko taiko-reth-beacon-consensus.workspace = true @@ -32,6 +33,7 @@ alloy-sol-types.workspace = true # Misc tracing.workspace = true +flate2.workspace = true [dev-dependencies] reth-testing-utils.workspace = true diff --git a/crates/taiko/evm/src/execute.rs b/crates/taiko/evm/src/execute.rs index 27df7e39f1b6..4a827f1511e7 100644 --- a/crates/taiko/evm/src/execute.rs +++ b/crates/taiko/evm/src/execute.rs @@ -4,6 +4,8 @@ use crate::{ dao_fork::{DAO_HARDFORK_BENEFICIARY, DAO_HARDKFORK_ACCOUNTS}, TaikoEvmConfig, }; +use flate2::write::ZlibEncoder; +use flate2::Compression; use reth_chainspec::{ChainSpec, TAIKO_HEKLA, TAIKO_MAINNET}; use reth_consensus::ConsensusError; use reth_evm::{ @@ -14,9 +16,7 @@ use reth_evm::{ ConfigureEvm, }; use reth_execution_types::ExecutionOutcome; -use reth_primitives::{ - BlockNumber, BlockWithSenders, Hardfork, Header, Receipt, Request, Withdrawals, U256, -}; +use reth_primitives::{BlockNumber, BlockWithSenders, Hardfork, Header, Receipt, Request, TransactionSigned, Withdrawals, U256}; use reth_prune_types::PruneModes; use reth_revm::{ batch::{BlockBatchRecord, BlockExecutorStats}, @@ -27,20 +27,23 @@ use reth_revm::{ }, Evm, JournaledState, }; -use revm_primitives::{ - db::{Database, DatabaseCommit}, - Address, BlockEnv, CfgEnvWithHandlerCfg, EnvWithHandlerCfg, HashSet, ResultAndState, -}; +use revm_primitives::{db::{Database, DatabaseCommit}, Address, BlockEnv, CfgEnvWithHandlerCfg, EnvWithHandlerCfg, HashSet, ResultAndState}; use taiko_reth_beacon_consensus::{ check_anchor_tx, decode_ontake_extra_data, validate_block_post_execution, }; #[cfg(not(feature = "std"))] use alloc::{sync::Arc, vec, vec::Vec}; +use std::io; +use std::io::Write; use tracing::debug; #[cfg(feature = "std")] use std::sync::Arc; +use revm_primitives::alloy_primitives::private::alloy_rlp; +use reth_evm::execute::TaskResult; +use 
reth_primitives::transaction::TransactionSignedList; +use reth_revm::interpreter::Host; /// Provides executors to execute regular ethereum blocks #[derive(Debug, Clone)] @@ -201,7 +204,7 @@ where block.base_fee_per_gas.unwrap_or_default(), treasury, ) - .map_err(|e| BlockExecutionError::CanonicalRevert { inner: e.to_string() })?; + .map_err(|e| BlockExecutionError::CanonicalRevert { inner: e.to_string() })?; } // If the signature was not valid, the sender address will have been set to zero @@ -341,10 +344,17 @@ impl TaikoBlockExecutor { } } +fn encode_and_compress_tx_list(txs: &[TransactionSigned]) -> io::Result> { + let encoded_buf = alloy_rlp::encode(TransactionSignedList(txs)); + let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&encoded_buf)?; + encoder.finish() +} + impl TaikoBlockExecutor where EvmConfig: ConfigureEvm, - DB: Database, + DB: Database, { /// Configures a new evm configuration and block environment for the given block. /// @@ -394,6 +404,95 @@ where Ok(output) } + fn build_transaction_list( + &mut self, + block: &BlockWithSenders, + max_bytes_per_tx_list: u64, + max_transactions_lists: u64, + ) -> Result, BlockExecutionError> { + let env = self.evm_env_for_block(&block.header, U256::ZERO); + let mut evm = self.executor.evm_config.evm_with_env(&mut self.state, env); + // 2. configure the evm and execute + // apply pre execution changes + apply_beacon_root_contract_call( + &self.executor.chain_spec, + block.timestamp, + block.number, + block.parent_beacon_block_root, + &mut evm, + )?; + + apply_blockhashes_update( + evm.db_mut(), + &self.executor.chain_spec, + block.timestamp, + block.number, + block.parent_hash, + )?; + + let mut target_list: Vec = vec![]; + // get previous env + let previous_env = Box::new(evm.context.env().clone()); + + for _ in 0..max_transactions_lists { + // evm.context.evm.db.commit(state); + // re-set the previous env + evm.context.evm.env = previous_env.clone(); + + let mut cumulative_gas_used = 0; + let mut tx_list: Vec = vec![]; + let mut buf_len: u64 = 0; + + let length = block.body.len(); + for i in 0..length { + let transaction = block.body.get(i).unwrap(); + let sender = block.senders.get(i).unwrap(); + let block_available_gas = block.header.gas_limit - cumulative_gas_used; + if transaction.gas_limit() > block_available_gas { + break; + } + + EvmConfig::fill_tx_env(evm.tx_mut(), transaction, *sender); + + // Execute transaction. + let ResultAndState { result, state } = match evm.transact() { + Ok(res) => res, + Err(_) => continue, + }; + tx_list.push(transaction.clone()); + + let compressed_buf = encode_and_compress_tx_list(&tx_list) + .map_err(BlockExecutionError::other)?; + if compressed_buf.len() > max_bytes_per_tx_list as usize { + tx_list.pop(); + break; + } + + buf_len = compressed_buf.len() as u64; + // append gas used + cumulative_gas_used += result.gas_used(); + + // collect executed transaction state + evm.db_mut().commit(state); + } + + if tx_list.is_empty() { + break; + } + target_list.push(TaskResult { + txs: tx_list[..].iter() + .cloned() + .map(|tx| reth_rpc_types_compat::transaction::from_signed(tx).unwrap()) + .collect(), + estimated_gas_used: cumulative_gas_used, + bytes_length: buf_len, + }); + } + + Ok(target_list) + } + + /// Apply settings before a new block is executed. pub(crate) fn on_new_block(&mut self, header: &Header) { // Set state clear flag if the block is after the Spurious Dragon hardfork. 
@@ -444,7 +543,7 @@ where impl Executor for TaikoBlockExecutor where EvmConfig: ConfigureEvm, - DB: Database, + DB: Database, { type Input<'a> = BlockExecutionInput<'a, BlockWithSenders>; type Output = BlockExecutionOutput; @@ -458,14 +557,26 @@ where /// /// State changes are committed to the database. fn execute(mut self, input: Self::Input<'_>) -> Result { - let BlockExecutionInput { block, total_difficulty, enable_anchor, enable_skip } = input; - let TaikoExecuteOutput { receipts, requests, gas_used } = - self.execute_without_verification(block, total_difficulty, enable_anchor, enable_skip)?; - - // NOTE: we need to merge keep the reverts for the bundle retention - self.state.merge_transitions(BundleRetention::Reverts); + let BlockExecutionInput { + block, + total_difficulty, + enable_anchor, + enable_skip, + enable_build, + max_bytes_per_tx_list, + max_transactions_lists + } = input; + if enable_build { + let target_list = self.build_transaction_list(block, max_bytes_per_tx_list, max_transactions_lists)?; + Ok(BlockExecutionOutput { state: Default::default(), receipts: vec![], requests: vec![], gas_used: 0, target_list }) + } else { + let TaikoExecuteOutput { receipts, requests, gas_used } = + self.execute_without_verification(block, total_difficulty, enable_anchor, enable_skip)?; - Ok(BlockExecutionOutput { state: self.state.take_bundle(), receipts, requests, gas_used }) + // NOTE: we need to merge keep the reverts for the bundle retention + self.state.merge_transitions(BundleRetention::Reverts); + Ok(BlockExecutionOutput { state: self.state.take_bundle(), receipts, requests, gas_used, target_list: vec![] }) + } } } @@ -501,7 +612,7 @@ where type Error = BlockExecutionError; fn execute_and_verify_one(&mut self, input: Self::Input<'_>) -> Result<(), Self::Error> { - let BlockExecutionInput { block, total_difficulty, enable_anchor, enable_skip } = input; + let BlockExecutionInput { block, total_difficulty, enable_anchor, enable_skip, .. } = input; let TaikoExecuteOutput { receipts, requests, gas_used: _ } = self .executor .execute_without_verification(block, total_difficulty, enable_anchor, enable_skip)?; diff --git a/crates/taiko/payload/builder/src/builder.rs b/crates/taiko/payload/builder/src/builder.rs index 3eb04eb8adea..6846c3ebdc7d 100644 --- a/crates/taiko/payload/builder/src/builder.rs +++ b/crates/taiko/payload/builder/src/builder.rs @@ -167,7 +167,7 @@ where .ok_or(BlockExecutionError::Validation(BlockValidationError::SenderRecoveryError))?; // execute the block - let BlockExecutionOutput { state, receipts, requests, gas_used } = + let BlockExecutionOutput { state, receipts, requests, gas_used, .. 
} = executor.executor(&mut db).execute((&mut block, U256::ZERO).into())?; let execution_outcome = diff --git a/crates/taiko/payload/builder/src/lib.rs b/crates/taiko/payload/builder/src/lib.rs index 777d1d76f884..f103ff8cdbfe 100644 --- a/crates/taiko/payload/builder/src/lib.rs +++ b/crates/taiko/payload/builder/src/lib.rs @@ -5,9 +5,6 @@ html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" )] -#![cfg_attr(not(test), warn(unused_crate_dependencies))] -#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -#![allow(clippy::useless_let_if_seq)] pub mod builder; pub use builder::*; pub mod error; From 05cadb79d7b59bb1f9b803f5b428c798e23fa2c3 Mon Sep 17 00:00:00 2001 From: maskpp Date: Wed, 23 Oct 2024 10:32:54 +0800 Subject: [PATCH 2/5] format code --- crates/evm/src/execute.rs | 64 ++++++++++++++------- crates/taiko/consensus/proposer/src/lib.rs | 9 +-- crates/taiko/consensus/proposer/src/task.rs | 15 ++--- crates/taiko/evm/src/execute.rs | 60 +++++++++++++------ 4 files changed, 95 insertions(+), 53 deletions(-) diff --git a/crates/evm/src/execute.rs b/crates/evm/src/execute.rs index 26757af117af..568978a45f37 100644 --- a/crates/evm/src/execute.rs +++ b/crates/evm/src/execute.rs @@ -1,11 +1,11 @@ //! Traits for execution. +use alloy_rpc_types_eth::transaction::Transaction; use reth_execution_types::ExecutionOutcome; use reth_primitives::{BlockNumber, BlockWithSenders, Receipt, Request, U256}; use reth_prune_types::PruneModes; use revm::db::BundleState; use revm_primitives::db::Database; -use alloy_rpc_types_eth::transaction::Transaction; #[cfg(not(feature = "std"))] use alloc::vec::Vec; @@ -56,7 +56,7 @@ pub trait BatchExecutor { /// for each input. fn execute_and_verify_many<'a, I>(&mut self, inputs: I) -> Result<(), Self::Error> where - I: IntoIterator>, + I: IntoIterator>, { for input in inputs { self.execute_and_verify_one(input)?; @@ -70,7 +70,7 @@ pub trait BatchExecutor { /// and [`BatchExecutor::finalize`]. fn execute_and_verify_batch<'a, I>(mut self, batch: I) -> Result where - I: IntoIterator>, + I: IntoIterator>, Self: Sized, { self.execute_and_verify_many(batch)?; @@ -143,7 +143,15 @@ pub struct BlockExecutionInput<'a, Block> { impl<'a, Block> BlockExecutionInput<'a, Block> { /// Creates a new input. 
pub fn new(block: &'a mut Block, total_difficulty: U256) -> Self { - Self { block, total_difficulty, enable_anchor: true, enable_skip: true, enable_build: false, max_bytes_per_tx_list: 0, max_transactions_lists: 0 } + Self { + block, + total_difficulty, + enable_anchor: true, + enable_skip: true, + enable_build: false, + max_bytes_per_tx_list: 0, + max_transactions_lists: 0, + } } } @@ -155,7 +163,15 @@ impl<'a, Block> From<(&'a mut Block, U256)> for BlockExecutionInput<'a, Block> { impl<'a, Block> From<(&'a mut Block, U256, bool)> for BlockExecutionInput<'a, Block> { fn from((block, total_difficulty, enable_anchor): (&'a mut Block, U256, bool)) -> Self { - Self { block, total_difficulty, enable_anchor, enable_skip: true, enable_build: false, max_bytes_per_tx_list: 0, max_transactions_lists: 0 } + Self { + block, + total_difficulty, + enable_anchor, + enable_skip: true, + enable_build: false, + max_bytes_per_tx_list: 0, + max_transactions_lists: 0, + } } } @@ -163,7 +179,15 @@ impl<'a, Block> From<(&'a mut Block, U256, bool, bool)> for BlockExecutionInput< fn from( (block, total_difficulty, enable_anchor, enable_skip): (&'a mut Block, U256, bool, bool), ) -> Self { - Self { block, total_difficulty, enable_anchor, enable_skip, enable_build: false, max_bytes_per_tx_list: 0, max_transactions_lists: 0 } + Self { + block, + total_difficulty, + enable_anchor, + enable_skip, + enable_build: false, + max_bytes_per_tx_list: 0, + max_transactions_lists: 0, + } } } @@ -180,19 +204,19 @@ pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static { /// /// It is not expected to validate the state trie root, this must be done by the caller using /// the returned state. - type Executor>: for<'a> Executor< + type Executor>: for<'a> Executor< DB, - Input<'a>=BlockExecutionInput<'a, BlockWithSenders>, - Output=BlockExecutionOutput, - Error=BlockExecutionError, + Input<'a> = BlockExecutionInput<'a, BlockWithSenders>, + Output = BlockExecutionOutput, + Error = BlockExecutionError, >; /// An executor that can execute a batch of blocks given a database. - type BatchExecutor>: for<'a> BatchExecutor< + type BatchExecutor>: for<'a> BatchExecutor< DB, - Input<'a>=BlockExecutionInput<'a, BlockWithSenders>, - Output=ExecutionOutcome, - Error=BlockExecutionError, + Input<'a> = BlockExecutionInput<'a, BlockWithSenders>, + Output = ExecutionOutcome, + Error = BlockExecutionError, >; /// Creates a new executor for single block execution. @@ -200,7 +224,7 @@ pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static { /// This is used to execute a single block and get the changed state. fn executor(&self, db: DB) -> Self::Executor where - DB: Database; + DB: Database; /// Creates a new batch executor with the given database and pruning modes. /// @@ -211,7 +235,7 @@ pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static { /// execution. 
fn batch_executor(&self, db: DB, prune_modes: PruneModes) -> Self::BatchExecutor where - DB: Database; + DB: Database; } #[cfg(test)] @@ -225,19 +249,19 @@ mod tests { struct TestExecutorProvider; impl BlockExecutorProvider for TestExecutorProvider { - type Executor> = TestExecutor; - type BatchExecutor> = TestExecutor; + type Executor> = TestExecutor; + type BatchExecutor> = TestExecutor; fn executor(&self, _db: DB) -> Self::Executor where - DB: Database, + DB: Database, { TestExecutor(PhantomData) } fn batch_executor(&self, _db: DB, _prune_modes: PruneModes) -> Self::BatchExecutor where - DB: Database, + DB: Database, { TestExecutor(PhantomData) } diff --git a/crates/taiko/consensus/proposer/src/lib.rs b/crates/taiko/consensus/proposer/src/lib.rs index 8a77fa9fce4e..990497145a1a 100644 --- a/crates/taiko/consensus/proposer/src/lib.rs +++ b/crates/taiko/consensus/proposer/src/lib.rs @@ -18,9 +18,8 @@ use reth_consensus::{Consensus, ConsensusError, PostExecutionInput}; use reth_errors::RethError; use reth_execution_errors::{BlockExecutionError, BlockValidationError}; use reth_primitives::{ - eip4844::calculate_excess_blob_gas, proofs, Address, Block, - BlockWithSenders, Header, Requests, SealedBlock, SealedHeader, TransactionSigned, Withdrawals, - U256, + eip4844::calculate_excess_blob_gas, proofs, Address, Block, BlockWithSenders, Header, Requests, + SealedBlock, SealedHeader, TransactionSigned, Withdrawals, U256, }; use reth_provider::{BlockReaderIdExt, StateProviderFactory}; use reth_revm::database::StateProviderDatabase; @@ -36,7 +35,9 @@ mod client; mod task; pub use crate::client::ProposerClient; -use reth_evm::execute::{BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor, TaskResult}; +use reth_evm::execute::{ + BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor, TaskResult, +}; pub use task::ProposerTask; /// A consensus implementation intended for local development and testing purposes. 
diff --git a/crates/taiko/consensus/proposer/src/task.rs b/crates/taiko/consensus/proposer/src/task.rs index ba7aa5fda996..d870f2226ca2 100644 --- a/crates/taiko/consensus/proposer/src/task.rs +++ b/crates/taiko/consensus/proposer/src/task.rs @@ -3,7 +3,7 @@ use reth_chainspec::ChainSpec; use reth_evm::execute::BlockExecutorProvider; use reth_primitives::IntoRecoveredTransaction; use reth_provider::{BlockReaderIdExt, StateProviderFactory}; -use reth_transaction_pool::{TransactionPool}; +use reth_transaction_pool::TransactionPool; use std::{ future::Future, pin::Pin, @@ -38,13 +38,7 @@ impl ProposerTask, ) -> Self { - Self { - chain_spec, - provider, - pool, - block_executor, - trigger_args_rx, - } + Self { chain_spec, provider, pool, block_executor, trigger_args_rx } } } @@ -75,8 +69,7 @@ where .map_or(false, |tip| tip >= args.min_tip as u128) }) .partition(|tx| { - args - .local_accounts + args.local_accounts .as_ref() .map(|local_accounts| local_accounts.contains(&tx.sender())) .unwrap_or_default() @@ -126,7 +119,7 @@ where } impl std::fmt::Debug -for ProposerTask + for ProposerTask { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("MiningTask").finish_non_exhaustive() diff --git a/crates/taiko/evm/src/execute.rs b/crates/taiko/evm/src/execute.rs index 4a827f1511e7..61b95503a6fc 100644 --- a/crates/taiko/evm/src/execute.rs +++ b/crates/taiko/evm/src/execute.rs @@ -16,7 +16,10 @@ use reth_evm::{ ConfigureEvm, }; use reth_execution_types::ExecutionOutcome; -use reth_primitives::{BlockNumber, BlockWithSenders, Hardfork, Header, Receipt, Request, TransactionSigned, Withdrawals, U256}; +use reth_primitives::{ + BlockNumber, BlockWithSenders, Hardfork, Header, Receipt, Request, TransactionSigned, + Withdrawals, U256, +}; use reth_prune_types::PruneModes; use reth_revm::{ batch::{BlockBatchRecord, BlockExecutorStats}, @@ -27,7 +30,10 @@ use reth_revm::{ }, Evm, JournaledState, }; -use revm_primitives::{db::{Database, DatabaseCommit}, Address, BlockEnv, CfgEnvWithHandlerCfg, EnvWithHandlerCfg, HashSet, ResultAndState}; +use revm_primitives::{ + db::{Database, DatabaseCommit}, + Address, BlockEnv, CfgEnvWithHandlerCfg, EnvWithHandlerCfg, HashSet, ResultAndState, +}; use taiko_reth_beacon_consensus::{ check_anchor_tx, decode_ontake_extra_data, validate_block_post_execution, }; @@ -38,12 +44,12 @@ use std::io; use std::io::Write; use tracing::debug; -#[cfg(feature = "std")] -use std::sync::Arc; -use revm_primitives::alloy_primitives::private::alloy_rlp; use reth_evm::execute::TaskResult; use reth_primitives::transaction::TransactionSignedList; use reth_revm::interpreter::Host; +use revm_primitives::alloy_primitives::private::alloy_rlp; +#[cfg(feature = "std")] +use std::sync::Arc; /// Provides executors to execute regular ethereum blocks #[derive(Debug, Clone)] @@ -204,7 +210,7 @@ where block.base_fee_per_gas.unwrap_or_default(), treasury, ) - .map_err(|e| BlockExecutionError::CanonicalRevert { inner: e.to_string() })?; + .map_err(|e| BlockExecutionError::CanonicalRevert { inner: e.to_string() })?; } // If the signature was not valid, the sender address will have been set to zero @@ -354,7 +360,7 @@ fn encode_and_compress_tx_list(txs: &[TransactionSigned]) -> io::Result> impl TaikoBlockExecutor where EvmConfig: ConfigureEvm, - DB: Database, + DB: Database, { /// Configures a new evm configuration and block environment for the given block. 
/// @@ -461,8 +467,8 @@ where }; tx_list.push(transaction.clone()); - let compressed_buf = encode_and_compress_tx_list(&tx_list) - .map_err(BlockExecutionError::other)?; + let compressed_buf = + encode_and_compress_tx_list(&tx_list).map_err(BlockExecutionError::other)?; if compressed_buf.len() > max_bytes_per_tx_list as usize { tx_list.pop(); break; @@ -480,7 +486,8 @@ where break; } target_list.push(TaskResult { - txs: tx_list[..].iter() + txs: tx_list[..] + .iter() .cloned() .map(|tx| reth_rpc_types_compat::transaction::from_signed(tx).unwrap()) .collect(), @@ -492,7 +499,6 @@ where Ok(target_list) } - /// Apply settings before a new block is executed. pub(crate) fn on_new_block(&mut self, header: &Header) { // Set state clear flag if the block is after the Spurious Dragon hardfork. @@ -543,7 +549,7 @@ where impl Executor for TaikoBlockExecutor where EvmConfig: ConfigureEvm, - DB: Database, + DB: Database, { type Input<'a> = BlockExecutionInput<'a, BlockWithSenders>; type Output = BlockExecutionOutput; @@ -564,18 +570,36 @@ where enable_skip, enable_build, max_bytes_per_tx_list, - max_transactions_lists + max_transactions_lists, } = input; if enable_build { - let target_list = self.build_transaction_list(block, max_bytes_per_tx_list, max_transactions_lists)?; - Ok(BlockExecutionOutput { state: Default::default(), receipts: vec![], requests: vec![], gas_used: 0, target_list }) + let target_list = + self.build_transaction_list(block, max_bytes_per_tx_list, max_transactions_lists)?; + Ok(BlockExecutionOutput { + state: Default::default(), + receipts: vec![], + requests: vec![], + gas_used: 0, + target_list, + }) } else { - let TaikoExecuteOutput { receipts, requests, gas_used } = - self.execute_without_verification(block, total_difficulty, enable_anchor, enable_skip)?; + let TaikoExecuteOutput { receipts, requests, gas_used } = self + .execute_without_verification( + block, + total_difficulty, + enable_anchor, + enable_skip, + )?; // NOTE: we need to merge keep the reverts for the bundle retention self.state.merge_transitions(BundleRetention::Reverts); - Ok(BlockExecutionOutput { state: self.state.take_bundle(), receipts, requests, gas_used, target_list: vec![] }) + Ok(BlockExecutionOutput { + state: self.state.take_bundle(), + receipts, + requests, + gas_used, + target_list: vec![], + }) } } } From f8f1f3d7c89a70c4f4a354a25b0bdd2360f53ffe Mon Sep 17 00:00:00 2001 From: maskpp Date: Thu, 24 Oct 2024 20:48:14 +0800 Subject: [PATCH 3/5] small change --- crates/taiko/evm/src/execute.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/taiko/evm/src/execute.rs b/crates/taiko/evm/src/execute.rs index 61b95503a6fc..94d3365984a2 100644 --- a/crates/taiko/evm/src/execute.rs +++ b/crates/taiko/evm/src/execute.rs @@ -42,7 +42,7 @@ use taiko_reth_beacon_consensus::{ use alloc::{sync::Arc, vec, vec::Vec}; use std::io; use std::io::Write; -use tracing::debug; +use tracing::{debug, info}; use reth_evm::execute::TaskResult; use reth_primitives::transaction::TransactionSignedList; @@ -449,8 +449,7 @@ where let mut tx_list: Vec = vec![]; let mut buf_len: u64 = 0; - let length = block.body.len(); - for i in 0..length { + for i in 0..block.body.len() { let transaction = block.body.get(i).unwrap(); let sender = block.senders.get(i).unwrap(); let block_available_gas = block.header.gas_limit - cumulative_gas_used; From 8458b335ecf936607c67a05089998eec88df3b5f Mon Sep 17 00:00:00 2001 From: maskpp Date: Fri, 25 Oct 2024 14:25:26 +0800 Subject: [PATCH 4/5] send Result --- 
crates/taiko/consensus/proposer/src/task.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/taiko/consensus/proposer/src/task.rs b/crates/taiko/consensus/proposer/src/task.rs index d870f2226ca2..ced1903f767a 100644 --- a/crates/taiko/consensus/proposer/src/task.rs +++ b/crates/taiko/consensus/proposer/src/task.rs @@ -109,9 +109,7 @@ where max_transactions_lists, base_fee, ); - if res.is_ok() { - let _ = tx.send(res); - } + let _ = tx.send(res); } } } From 09a492ff0c38c3806f583a3d615d4ea4608baffc Mon Sep 17 00:00:00 2001 From: maskpp Date: Fri, 25 Oct 2024 14:35:09 +0800 Subject: [PATCH 5/5] revert changes --- crates/taiko/consensus/proposer/src/task.rs | 101 ++++++++++++++------ crates/taiko/evm/src/execute.rs | 2 +- 2 files changed, 75 insertions(+), 28 deletions(-) diff --git a/crates/taiko/consensus/proposer/src/task.rs b/crates/taiko/consensus/proposer/src/task.rs index ced1903f767a..e262b07e94f5 100644 --- a/crates/taiko/consensus/proposer/src/task.rs +++ b/crates/taiko/consensus/proposer/src/task.rs @@ -1,10 +1,12 @@ use crate::{Storage, TaskArgs}; +use futures_util::{future::BoxFuture, FutureExt}; use reth_chainspec::ChainSpec; use reth_evm::execute::BlockExecutorProvider; use reth_primitives::IntoRecoveredTransaction; use reth_provider::{BlockReaderIdExt, StateProviderFactory}; -use reth_transaction_pool::TransactionPool; +use reth_transaction_pool::{TransactionPool, ValidPoolTransaction}; use std::{ + collections::VecDeque, future::Future, pin::Pin, sync::Arc, @@ -19,8 +21,16 @@ pub struct ProposerTask { chain_spec: Arc, /// The client used to interact with the state provider: Provider, + /// Single active future that inserts a new block into `storage` + insert_task: Option>, /// Pool where transactions are stored pool: Pool, + /// backlog of sets of transactions ready to be mined + #[allow(clippy::type_complexity)] + queued: VecDeque<( + TaskArgs, + Vec::Transaction>>>, + )>, /// The type used for block execution block_executor: Executor, trigger_args_rx: UnboundedReceiver, @@ -38,7 +48,15 @@ impl ProposerTask, ) -> Self { - Self { chain_spec, provider, pool, block_executor, trigger_args_rx } + Self { + chain_spec, + provider, + insert_task: None, + pool, + queued: Default::default(), + block_executor, + trigger_args_rx, + } } } @@ -56,32 +74,50 @@ where // this drives block production and loop { - match this.trigger_args_rx.poll_recv(cx) { - Poll::Pending => return Poll::Pending, + if let Some(trigger_args) = match this.trigger_args_rx.poll_recv(cx) { + Poll::Ready(Some(args)) => Some(args), Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(args)) => { - let mut best_txs = this.pool.best_transactions(); - best_txs.skip_blobs(); - debug!(target: "taiko::proposer", txs = ?best_txs.size_hint(), "Proposer get best transactions"); - let (mut local_txs, remote_txs): (Vec<_>, Vec<_>) = best_txs - .filter(|tx| { - tx.effective_tip_per_gas(args.base_fee) - .map_or(false, |tip| tip >= args.min_tip as u128) - }) - .partition(|tx| { - args.local_accounts - .as_ref() - .map(|local_accounts| local_accounts.contains(&tx.sender())) - .unwrap_or_default() - }); - local_txs.extend(remote_txs); - debug!(target: "taiko::proposer", txs = ?local_txs.len(), "Proposer filter best transactions"); + _ => None, + } { + let mut best_txs = this.pool.best_transactions(); + best_txs.skip_blobs(); + debug!(target: "taiko::proposer", txs = ?best_txs.size_hint(), "Proposer get best transactions"); + let (mut local_txs, remote_txs): (Vec<_>, Vec<_>) = best_txs + 
.filter(|tx| { + tx.effective_tip_per_gas(trigger_args.base_fee) + .map_or(false, |tip| tip >= trigger_args.min_tip as u128) + }) + .partition(|tx| { + trigger_args + .local_accounts + .as_ref() + .map(|local_accounts| local_accounts.contains(&tx.sender())) + .unwrap_or_default() + }); + local_txs.extend(remote_txs); + debug!(target: "taiko::proposer", txs = ?local_txs.len(), "Proposer filter best transactions"); + + // miner returned a set of transaction that we feed to the producer + this.queued.push_back((trigger_args, local_txs)); + }; + + if this.insert_task.is_none() { + if this.queued.is_empty() { + // nothing to insert + break; + } + + // ready to queue in new insert task; + let (trigger_args, txs) = this.queued.pop_front().expect("not empty"); - let client = this.provider.clone(); - let chain_spec = Arc::clone(&this.chain_spec); - let executor = this.block_executor.clone(); + let client = this.provider.clone(); + let chain_spec = Arc::clone(&this.chain_spec); + let executor = this.block_executor.clone(); - let txs: Vec<_> = local_txs + // Create the mining future that creates a block, notifies the engine that drives + // the pipeline + this.insert_task = Some(Box::pin(async move { + let txs: Vec<_> = txs .into_iter() .map(|tx| tx.to_recovered_transaction().into_signed()) .collect(); @@ -95,8 +131,7 @@ where max_transactions_lists, base_fee, .. - } = args; - + } = trigger_args; let res = Storage::build_and_execute( txs.clone(), ommers, @@ -110,9 +145,21 @@ where base_fee, ); let _ = tx.send(res); + })); + } + + if let Some(mut fut) = this.insert_task.take() { + match fut.poll_unpin(cx) { + Poll::Ready(_) => {} + Poll::Pending => { + this.insert_task = Some(fut); + break; + } } } } + + Poll::Pending } } diff --git a/crates/taiko/evm/src/execute.rs b/crates/taiko/evm/src/execute.rs index 94d3365984a2..ef13df84e916 100644 --- a/crates/taiko/evm/src/execute.rs +++ b/crates/taiko/evm/src/execute.rs @@ -42,7 +42,7 @@ use taiko_reth_beacon_consensus::{ use alloc::{sync::Arc, vec, vec::Vec}; use std::io; use std::io::Write; -use tracing::{debug, info}; +use tracing::debug; use reth_evm::execute::TaskResult; use reth_primitives::transaction::TransactionSignedList;
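
For orientation, below is a minimal usage sketch of the `enable_build` path this series introduces. It is not part of the patches; names such as `block_executor`, `db`, `block`, `max_bytes_per_tx_list`, and `max_transactions_lists` are assumed to already be in the caller's scope (a `BlockExecutorProvider`, a `StateProviderDatabase`-backed state, a recovered `BlockWithSenders`, and the two list limits), mirroring `Storage::build_and_execute` in crates/taiko/consensus/proposer/src/lib.rs above.

use reth_evm::execute::{
    BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor, TaskResult,
};
use reth_primitives::U256;

// Switch the executor into list-building mode instead of normal block execution.
let block_input = BlockExecutionInput {
    block: &mut block,
    total_difficulty: U256::ZERO,
    enable_anchor: false,
    enable_skip: false,
    enable_build: true,
    // Compressed-size cap (bytes) for each transaction list.
    max_bytes_per_tx_list,
    // Maximum number of transaction lists to build.
    max_transactions_lists,
};

// In build mode the Taiko executor returns the built lists in `target_list`
// and leaves `state`, `receipts`, and `requests` empty with `gas_used` = 0.
let BlockExecutionOutput { target_list, .. } =
    block_executor.executor(&mut db).execute(block_input.into())?;
let tx_lists: Vec<TaskResult> = target_list;

Each `TaskResult` carries the RPC-typed transactions, the estimated gas used, and the compressed byte length of the list, which is the data the proposer task forwards back over the oneshot channel.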