Skip to content

Commit 519dbca

Browse files
authored
Support RxJava 3 (#2501)
Motivation: Recently RxJava 3 has been released. https://github.com/ReactiveX/RxJava/releases/tag/v3.0.0 Armeria supports RxJava2 integration with `armeria-rxjava`. This PR migrates `RequestContextAssembly` from RxJava2 to RxJava3. RxJava 3 being based on Java 8 also supports seamless conversions between CompletionStage and Single, Maybe, Observable. Modifications: * Rename original `armeria-rxjava` to `armeria-rxjava2` * Make `armeria-rxjava` support RxJava 3 * Use built-in converter methods for ObservableResponseConverterFunction * Change `*Callable` to `*Supplier` ReactiveX/RxJava#6511 * Remove `assemblyContext.push()` in `RequestContextScalarCallable*.get()` * Don't need to push `RequestContext` for scalar type such as `Maybe.just(T)` * Delegate `reset()` method to `RequestContextConnectableFlowable`, `RequestContextConnectableObservable` https://github.com/ReactiveX/RxJava/wiki/What's-different-in-3.0#connectable-source-reset * Migrate `examples/context-propagation/rxjava` to RxJava 3 Result: You can now use RxJava 3 with Armeria. Fixes: #2378
1 parent 001a1f7 commit 519dbca

File tree

72 files changed

+2613
-248
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+2613
-248
lines changed

dependencies.yml

+6
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,12 @@ io.reactivex.rxjava2:
241241
javadocs:
242242
- http://reactivex.io/RxJava/2.x/javadoc/
243243

244+
io.reactivex.rxjava3:
245+
rxjava:
246+
version: '3.0.0'
247+
javadocs:
248+
- http://reactivex.io/RxJava/3.x/javadoc/
249+
244250
io.zipkin.brave:
245251
brave:
246252
# ':site:javadoc' fails when we use a newer version of Javadoc.

examples/context-propagation/rxjava/build.gradle

-2
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ plugins {
55
dependencies {
66
implementation project(':core')
77
implementation project(':rxjava')
8-
9-
implementation 'net.javacrumbs.future-converter:future-converter-rxjava2-java8'
108
}
119

1210
application {

examples/context-propagation/rxjava/src/main/java/example/armeria/contextpropagation/rxjava/Main.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ public class Main {
88

99
public static void main(String[] args) {
1010
final Server backend = Server.builder()
11-
.service("/square/{num}", ((ctx, req) -> {
11+
.service("/square/{num}", (ctx, req) -> {
1212
final long num = Long.parseLong(ctx.pathParam("num"));
1313
return HttpResponse.of(Long.toString(num * num));
14-
}))
14+
})
1515
.http(8081)
1616
.build();
1717

examples/context-propagation/rxjava/src/main/java/example/armeria/contextpropagation/rxjava/MainService.java

+7-9
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import java.util.ArrayList;
88
import java.util.List;
99

10-
import net.javacrumbs.futureconverter.java8rx2.FutureConverter;
11-
1210
import com.google.common.base.Splitter;
1311
import com.google.common.collect.ImmutableList;
1412
import com.google.common.collect.Iterables;
@@ -21,10 +19,10 @@
2119
import com.linecorp.armeria.server.HttpService;
2220
import com.linecorp.armeria.server.ServiceRequestContext;
2321

24-
import io.reactivex.Flowable;
25-
import io.reactivex.Scheduler;
26-
import io.reactivex.Single;
27-
import io.reactivex.schedulers.Schedulers;
22+
import io.reactivex.rxjava3.core.Flowable;
23+
import io.reactivex.rxjava3.core.Scheduler;
24+
import io.reactivex.rxjava3.core.Single;
25+
import io.reactivex.rxjava3.schedulers.Schedulers;
2826

2927
public class MainService implements HttpService {
3028

@@ -60,7 +58,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) {
6058
.flattenAsFlowable(l -> l);
6159

6260
final Flowable<Long> extractNumsFromRequest =
63-
FutureConverter.toSingle(req.aggregate())
61+
Single.fromCompletionStage(req.aggregate())
6462
// Unless you know what you're doing, always use subscribeOn with the context
6563
// executor to have the context mounted and stay on a single thread to reduce
6664
// concurrency issues.
@@ -96,13 +94,13 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) {
9694
checkState(ServiceRequestContext.current() == ctx);
9795
checkState(ctx.eventLoop().inEventLoop());
9896

99-
return FutureConverter.toSingle(backendClient.get("/square/" + num).aggregate());
97+
return Single.fromCompletionStage(backendClient.get("/square/" + num).aggregate());
10098
})
10199
.map(AggregatedHttpResponse::contentUtf8)
102100
.collectInto(new StringBuilder(), (current, item) -> current.append(item).append('\n'))
103101
.map(content -> HttpResponse.of(content.toString()))
104102
.onErrorReturn(HttpResponse::ofFailure);
105103

106-
return HttpResponse.from(FutureConverter.toCompletableFuture(response));
104+
return HttpResponse.from(response.toCompletionStage());
107105
}
108106
}

rxjava/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
dependencies {
2-
api 'io.reactivex.rxjava2:rxjava'
2+
api 'io.reactivex.rxjava3:rxjava'
33
}

rxjava/src/main/java/com/linecorp/armeria/common/rxjava/RequestContextAssembly.java

+35-48
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018 LINE Corporation
2+
* Copyright 2020 LINE Corporation
33
*
44
* LINE Corporation licenses this file to you under the Apache License,
55
* version 2.0 (the "License"); you may not use this file except in compliance
@@ -16,24 +16,23 @@
1616

1717
package com.linecorp.armeria.common.rxjava;
1818

19-
import java.util.concurrent.Callable;
20-
2119
import javax.annotation.Nullable;
2220
import javax.annotation.concurrent.GuardedBy;
2321

2422
import com.linecorp.armeria.common.RequestContext;
2523

26-
import io.reactivex.Completable;
27-
import io.reactivex.Flowable;
28-
import io.reactivex.Maybe;
29-
import io.reactivex.Observable;
30-
import io.reactivex.Single;
31-
import io.reactivex.flowables.ConnectableFlowable;
32-
import io.reactivex.functions.Function;
33-
import io.reactivex.internal.fuseable.ScalarCallable;
34-
import io.reactivex.observables.ConnectableObservable;
35-
import io.reactivex.parallel.ParallelFlowable;
36-
import io.reactivex.plugins.RxJavaPlugins;
24+
import io.reactivex.rxjava3.core.Completable;
25+
import io.reactivex.rxjava3.core.Flowable;
26+
import io.reactivex.rxjava3.core.Maybe;
27+
import io.reactivex.rxjava3.core.Observable;
28+
import io.reactivex.rxjava3.core.Single;
29+
import io.reactivex.rxjava3.flowables.ConnectableFlowable;
30+
import io.reactivex.rxjava3.functions.Function;
31+
import io.reactivex.rxjava3.functions.Supplier;
32+
import io.reactivex.rxjava3.internal.fuseable.ScalarSupplier;
33+
import io.reactivex.rxjava3.observables.ConnectableObservable;
34+
import io.reactivex.rxjava3.parallel.ParallelFlowable;
35+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
3736

3837
/**
3938
* Utility class to keep {@link RequestContext} during RxJava operations.
@@ -78,9 +77,6 @@ public final class RequestContextAssembly {
7877
@GuardedBy("RequestContextAssembly.class")
7978
private static boolean enabled;
8079

81-
private RequestContextAssembly() {
82-
}
83-
8480
/**
8581
* Enable {@link RequestContext} during operators.
8682
*/
@@ -96,13 +92,13 @@ public static synchronized void enable() {
9692
new ConditionalOnCurrentRequestContextFunction<Observable>() {
9793
@Override
9894
Observable applyActual(Observable o, RequestContext ctx) {
99-
if (!(o instanceof Callable)) {
95+
if (!(o instanceof Supplier)) {
10096
return new RequestContextObservable(o, ctx);
10197
}
102-
if (o instanceof ScalarCallable) {
103-
return new RequestContextScalarCallableObservable(o, ctx);
98+
if (o instanceof ScalarSupplier) {
99+
return new RequestContextScalarSupplierObservable(o, ctx);
104100
}
105-
return new RequestContextCallableObservable(o, ctx);
101+
return new RequestContextSupplierObservable(o, ctx);
106102
}
107103
}));
108104

@@ -121,16 +117,8 @@ ConnectableObservable applyActual(ConnectableObservable co, RequestContext ctx)
121117
compose(oldOnCompletableAssembly,
122118
new ConditionalOnCurrentRequestContextFunction<Completable>() {
123119
@Override
124-
Completable applyActual(Completable c,
125-
RequestContext ctx) {
126-
if (!(c instanceof Callable)) {
127-
return new RequestContextCompletable(c, ctx);
128-
}
129-
if (c instanceof ScalarCallable) {
130-
return new RequestContextScalarCallableCompletable(
131-
c, ctx);
132-
}
133-
return new RequestContextCallableCompletable(c, ctx);
120+
Completable applyActual(Completable c, RequestContext ctx) {
121+
return new RequestContextCompletable(c, ctx);
134122
}
135123
}));
136124

@@ -139,13 +127,13 @@ Completable applyActual(Completable c,
139127
compose(oldOnSingleAssembly, new ConditionalOnCurrentRequestContextFunction<Single>() {
140128
@Override
141129
Single applyActual(Single s, RequestContext ctx) {
142-
if (!(s instanceof Callable)) {
130+
if (!(s instanceof Supplier)) {
143131
return new RequestContextSingle(s, ctx);
144132
}
145-
if (s instanceof ScalarCallable) {
146-
return new RequestContextScalarCallableSingle(s, ctx);
133+
if (s instanceof ScalarSupplier) {
134+
return new RequestContextScalarSupplierSingle(s, ctx);
147135
}
148-
return new RequestContextCallableSingle(s, ctx);
136+
return new RequestContextSupplierSingle(s, ctx);
149137
}
150138
}));
151139

@@ -154,13 +142,13 @@ Single applyActual(Single s, RequestContext ctx) {
154142
compose(oldOnMaybeAssembly, new ConditionalOnCurrentRequestContextFunction<Maybe>() {
155143
@Override
156144
Maybe applyActual(Maybe m, RequestContext ctx) {
157-
if (!(m instanceof Callable)) {
145+
if (!(m instanceof Supplier)) {
158146
return new RequestContextMaybe(m, ctx);
159147
}
160-
if (m instanceof ScalarCallable) {
161-
return new RequestContextScalarCallableMaybe(m, ctx);
148+
if (m instanceof ScalarSupplier) {
149+
return new RequestContextScalarSupplierMaybe(m, ctx);
162150
}
163-
return new RequestContextCallableMaybe(m, ctx);
151+
return new RequestContextSupplierMaybe(m, ctx);
164152
}
165153
}));
166154

@@ -169,13 +157,13 @@ Maybe applyActual(Maybe m, RequestContext ctx) {
169157
compose(oldOnFlowableAssembly, new ConditionalOnCurrentRequestContextFunction<Flowable>() {
170158
@Override
171159
Flowable applyActual(Flowable f, RequestContext ctx) {
172-
if (!(f instanceof Callable)) {
160+
if (!(f instanceof Supplier)) {
173161
return new RequestContextFlowable(f, ctx);
174162
}
175-
if (f instanceof ScalarCallable) {
176-
return new RequestContextScalarCallableFlowable(f, ctx);
163+
if (f instanceof ScalarSupplier) {
164+
return new RequestContextScalarSupplierFlowable(f, ctx);
177165
}
178-
return new RequestContextCallableFlowable(f, ctx);
166+
return new RequestContextSupplierFlowable(f, ctx);
179167
}
180168
}));
181169

@@ -184,11 +172,8 @@ Flowable applyActual(Flowable f, RequestContext ctx) {
184172
compose(oldOnConnectableFlowableAssembly,
185173
new ConditionalOnCurrentRequestContextFunction<ConnectableFlowable>() {
186174
@Override
187-
ConnectableFlowable applyActual(
188-
ConnectableFlowable cf,
189-
RequestContext ctx) {
190-
return new RequestContextConnectableFlowable(
191-
cf, ctx);
175+
ConnectableFlowable applyActual(ConnectableFlowable cf, RequestContext ctx) {
176+
return new RequestContextConnectableFlowable(cf, ctx);
192177
}
193178
}
194179
));
@@ -232,6 +217,8 @@ public static synchronized void disable() {
232217
enabled = false;
233218
}
234219

220+
private RequestContextAssembly() {}
221+
235222
private abstract static class ConditionalOnCurrentRequestContextFunction<T> implements Function<T, T> {
236223
@Override
237224
public final T apply(T t) {

rxjava/src/main/java/com/linecorp/armeria/common/rxjava/RequestContextCompletable.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018 LINE Corporation
2+
* Copyright 2020 LINE Corporation
33
*
44
* LINE Corporation licenses this file to you under the Apache License,
55
* version 2.0 (the "License"); you may not use this file except in compliance
@@ -19,9 +19,9 @@
1919
import com.linecorp.armeria.common.RequestContext;
2020
import com.linecorp.armeria.common.util.SafeCloseable;
2121

22-
import io.reactivex.Completable;
23-
import io.reactivex.CompletableObserver;
24-
import io.reactivex.CompletableSource;
22+
import io.reactivex.rxjava3.core.Completable;
23+
import io.reactivex.rxjava3.core.CompletableObserver;
24+
import io.reactivex.rxjava3.core.CompletableSource;
2525

2626
final class RequestContextCompletable extends Completable {
2727

rxjava/src/main/java/com/linecorp/armeria/common/rxjava/RequestContextCompletableObserver.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018 LINE Corporation
2+
* Copyright 2020 LINE Corporation
33
*
44
* LINE Corporation licenses this file to you under the Apache License,
55
* version 2.0 (the "License"); you may not use this file except in compliance
@@ -19,9 +19,9 @@
1919
import com.linecorp.armeria.common.RequestContext;
2020
import com.linecorp.armeria.common.util.SafeCloseable;
2121

22-
import io.reactivex.CompletableObserver;
23-
import io.reactivex.disposables.Disposable;
24-
import io.reactivex.internal.disposables.DisposableHelper;
22+
import io.reactivex.rxjava3.core.CompletableObserver;
23+
import io.reactivex.rxjava3.disposables.Disposable;
24+
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
2525

2626
final class RequestContextCompletableObserver implements CompletableObserver, Disposable {
2727
private final CompletableObserver actual;

rxjava/src/main/java/com/linecorp/armeria/common/rxjava/RequestContextConditionalSubscriber.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018 LINE Corporation
2+
* Copyright 2020 LINE Corporation
33
*
44
* LINE Corporation licenses this file to you under the Apache License,
55
* version 2.0 (the "License"); you may not use this file except in compliance
@@ -19,9 +19,9 @@
1919
import com.linecorp.armeria.common.RequestContext;
2020
import com.linecorp.armeria.common.util.SafeCloseable;
2121

22-
import io.reactivex.internal.fuseable.ConditionalSubscriber;
23-
import io.reactivex.internal.fuseable.QueueSubscription;
24-
import io.reactivex.internal.subscribers.BasicFuseableConditionalSubscriber;
22+
import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber;
23+
import io.reactivex.rxjava3.internal.fuseable.QueueSubscription;
24+
import io.reactivex.rxjava3.internal.subscribers.BasicFuseableConditionalSubscriber;
2525

2626
final class RequestContextConditionalSubscriber<T> extends BasicFuseableConditionalSubscriber<T, T> {
2727

@@ -73,7 +73,7 @@ public int requestFusion(int mode) {
7373
}
7474

7575
@Override
76-
public T poll() throws Exception {
76+
public T poll() throws Throwable {
7777
return qs.poll();
7878
}
7979
}

rxjava/src/main/java/com/linecorp/armeria/common/rxjava/RequestContextConnectableFlowable.java

+12-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018 LINE Corporation
2+
* Copyright 2020 LINE Corporation
33
*
44
* LINE Corporation licenses this file to you under the Apache License,
55
* version 2.0 (the "License"); you may not use this file except in compliance
@@ -21,10 +21,10 @@
2121
import com.linecorp.armeria.common.RequestContext;
2222
import com.linecorp.armeria.common.util.SafeCloseable;
2323

24-
import io.reactivex.disposables.Disposable;
25-
import io.reactivex.flowables.ConnectableFlowable;
26-
import io.reactivex.functions.Consumer;
27-
import io.reactivex.internal.fuseable.ConditionalSubscriber;
24+
import io.reactivex.rxjava3.disposables.Disposable;
25+
import io.reactivex.rxjava3.flowables.ConnectableFlowable;
26+
import io.reactivex.rxjava3.functions.Consumer;
27+
import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber;
2828

2929
final class RequestContextConnectableFlowable<T> extends ConnectableFlowable<T> {
3030
private final ConnectableFlowable<T> source;
@@ -40,9 +40,8 @@ final class RequestContextConnectableFlowable<T> extends ConnectableFlowable<T>
4040
protected void subscribeActual(Subscriber<? super T> s) {
4141
try (SafeCloseable ignored = assemblyContext.push()) {
4242
if (s instanceof ConditionalSubscriber) {
43-
source.subscribe(new RequestContextConditionalSubscriber<>(
44-
(ConditionalSubscriber<? super T>) s, assemblyContext
45-
));
43+
source.subscribe(new RequestContextConditionalSubscriber<>((ConditionalSubscriber<? super T>) s,
44+
assemblyContext));
4645
} else {
4746
source.subscribe(new RequestContextSubscriber<>(s, assemblyContext));
4847
}
@@ -55,4 +54,9 @@ public void connect(Consumer<? super Disposable> connection) {
5554
source.connect(connection);
5655
}
5756
}
57+
58+
@Override
59+
public void reset() {
60+
source.reset();
61+
}
5862
}

0 commit comments

Comments
 (0)