Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

asynciterator

Package Overview
Dependencies
Maintainers
1
Versions
34
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

asynciterator - npm Package Compare versions

Comparing version 1.0.1 to 1.1.0

152

asynciterator.js
var EventEmitter = require('events').EventEmitter;
// Executes the given function with arguments as soon as possible
var immediate = (function () {
// Using native `setImmediate` is fastest in Node.js
/* istanbul ignore else */
if (typeof process !== 'undefined' && !process.browser)
return setImmediate;
// In all other cases, rely on the `immediate` shim
else {
var immediate = require('immediate'), calls = 0;
return function (f, x, y, z) {
// Every once in a while, allow some time for events, painting, etc.
calls++ === 1e4 ? setTimeout(f, calls = 0, x, y, z) : immediate(f, x, y, z);
};
}
}());
/**

@@ -114,3 +130,3 @@ Names of possible iterator states.

if (newState === ENDED)
eventAsync ? setImmediate(emit, this, 'end') : this.emit('end');
eventAsync ? immediate(emit, this, 'end') : this.emit('end');
}

@@ -179,3 +195,3 @@ return valid;

: listeners.indexOf(listener) < 0))
this.addListener(eventName, listener);
this.on(eventName, listener);
};

@@ -214,3 +230,3 @@

function end(self) { self._end(); }
function endAsync(self) { setImmediate(end, self); }
function endAsync(self) { immediate(end, self); }

@@ -239,3 +255,3 @@ /**

if (readable)
setImmediate(emit, this, 'readable');
immediate(emit, this, 'readable');
}

@@ -291,3 +307,3 @@ },

if (this.readable)
setImmediate(call, emitData, this);
immediate(call, emitData, this);
}

@@ -330,3 +346,3 @@ }

if (properties && (propertyName in properties))
setImmediate(callback, properties[propertyName]);
immediate(callback, properties[propertyName]);
// If the value was not set, store the callback for when the value will be set

@@ -357,5 +373,5 @@ else {

if (callbacks.length === 1)
setImmediate(callbacks[0], value);
immediate(callbacks[0], value);
else {
setImmediate(function () {
immediate(function () {
for (var i = 0; i < callbacks.length; i++)

@@ -605,3 +621,3 @@ callbacks[i](value);

this._buffer = [];
this._pushed = 0;
this._pushedCount = 0;
this.maxBufferSize = maxBufferSize;

@@ -611,3 +627,3 @@

this._reading = true;
setImmediate(init, this, autoStart !== false || autoStart);
immediate(init, this, autoStart !== false || autoStart);
}

@@ -735,3 +751,3 @@ AsyncIterator.subclass(BufferedIterator);

throw new Error('Cannot push after the iterator was ended.');
this._pushed++;
this._pushedCount++;
this._buffer.push(item);

@@ -760,3 +776,3 @@ this.readable = true;

// Acquire reading lock and start reading, counting pushed items
this._pushed = 0;
this._pushedCount = 0;
this._reading = true;

@@ -776,3 +792,3 @@ this._read(neededItems, function () {

// (even though all pushed items might already have been read)
else if (self._pushed) {
else if (self._pushedCount) {
self.readable = true;

@@ -790,3 +806,3 @@ // If the buffer is insufficiently full, continue filling

self._reading = true;
setImmediate(fillBufferAsyncCallback, self);
immediate(fillBufferAsyncCallback, self);
}

@@ -874,2 +890,3 @@ }

@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction
@param {boolean} [options.optional=false] If transforming is optional, the original item is pushed when its transformation yields no items
@param {AsyncIterator} [options.source] The source this iterator generates items from

@@ -888,2 +905,3 @@ @extends BufferedIterator

if (source) this.source = source;
this._optional = !!(options && options.optional);
}

@@ -910,3 +928,3 @@ BufferedIterator.subclass(TransformIterator);

else {
source.once('end', destinationCloseWhenDone);
source.on('end', destinationCloseWhenDone);
source.on('readable', destinationFillBuffer);

@@ -945,4 +963,4 @@ source.on('error', destinationEmitError);

// Continue transforming until at least `count` items have been pushed
if (self._pushed < count && !self.closed)
setImmediate(readAndTransform, self, next, done);
if (self._pushedCount < count && !self.closed)
immediate(readAndTransform, self, next, done);
else

@@ -956,7 +974,21 @@ done();

var source = self._source, item;
if (source && !source.ended && (item = source.read()) !== null)
self._transform(item, next);
if (source && !source.ended && (item = source.read()) !== null) {
if (!self._optional)
self._transform(item, next);
else
optionalTransform(self, item, next);
}
else
done();
}
// Tries to transform the item;
// if the transformation yields no items, pushes the original item
function optionalTransform(self, item, done) {
var pushedCount = self._pushedCount;
self._transform(item, function () {
if (pushedCount === self._pushedCount)
self._push(item);
done();
});
}

@@ -1030,2 +1062,3 @@ /**

@param {Function} [options.transform] A function to asynchronously transform items from the source
@param {boolean} [options.optional=false] If transforming is optional, the original item is pushed when its mapping yields `null` or its transformation yields no items
@param {Array|AsyncIterator} [options.prepend] Items to insert before the source items

@@ -1073,4 +1106,4 @@ @param {Array|AsyncIterator} [options.append] Items to insert after the source items

// Continue transforming until at least `count` items have been pushed
if (self._pushed < count && !self.closed)
setImmediate(readAndTransformSimple, self, next, done);
if (self._pushedCount < count && !self.closed)
immediate(readAndTransformSimple, self, next, done);
else

@@ -1080,2 +1113,3 @@ done();

};
// Reads an item and performs each of the simple transformations on it
function readAndTransformSimple(self, next, done) {

@@ -1097,4 +1131,16 @@ // Verify we have a readable source

// Map and transform the item
item = self._map(item);
return item === null ? next() : self._transform(item, next);
var mappedItem = self._map(item);
if (mappedItem !== null) {
if (!self._optional)
self._transform(mappedItem, next);
else
optionalTransform(self, mappedItem, next);
}
// Don't transform a `null` item
else {
if (self._optional)
self._push(item);
next();
}
return;
}

@@ -1126,6 +1172,12 @@ self._offset--;

done();
else
inserter.on('data', push), inserter.once('end', end);
else {
inserter.on('data', push);
inserter.on('end', end);
}
function push(item) { self._push(item); }
function end() { inserter.removeListener('data', push); done(); }
function end() {
inserter.removeListener('data', push);
inserter.removeListener('end', end);
done();
}
};

@@ -1268,3 +1320,3 @@

TransformIterator.call(this, source, options);
this._transformers = [];
this._transformerQueue = [];
}

@@ -1276,5 +1328,11 @@ TransformIterator.subclass(MultiTransformIterator);

// Remove transformers that have ended
var item, transformer, transformers = this._transformers, source = this._source;
while ((transformer = transformers[0]) && transformer.ended) {
transformer = transformers.shift();
var item, head, transformer, transformerQueue = this._transformerQueue,
source = this._source, optional = this._optional;
while ((head = transformerQueue[0]) && head.transformer.ended) {
// If transforming is optional, push the original item if none was pushed
if (optional && head.item !== null)
this._push(head.item), count--;
// Remove listeners from the transformer
head = transformerQueue.shift(), transformer = head.transformer;
transformer.removeListener('end', destinationFillBuffer);
transformer.removeListener('readable', destinationFillBuffer);

@@ -1285,3 +1343,3 @@ transformer.removeListener('error', destinationEmitError);

// Create new transformers if there are less than the maximum buffer size
while (source && !source.ended && transformers.length < this._maxBufferSize) {
while (source && !source.ended && transformerQueue.length < this._maxBufferSize) {
// Read an item to create the next transformer

@@ -1292,17 +1350,20 @@ item = this._source.read();

// Create the transformer and listen to its events
transformer = this._createTransformer(item);
if (transformer && !transformer.ended) {
transformer._destination = this;
transformer.once('end', destinationFillBuffer);
transformer.on('readable', destinationFillBuffer);
transformer.on('error', destinationEmitError);
transformers.push(transformer);
}
transformer = this._createTransformer(item) || new EmptyIterator();
transformer._destination = this;
transformer.on('end', destinationFillBuffer);
transformer.on('readable', destinationFillBuffer);
transformer.on('error', destinationEmitError);
transformerQueue.push({ transformer: transformer, item: item });
}
// Try to read `count` items from the transformer
transformer = transformers[0];
if (transformer) {
while (count-- > 0 && (item = transformer.read()) !== null)
head = transformerQueue[0];
if (head) {
transformer = head.transformer;
while (count-- > 0 && (item = transformer.read()) !== null) {
this._push(item);
// If a transformed item was pushed, no need to push the original anymore
if (optional)
head.item = null;
}
}

@@ -1326,3 +1387,3 @@ // End the iterator if the source has ended

// Only close if all transformers are read
if (!this._transformers.length)
if (!this._transformerQueue.length)
this.close();

@@ -1466,3 +1527,3 @@ };

// When the source ends, close all clones that are fully read
source.once('end', clonesEnd);
source.on('end', clonesEnd);
function clonesEnd() {

@@ -1474,3 +1535,4 @@ for (var i = 0; i < clones.length; i++) {

clones = null;
source.removeListener('error', clonesEmitError);
source.removeListener('end', clonesEnd);
source.removeListener('error', clonesEmitError);
source.removeListener('readable', clonesMakeReadable);

@@ -1477,0 +1539,0 @@ }

{
"name": "asynciterator",
"version": "1.0.1",
"version": "1.1.0",
"description": "An asynchronous iterator library for advanced object pipelines.",

@@ -14,5 +14,6 @@ "author": "Ruben Verborgh <ruben.verborgh@gmail.com>",

"license": "MIT",
"repository": "RubenVerborgh/AsyncIterator",
"devDependencies": {
"chai": "^3.5.0",
"eslint": "^2.13.1",
"eslint": "^3.0.1",
"istanbul": "^0.4.4",

@@ -23,3 +24,6 @@ "jsdoc": "^3.4.0",

"sinon-chai": "^2.8.0"
},
"dependencies": {
"immediate": "^3.2.3"
}
}

@@ -843,2 +843,52 @@ var TransformIterator = require('../asynciterator').TransformIterator;

});
describe('A TransformIterator with optional set to false', function () {
var iterator, source;
before(function () {
source = new ArrayIterator([1, 2, 3, 4, 5, 6]);
iterator = new TransformIterator(source, { optional: false });
iterator._transform = function (item, done) {
if (item % 3 !== 0)
this._push('t' + item);
done();
};
});
describe('when reading items', function () {
var items = [];
before(function (done) {
iterator.on('data', function (item) { items.push(item); });
iterator.on('end', done);
});
it('should return items not transformed into null', function () {
items.should.deep.equal(['t1', 't2', 't4', 't5']);
});
});
});
describe('A TransformIterator with optional set to true', function () {
var iterator, source;
before(function () {
source = new ArrayIterator([1, 2, 3, 4, 5, 6]);
iterator = new TransformIterator(source, { optional: true });
iterator._transform = function (item, done) {
if (item % 3 !== 0)
this._push('t' + item);
done();
};
});
describe('when reading items', function () {
var items = [];
before(function (done) {
iterator.on('data', function (item) { items.push(item); });
iterator.on('end', done);
});
it('should return the transformed items, or if none, the item itself', function () {
items.should.deep.equal(['t1', 't2', 3, 't4', 't5', 6]);
});
});
});
});

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc