Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shed): add commands for importing/exporting datastore snapshots #12685

Merged
merged 4 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- Implement new `lotus f3` CLI commands to list F3 participants, dump manifest, get/list finality certificates and check the F3 status. ([filecoin-project/lotus#12617](https://github.com/filecoin-project/lotus/pull/12617), [filecoin-project/lotus#12627](https://github.com/filecoin-project/lotus/pull/12627))
- Return a `"data"` field on the `"error"` returned from RPC when `eth_call` and `eth_estimateGas` APIs encounter `execution reverted` errors. ([filecoin-project/lotus#12553](https://github.com/filecoin-project/lotus/pull/12553))
- Implement `EthGetTransactionByBlockNumberAndIndex` (`eth_getTransactionByBlockNumberAndIndex`) and `EthGetTransactionByBlockHashAndIndex` (`eth_getTransactionByBlockHashAndIndex`) methods. ([filecoin-project/lotus#12618](https://github.com/filecoin-project/lotus/pull/12618))
- Add a set of `lotus-shed datastore` commands for importing, exporting, and clearing parts of the datastore ([filecoin-project/lotus#12685](https://github.com/filecoin-project/lotus/pull/12685)):

## Bug Fixes
- Fix a bug in the `lotus-shed indexes backfill-events` command that may result in either duplicate events being backfilled where there are existing events (such an operation *should* be idempotent) or events erroneously having duplicate `logIndex` values when queried via ETH APIs. ([filecoin-project/lotus#12567](https://github.com/filecoin-project/lotus/pull/12567))
Expand Down
277 changes: 277 additions & 0 deletions cmd/lotus-shed/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand All @@ -17,10 +18,12 @@ import (
"github.com/mitchellh/go-homedir"
"github.com/polydawn/refmt/cbor"
"github.com/urfave/cli/v2"
cbg "github.com/whyrusleeping/cbor-gen"
"go.uber.org/multierr"
"golang.org/x/xerrors"

lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/cmd/lotus-shed/shedgen"
"github.com/filecoin-project/lotus/lib/backupds"
"github.com/filecoin-project/lotus/node/repo"
)
Expand All @@ -34,6 +37,9 @@ var datastoreCmd = &cli.Command{
datastoreGetCmd,
datastoreRewriteCmd,
datastoreVlog2CarCmd,
datastoreImportCmd,
datastoreExportCmd,
datastoreClearCmd,
},
}

Expand Down Expand Up @@ -106,6 +112,98 @@ var datastoreListCmd = &cli.Command{
},
}

var datastoreClearCmd = &cli.Command{
Name: "clear",
Description: "Clear a part or all of the given datastore.",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo-type",
Usage: "node type (FullNode, StorageMiner, Worker, Wallet)",
Value: "FullNode",
},
&cli.StringFlag{
Name: "prefix",
Usage: "only delete key/values with the given prefix",
Value: "",
},
&cli.BoolFlag{
Name: "really-do-it",
Usage: "must be specified for the action to take effect",
},
},
ArgsUsage: "[namespace]",
Action: func(cctx *cli.Context) (_err error) {
if cctx.NArg() != 2 {
return xerrors.Errorf("requires 2 arguments: the datastore prefix")
}
namespace := cctx.Args().Get(0)

r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}

exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}

lr, err := r.Lock(repo.NewRepoTypeFromString(cctx.String("repo-type")))
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck

ds, err := lr.Datastore(cctx.Context, namespace)
if err != nil {
return err
}
defer func() {
_err = multierr.Append(_err, ds.Close())
}()

dryRun := !cctx.Bool("really-do-it")

query, err := ds.Query(cctx.Context, dsq.Query{
Prefix: cctx.String("prefix"),
})
if err != nil {
return err
}
defer query.Close() //nolint:errcheck

batch, err := ds.Batch(cctx.Context)
if err != nil {
return xerrors.Errorf("failed to create a datastore batch: %w", err)
}

for res, ok := query.NextSync(); ok; res, ok = query.NextSync() {
if res.Error != nil {
return xerrors.Errorf("failed to read from datastore: %w", res.Error)
}
_, _ = fmt.Fprintf(cctx.App.Writer, "deleting: %q\n", res.Key)
if !dryRun {
if err := batch.Delete(cctx.Context, datastore.NewKey(res.Key)); err != nil {
return xerrors.Errorf("failed to delete %q: %w", res.Key, err)
}
}
}

if !dryRun {
if err := batch.Commit(cctx.Context); err != nil {
return xerrors.Errorf("failed to flush the batch: %w", err)
}
} else {
_, _ = fmt.Fprintln(cctx.App.Writer, "NOTE: dry run complete, re-run with --really-do-it to actually delete this state.")
}

return nil
},
}

var datastoreGetCmd = &cli.Command{
Name: "get",
Description: "list datastore keys",
Expand Down Expand Up @@ -158,6 +256,185 @@ var datastoreGetCmd = &cli.Command{
},
}

var datastoreExportCmd = &cli.Command{
Name: "export",
Description: "Export part or all of the specified datastore, appending to the specified datastore snapshot.",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo-type",
Usage: "node type (FullNode, StorageMiner, Worker, Wallet)",
Value: "FullNode",
},
&cli.StringFlag{
Name: "prefix",
Usage: "export only keys with the given prefix",
Value: "",
},
},
ArgsUsage: "[namespace filename]",
Action: func(cctx *cli.Context) (_err error) {
if cctx.NArg() != 2 {
return xerrors.Errorf("requires 2 arguments: the datastore prefix and the filename to which the snapshot will be written")
}
namespace := cctx.Args().Get(0)
fname := cctx.Args().Get(1)

snapshot, err := os.OpenFile(fname, os.O_WRONLY|os.O_APPEND|os.O_CREATE, os.ModePerm)
if err != nil {
return xerrors.Errorf("failed to open snapshot: %w", err)
}
defer func() {
_err = multierr.Append(_err, snapshot.Close())
}()

r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}

exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}

lr, err := r.Lock(repo.NewRepoTypeFromString(cctx.String("repo-type")))
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck

ds, err := lr.Datastore(cctx.Context, namespace)
if err != nil {
return err
}
defer func() {
_err = multierr.Append(_err, ds.Close())
}()

query, err := ds.Query(cctx.Context, dsq.Query{
Prefix: cctx.String("prefix"),
})
if err != nil {
return err
}

bufWriter := bufio.NewWriter(snapshot)
snapshotWriter := cbg.NewCborWriter(bufWriter)
for res, ok := query.NextSync(); ok; res, ok = query.NextSync() {
if res.Error != nil {
return xerrors.Errorf("failed to read from datastore: %w", res.Error)
}

entry := shedgen.DatastoreEntry{
Key: []byte(res.Key),
Value: res.Value,
}

_, _ = fmt.Fprintf(cctx.App.Writer, "exporting: %q\n", res.Key)
if err := entry.MarshalCBOR(snapshotWriter); err != nil {
return xerrors.Errorf("failed to write %q to snapshot: %w", res.Key, err)
}
}
if err := bufWriter.Flush(); err != nil {
return xerrors.Errorf("failed to flush snapshot: %w", err)
}

return nil
},
}

