Skip to content

Commit 994095a

Browse files
committed
fix: eth 68 protocol
1 parent d0319f9 commit 994095a

File tree

10 files changed

+117
-15
lines changed

10 files changed

+117
-15
lines changed

cmd/devp2p/internal/ethtest/suite.go

+18-5
Original file line numberDiff line numberDiff line change
@@ -755,17 +755,20 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) {
755755
}
756756

757757
// generate 50 txs
758-
hashMap, _, err := generateTxs(s, 50)
758+
_, txs, err := generateTxs(s, 50)
759759
if err != nil {
760760
t.Fatalf("failed to generate transactions: %v", err)
761761
}
762762

763763
// create new pooled tx hashes announcement
764-
hashes := make([]common.Hash, 0)
765-
for _, hash := range hashMap {
766-
hashes = append(hashes, hash)
764+
hashes := make([]common.Hash, len(txs))
765+
types := make([]byte, len(txs))
766+
sizes := make([]uint32, len(txs))
767+
for i, tx := range txs {
768+
hashes[i] = tx.Hash()
769+
types[i] = tx.Type()
770+
sizes[i] = uint32(tx.Size())
767771
}
768-
announce := NewPooledTransactionHashes(hashes)
769772

770773
// send announcement
771774
conn, err := s.dial66()
@@ -776,6 +779,14 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) {
776779
if err = conn.peer(s.chain, nil); err != nil {
777780
t.Fatalf("peering failed: %v", err)
778781
}
782+
783+
var announce Message
784+
if conn.negotiatedProtoVersion == eth.ETH68 {
785+
announce = NewPooledTransactionHashes68{Types: types}
786+
} else {
787+
announce = NewPooledTransactionHashes(hashes)
788+
}
789+
779790
if err = conn.Write(announce); err != nil {
780791
t.Fatalf("failed to write to connection: %v", err)
781792
}
@@ -792,6 +803,8 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) {
792803
// ignore propagated txs from previous tests
793804
case *NewPooledTransactionHashes:
794805
continue
806+
case *NewPooledTransactionHashes68:
807+
continue
795808
// ignore block announcements from previous tests
796809
case *NewBlockHashes:
797810
continue

cmd/devp2p/internal/ethtest/types.go

+4
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket
116116

117117
func (nb NewPooledTransactionHashes) Code() int { return 24 }
118118

119+
type NewPooledTransactionHashes68 eth.NewPooledTransactionHashesPacket68
120+
121+
func (nb NewPooledTransactionHashes68) Code() int { return 24 }
122+
119123
type GetPooledTransactions eth.GetPooledTransactionsPacket
120124

121125
func (gpt GetPooledTransactions) Code() int { return 25 }

eth/handler_eth.go

+3
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
7070
case *eth.NewPooledTransactionHashesPacket:
7171
return h.txFetcher.Notify(peer.ID(), *packet)
7272

73+
case *eth.NewPooledTransactionHashesPacket68:
74+
return h.txFetcher.Notify(peer.ID(), packet.Hashes)
75+
7376
case *eth.TransactionsPacket:
7477
return h.txFetcher.Enqueue(peer.ID(), *packet, false)
7578

eth/handler_eth_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
8282
// fork IDs in the protocol handshake.
8383
func TestForkIDSplit65(t *testing.T) { testForkIDSplit(t, eth.ETH65) }
8484
func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) }
85+
func TestForkIDSplit68(t *testing.T) { testForkIDSplit(t, eth.ETH68) }
8586

8687
func testForkIDSplit(t *testing.T, protocol uint) {
8788
t.Parallel()
@@ -240,6 +241,7 @@ func testForkIDSplit(t *testing.T, protocol uint) {
240241
// Tests that received transactions are added to the local pool.
241242
func TestRecvTransactions65(t *testing.T) { testRecvTransactions(t, eth.ETH65) }
242243
func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) }
244+
func TestRecvTransactions68(t *testing.T) { testRecvTransactions(t, eth.ETH68) }
243245

244246
func testRecvTransactions(t *testing.T, protocol uint) {
245247
t.Parallel()
@@ -297,6 +299,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
297299

298300
// This test checks that pending transactions are sent.
299301
func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) }
302+
func TestSendTransactions68(t *testing.T) { testSendTransactions(t, eth.ETH68) }
300303

301304
func testSendTransactions(t *testing.T, protocol uint) {
302305
t.Parallel()
@@ -383,6 +386,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
383386
// broadcasts or via announcements/retrievals.
384387
func TestTransactionPropagation65(t *testing.T) { testTransactionPropagation(t, eth.ETH65) }
385388
func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) }
389+
func TestTransactionPropagation68(t *testing.T) { testTransactionPropagation(t, eth.ETH68) }
386390

387391
func testTransactionPropagation(t *testing.T, protocol uint) {
388392
t.Parallel()
@@ -690,6 +694,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
690694
// with the hashes in the header) gets discarded and not broadcast forward.
691695
func TestBroadcastMalformedBlock65(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH65) }
692696
func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) }
697+
func TestBroadcastMalformedBlock68(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH68) }
693698

