Skip to content

Commit dccb460

Browse files
authored
airframe-rpc: Add RPC status code (#1973)
* Define standard RPC status code * Add gRPC status code * Add PackSupport to serialize error code * Add HttpStatus -> GrpcStatus mapping * Fix gRPC status mapping
1 parent 27cb12c commit dccb460

File tree

22 files changed

+1144
-45
lines changed

22 files changed

+1144
-45
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package wvlet.airframe.codec
15+
16+
import wvlet.airframe.msgpack.spi.Packer
17+
18+
/**
19+
* If a class is extending PackSupport, pack(Packer) method will be used for serializing (packing) this object.
20+
*
21+
* For deserializing (unpacking) data of type A, define a package method unapply(string:String): Option[A].
22+
*
23+
* In future, we may support unapply(u:Unpacker): Option[A] method
24+
*/
25+
trait PackSupport {
26+
def pack(packer: Packer): Unit
27+
}

airframe-codec/src/main/scala/wvlet/airframe/codec/PrimitiveCodec.scala

+14-13
Original file line numberDiff line numberDiff line change
@@ -921,19 +921,20 @@ object PrimitiveCodec {
921921
v match {
922922
case null => p.packNil
923923
// Primitive types
924-
case v: String => StringCodec.pack(p, v)
925-
case v: Boolean => BooleanCodec.pack(p, v)
926-
case v: Int => IntCodec.pack(p, v)
927-
case v: Long => LongCodec.pack(p, v)
928-
case v: Float => FloatCodec.pack(p, v)
929-
case v: Double => DoubleCodec.pack(p, v)
930-
case v: Byte => ByteCodec.pack(p, v)
931-
case v: Short => ShortCodec.pack(p, v)
932-
case v: Char => CharCodec.pack(p, v)
933-
case v: JSONValue => JSONValueCodec.pack(p, v)
934-
case v: Value => ValueCodec.pack(p, v)
935-
case v: Instant => p.packTimestamp(v)
936-
case v: ULID => ULIDCodec.pack(p, v)
924+
case v: String => StringCodec.pack(p, v)
925+
case v: Boolean => BooleanCodec.pack(p, v)
926+
case v: Int => IntCodec.pack(p, v)
927+
case v: Long => LongCodec.pack(p, v)
928+
case v: Float => FloatCodec.pack(p, v)
929+
case v: Double => DoubleCodec.pack(p, v)
930+
case v: Byte => ByteCodec.pack(p, v)
931+
case v: Short => ShortCodec.pack(p, v)
932+
case v: Char => CharCodec.pack(p, v)
933+
case v: JSONValue => JSONValueCodec.pack(p, v)
934+
case v: Value => ValueCodec.pack(p, v)
935+
case v: Instant => p.packTimestamp(v)
936+
case ps: PackSupport => ps.pack(p)
937+
case v: ULID => ULIDCodec.pack(p, v)
937938
// Arrays
938939
case v: Array[String] => StringArrayCodec.pack(p, v)
939940
case v: Array[Boolean] => BooleanArrayCodec.pack(p, v)

airframe-config/src/main/scala/wvlet/airframe/config/YamlReader.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ class YamlReader(map: Map[AnyRef, AnyRef]) extends LogSupport {
148148
pack(packer, k)
149149
pack(packer, v)
150150
}
151-
case c if ReflectTypeUtil.isJavaColleciton(c.getClass) =>
151+
case c if ReflectTypeUtil.isJavaCollection(c.getClass) =>
152152
val cl = c.asInstanceOf[java.util.Collection[_]].asScala
153153
trace(s"pack collection (${cl.size})")
154154
packer.packArrayHeader(cl.size)

airframe-http-grpc/src/main/scala/wvlet/airframe/http/grpc/internal/GrpcException.scala

+20-24
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
*/
1414
package wvlet.airframe.http.grpc.internal
1515

16-
import io.grpc.{StatusException, StatusRuntimeException}
16+
import io.grpc.{Metadata, Status, StatusException, StatusRuntimeException}
1717
import wvlet.airframe.codec.MessageCodecException
18-
import wvlet.airframe.http.{HttpServerException, HttpStatus}
18+
import wvlet.airframe.http.{GrpcStatus, HttpServerException, HttpStatus}
1919
import wvlet.log.LogSupport
2020

2121
import java.lang.reflect.InvocationTargetException
@@ -26,6 +26,8 @@ import scala.concurrent.ExecutionException
2626
*/
2727
object GrpcException extends LogSupport {
2828

29+
private[grpc] val rpcErrorKey = Metadata.Key.of[String]("airframe_rpc_error", Metadata.ASCII_STRING_MARSHALLER)
30+
2931
/**
3032
* Convert an exception to gRPC-specific exception types
3133
*
@@ -56,40 +58,34 @@ object GrpcException extends LogSupport {
5658
s
5759
case e: MessageCodecException =>
5860
io.grpc.Status.INTERNAL
59-
.withDescription(s"Failed to encode/decode data")
61+
.withDescription(s"Failed to encode/decode data: ${e.getMessage}")
6062
.withCause(e)
6163
.asRuntimeException()
6264
case e: IllegalArgumentException =>
6365
io.grpc.Status.INVALID_ARGUMENT
6466
.withCause(e)
67+
.withDescription(e.getMessage)
6568
.asRuntimeException()
6669
case e: UnsupportedOperationException =>
6770
io.grpc.Status.UNIMPLEMENTED
6871
.withCause(e)
72+
.withDescription(e.getMessage)
6973
.asRuntimeException()
7074
case e: HttpServerException =>
71-
val s = e.status match {
72-
case HttpStatus.BadRequest_400 =>
73-
io.grpc.Status.INVALID_ARGUMENT
74-
case HttpStatus.Unauthorized_401 =>
75-
io.grpc.Status.UNAUTHENTICATED
76-
case HttpStatus.Forbidden_403 =>
77-
io.grpc.Status.PERMISSION_DENIED
78-
case HttpStatus.NotFound_404 =>
79-
io.grpc.Status.UNIMPLEMENTED
80-
case HttpStatus.Conflict_409 =>
81-
io.grpc.Status.ALREADY_EXISTS
82-
case HttpStatus.TooManyRequests_429 | HttpStatus.BadGateway_502 | HttpStatus.ServiceUnavailable_503 |
83-
HttpStatus.GatewayTimeout_504 =>
84-
io.grpc.Status.UNAVAILABLE
85-
case s if s.isServerError =>
86-
io.grpc.Status.INTERNAL
87-
case s if s.isClientError =>
88-
io.grpc.Status.INVALID_ARGUMENT
89-
case other =>
90-
io.grpc.Status.UNKNOWN
75+
val grpcStatus = GrpcStatus.ofHttpStatus(e.status)
76+
val s = Status
77+
.fromCodeValue(grpcStatus.code)
78+
.withCause(e)
79+
.withDescription(e.getMessage)
80+
81+
if (e.message.nonEmpty) {
82+
val m = e.message
83+
val metadata = new Metadata()
84+
metadata.put[String](rpcErrorKey, s"${m.toContentString}")
85+
s.asRuntimeException(metadata)
86+
} else {
87+
s.asRuntimeException()
9188
}
92-
s.withCause(e).asRuntimeException()
9389
case other =>
9490
io.grpc.Status.INTERNAL
9591
.withCause(other)

airframe-http-grpc/src/main/scala/wvlet/airframe/http/grpc/internal/GrpcRequestHandler.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,9 @@ class GrpcRequestHandler(
302302

303303
}
304304

305-
private[grpc] class RPCUnaryMethodHandler(rpcRequestHandler: GrpcRequestHandler) extends UnaryMethod[MsgPack, Any] {
305+
private[grpc] class RPCUnaryMethodHandler(rpcRequestHandler: GrpcRequestHandler)
306+
extends UnaryMethod[MsgPack, Any]
307+
with LogSupport {
306308
override def invoke(
307309
request: MsgPack,
308310
responseObserver: StreamObserver[Any]

airframe-http-grpc/src/main/scala/wvlet/airframe/http/grpc/internal/GrpcResponseHeaderInterceptor.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,6 @@ private[grpc] object GrpcResponseHeaderInterceptor extends ServerInterceptor {
4343
}
4444
},
4545
requestHeaders
46-
);
46+
)
4747
}
4848
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package wvlet.airframe.http.grpc
15+
16+
import io.grpc.Status
17+
import io.grpc.Status.Code
18+
import io.grpc.StatusRuntimeException
19+
import wvlet.airframe.http.Router
20+
import wvlet.airframe.http.grpc.GrpcErrorLogTest.DemoApiDebug
21+
import wvlet.airframe.http.grpc.example.DemoApi.DemoApiClient
22+
import wvlet.airframe.http.grpc.internal.GrpcException
23+
import wvlet.airspec.AirSpec
24+
25+
object GrpcErrorHandlingTest extends AirSpec {
26+
27+
protected override def design = {
28+
gRPC.server
29+
.withName("error-handling-test-api")
30+
.withRouter(Router.add[DemoApiDebug])
31+
.designWithChannel
32+
}
33+
34+
test("handle error") { (client: DemoApiClient) =>
35+
warn("Starting a gRPC error handling test")
36+
val ex = intercept[StatusRuntimeException] {
37+
client.error409Test
38+
}
39+
ex.getMessage.contains("409") shouldBe true
40+
ex.getStatus.isOk shouldBe false
41+
ex.getStatus.getCode shouldBe Code.ABORTED
42+
val trailers = Status.trailersFromThrowable(ex)
43+
val rpcError = trailers.get[String](GrpcException.rpcErrorKey)
44+
info(s"error trailer: ${rpcError}")
45+
}
46+
}

airframe-http-grpc/src/test/scala/wvlet/airframe/http/grpc/example/DemoApi.scala

+21-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import wvlet.airframe.codec.MessageCodecFactory
2020
import wvlet.airframe.http.grpc.internal.GrpcServiceBuilder
2121
import wvlet.airframe.http.grpc._
2222
import wvlet.airframe.http.router.Route
23-
import wvlet.airframe.http.{RPC, Router}
23+
import wvlet.airframe.http.{Http, HttpStatus, RPC, Router}
2424
import wvlet.airframe.msgpack.spi.MsgPack
2525
import wvlet.airframe.rx.{Rx, RxStream}
2626
import wvlet.log.LogSupport
@@ -61,6 +61,10 @@ trait DemoApi extends LogSupport {
6161
// do nothing
6262
debug(s"hello ${name}")
6363
}
64+
65+
def error409Test: String = {
66+
throw Http.serverException(HttpStatus.Conflict_409).withContent("test message")
67+
}
6468
}
6569

6670
object DemoApi {
@@ -79,6 +83,13 @@ object DemoApi {
7983
}
8084
}
8185

86+
/**
87+
* Manually build a gRPC client here as we can't use sbt-airframe.
88+
* @param channel
89+
* @param callOptions
90+
* @param codecFactory
91+
* @param encoding
92+
*/
8293
case class DemoApiClient(
8394
channel: Channel,
8495
callOptions: CallOptions = CallOptions.DEFAULT,
@@ -105,6 +116,8 @@ object DemoApi {
105116
GrpcServiceBuilder.buildMethodDescriptor(getRoute("helloOpt"), codecFactory)
106117
private val returnUnitMethodDescriptor =
107118
GrpcServiceBuilder.buildMethodDescriptor(getRoute("returnUnit"), codecFactory)
119+
private val errorTestMethodDescriptor =
120+
GrpcServiceBuilder.buildMethodDescriptor(getRoute("error409Test"), codecFactory)
108121

109122
def withEncoding(encoding: GrpcEncoding): DemoApiClient = {
110123
this.copy(encoding = encoding)
@@ -189,6 +202,13 @@ object DemoApi {
189202
.blockingUnaryCall(_channel, returnUnitMethodDescriptor, getCallOptions, encode(m))
190203
resp.asInstanceOf[Unit]
191204
}
205+
206+
def error409Test: String = {
207+
val resp = ClientCalls
208+
.blockingUnaryCall(_channel, errorTestMethodDescriptor, getCallOptions, encode(Map.empty))
209+
210+
resp.asInstanceOf[String]
211+
}
192212
}
193213

194214
}

0 commit comments

Comments
 (0)