Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/fetcher: remove light mode in block fetcher #2804

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 5 additions & 88 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ var (
blockInsertFailGauge = metrics.NewRegisteredGauge("chain/insert/failed", nil)
)

// HeaderRetrievalFn is a callback type for retrieving a header from the local chain.
type HeaderRetrievalFn func(common.Hash) *types.Header

// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block

Expand All @@ -96,9 +93,6 @@ type chainHeightFn func() uint64
// chainFinalizedHeightFn is a callback type to retrieve the current chain finalized height.
type chainFinalizedHeightFn func() uint64

// headersInsertFn is a callback type to insert a batch of headers into the local chain.
type headersInsertFn func(headers []*types.Header) (int, error)

// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
type chainInsertFn func(types.Blocks) (int, error)

Expand Down Expand Up @@ -163,8 +157,6 @@ func (inject *blockOrHeaderInject) hash() common.Hash {
// BlockFetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
type BlockFetcher struct {
light bool // The indicator whether it's a light fetcher or normal one.

// Various event channels
notify chan *blockAnnounce
inject chan *blockOrHeaderInject
Expand All @@ -190,13 +182,11 @@ type BlockFetcher struct {
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)

// Callbacks
getHeader HeaderRetrievalFn // Retrieves a header from the local chain
getBlock blockRetrievalFn // Retrieves a block from the local chain
verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
chainHeight chainHeightFn // Retrieves the current chain's height
chainFinalizedHeight chainFinalizedHeightFn // Retrieves the current chain's finalized height
insertHeaders headersInsertFn // Injects a batch of headers into the chain
insertChain chainInsertFn // Injects a batch of blocks into the chain
dropPeer peerDropFn // Drops a peer for misbehaving

Expand All @@ -209,11 +199,9 @@ type BlockFetcher struct {
}

// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn,
broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, chainFinalizedHeight chainFinalizedHeightFn,
insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
func NewBlockFetcher(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn,
chainHeight chainHeightFn, chainFinalizedHeight chainFinalizedHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
return &BlockFetcher{
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
Expand All @@ -229,13 +217,11 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr
queue: prque.New[int64, *blockOrHeaderInject](nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
chainFinalizedHeight: chainFinalizedHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
}
Expand Down Expand Up @@ -382,15 +368,11 @@ func (f *BlockFetcher) loop() {
}
// Otherwise if fresh and still unknown, try and import
finalizedHeight := f.chainFinalizedHeight()
if (number+maxUncleDist < height) || number <= finalizedHeight || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
if (number+maxUncleDist < height) || number <= finalizedHeight || f.getBlock(hash) != nil {
f.forgetBlock(hash)
continue
}
if f.light {
f.importHeaders(op)
} else {
f.importBlocks(op)
}
f.importBlocks(op)
}
// Wait for an outside event to occur
select {
Expand Down Expand Up @@ -457,12 +439,6 @@ func (f *BlockFetcher) loop() {
case op := <-f.inject:
// A direct block insertion was requested, try and fill any pending gaps
blockBroadcastInMeter.Mark(1)

// Now only direct block injection is allowed, drop the header injection
// here silently if we receive.
if f.light {
continue
}
f.enqueue(op.origin, nil, op.block)

case hash := <-f.done:
Expand All @@ -478,16 +454,13 @@ func (f *BlockFetcher) loop() {
// In current LES protocol(les2/les3), only header announce is
// available, no need to wait too much time for header broadcast.
timeout := arriveTimeout - gatherSlack
if f.light {
timeout = 0
}
if time.Since(announces[0].time) > timeout {
// Pick a random peer to retrieve from, reset all others
announce := announces[rand.Intn(len(announces))]
f.forgetHash(hash)

// If the block still didn't arrive, queue for fetching
if (f.light && f.getHeader(hash) == nil) || (!f.light && f.getBlock(hash) == nil) {
if f.getBlock(hash) == nil {
request[announce.origin] = append(request[announce.origin], hash)
f.fetching[hash] = announce
}
Expand Down Expand Up @@ -621,16 +594,6 @@ func (f *BlockFetcher) loop() {
f.forgetHash(hash)
continue
}
// Collect all headers only if we are running in light
// mode and the headers are not imported by other means.
if f.light {
if f.getHeader(hash) == nil {
announce.header = header
lightHeaders = append(lightHeaders, announce)
}
f.forgetHash(hash)
continue
}
// Only keep if not imported by other means
if f.getBlock(hash) == nil {
announce.header = header
Expand Down Expand Up @@ -766,12 +729,6 @@ func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) {
if len(f.announced) == 0 {
return
}
// Schedule announcement retrieval quickly for light mode
// since server won't send any headers to client.
if f.light {
fetch.Reset(lightTimeout)
return
}
// Otherwise find the earliest expiring announcement
earliest := time.Now()
for _, announces := range f.announced {
Expand Down Expand Up @@ -851,46 +808,6 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B
}
}

// importHeaders spawns a new goroutine to run a header insertion into the chain.
// If the header's number is at the same height as the current import phase, it
// updates the phase states accordingly.
func (f *BlockFetcher) importHeaders(op *blockOrHeaderInject) {
peer := op.origin
header := op.header
hash := header.Hash()
log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)

go func() {
// If the parent's unknown, abort insertion
parent := f.getHeader(header.ParentHash)
if parent == nil {
log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
// forget block first, then re-queue
f.done <- hash
time.Sleep(reQueueBlockTimeout)
f.requeue <- op
return
}

defer func() { f.done <- hash }()
// Validate the header and if something went wrong, drop the peer
if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
f.dropPeer(peer)
return
}
// Run the actual import and log any issues
if _, err := f.insertHeaders([]*types.Header{header}); err != nil {
log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
return
}
// Invoke the testing hook if needed
if f.importedHook != nil {
f.importedHook(header, nil)
}
}()
}

// importBlocks spawns a new goroutine to run a block insertion into the chain. If the
// block's number is at the same height as the current import phase, it updates
// the phase states accordingly.
Expand Down
Loading
Loading