Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: remove getValues() from some subjects/processors #6516

Merged
merged 1 commit into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 1 addition & 42 deletions src/main/java/io/reactivex/processors/AsyncProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package io.reactivex.processors;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

import org.reactivestreams.*;
Expand Down Expand Up @@ -63,7 +62,7 @@
* This {@code AsyncProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
* {@link #getThrowable()} and {@link #hasSubscribers()} as well as means to read the very last observed value -
* after this {@code AsyncProcessor} has been completed - in a non-blocking and thread-safe
* manner via {@link #hasValue()}, {@link #getValue()}, {@link #getValues()} or {@link #getValues(Object[])}.
* manner via {@link #hasValue()} or {@link #getValue()}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code AsyncProcessor} honors the backpressure of the downstream {@code Subscriber}s and won't emit
Expand Down Expand Up @@ -331,46 +330,6 @@ public T getValue() {
return subscribers.get() == TERMINATED ? value : null;
}

/**
* Returns an Object array containing snapshot all values of this processor.
* <p>The method is thread-safe.
* @return the array containing the snapshot of all values of this processor
* @deprecated in 2.1.14; put the result of {@link #getValue()} into an array manually, will be removed in 3.x
*/
@Deprecated
public Object[] getValues() {
T v = getValue();
return v != null ? new Object[] { v } : new Object[0];
}

/**
* Returns a typed array containing a snapshot of all values of this processor.
* <p>The method follows the conventions of Collection.toArray by setting the array element
* after the last value to null (if the capacity permits).
* <p>The method is thread-safe.
* @param array the target array to copy values into if it fits
* @return the given array if the values fit into it or a new array containing all values
* @deprecated in 2.1.14; put the result of {@link #getValue()} into an array manually, will be removed in 3.x
*/
@Deprecated
public T[] getValues(T[] array) {
T v = getValue();
if (v == null) {
if (array.length != 0) {
array[0] = null;
}
return array;
}
if (array.length == 0) {
array = Arrays.copyOf(array, 1);
}
array[0] = v;
if (array.length != 1) {
array[1] = null;
}
return array;
}

static final class AsyncSubscription<T> extends DeferredScalarSubscription<T> {
private static final long serialVersionUID = 5629876084736248016L;

Expand Down
54 changes: 1 addition & 53 deletions src/main/java/io/reactivex/processors/BehaviorProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

package io.reactivex.processors;

import java.lang.reflect.Array;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;

Expand Down Expand Up @@ -95,8 +94,7 @@
* <p>
* This {@code BehaviorProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
* {@link #getThrowable()} and {@link #hasSubscribers()} as well as means to read the latest observed value
* in a non-blocking and thread-safe manner via {@link #hasValue()}, {@link #getValue()},
* {@link #getValues()} or {@link #getValues(Object[])}.
* in a non-blocking and thread-safe manner via {@link #hasValue()} or {@link #getValue()}.
* <p>
* Note that this processor signals {@code MissingBackpressureException} if a particular {@code Subscriber} is not
* ready to receive {@code onNext} events. To avoid this exception being signaled, use {@link #offer(Object)} to only
Expand Down Expand Up @@ -374,56 +372,6 @@ public T getValue() {
return NotificationLite.getValue(o);
}

/**
* Returns an Object array containing snapshot all values of the BehaviorProcessor.
* <p>The method is thread-safe.
* @return the array containing the snapshot of all values of the BehaviorProcessor
* @deprecated in 2.1.14; put the result of {@link #getValue()} into an array manually, will be removed in 3.x
*/
@Deprecated
public Object[] getValues() {
@SuppressWarnings("unchecked")
T[] a = (T[])EMPTY_ARRAY;
T[] b = getValues(a);
if (b == EMPTY_ARRAY) {
return new Object[0];
}
return b;

}

/**
* Returns a typed array containing a snapshot of all values of the BehaviorProcessor.
* <p>The method follows the conventions of Collection.toArray by setting the array element
* after the last value to null (if the capacity permits).
* <p>The method is thread-safe.
* @param array the target array to copy values into if it fits
* @return the given array if the values fit into it or a new array containing all values
* @deprecated in 2.1.14; put the result of {@link #getValue()} into an array manually, will be removed in 3.x
*/
@Deprecated
@SuppressWarnings("unchecked")
public T[] getValues(T[] array) {
Object o = value.get();
if (o == null || NotificationLite.isComplete(o) || NotificationLite.isError(o)) {
if (array.length != 0) {
array[0] = null;
}
return array;
}
T v = NotificationLite.getValue(o);
if (array.length != 0) {
array[0] = v;
if (array.length != 1) {
array[1] = null;
}
} else {
array = (T[])Array.newInstance(array.getClass().getComponentType(), 1);
array[0] = v;
}
return array;
}

@Override
public boolean hasComplete() {
Object o = value.get();
Expand Down
47 changes: 2 additions & 45 deletions src/main/java/io/reactivex/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@

package io.reactivex.subjects;

import io.reactivex.annotations.Nullable;
import io.reactivex.annotations.NonNull;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.Observer;
import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.observers.DeferredScalarDisposable;
Expand Down Expand Up @@ -64,7 +61,7 @@
* This {@code AsyncSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
* {@link #getThrowable()} and {@link #hasObservers()} as well as means to read the very last observed value -
* after this {@code AsyncSubject} has been completed - in a non-blocking and thread-safe
* manner via {@link #hasValue()}, {@link #getValue()}, {@link #getValues()} or {@link #getValues(Object[])}.
* manner via {@link #hasValue()} or {@link #getValue()}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code AsyncSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and
Expand Down Expand Up @@ -321,46 +318,6 @@ public T getValue() {
return subscribers.get() == TERMINATED ? value : null;
}

/**
* Returns an Object array containing snapshot all values of the Subject.
* <p>The method is thread-safe.
* @return the array containing the snapshot of all values of the Subject
* @deprecated in 2.1.14; put the result of {@link #getValue()} into an array manually, will be removed in 3.x
*/
@Deprecated
public Object[] getValues() {
T v = getValue();
return v != null ? new Object[] { v } : new Object[0];
}

/**
* Returns a typed array containing a snapshot of all values of the Subject.
* <p>The method follows the conventions of Collection.toArray by setting the array element
* after the last value to null (if the capacity permits).
* <p>The method is thread-safe.
* @param array the target array to copy values into if it fits
* @return the given array if the values fit into it or a new array containing all values
* @deprecated in 2.1.14; put the result of {@link #getValue()} into an array manually, will be removed in 3.x
*/
@Deprecated
public T[] getValues(T[] array) {
T v = getValue();
if (v == null) {
if (array.length != 0) {
array[0] = null;
}
return array;
}
if (array.length == 0) {
array = Arrays.copyOf(array, 1);
}
array[0] = v;
if (array.length != 1) {
array[1] = null;
}
return array;
}

static final class AsyncDisposable<T> extends DeferredScalarDisposable<T> {
private static final long serialVersionUID = 5629876084736248016L;

Expand Down
61 changes: 2 additions & 59 deletions src/main/java/io/reactivex/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,11 @@

package io.reactivex.subjects;

import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.annotations.Nullable;
import io.reactivex.annotations.NonNull;
import java.lang.reflect.Array;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.*;

import io.reactivex.Observer;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.util.*;
Expand Down Expand Up @@ -97,8 +94,7 @@
* <p>
* This {@code BehaviorSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
* {@link #getThrowable()} and {@link #hasObservers()} as well as means to read the latest observed value
* in a non-blocking and thread-safe manner via {@link #hasValue()}, {@link #getValue()},
* {@link #getValues()} or {@link #getValues(Object[])}.
* in a non-blocking and thread-safe manner via {@link #hasValue()} or {@link #getValue()}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code BehaviorSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and
Expand Down Expand Up @@ -153,9 +149,6 @@
*/
public final class BehaviorSubject<T> extends Subject<T> {

/** An empty array to avoid allocation in getValues(). */
private static final Object[] EMPTY_ARRAY = new Object[0];

final AtomicReference<Object> value;

final AtomicReference<BehaviorDisposable<T>[]> subscribers;
Expand Down Expand Up @@ -326,56 +319,6 @@ public T getValue() {
return NotificationLite.getValue(o);
}

/**
* Returns an Object array containing snapshot all values of the Subject.
* <p>The method is thread-safe.
* @return the array containing the snapshot of all values of the Subject
* @deprecated in 2.1.14; put the result of {@link #getValue()} into an array manually, will be removed in 3.x
*/
@Deprecated
public Object[] getValues() {
@SuppressWarnings("unchecked")
T[] a = (T[])EMPTY_ARRAY;
T[] b = getValues(a);
if (b == EMPTY_ARRAY) {
return new Object[0];
}
return b;

}

/**
* Returns a typed array containing a snapshot of all values of the Subject.
* <p>The method follows the conventions of Collection.toArray by setting the array element
* after the last value to null (if the capacity permits).
* <p>The method is thread-safe.
* @param array the target array to copy values into if it fits
* @return the given array if the values fit into it or a new array containing all values
* @deprecated in 2.1.14; put the result of {@link #getValue()} into an array manually, will be removed in 3.x
*/
@Deprecated
@SuppressWarnings("unchecked")
public T[] getValues(T[] array) {
Object o = value.get();
if (o == null || NotificationLite.isComplete(o) || NotificationLite.isError(o)) {
if (array.length != 0) {
array[0] = null;
}
return array;
}
T v = NotificationLite.getValue(o);
if (array.length != 0) {
array[0] = v;
if (array.length != 1) {
array[1] = null;
}
} else {
array = (T[])Array.newInstance(array.getClass().getComponentType(), 1);
array[0] = v;
}
return array;
}

@Override
public boolean hasComplete() {
Object o = value.get();
Expand Down
Loading