Skip to content

Commit 78a3c32

Browse files
authored
core, core/rawdb, eth/sync: no tx indexing during snap sync (#28703)
This change simplifies the logic for indexing transactions and improves the UX when a transaction is not found by returning more information to the user. Transaction indexing is now considered part of the initial sync, so `eth.syncing` will be `true` until transaction indexing has finished. API consumers can use the syncing status to determine whether the node is ready to serve users.
1 parent f55a10b commit 78a3c32

23 files changed

+446
-365
lines changed

core/blockchain.go

+114-88
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,24 @@ func DefaultCacheConfigWithScheme(scheme string) *CacheConfig {
185185
return &config
186186
}
187187

188+
// txLookup is wrapper over transaction lookup along with the corresponding
189+
// transaction object.
190+
type txLookup struct {
191+
lookup *rawdb.LegacyTxLookupEntry
192+
transaction *types.Transaction
193+
}
194+
195+
// TxIndexProgress describes how far transaction indexing has progressed,
// measured in blocks.
type TxIndexProgress struct {
	// Indexed is the number of blocks whose transactions are indexed.
	Indexed uint64

	// Remaining is the number of blocks whose transactions are not indexed yet.
	Remaining uint64
}

// Done reports whether transaction indexing is finished, which is the case
// exactly when no blocks remain to be indexed.
func (p TxIndexProgress) Done() bool {
	finished := p.Remaining == 0
	return finished
}
205+
188206
// BlockChain represents the canonical chain given a database with a genesis
189207
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
190208
//
@@ -242,15 +260,18 @@ type BlockChain struct {
242260
bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue]
243261
receiptsCache *lru.Cache[common.Hash, []*types.Receipt]
244262
blockCache *lru.Cache[common.Hash, *types.Block]
245-
txLookupCache *lru.Cache[common.Hash, *rawdb.LegacyTxLookupEntry]
263+
txLookupCache *lru.Cache[common.Hash, txLookup]
246264

247265
// future blocks are blocks added for later processing
248266
futureBlocks *lru.Cache[common.Hash, *types.Block]
249267

250-
wg sync.WaitGroup //
251-
quit chan struct{} // shutdown signal, closed in Stop.
252-
stopping atomic.Bool // false if chain is running, true when stopped
253-
procInterrupt atomic.Bool // interrupt signaler for block processing
268+
wg sync.WaitGroup
269+
quit chan struct{} // shutdown signal, closed in Stop.
270+
stopping atomic.Bool // false if chain is running, true when stopped
271+
procInterrupt atomic.Bool // interrupt signaler for block processing
272+
273+
txIndexRunning bool // flag if the background tx indexer is activated
274+
txIndexProgCh chan chan TxIndexProgress // chan for querying the progress of transaction indexing
254275

255276
engine consensus.Engine
256277
validator Validator // Block and state validator interface
@@ -297,8 +318,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
297318
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
298319
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
299320
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
300-
txLookupCache: lru.NewCache[common.Hash, *rawdb.LegacyTxLookupEntry](txLookupCacheLimit),
321+
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
301322
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
323+
txIndexProgCh: make(chan chan TxIndexProgress),
302324
engine: engine,
303325
vmConfig: vmConfig,
304326
}
@@ -466,6 +488,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
466488
// Start tx indexer/unindexer if required.
467489
if txLookupLimit != nil {
468490
bc.txLookupLimit = *txLookupLimit
491+
bc.txIndexRunning = true
469492

470493
bc.wg.Add(1)
471494
go bc.maintainTxIndex()
@@ -1155,14 +1178,13 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
11551178
// Ensure genesis is in ancients.
11561179
if first.NumberU64() == 1 {
11571180
if frozen, _ := bc.db.Ancients(); frozen == 0 {
1158-
b := bc.genesisBlock
11591181
td := bc.genesisBlock.Difficulty()
1160-
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, td)
1161-
size += writeSize
1182+
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, td)
11621183
if err != nil {
11631184
log.Error("Error writing genesis to ancients", "err", err)
11641185
return 0, err
11651186
}
1187+
size += writeSize
11661188
log.Info("Wrote genesis to ancients")
11671189
}
11681190
}
@@ -1176,44 +1198,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
11761198
// Write all chain data to ancients.
11771199
td := bc.GetTd(first.Hash(), first.NumberU64())
11781200
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
1179-
size += writeSize
11801201
if err != nil {
11811202
log.Error("Error importing chain data to ancients", "err", err)
11821203
return 0, err
11831204
}
1184-
1185-
// Write tx indices if any condition is satisfied:
1186-
// * If user requires to reserve all tx indices(txlookuplimit=0)
1187-
// * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
1188-
// * If block number is large enough to be regarded as a recent block
1189-
// It means blocks below the ancientLimit-txlookupLimit won't be indexed.
1190-
//
1191-
// But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
1192-
// an external ancient database, during the setup, blockchain will start
1193-
// a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients)
1194-
// range. In this case, all tx indices of newly imported blocks should be
1195-
// generated.
1196-
batch := bc.db.NewBatch()
1197-
for i, block := range blockChain {
1198-
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit {
1199-
rawdb.WriteTxLookupEntriesByBlock(batch, block)
1200-
} else if rawdb.ReadTxIndexTail(bc.db) != nil {
1201-
rawdb.WriteTxLookupEntriesByBlock(batch, block)
1202-
}
1203-
stats.processed++
1204-
1205-
if batch.ValueSize() > ethdb.IdealBatchSize || i == len(blockChain)-1 {
1206-
size += int64(batch.ValueSize())
1207-
if err = batch.Write(); err != nil {
1208-
snapBlock := bc.CurrentSnapBlock().Number.Uint64()
1209-
if _, err := bc.db.TruncateHead(snapBlock + 1); err != nil {
1210-
log.Error("Can't truncate ancient store after failed insert", "err", err)
1211-
}
1212-
return 0, err
1213-
}
1214-
batch.Reset()
1215-
}
1216-
}
1205+
size += writeSize
12171206

