asynciterator
Advanced tools
Comparing version 1.0.1 to 1.1.0
var EventEmitter = require('events').EventEmitter; | ||
// Executes the given function with arguments as soon as possible | ||
var immediate = (function () { | ||
// Using native `setImmediate` is fastest in Node.js | ||
/* istanbul ignore else */ | ||
if (typeof process !== 'undefined' && !process.browser) | ||
return setImmediate; | ||
// In all other cases, rely on the `immediate` shim | ||
else { | ||
var immediate = require('immediate'), calls = 0; | ||
return function (f, x, y, z) { | ||
// Every once in a while, allow some time for events, painting, etc. | ||
calls++ === 1e4 ? setTimeout(f, calls = 0, x, y, z) : immediate(f, x, y, z); | ||
}; | ||
} | ||
}()); | ||
/** | ||
@@ -114,3 +130,3 @@ Names of possible iterator states. | ||
if (newState === ENDED) | ||
eventAsync ? setImmediate(emit, this, 'end') : this.emit('end'); | ||
eventAsync ? immediate(emit, this, 'end') : this.emit('end'); | ||
} | ||
@@ -179,3 +195,3 @@ return valid; | ||
: listeners.indexOf(listener) < 0)) | ||
this.addListener(eventName, listener); | ||
this.on(eventName, listener); | ||
}; | ||
@@ -214,3 +230,3 @@ | ||
function end(self) { self._end(); } | ||
function endAsync(self) { setImmediate(end, self); } | ||
function endAsync(self) { immediate(end, self); } | ||
@@ -239,3 +255,3 @@ /** | ||
if (readable) | ||
setImmediate(emit, this, 'readable'); | ||
immediate(emit, this, 'readable'); | ||
} | ||
@@ -291,3 +307,3 @@ }, | ||
if (this.readable) | ||
setImmediate(call, emitData, this); | ||
immediate(call, emitData, this); | ||
} | ||
@@ -330,3 +346,3 @@ } | ||
if (properties && (propertyName in properties)) | ||
setImmediate(callback, properties[propertyName]); | ||
immediate(callback, properties[propertyName]); | ||
// If the value was not set, store the callback for when the value will be set | ||
@@ -357,5 +373,5 @@ else { | ||
if (callbacks.length === 1) | ||
setImmediate(callbacks[0], value); | ||
immediate(callbacks[0], value); | ||
else { | ||
setImmediate(function () { | ||
immediate(function () { | ||
for (var i = 0; i < callbacks.length; i++) | ||
@@ -605,3 +621,3 @@ callbacks[i](value); | ||
this._buffer = []; | ||
this._pushed = 0; | ||
this._pushedCount = 0; | ||
this.maxBufferSize = maxBufferSize; | ||
@@ -611,3 +627,3 @@ | ||
this._reading = true; | ||
setImmediate(init, this, autoStart !== false || autoStart); | ||
immediate(init, this, autoStart !== false || autoStart); | ||
} | ||
@@ -735,3 +751,3 @@ AsyncIterator.subclass(BufferedIterator); | ||
throw new Error('Cannot push after the iterator was ended.'); | ||
this._pushed++; | ||
this._pushedCount++; | ||
this._buffer.push(item); | ||
@@ -760,3 +776,3 @@ this.readable = true; | ||
// Acquire reading lock and start reading, counting pushed items | ||
this._pushed = 0; | ||
this._pushedCount = 0; | ||
this._reading = true; | ||
@@ -776,3 +792,3 @@ this._read(neededItems, function () { | ||
// (even though all pushed items might already have been read) | ||
else if (self._pushed) { | ||
else if (self._pushedCount) { | ||
self.readable = true; | ||
@@ -790,3 +806,3 @@ // If the buffer is insufficiently full, continue filling | ||
self._reading = true; | ||
setImmediate(fillBufferAsyncCallback, self); | ||
immediate(fillBufferAsyncCallback, self); | ||
} | ||
@@ -874,2 +890,3 @@ } | ||
@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction | ||
@param {boolean} [options.optional=false] If transforming is optional, the original item is pushed when its transformation yields no items | ||
@param {AsyncIterator} [options.source] The source this iterator generates items from | ||
@@ -888,2 +905,3 @@ @extends BufferedIterator | ||
if (source) this.source = source; | ||
this._optional = !!(options && options.optional); | ||
} | ||
@@ -910,3 +928,3 @@ BufferedIterator.subclass(TransformIterator); | ||
else { | ||
source.once('end', destinationCloseWhenDone); | ||
source.on('end', destinationCloseWhenDone); | ||
source.on('readable', destinationFillBuffer); | ||
@@ -945,4 +963,4 @@ source.on('error', destinationEmitError); | ||
// Continue transforming until at least `count` items have been pushed | ||
if (self._pushed < count && !self.closed) | ||
setImmediate(readAndTransform, self, next, done); | ||
if (self._pushedCount < count && !self.closed) | ||
immediate(readAndTransform, self, next, done); | ||
else | ||
@@ -956,7 +974,21 @@ done(); | ||
var source = self._source, item; | ||
if (source && !source.ended && (item = source.read()) !== null) | ||
self._transform(item, next); | ||
if (source && !source.ended && (item = source.read()) !== null) { | ||
if (!self._optional) | ||
self._transform(item, next); | ||
else | ||
optionalTransform(self, item, next); | ||
} | ||
else | ||
done(); | ||
} | ||
// Tries to transform the item; | ||
// if the transformation yields no items, pushes the original item | ||
function optionalTransform(self, item, done) { | ||
var pushedCount = self._pushedCount; | ||
self._transform(item, function () { | ||
if (pushedCount === self._pushedCount) | ||
self._push(item); | ||
done(); | ||
}); | ||
} | ||
@@ -1030,2 +1062,3 @@ /** | ||
@param {Function} [options.transform] A function to asynchronously transform items from the source | ||
@param {boolean} [options.optional=false] If transforming is optional, the original item is pushed when its mapping yields `null` or its transformation yields no items | ||
@param {Array|AsyncIterator} [options.prepend] Items to insert before the source items | ||
@@ -1073,4 +1106,4 @@ @param {Array|AsyncIterator} [options.append] Items to insert after the source items | ||
// Continue transforming until at least `count` items have been pushed | ||
if (self._pushed < count && !self.closed) | ||
setImmediate(readAndTransformSimple, self, next, done); | ||
if (self._pushedCount < count && !self.closed) | ||
immediate(readAndTransformSimple, self, next, done); | ||
else | ||
@@ -1080,2 +1113,3 @@ done(); | ||
}; | ||
// Reads an item and performs each of the simple transformations on it | ||
function readAndTransformSimple(self, next, done) { | ||
@@ -1097,4 +1131,16 @@ // Verify we have a readable source | ||
// Map and transform the item | ||
item = self._map(item); | ||
return item === null ? next() : self._transform(item, next); | ||
var mappedItem = self._map(item); | ||
if (mappedItem !== null) { | ||
if (!self._optional) | ||
self._transform(mappedItem, next); | ||
else | ||
optionalTransform(self, mappedItem, next); | ||
} | ||
// Don't transform a `null` item | ||
else { | ||
if (self._optional) | ||
self._push(item); | ||
next(); | ||
} | ||
return; | ||
} | ||
@@ -1126,6 +1172,12 @@ self._offset--; | ||
done(); | ||
else | ||
inserter.on('data', push), inserter.once('end', end); | ||
else { | ||
inserter.on('data', push); | ||
inserter.on('end', end); | ||
} | ||
function push(item) { self._push(item); } | ||
function end() { inserter.removeListener('data', push); done(); } | ||
function end() { | ||
inserter.removeListener('data', push); | ||
inserter.removeListener('end', end); | ||
done(); | ||
} | ||
}; | ||
@@ -1268,3 +1320,3 @@ | ||
TransformIterator.call(this, source, options); | ||
this._transformers = []; | ||
this._transformerQueue = []; | ||
} | ||
@@ -1276,5 +1328,11 @@ TransformIterator.subclass(MultiTransformIterator); | ||
// Remove transformers that have ended | ||
var item, transformer, transformers = this._transformers, source = this._source; | ||
while ((transformer = transformers[0]) && transformer.ended) { | ||
transformer = transformers.shift(); | ||
var item, head, transformer, transformerQueue = this._transformerQueue, | ||
source = this._source, optional = this._optional; | ||
while ((head = transformerQueue[0]) && head.transformer.ended) { | ||
// If transforming is optional, push the original item if none was pushed | ||
if (optional && head.item !== null) | ||
this._push(head.item), count--; | ||
// Remove listeners from the transformer | ||
head = transformerQueue.shift(), transformer = head.transformer; | ||
transformer.removeListener('end', destinationFillBuffer); | ||
transformer.removeListener('readable', destinationFillBuffer); | ||
@@ -1285,3 +1343,3 @@ transformer.removeListener('error', destinationEmitError); | ||
// Create new transformers if there are less than the maximum buffer size | ||
while (source && !source.ended && transformers.length < this._maxBufferSize) { | ||
while (source && !source.ended && transformerQueue.length < this._maxBufferSize) { | ||
// Read an item to create the next transformer | ||
@@ -1292,17 +1350,20 @@ item = this._source.read(); | ||
// Create the transformer and listen to its events | ||
transformer = this._createTransformer(item); | ||
if (transformer && !transformer.ended) { | ||
transformer._destination = this; | ||
transformer.once('end', destinationFillBuffer); | ||
transformer.on('readable', destinationFillBuffer); | ||
transformer.on('error', destinationEmitError); | ||
transformers.push(transformer); | ||
} | ||
transformer = this._createTransformer(item) || new EmptyIterator(); | ||
transformer._destination = this; | ||
transformer.on('end', destinationFillBuffer); | ||
transformer.on('readable', destinationFillBuffer); | ||
transformer.on('error', destinationEmitError); | ||
transformerQueue.push({ transformer: transformer, item: item }); | ||
} | ||
// Try to read `count` items from the transformer | ||
transformer = transformers[0]; | ||
if (transformer) { | ||
while (count-- > 0 && (item = transformer.read()) !== null) | ||
head = transformerQueue[0]; | ||
if (head) { | ||
transformer = head.transformer; | ||
while (count-- > 0 && (item = transformer.read()) !== null) { | ||
this._push(item); | ||
// If a transformed item was pushed, no need to push the original anymore | ||
if (optional) | ||
head.item = null; | ||
} | ||
} | ||
@@ -1326,3 +1387,3 @@ // End the iterator if the source has ended | ||
// Only close if all transformers are read | ||
if (!this._transformers.length) | ||
if (!this._transformerQueue.length) | ||
this.close(); | ||
@@ -1466,3 +1527,3 @@ }; | ||
// When the source ends, close all clones that are fully read | ||
source.once('end', clonesEnd); | ||
source.on('end', clonesEnd); | ||
function clonesEnd() { | ||
@@ -1474,3 +1535,4 @@ for (var i = 0; i < clones.length; i++) { | ||
clones = null; | ||
source.removeListener('error', clonesEmitError); | ||
source.removeListener('end', clonesEnd); | ||
source.removeListener('error', clonesEmitError); | ||
source.removeListener('readable', clonesMakeReadable); | ||
@@ -1477,0 +1539,0 @@ } |
{ | ||
"name": "asynciterator", | ||
"version": "1.0.1", | ||
"version": "1.1.0", | ||
"description": "An asynchronous iterator library for advanced object pipelines.", | ||
@@ -14,5 +14,6 @@ "author": "Ruben Verborgh <ruben.verborgh@gmail.com>", | ||
"license": "MIT", | ||
"repository": "RubenVerborgh/AsyncIterator", | ||
"devDependencies": { | ||
"chai": "^3.5.0", | ||
"eslint": "^2.13.1", | ||
"eslint": "^3.0.1", | ||
"istanbul": "^0.4.4", | ||
@@ -23,3 +24,6 @@ "jsdoc": "^3.4.0", | ||
"sinon-chai": "^2.8.0" | ||
}, | ||
"dependencies": { | ||
"immediate": "^3.2.3" | ||
} | ||
} |
@@ -843,2 +843,52 @@ var TransformIterator = require('../asynciterator').TransformIterator; | ||
}); | ||
describe('A TransformIterator with optional set to false', function () { | ||
var iterator, source; | ||
before(function () { | ||
source = new ArrayIterator([1, 2, 3, 4, 5, 6]); | ||
iterator = new TransformIterator(source, { optional: false }); | ||
iterator._transform = function (item, done) { | ||
if (item % 3 !== 0) | ||
this._push('t' + item); | ||
done(); | ||
}; | ||
}); | ||
describe('when reading items', function () { | ||
var items = []; | ||
before(function (done) { | ||
iterator.on('data', function (item) { items.push(item); }); | ||
iterator.on('end', done); | ||
}); | ||
it('should return items not transformed into null', function () { | ||
items.should.deep.equal(['t1', 't2', 't4', 't5']); | ||
}); | ||
}); | ||
}); | ||
describe('A TransformIterator with optional set to true', function () { | ||
var iterator, source; | ||
before(function () { | ||
source = new ArrayIterator([1, 2, 3, 4, 5, 6]); | ||
iterator = new TransformIterator(source, { optional: true }); | ||
iterator._transform = function (item, done) { | ||
if (item % 3 !== 0) | ||
this._push('t' + item); | ||
done(); | ||
}; | ||
}); | ||
describe('when reading items', function () { | ||
var items = []; | ||
before(function (done) { | ||
iterator.on('data', function (item) { items.push(item); }); | ||
iterator.on('end', done); | ||
}); | ||
it('should return the transformed items, or if none, the item itself', function () { | ||
items.should.deep.equal(['t1', 't2', 3, 't4', 't5', 6]); | ||
}); | ||
}); | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No repository
Supply chain riskPackage does not have a linked source code repository. Without this field, a package will have no reference to the location of the source code use to generate the package.
Found 1 instance in 1 package
297952
7251
1
+ Addedimmediate@^3.2.3
+ Addedimmediate@3.3.0(transitive)