Skip to content

Commit 067084f

Browse files
core: fix race conditions in txpool (#23474)
* core: fix race conditions in txpool * core: fixed races in the txpool * core: rebased on master * core: move reheap mutex * core: renamed mutex * core: revert Reheap changes
1 parent d019e90 commit 067084f

File tree

3 files changed

+25
-12
lines changed

3 files changed

+25
-12
lines changed

core/tx_list.go

+14-9
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"math"
2222
"math/big"
2323
"sort"
24+
"sync"
25+
"sync/atomic"
2426
"time"
2527

2628
"github.com/ethereum/go-ethereum/common"
@@ -478,9 +480,10 @@ func (h *priceHeap) Pop() interface{} {
478480
// better candidates for inclusion while in other cases (at the top of the baseFee peak)
479481
// the floating heap is better. When baseFee is decreasing they behave similarly.
480482
type txPricedList struct {
481-
all *txLookup // Pointer to the map of all transactions
482-
urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions
483-
stales int // Number of stale price points to (re-heap trigger)
483+
all *txLookup // Pointer to the map of all transactions
484+
urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions
485+
stales int64 // Number of stale price points to (re-heap trigger)
486+
reheapMu sync.Mutex // Mutex asserts that only one routine is reheaping the list
484487
}
485488

486489
const (
@@ -510,8 +513,8 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) {
510513
// the heap if a large enough ratio of transactions go stale.
511514
func (l *txPricedList) Removed(count int) {
512515
// Bump the stale counter, but exit if still too low (< 25%)
513-
l.stales += count
514-
if l.stales <= (len(l.urgent.list)+len(l.floating.list))/4 {
516+
stales := atomic.AddInt64(&l.stales, int64(count))
517+
if int(stales) <= (len(l.urgent.list)+len(l.floating.list))/4 {
515518
return
516519
}
517520
// Seems we've reached a critical number of stale transactions, reheap
@@ -535,7 +538,7 @@ func (l *txPricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool
535538
for len(h.list) > 0 {
536539
head := h.list[0]
537540
if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
538-
l.stales--
541+
atomic.AddInt64(&l.stales, -1)
539542
heap.Pop(h)
540543
continue
541544
}
@@ -561,7 +564,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
561564
// Discard stale transactions if found during cleanup
562565
tx := heap.Pop(&l.urgent).(*types.Transaction)
563566
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
564-
l.stales--
567+
atomic.AddInt64(&l.stales, -1)
565568
continue
566569
}
567570
// Non stale transaction found, move to floating heap
@@ -574,7 +577,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
574577
// Discard stale transactions if found during cleanup
575578
tx := heap.Pop(&l.floating).(*types.Transaction)
576579
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
577-
l.stales--
580+
atomic.AddInt64(&l.stales, -1)
578581
continue
579582
}
580583
// Non stale transaction found, discard it
@@ -594,8 +597,10 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
594597

595598
// Reheap forcibly rebuilds the heap based on the current remote transaction set.
596599
func (l *txPricedList) Reheap() {
600+
l.reheapMu.Lock()
601+
defer l.reheapMu.Unlock()
597602
start := time.Now()
598-
l.stales = 0
603+
atomic.StoreInt64(&l.stales, 0)
599604
l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount())
600605
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
601606
l.urgent.list = append(l.urgent.list, tx)

core/tx_pool.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"math/big"
2323
"sort"
2424
"sync"
25+
"sync/atomic"
2526
"time"
2627

2728
"github.com/ethereum/go-ethereum/common"
@@ -264,6 +265,7 @@ type TxPool struct {
264265
reorgDoneCh chan chan struct{}
265266
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
266267
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
268+
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
267269

268270
changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
269271
}
@@ -294,6 +296,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
294296
queueTxEventCh: make(chan *types.Transaction),
295297
reorgDoneCh: make(chan chan struct{}),
296298
reorgShutdownCh: make(chan struct{}),
299+
initDoneCh: make(chan struct{}),
297300
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
298301
}
299302
pool.locals = newAccountSet(pool.signer)
@@ -347,6 +350,8 @@ func (pool *TxPool) loop() {
347350
defer evict.Stop()
348351
defer journal.Stop()
349352

353+
// Notify tests that the init phase is done
354+
close(pool.initDoneCh)
350355
for {
351356
select {
352357
// Handle ChainHeadEvent
@@ -365,8 +370,8 @@ func (pool *TxPool) loop() {
365370
case <-report.C:
366371
pool.mu.RLock()
367372
pending, queued := pool.stats()
368-
stales := pool.priced.stales
369373
pool.mu.RUnlock()
374+
stales := int(atomic.LoadInt64(&pool.priced.stales))
370375

371376
if pending != prevPending || queued != prevQueued || stales != prevStales {
372377
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)

core/tx_pool_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"math/big"
2525
"math/rand"
2626
"os"
27+
"sync/atomic"
2728
"testing"
2829
"time"
2930

@@ -64,7 +65,7 @@ type testBlockChain struct {
6465

6566
func (bc *testBlockChain) CurrentBlock() *types.Block {
6667
return types.NewBlock(&types.Header{
67-
GasLimit: bc.gasLimit,
68+
GasLimit: atomic.LoadUint64(&bc.gasLimit),
6869
}, nil, nil, nil, trie.NewStackTrie(nil))
6970
}
7071

@@ -123,6 +124,8 @@ func setupTxPoolWithConfig(config *params.ChainConfig) (*TxPool, *ecdsa.PrivateK
123124
key, _ := crypto.GenerateKey()
124125
pool := NewTxPool(testTxPoolConfig, config, blockchain)
125126

127+
// wait for the pool to initialize
128+
<-pool.initDoneCh
126129
return pool, key
127130
}
128131

@@ -625,7 +628,7 @@ func TestTransactionDropping(t *testing.T) {
625628
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4)
626629
}
627630
// Reduce the block gas limit, check that invalidated transactions are dropped
628-
pool.chain.(*testBlockChain).gasLimit = 100
631+
atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100)
629632
<-pool.requestReset(nil, nil)
630633

631634
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {

0 commit comments

Comments
 (0)