Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: XProcessor.offer to throw NPE immediately #6799

Merged
merged 1 commit into from
Dec 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,11 @@ public static <T> BehaviorProcessor<T> create() {
*/
BehaviorProcessor(T defaultValue) {
this();
this.value.lazySet(Objects.requireNonNull(defaultValue, "defaultValue is null"));
this.value.lazySet(defaultValue);
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
protected void subscribeActual(@NonNull Subscriber<? super T> s) {
BehaviorSubscription<T> bs = new BehaviorSubscription<>(s, this);
s.onSubscribe(bs);
if (add(bs)) {
Expand All @@ -258,7 +258,7 @@ protected void subscribeActual(Subscriber<? super T> s) {
}

@Override
public void onSubscribe(Subscription s) {
public void onSubscribe(@NonNull Subscription s) {
if (terminalEvent.get() != null) {
s.cancel();
return;
Expand All @@ -267,7 +267,7 @@ public void onSubscribe(Subscription s) {
}

@Override
public void onNext(T t) {
public void onNext(@NonNull T t) {
ExceptionHelper.nullCheck(t, "onNext called with a null value.");

if (terminalEvent.get() != null) {
Expand All @@ -281,7 +281,7 @@ public void onNext(T t) {
}

@Override
public void onError(Throwable t) {
public void onError(@NonNull Throwable t) {
ExceptionHelper.nullCheck(t, "onError called with a null Throwable.");
if (!terminalEvent.compareAndSet(null, t)) {
RxJavaPlugins.onError(t);
Expand Down Expand Up @@ -316,14 +316,13 @@ public void onComplete() {
* <p>History: 2.0.8 - experimental
* @param t the item to emit, not null
* @return true if the item was emitted to all Subscribers
* @throws NullPointerException if {@code t} is {@code null}
* @since 2.2
*/
@CheckReturnValue
public boolean offer(T t) {
if (t == null) {
onError(ExceptionHelper.createNullPointerException("offer called with a null value."));
return true;
}
public boolean offer(@NonNull T t) {
ExceptionHelper.nullCheck(t, "offer called with a null value.");

BehaviorSubscription<T>[] array = subscribers.get();

for (BehaviorSubscription<T> s : array) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public void startUnbounded() {
}

@Override
public void onSubscribe(Subscription s) {
public void onSubscribe(@NonNull Subscription s) {
if (SubscriptionHelper.setOnce(upstream, s)) {
if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -288,7 +288,7 @@ public void onSubscribe(Subscription s) {
}

@Override
public void onNext(T t) {
public void onNext(@NonNull T t) {
if (once.get()) {
return;
}
Expand All @@ -306,26 +306,29 @@ public void onNext(T t) {
/**
* Tries to offer an item into the internal queue and returns false
* if the queue is full.
* @param t the item to offer, not null
* @param t the item to offer, not {@code null}
* @return true if successful, false if the queue is full
* @throws NullPointerException if {@code t} is {@code null}
* @throws IllegalStateException if the processor is in fusion mode
*/
@CheckReturnValue
public boolean offer(T t) {
public boolean offer(@NonNull T t) {
ExceptionHelper.nullCheck(t, "offer called with a null value.");
if (once.get()) {
return false;
}
ExceptionHelper.nullCheck(t, "offer called with a null value.");
if (fusionMode == QueueSubscription.NONE) {
if (queue.offer(t)) {
drain();
return true;
}
return false;
}
return false;
throw new IllegalStateException("offer() should not be called in fusion mode!");
}

@Override
public void onError(Throwable t) {
public void onError(@NonNull Throwable t) {
ExceptionHelper.nullCheck(t, "onError called with a null Throwable.");
if (once.compareAndSet(false, true)) {
error = t;
Expand Down Expand Up @@ -369,7 +372,7 @@ public Throwable getThrowable() {
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
protected void subscribeActual(@NonNull Subscriber<? super T> s) {
MulticastSubscription<T> ms = new MulticastSubscription<>(s, this);
s.onSubscribe(ms);
if (add(ms)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public static <T> PublishProcessor<T> create() {
}

@Override
protected void subscribeActual(Subscriber<? super T> t) {
protected void subscribeActual(@NonNull Subscriber<? super T> t) {
PublishSubscription<T> ps = new PublishSubscription<>(t, this);
t.onSubscribe(ps);
if (add(ps)) {
Expand Down Expand Up @@ -226,7 +226,7 @@ void remove(PublishSubscription<T> ps) {
}

@Override
public void onSubscribe(Subscription s) {
public void onSubscribe(@NonNull Subscription s) {
if (subscribers.get() == TERMINATED) {
s.cancel();
return;
Expand All @@ -236,7 +236,7 @@ public void onSubscribe(Subscription s) {
}

@Override
public void onNext(T t) {
public void onNext(@NonNull T t) {
ExceptionHelper.nullCheck(t, "onNext called with a null value.");
for (PublishSubscription<T> s : subscribers.get()) {
s.onNext(t);
Expand All @@ -245,7 +245,7 @@ public void onNext(T t) {

@SuppressWarnings("unchecked")
@Override
public void onError(Throwable t) {
public void onError(@NonNull Throwable t) {
ExceptionHelper.nullCheck(t, "onError called with a null Throwable.");
if (subscribers.get() == TERMINATED) {
RxJavaPlugins.onError(t);
Expand Down Expand Up @@ -281,14 +281,13 @@ public void onComplete() {
* <p>History: 2.0.8 - experimental
* @param t the item to emit, not null
* @return true if the item was emitted to all Subscribers
* @throws NullPointerException if {@code t} is {@code null}
* @since 2.2
*/
@CheckReturnValue
public boolean offer(T t) {
if (t == null) {
onError(ExceptionHelper.createNullPointerException("offer called with a null value."));
return true;
}
public boolean offer(@NonNull T t) {
ExceptionHelper.nullCheck(t, "offer called with a null value.");

PublishSubscription<T>[] array = subscribers.get();

for (PublishSubscription<T> s : array) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,12 +673,14 @@ public void offer() {

ts = pp.test(1);

assertTrue(pp.offer(null));

ts.assertFailure(NullPointerException.class, 2);
try {
pp.offer(null);
fail("Should have thrown NPE!");
} catch (NullPointerException expected) {
// expected
}

assertTrue(pp.hasThrowable());
assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException);
ts.assertValuesOnly(2);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,12 @@ public void asyncFused() {
up.onNext(i);
}

assertFalse(mp.offer(10));
try {
mp.offer(10);
fail("Should have thrown IllegalStateException");
} catch (IllegalStateException expected) {
// expected
}

up.onComplete();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,12 +597,14 @@ public void offer() {

ts = pp.test(0);

assertTrue(pp.offer(null));

ts.assertFailure(NullPointerException.class);
try {
pp.offer(null);
fail("Should have thrown NPE!");
} catch (NullPointerException expected) {
// expected
}

assertTrue(pp.hasThrowable());
assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException);
ts.assertEmpty();
}

@Test
Expand Down