Skip to content

Commit 1c8a2e9

Browse files
rjl493456442kielbarry
authored andcommitted
les: handle conn/disc/reg logic in the eventloop (ethereum#16981)
* les: handle conn/disc/reg logic in the eventloop * les: try to dial before start eventloop * les: handle disconnect logic more safely * les: grammar fix
1 parent 8a9e6f5 commit 1c8a2e9

File tree

1 file changed

+135
-74
lines changed

1 file changed

+135
-74
lines changed

les/serverpool.go

+135-74
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,27 @@ const (
8787
initStatsWeight = 1
8888
)
8989

90+
// connReq represents a request for peer connection.
91+
type connReq struct {
92+
p *peer
93+
ip net.IP
94+
port uint16
95+
result chan *poolEntry
96+
}
97+
98+
// disconnReq represents a request for peer disconnection.
99+
type disconnReq struct {
100+
entry *poolEntry
101+
stopped bool
102+
done chan struct{}
103+
}
104+
105+
// registerReq represents a request for peer registration.
106+
type registerReq struct {
107+
entry *poolEntry
108+
done chan struct{}
109+
}
110+
90111
// serverPool implements a pool for storing and selecting newly discovered and already
91112
// known light server nodes. It received discovered nodes, stores statistics about
92113
// known nodes and takes care of always having enough good quality servers connected.
@@ -105,10 +126,13 @@ type serverPool struct {
105126
discLookups chan bool
106127

107128
entries map[discover.NodeID]*poolEntry
108-
lock sync.Mutex
109129
timeout, enableRetry chan *poolEntry
110130
adjustStats chan poolStatAdjust
111131

132+
connCh chan *connReq
133+
disconnCh chan *disconnReq
134+
registerCh chan *registerReq
135+
112136
knownQueue, newQueue poolEntryQueue
113137
knownSelect, newSelect *weightedRandomSelect
114138
knownSelected, newSelected int
@@ -125,6 +149,9 @@ func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *s
125149
timeout: make(chan *poolEntry, 1),
126150
adjustStats: make(chan poolStatAdjust, 100),
127151
enableRetry: make(chan *poolEntry, 1),
152+
connCh: make(chan *connReq),
153+
disconnCh: make(chan *disconnReq),
154+
registerCh: make(chan *registerReq),
128155
knownSelect: newWeightedRandomSelect(),
129156
newSelect: newWeightedRandomSelect(),
130157
fastDiscover: true,
@@ -147,9 +174,8 @@ func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
147174
pool.discLookups = make(chan bool, 100)
148175
go pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
149176
}
150-
151-
go pool.eventLoop()
152177
pool.checkDial()
178+
go pool.eventLoop()
153179
}
154180

155181
// connect should be called upon any incoming connection. If the connection has been
@@ -158,83 +184,44 @@ func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
158184
// Note that whenever a connection has been accepted and a pool entry has been returned,
159185
// disconnect should also always be called.
160186
func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
161-
pool.lock.Lock()
162-
defer pool.lock.Unlock()
163-
entry := pool.entries[p.ID()]
164-
if entry == nil {
165-
entry = pool.findOrNewNode(p.ID(), ip, port)
166-
}
167-
p.Log().Debug("Connecting to new peer", "state", entry.state)
168-
if entry.state == psConnected || entry.state == psRegistered {
187+
log.Debug("Connect new entry", "enode", p.id)
188+
req := &connReq{p: p, ip: ip, port: port, result: make(chan *poolEntry, 1)}
189+
select {
190+
case pool.connCh <- req:
191+
case <-pool.quit:
169192
return nil
170193
}
171-
pool.connWg.Add(1)
172-
entry.peer = p
173-
entry.state = psConnected
174-
addr := &poolEntryAddress{
175-
ip: ip,
176-
port: port,
177-
lastSeen: mclock.Now(),
178-
}
179-
entry.lastConnected = addr
180-
entry.addr = make(map[string]*poolEntryAddress)
181-
entry.addr[addr.strKey()] = addr
182-
entry.addrSelect = *newWeightedRandomSelect()
183-
entry.addrSelect.update(addr)
184-
return entry
194+
return <-req.result
185195
}
186196

