Skip to content

Commit f96821f

Browse files
authored
3.x: Fix window (boundary, start/end) cancel and abandonment (#6762)
1 parent df30aa1 commit f96821f

12 files changed

+1056
-482
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

+20
Original file line numberDiff line numberDiff line change
@@ -17965,6 +17965,11 @@ public final Flowable<Flowable<T>> window(
1796517965
* Publisher.
1796617966
* <p>
1796717967
* <img width="640" height="475" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window8.png" alt="">
17968+
* <p>
17969+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
17970+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
17971+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
17972+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1796817973
* <dl>
1796917974
* <dt><b>Backpressure:</b></dt>
1797017975
* <dd>The outer Publisher of this operator does not support backpressure as it uses a {@code boundary} Publisher to control data
@@ -17995,6 +18000,11 @@ public final <B> Flowable<Flowable<T>> window(Publisher<B> boundaryIndicator) {
1799518000
* Publisher.
1799618001
* <p>
1799718002
* <img width="640" height="475" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window8.png" alt="">
18003+
* <p>
18004+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
18005+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
18006+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
18007+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1799818008
* <dl>
1799918009
* <dt><b>Backpressure:</b></dt>
1800018010
* <dd>The outer Publisher of this operator does not support backpressure as it uses a {@code boundary} Publisher to control data
@@ -18031,6 +18041,11 @@ public final <B> Flowable<Flowable<T>> window(Publisher<B> boundaryIndicator, in
1803118041
* {@code closingSelector} emits an item.
1803218042
* <p>
1803318043
* <img width="640" height="550" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window2.png" alt="">
18044+
* <p>
18045+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
18046+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
18047+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
18048+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1803418049
* <dl>
1803518050
* <dt><b>Backpressure:</b></dt>
1803618051
* <dd>The outer Publisher of this operator doesn't support backpressure because the emission of new
@@ -18068,6 +18083,11 @@ public final <U, V> Flowable<Flowable<T>> window(
1806818083
* {@code closingSelector} emits an item.
1806918084
* <p>
1807018085
* <img width="640" height="550" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window2.png" alt="">
18086+
* <p>
18087+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
18088+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
18089+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
18090+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1807118091
* <dl>
1807218092
* <dt><b>Backpressure:</b></dt>
1807318093
* <dd>The outer Publisher of this operator doesn't support backpressure because the emission of new

src/main/java/io/reactivex/rxjava3/core/Observable.java

+70
Original file line numberDiff line numberDiff line change
@@ -14536,6 +14536,11 @@ public final Observable<Observable<T>> window(long count, long skip, int bufferS
1453614536
* current window and propagates the notification from the source ObservableSource.
1453714537
* <p>
1453814538
* <img width="640" height="335" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window7.png" alt="">
14539+
* <p>
14540+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14541+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14542+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14543+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1453914544
* <dl>
1454014545
* <dt><b>Scheduler:</b></dt>
1454114546
* <dd>This version of {@code window} operates by default on the {@code computation} {@link Scheduler}.</dd>
@@ -14564,6 +14569,11 @@ public final Observable<Observable<T>> window(long timespan, long timeskip, Time
1456414569
* current window and propagates the notification from the source ObservableSource.
1456514570
* <p>
1456614571
* <img width="640" height="335" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window7.s.png" alt="">
14572+
* <p>
14573+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14574+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14575+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14576+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1456714577
* <dl>
1456814578
* <dt><b>Scheduler:</b></dt>
1456914579
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
@@ -14594,6 +14604,11 @@ public final Observable<Observable<T>> window(long timespan, long timeskip, Time
1459414604
* current window and propagates the notification from the source ObservableSource.
1459514605
* <p>
1459614606
* <img width="640" height="335" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window7.s.png" alt="">
14607+
* <p>
14608+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14609+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14610+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14611+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1459714612
* <dl>
1459814613
* <dt><b>Scheduler:</b></dt>
1459914614
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
@@ -14630,6 +14645,11 @@ public final Observable<Observable<T>> window(long timespan, long timeskip, Time
1463014645
* ObservableSource emits the current window and propagates the notification from the source ObservableSource.
1463114646
* <p>
1463214647
* <img width="640" height="375" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window5.png" alt="">
14648+
* <p>
14649+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14650+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14651+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14652+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1463314653
* <dl>
1463414654
* <dt><b>Scheduler:</b></dt>
1463514655
* <dd>This version of {@code window} operates by default on the {@code computation} {@link Scheduler}.</dd>
@@ -14658,6 +14678,11 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit) {
1465814678
* emits the current window and propagates the notification from the source ObservableSource.
1465914679
* <p>
1466014680
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window6.png" alt="">
14681+
* <p>
14682+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14683+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14684+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14685+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1466114686
* <dl>
1466214687
* <dt><b>Scheduler:</b></dt>
1466314688
* <dd>This version of {@code window} operates by default on the {@code computation} {@link Scheduler}.</dd>
@@ -14690,6 +14715,11 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit,
1469014715
* emits the current window and propagates the notification from the source ObservableSource.
1469114716
* <p>
1469214717
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window6.png" alt="">
14718+
* <p>
14719+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14720+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14721+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14722+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1469314723
* <dl>
1469414724
* <dt><b>Scheduler:</b></dt>
1469514725
* <dd>This version of {@code window} operates by default on the {@code computation} {@link Scheduler}.</dd>
@@ -14723,6 +14753,11 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit,
1472314753
* ObservableSource emits the current window and propagates the notification from the source ObservableSource.
1472414754
* <p>
1472514755
* <img width="640" height="375" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window5.s.png" alt="">
14756+
* <p>
14757+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14758+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14759+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14760+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1472614761
* <dl>
1472714762
* <dt><b>Scheduler:</b></dt>
1472814763
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
@@ -14754,6 +14789,11 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit,
1475414789
* current window and propagates the notification from the source ObservableSource.
1475514790
* <p>
1475614791
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window6.s.png" alt="">
14792+
* <p>
14793+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14794+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14795+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14796+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1475714797
* <dl>
1475814798
* <dt><b>Scheduler:</b></dt>
1475914799
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
@@ -14788,6 +14828,11 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit,
1478814828
* current window and propagates the notification from the source ObservableSource.
1478914829
* <p>
1479014830
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window6.s.png" alt="">
14831+
* <p>
14832+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14833+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14834+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14835+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1479114836
* <dl>
1479214837
* <dt><b>Scheduler:</b></dt>
1479314838
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
@@ -14824,6 +14869,11 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit,
1482414869
* current window and propagates the notification from the source ObservableSource.
1482514870
* <p>
1482614871
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window6.s.png" alt="">
14872+
* <p>
14873+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14874+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14875+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14876+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1482714877
* <dl>
1482814878
* <dt><b>Scheduler:</b></dt>
1482914879
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
@@ -14865,6 +14915,11 @@ public final Observable<Observable<T>> window(
1486514915
* ObservableSource.
1486614916
* <p>
1486714917
* <img width="640" height="475" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window8.png" alt="">
14918+
* <p>
14919+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14920+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14921+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14922+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1486814923
* <dl>
1486914924
* <dt><b>Scheduler:</b></dt>
1487014925
* <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -14891,6 +14946,11 @@ public final <B> Observable<Observable<T>> window(ObservableSource<B> boundary)
1489114946
* ObservableSource.
1489214947
* <p>
1489314948
* <img width="640" height="475" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window8.png" alt="">
14949+
* <p>
14950+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14951+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14952+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14953+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1489414954
* <dl>
1489514955
* <dt><b>Scheduler:</b></dt>
1489614956
* <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -14922,6 +14982,11 @@ public final <B> Observable<Observable<T>> window(ObservableSource<B> boundary,
1492214982
* {@code closingIndicator} emits an item.
1492314983
* <p>
1492414984
* <img width="640" height="550" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window2.png" alt="">
14985+
* <p>
14986+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14987+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14988+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14989+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1492514990
* <dl>
1492614991
* <dt><b>Scheduler:</b></dt>
1492714992
* <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -14953,6 +15018,11 @@ public final <U, V> Observable<Observable<T>> window(
1495315018
* {@code closingIndicator} emits an item.
1495415019
* <p>
1495515020
* <img width="640" height="550" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window2.png" alt="">
15021+
* <p>
15022+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
15023+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
15024+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
15025+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1495615026
* <dl>
1495715027
* <dt><b>Scheduler:</b></dt>
1495815028
* <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundary.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,11 @@ void drain() {
240240

241241
if (emitted != requested.get()) {
242242
emitted++;
243-
downstream.onNext(w);
243+
FlowableWindowSubscribeIntercept<T> intercept = new FlowableWindowSubscribeIntercept<T>(w);
244+
downstream.onNext(intercept);
245+
if (intercept.tryAbandon()) {
246+
w.onComplete();
247+
}
244248
} else {
245249
SubscriptionHelper.cancel(upstream);
246250
boundarySubscriber.dispose();

0 commit comments

Comments
 (0)