@@ -1700,3 +1700,152 @@ func TestConsecutiveVersion(t *testing.T) {
1700
1700
protocol .ConsensusV21 ,
1701
1701
}
1702
1702
}
1703
+
1704
+ // This test attempts to cover the case when an accountUpdates.lookupX method:
1705
+ // - can't find the requested address,
1706
+ // - falls through looking at deltas and the LRU accounts cache,
1707
+ // - then hits the database (calling accountsDbQueries.lookup)
1708
+ // only to discover that the round stored in the database (committed in accountUpdates.commitRound)
1709
+ // is out of sync with accountUpdates.cachedDBRound (updated a little bit later in accountUpdates.postCommit).
1710
+ //
1711
+ // In this case it waits on a condition variable and retries when
1712
+ // commitSyncer/accountUpdates has advanced the cachedDBRound.
1713
+ func TestAcctUpdatesLookupRetry (t * testing.T ) {
1714
+ partitiontest .PartitionTest (t )
1715
+
1716
+ testProtocolVersion := protocol .ConsensusVersion ("test-protocol-TestAcctUpdatesLookupRetry" )
1717
+ proto := config .Consensus [protocol .ConsensusCurrentVersion ]
1718
+ proto .MaxBalLookback = 10
1719
+ config .Consensus [testProtocolVersion ] = proto
1720
+ defer func () {
1721
+ delete (config .Consensus , testProtocolVersion )
1722
+ }()
1723
+
1724
+ accts := []map [basics.Address ]basics.AccountData {ledgertesting .RandomAccounts (20 , true )}
1725
+ rewardsLevels := []uint64 {0 }
1726
+
1727
+ pooldata := basics.AccountData {}
1728
+ pooldata .MicroAlgos .Raw = 1000 * 1000 * 1000 * 1000
1729
+ pooldata .Status = basics .NotParticipating
1730
+ accts [0 ][testPoolAddr ] = pooldata
1731
+
1732
+ sinkdata := basics.AccountData {}
1733
+ sinkdata .MicroAlgos .Raw = 1000 * 1000 * 1000 * 1000
1734
+ sinkdata .Status = basics .NotParticipating
1735
+ accts [0 ][testSinkAddr ] = sinkdata
1736
+
1737
+ ml := makeMockLedgerForTracker (t , false , 10 , testProtocolVersion , accts )
1738
+ defer ml .Close ()
1739
+
1740
+ conf := config .GetDefaultLocal ()
1741
+ au := newAcctUpdates (t , ml , conf , "." )
1742
+ defer au .close ()
1743
+
1744
+ // cover 10 genesis blocks
1745
+ rewardLevel := uint64 (0 )
1746
+ for i := 1 ; i < 10 ; i ++ {
1747
+ accts = append (accts , accts [0 ])
1748
+ rewardsLevels = append (rewardsLevels , rewardLevel )
1749
+ }
1750
+
1751
+ checkAcctUpdates (t , au , 0 , 9 , accts , rewardsLevels , proto )
1752
+
1753
+ // lastCreatableID stores asset or app max used index to get rid of conflicts
1754
+ lastCreatableID := crypto .RandUint64 () % 512
1755
+ knownCreatables := make (map [basics.CreatableIndex ]bool )
1756
+
1757
+ for i := basics .Round (10 ); i < basics .Round (proto .MaxBalLookback + 15 ); i ++ {
1758
+ rewardLevelDelta := crypto .RandUint64 () % 5
1759
+ rewardLevel += rewardLevelDelta
1760
+ var updates ledgercore.AccountDeltas
1761
+ var totals map [basics.Address ]basics.AccountData
1762
+ base := accts [i - 1 ]
1763
+ updates , totals , lastCreatableID = ledgertesting .RandomDeltasBalancedFull (1 , base , rewardLevel , lastCreatableID )
1764
+ prevTotals , err := au .Totals (basics .Round (i - 1 ))
1765
+ require .NoError (t , err )
1766
+
1767
+ newPool := totals [testPoolAddr ]
1768
+ newPool .MicroAlgos .Raw -= prevTotals .RewardUnits () * rewardLevelDelta
1769
+ updates .Upsert (testPoolAddr , newPool )
1770
+ totals [testPoolAddr ] = newPool
1771
+
1772
+ blk := bookkeeping.Block {
1773
+ BlockHeader : bookkeeping.BlockHeader {
1774
+ Round : basics .Round (i ),
1775
+ },
1776
+ }
1777
+ blk .RewardsLevel = rewardLevel
1778
+ blk .CurrentProtocol = testProtocolVersion
1779
+
1780
+ delta := ledgercore .MakeStateDelta (& blk .BlockHeader , 0 , updates .Len (), 0 )
1781
+ delta .Accts .MergeAccounts (updates )
1782
+ delta .Creatables = creatablesFromUpdates (base , updates , knownCreatables )
1783
+ delta .Totals = accumulateTotals (t , testProtocolVersion , []map [basics.Address ]basics.AccountData {totals }, rewardLevel )
1784
+ au .newBlock (blk , delta )
1785
+ accts = append (accts , totals )
1786
+ rewardsLevels = append (rewardsLevels , rewardLevel )
1787
+
1788
+ checkAcctUpdates (t , au , 0 , i , accts , rewardsLevels , proto )
1789
+ }
1790
+
1791
+ flushRound := func (i basics.Round ) {
1792
+ // Clear the timer to ensure a flush
1793
+ ml .trackers .lastFlushTime = time.Time {}
1794
+
1795
+ ml .trackers .committedUpTo (basics .Round (proto .MaxBalLookback ) + i )
1796
+ ml .trackers .waitAccountsWriting ()
1797
+ }
1798
+
1799
+ // flush a couple of rounds (indirectly schedules commitSyncer)
1800
+ flushRound (basics .Round (0 ))
1801
+ flushRound (basics .Round (1 ))
1802
+
1803
+ // add stallingTracker to list of trackers
1804
+ stallingTracker := & blockingTracker {
1805
+ postCommitUnlockedEntryLock : make (chan struct {}),
1806
+ postCommitUnlockedReleaseLock : make (chan struct {}),
1807
+ postCommitEntryLock : make (chan struct {}),
1808
+ postCommitReleaseLock : make (chan struct {}),
1809
+ alwaysLock : true ,
1810
+ }
1811
+ ml .trackers .trackers = append ([]ledgerTracker {stallingTracker }, ml .trackers .trackers ... )
1812
+
1813
+ // kick off another round
1814
+ go flushRound (basics .Round (2 ))
1815
+
1816
+ // let stallingTracker enter postCommit() and block (waiting on postCommitReleaseLock)
1817
+ // this will prevent accountUpdates.postCommit() from updating au.cachedDBRound = newBase
1818
+ <- stallingTracker .postCommitEntryLock
1819
+
1820
+ // prune the baseAccounts cache, so that lookup will fall through to the DB
1821
+ au .accountsMu .Lock ()
1822
+ au .baseAccounts .prune (0 )
1823
+ au .accountsMu .Unlock ()
1824
+
1825
+ rnd := basics .Round (2 )
1826
+
1827
+ // grab any address and data to use for call to lookup
1828
+ var addr basics.Address
1829
+ var data basics.AccountData
1830
+ for a , d := range accts [rnd ] {
1831
+ addr = a
1832
+ data = d
1833
+ break
1834
+ }
1835
+
1836
+ // release the postCommit lock, once au.lookupWithoutRewards hits au.accountsReadCond.Wait()
1837
+ go func () {
1838
+ time .Sleep (200 * time .Millisecond )
1839
+ stallingTracker .postCommitReleaseLock <- struct {}{}
1840
+ }()
1841
+
1842
+ // issue a LookupWithoutRewards while persistedData.round != au.cachedDBRound
1843
+ d , validThrough , err := au .LookupWithoutRewards (rnd , addr )
1844
+ require .NoError (t , err )
1845
+ require .Equal (t , d , data )
1846
+ require .GreaterOrEqualf (t , uint64 (validThrough ), uint64 (rnd ), "validThrough: %v rnd :%v" , validThrough , rnd )
1847
+
1848
+ // allow the postCommitUnlocked() handler to go through
1849
+ <- stallingTracker .postCommitUnlockedEntryLock
1850
+ stallingTracker .postCommitUnlockedReleaseLock <- struct {}{}
1851
+ }
0 commit comments