Skip to content

Commit 0dbe8bd

Browse files
committed
core: dedup txpool announced events, discard stales
1 parent 9df2663 commit 0dbe8bd

File tree

1 file changed

+38
-12
lines changed

1 file changed

+38
-12
lines changed

core/tx_pool.go

+38-12
Original file line numberDiff line numberDiff line change
@@ -915,16 +915,20 @@ func (pool *TxPool) scheduleReorgLoop() {
915915
launchNextRun bool
916916
reset *txpoolResetRequest
917917
dirtyAccounts *accountSet
918-
queuedEvents []*types.Transaction
918+
queuedEvents = make(map[common.Address]*txSortedMap)
919919
)
920920
for {
921-
// Launch next run if needed.
921+
// Launch next background reorg if needed
922922
if curDone == nil && launchNextRun {
923+
// Run the background reorg and announcements
923924
go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)
924-
curDone = nextDone
925-
nextDone = make(chan struct{})
925+
926+
// Prepare everything for the next round of reorg
927+
curDone, nextDone = nextDone, make(chan struct{})
926928
launchNextRun = false
927-
reset, dirtyAccounts, queuedEvents = nil, nil, nil
929+
930+
reset, dirtyAccounts = nil, nil
931+
queuedEvents = make(map[common.Address]*txSortedMap)
928932
}
929933

930934
select {
@@ -951,7 +955,11 @@ func (pool *TxPool) scheduleReorgLoop() {
951955
case tx := <-pool.queueTxEventCh:
952956
// Queue up the event, but don't schedule a reorg. It's up to the caller to
953957
// request one later if they want the events sent.
954-
queuedEvents = append(queuedEvents, tx)
958+
addr, _ := types.Sender(pool.signer, tx)
959+
if _, ok := queuedEvents[addr]; !ok {
960+
queuedEvents[addr] = newTxSortedMap()
961+
}
962+
queuedEvents[addr].Put(tx)
955963

956964
case <-curDone:
957965
curDone = nil
@@ -968,30 +976,48 @@ func (pool *TxPool) scheduleReorgLoop() {
968976
}
969977

970978
// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
971-
func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events []*types.Transaction) {
979+
func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*txSortedMap) {
972980
defer close(done)
973981

974982
var promoteAddrs []common.Address
975983
if dirtyAccounts != nil {
976984
promoteAddrs = dirtyAccounts.flatten()
977985
}
978-
979986
pool.mu.Lock()
980987
if reset != nil {
988+
// Reset from the old head to the new, rescheduling any reorged transactions
981989
pool.reset(reset.oldHead, reset.newHead)
982-
// Reset needs promote for all addresses.
990+
991+
// Nonces were reset, discard any events that became stale
992+
for addr := range events {
993+
events[addr].Forward(pool.pendingState.GetNonce(addr))
994+
if events[addr].Len() == 0 {
995+
delete(events, addr)
996+
}
997+
}
998+
// Reset needs promote for all addresses
983999
promoteAddrs = promoteAddrs[:0]
9841000
for addr := range pool.queue {
9851001
promoteAddrs = append(promoteAddrs, addr)
9861002
}
9871003
}
9881004
promoted := pool.promoteExecutables(promoteAddrs)
989-
events = append(events, promoted...)
1005+
for _, tx := range promoted {
1006+
addr, _ := types.Sender(pool.signer, tx)
1007+
if _, ok := events[addr]; !ok {
1008+
events[addr] = newTxSortedMap()
1009+
}
1010+
events[addr].Put(tx)
1011+
}
9901012
pool.mu.Unlock()
9911013

992-
// Notify subsystems for newly added transactions.
1014+
// Notify subsystems for newly added transactions
9931015
if len(events) > 0 {
994-
pool.txFeed.Send(NewTxsEvent{events})
1016+
var txs []*types.Transaction
1017+
for _, set := range events {
1018+
txs = append(txs, set.Flatten()...)
1019+
}
1020+
pool.txFeed.Send(NewTxsEvent{txs})
9951021
}
9961022
}
9971023

0 commit comments

Comments
 (0)