Skip to content

Commit 6ef3a16

Browse files
karalabefjl
andauthored
p2p/enode: use unix timestamp as base ENR sequence number (ethereum#19903)
This PR ensures that wiping all data associated with a node (apart from its nodekey) will not generate already used sequence number for the ENRs, since all remote nodes would reject them until they out-number the previously published largest one. The big complication with this scheme is that every local update to the ENR can potentially bump the sequence number by one. In order to ensure that local updates do not outrun the clock, the sequence number is a millisecond-precision timestamp, and updates are throttled to occur at most once per millisecond. Co-authored-by: Felix Lange <[email protected]>
1 parent 794c613 commit 6ef3a16

File tree

4 files changed

+69
-21
lines changed

4 files changed

+69
-21
lines changed

p2p/discover/v5wire/encoding_test.go

-3
Original file line numberDiff line numberDiff line change
@@ -512,9 +512,6 @@ func (n *handshakeTestNode) init(key *ecdsa.PrivateKey, ip net.IP, clock mclock.
512512
db, _ := enode.OpenDB("")
513513
n.ln = enode.NewLocalNode(db, key)
514514
n.ln.SetStaticIP(ip)
515-
if n.ln.Node().Seq() != 1 {
516-
panic(fmt.Errorf("unexpected seq %d", n.ln.Node().Seq()))
517-
}
518515
n.c = NewCodec(n.ln, key, clock)
519516
}
520517

p2p/enode/localnode.go

+42-3
Original file line numberDiff line numberDiff line change
@@ -36,20 +36,25 @@ const (
3636
iptrackMinStatements = 10
3737
iptrackWindow = 5 * time.Minute
3838
iptrackContactWindow = 10 * time.Minute
39+
40+
// time needed to wait between two updates to the local ENR
41+
recordUpdateThrottle = time.Millisecond
3942
)
4043

4144
// LocalNode produces the signed node record of a local node, i.e. a node run in the
4245
// current process. Setting ENR entries via the Set method updates the record. A new version
4346
// of the record is signed on demand when the Node method is called.
4447
type LocalNode struct {
45-
cur atomic.Value // holds a non-nil node pointer while the record is up-to-date.
48+
cur atomic.Value // holds a non-nil node pointer while the record is up-to-date
49+
4650
id ID
4751
key *ecdsa.PrivateKey
4852
db *DB
4953

5054
// everything below is protected by a lock
51-
mu sync.Mutex
55+
mu sync.RWMutex
5256
seq uint64
57+
update time.Time // timestamp when the record was last updated
5358
entries map[string]enr.Entry
5459
endpoint4 lnEndpoint
5560
endpoint6 lnEndpoint
@@ -76,7 +81,8 @@ func NewLocalNode(db *DB, key *ecdsa.PrivateKey) *LocalNode {
7681
},
7782
}
7883
ln.seq = db.localSeq(ln.id)
79-
ln.invalidate()
84+
ln.update = time.Now()
85+
ln.cur.Store((*Node)(nil))
8086
return ln
8187
}
8288

