Skip to content

Commit 4f92348

Browse files
authored
feat(events): add max results error and default raw codec for eth_getLogs (#12671)
1 parent 8a5dc80 commit 4f92348

File tree

5 files changed

+281
-2
lines changed

5 files changed

+281
-2
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
## Bug Fixes
77

88
- Make `EthTraceFilter` / `trace_filter` skip null rounds instead of erroring. ([filecoin-project/lotus#12702](https://github.com/filecoin-project/lotus/pull/12702))
9+
- Event APIs (`GetActorEventsRaw`, `SubscribeActorEventsRaw`, `eth_getLogs`, `eth_newFilter`, etc.) will now return an error when a request matches more than `MaxFilterResults` (default: 10,000) rather than silently truncating the results. Also apply an internal event matcher for `eth_getLogs` (etc.) to avoid builtin actor events on database query so as not to include them in `MaxFilterResults` calculation. ([filecoin-project/lotus#12671](https://github.com/filecoin-project/lotus/pull/12671))
910

1011
## New Features
1112

chain/index/events.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"github.com/filecoin-project/lotus/chain/types"
2323
)
2424

25+
var ErrMaxResultsReached = fmt.Errorf("filter matches too many events, try a more restricted filter")
26+
2527
const maxLookBackForWait = 120 // one hour of tipsets
2628

2729
type executedMessage struct {
@@ -358,9 +360,9 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter)
358360
if row.id != currentID {
359361
// Unfortunately we can't easily incorporate the max results limit into the query due to the
360362
// unpredictable number of rows caused by joins
361-
// Break here to stop collecting rows
363+
// Error here to inform the caller that we've hit the max results limit
362364
if f.MaxResults > 0 && len(ces) >= f.MaxResults {
363-
break
365+
return nil, ErrMaxResultsReached
364366
}
365367

366368
currentID = row.id
@@ -563,6 +565,9 @@ func makePrefillFilterQuery(f *EventFilter) ([]any, string, error) {
563565
clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")")
564566
}
565567
}
568+
} else if f.Codec != 0 { // if no keys are specified, we can use the codec filter
569+
clauses = append(clauses, "ee.codec=?")
570+
values = append(values, f.Codec)
566571
}
567572

568573
s := `SELECT

chain/index/events_test.go

+269
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/ipfs/go-cid"
13+
"github.com/multiformats/go-multicodec"
1314
"github.com/stretchr/testify/require"
1415

1516
"github.com/filecoin-project/go-address"
@@ -390,6 +391,257 @@ func TestGetEventsFilterByAddress(t *testing.T) {
390391
}
391392
}
392393

394+
func TestGetEventsForFilterWithRawCodec(t *testing.T) {
395+
ctx := context.Background()
396+
seed := time.Now().UnixNano()
397+
t.Logf("seed: %d", seed)
398+
rng := pseudo.New(pseudo.NewSource(seed))
399+
headHeight := abi.ChainEpoch(60)
400+
401+
// Setup the indexer and chain store with the specified head height
402+
si, _, cs := setupWithHeadIndexed(t, headHeight, rng)
403+
t.Cleanup(func() { _ = si.Close() })
404+
405+
// Define codec constants (replace with actual multicodec values)
406+
var (
407+
codecRaw = multicodec.Raw
408+
codecCBOR = multicodec.Cbor
409+
)
410+
411+
// Create events with different codecs
412+
evRaw1 := fakeEventWithCodec(
413+
abi.ActorID(1),
414+
[]kv{
415+
{k: "type", v: []byte("approval")},
416+
{k: "signer", v: []byte("addr1")},
417+
},
418+
codecRaw,
419+
)
420+
421+
evCBOR := fakeEventWithCodec(
422+
abi.ActorID(2),
423+
[]kv{
424+
{k: "type", v: []byte("approval")},
425+
{k: "signer", v: []byte("addr2")},
426+
},
427+
codecCBOR,
428+
)
429+
430+
evRaw2 := fakeEventWithCodec(
431+
abi.ActorID(3),
432+
[]kv{
433+
{k: "type", v: []byte("transfer")},
434+
{k: "recipient", v: []byte("addr3")},
435+
},
436+
codecRaw,
437+
)
438+
439+
// Aggregate events
440+
events := []types.Event{*evRaw1, *evCBOR, *evRaw2}
441+
442+
// Create a fake message and associate it with the events
443+
fm := fakeMessage(address.TestAddress, address.TestAddress)
444+
em1 := executedMessage{
445+
msg: fm,
446+
evs: events,
447+
}
448+
449+
// Mock the Actor to Address mapping
450+
si.SetActorToDelegatedAddresFunc(func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
451+
idAddr, err := address.NewIDAddress(uint64(emitter))
452+
if err != nil {
453+
return address.Undef, false
454+
}
455+
return idAddr, true
456+
})
457+
458+
// Mock the executed messages loader
459+
si.setExecutedMessagesLoaderFunc(func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) {
460+
return []executedMessage{em1}, nil
461+
})
462+
463+
// Create fake tipsets
464+
fakeTipSet1 := fakeTipSet(t, rng, 1, nil)
465+
fakeTipSet2 := fakeTipSet(t, rng, 2, nil)
466+
467+
// Associate tipsets with their heights and CIDs
468+
cs.SetTipsetByHeightAndKey(1, fakeTipSet1.Key(), fakeTipSet1) // Height 1
469+
cs.SetTipsetByHeightAndKey(2, fakeTipSet2.Key(), fakeTipSet2) // Height 2
470+
cs.SetTipSetByCid(t, fakeTipSet1)
471+
cs.SetTipSetByCid(t, fakeTipSet2)
472+
473+
// Associate messages with tipsets
474+
cs.SetMessagesForTipset(fakeTipSet1, []types.ChainMsg{fm})
475+
476+
// Apply the indexer to process the tipsets
477+
require.NoError(t, si.Apply(ctx, fakeTipSet1, fakeTipSet2))
478+
479+
t.Run("FilterEventsByRawCodecWithoutKeys", func(t *testing.T) {
480+
f := &EventFilter{
481+
MinHeight: 1,
482+
MaxHeight: 2,
483+
Codec: codecRaw, // Set to RAW codec
484+
}
485+
486+
// Retrieve events based on the filter
487+
ces, err := si.GetEventsForFilter(ctx, f)
488+
require.NoError(t, err)
489+
490+
// Define expected collected events (only RAW encoded events)
491+
expectedCES := []*CollectedEvent{
492+
{
493+
Entries: evRaw1.Entries,
494+
EmitterAddr: must.One(address.NewIDAddress(uint64(evRaw1.Emitter))),
495+
EventIdx: 0,
496+
Reverted: false,
497+
Height: 1,
498+
TipSetKey: fakeTipSet1.Key(),
499+
MsgIdx: 0,
500+
MsgCid: fm.Cid(),
501+
},
502+
{
503+
Entries: evRaw2.Entries,
504+
EmitterAddr: must.One(address.NewIDAddress(uint64(evRaw2.Emitter))),
505+
EventIdx: 2, // Adjust based on actual indexing
506+
Reverted: false,
507+
Height: 1,
508+
TipSetKey: fakeTipSet1.Key(),
509+
MsgIdx: 0,
510+
MsgCid: fm.Cid(),
511+
},
512+
}
513+
514+
require.Equal(t, expectedCES, ces)
515+
})
516+
}
517+
518+
func TestMaxFilterResults(t *testing.T) {
519+
ctx := context.Background()
520+
seed := time.Now().UnixNano()
521+
t.Logf("seed: %d", seed)
522+
rng := pseudo.New(pseudo.NewSource(seed))
523+
headHeight := abi.ChainEpoch(60)
524+
525+
// Setup the indexer and chain store with the specified head height
526+
si, _, cs := setupWithHeadIndexed(t, headHeight, rng)
527+
t.Cleanup(func() { _ = si.Close() })
528+
529+
// Define codec constants (replace with actual multicodec values)
530+
var (
531+
codecRaw = multicodec.Raw
532+
codecCBOR = multicodec.Cbor
533+
)
534+
535+
// Create events with different codecs
536+
evRaw1 := fakeEventWithCodec(
537+
abi.ActorID(1),
538+
[]kv{
539+
{k: "type", v: []byte("approval")},
540+
{k: "signer", v: []byte("addr1")},
541+
},
542+
codecRaw,
543+
)
544+
545+
evCBOR := fakeEventWithCodec(
546+
abi.ActorID(2),
547+
[]kv{
548+
{k: "type", v: []byte("approval")},
549+
{k: "signer", v: []byte("addr2")},
550+
},
551+
codecCBOR,
552+
)
553+
554+
evRaw2 := fakeEventWithCodec(
555+
abi.ActorID(3),
556+
[]kv{
557+
{k: "type", v: []byte("transfer")},
558+
{k: "recipient", v: []byte("addr3")},
559+
},
560+
codecRaw,
561+
)
562+
563+
evRaw3 := fakeEventWithCodec(
564+
abi.ActorID(4),
565+
[]kv{
566+
{k: "type", v: []byte("transfer")},
567+
{k: "recipient", v: []byte("addr4")},
568+
},
569+
codecCBOR,
570+
)
571+
572+
// Aggregate events
573+
events := []types.Event{*evRaw1, *evCBOR, *evRaw2, *evRaw3}
574+
575+
// Create a fake message and associate it with the events
576+
fm := fakeMessage(address.TestAddress, address.TestAddress)
577+
em1 := executedMessage{
578+
msg: fm,
579+
evs: events,
580+
}
581+
582+
// Mock the Actor to Address mapping
583+
si.SetActorToDelegatedAddresFunc(func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
584+
idAddr, err := address.NewIDAddress(uint64(emitter))
585+
if err != nil {
586+
return address.Undef, false
587+
}
588+
return idAddr, true
589+
})
590+
591+
// Mock the executed messages loader
592+
si.setExecutedMessagesLoaderFunc(func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) {
593+
return []executedMessage{em1}, nil
594+
})
595+
596+
// Create fake tipsets
597+
fakeTipSet1 := fakeTipSet(t, rng, 1, nil)
598+
fakeTipSet2 := fakeTipSet(t, rng, 2, nil)
599+
600+
// Associate tipsets with their heights and CIDs
601+
cs.SetTipsetByHeightAndKey(1, fakeTipSet1.Key(), fakeTipSet1) // Height 1
602+
cs.SetTipsetByHeightAndKey(2, fakeTipSet2.Key(), fakeTipSet2) // Height 2
603+
cs.SetTipSetByCid(t, fakeTipSet1)
604+
cs.SetTipSetByCid(t, fakeTipSet2)
605+
606+
// Associate messages with tipsets
607+
cs.SetMessagesForTipset(fakeTipSet1, []types.ChainMsg{fm})
608+
609+
// Apply the indexer to process the tipsets
610+
require.NoError(t, si.Apply(ctx, fakeTipSet1, fakeTipSet2))
611+
612+
// if we hit max results, we should get an error
613+
// we have total 4 events
614+
testCases := []struct {
615+
name string
616+
maxResults int
617+
expectedCount int
618+
expectedErr string
619+
}{
620+
{name: "no max results", maxResults: 0, expectedCount: 4},
621+
{name: "max result more that total events", maxResults: 10, expectedCount: 4},
622+
{name: "max results less than total events", maxResults: 1, expectedErr: ErrMaxResultsReached.Error()},
623+
}
624+
625+
for _, tc := range testCases {
626+
t.Run(tc.name, func(t *testing.T) {
627+
f := &EventFilter{
628+
MinHeight: 1,
629+
MaxHeight: 2,
630+
MaxResults: tc.maxResults,
631+
}
632+
633+
ces, err := si.GetEventsForFilter(ctx, f)
634+
if tc.expectedErr != "" {
635+
require.Error(t, err)
636+
require.Contains(t, err.Error(), tc.expectedErr)
637+
} else {
638+
require.NoError(t, err)
639+
require.Equal(t, tc.expectedCount, len(ces))
640+
}
641+
})
642+
}
643+
}
644+
393645
func sortAddresses(addrs []address.Address) {
394646
sort.Slice(addrs, func(i, j int) bool {
395647
return addrs[i].String() < addrs[j].String()
@@ -435,6 +687,23 @@ func fakeEvent(emitter abi.ActorID, indexed []kv, unindexed []kv) *types.Event {
435687
return ev
436688
}
437689

690+
func fakeEventWithCodec(emitter abi.ActorID, indexed []kv, codec multicodec.Code) *types.Event {
691+
ev := &types.Event{
692+
Emitter: emitter,
693+
}
694+
695+
for _, in := range indexed {
696+
ev.Entries = append(ev.Entries, types.EventEntry{
697+
Flags: 0x01,
698+
Key: in.k,
699+
Codec: uint64(codec),
700+
Value: in.v,
701+
})
702+
}
703+
704+
return ev
705+
}
706+
438707
type kv struct {
439708
k string
440709
v []byte

chain/index/interface.go

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66

77
"github.com/ipfs/go-cid"
8+
"github.com/multiformats/go-multicodec"
89

910
"github.com/filecoin-project/go-address"
1011
"github.com/filecoin-project/go-state-types/abi"
@@ -47,6 +48,8 @@ type EventFilter struct {
4748

4849
KeysWithCodec map[string][]types.ActorEventBlock // map of key names to a list of alternate values that may match
4950
MaxResults int // maximum number of results to collect, 0 is unlimited
51+
52+
Codec multicodec.Code // optional codec filter, only used if KeysWithCodec is not set
5053
}
5154

5255
type Indexer interface {

node/impl/full/eth.go

+1
Original file line numberDiff line numberDiff line change
@@ -1784,6 +1784,7 @@ func (e *EthEventHandler) ethGetEventsForFilter(ctx context.Context, filterSpec
17841784
TipsetCid: pf.tipsetCid,
17851785
Addresses: pf.addresses,
17861786
KeysWithCodec: pf.keys,
1787+
Codec: multicodec.Raw,
17871788
MaxResults: e.EventFilterManager.MaxFilterResults,
17881789
}
17891790

0 commit comments

Comments
 (0)