12181207
// Sync the ancient store explicitly to ensure all data has been flushed to disk.
12191208
if err := bc.db.Sync(); err != nil {
@@ -1231,8 +1220,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
12311220
}
12321221

12331222
// Delete block data from the main database.
1234-
batch.Reset()
1235-
canonHashes := make(map[common.Hash]struct{})
1223+
var (
1224+
batch = bc.db.NewBatch()
1225+
canonHashes = make(map[common.Hash]struct{})
1226+
)
12361227
for _, block := range blockChain {
12371228
canonHashes[block.Hash()] = struct{}{}
12381229
if block.NumberU64() == 0 {
@@ -1250,13 +1241,16 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
12501241
if err := batch.Write(); err != nil {
12511242
return 0, err
12521243
}
1244+
stats.processed += int32(len(blockChain))
12531245
return 0, nil
12541246
}
12551247

12561248
// writeLive writes blockchain and corresponding receipt chain into active store.
12571249
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
1258-
skipPresenceCheck := false
1259-
batch := bc.db.NewBatch()
1250+
var (
1251+
skipPresenceCheck = false
1252+
batch = bc.db.NewBatch()
1253+
)
12601254
for i, block := range blockChain {
12611255
// Short circuit insertion if shutting down or processing failed
12621256
if bc.insertStopped() {
@@ -1281,11 +1275,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
12811275
// Write all the data out into the database
12821276
rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
12831277
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
1284-
rawdb.WriteTxLookupEntriesByBlock(batch, block) // Always write tx indices for live blocks, we assume they are needed
12851278

12861279
// Write everything belongs to the blocks into the database. So that
1287-
// we can ensure all components of body is completed(body, receipts,
1288-
// tx indexes)
1280+
// we can ensure all components of body is completed(body, receipts)
1281+
// except transaction indexes(will be created once sync is finished).
12891282
if batch.ValueSize() >= ethdb.IdealBatchSize {
12901283
if err := batch.Write(); err != nil {
12911284
return 0, err
@@ -1317,19 +1310,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
13171310
return n, err
13181311
}
13191312
}
1320-
// Write the tx index tail (block number from where we index) before write any live blocks
1321-
if len(liveBlocks) > 0 && liveBlocks[0].NumberU64() == ancientLimit+1 {
1322-
// The tx index tail can only be one of the following two options:
1323-
// * 0: all ancient blocks have been indexed
1324-
// * ancient-limit: the indices of blocks before ancient-limit are ignored
1325-
if tail := rawdb.ReadTxIndexTail(bc.db); tail == nil {
1326-
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit {
1327-
rawdb.WriteTxIndexTail(bc.db, 0)
1328-
} else {
1329-
rawdb.WriteTxIndexTail(bc.db, ancientLimit-bc.txLookupLimit)
1330-
}
1331-
}
1332-
}
13331313
if len(liveBlocks) > 0 {
13341314
if n, err := writeLive(liveBlocks, liveReceipts); err != nil {
13351315
if err == errInsertionInterrupted {
@@ -1338,13 +1318,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
13381318
return n, err
13391319
}
13401320
}
1341-
1342-
head := blockChain[len(blockChain)-1]
1343-
context := []interface{}{
1344-
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
1345-
"number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)),
1346-
"size", common.StorageSize(size),
1347-
}
1321+
var (
1322+
head = blockChain[len(blockChain)-1]
1323+
context = []interface{}{
1324+
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
1325+
"number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)),
1326+
"size", common.StorageSize(size),
1327+
}
1328+
)
13481329
if stats.ignored > 0 {
13491330
context = append(context, []interface{}{"ignored", stats.ignored}...)
13501331
}
@@ -1360,7 +1341,6 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e
13601341
if bc.insertStopped() {
13611342
return errInsertionInterrupted
13621343
}
1363-
13641344
batch := bc.db.NewBatch()
13651345
rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td)
13661346
rawdb.WriteBlock(batch, block)
@@ -2427,23 +2407,24 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
24272407
func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) {
24282408
defer func() { close(done) }()
24292409

2430-
// If head is 0, it means the chain is just initialized and no blocks are inserted,
2431-
// so don't need to indexing anything.
2410+
// If head is 0, it means the chain is just initialized and no blocks are
2411+
// inserted, so don't need to index anything.
24322412
if head == 0 {
24332413
return
24342414
}
2435-
24362415
// The tail flag is not existent, it means the node is just initialized
2437-
// and all blocks(may from ancient store) are not indexed yet.
2416+
// and all blocks in the chain (part of them may from ancient store) are
2417+
// not indexed yet, index the chain according to the configuration then.
24382418
if tail == nil {
24392419
from := uint64(0)
24402420
if bc.txLookupLimit != 0 && head >= bc.txLookupLimit {
24412421
from = head - bc.txLookupLimit + 1
24422422
}
2443-
rawdb.IndexTransactions(bc.db, from, head+1, bc.quit)
2423+
rawdb.IndexTransactions(bc.db, from, head+1, bc.quit, true)
24442424
return
24452425
}
2446-
// The tail flag is existent, but the whole chain is required to be indexed.
2426+
// The tail flag is existent (which means indexes in [tail, head] should be
2427+
// present), while the whole chain are requested for indexing.
24472428
if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
24482429
if *tail > 0 {
24492430
// It can happen when chain is rewound to a historical point which
@@ -2453,17 +2434,58 @@ func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{})
24532434
if end > head+1 {
24542435
end = head + 1
24552436
}
2456-
rawdb.IndexTransactions(bc.db, 0, end, bc.quit)
2437+
rawdb.IndexTransactions(bc.db, 0, end, bc.quit, true)
24572438
}
24582439
return
24592440
}
2460-
// Update the transaction index to the new chain state
2441+
// The tail flag is existent, adjust the index range according to configuration
2442+
// and latest head.
24612443
if head-bc.txLookupLimit+1 < *tail {
24622444
// Reindex a part of missing indices and rewind index tail to HEAD-limit
2463-
rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit)
2445+
rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit, true)
24642446
} else {
24652447
// Unindex a part of stale indices and forward index tail to HEAD-limit
2466-
rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit)
2448+
rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit, false)
2449+
}
2450+
}
2451+
2452+
// reportTxIndexProgress returns the tx indexing progress.
2453+
func (bc *BlockChain) reportTxIndexProgress(head uint64) TxIndexProgress {
2454+
var (
2455+
remaining uint64
2456+
tail = rawdb.ReadTxIndexTail(bc.db)
2457+
)
2458+
total := bc.txLookupLimit
2459+
if bc.txLookupLimit == 0 {
2460+
total = head + 1 // genesis included
2461+
}
2462+
var indexed uint64
2463+
if tail != nil {
2464+
indexed = head - *tail + 1
2465+
}
2466+
// The value of indexed might be larger than total if some blocks need
2467+
// to be unindexed, avoiding a negative remaining.
2468+
if indexed < total {
2469+
remaining = total - indexed
2470+
}
2471+
return TxIndexProgress{
2472+
Indexed: indexed,
2473+
Remaining: remaining,
2474+
}
2475+
}
2476+
2477+
// TxIndexProgress retrieves the tx indexing progress, or an error if the
2478+
// background tx indexer is not activated or already stopped.
2479+
func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
2480+
if !bc.txIndexRunning {
2481+
return TxIndexProgress{}, errors.New("tx indexer is not activated")
2482+
}
2483+
ch := make(chan TxIndexProgress, 1)
2484+
select {
2485+
case bc.txIndexProgCh <- ch:
2486+
return <-ch, nil
2487+
case <-bc.quit:
2488+
return TxIndexProgress{}, errors.New("blockchain is closed")
24672489
}
24682490
}
24692491