@@ -87,14 +93,34 @@ func (ln *LocalNode) Database() *DB {
8793

8894
// Node returns the current version of the local node record.
8995
func (ln *LocalNode) Node() *Node {
96+
// If we have a valid record, return that
9097
n := ln.cur.Load().(*Node)
9198
if n != nil {
9299
return n
93100
}
101+
94102
// Record was invalidated, sign a new copy.
95103
ln.mu.Lock()
96104
defer ln.mu.Unlock()
105+
106+
// Double check the current record, since multiple goroutines might be waiting
107+
// on the write mutex.
108+
if n = ln.cur.Load().(*Node); n != nil {
109+
return n
110+
}
111+
112+
// The initial sequence number is the current timestamp in milliseconds. To ensure
113+
// that the initial sequence number will always be higher than any previous sequence
114+
// number (assuming the clock is correct), we want to avoid updating the record faster
115+
// than once per ms. So we need to sleep here until the next possible update time has
116+
// arrived.
117+
lastChange := time.Since(ln.update)
118+
if lastChange < recordUpdateThrottle {
119+
time.Sleep(recordUpdateThrottle - lastChange)
120+
}
121+
97122
ln.sign()
123+
ln.update = time.Now()
98124
return ln.cur.Load().(*Node)
99125
}
100126

@@ -114,6 +140,10 @@ func (ln *LocalNode) ID() ID {
114140
// Set puts the given entry into the local record, overwriting any existing value.
115141
// Use Set*IP and SetFallbackUDP to set IP addresses and UDP port, otherwise they'll
116142
// be overwritten by the endpoint predictor.
143+
//
144+
// Since node record updates are throttled to one per second, Set is asynchronous.
145+
// Any update will be queued up and published when at least one second passes from
146+
// the last change.
117147
func (ln *LocalNode) Set(e enr.Entry) {
118148
ln.mu.Lock()
119149
defer ln.mu.Unlock()
@@ -288,3 +318,12 @@ func (ln *LocalNode) bumpSeq() {
288318
ln.seq++
289319
ln.db.storeLocalSeq(ln.id, ln.seq)
290320
}
321+
322+
// nowMilliseconds gives the current timestamp at millisecond precision.
323+
func nowMilliseconds() uint64 {
324+
ns := time.Now().UnixNano()
325+
if ns < 0 {
326+
return 0
327+
}
328+
return uint64(ns / 1000 / 1000)
329+
}

p2p/enode/localnode_test.go

+20-13
Original file line numberDiff line numberDiff line change
@@ -49,32 +49,39 @@ func TestLocalNode(t *testing.T) {
4949
}
5050
}
5151

52+
// This test checks that the sequence number is persisted between restarts.
5253
func TestLocalNodeSeqPersist(t *testing.T) {
54+
timestamp := nowMilliseconds()
55+
5356
ln, db := newLocalNodeForTesting()
5457
defer db.Close()
5558

56-
if s := ln.Node().Seq(); s != 1 {
57-
t.Fatalf("wrong initial seq %d, want 1", s)
59+
initialSeq := ln.Node().Seq()
60+
if initialSeq < timestamp {
61+
t.Fatalf("wrong initial seq %d, want at least %d", initialSeq, timestamp)
5862
}
63+
5964
ln.Set(enr.WithEntry("x", uint(1)))
60-
if s := ln.Node().Seq(); s != 2 {
61-
t.Fatalf("wrong seq %d after set, want 2", s)
65+
if s := ln.Node().Seq(); s != initialSeq+1 {
66+
t.Fatalf("wrong seq %d after set, want %d", s, initialSeq+1)
6267
}
6368

6469
// Create a new instance, it should reload the sequence number.
6570
// The number increases just after that because a new record is
6671
// created without the "x" entry.
6772
ln2 := NewLocalNode(db, ln.key)
68-
if s := ln2.Node().Seq(); s != 3 {
69-
t.Fatalf("wrong seq %d on new instance, want 3", s)
73+
if s := ln2.Node().Seq(); s != initialSeq+2 {
74+
t.Fatalf("wrong seq %d on new instance, want %d", s, initialSeq+2)
7075
}
7176

77+
finalSeq := ln2.Node().Seq()
78+
7279
// Create a new instance with a different node key on the same database.
7380
// This should reset the sequence number.
7481
key, _ := crypto.GenerateKey()
7582
ln3 := NewLocalNode(db, key)
76-
if s := ln3.Node().Seq(); s != 1 {
77-
t.Fatalf("wrong seq %d on instance with changed key, want 1", s)
83+
if s := ln3.Node().Seq(); s < finalSeq {
84+
t.Fatalf("wrong seq %d on instance with changed key, want >= %d", s, finalSeq)
7885
}
7986
}
8087

@@ -91,32 +98,32 @@ func TestLocalNodeEndpoint(t *testing.T) {
9198
// Nothing is set initially.
9299
assert.Equal(t, net.IP(nil), ln.Node().IP())
93100
assert.Equal(t, 0, ln.Node().UDP())
94-
assert.Equal(t, uint64(1), ln.Node().Seq())
101+
initialSeq := ln.Node().Seq()
95102

96103
// Set up fallback address.
97104
ln.SetFallbackIP(fallback.IP)
98105
ln.SetFallbackUDP(fallback.Port)
99106
assert.Equal(t, fallback.IP, ln.Node().IP())
100107
assert.Equal(t, fallback.Port, ln.Node().UDP())
101-
assert.Equal(t, uint64(2), ln.Node().Seq())
108+
assert.Equal(t, initialSeq+1, ln.Node().Seq())
102109

103110
// Add endpoint statements from random hosts.
104111
for i := 0; i < iptrackMinStatements; i++ {
105112
assert.Equal(t, fallback.IP, ln.Node().IP())
106113
assert.Equal(t, fallback.Port, ln.Node().UDP())
107-
assert.Equal(t, uint64(2), ln.Node().Seq())
114+
assert.Equal(t, initialSeq+1, ln.Node().Seq())
108115

109116
from := &net.UDPAddr{IP: make(net.IP, 4), Port: 90}
110117
rand.Read(from.IP)
111118
ln.UDPEndpointStatement(from, predicted)
112119
}
113120
assert.Equal(t, predicted.IP, ln.Node().IP())
114121
assert.Equal(t, predicted.Port, ln.Node().UDP())
115-
assert.Equal(t, uint64(3), ln.Node().Seq())
122+
assert.Equal(t, initialSeq+2, ln.Node().Seq())
116123

117124
// Static IP overrides prediction.
118125
ln.SetStaticIP(staticIP)
119126
assert.Equal(t, staticIP, ln.Node().IP())
120127
assert.Equal(t, fallback.Port, ln.Node().UDP())
121-
assert.Equal(t, uint64(4), ln.Node().Seq())
128+
assert.Equal(t, initialSeq+3, ln.Node().Seq())
122129
}

p2p/enode/nodedb.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -427,9 +427,14 @@ func (db *DB) UpdateFindFailsV5(id ID, ip net.IP, fails int) error {
427427
return db.storeInt64(v5Key(id, ip, dbNodeFindFails), int64(fails))
428428
}
429429

430-
// LocalSeq retrieves the local record sequence counter.
430+
// localSeq retrieves the local record sequence counter, defaulting to the current
431+
// timestamp if no previous exists. This ensures that wiping all data associated
432+
// with a node (apart from its key) will not generate already used sequence nums.
431433
func (db *DB) localSeq(id ID) uint64 {
432-
return db.fetchUint64(localItemKey(id, dbLocalSeq))
434+
if seq := db.fetchUint64(localItemKey(id, dbLocalSeq)); seq > 0 {
435+
return seq
436+
}
437+
return nowMilliseconds()
433438
}
434439

435440
// storeLocalSeq stores the local record sequence counter.

0 commit comments

Comments
 (0)