Skip to content

Commit

Permalink
Send transactions to all peers instead of a sub-set (#1261)
Browse files Browse the repository at this point in the history
* send transactions to all peers

* add unit test

* tidy up

* fix lint issues

* add wait for peers to be synced

* Apply suggestions from code review

Co-authored-by: baptiste-b-pegasys <[email protected]>

* improve handling of peers registration and fix issues from suggestion.

Co-authored-by: baptiste-b-pegasys <[email protected]>
  • Loading branch information
ricardolyn and baptiste-b-pegasys authored Sep 30, 2021
1 parent 743e4a1 commit 87648f6
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 2 deletions.
5 changes: 3 additions & 2 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,8 +941,9 @@ func (pm *ProtocolManager) BroadcastTransactions(txs types.Transactions, propaga
for _, tx := range txs {
peers := pm.peers.PeersWithoutTx(tx.Hash())

// Send the block to a subset of our peers
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
// Quorum - Propagate the transactions to all our peers (instead of subset as geth is doing)
transfer := peers
// End Quorum
for _, peer := range transfer {
txset[peer] = append(txset[peer], tx.Hash())
}
Expand Down
78 changes: 78 additions & 0 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"math"
"math/big"
"math/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -732,3 +733,80 @@ func TestBroadcastMalformedBlock(t *testing.T) {
}
}
}

// Quorum
// Tests that when broadcasting transactions, it sends the full transactions to all peers instead of Announcing (aka sending only hashes)
func TestBroadcastTransactionsOnQuorum(t *testing.T) {
var (
evmux = new(event.TypeMux)
pow = ethash.NewFaker()
db = rawdb.NewMemoryDatabase()
config = &params.ChainConfig{}
gspec = &core.Genesis{Config: config}
destinationKey, _ = crypto.GenerateKey()
totalPeers = 100
)
gspec.MustCommit(db)
blockchain, _ := core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil, nil, nil)
txPool := &testTxPool{pool: make(map[common.Hash]*types.Transaction)}

pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, evmux, txPool, pow, blockchain, db, 1, nil, false)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
pm.Start(totalPeers)
defer pm.Stop()

var peers []*testPeer
wgPeers := sync.WaitGroup{}
wgPeers.Add(totalPeers)
for i := 0; i < totalPeers; i++ {
peer, _ := newTestPeer(fmt.Sprintf("peer %d", i), eth65, pm, true)
go func() {
<-peer.EthPeerRegistered
wgPeers.Done()
}()
defer peer.close()

peers = append(peers, peer)
}
wgPeers.Wait() // wait until all peers are synced before pushing tx to the pool

transaction := types.NewTransaction(0, crypto.PubkeyToAddress(destinationKey.PublicKey), common.Big0, uint64(3000000), common.Big0, nil)
transactions := types.Transactions{transaction}

txPool.AddRemotes(transactions) // this will trigger the transaction broadcast/announce

doneCh := make(chan error, totalPeers)

wgPeers.Add(totalPeers)
defer func() {
wgPeers.Wait()
close(doneCh)
}()

for _, peer := range peers {
go func(p *testPeer) {
doneCh <- p2p.ExpectMsg(p.app, TransactionMsg, transactions)
wgPeers.Done()
}(peer)
}
var received int
for {
select {
case err := <-doneCh:
if err != nil {
t.Fatalf("broadcast failed: %v", err)
return
}
received++
if received == totalPeers {
// We found the right number
return
}
case <-time.After(2 * time.Second):
t.Errorf("timeout: broadcast count mismatch: have %d, want %d", received, totalPeers)
return
}
}
}

0 comments on commit 87648f6

Please sign in to comment.