Skip to content

Commit

Permalink
feat(1-1-restore): sets tombstone_gc mode to repair
Browse files Browse the repository at this point in the history
This sets tombstone_gc mode to 'repair' to avoid data resurrection.
If tables mode is already 'repair', 'disabled' or 'immediate', then it won't be changed.

Refs: #4258
  • Loading branch information
VAveryanov8 committed Feb 19, 2025
1 parent 7847914 commit 4937c0d
Show file tree
Hide file tree
Showing 3 changed files with 232 additions and 0 deletions.
47 changes: 47 additions & 0 deletions pkg/service/one2onerestore/service_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package one2onerestore
import (
"fmt"
"os"
"strings"
"testing"

"github.com/pkg/errors"
Expand All @@ -27,6 +28,7 @@ func TestOne2OneRestoreServiceIntegration(t *testing.T) {

clusterSession := CreateSessionAndDropAllKeyspaces(t, h.client)

// Setup schema and data
ksName := "testrestore"
WriteData(t, clusterSession, ksName, 10)
mvName := "testmv"
Expand Down Expand Up @@ -73,6 +75,20 @@ func TestOne2OneRestoreServiceIntegration(t *testing.T) {
if srcCntMV != dstCntMV {
t.Fatalf("Expected row count in materialized view %d, but got %d", srcCntMV, dstCntMV)
}

// Ensure table's tombstone_gc mode is set to 'repair'
w, _ := newTestWorker(t, ManagedClusterHosts())
mode, err := w.getTableTombstoneGCMode(ksName, BigTableName)
if err != nil {
t.Fatalf("Get table tombstone_gc mode: %v", err)
}
if mode != modeRepair {
t.Fatalf("Expected repair mode, but got %s", string(mode))
}

if mode = getViewTombstoneGCMode(t, clusterSession, ksName, mvName); mode != modeRepair {
t.Fatalf("Expected repair mode, but got %s", string(mode))
}
}

func truncateAllTablesInKeyspace(tb testing.TB, session gocqlx.Session, ks string) {
Expand Down Expand Up @@ -105,3 +121,34 @@ func rowCount(t *testing.T, s gocqlx.Session, ks, tab string) int {
Printf("%s.%s row count: %v", ks, tab, cnt)
return cnt
}

func getViewTombstoneGCMode(t *testing.T, clusterSession gocqlx.Session, keyspace, view string) tombstoneGCMode {
t.Helper()
var ext map[string]string
q := qb.Select("system_schema.views").
Columns("extensions").
Where(qb.Eq("keyspace_name"), qb.Eq("view_name")).
Query(clusterSession).
Bind(keyspace, view)

defer q.Release()
err := q.Scan(&ext)
if err != nil {
t.Fatalf("scan: %v", err)
}

// Timeout (just using gc_grace_seconds) is the default mode
mode, ok := ext["tombstone_gc"]
if !ok {
return modeTimeout
}

allModes := []tombstoneGCMode{modeDisabled, modeTimeout, modeRepair, modeImmediate}
for _, m := range allModes {
if strings.Contains(mode, string(m)) {
return m
}
}
t.Fatalf("unrecognized tombstone_gc mode: %s", mode)
return ""
}
4 changes: 4 additions & 0 deletions pkg/service/one2onerestore/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type worker struct {

// restore is an actual 1-1-restore stages.
func (w *worker) restore(ctx context.Context, workload []hostWorkload, target Target) (err error) {
if err := w.setTombstoneGCModeRepair(ctx, workload, target.Keyspace); err != nil {
return errors.Wrap(err, "tombstone_gc mode")
}

views, err := w.dropViews(ctx, workload, target.Keyspace)
if err != nil {
return errors.Wrap(err, "drop views")
Expand Down
181 changes: 181 additions & 0 deletions pkg/service/one2onerestore/worker_tgc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright (C) 2025 ScyllaDB

package one2onerestore

import (
"context"
"fmt"
"strings"
"time"

"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/util/retry"
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"

"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/gocqlx/v2/qb"
)

// Docs: https://docs.scylladb.com/stable/cql/ddl.html#tombstones-gc-options.
type tombstoneGCMode string

const (
modeDisabled tombstoneGCMode = "disabled"
modeTimeout tombstoneGCMode = "timeout"
modeRepair tombstoneGCMode = "repair"
modeImmediate tombstoneGCMode = "immediate"
)

// setTombstoneGCModeRepair sets tombstone gc mode to repair to avoid data resurrection issues during restore.
func (w *worker) setTombstoneGCModeRepair(ctx context.Context, workload []hostWorkload, keyspaceFilter []string) error {
w.awaitSchemaAgreement(ctx, w.clusterSession)

type backupTable struct {
keyspace, table string
}

tablesToRestore := map[backupTable]struct{}{}

for _, wl := range workload {
err := wl.manifestContent.ForEachIndexIter(keyspaceFilter, func(fm backupspec.FilesMeta) {
tablesToRestore[backupTable{keyspace: fm.Keyspace, table: fm.Table}] = struct{}{}
})
if err != nil {
return errors.Wrap(err, "manifest content files")
}
}

for table := range tablesToRestore {
mode, err := w.getTableTombstoneGCMode(table.keyspace, table.table)
if err != nil {
return errors.Wrap(err, "get tombstone_gc mode")
}
// No need to change tombstone gc mode.
if mode == modeDisabled || mode == modeImmediate || mode == modeRepair {
w.logger.Info(ctx, "Skipping set tombstone_gc mode", "table", table, "mode", mode)
continue
}
if err := w.setTableTombstoneGCMode(ctx, table.keyspace, table.table, modeRepair); err != nil {
return errors.Wrap(err, "set tombstone_gc mode repair")
}
}

return nil
}

// getTableTombstoneGCMode returns table's tombstone_gc mode.
func (w *worker) getTableTombstoneGCMode(keyspace, table string) (tombstoneGCMode, error) {
var ext map[string]string
q := qb.Select("system_schema.tables").
Columns("extensions").
Where(qb.Eq("keyspace_name"), qb.Eq("table_name")).
Query(w.clusterSession).
Bind(keyspace, table)

defer q.Release()
err := q.Scan(&ext)
if err != nil {
return "", err
}

// Timeout (just using gc_grace_seconds) is the default mode
mode, ok := ext["tombstone_gc"]
if !ok {
return modeTimeout, nil
}

allModes := []tombstoneGCMode{modeDisabled, modeTimeout, modeRepair, modeImmediate}
for _, m := range allModes {
if strings.Contains(mode, string(m)) {
return m, nil
}
}
return "", errors.Errorf("unrecognized tombstone_gc mode: %s", mode)
}

// setTableTombstoneGCMode alters 'tombstone_gc' mode.
func (w *worker) setTableTombstoneGCMode(ctx context.Context, keyspace, table string, mode tombstoneGCMode) error {
w.logger.Info(ctx, "Alter table's tombstone_gc mode",
"keyspace", keyspace,
"table", table,
)

op := func() error {
return w.clusterSession.ExecStmt(alterTableTombstoneGCStmt(keyspace, table, mode))
}

notify := func(err error, wait time.Duration) {
w.logger.Info(ctx, "Altering table's tombstone_gc mode failed",
"keyspace", keyspace,
"table", table,
"error", err,
"wait", wait,
)
}

return alterSchemaRetryWrapper(ctx, op, notify)
}

func alterTableTombstoneGCStmt(keyspace, table string, mode tombstoneGCMode) string {
return fmt.Sprintf(`ALTER TABLE %q.%q WITH tombstone_gc = {'mode': '%s'}`, keyspace, table, mode)
}

func (w *worker) awaitSchemaAgreement(ctx context.Context, clusterSession gocqlx.Session) {
w.logger.Info(ctx, "Awaiting schema agreement...")

var stepError error
defer func(start time.Time) {
if stepError != nil {
w.logger.Error(ctx, "Awaiting schema agreement failed see exact errors above", "duration", timeutc.Since(start))
} else {
w.logger.Info(ctx, "Done awaiting schema agreement", "duration", timeutc.Since(start))
}
}(timeutc.Now())

const (
waitMin = 15 * time.Second // nolint: revive
waitMax = 1 * time.Minute
maxElapsedTime = 15 * time.Minute
multiplier = 2
jitter = 0.2
)

backoff := retry.NewExponentialBackoff(
waitMin,
maxElapsedTime,
waitMax,
multiplier,
jitter,
)

notify := func(err error, wait time.Duration) {
w.logger.Info(ctx, "Schema agreement not reached, retrying...", "error", err, "wait", wait)
}

const (
peerSchemasStmt = "SELECT schema_version FROM system.peers"
localSchemaStmt = "SELECT schema_version FROM system.local WHERE key='local'"
)

stepError = retry.WithNotify(ctx, func() error {
var v []string
if err := clusterSession.Query(peerSchemasStmt, nil).SelectRelease(&v); err != nil {
return retry.Permanent(err)
}
var lv string
if err := clusterSession.Query(localSchemaStmt, nil).GetRelease(&lv); err != nil {
return retry.Permanent(err)
}

// Join all versions
m := strset.New(v...)
m.Add(lv)
if m.Size() > 1 {
return errors.Errorf("cluster schema versions not consistent: %s", m.List())
}

return nil
}, backoff, notify)
}

0 comments on commit 4937c0d

Please sign in to comment.