Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

most

Package Overview
Dependencies
Maintainers
1
Versions
78
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

most - npm Package Compare versions

Comparing version 0.6.2 to 0.7.0

lib/combinators/error.js

3

bower.json
{
"name": "most",
"main": "most.js",
"version": "0.6.2",
"version": "0.7.0",
"homepage": "https://github.com/cujojs/most",

@@ -26,2 +26,3 @@ "authors": [

"bower_components",
"examples",
"test",

@@ -28,0 +29,0 @@ "tests",

@@ -1,3 +0,4 @@

var Promise = require('./promises').Promise;
var resolve = require('./promises').Promise.resolve;
var tail = require('./base').tail;
var dispatch = require('./dispatch');

@@ -11,17 +12,5 @@ /**

module.exports = function asap(f /*,...args*/) {
return Promise.resolve(tail(arguments)).then(function(args) {
return resolve(tail(arguments)).then(function(args) {
return dispatch(f, args);
});
};
function dispatch(f, args) {
/*jshint maxcomplexity:6*/
switch(args.length) {
case 0: return f();
case 1: return f(args[0]);
case 2: return f(args[0], args[1]);
case 3: return f(args[0], args[1], args[2]);
case 4: return f(args[0], args[1], args[2], args[3]);
default: return f.apply(void 0, args);
}
}

@@ -10,3 +10,5 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

exports.copy = copy;
exports.map = map;
exports.replace = replace;
exports.findIndex = findIndex;

@@ -45,2 +47,11 @@ function identity(x) {

function map(f, array) {
var l = array.length;
var a = new Array(l);
for(var i=0; i<l; ++i) {
a[i] = f(array[i]);
}
return a;
}
function replace(x, i, array) {

@@ -53,2 +64,11 @@ var l = array.length;

return a;
}
}
function findIndex(p, a) {
for(var i= 0, l= a.length; i<l; ++i) {
if(p(a[i])) {
return i;
}
}
return -1;
}

@@ -7,4 +7,10 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var Stream = require('../Stream');
var identity = require('../base').identity;
var Yield = Stream.Yield;
exports.unfold = unfold;
exports.iterate = iterate;
exports.repeat = repeat;
/**

@@ -16,5 +22,5 @@ * Build a stream by unfolding steps from a seed value

*/
exports.unfold = function(f, x) {
function unfold(f, x) {
return new Stream(f, x);
};
}

@@ -27,7 +33,8 @@ /**

*/
exports.iterate = function(f, x) {
function iterate(f, x) {
var scheduler = Stream.getDefaultScheduler();
return new Stream(function(x) {
return new Yield(x, f(x));
return new Yield(scheduler.now(), x, f(x));
}, x);
};
}

@@ -39,8 +46,4 @@ /**

*/
exports.repeat = function(x) {
return new Stream(repeat, x);
};
function repeat(x) {
return new Yield(x, x);
return iterate(identity, x);
}

@@ -6,3 +6,2 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var Stream = require('../Stream');
var promise = require('../promises');

@@ -12,2 +11,5 @@ var step = require('../step');

var when = promise.when;
var raceIndex = promise.raceIndex;
var Yield = step.Yield;
var End = step.End;

@@ -20,2 +22,3 @@ var Pair = step.Pair;

exports.filter = filter;
exports.takeUntil = takeUntil;
exports.take = take;

@@ -28,3 +31,6 @@ exports.takeWhile = takeWhile;

* Retain only items matching a predicate
* stream: -12345678-
* filter(x => x % 2 === 0, stream): --2-4-6-8-
* @param {function(x:*):boolean} p filtering predicate called for each item
* @param {Stream} stream stream to filter
* @returns {Stream} stream containing only items for which predicate returns truthy

@@ -34,3 +40,3 @@ */

var stepper = stream.step;
return new Stream(function(state) {
return stream.beget(function(state) {
return filterNext(p, stepper, state);

@@ -48,3 +54,6 @@ }, stream.state);

/**
* stream: -abcd-
* take(2, stream): -ab
* @param {function(x:*):boolean} p
* @param {Stream} stream stream from which to take
* @returns {Stream} stream containing items up to, but not including, the

@@ -55,6 +64,6 @@ * first item for which p returns falsy.

var stepper = stream.step;
return new Stream(function(s) {
return stream.beget(function(s) {
return when(function (i) {
return i.done || p(i.value) ? i
: new End();
: new End(i.time, i.value, s.state);
}, when(stepper, s));

@@ -65,3 +74,6 @@ }, stream.state);

/**
* stream: -123451234-
* takeWhile(x => x < 5, stream): -1234
* @param {Number} n
* @param {Stream} stream stream from which to take
* @returns {Stream} stream containing at most the first n items from this stream

@@ -71,18 +83,80 @@ */

var stepper = stream.step;
return new Stream(function(s) {
return stream.beget(function(s) {
if(s.value === 0) {
return new End();
return new End(s.time, s.value, s.state);
}
return when(function (i) {
return i.done ? i : yieldPair(i, s.value - 1);
return i.done ? i
: i.withState(new Yield(i.time, s.value - 1, i.state));
}, when(stepper, s.state));
}, new Pair(Math.max(0, n), stream.state));
}, new Yield(stream.scheduler.now(), n|0, stream.state));
}
/**
* Remove adjacent duplicates: [a,b,b,c,b] -> [a,b,c,b]
* @param {Stream} stream
* @returns {Stream} stream with no adjacent duplicates using === to
* recognize duplicates
* stream: -a-b-c-d-e-f-g
* signal: -------x
* takeUntil(signal, stream): -a-b-c-
* @param {Stream} signal retain only events in stream before the first
* event in signal
* @param {Stream} stream events to retain
* @returns {Stream} new stream containing only events that occur before
* the first event in signal.
*/
function takeUntil(signal, stream) {
return stream.begetWithDispose(stepTakeUntil, initTakeUntil(signal, stream),
disposeTakeUntil);
}
function stepTakeUntil(s) {
return s.time === Infinity ? stepTakeUntilSignal(s) : stepTakeUntilTime(s);
}
function stepTakeUntilTime(s) {
return when(function(i) {
return i.time < s.time ? i.withState(updateTakeUntilState(s, i.state))
: new End(s.time, i.value, s);
}, when(s.stream.step, s.state));
}
function stepTakeUntilSignal (s) {
return raceIndex(function (i, index) {
return index === 0 ? stepTakeUntilTime(updateTakeUntilTime(s, i.time))
: i.withState(updateTakeUntilState(s, i.state));
}, [getSignal(s), when(s.stream.step, s.state)]);
}
function disposeTakeUntil(t, x, s) {
return s.stream.dispose(t, x, s.state);
}
function getSignal (s) {
return s.until === void 0 ? when(s.signal.step, s.signal.state) : s.until;
}
function initTakeUntil (signal, stream) {
return new TakeUntil(void 0, Infinity, signal, stream, stream.state);
}
function updateTakeUntilState(s, newState) {
return new TakeUntil(s.until, s.time, s.signal, s.stream, newState);
}
function updateTakeUntilTime(s, t) {
return new TakeUntil(void 0, t, void 0, s.stream, s.state);
}
function TakeUntil(until, time, signal, stream, state) {
this.until = until, this.time = time, this.signal = signal;
this.stream = stream; this.state = state;
}
/**
* Remove adjacent duplicates, using === to detect duplicates
* stream: -abbcd-
* distinct(stream): -ab-cd-
* @param {?function(a:*, b:*):boolean} equals optional function to compare items.
* @returns {Stream} stream with no adjacent duplicates
*/
function distinct(stream) {

@@ -93,5 +167,7 @@ return distinctBy(same, stream);

/**
* Remove adjacent duplicates: [a,b,b,c,b] -> [a,b,c,b]
* Remove adjacent duplicates using the provided equals function to detect duplicates
* stream: -abbcd-
* distinct(stream): -ab-cd-
* @param {?function(a:*, b:*):boolean} equals optional function to compare items.
* @param {Stream} stream
* @param {Stream} stream stream from which to omit adjacent duplicates
* @returns {Stream} stream with no adjacent duplicates

@@ -101,3 +177,3 @@ */

var stepper = stream.step;
return new Stream(function(s) {
return stream.beget(function(s) {
return stepDistinct(equals, stepper, s);

@@ -104,0 +180,0 @@ }, new Pair(init, stream.state));

@@ -7,12 +7,16 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var Stream = require('../Stream');
var reduce = require('../combinators/reduce').reduce;
var promise = require('../promises');
var step = require('../step');
var base = require('../base');
var replace = base.replace;
var copy = base.copy;
var map = base.map;
var raceIndex = promise.raceIndex;
var all = promise.Promise.all;
var when = promise.when;
var Yield = step.Yield;
var End = step.End;
var slice = Array.prototype.slice;
exports.merge = merge;

@@ -23,3 +27,3 @@ exports.mergeArray = mergeArray;

/**
* @returns {Stream} observable containing events from all observables in the argument
* @returns {Stream} stream containing events from all streams in the argument
* list in time order. If two events are simultaneous they will be merged in

@@ -29,22 +33,24 @@ * arbitrary order.

function merge(/*...observables*/) {
return mergeArray(slice.call(arguments));
return mergeArray(copy(arguments));
}
/**
* @param {Array} observables array of observables to merge
* @returns {Stream} observable containing events from all input observables
* @param {Array} streams array of stream to merge
* @returns {Stream} stream containing events from all input observables
* in time order. If two events are simultaneous they will be merged in
* arbitrary order.
*/
function mergeArray(observables) {
return new Stream(stepMerge, observables.map(initStep));
function mergeArray(streams) {
return new Stream(stepMerge, map(initStep, streams), void 0, disposeRemaining);
}
/**
* @param {Stream} observableOfObservables observable of observables
* @returns {Stream} observable containing events from all observables in the
* @param {Stream} streamOfStreams stream of streams to merge
* @returns {Stream} stream containing events from all observables in the
* input in time order. If two events are simultaneous they will be merged in
* arbitrary order.
*/
function mergeAll(observableOfObservables) {
return Stream.fromPromise(toArray(observableOfObservables)).flatMap(mergeArray);
function mergeAll(streamOfStreams) {
// TODO: implement a solution that doesn't involve converting to an array first
// We should be able to merge an infinite stream of streams.
return Stream.fromPromise(toArray(streamOfStreams)).flatMap(mergeArray);
}

@@ -54,3 +60,3 @@

if(s.length === 0) {
return new End();
return new End(0, void 0, []);
}

@@ -62,5 +68,5 @@

function stepAll (s) {
return s.map(function (s) {
return map(function (s) {
return stepPair(s.stream);
});
}, s);
}

@@ -70,15 +76,24 @@

return raceIndex(function(i, index) {
return handleStep(s, i, index);
}, s.map(getI));
return handleStep(i, index, s);
}, map(getIteration, s));
}
function handleStep(s, i, index) {
return i.done ? stepMerge(without(index, s))
: new Yield(i.value, stepAtIndex(s, i, index));
function handleStep(i, index, s) {
if(i.done) {
var sp = without(i, index, s);
if(s.length === 1) {
return when(function(s) {
return i.withState(s);
}, sp);
}
return when(stepMerge, sp);
}
return i.withState(stepAtIndex(i, index, s));
}
function stepAtIndex(s, i, index) {
return s.map(function (sn, j) {
return j === index ? stepPair(new Stream(sn.stream.step, i.state)) : sn;
});
function stepAtIndex(i, index, s) {
var sn = s[index];
return replace(stepPair(sn.stream.beget(sn.stream.step, i.state)), index, s);
}

@@ -94,17 +109,26 @@

function getI(s) {
function getIteration(s) {
return s.i;
}
function without(i, a) {
return a.filter(function(x, ai) {
return i !== ai;
});
function disposeRemaining(t, x, remaining) {
return all(map(function(s) {
return s.stream.dispose(t, x, s.stream.state);
}, remaining));
}
function without(step, index, arr) {
var stream = arr[index].stream;
return when(function() {
return arr.filter(function(x, ai) {
return index !== ai;
});
}, stream.dispose(step.time, step.value, step.state));
}
function toArray (observableOfObservables) {
return observableOfObservables.reduce(function (a, obs) {
return reduce(function (a, obs) {
a.push(obs);
return a;
}, []);
}, [], observableOfObservables);
}

@@ -11,3 +11,2 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var Yield = step.Yield;
var End = step.End;

@@ -22,3 +21,3 @@

function empty() {
return new Stream(identity, new End());
return new Stream(identity, new End(Stream.getDefaultScheduler().now()));
}

@@ -33,13 +32,3 @@

function concat(left, right) {
return flatMap(identity, new Stream(identity, two(left, right)));
return flatMap(identity, Stream.from([left, right]));
}
/**
* Create a Step that yields a then b
* @param {*} a
* @param {*} b
* @returns {Yield}
*/
function two(a, b) {
return new Yield(a, new Yield(b, new End()));
}

@@ -1,6 +0,10 @@

var promise = require('../promises');
/** @license MIT License (c) copyright 2010-2014 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/** @module */
var asap = require('../asap');
var runStream = require('../runStream');
var when = require('../promises').when;
var when = promise.when;
exports.reduce = reduce;

@@ -19,3 +23,3 @@ exports.reduce1 = reduce1;

function reduce(f, z, stream) {
return asap(reduceStep, f, z, stream.step, stream.state);
return asap(runStream, f, z, stream, stream.state, disposeReduce);
}

@@ -34,11 +38,11 @@

return stream.head().then(function(z) {
return reduce(f, z, stream.tail());
var tail = stream.tail();
return runStream(f, z, tail, tail.state, disposeReduce);
});
}
function reduceStep(f, z, stepper, state) {
return when(function (i) {
return i.done ? z
: reduceStep(f, f(z, i.value), stepper, i.state);
}, when(stepper, state));
function disposeReduce(stream, z, i) {
return when(function() {
return z;
}, stream.dispose(i.time, i.value, i.state));
}

@@ -24,3 +24,3 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

function switchLatest(stream) {
return new Stream(stepSwitch, initState(stream));
return stream.beget(stepSwitch, initState(stream));
}

@@ -34,7 +34,7 @@

return raceIndex(function(i, index) {
return doSwitchNext(s, i, index);
return doSwitchNext(i, index, s);
}, s.current);
}
function doSwitchNext(s, i, index) {
function doSwitchNext(i, index, s) {
/*jshint maxcomplexity:7*/

@@ -62,3 +62,3 @@ var never = Stream.never();

// Inner not done, yield latest value
return new Yield(i.value, updateInner(new Stream(s.inner.step, i.state), s));
return new Yield(i.time, i.value, updateInner(s.inner.beget(s.inner.step, i.state), s));
}

@@ -82,4 +82,11 @@

function end(outer, endStep) {
return { outer: outer, inner: Stream.never(), current: [endStep, endStep] };
}
function awaitNextOuter(outer, i) {
return switchNext(updateBoth(new Stream(outer.step, i.state), i.value));
var next = i.done ? end(outer, i)
: updateBoth(outer.beget(outer.step, i.state), i.value);
return switchNext(next);
}

@@ -96,1 +103,2 @@

}

@@ -7,3 +7,2 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var Stream = require('../Stream');
var Scheduler = require('../Scheduler');
var promise = require('../promises');

@@ -19,3 +18,2 @@ var step = require('../step');

var yieldPair = step.yieldPair;
var ensureScheduler = Scheduler.ensure;

@@ -30,2 +28,3 @@ exports.periodic = periodic;

exports.throttleOn = throttleOn;
exports.sync = sync;

@@ -38,3 +37,3 @@ /**

function periodic(period) {
return periodicOn(Scheduler.getDefault(), period);
return periodicOn(Stream.getDefaultScheduler(), period);
}

@@ -50,10 +49,60 @@

function periodicOn(scheduler, period) {
return new Stream(function(p) {
var now = p.state.now();
return delayed(p.value, new Yield(now, p), p.state);
}, new Pair(Math.max(1, period), ensureScheduler(scheduler)));
var stream = new Stream(function (s) {
return new Yield(s, s, s + period);
}, scheduler.now(), scheduler);
return skipPast(Math.ceil(period * 0.01), sync(stream));
}
/**
* Skip all events that are in the past, as determined by the supplied
* stream's scheduler
* @private
* @param {Number} epsilon account for timer resolution (or lack thereof)
* @param stream
* @returns {Stream}
*/
function skipPast(epsilon, stream) {
return stream.beget(function(s) {
return stepSkipPast(epsilon, stream.scheduler, stream.step, s);
}, stream.state);
}
function stepSkipPast (epsilon, scheduler, stepper, s) {
return when(function (i) {
return i.time + epsilon >= scheduler.now() ? i
: stepSkipPast(epsilon, scheduler, stepper, i.state);
}, Promise.resolve(s).then(stepper));
}
/**
* Synchronize a stream's items with its scheduler, ensuring that
* items are emitted only at their specified time
* @private
* @param {Stream} stream stream to synchronize
* @returns {Stream} new stream whose items are synchronized to the
* stream's scheduler
*/
function sync(stream) {
return stream.beget(makeSyncStepper(stream.scheduler, stream.step), stream.state);
}
function makeSyncStepper(scheduler, stepper) {
return function (s) {
return syncStep(scheduler, stepper, s);
};
}
function syncStep (scheduler, stepper, s) {
return when(function (i) {
return getSyncStep(scheduler, i);
}, when(stepper, s));
}
function getSyncStep (scheduler, i) {
var now = scheduler.now();
return now < i.time ? delayed(i.time - now, i, scheduler) : i;
}
/**
* @param {Number} delayTime milliseconds to delay each item using

@@ -65,3 +114,3 @@ * the provided scheduler

function delay(delayTime, stream) {
return delayOn(Scheduler.getDefault(), delayTime, stream);
return delayOn(stream.scheduler, delayTime, stream);
}

@@ -76,11 +125,15 @@

function delayOn(scheduler, delayTime, stream) {
var stepDelay = delayStep(delayTime);
var stepper = stream.step;
return new Stream(function(s) {
return when(function(i) {
return i.done ? i
: delayed(s.value, yieldPair(i, s.value), scheduler);
}, when(stepper, s.state));
}, new Pair(Math.max(0, delayTime), stream.state));
return stream.beget(makeSyncStepper(scheduler, function(s) {
return when(stepDelay, when(stepper, s));
}), stream.state);
}
function delayStep(dt) {
return function(i) {
return i.delay(dt);
};
}
/**

@@ -95,3 +148,3 @@ * Limit the rate of events

function throttle(period, stream) {
return throttleOn(Scheduler.getDefault(), period, stream);
return throttleOn(stream.scheduler, period, stream);
}

@@ -112,3 +165,3 @@

var stepper = stream.step;
return new Stream(function(s) {
return stream.beget(function(s) {
return throttleNext(stepper, s, period, scheduler);

@@ -121,9 +174,7 @@ }, new Pair(-1, stream.state));

if(i.done) {
return i;
return i.withState(s.state);
}
var now = scheduler.now();
var end = s.value;
return now > end ? yieldPair(i, now + period)
: throttleNext(stepper, new Pair(end, i.state), period, scheduler);
return i.time > s.value ? yieldPair(i, i.time + period)
: throttleNext(stepper, new Pair(s.value, i.state), period, scheduler);
}, when(stepper, s.state));

@@ -142,3 +193,3 @@ }

function debounce(period, stream) {
return debounceOn(Scheduler.getDefault(), period, stream);
return debounceOn(stream.scheduler, period, stream);
}

@@ -158,3 +209,3 @@

var stepper = stream.step;
return new Stream(function(s) {
return stream.beget(function(s) {
return stepDebounce(scheduler, period, stepper, s);

@@ -171,3 +222,3 @@ }, { timer: void 0, prev: void 0, next: void 0, state: stream.state });

return index === 0 ? yieldDebounced(s)
: winner.done ? winner
: winner.done ? winner.withState(s.state)
: stepEarliest(scheduler, period, step, nextState(scheduler, period, step, winner));

@@ -182,3 +233,3 @@ }, [s.timer, s.next]);

function nextState (scheduler, period, step, winner) {
return { timer: delayed(period, 'timer', scheduler), prev: winner,
return { timer: delayed(period, new Yield(scheduler.now() + period, 'timer'), scheduler), prev: winner,
next: when(step, winner.state), state: winner.state };

@@ -189,5 +240,12 @@ }

return when(function (prev) {
return new Yield(prev.value, { timer: never(), prev: void 0,
return new Yield(prev.time, prev.value, { timer: never(), prev: void 0,
next: s.next, state: prev.state });
}, s.prev);
}
}
function ensureScheduler(scheduler) {
if(typeof scheduler === 'undefined') {
return Stream.getDefaultScheduler();
}
return scheduler;
}

@@ -6,3 +6,2 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var Stream = require('../Stream');
var promise = require('../promises');

@@ -29,3 +28,3 @@ var step = require('../step');

var stepper = stream.step;
return new Stream(function (state) {
return stream.beget(function (state) {
return mapNext(f, stepper, state);

@@ -50,3 +49,3 @@ }, stream.state);

var stepper = stream.step;
return new Stream(function (state) {
return stream.beget(function (state) {
return tapNext(f, stepper, state);

@@ -87,3 +86,3 @@ }, stream.state);

var stepper = stream.step;
return new Stream(function(s) {
return stream.beget(function(s) {
return stepScan(f, stepper, s);

@@ -96,7 +95,7 @@ }, new Pair(initial, stream.state));

if (i.done) {
return i;
return i.withState(s.state);
}
var value = f(s.value, i.value);
return new Yield(value, new Pair(value, i.state));
return new Yield(i.time, value, new Pair(value, i.state));
}, when(stepper, s.state));

@@ -113,3 +112,3 @@ }

function flatMap(f, stream) {
return new Stream(stepChain, new Outer(f, stream));
return stream.begetWithDispose(stepChain, new Outer(f, stream), disposeInnerThenOuter);

@@ -121,2 +120,15 @@ function stepChain(s) {

function disposeInnerThenOuter(t, x, s) {
if(s === void 0) {
return x;
}
return when(function disposeOuter() {
return disposeStream(t, x, s.outer);
}, disposeStream(t, x, s.inner));
}
function disposeStream(t, x, stream) {
return stream !== void 0 ? stream.dispose(t, x, stream.state) : x;
}
// flatMap outer/inner "loop" helpers

@@ -135,3 +147,3 @@

return i.done ? i
: stepInner(stepNext, f, new Stream(outer.step, i.state), f(i.value));
: stepInner(stepNext, f, outer.beget(outer.step, i.state), f(i.value));
}, Promise.resolve(streamNext(outer)));

@@ -150,4 +162,8 @@ }

return when(function(ii) {
return ii.done ? stepOuter(stepNext, f, outer)
: new Yield(ii.value, new Inner(f, outer, new Stream(inner.step, ii.state)));
if(ii.done) {
return when(function() {
return stepOuter(stepNext, f, outer);
}, inner.dispose(ii.time, ii.value, ii.state));
}
return ii.withState(new Inner(f, outer, inner.beget(inner.step, ii.state)));
}, Promise.resolve(streamNext(inner)));

@@ -154,0 +170,0 @@ }

@@ -10,8 +10,8 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var base = require('../base');
var dispatch = require('../dispatch');
var when = promise.when;
var Promise = promise.Promise;
var all = promise.Promise.all;
var isDone = step.isDone;
var getValue = step.getValue;
var Yield = step.Yield;

@@ -22,2 +22,4 @@ var End = step.End;

var copy = base.copy;
var map = base.map;
var findIndex = base.findIndex;

@@ -30,7 +32,7 @@ exports.zipWith = zipWith;

/**
* Combine events from all observables using f
* Combine events from all streams using f
* @param {function(a:Stream, b:Stream, ...):*} f function to combine items
* @returns {Stream} observable containing
*/
function zipWith(f /*,...observables*/) {
function zipWith(f /*,...streams*/) {
return zipArrayWith(f, tail(arguments));

@@ -40,6 +42,6 @@ }

/**
* Combine events from all observables by collecting them into an array
* Combine events from all streams by collecting them into an array
* @returns {*}
*/
function zip(/*...observables*/) {
function zip(/*...streams*/) {
return zipArrayWith(Array, copy(arguments));

@@ -49,33 +51,64 @@ }

/**
* Combine events from all observables by collecting them into an array
* @param {Array} observables array of observables to zip
* Combine events from all streams by collecting them into an array
* @param {Array} streams array of streams to zip
* @returns {*}
*/
function zipArray(observables) {
return zipArrayWith(Array, observables);
function zipArray(streams) {
return zipArrayWith(Array, streams);
}
/**
* Combine events from all observables using f
* Combine events from all streams using f
* @param {function(a:Stream, b:Stream, ...):*} f function to combine items
* @param {Array} observables array of observables to zip
* @returns {Stream} observable containing
* @param {Array} streams array of observables to zip
* @returns {Stream} stream containing items from all input streams combined
* using f
*/
function zipArrayWith(f, observables) {
function zipArrayWith(f, streams) {
return new Stream(function(ss) {
return stepZip(f, ss);
}, observables);
}, streams, void 0, disposeAll);
}
function stepZip (f, streams) {
return Promise.all(streams.map(streamNext)).then(function (is) {
if (is.some(isDone)) {
return new End();
return all(map(streamNext, streams)).then(function (iterations) {
return handleStepZip(f, streams, iterations);
});
}
function handleStepZip(f, streams, iterations) {
var done = findIndex(isDone, iterations);
if(done < 0) {
return applyZipWith(f, streams, iterations);
}
var ended = iterations[done];
return new End(ended.time, ended.value, streams);
}
function disposeAll(t, x, streams) {
return all(map(function(stream) {
return stream.dispose(t, x, stream.state);
}, streams));
}
function applyZipWith(f, streams, iterations) {
var t = 0;
var values = new Array(iterations.length);
var states = new Array(iterations.length);
var stream, it;
for(var i=0, l=iterations.length; i<l; ++i) {
stream = streams[i];
it = iterations[i];
if(it.time > t) {
t = it.time;
}
var value = f.apply(void 0, is.map(getValue));
return new Yield(value, is.map(function (it, i) {
return new Stream(streams[i].step, it.state);
}));
});
values[i] = it.value;
states[i] = stream.beget(stream.step, it.state);
}
return new Yield(t, dispatch(f, values), states);
}

@@ -82,0 +115,0 @@

@@ -6,5 +6,8 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var Promise = require('./promises').Promise;
var promise = require('./promises');
var step = require('./step');
var resolve = promise.Promise.resolve;
var when = promise.when;
var Yield = step.Yield;

@@ -15,7 +18,32 @@ var End = step.End;

exports.head = head;
exports.makeIterable = makeIterable;
exports.getIterator = getIterator;
function from(x) {
/*global Set, Symbol*/
var iteratorSymbol;
// Firefox ships a partial implementation using the name @@iterator.
// https://bugzilla.mozilla.org/show_bug.cgi?id=907077#c14
if (typeof Set === 'function' && typeof new Set()['@@iterator'] === 'function') {
iteratorSymbol = '@@iterator';
} else {
iteratorSymbol = typeof Symbol === 'function' && Symbol.iterator ||
'_es6shim_iterator_';
}
function makeIterable(makeIterator, obj) {
obj[iteratorSymbol] = makeIterator;
}
function isIterable(o) {
return typeof o[iteratorSymbol] === 'function';
}
function getIterator(o) {
return o[iteratorSymbol]();
}
function from(scheduler, x) {
/*jshint maxcomplexity:6*/
if(Array.isArray(x)) {
return new ArrayIterable(x);
return new ArrayIterable(scheduler.now, x);
}

@@ -25,11 +53,11 @@

if(typeof x !== 'function' && typeof x.length === 'number') {
return new ArrayIterable(x);
return new ArrayIterable(scheduler.now, x);
}
if(typeof x.iterator === 'function') {
return x;
if(isIterable(x)) {
return new IterableAdapter(scheduler.now, x);
}
if(typeof x.next === 'function') {
return new IterableWrapper(x);
return new IterableWrapper(new IteratorAdapter(scheduler.now(), x));
}

@@ -42,10 +70,32 @@ }

function head(iterable) {
var iterator = iterable.iterator();
var iterator = getIterator(iterable);
var iteration = iterator.next();
return Promise.resolve(iteration).then(function(iteration) {
return resolve(iteration).then(function(iteration) {
return iteration.done ? iteration
: new Yield(iteration.value, new IterableWrapper(iterator));
: new Yield(iteration.time, iteration.value, new IterableWrapper(iterator));
});
}
function IterableAdapter(now, iterable) {
this.now = now;
this.iterable = iterable;
}
makeIterable(function() {
return new IteratorAdapter(this.now(), getIterator(this.iterable));
}, IterableAdapter.prototype);
function IteratorAdapter(time, iterator) {
this.time = time;
this._iterator = iterator;
}
IteratorAdapter.prototype.next = function() {
var time = this.time;
return when(function(i) {
return i.done ? new End(time, i.value)
: new Yield(time, i.value, this);
}, this._iterator.next());
};
function IterableWrapper(iterator) {

@@ -55,15 +105,17 @@ this._iterator = iterator;

IterableWrapper.prototype.iterator = function() {
makeIterable(function() {
return this._iterator;
};
}, IterableWrapper.prototype);
function ArrayIterable(array) {
function ArrayIterable(now, array) {
this.now = now;
this.array = array;
}
ArrayIterable.prototype.iterator = function() {
return new ArrayIterator(this.array);
};
makeIterable(function() {
return new ArrayIterator(this.now(), this.array);
}, ArrayIterable.prototype);
function ArrayIterator(array) {
function ArrayIterator(time, array) {
this.time = time;
this.array = array;

@@ -75,4 +127,4 @@ this.index = 0;

return this.index < this.array.length
? new Yield(this.array[this.index++])
: new End();
? new Yield(this.time, this.array[this.index++], this)
: new End(this.time, void 0);
};

@@ -1,4 +0,4 @@

var Promise = require('when/es6-shim/Promise');
var WhenPromise = require('when/es6-shim/Promise');
exports.Promise = Promise;
exports.Promise = WhenPromise;
exports.defer = defer;

@@ -10,5 +10,3 @@ exports.delay = delay;

var neverP = Object.create(Promise.prototype);
neverP.then = never;
neverP.catch = never;
var neverP = { then: never, catch: never };

@@ -30,3 +28,3 @@ /**

var d = { promise: void 0, resolve: void 0, reject: void 0 };
d.promise = new Promise(function(resolve, reject) {
d.promise = new WhenPromise(function(resolve, reject) {
d.resolve = resolve;

@@ -80,3 +78,3 @@ d.reject = reject;

var done = false;
return new Promise(runRaceIndex);
return new WhenPromise(runRaceIndex);

@@ -90,3 +88,3 @@ function runRaceIndex(resolve, reject) {

function settleOne(resolve, reject, i, p) {
Promise.resolve(p).then(function(x) {
WhenPromise.resolve(p).then(function(x) {
if(!done) {

@@ -93,0 +91,0 @@ done = true;

@@ -8,4 +8,5 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var step = require('./step');
var makeIterable = require('./iterable').makeIterable;
var Promise = promise.Promise;
var reject = promise.Promise.reject;
var defer = promise.defer;

@@ -27,3 +28,6 @@

* before pulling subsequent iterations. When the producer is faster, items
* are buffered.
* are buffered according to the supplied bufferPolicy.
* @param {function(s:Step, items:Array):Array=keepAll} bufferPolicy queue buffering
* policy that will be used to manage consumer queue sizes. Defaults to keepAll,
* which allows queues to grow forever.
* @constructor

@@ -36,13 +40,23 @@ */

this._awaiting = [];
this._ended = false;
this._ended = defer();
this.ended = this._ended.promise;
this.isEnded = false;
}
Queue.prototype.iterator = function() {
Queue.disposeQueue = disposeQueue;
function disposeQueue(t, x, queue) {
return queue.end(t, x);
}
makeIterable(function() {
return new QueueIterator(this);
};
}, Queue.prototype);
Queue.prototype.add = function(x) {
checkEnd(this._ended);
Queue.prototype.add = function(t, x) {
if (this.isEnded) {
throw new Error('Queue ended');
}
var iteration = new Yield(x);
var iteration = new Yield(t, x, this);
if (this._awaiting.length === 0) {

@@ -55,7 +69,9 @@ this._items = this._bufferPolicy(iteration, this._items);

Queue.prototype.end = function(x) {
checkEnd(this._ended);
Queue.prototype.end = function(t, x) {
if(this.isEnded) {
return;
}
this._ended = true;
var end = new End(x);
this.isEnded = true;
var end = new End(t, x, this);
if(this._awaiting.length > 0) {

@@ -66,29 +82,28 @@ this._awaiting.reduce(resolveAll, end);

}
this._ended.resolve(this);
};
function QueueIterator(q) {
this.q = q;
}
QueueIterator.prototype.next = function() {
var q = this.q;
if (q._items.length > 0) {
return q._items.shift();
Queue.prototype.get = function() {
if (this._items.length > 0) {
return this._items.shift();
}
if (q._ended) {
return Promise.reject(new Error('closed'));
if (this.isEnded) {
return reject(new Error('Queue ended'));
}
var consumer = defer();
q._awaiting.push(consumer);
this._awaiting.push(consumer);
return consumer.promise;
};
function checkEnd(closed) {
if(closed) {
throw new Error('Queue end');
}
function QueueIterator(q) {
this.queue = q;
}
QueueIterator.prototype.next = function() {
return this.queue.get();
};
function resolveAll(end, consumer) {

@@ -95,0 +110,0 @@ consumer.resolve(end);

@@ -20,26 +20,2 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

Scheduler.defaultNow = defaultNow;
var defaultScheduler;
Scheduler.ensure = function(scheduler) {
if(typeof scheduler === 'undefined') {
return Scheduler.getDefault();
}
return scheduler;
};
Scheduler.getDefault = function() {
if(defaultScheduler === void 0) {
defaultScheduler = new Scheduler();
}
return defaultScheduler;
};
Scheduler.setDefault = function(scheduler) {
if(scheduler != null) {
defaultScheduler = scheduler;
}
};
function Scheduler(setTimer, clearTimer, now, errorHandler) {

@@ -50,3 +26,4 @@ this.now = now || defaultNow;

this._handleError = errorHandler || logAndReschedule;
this._timer = void 0;
this._timer = null;
this._nextArrival = 0;
this._tasks = [];

@@ -61,6 +38,2 @@

Scheduler.prototype = {
periodic: function(period, run, state) {
return this._schedule(period, period, run, state);
},
delayed: function(delay, run, state) {

@@ -80,7 +53,2 @@ return this._schedule(delay, -1, run, state);

shutdown: function() {
this._clearTimer(this._timer);
this._tasks.length = 0;
},
_schedule: function(delay, period, run, state) {

@@ -103,5 +71,9 @@ var now = this.now();

_schedulerNextArrival: function (nextArrival, now) {
this._nextArrival = nextArrival;
var delay = Math.max(0, nextArrival - now);
this._timer = this._setTimer(this._runReadyTasksBound, delay);
},
_scheduleNextRun: function(now) {
this._clearTimer(this._timer);
if(this._tasks.length === 0) {

@@ -111,4 +83,10 @@ return;

var nextArrival = Math.max(0, this._tasks[0].arrival - now);
this._timer = this._setTimer(this._runReadyTasksBound, nextArrival);
var nextArrival = this._tasks[0].arrival;
if(this._timer === null) {
this._schedulerNextArrival(nextArrival, now);
} else if(nextArrival < this._nextArrival) {
this._clearTimer(this._timer);
this._schedulerNextArrival(nextArrival, now);
}
},

@@ -126,11 +104,14 @@

_runReadyTasks: function() {
this._timer = null;
var now = this.now();
var tasks = [];
var tasks = this._tasks;
var reschedule = [];
while(this._tasks.length > 0 && this._tasks[0].arrival <= now) {
tasks.push(this._runTask(now, this._tasks.shift()));
while(tasks.length > 0 && tasks[0].arrival <= now) {
reschedule.push(this._runTask(now, tasks.shift()));
}
for(var i=0; i<tasks.length; ++i) {
this._scheduleNextTask(now, tasks[i]);
for(var i=0; i<reschedule.length; ++i) {
this._scheduleNextTask(now, reschedule[i]);
}

@@ -171,4 +152,4 @@

throw e;
});
}, 0);
return task;
}

@@ -19,12 +19,36 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

function Yield(x, s) {
this.done = false; this.value = x; this.state = s;
/**
* A step that yields a new value and a new state that can be used to produce
* another step
* @param {Number} t time the value became or will become available
* @param {*} x value
* @param {*} s new state
* @constructor
*/
function Yield(t, x, s) {
this.time = t; this.done = false; this.value = x; this.state = s;
}
Yield.prototype.map = function(f) {
return new Yield(f(this.value), this.state);
return new Yield(this.time, f(this.value), this.state);
};
function End(x) {
this.done = true; this.value = x; this.state = this;
Yield.prototype.delay = function(dt) {
return new Yield(this.time + dt, this.value, this.state);
};
Yield.prototype.withState = function(state) {
return new Yield(this.time, this.value, state);
};
/**
* A step that represents end of stream. The optional value is *not* in the stream,
* but rather a custom end of end of stream marker value.
* @param {Number} t end time
* @param {?*} x optional end signal value
* @param {*} s end state
* @constructor
*/
function End(t, x, s) {
this.time = t; this.done = true; this.value = x; this.state = s;
}

@@ -36,2 +60,10 @@

End.prototype.delay = function(dt) {
return new End(this.time + dt, this.value, this.state);
};
End.prototype.withState = function(state) {
return new End(this.time, this.value, state);
};
function isDone(step) {

@@ -45,6 +77,2 @@ return step.done;

function getState(step) {
return step.state;
}
function getValueOrFail(step) {

@@ -57,2 +85,12 @@ if(step.done) {

function getState(step) {
return step.state;
}
/**
* A simple value, state pair
* @param {*} x
* @param {*} s
* @constructor
*/
function Pair(x, s) {

@@ -63,3 +101,3 @@ this.value = x; this.state = s;

function yieldPair(step, x) {
return new Yield(step.value, new Pair(x, step.state));
return new Yield(step.time, step.value, new Pair(x, step.state));
}

@@ -6,2 +6,3 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var Scheduler = require('./Scheduler');
var promise = require('./promises');

@@ -11,7 +12,6 @@ var step = require('./step');

var base = require('./base');
var asap = require('./asap');
module.exports = Stream;
var Promise = promise.Promise;
//var Promise = promise.Promise;
var when = promise.when;

@@ -32,2 +32,20 @@ var neverPromise = promise.never;

Stream.getDefaultScheduler = getDefaultScheduler;
Stream.setDefaultScheduler = setDefaultScheduler;
var defaultScheduler;
function getDefaultScheduler() {
if(defaultScheduler === void 0) {
defaultScheduler = new Scheduler();
}
return defaultScheduler;
}
function setDefaultScheduler(scheduler) {
if(scheduler != null) {
defaultScheduler = scheduler;
}
}
/**

@@ -41,7 +59,10 @@ * Stream that generates items by repeatedly calling the provided

* @param {*} state initial state
* @param {Scheduler=} scheduler
* @constructor
*/
function Stream(step, state) {
function Stream(step, state, scheduler, dispose) {
this.step = step;
this.state = state;
this.scheduler = scheduler === void 0 ? getDefaultScheduler() : scheduler;
this.dispose = typeof dispose === 'function' ? dispose : returnEndValue;
}

@@ -73,3 +94,3 @@

Stream.from = function(iterable) {
return new Stream(iterableHead, iterableFrom(iterable));
return new Stream(iterableHead, scheduledIterable(iterable));
};

@@ -86,28 +107,22 @@

/**
* Observe all items in the stream
* @param {function(*):undefined|Promise} f function which will be called
* for each item in the stream. It may return a promise to exert a simple
* form of back pressure: f is guaranteed not to receive the next item in
* the stream before the promise fulfills. Returning a non-promise has no
* effect on back pressure
* @returns {Promise} promise that fulfills after all items have been observed,
* and the stream has ended.
* @param {function(state:*):Step} step
* @param {*} state
* @returns {Stream} new stream with the supplied stepper and state, which shares
* this stream's scheduler and dispose function
*/
Stream.prototype.forEach = Stream.prototype.observe = function(f) {
return asap(runStream, f, this.step, this.state);
Stream.prototype.beget = function(step, state) {
return new Stream(step, state, this.scheduler, this.dispose);
};
function runStream(f, stepper, state) {
return when(function (s) {
if (s.done) {
return s.value;
}
/**
* @param {function(state:*):Step} step
* @param {*} state
* @param {function(s:Stream, endValue:*, end:End):*} dispose
* @returns {Stream} new stream with the supplied stepper, state, and dispose
* function, which shares this stream's scheduler
*/
Stream.prototype.begetWithDispose = function(step, state, dispose) {
return new Stream(step, state, this.scheduler, dispose);
};
return when(function (x) {
return x instanceof End ? x.value
: runStream(f, stepper, s.state);
}, f(s.value));
}, when(stepper, state));
}
/**

@@ -117,3 +132,3 @@ * @returns {Promise} a promise for the first item in the stream

Stream.prototype.head = function() {
return when(getValueOrFail, Promise.resolve(streamNext(this)));
return Promise.resolve(streamNext(this)).then(getValueOrFail);
};

@@ -125,3 +140,3 @@

Stream.prototype.tail = function() {
return new Stream(this.step, when(getState, streamNext(this)));
return this.beget(this.step, when(getState, streamNext(this)));
};

@@ -136,3 +151,12 @@

function once(x) {
return new Yield(x, new End());
var t = getDefaultScheduler().now();
return new Yield(t, x, new End(t));
}
function scheduledIterable(iterable) {
return iterableFrom(getDefaultScheduler(), iterable);
}
function returnEndValue(t, x) {
return x;
}

@@ -9,2 +9,3 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var cons = base.cons;
var tail = base.tail;

@@ -31,3 +32,3 @@ /**

exports.repeat = repeat;
exports.cons = exports.startsWith = consStream;
exports.cons = exports.startWith = consStream;

@@ -61,11 +62,12 @@ /**

// Creating
// EXPERIMENTAL: API may change
var create = require('./lib/combinators/create');
var create = require('./lib/source/create');
/**
* Create a stream by calling producer with functions for adding items to
* the stream and for ending the stream.
* @param {function(add:function(x:*), end:function(error?:Error))} producer
* @returns {Stream}
* Create a stream by imperatively pushing events.
* @param {function(add:function(x), end:function(e)):function} run function
* that will receive 2 functions as arguments, the first to add new values to the
* stream and the second to end the stream. It may *return* a function that
* will be called once all consumers have stopped observing the stream.
* @returns {Stream} stream containing all events added by run before end
*/

@@ -75,2 +77,36 @@ exports.create = create.create;

//-----------------------------------------------------------------------
// Adapting other sources
var events = require('./lib/source/fromEvent');
/**
* Create a stream of events from the supplied EventTarget or EventEmitter
* @param {String} event event name
* @param {EventTarget|EventEmitter} source EventTarget or EventEmitter. The source
* must support either addEventListener/removeEventListener (w3c EventTarget:
* http://www.w3.org/TR/DOM-Level-2-Events/events.html#Events-EventTarget),
* or addListener/removeListener (node EventEmitter: http://nodejs.org/api/events.html)
* @returns {Stream} stream of events of the specified type from the source
*/
exports.fromEvent = events.fromEvent;
//-----------------------------------------------------------------------
// Observing
var observing = require('./lib/combinators/observe');
var observe = observing.observe;
var observeUntil = observing.observeUntil;
exports.forEach = exports.observe = observe;
exports.forEachUntil = exports.observeUntil = observeUntil;
/**
* Process all the events in the stream
* @type {Function}
*/
Stream.prototype.forEach = Stream.prototype.observe = function(f, signal) {
return arguments.length < 2 ? observe(f, this) : observeUntil(f, signal, this);
};
//-----------------------------------------------------------------------
// Transforming

@@ -88,6 +124,6 @@

exports.ap = ap;
exports.flatMap = flatMap;
exports.flatMap = exports.chain = flatMap;
exports.flatten = flatten;
exports.scan = scan;
exports.tap = tap;
exports.scan = scan;
exports.tap = tap;

@@ -166,2 +202,3 @@ /**

var filterStream = filter.filter;
var takeUntil = filter.takeUntil;
var take = filter.take;

@@ -173,2 +210,3 @@ var takeWhile = filter.takeWhile;

exports.filter = filterStream;
exports.takeUntil = takeUntil;
exports.take = take;

@@ -181,2 +219,4 @@ exports.takeWhile = takeWhile;

* Retain only items matching a predicate
* stream: -12345678-
* filter(x => x % 2 === 0, stream): --2-4-6-8-
* @param {function(x:*):boolean} p filtering predicate called for each item

@@ -190,3 +230,19 @@ * @returns {Stream} stream containing only items for which predicate returns truthy

/**
* @param {Number} n
* stream: -a-b-c-d-e-f-g-
* signal: -------x
* takeUntil(signal, stream): -a-b-c-
* @param {Stream} signal retain only events in stream before the first
* event in signal
* @param {Stream} stream events to retain
* @returns {Stream} new stream containing only events that occur before
* the first event in signal.
*/
Stream.prototype.takeUntil = function(signal) {
return takeUntil(signal, this);
};
/**
* stream: -abcd-
* take(2, stream): -ab
* @param {Number} n take up to this many events
* @returns {Stream} stream containing at most the first n items from this stream

@@ -199,2 +255,4 @@ */

/**
* stream: -123451234-
* takeWhile(x => x < 5, stream): -1234
* @param {function(x:*):boolean} p

@@ -209,3 +267,5 @@ * @returns {Stream} stream containing items up to, but not including, the

/**
* Remove adjacent duplicates: [a,b,b,c,b] -> [a,b,c,b]
* Remove adjacent duplicates
* stream: -abbcd-
* distinct(stream): -ab-cd-
* @param {?function(a:*, b:*):boolean} equals optional function to compare items.

@@ -289,3 +349,3 @@ * @returns {Stream} stream with no adjacent duplicates

Stream.prototype.zipWith = function(f /*,...ss*/) {
return zipArrayWith(f, cons(this, arguments));
return zipArrayWith(f, cons(this, tail(arguments)));
};

@@ -395,1 +455,25 @@

};
//-----------------------------------------------------------------------
// Error handling
var error = require('./lib/combinators/error');
var flatMapError = error.flatMapError;
var throwError = error.throwError;
exports.flatMapError = flatMapError;
exports.throwError = throwError;
/**
* If this stream encounters an error, recover and continue with items from stream
* returned by f.
* stream: -a-b-c-X-
* f(X): d-e-f-g-
* flatMapError(f, stream): -a-b-c-d-e-f-g-
* @param {function(error:*):Stream} f function which returns a new stream
* @returns {Stream} new stream which will recover from an error by calling f
*/
Stream.prototype.flatMapError = function(f) {
return flatMapError(f, this);
};
{
"name": "most",
"version": "0.6.2",
"version": "0.7.0",
"description": "Monadic streams",

@@ -5,0 +5,0 @@ "main": "most.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc