diff --git a/reactive-core/src/main/scala/reactive/EventStream.scala b/reactive-core/src/main/scala/reactive/EventStream.scala index acbd48c0..b5b11525 100644 --- a/reactive-core/src/main/scala/reactive/EventStream.scala +++ b/reactive-core/src/main/scala/reactive/EventStream.scala @@ -78,6 +78,23 @@ trait EventStream[+T] extends Foreachable[T] { * be fired by the new EventStream. */ def filter(f: T => Boolean): EventStream[T] + /** + * Returns a new EventStream that propagates only first n events + * of this EventStream + * @param n number of first events to propagate + */ + def take(n: Int): EventStream[T] + /** + * Returns a new EventStream that propagates only first event + * of this EventStream + */ + def once: EventStream[T] + /** + * Returns a new EventStream that propagates this EventStream's events + * until the provided EventStream stream fires first event. + * @param stream the stream, that will shut down this EventStream + */ + def until(stream: EventStream[Any])(implicit observing: Observing): EventStream[T] /** * Filter and map in one step. Takes a PartialFunction. * Whenever an event is received, if the PartialFunction @@ -276,6 +293,29 @@ class EventSource[T] extends EventStream[T] with Logger { next } } + + class Take(n: Int) extends ChildEventSource[T, Int](0) { + override def debugName = "%s.take(%d)" format (EventSource.this.debugName, n) + def handler = (event, last) => { + if (last < n) + fire(event) + else + EventSource.this.removeListener(listener) + last + 1 + } + } + + class Until(stream: EventStream[Any], observing: Observing) extends ChildEventSource[T, Boolean](true) { + override def debugName = "%s.until(%s)" format (EventSource.this.debugName, stream) + stream.once.foreach(_ => state = false)(observing) + def handler = (event, _) => { + if (state) + fire(event) + else + EventSource.this.removeListener(listener) + state + } + } class Collected[U](pf: PartialFunction[T, U]) extends ChildEventSource[U, Unit] { override def debugName = "%s.collect(%s)" format (EventSource.this.debugName, pf) @@ -355,6 +395,12 @@ class EventSource[T] extends EventStream[T] with Logger { } def collect[U](pf: PartialFunction[T, U]): EventStream[U] = new Collected(pf) + + def once: EventStream[T] = new Take(1) + + def take(n: Int) = new Take(n) + + def until(es: EventStream[Any])(implicit observing: Observing): EventStream[T] = new Until(es, observing) def map[U](f: T => U): EventStream[U] = { new ChildEventSource[U, Unit] { @@ -565,6 +611,9 @@ trait EventStreamProxy[T] extends EventStream[T] { def nonrecursive: EventStream[T] = underlying.nonrecursive def distinct: EventStream[T] = underlying.distinct def nonblocking: EventStream[T] = underlying.nonblocking + def once: EventStream[T] = underlying.once + def take(n: Int): EventStream[T] = underlying.take(n) + def until(es: EventStream[Any])(implicit observing: Observing): EventStream[T] = underlying.until(es)(observing) def zipWithStaleness: EventStream[(T, () => Boolean)] = underlying.zipWithStaleness def throttle(period: Long): EventStream[T] = underlying.throttle(period) private[reactive] def addListener(f: (T) => Unit): Unit = underlying.addListener(f) diff --git a/reactive-core/src/test/scala/reactive/EventStreamTests.scala b/reactive-core/src/test/scala/reactive/EventStreamTests.scala index d52b8680..8c643f74 100644 --- a/reactive-core/src/test/scala/reactive/EventStreamTests.scala +++ b/reactive-core/src/test/scala/reactive/EventStreamTests.scala @@ -113,6 +113,45 @@ class EventStreamTests extends FunSuite with ShouldMatchers with CollectEvents w es fire 2 } should equal (List(2, 1)) } + + test("take") { + val es = new EventSource[Int] {} + val n = 3 + val take = es take n + collecting(take){ + es fire 2 + es fire 1 + es fire 4 + es fire 2 + es fire 42 + } should equal (List(2, 1, 4)) + } + + test("once") { + val es = new EventSource[Int] {} + val onced = es.once + collecting(onced){ + es fire 2 + es fire 1 + es fire 4 + es fire 2 + es fire 42 + } should equal (List(2)) + } + + test("until") { + val es = new EventSource[Int] {} + val killer = new EventSource[Unit] {} + val res = es.until(killer) + collecting(res){ + es fire 2 + es fire 1 + es fire 4 + killer fire() + es fire 2 + es fire 42 + } should equal (List(2, 1, 4)) + } test("foldLeft") { val es = new EventSource[Int] {} diff --git a/reactive-web-demo/src/main/webapp/core/EventStream.html b/reactive-web-demo/src/main/webapp/core/EventStream.html index 0c6dbcba..196f0016 100644 --- a/reactive-web-demo/src/main/webapp/core/EventStream.html +++ b/reactive-web-demo/src/main/webapp/core/EventStream.html @@ -104,6 +104,23 @@
takeWhile
take
and once
When you just want to receive only first n
events
+you can use take
method. It takes n
as parameter
+and return you EventStream
. You should think about this method
+as the similar one in collection framework.
once
is special case when you only need first event
+of the EventStream
and it actually equals take(1)
until
In some cases you need to shut down one EventStream
+by some event from another EventStream
.
+So lets think about it that way: You want to take all events until some other event happens.
+In terms of reactive those events are provided by two EventStream
s.
+So for that case you can use one.until(another)
method, where another
+is EventStream
that shut down one
after its first event.
+
At this point you may be wondering why go through all these hoops? Why not use the regular listener pattern directly, like Swing