Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net_plugin transaction dedup cache handling #56

Merged
merged 3 commits into from
Mar 16, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 26 additions & 76 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,10 @@ namespace eosio {
struct node_transaction_state {
transaction_id_type id;
time_point_sec expires; /// time after which this may be purged.
uint32_t block_num = 0; /// block transaction was included in
uint32_t connection_id = 0;
};

struct by_expiry;
struct by_block_num;

typedef multi_index_container<
node_transaction_state,
Expand All @@ -82,10 +80,7 @@ namespace eosio {
>,
ordered_non_unique<
tag< by_expiry >,
member< node_transaction_state, fc::time_point_sec, &node_transaction_state::expires > >,
ordered_non_unique<
tag<by_block_num>,
member< node_transaction_state, uint32_t, &node_transaction_state::block_num > >
member< node_transaction_state, fc::time_point_sec, &node_transaction_state::expires > >
>
>
node_transaction_index;
Expand All @@ -98,6 +93,7 @@ namespace eosio {
};

struct by_block_id;
struct by_block_num;

typedef multi_index_container<
eosio::peer_block_state,
Expand All @@ -121,14 +117,6 @@ namespace eosio {
> peer_block_state_index;


struct update_block_num {
uint32_t new_bnum;
update_block_num(uint32_t bnum) : new_bnum(bnum) {}
void operator() (node_transaction_state& nts) {
nts.block_num = new_bnum;
}
};

class sync_manager {
private:
enum stages {
Expand Down Expand Up @@ -200,12 +188,10 @@ namespace eosio {
bool peer_has_block(const block_id_type& blkid, uint32_t connection_id) const;
bool have_block(const block_id_type& blkid) const;

bool add_peer_txn( const node_transaction_state& nts );
void update_txns_block_num( const signed_block_ptr& sb );
void update_txns_block_num( const transaction_id_type& id, uint32_t blk_num );
bool peer_has_txn( const transaction_id_type& tid, uint32_t connection_id ) const;
bool add_peer_txn( const transaction_id_type id, const time_point_sec& trx_expires, uint32_t connection_id,
const time_point_sec& now = time_point::now() );
bool have_txn( const transaction_id_type& tid ) const;
void expire_txns( uint32_t lib_num );
void expire_txns();
};

class net_plugin_impl : public std::enable_shared_from_this<net_plugin_impl> {
Expand Down Expand Up @@ -244,6 +230,7 @@ namespace eosio {
uint32_t max_client_count = 0;
uint32_t max_nodes_per_host = 1;
bool p2p_accept_transactions = true;
fc::microseconds p2p_dedup_cache_expire_time_us{};

/// Peer clock may be no more than 1 second skewed from our clock, including network latency.
const std::chrono::system_clock::duration peer_authentication_interval{std::chrono::seconds{1}};
Expand Down Expand Up @@ -1956,53 +1943,30 @@ namespace eosio {
return false;
}

bool dispatch_manager::add_peer_txn( const node_transaction_state& nts ) {
bool dispatch_manager::add_peer_txn( const transaction_id_type id, const time_point_sec& trx_expires,
uint32_t connection_id, const time_point_sec& now ) {
std::lock_guard<std::mutex> g( local_txns_mtx );
auto tptr = local_txns.get<by_id>().find( std::make_tuple( std::ref( nts.id ), nts.connection_id ) );
auto tptr = local_txns.get<by_id>().find( std::make_tuple( std::ref( id ), connection_id ) );
bool added = (tptr == local_txns.end());
if( added ) {
local_txns.insert( nts );
// expire at either transaction expiration or configured max expire time whichever is less
time_point_sec expires = now + my_impl->p2p_dedup_cache_expire_time_us;
expires = std::min( trx_expires, expires );
local_txns.insert( node_transaction_state{
.id = id,
.expires = expires,
.connection_id = connection_id} );
}
return added;
}

// thread safe
void dispatch_manager::update_txns_block_num( const signed_block_ptr& sb ) {
update_block_num ubn( sb->block_num() );
std::lock_guard<std::mutex> g( local_txns_mtx );
for( const auto& recpt : sb->transactions ) {
const transaction_id_type& id = (recpt.trx.index() == 0) ? std::get<transaction_id_type>(recpt.trx)
: std::get<packed_transaction>(recpt.trx).id();
auto range = local_txns.get<by_id>().equal_range( id );
for( auto itr = range.first; itr != range.second; ++itr ) {
local_txns.modify( itr, ubn );
}
}
}

// thread safe
void dispatch_manager::update_txns_block_num( const transaction_id_type& id, uint32_t blk_num ) {
update_block_num ubn( blk_num );
std::lock_guard<std::mutex> g( local_txns_mtx );
auto range = local_txns.get<by_id>().equal_range( id );
for( auto itr = range.first; itr != range.second; ++itr ) {
local_txns.modify( itr, ubn );
}
}

bool dispatch_manager::peer_has_txn( const transaction_id_type& tid, uint32_t connection_id ) const {
std::lock_guard<std::mutex> g( local_txns_mtx );
const auto tptr = local_txns.get<by_id>().find( std::make_tuple( std::ref( tid ), connection_id ) );
return tptr != local_txns.end();
}

bool dispatch_manager::have_txn( const transaction_id_type& tid ) const {
std::lock_guard<std::mutex> g( local_txns_mtx );
const auto tptr = local_txns.get<by_id>().find( tid );
return tptr != local_txns.end();
}

void dispatch_manager::expire_txns( uint32_t lib_num ) {
void dispatch_manager::expire_txns() {
size_t start_size = 0, end_size = 0;

std::unique_lock<std::mutex> g( local_txns_mtx );
Expand All @@ -2011,12 +1975,6 @@ namespace eosio {
auto ex_lo = old.lower_bound( fc::time_point_sec( 0 ) );
auto ex_up = old.upper_bound( time_point::now() );
old.erase( ex_lo, ex_up );
g.unlock(); // allow other threads opportunity to use local_txns

g.lock();
auto& stale = local_txns.get<by_block_num>();
stale.erase( stale.lower_bound( 1 ), stale.upper_bound( lib_num ) );
end_size = local_txns.size();
g.unlock();

fc_dlog( logger, "expire_local_txns size ${s} removed ${r}", ("s", start_size)( "r", start_size - end_size ) );
Expand Down Expand Up @@ -2092,17 +2050,13 @@ namespace eosio {
}

void dispatch_manager::bcast_transaction(const packed_transaction& trx) {
const auto& id = trx.id();
time_point_sec trx_expiration = trx.expiration();
node_transaction_state nts = {id, trx_expiration, 0, 0};

std::shared_ptr<std::vector<char>> send_buffer;
for_each_connection( [this, &trx, &nts, &send_buffer]( auto& cp ) {
const auto now = fc::time_point::now();
for_each_connection( [this, &trx, &now, &send_buffer]( auto& cp ) {
if( cp->is_blocks_only_connection() || !cp->current() ) {
return true;
}
nts.connection_id = cp->connection_id;
if( !add_peer_txn(nts) ) {
if( !add_peer_txn(trx.id(), trx.expiration(), cp->connection_id, now) ) {
return true;
}
if( !send_buffer ) {
Expand All @@ -2119,11 +2073,7 @@ namespace eosio {

void dispatch_manager::rejected_transaction(const packed_transaction_ptr& trx, uint32_t head_blk_num) {
fc_dlog( logger, "not sending rejected transaction ${tid}", ("tid", trx->id()) );
// keep rejected transaction around for awhile so we don't broadcast it
// update its block number so it will be purged when current block number is lib
if( trx->expiration() > fc::time_point::now() ) { // no need to update blk_num if already expired
update_txns_block_num( trx->id(), head_blk_num );
}
// keep rejected transaction around for awhile so we don't broadcast it, don't remove from local_txns
}

// called from connection strand
Expand Down Expand Up @@ -2975,8 +2925,7 @@ namespace eosio {
}

bool have_trx = my_impl->dispatcher->have_txn( tid );
node_transaction_state nts = {tid, trx->expiration(), 0, connection_id};
my_impl->dispatcher->add_peer_txn( nts );
my_impl->dispatcher->add_peer_txn( tid, trx->expiration(), connection_id );

if( have_trx ) {
fc_dlog( logger, "got a duplicate transaction - dropping ${id}", ("id", tid) );
Expand Down Expand Up @@ -3067,7 +3016,6 @@ namespace eosio {
boost::asio::post( my_impl->thread_pool->get_executor(), [dispatcher = my_impl->dispatcher.get(), cid=c->connection_id, blk_id, msg]() {
fc_dlog( logger, "accepted signed_block : #${n} ${id}...", ("n", msg->block_num())("id", blk_id.str().substr(8,16)) );
dispatcher->add_peer_block( blk_id, cid );
dispatcher->update_txns_block_num( msg );
});
c->strand.post( [sync_master = my_impl->sync_master.get(), dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() {
dispatcher->recv_block( c, blk_id, blk_num );
Expand Down Expand Up @@ -3159,7 +3107,7 @@ namespace eosio {
uint32_t lib = 0;
std::tie( lib, std::ignore, std::ignore, std::ignore, std::ignore, std::ignore ) = get_chain_info();
dispatcher->expire_blocks( lib );
dispatcher->expire_txns( lib );
dispatcher->expire_txns();
fc_dlog( logger, "expire_txns ${n}us", ("n", time_point::now() - now) );

start_expire_timer();
Expand Down Expand Up @@ -3403,7 +3351,8 @@ namespace eosio {
"Tuple of [PublicKey, WIF private key] (may specify multiple times)")
( "max-clients", bpo::value<int>()->default_value(def_max_clients), "Maximum number of clients from which connections are accepted, use 0 for no limit")
( "connection-cleanup-period", bpo::value<int>()->default_value(def_conn_retry_wait), "number of seconds to wait before cleaning up dead connections")
( "max-cleanup-time-msec", bpo::value<int>()->default_value(10), "max connection cleanup time per cleanup call in millisec")
( "max-cleanup-time-msec", bpo::value<int>()->default_value(10), "max connection cleanup time per cleanup call in milliseconds")
( "p2p-dedup-cache-expire-time-sec", bpo::value<uint32_t>()->default_value(10), "Maximum time to track transaction for duplicate optimization")
( "net-threads", bpo::value<uint16_t>()->default_value(my->thread_pool_size),
"Number of worker threads in net_plugin thread pool" )
( "sync-fetch-span", bpo::value<uint32_t>()->default_value(def_sync_fetch_span), "number of blocks to retrieve in a chunk from any individual peer during synchronization")
Expand Down Expand Up @@ -3436,6 +3385,7 @@ namespace eosio {
my->connector_period = std::chrono::seconds( options.at( "connection-cleanup-period" ).as<int>());
my->max_cleanup_time_ms = options.at("max-cleanup-time-msec").as<int>();
my->txn_exp_period = def_txn_expire_wait;
my->p2p_dedup_cache_expire_time_us = fc::seconds( options.at( "p2p-dedup-cache-expire-time-sec" ).as<uint32_t>() );
my->resp_expected_period = def_resp_expected_wait;
my->max_client_count = options.at( "max-clients" ).as<int>();
my->max_nodes_per_host = options.at( "p2p-max-nodes-per-host" ).as<int>();
Expand Down