Skip to content

Commit 5a4d6a9

Browse files
rvaggrjan90
authored andcommitted
chore(events): improve perf for parallel event filter matching
1. Cache address look-ups for the given tipset across filters 2. Run the filters in an errgroup
1 parent 6cd7982 commit 5a4d6a9

File tree

4 files changed

+62
-31
lines changed

4 files changed

+62
-31
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Lotus changelog
22

33
# UNRELEASED
4+
- Improve eth filter performance for nodes serving many clients. ([filecoin-project/lotus#12603](https://github.com/filecoin-project/lotus/pull/12603))
45

56
# Node and Miner v1.31.0-rc1 / 2024-11-12
67

chain/events/filter/event.go

+52-23
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/ipfs/go-cid"
1111
cbg "github.com/whyrusleeping/cbor-gen"
12+
"golang.org/x/sync/errgroup"
1213
"golang.org/x/xerrors"
1314

1415
"github.com/filecoin-project/go-address"
@@ -28,7 +29,10 @@ func isIndexedValue(b uint8) bool {
2829
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
2930
}
3031

31-
type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.Address, bool)
32+
// AddressResolver is a function that resolves an actor ID to an address. If the
33+
// actor ID cannot be resolved to an address, the function must return
34+
// address.Undef.
35+
type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) address.Address
3236

3337
type EventFilter interface {
3438
Filter
@@ -77,9 +81,6 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
7781
return nil
7882
}
7983

80-
// cache of lookups between actor id and f4 address
81-
addressLookups := make(map[abi.ActorID]address.Address)
82-
8384
ems, err := te.messages(ctx)
8485
if err != nil {
8586
return xerrors.Errorf("load executed messages: %w", err)
@@ -89,16 +90,10 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
8990

9091
for msgIdx, em := range ems {
9192
for _, ev := range em.Events() {
92-
// lookup address corresponding to the actor id
93-
addr, found := addressLookups[ev.Emitter]
94-
if !found {
95-
var ok bool
96-
addr, ok = resolver(ctx, ev.Emitter, te.rctTs)
97-
if !ok {
98-
// not an address we will be able to match against
99-
continue
100-
}
101-
addressLookups[ev.Emitter] = addr
93+
addr := resolver(ctx, ev.Emitter, te.rctTs)
94+
if addr == address.Undef {
95+
// not an address we will be able to match against
96+
continue
10297
}
10398

10499
if !f.matchAddress(addr) {
@@ -295,7 +290,7 @@ func (e *executedMessage) Events() []*types.Event {
295290

296291
type EventFilterManager struct {
297292
ChainStore *cstore.ChainStore
298-
AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)
293+
AddressResolver AddressResolver
299294
MaxFilterResults int
300295
ChainIndexer index.Indexer
301296

@@ -319,11 +314,17 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
319314
load: m.loadExecutedMessages,
320315
}
321316

322-
// TODO: could run this loop in parallel with errgroup if there are many filters
317+
tsAddressResolver := tipSetCachedAddressResolver(m.AddressResolver)
318+
319+
g, ctx := errgroup.WithContext(ctx)
323320
for _, f := range m.filters {
324-
if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
325-
return err
326-
}
321+
g.Go(func() error {
322+
return f.CollectEvents(ctx, tse, false, tsAddressResolver)
323+
})
324+
}
325+
326+
if err := g.Wait(); err != nil {
327+
return err
327328
}
328329

329330
return nil
@@ -344,11 +345,17 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
344345
load: m.loadExecutedMessages,
345346
}
346347

347-
// TODO: could run this loop in parallel with errgroup if there are many filters
348+
tsAddressResolver := tipSetCachedAddressResolver(m.AddressResolver)
349+
350+
g, ctx := errgroup.WithContext(ctx)
348351
for _, f := range m.filters {
349-
if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
350-
return err
351-
}
352+
g.Go(func() error {
353+
return f.CollectEvents(ctx, tse, true, tsAddressResolver)
354+
})
355+
}
356+
357+
if err := g.Wait(); err != nil {
358+
return err
352359
}
353360

354361
return nil
@@ -507,3 +514,25 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc
507514

508515
return ems, nil
509516
}
517+
518+
// tipSetCachedAddressResolver returns a thread-safe function that resolves actor IDs to addresses
519+
// with a cache that is shared across all calls to the returned function. This should only be used
520+
// for a single TipSet, as the resolution may vary across TipSets and the cache does not account for
521+
// this.
522+
func tipSetCachedAddressResolver(resolver AddressResolver) func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
523+
addressLookups := make(map[abi.ActorID]address.Address)
524+
var addressLookupsLk sync.Mutex
525+
526+
return func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
527+
addressLookupsLk.Lock()
528+
defer addressLookupsLk.Unlock()
529+
530+
addr, ok := addressLookups[emitter]
531+
if !ok {
532+
addr = resolver(ctx, emitter, ts)
533+
addressLookups[emitter] = addr
534+
}
535+
536+
return addr
537+
}
538+
}

chain/events/filter/event_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,10 @@ func (a addressMap) add(actorID abi.ActorID, addr address.Address) {
441441
a[actorID] = addr
442442
}
443443

444-
func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
444+
func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
445445
ra, ok := a[emitter]
446-
return ra, ok
446+
if ok {
447+
return ra
448+
}
449+
return address.Undef
447450
}

node/modules/actorevent.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -105,20 +105,18 @@ func EventFilterManager(cfg config.EventsConfig) func(helpers.MetricsCtx, repo.L
105105
fm := &filter.EventFilterManager{
106106
ChainStore: cs,
107107
ChainIndexer: ci,
108-
// TODO:
109-
// We don't need this address resolution anymore once https://github.com/filecoin-project/lotus/issues/11594 lands
110-
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
108+
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
111109
idAddr, err := address.NewIDAddress(uint64(emitter))
112110
if err != nil {
113-
return address.Undef, false
111+
return address.Undef
114112
}
115113

116114
actor, err := sm.LoadActor(ctx, idAddr, ts)
117115
if err != nil || actor.DelegatedAddress == nil {
118-
return idAddr, true
116+
return idAddr
119117
}
120118

121-
return *actor.DelegatedAddress, true
119+
return *actor.DelegatedAddress
122120
},
123121

124122
MaxFilterResults: cfg.MaxFilterResults,

0 commit comments

Comments
 (0)