Skip to content

Commit 735add2

Browse files
authored
3.x: [Java 8] Upgrade to Java 8, add Flowable.fromX operators (#6765)
* 3.x: [Java 8] Upgrade to Java 8, add Flowable.fromX operators * Add NonNull annotation to the new fromX methods * Annotate return type argument to Flowable<@nonnull T>
1 parent f96821f commit 735add2

17 files changed

+1419
-85
lines changed

build.gradle

+4-9
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,15 @@ apply plugin: "com.jfrog.bintray"
6666
apply plugin: "com.jfrog.artifactory"
6767
apply plugin: "eclipse"
6868

69-
sourceCompatibility = JavaVersion.VERSION_1_6
70-
targetCompatibility = JavaVersion.VERSION_1_6
69+
sourceCompatibility = JavaVersion.VERSION_1_8
70+
targetCompatibility = JavaVersion.VERSION_1_8
7171

7272
repositories {
7373
mavenCentral()
7474
}
7575

7676
dependencies {
77-
signature "org.codehaus.mojo.signature:java16:1.1@signature"
77+
signature "org.codehaus.mojo.signature:java18:1.0@signature"
7878

7979
api "org.reactivestreams:reactive-streams:$reactiveStreamsVersion"
8080
jmh "org.reactivestreams:reactive-streams:$reactiveStreamsVersion"
@@ -103,14 +103,9 @@ javadoc {
103103
options.stylesheetFile = new File(projectDir, "gradle/stylesheet.css");
104104

105105
options.links(
106-
"https://docs.oracle.com/javase/7/docs/api/",
106+
"https://docs.oracle.com/javase/8/docs/api/",
107107
"http://www.reactive-streams.org/reactive-streams-${reactiveStreamsVersion}-javadoc/"
108108
)
109-
110-
if (JavaVersion.current().isJava7()) {
111-
// "./gradle/stylesheet.css" only supports Java 7
112-
options.addStringOption("stylesheetfile", rootProject.file("./gradle/stylesheet.css").toString())
113-
}
114109
}
115110

116111
animalsniffer {

src/main/java/io/reactivex/rxjava3/annotations/NonNull.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
import java.lang.annotation.*;
2020

2121
/**
22-
* Indicates that a field/parameter/variable/return type is never null.
22+
* Indicates that a field/parameter/variable/type parameter/return type is never null.
2323
*/
2424
@Documented
25-
@Target(value = {FIELD, METHOD, PARAMETER, LOCAL_VARIABLE})
25+
@Target(value = {FIELD, METHOD, PARAMETER, LOCAL_VARIABLE, TYPE_PARAMETER, TYPE_USE})
2626
@Retention(value = CLASS)
2727
public @interface NonNull { }
2828

src/main/java/io/reactivex/rxjava3/core/Flowable.java

+147-28
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.rxjava3.internal.jdk8;
14+
15+
import java.util.concurrent.CompletionStage;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
import java.util.function.BiConsumer;
18+
19+
import org.reactivestreams.Subscriber;
20+
21+
import io.reactivex.rxjava3.core.Flowable;
22+
import io.reactivex.rxjava3.internal.subscriptions.DeferredScalarSubscription;
23+
24+
/**
25+
* Wrap a CompletionStage and signal its outcome.
26+
* @param <T> the element type
27+
* @since 3.0.0
28+
*/
29+
public final class FlowableFromCompletionStage<T> extends Flowable<T> {
30+
31+
final CompletionStage<T> stage;
32+
33+
public FlowableFromCompletionStage(CompletionStage<T> stage) {
34+
this.stage = stage;
35+
}
36+
37+
@Override
38+
protected void subscribeActual(Subscriber<? super T> s) {
39+
// We need an indirection because one can't detach from a whenComplete
40+
// and cancellation should not hold onto the stage.
41+
BiConsumerAtomicReference<T> whenReference = new BiConsumerAtomicReference<>();
42+
CompletionStageHandler<T> handler = new CompletionStageHandler<>(s, whenReference);
43+
whenReference.lazySet(handler);
44+
45+
s.onSubscribe(handler);
46+
stage.whenComplete(whenReference);
47+
}
48+
49+
static final class CompletionStageHandler<T>
50+
extends DeferredScalarSubscription<T>
51+
implements BiConsumer<T, Throwable> {
52+
53+
private static final long serialVersionUID = 4665335664328839859L;
54+
55+
final BiConsumerAtomicReference<T> whenReference;
56+
57+
CompletionStageHandler(Subscriber<? super T> downstream, BiConsumerAtomicReference<T> whenReference) {
58+
super(downstream);
59+
this.whenReference = whenReference;
60+
}
61+
62+
@Override
63+
public void accept(T item, Throwable error) {
64+
if (error != null) {
65+
downstream.onError(error);
66+
}
67+
else if (item != null) {
68+
complete(item);
69+
} else {
70+
downstream.onError(new NullPointerException("The CompletionStage terminated with null."));
71+
}
72+
}
73+
74+
@Override
75+
public void cancel() {
76+
super.cancel();
77+
whenReference.set(null);
78+
}
79+
}
80+
81+
static final class BiConsumerAtomicReference<T> extends AtomicReference<BiConsumer<T, Throwable>>
82+
implements BiConsumer<T, Throwable> {
83+
84+
private static final long serialVersionUID = 45838553147237545L;
85+
86+
@Override
87+
public void accept(T t, Throwable u) {
88+
BiConsumer<T, Throwable> biConsumer = get();
89+
if (biConsumer != null) {
90+
biConsumer.accept(t, u);
91+
}
92+
}
93+
}
94+
}

0 commit comments

Comments
 (0)