Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: ConnectableFlowable/ConnetableObservabe redesign #6519

Merged
merged 1 commit into from
Jun 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12488,7 +12488,7 @@ public final <R> Flowable<R> publish(Function<? super Flowable<T>, ? extends Pub
@SchedulerSupport(SchedulerSupport.NONE)
public final ConnectableFlowable<T> publish(int bufferSize) {
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return FlowablePublish.create(this, bufferSize);
return RxJavaPlugins.onAssembly(new FlowablePublish<T>(this, bufferSize));
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -10207,7 +10207,7 @@ public final Observable<T> onTerminateDetach() {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final ConnectableObservable<T> publish() {
return ObservablePublish.create(this);
return RxJavaPlugins.onAssembly(new ObservablePublish<T>(this));
}

/**
Expand Down
23 changes: 22 additions & 1 deletion src/main/java/io/reactivex/flowables/ConnectableFlowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,24 @@
* before the {@code Flowable} begins emitting items.
* <p>
* <img width="640" height="510" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/publishConnect.png" alt="">
*
* <p>
* When the upstream terminates, the {@code ConnectableFlowable} remains in this terminated state and,
* depending on the actual underlying implementation, relays cached events to late {@link Subscriber}s.
* In order to reuse and restart this {@code ConnectableFlowable}, the {@link #reset()} method has to be called.
* When called, this {@code ConnectableFlowable} will appear as fresh, unconnected source to new {@link Subscriber}s.
* Disposing the connection will reset the {@code ConnectableFlowable} to its fresh state and there is no need to call
* {@code reset()} in this case.
* <p>
* Note that although {@link #connect()} and {@link #reset()} are safe to call from multiple threads, it is recommended
* a dedicated thread or business logic manages the connection or resetting of a {@code ConnectableFlowable} so that
* there is no unwanted signal loss due to early {@code connect()} or {@code reset()} calls while {@code Subscriber}s are
* still being subscribed to to this {@code ConnectableFlowable} to receive signals from the get go.
* <p>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators">RxJava Wiki:
* Connectable Observable Operators</a>
* @param <T>
* the type of items emitted by the {@code ConnectableFlowable}
* @since 2.0.0
*/
public abstract class ConnectableFlowable<T> extends Flowable<T> {

Expand All @@ -53,6 +66,14 @@ public abstract class ConnectableFlowable<T> extends Flowable<T> {
*/
public abstract void connect(@NonNull Consumer<? super Disposable> connection);

/**
* Resets this ConnectableFlowable into its fresh state if it has terminated.
* <p>
* Calling this method on a fresh or active {@code ConnectableFlowable} has no effect.
* @since 3.0.0
*/
public abstract void reset();

/**
* Instructs the {@code ConnectableFlowable} to begin emitting the items from its underlying
* {@link Flowable} to its {@link Subscriber}s.
Expand Down

This file was deleted.

Loading