Skip to content

Commit dafc56c

Browse files
authored
feat(events): compare-amt option for lotus-shed indexes inspect-events (#12571)
Instead of relying just on entry counts, regenerate the AMT root using only what we have in the db and compare it with the event root in the message receipt. This should tell us precisely whether or not the db holds exactly the event data it should. Ref: #12570
1 parent d516f3a commit dafc56c

File tree

2 files changed

+238
-41
lines changed

2 files changed

+238
-41
lines changed

CHANGELOG.md

+1-2
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,13 @@
88
- Add ChainSafe operated Calibration archival node to the bootstrap list ([filecoin-project/lotus#12517](https://github.com/filecoin-project/lotus/pull/12517))
99
- Fix hotloop in F3 participation API ([filecoin-project/lotus#12575](https://github.com/filecoin-project/lotus/pull/12575))
1010
- `lotus chain head` now supports a `--height` flag to print just the epoch number of the current chain head ([filecoin-project/lotus#12609](https://github.com/filecoin-project/lotus/pull/12609))
11+
- `lotus-shed indexes inspect-events` now performs a comprehensive comparison of the event index data for each message by comparing the AMT root CID from the message receipt with the root of a reconstructed AMT. Previously `inspect-events` simply compared event counts; comparing AMT roots confirms all the event data is byte-perfect. ([filecoin-project/lotus#12570](https://github.com/filecoin-project/lotus/pull/12570))
1112

1213
## Bug Fixes
1314
- Fix a bug in the `lotus-shed indexes backfill-events` command that may result in either duplicate events being backfilled where there are existing events (such an operation *should* be idempotent) or events erroneously having duplicate `logIndex` values when queried via ETH APIs. ([filecoin-project/lotus#12567](https://github.com/filecoin-project/lotus/pull/12567))
1415
- Event APIs (Eth events and actor events) should only return reverted events if client queries by specific block hash / tipset. Eth and actor event subscription APIs should always return reverted events to enable accurate observation of real-time changes. ([filecoin-project/lotus#12585](https://github.com/filecoin-project/lotus/pull/12585))
1516
- Add logic to check if the miner's owner address is delegated (f4 address). If it is delegated, the `lotus-shed sectors termination-estimate` command now sends the termination state call using the worker ID. This fix resolves the issue where termination-estimate did not function correctly for miners with delegated owner addresses. ([filecoin-project/lotus#12569](https://github.com/filecoin-project/lotus/pull/12569))
1617

17-
## Improvements
18-
1918
## Deps
2019

2120
# UNRELEASED Node v1.30.0

cmd/lotus-shed/indexes.go

+237-39
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,21 @@ import (
1010
"strings"
1111
"time"
1212

13+
"github.com/ipfs/go-cid"
14+
cbor "github.com/ipfs/go-ipld-cbor"
1315
"github.com/mitchellh/go-homedir"
1416
"github.com/urfave/cli/v2"
17+
cbg "github.com/whyrusleeping/cbor-gen"
1518
"golang.org/x/xerrors"
1619

1720
"github.com/filecoin-project/go-address"
21+
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
1822
"github.com/filecoin-project/go-state-types/abi"
1923
"github.com/filecoin-project/go-state-types/crypto"
2024
"github.com/filecoin-project/go-state-types/exitcode"
2125

2226
lapi "github.com/filecoin-project/lotus/api"
27+
bstore "github.com/filecoin-project/lotus/blockstore"
2328
"github.com/filecoin-project/lotus/chain/types"
2429
"github.com/filecoin-project/lotus/chain/types/ethtypes"
2530
lcli "github.com/filecoin-project/lotus/cli"
@@ -34,6 +39,10 @@ const (
3439
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
3540
upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false`
3641
tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?`
42+
43+
// these queries are used to extract just the information used to reconstruct the event AMT from the database
44+
selectEventIdAndEmitter = `SELECT id, emitter_addr FROM event WHERE tipset_key_cid=? and message_cid=? ORDER BY event_index ASC`
45+
selectEventEntries = `SELECT flags, key, codec, value FROM event_entry WHERE event_id=? ORDER BY _rowid_ ASC`
3746
)
3847

3948
func withCategory(cat string, cmd *cli.Command) *cli.Command {
@@ -483,87 +492,142 @@ var inspectEventsCmd = &cli.Command{
483492
if err != nil {
484493
return err
485494
}
495+
stmtSelectEventIdAndEmitter, err := db.Prepare(selectEventIdAndEmitter)
496+
if err != nil {
497+
return err
498+
}
499+
stmtSelectEventEntries, err := db.Prepare(selectEventEntries)
500+
if err != nil {
501+
return err
502+
}
486503

487-
processHeight := func(ctx context.Context, ts *types.TipSet, receipts []*types.MessageReceipt) error {
488-
tsKeyCid, err := ts.Key().Cid()
504+
processHeight := func(ctx context.Context, messages []lapi.Message, receipts []*types.MessageReceipt) error {
505+
tsKeyCid, err := currTs.Key().Cid()
489506
if err != nil {
490-
return fmt.Errorf("failed to get tipset key cid: %w", err)
507+
return xerrors.Errorf("failed to get tipset key cid: %w", err)
491508
}
492509

493-
var expectEvents int
494-
var expectEntries int
510+
var problems []string
495511

496-
for _, receipt := range receipts {
497-
if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil {
498-
continue
512+
checkEventAndEntryCounts := func() error {
513+
// compare by counting events, using ChainGetEvents to load the events from the chain
514+
expectEvents, expectEntries, err := chainEventAndEntryCountsAt(ctx, currTs, receipts, api)
515+
if err != nil {
516+
return err
499517
}
500-
events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot)
518+
519+
actualEvents, actualEntries, err := dbEventAndEntryCountsAt(currTs, stmtEventCount, stmtEntryCount)
501520
if err != nil {
502-
return fmt.Errorf("failed to load events for tipset %s: %w", currTs, err)
521+
return err
503522
}
504-
expectEvents += len(events)
505-
for _, event := range events {
506-
expectEntries += len(event.Entries)
523+
524+
if actualEvents != expectEvents {
525+
problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents))
526+
}
527+
if actualEntries != expectEntries {
528+
problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries))
507529
}
530+
531+
return nil
508532
}
509533

510-
var problems []string
534+
// Compare the AMT roots: we reconstruct the event AMT from the database data we have and
535+
// compare it with the on-chain AMT root from the receipt. If it's the same CID then we have
536+
// exactly the same event data. Any variation, in number of events, and even a single byte
537+
// in event data, will be considered a mismatch.
538+
539+
// cache for address -> actorID because it's typical for tipsets to generate many events for
540+
// the same actors so we can try and avoid too many StateLookupID calls
541+
addrIdCache := make(map[address.Address]abi.ActorID)
542+
543+
eventIndex := 0
544+
var hasEvents bool
545+
for msgIndex, receipt := range receipts {
546+
if receipt.EventsRoot == nil {
547+
continue
548+
}
549+
550+
amtRoot, has, problem, err := amtRootForEvents(
551+
ctx,
552+
api,
553+
tsKeyCid,
554+
prevTs.Key(),
555+
stmtSelectEventIdAndEmitter,
556+
stmtSelectEventEntries,
557+
messages[msgIndex],
558+
addrIdCache,
559+
)
560+
if err != nil {
561+
return err
562+
}
563+
if has && !hasEvents {
564+
hasEvents = true
565+
}
566+
567+
if problem != "" {
568+
problems = append(problems, problem)
569+
} else if amtRoot != *receipt.EventsRoot {
570+
problems = append(problems, fmt.Sprintf("events root mismatch for message %s", messages[msgIndex].Cid))
571+
// also provide more information about the mismatch
572+
if err := checkEventAndEntryCounts(); err != nil {
573+
return err
574+
}
575+
}
576+
577+
eventIndex++
578+
}
511579

512580
var seenHeight int
513581
var seenReverted int
514582
if err := stmtTipsetSeen.QueryRow(tsKeyCid.Bytes()).Scan(&seenHeight, &seenReverted); err != nil {
515583
if err == sql.ErrNoRows {
516-
if expectEvents > 0 {
584+
if hasEvents {
517585
problems = append(problems, "not in events_seen table")
518586
} else {
519587
problems = append(problems, "zero-event epoch not in events_seen table")
520588
}
521589
} else {
522-
return fmt.Errorf("failed to check if tipset is seen: %w", err)
590+
return xerrors.Errorf("failed to check if tipset is seen: %w", err)
523591
}
524592
} else {
525-
if seenHeight != int(ts.Height()) {
593+
if seenHeight != int(currTs.Height()) {
526594
problems = append(problems, fmt.Sprintf("events_seen height mismatch (%d)", seenHeight))
527595
}
528596
if seenReverted != 0 {
529597
problems = append(problems, "events_seen marked as reverted")
530598
}
531599
}
532600

533-
var actualEvents int
534-
if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil {
535-
return fmt.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
536-
}
537-
var actualEntries int
538-
if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil {
539-
return fmt.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
540-
}
541-
542-
if actualEvents != expectEvents {
543-
problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents))
544-
}
545-
if actualEntries != expectEntries {
546-
problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries))
547-
}
548-
549601
if len(problems) > 0 {
550-
_, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", ts.Height(), tsKeyCid, problems)
602+
_, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", currTs.Height(), tsKeyCid, strings.Join(problems, ", "))
551603
} else if logGood {
552-
_, _ = fmt.Fprintf(cctx.App.Writer, "✓ Epoch %d (%s)\n", ts.Height(), tsKeyCid)
604+
_, _ = fmt.Fprintf(cctx.App.Writer, "✓ Epoch %d (%s)\n", currTs.Height(), tsKeyCid)
553605
}
554606

555607
return nil
556608
}
557609

558610
for i := 0; ctx.Err() == nil && i < epochs; i++ {
559-
// get receipts for the parent of the previous tipset (which will be currTs)
560-
receipts, err := api.ChainGetParentReceipts(ctx, prevTs.Blocks()[0].Cid())
611+
// get receipts and messages for the parent of the previous tipset (which will be currTs)
612+
613+
blockCid := prevTs.Blocks()[0].Cid()
614+
615+
messages, err := api.ChainGetParentMessages(ctx, blockCid)
616+
if err != nil {
617+
_, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent messages for epoch %d (checked %d epochs)", prevTs.Height(), i)
618+
break
619+
}
620+
receipts, err := api.ChainGetParentReceipts(ctx, blockCid)
561621
if err != nil {
562622
_, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent receipts for epoch %d (checked %d epochs)", prevTs.Height(), i)
563623
break
564624
}
565625

566-
err = processHeight(ctx, currTs, receipts)
626+
if len(messages) != len(receipts) {
627+
return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(messages), len(receipts))
628+
}
629+
630+
err = processHeight(ctx, messages, receipts)
567631
if err != nil {
568632
return err
569633
}
@@ -572,14 +636,148 @@ var inspectEventsCmd = &cli.Command{
572636
prevTs = currTs
573637
currTs, err = api.ChainGetTipSet(ctx, currTs.Parents())
574638
if err != nil {
575-
return fmt.Errorf("failed to load tipset %s: %w", currTs, err)
639+
return xerrors.Errorf("failed to load tipset %s: %w", currTs, err)
576640
}
577641
}
578642

579643
return nil
580644
},
581645
}
582646

647+
// amtRootForEvents generates the events AMT root CID for a given message's events, and returns
648+
// whether the message has events, a string describing any non-fatal problem encountered,
649+
// and a fatal error if one occurred.
650+
func amtRootForEvents(
651+
ctx context.Context,
652+
api lapi.FullNode,
653+
tsKeyCid cid.Cid,
654+
prevTsKey types.TipSetKey,
655+
stmtSelectEventIdAndEmitter, stmtSelectEventEntries *sql.Stmt,
656+
message lapi.Message,
657+
addrIdCache map[address.Address]abi.ActorID,
658+
) (cid.Cid, bool, string, error) {
659+
660+
events := make([]cbg.CBORMarshaler, 0)
661+
662+
rows, err := stmtSelectEventIdAndEmitter.Query(tsKeyCid.Bytes(), message.Cid.Bytes())
663+
if err != nil {
664+
return cid.Undef, false, "", xerrors.Errorf("failed to query events: %w", err)
665+
}
666+
defer func() {
667+
_ = rows.Close()
668+
}()
669+
670+
for rows.Next() {
671+
var eventId int
672+
var emitterAddr []byte
673+
if err := rows.Scan(&eventId, &emitterAddr); err != nil {
674+
return cid.Undef, false, "", xerrors.Errorf("failed to scan row: %w", err)
675+
}
676+
677+
addr, err := address.NewFromBytes(emitterAddr)
678+
if err != nil {
679+
return cid.Undef, false, "", xerrors.Errorf("failed to parse address: %w", err)
680+
}
681+
var actorId abi.ActorID
682+
if id, ok := addrIdCache[addr]; ok {
683+
actorId = id
684+
} else {
685+
if addr.Protocol() != address.ID {
686+
// use the previous tipset (height+1) to do an address lookup because the actor
687+
// may have been created in the current tipset (i.e. deferred execution means the
688+
// changed state isn't available until the next epoch)
689+
idAddr, err := api.StateLookupID(ctx, addr, prevTsKey)
690+
if err != nil {
691+
// TODO: fix this? we should be able to resolve all addresses
692+
return cid.Undef, false, fmt.Sprintf("failed to resolve address (%s), could not compare amt", addr.String()), nil
693+
}
694+
addr = idAddr
695+
}
696+
id, err := address.IDFromAddress(addr)
697+
if err != nil {
698+
return cid.Undef, false, "", xerrors.Errorf("failed to get ID from address: %w", err)
699+
}
700+
actorId = abi.ActorID(id)
701+
addrIdCache[addr] = actorId
702+
}
703+
704+
event := types.Event{
705+
Emitter: actorId,
706+
Entries: make([]types.EventEntry, 0),
707+
}
708+
709+
rows2, err := stmtSelectEventEntries.Query(eventId)
710+
if err != nil {
711+
return cid.Undef, false, "", xerrors.Errorf("failed to query event entries: %w", err)
712+
}
713+
defer func() {
714+
_ = rows2.Close()
715+
}()
716+
717+
for rows2.Next() {
718+
var flags []byte
719+
var key string
720+
var codec uint64
721+
var value []byte
722+
if err := rows2.Scan(&flags, &key, &codec, &value); err != nil {
723+
return cid.Undef, false, "", xerrors.Errorf("failed to scan row: %w", err)
724+
}
725+
entry := types.EventEntry{
726+
Flags: flags[0],
727+
Key: key,
728+
Codec: codec,
729+
Value: value,
730+
}
731+
event.Entries = append(event.Entries, entry)
732+
}
733+
734+
events = append(events, &event)
735+
}
736+
737+
// construct the AMT from our slice to an in-memory IPLD store just so we can get the root,
738+
// we don't need the blocks themselves
739+
root, err := amt4.FromArray(ctx, cbor.NewCborStore(bstore.NewMemory()), events, amt4.UseTreeBitWidth(types.EventAMTBitwidth))
740+
if err != nil {
741+
return cid.Undef, false, "", xerrors.Errorf("failed to create AMT: %w", err)
742+
}
743+
return root, len(events) > 0, "", nil
744+
}
745+
746+
func chainEventAndEntryCountsAt(ctx context.Context, ts *types.TipSet, receipts []*types.MessageReceipt, api lapi.FullNode) (int, int, error) {
747+
var expectEvents int
748+
var expectEntries int
749+
for _, receipt := range receipts {
750+
if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil {
751+
continue
752+
}
753+
events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot)
754+
if err != nil {
755+
return 0, 0, xerrors.Errorf("failed to load events for tipset %s: %w", ts, err)
756+
}
757+
expectEvents += len(events)
758+
for _, event := range events {
759+
expectEntries += len(event.Entries)
760+
}
761+
}
762+
return expectEvents, expectEntries, nil
763+
}
764+
765+
func dbEventAndEntryCountsAt(ts *types.TipSet, stmtEventCount, stmtEntryCount *sql.Stmt) (int, int, error) {
766+
tsKeyCid, err := ts.Key().Cid()
767+
if err != nil {
768+
return 0, 0, xerrors.Errorf("failed to get tipset key cid: %w", err)
769+
}
770+
var actualEvents int
771+
if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil {
772+
return 0, 0, xerrors.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
773+
}
774+
var actualEntries int
775+
if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil {
776+
return 0, 0, xerrors.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
777+
}
778+
return actualEvents, actualEntries, nil
779+
}
780+
583781
var backfillMsgIndexCmd = &cli.Command{
584782
Name: "backfill-msgindex",
585783
Usage: "Backfill the msgindex.db for a number of epochs starting from a specified height",

0 commit comments

Comments
 (0)