From 224287bff5eb7fc287be8087d09f646f32160f1f Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Fri, 5 Jul 2019 12:01:03 -0700 Subject: [PATCH 1/6] Reduce the genericity of WhileSomeFolder::consume_iter's closure The `take_while` closure only needs to be generic in `T`, along with a captured `&AtomicBool`. When we write the closure directly, it carries all the type baggage of `WhileSomeFolder<'f, C>::consume_iter`, where the monomorphized type may explode. Instead, we can move this closure to a standalone function for reduced genericity. --- src/iter/while_some.rs | 19 +++++++++++-------- tests/issue671.rs | 16 ++++++++++++++++ 2 files changed, 27 insertions(+), 8 deletions(-) create mode 100644 tests/issue671.rs diff --git a/src/iter/while_some.rs b/src/iter/while_some.rs index 94ccdb323..0268af8d5 100644 --- a/src/iter/while_some.rs +++ b/src/iter/while_some.rs @@ -126,16 +126,19 @@ where where I: IntoIterator>, { - let full = self.full; + fn some(full: &AtomicBool) -> impl Fn(&Option) -> bool + '_ { + move |x| match *x { + Some(_) => !full.load(Ordering::Relaxed), + None => { + full.store(true, Ordering::Relaxed); + false + } + } + } + self.base = self.base.consume_iter( iter.into_iter() - .take_while(|x| match *x { - Some(_) => !full.load(Ordering::Relaxed), - None => { - full.store(true, Ordering::Relaxed); - false - } - }) + .take_while(some(self.full)) .map(Option::unwrap), ); self diff --git a/tests/issue671.rs b/tests/issue671.rs new file mode 100644 index 000000000..cc2d9a32f --- /dev/null +++ b/tests/issue671.rs @@ -0,0 +1,16 @@ +extern crate rayon; + +use rayon::prelude::*; + +#[test] +fn type_length_limit() { + let _ = Vec::>::new() + .into_par_iter() + .map(|x| x) + .map(|x| x) + .map(|x| x) + .map(|x| x) + .map(|x| x) + .map(|x| x) + .collect::>(); +} From d2c142479a9748f473b3c319b11ceb514339ccb3 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Fri, 5 Jul 2019 12:36:24 -0700 Subject: [PATCH 2/6] Reduce genericity in FromParallelIterator for Result --- 
src/result.rs | 16 ++++++++++------ tests/issue671.rs | 2 ++ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/result.rs b/src/result.rs index 054900f69..e0a63dbc7 100644 --- a/src/result.rs +++ b/src/result.rs @@ -100,23 +100,27 @@ where where I: IntoParallelIterator>, { - let saved_error = Mutex::new(None); - let collection = par_iter - .into_par_iter() - .map(|item| match item { + fn ok(saved: &Mutex>) -> impl Fn(Result) -> Option + '_ { + move |item| match item { Ok(item) => Some(item), Err(error) => { // We don't need a blocking `lock()`, as anybody // else holding the lock will also be writing // `Some(error)`, and then ours is irrelevant. - if let Ok(mut guard) = saved_error.try_lock() { + if let Ok(mut guard) = saved.try_lock() { if guard.is_none() { *guard = Some(error); } } None } - }) + } + } + + let saved_error = Mutex::new(None); + let collection = par_iter + .into_par_iter() + .map(ok(&saved_error)) .while_some() .collect(); diff --git a/tests/issue671.rs b/tests/issue671.rs index cc2d9a32f..a2000d88f 100644 --- a/tests/issue671.rs +++ b/tests/issue671.rs @@ -1,3 +1,5 @@ +#![type_length_limit = "500000"] + extern crate rayon; use rayon::prelude::*; From 249fbb906d3cb292e57779f2dde9208db6b43fc1 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Wed, 10 Jul 2019 13:56:54 -0700 Subject: [PATCH 3/6] Reduce the genericity of extend closures Most of these closures only need to be generic in the item type, but when created in the context of some `I` generic iterator, they inherit that genericity. This affects both type length and code duplication. 
--- src/iter/collect/mod.rs | 23 +----- src/iter/extend.rs | 179 +++++++++++++++++++++------------------- tests/issue671-unzip.rs | 19 +++++ 3 files changed, 118 insertions(+), 103 deletions(-) create mode 100644 tests/issue671-unzip.rs diff --git a/src/iter/collect/mod.rs b/src/iter/collect/mod.rs index 12be9f122..cab25e86d 100644 --- a/src/iter/collect/mod.rs +++ b/src/iter/collect/mod.rs @@ -1,5 +1,4 @@ use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator}; -use std::collections::LinkedList; use std::slice; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -138,24 +137,10 @@ where } None => { // This works like `extend`, but `Vec::append` is more efficient. - let list: LinkedList<_> = par_iter - .fold(Vec::new, |mut vec, elem| { - vec.push(elem); - vec - }) - .map(|vec| { - let mut list = LinkedList::new(); - list.push_back(vec); - list - }) - .reduce(LinkedList::new, |mut list1, mut list2| { - list1.append(&mut list2); - list1 - }); - - self.reserve(list.iter().map(Vec::len).sum()); - for mut vec in list { - self.append(&mut vec); + let list = super::extend::collect(par_iter); + self.reserve(super::extend::len(&list)); + for ref mut vec in list { + self.append(vec); } } } diff --git a/src/iter/extend.rs b/src/iter/extend.rs index bcd8ccb80..82221e1ae 100644 --- a/src/iter/extend.rs +++ b/src/iter/extend.rs @@ -14,41 +14,49 @@ where F: FnOnce(&mut C, &LinkedList>), C: Extend, { - let list = par_iter - .into_par_iter() - .fold(Vec::new, |mut vec, elem| { - vec.push(elem); - vec - }) - .map(|vec| { - let mut list = LinkedList::new(); - list.push_back(vec); - list - }) - .reduce(LinkedList::new, |mut list1, mut list2| { - list1.append(&mut list2); - list1 - }); - + let list = collect(par_iter); reserve(collection, &list); for vec in list { collection.extend(vec); } } +pub(super) fn collect(par_iter: I) -> LinkedList> +where + I: IntoParallelIterator, +{ + par_iter + .into_par_iter() + .fold(Vec::new, vec_push) + .map(as_list) 
+ .reduce(LinkedList::new, list_append) +} + +fn vec_push(mut vec: Vec, elem: T) -> Vec { + vec.push(elem); + vec +} + +fn as_list(item: T) -> LinkedList { + let mut list = LinkedList::new(); + list.push_back(item); + list +} + +fn list_append(mut list1: LinkedList, ref mut list2: LinkedList) -> LinkedList { + list1.append(list2); + list1 +} + /// Compute the total length of a `LinkedList>`. -fn len(list: &LinkedList>) -> usize { +pub(super) fn len(list: &LinkedList>) -> usize { list.iter().map(Vec::len).sum() } -/// Compute the total string length of a `LinkedList>>`. -fn str_len(list: &LinkedList>) -> usize -where - T: AsRef, -{ - list.iter() - .flat_map(|vec| vec.iter().map(|s| s.as_ref().len())) - .sum() +fn no_reserve(_: &mut C, _: &LinkedList>) {} + +fn heap_reserve(heap: &mut BinaryHeap, list: &LinkedList>) { + heap.reserve(len(list)); } /// Extend a binary heap with items from a parallel iterator. @@ -60,7 +68,7 @@ where where I: IntoParallelIterator, { - extend(self, par_iter, |heap, list| heap.reserve(len(list))); + extend(self, par_iter, heap_reserve); } } @@ -73,7 +81,7 @@ where where I: IntoParallelIterator, { - extend(self, par_iter, |heap, list| heap.reserve(len(list))); + extend(self, par_iter, heap_reserve); } } @@ -87,12 +95,12 @@ where where I: IntoParallelIterator, { - extend(self, par_iter, |_, _| {}); + extend(self, par_iter, no_reserve); } } /// Extend a B-tree map with copied items from a parallel iterator. 
-impl<'a, K, V> ParallelExtend<(&'a K, &'a V)> for BTreeMap +impl<'a, K: 'a, V: 'a> ParallelExtend<(&'a K, &'a V)> for BTreeMap where K: Copy + Ord + Send + Sync, V: Copy + Send + Sync, @@ -101,7 +109,7 @@ where where I: IntoParallelIterator, { - extend(self, par_iter, |_, _| {}); + extend(self, par_iter, no_reserve); } } @@ -114,7 +122,7 @@ where where I: IntoParallelIterator, { - extend(self, par_iter, |_, _| {}); + extend(self, par_iter, no_reserve); } } @@ -127,10 +135,18 @@ where where I: IntoParallelIterator, { - extend(self, par_iter, |_, _| {}); + extend(self, par_iter, no_reserve); } } +fn map_reserve(map: &mut HashMap, list: &LinkedList>) +where + K: Eq + Hash, + S: BuildHasher, +{ + map.reserve(len(list)); +} + /// Extend a hash map with items from a parallel iterator. impl ParallelExtend<(K, V)> for HashMap where @@ -143,12 +159,12 @@ where I: IntoParallelIterator, { // See the map_collect benchmarks in rayon-demo for different strategies. - extend(self, par_iter, |map, list| map.reserve(len(list))); + extend(self, par_iter, map_reserve); } } /// Extend a hash map with copied items from a parallel iterator. -impl<'a, K, V, S> ParallelExtend<(&'a K, &'a V)> for HashMap +impl<'a, K: 'a, V: 'a, S> ParallelExtend<(&'a K, &'a V)> for HashMap where K: Copy + Eq + Hash + Send + Sync, V: Copy + Send + Sync, @@ -158,10 +174,18 @@ where where I: IntoParallelIterator, { - extend(self, par_iter, |map, list| map.reserve(len(list))); + extend(self, par_iter, map_reserve); } } +fn set_reserve(set: &mut HashSet, list: &LinkedList>) +where + T: Eq + Hash, + S: BuildHasher, +{ + set.reserve(len(list)); +} + /// Extend a hash set with items from a parallel iterator. 
impl ParallelExtend for HashSet where @@ -172,7 +196,7 @@ where where I: IntoParallelIterator, { - extend(self, par_iter, |set, list| set.reserve(len(list))); + extend(self, par_iter, set_reserve); } } @@ -186,10 +210,15 @@ where where I: IntoParallelIterator, { - extend(self, par_iter, |set, list| set.reserve(len(list))); + extend(self, par_iter, set_reserve); } } +fn list_push_back(mut list: LinkedList, elem: T) -> LinkedList { + list.push_back(elem); + list +} + /// Extend a linked list with items from a parallel iterator. impl ParallelExtend for LinkedList where @@ -201,14 +230,8 @@ where { let mut list = par_iter .into_par_iter() - .fold(LinkedList::new, |mut list, elem| { - list.push_back(elem); - list - }) - .reduce(LinkedList::new, |mut list1, mut list2| { - list1.append(&mut list2); - list1 - }); + .fold(LinkedList::new, list_push_back) + .reduce(LinkedList::new, list_append); self.append(&mut list); } } @@ -226,6 +249,11 @@ where } } +fn string_push(mut string: String, ch: char) -> String { + string.push(ch); + string +} + /// Extend a string with characters from a parallel iterator. impl ParallelExtend for String { fn par_extend(&mut self, par_iter: I) @@ -236,19 +264,9 @@ impl ParallelExtend for String { // with than `String`, so instead collect to `LinkedList`. 
let list: LinkedList<_> = par_iter .into_par_iter() - .fold(String::new, |mut string, ch| { - string.push(ch); - string - }) - .map(|vec| { - let mut list = LinkedList::new(); - list.push_back(vec); - list - }) - .reduce(LinkedList::new, |mut list1, mut list2| { - list1.append(&mut list2); - list1 - }); + .fold(String::new, string_push) + .map(as_list) + .reduce(LinkedList::new, list_append); self.reserve(list.iter().map(String::len).sum()); self.extend(list) @@ -265,13 +283,23 @@ impl<'a> ParallelExtend<&'a char> for String { } } +fn string_reserve>(string: &mut String, list: &LinkedList>) { + let len = list + .iter() + .flat_map(|vec| vec) + .map(T::as_ref) + .map(str::len) + .sum(); + string.reserve(len); +} + /// Extend a string with string slices from a parallel iterator. impl<'a> ParallelExtend<&'a str> for String { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { - extend(self, par_iter, |string, list| string.reserve(str_len(list))); + extend(self, par_iter, string_reserve); } } @@ -281,7 +309,7 @@ impl ParallelExtend for String { where I: IntoParallelIterator, { - extend(self, par_iter, |string, list| string.reserve(str_len(list))); + extend(self, par_iter, string_reserve); } } @@ -291,31 +319,14 @@ impl<'a> ParallelExtend> for String { where I: IntoParallelIterator>, { - // This is like `extend`, but `Extend> for String` - // wasn't added until Rust 1.19, so we can't use it directly yet. 
- let list = par_iter - .into_par_iter() - .fold(Vec::new, |mut vec, elem| { - vec.push(elem); - vec - }) - .map(|vec| { - let mut list = LinkedList::new(); - list.push_back(vec); - list - }) - .reduce(LinkedList::new, |mut list1, mut list2| { - list1.append(&mut list2); - list1 - }); - - self.reserve(str_len(&list)); - for vec in list { - self.extend(vec.iter().map(|cow| &**cow)); - } + extend(self, par_iter, string_reserve); } } +fn deque_reserve(deque: &mut VecDeque, list: &LinkedList>) { + deque.reserve(len(list)); +} + /// Extend a deque with items from a parallel iterator. impl ParallelExtend for VecDeque where @@ -325,7 +336,7 @@ where where I: IntoParallelIterator, { - extend(self, par_iter, |deque, list| deque.reserve(len(list))); + extend(self, par_iter, deque_reserve); } } @@ -338,7 +349,7 @@ where where I: IntoParallelIterator, { - extend(self, par_iter, |deque, list| deque.reserve(len(list))); + extend(self, par_iter, deque_reserve); } } diff --git a/tests/issue671-unzip.rs b/tests/issue671-unzip.rs new file mode 100644 index 000000000..83353d7ea --- /dev/null +++ b/tests/issue671-unzip.rs @@ -0,0 +1,19 @@ +#![type_length_limit = "10000"] + +extern crate rayon; + +use rayon::prelude::*; + +#[test] +fn type_length_limit() { + let input = vec![1, 2, 3, 4, 5]; + let (indexes, (squares, cubes)): (Vec<_>, (Vec<_>, Vec<_>)) = input + .par_iter() + .map(|x| (x * x, x * x * x)) + .enumerate() + .unzip(); + + drop(indexes); + drop(squares); + drop(cubes); +} From ba41dab8f384b5ba16cbfcb690633675ea16b342 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Wed, 10 Jul 2019 14:45:47 -0700 Subject: [PATCH 4/6] Reduce genericity in join wrappers --- rayon-core/src/job.rs | 6 +++++- rayon-core/src/join/mod.rs | 23 ++++++++++++++++++----- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs index a47f4558d..ac1c6aaf6 100644 --- a/rayon-core/src/job.rs +++ b/rayon-core/src/job.rs @@ -109,10 +109,14 @@ where R: Send, { 
unsafe fn execute(this: *const Self) { + fn call(func: impl FnOnce(bool) -> R) -> impl FnOnce() -> R { + move || func(true) + } + let this = &*this; let abort = unwind::AbortIfPanic; let func = (*this.func.get()).take().unwrap(); - (*this.result.get()) = match unwind::halt_unwinding(|| func(true)) { + (*this.result.get()) = match unwind::halt_unwinding(call(func)) { Ok(x) => JobResult::Ok(x), Err(x) => JobResult::Panic(x), }; diff --git a/rayon-core/src/join/mod.rs b/rayon-core/src/join/mod.rs index ceb268d44..04d70c095 100644 --- a/rayon-core/src/join/mod.rs +++ b/rayon-core/src/join/mod.rs @@ -98,7 +98,12 @@ where RA: Send, RB: Send, { - join_context(|_| oper_a(), |_| oper_b()) + #[inline] + fn call(f: impl FnOnce() -> R) -> impl FnOnce(FnContext) -> R { + move |_| f() + } + + join_context(call(oper_a), call(oper_b)) } /// Identical to `join`, except that the closures have a parameter @@ -115,22 +120,30 @@ where RA: Send, RB: Send, { + #[inline] + fn call_a(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R { + move || f(FnContext::new(injected)) + } + + #[inline] + fn call_b(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R { + move |migrated| f(FnContext::new(migrated)) + } + registry::in_worker(|worker_thread, injected| unsafe { log!(Join { worker: worker_thread.index() }); - let latch = SpinLatch::new(); - // Create virtual wrapper for task b; this all has to be // done here so that the stack frame can keep it all live // long enough. - let job_b = StackJob::new(|migrated| oper_b(FnContext::new(migrated)), latch); + let job_b = StackJob::new(call_b(oper_b), SpinLatch::new()); let job_b_ref = job_b.as_job_ref(); worker_thread.push(job_b_ref); // Execute task a; hopefully b gets stolen in the meantime. 
- let status_a = unwind::halt_unwinding(move || oper_a(FnContext::new(injected))); + let status_a = unwind::halt_unwinding(call_a(oper_a, injected)); let result_a = match status_a { Ok(v) => v, Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err), From 97cc314c336dfb6f75294ecc2cbaa25aa6b83ad0 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Wed, 10 Jul 2019 17:50:08 -0700 Subject: [PATCH 5/6] Reduce genericity in more iterator closures --- src/iter/find.rs | 6 +- src/iter/flatten.rs | 6 +- src/iter/fold.rs | 9 +- src/iter/for_each.rs | 2 +- src/iter/from_par_iter.rs | 3 +- src/iter/interleave.rs | 7 +- src/iter/map_with.rs | 11 ++- src/iter/mod.rs | 186 +++++++++++++++++++++++++++++--------- src/iter/noop.rs | 8 +- src/iter/panic_fuse.rs | 6 +- src/iter/skip.rs | 2 +- src/iter/try_fold.rs | 8 +- src/iter/try_reduce.rs | 15 +-- src/iter/update.rs | 36 ++++---- src/option.rs | 14 ++- src/range.rs | 7 +- src/slice/mod.rs | 4 +- src/split_producer.rs | 9 +- src/str.rs | 35 ++++--- 19 files changed, 259 insertions(+), 115 deletions(-) diff --git a/src/iter/find.rs b/src/iter/find.rs index 86b1fc17c..6358d5280 100644 --- a/src/iter/find.rs +++ b/src/iter/find.rs @@ -87,10 +87,14 @@ where where I: IntoIterator, { + fn not_full(found: &AtomicBool) -> impl Fn(&T) -> bool + '_ { + move |_| !found.load(Ordering::Relaxed) + } + self.item = iter .into_iter() // stop iterating if another thread has found something - .take_while(|_| !self.full()) + .take_while(not_full(&self.found)) .find(self.find_op); if self.item.is_some() { self.found.store(true, Ordering::Relaxed) diff --git a/src/iter/flatten.rs b/src/iter/flatten.rs index 5862adc81..939ba1636 100644 --- a/src/iter/flatten.rs +++ b/src/iter/flatten.rs @@ -35,6 +35,10 @@ where where C: UnindexedConsumer, { - self.base.flat_map(|x| x).drive_unindexed(consumer) + fn id(x: T) -> T { + x + } + + self.base.flat_map(id).drive_unindexed(consumer) } } diff --git a/src/iter/fold.rs b/src/iter/fold.rs index 
9ae937c9d..0de0aa3f0 100644 --- a/src/iter/fold.rs +++ b/src/iter/fold.rs @@ -147,11 +147,18 @@ where where I: IntoIterator, { + fn not_full(base: &C) -> impl Fn(&T) -> bool + '_ + where + C: Folder, + { + move |_| !base.full() + } + let base = self.base; let item = iter .into_iter() // stop iterating if another thread has finished - .take_while(|_| !base.full()) + .take_while(not_full(&base)) .fold(self.item, self.fold_op); FoldFolder { diff --git a/src/iter/for_each.rs b/src/iter/for_each.rs index 994140458..5307939ba 100644 --- a/src/iter/for_each.rs +++ b/src/iter/for_each.rs @@ -52,7 +52,7 @@ where where I: IntoIterator, { - iter.into_iter().fold((), |_, item| (self.op)(item)); + iter.into_iter().for_each(self.op); self } diff --git a/src/iter/from_par_iter.rs b/src/iter/from_par_iter.rs index 21860a4e0..5797df427 100644 --- a/src/iter/from_par_iter.rs +++ b/src/iter/from_par_iter.rs @@ -1,3 +1,4 @@ +use super::noop::NoopConsumer; use super::{FromParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator}; use std::borrow::Cow; @@ -222,6 +223,6 @@ impl FromParallelIterator<()> for () { where I: IntoParallelIterator, { - par_iter.into_par_iter().for_each(|()| {}) + par_iter.into_par_iter().drive_unindexed(NoopConsumer) } } diff --git a/src/iter/interleave.rs b/src/iter/interleave.rs index 028f70b8d..34f301762 100644 --- a/src/iter/interleave.rs +++ b/src/iter/interleave.rs @@ -205,11 +205,14 @@ where /// should yield the next element, otherwise, if `j` should yield /// the next element, set a = index/2 and b = (index/2)+1 fn split_at(self, index: usize) -> (Self, Self) { + #[inline] + fn odd_offset(flag: bool) -> usize { + (!flag) as usize + } + let even = index % 2 == 0; let idx = index >> 1; - let odd_offset = |flag| if flag { 0 } else { 1 }; - // desired split let (i_idx, j_idx) = ( idx + odd_offset(even || self.i_next), diff --git a/src/iter/map_with.rs b/src/iter/map_with.rs index c5eafc728..22733b315 100644 --- a/src/iter/map_with.rs +++ 
b/src/iter/map_with.rs @@ -310,10 +310,15 @@ where where I: IntoIterator, { + fn with<'f, T, U, R>( + item: &'f mut U, + map_op: impl Fn(&mut U, T) -> R + 'f, + ) -> impl FnMut(T) -> R + 'f { + move |x| map_op(item, x) + } + { - let map_op = self.map_op; - let item = &mut self.item; - let mapped_iter = iter.into_iter().map(|x| map_op(item, x)); + let mapped_iter = iter.into_iter().map(with(&mut self.item, self.map_op)); self.base = self.base.consume_iter(mapped_iter); } self diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 917aefff2..e10aaaf45 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -382,7 +382,7 @@ pub trait ParallelIterator: Sized + Send { OP: Fn(&mut T, Self::Item) + Sync + Send, T: Send + Clone, { - self.map_with(init, op).for_each(|()| ()) + self.map_with(init, op).collect() } /// Executes `OP` on a value returned by `init` with each item produced by @@ -419,7 +419,7 @@ pub trait ParallelIterator: Sized + Send { OP: Fn(&mut T, Self::Item) + Sync + Send, INIT: Fn() -> T + Sync + Send, { - self.map_init(init, op).for_each(|()| ()) + self.map_init(init, op).collect() } /// Executes a fallible `OP` on each item produced by the iterator, in parallel. 
@@ -447,7 +447,11 @@ pub trait ParallelIterator: Sized + Send { OP: Fn(Self::Item) -> R + Sync + Send, R: Try + Send, { - self.map(op).try_reduce(|| (), |(), ()| R::from_ok(())) + fn ok>(_: (), _: ()) -> R { + R::from_ok(()) + } + + self.map(op).try_reduce(<()>::default, ok) } /// Executes a fallible `OP` on the given `init` value with each item @@ -483,8 +487,11 @@ pub trait ParallelIterator: Sized + Send { T: Send + Clone, R: Try + Send, { - self.map_with(init, op) - .try_reduce(|| (), |(), ()| R::from_ok(())) + fn ok>(_: (), _: ()) -> R { + R::from_ok(()) + } + + self.map_with(init, op).try_reduce(<()>::default, ok) } /// Executes a fallible `OP` on a value returned by `init` with each item @@ -525,8 +532,11 @@ pub trait ParallelIterator: Sized + Send { INIT: Fn() -> T + Sync + Send, R: Try + Send, { - self.map_init(init, op) - .try_reduce(|| (), |(), ()| R::from_ok(())) + fn ok>(_: (), _: ()) -> R { + R::from_ok(()) + } + + self.map_init(init, op).try_reduce(<()>::default, ok) } /// Counts the number of items in this parallel iterator. 
@@ -541,7 +551,11 @@ pub trait ParallelIterator: Sized + Send { /// assert_eq!(count, 100); /// ``` fn count(self) -> usize { - self.map(|_| 1).sum() + fn one(_: T) -> usize { + 1 + } + + self.map(one).sum() } /// Applies `map_op` to each item of this iterator, producing a new @@ -916,21 +930,23 @@ pub trait ParallelIterator: Sized + Send { where OP: Fn(Self::Item, Self::Item) -> Self::Item + Sync + Send, { - self.fold( - || None, - |opt_a, b| match opt_a { + fn opt_fold(op: impl Fn(T, T) -> T) -> impl Fn(Option, T) -> Option { + move |opt_a, b| match opt_a { Some(a) => Some(op(a, b)), None => Some(b), - }, - ) - .reduce( - || None, - |opt_a, opt_b| match (opt_a, opt_b) { + } + } + + fn opt_reduce(op: impl Fn(T, T) -> T) -> impl Fn(Option, Option) -> Option { + move |opt_a, opt_b| match (opt_a, opt_b) { (Some(a), Some(b)) => Some(op(a, b)), (Some(v), None) | (None, Some(v)) => Some(v), (None, None) => None, - }, - ) + } + } + + self.fold(<_>::default, opt_fold(&op)) + .reduce(<_>::default, opt_reduce(&op)) } /// Reduces the items in the iterator into one item using a fallible `op`. 
@@ -1361,10 +1377,14 @@ pub trait ParallelIterator: Sized + Send { where F: Sync + Send + Fn(&Self::Item, &Self::Item) -> Ordering, { - self.reduce_with(|a, b| match f(&a, &b) { - Ordering::Greater => b, - _ => a, - }) + fn min(f: impl Fn(&T, &T) -> Ordering) -> impl Fn(T, T) -> T { + move |a, b| match f(&a, &b) { + Ordering::Greater => b, + _ => a, + } + } + + self.reduce_with(min(f)) } /// Computes the item that yields the minimum value for the given @@ -1389,7 +1409,18 @@ pub trait ParallelIterator: Sized + Send { K: Ord + Send, F: Sync + Send + Fn(&Self::Item) -> K, { - let (_, x) = self.map(|x| (f(&x), x)).min_by(|a, b| (a.0).cmp(&b.0))?; + fn key(f: impl Fn(&T) -> K) -> impl Fn(T) -> (K, T) { + move |x| (f(&x), x) + } + + fn min_key(a: (K, T), b: (K, T)) -> (K, T) { + match (a.0).cmp(&b.0) { + Ordering::Greater => b, + _ => a, + } + } + + let (_, x) = self.map(key(f)).reduce_with(min_key)?; Some(x) } @@ -1444,10 +1475,14 @@ pub trait ParallelIterator: Sized + Send { where F: Sync + Send + Fn(&Self::Item, &Self::Item) -> Ordering, { - self.reduce_with(|a, b| match f(&a, &b) { - Ordering::Greater => a, - _ => b, - }) + fn max(f: impl Fn(&T, &T) -> Ordering) -> impl Fn(T, T) -> T { + move |a, b| match f(&a, &b) { + Ordering::Greater => a, + _ => b, + } + } + + self.reduce_with(max(f)) } /// Computes the item that yields the maximum value for the given @@ -1472,7 +1507,18 @@ pub trait ParallelIterator: Sized + Send { K: Ord + Send, F: Sync + Send + Fn(&Self::Item) -> K, { - let (_, x) = self.map(|x| (f(&x), x)).max_by(|a, b| (a.0).cmp(&b.0))?; + fn key(f: impl Fn(&T) -> K) -> impl Fn(T) -> (K, T) { + move |x| (f(&x), x) + } + + fn max_key(a: (K, T), b: (K, T)) -> (K, T) { + match (a.0).cmp(&b.0) { + Ordering::Greater => a, + _ => b, + } + } + + let (_, x) = self.map(key(f)).reduce_with(max_key)?; Some(x) } @@ -1616,7 +1662,10 @@ pub trait ParallelIterator: Sized + Send { P: Fn(Self::Item) -> Option + Sync + Send, R: Send, { - 
self.filter_map(predicate).find_any(|_| true) + fn yes(_: &T) -> bool { + true + } + self.filter_map(predicate).find_any(yes) } /// Applies the given predicate to the items in the parallel iterator and @@ -1647,7 +1696,10 @@ pub trait ParallelIterator: Sized + Send { P: Fn(Self::Item) -> Option + Sync + Send, R: Send, { - self.filter_map(predicate).find_first(|_| true) + fn yes(_: &T) -> bool { + true + } + self.filter_map(predicate).find_first(yes) } /// Applies the given predicate to the items in the parallel iterator and @@ -1678,7 +1730,10 @@ pub trait ParallelIterator: Sized + Send { P: Fn(Self::Item) -> Option + Sync + Send, R: Send, { - self.filter_map(predicate).find_last(|_| true) + fn yes(_: &T) -> bool { + true + } + self.filter_map(predicate).find_last(yes) } #[doc(hidden)] @@ -1712,7 +1767,7 @@ pub trait ParallelIterator: Sized + Send { where P: Fn(Self::Item) -> bool + Sync + Send, { - self.map(predicate).find_any(|&p| p).is_some() + self.map(predicate).find_any(bool::clone).is_some() } /// Tests that every item in the parallel iterator matches the given @@ -1734,7 +1789,12 @@ pub trait ParallelIterator: Sized + Send { where P: Fn(Self::Item) -> bool + Sync + Send, { - self.map(predicate).find_any(|&p| !p).is_none() + #[inline] + fn is_false(x: &bool) -> bool { + !x + } + + self.map(predicate).find_any(is_false).is_none() } /// Creates an iterator over the `Some` items of this iterator, halting @@ -2216,11 +2276,21 @@ pub trait IndexedParallelIterator: ParallelIterator { I::Iter: IndexedParallelIterator, Self::Item: Ord, { + #[inline] + fn ordering((x, y): (T, T)) -> Ordering { + Ord::cmp(&x, &y) + } + + #[inline] + fn inequal(&ord: &Ordering) -> bool { + ord != Ordering::Equal + } + let other = other.into_par_iter(); let ord_len = self.len().cmp(&other.len()); self.zip(other) - .map(|(x, y)| Ord::cmp(&x, &y)) - .find_first(|&ord| ord != Ordering::Equal) + .map(ordering) + .find_first(inequal) .unwrap_or(ord_len) } @@ -2246,12 +2316,22 @@ pub trait 
IndexedParallelIterator: ParallelIterator { I::Iter: IndexedParallelIterator, Self::Item: PartialOrd, { + #[inline] + fn ordering, U>((x, y): (T, U)) -> Option { + PartialOrd::partial_cmp(&x, &y) + } + + #[inline] + fn inequal(&ord: &Option) -> bool { + ord != Some(Ordering::Equal) + } + let other = other.into_par_iter(); - let ord_len = Some(self.len().cmp(&other.len())); + let ord_len = self.len().cmp(&other.len()); self.zip(other) - .map(|(x, y)| PartialOrd::partial_cmp(&x, &y)) - .find_first(|&ord| ord != Some(Ordering::Equal)) - .unwrap_or(ord_len) + .map(ordering) + .find_first(inequal) + .unwrap_or(Some(ord_len)) } /// Determines if the elements of this `ParallelIterator` @@ -2262,8 +2342,13 @@ pub trait IndexedParallelIterator: ParallelIterator { I::Iter: IndexedParallelIterator, Self::Item: PartialEq, { + #[inline] + fn eq, U>((x, y): (T, U)) -> bool { + PartialEq::eq(&x, &y) + } + let other = other.into_par_iter(); - self.len() == other.len() && self.zip(other).all(|(x, y)| x.eq(&y)) + self.len() == other.len() && self.zip(other).all(eq) } /// Determines if the elements of this `ParallelIterator` @@ -2400,7 +2485,12 @@ pub trait IndexedParallelIterator: ParallelIterator { where P: Fn(Self::Item) -> bool + Sync + Send, { - let (i, _) = self.map(predicate).enumerate().find_any(|&(_, p)| p)?; + #[inline] + fn check(&(_, p): &(usize, bool)) -> bool { + p + } + + let (i, _) = self.map(predicate).enumerate().find_any(check)?; Some(i) } @@ -2432,7 +2522,12 @@ pub trait IndexedParallelIterator: ParallelIterator { where P: Fn(Self::Item) -> bool + Sync + Send, { - let (i, _) = self.map(predicate).enumerate().find_first(|&(_, p)| p)?; + #[inline] + fn check(&(_, p): &(usize, bool)) -> bool { + p + } + + let (i, _) = self.map(predicate).enumerate().find_first(check)?; Some(i) } @@ -2464,7 +2559,12 @@ pub trait IndexedParallelIterator: ParallelIterator { where P: Fn(Self::Item) -> bool + Sync + Send, { - let (i, _) = self.map(predicate).enumerate().find_last(|&(_, 
p)| p)?; + #[inline] + fn check(&(_, p): &(usize, bool)) -> bool { + p + } + + let (i, _) = self.map(predicate).enumerate().find_last(check)?; Some(i) } diff --git a/src/iter/noop.rs b/src/iter/noop.rs index 31d7528ed..1e55ecb20 100644 --- a/src/iter/noop.rs +++ b/src/iter/noop.rs @@ -2,12 +2,6 @@ use super::plumbing::*; pub(super) struct NoopConsumer; -impl NoopConsumer { - pub(super) fn new() -> Self { - NoopConsumer - } -} - impl Consumer for NoopConsumer { type Folder = NoopConsumer; type Reducer = NoopReducer; @@ -37,7 +31,7 @@ impl Folder for NoopConsumer { where I: IntoIterator, { - iter.into_iter().fold((), |_, _| ()); + iter.into_iter().for_each(drop); self } diff --git a/src/iter/panic_fuse.rs b/src/iter/panic_fuse.rs index d0cbbd4c1..b0d1ac0eb 100644 --- a/src/iter/panic_fuse.rs +++ b/src/iter/panic_fuse.rs @@ -306,9 +306,13 @@ where where I: IntoIterator, { + fn cool<'a, T>(fuse: &'a Fuse) -> impl Fn(&T) -> bool + 'a { + move |_| !fuse.panicked() + } + self.base = { let fuse = &self.fuse; - let iter = iter.into_iter().take_while(move |_| !fuse.panicked()); + let iter = iter.into_iter().take_while(cool(fuse)); self.base.consume_iter(iter) }; self diff --git a/src/iter/skip.rs b/src/iter/skip.rs index 73653f24e..903934c74 100644 --- a/src/iter/skip.rs +++ b/src/iter/skip.rs @@ -80,7 +80,7 @@ where P: Producer, { let (before_skip, after_skip) = base.split_at(self.n); - bridge_producer_consumer(self.n, before_skip, NoopConsumer::new()); + bridge_producer_consumer(self.n, before_skip, NoopConsumer); self.callback.callback(after_skip) } } diff --git a/src/iter/try_fold.rs b/src/iter/try_fold.rs index 13165c9ec..51f988179 100644 --- a/src/iter/try_fold.rs +++ b/src/iter/try_fold.rs @@ -141,10 +141,12 @@ where { type Result = C::Result; - fn consume(self, item: T) -> Self { + fn consume(mut self, item: T) -> Self { let fold_op = self.fold_op; - let result = self.result.and_then(|acc| fold_op(acc, item).into_result()); - TryFoldFolder { result, ..self } + if let 
Ok(acc) = self.result { + self.result = fold_op(acc, item).into_result(); + } + self } fn complete(self) -> C::Result { diff --git a/src/iter/try_reduce.rs b/src/iter/try_reduce.rs index 926746456..6c9f24181 100644 --- a/src/iter/try_reduce.rs +++ b/src/iter/try_reduce.rs @@ -102,15 +102,18 @@ where { type Result = T; - fn consume(self, item: T) -> Self { + fn consume(mut self, item: T) -> Self { let reduce_op = self.reduce_op; - let result = self - .result - .and_then(|left| reduce_op(left, item.into_result()?).into_result()); - if result.is_err() { + if let Ok(left) = self.result { + self.result = match item.into_result() { + Ok(right) => reduce_op(left, right).into_result(), + Err(error) => Err(error), + }; + } + if self.result.is_err() { self.full.store(true, Ordering::Relaxed) } - TryReduceFolder { result, ..self } + self } fn complete(self) -> T { diff --git a/src/iter/update.rs b/src/iter/update.rs index 7bed5d9a4..e822f4878 100644 --- a/src/iter/update.rs +++ b/src/iter/update.rs @@ -223,6 +223,13 @@ struct UpdateFolder<'f, C, F: 'f> { update_op: &'f F, } +fn apply(update_op: impl Fn(&mut T)) -> impl Fn(T) -> T { + move |mut item| { + update_op(&mut item); + item + } +} + impl<'f, T, C, F> Folder for UpdateFolder<'f, C, F> where C: Folder, @@ -244,10 +251,9 @@ where I: IntoIterator, { let update_op = self.update_op; - self.base = self.base.consume_iter(iter.into_iter().map(|mut item| { - update_op(&mut item); - item - })); + self.base = self + .base + .consume_iter(iter.into_iter().map(apply(update_op))); self } @@ -271,7 +277,7 @@ struct UpdateSeq { impl Iterator for UpdateSeq where I: Iterator, - F: FnMut(&mut I::Item), + F: Fn(&mut I::Item), { type Item = I::Item; @@ -285,15 +291,11 @@ where self.base.size_hint() } - fn fold(self, init: Acc, mut g: G) -> Acc + fn fold(self, init: Acc, g: G) -> Acc where G: FnMut(Acc, Self::Item) -> Acc, { - let mut f = self.update_op; - self.base.fold(init, move |acc, mut v| { - f(&mut v); - g(acc, v) - }) + 
self.base.map(apply(self.update_op)).fold(init, g) } // if possible, re-use inner iterator specializations in collect @@ -301,27 +303,21 @@ where where C: ::std::iter::FromIterator, { - let mut f = self.update_op; - self.base - .map(move |mut v| { - f(&mut v); - v - }) - .collect() + self.base.map(apply(self.update_op)).collect() } } impl ExactSizeIterator for UpdateSeq where I: ExactSizeIterator, - F: FnMut(&mut I::Item), + F: Fn(&mut I::Item), { } impl DoubleEndedIterator for UpdateSeq where I: DoubleEndedIterator, - F: FnMut(&mut I::Item), + F: Fn(&mut I::Item), { fn next_back(&mut self) -> Option { let mut v = self.base.next_back()?; diff --git a/src/option.rs b/src/option.rs index 0c021d93f..925107f82 100644 --- a/src/option.rs +++ b/src/option.rs @@ -180,14 +180,18 @@ where where I: IntoParallelIterator>, { - let found_none = AtomicBool::new(false); - let collection = par_iter - .into_par_iter() - .inspect(|item| { + fn check(found_none: &AtomicBool) -> impl Fn(&Option) + '_ { + move |item| { if item.is_none() { found_none.store(true, Ordering::Relaxed); } - }) + } + } + + let found_none = AtomicBool::new(false); + let collection = par_iter + .into_par_iter() + .inspect(check(&found_none)) .while_some() .collect(); diff --git a/src/range.rs b/src/range.rs index bd4c7dcec..1a0f2c5d3 100644 --- a/src/range.rs +++ b/src/range.rs @@ -156,11 +156,16 @@ macro_rules! unindexed_range_impl { where C: UnindexedConsumer, { + #[inline] + fn offset(start: $t) -> impl Fn(usize) -> $t { + move |i| start.wrapping_add(i as $t) + } + if let Some(len) = self.opt_len() { // Drive this in indexed mode for better `collect`. 
(0..len) .into_par_iter() - .map(|i| self.range.start.wrapping_add(i as $t)) + .map(offset(self.range.start)) .drive(consumer) } else { bridge_unindexed(IterProducer { range: self.range }, consumer) diff --git a/src/slice/mod.rs b/src/slice/mod.rs index d78786996..72f50f713 100644 --- a/src/slice/mod.rs +++ b/src/slice/mod.rs @@ -186,7 +186,7 @@ pub trait ParallelSliceMut { where T: Ord, { - par_mergesort(self.as_parallel_slice_mut(), |a, b| a.lt(b)); + par_mergesort(self.as_parallel_slice_mut(), T::lt); } /// Sorts the slice in parallel with a comparator function. @@ -310,7 +310,7 @@ pub trait ParallelSliceMut { where T: Ord, { - par_quicksort(self.as_parallel_slice_mut(), |a, b| a.lt(b)); + par_quicksort(self.as_parallel_slice_mut(), T::lt); } /// Sorts the slice in parallel with a comparator function, but may not preserve the order of diff --git a/src/split_producer.rs b/src/split_producer.rs index 36d1b70f2..99c2a89c5 100644 --- a/src/split_producer.rs +++ b/src/split_producer.rs @@ -84,11 +84,10 @@ where fn split(self) -> (Self, Option) { // Look forward for the separator, and failing that look backward. let mid = self.data.midpoint(self.tail); - let index = self - .data - .find(self.separator, mid, self.tail) - .map(|i| mid + i) - .or_else(|| self.data.rfind(self.separator, mid)); + let index = match self.data.find(self.separator, mid, self.tail) { + Some(i) => Some(mid + i), + None => self.data.rfind(self.separator, mid), + }; if let Some(index) = index { let len = self.data.length(); diff --git a/src/str.rs b/src/str.rs index a60555dce..3b4573cf0 100644 --- a/src/str.rs +++ b/src/str.rs @@ -298,6 +298,11 @@ mod private { } use self::private::Pattern; +#[inline] +fn offset(base: usize) -> impl Fn((usize, T)) -> (usize, T) { + move |(i, x)| (base + i, x) +} + impl Pattern for char { private_impl! 
{} @@ -338,7 +343,7 @@ impl Pattern for char { where F: Folder<(usize, &'ch str)>, { - folder.consume_iter(chars.match_indices(*self).map(move |(i, s)| (base + i, s))) + folder.consume_iter(chars.match_indices(*self).map(offset(base))) } } @@ -379,7 +384,7 @@ impl bool> Pattern for FN { where F: Folder<(usize, &'ch str)>, { - folder.consume_iter(chars.match_indices(self).map(move |(i, s)| (base + i, s))) + folder.consume_iter(chars.match_indices(self).map(offset(base))) } } @@ -479,7 +484,7 @@ impl<'ch> UnindexedProducer for CharIndicesProducer<'ch> { F: Folder, { let base = self.index; - folder.consume_iter(self.chars.char_indices().map(move |(i, c)| (base + i, c))) + folder.consume_iter(self.chars.char_indices().map(offset(base))) } } @@ -704,6 +709,15 @@ impl<'ch, 'sep, P: Pattern + 'sep> UnindexedProducer for SplitTerminatorProducer #[derive(Debug, Clone)] pub struct Lines<'ch>(&'ch str); +#[inline] +fn no_carriage_return(line: &str) -> &str { + if line.ends_with('\r') { + &line[..line.len() - 1] + } else { + line + } +} + impl<'ch> ParallelIterator for Lines<'ch> { type Item = &'ch str; @@ -713,13 +727,7 @@ impl<'ch> ParallelIterator for Lines<'ch> { { self.0 .par_split_terminator('\n') - .map(|line| { - if line.ends_with('\r') { - &line[..line.len() - 1] - } else { - line - } - }) + .map(no_carriage_return) .drive_unindexed(consumer) } } @@ -730,6 +738,11 @@ impl<'ch> ParallelIterator for Lines<'ch> { #[derive(Debug, Clone)] pub struct SplitWhitespace<'ch>(&'ch str); +#[inline] +fn not_empty(s: &&str) -> bool { + !s.is_empty() +} + impl<'ch> ParallelIterator for SplitWhitespace<'ch> { type Item = &'ch str; @@ -739,7 +752,7 @@ impl<'ch> ParallelIterator for SplitWhitespace<'ch> { { self.0 .par_split(char::is_whitespace) - .filter(|string| !string.is_empty()) + .filter(not_empty) .drive_unindexed(consumer) } } From dbc114fbc2934bdb874d0dc5c1e3547401b71873 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Wed, 14 Aug 2019 13:32:08 -0700 Subject: [PATCH 6/6] Avoid 
closures in the Copied implementation --- src/iter/copied.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/iter/copied.rs b/src/iter/copied.rs index 175ff17e0..bb78be05b 100644 --- a/src/iter/copied.rs +++ b/src/iter/copied.rs @@ -102,11 +102,11 @@ where T: 'a + Copy, { type Item = T; - type IntoIter = iter::Map T>; + type IntoIter = iter::Cloned; fn into_iter(self) -> Self::IntoIter { // FIXME: use `Iterator::copied()` when Rust 1.36 is our minimum. - self.base.into_iter().map(|&x| x) + self.base.into_iter().cloned() } fn min_len(&self) -> usize { @@ -211,7 +211,7 @@ where I: IntoIterator, { // FIXME: use `Iterator::copied()` when Rust 1.36 is our minimum. - self.base = self.base.consume_iter(iter.into_iter().map(|&x| x)); + self.base = self.base.consume_iter(iter.into_iter().cloned()); self }