Skip to content

Commit 78c70d6

Browse files
authored
3.x: Add Single.mergeArray & mergeArrayDelayError (#6882)
* 3.x: Add Single.mergeArray & mergeArrayDelayError * Remove unnecessary shortcut
1 parent 9283700 commit 78c70d6

File tree

4 files changed

+131
-6
lines changed

4 files changed

+131
-6
lines changed

src/main/java/io/reactivex/rxjava3/core/Maybe.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1415,7 +1415,7 @@ public static <T> Flowable<T> merge(
14151415
}
14161416

14171417
/**
1418-
* Merges an array sequence of {@link MaybeSource} instances into a single {@link Flowable} sequence,
1418+
* Merges an array of {@link MaybeSource} instances into a single {@link Flowable} sequence,
14191419
* running all {@code MaybeSource}s at once.
14201420
* <p>
14211421
* <img width="640" height="272" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.mergeArray.png" alt="">
@@ -1470,10 +1470,10 @@ public static <T> Flowable<T> mergeArray(MaybeSource<? extends T>... sources) {
14701470
* <img width="640" height="422" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.mergeArrayDelayError.png" alt="">
14711471
* <p>
14721472
* This behaves like {@link #merge(Publisher)} except that if any of the merged {@code MaybeSource}s notify of an
1473-
* error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that
1473+
* error via {@link Subscriber#onError onError}, {@code mergeArrayDelayError} will refrain from propagating that
14741474
* error notification until all of the merged {@code MaybeSource}s have finished emitting items.
14751475
* <p>
1476-
* Even if multiple merged {@code MaybeSource}s send {@code onError} notifications, {@code mergeDelayError} will only
1476+
* Even if multiple merged {@code MaybeSource}s send {@code onError} notifications, {@code mergeArrayDelayError} will only
14771477
* invoke the {@code onError} method of its subscribers once.
14781478
* <dl>
14791479
* <dt><b>Backpressure:</b></dt>

src/main/java/io/reactivex/rxjava3/core/Single.java

+76-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import java.util.concurrent.*;
1818
import java.util.stream.*;
1919

20-
import org.reactivestreams.Publisher;
20+
import org.reactivestreams.*;
2121

2222
import io.reactivex.rxjava3.annotations.*;
2323
import io.reactivex.rxjava3.disposables.Disposable;
@@ -1323,6 +1323,81 @@ public static <T> Flowable<T> merge(
13231323
return merge(Flowable.fromArray(source1, source2, source3, source4));
13241324
}
13251325

1326+
/**
1327+
* Merges an array of {@link SingleSource} instances into a single {@link Flowable} sequence,
1328+
* running all {@code SingleSource}s at once.
1329+
* <p>
1330+
* <img width="640" height="272" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.mergeArray.png" alt="">
1331+
* <dl>
1332+
* <dt><b>Backpressure:</b></dt>
1333+
* <dd>The operator honors backpressure from downstream.</dd>
1334+
* <dt><b>Scheduler:</b></dt>
1335+
* <dd>{@code mergeArray} does not operate by default on a particular {@link Scheduler}.</dd>
1336+
* <dt><b>Error handling:</b></dt>
1337+
* <dd>If any of the source {@code SingleSource}s signal a {@link Throwable} via {@code onError}, the resulting
1338+
* {@code Flowable} terminates with that {@code Throwable} and all other source {@code SingleSource}s are disposed.
1339+
* If more than one {@code SingleSource} signals an error, the resulting {@code Flowable} may terminate with the
1340+
* first one's error or, depending on the concurrency of the sources, may terminate with a
1341+
* {@link CompositeException} containing two or more of the various error signals.
1342+
* {@code Throwable}s that didn't make into the composite will be sent (individually) to the global error handler via
1343+
* {@link RxJavaPlugins#onError(Throwable)} method as {@link UndeliverableException} errors. Similarly, {@code Throwable}s
1344+
* signaled by source(s) after the returned {@code Flowable} has been cancelled or terminated with a
1345+
* (composite) error will be sent to the same global error handler.
1346+
* Use {@link #mergeArrayDelayError(SingleSource...)} to merge sources and terminate only when all source {@code SingleSource}s
1347+
* have completed or failed with an error.
1348+
* </dd>
1349+
* </dl>
1350+
* @param <T> the common and resulting value type
1351+
* @param sources the array sequence of {@code SingleSource} sources
1352+
* @return the new {@code Flowable} instance
1353+
* @throws NullPointerException if {@code sources} is {@code null}
1354+
* @see #mergeArrayDelayError(SingleSource...)
1355+
*/
1356+
@BackpressureSupport(BackpressureKind.FULL)
1357+
@CheckReturnValue
1358+
@NonNull
1359+
@SchedulerSupport(SchedulerSupport.NONE)
1360+
@SafeVarargs
1361+
public static <T> Flowable<T> mergeArray(SingleSource<? extends T>... sources) {
1362+
return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), false, sources.length);
1363+
}
1364+
1365+
/**
1366+
* Flattens an array of {@link SingleSource}s into one {@link Flowable}, in a way that allows a subscriber to receive all
1367+
* successfully emitted items from each of the source {@code SingleSource}s without being interrupted by an error
1368+
* notification from one of them.
1369+
* <p>
1370+
* <img width="640" height="422" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.mergeArrayDelayError.png" alt="">
1371+
* <p>
1372+
* This behaves like {@link #merge(Publisher)} except that if any of the merged {@code SingleSource}s notify of an
1373+
* error via {@link Subscriber#onError onError}, {@code mergeArrayDelayError} will refrain from propagating that
1374+
* error notification until all of the merged {@code SingleSource}s have finished emitting items.
1375+
* <p>
1376+
* Even if multiple merged {@code SingleSource}s send {@code onError} notifications, {@code mergeArrayDelayError} will only
1377+
* invoke the {@code onError} method of its subscribers once.
1378+
* <dl>
1379+
* <dt><b>Backpressure:</b></dt>
1380+
* <dd>The operator honors backpressure from downstream.</dd>
1381+
* <dt><b>Scheduler:</b></dt>
1382+
* <dd>{@code mergeArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1383+
* </dl>
1384+
*
1385+
* @param <T> the common element base type
1386+
* @param sources
1387+
* the array of {@code SingleSource}s
1388+
* @return the new {@code Flowable} instance
1389+
* @throws NullPointerException if {@code sources} is {@code null}
1390+
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
1391+
*/
1392+
@BackpressureSupport(BackpressureKind.FULL)
1393+
@CheckReturnValue
1394+
@SchedulerSupport(SchedulerSupport.NONE)
1395+
@SafeVarargs
1396+
@NonNull
1397+
public static <T> Flowable<T> mergeArrayDelayError(@NonNull SingleSource<? extends T>... sources) {
1398+
return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), true, sources.length);
1399+
}
1400+
13261401
/**
13271402
* Merges an {@link Iterable} sequence of {@link SingleSource} instances into one {@link Flowable} sequence,
13281403
* running all {@code SingleSource}s at once and delaying any error(s) until all sources succeed or fail.

src/test/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseableTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void dispose() {
5757

5858
@Test
5959
public void cancel2() {
60-
AbstractEmptyQueueFuseable<Object> qs = new AbstractEmptyQueueFuseable<Object>() {};
60+
AbstractEmptyQueueFuseable<Object> qs = new AbstractEmptyQueueFuseable<Object>() { };
6161

6262
assertFalse(qs.isDisposed());
6363

@@ -66,7 +66,7 @@ public void cancel2() {
6666

6767
@Test
6868
public void dispose2() {
69-
AbstractEmptyQueueFuseable<Object> qs = new AbstractEmptyQueueFuseable<Object>() {};
69+
AbstractEmptyQueueFuseable<Object> qs = new AbstractEmptyQueueFuseable<Object>() { };
7070

7171
assertFalse(qs.isDisposed());
7272

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.operators.single;
15+
16+
import org.junit.Test;
17+
18+
import io.reactivex.rxjava3.core.*;
19+
import io.reactivex.rxjava3.exceptions.TestException;
20+
21+
public class SingleMergeArrayTest extends RxJavaTest {
22+
23+
@Test
24+
public void normal() {
25+
Single.mergeArray(Single.just(1), Single.just(2), Single.just(3))
26+
.test()
27+
.assertResult(1, 2, 3);
28+
}
29+
30+
@Test
31+
public void error() {
32+
Single.mergeArray(Single.just(1), Single.error(new TestException()), Single.just(3))
33+
.test()
34+
.assertFailure(TestException.class, 1);
35+
}
36+
37+
@Test
38+
public void normalDelayError() {
39+
Single.mergeArrayDelayError(Single.just(1), Single.just(2), Single.just(3))
40+
.test()
41+
.assertResult(1, 2, 3);
42+
}
43+
44+
@Test
45+
public void errorDelayError() {
46+
Single.mergeArrayDelayError(Single.just(1), Single.error(new TestException()), Single.just(3))
47+
.test()
48+
.assertFailure(TestException.class, 1, 3);
49+
}
50+
}

0 commit comments

Comments
 (0)