Skip to content

Commit 22793e5

Browse files
committed
core: fix race conditions in txpool (ethereum#23474)
1 parent 2719c19 commit 22793e5

File tree

3 files changed

+25
-12
lines changed

3 files changed

+25
-12
lines changed

core/tx_list.go

+14-9
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"math"
2222
"math/big"
2323
"sort"
24+
"sync"
25+
"sync/atomic"
2426
"time"
2527

2628
"github.com/XinFinOrg/XDPoSChain/common"
@@ -487,9 +489,10 @@ func (h *priceHeap) Pop() interface{} {
487489
// better candidates for inclusion while in other cases (at the top of the baseFee peak)
488490
// the floating heap is better. When baseFee is decreasing they behave similarly.
489491
type txPricedList struct {
490-
all *txLookup // Pointer to the map of all transactions
491-
urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions
492-
stales int // Number of stale price points to (re-heap trigger)
492+
all *txLookup // Pointer to the map of all transactions
493+
urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions
494+
stales int64 // Number of stale price points to (re-heap trigger)
495+
reheapMu sync.Mutex // Mutex asserts that only one routine is reheaping the list
493496
}
494497

495498
const (
@@ -519,8 +522,8 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) {
519522
// the heap if a large enough ratio of transactions go stale.
520523
func (l *txPricedList) Removed(count int) {
521524
// Bump the stale counter, but exit if still too low (< 25%)
522-
l.stales += count
523-
if l.stales <= (len(l.urgent.list)+len(l.floating.list))/4 {
525+
stales := atomic.AddInt64(&l.stales, int64(count))
526+
if int(stales) <= (len(l.urgent.list)+len(l.floating.list))/4 {
524527
return
525528
}
526529
// Seems we've reached a critical number of stale transactions, reheap
@@ -544,7 +547,7 @@ func (l *txPricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool
544547
for len(h.list) > 0 {
545548
head := h.list[0]
546549
if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
547-
l.stales--
550+
atomic.AddInt64(&l.stales, -1)
548551
heap.Pop(h)
549552
continue
550553
}
@@ -570,7 +573,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
570573
// Discard stale transactions if found during cleanup
571574
tx := heap.Pop(&l.urgent).(*types.Transaction)
572575
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
573-
l.stales--
576+
atomic.AddInt64(&l.stales, -1)
574577
continue
575578
}
576579
// Non stale transaction found, move to floating heap
@@ -583,7 +586,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
583586
// Discard stale transactions if found during cleanup
584587
tx := heap.Pop(&l.floating).(*types.Transaction)
585588
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
586-
l.stales--
589+
atomic.AddInt64(&l.stales, -1)
587590
continue
588591
}
589592
// Non stale transaction found, discard it
@@ -603,8 +606,10 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
603606

604607
// Reheap forcibly rebuilds the heap based on the current remote transaction set.
605608
func (l *txPricedList) Reheap() {
609+
l.reheapMu.Lock()
610+
defer l.reheapMu.Unlock()
606611
start := time.Now()
607-
l.stales = 0
612+
atomic.StoreInt64(&l.stales, 0)
608613
l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount())
609614
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
610615
l.urgent.list = append(l.urgent.list, tx)

core/tx_pool.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"math/big"
2424
"sort"
2525
"sync"
26+
"sync/atomic"
2627
"time"
2728

2829
"github.com/XinFinOrg/XDPoSChain/common"
@@ -296,6 +297,7 @@ type TxPool struct {
296297
reorgDoneCh chan chan struct{}
297298
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
298299
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
300+
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
299301

300302
changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
301303

@@ -329,6 +331,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
329331
queueTxEventCh: make(chan *types.Transaction),
330332
reorgDoneCh: make(chan chan struct{}),
331333
reorgShutdownCh: make(chan struct{}),
334+
initDoneCh: make(chan struct{}),
332335
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
333336
trc21FeeCapacity: map[common.Address]*big.Int{},
334337
}
@@ -383,6 +386,8 @@ func (pool *TxPool) loop() {
383386
defer evict.Stop()
384387
defer journal.Stop()
385388

389+
// Notify tests that the init phase is done
390+
close(pool.initDoneCh)
386391
for {
387392
select {
388393
// Handle ChainHeadEvent
@@ -401,8 +406,8 @@ func (pool *TxPool) loop() {
401406
case <-report.C:
402407
pool.mu.RLock()
403408
pending, queued := pool.stats()
404-
stales := pool.priced.stales
405409
pool.mu.RUnlock()
410+
stales := int(atomic.LoadInt64(&pool.priced.stales))
406411

407412
if pending != prevPending || queued != prevQueued || stales != prevStales {
408413
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)

core/tx_pool_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"math/big"
2323
"math/rand"
2424
"os"
25+
"sync/atomic"
2526
"testing"
2627
"time"
2728

@@ -79,7 +80,7 @@ func (bc *testBlockChain) Config() *params.ChainConfig {
7980

8081
func (bc *testBlockChain) CurrentBlock() *types.Block {
8182
return types.NewBlock(&types.Header{
82-
GasLimit: bc.gasLimit,
83+
GasLimit: atomic.LoadUint64(&bc.gasLimit),
8384
}, nil, nil, nil)
8485
}
8586

@@ -139,6 +140,8 @@ func setupTxPoolWithConfig(config *params.ChainConfig) (*TxPool, *ecdsa.PrivateK
139140
key, _ := crypto.GenerateKey()
140141
pool := NewTxPool(testTxPoolConfig, config, blockchain)
141142

143+
// wait for the pool to initialize
144+
<-pool.initDoneCh
142145
return pool, key
143146
}
144147

@@ -646,7 +649,7 @@ func TestTransactionDropping(t *testing.T) {
646649
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4)
647650
}
648651
// Reduce the block gas limit, check that invalidated transactions are dropped
649-
pool.chain.(*testBlockChain).gasLimit = 100
652+
atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100)
650653
<-pool.requestReset(nil, nil)
651654

652655
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {

0 commit comments

Comments
 (0)