Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

most

Package Overview
Dependencies
Maintainers
1
Versions
78
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

most - npm Package Compare versions

Comparing version 0.7.0 to 0.8.0

docs/api.md

4

bower.json
{
"name": "most",
"main": "most.js",
"version": "0.7.0",
"version": "0.8.0",
"homepage": "https://github.com/cujojs/most",

@@ -32,4 +32,4 @@ "authors": [

"dependencies": {
"when": "~3.4"
"when": "~3.5"
}
}

@@ -8,5 +8,7 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

exports.cons = cons;
exports.append = append;
exports.tail = tail;
exports.copy = copy;
exports.map = map;
exports.reduce = reduce;
exports.replace = replace;

@@ -29,2 +31,13 @@ exports.findIndex = findIndex;

function append(x, a) {
var l = a.length;
var b = new Array(l+1);
for(var i=0; i<l; ++i) {
b[i] = a[i];
}
b[l] = x;
return b;
}
function tail(array) {

@@ -57,2 +70,10 @@ var l = array.length - 1;

function reduce(f, z, array) {
var r = z;
for(var i=0, l=array.length; i<l; ++i) {
r = f(r, array[i], i);
}
return r;
}
function replace(x, i, array) {

@@ -59,0 +80,0 @@ var l = array.length;

@@ -62,2 +62,2 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

throw e;
}
}

@@ -10,6 +10,6 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var when = promise.when;
var raceIndex = promise.raceIndex;
var Yield = step.Yield;
var End = step.End;
var unamb = step.unamb;
var Pair = step.Pair;

@@ -99,22 +99,19 @@ var yieldPair = step.yieldPair;

function takeUntil(signal, stream) {
return stream.begetWithDispose(stepTakeUntil, initTakeUntil(signal, stream),
disposeTakeUntil);
return stream.begetWithDispose(stepTakeUntil,
new TakeUntil(null, signal, stream, stream.state), disposeTakeUntil);
}
function stepTakeUntil(s) {
return s.time === Infinity ? stepTakeUntilSignal(s) : stepTakeUntilTime(s);
}
if(s.until === null) {
s = new TakeUntil(awaitSignal(s), s.signal, s.stream, s.state);
}
function stepTakeUntilTime(s) {
return when(function(i) {
return i.time < s.time ? i.withState(updateTakeUntilState(s, i.state))
: new End(s.time, i.value, s);
}, when(s.stream.step, s.state));
return unamb(function (i, index) {
return handleTakeUntil(s, i, index);
}, [s.until, when(s.stream.step, s.state)]);
}
function stepTakeUntilSignal (s) {
return raceIndex(function (i, index) {
return index === 0 ? stepTakeUntilTime(updateTakeUntilTime(s, i.time))
: i.withState(updateTakeUntilState(s, i.state));
}, [getSignal(s), when(s.stream.step, s.state)]);
function handleTakeUntil (s, i, index) {
return index === 0 ? endTakeUntil(s, i)
: i.withState(new TakeUntil(s.until, s.signal, s.stream, i.state));
}

@@ -126,24 +123,14 @@

function getSignal (s) {
return s.until === void 0 ? when(s.signal.step, s.signal.state) : s.until;
function awaitSignal(s) {
return when(s.signal.step, s.signal.state);
}
function initTakeUntil (signal, stream) {
return new TakeUntil(void 0, Infinity, signal, stream, stream.state);
function endTakeUntil(s, i) {
return new End(i.time, i.value, new TakeUntil(null, null, s.stream, s.state));
}
function updateTakeUntilState(s, newState) {
return new TakeUntil(s.until, s.time, s.signal, s.stream, newState);
function TakeUntil(until, signal, stream, state) {
this.until = until; this.signal = signal; this.stream = stream; this.state = state;
}
function updateTakeUntilTime(s, t) {
return new TakeUntil(void 0, t, void 0, s.stream, s.state);
}
function TakeUntil(until, time, signal, stream, state) {
this.until = until, this.time = time, this.signal = signal;
this.stream = stream; this.state = state;
}
/**

@@ -153,3 +140,3 @@ * Remove adjacent duplicates, using === to detect duplicates

* distinct(stream): -ab-cd-
* @param {?function(a:*, b:*):boolean} equals optional function to compare items.
* @param {Stream} stream stream from which to omit adjacent duplicates
* @returns {Stream} stream with no adjacent duplicates

@@ -156,0 +143,0 @@ */

@@ -7,19 +7,8 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var Stream = require('../Stream');
var reduce = require('../combinators/reduce').reduce;
var promise = require('../promises');
var step = require('../step');
var base = require('../base');
var empty = require('./monoid').empty;
var join = require('./join').join;
var copy = require('../base').copy;
var replace = base.replace;
var copy = base.copy;
var map = base.map;
var raceIndex = promise.raceIndex;
var all = promise.Promise.all;
var when = promise.when;
var End = step.End;
exports.merge = merge;
exports.mergeArray = mergeArray;
exports.mergeAll = mergeAll;

@@ -31,3 +20,3 @@ /**

*/
function merge(/*...observables*/) {
function merge(/*...streams*/) {
return mergeArray(copy(arguments));

@@ -43,88 +32,11 @@ }

function mergeArray(streams) {
return new Stream(stepMerge, map(initStep, streams), void 0, disposeRemaining);
}
/**
* @param {Stream} streamOfStreams stream of streams to merge
* @returns {Stream} stream containing events from all observables in the
* input in time order. If two events are simultaneous they will be merged in
* arbitrary order.
*/
function mergeAll(streamOfStreams) {
// TODO: implement a solution that doesn't involve converting to an array first
// We should be able to merge an infinite stream of streams.
return Stream.fromPromise(toArray(streamOfStreams)).flatMap(mergeArray);
}
function stepMerge(s) {
if(s.length === 0) {
return new End(0, void 0, []);
if(streams.length === 0) {
return empty();
}
return stepEarliest(s[0].i === void 0 ? stepAll(s): s);
}
function stepAll (s) {
return map(function (s) {
return stepPair(s.stream);
}, s);
}
function stepEarliest(s) {
return raceIndex(function(i, index) {
return handleStep(i, index, s);
}, map(getIteration, s));
}
function handleStep(i, index, s) {
if(i.done) {
var sp = without(i, index, s);
if(s.length === 1) {
return when(function(s) {
return i.withState(s);
}, sp);
}
return when(stepMerge, sp);
if(streams.length === 1) {
return streams[0];
}
return i.withState(stepAtIndex(i, index, s));
return join(Stream.from(streams));
}
function stepAtIndex(i, index, s) {
var sn = s[index];
return replace(stepPair(sn.stream.beget(sn.stream.step, i.state)), index, s);
}
function stepPair(stream) {
return { stream: stream, i: when(stream.step, stream.state) };
}
function initStep(s) {
return { stream: s, i: void 0 };
}
function getIteration(s) {
return s.i;
}
function disposeRemaining(t, x, remaining) {
return all(map(function(s) {
return s.stream.dispose(t, x, s.stream.state);
}, remaining));
}
function without(step, index, arr) {
var stream = arr[index].stream;
return when(function() {
return arr.filter(function(x, ai) {
return index !== ai;
});
}, stream.dispose(step.time, step.value, step.state));
}
function toArray (observableOfObservables) {
return reduce(function (a, obs) {
a.push(obs);
return a;
}, [], observableOfObservables);
}

@@ -7,5 +7,5 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var Stream = require('../Stream');
var flatMap = require('./transform').flatMap;
var step = require('../step');
var identity = require('../base').identity;
var when = require('../promises').when;

@@ -31,3 +31,29 @@ var End = step.End;

function concat(left, right) {
return flatMap(identity, Stream.from([left, right]));
return new Stream(stepConcat, { stream: left, state: left.state, tail: right }, void 0, disposeCurrent);
}
function stepConcat(s) {
return when(function(i) {
return handleStep(s, i);
}, s.stream.step(s.state));
}
function handleStep(s, i) {
if(i.done) {
return when(function() {
return yieldTailOrEnd(s, i);
}, s.stream.dispose(i.time, i.value, i.state));
}
return i.withState({ stream: s.stream, state: i.state, tail: s.tail });
}
function yieldTailOrEnd(s, i) {
var tail = s.tail;
return tail === null ? i.withState({ stream: s.stream, state: i.state, tail: null }) :
stepConcat({ stream: tail, state: tail.state, tail: null });
}
function disposeCurrent(t, x, s) {
return s.stream.dispose(t, x, s.state);
}

@@ -8,9 +8,6 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var promise = require('../promises');
var step = require('../step');
var unamb = require('../step').unamb;
var raceIndex = promise.raceIndex;
var when = promise.when;
var Yield = step.Yield;
exports.switch = switchLatest;

@@ -25,11 +22,11 @@

function switchLatest(stream) {
return stream.beget(stepSwitch, initState(stream));
return stream.begetWithDispose(stepSwitch, initState(stream), disposeInner);
}
function stepSwitch(s) {
return switchNext(s.current === void 0 ? updateBoth(s.outer, s.inner) : s);
return switchNext(s.current === null ? updateBoth(s.outer, s.inner) : s);
}
function switchNext(s) {
return raceIndex(function(i, index) {
return unamb(function(i, index) {
return doSwitchNext(i, index, s);

@@ -47,7 +44,11 @@ }, s.current);

if(i.done) {
return s.inner === never ? i
return s.inner === never ? i.withState(s)
: switchNext(updateOuter(never, s));
}
// Outer not done, step outer to get next inner stream
return awaitNextOuter(s.outer, i);
// Signal lost interest in current inner
return when(function() {
return awaitNextOuter(s.outer, i);
}, s.inner.dispose(i.time, i.value, s.inner.state));
}

@@ -58,3 +59,3 @@

// next inner stream
return s.outer === never ? i
return s.outer === never ? i.withState(s)
: stepBoth(s.outer, s.current[0]);

@@ -64,7 +65,7 @@ }

// Inner not done, yield latest value
return new Yield(i.time, i.value, updateInner(s.inner.beget(s.inner.step, i.state), s));
return i.withState(updateInner(s.inner.beget(s.inner.step, i.state), s));
}
function initState(outer) {
return { outer: outer, inner: Stream.never(), current: void 0 };
return { outer: outer, inner: Stream.never(), current: null };
}

@@ -105,1 +106,4 @@

function disposeInner(t, x, s) {
return s.inner.dispose(t, x, s.inner.state);
}

@@ -13,5 +13,5 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var never = promise.never;
var raceIndex = promise.raceIndex;
var Yield = step.Yield;
var Pair = step.Pair;
var unamb = step.unamb;
var yieldPair = step.yieldPair;

@@ -39,2 +39,3 @@

/**
* @deprecated
* Create a stream that emits the current time periodically using

@@ -115,2 +116,3 @@ * the provided scheduler

/**
* @deprecated
* @param {Scheduler} scheduler

@@ -148,2 +150,3 @@ * @param {Number} delayTime milliseconds to delay each item

/**
* @deprecated
* Limit the rate of events

@@ -191,2 +194,3 @@ * stream: abcd----abcd----

/**
* @deprecated
* Wait for a burst of events to subside and emit only the last event in the burst

@@ -213,7 +217,7 @@ * stream: abcd----abcd----

function stepEarliest(scheduler, period, step, s) {
return raceIndex(function(winner, index) {
return index === 0 ? yieldDebounced(s)
return unamb(function(winner, index) {
return index > 0 ? yieldDebounced(s)
: winner.done ? winner.withState(s.state)
: stepEarliest(scheduler, period, step, nextState(scheduler, period, step, winner));
}, [s.timer, s.next]);
}, [s.next, s.timer]);
}

@@ -220,0 +224,0 @@

@@ -6,2 +6,3 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var join = require('./join').join;
var promise = require('../promises');

@@ -40,2 +41,13 @@ var step = require('../step');

/**
* Map each value in the stream to a new stream, and merge it into the
* returned stream.
* @param {function(x:*):Stream} f chaining function, must return a Stream
* @param {Stream} stream
* @returns {Stream} new stream containing all items from each stream returned by f
*/
function flatMap(f, stream) {
return join(map(f, stream));
}
/**
* Perform a side effect for each item in the stream

@@ -63,6 +75,6 @@ * @param {function(x:*):*} f side effect to execute for each item. The

/**
* Assume this stream contains functions, and apply each function to each item
* in the provided stream. This generates, in effect, a cross product.
* Assume fs is a stream containing functions, and apply each function to each value
* in the xs stream. This generates, in effect, a cross product.
* @param {Stream} fs stream of functions to apply to the xs
* @param {Stream} xs stream of items to which to apply all the fs
* @param {Stream} xs stream of values to which to apply all the fs
* @returns {Stream} stream containing the cross product of items

@@ -101,69 +113,1 @@ */

}
/**
* Map each value in the stream to a new stream, and emit its values
* into the returned stream.
* @param {function(x:*):Stream} f chaining function, must return a Stream
* @param {Stream} stream
* @returns {Stream} new stream containing all items from each stream returned by f
*/
function flatMap(f, stream) {
return stream.begetWithDispose(stepChain, new Outer(f, stream), disposeInnerThenOuter);
function stepChain(s) {
return s.step(stepChain);
}
}
function disposeInnerThenOuter(t, x, s) {
if(s === void 0) {
return x;
}
return when(function disposeOuter() {
return disposeStream(t, x, s.outer);
}, disposeStream(t, x, s.inner));
}
function disposeStream(t, x, stream) {
return stream !== void 0 ? stream.dispose(t, x, stream.state) : x;
}
// flatMap outer/inner "loop" helpers
function Outer(f, outer) {
this.f = f; this.outer = outer; this.inner = void 0;
}
Outer.prototype.step = function(stepNext) {
return stepOuter(stepNext, this.f, this.outer);
};
function stepOuter(stepNext, f, outer) {
return when(function(i) {
return i.done ? i
: stepInner(stepNext, f, outer.beget(outer.step, i.state), f(i.value));
}, Promise.resolve(streamNext(outer)));
}
function Inner(f, outer, inner) {
this.f = f; this.outer = outer; this.inner = inner;
}
Inner.prototype.step = function(stepNext) {
return stepInner(stepNext, this.f, this.outer, this.inner);
};
function stepInner(stepNext, f, outer, inner) {
return when(function(ii) {
if(ii.done) {
return when(function() {
return stepOuter(stepNext, f, outer);
}, inner.dispose(ii.time, ii.value, ii.state));
}
return ii.withState(new Inner(f, outer, inner.beget(inner.step, ii.state)));
}, Promise.resolve(streamNext(inner)));
}
function streamNext(s) {
return when(s.step, s.state);
}

@@ -20,10 +20,7 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var tail = base.tail;
var copy = base.copy;
var map = base.map;
var findIndex = base.findIndex;
exports.zipWith = zipWith;
exports.zip = zip;
exports.zipArray = zipArray;
exports.zipArrayWith = zipArrayWith;

@@ -33,26 +30,9 @@ /**

* @param {function(a:Stream, b:Stream, ...):*} f function to combine items
* @returns {Stream} observable containing
* @returns {Stream} stream containing
*/
function zipWith(f /*,...streams*/) {
return zipArrayWith(f, tail(arguments));
function zip(f /*,...streams*/) {
return zipArray(f, tail(arguments));
}
/**
* Combine events from all streams by collecting them into an array
* @returns {*}
*/
function zip(/*...streams*/) {
return zipArrayWith(Array, copy(arguments));
}
/**
* Combine events from all streams by collecting them into an array
* @param {Array} streams array of streams to zip
* @returns {*}
*/
function zipArray(streams) {
return zipArrayWith(Array, streams);
}
/**
* Combine events from all streams using f

@@ -64,3 +44,3 @@ * @param {function(a:Stream, b:Stream, ...):*} f function to combine items

*/
function zipArrayWith(f, streams) {
function zipArray(f, streams) {
return new Stream(function(ss) {

@@ -117,2 +97,1 @@ return stepZip(f, ss);

}
/** @license MIT License (c) copyright 2010-2014 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/** @module */

@@ -6,0 +5,0 @@ module.exports = function dispatch(f, args) {

@@ -7,17 +7,6 @@ var WhenPromise = require('when/es6-shim/Promise');

exports.when = when;
exports.raceIndex = raceIndex;
exports.never = never;
exports.never = WhenPromise.never;
exports.getStatus = WhenPromise._handler;
var neverP = { then: never, catch: never };
/**
* @returns {Promise} a promise that never resolves. Returns a singleton
* that consumes no additional resources even when adding handlers with
* then() and catch().
*/
function never() {
return neverP;
}
/**
* Create a { promise, resolve, reject } tuple

@@ -67,28 +56,1 @@ * @returns {{promise:Promise, resolve:function(*), reject:function(*)}} tuple

/**
* Like Promise.race, but calls f with the value *and index* of the first
* fulfilled promise, and returns a promise for the result.
* @param {function(x:*, i:Number):*} f function to apply to first
* fulfilled value and its index
* @param {Array} promises
* @returns {Promise}
*/
function raceIndex(f, promises) {
var done = false;
return new WhenPromise(runRaceIndex);
function runRaceIndex(resolve, reject) {
for(var i= 0, l=promises.length; i<l; ++i) {
settleOne(resolve, reject, i, promises[i]);
}
}
function settleOne(resolve, reject, i, p) {
WhenPromise.resolve(p).then(function(x) {
if(!done) {
done = true;
resolve(f(x, i));
}
}, reject);
}
}

@@ -66,2 +66,17 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

Queue.prototype.error = function(e) {
if(this.isEnded) {
return;
}
this.isEnded = true;
this._ended.reject(e);
if(this._awaiting.length > 0) {
this._awaiting.reduce(resolveAll, this._ended.promise);
} else {
this._items.push(this._ended.promise);
}
};
Queue.prototype.end = function(t, x) {

@@ -125,2 +140,2 @@ if(this.isEnded) {

};
}
}

@@ -30,2 +30,1 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

}

@@ -11,2 +11,3 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

exports.fromEventWhere = fromEventWhere;
exports.fromEvent = fromEvent;

@@ -16,3 +17,6 @@

* Create a stream of events from the supplied EventTarget or EventEmitter
* @param {String} event event name
* @param {function(event:*)} predicate filtering predicate call for each source event.
* If it returns `false`, the event will NOT be added to the stream.
* If it returns any other value (including falsy, eg undefined), the event will be added
* @param {String} name event name, e.g. 'click'
* @param {EventTarget|EventEmitter} source EventTarget or EventEmitter. The source

@@ -24,8 +28,33 @@ * must support either addEventListener/removeEventListener (w3c EventTarget:

*/
function fromEvent(event, source) {
return fromSource(new MulticastSource(Stream.getDefaultScheduler(), function(add) {
return subscribe(event, source, add);
function fromEventWhere(predicate, name, source) {
return fromSource(createMulticastSource(function(add) {
return subscribe(name, source, addWhere);
function addWhere(e) {
if(predicate(e) !== false) {
add(e);
}
}
}));
}
/**
* Create a stream of events from the supplied EventTarget or EventEmitter
* @param {String} name event name, e.g. 'click'
* @param {EventTarget|EventEmitter} source EventTarget or EventEmitter. The source
* must support either addEventListener/removeEventListener (w3c EventTarget:
* http://www.w3.org/TR/DOM-Level-2-Events/events.html#Events-EventTarget),
* or addListener/removeListener (node EventEmitter: http://nodejs.org/api/events.html)
* @returns {Stream} stream of events of the specified type from the source
*/
function fromEvent(name, source) {
return fromSource(createMulticastSource(function(add) {
return subscribe(name, source, add);
}));
}
function createMulticastSource(run) {
return new MulticastSource(Stream.getDefaultScheduler(), run);
}
function subscribe(event, source, add) {

@@ -32,0 +61,0 @@ if(typeof source.addEventListener === 'function') {

@@ -16,3 +16,3 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

* @private
* @param {{iterator:function():Object, scheduler:Scheduler}} source
* @param {{iterator:function():Object, scheduler:Scheduler, disposer:function():*}} source
* @returns {Stream} stream containing all items emitted by the source

@@ -19,0 +19,0 @@ */

@@ -34,3 +34,3 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var self = this;
this._doPublishNext = function(x) {
this._doNext = function(x) {
return self._publishNext(x);

@@ -44,2 +44,5 @@ };

};
this._doError = function(e) {
return self._error(e);
};
}

@@ -54,3 +57,3 @@

this._subscribers = [q];
this._dispose = this._run.call(void 0, this._doPublishNext, this._doEnd);
this._dispose = runPublisher(this._run, this._doNext, this._doEnd, this._doError);
} else {

@@ -63,2 +66,10 @@ this._subscribers.push(q);

function runPublisher(publisher, next, end, error) {
try {
return publisher.call(void 0, next, end, error);
} catch(e) {
error(e);
}
}
MulticastSource.prototype.disposer = Queue.disposeQueue;

@@ -93,2 +104,9 @@

function noop() {}
MulticastSource.prototype._error = function(e) {
for(var i=0, s=this._subscribers, l=s.length; i<l; ++i) {
s[i].error(e);
this._remove(s[i]);
}
};
function noop() {}

@@ -8,2 +8,7 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

var promise = require('./promises');
var race = promise.Promise.race;
var getStatus = promise.getStatus;
exports.Yield = Yield;

@@ -20,2 +25,4 @@ exports.End = End;

exports.unamb = unamb;
/**

@@ -89,2 +96,46 @@ * A step that yields a new value and a new state that can be used to produce

/**
* Unambiguously decide which step is the earliest, and call f with
* that step and its index in the steps array.
* This is a more precise race than Promise.race. Promise.race always
* returns the settled promise with the lowest array index, even if a
* promise with a higher array index actually won the race.
* unamb checks all indices to find the step with the earliest time.
* @param {function(x:*, i:Number):*} f function to apply to earliest
* step and its index
* @param {Array} steps
* @returns {Promise} promise for the result of applying f
*/
function unamb(f, steps) {
var winner = decide(steps);
if(winner === null) {
return race(steps).then(function() {
return decideWith(f, steps);
});
}
return f(winner.value, winner.index);
}
function decideWith(f, steps) {
var winner = decide(steps);
return f(winner.value, winner.index);
}
function decide(steps) {
var index = -1;
var winner;
for(var i=0, t=Infinity, h; i<steps.length; ++i) {
h = getStatus(steps[i]);
if(h.state() > 0 && h.value.time < t) {
index = i;
winner = h.value;
t = winner.time;
}
}
return index < 0 ? null : { index: index, value: winner };
}
/**
* A simple value, state pair

@@ -91,0 +142,0 @@ * @param {*} x

@@ -153,2 +153,2 @@ /** @license MIT License (c) copyright 2010-2014 original author or authors */

return x;
}
}
/** @license MIT License (c) copyright 2010-2014 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/** @module */
var base = require('./lib/base');
var identity = base.identity;
var cons = base.cons;
var tail = base.tail;
var replace = base.replace;

@@ -23,2 +21,9 @@ /**

//-----------------------------------------------------------------------
// Lifting functions
var lift = require('./lib/combinators/lift').lift;
exports.lift = lift;
//-----------------------------------------------------------------------
// Building

@@ -32,26 +37,27 @@

exports.repeat = repeat;
exports.cons = exports.startWith = consStream;
//-----------------------------------------------------------------------
// Extending
var extend = require('./lib/combinators/extend');
var cycle = extend.cycle;
var consStream = extend.cons;
exports.cycle = cycle;
exports.startWith = consStream;
/**
* Tie this stream into a circle, thus creating an infinite stream
* @returns {Stream} infinite stream that replays all items from this stream
* @returns {Stream} new infinite stream
*/
Stream.prototype.cycle = function() {
return repeat(this).flatMap(identity);
return cycle(this);
};
/**
* @param {*} x
* @param {Stream} stream
* @returns {Stream} new stream containing x followed by all items in this stream
*/
function consStream(x, stream) {
return concat(Stream.of(x), stream);
}
/**
* @param {*} x item to prepend
* @returns {Stream} a new stream with x prepended
*/
Stream.prototype.cons = Stream.prototype.startWith = function(x) {
Stream.prototype.startWith = function(x) {
return consStream(x, this);

@@ -90,2 +96,3 @@ };

exports.fromEvent = events.fromEvent;
exports.fromEventWhere = events.fromEventWhere;

@@ -124,3 +131,2 @@ //-----------------------------------------------------------------------

exports.flatMap = exports.chain = flatMap;
exports.flatten = flatten;
exports.scan = scan;

@@ -159,19 +165,2 @@ exports.tap = tap;

/**
* Flatten a stream of stream of x into a stream of x
* @returns {Stream} stream of x
*/
Stream.prototype.flatten = function() {
return flatMap(identity, this);
};
/**
* Flatten a stream of stream of x into a stream of x
* @param {Stream} stream stream of stream of x
* @returns {Stream} stream of x
*/
function flatten(stream) {
return flatMap(identity, stream);
}
/**
* Create a stream containing successive reduce results of applying f to

@@ -316,22 +305,27 @@ * the previous reduce result and the current stream item.

//-----------------------------------------------------------------------
// Zipping
// Combining
var zip = require('./lib/combinators/zip');
var zipArray = zip.zipArray;
var zipArrayWith = zip.zipArrayWith;
var combine = require('./lib/combinators/combine');
var combineArray = combine.combineArray;
exports.zip = zip.zip;
exports.zipWith = zip.zipWith;
exports.zipArray = zipArray;
exports.zipArrayWith = zipArrayWith;
exports.combine = combine.combine;
/**
* Pair-wise combine items with those in s. Given 2 streams:
* [1,2,3] zip [4,5,6] -> [[1,4],[2,5],[3,6]]
* @returns {Stream} new stream containing pairs
* Combine latest events from all input streams
* @param {function(...events):*} f function to combine most recent events
* @returns {Stream} stream containing the result of applying f to the most recent
* event of each input stream, whenever a new event arrives on any stream.
*/
Stream.prototype.zip = function(/*,...ss*/) {
return zipArray(cons(this, arguments));
Stream.prototype.combine = function(f /*,...streams*/) {
return combineArray(f, replace(this, 0, arguments));
};
//-----------------------------------------------------------------------
// Zipping
var zip = require('./lib/combinators/zip');
var zipArray = zip.zipArray;
exports.zip = zip.zip;
/**

@@ -343,4 +337,4 @@ * Pair-wise combine items with those in s. Given 2 streams:

*/
Stream.prototype.zipWith = function(f /*,...ss*/) {
return zipArrayWith(f, cons(this, tail(arguments)));
Stream.prototype.zip = function(f /*,...ss*/) {
return zipArray(f, replace(this, 0, arguments));
};

@@ -353,7 +347,4 @@

var mergeArray = merge.mergeArray;
var mergeAll = merge.mergeAll;
exports.merge = merge.merge;
exports.mergeArray = mergeArray;
exports.mergeAll = mergeAll;
exports.merge = merge.merge;

@@ -366,12 +357,24 @@ /**

*/
Stream.prototype.merge = function(/*,...ss*/) {
Stream.prototype.merge = function(/*,...streams*/) {
return mergeArray(cons(this, arguments));
};
//-----------------------------------------------------------------------
// Higher-order stream
//-----------------------------------------------------------------------
//-----------------------------------------------------------------------
// Joining (flattening)
var join = require('./lib/combinators/join').join;
exports.join = join;
/**
* Assumes this is a stream of streams, and merges all items into a single stream.
* @returns {Stream} stream containing items from all streams.
* Monadic join. Flatten a Stream<Stream<X>> to Stream<X> by merging inner
* streams to the outer. Event arrival times are preserved.
* @returns {Stream}
*/
Stream.prototype.mergeAll = function() {
return mergeAll(this);
Stream.prototype.join = function() {
return join(this);
};

@@ -401,25 +404,16 @@

var delay = timed.delay;
var delayOn = timed.delayOn;
var throttle = timed.throttle;
var throttleOn = timed.throttleOn;
var debounce = timed.debounce;
var debounceOn = timed.debounceOn;
exports.periodic = timed.periodic;
exports.periodicOn = timed.periodicOn;
exports.delay = delay;
exports.delayOn = delayOn;
exports.throttle = throttle;
exports.throttleOn = throttleOn;
exports.debounce = debounce;
exports.debounceOn = debounceOn;
/**
* @param {Number} delayTime milliseconds to delay each item
* @param {Scheduler=} scheduler optional scheduler to use
* @returns {Stream} new stream containing the same items, but delayed by ms
*/
Stream.prototype.delay = function(delayTime, scheduler) {
return arguments.length > 1 ? delayOn(scheduler, delayTime, this)
: delay(delayTime, this);
Stream.prototype.delay = function(delayTime) {
return delay(delayTime, this);
};

@@ -432,8 +426,6 @@

* @param {Number} period time to suppress events
* @param {Scheduler=} scheduler optional scheduler
* @returns {Stream} new stream that skips events for throttle period
*/
Stream.prototype.throttle = function(period, scheduler) {
return arguments.length > 1 ? throttleOn(scheduler, period, this)
: throttle(period, this);
Stream.prototype.throttle = function(period) {
return throttle(period, this);
};

@@ -447,8 +439,6 @@

* on the provided scheduler will be suppressed
* @param {Scheduler=} scheduler optional scheduler
* @returns {Stream} new debounced stream
*/
Stream.prototype.debounce = function(period, scheduler) {
return arguments.length > 1 ? debounceOn(scheduler, period, this)
: debounce(period, this);
Stream.prototype.debounce = function(period) {
return debounce(period, this);
};

@@ -478,2 +468,2 @@

return flatMapError(f, this);
};
};
{
"name": "most",
"version": "0.7.0",
"version": "0.8.0",
"description": "Monadic streams",

@@ -30,4 +30,4 @@ "main": "most.js",

"dependencies": {
"when": "~3.4"
"when": "~3.5"
}
}
[![Build Status](https://travis-ci.org/cujojs/most.svg?branch=master)](https://travis-ci.org/cujojs/most)
# Monadic Stream
# Monadic streams for reactive programming
Most.js is a toolkit for composing asynchronous operations on streams of data and values that vary over time, without many of the hazards of mutable shared state. It provides a powerful set of observable streams and operations for merging, filtering, transforming, and reducing them.
Most.js is a toolkit for reactive programming. It helps you compose asynchronous operations on streams of values and events, e.g. WebSocket messages, DOM events, etc, and on time-varying values, e.g. the "current value" of an &lt;input&gt;, without many of the hazards of side effects and mutable shared state.
## What can it do?
It provides a small but powerful set of operations for merging, filtering, transforming, and reducing event streams and time-varying values.
*Examples coming soon*
* [API docs](docs/api.md)
* [Examples](examples)
* [Get it](#get-it)
## Why use it?
## Simple example
*Coming soon*
Here's a simple program that displays the result of adding two inputs. The result is reactive and updates whenever *either* input changes.
First, the HTML fragment for the inputs and a place to display the live result:
```html
<form>
<input class="x"> + <input class="y"> = <span class="result"></span>
</form>
```
Using most.js to make it reactive:
```js
var most = require('most');
var xInput = document.querySelector('input.x');
var yInput = document.querySelector('input.y');
var resultNode = document.querySelector('.result');
exports.main = function() {
// x represents the current value of xInput
var x = most.fromEvent('input', xInput).map(toNumber);
// x represents the current value of yInput
var y = most.fromEvent('input', yInput).map(toNumber);
// result is the live current value of adding x and y
var result = most.combine(add, x, y);
// Observe the result value by rendering it to the resultNode
result.observe(renderResult);
};
function add(x, y) {
return x + y;
}
function toNumber(e) {
return Number(e.target.value);
}
function renderResult(result) {
resultNode.textContent = result;
}
```
## More examples
To [run the example above](examples/add-inputs) and [others](examples) using [RaveJS](https://github.com/RaveJS/rave): clone the repo into a web servable dir, `cd examples/<name> && bower install`, and load `index.html` in your browser.
## Get it
`npm install --save most` or `bower install --save most`
then
```js
var most = require('most');
```
## But what about

@@ -19,5 +79,5 @@

Promises are another elegant and powerful data structure for composing asynchronous operations. Promises and observable streams are clearly related in that they provide tools for managing asynchrony. However, they each have their strengths.
Promises are another elegant and powerful data structure for composing asynchronous operations. Promises and reactive streams are clearly related in that they provide tools for managing asynchrony. However, they each have their strengths.
Promises deal with single, asynchronous, immutable values and provide operations for transforming them, and providing asynchronous error handling and flow control. Observable streams deal with *sequences of asynchronous values*, and as such, provide a similar but typically broader set of operations.
Promises deal with single, asynchronous, immutable values and provide operations for transforming them, and provide asynchronous error handling and flow control. Event streams represent sequences of asynchronous values or values that vary over time. They provide a similar, but typically broader, set of operations.

@@ -24,0 +84,0 @@ Most.js interoperates seamlessly with ES6 and Promises/A+ promises. In fact, it even uses promises internally. For example, reducing a stream returns a promise for the final result:

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc