
eth: request id dispatcher and direct req/reply APIs #23576

Merged · 3 commits · Nov 26, 2021
Changes from 1 commit
75 changes: 45 additions & 30 deletions eth/downloader/fetchers_concurrent.go
@@ -29,7 +29,7 @@ import (

// timeoutGracePeriod is the amount of time to allow for a peer to deliver a
// response to a locally already timed out request. Timeouts are not penalized
// as a peer might be temporarilly overloaded, however, they still must reply
// as a peer might be temporarily overloaded, however, they still must reply
// to each request. Failing to do so is considered a protocol violation.
var timeoutGracePeriod = 2 * time.Minute

@@ -81,12 +81,12 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
responses := make(chan *eth.Response)

// Track the currently active requests and their timeout order
requests := make(map[string]*eth.Request)
pending := make(map[string]*eth.Request)
defer func() {
// Abort all requests on sync cycle cancellation. The requests will still
// Abort all requests on sync cycle cancellation. The requests may still
// be fulfilled by the remote side, but the dispatcher will not wait to
// deliver them since nobody's going to be listening.
for _, req := range requests {
for _, req := range pending {
req.Close()
}
}()
@@ -101,13 +101,20 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
}
defer timeout.Stop()

// Track the peers with in-flight requests. We can't use the `requests` map
// as certain malicious (or faulty) peers sometimes flat out refuse to send
// any response back, forever stalling the pending request until the peer
// disconnects. By keeping the idle tracking separate, we can cleanly stop
// the fetchers even if a malicious peer is stalling.
flights := make(map[string]time.Time)

// Track the timed-out but not-yet-answered requests separately. We want to
// keep tracking which peers are busy (potentially overloaded), so removing
// all trace of a timed out request is not good. We also can't just cancel
// the pending request altogether as that would prevent a late response from
// being delivered, thus never unblocking the peer.
stales := make(map[string]*eth.Request)
defer func() {
// Abort all requests on sync cycle cancellation. The requests may still
// be fulfilled by the remote side, but the dispatcher will not wait to
// deliver them since nobody's going to be listening.
for _, req := range stales {
req.Close()
}
}()
// Subscribe to peer lifecycle events to schedule tasks to new joiners and
// reschedule tasks upon disconnections. We don't care which event happened
// for simplicity, so just use a single channel.
@@ -125,7 +132,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
}
// If there's nothing more to fetch, wait or terminate
if queue.pending() == 0 {
if len(requests) == 0 && finished {
if len(pending) == 0 && finished {
return nil
}
} else {
@@ -135,15 +142,18 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
caps []int
)
for _, peer := range d.peers.AllPeers() {
if sent, ok := flights[peer.id]; !ok {
pending, stale := pending[peer.id], stales[peer.id]
if pending == nil && stale == nil {
idles = append(idles, peer)
caps = append(caps, queue.capacity(peer, time.Second))
} else if waited := time.Since(sent); waited > timeoutGracePeriod {
// Request has been in flight longer than the grace period
// permitted it, consider the peer malicious attempting to
// stall the sync.
peer.log.Warn("Peer stalling, dropping", "waited", common.PrettyDuration(waited))
d.dropPeer(peer.id)
} else if stale != nil {
if waited := time.Since(stale.Sent); waited > timeoutGracePeriod {
// Request has been in flight longer than the grace period
// permitted it, consider the peer malicious attempting to
// stall the sync.
peer.log.Warn("Peer stalling, dropping", "waited", common.PrettyDuration(waited))
d.dropPeer(peer.id)
}
}
}
sort.Sort(&peerCapacitySort{idles, caps})
@@ -187,8 +197,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
queue.unreserve(peer.id) // TODO(karalabe): This needs a non-expiration method
continue
}
requests[peer.id] = req
flights[peer.id] = time.Now()
pending[peer.id] = req

ttl := d.peers.rates.TargetTimeout()
ordering[req] = timeouts.Size()
@@ -200,7 +209,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
}
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
if !progressed && !throttled && len(requests) == 0 && len(idles) == d.peers.Len() && queued > 0 {
if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 {
return errPeersUnavailable
}
}
@@ -219,17 +228,20 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {

if event.join {
// Sanity check the internal state; this can be dropped later
if _, ok := requests[peerid]; ok {
if _, ok := pending[peerid]; ok {
event.peer.log.Error("Pending request exists for joining peer")
}
if _, ok := stales[peerid]; ok {
event.peer.log.Error("Stale request exists for joining peer")
}
// Loop back to the entry point for task assignment
continue
}
// A peer left, any existing requests need to be untracked, pending
// tasks returned and possible reassignment checked
if req, ok := requests[peerid]; ok {
if req, ok := pending[peerid]; ok {
queue.unreserve(peerid) // TODO(karalabe): This needs a non-expiration method
delete(requests, peerid)
delete(pending, peerid)
req.Close()

if index, live := ordering[req]; live {
@@ -246,7 +258,10 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
delete(ordering, req)
}
}
delete(flights, peerid)
if req, ok := stales[peerid]; ok {
delete(stales, peerid)
req.Close()
}

case <-timeout.C:
// Retrieve the next request which should have timed out. The check
@@ -265,9 +280,9 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
// cancel it, so it's not considered in-flight anymore, but keep
// the peer marked busy to prevent assigning a second request and
// overloading it further.
delete(requests, req.Peer)
delete(pending, req.Peer)
stales[req.Peer] = req
Comment on lines +283 to +284 (Contributor):
With this change here, afaict the comment is now misleading. We don't cancel it so it's not considered in-flight anymore, since we no longer call req.Close() on it. I suppose the caller keeps waiting for the reply, even though this internally has been moved to stale? And after a little while longer, the peer will be rejected, prompting the stale request to be Closed and rescheduled?

delete(ordering, req)
req.Close()

timeouts.Pop()
if timeouts.Size() > 0 {
@@ -332,8 +347,8 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
delete(ordering, res.Req)
}
// Delete the pending request (if it still exists) and mark the peer idle
delete(requests, res.Req.Peer)
delete(flights, res.Req.Peer)
delete(pending, res.Req.Peer)
delete(stales, res.Req.Peer)

// Signal the dispatcher that the round trip is done. We'll drop the
// peer if the data turns out to be junk.
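To make the new bookkeeping easier to follow: the downloader now distinguishes requests that are in flight from requests that have timed out locally but are still unanswered. Below is a minimal sketch of that lifecycle, using simplified stand-in types rather than the real ones from this diff:

package main

import (
	"fmt"
	"time"
)

// request stands in for *eth.Request; only the fields the lifecycle
// needs are modelled here.
type request struct {
	peer string
	sent time.Time
}

const timeoutGracePeriod = 2 * time.Minute

func main() {
	pending := make(map[string]*request) // in flight, a response is still awaited
	stales := make(map[string]*request)  // timed out locally, peer still marked busy

	// Assigning a request marks the peer busy.
	req := &request{peer: "peer-1", sent: time.Now()}
	pending[req.peer] = req

	// On a local timeout the request is parked, not closed: a late
	// response must remain deliverable to unblock the peer, so the
	// request only moves from the pending set to the stale set.
	delete(pending, req.peer)
	stales[req.peer] = req

	// On each scheduling pass, peers with neither a pending nor a stale
	// request are idle; a stale request older than the grace period
	// means the peer is stalling and gets dropped.
	for id, stale := range stales {
		if waited := time.Since(stale.sent); waited > timeoutGracePeriod {
			fmt.Println("dropping stalling peer", id, "waited", waited)
			delete(stales, id)
		}
	}

	// On a (possibly late) delivery, both entries are cleared and the
	// peer becomes eligible for new work again.
	delete(pending, req.peer)
	delete(stales, req.peer)
}

The key design choice is that a timed-out request is parked rather than cancelled: closing it immediately would discard the late response and never unblock the peer, while parking keeps the peer out of the idle set until it either answers or exhausts the grace period.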
61 changes: 49 additions & 12 deletions eth/protocols/eth/dispatcher.go
@@ -41,8 +41,8 @@ var (
// Request is a pending request to allow tracking it and delivering a response
// back to the requester on their chosen channel.
type Request struct {
id uint64 // Request ID to match up replies to
sent time.Time // Timestamp when the request was sent
peer *Peer // Peer to which this request belongs for untracking
id uint64 // Request ID to match up replies to

sink chan *Response // Channel to deliver the response on
cancel chan struct{} // Channel to cancel requests ahead of time
@@ -51,15 +51,30 @@ type Request struct {
want uint64 // Message code of the response packet
data interface{} // Data content of the request packet

Peer string // Demultiplexer if cross-peer requests are batched together
Peer string // Demultiplexer if cross-peer requests are batched together
Sent time.Time // Timestamp when the request was sent
}

// Close aborts an in-flight request. Although there's no way to notify the
// remote peer about the cancellation, this method notifies the dispatcher to
// discard any late responses.
func (r *Request) Close() {
if r.cancel != nil { // Nil only in external packet tests
func (r *Request) Close() error {
if r.peer == nil { // Tests mock out the dispatcher, skip internal cancellation
return nil
}
cancelOp := &cancel{
id: r.id,
fail: make(chan error),
}
select {
case r.peer.reqCancel <- cancelOp:
if err := <-cancelOp.fail; err != nil {
return err
}
close(r.cancel)
return nil
case <-r.peer.term:
return errDisconnected
}
}

@@ -70,6 +85,13 @@ type request struct {
fail chan error
}

// cancel is a maintenance type on the dispatcher to stop tracking a pending
// request.
type cancel struct {
id uint64 // Request ID to stop tracking
fail chan error
}

// Response is a reply packet to a previously created request. It is delivered
// on the channel assigned by the requester subsystem and contains the original
// request embedded to allow uniquely matching it caller side.
@@ -102,6 +124,7 @@ func (p *Peer) dispatchRequest(req *Request) error {
fail: make(chan error),
}
req.cancel = make(chan struct{})
req.peer = p
req.Peer = p.id

select {
@@ -153,17 +176,17 @@ func (p *Peer) dispatchResponse(res *Response) error {
}
}

// dispatchRequests is a loop that accepts requests from higher layer packages,
// pushes it to the network and tracks and dispatches the responses back to the
// original requester.
func (p *Peer) dispatchRequests() {
// dispatcher is a loop that accepts requests from higher layer packages, pushes
// it to the network and tracks and dispatches the responses back to the original
// requester.
func (p *Peer) dispatcher() {
pending := make(map[uint64]*Request)

for {
select {
case reqOp := <-p.reqDispatch:
req := reqOp.req
req.sent = time.Now()
req.Sent = time.Now()

requestTracker.Track(p.id, p.version, req.code, req.want, req.id)
err := p2p.Send(p.rw, req.code, req.data)
@@ -173,10 +196,24 @@ func (p *Peer) dispatchRequests() {
pending[req.id] = req
}

case cancelOp := <-p.reqCancel:
// Retrieve the pendign request to cancel and short circuit if it
// has already been serviced and is not available anymore
Comment (Contributor): s/pendign/pending
req := pending[cancelOp.id]
if req == nil {
cancelOp.fail <- nil
continue
}
// Stop tracking the request
delete(pending, cancelOp.id)
cancelOp.fail <- nil

case resOp := <-p.resDispatch:
res := resOp.res
res.Req = pending[res.id]
res.Time = res.recv.Sub(res.Req.sent)

Comment (Contributor): What happens if the response to a stale request comes in -- we just drop it, right? I don't quite follow the logic. So we shuffle it over from pending to stale, but we still don't (immediately) reschedule it, but neither do we handle a late response?
I must have missed something, but it seems like moving something to stale unconditionally means it will time out a bit later, leading to the peer being rejected.

Reply (Contributor): Ah, my bad. The pending here is dispatcher-local pending; it has no relation to the pending/stales in the downloader.

// Independent if the request exists or not, track this packet
requestTracker.Fulfil(p.id, p.version, res.code, res.id)

switch {
case res.Req == nil:
Expand All @@ -196,10 +233,10 @@ func (p *Peer) dispatchRequests() {
// All dispatcher checks passed and the response was initialized
// with the matching request. Signal to the delivery routine that
// it can wait for a handler response and dispatch the data.
res.Time = res.recv.Sub(res.Req.Sent)
resOp.fail <- nil

// Stop tracking the request, the response dispatcher will deliver
requestTracker.Fulfil(p.id, p.version, res.code, res.id)
delete(pending, res.id)
}

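From the consumer's side, the request id dispatcher turns the previous fire-and-forget messaging into a direct req/reply API. The following is a hedged sketch of a typical round trip; the RequestHeadersByNumber constructor, the Response.Done channel and the sentinel error are assumptions based on the surrounding go-ethereum code, not definitions made in this diff:

package sketch

import (
	"errors"
	"time"

	"github.com/ethereum/go-ethereum/eth/protocols/eth"
)

// errTimeout is an illustrative sentinel for this sketch only.
var errTimeout = errors.New("header request timed out")

// fetchHeaders issues one header request and waits for either the reply
// or a local timeout.
func fetchHeaders(peer *eth.Peer, from uint64, ttl time.Duration) error {
	sink := make(chan *eth.Response)

	// Assumed constructor: sends the request packet and registers it
	// with the peer's dispatcher.
	req, err := peer.RequestHeadersByNumber(from, 192, 0, false, sink)
	if err != nil {
		return err
	}
	// Closing tells the dispatcher to untrack the request, so a late
	// reply is discarded instead of blocking on a dead sink channel.
	defer req.Close()

	select {
	case res := <-sink:
		// res.Req links the reply back to the request; res.Time is the
		// measured round-trip time (recv minus the exported Sent).
		res.Done <- nil // report that the payload was consumed
		return nil
	case <-time.After(ttl):
		// On a local timeout the downloader parks the request in its
		// stale set (see the sketch above) rather than cancelling it.
		return errTimeout
	}
}

Note that Close() now performs a real round trip into the dispatcher via the new reqCancel channel, which is why its signature gained an error return.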
4 changes: 3 additions & 1 deletion eth/protocols/eth/peer.go
@@ -85,6 +85,7 @@ type Peer struct {
txAnnounce chan []common.Hash // Channel used to queue transaction announcement requests

reqDispatch chan *request // Dispatch channel to send requests and track them until fulfilment
reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them
resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them

term chan struct{} // Termination channel to stop the broadcasters
@@ -106,6 +107,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
txBroadcast: make(chan []common.Hash),
txAnnounce: make(chan []common.Hash),
reqDispatch: make(chan *request),
reqCancel: make(chan *cancel),
resDispatch: make(chan *response),
txpool: txpool,
term: make(chan struct{}),
@@ -114,7 +116,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
go peer.broadcastBlocks()
go peer.broadcastTransactions()
go peer.announceTransactions()
go peer.dispatchRequests()
go peer.dispatcher()

return peer
}