From 87648f6a39064efc5ba4fc775f32ff2af4531fa1 Mon Sep 17 00:00:00 2001 From: Ricardo Silva <1945557+ricardolyn@users.noreply.github.com> Date: Thu, 30 Sep 2021 17:01:56 +0100 Subject: [PATCH] Send transactions to all peers instead of a sub-set (#1261) * send transactions to all peers * add unit test * tidy up * fix lint issues * add wait for peers to be synced * Apply suggestions from code review Co-authored-by: baptiste-b-pegasys <85155432+baptiste-b-pegasys@users.noreply.github.com> * improve handling of peers registration and fix issues from suggestion. Co-authored-by: baptiste-b-pegasys <85155432+baptiste-b-pegasys@users.noreply.github.com> --- eth/handler.go | 5 +-- eth/handler_test.go | 78 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 66df500cd4..8615cffb04 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -941,8 +941,9 @@ func (pm *ProtocolManager) BroadcastTransactions(txs types.Transactions, propaga for _, tx := range txs { peers := pm.peers.PeersWithoutTx(tx.Hash()) - // Send the block to a subset of our peers - transfer := peers[:int(math.Sqrt(float64(len(peers))))] + // Quorum - Propagate the transactions to all our peers (instead of subset as geth is doing) + transfer := peers + // End Quorum for _, peer := range transfer { txset[peer] = append(txset[peer], tx.Hash()) } diff --git a/eth/handler_test.go b/eth/handler_test.go index af253ac62b..fa0f802a7e 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -21,6 +21,7 @@ import ( "math" "math/big" "math/rand" + "sync" "testing" "time" @@ -732,3 +733,80 @@ func TestBroadcastMalformedBlock(t *testing.T) { } } } + +// Quorum +// Tests that when broadcasting transactions, it sends the full transactions to all peers instead of Announcing (aka sending only hashes) +func TestBroadcastTransactionsOnQuorum(t *testing.T) { + var ( + evmux = new(event.TypeMux) + pow = ethash.NewFaker() + db = rawdb.NewMemoryDatabase() + config = ¶ms.ChainConfig{} + gspec = &core.Genesis{Config: config} + destinationKey, _ = crypto.GenerateKey() + totalPeers = 100 + ) + gspec.MustCommit(db) + blockchain, _ := core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil, nil, nil) + txPool := &testTxPool{pool: make(map[common.Hash]*types.Transaction)} + + pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, evmux, txPool, pow, blockchain, db, 1, nil, false) + if err != nil { + t.Fatalf("failed to start test protocol manager: %v", err) + } + pm.Start(totalPeers) + defer pm.Stop() + + var peers []*testPeer + wgPeers := sync.WaitGroup{} + wgPeers.Add(totalPeers) + for i := 0; i < totalPeers; i++ { + peer, _ := newTestPeer(fmt.Sprintf("peer %d", i), eth65, pm, true) + go func() { + <-peer.EthPeerRegistered + wgPeers.Done() + }() + defer peer.close() + + peers = append(peers, peer) + } + wgPeers.Wait() // wait until all peers are synced before pushing tx to the pool + + transaction := types.NewTransaction(0, crypto.PubkeyToAddress(destinationKey.PublicKey), common.Big0, uint64(3000000), common.Big0, nil) + transactions := types.Transactions{transaction} + + txPool.AddRemotes(transactions) // this will trigger the transaction broadcast/announce + + doneCh := make(chan error, totalPeers) + + wgPeers.Add(totalPeers) + defer func() { + wgPeers.Wait() + close(doneCh) + }() + + for _, peer := range peers { + go func(p *testPeer) { + doneCh <- p2p.ExpectMsg(p.app, TransactionMsg, transactions) + wgPeers.Done() + }(peer) + } + var received int + for { + select { + case err := <-doneCh: + if err != nil { + t.Fatalf("broadcast failed: %v", err) + return + } + received++ + if received == totalPeers { + // We found the right number + return + } + case <-time.After(2 * time.Second): + t.Errorf("timeout: broadcast count mismatch: have %d, want %d", received, totalPeers) + return + } + } +}