diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 8b6aca3955..19ee98053d 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -62,12 +62,10 @@ namespace eosio { struct node_transaction_state { transaction_id_type id; time_point_sec expires; /// time after which this may be purged. - uint32_t block_num = 0; /// block transaction was included in uint32_t connection_id = 0; }; struct by_expiry; - struct by_block_num; typedef multi_index_container< node_transaction_state, @@ -82,10 +80,7 @@ namespace eosio { >, ordered_non_unique< tag< by_expiry >, - member< node_transaction_state, fc::time_point_sec, &node_transaction_state::expires > >, - ordered_non_unique< - tag, - member< node_transaction_state, uint32_t, &node_transaction_state::block_num > > + member< node_transaction_state, fc::time_point_sec, &node_transaction_state::expires > > > > node_transaction_index; @@ -98,6 +93,7 @@ namespace eosio { }; struct by_block_id; + struct by_block_num; typedef multi_index_container< eosio::peer_block_state, @@ -121,14 +117,6 @@ namespace eosio { > peer_block_state_index; - struct update_block_num { - uint32_t new_bnum; - update_block_num(uint32_t bnum) : new_bnum(bnum) {} - void operator() (node_transaction_state& nts) { - nts.block_num = new_bnum; - } - }; - class sync_manager { private: enum stages { @@ -200,12 +188,10 @@ namespace eosio { bool peer_has_block(const block_id_type& blkid, uint32_t connection_id) const; bool have_block(const block_id_type& blkid) const; - bool add_peer_txn( const node_transaction_state& nts ); - void update_txns_block_num( const signed_block_ptr& sb ); - void update_txns_block_num( const transaction_id_type& id, uint32_t blk_num ); - bool peer_has_txn( const transaction_id_type& tid, uint32_t connection_id ) const; + bool add_peer_txn( const transaction_id_type id, const time_point_sec& trx_expires, uint32_t connection_id, + const time_point_sec& now = time_point::now() ); bool have_txn( const transaction_id_type& tid ) const; - void expire_txns( uint32_t lib_num ); + void expire_txns(); }; class net_plugin_impl : public std::enable_shared_from_this { @@ -244,6 +230,7 @@ namespace eosio { uint32_t max_client_count = 0; uint32_t max_nodes_per_host = 1; bool p2p_accept_transactions = true; + fc::microseconds p2p_dedup_cache_expire_time_us{}; /// Peer clock may be no more than 1 second skewed from our clock, including network latency. const std::chrono::system_clock::duration peer_authentication_interval{std::chrono::seconds{1}}; @@ -1956,53 +1943,30 @@ namespace eosio { return false; } - bool dispatch_manager::add_peer_txn( const node_transaction_state& nts ) { + bool dispatch_manager::add_peer_txn( const transaction_id_type id, const time_point_sec& trx_expires, + uint32_t connection_id, const time_point_sec& now ) { std::lock_guard g( local_txns_mtx ); - auto tptr = local_txns.get().find( std::make_tuple( std::ref( nts.id ), nts.connection_id ) ); + auto tptr = local_txns.get().find( std::make_tuple( std::ref( id ), connection_id ) ); bool added = (tptr == local_txns.end()); if( added ) { - local_txns.insert( nts ); + // expire at either transaction expiration or configured max expire time whichever is less + time_point_sec expires = now + my_impl->p2p_dedup_cache_expire_time_us; + expires = std::min( trx_expires, expires ); + local_txns.insert( node_transaction_state{ + .id = id, + .expires = expires, + .connection_id = connection_id} ); } return added; } - // thread safe - void dispatch_manager::update_txns_block_num( const signed_block_ptr& sb ) { - update_block_num ubn( sb->block_num() ); - std::lock_guard g( local_txns_mtx ); - for( const auto& recpt : sb->transactions ) { - const transaction_id_type& id = (recpt.trx.index() == 0) ? std::get(recpt.trx) - : std::get(recpt.trx).id(); - auto range = local_txns.get().equal_range( id ); - for( auto itr = range.first; itr != range.second; ++itr ) { - local_txns.modify( itr, ubn ); - } - } - } - - // thread safe - void dispatch_manager::update_txns_block_num( const transaction_id_type& id, uint32_t blk_num ) { - update_block_num ubn( blk_num ); - std::lock_guard g( local_txns_mtx ); - auto range = local_txns.get().equal_range( id ); - for( auto itr = range.first; itr != range.second; ++itr ) { - local_txns.modify( itr, ubn ); - } - } - - bool dispatch_manager::peer_has_txn( const transaction_id_type& tid, uint32_t connection_id ) const { - std::lock_guard g( local_txns_mtx ); - const auto tptr = local_txns.get().find( std::make_tuple( std::ref( tid ), connection_id ) ); - return tptr != local_txns.end(); - } - bool dispatch_manager::have_txn( const transaction_id_type& tid ) const { std::lock_guard g( local_txns_mtx ); const auto tptr = local_txns.get().find( tid ); return tptr != local_txns.end(); } - void dispatch_manager::expire_txns( uint32_t lib_num ) { + void dispatch_manager::expire_txns() { size_t start_size = 0, end_size = 0; std::unique_lock g( local_txns_mtx ); @@ -2011,12 +1975,6 @@ namespace eosio { auto ex_lo = old.lower_bound( fc::time_point_sec( 0 ) ); auto ex_up = old.upper_bound( time_point::now() ); old.erase( ex_lo, ex_up ); - g.unlock(); // allow other threads opportunity to use local_txns - - g.lock(); - auto& stale = local_txns.get(); - stale.erase( stale.lower_bound( 1 ), stale.upper_bound( lib_num ) ); - end_size = local_txns.size(); g.unlock(); fc_dlog( logger, "expire_local_txns size ${s} removed ${r}", ("s", start_size)( "r", start_size - end_size ) ); @@ -2092,17 +2050,13 @@ namespace eosio { } void dispatch_manager::bcast_transaction(const packed_transaction& trx) { - const auto& id = trx.id(); - time_point_sec trx_expiration = trx.expiration(); - node_transaction_state nts = {id, trx_expiration, 0, 0}; - std::shared_ptr> send_buffer; - for_each_connection( [this, &trx, &nts, &send_buffer]( auto& cp ) { + const auto now = fc::time_point::now(); + for_each_connection( [this, &trx, &now, &send_buffer]( auto& cp ) { if( cp->is_blocks_only_connection() || !cp->current() ) { return true; } - nts.connection_id = cp->connection_id; - if( !add_peer_txn(nts) ) { + if( !add_peer_txn(trx.id(), trx.expiration(), cp->connection_id, now) ) { return true; } if( !send_buffer ) { @@ -2119,11 +2073,7 @@ namespace eosio { void dispatch_manager::rejected_transaction(const packed_transaction_ptr& trx, uint32_t head_blk_num) { fc_dlog( logger, "not sending rejected transaction ${tid}", ("tid", trx->id()) ); - // keep rejected transaction around for awhile so we don't broadcast it - // update its block number so it will be purged when current block number is lib - if( trx->expiration() > fc::time_point::now() ) { // no need to update blk_num if already expired - update_txns_block_num( trx->id(), head_blk_num ); - } + // keep rejected transaction around for awhile so we don't broadcast it, don't remove from local_txns } // called from connection strand @@ -2975,8 +2925,7 @@ namespace eosio { } bool have_trx = my_impl->dispatcher->have_txn( tid ); - node_transaction_state nts = {tid, trx->expiration(), 0, connection_id}; - my_impl->dispatcher->add_peer_txn( nts ); + my_impl->dispatcher->add_peer_txn( tid, trx->expiration(), connection_id ); if( have_trx ) { fc_dlog( logger, "got a duplicate transaction - dropping ${id}", ("id", tid) ); @@ -3067,7 +3016,6 @@ namespace eosio { boost::asio::post( my_impl->thread_pool->get_executor(), [dispatcher = my_impl->dispatcher.get(), cid=c->connection_id, blk_id, msg]() { fc_dlog( logger, "accepted signed_block : #${n} ${id}...", ("n", msg->block_num())("id", blk_id.str().substr(8,16)) ); dispatcher->add_peer_block( blk_id, cid ); - dispatcher->update_txns_block_num( msg ); }); c->strand.post( [sync_master = my_impl->sync_master.get(), dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() { dispatcher->recv_block( c, blk_id, blk_num ); @@ -3159,7 +3107,7 @@ namespace eosio { uint32_t lib = 0; std::tie( lib, std::ignore, std::ignore, std::ignore, std::ignore, std::ignore ) = get_chain_info(); dispatcher->expire_blocks( lib ); - dispatcher->expire_txns( lib ); + dispatcher->expire_txns(); fc_dlog( logger, "expire_txns ${n}us", ("n", time_point::now() - now) ); start_expire_timer(); @@ -3403,7 +3351,8 @@ namespace eosio { "Tuple of [PublicKey, WIF private key] (may specify multiple times)") ( "max-clients", bpo::value()->default_value(def_max_clients), "Maximum number of clients from which connections are accepted, use 0 for no limit") ( "connection-cleanup-period", bpo::value()->default_value(def_conn_retry_wait), "number of seconds to wait before cleaning up dead connections") - ( "max-cleanup-time-msec", bpo::value()->default_value(10), "max connection cleanup time per cleanup call in millisec") + ( "max-cleanup-time-msec", bpo::value()->default_value(10), "max connection cleanup time per cleanup call in milliseconds") + ( "p2p-dedup-cache-expire-time-sec", bpo::value()->default_value(10), "Maximum time to track transaction for duplicate optimization") ( "net-threads", bpo::value()->default_value(my->thread_pool_size), "Number of worker threads in net_plugin thread pool" ) ( "sync-fetch-span", bpo::value()->default_value(def_sync_fetch_span), "number of blocks to retrieve in a chunk from any individual peer during synchronization") @@ -3436,6 +3385,7 @@ namespace eosio { my->connector_period = std::chrono::seconds( options.at( "connection-cleanup-period" ).as()); my->max_cleanup_time_ms = options.at("max-cleanup-time-msec").as(); my->txn_exp_period = def_txn_expire_wait; + my->p2p_dedup_cache_expire_time_us = fc::seconds( options.at( "p2p-dedup-cache-expire-time-sec" ).as() ); my->resp_expected_period = def_resp_expected_wait; my->max_client_count = options.at( "max-clients" ).as(); my->max_nodes_per_host = options.at( "p2p-max-nodes-per-host" ).as();