Skip to content

Commit

Permalink
ledger: add callback to clear state between commitRound retries (#6190)
Browse files Browse the repository at this point in the history
  • Loading branch information
cce authored Dec 10, 2024
1 parent b0f1396 commit f87ae8a
Show file tree
Hide file tree
Showing 21 changed files with 202 additions and 89 deletions.
10 changes: 0 additions & 10 deletions ledger/acctonline.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,6 @@ func (ao *onlineAccounts) consecutiveVersion(offset uint64) uint64 {
return offset
}

// handleUnorderedCommit is a no-op for onlineAccounts.
func (ao *onlineAccounts) handleUnorderedCommit(dcc *deferredCommitContext) {
}
// handlePrepareCommitError is a no-op for onlineAccounts.
func (ao *onlineAccounts) handlePrepareCommitError(dcc *deferredCommitContext) {
}
// handleCommitError is a no-op for onlineAccounts.
func (ao *onlineAccounts) handleCommitError(dcc *deferredCommitContext) {
}

func (ao *onlineAccounts) maxBalLookback() uint64 {
lastProtoVersion := ao.onlineRoundParamsData[len(ao.onlineRoundParamsData)-1].CurrentProtocol
return config.Consensus[lastProtoVersion].MaxBalLookback
Expand Down Expand Up @@ -535,9 +528,6 @@ func (ao *onlineAccounts) postCommit(ctx context.Context, dcc *deferredCommitCon
ao.voters.postCommit(dcc)
}

// postCommitUnlocked is a no-op for onlineAccounts.
func (ao *onlineAccounts) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

// onlineCirculation return the total online balance for the given round, for use by agreement.
func (ao *onlineAccounts) onlineCirculation(rnd basics.Round, voteRnd basics.Round) (basics.MicroAlgos, error) {
// Get cached total stake for rnd
Expand Down
4 changes: 3 additions & 1 deletion ledger/acctonline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ func commitSyncPartialComplete(t *testing.T, oa *onlineAccounts, ml *mockLedgerF
ml.trackers.lastFlushTime = dcc.flushTime

for _, lt := range ml.trackers.trackers {
lt.postCommitUnlocked(ml.trackers.ctx, dcc)
if lt, ok := lt.(trackerCommitLifetimeHandlers); ok {
lt.postCommitUnlocked(ml.trackers.ctx, dcc)
}
}
}

Expand Down
10 changes: 0 additions & 10 deletions ledger/acctupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,13 +1483,6 @@ func (au *accountUpdates) roundOffset(rnd basics.Round) (offset uint64, err erro
return off, nil
}

// handleUnorderedCommit is a no-op for accountUpdates.
func (au *accountUpdates) handleUnorderedCommit(dcc *deferredCommitContext) {
}
// handlePrepareCommitError is a no-op for accountUpdates.
func (au *accountUpdates) handlePrepareCommitError(dcc *deferredCommitContext) {
}
// handleCommitError is a no-op for accountUpdates.
func (au *accountUpdates) handleCommitError(dcc *deferredCommitContext) {
}

// prepareCommit prepares data to write to the database a "chunk" of rounds, and update the cached dbRound accordingly.
func (au *accountUpdates) prepareCommit(dcc *deferredCommitContext) error {
if au.logAccountUpdatesMetrics {
Expand Down Expand Up @@ -1745,9 +1738,6 @@ func (au *accountUpdates) postCommit(ctx context.Context, dcc *deferredCommitCon
}
}

// postCommitUnlocked is a no-op for accountUpdates.
func (au *accountUpdates) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

// compactKvDeltas takes an array of StateDeltas containing kv deltas (one array entry per round), and
// compacts the array into a single map that contains all the
// changes. Intermediate changes are eliminated. It counts the number of
Expand Down
2 changes: 0 additions & 2 deletions ledger/acctupdates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2044,7 +2044,6 @@ func TestAcctUpdatesResources(t *testing.T) {
require.NoError(t, err)
ml.trackers.dbRound = newBase
au.postCommit(ml.trackers.ctx, dcc)
au.postCommitUnlocked(ml.trackers.ctx, dcc)
}()

}
Expand Down Expand Up @@ -2330,7 +2329,6 @@ func auCommitSync(t *testing.T, rnd basics.Round, au *accountUpdates, ml *mockLe
require.NoError(t, err)
ml.trackers.dbRound = newBase
au.postCommit(ml.trackers.ctx, dcc)
au.postCommitUnlocked(ml.trackers.ctx, dcc)
}()
}
}
Expand Down
10 changes: 0 additions & 10 deletions ledger/bulletin.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,6 @@ func (b *bulletin) commitRound(context.Context, trackerdb.TransactionScope, *def
// postCommit is a no-op for bulletin.
func (b *bulletin) postCommit(ctx context.Context, dcc *deferredCommitContext) {
}

// postCommitUnlocked is a no-op for bulletin.
func (b *bulletin) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

// handleUnorderedCommit is a no-op for bulletin.
func (b *bulletin) handleUnorderedCommit(dcc *deferredCommitContext) {
}
// handlePrepareCommitError is a no-op for bulletin.
func (b *bulletin) handlePrepareCommitError(dcc *deferredCommitContext) {
}
// handleCommitError is a no-op for bulletin.
func (b *bulletin) handleCommitError(dcc *deferredCommitContext) {
}

// produceCommittingTask returns the supplied deferredCommitRange unchanged;
// committedRound and dbRound are not consulted.
func (b *bulletin) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
return dcr
}
8 changes: 8 additions & 0 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,14 @@ func (ct *catchpointTracker) handlePrepareCommitError(dcc *deferredCommitContext
ct.cancelWrite(dcc)
}

// clearCommitRoundRetry is invoked when an error is encountered between commitRound retries.
// It logs the rollback and discards the in-memory balancesTrie, dropping any in-memory
// changes made to it by commitRound().
func (ct *catchpointTracker) clearCommitRoundRetry(ctx context.Context, dcc *deferredCommitContext) {
	ct.log.Infof("rolling back failed commitRound for oldBase %d offset %d, clearing balancesTrie", dcc.oldBase, dcc.offset)

	ct.catchpointsMu.Lock()
	defer ct.catchpointsMu.Unlock()
	// a nil balancesTrie will be re-created in the next call to commitRound
	ct.balancesTrie = nil
}

// if an error is encountered during commit, cancel writing and clear the balances trie
func (ct *catchpointTracker) handleCommitError(dcc *deferredCommitContext) {
// in cases where the commitRound fails, it is not certain that the merkle trie is in a clean state, and should be cleared.
Expand Down
63 changes: 63 additions & 0 deletions ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package ledger

import (
"bytes"
"context"
"encoding/hex"
"errors"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/data/txntest"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/ledger/store/trackerdb"
ledgertesting "github.com/algorand/go-algorand/ledger/testing"
Expand All @@ -48,6 +50,9 @@ import (
"github.com/algorand/go-algorand/test/partitiontest"
)

// compile-time check that *catchpointTracker implements the trackerCommitLifetimeHandlers interface
var _ trackerCommitLifetimeHandlers = (*catchpointTracker)(nil)

func TestCatchpointIsWritingCatchpointFile(t *testing.T) {
partitiontest.PartitionTest(t)

Expand Down Expand Up @@ -2094,3 +2099,61 @@ func TestMakeCatchpointFilePath(t *testing.T) {
}

}

// Test a case where in-memory SQLite, combined with fast locking (improved performance, or no
// deadlock detection) and concurrent reads (from transaction evaluation, stake lookups, etc) can
// cause the SQLite implementation in util/db/dbutil.go to retry the function looping over all
// tracker commitRound implementations. Since catchpointTracker's commitRound updates a merkle trie's
// DB storage and its in-memory cache, the retry can cause the balancesTrie's cache to become
// corrupted and out of sync with the DB (which uses transaction rollback between retries). The
// merkle trie corruption manifests as error log messages like:
// - "attempted to add duplicate hash 'X' to merkle trie for account Y"
// - "failed to delete hash 'X' from merkle trie for account Y"
//
// So we assert that those errors do not occur after the fix in #6190.
//
//nolint:paralleltest // deadlock detection is globally disabled, so this test is not parallel-safe
// TestCatchpointTrackerFastRoundsDBRetry produces blocks quickly on an in-memory SQLite
// ledger while issuing an OnlineCirculation lookup after every block, then checks the
// captured log output for any merkle trie corruption message.
func TestCatchpointTrackerFastRoundsDBRetry(t *testing.T) {
	partitiontest.PartitionTest(t)

	// capture everything the ledger logs so it can be inspected at the end of the test
	var bufNewLogger bytes.Buffer
	log := logging.NewLogger()
	log.SetOutput(&bufNewLogger)

	// disabling deadlock detection globally causes the race detector to go off, but this
	// bug can still happen even when deadlock detection is not disabled
	//deadlock.Opts.Disable = true // disable deadlock detection during this test
	//defer func() { deadlock.Opts.Disable = false }()

	genBalances, addrs, _ := ledgertesting.NewTestGenesis(func(cfg *ledgertesting.GenesisCfg) {
		cfg.OnlineCount = 1
		ledgertesting.TurnOffRewards(cfg)
	})
	cfg := config.GetDefaultLocal()
	dl := NewDoubleLedger(t, genBalances, protocol.ConsensusFuture, cfg, simpleLedgerLogger(log)) // in-memory SQLite
	defer dl.Close()

	appSrc := main(`int 1; int 1; ==; assert`)
	app := dl.fundedApp(addrs[1], 1_000_000, appSrc)

	// every transaction carries a random note
	makeTxn := func() *txntest.Txn {
		return &txntest.Txn{
			Type:          "appl",
			Sender:        addrs[2],
			ApplicationID: app,
			Note:          ledgertesting.RandomNote(),
		}
	}

	// keep producing single-transaction blocks until the block round exceeds 1500
	for vb := dl.fullBlock(makeTxn()); vb.Block().Round() <= 1500; vb = dl.fullBlock(makeTxn()) {
		nextRnd := vb.Block().Round() + 1
		// presumably this lookup is the concurrent DB read that provokes the commit retries
		_, err := dl.generator.OnlineCirculation(nextRnd.SubSaturate(320), nextRnd)
		require.NoError(t, err)
		require.Empty(t, vb.Block().ExpiredParticipationAccounts)
		require.Empty(t, vb.Block().AbsentParticipationAccounts)
	}

	// assert that no corruption of merkle trie happened due to DB retries leaving
	// incorrect state in the merkle trie cache. The substring must cover both the
	// "... to merkle trie for account" (duplicate add) and the
	// "... from merkle trie for account" (failed delete) messages.
	require.NotContains(t, bufNewLogger.String(), "merkle trie for account", "Merkle trie was corrupted!")
}
10 changes: 0 additions & 10 deletions ledger/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,6 @@ func (mt *metricsTracker) postCommit(ctx context.Context, dcc *deferredCommitCon
mt.ledgerDBRound.Set(uint64(dcc.newBase()))
}

// postCommitUnlocked is a no-op for metricsTracker.
func (mt *metricsTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

// handleUnorderedCommit is a no-op for metricsTracker.
func (mt *metricsTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
}
// handlePrepareCommitError is a no-op for metricsTracker.
func (mt *metricsTracker) handlePrepareCommitError(dcc *deferredCommitContext) {
}
// handleCommitError is a no-op for metricsTracker.
func (mt *metricsTracker) handleCommitError(dcc *deferredCommitContext) {
}

// produceCommittingTask returns the supplied deferredCommitRange unchanged;
// committedRound and dbRound are not consulted.
func (mt *metricsTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
return dcr
}
10 changes: 0 additions & 10 deletions ledger/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,6 @@ func (bn *blockNotifier) commitRound(context.Context, trackerdb.TransactionScope
// postCommit is a no-op for blockNotifier.
func (bn *blockNotifier) postCommit(ctx context.Context, dcc *deferredCommitContext) {
}

// postCommitUnlocked is a no-op for blockNotifier.
func (bn *blockNotifier) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

// handleUnorderedCommit is a no-op for blockNotifier.
func (bn *blockNotifier) handleUnorderedCommit(dcc *deferredCommitContext) {
}
// handlePrepareCommitError is a no-op for blockNotifier.
func (bn *blockNotifier) handlePrepareCommitError(dcc *deferredCommitContext) {
}
// handleCommitError is a no-op for blockNotifier.
func (bn *blockNotifier) handleCommitError(dcc *deferredCommitContext) {
}

// produceCommittingTask returns the supplied deferredCommitRange unchanged;
// committedRound and dbRound are not consulted.
func (bn *blockNotifier) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
return dcr
}
11 changes: 10 additions & 1 deletion ledger/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
// simpleLedgerCfg holds the optional settings that simpleLedgerOption functions modify.
type simpleLedgerCfg struct {
onDisk bool // default is in-memory
notArchival bool // default is archival
logger logging.Logger // when nil, logging.Base() is used
}

type simpleLedgerOption func(*simpleLedgerCfg)
Expand All @@ -54,6 +55,10 @@ func simpleLedgerNotArchival() simpleLedgerOption {
return func(cfg *simpleLedgerCfg) { cfg.notArchival = true }
}

// simpleLedgerLogger returns an option that stores l as the logger in the simpleLedgerCfg.
func simpleLedgerLogger(l logging.Logger) simpleLedgerOption {
	setLogger := func(cfg *simpleLedgerCfg) {
		cfg.logger = l
	}
	return setLogger
}

func newSimpleLedgerWithConsensusVersion(t testing.TB, balances bookkeeping.GenesisBalances, cv protocol.ConsensusVersion, cfg config.Local, opts ...simpleLedgerOption) *Ledger {
var genHash crypto.Digest
crypto.RandBytes(genHash[:])
Expand All @@ -72,7 +77,11 @@ func newSimpleLedgerFull(t testing.TB, balances bookkeeping.GenesisBalances, cv
dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64())
dbName = strings.Replace(dbName, "/", "_", -1)
cfg.Archival = !slCfg.notArchival
l, err := OpenLedger(logging.Base(), dbName, !slCfg.onDisk, ledgercore.InitState{
log := slCfg.logger
if log == nil {
log = logging.Base()
}
l, err := OpenLedger(log, dbName, !slCfg.onDisk, ledgercore.InitState{
Block: genBlock,
Accounts: balances.Balances,
GenesisHash: genHash,
Expand Down
10 changes: 0 additions & 10 deletions ledger/spverificationtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,6 @@ func (spt *spVerificationTracker) postCommit(_ context.Context, dcc *deferredCom
spt.pendingDeleteContexts = spt.pendingDeleteContexts[dcc.spVerification.lastDeleteIndex+1:]
}

// postCommitUnlocked is a no-op for spVerificationTracker.
func (spt *spVerificationTracker) postCommitUnlocked(context.Context, *deferredCommitContext) {
}

// handleUnorderedCommit is a no-op for spVerificationTracker.
func (spt *spVerificationTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
}
// handlePrepareCommitError is a no-op for spVerificationTracker.
func (spt *spVerificationTracker) handlePrepareCommitError(dcc *deferredCommitContext) {
}
// handleCommitError is a no-op for spVerificationTracker.
func (spt *spVerificationTracker) handleCommitError(dcc *deferredCommitContext) {
}

// close is a no-op for spVerificationTracker.
func (spt *spVerificationTracker) close() {
}

Expand Down
1 change: 0 additions & 1 deletion ledger/spverificationtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func mockCommit(t *testing.T, spt *spVerificationTracker, ml *mockLedgerForTrack
postCommitCtx, cancel := context.WithCancel(context.Background())
defer cancel()
spt.postCommit(postCommitCtx, &dcc)
spt.postCommitUnlocked(postCommitCtx, &dcc)
}

func genesisBlock() *blockEntry {
Expand Down
21 changes: 21 additions & 0 deletions ledger/store/trackerdb/dualdriver/dualdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"reflect"
"sync"
"time"

"github.com/algorand/go-algorand/ledger/store/trackerdb"
Expand Down Expand Up @@ -123,6 +124,10 @@ func (s *trackerStore) Transaction(fn trackerdb.TransactionFn) (err error) {
return s.TransactionContext(context.Background(), fn)
}

// TransactionWithRetryClearFn calls TransactionContextWithRetryClearFn with context.Background().
func (s *trackerStore) TransactionWithRetryClearFn(fn trackerdb.TransactionFn, rollbackFn trackerdb.RetryClearFn) error {
return s.TransactionContextWithRetryClearFn(context.Background(), fn, rollbackFn)
}

func (s *trackerStore) TransactionContext(ctx context.Context, fn trackerdb.TransactionFn) error {
handle, err := s.BeginTransaction(ctx)
if err != nil {
Expand All @@ -138,6 +143,22 @@ func (s *trackerStore) TransactionContext(ctx context.Context, fn trackerdb.Tran
return handle.Commit()
}

// TransactionContextWithRetryClearFn starts the transaction on the primary and the secondary
// store in separate goroutines, waits for both to finish, and returns their coalesced errors.
//
// NOTE(review): both stores receive the same fn and rollbackFn, so these callbacks may be
// invoked from two goroutines at once — confirm the callbacks tolerate that.
func (s *trackerStore) TransactionContextWithRetryClearFn(ctx context.Context, fn trackerdb.TransactionFn, rollbackFn trackerdb.RetryClearFn) error {
	var (
		primaryErr, secondaryErr error
		pending                  sync.WaitGroup
	)
	pending.Add(2)
	go func() {
		defer pending.Done()
		primaryErr = s.primary.TransactionContextWithRetryClearFn(ctx, fn, rollbackFn)
	}()
	go func() {
		defer pending.Done()
		secondaryErr = s.secondary.TransactionContextWithRetryClearFn(ctx, fn, rollbackFn)
	}()
	// the error variables are only read after both goroutines have finished
	pending.Wait()
	return coalesceErrors(primaryErr, secondaryErr)
}

func (s *trackerStore) BeginTransaction(ctx context.Context) (trackerdb.Transaction, error) {
primary, err := s.primary.BeginTransaction(ctx)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions ledger/store/trackerdb/pebbledbdriver/pebbledriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,11 @@ func (s *trackerStore) Transaction(fn trackerdb.TransactionFn) (err error) {
return s.TransactionContext(context.Background(), fn)
}

// TransactionWithRetryClearFn implements trackerdb.Store.
// It calls TransactionContextWithRetryClearFn with context.Background().
func (s *trackerStore) TransactionWithRetryClearFn(fn trackerdb.TransactionFn, rollbackFn trackerdb.RetryClearFn) (err error) {
return s.TransactionContextWithRetryClearFn(context.Background(), fn, rollbackFn)
}

// TransactionContext implements trackerdb.Store
func (s *trackerStore) TransactionContext(ctx context.Context, fn trackerdb.TransactionFn) (err error) {
handle, err := s.BeginTransaction(ctx)
Expand All @@ -345,6 +350,13 @@ func (s *trackerStore) TransactionContext(ctx context.Context, fn trackerdb.Tran
return err
}

// TransactionContextWithRetryClearFn implements trackerdb.Store.
// It ignores the RetryClearFn and forwards the call to TransactionContext,
// since this driver does not need to retry transactions to work around
// SQLite issues like the sqlitedriver.
func (s *trackerStore) TransactionContextWithRetryClearFn(ctx context.Context, fn trackerdb.TransactionFn, _ trackerdb.RetryClearFn) error {
return s.TransactionContext(ctx, fn)
}

// BeginTransaction implements trackerdb.Store
func (s *trackerStore) BeginTransaction(ctx context.Context) (trackerdb.Transaction, error) {
scope := transactionScope{
Expand Down
10 changes: 10 additions & 0 deletions ledger/store/trackerdb/sqlitedriver/sqlitedriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,22 @@ func (s *trackerSQLStore) Transaction(fn trackerdb.TransactionFn) (err error) {
return wrapIOError(s.TransactionContext(context.Background(), fn))
}

// TransactionWithRetryClearFn calls TransactionContextWithRetryClearFn with
// context.Background() and passes the result through wrapIOError.
// NOTE(review): TransactionContextWithRetryClearFn already applies wrapIOError, so the
// error goes through it twice here — presumably harmless; confirm.
func (s *trackerSQLStore) TransactionWithRetryClearFn(fn trackerdb.TransactionFn, rollbackFn trackerdb.RetryClearFn) (err error) {
return wrapIOError(s.TransactionContextWithRetryClearFn(context.Background(), fn, rollbackFn))
}

// TransactionContext runs fn through the write handle's AtomicContext, handing fn a
// transaction scope built around the *sql.Tx. The resulting error is passed through
// wrapIOError before being returned.
func (s *trackerSQLStore) TransactionContext(ctx context.Context, fn trackerdb.TransactionFn) (err error) {
	inTx := func(ctx context.Context, tx *sql.Tx) error {
		scope := &sqlTransactionScope{tx, false, &sqlReader{tx}, &sqlWriter{tx}, &sqlCatchpoint{tx}}
		return fn(ctx, scope)
	}
	err = s.pair.Wdb.AtomicContext(ctx, inTx)
	return wrapIOError(err)
}

// TransactionContextWithRetryClearFn runs fn through the write handle's
// AtomicContextWithRetryClearFn, handing fn a transaction scope built around the *sql.Tx and
// forwarding rollbackFn to the DB layer. The resulting error is passed through wrapIOError
// before being returned.
func (s *trackerSQLStore) TransactionContextWithRetryClearFn(ctx context.Context, fn trackerdb.TransactionFn, rollbackFn trackerdb.RetryClearFn) (err error) {
	inTx := func(ctx context.Context, tx *sql.Tx) error {
		scope := &sqlTransactionScope{tx, false, &sqlReader{tx}, &sqlWriter{tx}, &sqlCatchpoint{tx}}
		return fn(ctx, scope)
	}
	err = s.pair.Wdb.AtomicContextWithRetryClearFn(ctx, inTx, rollbackFn)
	return wrapIOError(err)
}

func (s *trackerSQLStore) BeginTransaction(ctx context.Context) (trackerdb.Transaction, error) {
handle, err := s.pair.Wdb.Handle.BeginTx(ctx, nil)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions ledger/store/trackerdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ type Store interface {
BeginSnapshot(ctx context.Context) (Snapshot, error)
// transaction support
Transaction(fn TransactionFn) (err error)
TransactionWithRetryClearFn(TransactionFn, RetryClearFn) (err error)
TransactionContext(ctx context.Context, fn TransactionFn) (err error)
TransactionContextWithRetryClearFn(context.Context, TransactionFn, RetryClearFn) (err error)
BeginTransaction(ctx context.Context) (Transaction, error)
// maintenance
Vacuum(ctx context.Context) (stats db.VacuumStats, err error)
Expand Down Expand Up @@ -153,3 +155,6 @@ type SnapshotFn func(ctx context.Context, tx SnapshotScope) error

// TransactionFn is the callback lambda used in `Transaction`.
// It receives the context and the TransactionScope to operate on, and returns an error.
type TransactionFn func(ctx context.Context, tx TransactionScope) error

// RetryClearFn is the rollback callback lambda used in `TransactionWithRetryClearFn`
// and `TransactionContextWithRetryClearFn`. It receives only a context and returns nothing.
type RetryClearFn func(ctx context.Context)
Loading

0 comments on commit f87ae8a

Please sign in to comment.