From 482bd2bb79a83ae1b937b4ce69446950147826fe Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 26 Dec 2019 23:08:04 +0100 Subject: [PATCH] 3.x: XProcessor.offer to throw NPE immediately --- .../rxjava3/processors/BehaviorProcessor.java | 19 +++++++++---------- .../processors/MulticastProcessor.java | 19 +++++++++++-------- .../rxjava3/processors/PublishProcessor.java | 17 ++++++++--------- .../processors/BehaviorProcessorTest.java | 12 +++++++----- .../processors/MulticastProcessorTest.java | 7 ++++++- .../processors/PublishProcessorTest.java | 12 +++++++----- 6 files changed, 48 insertions(+), 38 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java index ab4c517e65..ea93669ca3 100644 --- a/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java +++ b/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java @@ -234,11 +234,11 @@ public static BehaviorProcessor create() { */ BehaviorProcessor(T defaultValue) { this(); - this.value.lazySet(Objects.requireNonNull(defaultValue, "defaultValue is null")); + this.value.lazySet(defaultValue); } @Override - protected void subscribeActual(Subscriber s) { + protected void subscribeActual(@NonNull Subscriber s) { BehaviorSubscription bs = new BehaviorSubscription<>(s, this); s.onSubscribe(bs); if (add(bs)) { @@ -258,7 +258,7 @@ protected void subscribeActual(Subscriber s) { } @Override - public void onSubscribe(Subscription s) { + public void onSubscribe(@NonNull Subscription s) { if (terminalEvent.get() != null) { s.cancel(); return; @@ -267,7 +267,7 @@ public void onSubscribe(Subscription s) { } @Override - public void onNext(T t) { + public void onNext(@NonNull T t) { ExceptionHelper.nullCheck(t, "onNext called with a null value."); if (terminalEvent.get() != null) { @@ -281,7 +281,7 @@ public void onNext(T t) { } @Override - public void onError(Throwable t) { + public void onError(@NonNull Throwable t) { ExceptionHelper.nullCheck(t, "onError called with a null Throwable."); if (!terminalEvent.compareAndSet(null, t)) { RxJavaPlugins.onError(t); @@ -316,14 +316,13 @@ public void onComplete() { *

History: 2.0.8 - experimental * @param t the item to emit, not null * @return true if the item was emitted to all Subscribers + * @throws NullPointerException if {@code t} is {@code null} * @since 2.2 */ @CheckReturnValue - public boolean offer(T t) { - if (t == null) { - onError(ExceptionHelper.createNullPointerException("offer called with a null value.")); - return true; - } + public boolean offer(@NonNull T t) { + ExceptionHelper.nullCheck(t, "offer called with a null value."); + BehaviorSubscription[] array = subscribers.get(); for (BehaviorSubscription s : array) { diff --git a/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java b/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java index d258ba1718..ea300afe34 100644 --- a/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java +++ b/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java @@ -258,7 +258,7 @@ public void startUnbounded() { } @Override - public void onSubscribe(Subscription s) { + public void onSubscribe(@NonNull Subscription s) { if (SubscriptionHelper.setOnce(upstream, s)) { if (s instanceof QueueSubscription) { @SuppressWarnings("unchecked") @@ -288,7 +288,7 @@ public void onSubscribe(Subscription s) { } @Override - public void onNext(T t) { + public void onNext(@NonNull T t) { if (once.get()) { return; } @@ -306,26 +306,29 @@ public void onNext(T t) { /** * Tries to offer an item into the internal queue and returns false * if the queue is full. - * @param t the item to offer, not null + * @param t the item to offer, not {@code null} * @return true if successful, false if the queue is full + * @throws NullPointerException if {@code t} is {@code null} + * @throws IllegalStateException if the processor is in fusion mode */ @CheckReturnValue - public boolean offer(T t) { + public boolean offer(@NonNull T t) { + ExceptionHelper.nullCheck(t, "offer called with a null value."); if (once.get()) { return false; } - ExceptionHelper.nullCheck(t, "offer called with a null value."); if (fusionMode == QueueSubscription.NONE) { if (queue.offer(t)) { drain(); return true; } + return false; } - return false; + throw new IllegalStateException("offer() should not be called in fusion mode!"); } @Override - public void onError(Throwable t) { + public void onError(@NonNull Throwable t) { ExceptionHelper.nullCheck(t, "onError called with a null Throwable."); if (once.compareAndSet(false, true)) { error = t; @@ -369,7 +372,7 @@ public Throwable getThrowable() { } @Override - protected void subscribeActual(Subscriber s) { + protected void subscribeActual(@NonNull Subscriber s) { MulticastSubscription ms = new MulticastSubscription<>(s, this); s.onSubscribe(ms); if (add(ms)) { diff --git a/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java b/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java index 36bc4eff34..0a73cf1bf8 100644 --- a/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java +++ b/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java @@ -141,7 +141,7 @@ public static PublishProcessor create() { } @Override - protected void subscribeActual(Subscriber t) { + protected void subscribeActual(@NonNull Subscriber t) { PublishSubscription ps = new PublishSubscription<>(t, this); t.onSubscribe(ps); if (add(ps)) { @@ -226,7 +226,7 @@ void remove(PublishSubscription ps) { } @Override - public void onSubscribe(Subscription s) { + public void onSubscribe(@NonNull Subscription s) { if (subscribers.get() == TERMINATED) { s.cancel(); return; @@ -236,7 +236,7 @@ public void onSubscribe(Subscription s) { } @Override - public void onNext(T t) { + public void onNext(@NonNull T t) { ExceptionHelper.nullCheck(t, "onNext called with a null value."); for (PublishSubscription s : subscribers.get()) { s.onNext(t); @@ -245,7 +245,7 @@ public void onNext(T t) { @SuppressWarnings("unchecked") @Override - public void onError(Throwable t) { + public void onError(@NonNull Throwable t) { ExceptionHelper.nullCheck(t, "onError called with a null Throwable."); if (subscribers.get() == TERMINATED) { RxJavaPlugins.onError(t); @@ -281,14 +281,13 @@ public void onComplete() { *

History: 2.0.8 - experimental * @param t the item to emit, not null * @return true if the item was emitted to all Subscribers + * @throws NullPointerException if {@code t} is {@code null} * @since 2.2 */ @CheckReturnValue - public boolean offer(T t) { - if (t == null) { - onError(ExceptionHelper.createNullPointerException("offer called with a null value.")); - return true; - } + public boolean offer(@NonNull T t) { + ExceptionHelper.nullCheck(t, "offer called with a null value."); + PublishSubscription[] array = subscribers.get(); for (PublishSubscription s : array) { diff --git a/src/test/java/io/reactivex/rxjava3/processors/BehaviorProcessorTest.java b/src/test/java/io/reactivex/rxjava3/processors/BehaviorProcessorTest.java index 0f04c4baea..d49ba5f565 100644 --- a/src/test/java/io/reactivex/rxjava3/processors/BehaviorProcessorTest.java +++ b/src/test/java/io/reactivex/rxjava3/processors/BehaviorProcessorTest.java @@ -673,12 +673,14 @@ public void offer() { ts = pp.test(1); - assertTrue(pp.offer(null)); - - ts.assertFailure(NullPointerException.class, 2); + try { + pp.offer(null); + fail("Should have thrown NPE!"); + } catch (NullPointerException expected) { + // expected + } - assertTrue(pp.hasThrowable()); - assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException); + ts.assertValuesOnly(2); } @Test diff --git a/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java b/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java index 4e71919b09..afa14b8deb 100644 --- a/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java +++ b/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java @@ -466,7 +466,12 @@ public void asyncFused() { up.onNext(i); } - assertFalse(mp.offer(10)); + try { + mp.offer(10); + fail("Should have thrown IllegalStateException"); + } catch (IllegalStateException expected) { + // expected + } up.onComplete(); diff --git a/src/test/java/io/reactivex/rxjava3/processors/PublishProcessorTest.java b/src/test/java/io/reactivex/rxjava3/processors/PublishProcessorTest.java index 277ac40c78..7c413d9dc0 100644 --- a/src/test/java/io/reactivex/rxjava3/processors/PublishProcessorTest.java +++ b/src/test/java/io/reactivex/rxjava3/processors/PublishProcessorTest.java @@ -597,12 +597,14 @@ public void offer() { ts = pp.test(0); - assertTrue(pp.offer(null)); - - ts.assertFailure(NullPointerException.class); + try { + pp.offer(null); + fail("Should have thrown NPE!"); + } catch (NullPointerException expected) { + // expected + } - assertTrue(pp.hasThrowable()); - assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException); + ts.assertEmpty(); } @Test