Skip to content
This repository was archived by the owner on Sep 10, 2024. It is now read-only.

Take, once, until methods #71

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions reactive-core/src/main/scala/reactive/EventStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,23 @@ trait EventStream[+T] extends Foreachable[T] {
* be fired by the new EventStream.
*/
def filter(f: T => Boolean): EventStream[T]
/**
* Returns a new EventStream that propagates only first n events
* of this EventStream
* @param n number of first events to propagate
*/
def take(n: Int): EventStream[T]
/**
* Returns a new EventStream that propagates only first event
* of this EventStream
*/
def once: EventStream[T]
/**
* Returns a new EventStream that propagates this EventStream's events
* until the provided EventStream stream fires first event.
* @param stream the stream, that will shut down this EventStream
*/
def until(stream: EventStream[Any])(implicit observing: Observing): EventStream[T]
/**
* Filter and map in one step. Takes a PartialFunction.
* Whenever an event is received, if the PartialFunction
Expand Down Expand Up @@ -276,6 +293,29 @@ class EventSource[T] extends EventStream[T] with Logger {
next
}
}

class Take(n: Int) extends ChildEventSource[T, Int](0) {
override def debugName = "%s.take(%d)" format (EventSource.this.debugName, n)
def handler = (event, last) => {
if (last < n)
fire(event)
else
EventSource.this.removeListener(listener)
last + 1
}
}

class Until(stream: EventStream[Any], observing: Observing) extends ChildEventSource[T, Boolean](true) {
override def debugName = "%s.until(%s)" format (EventSource.this.debugName, stream)
stream.once.foreach(_ => state = false)(observing)
def handler = (event, _) => {
if (state)
fire(event)
else
EventSource.this.removeListener(listener)
state
}
}

class Collected[U](pf: PartialFunction[T, U]) extends ChildEventSource[U, Unit] {
override def debugName = "%s.collect(%s)" format (EventSource.this.debugName, pf)
Expand Down Expand Up @@ -355,6 +395,12 @@ class EventSource[T] extends EventStream[T] with Logger {
}

def collect[U](pf: PartialFunction[T, U]): EventStream[U] = new Collected(pf)

def once: EventStream[T] = new Take(1)

def take(n: Int) = new Take(n)

def until(es: EventStream[Any])(implicit observing: Observing): EventStream[T] = new Until(es, observing)

def map[U](f: T => U): EventStream[U] = {
new ChildEventSource[U, Unit] {
Expand Down Expand Up @@ -565,6 +611,9 @@ trait EventStreamProxy[T] extends EventStream[T] {
def nonrecursive: EventStream[T] = underlying.nonrecursive
def distinct: EventStream[T] = underlying.distinct
def nonblocking: EventStream[T] = underlying.nonblocking
def once: EventStream[T] = underlying.once
def take(n: Int): EventStream[T] = underlying.take(n)
def until(es: EventStream[Any])(implicit observing: Observing): EventStream[T] = underlying.until(es)(observing)
def zipWithStaleness: EventStream[(T, () => Boolean)] = underlying.zipWithStaleness
def throttle(period: Long): EventStream[T] = underlying.throttle(period)
private[reactive] def addListener(f: (T) => Unit): Unit = underlying.addListener(f)
Expand Down
39 changes: 39 additions & 0 deletions reactive-core/src/test/scala/reactive/EventStreamTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,45 @@ class EventStreamTests extends FunSuite with ShouldMatchers with CollectEvents w
es fire 2
} should equal (List(2, 1))
}

test("take") {
val es = new EventSource[Int] {}
val n = 3
val take = es take n
collecting(take){
es fire 2
es fire 1
es fire 4
es fire 2
es fire 42
} should equal (List(2, 1, 4))
}

test("once") {
val es = new EventSource[Int] {}
val onced = es.once
collecting(onced){
es fire 2
es fire 1
es fire 4
es fire 2
es fire 42
} should equal (List(2))
}

test("until") {
val es = new EventSource[Int] {}
val killer = new EventSource[Unit] {}
val res = es.until(killer)
collecting(res){
es fire 2
es fire 1
es fire 4
killer fire()
es fire 2
es fire 42
} should equal (List(2, 1, 4))
}

test("foldLeft") {
val es = new EventSource[Int] {}
Expand Down
17 changes: 17 additions & 0 deletions reactive-web-demo/src/main/webapp/core/EventStream.html
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,23 @@ <h3>Finer-grained lifetime control: <code>takeWhile</code></h3>

<button onclick="showdemo('EventStream_takeWhile')">Live Example</button>

<h3><code>take</code> and <code>once</code></h3>
<p>When you just want to receive only first <code>n</code> events
you can use <code>take</code> method. It takes <code>n</code> as parameter
and return you <code>EventStream</code>. You should think about this method
as the similar one in collection framework. </p>
<p> <code>once</code> is special case when you only need first event
of the <code>EventStream</code> and it actually equals <code>take(1)</code></p>

<h3><code>until</code></h3>
<p>In some cases you need to shut down one <code>EventStream</code>
by some event from another <code>EventStream</code>.
So lets think about it that way: You want to take all events until some other event happens.
In terms of reactive those events are provided by two <code>EventStream</code>s.
So for that case you can use <code>one.until(another)</code> method, where <code>another</code>
is <code>EventStream</code> that shut down <code>one</code> after its first event.
</p>

<h3>What's the point?</h3>
<p>At this point you may be wondering why go through all these
hoops? Why not use the regular listener pattern directly, like Swing
Expand Down