Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

estream

Package Overview
Dependencies
Maintainers
1
Versions
5
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

estream - npm Package Compare versions

Comparing version 1.0.0 to 2.0.0

32

api/estream.md

@@ -70,8 +70,5 @@ # addMethods

The only way to create a new Estream.
You must pass a function, which gets called immediately,
to get access to 'push', 'error' and 'end'.
**Parameters**
- `fn` **[Object](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Object)** the source function
- `options` **[Object](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Object)** set of estream options

@@ -82,3 +79,3 @@

```javascript
var estream1 = ES(function(push, error, end) {}, { buffer: false });
var es1 = estream({ buffer: false });
```

@@ -91,21 +88,19 @@

Pushes an end event down the estream.
The estream param is bound automatically when creating a new estream
and is the third parameter passed to the function you pass to createEstream.
**Parameters**
- `estream` **Estream**
- `value` **Any** the value
Returns **** this
# error
Pushes an error event down the estream.
The estream param is bound automatically when creating a new estream
and is the second parameter passed to the function you pass to createEstream.
**Parameters**
- `estream` **Estream**
- `value` **Any** the value
Returns **** this
# EsData

@@ -301,3 +296,3 @@

A helper function for getting only error event values from Estreams.
A helper function for getting only end event values from Estreams.

@@ -312,4 +307,4 @@ **Parameters**

