Skip to content

Commit c58747b

Browse files
authored
Merge pull request #368 from starsheriff/stream_count
add stream::count
2 parents f49d7cb + 9ebe41f commit c58747b

File tree

2 files changed

+73
-0
lines changed

2 files changed

+73
-0
lines changed

src/stream/stream/count.rs

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
3+
4+
use pin_project_lite::pin_project;
5+
6+
use crate::stream::Stream;
7+
use crate::task::{Context, Poll};
8+
9+
pin_project! {
10+
#[doc(hidden)]
11+
#[allow(missing_debug_implementations)]
12+
pub struct CountFuture<S> {
13+
#[pin]
14+
stream: S,
15+
count: usize,
16+
}
17+
}
18+
19+
impl<S> CountFuture<S> {
20+
pub(crate) fn new(stream: S) -> Self {
21+
CountFuture { stream, count: 0 }
22+
}
23+
}
24+
25+
impl<S> Future for CountFuture<S>
26+
where
27+
S: Stream,
28+
{
29+
type Output = usize;
30+
31+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
32+
let this = self.project();
33+
let next = futures_core::ready!(this.stream.poll_next(cx));
34+
35+
match next {
36+
Some(_) => {
37+
cx.waker().wake_by_ref();
38+
*this.count += 1;
39+
Poll::Pending
40+
}
41+
None => Poll::Ready(*this.count),
42+
}
43+
}
44+
}

src/stream/stream/mod.rs

+29
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod any;
2626
mod chain;
2727
mod cloned;
2828
mod cmp;
29+
mod count;
2930
mod copied;
3031
mod cycle;
3132
mod enumerate;
@@ -68,6 +69,7 @@ mod zip;
6869
use all::AllFuture;
6970
use any::AnyFuture;
7071
use cmp::CmpFuture;
72+
use count::CountFuture;
7173
use cycle::Cycle;
7274
use enumerate::Enumerate;
7375
use eq::EqFuture;
@@ -1889,6 +1891,33 @@ extension_trait! {
18891891
CmpFuture::new(self, other)
18901892
}
18911893

1894+
#[doc = r#"
1895+
Counts the number of elements in the stream.
1896+
1897+
# Examples
1898+
1899+
```
1900+
# fn main() { async_std::task::block_on(async {
1901+
#
1902+
use async_std::prelude::*;
1903+
use async_std::stream;
1904+
1905+
let s1 = stream::from_iter(vec![0]);
1906+
let s2 = stream::from_iter(vec![1, 2, 3]);
1907+
1908+
assert_eq!(s1.count().await, 1);
1909+
assert_eq!(s2.count().await, 3);
1910+
#
1911+
# }) }
1912+
```
1913+
"#]
1914+
fn count(self) -> impl Future<Output = usize> [CountFuture<Self>]
1915+
where
1916+
Self: Sized,
1917+
{
1918+
CountFuture::new(self)
1919+
}
1920+
18921921
#[doc = r#"
18931922
Determines if the elements of this `Stream` are lexicographically
18941923
not equal to those of another.

0 commit comments

Comments
 (0)