Comparing version 0.6.2 to 0.7.0
{ | ||
"name": "most", | ||
"main": "most.js", | ||
"version": "0.6.2", | ||
"version": "0.7.0", | ||
"homepage": "https://github.com/cujojs/most", | ||
@@ -26,2 +26,3 @@ "authors": [ | ||
"bower_components", | ||
"examples", | ||
"test", | ||
@@ -28,0 +29,0 @@ "tests", |
@@ -1,3 +0,4 @@ | ||
var Promise = require('./promises').Promise; | ||
var resolve = require('./promises').Promise.resolve; | ||
var tail = require('./base').tail; | ||
var dispatch = require('./dispatch'); | ||
@@ -11,17 +12,5 @@ /** | ||
module.exports = function asap(f /*,...args*/) { | ||
return Promise.resolve(tail(arguments)).then(function(args) { | ||
return resolve(tail(arguments)).then(function(args) { | ||
return dispatch(f, args); | ||
}); | ||
}; | ||
function dispatch(f, args) { | ||
/*jshint maxcomplexity:6*/ | ||
switch(args.length) { | ||
case 0: return f(); | ||
case 1: return f(args[0]); | ||
case 2: return f(args[0], args[1]); | ||
case 3: return f(args[0], args[1], args[2]); | ||
case 4: return f(args[0], args[1], args[2], args[3]); | ||
default: return f.apply(void 0, args); | ||
} | ||
} |
@@ -10,3 +10,5 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
exports.copy = copy; | ||
exports.map = map; | ||
exports.replace = replace; | ||
exports.findIndex = findIndex; | ||
@@ -45,2 +47,11 @@ function identity(x) { | ||
function map(f, array) { | ||
var l = array.length; | ||
var a = new Array(l); | ||
for(var i=0; i<l; ++i) { | ||
a[i] = f(array[i]); | ||
} | ||
return a; | ||
} | ||
function replace(x, i, array) { | ||
@@ -53,2 +64,11 @@ var l = array.length; | ||
return a; | ||
} | ||
} | ||
function findIndex(p, a) { | ||
for(var i= 0, l= a.length; i<l; ++i) { | ||
if(p(a[i])) { | ||
return i; | ||
} | ||
} | ||
return -1; | ||
} |
@@ -7,4 +7,10 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
var Stream = require('../Stream'); | ||
var identity = require('../base').identity; | ||
var Yield = Stream.Yield; | ||
exports.unfold = unfold; | ||
exports.iterate = iterate; | ||
exports.repeat = repeat; | ||
/** | ||
@@ -16,5 +22,5 @@ * Build a stream by unfolding steps from a seed value | ||
*/ | ||
exports.unfold = function(f, x) { | ||
function unfold(f, x) { | ||
return new Stream(f, x); | ||
}; | ||
} | ||
@@ -27,7 +33,8 @@ /** | ||
*/ | ||
exports.iterate = function(f, x) { | ||
function iterate(f, x) { | ||
var scheduler = Stream.getDefaultScheduler(); | ||
return new Stream(function(x) { | ||
return new Yield(x, f(x)); | ||
return new Yield(scheduler.now(), x, f(x)); | ||
}, x); | ||
}; | ||
} | ||
@@ -39,8 +46,4 @@ /** | ||
*/ | ||
exports.repeat = function(x) { | ||
return new Stream(repeat, x); | ||
}; | ||
function repeat(x) { | ||
return new Yield(x, x); | ||
return iterate(identity, x); | ||
} |
@@ -6,3 +6,2 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
var Stream = require('../Stream'); | ||
var promise = require('../promises'); | ||
@@ -12,2 +11,5 @@ var step = require('../step'); | ||
var when = promise.when; | ||
var raceIndex = promise.raceIndex; | ||
var Yield = step.Yield; | ||
var End = step.End; | ||
@@ -20,2 +22,3 @@ var Pair = step.Pair; | ||
exports.filter = filter; | ||
exports.takeUntil = takeUntil; | ||
exports.take = take; | ||
@@ -28,3 +31,6 @@ exports.takeWhile = takeWhile; | ||
* Retain only items matching a predicate | ||
* stream: -12345678- | ||
* filter(x => x % 2 === 0, stream): --2-4-6-8- | ||
* @param {function(x:*):boolean} p filtering predicate called for each item | ||
* @param {Stream} stream stream to filter | ||
* @returns {Stream} stream containing only items for which predicate returns truthy | ||
@@ -34,3 +40,3 @@ */ | ||
var stepper = stream.step; | ||
return new Stream(function(state) { | ||
return stream.beget(function(state) { | ||
return filterNext(p, stepper, state); | ||
@@ -48,3 +54,6 @@ }, stream.state); | ||
/** | ||
* stream: -abcd- | ||
* take(2, stream): -ab | ||
* @param {function(x:*):boolean} p | ||
* @param {Stream} stream stream from which to take | ||
* @returns {Stream} stream containing items up to, but not including, the | ||
@@ -55,6 +64,6 @@ * first item for which p returns falsy. | ||
var stepper = stream.step; | ||
return new Stream(function(s) { | ||
return stream.beget(function(s) { | ||
return when(function (i) { | ||
return i.done || p(i.value) ? i | ||
: new End(); | ||
: new End(i.time, i.value, s.state); | ||
}, when(stepper, s)); | ||
@@ -65,3 +74,6 @@ }, stream.state); | ||
/** | ||
* stream: -123451234- | ||
* takeWhile(x => x < 5, stream): -1234 | ||
* @param {Number} n | ||
* @param {Stream} stream stream from which to take | ||
* @returns {Stream} stream containing at most the first n items from this stream | ||
@@ -71,18 +83,80 @@ */ | ||
var stepper = stream.step; | ||
return new Stream(function(s) { | ||
return stream.beget(function(s) { | ||
if(s.value === 0) { | ||
return new End(); | ||
return new End(s.time, s.value, s.state); | ||
} | ||
return when(function (i) { | ||
return i.done ? i : yieldPair(i, s.value - 1); | ||
return i.done ? i | ||
: i.withState(new Yield(i.time, s.value - 1, i.state)); | ||
}, when(stepper, s.state)); | ||
}, new Pair(Math.max(0, n), stream.state)); | ||
}, new Yield(stream.scheduler.now(), n|0, stream.state)); | ||
} | ||
/** | ||
* Remove adjacent duplicates: [a,b,b,c,b] -> [a,b,c,b] | ||
* @param {Stream} stream | ||
* @returns {Stream} stream with no adjacent duplicates using === to | ||
* recognize duplicates | ||
* stream: -a-b-c-d-e-f-g | ||
* signal: -------x | ||
* takeUntil(signal, stream): -a-b-c- | ||
* @param {Stream} signal retain only events in stream before the first | ||
* event in signal | ||
* @param {Stream} stream events to retain | ||
* @returns {Stream} new stream containing only events that occur before | ||
* the first event in signal. | ||
*/ | ||
function takeUntil(signal, stream) { | ||
return stream.begetWithDispose(stepTakeUntil, initTakeUntil(signal, stream), | ||
disposeTakeUntil); | ||
} | ||
function stepTakeUntil(s) { | ||
return s.time === Infinity ? stepTakeUntilSignal(s) : stepTakeUntilTime(s); | ||
} | ||
function stepTakeUntilTime(s) { | ||
return when(function(i) { | ||
return i.time < s.time ? i.withState(updateTakeUntilState(s, i.state)) | ||
: new End(s.time, i.value, s); | ||
}, when(s.stream.step, s.state)); | ||
} | ||
function stepTakeUntilSignal (s) { | ||
return raceIndex(function (i, index) { | ||
return index === 0 ? stepTakeUntilTime(updateTakeUntilTime(s, i.time)) | ||
: i.withState(updateTakeUntilState(s, i.state)); | ||
}, [getSignal(s), when(s.stream.step, s.state)]); | ||
} | ||
function disposeTakeUntil(t, x, s) { | ||
return s.stream.dispose(t, x, s.state); | ||
} | ||
function getSignal (s) { | ||
return s.until === void 0 ? when(s.signal.step, s.signal.state) : s.until; | ||
} | ||
function initTakeUntil (signal, stream) { | ||
return new TakeUntil(void 0, Infinity, signal, stream, stream.state); | ||
} | ||
function updateTakeUntilState(s, newState) { | ||
return new TakeUntil(s.until, s.time, s.signal, s.stream, newState); | ||
} | ||
function updateTakeUntilTime(s, t) { | ||
return new TakeUntil(void 0, t, void 0, s.stream, s.state); | ||
} | ||
function TakeUntil(until, time, signal, stream, state) { | ||
this.until = until, this.time = time, this.signal = signal; | ||
this.stream = stream; this.state = state; | ||
} | ||
/** | ||
* Remove adjacent duplicates, using === to detect duplicates | ||
* stream: -abbcd- | ||
* distinct(stream): -ab-cd- | ||
* @param {?function(a:*, b:*):boolean} equals optional function to compare items. | ||
* @returns {Stream} stream with no adjacent duplicates | ||
*/ | ||
function distinct(stream) { | ||
@@ -93,5 +167,7 @@ return distinctBy(same, stream); | ||
/** | ||
* Remove adjacent duplicates: [a,b,b,c,b] -> [a,b,c,b] | ||
* Remove adjacent duplicates using the provided equals function to detect duplicates | ||
* stream: -abbcd- | ||
* distinct(stream): -ab-cd- | ||
* @param {?function(a:*, b:*):boolean} equals optional function to compare items. | ||
* @param {Stream} stream | ||
* @param {Stream} stream stream from which to omit adjacent duplicates | ||
* @returns {Stream} stream with no adjacent duplicates | ||
@@ -101,3 +177,3 @@ */ | ||
var stepper = stream.step; | ||
return new Stream(function(s) { | ||
return stream.beget(function(s) { | ||
return stepDistinct(equals, stepper, s); | ||
@@ -104,0 +180,0 @@ }, new Pair(init, stream.state)); |
@@ -7,12 +7,16 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
var Stream = require('../Stream'); | ||
var reduce = require('../combinators/reduce').reduce; | ||
var promise = require('../promises'); | ||
var step = require('../step'); | ||
var base = require('../base'); | ||
var replace = base.replace; | ||
var copy = base.copy; | ||
var map = base.map; | ||
var raceIndex = promise.raceIndex; | ||
var all = promise.Promise.all; | ||
var when = promise.when; | ||
var Yield = step.Yield; | ||
var End = step.End; | ||
var slice = Array.prototype.slice; | ||
exports.merge = merge; | ||
@@ -23,3 +27,3 @@ exports.mergeArray = mergeArray; | ||
/** | ||
* @returns {Stream} observable containing events from all observables in the argument | ||
* @returns {Stream} stream containing events from all streams in the argument | ||
* list in time order. If two events are simultaneous they will be merged in | ||
@@ -29,22 +33,24 @@ * arbitrary order. | ||
function merge(/*...observables*/) { | ||
return mergeArray(slice.call(arguments)); | ||
return mergeArray(copy(arguments)); | ||
} | ||
/** | ||
* @param {Array} observables array of observables to merge | ||
* @returns {Stream} observable containing events from all input observables | ||
* @param {Array} streams array of stream to merge | ||
* @returns {Stream} stream containing events from all input observables | ||
* in time order. If two events are simultaneous they will be merged in | ||
* arbitrary order. | ||
*/ | ||
function mergeArray(observables) { | ||
return new Stream(stepMerge, observables.map(initStep)); | ||
function mergeArray(streams) { | ||
return new Stream(stepMerge, map(initStep, streams), void 0, disposeRemaining); | ||
} | ||
/** | ||
* @param {Stream} observableOfObservables observable of observables | ||
* @returns {Stream} observable containing events from all observables in the | ||
* @param {Stream} streamOfStreams stream of streams to merge | ||
* @returns {Stream} stream containing events from all observables in the | ||
* input in time order. If two events are simultaneous they will be merged in | ||
* arbitrary order. | ||
*/ | ||
function mergeAll(observableOfObservables) { | ||
return Stream.fromPromise(toArray(observableOfObservables)).flatMap(mergeArray); | ||
function mergeAll(streamOfStreams) { | ||
// TODO: implement a solution that doesn't involve converting to an array first | ||
// We should be able to merge an infinite stream of streams. | ||
return Stream.fromPromise(toArray(streamOfStreams)).flatMap(mergeArray); | ||
} | ||
@@ -54,3 +60,3 @@ | ||
if(s.length === 0) { | ||
return new End(); | ||
return new End(0, void 0, []); | ||
} | ||
@@ -62,5 +68,5 @@ | ||
function stepAll (s) { | ||
return s.map(function (s) { | ||
return map(function (s) { | ||
return stepPair(s.stream); | ||
}); | ||
}, s); | ||
} | ||
@@ -70,15 +76,24 @@ | ||
return raceIndex(function(i, index) { | ||
return handleStep(s, i, index); | ||
}, s.map(getI)); | ||
return handleStep(i, index, s); | ||
}, map(getIteration, s)); | ||
} | ||
function handleStep(s, i, index) { | ||
return i.done ? stepMerge(without(index, s)) | ||
: new Yield(i.value, stepAtIndex(s, i, index)); | ||
function handleStep(i, index, s) { | ||
if(i.done) { | ||
var sp = without(i, index, s); | ||
if(s.length === 1) { | ||
return when(function(s) { | ||
return i.withState(s); | ||
}, sp); | ||
} | ||
return when(stepMerge, sp); | ||
} | ||
return i.withState(stepAtIndex(i, index, s)); | ||
} | ||
function stepAtIndex(s, i, index) { | ||
return s.map(function (sn, j) { | ||
return j === index ? stepPair(new Stream(sn.stream.step, i.state)) : sn; | ||
}); | ||
function stepAtIndex(i, index, s) { | ||
var sn = s[index]; | ||
return replace(stepPair(sn.stream.beget(sn.stream.step, i.state)), index, s); | ||
} | ||
@@ -94,17 +109,26 @@ | ||
function getI(s) { | ||
function getIteration(s) { | ||
return s.i; | ||
} | ||
function without(i, a) { | ||
return a.filter(function(x, ai) { | ||
return i !== ai; | ||
}); | ||
function disposeRemaining(t, x, remaining) { | ||
return all(map(function(s) { | ||
return s.stream.dispose(t, x, s.stream.state); | ||
}, remaining)); | ||
} | ||
function without(step, index, arr) { | ||
var stream = arr[index].stream; | ||
return when(function() { | ||
return arr.filter(function(x, ai) { | ||
return index !== ai; | ||
}); | ||
}, stream.dispose(step.time, step.value, step.state)); | ||
} | ||
function toArray (observableOfObservables) { | ||
return observableOfObservables.reduce(function (a, obs) { | ||
return reduce(function (a, obs) { | ||
a.push(obs); | ||
return a; | ||
}, []); | ||
}, [], observableOfObservables); | ||
} |
@@ -11,3 +11,2 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
var Yield = step.Yield; | ||
var End = step.End; | ||
@@ -22,3 +21,3 @@ | ||
function empty() { | ||
return new Stream(identity, new End()); | ||
return new Stream(identity, new End(Stream.getDefaultScheduler().now())); | ||
} | ||
@@ -33,13 +32,3 @@ | ||
function concat(left, right) { | ||
return flatMap(identity, new Stream(identity, two(left, right))); | ||
return flatMap(identity, Stream.from([left, right])); | ||
} | ||
/** | ||
* Create a Step that yields a then b | ||
* @param {*} a | ||
* @param {*} b | ||
* @returns {Yield} | ||
*/ | ||
function two(a, b) { | ||
return new Yield(a, new Yield(b, new End())); | ||
} |
@@ -1,6 +0,10 @@ | ||
var promise = require('../promises'); | ||
/** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
/** @author Brian Cavalier */ | ||
/** @author John Hann */ | ||
/** @module */ | ||
var asap = require('../asap'); | ||
var runStream = require('../runStream'); | ||
var when = require('../promises').when; | ||
var when = promise.when; | ||
exports.reduce = reduce; | ||
@@ -19,3 +23,3 @@ exports.reduce1 = reduce1; | ||
function reduce(f, z, stream) { | ||
return asap(reduceStep, f, z, stream.step, stream.state); | ||
return asap(runStream, f, z, stream, stream.state, disposeReduce); | ||
} | ||
@@ -34,11 +38,11 @@ | ||
return stream.head().then(function(z) { | ||
return reduce(f, z, stream.tail()); | ||
var tail = stream.tail(); | ||
return runStream(f, z, tail, tail.state, disposeReduce); | ||
}); | ||
} | ||
function reduceStep(f, z, stepper, state) { | ||
return when(function (i) { | ||
return i.done ? z | ||
: reduceStep(f, f(z, i.value), stepper, i.state); | ||
}, when(stepper, state)); | ||
function disposeReduce(stream, z, i) { | ||
return when(function() { | ||
return z; | ||
}, stream.dispose(i.time, i.value, i.state)); | ||
} |
@@ -24,3 +24,3 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
function switchLatest(stream) { | ||
return new Stream(stepSwitch, initState(stream)); | ||
return stream.beget(stepSwitch, initState(stream)); | ||
} | ||
@@ -34,7 +34,7 @@ | ||
return raceIndex(function(i, index) { | ||
return doSwitchNext(s, i, index); | ||
return doSwitchNext(i, index, s); | ||
}, s.current); | ||
} | ||
function doSwitchNext(s, i, index) { | ||
function doSwitchNext(i, index, s) { | ||
/*jshint maxcomplexity:7*/ | ||
@@ -62,3 +62,3 @@ var never = Stream.never(); | ||
// Inner not done, yield latest value | ||
return new Yield(i.value, updateInner(new Stream(s.inner.step, i.state), s)); | ||
return new Yield(i.time, i.value, updateInner(s.inner.beget(s.inner.step, i.state), s)); | ||
} | ||
@@ -82,4 +82,11 @@ | ||
function end(outer, endStep) { | ||
return { outer: outer, inner: Stream.never(), current: [endStep, endStep] }; | ||
} | ||
function awaitNextOuter(outer, i) { | ||
return switchNext(updateBoth(new Stream(outer.step, i.state), i.value)); | ||
var next = i.done ? end(outer, i) | ||
: updateBoth(outer.beget(outer.step, i.state), i.value); | ||
return switchNext(next); | ||
} | ||
@@ -96,1 +103,2 @@ | ||
} | ||
@@ -7,3 +7,2 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
var Stream = require('../Stream'); | ||
var Scheduler = require('../Scheduler'); | ||
var promise = require('../promises'); | ||
@@ -19,3 +18,2 @@ var step = require('../step'); | ||
var yieldPair = step.yieldPair; | ||
var ensureScheduler = Scheduler.ensure; | ||
@@ -30,2 +28,3 @@ exports.periodic = periodic; | ||
exports.throttleOn = throttleOn; | ||
exports.sync = sync; | ||
@@ -38,3 +37,3 @@ /** | ||
function periodic(period) { | ||
return periodicOn(Scheduler.getDefault(), period); | ||
return periodicOn(Stream.getDefaultScheduler(), period); | ||
} | ||
@@ -50,10 +49,60 @@ | ||
function periodicOn(scheduler, period) { | ||
return new Stream(function(p) { | ||
var now = p.state.now(); | ||
return delayed(p.value, new Yield(now, p), p.state); | ||
}, new Pair(Math.max(1, period), ensureScheduler(scheduler))); | ||
var stream = new Stream(function (s) { | ||
return new Yield(s, s, s + period); | ||
}, scheduler.now(), scheduler); | ||
return skipPast(Math.ceil(period * 0.01), sync(stream)); | ||
} | ||
/** | ||
* Skip all events that are in the past, as determined by the supplied | ||
* stream's scheduler | ||
* @private | ||
* @param {Number} epsilon account for timer resolution (or lack thereof) | ||
* @param stream | ||
* @returns {Stream} | ||
*/ | ||
function skipPast(epsilon, stream) { | ||
return stream.beget(function(s) { | ||
return stepSkipPast(epsilon, stream.scheduler, stream.step, s); | ||
}, stream.state); | ||
} | ||
function stepSkipPast (epsilon, scheduler, stepper, s) { | ||
return when(function (i) { | ||
return i.time + epsilon >= scheduler.now() ? i | ||
: stepSkipPast(epsilon, scheduler, stepper, i.state); | ||
}, Promise.resolve(s).then(stepper)); | ||
} | ||
/** | ||
* Synchronize a stream's items with its scheduler, ensuring that | ||
* items are emitted only at their specified time | ||
* @private | ||
* @param {Stream} stream stream to synchronize | ||
* @returns {Stream} new stream whose items are synchronized to the | ||
* stream's scheduler | ||
*/ | ||
function sync(stream) { | ||
return stream.beget(makeSyncStepper(stream.scheduler, stream.step), stream.state); | ||
} | ||
function makeSyncStepper(scheduler, stepper) { | ||
return function (s) { | ||
return syncStep(scheduler, stepper, s); | ||
}; | ||
} | ||
function syncStep (scheduler, stepper, s) { | ||
return when(function (i) { | ||
return getSyncStep(scheduler, i); | ||
}, when(stepper, s)); | ||
} | ||
function getSyncStep (scheduler, i) { | ||
var now = scheduler.now(); | ||
return now < i.time ? delayed(i.time - now, i, scheduler) : i; | ||
} | ||
/** | ||
* @param {Number} delayTime milliseconds to delay each item using | ||
@@ -65,3 +114,3 @@ * the provided scheduler | ||
function delay(delayTime, stream) { | ||
return delayOn(Scheduler.getDefault(), delayTime, stream); | ||
return delayOn(stream.scheduler, delayTime, stream); | ||
} | ||
@@ -76,11 +125,15 @@ | ||
function delayOn(scheduler, delayTime, stream) { | ||
var stepDelay = delayStep(delayTime); | ||
var stepper = stream.step; | ||
return new Stream(function(s) { | ||
return when(function(i) { | ||
return i.done ? i | ||
: delayed(s.value, yieldPair(i, s.value), scheduler); | ||
}, when(stepper, s.state)); | ||
}, new Pair(Math.max(0, delayTime), stream.state)); | ||
return stream.beget(makeSyncStepper(scheduler, function(s) { | ||
return when(stepDelay, when(stepper, s)); | ||
}), stream.state); | ||
} | ||
function delayStep(dt) { | ||
return function(i) { | ||
return i.delay(dt); | ||
}; | ||
} | ||
/** | ||
@@ -95,3 +148,3 @@ * Limit the rate of events | ||
function throttle(period, stream) { | ||
return throttleOn(Scheduler.getDefault(), period, stream); | ||
return throttleOn(stream.scheduler, period, stream); | ||
} | ||
@@ -112,3 +165,3 @@ | ||
var stepper = stream.step; | ||
return new Stream(function(s) { | ||
return stream.beget(function(s) { | ||
return throttleNext(stepper, s, period, scheduler); | ||
@@ -121,9 +174,7 @@ }, new Pair(-1, stream.state)); | ||
if(i.done) { | ||
return i; | ||
return i.withState(s.state); | ||
} | ||
var now = scheduler.now(); | ||
var end = s.value; | ||
return now > end ? yieldPair(i, now + period) | ||
: throttleNext(stepper, new Pair(end, i.state), period, scheduler); | ||
return i.time > s.value ? yieldPair(i, i.time + period) | ||
: throttleNext(stepper, new Pair(s.value, i.state), period, scheduler); | ||
}, when(stepper, s.state)); | ||
@@ -142,3 +193,3 @@ } | ||
function debounce(period, stream) { | ||
return debounceOn(Scheduler.getDefault(), period, stream); | ||
return debounceOn(stream.scheduler, period, stream); | ||
} | ||
@@ -158,3 +209,3 @@ | ||
var stepper = stream.step; | ||
return new Stream(function(s) { | ||
return stream.beget(function(s) { | ||
return stepDebounce(scheduler, period, stepper, s); | ||
@@ -171,3 +222,3 @@ }, { timer: void 0, prev: void 0, next: void 0, state: stream.state }); | ||
return index === 0 ? yieldDebounced(s) | ||
: winner.done ? winner | ||
: winner.done ? winner.withState(s.state) | ||
: stepEarliest(scheduler, period, step, nextState(scheduler, period, step, winner)); | ||
@@ -182,3 +233,3 @@ }, [s.timer, s.next]); | ||
function nextState (scheduler, period, step, winner) { | ||
return { timer: delayed(period, 'timer', scheduler), prev: winner, | ||
return { timer: delayed(period, new Yield(scheduler.now() + period, 'timer'), scheduler), prev: winner, | ||
next: when(step, winner.state), state: winner.state }; | ||
@@ -189,5 +240,12 @@ } | ||
return when(function (prev) { | ||
return new Yield(prev.value, { timer: never(), prev: void 0, | ||
return new Yield(prev.time, prev.value, { timer: never(), prev: void 0, | ||
next: s.next, state: prev.state }); | ||
}, s.prev); | ||
} | ||
} | ||
function ensureScheduler(scheduler) { | ||
if(typeof scheduler === 'undefined') { | ||
return Stream.getDefaultScheduler(); | ||
} | ||
return scheduler; | ||
} |
@@ -6,3 +6,2 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
var Stream = require('../Stream'); | ||
var promise = require('../promises'); | ||
@@ -29,3 +28,3 @@ var step = require('../step'); | ||
var stepper = stream.step; | ||
return new Stream(function (state) { | ||
return stream.beget(function (state) { | ||
return mapNext(f, stepper, state); | ||
@@ -50,3 +49,3 @@ }, stream.state); | ||
var stepper = stream.step; | ||
return new Stream(function (state) { | ||
return stream.beget(function (state) { | ||
return tapNext(f, stepper, state); | ||
@@ -87,3 +86,3 @@ }, stream.state); | ||
var stepper = stream.step; | ||
return new Stream(function(s) { | ||
return stream.beget(function(s) { | ||
return stepScan(f, stepper, s); | ||
@@ -96,7 +95,7 @@ }, new Pair(initial, stream.state)); | ||
if (i.done) { | ||
return i; | ||
return i.withState(s.state); | ||
} | ||
var value = f(s.value, i.value); | ||
return new Yield(value, new Pair(value, i.state)); | ||
return new Yield(i.time, value, new Pair(value, i.state)); | ||
}, when(stepper, s.state)); | ||
@@ -113,3 +112,3 @@ } | ||
function flatMap(f, stream) { | ||
return new Stream(stepChain, new Outer(f, stream)); | ||
return stream.begetWithDispose(stepChain, new Outer(f, stream), disposeInnerThenOuter); | ||
@@ -121,2 +120,15 @@ function stepChain(s) { | ||
function disposeInnerThenOuter(t, x, s) { | ||
if(s === void 0) { | ||
return x; | ||
} | ||
return when(function disposeOuter() { | ||
return disposeStream(t, x, s.outer); | ||
}, disposeStream(t, x, s.inner)); | ||
} | ||
function disposeStream(t, x, stream) { | ||
return stream !== void 0 ? stream.dispose(t, x, stream.state) : x; | ||
} | ||
// flatMap outer/inner "loop" helpers | ||
@@ -135,3 +147,3 @@ | ||
return i.done ? i | ||
: stepInner(stepNext, f, new Stream(outer.step, i.state), f(i.value)); | ||
: stepInner(stepNext, f, outer.beget(outer.step, i.state), f(i.value)); | ||
}, Promise.resolve(streamNext(outer))); | ||
@@ -150,4 +162,8 @@ } | ||
return when(function(ii) { | ||
return ii.done ? stepOuter(stepNext, f, outer) | ||
: new Yield(ii.value, new Inner(f, outer, new Stream(inner.step, ii.state))); | ||
if(ii.done) { | ||
return when(function() { | ||
return stepOuter(stepNext, f, outer); | ||
}, inner.dispose(ii.time, ii.value, ii.state)); | ||
} | ||
return ii.withState(new Inner(f, outer, inner.beget(inner.step, ii.state))); | ||
}, Promise.resolve(streamNext(inner))); | ||
@@ -154,0 +170,0 @@ } |
@@ -10,8 +10,8 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
var base = require('../base'); | ||
var dispatch = require('../dispatch'); | ||
var when = promise.when; | ||
var Promise = promise.Promise; | ||
var all = promise.Promise.all; | ||
var isDone = step.isDone; | ||
var getValue = step.getValue; | ||
var Yield = step.Yield; | ||
@@ -22,2 +22,4 @@ var End = step.End; | ||
var copy = base.copy; | ||
var map = base.map; | ||
var findIndex = base.findIndex; | ||
@@ -30,7 +32,7 @@ exports.zipWith = zipWith; | ||
/** | ||
* Combine events from all observables using f | ||
* Combine events from all streams using f | ||
* @param {function(a:Stream, b:Stream, ...):*} f function to combine items | ||
* @returns {Stream} observable containing | ||
*/ | ||
function zipWith(f /*,...observables*/) { | ||
function zipWith(f /*,...streams*/) { | ||
return zipArrayWith(f, tail(arguments)); | ||
@@ -40,6 +42,6 @@ } | ||
/** | ||
* Combine events from all observables by collecting them into an array | ||
* Combine events from all streams by collecting them into an array | ||
* @returns {*} | ||
*/ | ||
function zip(/*...observables*/) { | ||
function zip(/*...streams*/) { | ||
return zipArrayWith(Array, copy(arguments)); | ||
@@ -49,33 +51,64 @@ } | ||
/** | ||
* Combine events from all observables by collecting them into an array | ||
* @param {Array} observables array of observables to zip | ||
* Combine events from all streams by collecting them into an array | ||
* @param {Array} streams array of streams to zip | ||
* @returns {*} | ||
*/ | ||
function zipArray(observables) { | ||
return zipArrayWith(Array, observables); | ||
function zipArray(streams) { | ||
return zipArrayWith(Array, streams); | ||
} | ||
/** | ||
* Combine events from all observables using f | ||
* Combine events from all streams using f | ||
* @param {function(a:Stream, b:Stream, ...):*} f function to combine items | ||
* @param {Array} observables array of observables to zip | ||
* @returns {Stream} observable containing | ||
* @param {Array} streams array of observables to zip | ||
* @returns {Stream} stream containing items from all input streams combined | ||
* using f | ||
*/ | ||
function zipArrayWith(f, observables) { | ||
function zipArrayWith(f, streams) { | ||
return new Stream(function(ss) { | ||
return stepZip(f, ss); | ||
}, observables); | ||
}, streams, void 0, disposeAll); | ||
} | ||
function stepZip (f, streams) { | ||
return Promise.all(streams.map(streamNext)).then(function (is) { | ||
if (is.some(isDone)) { | ||
return new End(); | ||
return all(map(streamNext, streams)).then(function (iterations) { | ||
return handleStepZip(f, streams, iterations); | ||
}); | ||
} | ||
function handleStepZip(f, streams, iterations) { | ||
var done = findIndex(isDone, iterations); | ||
if(done < 0) { | ||
return applyZipWith(f, streams, iterations); | ||
} | ||
var ended = iterations[done]; | ||
return new End(ended.time, ended.value, streams); | ||
} | ||
function disposeAll(t, x, streams) { | ||
return all(map(function(stream) { | ||
return stream.dispose(t, x, stream.state); | ||
}, streams)); | ||
} | ||
function applyZipWith(f, streams, iterations) { | ||
var t = 0; | ||
var values = new Array(iterations.length); | ||
var states = new Array(iterations.length); | ||
var stream, it; | ||
for(var i=0, l=iterations.length; i<l; ++i) { | ||
stream = streams[i]; | ||
it = iterations[i]; | ||
if(it.time > t) { | ||
t = it.time; | ||
} | ||
var value = f.apply(void 0, is.map(getValue)); | ||
return new Yield(value, is.map(function (it, i) { | ||
return new Stream(streams[i].step, it.state); | ||
})); | ||
}); | ||
values[i] = it.value; | ||
states[i] = stream.beget(stream.step, it.state); | ||
} | ||
return new Yield(t, dispatch(f, values), states); | ||
} | ||
@@ -82,0 +115,0 @@ |
@@ -6,5 +6,8 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
var Promise = require('./promises').Promise; | ||
var promise = require('./promises'); | ||
var step = require('./step'); | ||
var resolve = promise.Promise.resolve; | ||
var when = promise.when; | ||
var Yield = step.Yield; | ||
@@ -15,7 +18,32 @@ var End = step.End; | ||
exports.head = head; | ||
exports.makeIterable = makeIterable; | ||
exports.getIterator = getIterator; | ||
function from(x) { | ||
/*global Set, Symbol*/ | ||
var iteratorSymbol; | ||
// Firefox ships a partial implementation using the name @@iterator. | ||
// https://bugzilla.mozilla.org/show_bug.cgi?id=907077#c14 | ||
if (typeof Set === 'function' && typeof new Set()['@@iterator'] === 'function') { | ||
iteratorSymbol = '@@iterator'; | ||
} else { | ||
iteratorSymbol = typeof Symbol === 'function' && Symbol.iterator || | ||
'_es6shim_iterator_'; | ||
} | ||
function makeIterable(makeIterator, obj) { | ||
obj[iteratorSymbol] = makeIterator; | ||
} | ||
function isIterable(o) { | ||
return typeof o[iteratorSymbol] === 'function'; | ||
} | ||
function getIterator(o) { | ||
return o[iteratorSymbol](); | ||
} | ||
function from(scheduler, x) { | ||
/*jshint maxcomplexity:6*/ | ||
if(Array.isArray(x)) { | ||
return new ArrayIterable(x); | ||
return new ArrayIterable(scheduler.now, x); | ||
} | ||
@@ -25,11 +53,11 @@ | ||
if(typeof x !== 'function' && typeof x.length === 'number') { | ||
return new ArrayIterable(x); | ||
return new ArrayIterable(scheduler.now, x); | ||
} | ||
if(typeof x.iterator === 'function') { | ||
return x; | ||
if(isIterable(x)) { | ||
return new IterableAdapter(scheduler.now, x); | ||
} | ||
if(typeof x.next === 'function') { | ||
return new IterableWrapper(x); | ||
return new IterableWrapper(new IteratorAdapter(scheduler.now(), x)); | ||
} | ||
@@ -42,10 +70,32 @@ } | ||
function head(iterable) { | ||
var iterator = iterable.iterator(); | ||
var iterator = getIterator(iterable); | ||
var iteration = iterator.next(); | ||
return Promise.resolve(iteration).then(function(iteration) { | ||
return resolve(iteration).then(function(iteration) { | ||
return iteration.done ? iteration | ||
: new Yield(iteration.value, new IterableWrapper(iterator)); | ||
: new Yield(iteration.time, iteration.value, new IterableWrapper(iterator)); | ||
}); | ||
} | ||
function IterableAdapter(now, iterable) { | ||
this.now = now; | ||
this.iterable = iterable; | ||
} | ||
makeIterable(function() { | ||
return new IteratorAdapter(this.now(), getIterator(this.iterable)); | ||
}, IterableAdapter.prototype); | ||
function IteratorAdapter(time, iterator) { | ||
this.time = time; | ||
this._iterator = iterator; | ||
} | ||
IteratorAdapter.prototype.next = function() { | ||
var time = this.time; | ||
return when(function(i) { | ||
return i.done ? new End(time, i.value) | ||
: new Yield(time, i.value, this); | ||
}, this._iterator.next()); | ||
}; | ||
function IterableWrapper(iterator) { | ||
@@ -55,15 +105,17 @@ this._iterator = iterator; | ||
IterableWrapper.prototype.iterator = function() { | ||
makeIterable(function() { | ||
return this._iterator; | ||
}; | ||
}, IterableWrapper.prototype); | ||
function ArrayIterable(array) { | ||
function ArrayIterable(now, array) { | ||
this.now = now; | ||
this.array = array; | ||
} | ||
ArrayIterable.prototype.iterator = function() { | ||
return new ArrayIterator(this.array); | ||
}; | ||
makeIterable(function() { | ||
return new ArrayIterator(this.now(), this.array); | ||
}, ArrayIterable.prototype); | ||
function ArrayIterator(array) { | ||
function ArrayIterator(time, array) { | ||
this.time = time; | ||
this.array = array; | ||
@@ -75,4 +127,4 @@ this.index = 0; | ||
return this.index < this.array.length | ||
? new Yield(this.array[this.index++]) | ||
: new End(); | ||
? new Yield(this.time, this.array[this.index++], this) | ||
: new End(this.time, void 0); | ||
}; |
@@ -1,4 +0,4 @@ | ||
var Promise = require('when/es6-shim/Promise'); | ||
var WhenPromise = require('when/es6-shim/Promise'); | ||
exports.Promise = Promise; | ||
exports.Promise = WhenPromise; | ||
exports.defer = defer; | ||
@@ -10,5 +10,3 @@ exports.delay = delay; | ||
var neverP = Object.create(Promise.prototype); | ||
neverP.then = never; | ||
neverP.catch = never; | ||
var neverP = { then: never, catch: never }; | ||
@@ -30,3 +28,3 @@ /** | ||
var d = { promise: void 0, resolve: void 0, reject: void 0 }; | ||
d.promise = new Promise(function(resolve, reject) { | ||
d.promise = new WhenPromise(function(resolve, reject) { | ||
d.resolve = resolve; | ||
@@ -80,3 +78,3 @@ d.reject = reject; | ||
var done = false; | ||
return new Promise(runRaceIndex); | ||
return new WhenPromise(runRaceIndex); | ||
@@ -90,3 +88,3 @@ function runRaceIndex(resolve, reject) { | ||
function settleOne(resolve, reject, i, p) { | ||
Promise.resolve(p).then(function(x) { | ||
WhenPromise.resolve(p).then(function(x) { | ||
if(!done) { | ||
@@ -93,0 +91,0 @@ done = true; |
@@ -8,4 +8,5 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
var step = require('./step'); | ||
var makeIterable = require('./iterable').makeIterable; | ||
var Promise = promise.Promise; | ||
var reject = promise.Promise.reject; | ||
var defer = promise.defer; | ||
@@ -27,3 +28,6 @@ | ||
* before pulling subsequent iterations. When the producer is faster, items | ||
* are buffered. | ||
* are buffered according to the supplied bufferPolicy. | ||
* @param {function(s:Step, items:Array):Array=keepAll} bufferPolicy queue buffering | ||
* policy that will be used to manage consumer queue sizes. Defaults to keepAll, | ||
* which allows queues to grow forever. | ||
* @constructor | ||
@@ -36,13 +40,23 @@ */ | ||
this._awaiting = []; | ||
this._ended = false; | ||
this._ended = defer(); | ||
this.ended = this._ended.promise; | ||
this.isEnded = false; | ||
} | ||
Queue.prototype.iterator = function() { | ||
Queue.disposeQueue = disposeQueue; | ||
function disposeQueue(t, x, queue) { | ||
return queue.end(t, x); | ||
} | ||
makeIterable(function() { | ||
return new QueueIterator(this); | ||
}; | ||
}, Queue.prototype); | ||
Queue.prototype.add = function(x) { | ||
checkEnd(this._ended); | ||
Queue.prototype.add = function(t, x) { | ||
if (this.isEnded) { | ||
throw new Error('Queue ended'); | ||
} | ||
var iteration = new Yield(x); | ||
var iteration = new Yield(t, x, this); | ||
if (this._awaiting.length === 0) { | ||
@@ -55,7 +69,9 @@ this._items = this._bufferPolicy(iteration, this._items); | ||
Queue.prototype.end = function(x) { | ||
checkEnd(this._ended); | ||
Queue.prototype.end = function(t, x) { | ||
if(this.isEnded) { | ||
return; | ||
} | ||
this._ended = true; | ||
var end = new End(x); | ||
this.isEnded = true; | ||
var end = new End(t, x, this); | ||
if(this._awaiting.length > 0) { | ||
@@ -66,29 +82,28 @@ this._awaiting.reduce(resolveAll, end); | ||
} | ||
this._ended.resolve(this); | ||
}; | ||
function QueueIterator(q) { | ||
this.q = q; | ||
} | ||
QueueIterator.prototype.next = function() { | ||
var q = this.q; | ||
if (q._items.length > 0) { | ||
return q._items.shift(); | ||
Queue.prototype.get = function() { | ||
if (this._items.length > 0) { | ||
return this._items.shift(); | ||
} | ||
if (q._ended) { | ||
return Promise.reject(new Error('closed')); | ||
if (this.isEnded) { | ||
return reject(new Error('Queue ended')); | ||
} | ||
var consumer = defer(); | ||
q._awaiting.push(consumer); | ||
this._awaiting.push(consumer); | ||
return consumer.promise; | ||
}; | ||
function checkEnd(closed) { | ||
if(closed) { | ||
throw new Error('Queue end'); | ||
} | ||
function QueueIterator(q) { | ||
this.queue = q; | ||
} | ||
QueueIterator.prototype.next = function() { | ||
return this.queue.get(); | ||
}; | ||
function resolveAll(end, consumer) { | ||
@@ -95,0 +110,0 @@ consumer.resolve(end); |
@@ -20,26 +20,2 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
Scheduler.defaultNow = defaultNow; | ||
var defaultScheduler; | ||
Scheduler.ensure = function(scheduler) { | ||
if(typeof scheduler === 'undefined') { | ||
return Scheduler.getDefault(); | ||
} | ||
return scheduler; | ||
}; | ||
Scheduler.getDefault = function() { | ||
if(defaultScheduler === void 0) { | ||
defaultScheduler = new Scheduler(); | ||
} | ||
return defaultScheduler; | ||
}; | ||
Scheduler.setDefault = function(scheduler) { | ||
if(scheduler != null) { | ||
defaultScheduler = scheduler; | ||
} | ||
}; | ||
function Scheduler(setTimer, clearTimer, now, errorHandler) { | ||
@@ -50,3 +26,4 @@ this.now = now || defaultNow; | ||
this._handleError = errorHandler || logAndReschedule; | ||
this._timer = void 0; | ||
this._timer = null; | ||
this._nextArrival = 0; | ||
this._tasks = []; | ||
@@ -61,6 +38,2 @@ | ||
Scheduler.prototype = { | ||
periodic: function(period, run, state) { | ||
return this._schedule(period, period, run, state); | ||
}, | ||
delayed: function(delay, run, state) { | ||
@@ -80,7 +53,2 @@ return this._schedule(delay, -1, run, state); | ||
shutdown: function() { | ||
this._clearTimer(this._timer); | ||
this._tasks.length = 0; | ||
}, | ||
_schedule: function(delay, period, run, state) { | ||
@@ -103,5 +71,9 @@ var now = this.now(); | ||
_schedulerNextArrival: function (nextArrival, now) { | ||
this._nextArrival = nextArrival; | ||
var delay = Math.max(0, nextArrival - now); | ||
this._timer = this._setTimer(this._runReadyTasksBound, delay); | ||
}, | ||
_scheduleNextRun: function(now) { | ||
this._clearTimer(this._timer); | ||
if(this._tasks.length === 0) { | ||
@@ -111,4 +83,10 @@ return; | ||
var nextArrival = Math.max(0, this._tasks[0].arrival - now); | ||
this._timer = this._setTimer(this._runReadyTasksBound, nextArrival); | ||
var nextArrival = this._tasks[0].arrival; | ||
if(this._timer === null) { | ||
this._schedulerNextArrival(nextArrival, now); | ||
} else if(nextArrival < this._nextArrival) { | ||
this._clearTimer(this._timer); | ||
this._schedulerNextArrival(nextArrival, now); | ||
} | ||
}, | ||
@@ -126,11 +104,14 @@ | ||
_runReadyTasks: function() { | ||
this._timer = null; | ||
var now = this.now(); | ||
var tasks = []; | ||
var tasks = this._tasks; | ||
var reschedule = []; | ||
while(this._tasks.length > 0 && this._tasks[0].arrival <= now) { | ||
tasks.push(this._runTask(now, this._tasks.shift())); | ||
while(tasks.length > 0 && tasks[0].arrival <= now) { | ||
reschedule.push(this._runTask(now, tasks.shift())); | ||
} | ||
for(var i=0; i<tasks.length; ++i) { | ||
this._scheduleNextTask(now, tasks[i]); | ||
for(var i=0; i<reschedule.length; ++i) { | ||
this._scheduleNextTask(now, reschedule[i]); | ||
} | ||
@@ -171,4 +152,4 @@ | ||
throw e; | ||
}); | ||
}, 0); | ||
return task; | ||
} |
@@ -19,12 +19,36 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
function Yield(x, s) { | ||
this.done = false; this.value = x; this.state = s; | ||
/** | ||
* A step that yields a new value and a new state that can be used to produce | ||
* another step | ||
* @param {Number} t time the value became or will become available | ||
* @param {*} x value | ||
* @param {*} s new state | ||
* @constructor | ||
*/ | ||
function Yield(t, x, s) { | ||
this.time = t; this.done = false; this.value = x; this.state = s; | ||
} | ||
Yield.prototype.map = function(f) { | ||
return new Yield(f(this.value), this.state); | ||
return new Yield(this.time, f(this.value), this.state); | ||
}; | ||
function End(x) { | ||
this.done = true; this.value = x; this.state = this; | ||
Yield.prototype.delay = function(dt) { | ||
return new Yield(this.time + dt, this.value, this.state); | ||
}; | ||
Yield.prototype.withState = function(state) { | ||
return new Yield(this.time, this.value, state); | ||
}; | ||
/** | ||
* A step that represents end of stream. The optional value is *not* in the stream, | ||
* but rather a custom end of end of stream marker value. | ||
* @param {Number} t end time | ||
* @param {?*} x optional end signal value | ||
* @param {*} s end state | ||
* @constructor | ||
*/ | ||
function End(t, x, s) { | ||
this.time = t; this.done = true; this.value = x; this.state = s; | ||
} | ||
@@ -36,2 +60,10 @@ | ||
End.prototype.delay = function(dt) { | ||
return new End(this.time + dt, this.value, this.state); | ||
}; | ||
End.prototype.withState = function(state) { | ||
return new End(this.time, this.value, state); | ||
}; | ||
function isDone(step) { | ||
@@ -45,6 +77,2 @@ return step.done; | ||
function getState(step) { | ||
return step.state; | ||
} | ||
function getValueOrFail(step) { | ||
@@ -57,2 +85,12 @@ if(step.done) { | ||
function getState(step) { | ||
return step.state; | ||
} | ||
/** | ||
* A simple value, state pair | ||
* @param {*} x | ||
* @param {*} s | ||
* @constructor | ||
*/ | ||
function Pair(x, s) { | ||
@@ -63,3 +101,3 @@ this.value = x; this.state = s; | ||
function yieldPair(step, x) { | ||
return new Yield(step.value, new Pair(x, step.state)); | ||
return new Yield(step.time, step.value, new Pair(x, step.state)); | ||
} |
@@ -6,2 +6,3 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
var Scheduler = require('./Scheduler'); | ||
var promise = require('./promises'); | ||
@@ -11,7 +12,6 @@ var step = require('./step'); | ||
var base = require('./base'); | ||
var asap = require('./asap'); | ||
module.exports = Stream; | ||
var Promise = promise.Promise; | ||
//var Promise = promise.Promise; | ||
var when = promise.when; | ||
@@ -32,2 +32,20 @@ var neverPromise = promise.never; | ||
Stream.getDefaultScheduler = getDefaultScheduler; | ||
Stream.setDefaultScheduler = setDefaultScheduler; | ||
var defaultScheduler; | ||
function getDefaultScheduler() { | ||
if(defaultScheduler === void 0) { | ||
defaultScheduler = new Scheduler(); | ||
} | ||
return defaultScheduler; | ||
} | ||
function setDefaultScheduler(scheduler) { | ||
if(scheduler != null) { | ||
defaultScheduler = scheduler; | ||
} | ||
} | ||
/** | ||
@@ -41,7 +59,10 @@ * Stream that generates items by repeatedly calling the provided | ||
* @param {*} state initial state | ||
* @param {Scheduler=} scheduler | ||
* @constructor | ||
*/ | ||
function Stream(step, state) { | ||
function Stream(step, state, scheduler, dispose) { | ||
this.step = step; | ||
this.state = state; | ||
this.scheduler = scheduler === void 0 ? getDefaultScheduler() : scheduler; | ||
this.dispose = typeof dispose === 'function' ? dispose : returnEndValue; | ||
} | ||
@@ -73,3 +94,3 @@ | ||
Stream.from = function(iterable) { | ||
return new Stream(iterableHead, iterableFrom(iterable)); | ||
return new Stream(iterableHead, scheduledIterable(iterable)); | ||
}; | ||
@@ -86,28 +107,22 @@ | ||
/** | ||
* Observe all items in the stream | ||
* @param {function(*):undefined|Promise} f function which will be called | ||
* for each item in the stream. It may return a promise to exert a simple | ||
* form of back pressure: f is guaranteed not to receive the next item in | ||
* the stream before the promise fulfills. Returning a non-promise has no | ||
* effect on back pressure | ||
* @returns {Promise} promise that fulfills after all items have been observed, | ||
* and the stream has ended. | ||
* @param {function(state:*):Step} step | ||
* @param {*} state | ||
* @returns {Stream} new stream with the supplied stepper and state, which shares | ||
* this stream's scheduler and dispose function | ||
*/ | ||
Stream.prototype.forEach = Stream.prototype.observe = function(f) { | ||
return asap(runStream, f, this.step, this.state); | ||
Stream.prototype.beget = function(step, state) { | ||
return new Stream(step, state, this.scheduler, this.dispose); | ||
}; | ||
function runStream(f, stepper, state) { | ||
return when(function (s) { | ||
if (s.done) { | ||
return s.value; | ||
} | ||
/** | ||
* @param {function(state:*):Step} step | ||
* @param {*} state | ||
* @param {function(s:Stream, endValue:*, end:End):*} dispose | ||
* @returns {Stream} new stream with the supplied stepper, state, and dispose | ||
* function, which shares this stream's scheduler | ||
*/ | ||
Stream.prototype.begetWithDispose = function(step, state, dispose) { | ||
return new Stream(step, state, this.scheduler, dispose); | ||
}; | ||
return when(function (x) { | ||
return x instanceof End ? x.value | ||
: runStream(f, stepper, s.state); | ||
}, f(s.value)); | ||
}, when(stepper, state)); | ||
} | ||
/** | ||
@@ -117,3 +132,3 @@ * @returns {Promise} a promise for the first item in the stream | ||
Stream.prototype.head = function() { | ||
return when(getValueOrFail, Promise.resolve(streamNext(this))); | ||
return Promise.resolve(streamNext(this)).then(getValueOrFail); | ||
}; | ||
@@ -125,3 +140,3 @@ | ||
Stream.prototype.tail = function() { | ||
return new Stream(this.step, when(getState, streamNext(this))); | ||
return this.beget(this.step, when(getState, streamNext(this))); | ||
}; | ||
@@ -136,3 +151,12 @@ | ||
function once(x) { | ||
return new Yield(x, new End()); | ||
var t = getDefaultScheduler().now(); | ||
return new Yield(t, x, new End(t)); | ||
} | ||
function scheduledIterable(iterable) { | ||
return iterableFrom(getDefaultScheduler(), iterable); | ||
} | ||
function returnEndValue(t, x) { | ||
return x; | ||
} |
110
most.js
@@ -9,2 +9,3 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */ | ||
var cons = base.cons; | ||
var tail = base.tail; | ||
@@ -31,3 +32,3 @@ /** | ||
exports.repeat = repeat; | ||
exports.cons = exports.startsWith = consStream; | ||
exports.cons = exports.startWith = consStream; | ||
@@ -61,11 +62,12 @@ /** | ||
// Creating | ||
// EXPERIMENTAL: API may change | ||
var create = require('./lib/combinators/create'); | ||
var create = require('./lib/source/create'); | ||
/** | ||
* Create a stream by calling producer with functions for adding items to | ||
* the stream and for ending the stream. | ||
* @param {function(add:function(x:*), end:function(error?:Error))} producer | ||
* @returns {Stream} | ||
* Create a stream by imperatively pushing events. | ||
* @param {function(add:function(x), end:function(e)):function} run function | ||
* that will receive 2 functions as arguments, the first to add new values to the | ||
* stream and the second to end the stream. It may *return* a function that | ||
* will be called once all consumers have stopped observing the stream. | ||
* @returns {Stream} stream containing all events added by run before end | ||
*/ | ||
@@ -75,2 +77,36 @@ exports.create = create.create; | ||
//----------------------------------------------------------------------- | ||
// Adapting other sources | ||
var events = require('./lib/source/fromEvent'); | ||
/** | ||
* Create a stream of events from the supplied EventTarget or EventEmitter | ||
* @param {String} event event name | ||
* @param {EventTarget|EventEmitter} source EventTarget or EventEmitter. The source | ||
* must support either addEventListener/removeEventListener (w3c EventTarget: | ||
* http://www.w3.org/TR/DOM-Level-2-Events/events.html#Events-EventTarget), | ||
* or addListener/removeListener (node EventEmitter: http://nodejs.org/api/events.html) | ||
* @returns {Stream} stream of events of the specified type from the source | ||
*/ | ||
exports.fromEvent = events.fromEvent; | ||
//----------------------------------------------------------------------- | ||
// Observing | ||
var observing = require('./lib/combinators/observe'); | ||
var observe = observing.observe; | ||
var observeUntil = observing.observeUntil; | ||
exports.forEach = exports.observe = observe; | ||
exports.forEachUntil = exports.observeUntil = observeUntil; | ||
/** | ||
* Process all the events in the stream | ||
* @type {Function} | ||
*/ | ||
Stream.prototype.forEach = Stream.prototype.observe = function(f, signal) { | ||
return arguments.length < 2 ? observe(f, this) : observeUntil(f, signal, this); | ||
}; | ||
//----------------------------------------------------------------------- | ||
// Transforming | ||
@@ -88,6 +124,6 @@ | ||
exports.ap = ap; | ||
exports.flatMap = flatMap; | ||
exports.flatMap = exports.chain = flatMap; | ||
exports.flatten = flatten; | ||
exports.scan = scan; | ||
exports.tap = tap; | ||
exports.scan = scan; | ||
exports.tap = tap; | ||
@@ -166,2 +202,3 @@ /** | ||
var filterStream = filter.filter; | ||
var takeUntil = filter.takeUntil; | ||
var take = filter.take; | ||
@@ -173,2 +210,3 @@ var takeWhile = filter.takeWhile; | ||
exports.filter = filterStream; | ||
exports.takeUntil = takeUntil; | ||
exports.take = take; | ||
@@ -181,2 +219,4 @@ exports.takeWhile = takeWhile; | ||
* Retain only items matching a predicate | ||
* stream: -12345678- | ||
* filter(x => x % 2 === 0, stream): --2-4-6-8- | ||
* @param {function(x:*):boolean} p filtering predicate called for each item | ||
@@ -190,3 +230,19 @@ * @returns {Stream} stream containing only items for which predicate returns truthy | ||
/** | ||
* @param {Number} n | ||
* stream: -a-b-c-d-e-f-g- | ||
* signal: -------x | ||
* takeUntil(signal, stream): -a-b-c- | ||
* @param {Stream} signal retain only events in stream before the first | ||
* event in signal | ||
* @param {Stream} stream events to retain | ||
* @returns {Stream} new stream containing only events that occur before | ||
* the first event in signal. | ||
*/ | ||
Stream.prototype.takeUntil = function(signal) { | ||
return takeUntil(signal, this); | ||
}; | ||
/** | ||
* stream: -abcd- | ||
* take(2, stream): -ab | ||
* @param {Number} n take up to this many events | ||
* @returns {Stream} stream containing at most the first n items from this stream | ||
@@ -199,2 +255,4 @@ */ | ||
/** | ||
* stream: -123451234- | ||
* takeWhile(x => x < 5, stream): -1234 | ||
* @param {function(x:*):boolean} p | ||
@@ -209,3 +267,5 @@ * @returns {Stream} stream containing items up to, but not including, the | ||
/** | ||
* Remove adjacent duplicates: [a,b,b,c,b] -> [a,b,c,b] | ||
* Remove adjacent duplicates | ||
* stream: -abbcd- | ||
* distinct(stream): -ab-cd- | ||
* @param {?function(a:*, b:*):boolean} equals optional function to compare items. | ||
@@ -289,3 +349,3 @@ * @returns {Stream} stream with no adjacent duplicates | ||
Stream.prototype.zipWith = function(f /*,...ss*/) { | ||
return zipArrayWith(f, cons(this, arguments)); | ||
return zipArrayWith(f, cons(this, tail(arguments))); | ||
}; | ||
@@ -395,1 +455,25 @@ | ||
}; | ||
//----------------------------------------------------------------------- | ||
// Error handling | ||
var error = require('./lib/combinators/error'); | ||
var flatMapError = error.flatMapError; | ||
var throwError = error.throwError; | ||
exports.flatMapError = flatMapError; | ||
exports.throwError = throwError; | ||
/** | ||
* If this stream encounters an error, recover and continue with items from stream | ||
* returned by f. | ||
* stream: -a-b-c-X- | ||
* f(X): d-e-f-g- | ||
* flatMapError(f, stream): -a-b-c-d-e-f-g- | ||
* @param {function(error:*):Stream} f function which returns a new stream | ||
* @returns {Stream} new stream which will recover from an error by calling f | ||
*/ | ||
Stream.prototype.flatMapError = function(f) { | ||
return flatMapError(f, this); | ||
}; |
{ | ||
"name": "most", | ||
"version": "0.6.2", | ||
"version": "0.7.0", | ||
"description": "Monadic streams", | ||
@@ -5,0 +5,0 @@ "main": "most.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
76932
31
2316