Skip to content

Commit 9e2f56b

Browse files
authored
Merge pull request #3226 from filecoin-project/fix/mpool-state-inconsistencies
Address potential mpool state inconsistencies
2 parents 5c315b4 + 817358f commit 9e2f56b

File tree

6 files changed

+119
-9
lines changed

6 files changed

+119
-9
lines changed

api/api_full.go

+3
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,9 @@ type FullNode interface {
184184
MpoolGetNonce(context.Context, address.Address) (uint64, error)
185185
MpoolSub(context.Context) (<-chan MpoolUpdate, error)
186186

187+
// MpoolClear clears pending messages from the mpool
188+
MpoolClear(context.Context, bool) error
189+
187190
// MpoolGetConfig returns (a copy of) the current mpool config
188191
MpoolGetConfig(context.Context) (*types.MpoolConfig, error)
189192
// MpoolSetConfig sets the mpool config to (a copy of) the supplied config

api/apistruct/struct.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,9 @@ type FullNodeStruct struct {
106106

107107
MpoolSelect func(context.Context, types.TipSetKey, float64) ([]*types.SignedMessage, error) `perm:"read"`
108108

109-
MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
109+
MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
110+
MpoolClear func(context.Context, bool) error `perm:"write"`
111+
110112
MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
111113
MpoolPushMessage func(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) `perm:"sign"`
112114
MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"`
@@ -494,6 +496,10 @@ func (c *FullNodeStruct) MpoolPending(ctx context.Context, tsk types.TipSetKey)
494496
return c.Internal.MpoolPending(ctx, tsk)
495497
}
496498

499+
// MpoolClear forwards the request to the c.Internal.MpoolClear function field,
// passing ctx and local through unchanged and returning its error as-is.
func (c *FullNodeStruct) MpoolClear(ctx context.Context, local bool) error {
500+
return c.Internal.MpoolClear(ctx, local)
501+
}
502+
497503
// MpoolPush forwards the signed message to the c.Internal.MpoolPush function
// field and returns its (cid.Cid, error) result unchanged.
func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
498504
return c.Internal.MpoolPush(ctx, smsg)
499505
}

chain/messagepool/messagepool.go

+56-8
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/filecoin-project/specs-actors/actors/abi"
1414
"github.com/filecoin-project/specs-actors/actors/crypto"
15+
"github.com/hashicorp/go-multierror"
1516
lru "github.com/hashicorp/golang-lru"
1617
"github.com/ipfs/go-cid"
1718
"github.com/ipfs/go-datastore"
@@ -744,30 +745,42 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
744745
}
745746
}
746747

748+
var merr error
749+
747750
for _, ts := range revert {
748751
pts, err := mp.api.LoadTipSet(ts.Parents())
749752
if err != nil {
750-
return err
753+
log.Errorf("error loading reverted tipset parent: %s", err)
754+
merr = multierror.Append(merr, err)
755+
continue
751756
}
752757

758+
mp.curTs = pts
759+
753760
msgs, err := mp.MessagesForBlocks(ts.Blocks())
754761
if err != nil {
755-
return err
762+
log.Errorf("error retrieving messages for reverted block: %s", err)
763+
merr = multierror.Append(merr, err)
764+
continue
756765
}
757766

758-
mp.curTs = pts
759-
760767
for _, msg := range msgs {
761768
add(msg)
762769
}
763770
}
764771

765772
for _, ts := range apply {
773+
mp.curTs = ts
774+
766775
for _, b := range ts.Blocks() {
767776
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
768777
if err != nil {
769-
return xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
778+
xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
779+
log.Errorf("error retrieving messages for block: %s", xerr)
780+
merr = multierror.Append(merr, xerr)
781+
continue
770782
}
783+
771784
for _, msg := range smsgs {
772785
rm(msg.Message.From, msg.Message.Nonce)
773786
maybeRepub(msg.Cid())
@@ -778,8 +791,6 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
778791
maybeRepub(msg.Cid())
779792
}
780793
}
781-
782-
mp.curTs = ts
783794
}
784795

785796
if repubTrigger {
@@ -862,7 +873,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
862873
}
863874
}
864875

865-
return nil
876+
return merr
866877
}
867878

