@@ -51,9 +51,15 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger)
51
51
bdp : wnd,
52
52
max_bandwidth : 0.0 ,
53
53
rtt : 0.0 ,
54
+ ping_delay : Duration :: from_millis ( 100 ) ,
55
+ stable_count : 0 ,
54
56
} ) ;
55
57
56
- let bytes = bdp. as_ref ( ) . map ( |_| 0 ) ;
58
+ let ( bytes, next_bdp_at) = if bdp. is_some ( ) {
59
+ ( Some ( 0 ) , Some ( Instant :: now ( ) ) )
60
+ } else {
61
+ ( None , None )
62
+ } ;
57
63
58
64
#[ cfg( feature = "runtime" ) ]
59
65
let keep_alive = config. keep_alive_interval . map ( |interval| KeepAlive {
@@ -75,6 +81,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger)
75
81
is_keep_alive_timed_out : false ,
76
82
ping_pong,
77
83
ping_sent_at : None ,
84
+ next_bdp_at,
78
85
} ) ) ;
79
86
80
87
(
@@ -125,6 +132,9 @@ struct Shared {
125
132
/// If `Some`, bdp is enabled, and this tracks how many bytes have been
126
133
/// read during the current sample.
127
134
bytes : Option < usize > ,
135
+ /// We delay a variable amount of time between BDP pings. This allows us
136
+ /// to send less pings as the bandwidth stabilizes.
137
+ next_bdp_at : Option < Instant > ,
128
138
129
139
// keep-alive
130
140
/// If `Some`, keep-alive is enabled, and the Instant is how long ago
@@ -143,6 +153,12 @@ struct Bdp {
143
153
max_bandwidth : f64 ,
144
154
/// Round trip time in seconds
145
155
rtt : f64 ,
156
+ /// Delay the next ping by this amount.
157
+ ///
158
+ /// This will change depending on how stable the current bandwidth is.
159
+ ping_delay : Duration ,
160
+ /// The count of ping round trips where BDP has stayed the same.
161
+ stable_count : u32 ,
146
162
}
147
163
148
164
#[ cfg( feature = "runtime" ) ]
@@ -207,6 +223,17 @@ impl Recorder {
207
223
#[ cfg( feature = "runtime" ) ]
208
224
locked. update_last_read_at ( ) ;
209
225
226
+ // are we ready to send another bdp ping?
227
+ // if not, we don't need to record bytes either
228
+
229
+ if let Some ( ref next_bdp_at) = locked. next_bdp_at {
230
+ if Instant :: now ( ) < * next_bdp_at {
231
+ return ;
232
+ } else {
233
+ locked. next_bdp_at = None ;
234
+ }
235
+ }
236
+
210
237
if let Some ( ref mut bytes) = locked. bytes {
211
238
* bytes += len;
212
239
} else {
@@ -265,6 +292,7 @@ impl Recorder {
265
292
266
293
impl Ponger {
267
294
pub ( super ) fn poll ( & mut self , cx : & mut task:: Context < ' _ > ) -> Poll < Ponged > {
295
+ let now = Instant :: now ( ) ;
268
296
let mut locked = self . shared . lock ( ) . unwrap ( ) ;
269
297
#[ cfg( feature = "runtime" ) ]
270
298
let is_idle = self . is_idle ( ) ;
@@ -282,13 +310,13 @@ impl Ponger {
282
310
return Poll :: Pending ;
283
311
}
284
312
285
- let ( bytes , rtt ) = match locked. ping_pong . poll_pong ( cx) {
313
+ match locked. ping_pong . poll_pong ( cx) {
286
314
Poll :: Ready ( Ok ( _pong) ) => {
287
- let rtt = locked
315
+ let start = locked
288
316
. ping_sent_at
289
- . expect ( "pong received implies ping_sent_at" )
290
- . elapsed ( ) ;
317
+ . expect ( "pong received implies ping_sent_at" ) ;
291
318
locked. ping_sent_at = None ;
319
+ let rtt = now - start;
292
320
trace ! ( "recv pong" ) ;
293
321
294
322
#[ cfg( feature = "runtime" ) ]
@@ -299,19 +327,20 @@ impl Ponger {
299
327
}
300
328
}
301
329
302
- if self . bdp . is_some ( ) {
330
+ if let Some ( ref mut bdp ) = self . bdp {
303
331
let bytes = locked. bytes . expect ( "bdp enabled implies bytes" ) ;
304
332
locked. bytes = Some ( 0 ) ; // reset
305
333
trace ! ( "received BDP ack; bytes = {}, rtt = {:?}" , bytes, rtt) ;
306
- ( bytes, rtt)
307
- } else {
308
- // no bdp, done!
309
- return Poll :: Pending ;
334
+
335
+ let update = bdp. calculate ( bytes, rtt) ;
336
+ locked. next_bdp_at = Some ( now + bdp. ping_delay ) ;
337
+ if let Some ( update) = update {
338
+ return Poll :: Ready ( Ponged :: SizeUpdate ( update) )
339
+ }
310
340
}
311
341
}
312
342
Poll :: Ready ( Err ( e) ) => {
313
343
debug ! ( "pong error: {}" , e) ;
314
- return Poll :: Pending ;
315
344
}
316
345
Poll :: Pending => {
317
346
#[ cfg( feature = "runtime" ) ]
@@ -324,19 +353,11 @@ impl Ponger {
324
353
}
325
354
}
326
355
}
327
-
328
- return Poll :: Pending ;
329
356
}
330
- } ;
331
-
332
- drop ( locked) ;
333
-
334
- if let Some ( bdp) = self . bdp . as_mut ( ) . and_then ( |bdp| bdp. calculate ( bytes, rtt) ) {
335
- Poll :: Ready ( Ponged :: SizeUpdate ( bdp) )
336
- } else {
337
- // XXX: this doesn't register a waker...?
338
- Poll :: Pending
339
357
}
358
+
359
+ // XXX: this doesn't register a waker...?
360
+ Poll :: Pending
340
361
}
341
362
342
363
#[ cfg( feature = "runtime" ) ]
@@ -386,6 +407,7 @@ impl Bdp {
386
407
fn calculate ( & mut self , bytes : usize , rtt : Duration ) -> Option < WindowSize > {
387
408
// No need to do any math if we're at the limit.
388
409
if self . bdp as usize == BDP_LIMIT {
410
+ self . stabilize_delay ( ) ;
389
411
return None ;
390
412
}
391
413
@@ -405,6 +427,7 @@ impl Bdp {
405
427
406
428
if bw < self . max_bandwidth {
407
429
// not a faster bandwidth, so don't update
430
+ self . stabilize_delay ( ) ;
408
431
return None ;
409
432
} else {
410
433
self . max_bandwidth = bw;
@@ -415,11 +438,26 @@ impl Bdp {
415
438
if bytes >= self . bdp as usize * 2 / 3 {
416
439
self . bdp = ( bytes * 2 ) . min ( BDP_LIMIT ) as WindowSize ;
417
440
trace ! ( "BDP increased to {}" , self . bdp) ;
441
+
442
+ self . stable_count = 0 ;
443
+ self . ping_delay /= 2 ;
418
444
Some ( self . bdp )
419
445
} else {
446
+ self . stabilize_delay ( ) ;
420
447
None
421
448
}
422
449
}
450
+
451
+ fn stabilize_delay ( & mut self ) {
452
+ if self . ping_delay < Duration :: from_secs ( 10 ) {
453
+ self . stable_count += 1 ;
454
+
455
+ if self . stable_count >= 2 {
456
+ self . ping_delay *= 4 ;
457
+ self . stable_count = 0 ;
458
+ }
459
+ }
460
+ }
423
461
}
424
462
425
463
fn seconds ( dur : Duration ) -> f64 {
0 commit comments