Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/downloader: adaptive quality of service tuning #2630

Merged
merged 1 commit into from
Jun 6, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 119 additions & 25 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
@@ -54,14 +54,15 @@ var (
blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired

headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now)
headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
rttMaxEstimate = 20 * time.Second // Maximum rount-trip time to target for download requests
rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value
ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts

qosTuningPeers = 5 // Number of peers to tune based on (best peers)
qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value

maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
@@ -113,7 +114,8 @@ type Downloader struct {
fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
fsPivotFails int // Number of fast sync failures in the critical section

interrupt int32 // Atomic boolean to signal termination
rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)

// Statistics
syncStatsChainOrigin uint64 // Origin block number where syncing started at
@@ -159,6 +161,9 @@ type Downloader struct {
cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers

quitCh chan struct{} // Quit channel to signal termination
quitLock sync.RWMutex // Lock to prevent double closes

// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
@@ -172,11 +177,13 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {

return &Downloader{
dl := &Downloader{
mode: FullSync,
mux: mux,
queue: newQueue(stateDb),
peers: newPeerSet(),
rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000),
hasHeader: hasHeader,
hasBlockAndState: hasBlockAndState,
getHeader: getHeader,
@@ -203,7 +210,10 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
receiptWakeCh: make(chan bool, 1),
stateWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 1),
quitCh: make(chan struct{}),
}
go dl.qosTuner()
return dl
}

// Progress retrieves the synchronisation boundaries, specifically the origin
@@ -250,6 +260,8 @@ func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
glog.V(logger.Error).Infoln("Register failed:", err)
return err
}
d.qosReduceConfidence()

return nil
}

@@ -515,7 +527,16 @@ func (d *Downloader) cancel() {
// Terminate interrupts the downloader, canceling all pending operations.
// The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() {
atomic.StoreInt32(&d.interrupt, 1)
// Close the termination channel (make sure double close is allowed)
d.quitLock.Lock()
select {
case <-d.quitCh:
default:
close(d.quitCh)
}
d.quitLock.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between the cancel channel and the quit channel?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Answering my own question: quit is permanent, cancel isn't. But can't we use cancel to exit the qos adaptation goroutine?


// Cancel any pending download requests
d.cancel()
}

@@ -932,7 +953,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// Reserve a chunk of hashes for a peer. A nil can mean either that
// no more hashes are available, or that the peer is known not to
// have them.
request := d.queue.ReserveBlocks(peer, peer.BlockCapacity())
request := d.queue.ReserveBlocks(peer, peer.BlockCapacity(blockTargetRTT))
if request == nil {
continue
}
@@ -973,7 +994,7 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
// Request the advertised remote head block and wait for the response
go p.getRelHeaders(p.head, 1, 0, false)

timeout := time.After(headerTTL)
timeout := time.After(d.requestTTL())
for {
select {
case <-d.cancelCh:
@@ -1041,7 +1062,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {

// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}
timeout := time.After(hashTTL)
timeout := time.After(d.requestTTL())

for finished := false; !finished; {
select {
@@ -1118,7 +1139,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2

timeout := time.After(hashTTL)
timeout := time.After(d.requestTTL())
go p.getAbsHeaders(uint64(check), 1, 0, false)

// Wait until a reply arrives to this request
@@ -1199,7 +1220,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {

getHeaders := func(from uint64) {
request = time.Now()
timeout.Reset(headerTTL)
timeout.Reset(d.requestTTL())

if skeleton {
glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
@@ -1311,13 +1332,13 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
pack := packet.(*headerPack)
return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
}
expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) }
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveHeaders(p, count), false, nil
}
fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
capacity = func(p *peer) int { return p.HeaderCapacity() }
capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
)
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
@@ -1341,9 +1362,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
pack := packet.(*bodyPack)
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
}
expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peer) int { return p.BlockCapacity() }
capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
)
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
@@ -1365,9 +1386,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
}
expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
capacity = func(p *peer) int { return p.ReceiptCapacity() }
capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
)
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
@@ -1417,13 +1438,13 @@ func (d *Downloader) fetchNodeData() error {
}
})
}
expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveNodeData(p, count), false, nil
}
fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
capacity = func(p *peer) int { return p.NodeDataCapacity() }
capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
)
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
@@ -1799,8 +1820,10 @@ func (d *Downloader) processContent() error {
}
for len(results) != 0 {
// Check for any termination requests
if atomic.LoadInt32(&d.interrupt) == 1 {
select {
case <-d.quitCh:
return errCancelContentProcessing
default:
}
// Retrieve the a batch of results to import
var (
@@ -1901,3 +1924,74 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
return errNoSyncActive
}
}

// qosTuner is the quality of service tuning loop that occasionally gathers the
// peer latency statistics and updates the estimated request round trip time.
func (d *Downloader) qosTuner() {
for {
// Retrieve the current median RTT and integrate into the previoust target RTT
rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
atomic.StoreUint64(&d.rttEstimate, uint64(rtt))

// A new RTT cycle passed, increase our confidence in the estimated RTT
conf := atomic.LoadUint64(&d.rttConfidence)
conf = conf + (1000000-conf)/2
atomic.StoreUint64(&d.rttConfidence, conf)

// Log the new QoS values and sleep until the next RTT
glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
select {
case <-d.quitCh:
return
case <-time.After(rtt):
}
}
}

// qosReduceConfidence is meant to be called when a new peer joins the downloader's
// peer set, needing to reduce the confidence we have in out QoS estimates.
func (d *Downloader) qosReduceConfidence() {
// If we have a single peer, confidence is always 1
peers := uint64(d.peers.Len())
if peers == 1 {
atomic.StoreUint64(&d.rttConfidence, 1000000)
return
}
// If we have a ton of peers, don't drop confidence)
if peers >= uint64(qosConfidenceCap) {
return
}
// Otherwise drop the confidence factor
conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
if float64(conf)/1000000 < rttMinConfidence {
conf = uint64(rttMinConfidence * 1000000)
}
atomic.StoreUint64(&d.rttConfidence, conf)

rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
}

// requestRTT returns the current target round trip time for a download request
// to complete in.
//
// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
// the downloader tries to adapt queries to the RTT, so multiple RTT values can
// be adapted to, but smaller ones are preffered (stabler download stream).
func (d *Downloader) requestRTT() time.Duration {
return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
}

// requestTTL returns the current timeout allowance for a single download request
// to finish under.
func (d *Downloader) requestTTL() time.Duration {
var (
rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate))
conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
)
ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
if ttl > ttlLimit {
ttl = ttlLimit
}
return ttl
}
Loading