868879
type statBucket struct {
@@ -962,3 +973,40 @@ func (mp *MessagePool) loadLocal() error {
962973

963974
return nil
964975
}
976+
977+
// Clear removes pending messages from the message pool. It holds mp.lk for the
// whole operation.
//
// When local is true, every pending message belonging to an address in
// mp.localAddrs is deleted from the mp.localMsgs datastore (a failed delete is
// logged as a warning and does not abort the clear), then mp.pending is
// replaced with an empty map and mp.republished is reset to nil.
//
// When local is false, only the pending entries of addresses that are not in
// mp.localAddrs are dropped; entries for local addresses, the datastore and
// mp.republished are left untouched.
func (mp *MessagePool) Clear(local bool) {
978+
mp.lk.Lock()
979+
defer mp.lk.Unlock()
980+
981+
// remove everything if local is true, including removing local messages from
982+
// the datastore
983+
if local {
984+
for a := range mp.localAddrs {
985+
mset, ok := mp.pending[a]
986+
if !ok {
987+
// this local address has nothing pending, so nothing to delete
988+
continue
989+
}
990+
991+
for _, m := range mset.msgs {
992+
// datastore keys are built from the raw bytes of the message CID
993+
err := mp.localMsgs.Delete(datastore.NewKey(string(m.Cid().Bytes())))
994+
if err != nil {
995+
log.Warnf("error deleting local message: %s", err)
996+
}
997+
}
998+
}
999+
1000+
// drop every pending message set (local and non-local) in one step
1001+
mp.pending = make(map[address.Address]*msgSet)
1002+
mp.republished = nil
1003+
1004+
return
1005+
}
1006+
1007+
// remove everything except the local messages
1008+
for a := range mp.pending {
1009+
_, isLocal := mp.localAddrs[a]
1010+
if isLocal {
1011+
continue
1012+
}
1013+
delete(mp.pending, a)
1014+
}
1015+
}

cli/mpool.go

+33
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ var mpoolCmd = &cli.Command{
2020
Usage: "Manage message pool",
2121
Subcommands: []*cli.Command{
2222
mpoolPending,
23+
mpoolClear,
2324
mpoolSub,
2425
mpoolStat,
2526
mpoolReplaceCmd,
@@ -83,6 +84,38 @@ var mpoolPending = &cli.Command{
8384
},
8485
}
8586

87+
// mpoolClear is the "clear" subcommand listed in mpoolCmd.Subcommands. Its
// action returns an error unless --really-do-it is set; otherwise it calls the
// full node's MpoolClear API with the value of the --local flag.
var mpoolClear = &cli.Command{
88+
Name: "clear",
89+
Usage: "Clear all pending messages from the mpool (USE WITH CARE)",
90+
Flags: []cli.Flag{
91+
&cli.BoolFlag{
92+
Name: "local",
93+
Usage: "also clear local messages",
94+
},
95+
&cli.BoolFlag{
96+
Name: "really-do-it",
97+
Usage: "must be specified for the action to take effect",
98+
},
99+
},
100+
Action: func(cctx *cli.Context) error {
101+
// NOTE(review): the API handle is obtained before --really-do-it is
102+
// validated, so a GetFullNodeAPI error is reported ahead of the
103+
// missing-flag error — confirm this ordering is intended.
104+
api, closer, err := GetFullNodeAPI(cctx)
105+
if err != nil {
106+
return err
107+
}
108+
defer closer()
109+
110+
// safety latch: refuse to run without explicit confirmation
111+
really := cctx.Bool("really-do-it")
112+
if !really {
113+
return fmt.Errorf("--really-do-it must be specified for this action to have an effect; you have been warned.")
114+
}
115+
116+
local := cctx.Bool("local")
117+
118+
ctx := ReqContext(cctx)
119+
return api.MpoolClear(ctx, local)
120+
},
121+
}
118+
86119
var mpoolSub = &cli.Command{
87120
Name: "sub",
88121
Usage: "Subscribe to mpool changes",

documentation/en/mpool.md

+15
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ The full node API defines the following methods for interacting with the mpool:
2020
MpoolSub(context.Context) (<-chan MpoolUpdate, error)
2121
MpoolGetConfig(context.Context) (*types.MpoolConfig, error)
2222
MpoolSetConfig(context.Context, *types.MpoolConfig) error
23+
MpoolClear(context.Context, local bool) error
2324
```
2425

2526
### MpoolPending
@@ -61,6 +62,13 @@ Returns (a copy of) the current mpool configuration.
6162

6263
Sets the mpool configuration to (a copy of) the supplied configuration object.
6364

65+
### MpoolClear
66+
67+
Clears pending messages from the mpool; if `local` is `true` then local messages are also cleared and removed from the datastore.
68+
69+
This should be used with extreme care and only in the case of errors during head changes that
70+
would leave the mpool in an inconsistent state.
71+
6472

6573
## Command Line Interface
6674

@@ -75,6 +83,7 @@ lotus mpool stat [--local]
7583
lotus mpool replace [--gas-feecap <feecap>] [--gas-premium <premium>] [--gas-limit <limit>] [from] [nonce]
7684
lotus mpool find [--from <address>] [--to <address>] [--method <int>]
7785
lotus mpool config [<configuration>]
86+
lotus mpool clear [--local] --really-do-it
7887
```
7988

8089
### lotus mpool pending
@@ -98,6 +107,12 @@ Searches for messages in the mpool.
98107
### lotus mpool config
99108
Gets or sets the current mpool configuration.
100109

110+
### lotus mpool clear
111+
Unconditionally clears pending messages from the mpool.
112+
If the `--local` flag is passed, then local messages are also cleared; otherwise local messages are retained.
113+
114+
*Warning*: this command should only be used in the case of head change errors leaving the mpool in an inconsistent state.
115+
101116
## Configuration
102117

103118
The mpool has a few parameters that can be configured by the user, either through the API

node/impl/full/mpool.go

+5
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*ty
105105
}
106106
}
107107

108+
// MpoolClear clears the message pool by calling a.Mpool.Clear(local).
// ctx is not used, and the method always returns nil because Clear itself
// returns no error.
func (a *MpoolAPI) MpoolClear(ctx context.Context, local bool) error {
109+
a.Mpool.Clear(local)
110+
return nil
111+
}
112+
108113
// MpoolPush hands the signed message to a.Mpool.Push and returns its
// (cid.Cid, error) result unchanged. ctx is not used.
func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
109114
return a.Mpool.Push(smsg)
110115
}

0 commit comments

Comments
 (0)