Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txnsync: Use dynamic maxEncodedTransactionGroups #3168

Merged
merged 5 commits into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions txnsync/encodedgroups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
func TestBadBitmask(t *testing.T) {
partitiontest.PartitionTest(t)

txnGroups, genesisID, genesisHash, err := txnGroupsData(96)
txnGroups, genesisID, genesisHash, err := txnGroupsData(50)
require.NoError(t, err)

var s syncState
Expand All @@ -41,7 +41,7 @@ func TestBadBitmask(t *testing.T) {
require.Equal(t, errIndexNotFound, err)
}

// corrupted bitmask may bcause panic during decoding. This test is to make sure it is an error and not a panic
// corrupted bitmask may cause panic during decoding. This test is to make sure it is an error and not a panic
func badEncodeTransactionGroups(t *testing.T, s *syncState, inTxnGroups []pooldata.SignedTxGroup, dataExchangeRate uint64) (packedTransactionGroups, error) {
txnCount := 0
for _, txGroup := range inTxnGroups {
Expand Down
11 changes: 6 additions & 5 deletions txnsync/encodedgroupstypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import (
"github.com/algorand/go-algorand/protocol"
)

const maxEncodedTransactionGroups = 30000
const maxEncodedTransactionGroupEntries = 30000
const maxBitmaskSize = (maxEncodedTransactionGroupEntries+7)/8 + 1
const maxSignatureBytes = maxEncodedTransactionGroupEntries * len(crypto.Signature{})
const maxAddressBytes = maxEncodedTransactionGroupEntries * crypto.DigestSize
// set in init() in service.go
var maxEncodedTransactionGroups int
var maxEncodedTransactionGroupEntries int
var maxBitmaskSize int
var maxSignatureBytes int
var maxAddressBytes int

var errInvalidTxType = errors.New("invalid txtype")

Expand Down
9 changes: 6 additions & 3 deletions txnsync/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
)

const txnBlockMessageVersion = 1
const maxBloomFilterSize = 100000
const maxAcceptedMsgSeq = 64
const maxEncodedTransactionGroupBytes = 10000000
const maxProposalSize = 350000

// set in init() in service.go
var maxBloomFilterSize int
var maxEncodedTransactionGroupBytes int

var maxProposalSize = 350000

type transactionBlockMessage struct {
_struct struct{} `codec:",omitempty,omitemptyarray"` //nolint:structcheck,unused
Expand Down
2 changes: 1 addition & 1 deletion txnsync/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
_, err = incomingMessage.message.UnmarshalMsg(message)
if err != nil {
// if we received a message that we cannot parse, disconnect.
s.log.Infof("received unparsable transaction sync message from peer. disconnecting from peer.")
s.log.Infof("received unparsable transaction sync message from peer. disconnecting from peer: %v, bytes: %d", err, len(message))
s.incomingMessagesQ.erase(peer, networkPeer)
return err
}
Expand Down
20 changes: 20 additions & 0 deletions txnsync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,25 @@ func MakeTransactionSyncService(log logging.Logger, conn NodeConnector, isRelay
}
s.state.service = s
s.state.xorBuilder.MaxIterations = 10

setTransactionSyncVariables(cfg)
return s
}

func setTransactionSyncVariables(cfg config.Local) {
if cfg.TxPoolSize < maxEncodedTransactionGroups {
return
}
maxEncodedTransactionGroups = cfg.TxPoolSize
maxEncodedTransactionGroupEntries = cfg.TxPoolSize
maxBitmaskSize = (maxEncodedTransactionGroupEntries+7)/8 + 1
maxSignatureBytes = maxEncodedTransactionGroupEntries * len(crypto.Signature{})
maxAddressBytes = maxEncodedTransactionGroupEntries * crypto.DigestSize

maxBloomFilterSize = cfg.TxPoolSize * 5 // 32 bit xor uses slightly more than 4 bytes/element.
maxEncodedTransactionGroupBytes = cfg.TxPoolSize * 10000 // assume each transaction takes 10KB, as a worst-case-scenario for bounding purposes only.
}

// Start starts the transaction sync
func (s *Service) Start() {
s.ctx, s.cancelCtx = context.WithCancel(context.Background())
Expand All @@ -76,3 +92,7 @@ func (s *Service) Stop() {
func (s *Service) GetIncomingMessageHandler() IncomingMessageHandler {
return s.state.asyncIncomingMessageHandler
}

func init() {
setTransactionSyncVariables(config.GetDefaultLocal())
}
4 changes: 2 additions & 2 deletions txnsync/txngroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func decodeTransactionGroups(ptg packedTransactionGroups, genesisID string, gene
return nil, err
}

if stub.TransactionGroupCount > maxEncodedTransactionGroups {
if stub.TransactionGroupCount > uint64(maxEncodedTransactionGroups) {
return nil, errors.New("invalid TransactionGroupCount")
}

Expand Down Expand Up @@ -191,7 +191,7 @@ func decodeTransactionGroups(ptg packedTransactionGroups, genesisID string, gene

func decompressTransactionGroupsBytes(data []byte, lenDecompressedBytes uint64) (decoded []byte, err error) {
compressionRatio := lenDecompressedBytes / uint64(len(data)) // data should have been compressed between 0 and 95%
if lenDecompressedBytes > maxEncodedTransactionGroupBytes || compressionRatio <= 0 || compressionRatio >= maxCompressionRatio {
if lenDecompressedBytes > uint64(maxEncodedTransactionGroupBytes) || compressionRatio <= 0 || compressionRatio >= maxCompressionRatio {
return nil, fmt.Errorf("invalid lenDecompressedBytes: %d, len(data): %d", lenDecompressedBytes, len(data))
}

Expand Down
10 changes: 5 additions & 5 deletions txnsync/txngroups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func txnGroupsData(numBlocks int) (txnGroups []pooldata.SignedTxGroup, genesisID
func TestTxnGroupEncodingLarge(t *testing.T) {
partitiontest.PartitionTest(t)

txnGroups, genesisID, genesisHash, err := txnGroupsData(969)
txnGroups, genesisID, genesisHash, err := txnGroupsData(500)
require.NoError(t, err)

var s syncState
Expand Down Expand Up @@ -268,10 +268,10 @@ func TestTxnGroupEncodingLarge(t *testing.T) {
}
}
require.Equal(t, 2, len(count))
require.Equal(t, 18351, count["axfer"])
require.Equal(t, 1663, count["pay"])
require.Equal(t, 20005, sigs)
require.Equal(t, 9, msigs)
require.Equal(t, 9834, count["axfer"])
require.Equal(t, 850, count["pay"])
require.Equal(t, 10678, sigs)
require.Equal(t, 6, msigs)
require.Equal(t, 0, lsigs)
}

Expand Down