var datastoreImportCmd = &cli.Command{
Name: "import",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo-type",
Usage: "node type (FullNode, StorageMiner, Worker, Wallet)",
Value: "FullNode",
},
},
Description: "Import the specified datastore snapshot.",
ArgsUsage: "[namespace filename]",
Action: func(cctx *cli.Context) (_err error) {
if cctx.NArg() != 2 {
return xerrors.Errorf("requires 2 arguments: the datastore prefix and the filename of the snapshot to import")
}
namespace := cctx.Args().Get(0)
fname := cctx.Args().Get(1)

snapshot, err := os.Open(fname)
if err != nil {
return xerrors.Errorf("failed to open snapshot: %w", err)
}
defer snapshot.Close() //nolint:errcheck

r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}

exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}

lr, err := r.Lock(repo.NewRepoTypeFromString(cctx.String("repo-type")))
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck

ds, err := lr.Datastore(cctx.Context, namespace)
if err != nil {
return err
}
defer func() {
_err = multierr.Append(_err, ds.Close())
}()

batch, err := ds.Batch(cctx.Context)
if err != nil {
return err
}

dryRun := !cctx.Bool("really-do-it")

snapshotReader := cbg.NewCborReader(bufio.NewReader(snapshot))
for {
var entry shedgen.DatastoreEntry
if err := entry.UnmarshalCBOR(snapshotReader); err != nil {
if errors.Is(err, io.EOF) {
break
}
return xerrors.Errorf("failed to read entry from snapshot: %w", err)
}

_, _ = fmt.Fprintf(cctx.App.Writer, "importing: %q\n", string(entry.Key))

if !dryRun {
key := datastore.NewKey(string(entry.Key))
if err := batch.Put(cctx.Context, key, entry.Value); err != nil {
return xerrors.Errorf("failed to put %q: %w", key, err)
}
}
}

if !dryRun {
if err := batch.Commit(cctx.Context); err != nil {
return xerrors.Errorf("failed to commit batch: %w", err)
}
} else {
_, _ = fmt.Fprintln(cctx.App.Writer, "NOTE: dry run complete, re-run with --really-do-it to actually import the datastore snapshot, overwriting any conflicting state.")
}
return nil
},
}

var datastoreBackupCmd = &cli.Command{
Name: "backup",
Description: "manage datastore backups",
Expand Down
Loading
Loading