@@ -2482,8 +2504,9 @@ func (bc *BlockChain) maintainTxIndex() {
24822504

24832505
// Listening to chain events and manipulate the transaction indexes.
24842506
var (
2485-
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
2486-
headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
2507+
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
2508+
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
2509+
headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
24872510
)
24882511
sub := bc.SubscribeChainHeadEvent(headCh)
24892512
if sub == nil {
@@ -2492,23 +2515,26 @@ func (bc *BlockChain) maintainTxIndex() {
24922515
defer sub.Unsubscribe()
24932516
log.Info("Initialized transaction indexer", "limit", bc.TxLookupLimit())
24942517

2495-
// Launch the initial processing if chain is not empty. This step is
2496-
// useful in these scenarios that chain has no progress and indexer
2497-
// is never triggered.
2498-
if head := rawdb.ReadHeadBlock(bc.db); head != nil {
2518+
// Launch the initial processing if chain is not empty (head != genesis).
2519+
// This step is useful in these scenarios that chain has no progress and
2520+
// indexer is never triggered.
2521+
if head := rawdb.ReadHeadBlock(bc.db); head != nil && head.Number().Uint64() != 0 {
24992522
done = make(chan struct{})
2523+
lastHead = head.Number().Uint64()
25002524
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.NumberU64(), done)
25012525
}
2502-
25032526
for {
25042527
select {
25052528
case head := <-headCh:
25062529
if done == nil {
25072530
done = make(chan struct{})
25082531
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
25092532
}
2533+
lastHead = head.Block.NumberU64()
25102534
case <-done:
25112535
done = nil
2536+
case ch := <-bc.txIndexProgCh:
2537+
ch <- bc.reportTxIndexProgress(lastHead)
25122538
case <-bc.quit:
25132539
if done != nil {
25142540
log.Info("Waiting background transaction indexer to exit")

0 commit comments

Comments
 (0)