Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http (feature): Support adding HttpClientContext to Http client readAs, call, rpc methods #3864

Merged
merged 2 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,24 @@ trait SyncClientCompat:
* @throws HttpClientException
* if failed to read or process the response
*/
inline def readAs[Resp](req: Request): Resp =
self.readAsInternal[Resp](req, Surface.of[Resp])
inline def readAs[Resp](req: Request, context: HttpClientContext = HttpClientContext.empty): Resp =
self.readAsInternal[Resp](req, Surface.of[Resp], context)

inline def call[Req, Resp](
req: Request,
requestContent: Req
requestContent: Req,
context: HttpClientContext = HttpClientContext.empty
): Resp =
self.callInternal[Req, Resp](req, Surface.of[Req], Surface.of[Resp], requestContent)
self.callInternal[Req, Resp](req, Surface.of[Req], Surface.of[Resp], requestContent, context)

trait AsyncClientCompat:
self: AsyncClient =>
inline def readAs[Resp](req: Request): Rx[Resp] =
self.readAsInternal[Resp](req, Surface.of[Resp])
inline def readAs[Resp](req: Request, context: HttpClientContext = HttpClientContext.empty): Rx[Resp] =
self.readAsInternal[Resp](req, Surface.of[Resp], context)

inline def call[Req, Resp](
req: Request,
requestContent: Req
requestContent: Req,
context: HttpClientContext = HttpClientContext.empty
): Rx[Resp] =
self.callInternal[Req, Resp](req, Surface.of[Req], Surface.of[Resp], requestContent)
self.callInternal[Req, Resp](req, Surface.of[Req], Surface.of[Resp], requestContent, context)
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ trait AsyncClient extends AsyncClientCompat with HttpClientFactory[AsyncClient]
var lastResponse: Option[Response] = None
// Build a chain of request filters
def requestPipeline =
loggingFilter(context)
loggingFilter(context.withClientName(config.name))
.andThen { req =>
channel
.sendAsync(req, config)
Expand Down Expand Up @@ -97,9 +97,10 @@ trait AsyncClient extends AsyncClientCompat with HttpClientFactory[AsyncClient]

def readAsInternal[Resp](
req: Request,
responseSurface: Surface
responseSurface: Surface,
context: HttpClientContext = HttpClientContext.empty
): Rx[Resp] = {
send(req).toRx.map { resp =>
send(req, context).toRx.map { resp =>
HttpClients.parseResponse[Resp](config, responseSurface, resp)
}
}
Expand All @@ -108,12 +109,13 @@ trait AsyncClient extends AsyncClientCompat with HttpClientFactory[AsyncClient]
req: Request,
requestSurface: Surface,
responseSurface: Surface,
requestContent: Req
requestContent: Req,
context: HttpClientContext = HttpClientContext.empty
): Rx[Resp] = {
Rx
.const(HttpClients.prepareRequest(config, req, requestSurface, requestContent))
.flatMap { (newRequest: Request) =>
send(newRequest, HttpClientContext(config.name)).toRx.map { resp =>
send(newRequest, context).toRx.map { resp =>
HttpClients.parseResponse[Resp](config, responseSurface, resp)
}
}
Expand All @@ -129,17 +131,17 @@ trait AsyncClient extends AsyncClientCompat with HttpClientFactory[AsyncClient]
*/
def rpc[Req, Resp](
method: RPCMethod,
requestContent: Req
requestContent: Req,
context: HttpClientContext = HttpClientContext.empty
): Rx[Resp] = {
Rx
.const(HttpClients.prepareRPCRequest(config, method.path, method.requestSurface, requestContent))
.flatMap { (request: Request) =>
val context = HttpClientContext(
clientName = config.name,
val ctx = context.copy(
rpcMethod = Some(method),
rpcInput = Some(requestContent)
)
sendSafe(request, context).toRx
sendSafe(request, ctx).toRx
.map { (response: Response) =>
if (response.status.isSuccessful) {
val ret = HttpClients.parseRPCResponse(config, response, method.responseSurface)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ case class HttpClientContext(
rpcInput: Option[Any] = None,
// Extra parameters used for logging
logParameters: Map[String, Any] = Map.empty
)
) {
def withClientName(name: String): HttpClientContext = {
this.copy(clientName = name)
}
}

object HttpClientContext {
object empty extends HttpClientContext("default", None, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ trait SyncClient extends SyncClientCompat with HttpClientFactory[SyncClient] wit

// Build a chain of request filters
def requestPipeline: RxHttpEndpoint = {
loggingFilter(context)
loggingFilter(context.withClientName(config.name))
.andThen { req =>
Rx.single(channel.send(req, config))
.tap { resp =>
Expand Down Expand Up @@ -102,20 +102,22 @@ trait SyncClient extends SyncClientCompat with HttpClientFactory[SyncClient] wit

def readAsInternal[Resp](
req: Request,
responseSurface: Surface
responseSurface: Surface,
context: HttpClientContext = HttpClientContext.empty
): Resp = {
val resp: Response = send(req, HttpClientContext(config.name))
val resp: Response = send(req, context)
HttpClients.parseResponse[Resp](config, responseSurface, resp)
}

def callInternal[Req, Resp](
req: Request,
requestSurface: Surface,
responseSurface: Surface,
requestContent: Req
requestContent: Req,
context: HttpClientContext = HttpClientContext.empty
): Resp = {
val newRequest = HttpClients.prepareRequest(config, req, requestSurface, requestContent)
val resp: Response = send(newRequest, HttpClientContext(config.name))
val resp: Response = send(newRequest, context)
HttpClients.parseResponse[Resp](config, responseSurface, resp)
}

Expand All @@ -129,17 +131,20 @@ trait SyncClient extends SyncClientCompat with HttpClientFactory[SyncClient] wit
* @throws RPCException
* when RPC request fails
*/
def rpc[Req, Resp](method: RPCMethod, requestContent: Req): Resp = {
def rpc[Req, Resp](
method: RPCMethod,
requestContent: Req,
context: HttpClientContext = HttpClientContext.empty
): Resp = {
val request: Request =
HttpClients.prepareRPCRequest(config, method.path, method.requestSurface, requestContent)

val context = HttpClientContext(
clientName = config.name,
val ctx = context.copy(
rpcMethod = Some(method),
rpcInput = Some(requestContent)
)
// sendSafe method internally handles retries and HttpClientException, and then it returns the last response
val response: Response = sendSafe(request, context = context)
val response: Response = sendSafe(request, context = ctx)

// Parse the RPC response
if (response.status.isSuccessful) {
Expand Down
Loading