187197
// registered should be called after a successful handshake
188198
func (pool *serverPool) registered(entry *poolEntry) {
189199
log.Debug("Registered new entry", "enode", entry.id)
190-
pool.lock.Lock()
191-
defer pool.lock.Unlock()
192-
193-
entry.state = psRegistered
194-
entry.regTime = mclock.Now()
195-
if !entry.known {
196-
pool.newQueue.remove(entry)
197-
entry.known = true
200+
req := &registerReq{entry: entry, done: make(chan struct{})}
201+
select {
202+
case pool.registerCh <- req:
203+
case <-pool.quit:
204+
return
198205
}
199-
pool.knownQueue.setLatest(entry)
200-
entry.shortRetry = shortRetryCnt
206+
<-req.done
201207
}
202208

203209
// disconnect should be called when ending a connection. Service quality statistics
204210
// can be updated optionally (not updated if no registration happened, in this case
205211
// only connection statistics are updated, just like in case of timeout)
206212
func (pool *serverPool) disconnect(entry *poolEntry) {
207-
log.Debug("Disconnected old entry", "enode", entry.id)
208-
pool.lock.Lock()
209-
defer pool.lock.Unlock()
210-
211-
if entry.state == psRegistered {
212-
connTime := mclock.Now() - entry.regTime
213-
connAdjust := float64(connTime) / float64(targetConnTime)
214-
if connAdjust > 1 {
215-
connAdjust = 1
216-
}
217-
stopped := false
218-
select {
219-
case <-pool.quit:
220-
stopped = true
221-
default:
222-
}
223-
if stopped {
224-
entry.connectStats.add(1, connAdjust)
225-
} else {
226-
entry.connectStats.add(connAdjust, 1)
227-
}
213+
stopped := false
214+
select {
215+
case <-pool.quit:
216+
stopped = true
217+
default:
228218
}
219+
log.Debug("Disconnected old entry", "enode", entry.id)
220+
req := &disconnReq{entry: entry, stopped: stopped, done: make(chan struct{})}
229221

230-
entry.state = psNotConnected
231-
if entry.knownSelected {
232-
pool.knownSelected--
233-
} else {
234-
pool.newSelected--
235-
}
236-
pool.setRetryDial(entry)
237-
pool.connWg.Done()
222+
// Block until disconnection request is served.
223+
pool.disconnCh <- req
224+
<-req.done
238225
}
239226

240227
const (
@@ -277,25 +264,51 @@ func (pool *serverPool) eventLoop() {
277264
if pool.discSetPeriod != nil {
278265
pool.discSetPeriod <- time.Millisecond * 100
279266
}
267+
268+
// disconnect updates service quality statistics depending on the connection time
269+
// and disconnection initiator.
270+
disconnect := func(req *disconnReq, stopped bool) {
271+
// Handle peer disconnection requests.
272+
entry := req.entry
273+
if entry.state == psRegistered {
274+
connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime)
275+
if connAdjust > 1 {
276+
connAdjust = 1
277+
}
278+
if stopped {
279+
// disconnect requested by ourselves.
280+
entry.connectStats.add(1, connAdjust)
281+
} else {
282+
// disconnect requested by server side.
283+
entry.connectStats.add(connAdjust, 1)
284+
}
285+
}
286+
entry.state = psNotConnected
287+
288+
if entry.knownSelected {
289+
pool.knownSelected--
290+
} else {
291+
pool.newSelected--
292+
}
293+
pool.setRetryDial(entry)
294+
pool.connWg.Done()
295+
close(req.done)
296+
}
297+
280298
for {
281299
select {
282300
case entry := <-pool.timeout:
283-
pool.lock.Lock()
284301
if !entry.removed {
285302
pool.checkDialTimeout(entry)
286303
}
287-
pool.lock.Unlock()
288304

289305
case entry := <-pool.enableRetry:
290-
pool.lock.Lock()
291306
if !entry.removed {
292307
entry.delayedRetry = false
293308
pool.updateCheckDial(entry)
294309
}
295-
pool.lock.Unlock()
296310

297311
case adj := <-pool.adjustStats:
298-
pool.lock.Lock()
299312
switch adj.adjustType {
300313
case pseBlockDelay:
301314
adj.entry.delayStats.add(float64(adj.time), 1)
@@ -305,13 +318,10 @@ func (pool *serverPool) eventLoop() {
305318
case pseResponseTimeout:
306319
adj.entry.timeoutStats.add(1, 1)
307320
}
308-
pool.lock.Unlock()
309321

310322
case node := <-pool.discNodes:
311-
pool.lock.Lock()
312323
entry := pool.findOrNewNode(discover.NodeID(node.ID), node.IP, node.TCP)
313324
pool.updateCheckDial(entry)
314-
pool.lock.Unlock()
315325

316326
case conv := <-pool.discLookups:
317327
if conv {
@@ -327,15 +337,66 @@ func (pool *serverPool) eventLoop() {
327337
}
328338
}
329339

340+
case req := <-pool.connCh:
341+
// Handle peer connection requests.
342+
entry := pool.entries[req.p.ID()]
343+
if entry == nil {
344+
entry = pool.findOrNewNode(req.p.ID(), req.ip, req.port)
345+
}
346+
if entry.state == psConnected || entry.state == psRegistered {
347+
req.result <- nil
348+
continue
349+
}
350+
pool.connWg.Add(1)
351+
entry.peer = req.p
352+
entry.state = psConnected
353+
addr := &poolEntryAddress{
354+
ip: req.ip,
355+
port: req.port,
356+
lastSeen: mclock.Now(),
357+
}
358+
entry.lastConnected = addr
359+
entry.addr = make(map[string]*poolEntryAddress)
360+
entry.addr[addr.strKey()] = addr
361+
entry.addrSelect = *newWeightedRandomSelect()
362+
entry.addrSelect.update(addr)
363+
req.result <- entry
364+
365+
case req := <-pool.registerCh:
366+
// Handle peer registration requests.
367+
entry := req.entry
368+
entry.state = psRegistered
369+
entry.regTime = mclock.Now()
370+
if !entry.known {
371+
pool.newQueue.remove(entry)
372+
entry.known = true
373+
}
374+
pool.knownQueue.setLatest(entry)
375+
entry.shortRetry = shortRetryCnt
376+
close(req.done)
377+
378+
case req := <-pool.disconnCh:
379+
// Handle peer disconnection requests.
380+
disconnect(req, req.stopped)
381+
330382
case <-pool.quit:
331383
if pool.discSetPeriod != nil {
332384
close(pool.discSetPeriod)
333385
}
334-
pool.connWg.Wait()
386+
387+
// Spawn a goroutine to close the disconnCh after all connections are disconnected.
388+
go func() {
389+
pool.connWg.Wait()
390+
close(pool.disconnCh)
391+
}()
392+
393+
// Handle all remaining disconnection requests before exit.
394+
for req := range pool.disconnCh {
395+
disconnect(req, true)
396+
}
335397
pool.saveNodes()
336398
pool.wg.Done()
337399
return
338-
339400
}
340401
}
341402
}

0 commit comments

Comments
 (0)