diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index 15b1864d8b..57efac10e0 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -18944,9 +18944,9 @@ public final TestSubscriber test(long initialRequest, boolean cancel) { // No * @param the element type of the optional value * @param optional the optional value to convert into a {@code Flowable} * @return the new Flowable instance + * @since 3.0.0 * @see #just(Object) * @see #empty() - * @since 3.0.0 */ @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @@ -19409,6 +19409,7 @@ public final Stream blockingStream(int prefetch) { * @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements * will be emitted to the downstream * @return the new Flowable instance + * @since 3.0.0 * @see #concatMap(Function) * @see #concatMapIterable(Function) * @see #concatMapStream(Function, int) @@ -19461,6 +19462,7 @@ public final Stream blockingStream(int prefetch) { * will be emitted to the downstream * @param prefetch the number of upstream items to request upfront, then 75% of this amount after each 75% upstream items received * @return the new Flowable instance + * @since 3.0.0 * @see #concatMap(Function, int) * @see #concatMapIterable(Function, int) * @see #flatMapStream(Function, int) @@ -19515,6 +19517,7 @@ public final Stream blockingStream(int prefetch) { * @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements * will be emitted to the downstream * @return the new Flowable instance + * @since 3.0.0 * @see #flatMap(Function) * @see #flatMapIterable(Function) * @see #flatMapStream(Function, int) @@ -19567,6 +19570,7 @@ public final Stream blockingStream(int prefetch) { * will be emitted to the downstream * @param prefetch the number of upstream items to request upfront, then 75% of this amount after each 75% upstream items received * @return the new Flowable instance + * @since 3.0.0 * @see #flatMap(Function, int) * @see #flatMapIterable(Function, int) * @see #concatMapStream(Function, int) diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index d5336f96ce..f26e584c16 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -4870,9 +4870,9 @@ public final TestObserver test(boolean dispose) { * @param the element type of the optional value * @param optional the optional value to convert into a {@code Maybe} * @return the new Maybe instance + * @since 3.0.0 * @see #just(Object) * @see #empty() - * @since 3.0.0 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index 0afdd29a93..eb33792f34 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -15937,9 +15937,9 @@ public final TestObserver test(boolean dispose) { // NoPMD * @param the element type of the optional value * @param optional the optional value to convert into an {@code Observable} * @return the new Observable instance + * @since 3.0.0 * @see #just(Object) * @see #empty() - * @since 3.0.0 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @@ -16355,6 +16355,7 @@ public final Stream blockingStream(int capacityHint) { * @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements * will be emitted to the downstream * @return the new Observable instance + * @since 3.0.0 * @see #concatMap(Function) * @see #concatMapIterable(Function) * @see #flatMapStream(Function) @@ -16401,6 +16402,7 @@ public final Stream blockingStream(int capacityHint) { * @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements * will be emitted to the downstream * @return the new Observable instance + * @since 3.0.0 * @see #flatMap(Function) * @see #flatMapIterable(Function) */ diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java index ad1b3ad140..c967a011e5 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java @@ -71,10 +71,23 @@ protected void subscribeActual(Subscriber s) { EmptySubscription.complete(s); } } else { - source.subscribe(new FlatMapStreamSubscriber<>(s, mapper, prefetch)); + source.subscribe(subscribe(s, mapper, prefetch)); } } + /** + * Create a {@link Subscriber} with the given parameters. + * @param the upstream value type + * @param the {@link Stream} and output value type + * @param downstream the downstream {@code Subscriber} to wrap + * @param mapper the mapper function + * @param prefetch the number of items to prefetch + * @return the new {@code Subscriber} + */ + public static Subscriber subscribe(Subscriber downstream, Function> mapper, int prefetch) { + return new FlatMapStreamSubscriber<>(downstream, mapper, prefetch); + } + static final class FlatMapStreamSubscriber extends AtomicInteger implements FlowableSubscriber, Subscription { diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/ParallelCollector.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/ParallelCollector.java new file mode 100644 index 0000000000..e94f50cb67 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/ParallelCollector.java @@ -0,0 +1,270 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.Objects; +import java.util.concurrent.atomic.*; +import java.util.function.*; +import java.util.stream.Collector; + +import org.reactivestreams.*; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.internal.subscriptions.*; +import io.reactivex.rxjava3.internal.util.AtomicThrowable; +import io.reactivex.rxjava3.parallel.ParallelFlowable; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Reduces all 'rails' into a single via a Java 8 {@link Collector} callback set. + * + * @param the value type + * @param the accumulator type + * @param the result type + * @since 3.0.0 + */ +public final class ParallelCollector extends Flowable { + + final ParallelFlowable source; + + final Collector collector; + + public ParallelCollector(ParallelFlowable source, Collector collector) { + this.source = source; + this.collector = collector; + } + + @Override + protected void subscribeActual(Subscriber s) { + ParallelCollectorSubscriber parent; + try { + parent = new ParallelCollectorSubscriber<>(s, source.parallelism(), collector); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + EmptySubscription.error(ex, s); + return; + } + s.onSubscribe(parent); + + source.subscribe(parent.subscribers); + } + + static final class ParallelCollectorSubscriber extends DeferredScalarSubscription { + + private static final long serialVersionUID = -5370107872170712765L; + + final ParallelCollectorInnerSubscriber[] subscribers; + + final AtomicReference> current = new AtomicReference<>(); + + final AtomicInteger remaining = new AtomicInteger(); + + final AtomicThrowable error = new AtomicThrowable(); + + final Function finisher; + + ParallelCollectorSubscriber(Subscriber subscriber, int n, Collector collector) { + super(subscriber); + this.finisher = collector.finisher(); + @SuppressWarnings("unchecked") + ParallelCollectorInnerSubscriber[] a = new ParallelCollectorInnerSubscriber[n]; + for (int i = 0; i < n; i++) { + a[i] = new ParallelCollectorInnerSubscriber<>(this, collector.supplier().get(), collector.accumulator(), collector.combiner()); + } + this.subscribers = a; + remaining.lazySet(n); + } + + SlotPair addValue(A value) { + for (;;) { + SlotPair curr = current.get(); + + if (curr == null) { + curr = new SlotPair<>(); + if (!current.compareAndSet(null, curr)) { + continue; + } + } + + int c = curr.tryAcquireSlot(); + if (c < 0) { + current.compareAndSet(curr, null); + continue; + } + if (c == 0) { + curr.first = value; + } else { + curr.second = value; + } + + if (curr.releaseSlot()) { + current.compareAndSet(curr, null); + return curr; + } + return null; + } + } + + @Override + public void cancel() { + for (ParallelCollectorInnerSubscriber inner : subscribers) { + inner.cancel(); + } + } + + void innerError(Throwable ex) { + if (error.compareAndSet(null, ex)) { + cancel(); + downstream.onError(ex); + } else { + if (ex != error.get()) { + RxJavaPlugins.onError(ex); + } + } + } + + void innerComplete(A value, BinaryOperator combiner) { + for (;;) { + SlotPair sp = addValue(value); + + if (sp != null) { + + try { + value = combiner.apply(sp.first, sp.second); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + innerError(ex); + return; + } + + } else { + break; + } + } + + if (remaining.decrementAndGet() == 0) { + SlotPair sp = current.get(); + current.lazySet(null); + + R result; + try { + result = Objects.requireNonNull(finisher.apply(sp.first), "The finisher returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + innerError(ex); + return; + } + + complete(result); + } + } + } + + static final class ParallelCollectorInnerSubscriber + extends AtomicReference + implements FlowableSubscriber { + + private static final long serialVersionUID = -7954444275102466525L; + + final ParallelCollectorSubscriber parent; + + final BiConsumer accumulator; + + final BinaryOperator combiner; + + A container; + + boolean done; + + ParallelCollectorInnerSubscriber(ParallelCollectorSubscriber parent, A container, BiConsumer accumulator, BinaryOperator combiner) { + this.parent = parent; + this.accumulator = accumulator; + this.combiner = combiner; + this.container = container; + } + + @Override + public void onSubscribe(Subscription s) { + SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE); + } + + @Override + public void onNext(T t) { + if (!done) { + try { + accumulator.accept(container, t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + get().cancel(); + onError(ex); + return; + } + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + container = null; + done = true; + parent.innerError(t); + } + + @Override + public void onComplete() { + if (!done) { + A v = container; + container = null; + done = true; + parent.innerComplete(v, combiner); + } + } + + void cancel() { + SubscriptionHelper.cancel(this); + } + } + + static final class SlotPair extends AtomicInteger { + + private static final long serialVersionUID = 473971317683868662L; + + T first; + + T second; + + final AtomicInteger releaseIndex = new AtomicInteger(); + + int tryAcquireSlot() { + for (;;) { + int acquired = get(); + if (acquired >= 2) { + return -1; + } + + if (compareAndSet(acquired, acquired + 1)) { + return acquired; + } + } + } + + boolean releaseSlot() { + return releaseIndex.incrementAndGet() == 2; + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/ParallelFlatMapStream.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/ParallelFlatMapStream.java new file mode 100644 index 0000000000..3f38d1fec3 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/ParallelFlatMapStream.java @@ -0,0 +1,69 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.stream.Stream; + +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.parallel.ParallelFlowable; + +/** + * Flattens the generated {@link Stream}s on each rail. + * + * @param the input value type + * @param the output value type + * @since 3.0.0 + */ +public final class ParallelFlatMapStream extends ParallelFlowable { + + final ParallelFlowable source; + + final Function> mapper; + + final int prefetch; + + public ParallelFlatMapStream( + ParallelFlowable source, + Function> mapper, + int prefetch) { + this.source = source; + this.mapper = mapper; + this.prefetch = prefetch; + } + + @Override + public int parallelism() { + return source.parallelism(); + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + + @SuppressWarnings("unchecked") + final Subscriber[] parents = new Subscriber[n]; + + for (int i = 0; i < n; i++) { + parents[i] = FlowableFlatMapStream.subscribe(subscribers[i], mapper, prefetch); + } + + source.subscribe(parents); + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/ParallelMapOptional.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/ParallelMapOptional.java new file mode 100644 index 0000000000..350f59f2b2 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/ParallelMapOptional.java @@ -0,0 +1,236 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.*; + +import org.reactivestreams.*; + +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber; +import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; +import io.reactivex.rxjava3.parallel.ParallelFlowable; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Maps each 'rail' of the source ParallelFlowable with a mapper function. + * + * @param the input value type + * @param the output value type + * @since 3.0.0 + */ +public final class ParallelMapOptional extends ParallelFlowable { + + final ParallelFlowable source; + + final Function> mapper; + + public ParallelMapOptional(ParallelFlowable source, Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + @SuppressWarnings("unchecked") + Subscriber[] parents = new Subscriber[n]; + + for (int i = 0; i < n; i++) { + Subscriber a = subscribers[i]; + if (a instanceof ConditionalSubscriber) { + parents[i] = new ParallelMapConditionalSubscriber<>((ConditionalSubscriber)a, mapper); + } else { + parents[i] = new ParallelMapSubscriber<>(a, mapper); + } + } + + source.subscribe(parents); + } + + @Override + public int parallelism() { + return source.parallelism(); + } + + static final class ParallelMapSubscriber implements ConditionalSubscriber, Subscription { + + final Subscriber downstream; + + final Function> mapper; + + Subscription upstream; + + boolean done; + + ParallelMapSubscriber(Subscriber actual, Function> mapper) { + this.downstream = actual; + this.mapper = mapper; + } + + @Override + public void request(long n) { + upstream.request(n); + } + + @Override + public void cancel() { + upstream.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.upstream, s)) { + this.upstream = s; + + downstream.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (!tryOnNext(t)) { + upstream.request(1); + } + } + + @Override + public boolean tryOnNext(T t) { + if (done) { + return true; + } + Optional v; + + try { + v = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Optional"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cancel(); + onError(ex); + return true; + } + + if (v.isPresent()) { + downstream.onNext(v.get()); + return true; + } + return false; + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + downstream.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + downstream.onComplete(); + } + + } + static final class ParallelMapConditionalSubscriber implements ConditionalSubscriber, Subscription { + + final ConditionalSubscriber downstream; + + final Function> mapper; + + Subscription upstream; + + boolean done; + + ParallelMapConditionalSubscriber(ConditionalSubscriber actual, Function> mapper) { + this.downstream = actual; + this.mapper = mapper; + } + + @Override + public void request(long n) { + upstream.request(n); + } + + @Override + public void cancel() { + upstream.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.upstream, s)) { + this.upstream = s; + + downstream.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (!tryOnNext(t)) { + upstream.request(1); + } + } + + @Override + public boolean tryOnNext(T t) { + if (done) { + return false; + } + Optional v; + + try { + v = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cancel(); + onError(ex); + return false; + } + + return v.isPresent() && downstream.tryOnNext(v.get()); + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + downstream.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + downstream.onComplete(); + } + + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/ParallelMapTryOptional.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/ParallelMapTryOptional.java new file mode 100644 index 0000000000..a2d454c994 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/ParallelMapTryOptional.java @@ -0,0 +1,306 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.*; + +import org.reactivestreams.*; + +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber; +import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; +import io.reactivex.rxjava3.parallel.*; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Maps each 'rail' of the source ParallelFlowable with a mapper function + * and handle any failure based on a handler function. + * @param the input value type + * @param the output value type + * @since 3.0.0 + */ +public final class ParallelMapTryOptional extends ParallelFlowable { + + final ParallelFlowable source; + + final Function> mapper; + + final BiFunction errorHandler; + + public ParallelMapTryOptional( + ParallelFlowable source, + Function> mapper, + BiFunction errorHandler) { + this.source = source; + this.mapper = mapper; + this.errorHandler = errorHandler; + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + @SuppressWarnings("unchecked") + Subscriber[] parents = new Subscriber[n]; + + for (int i = 0; i < n; i++) { + Subscriber a = subscribers[i]; + if (a instanceof ConditionalSubscriber) { + parents[i] = new ParallelMapTryConditionalSubscriber<>((ConditionalSubscriber)a, mapper, errorHandler); + } else { + parents[i] = new ParallelMapTrySubscriber<>(a, mapper, errorHandler); + } + } + + source.subscribe(parents); + } + + @Override + public int parallelism() { + return source.parallelism(); + } + + static final class ParallelMapTrySubscriber implements ConditionalSubscriber, Subscription { + + final Subscriber downstream; + + final Function> mapper; + + final BiFunction errorHandler; + + Subscription upstream; + + boolean done; + + ParallelMapTrySubscriber(Subscriber actual, + Function> mapper, + BiFunction errorHandler) { + this.downstream = actual; + this.mapper = mapper; + this.errorHandler = errorHandler; + } + + @Override + public void request(long n) { + upstream.request(n); + } + + @Override + public void cancel() { + upstream.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.upstream, s)) { + this.upstream = s; + + downstream.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (!tryOnNext(t) && !done) { + upstream.request(1); + } + } + + @Override + public boolean tryOnNext(T t) { + if (done) { + return false; + } + long retries = 0; + + for (;;) { + Optional v; + + try { + v = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Optional"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + + ParallelFailureHandling h; + + try { + h = Objects.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null ParallelFailureHandling"); + } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); + cancel(); + onError(new CompositeException(ex, exc)); + return false; + } + + switch (h) { + case RETRY: + continue; + case SKIP: + return false; + case STOP: + cancel(); + onComplete(); + return false; + default: + cancel(); + onError(ex); + return false; + } + } + + if (v.isPresent()) { + downstream.onNext(v.get()); + return true; + } + return false; + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + downstream.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + downstream.onComplete(); + } + + } + static final class ParallelMapTryConditionalSubscriber implements ConditionalSubscriber, Subscription { + + final ConditionalSubscriber downstream; + + final Function> mapper; + + final BiFunction errorHandler; + + Subscription upstream; + + boolean done; + + ParallelMapTryConditionalSubscriber(ConditionalSubscriber actual, + Function> mapper, + BiFunction errorHandler) { + this.downstream = actual; + this.mapper = mapper; + this.errorHandler = errorHandler; + } + + @Override + public void request(long n) { + upstream.request(n); + } + + @Override + public void cancel() { + upstream.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.upstream, s)) { + this.upstream = s; + + downstream.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (!tryOnNext(t) && !done) { + upstream.request(1); + } + } + + @Override + public boolean tryOnNext(T t) { + if (done) { + return false; + } + long retries = 0; + + for (;;) { + Optional v; + + try { + v = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Optional"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + + ParallelFailureHandling h; + + try { + h = Objects.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null ParallelFailureHandling"); + } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); + cancel(); + onError(new CompositeException(ex, exc)); + return false; + } + + switch (h) { + case RETRY: + continue; + case SKIP: + return false; + case STOP: + cancel(); + onComplete(); + return false; + default: + cancel(); + onError(ex); + return false; + } + } + + return v.isPresent() && downstream.tryOnNext(v.get()); + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + downstream.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + downstream.onComplete(); + } + + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterable.java index de7221cb38..b275337841 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterable.java @@ -13,8 +13,7 @@ package io.reactivex.rxjava3.internal.operators.flowable; -import java.util.Iterator; -import java.util.Objects; +import java.util.*; import java.util.concurrent.atomic.*; import org.reactivestreams.*; @@ -80,6 +79,19 @@ public void subscribeActual(Subscriber s) { source.subscribe(new FlattenIterableSubscriber(s, mapper, prefetch)); } + /** + * Create a {@link Subscriber} with the given parameters. + * @param the upstream value type + * @param the {@link Iterable} and output value type + * @param downstream the downstream {@code Subscriber} to wrap + * @param mapper the mapper function + * @param prefetch the number of items to prefetch + * @return the new {@code Subscriber} + */ + public static Subscriber subscribe(Subscriber downstream, Function> mapper, int prefetch) { + return new FlattenIterableSubscriber<>(downstream, mapper, prefetch); + } + static final class FlattenIterableSubscriber extends BasicIntQueueSubscription implements FlowableSubscriber { @@ -118,7 +130,7 @@ static final class FlattenIterableSubscriber this.mapper = mapper; this.prefetch = prefetch; this.limit = prefetch - (prefetch >> 2); - this.error = new AtomicReference(); + this.error = new AtomicReference<>(); this.requested = new AtomicLong(); } @@ -153,7 +165,7 @@ public void onSubscribe(Subscription s) { } } - queue = new SpscArrayQueue(prefetch); + queue = new SpscArrayQueue<>(prefetch); downstream.onSubscribe(this); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelCollect.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelCollect.java index e1da1b76b7..29f9d91674 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelCollect.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelCollect.java @@ -67,7 +67,7 @@ public void subscribe(Subscriber[] subscribers) { return; } - parents[i] = new ParallelCollectSubscriber(subscribers[i], initialValue, collector); + parents[i] = new ParallelCollectSubscriber<>(subscribers[i], initialValue, collector); } source.subscribe(parents); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelDoOnNextTry.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelDoOnNextTry.java index 557d5282c4..6b3f69ff97 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelDoOnNextTry.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelDoOnNextTry.java @@ -59,9 +59,9 @@ public void subscribe(Subscriber[] subscribers) { for (int i = 0; i < n; i++) { Subscriber a = subscribers[i]; if (a instanceof ConditionalSubscriber) { - parents[i] = new ParallelDoOnNextConditionalSubscriber((ConditionalSubscriber)a, onNext, errorHandler); + parents[i] = new ParallelDoOnNextConditionalSubscriber<>((ConditionalSubscriber)a, onNext, errorHandler); } else { - parents[i] = new ParallelDoOnNextSubscriber(a, onNext, errorHandler); + parents[i] = new ParallelDoOnNextSubscriber<>(a, onNext, errorHandler); } } @@ -134,7 +134,7 @@ public boolean tryOnNext(T t) { ParallelFailureHandling h; try { - h = Objects.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item"); + h = Objects.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null ParallelFailureHandling"); } catch (Throwable exc) { Exceptions.throwIfFatal(exc); cancel(); @@ -245,7 +245,7 @@ public boolean tryOnNext(T t) { ParallelFailureHandling h; try { - h = Objects.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item"); + h = Objects.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null ParallelFailureHandling"); } catch (Throwable exc) { Exceptions.throwIfFatal(exc); cancel(); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFilter.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFilter.java index 6d1fc2fb96..14f414dce3 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFilter.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFilter.java @@ -51,9 +51,9 @@ public void subscribe(Subscriber[] subscribers) { for (int i = 0; i < n; i++) { Subscriber a = subscribers[i]; if (a instanceof ConditionalSubscriber) { - parents[i] = new ParallelFilterConditionalSubscriber((ConditionalSubscriber)a, predicate); + parents[i] = new ParallelFilterConditionalSubscriber<>((ConditionalSubscriber)a, predicate); } else { - parents[i] = new ParallelFilterSubscriber(a, predicate); + parents[i] = new ParallelFilterSubscriber<>(a, predicate); } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFilterTry.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFilterTry.java index f6eff78249..bfd22ec5a8 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFilterTry.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFilterTry.java @@ -57,9 +57,9 @@ public void subscribe(Subscriber[] subscribers) { for (int i = 0; i < n; i++) { Subscriber a = subscribers[i]; if (a instanceof ConditionalSubscriber) { - parents[i] = new ParallelFilterConditionalSubscriber((ConditionalSubscriber)a, predicate, errorHandler); + parents[i] = new ParallelFilterConditionalSubscriber<>((ConditionalSubscriber)a, predicate, errorHandler); } else { - parents[i] = new ParallelFilterSubscriber(a, predicate, errorHandler); + parents[i] = new ParallelFilterSubscriber<>(a, predicate, errorHandler); } } @@ -137,7 +137,7 @@ public boolean tryOnNext(T t) { ParallelFailureHandling h; try { - h = Objects.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item"); + h = Objects.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null ParallelFailureHandling"); } catch (Throwable exc) { Exceptions.throwIfFatal(exc); cancel(); @@ -226,7 +226,7 @@ public boolean tryOnNext(T t) { ParallelFailureHandling h; try { - h = Objects.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item"); + h = Objects.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null ParallelFailureHandling"); } catch (Throwable exc) { Exceptions.throwIfFatal(exc); cancel(); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFlatMapIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFlatMapIterable.java new file mode 100644 index 0000000000..1bb127233f --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFlatMapIterable.java @@ -0,0 +1,68 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.parallel; + +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.operators.flowable.FlowableFlattenIterable; +import io.reactivex.rxjava3.parallel.ParallelFlowable; + +/** + * Flattens the generated {@link Iterable}s on each rail. + * + * @param the input value type + * @param the output value type + * @since 3.0.0 + */ +public final class ParallelFlatMapIterable extends ParallelFlowable { + + final ParallelFlowable source; + + final Function> mapper; + + final int prefetch; + + public ParallelFlatMapIterable( + ParallelFlowable source, + Function> mapper, + int prefetch) { + this.source = source; + this.mapper = mapper; + this.prefetch = prefetch; + } + + @Override + public int parallelism() { + return source.parallelism(); + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + + @SuppressWarnings("unchecked") + final Subscriber[] parents = new Subscriber[n]; + + for (int i = 0; i < n; i++) { + parents[i] = FlowableFlattenIterable.subscribe(subscribers[i], mapper, prefetch); + } + + source.subscribe(parents); + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java index 126f7cdf13..0d4fb9a4c5 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java @@ -55,7 +55,7 @@ public void subscribe(Subscriber[] subscribers) { return; } - source.subscribe(new ParallelDispatcher(subscribers, prefetch)); + source.subscribe(new ParallelDispatcher<>(subscribers, prefetch)); } static final class ParallelDispatcher @@ -137,7 +137,7 @@ public void onSubscribe(Subscription s) { } } - queue = new SpscArrayQueue(prefetch); + queue = new SpscArrayQueue<>(prefetch); setupSubscribers(); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelJoin.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelJoin.java index b15fa8dfd5..4545b133f5 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelJoin.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelJoin.java @@ -50,9 +50,9 @@ public ParallelJoin(ParallelFlowable source, int prefetch, boolean protected void subscribeActual(Subscriber s) { JoinSubscriptionBase parent; if (delayErrors) { - parent = new JoinSubscriptionDelayError(s, source.parallelism(), prefetch); + parent = new JoinSubscriptionDelayError<>(s, source.parallelism(), prefetch); } else { - parent = new JoinSubscription(s, source.parallelism(), prefetch); + parent = new JoinSubscription<>(s, source.parallelism(), prefetch); } s.onSubscribe(parent); source.subscribe(parent.subscribers); @@ -81,7 +81,7 @@ abstract static class JoinSubscriptionBase extends AtomicInteger JoinInnerSubscriber[] a = new JoinInnerSubscriber[n]; for (int i = 0; i < n; i++) { - a[i] = new JoinInnerSubscriber(this, prefetch); + a[i] = new JoinInnerSubscriber<>(this, prefetch); } this.subscribers = a; @@ -550,7 +550,7 @@ public boolean cancel() { SimplePlainQueue getQueue() { SimplePlainQueue q = queue; if (q == null) { - q = new SpscArrayQueue(prefetch); + q = new SpscArrayQueue<>(prefetch); this.queue = q; } return q; diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelMapTry.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelMapTry.java index ff8da40c62..30250f2e60 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelMapTry.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelMapTry.java @@ -137,7 +137,7 @@ public boolean tryOnNext(T t) { ParallelFailureHandling h; try { - h = Objects.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item"); + h = Objects.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null ParallelFailureHandling"); } catch (Throwable exc) { Exceptions.throwIfFatal(exc); cancel(); @@ -250,7 +250,7 @@ public boolean tryOnNext(T t) { ParallelFailureHandling h; try { - h = Objects.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item"); + h = Objects.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null ParallelFailureHandling"); } catch (Throwable exc) { Exceptions.throwIfFatal(exc); cancel(); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelPeek.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelPeek.java index 50f04d46ec..206a2dd472 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelPeek.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelPeek.java @@ -75,7 +75,7 @@ public void subscribe(Subscriber[] subscribers) { Subscriber[] parents = new Subscriber[n]; for (int i = 0; i < n; i++) { - parents[i] = new ParallelPeekSubscriber(subscribers[i], this); + parents[i] = new ParallelPeekSubscriber<>(subscribers[i], this); } source.subscribe(parents); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelReduce.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelReduce.java index 87c02c8922..53fa2b3899 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelReduce.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelReduce.java @@ -66,7 +66,7 @@ public void subscribe(Subscriber[] subscribers) { return; } - parents[i] = new ParallelReduceSubscriber(subscribers[i], initialValue, reducer); + parents[i] = new ParallelReduceSubscriber<>(subscribers[i], initialValue, reducer); } source.subscribe(parents); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelReduceFull.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelReduceFull.java index e376c352cb..a47bc5b898 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelReduceFull.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelReduceFull.java @@ -22,6 +22,7 @@ import io.reactivex.rxjava3.exceptions.Exceptions; import io.reactivex.rxjava3.functions.BiFunction; import io.reactivex.rxjava3.internal.subscriptions.*; +import io.reactivex.rxjava3.internal.util.AtomicThrowable; import io.reactivex.rxjava3.parallel.ParallelFlowable; import io.reactivex.rxjava3.plugins.RxJavaPlugins; @@ -44,7 +45,7 @@ public ParallelReduceFull(ParallelFlowable source, BiFunction s) { - ParallelReduceFullMainSubscriber parent = new ParallelReduceFullMainSubscriber(s, source.parallelism(), reducer); + ParallelReduceFullMainSubscriber parent = new ParallelReduceFullMainSubscriber<>(s, source.parallelism(), reducer); s.onSubscribe(parent); source.subscribe(parent.subscribers); @@ -58,18 +59,18 @@ static final class ParallelReduceFullMainSubscriber extends DeferredScalarSub final BiFunction reducer; - final AtomicReference> current = new AtomicReference>(); + final AtomicReference> current = new AtomicReference<>(); final AtomicInteger remaining = new AtomicInteger(); - final AtomicReference error = new AtomicReference(); + final AtomicThrowable error = new AtomicThrowable(); ParallelReduceFullMainSubscriber(Subscriber subscriber, int n, BiFunction reducer) { super(subscriber); @SuppressWarnings("unchecked") ParallelReduceFullInnerSubscriber[] a = new ParallelReduceFullInnerSubscriber[n]; for (int i = 0; i < n; i++) { - a[i] = new ParallelReduceFullInnerSubscriber(this, reducer); + a[i] = new ParallelReduceFullInnerSubscriber<>(this, reducer); } this.subscribers = a; this.reducer = reducer; @@ -81,7 +82,7 @@ SlotPair addValue(T value) { SlotPair curr = current.get(); if (curr == null) { - curr = new SlotPair(); + curr = new SlotPair<>(); if (!current.compareAndSet(null, curr)) { continue; } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelRunOn.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelRunOn.java index bb6c1fd582..ceb2a7665a 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelRunOn.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelRunOn.java @@ -75,12 +75,12 @@ void createSubscriber(int i, Subscriber[] subscribers, Subscriber a = subscribers[i]; - SpscArrayQueue q = new SpscArrayQueue(prefetch); + SpscArrayQueue q = new SpscArrayQueue<>(prefetch); if (a instanceof ConditionalSubscriber) { - parents[i] = new RunOnConditionalSubscriber((ConditionalSubscriber)a, prefetch, q, worker); + parents[i] = new RunOnConditionalSubscriber<>((ConditionalSubscriber)a, prefetch, q, worker); } else { - parents[i] = new RunOnSubscriber(a, prefetch, q, worker); + parents[i] = new RunOnSubscriber<>(a, prefetch, q, worker); } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelSortedJoin.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelSortedJoin.java index 0e20b6f2c9..7994abf697 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelSortedJoin.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelSortedJoin.java @@ -46,7 +46,7 @@ public ParallelSortedJoin(ParallelFlowable> source, Comparator s) { - SortedJoinSubscription parent = new SortedJoinSubscription(s, source.parallelism(), comparator); + SortedJoinSubscription parent = new SortedJoinSubscription<>(s, source.parallelism(), comparator); s.onSubscribe(parent); source.subscribe(parent.subscribers); @@ -74,7 +74,7 @@ static final class SortedJoinSubscription final AtomicInteger remaining = new AtomicInteger(); - final AtomicReference error = new AtomicReference(); + final AtomicReference error = new AtomicReference<>(); @SuppressWarnings("unchecked") SortedJoinSubscription(Subscriber actual, int n, Comparator comparator) { @@ -84,7 +84,7 @@ static final class SortedJoinSubscription SortedJoinInnerSubscriber[] s = new SortedJoinInnerSubscriber[n]; for (int i = 0; i < n; i++) { - s[i] = new SortedJoinInnerSubscriber(this, i); + s[i] = new SortedJoinInnerSubscriber<>(this, i); } this.subscribers = s; this.lists = new List[n]; diff --git a/src/main/java/io/reactivex/rxjava3/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/rxjava3/parallel/ParallelFlowable.java index d4dff1cbb0..c8fbf203ef 100644 --- a/src/main/java/io/reactivex/rxjava3/parallel/ParallelFlowable.java +++ b/src/main/java/io/reactivex/rxjava3/parallel/ParallelFlowable.java @@ -14,6 +14,7 @@ package io.reactivex.rxjava3.parallel; import java.util.*; +import java.util.stream.*; import org.reactivestreams.*; @@ -21,17 +22,19 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.*; +import io.reactivex.rxjava3.internal.jdk8.*; import io.reactivex.rxjava3.internal.operators.parallel.*; +import io.reactivex.rxjava3.internal.operators.parallel.ParallelReduceFull; import io.reactivex.rxjava3.internal.subscriptions.EmptySubscription; import io.reactivex.rxjava3.internal.util.*; import io.reactivex.rxjava3.plugins.RxJavaPlugins; /** - * Abstract base class for Parallel publishers that take an array of Subscribers. + * Abstract base class for parallel publishing of events signaled to an array of {@link Subscriber}s. *

- * Use {@code from()} to start processing a regular Publisher in 'rails'. - * Use {@code runOn()} to introduce where each 'rail' should run on thread-vise. - * Use {@code sequential()} to merge the sources back into a single Flowable. + * Use {@link #from(Publisher)} to start processing a regular {@link Publisher} in 'rails'. + * Use {@link #runOn(Scheduler)} to introduce where each 'rail' should run on thread-vise. + * Use {@link #sequential()} to merge the sources back into a single {@link Flowable}. * *

History: 2.0.5 - experimental; 2.1 - beta * @param the value type @@ -173,7 +176,7 @@ public static ParallelFlowable from(@NonNull Publisher sourc *

{@code map} does not operate by default on a particular {@link Scheduler}.
* * @param the output value type - * @param mapper the mapper function turning Ts into Us. + * @param mapper the mapper function turning Ts into Rs. * @return the new ParallelFlowable instance */ @CheckReturnValue @@ -199,7 +202,7 @@ public final ParallelFlowable map(@NonNull Function *

History: 2.0.8 - experimental * @param the output value type - * @param mapper the mapper function turning Ts into Us. + * @param mapper the mapper function turning Ts into Rs. * @param errorHandler the enumeration that defines how to handle errors thrown * from the mapper function * @return the new ParallelFlowable instance @@ -229,7 +232,7 @@ public final ParallelFlowable map(@NonNull Function *

History: 2.0.8 - experimental * @param the output value type - * @param mapper the mapper function turning Ts into Us. + * @param mapper the mapper function turning Ts into Rs. * @param errorHandler the function called with the current repeat count and * failure Throwable and should return one of the {@link ParallelFailureHandling} * enumeration values to indicate how to proceed. @@ -1331,4 +1334,297 @@ public final ParallelFlowable concatMapDelayError( return RxJavaPlugins.onAssembly(new ParallelConcatMap<>( this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY)); } + + /** + * Returns a {@code ParallelFlowable} that merges each item emitted by the source on each rail with the values in an + * {@link Iterable} corresponding to that item that is generated by a selector. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from each downstream rail. The source {@code ParallelFlowable}s is + * expected to honor backpressure as well. If the source {@code ParallelFlowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code flatMapIterable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of item emitted by the resulting {@code Iterable} + * @param mapper + * a function that returns an {@code Iterable} sequence of values for when given an item emitted by the + * source {@code ParallelFlowable} + * @return the new ParallelFlowable instance + * @see
ReactiveX operators documentation: FlatMap + * @see #flatMapStream(Function) + * @since 3.0.0 + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final ParallelFlowable flatMapIterable(@NonNull Function> mapper) { + return flatMapIterable(mapper, Flowable.bufferSize()); + } + + /** + * Returns a {@code ParallelFlowable} that merges each item emitted by the source {@code ParallelFlowable} with the values in an + * Iterable corresponding to that item that is generated by a selector. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from each downstream rail. The source {@code ParallelFlowable}s is + * expected to honor backpressure as well. If the source {@code ParallelFlowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code flatMapIterable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of item emitted by the resulting {@code Iterable} + * @param mapper + * a function that returns an {@code Iterable} sequence of values for when given an item emitted by the + * source {@code ParallelFlowable} + * @param bufferSize + * the number of elements to prefetch from each upstream rail + * @return the new {@code ParallelFlowable} instance + * @see ReactiveX operators documentation: FlatMap + * @see #flatMapStream(Function, int) + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public final ParallelFlowable flatMapIterable(@NonNull Function> mapper, int bufferSize) { + Objects.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); + return RxJavaPlugins.onAssembly(new ParallelFlatMapIterable<>(this, mapper, bufferSize)); + } + + // ------------------------------------------------------------------------- + // JDK 8 Support + // ------------------------------------------------------------------------- + + /** + * Maps the source values on each 'rail' to an optional and emits its value if any. + *

+ * Note that the same mapper function may be called from multiple threads concurrently. + *

+ *
Backpressure:
+ *
The operator is a pass-through for backpressure and the behavior + * is determined by the upstream and downstream rail behaviors.
+ *
Scheduler:
+ *
{@code map} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the output value type + * @param mapper the mapper function turning Ts into optional of Rs. + * @return the new ParallelFlowable instance + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + public final ParallelFlowable mapOptional(@NonNull Function> mapper) { + Objects.requireNonNull(mapper, "mapper"); + return RxJavaPlugins.onAssembly(new ParallelMapOptional<>(this, mapper)); + } + + /** + * Maps the source values on each 'rail' to an optional and emits its value if any and + * handles errors based on the given {@link ParallelFailureHandling} enumeration value. + *

+ * Note that the same mapper function may be called from multiple threads concurrently. + *

+ *
Backpressure:
+ *
The operator is a pass-through for backpressure and the behavior + * is determined by the upstream and downstream rail behaviors.
+ *
Scheduler:
+ *
{@code map} does not operate by default on a particular {@link Scheduler}.
+ *
+ *

History: 2.0.8 - experimental + * @param the output value type + * @param mapper the mapper function turning Ts into optional of Rs. + * @param errorHandler the enumeration that defines how to handle errors thrown + * from the mapper function + * @return the new ParallelFlowable instance + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + public final ParallelFlowable mapOptional(@NonNull Function> mapper, @NonNull ParallelFailureHandling errorHandler) { + Objects.requireNonNull(mapper, "mapper"); + Objects.requireNonNull(errorHandler, "errorHandler is null"); + return RxJavaPlugins.onAssembly(new ParallelMapTryOptional<>(this, mapper, errorHandler)); + } + + /** + * Maps the source values on each 'rail' to an optional and emits its value if any and + * handles errors based on the returned value by the handler function. + *

+ * Note that the same mapper function may be called from multiple threads concurrently. + *

+ *
Backpressure:
+ *
The operator is a pass-through for backpressure and the behavior + * is determined by the upstream and downstream rail behaviors.
+ *
Scheduler:
+ *
{@code map} does not operate by default on a particular {@link Scheduler}.
+ *
+ *

History: 2.0.8 - experimental + * @param the output value type + * @param mapper the mapper function turning Ts into optional of Rs. + * @param errorHandler the function called with the current repeat count and + * failure Throwable and should return one of the {@link ParallelFailureHandling} + * enumeration values to indicate how to proceed. + * @return the new ParallelFlowable instance + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + public final ParallelFlowable mapOptional(@NonNull Function> mapper, @NonNull BiFunction errorHandler) { + Objects.requireNonNull(mapper, "mapper"); + Objects.requireNonNull(errorHandler, "errorHandler is null"); + return RxJavaPlugins.onAssembly(new ParallelMapTryOptional<>(this, mapper, errorHandler)); + } + + /** + * Maps each upstream item on each rail into a {@link Stream} and emits the {@code Stream}'s items to the downstream in a sequential fashion. + *

+ * + *

+ * Due to the blocking and sequential nature of Java {@link Stream}s, the streams are mapped and consumed in a sequential fashion + * without interleaving (unlike a more general {@link #flatMap(Function)}). Therefore, {@code flatMapStream} and + * {@code concatMapStream} are identical operators and are provided as aliases. + *

+ * The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when + * closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}. + * If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flatMapIterable(Function)}: + *


+     * source.flatMapIterable(v -> createStream(v)::iterator);
+     * 
+ *

+ * Note that {@code Stream}s can be consumed only once; any subsequent attempt to consume a {@code Stream} + * will result in an {@link IllegalStateException}. + *

+ * Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}): + *


+     * source.flatMapStream(v -> IntStream.rangeClosed(v + 1, v + 10).boxed());
+     * 
+ *

+ * {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times + * from multiple threads can lead to undefined behavior. + *

+ *
Backpressure:
+ *
The operator honors the downstream backpressure and consumes the inner stream only on demand. The operator + * prefetches {@link Flowable#bufferSize()} items of the upstream (then 75% of it after the 75% received) + * and caches them until they are ready to be mapped into {@code Stream}s + * after the current {@code Stream} has been consumed.
+ *
Scheduler:
+ *
{@code flatMapStream} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the element type of the {@code Stream}s and the result + * @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements + * will be emitted to the downstream + * @return the new Flowable instance + * @see #flatMap(Function) + * @see #flatMapIterable(Function) + * @see #flatMapStream(Function, int) + * @since 3.0.0 + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final <@NonNull R> ParallelFlowable flatMapStream(@NonNull Function> mapper) { + return flatMapStream(mapper, Flowable.bufferSize()); + } + + /** + * Maps each upstream item of each rail into a {@link Stream} and emits the {@code Stream}'s items to the downstream in a sequential fashion. + *

+ * + *

+ * Due to the blocking and sequential nature of Java {@link Stream}s, the streams are mapped and consumed in a sequential fashion + * without interleaving (unlike a more general {@link #flatMap(Function)}). Therefore, {@code flatMapStream} and + * {@code concatMapStream} are identical operators and are provided as aliases. + *

+ * The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when + * closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}. + * If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flatMapIterable(Function, int)}: + *


+     * source.flatMapIterable(v -> createStream(v)::iterator, 32);
+     * 
+ *

+ * Note that {@code Stream}s can be consumed only once; any subsequent attempt to consume a {@code Stream} + * will result in an {@link IllegalStateException}. + *

+ * Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}): + *


+     * source.flatMapStream(v -> IntStream.rangeClosed(v + 1, v + 10).boxed(), 32);
+     * 
+ *

+ * {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times + * from multiple threads can lead to undefined behavior. + *

+ *
Backpressure:
+ *
The operator honors the downstream backpressure and consumes the inner stream only on demand. The operator + * prefetches the given amount of upstream items and caches them until they are ready to be mapped into {@code Stream}s + * after the current {@code Stream} has been consumed.
+ *
Scheduler:
+ *
{@code flatMapStream} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the element type of the {@code Stream}s and the result + * @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements + * will be emitted to the downstream + * @param prefetch the number of upstream items to request upfront, then 75% of this amount after each 75% upstream items received + * @return the new Flowable instance + * @see #flatMap(Function, boolean, int) + * @see #flatMapIterable(Function, int) + * @since 3.0.0 + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final <@NonNull R> ParallelFlowable flatMapStream(@NonNull Function> mapper, int prefetch) { + Objects.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new ParallelFlatMapStream<>(this, mapper, prefetch)); + } + + /** + * Reduces all values within a 'rail' and across 'rails' with a callbacks + * of the given {@link Collector} into a single sequential value. + *

+ * Each parallel rail receives its own {@link Collector#accumulator()} and + * {@link Collector#combiner()}. + *

+ *
Backpressure:
+ *
The operator honors backpressure from the downstream and consumes + * the upstream rails in an unbounded manner (requesting {@link Long#MAX_VALUE}).
+ *
Scheduler:
+ *
{@code collect} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the accumulator type + * @param the output value type + * @param collector the {@code Collector} instance + * @return the new Flowable instance emitting the collected value. + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + public final Flowable collect(@NonNull Collector collector) { + Objects.requireNonNull(collector, "collector is null"); + return RxJavaPlugins.onAssembly(new ParallelCollector<>(this, collector)); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelCollectorTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelCollectorTest.java new file mode 100644 index 0000000000..8e558a0048 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelCollectorTest.java @@ -0,0 +1,305 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.*; + +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.function.*; +import java.util.stream.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.parallel.ParallelInvalid; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.processors.BehaviorProcessor; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.subscribers.TestSubscriber; +import io.reactivex.rxjava3.testsupport.*; + +public class ParallelCollectorTest extends RxJavaTest { + + static Set set(int count) { + return IntStream.rangeClosed(1, count) + .boxed() + .collect(Collectors.toSet()); + } + + @Test + public void basic() { + TestSubscriberEx> ts = Flowable.range(1, 5) + .parallel() + .collect(Collectors.toList()) + .subscribeWith(new TestSubscriberEx<>()); + + ts + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertEquals(5, ts.values().get(0).size()); + assertTrue(ts.values().get(0).containsAll(set(5))); + } + + @Test + public void empty() { + Flowable.empty() + .parallel() + .collect(Collectors.toList()) + .test() + .assertResult(Collections.emptyList()); + } + + @Test + public void error() { + Flowable.error(new TestException()) + .parallel() + .collect(Collectors.toList()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void collectorSupplierCrash() { + Flowable.range(1, 5) + .parallel() + .collect(new Collector() { + + @Override + public Supplier supplier() { + throw new TestException(); + } + + @Override + public BiConsumer accumulator() { + return (a, b) -> { }; + } + + @Override + public BinaryOperator combiner() { + return (a, b) -> a + b; + } + + @Override + public Function finisher() { + return a -> a; + } + + @Override + public Set characteristics() { + return Collections.emptySet(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void collectorAccumulatorCrash() { + BehaviorProcessor source = BehaviorProcessor.createDefault(1); + + source + .parallel() + .collect(new Collector() { + + @Override + public Supplier supplier() { + return () -> 1; + } + + @Override + public BiConsumer accumulator() { + return (a, b) -> { throw new TestException(); }; + } + + @Override + public BinaryOperator combiner() { + return (a, b) -> a + b; + } + + @Override + public Function finisher() { + return a -> a; + } + + @Override + public Set characteristics() { + return Collections.emptySet(); + } + }) + .test() + .assertFailure(TestException.class); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void collectorCombinerCrash() { + Flowable.range(1, 5) + .parallel() + .collect(new Collector() { + + @Override + public Supplier supplier() { + return () -> 1; + } + + @Override + public BiConsumer accumulator() { + return (a, b) -> { }; + } + + @Override + public BinaryOperator combiner() { + return (a, b) -> { throw new TestException(); }; + } + + @Override + public Function finisher() { + return a -> a; + } + + @Override + public Set characteristics() { + return Collections.emptySet(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void collectorFinisherCrash() { + Flowable.range(1, 5) + .parallel() + .collect(new Collector() { + + @Override + public Supplier supplier() { + return () -> 1; + } + + @Override + public BiConsumer accumulator() { + return (a, b) -> { }; + } + + @Override + public BinaryOperator combiner() { + return (a, b) -> a + b; + } + + @Override + public Function finisher() { + return a -> { throw new TestException(); }; + } + + @Override + public Set characteristics() { + return Collections.emptySet(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void async() { + for (int i = 1; i < 32; i++) { + TestSubscriber> ts = Flowable.range(1, 1000) + .parallel(i) + .runOn(Schedulers.computation()) + .collect(Collectors.toList()) + .test() + .withTag("Parallelism: " + i) + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertEquals(1000, ts.values().get(0).size()); + + assertTrue(ts.values().get(0).containsAll(set(1000))); + } + } + + @Test + public void asyncHidden() { + for (int i = 1; i < 32; i++) { + TestSubscriber> ts = Flowable.range(1, 1000) + .hide() + .parallel(i) + .runOn(Schedulers.computation()) + .collect(Collectors.toList()) + .test() + .withTag("Parallelism: " + i) + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertEquals(1000, ts.values().get(0).size()); + + assertTrue(ts.values().get(0).containsAll(set(1000))); + } + } + + @Test + public void doubleError() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .collect(Collectors.toList()) + .test() + .assertFailure(TestException.class); + + assertFalse(errors.isEmpty()); + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex.getCause() instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void asyncSum() { + long n = 1_000; + for (int i = 1; i < 32; i++) { + Flowable.rangeLong(1, n) + .parallel(i) + .runOn(Schedulers.computation()) + .collect(Collectors.summingLong(v -> v)) + .test() + .withTag("Parallelism: " + i) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(n * (n + 1) / 2); + } + } + + @Test + public void asyncSumLong() { + long n = 1_000_000; + Flowable.rangeLong(1, n) + .parallel() + .runOn(Schedulers.computation()) + .collect(Collectors.summingLong(v -> v)) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(n * (n + 1) / 2); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelFlatMapStreamTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelFlatMapStreamTest.java new file mode 100644 index 0000000000..0f39d4c842 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelFlatMapStreamTest.java @@ -0,0 +1,73 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.stream.Stream; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.parallel.ParallelFlowableTest; + +public class ParallelFlatMapStreamTest extends RxJavaTest { + + @Test + public void subscriberCount() { + ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel() + .flatMapStream(v -> Stream.of(1, 2, 3))); + } + + @Test + public void normal() { + for (int i = 1; i < 32; i++) { + Flowable.range(1, 1000) + .parallel(i) + .flatMapStream(v -> Stream.of(v, v + 1)) + .sequential() + .test() + .withTag("Parallelism: " + i) + .assertValueCount(2000) + .assertNoErrors() + .assertComplete(); + } + } + + @Test + public void none() { + for (int i = 1; i < 32; i++) { + Flowable.range(1, 1000) + .parallel(i) + .flatMapStream(v -> Stream.of()) + .sequential() + .test() + .withTag("Parallelism: " + i) + .assertResult(); + } + } + + @Test + public void mixed() { + for (int i = 1; i < 32; i++) { + Flowable.range(1, 1000) + .parallel(i) + .flatMapStream(v -> v % 2 == 0 ? Stream.of(v) : Stream.of()) + .sequential() + .test() + .withTag("Parallelism: " + i) + .assertValueCount(500) + .assertNoErrors() + .assertComplete(); + } + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelMapOptionalTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelMapOptionalTest.java new file mode 100644 index 0000000000..67cab4087e --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelMapOptionalTest.java @@ -0,0 +1,228 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.*; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.functions.Functions; +import io.reactivex.rxjava3.parallel.*; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class ParallelMapOptionalTest extends RxJavaTest { + + @Test + public void doubleFilter() { + Flowable.range(1, 10) + .parallel() + .mapOptional(Optional::of) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 3 == 0; + } + }) + .sequential() + .test() + .assertResult(6); + } + + @Test + public void doubleFilterAsync() { + Flowable.range(1, 10) + .parallel() + .runOn(Schedulers.computation()) + .mapOptional(Optional::of) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 3 == 0; + } + }) + .sequential() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(6); + } + + @Test + public void doubleError() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .mapOptional(Optional::of) + .sequential() + .test() + .assertFailure(TestException.class); + + assertFalse(errors.isEmpty()); + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex.getCause() instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void doubleError2() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .mapOptional(Optional::of) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(TestException.class); + + assertFalse(errors.isEmpty()); + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex.getCause() instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void error() { + Flowable.error(new TestException()) + .parallel() + .mapOptional(Optional::of) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapCrash() { + Flowable.just(1) + .parallel() + .mapOptional(v -> { throw new TestException(); }) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapCrashConditional() { + Flowable.just(1) + .parallel() + .mapOptional(v -> { throw new TestException(); }) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapCrashConditional2() { + Flowable.just(1) + .parallel() + .runOn(Schedulers.computation()) + .mapOptional(v -> { throw new TestException(); }) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void allNone() { + Flowable.range(1, 1000) + .parallel() + .mapOptional(v -> Optional.empty()) + .sequential() + .test() + .assertResult(); + } + + @Test + public void allNoneConditional() { + Flowable.range(1, 1000) + .parallel() + .mapOptional(v -> Optional.empty()) + .filter(v -> true) + .sequential() + .test() + .assertResult(); + } + + @Test + public void mixed() { + Flowable.range(1, 1000) + .parallel() + .mapOptional(v -> v % 2 == 0 ? Optional.of(v) : Optional.empty()) + .sequential() + .test() + .assertValueCount(500) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void mixedConditional() { + Flowable.range(1, 1000) + .parallel() + .mapOptional(v -> v % 2 == 0 ? Optional.of(v) : Optional.empty()) + .filter(v -> true) + .sequential() + .test() + .assertValueCount(500) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void invalidSubscriberCount() { + TestHelper.checkInvalidParallelSubscribers( + Flowable.range(1, 10).parallel() + .mapOptional(Optional::of) + ); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeParallel( + p -> p.mapOptional(Optional::of) + ); + + TestHelper.checkDoubleOnSubscribeParallel( + p -> p.mapOptional(Optional::of) + .filter(v -> true) + ); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelMapTryOptionalTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelMapTryOptionalTest.java new file mode 100644 index 0000000000..b7cae91568 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelMapTryOptionalTest.java @@ -0,0 +1,378 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.functions.Functions; +import io.reactivex.rxjava3.parallel.*; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.testsupport.*; + +public class ParallelMapTryOptionalTest extends RxJavaTest implements Consumer { + + volatile int calls; + + @Override + public void accept(Object t) throws Exception { + calls++; + } + + @Test + public void mapNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.just(1) + .parallel(1) + .mapOptional(Optional::of, e) + .sequential() + .test() + .assertResult(1); + } + } + + @Test + public void mapErrorNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.error(new TestException()) + .parallel(1) + .mapOptional(Optional::of, e) + .sequential() + .test() + .assertFailure(TestException.class); + } + } + + @Test + public void mapConditionalNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.just(1) + .parallel(1) + .mapOptional(Optional::of, e) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(1); + } + } + + @Test + public void mapErrorConditionalNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.error(new TestException()) + .parallel(1) + .mapOptional(Optional::of, e) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(TestException.class); + } + } + + @Test + public void mapFailWithError() { + Flowable.range(0, 2) + .parallel(1) + .mapOptional(v -> Optional.of(1 / v), ParallelFailureHandling.ERROR) + .sequential() + .test() + .assertFailure(ArithmeticException.class); + } + + @Test + public void mapFailWithStop() { + Flowable.range(0, 2) + .parallel(1) + .mapOptional(v -> Optional.of(1 / v), ParallelFailureHandling.STOP) + .sequential() + .test() + .assertResult(); + } + + @Test + public void mapFailWithRetry() { + Flowable.range(0, 2) + .parallel(1) + .mapOptional(new Function>() { + int count; + @Override + public Optional apply(Integer v) throws Exception { + if (count++ == 1) { + return Optional.of(-1); + } + return Optional.of(1 / v); + } + }, ParallelFailureHandling.RETRY) + .sequential() + .test() + .assertResult(-1, 1); + } + + @Test + public void mapFailWithRetryLimited() { + Flowable.range(0, 2) + .parallel(1) + .mapOptional(v -> Optional.of(1 / v), new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + return n < 5 ? ParallelFailureHandling.RETRY : ParallelFailureHandling.SKIP; + } + }) + .sequential() + .test() + .assertResult(1); + } + + @Test + public void mapFailWithSkip() { + Flowable.range(0, 2) + .parallel(1) + .mapOptional(v -> Optional.of(1 / v), ParallelFailureHandling.SKIP) + .sequential() + .test() + .assertResult(1); + } + + @Test + public void mapFailHandlerThrows() { + TestSubscriberEx ts = Flowable.range(0, 2) + .parallel(1) + .mapOptional(v -> Optional.of(1 / v), new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + throw new TestException(); + } + }) + .sequential() + .to(TestHelper.testConsumer()) + .assertFailure(CompositeException.class); + + TestHelper.assertCompositeExceptions(ts, ArithmeticException.class, TestException.class); + } + + @Test + public void mapInvalidSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .mapOptional(Optional::of, ParallelFailureHandling.ERROR) + .sequential() + .test(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void mapFailWithErrorConditional() { + Flowable.range(0, 2) + .parallel(1) + .mapOptional(v -> Optional.of(1 / v), ParallelFailureHandling.ERROR) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(ArithmeticException.class); + } + + @Test + public void mapFailWithStopConditional() { + Flowable.range(0, 2) + .parallel(1) + .mapOptional(v -> Optional.of(1 / v), ParallelFailureHandling.STOP) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(); + } + + @Test + public void mapFailWithRetryConditional() { + Flowable.range(0, 2) + .parallel(1) + .mapOptional(new Function>() { + int count; + @Override + public Optional apply(Integer v) throws Exception { + if (count++ == 1) { + return Optional.of(-1); + } + return Optional.of(1 / v); + } + }, ParallelFailureHandling.RETRY) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(-1, 1); + } + + @Test + public void mapFailWithRetryLimitedConditional() { + Flowable.range(0, 2) + .parallel(1) + .mapOptional(v -> Optional.of(1 / v), new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + return n < 5 ? ParallelFailureHandling.RETRY : ParallelFailureHandling.SKIP; + } + }) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(1); + } + + @Test + public void mapFailWithSkipConditional() { + Flowable.range(0, 2) + .parallel(1) + .mapOptional(v -> Optional.of(1 / v), ParallelFailureHandling.SKIP) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(1); + } + + @Test + public void mapFailHandlerThrowsConditional() { + TestSubscriberEx ts = Flowable.range(0, 2) + .parallel(1) + .mapOptional(v -> Optional.of(1 / v), new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + throw new TestException(); + } + }) + .filter(Functions.alwaysTrue()) + .sequential() + .to(TestHelper.testConsumer()) + .assertFailure(CompositeException.class); + + TestHelper.assertCompositeExceptions(ts, ArithmeticException.class, TestException.class); + } + + @Test + public void mapWrongParallelismConditional() { + TestHelper.checkInvalidParallelSubscribers( + Flowable.just(1).parallel(1) + .mapOptional(Optional::of, ParallelFailureHandling.ERROR) + .filter(Functions.alwaysTrue()) + ); + } + + @Test + public void mapInvalidSourceConditional() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .mapOptional(Optional::of, ParallelFailureHandling.ERROR) + .filter(Functions.alwaysTrue()) + .sequential() + .test(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void failureHandlingEnum() { + TestHelper.checkEnum(ParallelFailureHandling.class); + } + + @Test + public void allNone() { + Flowable.range(1, 1000) + .parallel() + .mapOptional(v -> Optional.empty(), ParallelFailureHandling.SKIP) + .sequential() + .test() + .assertResult(); + } + + @Test + public void allNoneConditional() { + Flowable.range(1, 1000) + .parallel() + .mapOptional(v -> Optional.empty(), ParallelFailureHandling.SKIP) + .filter(v -> true) + .sequential() + .test() + .assertResult(); + } + + @Test + public void mixed() { + Flowable.range(1, 1000) + .parallel() + .mapOptional(v -> v % 2 == 0 ? Optional.of(v) : Optional.empty(), ParallelFailureHandling.SKIP) + .sequential() + .test() + .assertValueCount(500) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void mixedConditional() { + Flowable.range(1, 1000) + .parallel() + .mapOptional(v -> v % 2 == 0 ? Optional.of(v) : Optional.empty(), ParallelFailureHandling.SKIP) + .filter(v -> true) + .sequential() + .test() + .assertValueCount(500) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void mixedConditional2() { + Flowable.range(1, 1000) + .parallel() + .mapOptional(v -> v % 2 == 0 ? Optional.of(v) : Optional.empty(), ParallelFailureHandling.SKIP) + .filter(v -> v % 4 == 0) + .sequential() + .test() + .assertValueCount(250) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void invalidSubscriberCount() { + TestHelper.checkInvalidParallelSubscribers( + Flowable.range(1, 10).parallel() + .mapOptional(Optional::of, ParallelFailureHandling.SKIP) + ); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeParallel( + p -> p.mapOptional(Optional::of, ParallelFailureHandling.ERROR) + ); + + TestHelper.checkDoubleOnSubscribeParallel( + p -> p.mapOptional(Optional::of, ParallelFailureHandling.ERROR) + .filter(v -> true) + ); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/parallel/ParallelFlatMapIterableTest.java b/src/test/java/io/reactivex/rxjava3/parallel/ParallelFlatMapIterableTest.java new file mode 100644 index 0000000000..916d3651b9 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/parallel/ParallelFlatMapIterableTest.java @@ -0,0 +1,73 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.parallel; + +import java.util.Arrays; +import java.util.stream.Stream; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; + +public class ParallelFlatMapIterableTest extends RxJavaTest { + + @Test + public void subscriberCount() { + ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel() + .flatMapStream(v -> Stream.of(1, 2, 3))); + } + + @Test + public void normal() { + for (int i = 1; i < 32; i++) { + Flowable.range(1, 1000) + .parallel(i) + .flatMapIterable(v -> Arrays.asList(v, v + 1)) + .sequential() + .test() + .withTag("Parallelism: " + i) + .assertValueCount(2000) + .assertNoErrors() + .assertComplete(); + } + } + + @Test + public void none() { + for (int i = 1; i < 32; i++) { + Flowable.range(1, 1000) + .parallel(i) + .flatMapIterable(v -> Arrays.asList()) + .sequential() + .test() + .withTag("Parallelism: " + i) + .assertResult(); + } + } + + @Test + public void mixed() { + for (int i = 1; i < 32; i++) { + Flowable.range(1, 1000) + .parallel(i) + .flatMapIterable(v -> v % 2 == 0 ? Arrays.asList(v) : Arrays.asList()) + .sequential() + .test() + .withTag("Parallelism: " + i) + .assertValueCount(500) + .assertNoErrors() + .assertComplete(); + } + } +} diff --git a/src/test/java/io/reactivex/rxjava3/parallel/ParallelMapTest.java b/src/test/java/io/reactivex/rxjava3/parallel/ParallelMapTest.java index c8e850daf1..e45034d416 100644 --- a/src/test/java/io/reactivex/rxjava3/parallel/ParallelMapTest.java +++ b/src/test/java/io/reactivex/rxjava3/parallel/ParallelMapTest.java @@ -15,7 +15,7 @@ import static org.junit.Assert.*; -import java.util.List; +import java.util.*; import java.util.concurrent.TimeUnit; import org.junit.Test; @@ -179,4 +179,24 @@ public Object apply(Integer v) throws Exception { .awaitDone(5, TimeUnit.SECONDS) .assertFailure(TestException.class); } + + @Test + public void invalidSubscriberCount() { + TestHelper.checkInvalidParallelSubscribers( + Flowable.range(1, 10).parallel() + .map(v -> v) + ); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeParallel( + p -> p.map(v -> v) + ); + + TestHelper.checkDoubleOnSubscribeParallel( + p -> p.map(v -> v) + .filter(v -> true) + ); + } } diff --git a/src/test/java/io/reactivex/rxjava3/parallel/ParallelMapTryTest.java b/src/test/java/io/reactivex/rxjava3/parallel/ParallelMapTryTest.java index b036b2e17f..d56669d755 100644 --- a/src/test/java/io/reactivex/rxjava3/parallel/ParallelMapTryTest.java +++ b/src/test/java/io/reactivex/rxjava3/parallel/ParallelMapTryTest.java @@ -13,7 +13,7 @@ package io.reactivex.rxjava3.parallel; -import java.util.List; +import java.util.*; import org.junit.Test; @@ -353,4 +353,24 @@ public void mapInvalidSourceConditional() { public void failureHandlingEnum() { TestHelper.checkEnum(ParallelFailureHandling.class); } + + @Test + public void invalidSubscriberCount() { + TestHelper.checkInvalidParallelSubscribers( + Flowable.range(1, 10).parallel() + .map(v -> v, ParallelFailureHandling.SKIP) + ); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeParallel( + p -> p.map(v -> v, ParallelFailureHandling.ERROR) + ); + + TestHelper.checkDoubleOnSubscribeParallel( + p -> p.map(v -> v, ParallelFailureHandling.ERROR) + .filter(v -> true) + ); + } } diff --git a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java index eabbb6f96d..f50354b6a5 100644 --- a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java +++ b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java @@ -1421,6 +1421,71 @@ protected void subscribeActual(Subscriber subscriber) { } } + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param transform the transform to drive an operator + */ + @SuppressWarnings("unchecked") + public static void checkDoubleOnSubscribeParallel(Function, ? extends ParallelFlowable> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null, null, null }; + final CountDownLatch cdl = new CountDownLatch(2); + + ParallelFlowable source = new ParallelFlowable() { + @Override + public void subscribe(Subscriber[] subscribers) { + for (int i = 0; i < subscribers.length; i++) { + try { + BooleanSubscription bs1 = new BooleanSubscription(); + + subscribers[i].onSubscribe(bs1); + + BooleanSubscription bs2 = new BooleanSubscription(); + + subscribers[i].onSubscribe(bs2); + + b[i * 2 + 0] = bs1.isCancelled(); + b[i * 2 + 1] = bs2.isCancelled(); + } finally { + cdl.countDown(); + } + } + } + + @Override + public int parallelism() { + return 2; + } + }; + + ParallelFlowable out = transform.apply(source); + + out.subscribe(new Subscriber[] { NoOpConsumer.INSTANCE, NoOpConsumer.INSTANCE }); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("Rail 1 First disposed?", false, b[0]); + assertEquals("Rail 1 Second not disposed?", true, b[1]); + + assertEquals("Rail 2 First disposed?", false, b[2]); + assertEquals("Rail 2 Second not disposed?", true, b[3]); + + assertError(errors, 0, IllegalStateException.class, "Subscription already set!"); + assertError(errors, 1, IllegalStateException.class, "Subscription already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + /** * Check if the given transformed reactive type reports multiple onSubscribe calls to * RxJavaPlugins. diff --git a/src/test/java/io/reactivex/rxjava3/validators/TooManyEmptyNewLines.java b/src/test/java/io/reactivex/rxjava3/validators/TooManyEmptyNewLines.java index c87f19f10e..581e78301b 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/TooManyEmptyNewLines.java +++ b/src/test/java/io/reactivex/rxjava3/validators/TooManyEmptyNewLines.java @@ -112,7 +112,14 @@ static void findPattern(int newLines) throws Exception { fail .append(fname) .append("#L").append(i + 1) - .append("\n"); + .append("\n") + .append(" at ") + .append(fname.replace(".java", "")) + .append(".method(") + .append(fname) + .append(":").append(i + 1) + .append(")\n") + ; total++; i += c; } @@ -124,9 +131,7 @@ static void findPattern(int newLines) throws Exception { } } if (total != 0) { - fail.append("Found ") - .append(total) - .append(" instances"); + fail.insert(0, "Found " + total + " instances\n"); System.out.println(fail); throw new AssertionError(fail.toString()); }