Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http (feature): Support reading Sever-Sent Event (SSE) in JVM http clients #3810

Merged
merged 6 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package wvlet.airframe.http.netty

import wvlet.airframe.http.{Endpoint, Http, RxRouter}
import wvlet.airframe.http.HttpMessage.{Response, ServerSentEvent}
import wvlet.airframe.http.client.AsyncClient
import wvlet.airspec.AirSpec

class SSEApi {
@Endpoint(path = "/v1/sse")
def sse(): Response = {
Http
.response()
.withContentType("text/event-stream")
.withContent(s"""data: hello stream
|
|data: another stream message
|data: with two lines
|
|event: custom-event
|data: hello custom event
|
|: this is a comment
|
|id: 123
|data: hello again
|
|id: 1234
|event: custom-event
|data: hello again 2
|
|retry: 1000
|data: need to retry
|""".stripMargin)
}
}

class SSETest extends AirSpec {
initDesign {
_.add(
Netty.server
.withRouter(RxRouter.of[SSEApi])
.designWithAsyncClient
)
}

test("read sse events") { (client: AsyncClient) =>
val rx = client.send(
Http.GET("/v1/sse")
)
rx.map { resp =>
resp.statusCode shouldBe 200

val events = resp.events.map { e =>
debug(e)
e
}.toSeq

val expected = List(
ServerSentEvent(data = "hello stream"),
ServerSentEvent(data = "another stream message\nwith two lines"),
ServerSentEvent(event = Some("custom-event"), data = "hello custom event"),
ServerSentEvent(id = Some("123"), data = "hello again"),
ServerSentEvent(id = Some("1234"), event = Some("custom-event"), data = "hello again 2"),
ServerSentEvent(retry = Some(1000), data = "need to retry")
)

trace(events.mkString("\n"))
trace(expected.mkString("\n"))
events shouldBe expected
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@ package wvlet.airframe.http.client
import wvlet.airframe.control.Control.withResource
import wvlet.airframe.control.IO
import wvlet.airframe.http.*
import wvlet.airframe.http.HttpMessage.{Request, Response}
import wvlet.airframe.rx.Rx
import wvlet.airframe.http.HttpMessage.{Request, Response, ServerSentEvent}
import wvlet.airframe.rx.{OnCompletion, OnError, OnNext, Rx, RxBlockingQueue}
import wvlet.log.LogSupport

import java.io.InputStream
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.net.URI
import java.net.http.HttpClient.{Redirect, Version}
import java.net.http.HttpRequest.BodyPublishers
import java.net.http.HttpResponse.BodyHandlers
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.util.function.Consumer
import java.util.zip.{GZIPInputStream, InflaterInputStream}
import scala.annotation.tailrec
import scala.jdk.CollectionConverters.*
import scala.util.Try

/**
* Http connection implementation using Http Client of Java 11
Expand Down Expand Up @@ -138,23 +140,120 @@ class JavaHttpClientChannel(val destination: ServerAddress, private[http] val co
h.result()
}

// Decompress contents
val body: Array[Byte] = withResource {
header.get(HttpHeader.ContentEncoding).map(_.toLowerCase()) match {
case Some("gzip") =>
new GZIPInputStream(httpResponse.body())
case Some("deflate") =>
new InflaterInputStream(httpResponse.body())
case _ =>
httpResponse.body()
val status = HttpStatus.ofCode(httpResponse.statusCode())
val isEventStream = header.get(HttpHeader.ContentType).exists(_.startsWith("text/event-stream"))
if (isEventStream) {
HttpMessage.Response(
status = status,
header = header,
events = readServerSentEventStream(httpResponse)
)
} else { // Decompress contents
val body: Array[Byte] = withResource {
header.get(HttpHeader.ContentEncoding).map(_.toLowerCase()) match {
case Some("gzip") =>
new GZIPInputStream(httpResponse.body())
case Some("deflate") =>
new InflaterInputStream(httpResponse.body())
case _ =>
httpResponse.body()
}
} { (in: InputStream) =>
IO.readFully(in)
}
} { (in: InputStream) =>
IO.readFully(in)
Http
.response(status)
.withHeader(header)
.withContent(HttpMessage.byteArrayMessage(body))
}
}

private def readServerSentEventStream(httpResponse: java.net.http.HttpResponse[InputStream]): Rx[ServerSentEvent] = {
// Create Rx[ServerSentEvent] for reading the event stream
val rx = new RxBlockingQueue[ServerSentEvent]()

// Read the event stream in a separate thread
val executor = compat.defaultExecutionContext
executor.execute(new Runnable {
override def run(): Unit = {
try {
withResource(new BufferedReader(new InputStreamReader(httpResponse.body()))) { reader =>
var id: Option[String] = None
var eventType: Option[String] = None
var retry: Option[Long] = None
val data = List.newBuilder[String]

def emit(): Unit = {
val eventData = data.result()
if (eventData.nonEmpty) {
val ev = ServerSentEvent(
id = id,
event = eventType,
retry = retry,
data = eventData.mkString("\n")
)
rx.add(OnNext(ev))
}

// clear the data
id = None
eventType = None
retry = None
data.clear()
}

@tailrec
def processLine(): Unit = {
val line = reader.readLine()
line match {
case null =>
emit()
// no more line
case l if l.isEmpty =>
emit()
processLine()
case l if l.startsWith(":") =>
// skip comments
processLine()
case _ =>
val kv = line.split(":", 2)
if (kv.length == 2) {
val key = kv(0).trim
val value = kv(1).trim
key match {
case "id" =>
id = Some(value)
case "event" =>
eventType = Some(value)
case "data" =>
data += value
case "retry" =>
retry = Try(value.toLong).toOption
case _ =>
// Ignore unknown fields
}
} else {
// Ignore invalid lines
// Send the last event {
emit()
}
processLine()
}
}

Http
.response(HttpStatus.ofCode(httpResponse.statusCode()))
.withHeader(header)
.withContent(HttpMessage.byteArrayMessage(body))
processLine()
}

// Report the completion
rx.add(OnCompletion)
} catch {
case e: Throwable =>
rx.add(OnError(e))
}
}
})

rx
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package wvlet.airframe.http
import wvlet.airframe.http.Http.formatInstant
import wvlet.airframe.http.HttpMessage.{Message, StringMessage}
import wvlet.airframe.msgpack.spi.MsgPack
import wvlet.airframe.rx.Rx

import java.nio.charset.StandardCharsets
import java.time.Instant
Expand Down Expand Up @@ -278,7 +279,8 @@ object HttpMessage {
case class Response(
status: HttpStatus = HttpStatus.Ok_200,
header: HttpMultiMap = HttpMultiMap.empty,
message: Message = EmptyMessage
message: Message = EmptyMessage,
events: Rx[ServerSentEvent] = Rx.empty
) extends HttpMessage[Response] {
override def toString: String = s"Response(${status},${header})"

Expand Down Expand Up @@ -326,4 +328,13 @@ object HttpMessage {
override protected def adapter: HttpResponseAdapter[Response] = HttpMessageResponseAdapter
override def toRaw: Response = raw
}

case class ServerSentEvent(
id: Option[String] = None,
event: Option[String] = None,
retry: Option[Long] = None,
// event data string. If multiple data entries are reported, concatenated with newline
data: String
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,13 @@ trait HttpChannel extends AutoCloseable {
/**
* Send an async request as is to the destination. Until the returned Rx is evaluated (e.g., by calling Rx.run), the
* request is not sent.
*
* For SSE (Server-Sent Events) requests, the returned [[Response.events]] will have an Rx stream of
* [[ServerSentEvent]]
* @param req
* @param channelConfig
* @return
*/
def sendAsync(req: Request, channelConfig: HttpChannelConfig): Rx[Response]

}
Loading