Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support RxJava 3 #2501

Merged
merged 5 commits into from
Feb 27, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dependencies.yml
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ io.reactivex.rxjava2:
javadocs:
- http://reactivex.io/RxJava/2.x/javadoc/

io.reactivex.rxjava3:
rxjava:
version: '3.0.0'
javadocs:
- http://reactivex.io/RxJava/3.x/javadoc/

io.zipkin.brave:
brave:
# ':site:javadoc' fails when we use a newer version of Javadoc.
Expand Down
2 changes: 0 additions & 2 deletions examples/context-propagation/rxjava/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ plugins {
dependencies {
implementation project(':core')
implementation project(':rxjava')

implementation 'net.javacrumbs.future-converter:future-converter-rxjava2-java8'
}

application {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ public class Main {

public static void main(String[] args) {
final Server backend = Server.builder()
.service("/square/{num}", ((ctx, req) -> {
.service("/square/{num}", (ctx, req) -> {
final long num = Long.parseLong(ctx.pathParam("num"));
return HttpResponse.of(Long.toString(num * num));
}))
})
.http(8081)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import java.util.ArrayList;
import java.util.List;

import net.javacrumbs.futureconverter.java8rx2.FutureConverter;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
Expand All @@ -21,10 +19,10 @@
import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.ServiceRequestContext;

import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class MainService implements HttpService {

Expand Down Expand Up @@ -60,7 +58,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) {
.flattenAsFlowable(l -> l);

final Flowable<Long> extractNumsFromRequest =
FutureConverter.toSingle(req.aggregate())
Single.fromCompletionStage(req.aggregate())
// Unless you know what you're doing, always use subscribeOn with the context
// executor to have the context mounted and stay on a single thread to reduce
// concurrency issues.
Expand Down Expand Up @@ -96,13 +94,13 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) {
checkState(ServiceRequestContext.current() == ctx);
checkState(ctx.eventLoop().inEventLoop());

return FutureConverter.toSingle(backendClient.get("/square/" + num).aggregate());
return Single.fromCompletionStage(backendClient.get("/square/" + num).aggregate());
})
.map(AggregatedHttpResponse::contentUtf8)
.collectInto(new StringBuilder(), (current, item) -> current.append(item).append('\n'))
.map(content -> HttpResponse.of(content.toString()))
.onErrorReturn(HttpResponse::ofFailure);

return HttpResponse.from(FutureConverter.toCompletableFuture(response));
return HttpResponse.from(response.toCompletionStage());
}
}
2 changes: 1 addition & 1 deletion rxjava/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
dependencies {
api 'io.reactivex.rxjava2:rxjava'
api 'io.reactivex.rxjava3:rxjava'
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 LINE Corporation
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand All @@ -16,24 +16,23 @@

package com.linecorp.armeria.common.rxjava;

import java.util.concurrent.Callable;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import com.linecorp.armeria.common.RequestContext;

import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.Function;
import io.reactivex.internal.fuseable.ScalarCallable;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.flowables.ConnectableFlowable;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.internal.fuseable.ScalarSupplier;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* Utility class to keep {@link RequestContext} during RxJava operations.
Expand Down Expand Up @@ -78,9 +77,6 @@ public final class RequestContextAssembly {
@GuardedBy("RequestContextAssembly.class")
private static boolean enabled;

private RequestContextAssembly() {
}

/**
* Enable {@link RequestContext} during operators.
*/
Expand All @@ -96,13 +92,13 @@ public static synchronized void enable() {
new ConditionalOnCurrentRequestContextFunction<Observable>() {
@Override
Observable applyActual(Observable o, RequestContext ctx) {
if (!(o instanceof Callable)) {
if (!(o instanceof Supplier)) {
return new RequestContextObservable(o, ctx);
}
if (o instanceof ScalarCallable) {
return new RequestContextScalarCallableObservable(o, ctx);
if (o instanceof ScalarSupplier) {
return new RequestContextScalarSupplierObservable(o, ctx);
}
return new RequestContextCallableObservable(o, ctx);
return new RequestContextSupplierObservable(o, ctx);
}
}));

Expand All @@ -121,16 +117,8 @@ ConnectableObservable applyActual(ConnectableObservable co, RequestContext ctx)
compose(oldOnCompletableAssembly,
new ConditionalOnCurrentRequestContextFunction<Completable>() {
@Override
Completable applyActual(Completable c,
RequestContext ctx) {
if (!(c instanceof Callable)) {
return new RequestContextCompletable(c, ctx);
}
if (c instanceof ScalarCallable) {
return new RequestContextScalarCallableCompletable(
c, ctx);
}
return new RequestContextCallableCompletable(c, ctx);
Completable applyActual(Completable c, RequestContext ctx) {
return new RequestContextCompletable(c, ctx);
}
}));

Expand All @@ -139,13 +127,13 @@ Completable applyActual(Completable c,
compose(oldOnSingleAssembly, new ConditionalOnCurrentRequestContextFunction<Single>() {
@Override
Single applyActual(Single s, RequestContext ctx) {
if (!(s instanceof Callable)) {
if (!(s instanceof Supplier)) {
return new RequestContextSingle(s, ctx);
}
if (s instanceof ScalarCallable) {
return new RequestContextScalarCallableSingle(s, ctx);
if (s instanceof ScalarSupplier) {
return new RequestContextScalarSupplierSingle(s, ctx);
}
return new RequestContextCallableSingle(s, ctx);
return new RequestContextSupplierSingle(s, ctx);
}
}));

Expand All @@ -154,13 +142,13 @@ Single applyActual(Single s, RequestContext ctx) {
compose(oldOnMaybeAssembly, new ConditionalOnCurrentRequestContextFunction<Maybe>() {
@Override
Maybe applyActual(Maybe m, RequestContext ctx) {
if (!(m instanceof Callable)) {
if (!(m instanceof Supplier)) {
return new RequestContextMaybe(m, ctx);
}
if (m instanceof ScalarCallable) {
return new RequestContextScalarCallableMaybe(m, ctx);
if (m instanceof ScalarSupplier) {
return new RequestContextScalarSupplierMaybe(m, ctx);
}
return new RequestContextCallableMaybe(m, ctx);
return new RequestContextSupplierMaybe(m, ctx);
}
}));

Expand All @@ -169,13 +157,13 @@ Maybe applyActual(Maybe m, RequestContext ctx) {
compose(oldOnFlowableAssembly, new ConditionalOnCurrentRequestContextFunction<Flowable>() {
@Override
Flowable applyActual(Flowable f, RequestContext ctx) {
if (!(f instanceof Callable)) {
if (!(f instanceof Supplier)) {
return new RequestContextFlowable(f, ctx);
}
if (f instanceof ScalarCallable) {
return new RequestContextScalarCallableFlowable(f, ctx);
if (f instanceof ScalarSupplier) {
return new RequestContextScalarSupplierFlowable(f, ctx);
}
return new RequestContextCallableFlowable(f, ctx);
return new RequestContextSupplierFlowable(f, ctx);
}
}));

Expand All @@ -184,11 +172,8 @@ Flowable applyActual(Flowable f, RequestContext ctx) {
compose(oldOnConnectableFlowableAssembly,
new ConditionalOnCurrentRequestContextFunction<ConnectableFlowable>() {
@Override
ConnectableFlowable applyActual(
ConnectableFlowable cf,
RequestContext ctx) {
return new RequestContextConnectableFlowable(
cf, ctx);
ConnectableFlowable applyActual(ConnectableFlowable cf, RequestContext ctx) {
return new RequestContextConnectableFlowable(cf, ctx);
}
}
));
Expand Down Expand Up @@ -232,6 +217,8 @@ public static synchronized void disable() {
enabled = false;
}

private RequestContextAssembly() {}

private abstract static class ConditionalOnCurrentRequestContextFunction<T> implements Function<T, T> {
@Override
public final T apply(T t) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 LINE Corporation
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand All @@ -19,9 +19,9 @@
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.util.SafeCloseable;

import io.reactivex.Completable;
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableSource;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableObserver;
import io.reactivex.rxjava3.core.CompletableSource;

final class RequestContextCompletable extends Completable {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 LINE Corporation
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand All @@ -19,9 +19,9 @@
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.util.SafeCloseable;

import io.reactivex.CompletableObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.core.CompletableObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;

final class RequestContextCompletableObserver implements CompletableObserver, Disposable {
private final CompletableObserver actual;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 LINE Corporation
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand All @@ -19,9 +19,9 @@
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.util.SafeCloseable;

import io.reactivex.internal.fuseable.ConditionalSubscriber;
import io.reactivex.internal.fuseable.QueueSubscription;
import io.reactivex.internal.subscribers.BasicFuseableConditionalSubscriber;
import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber;
import io.reactivex.rxjava3.internal.fuseable.QueueSubscription;
import io.reactivex.rxjava3.internal.subscribers.BasicFuseableConditionalSubscriber;

final class RequestContextConditionalSubscriber<T> extends BasicFuseableConditionalSubscriber<T, T> {

Expand Down Expand Up @@ -73,7 +73,7 @@ public int requestFusion(int mode) {
}

@Override
public T poll() throws Exception {
public T poll() throws Throwable {
return qs.poll();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 LINE Corporation
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand All @@ -21,10 +21,10 @@
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.util.SafeCloseable;

import io.reactivex.disposables.Disposable;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.fuseable.ConditionalSubscriber;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.flowables.ConnectableFlowable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber;

final class RequestContextConnectableFlowable<T> extends ConnectableFlowable<T> {
private final ConnectableFlowable<T> source;
Expand All @@ -40,9 +40,8 @@ final class RequestContextConnectableFlowable<T> extends ConnectableFlowable<T>
protected void subscribeActual(Subscriber<? super T> s) {
try (SafeCloseable ignored = assemblyContext.push()) {
if (s instanceof ConditionalSubscriber) {
source.subscribe(new RequestContextConditionalSubscriber<>(
(ConditionalSubscriber<? super T>) s, assemblyContext
));
source.subscribe(new RequestContextConditionalSubscriber<>((ConditionalSubscriber<? super T>) s,
assemblyContext));
} else {
source.subscribe(new RequestContextSubscriber<>(s, assemblyContext));
}
Expand All @@ -55,4 +54,9 @@ public void connect(Consumer<? super Disposable> connection) {
source.connect(connection);
}
}

@Override
public void reset() {
source.reset();
}
}
Loading