diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
index f16fa2cf03..cb7fbd726f 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
@@ -129,9 +129,9 @@
* }, BackpressureStrategy.BUFFER);
*
* System.out.println("Subscribe!");
- *
+ *
* source.subscribe(System.out::println);
- *
+ *
* System.out.println("Done!");
*
*
@@ -277,50 +277,6 @@ public static Flowable combineLatest(Publisher extends T>[] sources,
return combineLatest(sources, combiner, bufferSize());
}
- /**
- * Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
- * the source Publishers each time an item is received from any of the source Publishers, where this
- * aggregation is defined by a specified function.
- *
- * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
- * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
- * {@code Function} passed to the method would trigger a {@code ClassCastException}.
- *
- * If any of the sources never produces an item but only terminates (normally or with an error), the
- * resulting sequence terminates immediately (normally or with all the errors accumulated until that point).
- * If that input source is also synchronous, other sources after it will not be subscribed to.
- *
- * If there are no source Publishers provided, the resulting sequence completes immediately without emitting
- * any items and without any calls to the combiner function.
- *
- *
- * Backpressure:
- * The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
- * are requested in a bounded manner, however, their backpressure is not enforced (the operator won't signal
- * {@code MissingBackpressureException}) and may lead to {@code OutOfMemoryError} due to internal buffer bloat.
- * Scheduler:
- * {@code combineLatest} does not operate by default on a particular {@link Scheduler}.
- *
- *
- * @param
- * the common base type of source values
- * @param
- * the result type
- * @param sources
- * the collection of source Publishers
- * @param combiner
- * the aggregation function used to combine the items emitted by the source Publishers
- * @return a Flowable that emits items that are the result of combining the items emitted by the source
- * Publishers by means of the given aggregation function
- * @see ReactiveX operators documentation: CombineLatest
- */
- @SchedulerSupport(SchedulerSupport.NONE)
- @CheckReturnValue
- @BackpressureSupport(BackpressureKind.FULL)
- public static Flowable combineLatest(Function super Object[], ? extends R> combiner, Publisher extends T>... sources) {
- return combineLatest(sources, combiner, bufferSize());
- }
-
/**
* Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
* the source Publishers each time an item is received from any of the source Publishers, where this
@@ -515,100 +471,6 @@ public static Flowable combineLatestDelayError(Publisher extends T>[
return combineLatestDelayError(sources, combiner, bufferSize());
}
- /**
- * Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
- * the source Publishers each time an item is received from any of the source Publishers, where this
- * aggregation is defined by a specified function and delays any error from the sources until
- * all source Publishers terminate.
- *
- * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
- * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
- * {@code Function} passed to the method would trigger a {@code ClassCastException}.
- *
- * If any of the sources never produces an item but only terminates (normally or with an error), the
- * resulting sequence terminates immediately (normally or with all the errors accumulated until that point).
- * If that input source is also synchronous, other sources after it will not be subscribed to.
- *
- * If there are no source Publishers provided, the resulting sequence completes immediately without emitting
- * any items and without any calls to the combiner function.
- *
- *
- * Backpressure:
- * The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
- * are requested in a bounded manner, however, their backpressure is not enforced (the operator won't signal
- * {@code MissingBackpressureException}) and may lead to {@code OutOfMemoryError} due to internal buffer bloat.
- * Scheduler:
- * {@code combineLatestDelayError} does not operate by default on a particular {@link Scheduler}.
- *
- *
- * @param
- * the common base type of source values
- * @param
- * the result type
- * @param sources
- * the collection of source Publishers
- * @param combiner
- * the aggregation function used to combine the items emitted by the source Publishers
- * @return a Flowable that emits items that are the result of combining the items emitted by the source
- * Publishers by means of the given aggregation function
- * @see ReactiveX operators documentation: CombineLatest
- */
- @SchedulerSupport(SchedulerSupport.NONE)
- @CheckReturnValue
- @BackpressureSupport(BackpressureKind.FULL)
- public static Flowable combineLatestDelayError(Function super Object[], ? extends R> combiner,
- Publisher extends T>... sources) {
- return combineLatestDelayError(sources, combiner, bufferSize());
- }
-
- /**
- * Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
- * the source Publishers each time an item is received from any of the source Publisher, where this
- * aggregation is defined by a specified function and delays any error from the sources until
- * all source Publishers terminate.
- *
- * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
- * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
- * {@code Function} passed to the method would trigger a {@code ClassCastException}.
- *
- * If any of the sources never produces an item but only terminates (normally or with an error), the
- * resulting sequence terminates immediately (normally or with all the errors accumulated until that point).
- * If that input source is also synchronous, other sources after it will not be subscribed to.
- *
- * If there are no source Publishers provided, the resulting sequence completes immediately without emitting
- * any items and without any calls to the combiner function.
- *
- *
- * Backpressure:
- * The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
- * are requested in a bounded manner, however, their backpressure is not enforced (the operator won't signal
- * {@code MissingBackpressureException}) and may lead to {@code OutOfMemoryError} due to internal buffer bloat.
- * Scheduler:
- * {@code combineLatestDelayError} does not operate by default on a particular {@link Scheduler}.
- *
- *
- * @param
- * the common base type of source values
- * @param
- * the result type
- * @param sources
- * the collection of source Publishers
- * @param combiner
- * the aggregation function used to combine the items emitted by the source Publishers
- * @param bufferSize
- * the internal buffer size and prefetch amount applied to every source Publisher
- * @return a Flowable that emits items that are the result of combining the items emitted by the source
- * Publishers by means of the given aggregation function
- * @see ReactiveX operators documentation: CombineLatest
- */
- @SchedulerSupport(SchedulerSupport.NONE)
- @CheckReturnValue
- @BackpressureSupport(BackpressureKind.FULL)
- public static Flowable combineLatestDelayError(Function super Object[], ? extends R> combiner,
- int bufferSize, Publisher extends T>... sources) {
- return combineLatestDelayError(sources, combiner, bufferSize);
- }
-
/**
* Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
* the source Publishers each time an item is received from any of the source Publishers, where this
@@ -802,8 +664,7 @@ public static Flowable combineLatest(
BiFunction super T1, ? super T2, ? extends R> combiner) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
- Function f = Functions.toFunction(combiner);
- return combineLatest(f, source1, source2);
+ return combineLatest(new Publisher[] { source1, source2 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -853,7 +714,7 @@ public static Flowable combineLatest(
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
- return combineLatest(Functions.toFunction(combiner), source1, source2, source3);
+ return combineLatest(new Publisher[] { source1, source2, source3 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -907,7 +768,7 @@ public static Flowable combineLatest(
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
- return combineLatest(Functions.toFunction(combiner), source1, source2, source3, source4);
+ return combineLatest(new Publisher[] { source1, source2, source3, source4 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -966,7 +827,7 @@ public static Flowable combineLatest(
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
ObjectHelper.requireNonNull(source5, "source5 is null");
- return combineLatest(Functions.toFunction(combiner), source1, source2, source3, source4, source5);
+ return combineLatest(new Publisher[] { source1, source2, source3, source4, source5 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -1029,7 +890,7 @@ public static Flowable combineLatest(
ObjectHelper.requireNonNull(source4, "source4 is null");
ObjectHelper.requireNonNull(source5, "source5 is null");
ObjectHelper.requireNonNull(source6, "source6 is null");
- return combineLatest(Functions.toFunction(combiner), source1, source2, source3, source4, source5, source6);
+ return combineLatest(new Publisher[] { source1, source2, source3, source4, source5, source6 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -1097,7 +958,7 @@ public static Flowable combineLatest(
ObjectHelper.requireNonNull(source5, "source5 is null");
ObjectHelper.requireNonNull(source6, "source6 is null");
ObjectHelper.requireNonNull(source7, "source7 is null");
- return combineLatest(Functions.toFunction(combiner), source1, source2, source3, source4, source5, source6, source7);
+ return combineLatest(new Publisher[] { source1, source2, source3, source4, source5, source6, source7 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -1169,7 +1030,7 @@ public static Flowable combineLatest(
ObjectHelper.requireNonNull(source6, "source6 is null");
ObjectHelper.requireNonNull(source7, "source7 is null");
ObjectHelper.requireNonNull(source8, "source8 is null");
- return combineLatest(Functions.toFunction(combiner), source1, source2, source3, source4, source5, source6, source7, source8);
+ return combineLatest(new Publisher[] { source1, source2, source3, source4, source5, source6, source7, source8 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -1246,7 +1107,7 @@ public static Flowable combineLatest(
ObjectHelper.requireNonNull(source7, "source7 is null");
ObjectHelper.requireNonNull(source8, "source8 is null");
ObjectHelper.requireNonNull(source9, "source9 is null");
- return combineLatest(Functions.toFunction(combiner), source1, source2, source3, source4, source5, source6, source7, source8, source9);
+ return combineLatest(new Publisher[] { source1, source2, source3, source4, source5, source6, source7, source8, source9 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -10870,15 +10731,15 @@ public final Flowable> groupBy(Function super T,
* map has been evicted. The next source emission will bring about the completion of the evicted
* {@link GroupedFlowable}s and the arrival of an item with the same key as a completed {@link GroupedFlowable}
* will prompt the creation and emission of a new {@link GroupedFlowable} with that key.
- *
+ *
* A use case for specifying an {@code evictingMapFactory} is where the source is infinite and fast and
* over time the number of keys grows enough to be a concern in terms of the memory footprint of the
* internal hash map containing the {@link GroupedFlowable}s.
- *
+ *
*
The map created by an {@code evictingMapFactory} must be thread-safe.
- *
+ *
*
An example of an {@code evictingMapFactory} using CacheBuilder from the Guava library is below:
- *
+ *
*
* Function<Consumer<Object>, Map<Integer, Object>> evictingMapFactory =
* notify ->
@@ -10905,7 +10766,7 @@ public final Flowable> groupBy(Function super T,
* .flatMap(g -> g)
* .forEach(System.out::println);
*
- *
+ *
*
*
*
@@ -11238,7 +11099,7 @@ public final Single lastOrError() {
* Example:
*
* // Step 1: Create the consumer type that will be returned by the FlowableOperator.apply():
- *
+ *
* public final class CustomSubscriber<T> implements FlowableSubscriber<T>, Subscription {
*
* // The downstream's Subscriber that will receive the onXXX events
diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java
index bcc013a10f..dc003e1f0a 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Observable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java
@@ -168,49 +168,6 @@ public static int bufferSize() {
return Flowable.bufferSize();
}
- /**
- * Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of
- * the source ObservableSources each time an item is received from any of the source ObservableSources, where this
- * aggregation is defined by a specified function.
- *
- * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
- * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
- * {@code Function} passed to the method would trigger a {@code ClassCastException}.
- *
- * If any of the sources never produces an item but only terminates (normally or with an error), the
- * resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
- * If that input source is also synchronous, other sources after it will not be subscribed to.
- *
- * If there are no ObservableSources provided, the resulting sequence completes immediately without emitting
- * any items and without any calls to the combiner function.
- *
- *
- *
- *
- * Scheduler:
- * {@code combineLatest} does not operate by default on a particular {@link Scheduler}.
- *
- *
- * @param
- * the common base type of source values
- * @param
- * the result type
- * @param sources
- * the collection of source ObservableSources
- * @param combiner
- * the aggregation function used to combine the items emitted by the source ObservableSources
- * @param bufferSize
- * the internal buffer size and prefetch amount applied to every source Observable
- * @return an Observable that emits items that are the result of combining the items emitted by the source
- * ObservableSources by means of the given aggregation function
- * @see ReactiveX operators documentation: CombineLatest
- */
- @CheckReturnValue
- @SchedulerSupport(SchedulerSupport.NONE)
- public static Observable combineLatest(Function super Object[], ? extends R> combiner, int bufferSize, ObservableSource extends T>... sources) {
- return combineLatest(sources, combiner, bufferSize);
- }
-
/**
* Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of
* the source ObservableSources each time an item is received from any of the source ObservableSources, where this
@@ -437,7 +394,7 @@ public static Observable combineLatest(
BiFunction super T1, ? super T2, ? extends R> combiner) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
- return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2);
+ return combineLatest(new ObservableSource[] { source1, source2 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -482,7 +439,7 @@ public static Observable combineLatest(
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
- return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3);
+ return combineLatest(new ObservableSource[] { source1, source2, source3 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -531,7 +488,7 @@ public static Observable combineLatest(
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
- return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4);
+ return combineLatest(new ObservableSource[] { source1, source2, source3, source4 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -585,7 +542,7 @@ public static Observable combineLatest(
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
ObjectHelper.requireNonNull(source5, "source5 is null");
- return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5);
+ return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -643,7 +600,7 @@ public static Observable combineLatest(
ObjectHelper.requireNonNull(source4, "source4 is null");
ObjectHelper.requireNonNull(source5, "source5 is null");
ObjectHelper.requireNonNull(source6, "source6 is null");
- return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6);
+ return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -706,7 +663,7 @@ public static Observable combineLatest(
ObjectHelper.requireNonNull(source5, "source5 is null");
ObjectHelper.requireNonNull(source6, "source6 is null");
ObjectHelper.requireNonNull(source7, "source7 is null");
- return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7);
+ return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6, source7 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -773,7 +730,7 @@ public static Observable combineLatest(
ObjectHelper.requireNonNull(source6, "source6 is null");
ObjectHelper.requireNonNull(source7, "source7 is null");
ObjectHelper.requireNonNull(source8, "source8 is null");
- return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7, source8);
+ return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6, source7, source8 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -845,7 +802,7 @@ public static Observable combineLates
ObjectHelper.requireNonNull(source7, "source7 is null");
ObjectHelper.requireNonNull(source8, "source8 is null");
ObjectHelper.requireNonNull(source9, "source9 is null");
- return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7, source8, source9);
+ return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6, source7, source8, source9 }, Functions.toFunction(combiner), bufferSize());
}
/**
@@ -890,51 +847,6 @@ public static Observable combineLatestDelayError(ObservableSource ex
return combineLatestDelayError(sources, combiner, bufferSize());
}
- /**
- * Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of
- * the source ObservableSources each time an item is received from any of the source ObservableSources, where this
- * aggregation is defined by a specified function and delays any error from the sources until
- * all source ObservableSources terminate.
- *
- *
- *
- * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
- * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
- * {@code Function} passed to the method would trigger a {@code ClassCastException}.
- *
- * If any of the sources never produces an item but only terminates (normally or with an error), the
- * resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
- * If that input source is also synchronous, other sources after it will not be subscribed to.
- *
- * If there are no ObservableSources provided, the resulting sequence completes immediately without emitting
- * any items and without any calls to the combiner function.
- *
- *
- * Scheduler:
- * {@code combineLatestDelayError} does not operate by default on a particular {@link Scheduler}.
- *
- *
- * @param
- * the common base type of source values
- * @param
- * the result type
- * @param sources
- * the collection of source ObservableSources
- * @param combiner
- * the aggregation function used to combine the items emitted by the source ObservableSources
- * @param bufferSize
- * the internal buffer size and prefetch amount applied to every source Observable
- * @return an Observable that emits items that are the result of combining the items emitted by the source
- * ObservableSources by means of the given aggregation function
- * @see ReactiveX operators documentation: CombineLatest
- */
- @CheckReturnValue
- @SchedulerSupport(SchedulerSupport.NONE)
- public static Observable combineLatestDelayError(Function super Object[], ? extends R> combiner,
- int bufferSize, ObservableSource extends T>... sources) {
- return combineLatestDelayError(sources, combiner, bufferSize);
- }
-
/**
* Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of
* the source ObservableSources each time an item is received from any of the source ObservableSources, where this
diff --git a/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java
index 1e279f64c1..dffc68f230 100644
--- a/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java
+++ b/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java
@@ -76,27 +76,6 @@ public void ambIterableOneIsNull() {
.assertError(NullPointerException.class);
}
- @Test(expected = NullPointerException.class)
- public void combineLatestVarargsNull() {
- Flowable.combineLatestDelayError(new Function() {
- @Override
- public Object apply(Object[] v) {
- return 1;
- }
- }, (Publisher[])null);
- }
-
- @SuppressWarnings("unchecked")
- @Test(expected = NullPointerException.class)
- public void combineLatestVarargsOneIsNull() {
- Flowable.combineLatestDelayError(new Function() {
- @Override
- public Object apply(Object[] v) {
- return 1;
- }
- }, Flowable.never(), null).blockingLast();
- }
-
@Test(expected = NullPointerException.class)
public void combineLatestIterableNull() {
Flowable.combineLatestDelayError((Iterable>)null, new Function() {
@@ -133,23 +112,6 @@ public Object apply(Object[] v) {
}).blockingLast();
}
- @SuppressWarnings("unchecked")
- @Test(expected = NullPointerException.class)
- public void combineLatestVarargsFunctionNull() {
- Flowable.combineLatestDelayError(null, Flowable.never());
- }
-
- @SuppressWarnings("unchecked")
- @Test(expected = NullPointerException.class)
- public void combineLatestVarargsFunctionReturnsNull() {
- Flowable.combineLatestDelayError(new Function() {
- @Override
- public Object apply(Object[] v) {
- return null;
- }
- }, just1).blockingLast();
- }
-
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestIterableFunctionNull() {
@@ -2759,12 +2721,6 @@ public void combineLatestDelayErrorIterableFunctionNull() {
Flowable.combineLatestDelayError(Arrays.asList(just1), null, 128);
}
- @SuppressWarnings("unchecked")
- @Test(expected = NullPointerException.class)
- public void combineLatestDelayErrorVarargsFunctionNull() {
- Flowable.combineLatestDelayError(null, 128, Flowable.never());
- }
-
@Test(expected = NullPointerException.class)
public void zipFlowableNull() {
Flowable.zip((Flowable>)null, new Function() {
@@ -2795,27 +2751,6 @@ public void concatFlowableNull() {
Flowable.concat((Flowable>)null);
}
- @Test(expected = NullPointerException.class)
- public void combineLatestDelayErrorVarargsNull() {
- Flowable.combineLatestDelayError(new Function() {
- @Override
- public Object apply(Object[] v) {
- return 1;
- }
- }, 128, (Flowable[])null);
- }
-
- @SuppressWarnings("unchecked")
- @Test(expected = NullPointerException.class)
- public void combineLatestDelayErrorVarargsOneIsNull() {
- Flowable.combineLatestDelayError(new Function() {
- @Override
- public Object apply(Object[] v) {
- return 1;
- }
- }, 128, Flowable.never(), null).blockingLast();
- }
-
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorIterableNull() {
Flowable.combineLatestDelayError((Iterable>)null, new Function() {
@@ -2876,15 +2811,4 @@ public void delaySubscriptionOtherNull() {
public void sampleFlowableNull() {
just1.sample(null);
}
-
- @SuppressWarnings("unchecked")
- @Test(expected = NullPointerException.class)
- public void combineLatestDelayErrorVarargsFunctionReturnsNull() {
- Flowable.combineLatestDelayError(new Function() {
- @Override
- public Object apply(Object[] v) {
- return null;
- }
- }, 128, just1).blockingLast();
- }
}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java
index bbffd97ee4..0a07b99d81 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java
@@ -1305,15 +1305,14 @@ public Object apply(Object a, Object b) throws Exception {
@Test
public void errorDelayed() {
Flowable.combineLatestDelayError(
+ new Publisher[] { Flowable.error(new TestException()), Flowable.just(1) },
new Function() {
@Override
public Object apply(Object[] a) throws Exception {
return a;
}
},
- 128,
- Flowable.error(new TestException()),
- Flowable.just(1)
+ 128
)
.test()
.assertFailure(TestException.class);
@@ -1323,15 +1322,14 @@ public Object apply(Object[] a) throws Exception {
@Test
public void errorDelayed2() {
Flowable.combineLatestDelayError(
+ new Publisher[] { Flowable.error(new TestException()).startWithItem(1), Flowable.empty() },
new Function() {
@Override
public Object apply(Object[] a) throws Exception {
return a;
}
},
- 128,
- Flowable.error(new TestException()).startWithItem(1),
- Flowable.empty()
+ 128
)
.test()
.assertFailure(TestException.class);
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java
index bb27338d84..e20e96892d 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java
@@ -926,15 +926,14 @@ public Object apply(Object a, Object b) throws Exception {
@Test
public void errorDelayed() {
Observable.combineLatestDelayError(
+ new ObservableSource[] { Observable.error(new TestException()), Observable.just(1) },
new Function() {
@Override
public Object apply(Object[] a) throws Exception {
return a;
}
},
- 128,
- Observable.error(new TestException()),
- Observable.just(1)
+ 128
)
.test()
.assertFailure(TestException.class);
@@ -944,15 +943,14 @@ public Object apply(Object[] a) throws Exception {
@Test
public void errorDelayed2() {
Observable.combineLatestDelayError(
+ new ObservableSource[] { Observable.error(new TestException()).startWithItem(1), Observable.empty() },
new Function() {
@Override
public Object apply(Object[] a) throws Exception {
return a;
}
},
- 128,
- Observable.error(new TestException()).startWithItem(1),
- Observable.empty()
+ 128
)
.test()
.assertFailure(TestException.class);
diff --git a/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java b/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java
index 41cfb91bb2..76952c0904 100644
--- a/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java
+++ b/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java
@@ -74,27 +74,6 @@ public void ambIterableOneIsNull() {
.assertError(NullPointerException.class);
}
- @Test(expected = NullPointerException.class)
- public void combineLatestVarargsNull() {
- Observable.combineLatest(new Function() {
- @Override
- public Object apply(Object[] v) {
- return 1;
- }
- }, 128, (Observable[])null);
- }
-
- @SuppressWarnings("unchecked")
- @Test(expected = NullPointerException.class)
- public void combineLatestVarargsOneIsNull() {
- Observable.combineLatest(new Function() {
- @Override
- public Object apply(Object[] v) {
- return 1;
- }
- }, 128, Observable.never(), null).blockingLast();
- }
-
@Test(expected = NullPointerException.class)
public void combineLatestIterableNull() {
Observable.combineLatest((Iterable>)null, new Function() {
@@ -131,23 +110,6 @@ public Object apply(Object[] v) {
}, 128).blockingLast();
}
- @SuppressWarnings("unchecked")
- @Test(expected = NullPointerException.class)
- public void combineLatestVarargsFunctionNull() {
- Observable.combineLatest(null, 128, Observable.never());
- }
-
- @SuppressWarnings("unchecked")
- @Test(expected = NullPointerException.class)
- public void combineLatestVarargsFunctionReturnsNull() {
- Observable.combineLatest(new Function() {
- @Override
- public Object apply(Object[] v) {
- return null;
- }
- }, 128, just1).blockingLast();
- }
-
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestIterableFunctionNull() {
@@ -165,27 +127,6 @@ public Object apply(Object[] v) {
}, 128).blockingLast();
}
- @Test(expected = NullPointerException.class)
- public void combineLatestDelayErrorVarargsNull() {
- Observable.combineLatestDelayError(new Function() {
- @Override
- public Object apply(Object[] v) {
- return 1;
- }
- }, 128, (Observable[])null);
- }
-
- @SuppressWarnings("unchecked")
- @Test(expected = NullPointerException.class)
- public void combineLatestDelayErrorVarargsOneIsNull() {
- Observable.combineLatestDelayError(new Function() {
- @Override
- public Object apply(Object[] v) {
- return 1;
- }
- }, 128, Observable.never(), null).blockingLast();
- }
-
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorIterableNull() {
Observable.combineLatestDelayError((Iterable>)null, new Function() {
@@ -222,23 +163,6 @@ public Object apply(Object[] v) {
}, 128).blockingLast();
}
- @SuppressWarnings("unchecked")
- @Test(expected = NullPointerException.class)
- public void combineLatestDelayErrorVarargsFunctionNull() {
- Observable.combineLatestDelayError(null, 128, Observable.never());
- }
-
- @SuppressWarnings("unchecked")
- @Test(expected = NullPointerException.class)
- public void combineLatestDelayErrorVarargsFunctionReturnsNull() {
- Observable.combineLatestDelayError(new Function() {
- @Override
- public Object apply(Object[] v) {
- return null;
- }
- }, 128, just1).blockingLast();
- }
-
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorIterableFunctionNull() {
diff --git a/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayDelayErrorTckTest.java b/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayDelayErrorTckTest.java
index eb662c8b90..dcb5a94fe6 100644
--- a/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayDelayErrorTckTest.java
+++ b/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayDelayErrorTckTest.java
@@ -27,14 +27,13 @@ public class CombineLatestArrayDelayErrorTckTest extends BaseTck {
public Publisher createPublisher(long elements) {
return
Flowable.combineLatestDelayError(
+ new Publisher[] { Flowable.just(1L), Flowable.fromIterable(iterate(elements)) },
new Function() {
@Override
public Long apply(Object[] a) throws Exception {
return (Long)a[0];
}
- },
- Flowable.just(1L),
- Flowable.fromIterable(iterate(elements))
+ }
)
;
}
diff --git a/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayTckTest.java b/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayTckTest.java
index 7b57576d6e..a776e28e71 100644
--- a/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayTckTest.java
+++ b/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayTckTest.java
@@ -27,14 +27,13 @@ public class CombineLatestArrayTckTest extends BaseTck {
public Publisher createPublisher(long elements) {
return
Flowable.combineLatest(
+ new Publisher[] { Flowable.just(1L), Flowable.fromIterable(iterate(elements)) },
new Function() {
@Override
public Long apply(Object[] a) throws Exception {
return (Long)a[0];
}
- },
- Flowable.just(1L),
- Flowable.fromIterable(iterate(elements))
+ }
)
;
}