Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rx (feature): Add Rx.queue[A] for accumulating asynchronous events #3814

Merged
merged 6 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import wvlet.airframe.http.HttpMessage.{
StringMessage
}
import wvlet.airframe.http.{Compat, HttpMessage, HttpMethod, HttpMultiMap, HttpStatus, ServerAddress}
import wvlet.airframe.rx.{Rx, RxVar}
import wvlet.airframe.rx.{OnNext, Rx, RxSource, RxVar}
import wvlet.log.LogSupport

import scala.concurrent.{ExecutionContext, Future, Promise}
Expand Down Expand Up @@ -119,7 +119,7 @@ class JSFetchChannel(val destination: ServerAddress, config: HttpClientConfig) e
val decoder = js.Dynamic.newInstance(js.Dynamic.global.TextDecoder)("utf-8")
val decoderOptions = js.Dynamic.literal(stream = true)

val rx: RxVar[Option[ServerSentEvent]] = Rx.variable[Option[ServerSentEvent]](None)
val rx: RxSource[ServerSentEvent] = Rx.queue[ServerSentEvent]()

def process(): Future[Unit] = {
var id: Option[String] = None
Expand All @@ -136,7 +136,7 @@ class JSFetchChannel(val destination: ServerAddress, config: HttpClientConfig) e
retry = retry,
data = eventData.mkString("\n")
)
rx := Some(ev)
rx.add(OnNext(ev))
}

id = None
Expand Down Expand Up @@ -197,7 +197,7 @@ class JSFetchChannel(val destination: ServerAddress, config: HttpClientConfig) e
}

process()
rx.filter(_.isDefined).map(_.get)
rx
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ class JavaHttpClientChannel(val destination: ServerAddress, private[http] val co
}

private def readServerSentEventStream(httpResponse: java.net.http.HttpResponse[InputStream]): Rx[ServerSentEvent] = {
// Create Rx[ServerSentEvent] for reading the event stream
val rx = new RxBlockingQueue[ServerSentEvent]()

// Read the event stream in a separate thread
val executor = compat.defaultExecutionContext

// Create Rx[ServerSentEvent] for reading the event stream
val rx = new RxBlockingQueue[ServerSentEvent]()
executor.execute(new Runnable {
override def run(): Unit = {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class RxBlockingQueue[A] extends RxSource[A] {
override def add(event: RxEvent): Unit = {
blockingQueue.add(event)
}
override def next: RxEvent = {
blockingQueue.take()
override def next: Rx[RxEvent] = {
Rx.const(blockingQueue.take())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ class RxBlockingQueue[A] extends RxSource[A]:

override def add(event: RxEvent): Unit =
blockingQueue.add(event)
override def next: RxEvent =
blockingQueue.take()
override def next: Rx[RxEvent] =
Rx.const(blockingQueue.take())
5 changes: 4 additions & 1 deletion airframe-rx/src/main/scala/wvlet/airframe/rx/Rx.scala
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,8 @@ object Rx extends LogSupport {
}

/**
* Create a lazily evaluated single value
* Create a lazily evaluated single value. A difference from Rx.const is that the value will be evaluated only when
* the value is requested.
*/
def single[A](v: => A): Rx[A] = SingleOp(LazyF0.apply(v))
def exception[A](e: Throwable): Rx[A] = fromTry(Failure[A](e))
Expand All @@ -642,6 +643,8 @@ object Rx extends LogSupport {
def some[A](v: => A): RxOption[A] = option(Some(v))
val none: RxOption[Nothing] = option(None)

def queue[A](): RxSource[A] = new RxQueue[A]

def join[A, B](a: RxOps[A], b: RxOps[B]): Rx[(A, B)] = JoinOp(a, b)
def join[A, B, C](a: RxOps[A], b: RxOps[B], c: RxOps[C]): Rx[(A, B, C)] = Join3Op(a, b, c)
def join[A, B, C, D](a: RxOps[A], b: RxOps[B], c: RxOps[C], d: RxOps[D]): Rx[(A, B, C, D)] = Join4Op(a, b, c, d)
Expand Down
59 changes: 59 additions & 0 deletions airframe-rx/src/main/scala/wvlet/airframe/rx/RxQueue.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package wvlet.airframe.rx

import scala.concurrent.{ExecutionContext, Promise}

/**
* JVM/JS compatible RxQueue implementation
* @tparam A
*/
class RxQueue[A] extends RxSource[A] {
private implicit val ec: ExecutionContext = compat.defaultExecutionContext
override def parents: Seq[Rx[_]] = Seq.empty

private var queue = scala.collection.immutable.Queue.empty[RxEvent]
private var waiting: Option[Promise[RxEvent]] = None

override def add(event: RxEvent): Unit = {
synchronized {
queue = queue.enqueue(event)
waiting match {
case Some(p) =>
if (queue.nonEmpty) {
waiting = None
val (e, newQueue) = queue.dequeue
queue = newQueue
p.success(e)
}
case None =>
// do nothing if there is no waiting promise
}
}
}

override def next: Rx[RxEvent] = {
synchronized {
if (queue.nonEmpty) {
val (e, newQueue) = queue.dequeue
queue = newQueue
Rx.const(e)
} else {
val p = Promise[RxEvent]()
waiting = Some(p)
Rx.future(p.future)
}
}
}
}
34 changes: 24 additions & 10 deletions airframe-rx/src/main/scala/wvlet/airframe/rx/RxRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -545,26 +545,40 @@ class RxRunner(
lastResult = RxResult.Stop
}
case source: RxSource[_] =>
var toContinue = true
var toContinue = true
var c1: Cancelable = Cancelable.empty
@tailrec
def loop: Unit = {
if (continuous || toContinue) {
val ev = source.next
ev match {
case OnNext(_) =>
effect(ev)
loop
val evRx = source.next
c1.cancel
c1 = run(evRx) {
case OnNext(ev: RxEvent) =>
ev match {
case OnNext(v) =>
effect(OnNext(v.asInstanceOf[A]))
case other =>
toContinue = false
effect(other)
}
case OnCompletion =>
// ok. Successfully received a single event from the source
RxResult.Continue
case other =>
toContinue = false
effect(other)
}
loop
}
}
loop
Cancelable { () =>
toContinue = false
source.add(OnError(new InterruptedException("cancelled")))
}
Cancelable.merge(
c1,
Cancelable { () =>
toContinue = false
source.add(OnError(new InterruptedException("cancelled")))
}
)
}
}

Expand Down
3 changes: 2 additions & 1 deletion airframe-rx/src/main/scala/wvlet/airframe/rx/RxSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ package wvlet.airframe.rx
*/
trait RxSource[A] extends Rx[A] {
def add(ev: RxEvent): Unit
def next: RxEvent
def next: Rx[RxEvent]
def stop(): Unit = add(OnCompletion)
}
Loading