Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

most

Package Overview
Dependencies
Maintainers
1
Versions
78
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

most - npm Package Compare versions

Comparing version 0.13.1 to 0.13.2

lib/combinator/mergeConcurrently.js

2

bower.json
{
"name": "most",
"main": "most.js",
"version": "0.13.1",
"version": "0.13.2",
"homepage": "https://github.com/cujojs/most",

@@ -6,0 +6,0 @@ "authors": [

@@ -38,4 +38,4 @@ most.js API

* [filter](#filter)
* [distinct](#distinct)
* [distinctBy](#distinctby)
* [skipRepeats](#skipRepeats)
* [skipRepeatsWith](#skipRepeatsWith)
1. Transducer support

@@ -321,4 +321,6 @@ * [transduce](#transduce)

Note that when the stream ends (for example, by using [take](#take), [takeUntil](#until), etc.), it will automatically be disconnected from the event source. For example, in the case of DOM events, the underlying DOM event listener will be removed automatically.
When the stream ends (for example, by using [take](#take), [takeUntil](#until), etc.), it will automatically be disconnected from the event source. For example, in the case of DOM events, the underlying DOM event listener will be removed automatically.
Note on EventEmitter: EventEmitters and EventTargets, such as DOM nodes, behave differently in that EventEmitter allows events to be delivered in the same tick as a listener is added. When using EventEmitter, `most.fromEvent`, will *ensure asynchronous event delivery*, thereby preventing hazards of "maybe sync, maybe async" (aka zalgo) event delivery.
```js

@@ -770,12 +772,14 @@ var clicks = most.fromEvent('click', document.querySelector('.the-button'));

### distinct
### skipRepeats
####`stream.distinct() -> Stream`
####`most.distinct(stream) -> Stream`
**Deprecated alias:** `distinct`
Create a new stream with *adjacent duplicates* removed.
####`stream.skipRepeats() -> Stream`
####`most.skipRepeats(stream) -> Stream`
Create a new stream with *adjacent* repeated events removed.
```
stream: -1-2-2-3-4-4-5->
stream.distinct(): -1-2---3-4---5->
stream.skipRepeats(): -1-2---3-4---5->
```

@@ -785,12 +789,14 @@

### distinctBy
### skipRepeatsWith
####`stream.distinctBy(equals) -> Stream`
####`most.distinctBy(equals, stream) -> Stream`
**Deprecated alias:** `distinctBy`
Create a new stream with *adjacent duplicates* removed, using the provided `equals` function.
####`stream.skipRepeatsWith(equals) -> Stream`
####`most.skipRepeatsWith(equals, stream) -> Stream`
Create a new stream with *adjacent* repeated events removed, using the provided `equals` function.
```
stream: -a-b-B-c-D-d-e->
stream.distinctBy(equalsIgnoreCase): -a-b---c-D---e->
stream.skipRepeatsWith(equalsIgnoreCase): -a-b---c-D---e->
```

@@ -1401,2 +1407,10 @@

```
s1: abcd----abcd---->
s2: ------------|
s1.until(s2).debounce(2): -----d------d|
```
If the stream ends while there is a pending debounced event (e.g. via [`until`](#until), see example above), the pending event will be emitted just before the stream ends.
Debouncing can be extremely useful when dealing with bursts of similar events, for example, debouncing keypress events before initiating a remote search query in a browser application.

@@ -1403,0 +1417,0 @@

@@ -7,3 +7,4 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

var Pipe = require('../sink/Pipe');
var drain = require('./observe').drain;
var runSource = require('../runSource');
var noop = require('../base').noop;

@@ -69,3 +70,3 @@ exports.scan = scan;

function reduce(f, initial, stream) {
return drain(new Stream(new Accumulate(f, initial, stream.source)));
return runSource(noop, new Accumulate(f, initial, stream.source));
}

@@ -72,0 +73,0 @@

@@ -5,3 +5,4 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

var concurrentMergeMap = require('./concurrentMergeMap').concurrentMergeMap;
var mergeConcurrently = require('./mergeConcurrently').mergeConcurrently;
var map = require('./transform').map;

@@ -22,3 +23,3 @@ exports.concatMap = concatMap;

function concatMap(f, stream) {
return concurrentMergeMap(f, 1, stream);
return mergeConcurrently(1, map(f, stream));
}

@@ -10,4 +10,4 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

exports.filter = filter;
exports.distinct = distinct;
exports.distinctBy = distinctBy;
exports.skipRepeats = skipRepeats;
exports.skipRepeatsWith = skipRepeatsWith;

@@ -25,21 +25,21 @@ /**

/**
* Remove adjacent duplicates, using === to detect duplicates
* @param {Stream} stream stream from which to omit adjacent duplicates
* @returns {Stream} stream with no adjacent duplicates
* Skip repeated events, using === to detect duplicates
* @param {Stream} stream stream from which to omit repeated events
* @returns {Stream} stream without repeated events
*/
function distinct(stream) {
return distinctBy(same, stream);
function skipRepeats(stream) {
return skipRepeatsWith(same, stream);
}
/**
* Remove adjacent duplicates using the provided equals function to detect duplicates
* @param {?function(a:*, b:*):boolean} equals optional function to compare items.
* @param {Stream} stream stream from which to omit adjacent duplicates
* @returns {Stream} stream with no adjacent duplicates
* Skip repeated events using the provided equals function to detect duplicates
* @param {function(a:*, b:*):boolean} equals optional function to compare items
* @param {Stream} stream stream from which to omit repeated events
* @returns {Stream} stream without repeated events
*/
function distinctBy(equals, stream) {
return new Stream(new Distinct(equals, stream.source));
function skipRepeatsWith(equals, stream) {
return new Stream(new SkipRepeats(equals, stream.source));
}
function Distinct(equals, source) {
function SkipRepeats(equals, source) {
this.equals = equals;

@@ -49,7 +49,7 @@ this.source = source;

Distinct.prototype.run = function(sink, scheduler) {
return this.source.run(new DistinctSink(this.equals, sink), scheduler);
SkipRepeats.prototype.run = function(sink, scheduler) {
return this.source.run(new SkipRepeatsSink(this.equals, sink), scheduler);
};
function DistinctSink(equals, sink) {
function SkipRepeatsSink(equals, sink) {
this.equals = equals;

@@ -61,6 +61,6 @@ this.sink = sink;

DistinctSink.prototype.end = Sink.prototype.end;
DistinctSink.prototype.error = Sink.prototype.error;
SkipRepeatsSink.prototype.end = Sink.prototype.end;
SkipRepeatsSink.prototype.error = Sink.prototype.error;
DistinctSink.prototype.event = function(t, x) {
SkipRepeatsSink.prototype.event = function(t, x) {
if(this.init) {

@@ -67,0 +67,0 @@ this.init = false;

@@ -5,4 +5,4 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

var concurrentMergeMap = require('./concurrentMergeMap').concurrentMergeMap;
var identity = require('../base').identity;
var mergeConcurrently = require('./mergeConcurrently').mergeConcurrently;
var map = require('./transform').map;

@@ -20,3 +20,3 @@ exports.flatMap = flatMap;

function flatMap(f, stream) {
return concurrentMergeMap(f, Infinity, stream);
return join(map(f, stream));
}

@@ -31,3 +31,3 @@

function join(stream) {
return flatMap(identity, stream);
return mergeConcurrently(Infinity, stream);
}

@@ -75,2 +75,3 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

this.scheduler = scheduler;
this.value = void 0;
this.timer = null;

@@ -84,2 +85,3 @@

this._clearTimer();
this.value = x;
this.timer = this.scheduler.delay(this.dt, PropagateTask.event(x, this.sink));

@@ -89,3 +91,6 @@ };

DebounceSink.prototype.end = function(t, x) {
this._clearTimer();
if(this._clearTimer()) {
this.sink.event(t, this.value);
this.value = void 0;
}
this.sink.end(t, x);

@@ -104,6 +109,8 @@ };

DebounceSink.prototype._clearTimer = function() {
if(this.timer !== null) {
this.timer.cancel();
this.timer = null;
if(this.timer === null) {
return false;
}
this.timer.cancel();
this.timer = null;
return true;
};

@@ -7,3 +7,3 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

var fromArray = require('../source/fromArray').fromArray;
var join = require('./flatMap').join;
var mergeConcurrently = require('./mergeConcurrently').mergeConcurrently;
var copy = require('../base').copy;

@@ -30,5 +30,6 @@

function mergeArray(streams) {
return streams.length === 0 ? empty()
: streams.length === 1 ? streams[0]
: join(fromArray(streams));
var l = streams.length;
return l === 0 ? empty()
: l === 1 ? streams[0]
: mergeConcurrently(l, fromArray(streams));
}

@@ -5,36 +5,27 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

var Promise = require('../Promise');
var Observer = require('../sink/Observer');
var runSource = require('../runSource');
var noop = require('../base').noop;
var scheduler = require('../scheduler/defaultScheduler');
var resolve = Promise.resolve;
exports.observe = observe;
exports.drain = drain;
/**
* Observe all the event values in the stream in time order. The
* provided function `f` will be called for each event value
* @param {function(x:T):*} f function to call with each event value
* @param {Stream<T>} stream stream to observe
* @return {Promise} promise that fulfills after the stream ends without
* an error, or rejects if the stream ends with an error.
*/
function observe(f, stream) {
var source = stream.source;
return new Promise(function(res, rej) {
var disposable;
var observer = new Observer(f,
function (x) {
disposeThen(res, rej, disposable, x);
}, function(e) {
disposeThen(rej, rej, disposable, e);
});
disposable = source.run(observer, scheduler);
});
return runSource(f, stream.source);
}
function disposeThen(res, rej, disposable, x) {
resolve(disposable.dispose()).then(function () {
res(x);
}, rej);
}
/**
* "Run" a stream by
* @param stream
* @return {*}
*/
function drain(stream) {
return observe(noop, stream);
return runSource(noop, stream.source);
}

@@ -6,4 +6,6 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

var Stream = require('../Stream');
var ChainDisposable = require('../disposable/ChainDisposable');
var EmptyDisposable = require('../disposable/EmptyDisposable');
var MulticastSource = require('../source/MulticastSource');
var until = require('./timeslice').takeUntil;
var mergeConcurrently = require('./mergeConcurrently').mergeConcurrently;
var map = require('./transform').map;

@@ -19,95 +21,9 @@ exports.switch = switchLatest;

function switchLatest(stream) {
return new Stream(new Switch(stream.source));
}
var upstream = new Stream(new MulticastSource(stream.source));
function Switch(source) {
this.source = source;
}
return mergeConcurrently(1, map(untilNext, upstream));
Switch.prototype.run = function(sink, scheduler) {
var switchSink = new SwitchSink(sink, scheduler);
return new ChainDisposable(switchSink, this.source.run(switchSink, scheduler));
};
function SwitchSink(sink, scheduler) {
this.sink = sink;
this.scheduler = scheduler;
this.current = null;
this.ended = false;
}
SwitchSink.prototype.event = function(t, stream) {
this._disposeCurrent(t); // TODO: capture the result of this dispose
this.current = new Segment(t, Infinity, this, this.sink);
this.current.disposable = stream.source.run(this.current, this.scheduler);
};
SwitchSink.prototype.end = function(t, x) {
this.ended = true;
this._checkEnd(t, x);
};
SwitchSink.prototype.error = function(t, e) {
this.ended = true;
this.sink.error(t, e);
};
SwitchSink.prototype.dispose = function() {
return this._disposeCurrent(0);
};
SwitchSink.prototype._disposeCurrent = function(t) {
if(this.current !== null) {
return this.current._dispose(t);
function untilNext(s) {
return until(upstream, s);
}
};
SwitchSink.prototype._disposeInner = function(t, inner) {
inner._dispose(t); // TODO: capture the result of this dispose
if(inner === this.current) {
this.current = null;
}
};
SwitchSink.prototype._checkEnd = function(t, x) {
if(this.ended && this.current === null) {
this.sink.end(t, x);
}
};
SwitchSink.prototype._endInner = function(t, x, inner) {
this._disposeInner(t, inner);
this._checkEnd(t, x);
};
SwitchSink.prototype._errorInner = function(t, e, inner) {
this._disposeInner(t, inner);
this.sink.error(t, e);
};
function Segment(min, max, outer, sink) {
this.min = min;
this.max = max;
this.outer = outer;
this.sink = sink;
this.disposable = new EmptyDisposable();
}
Segment.prototype.event = function(t, x) {
if(t < this.max) {
this.sink.event(Math.max(t, this.min), x);
}
};
Segment.prototype.end = function(t, x) {
this.outer._endInner(Math.max(t, this.min), x, this);
};
Segment.prototype.error = function(t, e) {
this.outer._errorInner(Math.max(t, this.min), e, this);
};
Segment.prototype._dispose = function(t) {
this.max = t;
return this.disposable.dispose();
};

@@ -24,2 +24,6 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

PropagateTask.error = function(value, sink) {
return new PropagateTask(error, value, sink);
};
PropagateTask.prototype.dispose = function() {

@@ -43,2 +47,6 @@ this.active = false;

function error(t, e, sink) {
sink.error(t, e);
}
function emit(t, x, sink) {

@@ -45,0 +53,0 @@ sink.event(t, x);

@@ -41,2 +41,1 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

};

@@ -8,2 +8,3 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

var MulticastSource = require('./MulticastSource');
var AsyncSource = require('./AsyncSource');
var noop = require('../base').noop;

@@ -14,3 +15,3 @@

function create(run) {
return new Stream(new MulticastSource(new SubscriberSource(run)));
return new Stream(new MulticastSource(new AsyncSource(new SubscriberSource(run))));
}

@@ -17,0 +18,0 @@

@@ -33,7 +33,15 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

function runProducer(t, array, sink) {
for(var i=0, l=array.length; i<l && this.active; ++i) {
return produce(this, array, sink, 0);
}
function produce(task, array, sink, k) {
for(var i=k, l=array.length; i<l && task.active; ++i) {
sink.event(0, array[i]);
}
this.active && sink.end(0);
return end();
function end() {
return task.active && sink.end(0);
}
}

@@ -7,2 +7,3 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

var MulticastSource = require('./MulticastSource');
var PropagateTask = require('../scheduler/PropagateTask');
var base = require('../base');

@@ -28,3 +29,12 @@

function fromEventWhere(predicate, event, source) {
return new Stream(new MulticastSource(new EventSource(predicate, event, source)));
var s;
if(typeof source.addEventListener === 'function') {
s = new MulticastSource(new EventSource(predicate, event, source));
} else if(typeof source.addListener === 'function') {
s = new EventEmitterSource(predicate, event, source);
} else {
throw new Error('source must support addEventListener or addListener');
}
return new Stream(s);
}

@@ -39,6 +49,6 @@

EventSource.prototype.run = function(sink, scheduler) {
return new EventSink(this.where, this.event, this.source, sink, scheduler);
return new EventAdapter(this.where, this.event, this.source, sink, scheduler);
};
function EventSink(where, event, source, sink, scheduler) {
function EventAdapter(where, event, source, sink, scheduler) {
this.event = event;

@@ -60,34 +70,65 @@ this.source = source;

EventSink.prototype._init = function(addEvent, event, source) {
var doAddEvent = addEvent;
EventAdapter.prototype._init = function(addEvent, event, source) {
source.addEventListener(event, addEvent, false);
return addEvent;
};
if(typeof source.addEventListener === 'function') {
source.addEventListener(event, doAddEvent, false);
EventAdapter.prototype.dispose = function() {
if (typeof this.source.removeEventListener !== 'function') {
throw new Error('source must support removeEventListener or removeListener');
}
} else if(typeof source.addListener === 'function') {
// EventEmitter supports varargs (eg: emitter.emit('event', a, b, c, ...)) so
// have to support it here by turning into an array
doAddEvent = function addVarargs(a) {
return arguments.length > 1 ? addEvent(base.copy(arguments)) : addEvent(a);
};
this.source.removeEventListener(this.event, this._addEvent, false);
};
source.addListener(event, doAddEvent);
function EventEmitterSource(where, event, source) {
this.where = where;
this.event = event;
this.source = source;
}
} else {
throw new Error('source must support addEventListener or addListener');
EventEmitterSource.prototype.run = function(sink, scheduler) {
return new EventEmitterAdapter(this.where, this.event, this.source, sink, scheduler);
};
function EventEmitterAdapter(where, event, source, sink, scheduler) {
this.event = event;
this.source = source;
this.sink = sink;
this.where = where;
var self = this;
function addEvent(ev) {
if(self.where(ev) === false) {
return;
}
// NOTE: Because EventEmitter allows events in the same call stack as
// a listener is added, use the scheduler to buffer all events
// until the stack clears, then propagate.
scheduler.asap(new PropagateTask(tryEvent, ev, self.sink));
}
this._addEvent = this._init(addEvent, event, source);
}
EventEmitterAdapter.prototype._init = function(addEvent, event, source) {
var doAddEvent = addEvent;
// EventEmitter supports varargs (eg: emitter.emit('event', a, b, c, ...)) so
// have to support it here by turning into an array
doAddEvent = function addVarargs(a) {
return arguments.length > 1 ? addEvent(base.copy(arguments)) : addEvent(a);
};
source.addListener(event, doAddEvent);
return doAddEvent;
};
EventSink.prototype.dispose = function() {
if(typeof this.source.removeEventListener === 'function') {
this.source.removeEventListener(this.event, this._addEvent, false);
} else if(typeof this.source.removeListener === 'function') {
this.source.removeListener(this.event, this._addEvent);
} else {
EventEmitterAdapter.prototype.dispose = function() {
if (typeof this.source.removeListener !== 'function') {
throw new Error('source must support removeEventListener or removeListener');
}
this.source.removeListener(this.event, this._addEvent);
};

@@ -99,5 +140,5 @@

function tryEvent (t, ev, sink) {
function tryEvent (t, x, sink) {
try {
sink.event(t, ev);
sink.event(t, x);
} catch(e) {

@@ -104,0 +145,0 @@ sink.error(t, e);

@@ -19,4 +19,3 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

if(n === 1) {
var task = new StartMulticastTask(this.source, this.sink, scheduler);
this._disposable = scheduler.asap(task);
this._disposable = this.source.run(this.sink, scheduler);
}

@@ -38,18 +37,2 @@

function StartMulticastTask(source, sink, scheduler) {
this.source = source;
this.sink = sink;
this.scheduler = scheduler;
}
StartMulticastTask.prototype.run = function() {
return this.source.run(this.sink, this.scheduler);
};
StartMulticastTask.prototype.error = function(t, e) {
this.sink.error(t, e);
};
StartMulticastTask.prototype.dispose = base.noop;
function MulticastDisposable(source, sink) {

@@ -56,0 +39,0 @@ this.source = source;

@@ -406,5 +406,5 @@ /** @license MIT License (c) copyright 2010-2015 original author or authors */

exports.filter = filter.filter;
exports.distinct = filter.distinct;
exports.distinctBy = filter.distinctBy;
exports.filter = filter.filter;
exports.skipRepeats = exports.distinct = filter.skipRepeats;
exports.skipRepeatsWith = exports.distinctBy = filter.skipRepeatsWith;

@@ -423,18 +423,18 @@ /**

/**
* Remove adjacent duplicates, using === to compare items
* Skip repeated events, using === to compare items
* stream: -abbcd-
* distinct(stream): -ab-cd-
* @returns {Stream} stream with no adjacent duplicates
* @returns {Stream} stream with no repeated events
*/
Stream.prototype.distinct = function() {
return filter.distinct(this);
Stream.prototype.skipRepeats = Stream.prototype.distinct = function() {
return filter.skipRepeats(this);
};
/**
* Remove adjacent duplicates, using supplied equals function to compare items
* @param {function(a:*, b:*):boolean} equals function to compare items.
* @returns {Stream} stream with no adjacent duplicates
* Skip repeated events, using supplied equals function to compare items
* @param {function(a:*, b:*):boolean} equals function to compare items
* @returns {Stream} stream with no repeated events
*/
Stream.prototype.distinctBy = function(equals) {
return filter.distinctBy(equals, this);
Stream.prototype.skipRepeatsWith = Stream.prototype.distinctBy = function(equals) {
return filter.skipRepeatsWith(equals, this);
};

@@ -441,0 +441,0 @@

{
"name": "most",
"version": "0.13.1",
"version": "0.13.2",
"description": "Monadic streams",

@@ -5,0 +5,0 @@ "main": "most.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc