forked from paradigmxyz/reth
-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathtask.rs
172 lines (157 loc) · 6.18 KB
/
task.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
use crate::{Storage, TaskArgs};
use futures_util::{future::BoxFuture, FutureExt};
use reth_chainspec::ChainSpec;
use reth_evm::execute::BlockExecutorProvider;
use reth_primitives::IntoRecoveredTransaction;
use reth_provider::{BlockReaderIdExt, StateProviderFactory};
use reth_transaction_pool::{TransactionPool, ValidPoolTransaction};
use std::{
collections::VecDeque,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::debug;
/// A [`Future`] that waits for proposal triggers and builds blocks from the pool's best
/// transactions.
///
/// Each [`TaskArgs`] received on `trigger_args_rx` is paired with a filtered snapshot of the
/// pool's best transactions and pushed onto `queued`. Queued entries are then processed one at a
/// time: the transactions are handed to `Storage::build_and_execute` and the result is sent back
/// through the `tx` sender carried by the trigger.
///
/// The future resolves once `trigger_args_rx` reports that the channel is closed.
pub struct ProposerTask<Provider, Pool: TransactionPool, Executor> {
/// The configured chain spec
chain_spec: Arc<ChainSpec>,
/// The client used to interact with the state
provider: Provider,
/// Single active future that builds and executes a block for one queued trigger and reports
/// the result to the requester; `None` while no build is in flight
insert_task: Option<BoxFuture<'static, ()>>,
/// Pool where transactions are stored
pool: Pool,
/// Backlog of triggers, each paired with the set of transactions selected for it, waiting to
/// be processed
// The nested generic type is spelled out in full here, hence the lint exemption.
#[allow(clippy::type_complexity)]
queued: VecDeque<(
TaskArgs,
Vec<Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>,
)>,
/// The type used for block execution
block_executor: Executor,
/// Receiving half of the channel on which proposal triggers arrive; every received
/// [`TaskArgs`] starts one round of transaction selection
trigger_args_rx: UnboundedReceiver<TaskArgs>,
}
// === impl ProposerTask ===
impl<Executor, Provider, Pool: TransactionPool> ProposerTask<Provider, Pool, Executor> {
/// Creates a new instance of the task
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
chain_spec: Arc<ChainSpec>,
provider: Provider,
pool: Pool,
block_executor: Executor,
trigger_args_rx: UnboundedReceiver<TaskArgs>,
) -> Self {
Self {
chain_spec,
provider,
insert_task: None,
pool,
queued: Default::default(),
block_executor,
trigger_args_rx,
}
}
}
impl<Executor, Provider, Pool> Future for ProposerTask<Provider, Pool, Executor>
where
    Provider: StateProviderFactory + BlockReaderIdExt + Clone + Unpin + 'static,
    Pool: TransactionPool + Unpin + 'static,
    <Pool as TransactionPool>::Transaction: IntoRecoveredTransaction,
    Executor: BlockExecutorProvider,
{
    type Output = ();

    /// Drives the proposer: turns incoming triggers into queued work and processes that work
    /// one entry at a time.
    ///
    /// Every loop iteration
    /// 1. polls `trigger_args_rx`; on a new trigger it snapshots the pool's best transactions
    ///    (blobs skipped), keeps those whose effective tip reaches the trigger's `min_tip`,
    ///    orders transactions from `local_accounts` ahead of the rest and queues the result,
    /// 2. if no build is in flight, starts one for the oldest queued entry,
    /// 3. polls the in-flight build.
    ///
    /// Returns `Poll::Ready(())` as soon as the trigger channel is closed; otherwise returns
    /// `Poll::Pending` once there is nothing left to do or the in-flight build is pending.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // This loop drives block production and only exits when the trigger channel is closed,
        // the backlog is drained, or the in-flight build is still pending.
        loop {
            match this.trigger_args_rx.poll_recv(cx) {
                Poll::Ready(Some(trigger_args)) => {
                    let mut best_txs = this.pool.best_transactions();
                    best_txs.skip_blobs();
                    debug!(target: "taiko::proposer", txs = ?best_txs.size_hint(), "Proposer get best transactions");

                    // Drop transactions that tip too little, then split the remainder into
                    // those sent by a configured local account and everything else.
                    let (mut local_txs, remote_txs): (Vec<_>, Vec<_>) = best_txs
                        .filter(|tx| {
                            tx.effective_tip_per_gas(trigger_args.base_fee)
                                .map_or(false, |tip| tip >= trigger_args.min_tip as u128)
                        })
                        .partition(|tx| {
                            trigger_args
                                .local_accounts
                                .as_ref()
                                .map(|local_accounts| local_accounts.contains(&tx.sender()))
                                .unwrap_or_default()
                        });
                    // Local transactions go first, remote ones are appended behind them.
                    local_txs.extend(remote_txs);
                    debug!(target: "taiko::proposer", txs = ?local_txs.len(), "Proposer filter best transactions");

                    // Hand the selected transactions to the block producer via the backlog.
                    this.queued.push_back((trigger_args, local_txs));
                }
                // All senders are gone: no further trigger can ever arrive, so the task is done.
                Poll::Ready(None) => return Poll::Ready(()),
                Poll::Pending => {}
            }

            if this.insert_task.is_none() {
                // `pop_front` doubles as the emptiness check: nothing queued, nothing to do.
                let (trigger_args, txs) = match this.queued.pop_front() {
                    Some(next) => next,
                    None => break,
                };

                let client = this.provider.clone();
                let chain_spec = Arc::clone(&this.chain_spec);
                let executor = this.block_executor.clone();

                // Create the future that builds and executes the block and reports the outcome
                // back to whoever sent the trigger.
                // NOTE(review): `Storage::build_and_execute` is not awaited, so it presumably
                // runs synchronously inside this poll — confirm it is cheap enough for that.
                this.insert_task = Some(Box::pin(async move {
                    let txs: Vec<_> = txs
                        .into_iter()
                        .map(|tx| tx.to_recovered_transaction().into_signed())
                        .collect();
                    let ommers = vec![];

                    let TaskArgs {
                        tx: result_tx,
                        beneficiary,
                        block_max_gas_limit,
                        max_bytes_per_tx_list,
                        max_transactions_lists,
                        base_fee,
                        ..
                    } = trigger_args;

                    // `txs` is not needed afterwards, so it is moved rather than cloned.
                    let res = Storage::build_and_execute(
                        txs,
                        ommers,
                        &client,
                        chain_spec,
                        &executor,
                        beneficiary,
                        block_max_gas_limit,
                        max_bytes_per_tx_list,
                        max_transactions_lists,
                        base_fee,
                    );

                    // A failed send only means the requester stopped waiting; nothing to do.
                    let _ = result_tx.send(res);
                }));
            }

            if let Some(mut fut) = this.insert_task.take() {
                if fut.poll_unpin(cx).is_pending() {
                    // Not finished yet: put it back and wait to be woken.
                    this.insert_task = Some(fut);
                    break;
                }
                // Finished: the slot stays empty so the next iteration can start a new build.
            }
        }

        Poll::Pending
    }
}
/// Opaque `Debug` output: prints only the type name and elides every field, so none of the
/// generic parameters needs to implement `Debug` itself.
impl<Provider, Pool: TransactionPool, Executor> std::fmt::Debug
    for ProposerTask<Provider, Pool, Executor>
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ProposerTask").finish_non_exhaustive()
    }
}