diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindow.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindow.java index f66d38169e..ae027e2d04 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindow.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindow.java @@ -92,14 +92,14 @@ public void onNext(T t) { long i = index; UnicastProcessor w = window; - WindowSubscribeIntercept intercept = null; + FlowableWindowSubscribeIntercept intercept = null; if (i == 0) { getAndIncrement(); w = UnicastProcessor.create(bufferSize, this); window = w; - intercept = new WindowSubscribeIntercept(w); + intercept = new FlowableWindowSubscribeIntercept(w); downstream.onNext(intercept); } @@ -211,7 +211,7 @@ public void onSubscribe(Subscription s) { public void onNext(T t) { long i = index; - WindowSubscribeIntercept intercept = null; + FlowableWindowSubscribeIntercept intercept = null; UnicastProcessor w = window; if (i == 0) { getAndIncrement(); @@ -219,7 +219,7 @@ public void onNext(T t) { w = UnicastProcessor.create(bufferSize, this); window = w; - intercept = new WindowSubscribeIntercept(w); + intercept = new FlowableWindowSubscribeIntercept(w); downstream.onNext(intercept); } @@ -477,7 +477,7 @@ void drain() { break; } - WindowSubscribeIntercept intercept = new WindowSubscribeIntercept(t); + FlowableWindowSubscribeIntercept intercept = new FlowableWindowSubscribeIntercept(t); a.onNext(intercept); if (intercept.tryAbandon()) { diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/WindowSubscribeIntercept.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowSubscribeIntercept.java similarity index 90% rename from src/main/java/io/reactivex/rxjava3/internal/operators/flowable/WindowSubscribeIntercept.java rename to src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowSubscribeIntercept.java index 4fcbdee225..8f6cf0a2da 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/WindowSubscribeIntercept.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowSubscribeIntercept.java @@ -25,13 +25,13 @@ * @param the element type of the flow. * @since 3.0.0 */ -final class WindowSubscribeIntercept extends Flowable { +final class FlowableWindowSubscribeIntercept extends Flowable { final FlowableProcessor window; final AtomicBoolean once; - WindowSubscribeIntercept(FlowableProcessor source) { + FlowableWindowSubscribeIntercept(FlowableProcessor source) { this.window = source; this.once = new AtomicBoolean(); } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java index c340c0e0db..0dfec4e4af 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java @@ -28,7 +28,6 @@ import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; import io.reactivex.rxjava3.internal.util.BackpressureHelper; import io.reactivex.rxjava3.processors.UnicastProcessor; -import io.reactivex.rxjava3.subscribers.SerializedSubscriber; public final class FlowableWindowTimed extends AbstractFlowableWithUpstream> { final long timespan; @@ -53,23 +52,21 @@ public FlowableWindowTimed(Flowable source, } @Override - protected void subscribeActual(Subscriber> s) { - SerializedSubscriber> actual = new SerializedSubscriber>(s); - + protected void subscribeActual(Subscriber> downstream) { if (timespan == timeskip) { if (maxSize == Long.MAX_VALUE) { source.subscribe(new WindowExactUnboundedSubscriber( - actual, + downstream, timespan, unit, scheduler, bufferSize)); return; } source.subscribe(new WindowExactBoundedSubscriber( - actual, + downstream, timespan, unit, scheduler, bufferSize, maxSize, restartTimerOnMaxSize)); return; } - source.subscribe(new WindowSkipSubscriber(actual, + source.subscribe(new WindowSkipSubscriber(downstream, timespan, timeskip, unit, scheduler.createWorker(), bufferSize)); } @@ -100,8 +97,8 @@ abstract static class AbstractWindowSubscriber final AtomicInteger windowCount; - AbstractWindowSubscriber(Subscriber> actual, long timespan, TimeUnit unit, int bufferSize) { - this.downstream = actual; + AbstractWindowSubscriber(Subscriber> downstream, long timespan, TimeUnit unit, int bufferSize) { + this.downstream = downstream; this.queue = new MpscLinkedQueue(); this.timespan = timespan; this.unit = unit; @@ -204,7 +201,7 @@ void createFirstWindow() { emitted = 1; - WindowSubscribeIntercept intercept = new WindowSubscribeIntercept(window); + FlowableWindowSubscribeIntercept intercept = new FlowableWindowSubscribeIntercept(window); downstream.onNext(intercept); timer.replace(scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit)); @@ -293,7 +290,7 @@ else if (!isEmpty) { window = UnicastProcessor.create(bufferSize, windowRunnable); this.window = window; - WindowSubscribeIntercept intercept = new WindowSubscribeIntercept(window); + FlowableWindowSubscribeIntercept intercept = new FlowableWindowSubscribeIntercept(window); downstream.onNext(intercept); if (intercept.tryAbandon()) { @@ -372,7 +369,7 @@ void createFirstWindow() { windowCount.getAndIncrement(); window = UnicastProcessor.create(bufferSize, this); - WindowSubscribeIntercept intercept = new WindowSubscribeIntercept(window); + FlowableWindowSubscribeIntercept intercept = new FlowableWindowSubscribeIntercept(window); downstream.onNext(intercept); Runnable boundaryTask = new WindowBoundaryRunnable(this, 1L); @@ -510,7 +507,7 @@ UnicastProcessor createNewWindow(UnicastProcessor window) { window = UnicastProcessor.create(bufferSize, this); this.window = window; - WindowSubscribeIntercept intercept = new WindowSubscribeIntercept(window); + FlowableWindowSubscribeIntercept intercept = new FlowableWindowSubscribeIntercept(window); downstream.onNext(intercept); if (restartTimerOnMaxSize) { @@ -573,7 +570,7 @@ void createFirstWindow() { UnicastProcessor window = UnicastProcessor.create(bufferSize, this); windows.add(window); - WindowSubscribeIntercept intercept = new WindowSubscribeIntercept(window); + FlowableWindowSubscribeIntercept intercept = new FlowableWindowSubscribeIntercept(window); downstream.onNext(intercept); worker.schedule(new WindowBoundaryRunnable(this, false), timespan, unit); @@ -647,7 +644,7 @@ void drain() { UnicastProcessor window = UnicastProcessor.create(bufferSize, this); windows.add(window); - WindowSubscribeIntercept intercept = new WindowSubscribeIntercept(window); + FlowableWindowSubscribeIntercept intercept = new FlowableWindowSubscribeIntercept(window); downstream.onNext(intercept); worker.schedule(new WindowBoundaryRunnable(this, false), timespan, unit); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindow.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindow.java index 08eb0327a6..3f11f3952a 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindow.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindow.java @@ -77,14 +77,17 @@ public void onSubscribe(Disposable d) { @Override public void onNext(T t) { UnicastSubject w = window; + ObservableWindowSubscribeIntercept intercept = null; if (w == null && !cancelled) { w = UnicastSubject.create(capacityHint, this); window = w; - downstream.onNext(w); + intercept = new ObservableWindowSubscribeIntercept(w); + downstream.onNext(intercept); } if (w != null) { w.onNext(t); + if (++size >= count) { size = 0; window = null; @@ -93,6 +96,12 @@ public void onNext(T t) { upstream.dispose(); } } + + if (intercept != null && intercept.tryAbandon()) { + w.onComplete(); + w = null; + window = null; + } } } @@ -180,11 +189,14 @@ public void onNext(T t) { long s = skip; + ObservableWindowSubscribeIntercept intercept = null; + if (i % s == 0 && !cancelled) { wip.getAndIncrement(); UnicastSubject w = UnicastSubject.create(capacityHint, this); + intercept = new ObservableWindowSubscribeIntercept(w); ws.offer(w); - downstream.onNext(w); + downstream.onNext(intercept); } long c = firstEmission + 1; @@ -205,6 +217,10 @@ public void onNext(T t) { } index = i + 1; + + if (intercept != null && intercept.tryAbandon()) { + intercept.window.onComplete(); + } } @Override diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowSubscribeIntercept.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowSubscribeIntercept.java new file mode 100644 index 0000000000..da6f96f2a3 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowSubscribeIntercept.java @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.observable; + +import java.util.concurrent.atomic.AtomicBoolean; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.subjects.Subject; + +/** + * Wrapper for a Subject that detects an incoming subscriber. + * @param the element type of the flow. + * @since 3.0.0 + */ +final class ObservableWindowSubscribeIntercept extends Observable { + + final Subject window; + + final AtomicBoolean once; + + ObservableWindowSubscribeIntercept(Subject source) { + this.window = source; + this.once = new AtomicBoolean(); + } + + @Override + protected void subscribeActual(Observer s) { + window.subscribe(s); + once.set(true); + } + + boolean tryAbandon() { + return !once.get() && once.compareAndSet(false, true); + } +} \ No newline at end of file diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java index cdd64cd6ae..3913bf2911 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java @@ -15,17 +15,16 @@ import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.*; -import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.core.Scheduler; import io.reactivex.rxjava3.core.Scheduler.Worker; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.internal.disposables.*; -import io.reactivex.rxjava3.internal.observers.QueueDrainObserver; +import io.reactivex.rxjava3.internal.fuseable.SimplePlainQueue; import io.reactivex.rxjava3.internal.queue.MpscLinkedQueue; -import io.reactivex.rxjava3.internal.util.NotificationLite; -import io.reactivex.rxjava3.observers.SerializedObserver; import io.reactivex.rxjava3.subjects.UnicastSubject; public final class ObservableWindowTimed extends AbstractObservableWithUpstream> { @@ -37,8 +36,7 @@ public final class ObservableWindowTimed extends AbstractObservableWithUpstre final int bufferSize; final boolean restartTimerOnMaxSize; - public ObservableWindowTimed( - ObservableSource source, + public ObservableWindowTimed(Observable source, long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, long maxSize, int bufferSize, boolean restartTimerOnMaxSize) { super(source); @@ -52,223 +50,282 @@ public ObservableWindowTimed( } @Override - public void subscribeActual(Observer> t) { - SerializedObserver> actual = new SerializedObserver>(t); - + protected void subscribeActual(Observer> downstream) { if (timespan == timeskip) { if (maxSize == Long.MAX_VALUE) { source.subscribe(new WindowExactUnboundedObserver( - actual, + downstream, timespan, unit, scheduler, bufferSize)); return; } source.subscribe(new WindowExactBoundedObserver( - actual, + downstream, timespan, unit, scheduler, bufferSize, maxSize, restartTimerOnMaxSize)); return; } - source.subscribe(new WindowSkipObserver(actual, + source.subscribe(new WindowSkipObserver(downstream, timespan, timeskip, unit, scheduler.createWorker(), bufferSize)); } - static final class WindowExactUnboundedObserver - extends QueueDrainObserver> - implements Observer, Disposable, Runnable { + abstract static class AbstractWindowObserver + extends AtomicInteger + implements Observer, Disposable { + private static final long serialVersionUID = 5724293814035355511L; + + final Observer> downstream; + + final SimplePlainQueue queue; + final long timespan; final TimeUnit unit; - final Scheduler scheduler; final int bufferSize; - Disposable upstream; + long emitted; - UnicastSubject window; + volatile boolean done; + Throwable error; - final SequentialDisposable timer = new SequentialDisposable(); + Disposable upstream; - static final Object NEXT = new Object(); + final AtomicBoolean downstreamCancelled; - volatile boolean terminated; + volatile boolean upstreamCancelled; - WindowExactUnboundedObserver(Observer> actual, long timespan, TimeUnit unit, - Scheduler scheduler, int bufferSize) { - super(actual, new MpscLinkedQueue()); + final AtomicInteger windowCount; + + AbstractWindowObserver(Observer> downstream, long timespan, TimeUnit unit, int bufferSize) { + this.downstream = downstream; + this.queue = new MpscLinkedQueue(); this.timespan = timespan; this.unit = unit; - this.scheduler = scheduler; this.bufferSize = bufferSize; + this.downstreamCancelled = new AtomicBoolean(); + this.windowCount = new AtomicInteger(1); } @Override - public void onSubscribe(Disposable d) { + public final void onSubscribe(Disposable d) { if (DisposableHelper.validate(this.upstream, d)) { this.upstream = d; - window = UnicastSubject.create(bufferSize); - - Observer> a = downstream; - a.onSubscribe(this); - - a.onNext(window); + downstream.onSubscribe(this); - if (!cancelled) { - Disposable task = scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit); - timer.replace(task); - } + createFirstWindow(); } } + abstract void createFirstWindow(); + @Override - public void onNext(T t) { - if (terminated) { - return; - } - if (fastEnter()) { - window.onNext(t); - if (leave(-1) == 0) { - return; - } - } else { - queue.offer(NotificationLite.next(t)); - if (!enter()) { - return; - } - } - drainLoop(); + public final void onNext(T t) { + queue.offer(t); + drain(); } @Override - public void onError(Throwable t) { + public final void onError(Throwable t) { error = t; done = true; - if (enter()) { - drainLoop(); - } - - downstream.onError(t); + drain(); } @Override - public void onComplete() { + public final void onComplete() { done = true; - if (enter()) { - drainLoop(); - } + drain(); + } - downstream.onComplete(); + @Override + public final void dispose() { + if (downstreamCancelled.compareAndSet(false, true)) { + windowDone(); + } } @Override - public void dispose() { - cancelled = true; + public final boolean isDisposed() { + return downstreamCancelled.get(); + } + + final void windowDone() { + if (windowCount.decrementAndGet() == 0) { + cleanupResources(); + upstream.dispose(); + upstreamCancelled = true; + drain(); + } + } + + abstract void cleanupResources(); + + abstract void drain(); + } + + static final class WindowExactUnboundedObserver + extends AbstractWindowObserver + implements Runnable { + + private static final long serialVersionUID = 1155822639622580836L; + + final Scheduler scheduler; + + UnicastSubject window; + + final SequentialDisposable timer; + + static final Object NEXT_WINDOW = new Object(); + + final Runnable windowRunnable; + + WindowExactUnboundedObserver(Observer> actual, long timespan, TimeUnit unit, + Scheduler scheduler, int bufferSize) { + super(actual, timespan, unit, bufferSize); + this.scheduler = scheduler; + this.timer = new SequentialDisposable(); + this.windowRunnable = new WindowRunnable(); } @Override - public boolean isDisposed() { - return cancelled; + void createFirstWindow() { + if (!downstreamCancelled.get()) { + windowCount.getAndIncrement(); + window = UnicastSubject.create(bufferSize, windowRunnable); + + emitted = 1; + + ObservableWindowSubscribeIntercept intercept = new ObservableWindowSubscribeIntercept(window); + downstream.onNext(intercept); + + timer.replace(scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit)); + + if (intercept.tryAbandon()) { + window.onComplete(); + } + } } @Override public void run() { - if (cancelled) { - terminated = true; - } - queue.offer(NEXT); - if (enter()) { - drainLoop(); - } + queue.offer(NEXT_WINDOW); + drain(); } - void drainLoop() { + @Override + void drain() { + if (getAndIncrement() != 0) { + return; + } - final MpscLinkedQueue q = (MpscLinkedQueue)queue; - final Observer> a = downstream; - UnicastSubject w = window; + final SimplePlainQueue queue = this.queue; + final Observer> downstream = this.downstream; + UnicastSubject window = this.window; int missed = 1; for (;;) { - for (;;) { - boolean term = terminated; // NOPMD - - boolean d = done; - - Object o = q.poll(); - - if (d && (o == null || o == NEXT)) { - window = null; - q.clear(); - Throwable err = error; - if (err != null) { - w.onError(err); + if (upstreamCancelled) { + queue.clear(); + window = null; + this.window = null; + } else { + boolean isDone = done; + Object o = queue.poll(); + boolean isEmpty = o == null; + + if (isDone && isEmpty) { + Throwable ex = error; + if (ex != null) { + if (window != null) { + window.onError(ex); + } + downstream.onError(ex); } else { - w.onComplete(); + if (window != null) { + window.onComplete(); + } + downstream.onComplete(); } - timer.dispose(); - return; + cleanupResources(); + upstreamCancelled = true; + continue; } + else if (!isEmpty) { - if (o == null) { - break; - } + if (o == NEXT_WINDOW) { + if (window != null) { + window.onComplete(); + window = null; + this.window = null; + } + if (downstreamCancelled.get()) { + timer.dispose(); + } else { + emitted++; - if (o == NEXT) { - w.onComplete(); - if (!term) { - w = UnicastSubject.create(bufferSize); - window = w; + windowCount.getAndIncrement(); + window = UnicastSubject.create(bufferSize, windowRunnable); + this.window = window; - a.onNext(w); - } else { - upstream.dispose(); + ObservableWindowSubscribeIntercept intercept = new ObservableWindowSubscribeIntercept(window); + downstream.onNext(intercept); + + if (intercept.tryAbandon()) { + window.onComplete(); + } + } + } else if (window != null) { + @SuppressWarnings("unchecked") + T item = (T)o; + window.onNext(item); } + continue; } - - w.onNext(NotificationLite.getValue(o)); } - missed = leave(-missed); + missed = addAndGet(-missed); if (missed == 0) { break; } } } + + @Override + void cleanupResources() { + timer.dispose(); + } + + final class WindowRunnable implements Runnable { + @Override + public void run() { + windowDone(); + } + } } static final class WindowExactBoundedObserver - extends QueueDrainObserver> - implements Disposable { - final long timespan; - final TimeUnit unit; + extends AbstractWindowObserver + implements Runnable { + private static final long serialVersionUID = -6130475889925953722L; + final Scheduler scheduler; - final int bufferSize; final boolean restartTimerOnMaxSize; final long maxSize; - final Scheduler.Worker worker; long count; - long producerIndex; - - Disposable upstream; - UnicastSubject window; - volatile boolean terminated; - - final SequentialDisposable timer = new SequentialDisposable(); + final SequentialDisposable timer; WindowExactBoundedObserver( Observer> actual, long timespan, TimeUnit unit, Scheduler scheduler, int bufferSize, long maxSize, boolean restartTimerOnMaxSize) { - super(actual, new MpscLinkedQueue()); - this.timespan = timespan; - this.unit = unit; + super(actual, timespan, unit, bufferSize); this.scheduler = scheduler; - this.bufferSize = bufferSize; this.maxSize = maxSize; this.restartTimerOnMaxSize = restartTimerOnMaxSize; if (restartTimerOnMaxSize) { @@ -276,415 +333,288 @@ static final class WindowExactBoundedObserver } else { worker = null; } + this.timer = new SequentialDisposable(); } @Override - public void onSubscribe(Disposable d) { - if (DisposableHelper.validate(this.upstream, d)) { - this.upstream = d; - - Observer> a = downstream; - - a.onSubscribe(this); - - if (cancelled) { - return; - } + void createFirstWindow() { + if (!downstreamCancelled.get()) { + emitted = 1; - UnicastSubject w = UnicastSubject.create(bufferSize); - window = w; + windowCount.getAndIncrement(); + window = UnicastSubject.create(bufferSize, this); - a.onNext(w); + ObservableWindowSubscribeIntercept intercept = new ObservableWindowSubscribeIntercept(window); + downstream.onNext(intercept); - Disposable task; - ConsumerIndexHolder consumerIndexHolder = new ConsumerIndexHolder(producerIndex, this); + Runnable boundaryTask = new WindowBoundaryRunnable(this, 1L); if (restartTimerOnMaxSize) { - task = worker.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit); + timer.replace(worker.schedulePeriodically(boundaryTask, timespan, timespan, unit)); } else { - task = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit); + timer.replace(scheduler.schedulePeriodicallyDirect(boundaryTask, timespan, timespan, unit)); } - timer.replace(task); - } - } - - @Override - public void onNext(T t) { - if (terminated) { - return; - } - - if (fastEnter()) { - UnicastSubject w = window; - w.onNext(t); - - long c = count + 1; - - if (c >= maxSize) { - producerIndex++; - count = 0; - - w.onComplete(); - - w = UnicastSubject.create(bufferSize); - window = w; - downstream.onNext(w); - if (restartTimerOnMaxSize) { - Disposable tm = timer.get(); - tm.dispose(); - Disposable task = worker.schedulePeriodically( - new ConsumerIndexHolder(producerIndex, this), timespan, timespan, unit); - - DisposableHelper.replace(timer, task); - } - } else { - count = c; - } - - if (leave(-1) == 0) { - return; - } - } else { - queue.offer(NotificationLite.next(t)); - if (!enter()) { - return; + if (intercept.tryAbandon()) { + window.onComplete(); } } - drainLoop(); } @Override - public void onError(Throwable t) { - error = t; - done = true; - if (enter()) { - drainLoop(); - } - - downstream.onError(t); + public void run() { + windowDone(); } @Override - public void onComplete() { - done = true; - if (enter()) { - drainLoop(); + void cleanupResources() { + timer.dispose(); + Worker w = worker; + if (w != null) { + w.dispose(); } - - downstream.onComplete(); } - @Override - public void dispose() { - cancelled = true; + void boundary(WindowBoundaryRunnable sender) { + queue.offer(sender); + drain(); } @Override - public boolean isDisposed() { - return cancelled; - } - - void disposeTimer() { - DisposableHelper.dispose(timer); - Worker w = worker; - if (w != null) { - w.dispose(); + void drain() { + if (getAndIncrement() != 0) { + return; } - } - - void drainLoop() { - final MpscLinkedQueue q = (MpscLinkedQueue)queue; - final Observer> a = downstream; - UnicastSubject w = window; int missed = 1; - for (;;) { + final SimplePlainQueue queue = this.queue; + final Observer> downstream = this.downstream; + UnicastSubject window = this.window; - for (;;) { - if (terminated) { - upstream.dispose(); - q.clear(); - disposeTimer(); - return; - } - - boolean d = done; + for (;;) { - Object o = q.poll(); + if (upstreamCancelled) { + queue.clear(); + window = null; + this.window = null; + } else { - boolean empty = o == null; - boolean isHolder = o instanceof ConsumerIndexHolder; + boolean isDone = done; + Object o = queue.poll(); + boolean isEmpty = o == null; - if (d && (empty || isHolder)) { - window = null; - q.clear(); - Throwable err = error; - if (err != null) { - w.onError(err); + if (isDone && isEmpty) { + Throwable ex = error; + if (ex != null) { + if (window != null) { + window.onError(ex); + } + downstream.onError(ex); } else { - w.onComplete(); + if (window != null) { + window.onComplete(); + } + downstream.onComplete(); } - disposeTimer(); - return; - } - - if (empty) { - break; - } - - if (isHolder) { - ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o; - if (!restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) { - w.onComplete(); - count = 0; - w = UnicastSubject.create(bufferSize); - window = w; - - a.onNext(w); + cleanupResources(); + upstreamCancelled = true; + continue; + } else if (!isEmpty) { + if (o instanceof WindowBoundaryRunnable) { + WindowBoundaryRunnable boundary = (WindowBoundaryRunnable) o; + if (boundary.index == emitted || !restartTimerOnMaxSize) { + this.count = 0; + window = createNewWindow(window); + } + } else if (window != null) { + @SuppressWarnings("unchecked") + T item = (T)o; + window.onNext(item); + + long count = this.count + 1; + if (count == maxSize) { + this.count = 0; + window = createNewWindow(window); + } else { + this.count = count; + } } + continue; } + } - w.onNext(NotificationLite.getValue(o)); - long c = count + 1; - - if (c >= maxSize) { - producerIndex++; - count = 0; + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } - w.onComplete(); + UnicastSubject createNewWindow(UnicastSubject window) { + if (window != null) { + window.onComplete(); + window = null; + } - w = UnicastSubject.create(bufferSize); - window = w; - downstream.onNext(w); + if (downstreamCancelled.get()) { + cleanupResources(); + } else { + long emitted = this.emitted; + this.emitted = ++emitted; - if (restartTimerOnMaxSize) { - Disposable tm = timer.get(); - tm.dispose(); + windowCount.getAndIncrement(); + window = UnicastSubject.create(bufferSize, this); + this.window = window; - Disposable task = worker.schedulePeriodically( - new ConsumerIndexHolder(producerIndex, this), timespan, timespan, unit); - if (!timer.compareAndSet(tm, task)) { - task.dispose(); - } - } + ObservableWindowSubscribeIntercept intercept = new ObservableWindowSubscribeIntercept(window); + downstream.onNext(intercept); - } else { - count = c; - } + if (restartTimerOnMaxSize) { + timer.update(worker.schedulePeriodically(new WindowBoundaryRunnable(this, emitted), timespan, timespan, unit)); } - missed = leave(-missed); - if (missed == 0) { - break; + if (intercept.tryAbandon()) { + window.onComplete(); } } + + return window; } - static final class ConsumerIndexHolder implements Runnable { - final long index; + static final class WindowBoundaryRunnable implements Runnable { + final WindowExactBoundedObserver parent; - ConsumerIndexHolder(long index, WindowExactBoundedObserver parent) { - this.index = index; + + final long index; + + WindowBoundaryRunnable(WindowExactBoundedObserver parent, long index) { this.parent = parent; + this.index = index; } @Override public void run() { - WindowExactBoundedObserver p = parent; - - if (!p.cancelled) { - p.queue.offer(this); - } else { - p.terminated = true; - } - if (p.enter()) { - p.drainLoop(); - } + parent.boundary(this); } } } static final class WindowSkipObserver - extends QueueDrainObserver> - implements Disposable, Runnable { - final long timespan; + extends AbstractWindowObserver + implements Runnable { + private static final long serialVersionUID = -7852870764194095894L; + final long timeskip; - final TimeUnit unit; final Scheduler.Worker worker; - final int bufferSize; final List> windows; - Disposable upstream; - - volatile boolean terminated; - WindowSkipObserver(Observer> actual, long timespan, long timeskip, TimeUnit unit, Worker worker, int bufferSize) { - super(actual, new MpscLinkedQueue()); - this.timespan = timespan; + super(actual, timespan, unit, bufferSize); this.timeskip = timeskip; - this.unit = unit; this.worker = worker; - this.bufferSize = bufferSize; this.windows = new LinkedList>(); } @Override - public void onSubscribe(Disposable d) { - if (DisposableHelper.validate(this.upstream, d)) { - this.upstream = d; - - downstream.onSubscribe(this); - - if (cancelled) { - return; - } - - final UnicastSubject w = UnicastSubject.create(bufferSize); - windows.add(w); + void createFirstWindow() { + if (!downstreamCancelled.get()) { + emitted = 1; - downstream.onNext(w); - worker.schedule(new CompletionTask(w), timespan, unit); + windowCount.getAndIncrement(); + UnicastSubject window = UnicastSubject.create(bufferSize, this); + windows.add(window); - worker.schedulePeriodically(this, timeskip, timeskip, unit); - } + ObservableWindowSubscribeIntercept intercept = new ObservableWindowSubscribeIntercept(window); + downstream.onNext(intercept); - } + worker.schedule(new WindowBoundaryRunnable(this, false), timespan, unit); + worker.schedulePeriodically(new WindowBoundaryRunnable(this, true), timeskip, timeskip, unit); - @Override - public void onNext(T t) { - if (fastEnter()) { - for (UnicastSubject w : windows) { - w.onNext(t); - } - if (leave(-1) == 0) { - return; + if (intercept.tryAbandon()) { + window.onComplete(); + windows.remove(window); } - } else { - queue.offer(t); - if (!enter()) { - return; - } - } - drainLoop(); - } - - @Override - public void onError(Throwable t) { - error = t; - done = true; - if (enter()) { - drainLoop(); } - - downstream.onError(t); - } - - @Override - public void onComplete() { - done = true; - if (enter()) { - drainLoop(); - } - - downstream.onComplete(); } @Override - public void dispose() { - cancelled = true; + void cleanupResources() { + worker.dispose(); } @Override - public boolean isDisposed() { - return cancelled; - } - - void complete(UnicastSubject w) { - queue.offer(new SubjectWork(w, false)); - if (enter()) { - drainLoop(); + void drain() { + if (getAndIncrement() != 0) { + return; } - } - - @SuppressWarnings("unchecked") - void drainLoop() { - final MpscLinkedQueue q = (MpscLinkedQueue)queue; - final Observer> a = downstream; - final List> ws = windows; int missed = 1; + final SimplePlainQueue queue = this.queue; + final Observer> downstream = this.downstream; + final List> windows = this.windows; for (;;) { - - for (;;) { - if (terminated) { - upstream.dispose(); - q.clear(); - ws.clear(); - worker.dispose(); - return; - } - - boolean d = done; - - Object v = q.poll(); - - boolean empty = v == null; - boolean sw = v instanceof SubjectWork; - - if (d && (empty || sw)) { - q.clear(); - Throwable e = error; - if (e != null) { - for (UnicastSubject w : ws) { - w.onError(e); + if (upstreamCancelled) { + queue.clear(); + windows.clear(); + } else { + boolean isDone = done; + Object o = queue.poll(); + boolean isEmpty = o == null; + + if (isDone && isEmpty) { + Throwable ex = error; + if (ex != null) { + for (UnicastSubject window : windows) { + window.onError(ex); } + downstream.onError(ex); } else { - for (UnicastSubject w : ws) { - w.onComplete(); + for (UnicastSubject window : windows) { + window.onComplete(); } + downstream.onComplete(); } - ws.clear(); - worker.dispose(); - return; - } + cleanupResources(); + upstreamCancelled = true; + continue; + } else if (!isEmpty) { + if (o == WINDOW_OPEN) { + if (!downstreamCancelled.get()) { + long emitted = this.emitted; + this.emitted = ++emitted; - if (empty) { - break; - } + windowCount.getAndIncrement(); + UnicastSubject window = UnicastSubject.create(bufferSize, this); + windows.add(window); - if (sw) { - SubjectWork work = (SubjectWork)v; + ObservableWindowSubscribeIntercept intercept = new ObservableWindowSubscribeIntercept(window); + downstream.onNext(intercept); - if (work.open) { - if (cancelled) { - continue; - } + worker.schedule(new WindowBoundaryRunnable(this, false), timespan, unit); - final UnicastSubject w = UnicastSubject.create(bufferSize); - ws.add(w); - a.onNext(w); - - worker.schedule(new CompletionTask(w), timespan, unit); + if (intercept.tryAbandon()) { + window.onComplete(); + } + } + } else if (o == WINDOW_CLOSE) { + if (!windows.isEmpty()) { + windows.remove(0).onComplete(); + } } else { - ws.remove(work.w); - work.w.onComplete(); - if (ws.isEmpty() && cancelled) { - terminated = true; + @SuppressWarnings("unchecked") + T item = (T)o; + for (UnicastSubject window : windows) { + window.onNext(item); } } - } else { - for (UnicastSubject w : ws) { - w.onNext((T)v); - } + continue; } } - - missed = leave(-missed); + missed = addAndGet(-missed); if (missed == 0) { break; } @@ -693,37 +623,31 @@ void drainLoop() { @Override public void run() { - - UnicastSubject w = UnicastSubject.create(bufferSize); - - SubjectWork sw = new SubjectWork(w, true); - if (!cancelled) { - queue.offer(sw); - } - if (enter()) { - drainLoop(); - } + windowDone(); } - static final class SubjectWork { - final UnicastSubject w; - final boolean open; - SubjectWork(UnicastSubject w, boolean open) { - this.w = w; - this.open = open; - } + void boundary(boolean isOpen) { + queue.offer(isOpen ? WINDOW_OPEN : WINDOW_CLOSE); + drain(); } - final class CompletionTask implements Runnable { - private final UnicastSubject w; + static final Object WINDOW_OPEN = new Object(); + static final Object WINDOW_CLOSE = new Object(); + + static final class WindowBoundaryRunnable implements Runnable { - CompletionTask(UnicastSubject w) { - this.w = w; + final WindowSkipObserver parent; + + final boolean isOpen; + + WindowBoundaryRunnable(WindowSkipObserver parent, boolean isOpen) { + this.parent = parent; + this.isOpen = isOpen; } @Override public void run() { - complete(w); + parent.boundary(isOpen); } } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithSizeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithSizeTest.java index e90d46bd36..8053b0f532 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithSizeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithSizeTest.java @@ -17,7 +17,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import org.junit.Test; @@ -37,7 +37,7 @@ public class ObservableWindowWithSizeTest extends RxJavaTest { private static List> toLists(Observable> observables) { final List> lists = new ArrayList>(); - Observable.concat(observables.map(new Function, Observable>>() { + Observable.concatEager(observables.map(new Function, Observable>>() { @Override public Observable> apply(Observable xs) { return xs.toList().toObservable(); @@ -376,4 +376,166 @@ public void accept(Observable w) throws Exception { to[0].assertFailure(TestException.class, 1); } + + @Test + public void cancellingWindowCancelsUpstreamSize() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.window(10) + .take(1) + .flatMap(new Function, Observable>() { + @Override + public Observable apply(Observable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to + .assertResult(1); + + assertFalse("Subject still has subscribers!", ps.hasObservers()); + } + + @Test + public void windowAbandonmentCancelsUpstreamSize() { + PublishSubject ps = PublishSubject.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestObserver> to = ps.window(10) + .take(1) + .doOnNext(new Consumer>() { + @Override + public void accept(Observable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertFalse("Subject still has subscribers!", ps.hasObservers()); + + inner.get().test().assertResult(1); + } + + @Test + public void cancellingWindowCancelsUpstreamSkip() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.window(5, 10) + .take(1) + .flatMap(new Function, Observable>() { + @Override + public Observable apply(Observable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to + .assertResult(1); + + assertFalse("Subject still has subscribers!", ps.hasObservers()); + } + + @Test + public void windowAbandonmentCancelsUpstreamSkip() { + PublishSubject ps = PublishSubject.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestObserver> to = ps.window(5, 10) + .take(1) + .doOnNext(new Consumer>() { + @Override + public void accept(Observable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertFalse("Subject still has subscribers!", ps.hasObservers()); + + inner.get().test().assertResult(1); + } + + @Test + public void cancellingWindowCancelsUpstreamOverlap() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.window(5, 3) + .take(1) + .flatMap(new Function, Observable>() { + @Override + public Observable apply(Observable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to + .assertResult(1); + + assertFalse("Subject still has subscribers!", ps.hasObservers()); + } + + @Test + public void windowAbandonmentCancelsUpstreamOverlap() { + PublishSubject ps = PublishSubject.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestObserver> to = ps.window(5, 3) + .take(1) + .doOnNext(new Consumer>() { + @Override + public void accept(Observable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertFalse("Subject still has subscribers!", ps.hasObservers()); + + inner.get().test().assertResult(1); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithTimeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithTimeTest.java index ba9544eb9c..271cde0208 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithTimeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithTimeTest.java @@ -949,4 +949,154 @@ public void accept(Observable v) throws Exception { assertFalse("The doOnNext got interrupted!", isInterrupted.get()); } + + @Test + public void cancellingWindowCancelsUpstreamExactTime() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.window(10, TimeUnit.MINUTES) + .take(1) + .flatMap(new Function, Observable>() { + @Override + public Observable apply(Observable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to + .assertResult(1); + + assertFalse("Subject still has subscribers!", ps.hasObservers()); + } + + @Test + public void windowAbandonmentCancelsUpstreamExactTime() { + PublishSubject ps = PublishSubject.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestObserver> to = ps.window(10, TimeUnit.MINUTES) + .take(1) + .doOnNext(new Consumer>() { + @Override + public void accept(Observable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertFalse("Subject still has subscribers!", ps.hasObservers()); + + to + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + inner.get().test().assertResult(); + } + + @Test + public void cancellingWindowCancelsUpstreamExactTimeAndSize() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.window(10, TimeUnit.MINUTES, 100) + .take(1) + .flatMap(new Function, Observable>() { + @Override + public Observable apply(Observable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to + .assertResult(1); + + assertFalse("Subject still has subscribers!", ps.hasObservers()); + } + + @Test + public void windowAbandonmentCancelsUpstreamExactTimeAndSize() { + PublishSubject ps = PublishSubject.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestObserver> to = ps.window(10, TimeUnit.MINUTES, 100) + .take(1) + .doOnNext(new Consumer>() { + @Override + public void accept(Observable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertFalse("Subject still has subscribers!", ps.hasObservers()); + + to + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + inner.get().test().assertResult(); + } + + @Test + public void cancellingWindowCancelsUpstreamExactTimeSkip() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.window(10, 15, TimeUnit.MINUTES) + .take(1) + .flatMap(new Function, Observable>() { + @Override + public Observable apply(Observable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to + .assertResult(1); + + assertFalse("Subject still has subscribers!", ps.hasObservers()); + } + + @Test + public void windowAbandonmentCancelsUpstreamExactTimeSkip() { + PublishSubject ps = PublishSubject.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestObserver> to = ps.window(10, 15, TimeUnit.MINUTES) + .take(1) + .doOnNext(new Consumer>() { + @Override + public void accept(Observable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertFalse("Subject still has subscribers!", ps.hasObservers()); + + to + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + inner.get().test().assertResult(); + } }