
Commit e496eaf

karalabe and lightclient authored and committed
core/state: fix runaway alloc caused by prefetcher heap escape (ethereum#30629)
Co-authored-by: lightclient <[email protected]>
1 parent 61c2aed commit e496eaf
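
In substance, the change stops the trie prefetcher from receiving its work as [][]byte batches built with common.CopyBytes(key[:]), where every scheduled account or storage key became its own heap-allocated byte slice, and instead threads typed []common.Address and []common.Hash values all the way from StateDB and stateObject into the subfetchers. Fixed-size values are copied inline into their slices and maps, so the per-key allocations (the "heap escape" of the title) disappear. The standalone sketch below contrasts the two shapes; the hash type and function names are illustrative stand-ins for common.Hash, not code from this commit.

package main

import "fmt"

type hash [32]byte

// collectBytes mimics the old shape: every key is duplicated into its own
// heap-allocated []byte before being handed to the prefetcher.
func collectBytes(keys []hash) [][]byte {
	out := make([][]byte, 0, len(keys))
	for _, k := range keys {
		buf := make([]byte, len(k)) // one heap object per key
		copy(buf, k[:])
		out = append(out, buf)
	}
	return out
}

// collectValues mimics the new shape: fixed-size values are stored inline in
// one backing array, so the whole batch costs a single allocation.
func collectValues(keys []hash) []hash {
	out := make([]hash, 0, len(keys))
	return append(out, keys...)
}

func main() {
	keys := make([]hash, 1024)
	fmt.Println(len(collectBytes(keys)), len(collectValues(keys)))
}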

4 files changed (+115 -77 lines changed)


core/state/state_object.go (+7 -7)

@@ -199,7 +199,7 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
 
 	// Schedule the resolved storage slots for prefetching if it's enabled.
 	if s.db.prefetcher != nil && s.data.Root != types.EmptyRootHash {
-		if err = s.db.prefetcher.prefetch(s.addrHash, s.origin.Root, s.address, [][]byte{key[:]}, true); err != nil {
+		if err = s.db.prefetcher.prefetch(s.addrHash, s.origin.Root, s.address, nil, []common.Hash{key}, true); err != nil {
			log.Error("Failed to prefetch storage slot", "addr", s.address, "key", key, "err", err)
		}
	}
@@ -237,7 +237,7 @@ func (s *stateObject) setState(key common.Hash, value common.Hash, origin common
 // finalise moves all dirty storage slots into the pending area to be hashed or
 // committed later. It is invoked at the end of every transaction.
 func (s *stateObject) finalise() {
-	slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage))
+	slotsToPrefetch := make([]common.Hash, 0, len(s.dirtyStorage))
 	for key, value := range s.dirtyStorage {
 		if origin, exist := s.uncommittedStorage[key]; exist && origin == value {
 			// The slot is reverted to its original value, delete the entry
@@ -250,7 +250,7 @@ func (s *stateObject) finalise() {
 			// The slot is different from its original value and hasn't been
 			// tracked for commit yet.
 			s.uncommittedStorage[key] = s.GetCommittedState(key)
-			slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
+			slotsToPrefetch = append(slotsToPrefetch, key) // Copy needed for closure
 		}
 		// Aggregate the dirty storage slots into the pending area. It might
 		// be possible that the value of tracked slot here is same with the
@@ -261,7 +261,7 @@ func (s *stateObject) finalise() {
 		s.pendingStorage[key] = value
 	}
 	if s.db.prefetcher != nil && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
-		if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch, false); err != nil {
+		if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, nil, slotsToPrefetch, false); err != nil {
			log.Error("Failed to prefetch slots", "addr", s.address, "slots", len(slotsToPrefetch), "err", err)
		}
	}
@@ -323,7 +323,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
 	// Whereas if the created node is handled first, then the collapse is avoided, and `B` is not resolved.
 	var (
 		deletions []common.Hash
-		used      = make([][]byte, 0, len(s.uncommittedStorage))
+		used      = make([]common.Hash, 0, len(s.uncommittedStorage))
 	)
 	for key, origin := range s.uncommittedStorage {
 		// Skip noop changes, persist actual changes
@@ -346,7 +346,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
 			deletions = append(deletions, key)
 		}
 		// Cache the items for preloading
-		used = append(used, common.CopyBytes(key[:])) // Copy needed for closure
+		used = append(used, key) // Copy needed for closure
 	}
 	for _, key := range deletions {
 		if err := tr.DeleteStorage(s.address, key[:]); err != nil {
@@ -356,7 +356,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
 		s.db.StorageDeleted.Add(1)
 	}
 	if s.db.prefetcher != nil {
-		s.db.prefetcher.used(s.addrHash, s.data.Root, used)
+		s.db.prefetcher.used(s.addrHash, s.data.Root, nil, used)
 	}
 	s.uncommittedStorage = make(Storage) // empties the commit markers
 	return tr, nil
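
The "// Copy needed for closure" comments mark why the old code paid for common.CopyBytes: a slice taken from a key (key[:]) is just a pointer to that key's storage, and since the batch is consumed later on the prefetcher goroutine, handing out such slices risks every entry aliasing the same bytes. Appending a common.Hash value copies all 32 bytes into the slice's backing array, which keeps the safety without a separate heap object per key. A standalone illustration of the hazard, with hash as an illustrative stand-in rather than repository code:

package main

import "fmt"

type hash [32]byte

func main() {
	keys := []hash{{0: 1}, {0: 2}, {0: 3}}

	// Hazard the old code defended against: slices taken from a reused
	// variable all alias the same 32 bytes, so the batch ends up holding
	// three views of whatever was written last.
	var (
		shared hash
		views  [][]byte
	)
	for _, k := range keys {
		shared = k
		views = append(views, shared[:])
	}
	fmt.Println(views[0][0], views[1][0], views[2][0]) // 3 3 3

	// Appending by value copies the bytes, so no defensive CopyBytes (and no
	// per-key heap allocation) is needed.
	var copies []hash
	for _, k := range keys {
		copies = append(copies, k)
	}
	fmt.Println(copies[0][0], copies[1][0], copies[2][0]) // 1 2 3
}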

core/state/statedb.go (+8 -8)

@@ -214,7 +214,7 @@ func (s *StateDB) StartPrefetcher(namespace string, witness *stateless.Witness)
 	// the prefetcher is constructed. For more details, see:
 	// https://github.com/ethereum/go-ethereum/issues/29880
 	s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, witness == nil)
-	if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, nil, false); err != nil {
+	if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, nil, nil, false); err != nil {
		log.Error("Failed to prefetch account trie", "root", s.originalRoot, "err", err)
	}
}
@@ -587,7 +587,7 @@ func (s *StateDB) getStateObject(addr common.Address) *stateObject {
 	}
 	// Schedule the resolved account for prefetching if it's enabled.
 	if s.prefetcher != nil {
-		if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, [][]byte{addr[:]}, true); err != nil {
+		if err = s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, []common.Address{addr}, nil, true); err != nil {
			log.Error("Failed to prefetch account", "addr", addr, "err", err)
		}
	}
@@ -720,7 +720,7 @@ func (s *StateDB) GetRefund() uint64 {
 // the journal as well as the refunds. Finalise, however, will not push any updates
 // into the tries just yet. Only IntermediateRoot or Commit will do that.
 func (s *StateDB) Finalise(deleteEmptyObjects bool) {
-	addressesToPrefetch := make([][]byte, 0, len(s.journal.dirties))
+	addressesToPrefetch := make([]common.Address, 0, len(s.journal.dirties))
 	for addr := range s.journal.dirties {
 		obj, exist := s.stateObjects[addr]
 		if !exist {
@@ -753,10 +753,10 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
 		// At this point, also ship the address off to the precacher. The precacher
 		// will start loading tries, and when the change is eventually committed,
 		// the commit-phase will be a lot faster
-		addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure
+		addressesToPrefetch = append(addressesToPrefetch, addr) // Copy needed for closure
 	}
 	if s.prefetcher != nil && len(addressesToPrefetch) > 0 {
-		if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch, false); err != nil {
+		if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch, nil, false); err != nil {
			log.Error("Failed to prefetch addresses", "addresses", len(addressesToPrefetch), "err", err)
		}
	}
@@ -877,7 +877,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
 	// into a shortnode. This requires `B` to be resolved from disk.
 	// Whereas if the created node is handled first, then the collapse is avoided, and `B` is not resolved.
 	var (
-		usedAddrs    [][]byte
+		usedAddrs    []common.Address
 		deletedAddrs []common.Address
 	)
 	for addr, op := range s.mutations {
@@ -892,7 +892,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
 			s.updateStateObject(s.stateObjects[addr])
 			s.AccountUpdated += 1
 		}
-		usedAddrs = append(usedAddrs, common.CopyBytes(addr[:])) // Copy needed for closure
+		usedAddrs = append(usedAddrs, addr) // Copy needed for closure
 	}
 	for _, deletedAddr := range deletedAddrs {
 		s.deleteStateObject(deletedAddr)
@@ -901,7 +901,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
 	s.AccountUpdates += time.Since(start)
 
 	if s.prefetcher != nil {
-		s.prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs)
+		s.prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs, nil)
	}
 	// Track the amount of time wasted on hashing the account trie
 	defer func(start time.Time) { s.AccountHashes += time.Since(start) }(time.Now())
core/state/trie_prefetcher.go

+96-54
Original file line numberDiff line numberDiff line change
@@ -118,31 +118,31 @@ func (p *triePrefetcher) report() {
118118
fetcher.wait() // ensure the fetcher's idle before poking in its internals
119119

120120
if fetcher.root == p.root {
121-
p.accountLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
122-
p.accountLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))
121+
p.accountLoadReadMeter.Mark(int64(len(fetcher.seenReadAddr)))
122+
p.accountLoadWriteMeter.Mark(int64(len(fetcher.seenWriteAddr)))
123123

124124
p.accountDupReadMeter.Mark(int64(fetcher.dupsRead))
125125
p.accountDupWriteMeter.Mark(int64(fetcher.dupsWrite))
126126
p.accountDupCrossMeter.Mark(int64(fetcher.dupsCross))
127127

128-
for _, key := range fetcher.used {
129-
delete(fetcher.seenRead, string(key))
130-
delete(fetcher.seenWrite, string(key))
128+
for _, key := range fetcher.usedAddr {
129+
delete(fetcher.seenReadAddr, key)
130+
delete(fetcher.seenWriteAddr, key)
131131
}
132-
p.accountWasteMeter.Mark(int64(len(fetcher.seenRead) + len(fetcher.seenWrite)))
132+
p.accountWasteMeter.Mark(int64(len(fetcher.seenReadAddr) + len(fetcher.seenWriteAddr)))
133133
} else {
134-
p.storageLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
135-
p.storageLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))
134+
p.storageLoadReadMeter.Mark(int64(len(fetcher.seenReadSlot)))
135+
p.storageLoadWriteMeter.Mark(int64(len(fetcher.seenWriteSlot)))
136136

137137
p.storageDupReadMeter.Mark(int64(fetcher.dupsRead))
138138
p.storageDupWriteMeter.Mark(int64(fetcher.dupsWrite))
139139
p.storageDupCrossMeter.Mark(int64(fetcher.dupsCross))
140140

141-
for _, key := range fetcher.used {
142-
delete(fetcher.seenRead, string(key))
143-
delete(fetcher.seenWrite, string(key))
141+
for _, key := range fetcher.usedSlot {
142+
delete(fetcher.seenReadSlot, key)
143+
delete(fetcher.seenWriteSlot, key)
144144
}
145-
p.storageWasteMeter.Mark(int64(len(fetcher.seenRead) + len(fetcher.seenWrite)))
145+
p.storageWasteMeter.Mark(int64(len(fetcher.seenReadSlot) + len(fetcher.seenWriteSlot)))
146146
}
147147
}
148148
}
@@ -158,7 +158,7 @@ func (p *triePrefetcher) report() {
158158
// upon the same contract, the parameters invoking this method may be
159159
// repeated.
160160
// 2. Finalize of the main account trie. This happens only once per block.
161-
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte, read bool) error {
161+
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, addrs []common.Address, slots []common.Hash, read bool) error {
162162
// If the state item is only being read, but reads are disabled, return
163163
if read && p.noreads {
164164
return nil
@@ -175,7 +175,7 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm
175175
fetcher = newSubfetcher(p.db, p.root, owner, root, addr)
176176
p.fetchers[id] = fetcher
177177
}
178-
return fetcher.schedule(keys, read)
178+
return fetcher.schedule(addrs, slots, read)
179179
}
180180

181181
// trie returns the trie matching the root hash, blocking until the fetcher of
@@ -195,10 +195,12 @@ func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
195195

196196
// used marks a batch of state items used to allow creating statistics as to
197197
// how useful or wasteful the fetcher is.
198-
func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) {
198+
func (p *triePrefetcher) used(owner common.Hash, root common.Hash, usedAddr []common.Address, usedSlot []common.Hash) {
199199
if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil {
200200
fetcher.wait() // ensure the fetcher's idle before poking in its internals
201-
fetcher.used = append(fetcher.used, used...)
201+
202+
fetcher.usedAddr = append(fetcher.usedAddr, usedAddr...)
203+
fetcher.usedSlot = append(fetcher.usedSlot, usedSlot...)
202204
}
203205
}
204206

@@ -235,44 +237,50 @@ type subfetcher struct {
235237
stop chan struct{} // Channel to interrupt processing
236238
term chan struct{} // Channel to signal interruption
237239

238-
seenRead map[string]struct{} // Tracks the entries already loaded via read operations
239-
seenWrite map[string]struct{} // Tracks the entries already loaded via write operations
240+
seenReadAddr map[common.Address]struct{} // Tracks the accounts already loaded via read operations
241+
seenWriteAddr map[common.Address]struct{} // Tracks the accounts already loaded via write operations
242+
seenReadSlot map[common.Hash]struct{} // Tracks the storage already loaded via read operations
243+
seenWriteSlot map[common.Hash]struct{} // Tracks the storage already loaded via write operations
240244

241245
dupsRead int // Number of duplicate preload tasks via reads only
242246
dupsWrite int // Number of duplicate preload tasks via writes only
243247
dupsCross int // Number of duplicate preload tasks via read-write-crosses
244248

245-
used [][]byte // Tracks the entries used in the end
249+
usedAddr []common.Address // Tracks the accounts used in the end
250+
usedSlot []common.Hash // Tracks the storage used in the end
246251
}
247252

248253
// subfetcherTask is a trie path to prefetch, tagged with whether it originates
249254
// from a read or a write request.
250255
type subfetcherTask struct {
251256
read bool
252-
key []byte
257+
addr *common.Address
258+
slot *common.Hash
253259
}
254260

255261
// newSubfetcher creates a goroutine to prefetch state items belonging to a
256262
// particular root hash.
257263
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
258264
sf := &subfetcher{
259-
db: db,
260-
state: state,
261-
owner: owner,
262-
root: root,
263-
addr: addr,
264-
wake: make(chan struct{}, 1),
265-
stop: make(chan struct{}),
266-
term: make(chan struct{}),
267-
seenRead: make(map[string]struct{}),
268-
seenWrite: make(map[string]struct{}),
265+
db: db,
266+
state: state,
267+
owner: owner,
268+
root: root,
269+
addr: addr,
270+
wake: make(chan struct{}, 1),
271+
stop: make(chan struct{}),
272+
term: make(chan struct{}),
273+
seenReadAddr: make(map[common.Address]struct{}),
274+
seenWriteAddr: make(map[common.Address]struct{}),
275+
seenReadSlot: make(map[common.Hash]struct{}),
276+
seenWriteSlot: make(map[common.Hash]struct{}),
269277
}
270278
go sf.loop()
271279
return sf
272280
}
273281

274282
// schedule adds a batch of trie keys to the queue to prefetch.
275-
func (sf *subfetcher) schedule(keys [][]byte, read bool) error {
283+
func (sf *subfetcher) schedule(addrs []common.Address, slots []common.Hash, read bool) error {
276284
// Ensure the subfetcher is still alive
277285
select {
278286
case <-sf.term:
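
The seen-sets are the other allocation source this commit removes: with map[string]struct{}, every key inserted via string(key) materialises a fresh string on the heap, whereas common.Address and common.Hash are comparable array types that can key a map directly and are stored inline in the buckets. Measured the same way as the batch shapes above, again with an illustrative addr type rather than repository code:

package main

import (
	"fmt"
	"testing"
)

type addr [20]byte

func main() {
	keys := make([]addr, 256)
	for i := range keys {
		keys[i][0] = byte(i) // make the keys distinct
	}

	// Old shape: inserting string(k[:]) materialises a string per tracked key.
	stringKeyed := testing.AllocsPerRun(100, func() {
		seen := make(map[string]struct{}, len(keys))
		for _, k := range keys {
			seen[string(k[:])] = struct{}{}
		}
	})

	// New shape: the comparable array value itself keys the map.
	valueKeyed := testing.AllocsPerRun(100, func() {
		seen := make(map[addr]struct{}, len(keys))
		for _, k := range keys {
			seen[k] = struct{}{}
		}
	})

	fmt.Printf("string-keyed: ~%.0f allocs/run, value-keyed: ~%.0f allocs/run\n", stringKeyed, valueKeyed)
}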
@@ -281,8 +289,11 @@ func (sf *subfetcher) schedule(keys [][]byte, read bool) error {
 	}
 	// Append the tasks to the current queue
 	sf.lock.Lock()
-	for _, key := range keys {
-		sf.tasks = append(sf.tasks, &subfetcherTask{read: read, key: key})
+	for _, addr := range addrs {
+		sf.tasks = append(sf.tasks, &subfetcherTask{read: read, addr: &addr})
+	}
+	for _, slot := range slots {
+		sf.tasks = append(sf.tasks, &subfetcherTask{read: read, slot: &slot})
	}
 	sf.lock.Unlock()
 
@@ -378,35 +389,66 @@ func (sf *subfetcher) loop() {
 			sf.lock.Unlock()
 
			for _, task := range tasks {
-				key := string(task.key)
-				if task.read {
-					if _, ok := sf.seenRead[key]; ok {
-						sf.dupsRead++
-						continue
-					}
-					if _, ok := sf.seenWrite[key]; ok {
-						sf.dupsCross++
-						continue
+				if task.addr != nil {
+					key := *task.addr
+					if task.read {
+						if _, ok := sf.seenReadAddr[key]; ok {
+							sf.dupsRead++
+							continue
+						}
+						if _, ok := sf.seenWriteAddr[key]; ok {
+							sf.dupsCross++
+							continue
+						}
+					} else {
+						if _, ok := sf.seenReadAddr[key]; ok {
+							sf.dupsCross++
+							continue
+						}
+						if _, ok := sf.seenWriteAddr[key]; ok {
+							sf.dupsWrite++
+							continue
+						}
					}
				} else {
-					if _, ok := sf.seenRead[key]; ok {
-						sf.dupsCross++
-						continue
-					}
-					if _, ok := sf.seenWrite[key]; ok {
-						sf.dupsWrite++
-						continue
+					key := *task.slot
+					if task.read {
+						if _, ok := sf.seenReadSlot[key]; ok {
+							sf.dupsRead++
+							continue
+						}
+						if _, ok := sf.seenWriteSlot[key]; ok {
+							sf.dupsCross++
+							continue
+						}
+					} else {
+						if _, ok := sf.seenReadSlot[key]; ok {
+							sf.dupsCross++
+							continue
+						}
+						if _, ok := sf.seenWriteSlot[key]; ok {
+							sf.dupsWrite++
+							continue
+						}
					}
				}
-				if len(task.key) == common.AddressLength {
-					sf.trie.GetAccount(common.BytesToAddress(task.key))
+				if task.addr != nil {
+					sf.trie.GetAccount(*task.addr)
				} else {
-					sf.trie.GetStorage(sf.addr, task.key)
+					sf.trie.GetStorage(sf.addr, (*task.slot)[:])
				}
				if task.read {
-					sf.seenRead[key] = struct{}{}
+					if task.addr != nil {
+						sf.seenReadAddr[*task.addr] = struct{}{}
+					} else {
+						sf.seenReadSlot[*task.slot] = struct{}{}
+					}
				} else {
-					sf.seenWrite[key] = struct{}{}
+					if task.addr != nil {
+						sf.seenWriteAddr[*task.addr] = struct{}{}
+					} else {
+						sf.seenWriteSlot[*task.slot] = struct{}{}
+					}
				}
			}
 
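Finally, the task representation itself changed: instead of an opaque []byte whose length decides whether it is an account or a slot (len(task.key) == common.AddressLength), each task carries either an address pointer or a slot pointer and the consumer dispatches on whichever is non-nil. Note that schedule takes &addr and &slot of its range variables, which is only correct with per-iteration loop variables; that holds under Go 1.22 semantics, which this sketch assumes the module targets. A standalone mirror of the dispatch, with illustrative types:

package main

import "fmt"

type (
	address [20]byte
	hash    [32]byte
)

// task mirrors the reworked subfetcherTask: exactly one of addr or slot is
// set, and consumers dispatch on which pointer is non-nil instead of checking
// the length of an opaque []byte key.
type task struct {
	read bool
	addr *address
	slot *hash
}

func describe(t task) string {
	if t.addr != nil {
		return fmt.Sprintf("account %x (read=%v)", t.addr[:4], t.read)
	}
	return fmt.Sprintf("storage slot %x (read=%v)", t.slot[:4], t.read)
}

func main() {
	var (
		a address
		s hash
	)
	a[0], s[0] = 0xaa, 0xbb

	// Enqueue one task per kind, the way schedule() now builds its queue from
	// two typed slices rather than one mixed [][]byte.
	tasks := []task{
		{read: true, addr: &a},
		{read: false, slot: &s},
	}
	for _, t := range tasks {
		fmt.Println(describe(t))
	}
}

Keeping two pointers rather than two value fields keeps the task struct small and makes "unset" representable without a separate discriminator field.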