diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go index 439906a483..5f014985c6 100644 --- a/ledger/acctupdates_test.go +++ b/ledger/acctupdates_test.go @@ -1700,3 +1700,152 @@ func TestConsecutiveVersion(t *testing.T) { protocol.ConsensusV21, } } + +// This test attempts to cover the case when an accountUpdates.lookupX method: +// - can't find the requested address, +// - falls through looking at deltas and the LRU accounts cache, +// - then hits the database (calling accountsDbQueries.lookup) +// only to discover that the round stored in the database (committed in accountUpdates.commitRound) +// is out of sync with accountUpdates.cachedDBRound (updated a little bit later in accountUpdates.postCommit). +// +// In this case it waits on a condition variable and retries when +// commitSyncer/accountUpdates has advanced the cachedDBRound. +func TestAcctUpdatesLookupRetry(t *testing.T) { + partitiontest.PartitionTest(t) + + testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestAcctUpdatesLookupRetry") + proto := config.Consensus[protocol.ConsensusCurrentVersion] + proto.MaxBalLookback = 10 + config.Consensus[testProtocolVersion] = proto + defer func() { + delete(config.Consensus, testProtocolVersion) + }() + + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)} + rewardsLevels := []uint64{0} + + pooldata := basics.AccountData{} + pooldata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000 + pooldata.Status = basics.NotParticipating + accts[0][testPoolAddr] = pooldata + + sinkdata := basics.AccountData{} + sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000 + sinkdata.Status = basics.NotParticipating + accts[0][testSinkAddr] = sinkdata + + ml := makeMockLedgerForTracker(t, false, 10, testProtocolVersion, accts) + defer ml.Close() + + conf := config.GetDefaultLocal() + au := newAcctUpdates(t, ml, conf, ".") + defer au.close() + + // cover 10 genesis blocks + rewardLevel := uint64(0) + for i := 1; i < 10; i++ { + accts = append(accts, accts[0]) + rewardsLevels = append(rewardsLevels, rewardLevel) + } + + checkAcctUpdates(t, au, 0, 9, accts, rewardsLevels, proto) + + // lastCreatableID stores asset or app max used index to get rid of conflicts + lastCreatableID := crypto.RandUint64() % 512 + knownCreatables := make(map[basics.CreatableIndex]bool) + + for i := basics.Round(10); i < basics.Round(proto.MaxBalLookback+15); i++ { + rewardLevelDelta := crypto.RandUint64() % 5 + rewardLevel += rewardLevelDelta + var updates ledgercore.AccountDeltas + var totals map[basics.Address]basics.AccountData + base := accts[i-1] + updates, totals, lastCreatableID = ledgertesting.RandomDeltasBalancedFull(1, base, rewardLevel, lastCreatableID) + prevTotals, err := au.Totals(basics.Round(i - 1)) + require.NoError(t, err) + + newPool := totals[testPoolAddr] + newPool.MicroAlgos.Raw -= prevTotals.RewardUnits() * rewardLevelDelta + updates.Upsert(testPoolAddr, newPool) + totals[testPoolAddr] = newPool + + blk := bookkeeping.Block{ + BlockHeader: bookkeeping.BlockHeader{ + Round: basics.Round(i), + }, + } + blk.RewardsLevel = rewardLevel + blk.CurrentProtocol = testProtocolVersion + + delta := ledgercore.MakeStateDelta(&blk.BlockHeader, 0, updates.Len(), 0) + delta.Accts.MergeAccounts(updates) + delta.Creatables = creatablesFromUpdates(base, updates, knownCreatables) + delta.Totals = accumulateTotals(t, testProtocolVersion, []map[basics.Address]basics.AccountData{totals}, rewardLevel) + au.newBlock(blk, delta) + accts = append(accts, totals) + rewardsLevels = append(rewardsLevels, rewardLevel) + + checkAcctUpdates(t, au, 0, i, accts, rewardsLevels, proto) + } + + flushRound := func(i basics.Round) { + // Clear the timer to ensure a flush + ml.trackers.lastFlushTime = time.Time{} + + ml.trackers.committedUpTo(basics.Round(proto.MaxBalLookback) + i) + ml.trackers.waitAccountsWriting() + } + + // flush a couple of rounds (indirectly schedules commitSyncer) + flushRound(basics.Round(0)) + flushRound(basics.Round(1)) + + // add stallingTracker to list of trackers + stallingTracker := &blockingTracker{ + postCommitUnlockedEntryLock: make(chan struct{}), + postCommitUnlockedReleaseLock: make(chan struct{}), + postCommitEntryLock: make(chan struct{}), + postCommitReleaseLock: make(chan struct{}), + alwaysLock: true, + } + ml.trackers.trackers = append([]ledgerTracker{stallingTracker}, ml.trackers.trackers...) + + // kick off another round + go flushRound(basics.Round(2)) + + // let stallingTracker enter postCommit() and block (waiting on postCommitReleaseLock) + // this will prevent accountUpdates.postCommit() from updating au.cachedDBRound = newBase + <-stallingTracker.postCommitEntryLock + + // prune the baseAccounts cache, so that lookup will fall through to the DB + au.accountsMu.Lock() + au.baseAccounts.prune(0) + au.accountsMu.Unlock() + + rnd := basics.Round(2) + + // grab any address and data to use for call to lookup + var addr basics.Address + var data basics.AccountData + for a, d := range accts[rnd] { + addr = a + data = d + break + } + + // release the postCommit lock, once au.lookupWithoutRewards hits au.accountsReadCond.Wait() + go func() { + time.Sleep(200 * time.Millisecond) + stallingTracker.postCommitReleaseLock <- struct{}{} + }() + + // issue a LookupWithoutRewards while persistedData.round != au.cachedDBRound + d, validThrough, err := au.LookupWithoutRewards(rnd, addr) + require.NoError(t, err) + require.Equal(t, d, data) + require.GreaterOrEqualf(t, uint64(validThrough), uint64(rnd), "validThrough: %v rnd :%v", validThrough, rnd) + + // allow the postCommitUnlocked() handler to go through + <-stallingTracker.postCommitUnlockedEntryLock + stallingTracker.postCommitUnlockedReleaseLock <- struct{}{} +} diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index 983afed1d0..b5778691e0 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -427,6 +427,7 @@ type blockingTracker struct { postCommitEntryLock chan struct{} postCommitReleaseLock chan struct{} committedUpToRound int64 + alwaysLock bool } // loadFromDisk is not implemented in the blockingTracker. @@ -461,7 +462,7 @@ func (bt *blockingTracker) commitRound(context.Context, *sql.Tx, *deferredCommit // postCommit implements entry/exit blockers, designed for testing. func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) { - if dcc.isCatchpointRound && dcc.catchpointLabel != "" { + if bt.alwaysLock || (dcc.isCatchpointRound && dcc.catchpointLabel != "") { bt.postCommitEntryLock <- struct{}{} <-bt.postCommitReleaseLock } @@ -469,7 +470,7 @@ func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitCo // postCommitUnlocked implements entry/exit blockers, designed for testing. func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { - if dcc.isCatchpointRound && dcc.catchpointLabel != "" { + if bt.alwaysLock || (dcc.isCatchpointRound && dcc.catchpointLabel != "") { bt.postCommitUnlockedEntryLock <- struct{}{} <-bt.postCommitUnlockedReleaseLock }