1
1
/*
2
- * Copyright 2018 LINE Corporation
2
+ * Copyright 2020 LINE Corporation
3
3
*
4
4
* LINE Corporation licenses this file to you under the Apache License,
5
5
* version 2.0 (the "License"); you may not use this file except in compliance
16
16
17
17
package com .linecorp .armeria .common .rxjava ;
18
18
19
- import java .util .concurrent .Callable ;
20
-
21
19
import javax .annotation .Nullable ;
22
20
import javax .annotation .concurrent .GuardedBy ;
23
21
24
22
import com .linecorp .armeria .common .RequestContext ;
25
23
26
- import io .reactivex .Completable ;
27
- import io .reactivex .Flowable ;
28
- import io .reactivex .Maybe ;
29
- import io .reactivex .Observable ;
30
- import io .reactivex .Single ;
31
- import io .reactivex .flowables .ConnectableFlowable ;
32
- import io .reactivex .functions .Function ;
33
- import io .reactivex .internal .fuseable .ScalarCallable ;
34
- import io .reactivex .observables .ConnectableObservable ;
35
- import io .reactivex .parallel .ParallelFlowable ;
36
- import io .reactivex .plugins .RxJavaPlugins ;
24
+ import io .reactivex .rxjava3 .core .Completable ;
25
+ import io .reactivex .rxjava3 .core .Flowable ;
26
+ import io .reactivex .rxjava3 .core .Maybe ;
27
+ import io .reactivex .rxjava3 .core .Observable ;
28
+ import io .reactivex .rxjava3 .core .Single ;
29
+ import io .reactivex .rxjava3 .flowables .ConnectableFlowable ;
30
+ import io .reactivex .rxjava3 .functions .Function ;
31
+ import io .reactivex .rxjava3 .functions .Supplier ;
32
+ import io .reactivex .rxjava3 .internal .fuseable .ScalarSupplier ;
33
+ import io .reactivex .rxjava3 .observables .ConnectableObservable ;
34
+ import io .reactivex .rxjava3 .parallel .ParallelFlowable ;
35
+ import io .reactivex .rxjava3 .plugins .RxJavaPlugins ;
37
36
38
37
/**
39
38
* Utility class to keep {@link RequestContext} during RxJava operations.
@@ -78,9 +77,6 @@ public final class RequestContextAssembly {
78
77
@ GuardedBy ("RequestContextAssembly.class" )
79
78
private static boolean enabled ;
80
79
81
- private RequestContextAssembly () {
82
- }
83
-
84
80
/**
85
81
* Enable {@link RequestContext} during operators.
86
82
*/
@@ -96,13 +92,13 @@ public static synchronized void enable() {
96
92
new ConditionalOnCurrentRequestContextFunction <Observable >() {
97
93
@ Override
98
94
Observable applyActual (Observable o , RequestContext ctx ) {
99
- if (!(o instanceof Callable )) {
95
+ if (!(o instanceof Supplier )) {
100
96
return new RequestContextObservable (o , ctx );
101
97
}
102
- if (o instanceof ScalarCallable ) {
103
- return new RequestContextScalarCallableObservable (o , ctx );
98
+ if (o instanceof ScalarSupplier ) {
99
+ return new RequestContextScalarSupplierObservable (o , ctx );
104
100
}
105
- return new RequestContextCallableObservable (o , ctx );
101
+ return new RequestContextSupplierObservable (o , ctx );
106
102
}
107
103
}));
108
104
@@ -121,16 +117,8 @@ ConnectableObservable applyActual(ConnectableObservable co, RequestContext ctx)
121
117
compose (oldOnCompletableAssembly ,
122
118
new ConditionalOnCurrentRequestContextFunction <Completable >() {
123
119
@ Override
124
- Completable applyActual (Completable c ,
125
- RequestContext ctx ) {
126
- if (!(c instanceof Callable )) {
127
- return new RequestContextCompletable (c , ctx );
128
- }
129
- if (c instanceof ScalarCallable ) {
130
- return new RequestContextScalarCallableCompletable (
131
- c , ctx );
132
- }
133
- return new RequestContextCallableCompletable (c , ctx );
120
+ Completable applyActual (Completable c , RequestContext ctx ) {
121
+ return new RequestContextCompletable (c , ctx );
134
122
}
135
123
}));
136
124
@@ -139,13 +127,13 @@ Completable applyActual(Completable c,
139
127
compose (oldOnSingleAssembly , new ConditionalOnCurrentRequestContextFunction <Single >() {
140
128
@ Override
141
129
Single applyActual (Single s , RequestContext ctx ) {
142
- if (!(s instanceof Callable )) {
130
+ if (!(s instanceof Supplier )) {
143
131
return new RequestContextSingle (s , ctx );
144
132
}
145
- if (s instanceof ScalarCallable ) {
146
- return new RequestContextScalarCallableSingle (s , ctx );
133
+ if (s instanceof ScalarSupplier ) {
134
+ return new RequestContextScalarSupplierSingle (s , ctx );
147
135
}
148
- return new RequestContextCallableSingle (s , ctx );
136
+ return new RequestContextSupplierSingle (s , ctx );
149
137
}
150
138
}));
151
139
@@ -154,13 +142,13 @@ Single applyActual(Single s, RequestContext ctx) {
154
142
compose (oldOnMaybeAssembly , new ConditionalOnCurrentRequestContextFunction <Maybe >() {
155
143
@ Override
156
144
Maybe applyActual (Maybe m , RequestContext ctx ) {
157
- if (!(m instanceof Callable )) {
145
+ if (!(m instanceof Supplier )) {
158
146
return new RequestContextMaybe (m , ctx );
159
147
}
160
- if (m instanceof ScalarCallable ) {
161
- return new RequestContextScalarCallableMaybe (m , ctx );
148
+ if (m instanceof ScalarSupplier ) {
149
+ return new RequestContextScalarSupplierMaybe (m , ctx );
162
150
}
163
- return new RequestContextCallableMaybe (m , ctx );
151
+ return new RequestContextSupplierMaybe (m , ctx );
164
152
}
165
153
}));
166
154
@@ -169,13 +157,13 @@ Maybe applyActual(Maybe m, RequestContext ctx) {
169
157
compose (oldOnFlowableAssembly , new ConditionalOnCurrentRequestContextFunction <Flowable >() {
170
158
@ Override
171
159
Flowable applyActual (Flowable f , RequestContext ctx ) {
172
- if (!(f instanceof Callable )) {
160
+ if (!(f instanceof Supplier )) {
173
161
return new RequestContextFlowable (f , ctx );
174
162
}
175
- if (f instanceof ScalarCallable ) {
176
- return new RequestContextScalarCallableFlowable (f , ctx );
163
+ if (f instanceof ScalarSupplier ) {
164
+ return new RequestContextScalarSupplierFlowable (f , ctx );
177
165
}
178
- return new RequestContextCallableFlowable (f , ctx );
166
+ return new RequestContextSupplierFlowable (f , ctx );
179
167
}
180
168
}));
181
169
@@ -184,11 +172,8 @@ Flowable applyActual(Flowable f, RequestContext ctx) {
184
172
compose (oldOnConnectableFlowableAssembly ,
185
173
new ConditionalOnCurrentRequestContextFunction <ConnectableFlowable >() {
186
174
@ Override
187
- ConnectableFlowable applyActual (
188
- ConnectableFlowable cf ,
189
- RequestContext ctx ) {
190
- return new RequestContextConnectableFlowable (
191
- cf , ctx );
175
+ ConnectableFlowable applyActual (ConnectableFlowable cf , RequestContext ctx ) {
176
+ return new RequestContextConnectableFlowable (cf , ctx );
192
177
}
193
178
}
194
179
));
@@ -232,6 +217,8 @@ public static synchronized void disable() {
232
217
enabled = false ;
233
218
}
234
219
220
+ private RequestContextAssembly () {}
221
+
235
222
private abstract static class ConditionalOnCurrentRequestContextFunction <T > implements Function <T , T > {
236
223
@ Override
237
224
public final T apply (T t ) {
0 commit comments