Skip to content

Commit

Permalink
Improve transaction relaying (XRPLF#4985)
Browse files Browse the repository at this point in the history
    Process held transactions through existing NetworkOPs batching:

    * Ensures that successful transactions are broadcast to peers,
      appropriate failed transactions are held for later attempts, fee
      changes are sent to subscribers, etc.

    Pop all transactions with sequential sequences, or tickets

    Give a transaction more chances to be retried:

    * Hold if the transaction gets a ter, tel, or tef result.
    * Use the new SF_HELD flag to ultimately prevent the transaction from
      being held and retried too many times.

    Decrease `shouldRelay` limit to 30s:

    * Allows transactions, validator lists, proposals, and validations to be
      relayed more often, but only when triggered by another event, such as
      receiving it from a peer
    * Decrease from 5min.
    * Expected to help transaction throughput on poorly connected networks.
  • Loading branch information
ximinez committed Apr 26, 2024
1 parent b84f7e7 commit cf6cfe1
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 60 deletions.
5 changes: 5 additions & 0 deletions src/ripple/app/ledger/LocalTxs.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ namespace ripple {
class LocalTxs
{
public:
// The number of ledgers to hold a transaction is essentially
// arbitrary. It should be sufficient to allow the transaction to
// get into a fully-validated ledger.
static constexpr int holdLedgers = 5;

virtual ~LocalTxs() = default;

// Add a new local transaction
Expand Down
29 changes: 10 additions & 19 deletions src/ripple/app/ledger/impl/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,26 +548,17 @@ LedgerMaster::storeLedger(std::shared_ptr<Ledger const> ledger)
void
LedgerMaster::applyHeldTransactions()
{
std::lock_guard sl(m_mutex);
CanonicalTXSet const set = [this]() {
std::lock_guard sl(m_mutex);
// VFALCO NOTE The hash for an open ledger is undefined so we use
// something that is a reasonable substitute.
CanonicalTXSet set(app_.openLedger().current()->info().parentHash);
std::swap(mHeldTransactions, set);
return set;
}();

app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
bool any = false;
for (auto const& it : mHeldTransactions)
{
ApplyFlags flags = tapNONE;
auto const result =
app_.getTxQ().apply(app_, view, it.second, flags, j);
if (result.second)
any = true;
}
return any;
});

// VFALCO TODO recreate the CanonicalTxSet object instead of resetting
// it.
// VFALCO NOTE The hash for an open ledger is undefined so we use
// something that is a reasonable substitute.
mHeldTransactions.reset(app_.openLedger().current()->info().parentHash);
if (!set.empty())
app_.getOPs().processTransactionSet(set);
}

std::shared_ptr<STTx const>
Expand Down
7 changes: 1 addition & 6 deletions src/ripple/app/ledger/impl/LocalTxs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,9 @@ namespace ripple {
class LocalTx
{
public:
// The number of ledgers to hold a transaction is essentially
// arbitrary. It should be sufficient to allow the transaction to
// get into a fully-validated ledger.
static int const holdLedgers = 5;

LocalTx(LedgerIndex index, std::shared_ptr<STTx const> const& txn)
: m_txn(txn)
, m_expire(index + holdLedgers)
, m_expire(index + LocalTxs::holdLedgers)
, m_id(txn->getTransactionID())
, m_account(txn->getAccountID(sfAccount))
, m_seqProxy(txn->getSeqProxy())
Expand Down
3 changes: 2 additions & 1 deletion src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,8 @@ class ApplicationImp : public Application, public BasicApp

, hashRouter_(std::make_unique<HashRouter>(
stopwatch(),
HashRouter::getDefaultHoldTime()))
HashRouter::getDefaultHoldTime(),
HashRouter::getDefaultRelayTime()))

, mValidations(
ValidationParms(),
Expand Down
12 changes: 8 additions & 4 deletions src/ripple/app/misc/CanonicalTXSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,22 @@ CanonicalTXSet::popAcctTransaction(std::shared_ptr<STTx const> const& tx)
// 1. Prioritize transactions with Sequences over transactions with
// Tickets.
//
// 2. Don't worry about consecutive Sequence numbers. Creating Tickets
// can introduce a discontinuity in Sequence numbers.
// 2. For transactions not using Tickets, look for consecutive Sequence
// numbers. For transactions using Tickets, don't worry about
// consecutive Sequence numbers. Tickets can process out of order.
//
// 3. After handling all transactions with Sequences, return Tickets
// with the lowest Ticket ID first.
std::shared_ptr<STTx const> result;
uint256 const effectiveAccount{accountKey(tx->getAccountID(sfAccount))};

Key const after(effectiveAccount, tx->getSeqProxy(), beast::zero);
auto const seqProxy = tx->getSeqProxy();
Key const after(effectiveAccount, seqProxy, beast::zero);
auto const itrNext{map_.lower_bound(after)};
if (itrNext != map_.end() &&
itrNext->first.getAccount() == effectiveAccount)
itrNext->first.getAccount() == effectiveAccount &&
(!itrNext->second->getSeqProxy().isSeq() ||
itrNext->second->getSeqProxy().value() == seqProxy.value() + 1))
{
result = std::move(itrNext->second);
map_.erase(itrNext);
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/misc/HashRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ HashRouter::shouldRelay(uint256 const& key)

auto& s = emplace(key).first;

if (!s.shouldRelay(suppressionMap_.clock().now(), holdTime_))
if (!s.shouldRelay(suppressionMap_.clock().now(), relayTime_))
return {};

return s.releasePeerSet();
Expand Down
23 changes: 19 additions & 4 deletions src/ripple/app/misc/HashRouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace ripple {
// TODO convert these macros to int constants or an enum
#define SF_BAD 0x02 // Temporarily bad
#define SF_SAVED 0x04
#define SF_HELD 0x08 // Held by LedgerMaster after potential processing failure
#define SF_TRUSTED 0x10 // comes from trusted source

// Private flags, used internally in apply.cpp.
Expand Down Expand Up @@ -143,8 +144,21 @@ class HashRouter
return 300s;
}

HashRouter(Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds)
: suppressionMap_(clock), holdTime_(entryHoldTimeInSeconds)
static inline std::chrono::seconds
getDefaultRelayTime()
{
using namespace std::chrono;

return 30s;
}

HashRouter(
Stopwatch& clock,
std::chrono::seconds entryHoldTime,
std::chrono::seconds entryRelayTime)
: suppressionMap_(clock)
, holdTime_(entryHoldTime)
, relayTime_(entryRelayTime)
{
}

Expand Down Expand Up @@ -195,11 +209,11 @@ class HashRouter
Effects:
If the item should be relayed, this function will not
return `true` again until the hold time has expired.
return a seated optional again until the relay time has expired.
The internal set of peers will also be reset.
@return A `std::optional` set of peers which do not need to be
relayed to. If the result is uninitialized, the item should
relayed to. If the result is unseated, the item should
_not_ be relayed.
*/
std::optional<std::set<PeerShortID>>
Expand All @@ -221,6 +235,7 @@ class HashRouter
suppressionMap_;

std::chrono::seconds const holdTime_;
std::chrono::seconds const relayTime_;
};

} // namespace ripple
Expand Down
Loading

0 comments on commit cf6cfe1

Please sign in to comment.