Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support RxJava 3 #2501

Merged
merged 5 commits into from
Feb 27, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dependencies.yml
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ io.reactivex.rxjava2:
javadocs:
- http://reactivex.io/RxJava/2.x/javadoc/

io.reactivex.rxjava3:
rxjava:
version: '3.0.0'
javadocs:
- http://reactivex.io/RxJava/3.x/javadoc/

io.zipkin.brave:
brave:
# ':site:javadoc' fails when we use a newer version of Javadoc.
Expand Down
2 changes: 0 additions & 2 deletions examples/context-propagation/rxjava/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ plugins {
dependencies {
implementation project(':core')
implementation project(':rxjava')

implementation 'net.javacrumbs.future-converter:future-converter-rxjava2-java8'
}

application {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ public class Main {

public static void main(String[] args) {
final Server backend = Server.builder()
.service("/square/{num}", ((ctx, req) -> {
.service("/square/{num}", (ctx, req) -> {
final long num = Long.parseLong(ctx.pathParam("num"));
return HttpResponse.of(Long.toString(num * num));
}))
})
.http(8081)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import java.util.ArrayList;
import java.util.List;

import net.javacrumbs.futureconverter.java8rx2.FutureConverter;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
Expand All @@ -21,10 +19,10 @@
import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.ServiceRequestContext;

import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class MainService implements HttpService {

Expand Down Expand Up @@ -60,7 +58,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) {
.flattenAsFlowable(l -> l);

final Flowable<Long> extractNumsFromRequest =
FutureConverter.toSingle(req.aggregate())
Single.fromCompletionStage(req.aggregate())
// Unless you know what you're doing, always use subscribeOn with the context
// executor to have the context mounted and stay on a single thread to reduce
// concurrency issues.
Expand Down Expand Up @@ -96,13 +94,13 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) {
checkState(ServiceRequestContext.current() == ctx);
checkState(ctx.eventLoop().inEventLoop());

return FutureConverter.toSingle(backendClient.get("/square/" + num).aggregate());
return Single.fromCompletionStage(backendClient.get("/square/" + num).aggregate());
})
.map(AggregatedHttpResponse::contentUtf8)
.collectInto(new StringBuilder(), (current, item) -> current.append(item).append('\n'))
.map(content -> HttpResponse.of(content.toString()))
.onErrorReturn(HttpResponse::ofFailure);

return HttpResponse.from(FutureConverter.toCompletableFuture(response));
return HttpResponse.from(response.toCompletionStage());
}
}
2 changes: 1 addition & 1 deletion rxjava/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
dependencies {
api 'io.reactivex.rxjava2:rxjava'
api 'io.reactivex.rxjava3:rxjava'
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import com.linecorp.armeria.server.annotation.ResponseConverterFunctionProvider;
import com.linecorp.armeria.server.rxjava.ObservableResponseConverterFunction;

import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;

/**
* Provides an {@link ObservableResponseConverterFunction} to annotated services.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

/**
* Provide a default {@link com.linecorp.armeria.server.annotation.ResponseConverterFunction}
* which automatically converts an {@link io.reactivex.rxjava3.core.ObservableSource} into an
* {@link com.linecorp.armeria.common.HttpResponse} when the {@link io.reactivex.rxjava3.core.ObservableSource}
* is returned by an annotated HTTP service.
*/
@NonNullByDefault
package com.linecorp.armeria.internal.server.rxjava;

import com.linecorp.armeria.common.util.NonNullByDefault;
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.rxjava;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import com.linecorp.armeria.common.RequestContext;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.flowables.ConnectableFlowable;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.internal.fuseable.ScalarSupplier;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* Utility class to keep {@link RequestContext} during RxJava operations.
*/
public final class RequestContextAssembly {

@SuppressWarnings("rawtypes")
@Nullable
@GuardedBy("RequestContextAssembly.class")
private static Function<? super Observable, ? extends Observable> oldOnObservableAssembly;
@SuppressWarnings("rawtypes")
@Nullable
@GuardedBy("RequestContextAssembly.class")
private static Function<? super ConnectableObservable, ? extends ConnectableObservable>
oldOnConnectableObservableAssembly;
@SuppressWarnings("rawtypes")
@Nullable
@GuardedBy("RequestContextAssembly.class")
private static Function<? super Completable, ? extends Completable> oldOnCompletableAssembly;
@SuppressWarnings("rawtypes")
@Nullable
@GuardedBy("RequestContextAssembly.class")
private static Function<? super Single, ? extends Single> oldOnSingleAssembly;
@SuppressWarnings("rawtypes")
@Nullable
@GuardedBy("RequestContextAssembly.class")
private static Function<? super Maybe, ? extends Maybe> oldOnMaybeAssembly;
@SuppressWarnings("rawtypes")
@Nullable
@GuardedBy("RequestContextAssembly.class")
private static Function<? super Flowable, ? extends Flowable> oldOnFlowableAssembly;
@SuppressWarnings("rawtypes")
@Nullable
@GuardedBy("RequestContextAssembly.class")
private static Function<? super ConnectableFlowable, ? extends ConnectableFlowable>
oldOnConnectableFlowableAssembly;
@SuppressWarnings("rawtypes")
@Nullable
@GuardedBy("RequestContextAssembly.class")
private static Function<? super ParallelFlowable, ? extends ParallelFlowable> oldOnParallelAssembly;

@GuardedBy("RequestContextAssembly.class")
private static boolean enabled;

private RequestContextAssembly() {
}

/**
* Enable {@link RequestContext} during operators.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public static synchronized void enable() {
if (enabled) {
return;
}

oldOnObservableAssembly = RxJavaPlugins.getOnObservableAssembly();
RxJavaPlugins.setOnObservableAssembly(compose(
oldOnObservableAssembly,
new ConditionalOnCurrentRequestContextFunction<Observable>() {
@Override
Observable applyActual(Observable o, RequestContext ctx) {
if (!(o instanceof Supplier)) {
return new RequestContextObservable(o, ctx);
}
if (o instanceof ScalarSupplier) {
return new RequestContextScalarSupplierObservable(o, ctx);
}
return new RequestContextSupplierObservable(o, ctx);
}
}));

oldOnConnectableObservableAssembly = RxJavaPlugins.getOnConnectableObservableAssembly();
RxJavaPlugins.setOnConnectableObservableAssembly(compose(
oldOnConnectableObservableAssembly,
new ConditionalOnCurrentRequestContextFunction<ConnectableObservable>() {
@Override
ConnectableObservable applyActual(ConnectableObservable co, RequestContext ctx) {
return new RequestContextConnectableObservable(co, ctx);
}
}));

oldOnCompletableAssembly = RxJavaPlugins.getOnCompletableAssembly();
RxJavaPlugins.setOnCompletableAssembly(
compose(oldOnCompletableAssembly,
new ConditionalOnCurrentRequestContextFunction<Completable>() {
@Override
Completable applyActual(Completable c, RequestContext ctx) {
return new RequestContextCompletable(c, ctx);
}
}));

oldOnSingleAssembly = RxJavaPlugins.getOnSingleAssembly();
RxJavaPlugins.setOnSingleAssembly(
compose(oldOnSingleAssembly, new ConditionalOnCurrentRequestContextFunction<Single>() {
@Override
Single applyActual(Single s, RequestContext ctx) {
if (!(s instanceof Supplier)) {
return new RequestContextSingle(s, ctx);
}
if (s instanceof ScalarSupplier) {
return new RequestContextScalarSupplierSingle(s, ctx);
}
return new RequestContextSupplierSingle(s, ctx);
}
}));

oldOnMaybeAssembly = RxJavaPlugins.getOnMaybeAssembly();
RxJavaPlugins.setOnMaybeAssembly(
compose(oldOnMaybeAssembly, new ConditionalOnCurrentRequestContextFunction<Maybe>() {
@Override
Maybe applyActual(Maybe m, RequestContext ctx) {
if (!(m instanceof Supplier)) {
return new RequestContextMaybe(m, ctx);
}
if (m instanceof ScalarSupplier) {
return new RequestContextScalarSupplierMaybe(m, ctx);
}
return new RequestContextSupplierMaybe(m, ctx);
}
}));

oldOnFlowableAssembly = RxJavaPlugins.getOnFlowableAssembly();
RxJavaPlugins.setOnFlowableAssembly(
compose(oldOnFlowableAssembly, new ConditionalOnCurrentRequestContextFunction<Flowable>() {
@Override
Flowable applyActual(Flowable f, RequestContext ctx) {
if (!(f instanceof Supplier)) {
return new RequestContextFlowable(f, ctx);
}
if (f instanceof ScalarSupplier) {
return new RequestContextScalarSupplierFlowable(f, ctx);
}
return new RequestContextSupplierFlowable(f, ctx);
}
}));

oldOnConnectableFlowableAssembly = RxJavaPlugins.getOnConnectableFlowableAssembly();
RxJavaPlugins.setOnConnectableFlowableAssembly(
compose(oldOnConnectableFlowableAssembly,
new ConditionalOnCurrentRequestContextFunction<ConnectableFlowable>() {
@Override
ConnectableFlowable applyActual(
ConnectableFlowable cf,
RequestContext ctx) {
return new RequestContextConnectableFlowable(
cf, ctx);
}
}
));

oldOnParallelAssembly = RxJavaPlugins.getOnParallelAssembly();
RxJavaPlugins.setOnParallelAssembly(
compose(oldOnParallelAssembly,
new ConditionalOnCurrentRequestContextFunction<ParallelFlowable>() {
@Override
ParallelFlowable applyActual(ParallelFlowable pf, RequestContext ctx) {
return new RequestContextParallelFlowable(pf, ctx);
}
}
));
enabled = true;
}

/**
* Disable {@link RequestContext} during operators.
*/
public static synchronized void disable() {
if (!enabled) {
return;
}
RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly);
oldOnObservableAssembly = null;
RxJavaPlugins.setOnConnectableObservableAssembly(oldOnConnectableObservableAssembly);
oldOnConnectableObservableAssembly = null;
RxJavaPlugins.setOnCompletableAssembly(oldOnCompletableAssembly);
oldOnCompletableAssembly = null;
RxJavaPlugins.setOnSingleAssembly(oldOnSingleAssembly);
oldOnSingleAssembly = null;
RxJavaPlugins.setOnMaybeAssembly(oldOnMaybeAssembly);
oldOnMaybeAssembly = null;
RxJavaPlugins.setOnFlowableAssembly(oldOnFlowableAssembly);
oldOnFlowableAssembly = null;
RxJavaPlugins.setOnConnectableFlowableAssembly(oldOnConnectableFlowableAssembly);
oldOnConnectableFlowableAssembly = null;
RxJavaPlugins.setOnParallelAssembly(oldOnParallelAssembly);
oldOnParallelAssembly = null;
enabled = false;
}

private abstract static class ConditionalOnCurrentRequestContextFunction<T> implements Function<T, T> {
@Override
public final T apply(T t) {
return RequestContext.mapCurrent(requestContext -> applyActual(t, requestContext), () -> t);
}

abstract T applyActual(T t, RequestContext ctx);
}

private static <T> Function<? super T, ? extends T> compose(
@Nullable Function<? super T, ? extends T> before,
Function<? super T, ? extends T> after) {
if (before == null) {
return after;
}
return (T v) -> after.apply(before.apply(v));
}
}
Loading