694699
func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
695700
t.Parallel()

eth/protocols/eth/broadcast.go

+18-7
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,17 @@ func (p *Peer) announceTransactions() {
142142
if done == nil && len(queue) > 0 {
143143
// Pile transaction hashes until we reach our allowed network limit
144144
var (
145-
count int
146-
pending []common.Hash
147-
size common.StorageSize
145+
count int
146+
pending []common.Hash
147+
pendingTypes []byte
148+
pendingSizes []uint32
149+
size common.StorageSize
148150
)
149151
for count = 0; count < len(queue) && size < maxTxPacketSize; count++ {
150-
if p.txpool.Get(queue[count]) != nil {
152+
if tx := p.txpool.Get(queue[count]); tx != nil {
151153
pending = append(pending, queue[count])
154+
pendingTypes = append(pendingTypes, tx.Type())
155+
pendingSizes = append(pendingSizes, uint32(tx.Size()))
152156
size += common.HashLength
153157
}
154158
}
@@ -159,9 +163,16 @@ func (p *Peer) announceTransactions() {
159163
if len(pending) > 0 {
160164
done = make(chan struct{})
161165
go func() {
162-
if err := p.sendPooledTransactionHashes(pending); err != nil {
163-
fail <- err
164-
return
166+
if p.version >= ETH68 {
167+
if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil {
168+
fail <- err
169+
return
170+
}
171+
} else {
172+
if err := p.sendPooledTransactionHashes(pending); err != nil {
173+
fail <- err
174+
return
175+
}
165176
}
166177
close(done)
167178
p.Log().Trace("Sent transaction announcements", "count", len(pending))

eth/protocols/eth/handler.go

+19-1
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,21 @@ var eth66 = map[uint64]msgHandler{
216216
TransactionsExMsg: handleTransactionsEx,
217217
}
218218

219+
var eth68 = map[uint64]msgHandler{
220+
NewBlockHashesMsg: handleNewBlockhashes,
221+
NewBlockMsg: handleNewBlock,
222+
TransactionsMsg: handleTransactions,
223+
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes68,
224+
GetBlockHeadersMsg: handleGetBlockHeaders66,
225+
BlockHeadersMsg: handleBlockHeaders66,
226+
GetBlockBodiesMsg: handleGetBlockBodies66,
227+
BlockBodiesMsg: handleBlockBodies66,
228+
GetReceiptsMsg: handleGetReceipts66,
229+
ReceiptsMsg: handleReceipts66,
230+
GetPooledTransactionsMsg: handleGetPooledTransactions66,
231+
PooledTransactionsMsg: handlePooledTransactions66,
232+
}
233+
219234
// handleMessage is invoked whenever an inbound message is received from a remote
220235
// peer. The remote connection is torn down upon returning any error.
221236
func handleMessage(backend Backend, peer *Peer) error {
@@ -230,9 +245,12 @@ func handleMessage(backend Backend, peer *Peer) error {
230245
defer msg.Discard()
231246

232247
var handlers = eth65
233-
if peer.Version() >= ETH66 {
248+
if peer.Version() == ETH66 {
234249
handlers = eth66
235250
}
251+
if peer.Version() == ETH68 {
252+
handlers = eth68
253+
}
236254

237255
// Track the amount of time it takes to serve the request and run the handler
238256
if metrics.Enabled {

eth/protocols/eth/handler_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func (b *testBackend) Handle(*Peer, Packet) error {
110110
// Tests that block headers can be retrieved from a remote chain based on user queries.
111111
func TestGetBlockHeaders65(t *testing.T) { testGetBlockHeaders(t, ETH65) }
112112
func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) }
113+
func TestGetBlockHeaders68(t *testing.T) { testGetBlockHeaders(t, ETH68) }
113114

114115
func testGetBlockHeaders(t *testing.T, protocol uint) {
115116
t.Parallel()
@@ -312,6 +313,7 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
312313
// Tests that block contents can be retrieved from a remote chain based on their hashes.
313314
func TestGetBlockBodies65(t *testing.T) { testGetBlockBodies(t, ETH65) }
314315
func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) }
316+
func TestGetBlockBodies68(t *testing.T) { testGetBlockBodies(t, ETH68) }
315317

316318
func testGetBlockBodies(t *testing.T, protocol uint) {
317319
t.Parallel()
@@ -403,6 +405,7 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
403405
// Tests that the state trie nodes can be retrieved based on hashes.
404406
func TestGetNodeData65(t *testing.T) { testGetNodeData(t, ETH65) }
405407
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66) }
408+
func TestGetNodeData68(t *testing.T) { testGetNodeData(t, ETH68) }
406409

407410
func testGetNodeData(t *testing.T, protocol uint) {
408411
t.Parallel()
@@ -524,6 +527,7 @@ func testGetNodeData(t *testing.T, protocol uint) {
524527
// Tests that the transaction receipts can be retrieved based on hashes.
525528
func TestGetBlockReceipts65(t *testing.T) { testGetBlockReceipts(t, ETH65) }
526529
func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) }
530+
func TestGetBlockReceipts68(t *testing.T) { testGetBlockReceipts(t, ETH68) }
527531

528532
func testGetBlockReceipts(t *testing.T, protocol uint) {
529533
t.Parallel()

eth/protocols/eth/handlers.go

+27
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,33 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
798798
return nil
799799
}
800800

801+
func handleNewPooledTransactionHashes68(backend Backend, msg Decoder, peer *Peer) error {
802+
// New transaction announcement arrived, make sure we have
803+
// a valid and fresh chain to handle them
804+
if !backend.AcceptTxs() {
805+
return nil
806+
}
807+
ann := new(NewPooledTransactionHashesPacket68)
808+
if err := msg.Decode(ann); err != nil {
809+
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
810+
}
811+
if len(ann.Hashes) != len(ann.Types) || len(ann.Hashes) != len(ann.Sizes) {
812+
return fmt.Errorf("%w: message %v: invalid len of fields: %v %v %v", errDecode, msg, len(ann.Hashes), len(ann.Types), len(ann.Sizes))
813+
}
814+
f := func() error {
815+
// Schedule all the unknown hashes for retrieval
816+
for _, hash := range ann.Hashes {
817+
peer.markTransaction(hash)
818+
}
819+
return backend.Handle(peer, ann)
820+
}
821+
if params.ConsensusMethod == params.ConsensusPoW {
822+
return f()
823+
}
824+
go f()
825+
return nil
826+
}
827+
801828
func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error {
802829
// Decode the pooled transactions retrieval message
803830
var query GetPooledTransactionsPacket

eth/protocols/eth/peer.go

+6
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,12 @@ func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error {
227227
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes))
228228
}
229229

230+
func (p *Peer) sendPooledTransactionHashes68(hashes []common.Hash, types []byte, sizes []uint32) error {
231+
// Mark all the transactions as known, but ensure we don't overflow our limits
232+
p.knownTxs.Add(hashes...)
233+
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Sizes: sizes, Hashes: hashes})
234+
}
235+
230236
// AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually
231237
// announce to a remote peer. The number of pending sends are capped (new ones
232238
// will force old sends to be dropped)

eth/protocols/eth/protocol.go

+13-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
const (
3434
ETH65 = 65
3535
ETH66 = 66
36+
ETH68 = 68
3637
)
3738

3839
// ProtocolName is the official short name of the `eth` protocol used during
@@ -41,11 +42,11 @@ const ProtocolName = "mir"
4142

4243
// ProtocolVersions are the supported versions of the `eth` protocol (first
4344
// is primary).
44-
var ProtocolVersions = []uint{ETH66, ETH65}
45+
var ProtocolVersions = []uint{ETH68, ETH66, ETH65}
4546

4647
// protocolLengths are the number of implemented message corresponding to
4748
// different protocol versions.
48-
var protocolLengths = map[uint]uint64{ETH66: 23, ETH65: 23}
49+
var protocolLengths = map[uint]uint64{ETH66: 23, ETH65: 23, ETH68: 17}
4950

5051
// maxMessageSize is the maximum cap on the size of a protocol message.
5152
const maxMessageSize = 100 * 1024 * 1024
@@ -316,6 +317,13 @@ type ReceiptsRLPPacket66 struct {
316317
// NewPooledTransactionHashesPacket represents a transaction announcement packet.
317318
type NewPooledTransactionHashesPacket []common.Hash
318319

320+
// NewPooledTransactionHashesPacket68 represents a transaction announcement packet on eth/68 and newer.
321+
type NewPooledTransactionHashesPacket68 struct {
322+
Types []byte
323+
Sizes []uint32
324+
Hashes []common.Hash
325+
}
326+
319327
// GetPooledTransactionsPacket represents a transaction query.
320328
type GetPooledTransactionsPacket []common.Hash
321329

@@ -421,6 +429,9 @@ func (*ReceiptsPacket) Kind() byte { return ReceiptsMsg }
421429
func (*NewPooledTransactionHashesPacket) Name() string { return "NewPooledTransactionHashes" }
422430
func (*NewPooledTransactionHashesPacket) Kind() byte { return NewPooledTransactionHashesMsg }
423431

432+
func (*NewPooledTransactionHashesPacket68) Name() string { return "NewPooledTransactionHashes" }
433+
func (*NewPooledTransactionHashesPacket68) Kind() byte { return NewPooledTransactionHashesMsg }
434+
424435
func (*GetPooledTransactionsPacket) Name() string { return "GetPooledTransactions" }
425436
func (*GetPooledTransactionsPacket) Kind() byte { return GetPooledTransactionsMsg }
426437

0 commit comments

Comments
 (0)