From 36a87336c470c133ed7d87aed0d7632a868cc3a5 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 19 Jun 2019 13:31:51 +0200 Subject: [PATCH] 3.x: ConnectableFlowable/ConnetableFlowable redesign --- src/main/java/io/reactivex/Flowable.java | 2 +- src/main/java/io/reactivex/Observable.java | 2 +- .../flowables/ConnectableFlowable.java | 23 +- .../disposables/ResettableConnectable.java | 54 -- .../operators/flowable/FlowablePublish.java | 770 ++++++------------ .../operators/flowable/FlowableRefCount.java | 20 +- .../operators/flowable/FlowableReplay.java | 18 +- .../observable/ObservablePublish.java | 401 ++++----- .../observable/ObservableRefCount.java | 20 +- .../observable/ObservableReplay.java | 15 +- .../observables/ConnectableObservable.java | 31 +- .../flowable/FlowablePublishTest.java | 299 +++++-- .../flowable/FlowableRefCountTest.java | 122 ++- .../observable/ObservablePublishTest.java | 129 ++- .../observable/ObservableRefCountTest.java | 105 ++- .../reactivex/plugins/RxJavaPluginsTest.java | 10 + 16 files changed, 1036 insertions(+), 985 deletions(-) delete mode 100644 src/main/java/io/reactivex/internal/disposables/ResettableConnectable.java diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 4ae2322d09..07b76aa101 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -12488,7 +12488,7 @@ public final Flowable publish(Function, ? extends Pub @SchedulerSupport(SchedulerSupport.NONE) public final ConnectableFlowable publish(int bufferSize) { ObjectHelper.verifyPositive(bufferSize, "bufferSize"); - return FlowablePublish.create(this, bufferSize); + return RxJavaPlugins.onAssembly(new FlowablePublish(this, bufferSize)); } /** diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index fa24976de9..6123ce455d 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -10207,7 +10207,7 @@ public final Observable onTerminateDetach() { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final ConnectableObservable publish() { - return ObservablePublish.create(this); + return RxJavaPlugins.onAssembly(new ObservablePublish(this)); } /** diff --git a/src/main/java/io/reactivex/flowables/ConnectableFlowable.java b/src/main/java/io/reactivex/flowables/ConnectableFlowable.java index 21636e67da..f2d66563a2 100644 --- a/src/main/java/io/reactivex/flowables/ConnectableFlowable.java +++ b/src/main/java/io/reactivex/flowables/ConnectableFlowable.java @@ -34,11 +34,24 @@ * before the {@code Flowable} begins emitting items. *

* - * + *

+ * When the upstream terminates, the {@code ConnectableFlowable} remains in this terminated state and, + * depending on the actual underlying implementation, relays cached events to late {@link Subscriber}s. + * In order to reuse and restart this {@code ConnectableFlowable}, the {@link #reset()} method has to be called. + * When called, this {@code ConnectableFlowable} will appear as fresh, unconnected source to new {@link Subscriber}s. + * Disposing the connection will reset the {@code ConnectableFlowable} to its fresh state and there is no need to call + * {@code reset()} in this case. + *

+ * Note that although {@link #connect()} and {@link #reset()} are safe to call from multiple threads, it is recommended + * a dedicated thread or business logic manages the connection or resetting of a {@code ConnectableFlowable} so that + * there is no unwanted signal loss due to early {@code connect()} or {@code reset()} calls while {@code Subscriber}s are + * still being subscribed to to this {@code ConnectableFlowable} to receive signals from the get go. + *

* @see RxJava Wiki: * Connectable Observable Operators * @param * the type of items emitted by the {@code ConnectableFlowable} + * @since 2.0.0 */ public abstract class ConnectableFlowable extends Flowable { @@ -53,6 +66,14 @@ public abstract class ConnectableFlowable extends Flowable { */ public abstract void connect(@NonNull Consumer connection); + /** + * Resets this ConnectableFlowable into its fresh state if it has terminated. + *

+ * Calling this method on a fresh or active {@code ConnectableFlowable} has no effect. + * @since 3.0.0 + */ + public abstract void reset(); + /** * Instructs the {@code ConnectableFlowable} to begin emitting the items from its underlying * {@link Flowable} to its {@link Subscriber}s. diff --git a/src/main/java/io/reactivex/internal/disposables/ResettableConnectable.java b/src/main/java/io/reactivex/internal/disposables/ResettableConnectable.java deleted file mode 100644 index a111080a77..0000000000 --- a/src/main/java/io/reactivex/internal/disposables/ResettableConnectable.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.disposables; - -import io.reactivex.annotations.Experimental; -import io.reactivex.disposables.Disposable; -import io.reactivex.flowables.ConnectableFlowable; -import io.reactivex.observables.ConnectableObservable; - -/** - * Interface allowing conditional resetting of connections in {@link ConnectableObservable}s - * and {@link ConnectableFlowable}s. - * @since 2.2.2 - experimental - */ -@Experimental -public interface ResettableConnectable { - - /** - * Reset the connectable source only if the given {@link Disposable} {@code connection} instance - * is still representing a connection established by a previous {@code connect()} connection. - *

- * For example, an immediately previous connection should reset the connectable source: - *


-     * Disposable d = connectable.connect();
-     * 
-     * ((ResettableConnectable)connectable).resetIf(d);
-     * 
- * However, if the connection indicator {@code Disposable} is from a much earlier connection, - * it should not affect the current connection: - *

-     * Disposable d1 = connectable.connect();
-     * d.dispose();
-     *
-     * Disposable d2 = connectable.connect();
-     *
-     * ((ResettableConnectable)connectable).resetIf(d);
-     * 
-     * assertFalse(d2.isDisposed());
-     * 
- * @param connection the disposable received from a previous {@code connect()} call. - */ - void resetIf(Disposable connection); -} diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java index 7ab84a568b..6dac8e3ac1 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java @@ -29,47 +29,30 @@ import io.reactivex.plugins.RxJavaPlugins; /** - * A connectable observable which shares an underlying source and dispatches source values to subscribers in a backpressure-aware - * manner. - * @param the value type + * Shares a single underlying connection to the upstream Publisher + * and multicasts events to all subscribed subscribers until the upstream + * completes or the connection is disposed. + *

+ * The difference to FlowablePublish is that when the upstream terminates, + * late subscriberss will receive that terminal event until the connection is + * disposed and the ConnectableFlowable is reset to its fresh state. + * + * @param the element type + * @since 2.2.10 */ -public final class FlowablePublish extends ConnectableFlowable implements HasUpstreamPublisher { - /** - * Indicates this child has been cancelled: the state is swapped in atomically and - * will prevent the dispatch() to emit (too many) values to a terminated child subscriber. - */ - static final long CANCELLED = Long.MIN_VALUE; +public final class FlowablePublish extends ConnectableFlowable +implements HasUpstreamPublisher { - /** The source observable. */ - final Flowable source; - /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ - final AtomicReference> current; + final Publisher source; - /** The size of the prefetch buffer. */ final int bufferSize; - final Publisher onSubscribe; - - /** - * Creates a OperatorPublish instance to publish values of the given source observable. - * @param the source value type - * @param source the source observable - * @param bufferSize the size of the prefetch buffer - * @return the connectable observable - */ - public static ConnectableFlowable create(Flowable source, final int bufferSize) { - // the current connection to source needs to be shared between the operator and its onSubscribe call - final AtomicReference> curr = new AtomicReference>(); - Publisher onSubscribe = new FlowablePublisher(curr, bufferSize); - return RxJavaPlugins.onAssembly(new FlowablePublish(onSubscribe, source, curr, bufferSize)); - } + final AtomicReference> current; - private FlowablePublish(Publisher onSubscribe, Flowable source, - final AtomicReference> current, int bufferSize) { - this.onSubscribe = onSubscribe; + public FlowablePublish(Publisher source, int bufferSize) { this.source = source; - this.current = current; this.bufferSize = bufferSize; + this.current = new AtomicReference>(); } @Override @@ -77,112 +60,137 @@ public Publisher source() { return source; } - @Override - protected void subscribeActual(Subscriber s) { - onSubscribe.subscribe(s); + /** + * @return The internal buffer size of this FloawblePublishAlt operator. + */ + public int publishBufferSize() { + return bufferSize; } @Override public void connect(Consumer connection) { - boolean doConnect; - PublishSubscriber ps; - // we loop because concurrent connect/disconnect and termination may change the state + PublishConnection conn; + boolean doConnect = false; + for (;;) { - // retrieve the current subscriber-to-source instance - ps = current.get(); - // if there is none yet or the current has been disposed - if (ps == null || ps.isDisposed()) { - // create a new subscriber-to-source - PublishSubscriber u = new PublishSubscriber(current, bufferSize); - // try setting it as the current subscriber-to-source - if (!current.compareAndSet(ps, u)) { - // did not work, perhaps a new subscriber arrived - // and created a new subscriber-to-source as well, retry + conn = current.get(); + + if (conn == null || conn.isDisposed()) { + PublishConnection fresh = new PublishConnection(current, bufferSize); + if (!current.compareAndSet(conn, fresh)) { continue; } - ps = u; + conn = fresh; } - // if connect() was called concurrently, only one of them should actually - // connect to the source - doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true); - break; // NOPMD + + doConnect = !conn.connect.get() && conn.connect.compareAndSet(false, true); + break; } - /* - * Notify the callback that we have a (new) connection which it can dispose - * but since ps is unique to a connection, multiple calls to connect() will return the - * same Subscription and even if there was a connect-disconnect-connect pair, the older - * references won't disconnect the newer connection. - * Synchronous source consumers have the opportunity to disconnect via dispose on the - * Disposable as subscribe() may never return on its own. - * - * Note however, that asynchronously disconnecting a running source might leave - * child subscribers without any terminal event; PublishProcessor does not have this - * issue because the cancellation was always triggered by the child subscribers - * themselves. - */ + try { - connection.accept(ps); + connection.accept(conn); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); throw ExceptionHelper.wrapOrThrow(ex); } + if (doConnect) { - source.subscribe(ps); + source.subscribe(conn); + } + } + + @Override + protected void subscribeActual(Subscriber s) { + PublishConnection conn; + + for (;;) { + conn = current.get(); + + // don't create a fresh connection if the current is disposed + if (conn == null) { + PublishConnection fresh = new PublishConnection(current, bufferSize); + if (!current.compareAndSet(conn, fresh)) { + continue; + } + conn = fresh; + } + + break; + } + + InnerSubscription inner = new InnerSubscription(s, conn); + s.onSubscribe(inner); + + if (conn.add(inner)) { + if (inner.isCancelled()) { + conn.remove(inner); + } else { + conn.drain(); + } + return; + } + + Throwable ex = conn.error; + if (ex != null) { + inner.downstream.onError(ex); + } else { + inner.downstream.onComplete(); + } + } + + @Override + public void reset() { + PublishConnection conn = current.get(); + if (conn != null && conn.isDisposed()) { + current.compareAndSet(conn, null); } } - @SuppressWarnings("rawtypes") - static final class PublishSubscriber + static final class PublishConnection extends AtomicInteger implements FlowableSubscriber, Disposable { - private static final long serialVersionUID = -202316842419149694L; - /** Indicates an empty array of inner subscribers. */ - static final InnerSubscriber[] EMPTY = new InnerSubscriber[0]; - /** Indicates a terminated PublishSubscriber. */ - static final InnerSubscriber[] TERMINATED = new InnerSubscriber[0]; + private static final long serialVersionUID = -1672047311619175801L; - /** Holds onto the current connected PublishSubscriber. */ - final AtomicReference> current; - /** The prefetch buffer size. */ - final int bufferSize; + final AtomicReference> current; - /** Tracks the subscribed InnerSubscribers. */ - final AtomicReference[]> subscribers; - /** - * Atomically changed from false to true by connect to make sure the - * connection is only performed by one thread. - */ - final AtomicBoolean shouldConnect; + final AtomicReference upstream; - final AtomicReference upstream = new AtomicReference(); + final AtomicBoolean connect; - /** Contains either an onComplete or an onError token from upstream. */ - volatile Object terminalEvent; + final AtomicReference[]> subscribers; - int sourceMode; + final int bufferSize; - /** Holds notifications from upstream. */ volatile SimpleQueue queue; + int sourceMode; + + volatile boolean done; + Throwable error; + + int consumed; + + @SuppressWarnings("rawtypes") + static final InnerSubscription[] EMPTY = new InnerSubscription[0]; + @SuppressWarnings("rawtypes") + static final InnerSubscription[] TERMINATED = new InnerSubscription[0]; + @SuppressWarnings("unchecked") - PublishSubscriber(AtomicReference> current, int bufferSize) { - this.subscribers = new AtomicReference[]>(EMPTY); + PublishConnection(AtomicReference> current, int bufferSize) { this.current = current; - this.shouldConnect = new AtomicBoolean(); + this.upstream = new AtomicReference(); + this.connect = new AtomicBoolean(); this.bufferSize = bufferSize; + this.subscribers = new AtomicReference[]>(EMPTY); } + @SuppressWarnings("unchecked") @Override public void dispose() { - if (subscribers.get() != TERMINATED) { - @SuppressWarnings("unchecked") - InnerSubscriber[] ps = subscribers.getAndSet(TERMINATED); - if (ps != TERMINATED) { - current.compareAndSet(PublishSubscriber.this, null); - SubscriptionHelper.cancel(upstream); - } - } + subscribers.getAndSet(TERMINATED); + current.compareAndSet(this, null); + SubscriptionHelper.cancel(upstream); } @Override @@ -201,8 +209,8 @@ public void onSubscribe(Subscription s) { if (m == QueueSubscription.SYNC) { sourceMode = m; queue = qs; - terminalEvent = NotificationLite.complete(); - dispatch(); + done = true; + drain(); return; } if (m == QueueSubscription.ASYNC) { @@ -228,46 +236,149 @@ public void onNext(T t) { } // since many things can happen concurrently, we have a common dispatch // loop to act on the current state serially - dispatch(); + drain(); } @Override - public void onError(Throwable e) { - // The observer front is accessed serially as required by spec so - // no need to CAS in the terminal value - if (terminalEvent == null) { - terminalEvent = NotificationLite.error(e); - // since many things can happen concurrently, we have a common dispatch - // loop to act on the current state serially - dispatch(); + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); } else { - RxJavaPlugins.onError(e); + error = t; + done = true; + drain(); } } @Override public void onComplete() { - // The observer front is accessed serially as required by spec so - // no need to CAS in the terminal value - if (terminalEvent == null) { - terminalEvent = NotificationLite.complete(); - // since many things can happen concurrently, we have a common dispatch loop - // to act on the current state serially - dispatch(); + done = true; + drain(); + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + SimpleQueue queue = this.queue; + int consumed = this.consumed; + int limit = this.bufferSize - (this.bufferSize >> 2); + boolean async = this.sourceMode != QueueSubscription.SYNC; + + outer: + for (;;) { + if (queue != null) { + long minDemand = Long.MAX_VALUE; + boolean hasDemand = false; + + InnerSubscription[] consumers = subscribers.get(); + + for (InnerSubscription inner : consumers) { + long request = inner.get(); + if (request != Long.MIN_VALUE) { + hasDemand = true; + minDemand = Math.min(request - inner.emitted, minDemand); + } + } + + if (!hasDemand) { + minDemand = 0L; + } + + while (minDemand != 0L) { + boolean d = done; + T v; + + try { + v = queue.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.get().cancel(); + queue.clear(); + done = true; + signalError(ex); + return; + } + + boolean empty = v == null; + + if (checkTerminated(d, empty)) { + return; + } + + if (empty) { + break; + } + + for (InnerSubscription inner : consumers) { + if (!inner.isCancelled()) { + inner.downstream.onNext(v); + inner.emitted++; + } + } + + if (async && ++consumed == limit) { + consumed = 0; + upstream.get().request(limit); + } + minDemand--; + + if (consumers != subscribers.get()) { + continue outer; + } + } + + if (checkTerminated(done, queue.isEmpty())) { + return; + } + } + + this.consumed = consumed; + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + if (queue == null) { + queue = this.queue; + } } } - /** - * Atomically try adding a new InnerSubscriber to this Subscriber or return false if this - * Subscriber was terminated. - * @param producer the producer to add - * @return true if succeeded, false otherwise - */ - boolean add(InnerSubscriber producer) { + @SuppressWarnings("unchecked") + boolean checkTerminated(boolean isDone, boolean isEmpty) { + if (isDone && isEmpty) { + Throwable ex = error; + + if (ex != null) { + signalError(ex); + } else { + for (InnerSubscription inner : subscribers.getAndSet(TERMINATED)) { + if (!inner.isCancelled()) { + inner.downstream.onComplete(); + } + } + } + return true; + } + return false; + } + + @SuppressWarnings("unchecked") + void signalError(Throwable ex) { + for (InnerSubscription inner : subscribers.getAndSet(TERMINATED)) { + if (!inner.isCancelled()) { + inner.downstream.onError(ex); + } + } + } + + boolean add(InnerSubscription inner) { // the state can change so we do a CAS loop to achieve atomicity for (;;) { // get the current producer array - InnerSubscriber[] c = subscribers.get(); + InnerSubscription[] c = subscribers.get(); // if this subscriber-to-source reached a terminal state by receiving // an onError or onComplete, just refuse to add the new producer if (c == TERMINATED) { @@ -276,9 +387,9 @@ boolean add(InnerSubscriber producer) { // we perform a copy-on-write logic int len = c.length; @SuppressWarnings("unchecked") - InnerSubscriber[] u = new InnerSubscriber[len + 1]; + InnerSubscription[] u = new InnerSubscription[len + 1]; System.arraycopy(c, 0, u, 0, len); - u[len] = producer; + u[len] = inner; // try setting the subscribers array if (subscribers.compareAndSet(c, u)) { return true; @@ -288,16 +399,12 @@ boolean add(InnerSubscriber producer) { } } - /** - * Atomically removes the given InnerSubscriber from the subscribers array. - * @param producer the producer to remove - */ @SuppressWarnings("unchecked") - void remove(InnerSubscriber producer) { + void remove(InnerSubscription inner) { // the state can change so we do a CAS loop to achieve atomicity for (;;) { // let's read the current subscribers array - InnerSubscriber[] c = subscribers.get(); + InnerSubscription[] c = subscribers.get(); int len = c.length; // if it is either empty or terminated, there is nothing to remove so we quit if (len == 0) { @@ -307,7 +414,7 @@ void remove(InnerSubscriber producer) { // although this is O(n), we don't expect too many child subscribers in general int j = -1; for (int i = 0; i < len; i++) { - if (c[i].equals(producer)) { + if (c[i] == inner) { j = i; break; } @@ -317,14 +424,14 @@ void remove(InnerSubscriber producer) { return; } // we do copy-on-write logic here - InnerSubscriber[] u; + InnerSubscription[] u; // we don't create a new empty array if producer was the single inhabitant // but rather reuse an empty array if (len == 1) { u = EMPTY; } else { // otherwise, create a new array one less in size - u = new InnerSubscriber[len - 1]; + u = new InnerSubscription[len - 1]; // copy elements being before the given producer System.arraycopy(c, 0, u, 0, j); // copy elements being after the given producer @@ -338,393 +445,42 @@ void remove(InnerSubscriber producer) { // (a concurrent add/remove or termination), we need to retry } } + } - /** - * Perform termination actions in case the source has terminated in some way and - * the queue has also become empty. - * @param term the terminal event (a NotificationLite.error or completed) - * @param empty set to true if the queue is empty - * @return true if there is indeed a terminal condition - */ - @SuppressWarnings("unchecked") - boolean checkTerminated(Object term, boolean empty) { - // first of all, check if there is actually a terminal event - if (term != null) { - // is it a completion event (impl. note, this is much cheaper than checking for isError) - if (NotificationLite.isComplete(term)) { - // but we also need to have an empty queue - if (empty) { - // this will prevent OnSubscribe spinning on a terminated but - // not yet cancelled PublishSubscriber - current.compareAndSet(this, null); - /* - * This will swap in a terminated array so add() in OnSubscribe will reject - * child subscribers to associate themselves with a terminated and thus - * never again emitting chain. - * - * Since we atomically change the contents of 'subscribers' only one - * operation wins at a time. If an add() wins before this getAndSet, - * its value will be part of the returned array by getAndSet and thus - * will receive the terminal notification. Otherwise, if getAndSet wins, - * add() will refuse to add the child producer and will trigger the - * creation of subscriber-to-source. - */ - for (InnerSubscriber ip : subscribers.getAndSet(TERMINATED)) { - ip.child.onComplete(); - } - // indicate we reached the terminal state - return true; - } - } else { - Throwable t = NotificationLite.getError(term); - // this will prevent OnSubscribe spinning on a terminated - // but not yet cancelled PublishSubscriber - current.compareAndSet(this, null); - // this will swap in a terminated array so add() in OnSubscribe will reject - // child subscribers to associate themselves with a terminated and thus - // never again emitting chain - InnerSubscriber[] a = subscribers.getAndSet(TERMINATED); - if (a.length != 0) { - for (InnerSubscriber ip : a) { - ip.child.onError(t); - } - } else { - RxJavaPlugins.onError(t); - } - // indicate we reached the terminal state - return true; - } - } - // there is still work to be done - return false; - } - - /** - * The common serialization point of events arriving from upstream and child subscribers - * requesting more. - */ - void dispatch() { - // standard construct of queue-drain - // if there is an emission going on, indicate that more work needs to be done - // the exact nature of this work needs to be determined from other data structures - if (getAndIncrement() != 0) { - return; - } - int missed = 1; - - // saving a local copy because this will be accessed after every item - // delivered to detect changes in the subscribers due to an onNext - // and thus not dropping items - AtomicReference[]> subscribers = this.subscribers; - - // We take a snapshot of the current child subscribers. - // Concurrent subscribers may miss this iteration, but it is to be expected - InnerSubscriber[] ps = subscribers.get(); - - outer: - for (;;) { - /* - * We need to read terminalEvent before checking the queue for emptiness because - * all enqueue happens before setting the terminal event. - * If it were the other way around, when the emission is paused between - * checking isEmpty and checking terminalEvent, some other thread might - * have produced elements and set the terminalEvent and we'd quit emitting - * prematurely. - */ - Object term = terminalEvent; - /* - * See if the queue is empty; since we need this information multiple - * times later on, we read it one. - * Although the queue can become non-empty in the mean time, we will - * detect it through the missing flag and will do another iteration. - */ - SimpleQueue q = queue; - - boolean empty = q == null || q.isEmpty(); - // if the queue is empty and the terminal event was received, quit - // and don't bother restoring emitting to false: no further activity is - // possible at this point - if (checkTerminated(term, empty)) { - return; - } - - // We have elements queued. Note that due to the serialization nature of dispatch() - // this loop is the only one which can turn a non-empty queue into an empty one - // and as such, no need to ask the queue itself again for that. - if (!empty) { - - int len = ps.length; - // Let's assume everyone requested the maximum value. - long maxRequested = Long.MAX_VALUE; - // count how many have triggered cancellation - int cancelled = 0; - - // Now find the minimum amount each child-subscriber requested - // since we can only emit that much to all of them without violating - // backpressure constraints - for (InnerSubscriber ip : ps) { - long r = ip.get(); - // if there is one child subscriber that hasn't requested yet - // we can't emit anything to anyone - if (r != CANCELLED) { - maxRequested = Math.min(maxRequested, r - ip.emitted); - } else { - cancelled++; - } - } - - // it may happen everyone has cancelled between here and subscribers.get() - // or we have no subscribers at all to begin with - if (len == cancelled) { - term = terminalEvent; - // so let's consume a value from the queue - T v; - - try { - v = q.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - upstream.get().cancel(); - term = NotificationLite.error(ex); - terminalEvent = term; - v = null; - } - // or terminate if there was a terminal event and the queue is empty - if (checkTerminated(term, v == null)) { - return; - } - // otherwise, just ask for a new value - if (sourceMode != QueueSubscription.SYNC) { - upstream.get().request(1); - } - // and retry emitting to potential new child subscribers - continue; - } - // if we get here, it means there are non-cancelled child subscribers - // and we count the number of emitted values because the queue - // may contain less than requested - int d = 0; - while (d < maxRequested) { - term = terminalEvent; - T v; - - try { - v = q.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - upstream.get().cancel(); - term = NotificationLite.error(ex); - terminalEvent = term; - v = null; - } + static final class InnerSubscription extends AtomicLong + implements Subscription { - empty = v == null; - // let's check if there is a terminal event and the queue became empty just now - if (checkTerminated(term, empty)) { - return; - } - // the queue is empty but we aren't terminated yet, finish this emission loop - if (empty) { - break; - } - // we need to unwrap potential nulls - T value = NotificationLite.getValue(v); - - boolean subscribersChanged = false; - - // let's emit this value to all child subscribers - for (InnerSubscriber ip : ps) { - // if ip.get() is negative, the child has either cancelled in the - // meantime or hasn't requested anything yet - // this eager behavior will skip cancelled children in case - // multiple values are available in the queue - long ipr = ip.get(); - if (ipr != CANCELLED) { - if (ipr != Long.MAX_VALUE) { - // indicate this child has received 1 element - ip.emitted++; - } - ip.child.onNext(value); - } else { - subscribersChanged = true; - } - } - // indicate we emitted one element - d++; - - // see if the array of subscribers changed as a consequence - // of emission or concurrent activity - InnerSubscriber[] freshArray = subscribers.get(); - if (subscribersChanged || freshArray != ps) { - ps = freshArray; - - // if we did emit at least one element, request more to replenish the queue - if (d != 0) { - if (sourceMode != QueueSubscription.SYNC) { - upstream.get().request(d); - } - } + private static final long serialVersionUID = 2845000326761540265L; - continue outer; - } - } + final Subscriber downstream; - // if we did emit at least one element, request more to replenish the queue - if (d != 0) { - if (sourceMode != QueueSubscription.SYNC) { - upstream.get().request(d); - } - } - // if we have requests but not an empty queue after emission - // let's try again to see if more requests/child subscribers are - // ready to receive more - if (maxRequested != 0L && !empty) { - continue; - } - } + final PublishConnection parent; - missed = addAndGet(-missed); - if (missed == 0) { - break; - } - - // get a fresh copy of the current subscribers - ps = subscribers.get(); - } - } - } - /** - * A Subscription that manages the request and cancellation state of a - * child subscriber in thread-safe manner. - * @param the value type - */ - static final class InnerSubscriber extends AtomicLong implements Subscription { - - private static final long serialVersionUID = -4453897557930727610L; - /** The actual child subscriber. */ - final Subscriber child; - /** - * The parent subscriber-to-source used to allow removing the child in case of - * child cancellation. - */ - volatile PublishSubscriber parent; - - /** Track the number of emitted items (avoids decrementing the request counter). */ long emitted; - InnerSubscriber(Subscriber child) { - this.child = child; + InnerSubscription(Subscriber downstream, PublishConnection parent) { + this.downstream = downstream; + this.parent = parent; } @Override public void request(long n) { if (SubscriptionHelper.validate(n)) { BackpressureHelper.addCancel(this, n); - PublishSubscriber p = parent; - if (p != null) { - p.dispatch(); - } + parent.drain(); } } @Override public void cancel() { - long r = get(); - // let's see if we are cancelled - if (r != CANCELLED) { - // if not, swap in the terminal state, this is idempotent - // because other methods using CAS won't overwrite this value, - // concurrent calls to cancel will atomically swap in the same - // terminal value - r = getAndSet(CANCELLED); - // and only one of them will see a non-terminated value before the swap - if (r != CANCELLED) { - PublishSubscriber p = parent; - if (p != null) { - // remove this from the parent - p.remove(this); - // After removal, we might have unblocked the other child subscribers: - // let's assume this child had 0 requested before the cancellation while - // the others had non-zero. By removing this 'blocking' child, the others - // are now free to receive events - p.dispatch(); - } - } + if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) { + parent.remove(this); + parent.drain(); } } - } - static final class FlowablePublisher implements Publisher { - private final AtomicReference> curr; - private final int bufferSize; - - FlowablePublisher(AtomicReference> curr, int bufferSize) { - this.curr = curr; - this.bufferSize = bufferSize; - } - - @Override - public void subscribe(Subscriber child) { - // create the backpressure-managing producer for this child - InnerSubscriber inner = new InnerSubscriber(child); - child.onSubscribe(inner); - // concurrent connection/disconnection may change the state, - // we loop to be atomic while the child subscribes - for (;;) { - // get the current subscriber-to-source - PublishSubscriber r = curr.get(); - // if there isn't one or it is cancelled/disposed - if (r == null || r.isDisposed()) { - // create a new subscriber to source - PublishSubscriber u = new PublishSubscriber(curr, bufferSize); - // let's try setting it as the current subscriber-to-source - if (!curr.compareAndSet(r, u)) { - // didn't work, maybe someone else did it or the current subscriber - // to source has just finished - continue; - } - // we won, let's use it going onwards - r = u; - } - - /* - * Try adding it to the current subscriber-to-source, add is atomic in respect - * to other adds and the termination of the subscriber-to-source. - */ - if (r.add(inner)) { - if (inner.get() == CANCELLED) { - r.remove(inner); - } else { - inner.parent = r; - } - r.dispatch(); - break; // NOPMD - } - /* - * The current PublishSubscriber has been terminated, try with a newer one. - */ - /* - * Note: although technically correct, concurrent disconnects can cause - * unexpected behavior such as child subscribers never receiving anything - * (unless connected again). An alternative approach, similar to - * PublishProcessor would be to immediately terminate such child - * subscribers as well: - * - * Object term = r.terminalEvent; - * if (r.nl.isCompleted(term)) { - * child.onComplete(); - * } else { - * child.onError(r.nl.getError(term)); - * } - * return; - * - * The original concurrent behavior was non-deterministic in this regard as well. - * Allowing this behavior, however, may introduce another unexpected behavior: - * after disconnecting a previous connection, one might not be able to prepare - * a new connection right after a previous termination by subscribing new child - * subscribers asynchronously before a connect call. - */ - } + public boolean isCancelled() { + return get() == Long.MIN_VALUE; } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java index 02ed97b462..27557ebc79 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java @@ -122,11 +122,7 @@ void terminated(RefConnection rc) { } } if (--rc.subscriberCount == 0) { - if (source instanceof Disposable) { - ((Disposable)source).dispose(); - } else if (source instanceof ResettableConnectable) { - ((ResettableConnectable)source).resetIf(rc.get()); - } + source.reset(); } } } @@ -137,14 +133,10 @@ void timeout(RefConnection rc) { connection = null; Disposable connectionObject = rc.get(); DisposableHelper.dispose(rc); - if (source instanceof Disposable) { - ((Disposable)source).dispose(); - } else if (source instanceof ResettableConnectable) { - if (connectionObject == null) { - rc.disconnectedEarly = true; - } else { - ((ResettableConnectable)source).resetIf(connectionObject); - } + if (connectionObject == null) { + rc.disconnectedEarly = true; + } else { + source.reset(); } } } @@ -179,7 +171,7 @@ public void accept(Disposable t) throws Exception { DisposableHelper.replace(this, t); synchronized (parent) { if (disconnectedEarly) { - ((ResettableConnectable)parent.source).resetIf(t); + parent.source.reset(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index 1acd6bfad8..f1c10ae44b 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -14,7 +14,7 @@ package io.reactivex.internal.operators.flowable; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.*; import org.reactivestreams.*; @@ -24,7 +24,6 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.*; -import io.reactivex.internal.disposables.ResettableConnectable; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.fuseable.HasUpstreamPublisher; import io.reactivex.internal.subscribers.SubscriberResourceWrapper; @@ -33,7 +32,7 @@ import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Timed; -public final class FlowableReplay extends ConnectableFlowable implements HasUpstreamPublisher, ResettableConnectable { +public final class FlowableReplay extends ConnectableFlowable implements HasUpstreamPublisher { /** The source observable. */ final Flowable source; /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ @@ -162,10 +161,12 @@ protected void subscribeActual(Subscriber s) { onSubscribe.subscribe(s); } - @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public void resetIf(Disposable connectionObject) { - current.compareAndSet((ReplaySubscriber)connectionObject, null); + public void reset() { + ReplaySubscriber conn = current.get(); + if (conn != null && conn.isDisposed()) { + current.compareAndSet(conn, null); + } } @Override @@ -1156,6 +1157,11 @@ public void connect(Consumer connection) { cf.connect(connection); } + @Override + public void reset() { + cf.reset(); + } + @Override protected void subscribeActual(Subscriber s) { flowable.subscribe(s); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java b/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java index debc37875b..932f0b1468 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java @@ -26,360 +26,265 @@ import io.reactivex.plugins.RxJavaPlugins; /** - * A connectable observable which shares an underlying source and dispatches source values to observers in a backpressure-aware - * manner. - * @param the value type + * Shares a single underlying connection to the upstream ObservableSource + * and multicasts events to all subscribed observers until the upstream + * completes or the connection is disposed. + *

+ * The difference to ObservablePublish is that when the upstream terminates, + * late observers will receive that terminal event until the connection is + * disposed and the ConnectableObservable is reset to its fresh state. + * + * @param the element type + * @since 2.2.10 */ -public final class ObservablePublish extends ConnectableObservable implements HasUpstreamObservableSource { - /** The source observable. */ - final ObservableSource source; - /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ - final AtomicReference> current; +public final class ObservablePublish extends ConnectableObservable +implements HasUpstreamObservableSource { - final ObservableSource onSubscribe; + final ObservableSource source; - /** - * Creates a OperatorPublish instance to publish values of the given source observable. - * @param the source value type - * @param source the source observable - * @return the connectable observable - */ - public static ConnectableObservable create(ObservableSource source) { - // the current connection to source needs to be shared between the operator and its onSubscribe call - final AtomicReference> curr = new AtomicReference>(); - ObservableSource onSubscribe = new PublishSource(curr); - return RxJavaPlugins.onAssembly(new ObservablePublish(onSubscribe, source, curr)); - } + final AtomicReference> current; - private ObservablePublish(ObservableSource onSubscribe, ObservableSource source, - final AtomicReference> current) { - this.onSubscribe = onSubscribe; + public ObservablePublish(ObservableSource source) { this.source = source; - this.current = current; - } - - @Override - public ObservableSource source() { - return source; - } - - @Override - protected void subscribeActual(Observer observer) { - onSubscribe.subscribe(observer); + this.current = new AtomicReference>(); } @Override public void connect(Consumer connection) { - boolean doConnect; - PublishObserver ps; - // we loop because concurrent connect/disconnect and termination may change the state + boolean doConnect = false; + PublishConnection conn; + for (;;) { - // retrieve the current subscriber-to-source instance - ps = current.get(); - // if there is none yet or the current has been disposed - if (ps == null || ps.isDisposed()) { - // create a new subscriber-to-source - PublishObserver u = new PublishObserver(current); - // try setting it as the current subscriber-to-source - if (!current.compareAndSet(ps, u)) { - // did not work, perhaps a new subscriber arrived - // and created a new subscriber-to-source as well, retry + conn = current.get(); + + if (conn == null || conn.isDisposed()) { + PublishConnection fresh = new PublishConnection(current); + if (!current.compareAndSet(conn, fresh)) { continue; } - ps = u; + conn = fresh; } - // if connect() was called concurrently, only one of them should actually - // connect to the source - doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true); - break; // NOPMD + + doConnect = !conn.connect.get() && conn.connect.compareAndSet(false, true); + break; } - /* - * Notify the callback that we have a (new) connection which it can dispose - * but since ps is unique to a connection, multiple calls to connect() will return the - * same Disposable and even if there was a connect-disconnect-connect pair, the older - * references won't disconnect the newer connection. - * Synchronous source consumers have the opportunity to disconnect via dispose on the - * Disposable as subscribe() may never return in its own. - * - * Note however, that asynchronously disconnecting a running source might leave - * child observers without any terminal event; PublishSubject does not have this - * issue because the dispose() was always triggered by the child observers - * themselves. - */ + try { - connection.accept(ps); + connection.accept(conn); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); throw ExceptionHelper.wrapOrThrow(ex); } + if (doConnect) { - source.subscribe(ps); + source.subscribe(conn); } } - @SuppressWarnings("rawtypes") - static final class PublishObserver + @Override + protected void subscribeActual(Observer observer) { + PublishConnection conn; + + for (;;) { + conn = current.get(); + // we don't create a fresh connection if the current is terminated + if (conn == null) { + PublishConnection fresh = new PublishConnection(current); + if (!current.compareAndSet(conn, fresh)) { + continue; + } + conn = fresh; + } + break; + } + + InnerDisposable inner = new InnerDisposable(observer, conn); + observer.onSubscribe(inner); + if (conn.add(inner)) { + if (inner.isDisposed()) { + conn.remove(inner); + } + return; + } + // Late observers will be simply terminated + Throwable error = conn.error; + if (error != null) { + observer.onError(error); + } else { + observer.onComplete(); + } + } + + @Override + public void reset() { + PublishConnection conn = current.get(); + if (conn != null && conn.isDisposed()) { + current.compareAndSet(conn, null); + } + } + + @Override + public ObservableSource source() { + return source; + } + + static final class PublishConnection + extends AtomicReference[]> implements Observer, Disposable { - /** Holds onto the current connected PublishObserver. */ - final AtomicReference> current; - /** Indicates an empty array of inner observers. */ + private static final long serialVersionUID = -3251430252873581268L; + + final AtomicBoolean connect; + + final AtomicReference> current; + + final AtomicReference upstream; + + @SuppressWarnings("rawtypes") static final InnerDisposable[] EMPTY = new InnerDisposable[0]; - /** Indicates a terminated PublishObserver. */ - static final InnerDisposable[] TERMINATED = new InnerDisposable[0]; - /** Tracks the subscribed observers. */ - final AtomicReference[]> observers; - /** - * Atomically changed from false to true by connect to make sure the - * connection is only performed by one thread. - */ - final AtomicBoolean shouldConnect; + @SuppressWarnings("rawtypes") + static final InnerDisposable[] TERMINATED = new InnerDisposable[0]; - final AtomicReference upstream = new AtomicReference(); + Throwable error; @SuppressWarnings("unchecked") - PublishObserver(AtomicReference> current) { - this.observers = new AtomicReference[]>(EMPTY); + public PublishConnection(AtomicReference> current) { + this.connect = new AtomicBoolean(); this.current = current; - this.shouldConnect = new AtomicBoolean(); + this.upstream = new AtomicReference(); + lazySet(EMPTY); } @SuppressWarnings("unchecked") @Override public void dispose() { - InnerDisposable[] ps = observers.getAndSet(TERMINATED); - if (ps != TERMINATED) { - current.compareAndSet(PublishObserver.this, null); - - DisposableHelper.dispose(upstream); - } + getAndSet(TERMINATED); + current.compareAndSet(this, null); + DisposableHelper.dispose(upstream); } @Override public boolean isDisposed() { - return observers.get() == TERMINATED; + return get() == TERMINATED; } @Override public void onSubscribe(Disposable d) { - DisposableHelper.setOnce(this.upstream, d); + DisposableHelper.setOnce(upstream, d); } @Override public void onNext(T t) { - for (InnerDisposable inner : observers.get()) { - inner.child.onNext(t); + for (InnerDisposable inner : get()) { + inner.downstream.onNext(t); } } - @SuppressWarnings("unchecked") @Override + @SuppressWarnings("unchecked") public void onError(Throwable e) { - current.compareAndSet(this, null); - InnerDisposable[] a = observers.getAndSet(TERMINATED); - if (a.length != 0) { - for (InnerDisposable inner : a) { - inner.child.onError(e); + if (upstream.get() != DisposableHelper.DISPOSED) { + error = e; + upstream.lazySet(DisposableHelper.DISPOSED); + for (InnerDisposable inner : getAndSet(TERMINATED)) { + inner.downstream.onError(e); } } else { RxJavaPlugins.onError(e); } } - @SuppressWarnings("unchecked") @Override + @SuppressWarnings("unchecked") public void onComplete() { - current.compareAndSet(this, null); - for (InnerDisposable inner : observers.getAndSet(TERMINATED)) { - inner.child.onComplete(); + upstream.lazySet(DisposableHelper.DISPOSED); + for (InnerDisposable inner : getAndSet(TERMINATED)) { + inner.downstream.onComplete(); } } - /** - * Atomically try adding a new InnerDisposable to this Observer or return false if this - * Observer was terminated. - * @param producer the producer to add - * @return true if succeeded, false otherwise - */ - boolean add(InnerDisposable producer) { - // the state can change so we do a CAS loop to achieve atomicity + public boolean add(InnerDisposable inner) { for (;;) { - // get the current producer array - InnerDisposable[] c = observers.get(); - // if this subscriber-to-source reached a terminal state by receiving - // an onError or onComplete, just refuse to add the new producer - if (c == TERMINATED) { + InnerDisposable[] a = get(); + if (a == TERMINATED) { return false; } - // we perform a copy-on-write logic - int len = c.length; + int n = a.length; @SuppressWarnings("unchecked") - InnerDisposable[] u = new InnerDisposable[len + 1]; - System.arraycopy(c, 0, u, 0, len); - u[len] = producer; - // try setting the observers array - if (observers.compareAndSet(c, u)) { + InnerDisposable[] b = new InnerDisposable[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = inner; + if (compareAndSet(a, b)) { return true; } - // if failed, some other operation succeeded (another add, remove or termination) - // so retry } } - /** - * Atomically removes the given producer from the observers array. - * @param producer the producer to remove - */ @SuppressWarnings("unchecked") - void remove(InnerDisposable producer) { - // the state can change so we do a CAS loop to achieve atomicity + public void remove(InnerDisposable inner) { for (;;) { - // let's read the current observers array - InnerDisposable[] c = observers.get(); - // if it is either empty or terminated, there is nothing to remove so we quit - int len = c.length; - if (len == 0) { + InnerDisposable[] a = get(); + int n = a.length; + if (n == 0) { return; } - // let's find the supplied producer in the array - // although this is O(n), we don't expect too many child observers in general + int j = -1; - for (int i = 0; i < len; i++) { - if (c[i].equals(producer)) { + for (int i = 0; i < n; i++) { + if (a[i] == inner) { j = i; break; } } - // we didn't find it so just quit + if (j < 0) { return; } - // we do copy-on-write logic here - InnerDisposable[] u; - // we don't create a new empty array if producer was the single inhabitant - // but rather reuse an empty array - if (len == 1) { - u = EMPTY; - } else { - // otherwise, create a new array one less in size - u = new InnerDisposable[len - 1]; - // copy elements being before the given producer - System.arraycopy(c, 0, u, 0, j); - // copy elements being after the given producer - System.arraycopy(c, j + 1, u, j, len - j - 1); + InnerDisposable[] b = EMPTY; + if (n != 1) { + b = new InnerDisposable[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); } - // try setting this new array as - if (observers.compareAndSet(c, u)) { + if (compareAndSet(a, b)) { return; } - // if we failed, it means something else happened - // (a concurrent add/remove or termination), we need to retry } } } + /** - * A Disposable that manages the request and disposed state of a - * child Observer in thread-safe manner. - * {@code this} holds the parent PublishObserver or itself if disposed - * @param the value type + * Intercepts the dispose signal from the downstream and + * removes itself from the connection's observers array + * at most once. + * @param the element type */ static final class InnerDisposable - extends AtomicReference + extends AtomicReference> implements Disposable { - private static final long serialVersionUID = -1100270633763673112L; - /** The actual child subscriber. */ - final Observer child; - InnerDisposable(Observer child) { - this.child = child; - } + private static final long serialVersionUID = 7463222674719692880L; - @Override - public boolean isDisposed() { - return get() == this; + final Observer downstream; + + public InnerDisposable(Observer downstream, PublishConnection parent) { + this.downstream = downstream; + lazySet(parent); } - @SuppressWarnings("unchecked") @Override public void dispose() { - Object o = getAndSet(this); - if (o != null && o != this) { - ((PublishObserver)o).remove(this); - } - } - - void setParent(PublishObserver p) { - if (!compareAndSet(null, p)) { + PublishConnection p = getAndSet(null); + if (p != null) { p.remove(this); } } - } - - static final class PublishSource implements ObservableSource { - private final AtomicReference> curr; - - PublishSource(AtomicReference> curr) { - this.curr = curr; - } @Override - public void subscribe(Observer child) { - // create the backpressure-managing producer for this child - InnerDisposable inner = new InnerDisposable(child); - child.onSubscribe(inner); - // concurrent connection/disconnection may change the state, - // we loop to be atomic while the child subscribes - for (;;) { - // get the current subscriber-to-source - PublishObserver r = curr.get(); - // if there isn't one or it is disposed - if (r == null || r.isDisposed()) { - // create a new subscriber to source - PublishObserver u = new PublishObserver(curr); - // let's try setting it as the current subscriber-to-source - if (!curr.compareAndSet(r, u)) { - // didn't work, maybe someone else did it or the current subscriber - // to source has just finished - continue; - } - // we won, let's use it going onwards - r = u; - } - - /* - * Try adding it to the current subscriber-to-source, add is atomic in respect - * to other adds and the termination of the subscriber-to-source. - */ - if (r.add(inner)) { - inner.setParent(r); - break; // NOPMD - } - /* - * The current PublishObserver has been terminated, try with a newer one. - */ - /* - * Note: although technically correct, concurrent disconnects can cause - * unexpected behavior such as child observers never receiving anything - * (unless connected again). An alternative approach, similar to - * PublishSubject would be to immediately terminate such child - * observers as well: - * - * Object term = r.terminalEvent; - * if (r.nl.isCompleted(term)) { - * child.onComplete(); - * } else { - * child.onError(r.nl.getError(term)); - * } - * return; - * - * The original concurrent behavior was non-deterministic in this regard as well. - * Allowing this behavior, however, may introduce another unexpected behavior: - * after disconnecting a previous connection, one might not be able to prepare - * a new connection right after a previous termination by subscribing new child - * observers asynchronously before a connect call. - */ - } + public boolean isDisposed() { + return get() == null; } } } + diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java index 5306f4481d..c238403c20 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java @@ -119,11 +119,7 @@ void terminated(RefConnection rc) { } } if (--rc.subscriberCount == 0) { - if (source instanceof Disposable) { - ((Disposable)source).dispose(); - } else if (source instanceof ResettableConnectable) { - ((ResettableConnectable)source).resetIf(rc.get()); - } + source.reset(); } } } @@ -135,14 +131,10 @@ void timeout(RefConnection rc) { Disposable connectionObject = rc.get(); DisposableHelper.dispose(rc); - if (source instanceof Disposable) { - ((Disposable)source).dispose(); - } else if (source instanceof ResettableConnectable) { - if (connectionObject == null) { - rc.disconnectedEarly = true; - } else { - ((ResettableConnectable)source).resetIf(connectionObject); - } + if (connectionObject == null) { + rc.disconnectedEarly = true; + } else { + source.reset(); } } } @@ -177,7 +169,7 @@ public void accept(Disposable t) throws Exception { DisposableHelper.replace(this, t); synchronized (parent) { if (disconnectedEarly) { - ((ResettableConnectable)parent.source).resetIf(t); + parent.source.reset(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index 11180bdefb..cffbeb26bf 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -31,7 +31,7 @@ import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Timed; -public final class ObservableReplay extends ConnectableObservable implements HasUpstreamObservableSource, ResettableConnectable { +public final class ObservableReplay extends ConnectableObservable implements HasUpstreamObservableSource { /** The source observable. */ final ObservableSource source; /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ @@ -159,10 +159,12 @@ public ObservableSource source() { return source; } - @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public void resetIf(Disposable connectionObject) { - current.compareAndSet((ReplayObserver)connectionObject, null); + public void reset() { + ReplayObserver conn = current.get(); + if (conn != null && conn.isDisposed()) { + current.compareAndSet(conn, null); + } } @Override @@ -1069,6 +1071,11 @@ public void connect(Consumer connection) { co.connect(connection); } + @Override + public void reset() { + co.reset(); + } + @Override protected void subscribeActual(Observer observer) { observable.subscribe(observer); diff --git a/src/main/java/io/reactivex/observables/ConnectableObservable.java b/src/main/java/io/reactivex/observables/ConnectableObservable.java index b5e54054b1..a0262f9e5f 100644 --- a/src/main/java/io/reactivex/observables/ConnectableObservable.java +++ b/src/main/java/io/reactivex/observables/ConnectableObservable.java @@ -32,6 +32,18 @@ * before the {@code Observable} begins emitting items. *

* + *

+ * When the upstream terminates, the {@code ConnectableObservable} remains in this terminated state and, + * depending on the actual underlying implementation, relays cached events to late {@link Observer}s. + * In order to reuse and restart this {@code ConnectableObservable}, the {@link #reset()} method has to be called. + * When called, this {@code ConnectableObservable} will appear as fresh, unconnected source to new {@link Observer}s. + * Disposing the connection will reset the {@code ConnectableFlowable} to its fresh state and there is no need to call + * {@code reset()} in this case. + *

+ * Note that although {@link #connect()} and {@link #reset()} are safe to call from multiple threads, it is recommended + * a dedicated thread or business logic manages the connection or resetting of a {@code ConnectableObservable} so that + * there is no unwanted signal loss due to early {@code connect()} or {@code reset()} calls while {@code Observer}s are + * still being subscribed to to this {@code ConnectableObservable} to receive signals from the get go. * * @see RxJava Wiki: * Connectable Observable Operators @@ -51,6 +63,15 @@ public abstract class ConnectableObservable extends Observable { */ public abstract void connect(@NonNull Consumer connection); + /** + * Resets this ConnectableObservable into its fresh state if it has terminated + * or has been disposed. + *

+ * Calling this method on a fresh or active {@code ConnectableObservable} has no effect. + * @since 3.0.0 + */ + public abstract void reset(); + /** * Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying * {@link Observable} to its {@link Observer}s. @@ -88,7 +109,7 @@ public Observable refCount() { /** * Connects to the upstream {@code ConnectableObservable} if the number of subscribed - * subscriber reaches the specified count and disconnect if all subscribers have unsubscribed. + * observers reaches the specified count and disconnect if all subscribers have unsubscribed. *

*
Scheduler:
*
This {@code refCount} overload does not operate on any particular {@link Scheduler}.
@@ -106,7 +127,7 @@ public final Observable refCount(int subscriberCount) { /** * Connects to the upstream {@code ConnectableObservable} if the number of subscribed - * subscriber reaches 1 and disconnect after the specified + * observers reaches 1 and disconnect after the specified * timeout if all subscribers have unsubscribed. *
*
Scheduler:
@@ -127,7 +148,7 @@ public final Observable refCount(long timeout, TimeUnit unit) { /** * Connects to the upstream {@code ConnectableObservable} if the number of subscribed - * subscriber reaches 1 and disconnect after the specified + * observers reaches 1 and disconnect after the specified * timeout if all subscribers have unsubscribed. *
*
Scheduler:
@@ -148,7 +169,7 @@ public final Observable refCount(long timeout, TimeUnit unit, Scheduler sched /** * Connects to the upstream {@code ConnectableObservable} if the number of subscribed - * subscriber reaches the specified count and disconnect after the specified + * observers reaches the specified count and disconnect after the specified * timeout if all subscribers have unsubscribed. *
*
Scheduler:
@@ -170,7 +191,7 @@ public final Observable refCount(int subscriberCount, long timeout, TimeUnit /** * Connects to the upstream {@code ConnectableObservable} if the number of subscribed - * subscriber reaches the specified count and disconnect after the specified + * observers reaches the specified count and disconnect after the specified * timeout if all subscribers have unsubscribed. *
*
Scheduler:
diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java index eac12749f5..8bd47e95f5 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java @@ -262,9 +262,8 @@ public void testConnectWithNoSubscriber() { cf.subscribe(subscriber); // Emit 1 and 2 scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS); - subscriber.assertValues(1L, 2L); - subscriber.assertNoErrors(); - subscriber.assertTerminated(); + // 3.x: Flowable.publish no longer drains the input buffer if there are no subscribers + subscriber.assertResult(0L, 1L, 2L); } @Test @@ -280,6 +279,8 @@ public void testSubscribeAfterDisconnectThenConnect() { ts1.assertValue(1); ts1.assertNoErrors(); ts1.assertTerminated(); + + source.reset(); TestSubscriber ts2 = new TestSubscriber(); @@ -313,7 +314,7 @@ public void testNoSubscriberRetentionOnCompleted() { ts1.assertNoErrors(); ts1.assertTerminated(); - assertNull(source.current.get()); + assertEquals(0, source.current.get().subscribers.get().length); } @Test @@ -344,7 +345,7 @@ public void testNoDisconnectSomeoneElse() { @SuppressWarnings("unchecked") static boolean checkPublishDisposed(Disposable d) { - return ((FlowablePublish.PublishSubscriber)d).isDisposed(); + return ((FlowablePublish.PublishConnection)d).isDisposed(); } @Test @@ -674,16 +675,13 @@ protected void subscribeActual(Subscriber subscriber) { @Test public void noErrorLoss() { - List errors = TestHelper.trackPluginErrors(); - try { - ConnectableFlowable cf = Flowable.error(new TestException()).publish(); - - cf.connect(); + ConnectableFlowable cf = Flowable.error(new TestException()).publish(); - TestHelper.assertUndeliverable(errors, 0, TestException.class); - } finally { - RxJavaPlugins.reset(); - } + cf.connect(); + + // 3.x: terminal events are always kept until reset. + cf.test() + .assertFailure(TestException.class); } @Test @@ -857,38 +855,36 @@ public Integer apply(Integer v) throws Exception { @Test public void dryRunCrash() { - List errors = TestHelper.trackPluginErrors(); - try { - final TestSubscriber ts = new TestSubscriber(1L) { - @Override - public void onNext(Object t) { - super.onNext(t); - onComplete(); - cancel(); - } - }; + final TestSubscriber ts = new TestSubscriber(1L) { + @Override + public void onNext(Object t) { + super.onNext(t); + onComplete(); + cancel(); + } + }; - Flowable.range(1, 10) - .map(new Function() { - @Override - public Object apply(Integer v) throws Exception { - if (v == 2) { - throw new TestException(); - } - return v; + Flowable source = Flowable.range(1, 10) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + if (v == 2) { + throw new TestException(); } - }) - .publish() - .autoConnect() - .subscribe(ts); + return v; + } + }) + .publish() + .autoConnect(); - ts - .assertResult(1); + source.subscribe(ts); - TestHelper.assertUndeliverable(errors, 0, TestException.class); - } finally { - RxJavaPlugins.reset(); - } + ts + .assertResult(1); + + // 3.x: terminal events remain observable until reset + source.test() + .assertFailure(TestException.class); } @Test @@ -906,7 +902,9 @@ public void subscribe(FlowableEmitter s) throws Exception { .publish(8) .autoConnect() .test(0L) - .assertFailure(MissingBackpressureException.class); + // 3.x emits errors last, even the full queue errors + .requestMore(10) + .assertFailure(MissingBackpressureException.class, 0, 1, 2, 3, 4, 5, 6, 7); TestHelper.assertError(errors, 0, MissingBackpressureException.class); } finally { @@ -964,20 +962,20 @@ public void run() { @Test public void removeNotPresent() { - final AtomicReference> ref = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); final ConnectableFlowable cf = new Flowable() { @Override @SuppressWarnings("unchecked") protected void subscribeActual(Subscriber s) { s.onSubscribe(new BooleanSubscription()); - ref.set((PublishSubscriber)s); + ref.set((PublishConnection)s); } }.publish(); cf.connect(); - ref.get().add(new InnerSubscriber(new TestSubscriber())); + ref.get().add(new InnerSubscription(new TestSubscriber(), ref.get())); ref.get().remove(null); } @@ -1304,7 +1302,7 @@ public void publishCancelOneAsync2() { final TestSubscriber ts1 = new TestSubscriber(); - final AtomicReference> ref = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); cf.subscribe(new FlowableSubscriber() { @SuppressWarnings("unchecked") @@ -1312,7 +1310,7 @@ public void publishCancelOneAsync2() { public void onSubscribe(Subscription s) { ts1.onSubscribe(new BooleanSubscription()); // pretend to be cancelled without removing it from the subscriber list - ref.set((InnerSubscriber)s); + ref.set((InnerSubscription)s); } @Override @@ -1498,4 +1496,207 @@ public boolean test(List v) throws Exception { Arrays.asList(18, 19) ); } -} + + @Test + public void altConnectCrash() { + try { + new FlowablePublish(Flowable.empty(), 128) + .connect(new Consumer() { + @Override + public void accept(Disposable t) throws Exception { + throw new TestException(); + } + }); + fail("Should have thrown"); + } catch (TestException expected) { + // expected + } + } + + @Test + public void altConnectRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final ConnectableFlowable cf = + new FlowablePublish(Flowable.never(), 128); + + Runnable r = new Runnable() { + @Override + public void run() { + cf.connect(); + } + }; + + TestHelper.race(r, r); + } + } + + @Test + public void fusedPollCrash() { + Flowable.range(1, 5) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .compose(TestHelper.flowableStripBoundary()) + .publish() + .refCount() + .test() + .assertFailure(TestException.class); + } + + @Test + public void syncFusedNoRequest() { + Flowable.range(1, 5) + .publish(1) + .refCount() + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void normalBackpressuredPolls() { + Flowable.range(1, 5) + .hide() + .publish(1) + .refCount() + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void emptyHidden() { + Flowable.empty() + .hide() + .publish(1) + .refCount() + .test() + .assertResult(); + } + + @Test + public void emptyFused() { + Flowable.empty() + .publish(1) + .refCount() + .test() + .assertResult(); + } + + @Test + public void overflowQueueRefCount() { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onNext(2); + } + } + .publish(1) + .refCount() + .test(0) + .requestMore(1) + .assertFailure(MissingBackpressureException.class, 1); + } + + @Test + public void doubleErrorRefCount() { + List errors = TestHelper.trackPluginErrors(); + try { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onError(new TestException("one")); + s.onError(new TestException("two")); + } + } + .publish(1) + .refCount() + .test(0) + .assertFailureAndMessage(TestException.class, "one"); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "two"); + assertEquals(1, errors.size()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void onCompleteAvailableUntilReset() { + ConnectableFlowable cf = Flowable.just(1).publish(); + + TestSubscriber ts = cf.test(); + ts.assertEmpty(); + + cf.connect(); + + ts.assertResult(1); + + cf.test().assertResult(); + + cf.reset(); + + ts = cf.test(); + ts.assertEmpty(); + + cf.connect(); + + ts.assertResult(1); + } + + @Test + public void onErrorAvailableUntilReset() { + ConnectableFlowable cf = Flowable.just(1) + .concatWith(Flowable.error(new TestException())) + .publish(); + + TestSubscriber ts = cf.test(); + ts.assertEmpty(); + + cf.connect(); + + ts.assertFailure(TestException.class, 1); + + cf.test().assertFailure(TestException.class); + + cf.reset(); + + ts = cf.test(); + ts.assertEmpty(); + + cf.connect(); + + ts.assertFailure(TestException.class, 1); + } + + @Test + public void disposeResets() { + PublishProcessor pp = PublishProcessor.create(); + + ConnectableFlowable cf = pp.publish(); + + assertFalse(pp.hasSubscribers()); + + Disposable d = cf.connect(); + + assertTrue(pp.hasSubscribers()); + + d.dispose(); + + assertFalse(pp.hasSubscribers()); + + TestSubscriber ts = cf.test(); + + cf.connect(); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts.assertValuesOnly(1); + } +} \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java index 13d61fa5da..e2262718e6 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java @@ -551,7 +551,7 @@ public Publisher apply(Long t1) { return Flowable.defer(new Supplier>() { @Override public Publisher get() { - return Flowable.error(new TestException("Some exception")); + return Flowable.error(new TestException("Some exception")); } }); } @@ -637,6 +637,11 @@ public void connect(Consumer connection) { calls[0]++; } + @Override + public void reset() { + // nothing to do in this test + } + @Override protected void subscribeActual(Subscriber subscriber) { subscriber.onSubscribe(new BooleanSubscription()); @@ -653,6 +658,7 @@ protected void subscribeActual(Subscriber subscriber) { @Test public void replayNoLeak() throws Exception { + Thread.sleep(100); System.gc(); Thread.sleep(100); @@ -669,6 +675,7 @@ public Object call() throws Exception { source.subscribe(); + Thread.sleep(100); System.gc(); Thread.sleep(100); @@ -680,6 +687,7 @@ public Object call() throws Exception { @Test public void replayNoLeak2() throws Exception { + Thread.sleep(100); System.gc(); Thread.sleep(100); @@ -703,6 +711,7 @@ public Object call() throws Exception { d1 = null; d2 = null; + Thread.sleep(100); System.gc(); Thread.sleep(100); @@ -724,6 +733,7 @@ static final class ExceptionData extends Exception { @Test public void publishNoLeak() throws Exception { + Thread.sleep(100); System.gc(); Thread.sleep(100); @@ -740,12 +750,14 @@ public Object call() throws Exception { source.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer()); - System.gc(); Thread.sleep(100); + System.gc(); + Thread.sleep(200); long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); source = null; + assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after); } @@ -813,22 +825,17 @@ public void connect(Consumer connection) { } @Override - protected void subscribeActual(Subscriber subscriber) { - throw new TestException("subscribeActual"); + public void reset() { + // nothing to do in this test } - } - - static final class BadFlowableDispose extends ConnectableFlowable implements Disposable { @Override - public void dispose() { - throw new TestException("dispose"); + protected void subscribeActual(Subscriber subscriber) { + throw new TestException("subscribeActual"); } + } - @Override - public boolean isDisposed() { - return false; - } + static final class BadFlowableDispose extends ConnectableFlowable { @Override public void connect(Consumer connection) { @@ -839,6 +846,11 @@ public void connect(Consumer connection) { } } + @Override + public void reset() { + throw new TestException("dispose"); + } + @Override protected void subscribeActual(Subscriber subscriber) { subscriber.onSubscribe(new BooleanSubscription()); @@ -852,6 +864,11 @@ public void connect(Consumer connection) { throw new TestException("connect"); } + @Override + public void reset() { + // nothing to do in this test + } + @Override protected void subscribeActual(Subscriber subscriber) { subscriber.onSubscribe(new BooleanSubscription()); @@ -924,6 +941,11 @@ public void connect(Consumer connection) { } } + @Override + public void reset() { + // nothing to do in this test + } + @Override protected void subscribeActual(Subscriber subscriber) { if (++count == 1) { @@ -955,8 +977,7 @@ public void badSourceSubscribe2() { } } - static final class BadFlowableConnect2 extends ConnectableFlowable - implements Disposable { + static final class BadFlowableConnect2 extends ConnectableFlowable { @Override public void connect(Consumer connection) { @@ -968,19 +989,14 @@ public void connect(Consumer connection) { } @Override - protected void subscribeActual(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - subscriber.onComplete(); - } - - @Override - public void dispose() { + public void reset() { throw new TestException("dispose"); } @Override - public boolean isDisposed() { - return false; + protected void subscribeActual(Subscriber subscriber) { + subscriber.onSubscribe(new BooleanSubscription()); + subscriber.onComplete(); } } @@ -1217,6 +1233,11 @@ public void connect(Consumer connection) { } } + @Override + public void reset() { + // nothing to do in this test + } + @Override protected void subscribeActual(Subscriber subscriber) { subscriber.onSubscribe(new BooleanSubscription()); @@ -1326,6 +1347,7 @@ public void cancelTerminateStateExclusion() { rc.subscriberCount = 1; rc.connected = true; o.connection = rc; + rc.set(null); o.cancel(rc); o.connection = rc; @@ -1357,19 +1379,13 @@ public void replayRefCountShallBeThreadSafe() { } } - static final class TestConnectableFlowable extends ConnectableFlowable - implements Disposable { - - volatile boolean disposed; + static final class TestConnectableFlowable extends ConnectableFlowable { - @Override - public void dispose() { - disposed = true; - } + volatile boolean reset; @Override - public boolean isDisposed() { - return disposed; + public void reset() { + reset = true; } @Override @@ -1384,15 +1400,18 @@ protected void subscribeActual(Subscriber subscriber) { } @Test - public void timeoutDisposesSource() { - FlowableRefCount o = (FlowableRefCount)new TestConnectableFlowable().refCount(); + public void timeoutResetsSource() { + TestConnectableFlowable tcf = new TestConnectableFlowable(); + FlowableRefCount o = (FlowableRefCount)tcf.refCount(); RefConnection rc = new RefConnection(o); + rc.set(Disposables.empty()); o.connection = rc; o.timeout(rc); + - assertTrue(((Disposable)o.source).isDisposed()); + assertTrue(tcf.reset); } @Test @@ -1409,4 +1428,31 @@ public void disconnectBeforeConnect() { flowable.take(1).test().assertResult(2); } -} + + @Test + public void publishRefCountShallBeThreadSafe() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + Flowable flowable = Flowable.just(1).publish().refCount(); + + TestSubscriber subscriber1 = flowable + .subscribeOn(Schedulers.io()) + .test(); + + TestSubscriber subscriber2 = flowable + .subscribeOn(Schedulers.io()) + .test(); + + subscriber1 + .withTag("subscriber1 " + i) + .awaitDone(5, TimeUnit.SECONDS) + .assertNoErrors() + .assertComplete(); + + subscriber2 + .withTag("subscriber2 " + i) + .awaitDone(5, TimeUnit.SECONDS) + .assertNoErrors() + .assertComplete(); + } + } +} \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java index 2f2a0e677e..ff703266ad 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java @@ -279,6 +279,8 @@ public void testSubscribeAfterDisconnectThenConnect() { to1.assertNoErrors(); to1.assertTerminated(); + source.reset(); + TestObserver to2 = new TestObserver(); source.subscribe(to2); @@ -311,7 +313,7 @@ public void testNoSubscriberRetentionOnCompleted() { to1.assertNoErrors(); to1.assertTerminated(); - assertNull(source.current.get()); + assertEquals(0, source.current.get().get().length); } @Test @@ -342,7 +344,7 @@ public void testNoDisconnectSomeoneElse() { @SuppressWarnings("unchecked") static boolean checkPublishDisposed(Disposable d) { - return ((ObservablePublish.PublishObserver)d).isDisposed(); + return ((ObservablePublish.PublishConnection)d).isDisposed(); } @Test @@ -607,16 +609,13 @@ protected void subscribeActual(Observer observer) { @Test public void noErrorLoss() { - List errors = TestHelper.trackPluginErrors(); - try { - ConnectableObservable co = Observable.error(new TestException()).publish(); + ConnectableObservable co = Observable.error(new TestException()).publish(); - co.connect(); + co.connect(); - TestHelper.assertUndeliverable(errors, 0, TestException.class); - } finally { - RxJavaPlugins.reset(); - } + // 3.x: terminal events remain observable until reset + co.test() + .assertFailure(TestException.class); } @Test @@ -758,4 +757,112 @@ public void disposedUpfront() { ((ObservablePublish)co).current.get().remove(null); } -} + + @Test + public void altConnectCrash() { + try { + new ObservablePublish(Observable.empty()) + .connect(new Consumer() { + @Override + public void accept(Disposable t) throws Exception { + throw new TestException(); + } + }); + fail("Should have thrown"); + } catch (TestException expected) { + // expected + } + } + + @Test + public void altConnectRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final ConnectableObservable co = + new ObservablePublish(Observable.never()); + + Runnable r = new Runnable() { + @Override + public void run() { + co.connect(); + } + }; + + TestHelper.race(r, r); + } + } + + @Test + public void onCompleteAvailableUntilReset() { + ConnectableObservable co = Observable.just(1).publish(); + + TestObserver to = co.test(); + to.assertEmpty(); + + co.connect(); + + to.assertResult(1); + + co.test().assertResult(); + + co.reset(); + + to = co.test(); + to.assertEmpty(); + + co.connect(); + + to.assertResult(1); + } + + @Test + public void onErrorAvailableUntilReset() { + ConnectableObservable co = Observable.just(1) + .concatWith(Observable.error(new TestException())) + .publish(); + + TestObserver to = co.test(); + to.assertEmpty(); + + co.connect(); + + to.assertFailure(TestException.class, 1); + + co.test().assertFailure(TestException.class); + + co.reset(); + + to = co.test(); + to.assertEmpty(); + + co.connect(); + + to.assertFailure(TestException.class, 1); + } + + @Test + public void disposeResets() { + PublishSubject ps = PublishSubject.create(); + + ConnectableObservable co = ps.publish(); + + assertFalse(ps.hasObservers()); + + Disposable d = co.connect(); + + assertTrue(ps.hasObservers()); + + d.dispose(); + + assertFalse(ps.hasObservers()); + + TestObserver to = co.test(); + + co.connect(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to.assertValuesOnly(1); + } +} \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index 7b3717773d..8eec8664d2 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -615,6 +615,11 @@ public void connect(Consumer connection) { calls[0]++; } + @Override + public void reset() { + // nothing to do in this test + } + @Override protected void subscribeActual(Observer observer) { observer.onSubscribe(Disposables.disposed()); @@ -789,24 +794,24 @@ public void connect(Consumer connection) { } } + @Override + public void reset() { + // nothing to do in this test + } + @Override protected void subscribeActual(Observer observer) { throw new TestException("subscribeActual"); } } - static final class BadObservableDispose extends ConnectableObservable implements Disposable { + static final class BadObservableDispose extends ConnectableObservable { @Override - public void dispose() { + public void reset() { throw new TestException("dispose"); } - @Override - public boolean isDisposed() { - return false; - } - @Override public void connect(Consumer connection) { try { @@ -829,6 +834,11 @@ public void connect(Consumer connection) { throw new TestException("connect"); } + @Override + public void reset() { + // nothing to do in this test + } + @Override protected void subscribeActual(Observer observer) { observer.onSubscribe(Disposables.empty()); @@ -887,6 +897,11 @@ public void connect(Consumer connection) { } } + @Override + public void reset() { + // nothing to do in this test + } + @Override protected void subscribeActual(Observer observer) { if (++count == 1) { @@ -911,8 +926,7 @@ public void badSourceSubscribe2() { } } - static final class BadObservableConnect2 extends ConnectableObservable - implements Disposable { + static final class BadObservableConnect2 extends ConnectableObservable { @Override public void connect(Consumer connection) { @@ -924,19 +938,14 @@ public void connect(Consumer connection) { } @Override - protected void subscribeActual(Observer observer) { - observer.onSubscribe(Disposables.empty()); - observer.onComplete(); - } - - @Override - public void dispose() { + public void reset() { throw new TestException("dispose"); } @Override - public boolean isDisposed() { - return false; + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onComplete(); } } @@ -1004,10 +1013,13 @@ public void accept(Disposable d) throws Exception { for (int i = 0; i < 3; i++) { TestObserver to1 = source.test(); + to1.withTag("to1 " + i); to1.assertEmpty(); TestObserver to2 = source.test(); + to2.withTag("to2 " + i); + to1.assertResult(1, 2, 3, 4, 5); to2.assertResult(1, 2, 3, 4, 5); } @@ -1166,6 +1178,11 @@ public void connect(Consumer connection) { } } + @Override + public void reset() { + // nothing to do in this test + } + @Override protected void subscribeActual(Observer observer) { observer.onSubscribe(Disposables.empty()); @@ -1277,6 +1294,7 @@ public void cancelTerminateStateExclusion() { rc.subscriberCount = 1; rc.connected = true; o.connection = rc; + rc.lazySet(null); o.cancel(rc); o.connection = rc; @@ -1308,19 +1326,13 @@ public void replayRefCountShallBeThreadSafe() { } } - static final class TestConnectableObservable extends ConnectableObservable - implements Disposable { + static final class TestConnectableObservable extends ConnectableObservable { - volatile boolean disposed; + volatile boolean reset; @Override - public void dispose() { - disposed = true; - } - - @Override - public boolean isDisposed() { - return disposed; + public void reset() { + reset = true; } @Override @@ -1335,15 +1347,17 @@ protected void subscribeActual(Observer observer) { } @Test - public void timeoutDisposesSource() { - ObservableRefCount o = (ObservableRefCount)new TestConnectableObservable().refCount(); + public void timeoutResetsSource() { + TestConnectableObservable tco = new TestConnectableObservable(); + ObservableRefCount o = (ObservableRefCount)tco.refCount(); RefConnection rc = new RefConnection(o); + rc.set(Disposables.empty()); o.connection = rc; o.timeout(rc); - assertTrue(((Disposable)o.source).isDisposed()); + assertTrue(tco.reset); } @Test @@ -1360,4 +1374,31 @@ public void disconnectBeforeConnect() { observable.take(1).test().assertResult(2); } -} + + @Test + public void publishRefCountShallBeThreadSafe() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + Observable observable = Observable.just(1).publish().refCount(); + + TestObserver observer1 = observable + .subscribeOn(Schedulers.io()) + .test(); + + TestObserver observer2 = observable + .subscribeOn(Schedulers.io()) + .test(); + + observer1 + .withTag("observer1 " + i) + .awaitDone(5, TimeUnit.SECONDS) + .assertNoErrors() + .assertComplete(); + + observer2 + .withTag("observer2 " + i) + .awaitDone(5, TimeUnit.SECONDS) + .assertNoErrors() + .assertComplete(); + } + } +} \ No newline at end of file diff --git a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java index eb8e613f1b..2fa0ab9532 100644 --- a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java +++ b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java @@ -1771,6 +1771,11 @@ public void connect(Consumer connection) { } + @Override + public void reset() { + // nothing to do in this test + } + @SuppressWarnings("unchecked") @Override protected void subscribeActual(Observer observer) { @@ -1815,6 +1820,11 @@ public void connect(Consumer connection) { } + @Override + public void reset() { + // nothing to do in this test + } + @SuppressWarnings("unchecked") @Override protected void subscribeActual(Subscriber subscriber) {