From 1432e594a18590881879498578f4fd7bf28a8c00 Mon Sep 17 00:00:00 2001 From: Justin Woo Date: Sat, 12 Dec 2015 15:31:00 +0100 Subject: [PATCH] feat(Observable): add pairwise operator bring in pairwise operator from RxJS4 --- doc/index.md | 3 +- .../immediate-scheduler/operators/pairwise.js | 18 ++++ spec/operators/pairwise-spec.js | 88 +++++++++++++++++++ src/Rx.KitchenSink.ts | 2 + src/add/operator/pairwise.ts | 7 ++ src/operator/pairwise.ts | 38 ++++++++ 6 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 perf/micro/immediate-scheduler/operators/pairwise.js create mode 100644 spec/operators/pairwise-spec.js create mode 100644 src/add/operator/pairwise.ts create mode 100644 src/operator/pairwise.ts diff --git a/doc/index.md b/doc/index.md index 9cb5950785..e3602ca32a 100644 --- a/doc/index.md +++ b/doc/index.md @@ -57,6 +57,7 @@ - [mergeAll](function/index.html#static-function-mergeAll) - [multicast](function/index.html#static-function-multicast) - [observeOn](function/index.html#static-function-observeOn) +- [pairwise](function/index.html#static-function-pairwise) - [partition](function/index.html#static-function-partition) - [publish](function/index.html#static-function-publish) - [publishBehavior](function/index.html#static-function-publishBehavior) @@ -89,4 +90,4 @@ - [windowWhen](function/index.html#static-function-windowWhen) - [withLatestFrom](function/index.html#static-function-withLatestFrom) - [zip](function/index.html#static-function-zip) -- [zipAll](function/index.html#static-function-zipAll) \ No newline at end of file +- [zipAll](function/index.html#static-function-zipAll) diff --git a/perf/micro/immediate-scheduler/operators/pairwise.js b/perf/micro/immediate-scheduler/operators/pairwise.js new file mode 100644 index 0000000000..fb578b5cb5 --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/pairwise.js @@ -0,0 +1,18 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldPairwiseWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).pairwise(); + var newPairwiseWithImmediateScheduler = RxNew.Observable.range(0, 25).pairwise(); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old pairwise with immediate scheduler', function () { + oldPairwiseWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new pairwise with immediate scheduler', function () { + newPairwiseWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; diff --git a/spec/operators/pairwise-spec.js b/spec/operators/pairwise-spec.js new file mode 100644 index 0000000000..f7fbc7db63 --- /dev/null +++ b/spec/operators/pairwise-spec.js @@ -0,0 +1,88 @@ +/* globals describe, it, expect, expectObservable, expectSubscriptions, cold, hot */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.pairwise()', function () { + it('should pairwise things', function () { + var e1 = hot('--a--^--b--c--d--e--f--g--|'); + var e1subs = '^ !'; + var expected = '------v--w--x--y--z--|'; + + var values = { + v: ['b', 'c'], + w: ['c', 'd'], + x: ['d', 'e'], + y: ['e', 'f'], + z: ['f', 'g'] + }; + + var source = e1.pairwise(); + + expectObservable(source).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not emit on single-element streams', function () { + var e1 = hot('-----^--b----|'); + var e1subs = '^ !'; + var expected = '--------|'; + + var values = { + }; + + var source = e1.pairwise(); + + expectObservable(source).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should handle mid-stream throw', function () { + var e1 = hot('--a--^--b--c--d--e--#'); + var e1subs = '^ !'; + var expected = '------v--w--x--#'; + + var values = { + v: ['b', 'c'], + w: ['c', 'd'], + x: ['d', 'e'] + }; + + var source = e1.pairwise(); + + expectObservable(source).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should handle empty', function () { + var e1 = cold('|'); + var e1subs = '(^!)'; + var expected = '|'; + + var source = e1.pairwise(); + + expectObservable(source).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should handle never', function () { + var e1 = cold('-'); + var e1subs = '^'; + var expected = '-'; + + var source = e1.pairwise(); + + expectObservable(source).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should handle throw', function () { + var e1 = cold('#'); + var e1subs = '(^!)'; + var expected = '#'; + + var source = e1.pairwise(); + + expectObservable(source).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); +}); diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 168ec5c6f6..e8ab53e445 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -13,6 +13,7 @@ export interface KitchenSinkOperators extends CoreOperators { findIndex?: (predicate: (value: T, index: number, source: Observable) => boolean, thisArg?: any) => Observable; max?: (comparer?: (x: R, y: T) => R) => Observable; min?: (comparer?: (x: R, y: T) => R) => Observable; + pairwise?: () => Observable; timeInterval?: (scheduler?: IScheduler) => Observable; mergeScan?: (project: (acc: R, x: T) => Observable, seed: R, concurrent?: number) => Observable; exhaust?: () => Observable; @@ -89,6 +90,7 @@ import './add/operator/mergeScan'; import './add/operator/min'; import './add/operator/multicast'; import './add/operator/observeOn'; +import './add/operator/pairwise'; import './add/operator/partition'; import './add/operator/publish'; import './add/operator/publishBehavior'; diff --git a/src/add/operator/pairwise.ts b/src/add/operator/pairwise.ts new file mode 100644 index 0000000000..155f49ab0b --- /dev/null +++ b/src/add/operator/pairwise.ts @@ -0,0 +1,7 @@ +import {Observable} from '../../Observable'; +import {pairwise} from '../../operator/pairwise'; +import {KitchenSinkOperators} from '../../Rx.KitchenSink'; +const observableProto = (>Observable.prototype); +observableProto.pairwise = pairwise; + +export var _void: void; diff --git a/src/operator/pairwise.ts b/src/operator/pairwise.ts new file mode 100644 index 0000000000..107b183a86 --- /dev/null +++ b/src/operator/pairwise.ts @@ -0,0 +1,38 @@ +import {Operator} from '../Operator'; +import {Observable} from '../Observable'; +import {Subscriber} from '../Subscriber'; + +/** + * Returns a new observable that triggers on the second and following inputs. + * An input that triggers an event will return an pair of [(N - 1)th, Nth]. + * The (N-1)th is stored in the internal state until Nth input occurs. + * @returns {Observable} an observable of pairs of values. + */ +export function pairwise(): Observable { + return this.lift(new PairwiseOperator()); +} + +class PairwiseOperator implements Operator { + call(subscriber: Subscriber): Subscriber { + return new PairwiseSubscriber(subscriber); + } +} + +class PairwiseSubscriber extends Subscriber { + private prev: T; + private hasPrev: boolean = false; + + constructor(destination: Subscriber) { + super(destination); + } + + _next(value: T): void { + if (this.hasPrev) { + this.destination.next([this.prev, value]); + } else { + this.hasPrev = true; + } + + this.prev = value; + } +}