var estream1 = es();
var estream1.onError(function(eventValue, estream1, off) {
console.log('got a error event value', eventValue);
var estream1.onEnd(function(eventValue, estream1, off) {
console.log('got a end event value', eventValue);
});

@@ -322,3 +317,3 @@ ```

A helper function for getting only end event values from Estreams.
A helper function for getting only error event values from Estreams.

@@ -333,4 +328,4 @@ **Parameters**

var estream1 = es();
var estream1.onEnd(function(eventValue, estream1, off) {
console.log('got a end event value', eventValue);
var estream1.onError(function(eventValue, estream1, off) {
console.log('got a error event value', eventValue);
});

@@ -344,10 +339,9 @@ ```

Pushes a data event down the estream.
The estream param is bound automatically when creating a new estream
and is the first parameter passed to the function you pass to createEstream.
**Parameters**
- `estream` **Estream**
- `value` **Any** the value
Returns **** this
# scan

@@ -354,0 +348,0 @@

@@ -106,55 +106,2 @@ var ES_EVENT_TYPE = '@@EsEvent';

/**
* Pushes a data event down the estream.
* The estream param is bound automatically when creating a new estream
* and is the first parameter passed to the function you pass to createEstream.
*
* @name push
* @param {Estream} estream
* @param {*} value - the value
*/
function push(estream, value) {
_processEvent(
estream,
(value && value.type === ES_EVENT_TYPE) ? value : new EsData(value)
);
}
/**
* Pushes an error event down the estream.
* The estream param is bound automatically when creating a new estream
* and is the second parameter passed to the function you pass to createEstream.
*
* @name error
* @param {Estream} estream
* @param {*} value - the value
*/
function error(estream, value) {
_processEvent(
estream,
(value && value.type === ES_EVENT_TYPE) ? value : new EsError(value)
);
}
/**
* Pushes an end event down the estream.
* The estream param is bound automatically when creating a new estream
* and is the third parameter passed to the function you pass to createEstream.
*
* @name end
* @param {Estream} estream
* @param {*} value - the value
*/
function end(estream, value) {
if (arguments.length === 2) {
if (value.type === ES_EVENT_TYPE) {
_processEvent(estream, value);
} else {
_processEvent(estream, new EsEnd(value));
}
} else {
_processEvent(estream, new EsEnd());
}
}
/**
* A pre-bound function that unsubscribes a consumer/subscriber from a stream.

@@ -293,2 +240,52 @@ * This is returned for every "on" function.

/**
* Pushes a data event down the estream.
*
* @name push
* @param {*} value - the value
* @return this
*/
Estream.prototype.push = function(value) {
_processEvent(
this,
(value && value.type === ES_EVENT_TYPE) ? value : new EsData(value)
);
return this;
};
/**
* Pushes an error event down the estream.
*
* @name error
* @param {*} value - the value
* @return this
*/
Estream.prototype.error = function(value) {
_processEvent(
this,
(value && value.type === ES_EVENT_TYPE) ? value : new EsError(value)
);
return this;
};
/**
* Pushes an end event down the estream.
*
* @name end
* @param {*} value - the value
* @return this
*/
Estream.prototype.end = function(value) {
if (arguments.length > 0) {
if (value.type === ES_EVENT_TYPE) {
_processEvent(this, value);
} else {
_processEvent(this, new EsEnd(value));
}
} else {
_processEvent(this, new EsEnd());
}
return this;
};
/**
* Adds a consumer/subscriber to an estream.

@@ -429,17 +426,17 @@ * When an event gets pushed down an estream, the consumer will get as params:

Estream.prototype.map = function(fn) {
return createEstream(function(p, e) {
this.on(function(event) {
if (event.isData) {
var mappedValue;
try {
mappedValue = fn(event.value);
p(mappedValue);
} catch (err) {
e(err);
}
} else {
p(event);
var s = createEstream();
this.on(function(event) {
if (event.isData) {
var mappedValue;
try {
mappedValue = fn(event.value);
s.push(mappedValue);
} catch (err) {
s.error(err);
}
});
}.bind(this));
} else {
s.push(event);
}
});
return s;
};

@@ -464,17 +461,17 @@

Estream.prototype.scan = function(fn, acc) {
return createEstream(function(p, e) {
this.on(function(event) {
if (event.isData) {
var nextAcc;
try {
nextAcc = fn(acc, event.value);
p(acc = nextAcc);
} catch (err) {
e(err);
}
} else {
p(event);
var s = createEstream();
this.on(function(event) {
if (event.isData) {
var nextAcc;
try {
nextAcc = fn(acc, event.value);
s.push(acc = nextAcc);
} catch (err) {
s.error(err);
}
});
}.bind(this));
} else {
s.push(event);
}
});
return s;
};

@@ -498,17 +495,17 @@

Estream.prototype.filter = function(fn) {
return createEstream(function(p, e) {
this.on(function(event) {
if (event.isData) {
try {
if (fn(event.value)) {
p(event);
}
} catch (err) {
e(err);
var s = createEstream();
this.on(function(event) {
if (event.isData) {
try {
if (fn(event.value)) {
s.push(event);
}
} else {
p(event);
} catch (err) {
s.error(err);
}
});
}.bind(this));
} else {
s.push(event);
}
});
return s;
};

@@ -531,7 +528,7 @@

Estream.prototype.filterEvent = function(fn) {
return createEstream(function(p) {
this.on(function(event) {
if (fn(event)) p(event);
});
}.bind(this));
var s = createEstream();
this.on(function(event) {
if (fn(event)) s.push(event);
});
return s;
};

@@ -591,7 +588,4 @@

* The only way to create a new Estream.
* You must pass a function, which gets called immediately,
* to get access to 'push', 'error' and 'end'.
*
* @name createEstream
* @param {Object} fn - the source function
* @param {Object} options - set of estream options

@@ -601,7 +595,6 @@ * @return {Estream}

* @example
* var estream1 = ES(function(push, error, end) {}, { buffer: false });
* var es1 = estream({ buffer: false });
*/
function createEstream(fn, options) {
function createEstream(options) {
var estream = new Estream(options);
fn(push.bind(null, estream), error.bind(null, estream), end.bind(null, estream));
return estream;

@@ -608,0 +601,0 @@ }

@@ -19,10 +19,10 @@ var estream = require('../estream');

var dataTO;
return estream(function(push) {
es.on(function(event) {
clearTimeout(dataTO);
dataTO = setTimeout(function() {
push(event);
}, interval);
});
var s = estream();
es.on(function(event) {
clearTimeout(dataTO);
dataTO = setTimeout(function() {
s.push(event);
}, interval);
});
return s;
}

@@ -29,0 +29,0 @@

@@ -12,10 +12,9 @@ var estream = require('../estream');

function endOnError(es) {
return estream(function(push, error, end) {
es.onError(function(err) {
error(err);
end();
});
var s = estream();
es.onError(function(err) {
s.error(err).end();
});
return s;
}
module.exports = endOnError;

@@ -23,22 +23,21 @@ var estream = require('../estream');

function reduce(fn, acc, es) {
return estream(function(push, error, end) {
es.on(function(event) {
if (event.isData) {
try {
acc = fn(acc, event.value);
} catch (err) {
error(err);
}
} else if (event.isError) {
push(event);
} else {
try {
end(event.value.reduce(fn, acc));
} catch (err) {
error(err);
end();
}
var s = estream();
es.on(function(event) {
if (event.isData) {
try {
acc = fn(acc, event.value);
} catch (err) {
s.error(err);
}
});
} else if (event.isError) {
s.push(event);
} else {
try {
s.end(event.value.reduce(fn, acc));
} catch (err) {
s.error(err).end();
}
}
});
return s;
}

@@ -45,0 +44,0 @@

@@ -19,14 +19,14 @@ var estream = require('../estream');

function take(count, es) {
return estream(function(push, error, end) {
es.on(function(event, self, off) {
if (event.isData) {
count--;
}
push(event);
if (count < 1) {
end();
off();
}
});
var s = estream();
es.on(function(event, self, off) {
if (event.isData) {
count--;
}
s.push(event);
if (count < 1) {
s.end();
off();
}
});
return s;
}

@@ -33,0 +33,0 @@

@@ -22,11 +22,11 @@ var estream = require('../estream');

function takeUntil(fn, es) {
return estream(function(push, error, end) {
es.on(function(event, _, off) {
push(event);
if (fn(event)) {
end();
off();
}
});
var s = estream();
es.on(function(event, _, off) {
s.push(event);
if (fn(event)) {
s.end();
off();
}
});
return s;
}

@@ -33,0 +33,0 @@

{
"name": "estream",
"version": "1.0.0",
"version": "2.0.0",
"description": "A javascript library with a simplified take on event streams.",

@@ -5,0 +5,0 @@ "main": "estream.js",

@@ -13,3 +13,3 @@ # Estream

* **EsData** - These are objects that are used to represent successful data.
* **EsError** - These are objects that are used to represent an error, either from the source itself or internally in the stream.
* **EsError** - These are objects that are used to represent an error, either from the source itself or internally in the stream (e.g. like from a failed map function)
* **EsEnd** - These are objects that are used to represent an end to an estream. All values are wrapped in an array as estreams can have multiple source estreams that end. Once an end is emitted by a stream, no more events will be emitted and all references to the consuming functions will be removed.

@@ -19,19 +19,11 @@

```javascript
var backendStream = estream(function(push, error, end) {
pollForData(function(err, res) {
if (err) {
error(err);
} else {
push(res);
}
});
var clickStream = estream();
var count = 0;
document.addEventListener('click', function(e) {
clickStream.push(++count);
});
backendStream.onData(function(data) {
// got some data
clickStream.onData(function(value) {
// value == count
});
backendStream.onError(function(error) {
// got an error
});
```

@@ -48,2 +40,28 @@

## Uni-Directional
Estreams flow one way. A consumer cannot trigger an estream to push data or execute a function, nor can it pass data up the stream to be used in some way. However, because anything that has acesss to a stream can push data into it, you can set up a circular flow of reactive estreams. For example:
```javascript
var estream = require('./estream');
var s1 = estream();
var s2 = estream();
s1.onData(function(val) {
// simulate a server or async action
setTimeout(function() {
s2.push(val + 5);
}, 500);
});
s2.onData(function(val) {
setTimeout(function() {
s1.push(val + 10);
}, 500);
});
s1.push(1); // kick things off.
```
## Inspiration

@@ -50,0 +68,0 @@

var assert = require('assert');
var ES = require('../estream');
var estream = require('../estream');
var debounce = require('../modules/debounce');

@@ -9,8 +9,7 @@

var debounce500 = debounce(500); // this tests the auto-curry
var s = ES(function(push) {
setTimeout(function() {
push(1);
push(2);
assert.equal(val, 0);
});
var s = estream();
setTimeout(function() {
s.push(1);
s.push(2);
assert.equal(val, 0);
});

@@ -17,0 +16,0 @@

var assert = require('assert');
var ES = require('../estream');
var estream = require('../estream');
var endOnError = require('../modules/end-on-error');

@@ -7,5 +7,4 @@

it('creates an estream that ends on an error', function(done) {
var s1 = ES(function(push, error) {
setTimeout(error.bind(null, 'error'));
});
var s1 = estream();
setTimeout(s1.error.bind(s1, 'error'));
var called = 0;

@@ -12,0 +11,0 @@

var assert = require('assert');
var ES = require('../estream');
var estream = require('../estream');
var R = require('ramda');
var noop = function() {};
var add1 = function(x) { return x + 1; };

@@ -12,9 +11,8 @@ var sum = function(acc, value) { return acc + value; };

it('can create a Estream', function() {
assert.equal(typeof ES, 'function');
assert.equal(typeof estream, 'function');
});
it('has a method to push data into it and a consume that data', function(done) {
var s = ES(function(push) {
push(5);
});
var s = estream();
s.push(5);
s.on(function(data) {

@@ -28,5 +26,4 @@ assert.equal(data.isData, true);

it('routes errors', function(done) {
var s = ES(function(push, error) {
error('error');
});
var s = estream();
s.error('error');
s.on(function(err) {

@@ -40,5 +37,4 @@ assert.equal(err.isError, true);

it('routes ends', function(done) {
var s = ES(function(push, error, end) {
end('byebye');
});
var s = estream();
s.end('byebye');
s.on(function(end) {

@@ -52,13 +48,11 @@ assert.equal(end.isEnd, true);

it('can combine multiple source estreams', function(done) {
var s1 = ES(function(push, error, end) {
setTimeout(push.bind(null, 1));
setTimeout(end.bind(null, 'hello'), 150);
});
var s2 = ES(function(push, error, end) {
setTimeout(push.bind(null, 10));
setTimeout(error.bind(null, 'error'), 50);
setTimeout(end.bind(null, 'bye'), 300);
});
var s3 = ES.combine([s1, s2]);
var s4 = ES.combine([s3]);
var s1 = estream();
setTimeout(s1.push.bind(s1, 1));
setTimeout(s1.end.bind(s1, 'hello'), 150);
var s2 = estream();
setTimeout(s2.push.bind(s2, 10));
setTimeout(s2.error.bind(s2, 'error'), 50);
setTimeout(s2.end.bind(s2, 'bye'), 300);
var s3 = estream.combine([s1, s2]);
var s4 = estream.combine([s3]);
var called = 0;

@@ -95,7 +89,6 @@

it('has helper methods for consuming error and end messages', function(done) {
var s = ES(function(push, error, end) {
push(5);
error('error');
end('bye');
});
var s = estream();
s.push(5);
s.error('error');
s.end('bye');
var errorCalled;

@@ -116,13 +109,13 @@

var called;
var s = ES(function(push, error) {
setTimeout(function() {
push(1);
assert.ok(called);
error(new Error('boom'));
});
var s = estream();
setTimeout(function() {
s.push(1);
assert.ok(called);
s.error(new Error('boom'));
});
s.on(function(x, estream, unsubscribe) {
s.on(function(x, sref, unsubscribe) {
if (!called) {
assert.equal(x.value, 1);
assert.equal(sref, s);
unsubscribe();

@@ -139,5 +132,4 @@ called = true;

it('freezes event objects that are emitted', function(done) {
var s = ES(function(push) {
push(5);
});
var s = estream();
s.push(5);

@@ -152,8 +144,7 @@ s.on(function(event) {

it('returns the buffer with `getBuffer`', function(done) {
var s = ES(function(push, error) {
push(3);
push(5);
push(4);
setTimeout(error.bind(null, 'boom'), 100);
});
var s = estream();
s.push(3);
s.push(5);
s.push(4);
setTimeout(s.error.bind(s, 'boom'), 100);

@@ -177,8 +168,7 @@ var buffer = s.getBuffer(2);

it('clears the buffer with `clearbuffer`', function() {
var s = ES(function(push, error) {
push(3);
push(5);
push(4);
setTimeout(error.bind(null, 'boom'), 100);
});
var s = estream();
s.push(3);
s.push(5);
s.push(4);
setTimeout(s.error.bind(s, 'boom'), 100);

@@ -195,8 +185,7 @@ var buffer = s.getBuffer(2);

it('does not buffers messages with buffer set to false', function(done) {
var s = ES(function(push, error) {
push(3);
push(4);
error(new Error('boom'));
setTimeout(push.bind(null, 1), 50);
}, { buffer: false });
var s = estream({ buffer: false });
s.push(3);
s.push(4);
s.error(new Error('boom'));
setTimeout(s.push.bind(s, 1), 50);

@@ -210,7 +199,7 @@ s.on(function(x) {

it('has an addMethods method', function() {
ES.addMethods([{
estream.addMethods([{
name: 'fmap',
fn: function() {}
}]);
var p = ES(noop);
var p = estream();
assert.equal(typeof p.fmap, 'function');

@@ -220,19 +209,18 @@ });

it('empties the buffer if events happened before a consumer was added', function(done) {
var called;
var s = ES(function(push) {
push(3);
called = true;
});
var s = estream();
s.push(3);
s.on(function(x) {
assert.ok(called);
assert.equal(x.value, 3);
});
setTimeout(function() {
assert.equal(s.getBuffer().length, 0);
done();
});
}, 50);
});
it('doesnt keep a buffer if buffer option set to false', function(done) {
var s = ES(function(push) {
push(3);
}, { buffer: false });
var s = estream({ buffer: false });
s.push(3);

@@ -249,5 +237,4 @@ s.on(function() {

it('is a functor', function() {
var s = ES(function(push) {
push(1);
});
var s = estream();
s.push(1);
var add2 = function(x) { return x + 2; };

@@ -282,7 +269,7 @@ var composedMap = R.pipe(add1, add2);

it('maps a function over data values', function(done) {
var s1 = ES(function(push, error) {
error('error');
push(4);
});
var called = 0;
var s1 = estream();
s1.error('error');
s1.push(4);
s1

@@ -302,5 +289,4 @@ .map(add1)

it('catches errors', function(done) {
var s1 = ES(function(push) {
push(4);
});
var s1 = estream();
s1.push(4);
var errorFn = function() { throw new Error('boom'); };

@@ -319,7 +305,6 @@ s1

it('reduces pushed data events', function(done) {
var s = ES(function(push) {
push(5);
push(10);
});
var called = 0;
var s = estream();
s.push(5);
s.push(10);

@@ -339,6 +324,6 @@ s.scan(sum, 5)

it('catches errors', function(done) {
var s1 = ES(function(push) {
push(4);
});
var s1 = estream();
var errorFn = function() { throw new Error('boom'); };
s1.push(4);
s1

@@ -356,7 +341,6 @@ .scan(errorFn, 0)

it('filters data event values', function(done) {
var s = ES(function(push) {
push(5);
push(11);
});
var called = 0;
var s = estream();
s.push(5);
s.push(11);

@@ -372,6 +356,5 @@ s.filter(isGt10)

it('catches errors', function(done) {
var s1 = ES(function(push) {
push(4);
});
var s1 = estream();
var errorFn = function() { throw new Error('boom'); };
s1.push(4);
s1

@@ -389,8 +372,7 @@ .filter(errorFn)

it('filters events', function(done) {
var s = ES(function(push, error, end) {
push(5);
push('hello');
error('boom');
end('end');
});
var s = estream();
s.push(5);
s.push('hello');
s.error('boom');
s.end('end');

@@ -397,0 +379,0 @@ s.filterEvent(function (e) {

var assert = require('assert');
var ES = require('../estream');
var estream = require('../estream');
var reduce = require('../modules/reduce');

@@ -8,9 +8,8 @@ var sum = function(acc, value) { return acc + value; };

it('reduces the values of a single stream', function(done) {
var s = ES(function(push, error, end) {
push(5);
push(10);
error(new Error('blah'));
push(3);
end(2);
});
var s = estream();
s.push(5);
s.push(10);
s.error(new Error('blah'));
s.push(3);
s.end(2);
var called = 0;

@@ -33,12 +32,10 @@

it('reduces the values of multiple streams', function(done) {
var s1 = ES(function(push, error, end) {
push(5);
setTimeout(push.bind(null, 3), 50);
setTimeout(end.bind(null, 4), 200);
});
var s2 = ES(function(push, error, end) {
push(10);
setTimeout(end.bind(null, 6), 500);
});
var s3 = ES.combine([s1, s2]);
var s1 = estream();
s1.push(5);
setTimeout(s1.push.bind(s1, 3), 50);
setTimeout(s1.end.bind(s1, 4), 200);
var s2 = estream();
s2.push(10);
setTimeout(s2.end.bind(s2, 6), 500);
var s3 = estream.combine([s1, s2]);

@@ -45,0 +42,0 @@ reduce(sum, 0, s3).on(function(event) {

var assert = require('assert');
var ES = require('../estream');
var estream = require('../estream');
var take = require('../modules/take');

@@ -8,9 +8,8 @@

var count = 0;
var s = ES(function(push, error) {
setTimeout(function() {
push(1);
error('boom');
push(2);
push(3);
});
var s = estream();
setTimeout(function() {
s.push(1);
s.error('boom');
s.push(2);
s.push(3);
});

@@ -17,0 +16,0 @@

var assert = require('assert');
var ES = require('../estream');
var estream = require('../estream');
var takeUntil = require('../modules/takeUntil');

@@ -8,9 +8,8 @@

var count = 0;
var s = ES(function(push, error) {
setTimeout(function() {
push(1);
error('boom');
push(11);
push(20);
});
var s = estream();
setTimeout(function() {
s.push(1);
s.error('boom');
s.push(11);
s.push(20);
});

@@ -17,0 +16,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc