Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test to exercise lookup corner cases #3376

Merged
merged 3 commits into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions ledger/acctupdates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1700,3 +1700,145 @@ func TestConsecutiveVersion(t *testing.T) {
protocol.ConsensusV21,
}
}

func TestAcctUpdatesLookupRetry(t *testing.T) {
partitiontest.PartitionTest(t)

testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestAcctUpdatesLookupRetry")
proto := config.Consensus[protocol.ConsensusCurrentVersion]
proto.MaxBalLookback = 10
config.Consensus[testProtocolVersion] = proto
defer func() {
delete(config.Consensus, testProtocolVersion)
}()

accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
rewardsLevels := []uint64{0}

pooldata := basics.AccountData{}
pooldata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000
pooldata.Status = basics.NotParticipating
accts[0][testPoolAddr] = pooldata

sinkdata := basics.AccountData{}
sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000
sinkdata.Status = basics.NotParticipating
accts[0][testSinkAddr] = sinkdata

ml := makeMockLedgerForTracker(t, false, 10, testProtocolVersion, accts)
defer ml.Close()

conf := config.GetDefaultLocal()
au := newAcctUpdates(t, ml, conf, ".")
defer au.close()

// cover 10 genesis blocks
rewardLevel := uint64(0)
for i := 1; i < 10; i++ {
accts = append(accts, accts[0])
rewardsLevels = append(rewardsLevels, rewardLevel)
}

checkAcctUpdates(t, au, 0, 9, accts, rewardsLevels, proto)

// lastCreatableID stores asset or app max used index to get rid of conflicts
lastCreatableID := crypto.RandUint64() % 512
knownCreatables := make(map[basics.CreatableIndex]bool)

for i := basics.Round(10); i < basics.Round(proto.MaxBalLookback+15); i++ {
rewardLevelDelta := crypto.RandUint64() % 5
rewardLevel += rewardLevelDelta
var updates ledgercore.AccountDeltas
var totals map[basics.Address]basics.AccountData
base := accts[i-1]
updates, totals, lastCreatableID = ledgertesting.RandomDeltasBalancedFull(1, base, rewardLevel, lastCreatableID)
prevTotals, err := au.Totals(basics.Round(i - 1))
require.NoError(t, err)

newPool := totals[testPoolAddr]
newPool.MicroAlgos.Raw -= prevTotals.RewardUnits() * rewardLevelDelta
updates.Upsert(testPoolAddr, newPool)
totals[testPoolAddr] = newPool

blk := bookkeeping.Block{
BlockHeader: bookkeeping.BlockHeader{
Round: basics.Round(i),
},
}
blk.RewardsLevel = rewardLevel
blk.CurrentProtocol = testProtocolVersion

delta := ledgercore.MakeStateDelta(&blk.BlockHeader, 0, updates.Len(), 0)
delta.Accts.MergeAccounts(updates)
delta.Creatables = creatablesFromUpdates(base, updates, knownCreatables)
delta.Totals = accumulateTotals(t, testProtocolVersion, []map[basics.Address]basics.AccountData{totals}, rewardLevel)
au.newBlock(blk, delta)
accts = append(accts, totals)
rewardsLevels = append(rewardsLevels, rewardLevel)

checkAcctUpdates(t, au, 0, i, accts, rewardsLevels, proto)
}

flushRound := func(i basics.Round) {
// Clear the timer to ensure a flush
ml.trackers.lastFlushTime = time.Time{}

ml.trackers.committedUpTo(basics.Round(proto.MaxBalLookback) + i)
ml.trackers.waitAccountsWriting()
//checkAcctUpdates(t, au, i, basics.Round(proto.MaxBalLookback+14), accts, rewardsLevels, proto)
}

// flush a couple of rounds (indirectly schedules commitSyncer)
flushRound(basics.Round(0))
flushRound(basics.Round(1))

// add stallingTracker to list of trackers
stallingTracker := &blockingTracker{
postCommitUnlockedEntryLock: make(chan struct{}),
postCommitUnlockedReleaseLock: make(chan struct{}),
postCommitEntryLock: make(chan struct{}),
postCommitReleaseLock: make(chan struct{}),
alwaysLock: true,
}
ml.trackers.trackers = append([]ledgerTracker{stallingTracker}, ml.trackers.trackers...)

// kick off another round
go flushRound(basics.Round(2))

// let stallingTracker enter postCommit() and block (waiting on postCommitReleaseLock)
// this will prevent accountUpdates.postCommit() from updating au.cachedDBRound = newBase
<-stallingTracker.postCommitEntryLock

// prune the baseAccounts cache, so that lookup will fall through to the DB
au.accountsMu.Lock()
au.baseAccounts.prune(0)
au.accountsMu.Unlock()

rnd := basics.Round(2)

// grab any address and data to use for call to lookup
var addr basics.Address
var data basics.AccountData
for a, d := range accts[rnd] {
addr = a
data = d
break
}

// release the postCommit lock, once au.lookupWithoutRewards hits au.accountsReadCond.Wait()
go func() {
time.Sleep(200 * time.Millisecond)
stallingTracker.postCommitReleaseLock <- struct{}{}
}()

// issue a LookupWithoutRewards while persistedData.round != au.cachedDBRound
d, validThrough, err := au.LookupWithoutRewards(rnd, addr)
require.NoError(t, err)
require.Equal(t, d, data)
require.GreaterOrEqualf(t, uint64(validThrough), uint64(rnd), "validThrough: %v rnd :%v", validThrough, rnd)
t.Log("validThrough", validThrough)

// allow the postCommitUnlocked() handler to go through
<-stallingTracker.postCommitUnlockedEntryLock
stallingTracker.postCommitUnlockedReleaseLock <- struct{}{}
}
5 changes: 3 additions & 2 deletions ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ type blockingTracker struct {
postCommitEntryLock chan struct{}
postCommitReleaseLock chan struct{}
committedUpToRound int64
alwaysLock bool
}

// loadFromDisk is not implemented in the blockingTracker.
Expand Down Expand Up @@ -461,15 +462,15 @@ func (bt *blockingTracker) commitRound(context.Context, *sql.Tx, *deferredCommit

// postCommit implements entry/exit blockers, designed for testing.
func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
if bt.alwaysLock || (dcc.isCatchpointRound && dcc.catchpointLabel != "") {
bt.postCommitEntryLock <- struct{}{}
<-bt.postCommitReleaseLock
}
}

// postCommitUnlocked implements entry/exit blockers, designed for testing.
func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
if bt.alwaysLock || (dcc.isCatchpointRound && dcc.catchpointLabel != "") {
bt.postCommitUnlockedEntryLock <- struct{}{}
<-bt.postCommitUnlockedReleaseLock
}
Expand Down