Skip to content

Commit 6d1e71f

Browse files
bors[bot]montekki
andauthored
Merge #163
163: adds stream::filter_map combinator r=yoshuawuyts a=montekki Implements a `flat_map` combinator. Currently the same about `ret!` as in #162 . Also the naming should probably be `FilterMapStream`, but in that case `Take` stream should also change it's name i guess. Stdlib: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.flat_map Ref: #129 Co-authored-by: Fedor Sakharov <[email protected]>
2 parents a0c9442 + 9b381e4 commit 6d1e71f

File tree

2 files changed

+87
-0
lines changed

2 files changed

+87
-0
lines changed

src/stream/stream/filter_map.rs

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
use std::task::{Context, Poll};
4+
5+
/// A stream that both filters and maps.
6+
#[derive(Clone, Debug)]
7+
pub struct FilterMap<S, F, T, B> {
8+
stream: S,
9+
f: F,
10+
__from: PhantomData<T>,
11+
__to: PhantomData<B>,
12+
}
13+
14+
impl<S, F, T, B> FilterMap<S, F, T, B> {
15+
pin_utils::unsafe_pinned!(stream: S);
16+
pin_utils::unsafe_unpinned!(f: F);
17+
18+
pub(crate) fn new(stream: S, f: F) -> Self {
19+
FilterMap {
20+
stream,
21+
f,
22+
__from: PhantomData,
23+
__to: PhantomData,
24+
}
25+
}
26+
}
27+
28+
impl<S, F, B> futures_core::stream::Stream for FilterMap<S, F, S::Item, B>
29+
where
30+
S: futures_core::stream::Stream,
31+
F: FnMut(S::Item) -> Option<B>,
32+
{
33+
type Item = B;
34+
35+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
37+
match next {
38+
Some(v) => match (self.as_mut().f())(v) {
39+
Some(b) => Poll::Ready(Some(b)),
40+
None => {
41+
cx.waker().wake_by_ref();
42+
Poll::Pending
43+
}
44+
},
45+
None => Poll::Ready(None),
46+
}
47+
}
48+
}

src/stream/stream/mod.rs

+39
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
2424
mod all;
2525
mod any;
26+
mod filter_map;
2627
mod min_by;
2728
mod next;
2829
mod nth;
@@ -32,6 +33,7 @@ pub use take::Take;
3233

3334
use all::AllFuture;
3435
use any::AnyFuture;
36+
use filter_map::FilterMap;
3537
use min_by::MinByFuture;
3638
use next::NextFuture;
3739
use nth::NthFuture;
@@ -130,6 +132,43 @@ pub trait Stream {
130132
}
131133
}
132134

135+
/// Both filters and maps a stream.
136+
///
137+
/// # Examples
138+
///
139+
/// Basic usage:
140+
///
141+
/// ```
142+
/// # fn main() { async_std::task::block_on(async {
143+
/// #
144+
/// use std::collections::VecDeque;
145+
/// use async_std::stream::Stream;
146+
///
147+
/// let s: VecDeque<&str> = vec!["1", "lol", "3", "NaN", "5"].into_iter().collect();
148+
///
149+
/// let mut parsed = s.filter_map(|a| a.parse::<u32>().ok());
150+
///
151+
/// let one = parsed.next().await;
152+
/// assert_eq!(one, Some(1));
153+
///
154+
/// let three = parsed.next().await;
155+
/// assert_eq!(three, Some(3));
156+
///
157+
/// let five = parsed.next().await;
158+
/// assert_eq!(five, Some(5));
159+
///
160+
/// let end = parsed.next().await;
161+
/// assert_eq!(end, None);
162+
/// #
163+
/// # }) }
164+
fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F, Self::Item, B>
165+
where
166+
Self: Sized,
167+
F: FnMut(Self::Item) -> Option<B>,
168+
{
169+
FilterMap::new(self, f)
170+
}
171+
133172
/// Returns the element that gives the minimum value with respect to the
134173
/// specified comparison function. If several elements are equally minimum,
135174
/// the first element is returned. If the stream is empty, `None` is returned.

0 commit comments

Comments
 (0)