Skip to content

Commit b2c7778

Browse files
authored
timer: finish updating timer (#1222)
* timer: restructure feature flags * update timer tests * Add `async-traits` to CI This also disables a buggy `threadpool` test. This test should be fixed in the future. Refs #1225
1 parent 8e7d8af commit b2c7778

27 files changed

+779
-748
lines changed

azure-pipelines.yml

+25-12
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ jobs:
1919
displayName: Test tokio
2020
cross: true
2121
crates:
22-
- tokio
22+
tokio:
23+
- default
2324

2425
# Test crates that are platform specific
2526
- template: ci/azure-test-stable.yml
@@ -30,11 +31,14 @@ jobs:
3031
rust: $(nightly)
3132
crates:
3233
# - tokio-fs
33-
- tokio-reactor
34+
tokio-reactor:
35+
- default
3436
# - tokio-signal
35-
- tokio-tcp
37+
tokio-tcp:
38+
- default
3639
# - tokio-tls
37-
- tokio-udp
40+
tokio-udp:
41+
- default
3842
# - tokio-uds
3943

4044
# Test crates that are NOT platform specific
@@ -45,15 +49,24 @@ jobs:
4549
rust: $(nightly)
4650
crates:
4751
# - tokio-buf
48-
- tokio-codec
49-
- tokio-current-thread
50-
- tokio-executor
51-
- tokio-io
52-
- tokio-sync
53-
- tokio-macros
52+
tokio-codec:
53+
- default
54+
tokio-current-thread:
55+
- default
56+
tokio-executor:
57+
- default
58+
tokio-io:
59+
- default
60+
tokio-sync:
61+
- default
62+
tokio-macros:
63+
- default
5464
# - tokio-threadpool
55-
- tokio-timer
56-
- tokio-test
65+
tokio-timer:
66+
- default
67+
- async-traits
68+
tokio-test:
69+
- default
5770

5871
# - template: ci/azure-cargo-check.yml
5972
# parameters:

ci/azure-test-stable.yml

+9-6
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,12 @@ jobs:
3434
- template: azure-patch-crates.yml
3535

3636
- ${{ each crate in parameters.crates }}:
37-
- script: cargo test --lib && cargo test --tests && cargo test --examples
38-
env:
39-
LOOM_MAX_DURATION: 10
40-
CI: 'True'
41-
displayName: cargo test -p ${{ crate }} (PATCHED)
42-
workingDirectory: $(Build.SourcesDirectory)/${{ crate }}
37+
- ${{ each feature in crate.value }}:
38+
- script: cargo test --tests --no-default-features --features ${{ feature }}
39+
env:
40+
LOOM_MAX_DURATION: 10
41+
CI: 'True'
42+
displayName: cargo test --tests --features ${{ feature }}
43+
44+
- script: cargo test --examples --no-default-features --features ${{ feature }}
45+
displayName: cargo test --examples --features ${{ feature }}

tokio-sync/tests/lock.rs

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ fn readiness() {
5151
}
5252

5353
#[test]
54+
#[ignore]
5455
fn lock() {
5556
let mut lock = Lock::new(false);
5657

tokio-test/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ publish = false
2424
[dependencies]
2525
assertive = { git = "http://github.com/carllerche/assertive" }
2626
pin-convert = "0.1.0"
27-
# tokio-timer = { version = "0.3.0", path = "../tokio-timer" }
27+
tokio-timer = { version = "0.3.0", path = "../tokio-timer" }
2828
tokio-executor = { version = "0.2.0", path = "../tokio-executor" }

tokio-test/src/clock.rs

+28-12
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
//! });
2121
//! ```
2222
23-
use futures::{future::lazy, Future};
23+
use tokio_executor::park::{Park, Unpark};
24+
use tokio_timer::clock::{Clock, Now};
25+
use tokio_timer::Timer;
26+
2427
use std::marker::PhantomData;
2528
use std::rc::Rc;
2629
use std::sync::{Arc, Mutex};
2730
use std::time::{Duration, Instant};
28-
use tokio_executor::park::{Park, Unpark};
29-
use tokio_timer::clock::{Clock, Now};
30-
use tokio_timer::Timer;
3131

3232
/// Run the provided closure with a `MockClock` that starts at the current time.
3333
pub fn mock<F, R>(f: F) -> R
@@ -123,17 +123,16 @@ impl MockClock {
123123
where
124124
F: FnOnce(&mut Handle) -> R,
125125
{
126-
let mut enter = ::tokio_executor::enter().unwrap();
127-
128-
::tokio_timer::clock::with_default(&self.clock, &mut enter, |enter| {
126+
::tokio_timer::clock::with_default(&self.clock, || {
129127
let park = self.time.mock_park();
130128
let timer = Timer::new(park);
131129
let handle = timer.handle();
132130
let time = self.time.clone();
133131

134-
::tokio_timer::with_default(&handle, enter, |_| {
132+
::tokio_timer::with_default(&handle, || {
135133
let mut handle = Handle::new(timer, time);
136-
lazy(|| Ok::<_, ()>(f(&mut handle))).wait().unwrap()
134+
f(&mut handle)
135+
// lazy(|| Ok::<_, ()>(f(&mut handle))).wait().unwrap()
137136
})
138137
})
139138
}
@@ -145,8 +144,13 @@ impl Handle {
145144
}
146145

147146
/// Turn the internal timer and mock park for the provided duration.
148-
pub fn turn(&mut self, duration: Option<Duration>) {
149-
self.timer.turn(duration).unwrap();
147+
pub fn turn(&mut self) {
148+
self.timer.turn(None).unwrap();
149+
}
150+
151+
/// Turn the internal timer and mock park for the provided duration.
152+
pub fn turn_for(&mut self, duration: Duration) {
153+
self.timer.turn(Some(duration)).unwrap();
150154
}
151155

152156
/// Advance the `MockClock` by the provided duration.
@@ -156,14 +160,26 @@ impl Handle {
156160

157161
while inner.lock().unwrap().now() < deadline {
158162
let dur = deadline - inner.lock().unwrap().now();
159-
self.turn(Some(dur));
163+
self.turn_for(dur);
160164
}
161165
}
162166

167+
/// Returns the total amount of time the time has been advanced.
168+
pub fn advanced(&self) -> Duration {
169+
self.time.inner.lock().unwrap().advance
170+
}
171+
163172
/// Get the currently mocked time
164173
pub fn now(&mut self) -> Instant {
165174
self.time.now()
166175
}
176+
177+
/// Turn the internal timer once, but force "parking" for `duration` regardless of any pending
178+
/// timeouts
179+
pub fn park_for(&mut self, duration: Duration) {
180+
self.time.inner.lock().unwrap().park_for = Some(duration);
181+
self.turn()
182+
}
167183
}
168184

169185
impl MockTime {

tokio-test/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
//! assert_ready!(fut.poll());
2121
//! ```
2222
23-
// pub mod clock;
23+
pub mod clock;
2424
mod macros;
2525
pub mod task;
2626

tokio-test/src/macros.rs

+4-29
Original file line numberDiff line numberDiff line change
@@ -74,41 +74,16 @@ macro_rules! assert_pending {
7474
}};
7575
}
7676

77-
/*
7877
/// Assert if a poll is ready and check for equality on the value
7978
#[macro_export]
8079
macro_rules! assert_ready_eq {
8180
($e:expr, $expect:expr) => {
82-
use $crate::codegen::futures::Async::Ready;
83-
match $e {
84-
Ok(e) => assert_eq!(e, Ready($expect)),
85-
Err(e) => panic!("error = {:?}", e),
86-
}
81+
let val = $crate::assert_ready!($e);
82+
assert_eq!(val, $expect)
8783
};
8884

8985
($e:expr, $expect:expr, $($msg:tt),+) => {
90-
use $crate::codegen::futures::Async::Ready;
91-
match $e {
92-
Ok(e) => assert_eq!(e, Ready($expect), $($msg)+),
93-
Err(e) => {
94-
let msg = format_args!($($msg),+);
95-
panic!("error = {:?}; {}", e, msg)
96-
}
97-
}
98-
};
99-
}
100-
*/
101-
102-
/*
103-
/// Assert if the deadline has passed
104-
#[macro_export]
105-
macro_rules! assert_elapsed {
106-
($e:expr) => {
107-
assert!($e.unwrap_err().is_elapsed());
108-
};
109-
110-
($e:expr, $($msg:expr),+) => {
111-
assert!($e.unwrap_err().is_elapsed(), $msg);
86+
let val = $crate::assert_ready!($e);
87+
assert_eq!(val, $expect, $($msg),*)
11288
};
11389
}
114-
*/

tokio-threadpool/examples/depth.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#![cfg(features = "broken")]
2+
13
extern crate env_logger;
24
extern crate futures;
35
extern crate tokio_threadpool;

tokio-threadpool/examples/hello.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#![cfg(features = "broken")]
2+
13
extern crate env_logger;
24
extern crate futures;
35
extern crate tokio_threadpool;

tokio-timer/Cargo.toml

+6-16
Original file line numberDiff line numberDiff line change
@@ -22,32 +22,22 @@ Timer facilities for Tokio
2222
publish = false
2323

2424
[features]
25-
# individual `Stream` impls if you so desire
26-
delay-queue = ["futures-core-preview"]
27-
interval = ["futures-core-preview"]
28-
timeout-stream = ["futures-core-preview"]
29-
throttle = ["futures-core-preview"]
30-
31-
# easily enable all `Stream` impls
32-
streams = [
33-
"delay-queue",
34-
"interval",
35-
"timeout-stream",
36-
"throttle",
37-
]
25+
async-traits = ["futures-core-preview"]
3826

3927
[dependencies]
4028
tokio-executor = { version = "0.2.0", path = "../tokio-executor" }
4129
tokio-sync = { version = "0.2.0", path = "../tokio-sync" }
42-
crossbeam-utils = "0.6.0"
4330

31+
async-util = { git = "https://github.com/tokio-rs/async" }
32+
crossbeam-utils = "0.6.0"
4433
# Backs `DelayQueue`
4534
slab = "0.4.1"
46-
4735
# optionals
4836
futures-core-preview = { version = "0.3.0-alpha.16", optional = true }
4937

5038
[dev-dependencies]
5139
rand = "0.6"
52-
tokio-mock-task = "0.1.0"
5340
tokio = { version = "0.2.0", path = "../tokio" }
41+
tokio-current-thread = { version = "0.2.0", path = "../tokio-current-thread" }
42+
tokio-sync = { version = "0.2.0", path = "../tokio-sync", features = ["async-traits"] }
43+
tokio-test = { version = "0.2.0", path = "../tokio-test" }

tokio-timer/src/clock/clock.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::cell::Cell;
44
use std::fmt;
55
use std::sync::Arc;
66
use std::time::Instant;
7-
use tokio_executor::Enter;
87

98
/// A handle to a source of time.
109
///
@@ -108,9 +107,9 @@ impl fmt::Debug for Clock {
108107
/// # Panics
109108
///
110109
/// This function panics if there already is a default clock set.
111-
pub fn with_default<F, R>(clock: &Clock, enter: &mut Enter, f: F) -> R
110+
pub fn with_default<F, R>(clock: &Clock, f: F) -> R
112111
where
113-
F: FnOnce(&mut Enter) -> R,
112+
F: FnOnce() -> R,
114113
{
115114
CLOCK.with(|cell| {
116115
assert!(
@@ -132,6 +131,6 @@ where
132131

133132
cell.set(Some(clock as *const Clock));
134133

135-
f(enter)
134+
f()
136135
})
137136
}

tokio-timer/src/delay.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ impl Delay {
7474
}
7575

7676
// Used by `Timeout<Stream>`
77-
#[cfg(feature = "timeout-stream")]
77+
#[cfg(feature = "async-traits")]
7878
pub(crate) fn reset_timeout(&mut self) {
7979
self.registration.reset_timeout();
8080
}

tokio-timer/src/delay_queue.rs

+33-17
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::clock::now;
88
use crate::timer::Handle;
99
use crate::wheel::{self, Wheel};
1010
use crate::{Delay, Error};
11-
use futures_core::Stream;
11+
1212
use slab::Slab;
1313
use std::cmp;
1414
use std::future::Future;
@@ -347,6 +347,34 @@ impl<T> DelayQueue<T> {
347347
Key::new(key)
348348
}
349349

350+
/// TODO: Dox... also is the fn signature correct?
351+
pub fn poll_next(
352+
&mut self,
353+
cx: &mut task::Context<'_>,
354+
) -> Poll<Option<Result<Expired<T>, Error>>> {
355+
let item = ready!(self.poll_idx(cx));
356+
Poll::Ready(item.map(|result| {
357+
result.map(|idx| {
358+
let data = self.slab.remove(idx);
359+
debug_assert!(data.next.is_none());
360+
debug_assert!(data.prev.is_none());
361+
362+
Expired {
363+
key: Key::new(idx),
364+
data: data.inner,
365+
deadline: self.start + Duration::from_millis(data.when),
366+
}
367+
})
368+
}))
369+
}
370+
371+
/// TODO: Dox... also is the fn signature correct?
372+
pub async fn next(&mut self) -> Option<Result<Expired<T>, Error>> {
373+
use async_util::future::poll_fn;
374+
375+
poll_fn(|cx| self.poll_next(cx)).await
376+
}
377+
350378
/// Insert `value` into the queue set to expire after the requested duration
351379
/// elapses.
352380
///
@@ -696,26 +724,14 @@ impl<T> DelayQueue<T> {
696724
// We never put `T` in a `Pin`...
697725
impl<T> Unpin for DelayQueue<T> {}
698726

699-
impl<T> Stream for DelayQueue<T> {
727+
#[cfg(feature = "async-traits")]
728+
impl<T> futures_core::Stream for DelayQueue<T> {
700729
// DelayQueue seems much more specific, where a user may care that it
701730
// has reached capacity, so return those errors instead of panicking.
702731
type Item = Result<Expired<T>, Error>;
703732

704-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
705-
let item = ready!(self.poll_idx(cx));
706-
Poll::Ready(item.map(|result| {
707-
result.map(|idx| {
708-
let data = self.slab.remove(idx);
709-
debug_assert!(data.next.is_none());
710-
debug_assert!(data.prev.is_none());
711-
712-
Expired {
713-
key: Key::new(idx),
714-
data: data.inner,
715-
deadline: self.start + Duration::from_millis(data.when),
716-
}
717-
})
718-
}))
733+
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
734+
DelayQueue::poll_next(self.get_mut(), cx)
719735
}
720736
}
721737

0 commit comments

Comments
 (0)