Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http (feature): Support SSE text/event-stream in Scala.js #3812

Merged
merged 2 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@
package wvlet.airframe.http.client

import org.scalajs.dom.{Headers, RequestRedirect}
import wvlet.airframe.http.HttpMessage.{ByteArrayMessage, EmptyMessage, Request, Response, StringMessage}
import wvlet.airframe.http.HttpMessage.{
ByteArrayMessage,
EmptyMessage,
Request,
Response,
ServerSentEvent,
StringMessage
}
import wvlet.airframe.http.{Compat, HttpMessage, HttpMethod, HttpMultiMap, HttpStatus, ServerAddress}
import wvlet.airframe.rx.Rx
import wvlet.airframe.rx.{Rx, RxVar}
import wvlet.log.LogSupport

import scala.concurrent.{ExecutionContext, Promise}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.scalajs.js
import scala.scalajs.js.JSConverters.*
import scala.scalajs.js.typedarray.Uint8Array
import scala.util.{Failure, Success, Try}

/**
Expand Down Expand Up @@ -94,6 +102,9 @@ class JSFetchChannel(val destination: ServerAddress, config: HttpClientConfig) e
resp.text().toFuture.map { body =>
r.withContent(body)
}
} else if (r.isContentTypeEventStream) {
val events = readStream(resp)
Future.apply(r.copy(events = events))
} else {
resp.arrayBuffer().toFuture.map { body =>
r.withContent(new Int8Array(body).toArray)
Expand All @@ -104,4 +115,89 @@ class JSFetchChannel(val destination: ServerAddress, config: HttpClientConfig) e
Rx.future(future)
}

private def readStream(resp: org.scalajs.dom.Response): Rx[ServerSentEvent] = {
val decoder = js.Dynamic.newInstance(js.Dynamic.global.TextDecoder)("utf-8")
val decoderOptions = js.Dynamic.literal(stream = true)

val rx: RxVar[Option[ServerSentEvent]] = Rx.variable[Option[ServerSentEvent]](None)

def process(): Future[Unit] = {
var id: Option[String] = None
var event: Option[String] = None
var retry: Option[Long] = None
val data = List.newBuilder[String]

def emit(): Unit = {
val eventData = data.result()
if (eventData.nonEmpty) {
val ev = ServerSentEvent(
id = id,
event = event,
retry = retry,
data = eventData.mkString("\n")
)
rx := Some(ev)
}

id = None
event = None
retry = None
data.clear()
}

def processLine(line: String): Unit = {
line match {
case null =>
emit()
case l if l.isEmpty() =>
emit()
case l if l.startsWith(":") =>
// Skip comments
case _ =>
val kv = line.split(":", 2)
if (kv.length == 2) {
val key = kv(0).trim
val value = kv(1).trim
key match {
case "id" =>
id = Some(value)
case "event" =>
event = Some(value)
case "retry" =>
retry = Try(value.toLong).toOption
case "data" =>
data += value
case _ =>
// Ignore unknown fields
}
} else {
// Ignore invalid line
emit()
}
}
}

resp.body.getReader().read().toFuture.flatMap { result =>
if (result.done) {
emit()
rx.stop()
Future.unit
} else {
val arr: Uint8Array = result.value
val buf: String = decoder.decode(arr, decoderOptions).asInstanceOf[String]
val lines = buf.split("\n")
lines.foreach { line =>
processLine(line)
}

// Continue reading the next chunk
process()
}
}
}

process()
rx.filter(_.isDefined).map(_.get)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package wvlet.airframe.http
import wvlet.airframe.http.Http.formatInstant
import wvlet.airframe.http.HttpMessage.{Message, StringMessage}
import wvlet.airframe.http.HttpMessage.{Message, ServerSentEvent, StringMessage}
import wvlet.airframe.msgpack.spi.MsgPack
import wvlet.airframe.rx.Rx

Expand Down Expand Up @@ -125,6 +125,10 @@ trait HttpMessage[Raw] extends HttpMessageBase[Raw] {
def isContentTypeMsgPack: Boolean = {
contentType.exists(x => x == HttpHeader.MediaType.ApplicationMsgPack || x == "application/x-msgpack")
}
def isContentTypeEventStream: Boolean = {
contentType.exists(x => x.startsWith("text/event-stream"))
}

def acceptsJson: Boolean = {
accept.exists(x => x == HttpHeader.MediaType.ApplicationJson || x.startsWith("application/json"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class FetchTest extends AirSpec:
.newAsyncClient(PUBLIC_REST_SERVICE)
}

test("js http sync client") { (client: AsyncClient) =>
test("js http async client") { (client: AsyncClient) =>
test("GET") {
client
.send(Http.GET("/posts/1"))
Expand Down Expand Up @@ -84,4 +84,5 @@ class FetchTest extends AirSpec:
fail(s"should not reach here")
}
}

}
Loading