Skip to content

Commit

Permalink
fix provider reopener, rework nonce cache (#481)
Browse files Browse the repository at this point in the history
The provider factory reopener's notion of a "last_consistent_block" was
incorrect: database consistency could still be compromised for blocks
that had already been checked. As a result, rbuilder missed some slots
because it was unable to reopen the database during slot execution.

The nonce cache has been refactored to use database transactions instead
of database handles. This change simplifies the code and reduces the
need for database consistency checks.

## 📝 Summary

<!--- A general summary of your changes -->

## 💡 Motivation and Context

<!--- (Optional) Why is this change required? What problem does it
solve? Remove this section if not applicable. -->

---

## ✅ I have completed the following steps:

* [ ] Run `make lint`
* [ ] Run `make test`
* [ ] Added tests (if applicable)

---------

Co-authored-by: Daniel Xifra <[email protected]>
  • Loading branch information
dvush and ZanCorDX authored Mar 10, 2025
1 parent c28ea6e commit 2266b21
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 107 deletions.
26 changes: 7 additions & 19 deletions crates/rbuilder/src/building/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ use crate::{
};
use ahash::HashSet;
use alloy_eips::eip4844::BlobTransactionSidecar;
use alloy_primitives::{Address, Bytes, B256};
use alloy_primitives::{Address, Bytes};
use block_building_helper::BiddableUnfinishedBlock;
use reth::{primitives::SealedBlock, revm::cached::CachedReads};
use reth_errors::ProviderError;
use std::{fmt::Debug, sync::Arc};
use tokio::sync::{
broadcast,
Expand Down Expand Up @@ -115,30 +114,24 @@ impl OrderConsumer {
}

#[derive(Debug)]
pub struct OrderIntakeConsumer<P> {
nonce_cache: NonceCache<P>,
pub struct OrderIntakeConsumer {
nonces: NonceCache,

block_orders: PrioritizedOrderStore,
onchain_nonces_updated: HashSet<Address>,

order_consumer: OrderConsumer,
}

impl<P> OrderIntakeConsumer<P>
where
P: StateProviderFactory,
{
impl OrderIntakeConsumer {
/// See [`ShareBundleMerger`] for sbundle_merger_selected_signers
pub fn new(
provider: P,
nonces: NonceCache,
orders: broadcast::Receiver<SimulatedOrderCommand>,
parent_block: B256,
sorting: Sorting,
) -> Self {
let nonce_cache = NonceCache::new(provider, parent_block);

Self {
nonce_cache,
nonces,
block_orders: PrioritizedOrderStore::new(sorting, vec![]),
onchain_nonces_updated: HashSet::default(),
order_consumer: OrderConsumer::new(orders),
Expand Down Expand Up @@ -170,18 +163,13 @@ where
SimulatedOrderCommand::Simulation(sim_order) => Some(sim_order),
SimulatedOrderCommand::Cancellation(_) => None,
});
let nonce_db_ref = match self.nonce_cache.get_ref() {
Ok(nonce_db_ref) => nonce_db_ref,
Err(ProviderError::BlockHashNotFound(_)) => return Ok(false), // This can happen on reorgs since the block is removed
Err(err) => return Err(err.into()),
};
let mut nonces = Vec::new();
for new_order in new_orders {
for nonce in new_order.order.nonces() {
if self.onchain_nonces_updated.contains(&nonce.address) {
continue;
}
let onchain_nonce = nonce_db_ref.nonce(nonce.address)?;
let onchain_nonce = self.nonces.nonce(nonce.address)?;
nonces.push(AccountNonce {
account: nonce.address,
nonce: onchain_nonce,
Expand Down
28 changes: 22 additions & 6 deletions crates/rbuilder/src/building/builders/ordering_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
primitives::{AccountNonce, OrderId},
provider::StateProviderFactory,
telemetry::mark_builder_considers_order,
utils::NonceCache,
};
use ahash::{HashMap, HashSet};
use reth::revm::cached::CachedReads;
Expand Down Expand Up @@ -63,12 +64,27 @@ where
P: StateProviderFactory + Clone + 'static,
{
let payload_id = input.ctx.payload_id;
let mut order_intake_consumer = OrderIntakeConsumer::new(
input.provider.clone(),
input.input,
input.ctx.attributes.parent,
config.sorting,
);

let nonces = {
let block_state = match input
.provider
.history_by_block_hash(input.ctx.attributes.parent)
{
Ok(state) => state,
Err(err) => {
error!(
?err,
payload_id,
builder = input.builder_name,
"Failed to get history_by_block_hash, cancelling builder job"
);
return;
}
};
NonceCache::new(block_state)
};

let mut order_intake_consumer = OrderIntakeConsumer::new(nonces, input.input, config.sorting);

let mut builder = OrderingBuilderContext::new(
input.provider.clone(),
Expand Down
45 changes: 19 additions & 26 deletions crates/rbuilder/src/building/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use crate::{
primitives::{Order, OrderId, SimValue, SimulatedOrder},
provider::StateProviderFactory,
telemetry::{add_order_simulation_time, mark_order_pending_nonce},
utils::{NonceCache, NonceCacheRef},
utils::NonceCache,
};
use ahash::{HashMap, HashSet};
use alloy_primitives::{Address, B256};
use alloy_primitives::Address;
use rand::seq::SliceRandom;
use reth::revm::cached::CachedReads;
use reth_errors::ProviderError;
Expand Down Expand Up @@ -68,9 +68,9 @@ pub struct SimulatedResult {

// @Feat replaceable orders
#[derive(Debug)]
pub struct SimTree<P> {
pub struct SimTree {
// fields for nonce management
nonce_cache: NonceCache<P>,
nonces: NonceCache,

sims: HashMap<SimulationId, SimulatedResult>,
sims_that_update_one_nonce: HashMap<NonceKey, SimulationId>,
Expand All @@ -88,14 +88,10 @@ enum OrderNonceState {
Ready(Vec<Order>),
}

impl<P> SimTree<P>
where
P: StateProviderFactory,
{
pub fn new(provider: P, parent_block: B256) -> Self {
let nonce_cache = NonceCache::new(provider, parent_block);
impl SimTree {
pub fn new(nonce_cache_ref: NonceCache) -> Self {
Self {
nonce_cache,
nonces: nonce_cache_ref,
sims: HashMap::default(),
sims_that_update_one_nonce: HashMap::default(),
pending_orders: HashMap::default(),
Expand All @@ -104,12 +100,12 @@ where
}
}

fn push_order(&mut self, order: Order, nonces: &NonceCacheRef) -> Result<(), ProviderError> {
fn push_order(&mut self, order: Order) -> Result<(), ProviderError> {
if self.pending_orders.contains_key(&order.id()) {
return Ok(());
}

let order_nonce_state = self.get_order_nonce_state(&order, nonces)?;
let order_nonce_state = self.get_order_nonce_state(&order)?;

let order_id = order.id();

Expand Down Expand Up @@ -145,17 +141,13 @@ where
Ok(())
}

fn get_order_nonce_state(
&mut self,
order: &Order,
nonces: &NonceCacheRef,
) -> Result<OrderNonceState, ProviderError> {
fn get_order_nonce_state(&mut self, order: &Order) -> Result<OrderNonceState, ProviderError> {
let mut onchain_nonces_incremented = HashSet::default();
let mut pending_nonces = Vec::new();
let mut parent_orders = Vec::new();

for nonce in order.nonces() {
let onchain_nonce = nonces.nonce(nonce.address)?;
let onchain_nonce = self.nonces.nonce(nonce.address)?;

match onchain_nonce.cmp(&nonce.nonce) {
Ordering::Equal => {
Expand Down Expand Up @@ -212,9 +204,8 @@ where
}

pub fn push_orders(&mut self, orders: Vec<Order>) -> Result<(), ProviderError> {
let state = self.nonce_cache.get_ref()?;
for order in orders {
self.push_order(order, &state)?;
self.push_order(order)?;
}
Ok(())
}
Expand All @@ -228,7 +219,6 @@ where
fn process_simulation_task_result(
&mut self,
result: SimulatedResult,
state: &NonceCacheRef,
) -> Result<(), ProviderError> {
self.sims.insert(result.id, result.clone());
let mut orders_ready = Vec::new();
Expand Down Expand Up @@ -275,7 +265,7 @@ where
}

for ready_order in orders_ready {
let pending_state = self.get_order_nonce_state(&ready_order, state)?;
let pending_state = self.get_order_nonce_state(&ready_order)?;
match pending_state {
OrderNonceState::Ready(parents) => {
self.ready_orders.push(SimulationRequest {
Expand All @@ -301,9 +291,8 @@ where
&mut self,
results: Vec<SimulatedResult>,
) -> Result<(), ProviderError> {
let nonces = self.nonce_cache.get_ref()?;
for result in results {
self.process_simulation_task_result(result, &nonces)?;
self.process_simulation_task_result(result)?;
}
Ok(())
}
Expand All @@ -321,7 +310,11 @@ pub fn simulate_all_orders_with_sim_tree<P>(
where
P: StateProviderFactory + Clone,
{
let mut sim_tree = SimTree::new(provider.clone(), ctx.attributes.parent);
let nonces = {
let state = provider.history_by_block_hash(ctx.attributes.parent)?;
NonceCache::new(state)
};
let mut sim_tree = SimTree::new(nonces);

let mut orders = orders.to_vec();
let random_insert_size = max(orders.len() / 20, 1);
Expand Down
20 changes: 17 additions & 3 deletions crates/rbuilder/src/live_builder/simulation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ use crate::{
live_builder::order_input::orderpool::OrdersForBlock,
primitives::{OrderId, SimulatedOrder},
provider::StateProviderFactory,
utils::{gen_uid, Signer},
utils::{gen_uid, NonceCache, Signer},
};
use ahash::HashMap;
use parking_lot::Mutex;
use simulation_job::SimulationJob;
use std::sync::Arc;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, Instrument};
use tracing::{error, info_span, Instrument};

#[derive(Debug)]
pub struct SlotOrderSimResults {
Expand Down Expand Up @@ -123,7 +123,21 @@ where

let handle = tokio::spawn(
async move {
let sim_tree = SimTree::new(provider, ctx.attributes.parent);
let nonces = {
let state = match provider.history_by_block_hash(ctx.attributes.parent) {
Ok(state) => state,
Err(err) => {
error!(
?err,
"Failed to get history_by_block_hash, cancelling simulation job"
);
return;
}
};
NonceCache::new(state)
};

let sim_tree = SimTree::new(nonces);
let new_order_sub = input.new_order_sub;
let (sim_req_sender, sim_req_receiver) = flume::unbounded();
let (sim_results_sender, sim_results_receiver) = mpsc::channel(1024);
Expand Down
12 changes: 4 additions & 8 deletions crates/rbuilder/src/live_builder/simulation/simulation_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::{
building::sim::{SimTree, SimulatedResult, SimulationRequest},
live_builder::order_input::order_sink::OrderPoolCommand,
primitives::{Order, OrderId, OrderReplacementKey},
provider::StateProviderFactory,
};
use ahash::HashSet;
use alloy_primitives::utils::format_ether;
Expand All @@ -25,7 +24,7 @@ use super::SimulatedOrderCommand;
/// If we get a cancellation and the order is in in_flight_orders we just remove it from in_flight_orders.
/// Only SimulatedOrders still in in_flight_orders are delivered.
/// @Pending: implement cancellations in the SimTree.
pub struct SimulationJob<P> {
pub struct SimulationJob {
block_cancellation: CancellationToken,
/// Input orders to be simulated
new_order_sub: mpsc::UnboundedReceiver<OrderPoolCommand>,
Expand All @@ -35,7 +34,7 @@ pub struct SimulationJob<P> {
sim_results_receiver: mpsc::Receiver<SimulatedResult>,
/// Output of the simulations
slot_sim_results_sender: mpsc::Sender<SimulatedOrderCommand>,
sim_tree: SimTree<P>,
sim_tree: SimTree,

orders_received: OrderCounter,
orders_simulated_ok: OrderCounter,
Expand All @@ -62,17 +61,14 @@ pub struct SimulationJob<P> {
not_cancelled_sent_simulated_orders: HashSet<OrderId>,
}

impl<P> SimulationJob<P>
where
P: StateProviderFactory,
{
impl SimulationJob {
pub fn new(
block_cancellation: CancellationToken,
new_order_sub: mpsc::UnboundedReceiver<OrderPoolCommand>,
sim_req_sender: flume::Sender<SimulationRequest>,
sim_results_receiver: mpsc::Receiver<SimulatedResult>,
slot_sim_results_sender: mpsc::Sender<SimulatedOrderCommand>,
sim_tree: SimTree<P>,
sim_tree: SimTree,
) -> Self {
Self {
block_cancellation,
Expand Down
2 changes: 1 addition & 1 deletion crates/rbuilder/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::primitives::{
};
use alloy_consensus::TxEnvelope;
use alloy_eips::eip2718::Encodable2718;
pub use noncer::{NonceCache, NonceCacheRef};
pub use noncer::NonceCache;
pub use provider_factory_reopen::{
check_block_hash_reader_health, is_provider_factory_health_error, HistoricalBlockError,
ProviderFactoryReopener, RootHasherImpl,
Expand Down
44 changes: 10 additions & 34 deletions crates/rbuilder/src/utils/noncer.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,27 @@
use crate::provider::StateProviderFactory;
use alloy_primitives::{Address, B256};
use alloy_primitives::Address;
use dashmap::DashMap;
use derivative::Derivative;
use reth::providers::StateProviderBox;
use reth_errors::ProviderResult;
use std::sync::Arc;

/// Struct to get nonces for Addresses, caching the results.
/// NonceCache contains the data (but doesn't allow you to query it) and NonceCacheRef is a reference that allows you to query it.
/// Usage:
/// - Create a NonceCache
/// - For every context where the nonce is needed call NonceCache::get_ref and call NonceCacheRef::nonce all the times you need.
/// Neither NonceCache or NonceCacheRef are clonable, the clone of shared info happens on get_ref where we clone the internal cache.
#[derive(Debug)]
pub struct NonceCache<P> {
provider: P,
// We use Arc<DashMap> here to allow concurrent access to cache
#[derive(Derivative)]
#[derivative(Debug)]
pub struct NonceCache {
#[derivative(Debug = "ignore")]
state: StateProviderBox,
cache: Arc<DashMap<Address, u64>>,
block: B256,
}

impl<P> NonceCache<P>
where
P: StateProviderFactory,
{
pub fn new(provider: P, block: B256) -> Self {
impl NonceCache {
pub fn new(state: StateProviderBox) -> Self {
Self {
provider,
state,
cache: Arc::new(DashMap::default()),
block,
}
}

pub fn get_ref(&self) -> ProviderResult<NonceCacheRef> {
let state = self.provider.history_by_block_hash(self.block)?;
Ok(NonceCacheRef {
state,
cache: Arc::clone(&self.cache),
})
}
}

pub struct NonceCacheRef {
state: StateProviderBox,
cache: Arc<DashMap<Address, u64>>,
}

impl NonceCacheRef {
pub fn nonce(&self, address: Address) -> ProviderResult<u64> {
if let Some(nonce) = self.cache.get(&address) {
return Ok(*nonce);
Expand Down
Loading

0 comments on commit 2266b21

Please sign in to comment.