asynciterator
Advanced tools
Comparing version 3.0.0-alpha.0 to 3.0.0-beta.0
2649
asynciterator.js
@@ -1,16 +0,9 @@ | ||
const { EventEmitter } = require('events'); | ||
const queueMicrotask = require('queue-microtask'); | ||
/** | ||
Names of possible iterator states. | ||
The state's position in the array corresponds to its ID. | ||
@name AsyncIterator.STATES | ||
@type String[] | ||
@protected | ||
*/ | ||
const STATES = AsyncIterator.STATES = ['INIT', 'OPEN', 'CLOSING', 'CLOSED', 'ENDED', 'DESTROYED']; | ||
const INIT = 0, OPEN = 1, CLOSING = 2, CLOSED = 3, ENDED = 4, DESTROYED = 5; | ||
for (const id in STATES) | ||
AsyncIterator[STATES[id]] = id; | ||
* An asynchronous iterator library for advanced object pipelines | ||
* @module asynciterator | ||
*/ | ||
import { EventEmitter } from 'events'; | ||
import queueMicrotask from 'queue-microtask'; | ||
/** | ||
@@ -20,6 +13,5 @@ ID of the INIT state. | ||
It can already produce items. | ||
@name AsyncIterator.INIT | ||
@type integer | ||
@protected | ||
*/ | ||
export const INIT = 1 << 0; | ||
@@ -29,6 +21,5 @@ /** | ||
An iterator is open if it can generate new items. | ||
@name AsyncIterator.OPEN | ||
@type integer | ||
@protected | ||
*/ | ||
export const OPEN = 1 << 1; | ||
@@ -38,6 +29,5 @@ /** | ||
An iterator is closing if item generation is pending but will not be scheduled again. | ||
@name AsyncIterator.CLOSING | ||
@type integer | ||
@protected | ||
*/ | ||
export const CLOSING = 1 << 2; | ||
@@ -48,6 +38,5 @@ /** | ||
Items might still be available. | ||
@name AsyncIterator.CLOSED | ||
@type integer | ||
@protected | ||
*/ | ||
export const CLOSED = 1 << 3; | ||
@@ -58,212 +47,212 @@ /** | ||
The 'end' event is guaranteed to have been called when in this state. | ||
@name AsyncIterator.ENDED | ||
@type integer | ||
@protected | ||
*/ | ||
export const ENDED = 1 << 4; | ||
/** | ||
ID of the DESTROYED state. | ||
An iterator has been destroyed after calling {@link AsyncIterator#destroy}. | ||
An iterator has been destroyed | ||
after calling {@link module:asynciterator.AsyncIterator#destroy}. | ||
The 'end' event has not been called, as pending elements were voided. | ||
@name AsyncIterator.DESTROYED | ||
@type integer | ||
@protected | ||
*/ | ||
export const DESTROYED = 1 << 5; | ||
/** | ||
Creates a new `AsyncIterator`. | ||
@public | ||
@constructor | ||
@classdesc An asynchronous iterator provides pull-based access to a stream of objects. | ||
@extends EventEmitter | ||
An asynchronous iterator provides pull-based access to a stream of objects. | ||
@extends module:asynciterator.EventEmitter | ||
*/ | ||
function AsyncIterator() { | ||
if (!(this instanceof AsyncIterator)) | ||
return new AsyncIterator(); | ||
EventEmitter.call(this); | ||
this.on('newListener', waitForDataListener); | ||
this._state = OPEN; | ||
this._readable = false; | ||
} | ||
export class AsyncIterator extends EventEmitter { | ||
/** Creates a new `AsyncIterator`. */ | ||
constructor() { | ||
super(); | ||
this._state = OPEN; | ||
this._readable = false; | ||
this.on('newListener', waitForDataListener); | ||
} | ||
/** | ||
Makes the prototype of the current constructor a prototype for the given constructor. | ||
@protected | ||
@function AsyncIterator.subclass | ||
@param {Function} Constructor The constructor that should inherit from the current constructor | ||
@returns {AsyncIterator} The constructor's prototype | ||
*/ | ||
(function subclass(Constructor) { | ||
Constructor.prototype = Object.create(this.prototype, | ||
{ constructor: { value: Constructor, configurable: true, writable: true } }); | ||
Constructor.subclass = subclass; | ||
}).call(EventEmitter, AsyncIterator); | ||
/** | ||
Changes the iterator to the given state if possible and necessary, | ||
possibly emitting events to signal that change. | ||
@protected | ||
@param {integer} newState The ID of the new state (from the `STATES` array) | ||
@param {boolean} [eventAsync=false] Whether resulting events should be emitted asynchronously | ||
@returns {boolean} Whether the state was changed | ||
@emits AsyncIterator.end | ||
*/ | ||
AsyncIterator.prototype._changeState = function (newState, eventAsync) { | ||
// Validate the state change | ||
const valid = newState > this._state && this._state < ENDED; | ||
if (valid) { | ||
this._state = newState; | ||
// Emit the `end` event when changing to ENDED | ||
if (newState === ENDED) { | ||
if (!eventAsync) | ||
this.emit('end'); | ||
else | ||
queueMicrotask(() => this.emit('end')); | ||
/** | ||
Changes the iterator to the given state if possible and necessary, | ||
possibly emitting events to signal that change. | ||
@protected | ||
@param {integer} newState The ID of the new state | ||
@param {boolean} [eventAsync=false] Whether resulting events should be emitted asynchronously | ||
@returns {boolean} Whether the state was changed | ||
@emits module:asynciterator.AsyncIterator.end | ||
*/ | ||
_changeState(newState, eventAsync) { | ||
// Validate the state change | ||
const valid = newState > this._state && this._state < ENDED; | ||
if (valid) { | ||
this._state = newState; | ||
// Emit the `end` event when changing to ENDED | ||
if (newState === ENDED) { | ||
if (!eventAsync) | ||
this.emit('end'); | ||
else | ||
queueMicrotask(() => this.emit('end')); | ||
} | ||
} | ||
return valid; | ||
} | ||
return valid; | ||
}; | ||
/** | ||
Tries to read the next item from the iterator. | ||
This is the main method for reading the iterator in _on-demand mode_, | ||
where new items are only created when needed by consumers. | ||
If no items are currently available, this methods returns `null`. | ||
The {@link AsyncIterator.event:readable} event will then signal when new items might be ready. | ||
To read all items from the iterator, | ||
switch to _flow mode_ by subscribing to the {@link AsyncIterator.event:data} event. | ||
When in flow mode, do not use the `read` method. | ||
@returns {object?} The next item, or `null` if none is available | ||
*/ | ||
AsyncIterator.prototype.read = function () { | ||
return null; | ||
}; | ||
/** | ||
Tries to read the next item from the iterator. | ||
This is the main method for reading the iterator in _on-demand mode_, | ||
where new items are only created when needed by consumers. | ||
If no items are currently available, this methods returns `null`. | ||
The {@link module:asynciterator.event:readable} event | ||
will then signal when new items might be ready. | ||
To read all items from the iterator, | ||
switch to _flow mode_ by subscribing | ||
to the {@link module:asynciterator.event:data} event. | ||
When in flow mode, do not use the `read` method. | ||
@returns {object?} The next item, or `null` if none is available | ||
*/ | ||
read() { | ||
return null; | ||
} | ||
/** | ||
Emitted when the iterator might have new items available | ||
after having had no items available right before this event. | ||
If the iterator is not in flow mode, | ||
items can be retrieved by calling {@link AsyncIterator#read}. | ||
@event AsyncIterator.readable | ||
*/ | ||
/** | ||
The iterator emits a `readable` event when it might have new items available | ||
after having had no items available right before this event. | ||
If the iterator is not in flow mode, items can be retrieved | ||
by calling {@link module:asynciterator.AsyncIterator#read}. | ||
@event module:asynciterator.readable | ||
*/ | ||
/** | ||
Invokes the callback for each remaining item in the iterator. | ||
Switches the iterator to flow mode. | ||
@param {Function} callback A function that will be called with each item | ||
@param {object?} self The `this` pointer for the callback | ||
*/ | ||
AsyncIterator.prototype.each = function (callback, self) { | ||
this.on('data', self ? callback.bind(self) : callback); | ||
}; | ||
/** | ||
The iterator emits a `data` event with a new item as soon as it becomes available. | ||
When one or more listeners are attached to the `data` event, | ||
the iterator switches to _flow mode_, | ||
generating and emitting new items as fast as possible. | ||
This drains the source and might create backpressure on the consumers, | ||
so only subscribe to this event if this behavior is intended. | ||
In flow mode, don't use {@link module:asynciterator.AsyncIterator#read}. | ||
To switch back to _on-demand mode_, remove all listeners from the `data` event. | ||
You can then obtain items through `read` again. | ||
@event module:asynciterator.data | ||
@param {object} item The new item | ||
*/ | ||
/** | ||
Verifies whether the iterator has listeners for the given event. | ||
@private | ||
@param {string} eventName The name of the event | ||
@returns {boolean} Whether the iterator has listeners | ||
*/ | ||
AsyncIterator.prototype._hasListeners = function (eventName) { | ||
return this._events && (eventName in this._events); | ||
}; | ||
/** | ||
Invokes the callback for each remaining item in the iterator. | ||
Switches the iterator to flow mode. | ||
@param {Function} callback A function that will be called with each item | ||
@param {object?} self The `this` pointer for the callback | ||
*/ | ||
each(callback, self) { | ||
this.on('data', self ? callback.bind(self) : callback); | ||
} | ||
/** | ||
Adds the listener to the event, if it has not been added previously. | ||
@private | ||
@param {string} eventName The name of the event | ||
@param {Function} listener The listener to add | ||
*/ | ||
AsyncIterator.prototype._addSingleListener = function (eventName, listener) { | ||
const listeners = this._events && this._events[eventName]; | ||
if (!listeners || | ||
(isFunction(listeners) ? listeners !== listener : listeners.indexOf(listener) < 0)) | ||
this.on(eventName, listener); | ||
}; | ||
/** | ||
Verifies whether the iterator has listeners for the given event. | ||
@private | ||
@param {string} eventName The name of the event | ||
@returns {boolean} Whether the iterator has listeners | ||
*/ | ||
_hasListeners(eventName) { | ||
return this._events && (eventName in this._events); | ||
} | ||
/** | ||
Stops the iterator from generating new items. | ||
Already generated items or terminating items can still be emitted. | ||
After this, the iterator will end asynchronously. | ||
@emits AsyncIterator.end | ||
*/ | ||
AsyncIterator.prototype.close = function () { | ||
if (this._changeState(CLOSED)) | ||
this._endAsync(); | ||
}; | ||
/** | ||
Adds the listener to the event, if it has not been added previously. | ||
@private | ||
@param {string} eventName The name of the event | ||
@param {Function} listener The listener to add | ||
*/ | ||
_addSingleListener(eventName, listener) { | ||
const listeners = this._events && this._events[eventName]; | ||
if (!listeners || | ||
(isFunction(listeners) ? listeners !== listener : listeners.indexOf(listener) < 0)) | ||
this.on(eventName, listener); | ||
} | ||
/** | ||
Destroy the iterator and stop it from generating new items. | ||
This will not do anything if the iterator was already ended or destroyed. | ||
All internal resources will be released an no new items will be emitted, | ||
even not already generated items. | ||
Implementors should not override this method, | ||
but instead implement {@link AsyncIterator#_destroy}. | ||
@param {Error} [cause] An optional error to emit. | ||
@emits AsyncIterator.end | ||
@emits AsyncIterator.error Only emitted if an error is passed. | ||
*/ | ||
AsyncIterator.prototype.destroy = function (cause) { | ||
if (!this.done) { | ||
this._destroy(cause, error => { | ||
cause = cause || error; | ||
if (cause) | ||
this.emit('error', cause); | ||
this._end(true); | ||
}); | ||
/** | ||
Stops the iterator from generating new items. | ||
Already generated items or terminating items can still be emitted. | ||
After this, the iterator will end asynchronously. | ||
@emits module:asynciterator.AsyncIterator.end | ||
*/ | ||
close() { | ||
if (this._changeState(CLOSED)) | ||
this._endAsync(); | ||
} | ||
}; | ||
/** | ||
Called by {@link AsyncIterator#destroy}. | ||
Implementers can override this, but this should not be called directly. | ||
@param {?Error} cause The reason why the iterator is destroyed. | ||
@param {Function} callback A callback function with an optional error argument. | ||
*/ | ||
AsyncIterator.prototype._destroy = function (cause, callback) { | ||
callback(); | ||
}; | ||
/** | ||
Destroy the iterator and stop it from generating new items. | ||
This will not do anything if the iterator was already ended or destroyed. | ||
All internal resources will be released an no new items will be emitted, | ||
even not already generated items. | ||
Implementors should not override this method, | ||
but instead implement {@link module:asynciterator.AsyncIterator#_destroy}. | ||
@param {Error} [cause] An optional error to emit. | ||
@emits module:asynciterator.AsyncIterator.end | ||
@emits module:asynciterator.AsyncIterator.error Only if an error is passed. | ||
*/ | ||
destroy(cause) { | ||
if (!this.done) { | ||
this._destroy(cause, error => { | ||
cause = cause || error; | ||
if (cause) | ||
this.emit('error', cause); | ||
this._end(true); | ||
}); | ||
} | ||
} | ||
/** | ||
Ends the iterator and cleans up. | ||
Should never be called before {@link AsyncIterator#close}; | ||
typically, `close` is responsible for calling `_end`. | ||
@param {boolean} [destroy] If the iterator should be forcefully destroyed. | ||
@protected | ||
@emits AsyncIterator.end | ||
*/ | ||
AsyncIterator.prototype._end = function (destroy) { | ||
if (this._changeState(destroy ? DESTROYED : ENDED)) { | ||
this._readable = false; | ||
this.removeAllListeners('readable'); | ||
this.removeAllListeners('data'); | ||
this.removeAllListeners('end'); | ||
/** | ||
Called by {@link module:asynciterator.AsyncIterator#destroy}. | ||
Implementers can override this, but this should not be called directly. | ||
@protected | ||
@param {?Error} cause The reason why the iterator is destroyed. | ||
@param {Function} callback A callback function with an optional error argument. | ||
*/ | ||
_destroy(cause, callback) { | ||
callback(); | ||
} | ||
}; | ||
/** | ||
Asynchronously calls `_end`. | ||
*/ | ||
AsyncIterator.prototype._endAsync = function () { | ||
queueMicrotask(() => this._end()); | ||
}; | ||
/** | ||
Ends the iterator and cleans up. | ||
Should never be called before {@link module:asynciterator.AsyncIterator#close}; | ||
typically, `close` is responsible for calling `_end`. | ||
@param {boolean} [destroy] If the iterator should be forcefully destroyed. | ||
@protected | ||
@emits module:asynciterator.AsyncIterator.end | ||
*/ | ||
_end(destroy) { | ||
if (this._changeState(destroy ? DESTROYED : ENDED)) { | ||
this._readable = false; | ||
this.removeAllListeners('readable'); | ||
this.removeAllListeners('data'); | ||
this.removeAllListeners('end'); | ||
} | ||
} | ||
/** | ||
Emitted after the last item of the iterator has been read. | ||
@event AsyncIterator.end | ||
*/ | ||
/** | ||
Asynchronously calls `_end`. | ||
@protected | ||
*/ | ||
_endAsync() { | ||
queueMicrotask(() => this._end()); | ||
} | ||
/** | ||
Gets or sets whether this iterator might have items available for read. | ||
A value of `false` means there are _definitely_ no items available; | ||
a value of `true` means items _might_ be available. | ||
@name AsyncIterator#readable | ||
@type boolean | ||
@emits AsyncIterator.readable | ||
*/ | ||
Object.defineProperty(AsyncIterator.prototype, 'readable', { | ||
get() { | ||
/** | ||
The `end` event is emitted after the last item of the iterator has been read. | ||
@event module:asynciterator.end | ||
*/ | ||
/** | ||
Gets or sets whether this iterator might have items available for read. | ||
A value of `false` means there are _definitely_ no items available; | ||
a value of `true` means items _might_ be available. | ||
@type boolean | ||
@emits module:asynciterator.AsyncIterator.readable | ||
*/ | ||
get readable() { | ||
return this._readable; | ||
}, | ||
set(readable) { | ||
} | ||
set readable(readable) { | ||
readable = Boolean(readable) && !this.done; | ||
@@ -277,65 +266,260 @@ // Set the readable value only if it has changed | ||
} | ||
}, | ||
enumerable: true, | ||
}); | ||
} | ||
/** | ||
Gets whether the iterator has stopped generating new items. | ||
@name AsyncIterator#closed | ||
@type boolean | ||
@readonly | ||
*/ | ||
Object.defineProperty(AsyncIterator.prototype, 'closed', { | ||
get() { return this._state >= CLOSING; }, | ||
enumerable: true, | ||
}); | ||
/** | ||
Gets whether the iterator has stopped generating new items. | ||
@type boolean | ||
@readonly | ||
*/ | ||
get closed() { | ||
return this._state >= CLOSING; | ||
} | ||
/** | ||
Gets whether the iterator has finished emitting items. | ||
@name AsyncIterator#ended | ||
@type boolean | ||
@readonly | ||
*/ | ||
Object.defineProperty(AsyncIterator.prototype, 'ended', { | ||
get() { return this._state === ENDED; }, | ||
enumerable: true, | ||
}); | ||
/** | ||
Gets whether the iterator has finished emitting items. | ||
@type boolean | ||
@readonly | ||
*/ | ||
get ended() { | ||
return this._state === ENDED; | ||
} | ||
/** | ||
Gets whether the iterator has been destroyed. | ||
@name AsyncIterator#destroyed | ||
@type boolean | ||
@readonly | ||
*/ | ||
Object.defineProperty(AsyncIterator.prototype, 'destroyed', { | ||
get() { return this._state === DESTROYED; }, | ||
enumerable: true, | ||
}); | ||
/** | ||
Gets whether the iterator has been destroyed. | ||
@type boolean | ||
@readonly | ||
*/ | ||
get destroyed() { | ||
return this._state === DESTROYED; | ||
} | ||
/** | ||
Gets whether the iterator will not emit anymore items, | ||
either due to being closed or due to being destroyed. | ||
@name AsyncIterator#done | ||
@type boolean | ||
@readonly | ||
*/ | ||
Object.defineProperty(AsyncIterator.prototype, 'done', { | ||
get() { return this._state >= ENDED; }, | ||
enumerable: true, | ||
}); | ||
/** | ||
Gets whether the iterator will not emit anymore items, | ||
either due to being closed or due to being destroyed. | ||
@type boolean | ||
@readonly | ||
*/ | ||
get done() { | ||
return this._state >= ENDED; | ||
} | ||
/** | ||
The iterator emits a `data` event with a new item as soon as it becomes available. | ||
When one or more listeners are attached to the `data` event, | ||
the iterator switches to _flow mode_, | ||
generating and emitting new items as fast as possible. | ||
This drains the source and might create backpressure on the consumers, | ||
so only subscribe to this event if this behavior is intended. | ||
In flow mode, don't use the {@link AsyncIterator#read} method. | ||
To switch back to _on-demand mode_, remove all listeners from the `data` event. | ||
You can then obtain items through {@link AsyncIterator#read} again. | ||
@event AsyncIterator.data | ||
@param {object} item The new item | ||
*/ | ||
/* Generates a textual representation of the iterator. */ | ||
toString() { | ||
const details = this._toStringDetails(); | ||
return `[${this.constructor.name}${details ? ` ${details}` : ''}]`; | ||
} | ||
/** | ||
Generates details for a textual representation of the iterator. | ||
@protected | ||
*/ | ||
_toStringDetails() { | ||
return ''; | ||
} | ||
/** | ||
Retrieves the property with the given name from the iterator. | ||
If no callback is passed, it returns the value of the property | ||
or `undefined` if the property is not set. | ||
If a callback is passed, it returns `undefined` | ||
and calls the callback with the property the moment it is set. | ||
@param {string} propertyName The name of the property to retrieve | ||
@param {Function} [callback] A one-argument callback to receive the property value | ||
@returns {object?} The value of the property (if set and no callback is given) | ||
*/ | ||
getProperty(propertyName, callback) { | ||
const properties = this._properties; | ||
// If no callback was passed, return the property value | ||
if (!callback) | ||
return properties && properties[propertyName]; | ||
// If the value has been set, send it through the callback | ||
if (properties && (propertyName in properties)) { | ||
queueMicrotask(() => callback(properties[propertyName])); | ||
} | ||
// If the value was not set, store the callback for when the value will be set | ||
else { | ||
let propertyCallbacks; | ||
if (!(propertyCallbacks = this._propertyCallbacks)) | ||
this._propertyCallbacks = propertyCallbacks = Object.create(null); | ||
if (propertyName in propertyCallbacks) | ||
propertyCallbacks[propertyName].push(callback); | ||
else | ||
propertyCallbacks[propertyName] = [callback]; | ||
} | ||
return undefined; | ||
} | ||
/** | ||
Sets the property with the given name to the value. | ||
@param {string} propertyName The name of the property to set | ||
@param {object?} value The new value of the property | ||
*/ | ||
setProperty(propertyName, value) { | ||
const properties = this._properties || (this._properties = Object.create(null)); | ||
properties[propertyName] = value; | ||
// Execute getter callbacks that were waiting for this property to be set | ||
const propertyCallbacks = this._propertyCallbacks; | ||
const callbacks = propertyCallbacks && propertyCallbacks[propertyName]; | ||
if (callbacks) { | ||
delete propertyCallbacks[propertyName]; | ||
queueMicrotask(() => { | ||
for (const callback of callbacks) | ||
callback(value); | ||
}); | ||
// Remove _propertyCallbacks if no pending callbacks are left | ||
for (propertyName in propertyCallbacks) | ||
return; | ||
delete this._propertyCallbacks; | ||
} | ||
} | ||
/** | ||
Retrieves all properties of the iterator. | ||
@returns {object} An object with property names as keys. | ||
*/ | ||
getProperties() { | ||
const properties = this._properties, copy = {}; | ||
for (const name in properties) | ||
copy[name] = properties[name]; | ||
return copy; | ||
} | ||
/** | ||
Sets all of the given properties. | ||
@param {object} properties Key/value pairs of properties to set | ||
*/ | ||
setProperties(properties) { | ||
for (const propertyName in properties) | ||
this.setProperty(propertyName, properties[propertyName]); | ||
} | ||
/** | ||
Copies the given properties from the source iterator. | ||
@param {module:asynciterator.AsyncIterator} source The iterator to copy from | ||
@param {Array} propertyNames List of property names to copy | ||
*/ | ||
copyProperties(source, propertyNames) { | ||
for (const propertyName of propertyNames) { | ||
source.getProperty(propertyName, value => | ||
this.setProperty(propertyName, value)); | ||
} | ||
} | ||
/** | ||
Transforms items from this iterator. | ||
After this operation, only read the returned iterator instead of the current one. | ||
@param {object|Function} [options] Settings of the iterator, or the transformation function | ||
@param {integer} [options.maxbufferSize=4] The maximum number of items to keep in the buffer | ||
@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction | ||
@param {integer} [options.offset] The number of items to skip | ||
@param {integer} [options.limit] The maximum number of items | ||
@param {Function} [options.filter] A function to synchronously filter items from the source | ||
@param {Function} [options.map] A function to synchronously transform items from the source | ||
@param {Function} [options.transform] A function to asynchronously transform items from the source | ||
@param {boolean} [options.optional=false] If transforming is optional, the original item is pushed when its mapping yields `null` or its transformation yields no items | ||
@param {Array|module:asynciterator.AsyncIterator} [options.prepend] Items to insert before the source items | ||
@param {Array|module:asynciterator.AsyncIterator} [options.append] Items to insert after the source items | ||
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator | ||
*/ | ||
transform(options) { | ||
return new SimpleTransformIterator(this, options); | ||
} | ||
/** | ||
Maps items from this iterator using the given function. | ||
After this operation, only read the returned iterator instead of the current one. | ||
@param {Function} mapper A mapping function to call on this iterator's (remaining) items | ||
@param {object?} self The `this` pointer for the mapping function | ||
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator | ||
*/ | ||
map(mapper, self) { | ||
return this.transform({ map: self ? mapper.bind(self) : mapper }); | ||
} | ||
/** | ||
Return items from this iterator that match the filter. | ||
After this operation, only read the returned iterator instead of the current one. | ||
@param {Function} filter A filter function to call on this iterator's (remaining) items | ||
@param {object?} self The `this` pointer for the filter function | ||
@returns {module:asynciterator.AsyncIterator} A new iterator that filters items from this iterator | ||
*/ | ||
filter(filter, self) { | ||
return this.transform({ filter: self ? filter.bind(self) : filter }); | ||
} | ||
/** | ||
Prepends the items after those of the current iterator. | ||
After this operation, only read the returned iterator instead of the current one. | ||
@param {Array|module:asynciterator.AsyncIterator} items Items to insert before this iterator's (remaining) items | ||
@returns {module:asynciterator.AsyncIterator} A new iterator that prepends items to this iterator | ||
*/ | ||
prepend(items) { | ||
return this.transform({ prepend: items }); | ||
} | ||
/** | ||
Appends the items after those of the current iterator. | ||
After this operation, only read the returned iterator instead of the current one. | ||
@param {Array|module:asynciterator.AsyncIterator} items Items to insert after this iterator's (remaining) items | ||
@returns {module:asynciterator.AsyncIterator} A new iterator that appends items to this iterator | ||
*/ | ||
append(items) { | ||
return this.transform({ append: items }); | ||
} | ||
/** | ||
Surrounds items of the current iterator with the given items. | ||
After this operation, only read the returned iterator instead of the current one. | ||
@param {Array|module:asynciterator.AsyncIterator} prepend Items to insert before this iterator's (remaining) items | ||
@param {Array|module:asynciterator.AsyncIterator} append Items to insert after this iterator's (remaining) items | ||
@returns {module:asynciterator.AsyncIterator} A new iterator that appends and prepends items to this iterator | ||
*/ | ||
surround(prepend, append) { | ||
return this.transform({ prepend, append }); | ||
} | ||
/** | ||
Skips the given number of items from the current iterator. | ||
The current iterator may not be read anymore until the returned iterator ends. | ||
@param {integer} offset The number of items to skip | ||
@returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items | ||
*/ | ||
skip(offset) { | ||
return this.transform({ offset }); | ||
} | ||
/** | ||
Limits the current iterator to the given number of items. | ||
The current iterator may not be read anymore until the returned iterator ends. | ||
@param {integer} limit The maximum number of items | ||
@returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items | ||
*/ | ||
take(limit) { | ||
return this.transform({ limit }); | ||
} | ||
/** | ||
Limits the current iterator to the given range. | ||
The current iterator may not be read anymore until the returned iterator ends. | ||
@param {integer} start Index of the first item to return | ||
@param {integer} end Index of the last item to return | ||
@returns {module:asynciterator.AsyncIterator} A new iterator with items in the given range | ||
*/ | ||
range(start, end) { | ||
return this.transform({ offset: start, limit: Math.max(end - start + 1, 0) }); | ||
} | ||
/** | ||
Creates a copy of the current iterator, | ||
containing all items emitted from this point onward. | ||
Further copies can be created; they will all start from this same point. | ||
After this operation, only read the returned copies instead of the original iterator. | ||
@returns {module:asynciterator.AsyncIterator} A new iterator that contains all future items of this iterator | ||
*/ | ||
clone() { | ||
return new ClonedIterator(this); | ||
} | ||
} | ||
// Starts emitting `data` events when `data` listeners are added | ||
@@ -363,321 +547,194 @@ function waitForDataListener(eventName) { | ||
/** | ||
Retrieves the property with the given name from the iterator. | ||
If no callback is passed, it returns the value of the property | ||
or `undefined` if the property is not set. | ||
If a callback is passed, it returns `undefined` | ||
and calls the callback with the property the moment it is set. | ||
@param {string} propertyName The name of the property to retrieve | ||
@param {Function} [callback] A one-argument callback to receive the property value | ||
@returns {object?} The value of the property (if set and no callback is given) | ||
*/ | ||
AsyncIterator.prototype.getProperty = function (propertyName, callback) { | ||
const properties = this._properties; | ||
// If no callback was passed, return the property value | ||
if (!callback) | ||
return properties && properties[propertyName]; | ||
// If the value has been set, send it through the callback | ||
if (properties && (propertyName in properties)) { | ||
queueMicrotask(() => callback(properties[propertyName])); | ||
} | ||
// If the value was not set, store the callback for when the value will be set | ||
else { | ||
let propertyCallbacks; | ||
if (!(propertyCallbacks = this._propertyCallbacks)) | ||
this._propertyCallbacks = propertyCallbacks = Object.create(null); | ||
if (propertyName in propertyCallbacks) | ||
propertyCallbacks[propertyName].push(callback); | ||
else | ||
propertyCallbacks[propertyName] = [callback]; | ||
} | ||
return undefined; | ||
}; | ||
/** | ||
Sets the property with the given name to the value. | ||
@param {string} propertyName The name of the property to set | ||
@param {object?} value The new value of the property | ||
An iterator that doesn't emit any items. | ||
@extends module:asynciterator.AsyncIterator | ||
*/ | ||
AsyncIterator.prototype.setProperty = function (propertyName, value) { | ||
const properties = this._properties || (this._properties = Object.create(null)); | ||
properties[propertyName] = value; | ||
// Execute getter callbacks that were waiting for this property to be set | ||
const propertyCallbacks = this._propertyCallbacks; | ||
const callbacks = propertyCallbacks && propertyCallbacks[propertyName]; | ||
if (callbacks) { | ||
delete propertyCallbacks[propertyName]; | ||
queueMicrotask(() => { | ||
for (const callback of callbacks) | ||
callback(value); | ||
}); | ||
// Remove _propertyCallbacks if no pending callbacks are left | ||
for (propertyName in propertyCallbacks) | ||
return; | ||
delete this._propertyCallbacks; | ||
export class EmptyIterator extends AsyncIterator { | ||
/** Creates a new `EmptyIterator`. */ | ||
constructor() { | ||
super(); | ||
this._changeState(ENDED, true); | ||
} | ||
}; | ||
/** | ||
Retrieves all properties of the iterator. | ||
@returns {object} An object with property names as keys. | ||
*/ | ||
AsyncIterator.prototype.getProperties = function () { | ||
const properties = this._properties, copy = {}; | ||
for (const name in properties) | ||
copy[name] = properties[name]; | ||
return copy; | ||
}; | ||
/** | ||
Sets all of the given properties. | ||
@param {object} properties Key/value pairs of properties to set | ||
*/ | ||
AsyncIterator.prototype.setProperties = function (properties) { | ||
for (const propertyName in properties) | ||
this.setProperty(propertyName, properties[propertyName]); | ||
}; | ||
/** | ||
Copies the given properties from the source iterator. | ||
@param {AsyncIterator} source The iterator to copy from | ||
@param {Array} propertyNames List of property names to copy | ||
*/ | ||
AsyncIterator.prototype.copyProperties = function (source, propertyNames) { | ||
for (let i = 0; i < propertyNames.length; i++) | ||
copyProperty(source, this, propertyNames[i]); | ||
}; | ||
function copyProperty(source, destination, propertyName) { | ||
source.getProperty(propertyName, value => { | ||
destination.setProperty(propertyName, value); | ||
}); | ||
} | ||
/* Generates a textual representation of the iterator. */ | ||
AsyncIterator.prototype.toString = function () { | ||
const details = this._toStringDetails(); | ||
return `[${this.constructor.name}${details ? ` ${details}` : ''}]`; | ||
}; | ||
/** | ||
Generates details for a textual representation of the iterator. | ||
@protected | ||
An iterator that emits a single item. | ||
@extends module:asynciterator.AsyncIterator | ||
*/ | ||
AsyncIterator.prototype._toStringDetails = function () { /* */ }; | ||
export class SingletonIterator extends AsyncIterator { | ||
/** | ||
Creates a new `SingletonIterator`. | ||
@param {object} item The item that will be emitted. | ||
*/ | ||
constructor(item) { | ||
super(); | ||
this._item = item; | ||
if (item === null) | ||
this.close(); | ||
else | ||
this.readable = true; | ||
} | ||
/* Reads the item from the iterator. */ | ||
read() { | ||
const item = this._item; | ||
this._item = null; | ||
this.close(); | ||
return item; | ||
} | ||
/** | ||
Creates a new `EmptyIterator`. | ||
@constructor | ||
@classdesc An iterator that doesn't emit any items. | ||
@extends AsyncIterator | ||
*/ | ||
function EmptyIterator() { | ||
if (!(this instanceof EmptyIterator)) | ||
return new EmptyIterator(); | ||
AsyncIterator.call(this); | ||
this._changeState(ENDED, true); | ||
/* Generates details for a textual representation of the iterator. */ | ||
_toStringDetails() { | ||
return this._item === null ? '' : `(${ this._item })`; | ||
} | ||
} | ||
AsyncIterator.subclass(EmptyIterator); | ||
/** | ||
Creates a new `SingletonIterator`. | ||
@constructor | ||
@classdesc An iterator that emits a single item. | ||
@param {object} item The item that will be emitted. | ||
@extends AsyncIterator | ||
An iterator that emits the items of a given array. | ||
@extends module:asynciterator.AsyncIterator | ||
*/ | ||
function SingletonIterator(item) { | ||
if (!(this instanceof SingletonIterator)) | ||
return new SingletonIterator(item); | ||
AsyncIterator.call(this); | ||
this._item = item; | ||
if (item === null) | ||
this.close(); | ||
else | ||
export class ArrayIterator extends AsyncIterator { | ||
/** | ||
Creates a new `ArrayIterator`. | ||
@param {Array} items The items that will be emitted. | ||
*/ | ||
constructor(items) { | ||
super(); | ||
if (!(items && items.length > 0)) | ||
return this.close(); | ||
this._buffer = Array.prototype.slice.call(items); | ||
this.readable = true; | ||
} | ||
AsyncIterator.subclass(SingletonIterator); | ||
} | ||
/* Reads the item from the iterator. */ | ||
SingletonIterator.prototype.read = function () { | ||
const item = this._item; | ||
this._item = null; | ||
this.close(); | ||
return item; | ||
}; | ||
/* Generates details for a textual representation of the iterator. */ | ||
SingletonIterator.prototype._toStringDetails = function () { | ||
return this._item === null ? '' : `(${ this._item })`; | ||
}; | ||
/** | ||
Creates a new `ArrayIterator`. | ||
@constructor | ||
@classdesc An iterator that emits the items of a given array. | ||
@param {Array} items The items that will be emitted. | ||
@extends AsyncIterator | ||
*/ | ||
function ArrayIterator(items) { | ||
if (!(this instanceof ArrayIterator)) | ||
return new ArrayIterator(items); | ||
AsyncIterator.call(this); | ||
if (!(items && items.length > 0)) | ||
return this.close(); | ||
this._buffer = Array.prototype.slice.call(items); | ||
this.readable = true; | ||
} | ||
AsyncIterator.subclass(ArrayIterator); | ||
/* Reads an item from the iterator. */ | ||
ArrayIterator.prototype.read = function () { | ||
const buffer = this._buffer; | ||
let item = null; | ||
if (buffer) { | ||
item = buffer.shift(); | ||
if (!buffer.length) { | ||
delete this._buffer; | ||
this.close(); | ||
/* Reads an item from the iterator. */ | ||
read() { | ||
const buffer = this._buffer; | ||
let item = null; | ||
if (buffer) { | ||
item = buffer.shift(); | ||
if (!buffer.length) { | ||
delete this._buffer; | ||
this.close(); | ||
} | ||
} | ||
return item; | ||
} | ||
return item; | ||
}; | ||
/* Generates details for a textual representation of the iterator. */ | ||
ArrayIterator.prototype._toStringDetails = function () { | ||
return `(${ this._buffer && this._buffer.length || 0 })`; | ||
}; | ||
/* Generates details for a textual representation of the iterator. */ | ||
_toStringDetails() { | ||
return `(${ this._buffer && this._buffer.length || 0 })`; | ||
} | ||
/* Called by {@link AsyncIterator#destroy} */ | ||
ArrayIterator.prototype._destroy = function (error, callback) { | ||
delete this._buffer; | ||
callback(); | ||
}; | ||
/* Called by {@link module:asynciterator.AsyncIterator#destroy} */ | ||
_destroy(error, callback) { | ||
delete this._buffer; | ||
callback(); | ||
} | ||
} | ||
/** | ||
Creates a new `IntegerIterator`. | ||
@constructor | ||
@classdesc An iterator that enumerates integers in a certain range. | ||
@param {object} [options] Settings of the iterator | ||
@param {integer} [options.start=0] The first number to emit | ||
@param {integer} [options.end=Infinity] The last number to emit | ||
@param {integer} [options.step=1] The increment between two numbers | ||
@extends AsyncIterator | ||
An iterator that enumerates integers in a certain range. | ||
@extends module:asynciterator.AsyncIterator | ||
*/ | ||
function IntegerIterator(options) { | ||
if (!(this instanceof IntegerIterator)) | ||
return new IntegerIterator(options); | ||
AsyncIterator.call(this); | ||
export class IntegerIterator extends AsyncIterator { | ||
/** | ||
Creates a new `IntegerIterator`. | ||
@param {object} [options] Settings of the iterator | ||
@param {integer} [options.start=0] The first number to emit | ||
@param {integer} [options.end=Infinity] The last number to emit | ||
@param {integer} [options.step=1] The increment between two numbers | ||
*/ | ||
constructor({ start = 0, step = 1, end } = {}) { | ||
super(); | ||
// Determine step size | ||
let { step, end: last, start: next } = options || {}; | ||
step = isFinite(step) ? Math.floor(step) : 1; | ||
this._step = step; | ||
// Determine the first number | ||
if (Number.isFinite(start)) | ||
start = Math.trunc(start); | ||
this._next = start; | ||
// Determine the next number | ||
if (typeof next !== 'number') | ||
next = 0; | ||
else if (isFinite(next)) | ||
next = Math.floor(next); | ||
this._next = next; | ||
// Determine step size | ||
if (Number.isFinite(step)) | ||
step = Math.trunc(step); | ||
this._step = step; | ||
// Determine the last number | ||
if (isFinite(last)) { | ||
last = Math.floor(last); | ||
// Determine the last number | ||
const ascending = step >= 0; | ||
const direction = ascending ? Infinity : -Infinity; | ||
if (Number.isFinite(end)) | ||
end = Math.trunc(end); | ||
else if (end !== -direction) | ||
end = direction; | ||
this._last = end; | ||
// Start iteration if there is at least one item; close otherwise | ||
if (!Number.isFinite(start) || (ascending ? start > end : start < end)) | ||
this.close(); | ||
else | ||
this.readable = true; | ||
} | ||
else { | ||
// Counting towards plus or minus infinity? | ||
const limit = step >= 0 ? Infinity : -Infinity; | ||
if (last !== -limit) | ||
last = limit; | ||
/* Reads an item from the iterator. */ | ||
read() { | ||
if (this.closed) | ||
return null; | ||
const current = this._next, step = this._step, last = this._last, | ||
next = this._next += step; | ||
if (step >= 0 ? next > last : next < last) | ||
this.close(); | ||
return current; | ||
} | ||
this._last = last; | ||
// Start iteration if there is at least one item; close otherwise | ||
if (!isFinite(next) || (step >= 0 ? next > last : next < last)) | ||
this.close(); | ||
else | ||
this.readable = true; | ||
/* Generates details for a textual representation of the iterator. */ | ||
_toStringDetails() { | ||
return `(${ this._next }...${ this._last })`; | ||
} | ||
} | ||
AsyncIterator.subclass(IntegerIterator); | ||
/* Reads an item from the iterator. */ | ||
IntegerIterator.prototype.read = function () { | ||
if (this.closed) | ||
return null; | ||
const current = this._next, step = this._step, last = this._last, next = this._next += step; | ||
if (step >= 0 ? next > last : next < last) | ||
this.close(); | ||
return current; | ||
}; | ||
/* Generates details for a textual representation of the iterator. */ | ||
IntegerIterator.prototype._toStringDetails = function () { | ||
return `(${ this._next }...${ this._last })`; | ||
}; | ||
/** | ||
Creates an iterator of natural numbers within the given range. | ||
The current iterator may not be read anymore until the returned iterator ends. | ||
@param {integer} [start=0] The first number to emit | ||
@param {integer} [end=Infinity] The last number to emit | ||
@param {integer} [step=1] The increment between two numbers | ||
@returns {IntegerIterator} An iterator of natural numbers within the given range | ||
*/ | ||
AsyncIterator.range = function (start, end, step) { | ||
return new IntegerIterator({ start, end, step }); | ||
}; | ||
/** | ||
Creates a new `BufferedIterator`. | ||
@constructor | ||
@classdesc A iterator that maintains an internal buffer of items. | ||
A iterator that maintains an internal buffer of items. | ||
This class serves as a base class for other iterators | ||
with a typically complex item generation process. | ||
@param {object} [options] Settings of the iterator | ||
@param {integer} [options.maxBufferSize=4] The number of items to preload in the internal buffer | ||
@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction | ||
@extends AsyncIterator | ||
@extends module:asynciterator.AsyncIterator | ||
*/ | ||
function BufferedIterator(options) { | ||
if (!(this instanceof BufferedIterator)) | ||
return new BufferedIterator(options); | ||
AsyncIterator.call(this); | ||
export class BufferedIterator extends AsyncIterator { | ||
/** | ||
Creates a new `BufferedIterator`. | ||
@param {object} [options] Settings of the iterator | ||
@param {integer} [options.maxBufferSize=4] The number of items to preload in the internal buffer | ||
@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction | ||
*/ | ||
constructor({ maxBufferSize = 4, autoStart = true } = {}) { | ||
super(); | ||
// Set up the internal buffer | ||
const { maxBufferSize, autoStart } = options || {}; | ||
this._state = INIT; | ||
this._buffer = []; | ||
this._pushedCount = 0; | ||
this.maxBufferSize = maxBufferSize; | ||
// Set up the internal buffer | ||
this._state = INIT; | ||
this._buffer = []; | ||
this._pushedCount = 0; | ||
this.maxBufferSize = maxBufferSize; | ||
// Acquire reading lock to read initialization items | ||
this._reading = true; | ||
queueMicrotask(() => this._init(autoStart !== false || autoStart)); | ||
} | ||
AsyncIterator.subclass(BufferedIterator); | ||
// Acquire reading lock to read initialization items | ||
this._reading = true; | ||
queueMicrotask(() => this._init(autoStart)); | ||
} | ||
/** | ||
Gets or sets the maximum number of items to preload in the internal buffer. | ||
A `BufferedIterator` tries to fill its buffer as far as possible. | ||
Set to `Infinity` to fully drain the source. | ||
@name BufferedIterator#maxBufferSize | ||
@type number | ||
*/ | ||
Object.defineProperty(BufferedIterator.prototype, 'maxBufferSize', { | ||
get() { | ||
/** | ||
The maximum number of items to preload in the internal buffer. | ||
A `BufferedIterator` tries to fill its buffer as far as possible. | ||
Set to `Infinity` to fully drain the source. | ||
@type number | ||
*/ | ||
get maxBufferSize() { | ||
return this._maxBufferSize; | ||
}, | ||
set(maxBufferSize) { | ||
} | ||
set maxBufferSize(maxBufferSize) { | ||
// Allow only positive integers and infinity | ||
if (maxBufferSize !== Infinity) { | ||
maxBufferSize = !isFinite(maxBufferSize) ? 4 : | ||
Math.max(Math.floor(maxBufferSize), 1); | ||
maxBufferSize = !Number.isFinite(maxBufferSize) ? 4 : | ||
Math.max(Math.trunc(maxBufferSize), 1); | ||
} | ||
@@ -691,269 +748,266 @@ // Only set the maximum buffer size if it changes | ||
} | ||
}, | ||
enumerable: true, | ||
}); | ||
} | ||
/** | ||
Initializing the iterator by calling {@link BufferedIterator#_begin} | ||
and changing state from INIT to OPEN. | ||
@protected | ||
@param {boolean} autoStart Whether reading of items should immediately start after OPEN. | ||
*/ | ||
BufferedIterator.prototype._init = function (autoStart) { | ||
// Perform initialization tasks | ||
let doneCalled = false; | ||
this._reading = true; | ||
this._begin(() => { | ||
if (doneCalled) | ||
throw new Error('done callback called multiple times'); | ||
doneCalled = true; | ||
// Open the iterator and start buffering | ||
this._reading = false; | ||
this._changeState(OPEN); | ||
if (autoStart) | ||
this._fillBufferAsync(); | ||
// If reading should not start automatically, the iterator doesn't become readable. | ||
// Therefore, mark the iterator as (potentially) readable so consumers know it might be read. | ||
else | ||
this.readable = true; | ||
}); | ||
}; | ||
/** | ||
Writes beginning items and opens iterator resources. | ||
Should never be called before {@link BufferedIterator#_init}; | ||
typically, `_init` is responsible for calling `_begin`. | ||
@protected | ||
@param {function} done To be called when initialization is complete | ||
*/ | ||
BufferedIterator.prototype._begin = function (done) { | ||
done(); | ||
}; | ||
/** | ||
Tries to read the next item from the iterator. | ||
If the buffer is empty, | ||
this method calls {@link BufferedIterator#_read} to fetch items. | ||
@returns {object?} The next item, or `null` if none is available | ||
*/ | ||
BufferedIterator.prototype.read = function () { | ||
if (this.done) | ||
return null; | ||
// Try to retrieve an item from the buffer | ||
const buffer = this._buffer; | ||
let item; | ||
if (buffer.length !== 0) { | ||
item = buffer.shift(); | ||
/** | ||
Initializing the iterator by calling {@link BufferedIterator#_begin} | ||
and changing state from INIT to OPEN. | ||
@protected | ||
@param {boolean} autoStart Whether reading of items should immediately start after OPEN. | ||
*/ | ||
_init(autoStart) { | ||
// Perform initialization tasks | ||
let doneCalled = false; | ||
this._reading = true; | ||
this._begin(() => { | ||
if (doneCalled) | ||
throw new Error('done callback called multiple times'); | ||
doneCalled = true; | ||
// Open the iterator and start buffering | ||
this._reading = false; | ||
this._changeState(OPEN); | ||
if (autoStart) | ||
this._fillBufferAsync(); | ||
// If reading should not start automatically, the iterator doesn't become readable. | ||
// Therefore, mark the iterator as (potentially) readable so consumers know it might be read. | ||
else | ||
this.readable = true; | ||
}); | ||
} | ||
else { | ||
item = null; | ||
this.readable = false; | ||
} | ||
// If the buffer is becoming empty, either fill it or end the iterator | ||
if (!this._reading && buffer.length < this._maxBufferSize) { | ||
// If the iterator is not closed and thus may still generate new items, fill the buffer | ||
if (!this.closed) | ||
this._fillBufferAsync(); | ||
// No new items will be generated, so if none are buffered, the iterator ends here | ||
else if (!buffer.length) | ||
this._endAsync(); | ||
/** | ||
Writes beginning items and opens iterator resources. | ||
Should never be called before {@link BufferedIterator#_init}; | ||
typically, `_init` is responsible for calling `_begin`. | ||
@protected | ||
@param {function} done To be called when initialization is complete | ||
*/ | ||
_begin(done) { | ||
done(); | ||
} | ||
return item; | ||
}; | ||
/** | ||
Tries to read the next item from the iterator. | ||
If the buffer is empty, | ||
this method calls {@link BufferedIterator#_read} to fetch items. | ||
@returns {object?} The next item, or `null` if none is available | ||
*/ | ||
read() { | ||
if (this.done) | ||
return null; | ||
/** | ||
Tries to generate the given number of items. | ||
Implementers should add `count` items through {@link BufferedIterator#_push}. | ||
@protected | ||
@param {integer} count The number of items to generate | ||
@param {function} done To be called when reading is complete | ||
*/ | ||
BufferedIterator.prototype._read = function (count, done) { | ||
done(); | ||
}; | ||
// Try to retrieve an item from the buffer | ||
const buffer = this._buffer; | ||
let item; | ||
if (buffer.length !== 0) { | ||
item = buffer.shift(); | ||
} | ||
else { | ||
item = null; | ||
this.readable = false; | ||
} | ||
/** | ||
Adds an item to the internal buffer. | ||
@protected | ||
@param {object} item The item to add | ||
@emits AsyncIterator.readable | ||
*/ | ||
BufferedIterator.prototype._push = function (item) { | ||
if (!this.done) { | ||
this._pushedCount++; | ||
this._buffer.push(item); | ||
this.readable = true; | ||
// If the buffer is becoming empty, either fill it or end the iterator | ||
if (!this._reading && buffer.length < this._maxBufferSize) { | ||
// If the iterator is not closed and thus may still generate new items, fill the buffer | ||
if (!this.closed) | ||
this._fillBufferAsync(); | ||
// No new items will be generated, so if none are buffered, the iterator ends here | ||
else if (!buffer.length) | ||
this._endAsync(); | ||
} | ||
return item; | ||
} | ||
}; | ||
/** | ||
Fills the internal buffer until `this._maxBufferSize` items are present. | ||
This method calls {@link BufferedIterator#_read} to fetch items. | ||
@protected | ||
@emits AsyncIterator.readable | ||
*/ | ||
BufferedIterator.prototype._fillBuffer = function () { | ||
let neededItems; | ||
// Avoid recursive reads | ||
if (this._reading) { | ||
// Do nothing | ||
/** | ||
Tries to generate the given number of items. | ||
Implementers should add `count` items through {@link BufferedIterator#_push}. | ||
@protected | ||
@param {integer} count The number of items to generate | ||
@param {function} done To be called when reading is complete | ||
*/ | ||
_read(count, done) { | ||
done(); | ||
} | ||
// If iterator closing started in the meantime, don't generate new items anymore | ||
else if (this.closed) { | ||
this._completeClose(); | ||
/** | ||
Adds an item to the internal buffer. | ||
@protected | ||
@param {object} item The item to add | ||
@emits module:asynciterator.AsyncIterator.readable | ||
*/ | ||
_push(item) { | ||
if (!this.done) { | ||
this._pushedCount++; | ||
this._buffer.push(item); | ||
this.readable = true; | ||
} | ||
} | ||
// Otherwise, try to fill empty spaces in the buffer by generating new items | ||
else if ((neededItems = Math.min(this._maxBufferSize - this._buffer.length, 128)) > 0) { | ||
// Acquire reading lock and start reading, counting pushed items | ||
this._pushedCount = 0; | ||
this._reading = true; | ||
this._read(neededItems, () => { | ||
// Verify the callback is only called once | ||
if (!neededItems) | ||
throw new Error('done callback called multiple times'); | ||
neededItems = 0; | ||
// Release reading lock | ||
this._reading = false; | ||
// If the iterator was closed while reading, complete closing | ||
if (this.closed) { | ||
this._completeClose(); | ||
} | ||
// If the iterator pushed one or more items, | ||
// it might currently be able to generate additional items | ||
// (even though all pushed items might already have been read) | ||
else if (this._pushedCount) { | ||
this.readable = true; | ||
// If the buffer is insufficiently full, continue filling | ||
if (this._buffer.length < this._maxBufferSize / 2) | ||
this._fillBufferAsync(); | ||
} | ||
}); | ||
/** | ||
Fills the internal buffer until `this._maxBufferSize` items are present. | ||
This method calls {@link BufferedIterator#_read} to fetch items. | ||
@protected | ||
@emits module:asynciterator.AsyncIterator.readable | ||
*/ | ||
_fillBuffer() { | ||
let neededItems; | ||
// Avoid recursive reads | ||
if (this._reading) { | ||
// Do nothing | ||
} | ||
// If iterator closing started in the meantime, don't generate new items anymore | ||
else if (this.closed) { | ||
this._completeClose(); | ||
} | ||
// Otherwise, try to fill empty spaces in the buffer by generating new items | ||
else if ((neededItems = Math.min(this._maxBufferSize - this._buffer.length, 128)) > 0) { | ||
// Acquire reading lock and start reading, counting pushed items | ||
this._pushedCount = 0; | ||
this._reading = true; | ||
this._read(neededItems, () => { | ||
// Verify the callback is only called once | ||
if (!neededItems) | ||
throw new Error('done callback called multiple times'); | ||
neededItems = 0; | ||
// Release reading lock | ||
this._reading = false; | ||
// If the iterator was closed while reading, complete closing | ||
if (this.closed) { | ||
this._completeClose(); | ||
} | ||
// If the iterator pushed one or more items, | ||
// it might currently be able to generate additional items | ||
// (even though all pushed items might already have been read) | ||
else if (this._pushedCount) { | ||
this.readable = true; | ||
// If the buffer is insufficiently full, continue filling | ||
if (this._buffer.length < this._maxBufferSize / 2) | ||
this._fillBufferAsync(); | ||
} | ||
}); | ||
} | ||
} | ||
}; | ||
/** | ||
Schedules `_fillBuffer` asynchronously. | ||
*/ | ||
BufferedIterator.prototype._fillBufferAsync = function () { | ||
// Acquire reading lock to avoid recursive reads | ||
if (!this._reading) { | ||
this._reading = true; | ||
queueMicrotask(() => { | ||
// Release reading lock so _fillBuffer` can take it | ||
this._reading = false; | ||
this._fillBuffer(); | ||
}); | ||
/** | ||
Schedules `_fillBuffer` asynchronously. | ||
*/ | ||
_fillBufferAsync() { | ||
// Acquire reading lock to avoid recursive reads | ||
if (!this._reading) { | ||
this._reading = true; | ||
queueMicrotask(() => { | ||
// Release reading lock so _fillBuffer` can take it | ||
this._reading = false; | ||
this._fillBuffer(); | ||
}); | ||
} | ||
} | ||
}; | ||
/** | ||
Stops the iterator from generating new items | ||
after a possible pending read operation has finished. | ||
Already generated, pending, or terminating items can still be emitted. | ||
After this, the iterator will end asynchronously. | ||
@emits AsyncIterator.end | ||
*/ | ||
BufferedIterator.prototype.close = function () { | ||
// If the iterator is not currently reading, we can close immediately | ||
if (!this._reading) | ||
this._completeClose(); | ||
// Closing cannot complete when reading, so temporarily assume CLOSING state | ||
// `_fillBuffer` becomes responsible for calling `_completeClose` | ||
else | ||
this._changeState(CLOSING); | ||
}; | ||
/** | ||
Stops the iterator from generating new items | ||
after a possible pending read operation has finished. | ||
Already generated, pending, or terminating items can still be emitted. | ||
After this, the iterator will end asynchronously. | ||
@emits module:asynciterator.AsyncIterator.end | ||
*/ | ||
close() { | ||
// If the iterator is not currently reading, we can close immediately | ||
if (!this._reading) | ||
this._completeClose(); | ||
// Closing cannot complete when reading, so temporarily assume CLOSING state | ||
// `_fillBuffer` becomes responsible for calling `_completeClose` | ||
else | ||
this._changeState(CLOSING); | ||
} | ||
/** | ||
Stops the iterator from generating new items, | ||
switching from `CLOSING` state into `CLOSED` state. | ||
@protected | ||
@emits AsyncIterator.end | ||
*/ | ||
BufferedIterator.prototype._completeClose = function () { | ||
if (this._changeState(CLOSED)) { | ||
// Write possible terminating items | ||
this._reading = true; | ||
this._flush(() => { | ||
if (!this._reading) | ||
throw new Error('done callback called multiple times'); | ||
this._reading = false; | ||
// If no items are left, end the iterator | ||
// Otherwise, `read` becomes responsible for ending the iterator | ||
if (!this._buffer.length) | ||
this._endAsync(); | ||
}); | ||
/** | ||
Stops the iterator from generating new items, | ||
switching from `CLOSING` state into `CLOSED` state. | ||
@protected | ||
@emits module:asynciterator.AsyncIterator.end | ||
*/ | ||
_completeClose() { | ||
if (this._changeState(CLOSED)) { | ||
// Write possible terminating items | ||
this._reading = true; | ||
this._flush(() => { | ||
if (!this._reading) | ||
throw new Error('done callback called multiple times'); | ||
this._reading = false; | ||
// If no items are left, end the iterator | ||
// Otherwise, `read` becomes responsible for ending the iterator | ||
if (!this._buffer.length) | ||
this._endAsync(); | ||
}); | ||
} | ||
} | ||
}; | ||
/* Called by {@link AsyncIterator#destroy} */ | ||
BufferedIterator.prototype._destroy = function (error, callback) { | ||
this._buffer = []; | ||
callback(); | ||
}; | ||
/* Called by {@link module:asynciterator.AsyncIterator#destroy} */ | ||
_destroy(error, callback) { | ||
this._buffer = []; | ||
callback(); | ||
} | ||
/** | ||
Writes terminating items and closes iterator resources. | ||
Should never be called before {@link BufferedIterator#close}; | ||
typically, `close` is responsible for calling `_flush`. | ||
@protected | ||
@param {function} done To be called when termination is complete | ||
*/ | ||
BufferedIterator.prototype._flush = function (done) { | ||
done(); | ||
}; | ||
/** | ||
Writes terminating items and closes iterator resources. | ||
Should never be called before {@link BufferedIterator#close}; | ||
typically, `close` is responsible for calling `_flush`. | ||
@protected | ||
@param {function} done To be called when termination is complete | ||
*/ | ||
_flush(done) { | ||
done(); | ||
} | ||
/* Generates details for a textual representation of the iterator. */ | ||
BufferedIterator.prototype._toStringDetails = function () { | ||
const buffer = this._buffer, { length } = buffer; | ||
return `{${ length ? `next: ${ buffer[0] }, ` : '' }buffer: ${ length }}`; | ||
}; | ||
/** | ||
Generates details for a textual representation of the iterator. | ||
@protected | ||
*/ | ||
_toStringDetails() { | ||
const buffer = this._buffer, { length } = buffer; | ||
return `{${ length ? `next: ${ buffer[0] }, ` : '' }buffer: ${ length }}`; | ||
} | ||
} | ||
/** | ||
Creates a new `TransformIterator`. | ||
An iterator that generates items based on a source iterator. | ||
This class serves as a base class for other iterators. | ||
@constructor | ||
@classdesc An iterator that generates items based on a source iterator. | ||
@param {AsyncIterator|Readable} [source] The source this iterator generates items from | ||
@param {object} [options] Settings of the iterator | ||
@param {integer} [options.maxBufferSize=4] The maximum number of items to keep in the buffer | ||
@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction | ||
@param {boolean} [options.optional=false] If transforming is optional, the original item is pushed when its transformation yields no items | ||
@param {boolean} [options.destroySource=true] Whether the source should be destroyed when this transformed iterator is closed or destroyed | ||
@param {AsyncIterator} [options.source] The source this iterator generates items from | ||
@extends BufferedIterator | ||
@extends module:asynciterator.BufferedIterator | ||
*/ | ||
function TransformIterator(source, options) { | ||
if (!(this instanceof TransformIterator)) | ||
return new TransformIterator(source, options); | ||
// Shift arguments if the first is not a source | ||
if (!source || !isFunction(source.read)) { | ||
if (!options) | ||
options = source; | ||
source = options && options.source; | ||
export class TransformIterator extends BufferedIterator { | ||
/** | ||
Creates a new `TransformIterator`. | ||
@param {module:asynciterator.AsyncIterator|Readable} [source] The source this iterator generates items from | ||
@param {object} [options] Settings of the iterator | ||
@param {integer} [options.maxBufferSize=4] The maximum number of items to keep in the buffer | ||
@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction | ||
@param {boolean} [options.optional=false] If transforming is optional, the original item is pushed when its transformation yields no items | ||
@param {boolean} [options.destroySource=true] Whether the source should be destroyed when this transformed iterator is closed or destroyed | ||
@param {module:asynciterator.AsyncIterator} [options.source] The source this iterator generates items from | ||
*/ | ||
constructor(source, options = source || {}) { | ||
super(options); | ||
// Initialize source and settings | ||
if (!source || !isFunction(source.read)) | ||
source = options.source; | ||
if (source) | ||
this.source = source; | ||
this._optional = Boolean(options.optional); | ||
this._destroySource = options.destroySource !== false; | ||
} | ||
BufferedIterator.call(this, options); | ||
if (source) | ||
this.source = source; | ||
this._optional = Boolean(options && options.optional); | ||
this._destroySource = !options || options.destroySource !== false; | ||
} | ||
BufferedIterator.subclass(TransformIterator); | ||
/** | ||
Gets or sets the source this iterator generates items from. | ||
@name TransformIterator#source | ||
@type AsyncIterator | ||
*/ | ||
Object.defineProperty(TransformIterator.prototype, 'source', { | ||
get() { | ||
/** | ||
The source this iterator generates items from. | ||
@type module:asynciterator.AsyncIterator | ||
*/ | ||
get source() { | ||
return this._source; | ||
}, | ||
set(source) { | ||
} | ||
set source(source) { | ||
// Validate and set source | ||
this._validateSource(source); | ||
this._source = source; | ||
this._source = this._validateSource(source); | ||
source._destination = this; | ||
@@ -971,5 +1025,100 @@ | ||
} | ||
}, | ||
enumerable: true, | ||
}); | ||
} | ||
/** | ||
Validates whether the given iterator can be used as a source. | ||
@protected | ||
@param {object} source The source to validate | ||
@param {boolean} allowDestination Whether the source can already have a destination | ||
*/ | ||
_validateSource(source, allowDestination) { | ||
if (this._source) | ||
throw new Error('The source cannot be changed after it has been set'); | ||
if (!source || !isFunction(source.read) || !isFunction(source.on)) | ||
throw new Error(`Invalid source: ${ source}`); | ||
if (!allowDestination && source._destination) | ||
throw new Error('The source already has a destination'); | ||
return source; | ||
} | ||
/** | ||
Tries to read a transformed item. | ||
*/ | ||
_read(count, done) { | ||
const next = () => { | ||
// Continue transforming until at least `count` items have been pushed | ||
if (this._pushedCount < count && !this.closed) | ||
queueMicrotask(() => this._readAndTransform(next, done)); | ||
else | ||
done(); | ||
}; | ||
this._readAndTransform(next, done); | ||
} | ||
/** | ||
Reads a transforms an item | ||
*/ | ||
_readAndTransform(next, done) { | ||
// If the source exists and still can read items, | ||
// try to read and transform the next item. | ||
const source = this._source; | ||
let item; | ||
if (source && !source.ended && (item = source.read()) !== null) { | ||
if (!this._optional) | ||
this._transform(item, next); | ||
else | ||
this._optionalTransform(item, next); | ||
} | ||
else { done(); } | ||
} | ||
/** | ||
Tries to transform the item; | ||
if the transformation yields no items, pushes the original item. | ||
*/ | ||
_optionalTransform(item, done) { | ||
const pushedCount = this._pushedCount; | ||
this._transform(item, () => { | ||
if (pushedCount === this._pushedCount) | ||
this._push(item); | ||
done(); | ||
}); | ||
} | ||
/** | ||
Generates items based on the item from the source. | ||
Implementers should add items through {@link BufferedIterator#_push}. | ||
The default implementation pushes the source item as-is. | ||
@protected | ||
@param {object} item The last read item from the source | ||
@param {function} done To be called when reading is complete | ||
*/ | ||
_transform(item, done) { | ||
this._push(item); | ||
done(); | ||
} | ||
/** | ||
Closes the iterator when pending items are transformed. | ||
@protected | ||
*/ | ||
_closeWhenDone() { | ||
this.close(); | ||
} | ||
/* Cleans up the source iterator and ends. */ | ||
_end(destroy) { | ||
const source = this._source; | ||
if (source) { | ||
source.removeListener('end', destinationCloseWhenDone); | ||
source.removeListener('error', destinationEmitError); | ||
source.removeListener('readable', destinationFillBuffer); | ||
delete source._destination; | ||
if (this._destroySource) | ||
source.destroy(); | ||
} | ||
super._end(destroy); | ||
} | ||
} | ||
function destinationEmitError(error) { | ||
@@ -985,472 +1134,268 @@ this._destination.emit('error', error); | ||
/** | ||
Validates whether the given iterator can be used as a source. | ||
@protected | ||
@param {object} source The source to validate | ||
@param {boolean} allowDestination Whether the source can already have a destination | ||
*/ | ||
TransformIterator.prototype._validateSource = function (source, allowDestination) { | ||
if (this._source) | ||
throw new Error('The source cannot be changed after it has been set'); | ||
if (!source || !isFunction(source.read) || !isFunction(source.on)) | ||
throw new Error(`Invalid source: ${ source}`); | ||
if (!allowDestination && source._destination) | ||
throw new Error('The source already has a destination'); | ||
}; | ||
/** | ||
Tries to read a transformed item. | ||
An iterator that generates items based on a source iterator | ||
and simple transformation steps passed as arguments. | ||
@extends module:asynciterator.TransformIterator | ||
*/ | ||
TransformIterator.prototype._read = function (count, done) { | ||
const next = () => { | ||
// Continue transforming until at least `count` items have been pushed | ||
if (this._pushedCount < count && !this.closed) | ||
queueMicrotask(() => this._readAndTransform(next, done)); | ||
else | ||
done(); | ||
}; | ||
this._readAndTransform(next, done); | ||
}; | ||
export class SimpleTransformIterator extends TransformIterator { | ||
/** | ||
Creates a new `SimpleTransformIterator`. | ||
@param {module:asynciterator.AsyncIterator|Readable} [source] The source this iterator generates items from | ||
@param {object|Function} [options] Settings of the iterator, or the transformation function | ||
@param {integer} [options.maxbufferSize=4] The maximum number of items to keep in the buffer | ||
@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction | ||
@param {module:asynciterator.AsyncIterator} [options.source] The source this iterator generates items from | ||
@param {integer} [options.offset] The number of items to skip | ||
@param {integer} [options.limit] The maximum number of items | ||
@param {Function} [options.filter] A function to synchronously filter items from the source | ||
@param {Function} [options.map] A function to synchronously transform items from the source | ||
@param {Function} [options.transform] A function to asynchronously transform items from the source | ||
@param {boolean} [options.optional=false] If transforming is optional, the original item is pushed when its mapping yields `null` or its transformation yields no items | ||
@param {Array|module:asynciterator.AsyncIterator} [options.prepend] Items to insert before the source items | ||
@param {Array|module:asynciterator.AsyncIterator} [options.append] Items to insert after the source items | ||
*/ | ||
constructor(source, options) { | ||
super(source, options); | ||
/** | ||
Reads a transforms an item | ||
*/ | ||
TransformIterator.prototype._readAndTransform = function (next, done) { | ||
// If the source exists and still can read items, | ||
// try to read and transform the next item. | ||
const source = this._source; | ||
let item; | ||
if (source && !source.ended && (item = source.read()) !== null) { | ||
if (!this._optional) | ||
this._transform(item, next); | ||
else | ||
this._optionalTransform(item, next); | ||
// Set transformation steps from the options | ||
options = options || !isFunction(source && source.read) && source; | ||
if (options) { | ||
const transform = isFunction(options) ? options : options.transform; | ||
const { limit, offset, filter, map, prepend, append } = options; | ||
// Don't emit any items when bounds are unreachable | ||
if (offset === Infinity || limit === -Infinity) { | ||
this._limit = 0; | ||
} | ||
else { | ||
if (Number.isFinite(offset)) | ||
this._offset = Math.max(Math.trunc(offset), 0); | ||
if (Number.isFinite(limit)) | ||
this._limit = Math.max(Math.trunc(limit), 0); | ||
if (isFunction(filter)) | ||
this._filter = filter; | ||
if (isFunction(map)) | ||
this._map = map; | ||
if (isFunction(transform)) | ||
this._transform = transform; | ||
} | ||
if (prepend) | ||
this._prepender = prepend.on ? prepend : new ArrayIterator(prepend); | ||
if (append) | ||
this._appender = append.on ? append : new ArrayIterator(append); | ||
} | ||
} | ||
else { done(); } | ||
}; | ||
/** | ||
Tries to transform the item; | ||
if the transformation yields no items, pushes the original item. | ||
*/ | ||
TransformIterator.prototype._optionalTransform = function (item, done) { | ||
const pushedCount = this._pushedCount; | ||
this._transform(item, () => { | ||
if (pushedCount === this._pushedCount) | ||
this._push(item); | ||
done(); | ||
}); | ||
}; | ||
/* Tries to read and transform items */ | ||
_read(count, done) { | ||
const next = () => this._readAndTransformSimple(count, nextAsync, done); | ||
function nextAsync() { | ||
queueMicrotask(next); | ||
} | ||
this._readAndTransformSimple(count, nextAsync, done); | ||
} | ||
/** | ||
Generates items based on the item from the source. | ||
Implementers should add items through {@link BufferedIterator#_push}. | ||
The default implementation pushes the source item as-is. | ||
@protected | ||
@param {object} item The last read item from the source | ||
@param {function} done To be called when reading is complete | ||
*/ | ||
TransformIterator.prototype._transform = function (item, done) { | ||
this._push(item); | ||
done(); | ||
}; | ||
/* Reads and transform items */ | ||
_readAndTransformSimple(count, next, done) { | ||
// Verify we have a readable source | ||
const source = this._source; | ||
let item; | ||
if (!source || source.ended) { | ||
done(); | ||
return; | ||
} | ||
// Verify we are still below the limit | ||
if (this._limit === 0) | ||
this.close(); | ||
/** | ||
Closes the iterator when pending items are transformed. | ||
@protected | ||
*/ | ||
TransformIterator.prototype._closeWhenDone = function () { | ||
this.close(); | ||
}; | ||
// Try to read the next item until at least `count` items have been pushed | ||
while (!this.closed && this._pushedCount < count && (item = source.read()) !== null) { | ||
// Verify the item passes the filter and we've reached the offset | ||
if (!this._filter(item) || this._offset !== 0 && this._offset--) | ||
continue; | ||
/* Cleans up the source iterator and ends. */ | ||
TransformIterator.prototype._end = function (destroy) { | ||
const source = this._source; | ||
if (source) { | ||
source.removeListener('end', destinationCloseWhenDone); | ||
source.removeListener('error', destinationEmitError); | ||
source.removeListener('readable', destinationFillBuffer); | ||
delete source._destination; | ||
if (this._destroySource) | ||
source.destroy(); | ||
// Synchronously map the item | ||
const mappedItem = this._map === null ? item : this._map(item); | ||
// Skip `null` items, pushing the original item if the mapping was optional | ||
if (mappedItem === null) { | ||
if (this._optional) | ||
this._push(item); | ||
} | ||
// Skip the asynchronous phase if no transformation was specified | ||
else if (this._transform === null) { | ||
this._push(mappedItem); | ||
} | ||
// Asynchronously transform the item, and wait for `next` to call back | ||
else { | ||
if (!this._optional) | ||
this._transform(mappedItem, next); | ||
else | ||
this._optionalTransform(mappedItem, next); | ||
return; | ||
} | ||
// Stop when we've reached the limit | ||
if (--this._limit === 0) | ||
this.close(); | ||
} | ||
done(); | ||
} | ||
BufferedIterator.prototype._end.call(this, destroy); | ||
}; | ||
/** | ||
Creates an iterator that wraps around a given iterator or readable stream. | ||
Use this to convert an iterator-like object into a full-featured AsyncIterator. | ||
After this operation, only read the returned iterator instead of the given one. | ||
@function | ||
@param {AsyncIterator|Readable} [source] The source this iterator generates items from | ||
@param {object} [options] Settings of the iterator | ||
@returns {AsyncIterator} A new iterator with the items from the given iterator | ||
*/ | ||
AsyncIterator.wrap = TransformIterator; | ||
// Prepends items to the iterator | ||
_begin(done) { | ||
this._insert(this._prepender, done); | ||
delete this._prepender; | ||
} | ||
// Appends items to the iterator | ||
_flush(done) { | ||
this._insert(this._appender, done); | ||
delete this._appender; | ||
} | ||
/** | ||
Creates a new `SimpleTransformIterator`. | ||
@constructor | ||
@classdesc An iterator that generates items based on a source iterator | ||
and simple transformation steps passed as arguments. | ||
@param {AsyncIterator|Readable} [source] The source this iterator generates items from | ||
@param {object|Function} [options] Settings of the iterator, or the transformation function | ||
@param {integer} [options.maxbufferSize=4] The maximum number of items to keep in the buffer | ||
@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction | ||
@param {AsyncIterator} [options.source] The source this iterator generates items from | ||
@param {integer} [options.offset] The number of items to skip | ||
@param {integer} [options.limit] The maximum number of items | ||
@param {Function} [options.filter] A function to synchronously filter items from the source | ||
@param {Function} [options.map] A function to synchronously transform items from the source | ||
@param {Function} [options.transform] A function to asynchronously transform items from the source | ||
@param {boolean} [options.optional=false] If transforming is optional, the original item is pushed when its mapping yields `null` or its transformation yields no items | ||
@param {Array|AsyncIterator} [options.prepend] Items to insert before the source items | ||
@param {Array|AsyncIterator} [options.append] Items to insert after the source items | ||
@extends TransformIterator | ||
*/ | ||
function SimpleTransformIterator(source, options) { | ||
if (!(this instanceof SimpleTransformIterator)) | ||
return new SimpleTransformIterator(source, options); | ||
TransformIterator.call(this, source, options); | ||
// Set transformation steps from the options | ||
options = options || !isFunction(source && source.read) && source; | ||
if (options) { | ||
const transform = isFunction(options) ? options : options.transform; | ||
const { limit, offset, filter, map, prepend, append } = options; | ||
// Don't emit any items when bounds are unreachable | ||
if (offset === Infinity || limit === -Infinity) { | ||
this._limit = 0; | ||
// Inserts items in the iterator | ||
_insert(inserter, done) { | ||
const push = item => this._push(item); | ||
if (!inserter || inserter.ended) { | ||
done(); | ||
} | ||
else { | ||
if (isFinite(offset)) | ||
this._offset = Math.max(Math.floor(offset), 0); | ||
if (isFinite(limit)) | ||
this._limit = Math.max(Math.floor(limit), 0); | ||
if (isFunction(filter)) | ||
this._filter = filter; | ||
if (isFunction(map)) | ||
this._map = map; | ||
if (isFunction(transform)) | ||
this._transform = transform; | ||
inserter.on('data', push); | ||
inserter.on('end', end); | ||
} | ||
if (prepend) | ||
this._prepender = prepend.on ? prepend : new ArrayIterator(prepend); | ||
if (append) | ||
this._appender = append.on ? append : new ArrayIterator(append); | ||
function end() { | ||
inserter.removeListener('data', push); | ||
inserter.removeListener('end', end); | ||
done(); | ||
} | ||
} | ||
} | ||
TransformIterator.subclass(SimpleTransformIterator); | ||
// Default settings | ||
SimpleTransformIterator.prototype._offset = 0; | ||
SimpleTransformIterator.prototype._limit = Infinity; | ||
SimpleTransformIterator.prototype._map = null; | ||
SimpleTransformIterator.prototype._transform = null; | ||
SimpleTransformIterator.prototype._filter = function () { | ||
return true; | ||
}; | ||
Object.assign(SimpleTransformIterator.prototype, { | ||
_offset: 0, | ||
_limit: Infinity, | ||
_map: null, | ||
_transform: null, | ||
_filter: () => true, | ||
}); | ||
/* Tries to read and transform items */ | ||
SimpleTransformIterator.prototype._read = function (count, done) { | ||
const next = () => this._readAndTransformSimple(count, nextAsync, done); | ||
function nextAsync() { | ||
queueMicrotask(next); | ||
} | ||
this._readAndTransformSimple(count, nextAsync, done); | ||
}; | ||
/* Reads and transform items */ | ||
SimpleTransformIterator.prototype._readAndTransformSimple = function (count, next, done) { | ||
// Verify we have a readable source | ||
const source = this._source; | ||
let item; | ||
if (!source || source.ended) { | ||
done(); | ||
return; | ||
/** | ||
An iterator that generates items by transforming each item of a source | ||
with a different iterator. | ||
@extends module:asynciterator.TransformIterator | ||
*/ | ||
export class MultiTransformIterator extends TransformIterator { | ||
/** | ||
Creates a new `MultiTransformIterator`. | ||
@param {module:asynciterator.AsyncIterator|Readable} [source] The source this iterator generates items from | ||
@param {object} [options] Settings of the iterator | ||
*/ | ||
constructor(source, options) { | ||
super(source, options); | ||
this._transformerQueue = []; | ||
} | ||
// Verify we are still below the limit | ||
if (this._limit === 0) | ||
this.close(); | ||
// Try to read the next item until at least `count` items have been pushed | ||
while (!this.closed && this._pushedCount < count && (item = source.read()) !== null) { | ||
// Verify the item passes the filter and we've reached the offset | ||
if (!this._filter(item) || this._offset !== 0 && this._offset--) | ||
continue; | ||
/* Tries to read and transform items */ | ||
_read(count, done) { | ||
// Remove transformers that have ended | ||
const transformerQueue = this._transformerQueue, | ||
source = this._source, optional = this._optional; | ||
let item, head, transformer; | ||
while ((head = transformerQueue[0]) && head.transformer.ended) { | ||
// If transforming is optional, push the original item if none was pushed | ||
if (optional && head.item !== null) { | ||
count--; | ||
this._push(head.item); | ||
} | ||
// Remove listeners from the transformer | ||
head = transformerQueue.shift(); | ||
transformer = head.transformer; | ||
transformer.removeListener('end', destinationFillBuffer); | ||
transformer.removeListener('readable', destinationFillBuffer); | ||
transformer.removeListener('error', destinationEmitError); | ||
} | ||
// Synchronously map the item | ||
const mappedItem = this._map === null ? item : this._map(item); | ||
// Skip `null` items, pushing the original item if the mapping was optional | ||
if (mappedItem === null) { | ||
if (this._optional) | ||
// Create new transformers if there are less than the maximum buffer size | ||
while (source && !source.ended && transformerQueue.length < this._maxBufferSize) { | ||
// Read an item to create the next transformer | ||
item = this._source.read(); | ||
if (item === null) | ||
break; | ||
// Create the transformer and listen to its events | ||
transformer = this._createTransformer(item) || new EmptyIterator(); | ||
transformer._destination = this; | ||
transformer.on('end', destinationFillBuffer); | ||
transformer.on('readable', destinationFillBuffer); | ||
transformer.on('error', destinationEmitError); | ||
transformerQueue.push({ transformer, item }); | ||
} | ||
// Try to read `count` items from the transformer | ||
head = transformerQueue[0]; | ||
if (head) { | ||
transformer = head.transformer; | ||
while (count-- > 0 && (item = transformer.read()) !== null) { | ||
this._push(item); | ||
// If a transformed item was pushed, no need to push the original anymore | ||
if (optional) | ||
head.item = null; | ||
} | ||
} | ||
// Skip the asynchronous phase if no transformation was specified | ||
else if (this._transform === null) { | ||
this._push(mappedItem); | ||
// End the iterator if the source has ended | ||
else if (source && source.ended) { | ||
this.close(); | ||
} | ||
// Asynchronously transform the item, and wait for `next` to call back | ||
else { | ||
if (!this._optional) | ||
this._transform(mappedItem, next); | ||
else | ||
this._optionalTransform(mappedItem, next); | ||
return; | ||
} | ||
done(); | ||
} | ||
// Stop when we've reached the limit | ||
if (--this._limit === 0) | ||
this.close(); | ||
/** | ||
Creates a transformer for the given item. | ||
@param {object} item The last read item from the source | ||
@returns {module:asynciterator.AsyncIterator} An iterator that transforms the given item | ||
*/ | ||
_createTransformer(item) { | ||
return new SingletonIterator(item); | ||
} | ||
done(); | ||
}; | ||
// Prepends items to the iterator | ||
SimpleTransformIterator.prototype._begin = function (done) { | ||
this._insert(this._prepender, done); | ||
delete this._prepender; | ||
}; | ||
// Appends items to the iterator | ||
SimpleTransformIterator.prototype._flush = function (done) { | ||
this._insert(this._appender, done); | ||
delete this._appender; | ||
}; | ||
// Inserts items in the iterator | ||
SimpleTransformIterator.prototype._insert = function (inserter, done) { | ||
const push = item => this._push(item); | ||
if (!inserter || inserter.ended) { | ||
done(); | ||
/* Closes the iterator when pending items are transformed. */ | ||
_closeWhenDone() { | ||
// Only close if all transformers are read | ||
if (!this._transformerQueue.length) | ||
this.close(); | ||
} | ||
else { | ||
inserter.on('data', push); | ||
inserter.on('end', end); | ||
} | ||
function end() { | ||
inserter.removeListener('data', push); | ||
inserter.removeListener('end', end); | ||
done(); | ||
} | ||
}; | ||
} | ||
/** | ||
Transforms items from this iterator. | ||
After this operation, only read the returned iterator instead of the current one. | ||
@param {object|Function} [options] Settings of the iterator, or the transformation function | ||
@param {integer} [options.maxbufferSize=4] The maximum number of items to keep in the buffer | ||
@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction | ||
@param {integer} [options.offset] The number of items to skip | ||
@param {integer} [options.limit] The maximum number of items | ||
@param {Function} [options.filter] A function to synchronously filter items from the source | ||
@param {Function} [options.map] A function to synchronously transform items from the source | ||
@param {Function} [options.transform] A function to asynchronously transform items from the source | ||
@param {boolean} [options.optional=false] If transforming is optional, the original item is pushed when its mapping yields `null` or its transformation yields no items | ||
@param {Array|AsyncIterator} [options.prepend] Items to insert before the source items | ||
@param {Array|AsyncIterator} [options.append] Items to insert after the source items | ||
@returns {AsyncIterator} A new iterator that maps the items from this iterator | ||
*/ | ||
AsyncIterator.prototype.transform = function (options) { | ||
return new SimpleTransformIterator(this, options); | ||
}; | ||
/** | ||
Maps items from this iterator using the given function. | ||
After this operation, only read the returned iterator instead of the current one. | ||
@param {Function} mapper A mapping function to call on this iterator's (remaining) items | ||
@param {object?} self The `this` pointer for the mapping function | ||
@returns {AsyncIterator} A new iterator that maps the items from this iterator | ||
An iterator that copies items from another iterator. | ||
@extends module:asynciterator.TransformIterator | ||
*/ | ||
AsyncIterator.prototype.map = function (mapper, self) { | ||
return this.transform({ map: self ? mapper.bind(self) : mapper }); | ||
}; | ||
export class ClonedIterator extends TransformIterator { | ||
/** | ||
Creates a new `ClonedIterator`. | ||
@param {module:asynciterator.AsyncIterator|Readable} [source] The source this iterator copies items from | ||
*/ | ||
constructor(source) { | ||
super(source, { autoStart: false }); | ||
this._reading = false; | ||
this._readPosition = 0; | ||
} | ||
/** | ||
Return items from this iterator that match the filter. | ||
After this operation, only read the returned iterator instead of the current one. | ||
@param {Function} filter A filter function to call on this iterator's (remaining) items | ||
@param {object?} self The `this` pointer for the filter function | ||
@returns {AsyncIterator} A new iterator that filters items from this iterator | ||
*/ | ||
AsyncIterator.prototype.filter = function (filter, self) { | ||
return this.transform({ filter: self ? filter.bind(self) : filter }); | ||
}; | ||
/** | ||
Prepends the items after those of the current iterator. | ||
After this operation, only read the returned iterator instead of the current one. | ||
@param {Array|AsyncIterator} items Items to insert before this iterator's (remaining) items | ||
@returns {AsyncIterator} A new iterator that prepends items to this iterator | ||
*/ | ||
AsyncIterator.prototype.prepend = function (items) { | ||
return this.transform({ prepend: items }); | ||
}; | ||
/** | ||
Appends the items after those of the current iterator. | ||
After this operation, only read the returned iterator instead of the current one. | ||
@param {Array|AsyncIterator} items Items to insert after this iterator's (remaining) items | ||
@returns {AsyncIterator} A new iterator that appends items to this iterator | ||
*/ | ||
AsyncIterator.prototype.append = function (items) { | ||
return this.transform({ append: items }); | ||
}; | ||
/** | ||
Surrounds items of the current iterator with the given items. | ||
After this operation, only read the returned iterator instead of the current one. | ||
@param {Array|AsyncIterator} prepend Items to insert before this iterator's (remaining) items | ||
@param {Array|AsyncIterator} append Items to insert after this iterator's (remaining) items | ||
@returns {AsyncIterator} A new iterator that appends and prepends items to this iterator | ||
*/ | ||
AsyncIterator.prototype.surround = function (prepend, append) { | ||
return this.transform({ prepend, append }); | ||
}; | ||
/** | ||
Skips the given number of items from the current iterator. | ||
The current iterator may not be read anymore until the returned iterator ends. | ||
@param {integer} offset The number of items to skip | ||
@returns {AsyncIterator} A new iterator that skips the given number of items | ||
*/ | ||
AsyncIterator.prototype.skip = function (offset) { | ||
return this.transform({ offset }); | ||
}; | ||
/** | ||
Limits the current iterator to the given number of items. | ||
The current iterator may not be read anymore until the returned iterator ends. | ||
@param {integer} limit The maximum number of items | ||
@returns {AsyncIterator} A new iterator with at most the given number of items | ||
*/ | ||
AsyncIterator.prototype.take = function (limit) { | ||
return this.transform({ limit }); | ||
}; | ||
/** | ||
Limits the current iterator to the given range. | ||
The current iterator may not be read anymore until the returned iterator ends. | ||
@param {integer} start Index of the first item to return | ||
@param {integer} end Index of the last item to return | ||
@returns {AsyncIterator} A new iterator with items in the given range | ||
*/ | ||
AsyncIterator.prototype.range = function (start, end) { | ||
return this.transform({ offset: start, limit: Math.max(end - start + 1, 0) }); | ||
}; | ||
/** | ||
Creates a new `MultiTransformIterator`. | ||
@constructor | ||
@classdesc An iterator that generates items by transforming each item of a source | ||
with a different iterator. | ||
@param {AsyncIterator|Readable} [source] The source this iterator generates items from | ||
@param {object} [options] Settings of the iterator | ||
@extends TransformIterator | ||
*/ | ||
function MultiTransformIterator(source, options) { | ||
if (!(this instanceof MultiTransformIterator)) | ||
return new MultiTransformIterator(source, options); | ||
TransformIterator.call(this, source, options); | ||
this._transformerQueue = []; | ||
} | ||
TransformIterator.subclass(MultiTransformIterator); | ||
/* Tries to read and transform items */ | ||
MultiTransformIterator.prototype._read = function (count, done) { | ||
// Remove transformers that have ended | ||
const transformerQueue = this._transformerQueue, | ||
source = this._source, optional = this._optional; | ||
let item, head, transformer; | ||
while ((head = transformerQueue[0]) && head.transformer.ended) { | ||
// If transforming is optional, push the original item if none was pushed | ||
if (optional && head.item !== null) { | ||
count--; | ||
this._push(head.item); | ||
} | ||
// Remove listeners from the transformer | ||
head = transformerQueue.shift(); | ||
transformer = head.transformer; | ||
transformer.removeListener('end', destinationFillBuffer); | ||
transformer.removeListener('readable', destinationFillBuffer); | ||
transformer.removeListener('error', destinationEmitError); | ||
_init() { | ||
// skip buffered iterator initialization, since we read from history | ||
} | ||
// Create new transformers if there are less than the maximum buffer size | ||
while (source && !source.ended && transformerQueue.length < this._maxBufferSize) { | ||
// Read an item to create the next transformer | ||
item = this._source.read(); | ||
if (item === null) | ||
break; | ||
// Create the transformer and listen to its events | ||
transformer = this._createTransformer(item) || new EmptyIterator(); | ||
transformer._destination = this; | ||
transformer.on('end', destinationFillBuffer); | ||
transformer.on('readable', destinationFillBuffer); | ||
transformer.on('error', destinationEmitError); | ||
transformerQueue.push({ transformer, item }); | ||
close() { | ||
// skip buffered iterator cleanup | ||
AsyncIterator.prototype.close.call(this); | ||
} | ||
// Try to read `count` items from the transformer | ||
head = transformerQueue[0]; | ||
if (head) { | ||
transformer = head.transformer; | ||
while (count-- > 0 && (item = transformer.read()) !== null) { | ||
this._push(item); | ||
// If a transformed item was pushed, no need to push the original anymore | ||
if (optional) | ||
head.item = null; | ||
} | ||
// The source this iterator copies items from | ||
get source() { | ||
return this._source; | ||
} | ||
// End the iterator if the source has ended | ||
else if (source && source.ended) { | ||
this.close(); | ||
} | ||
done(); | ||
}; | ||
/** | ||
Creates a transformer for the given item. | ||
@param {object} item The last read item from the source | ||
@returns {AsyncIterator} An iterator that transforms the given item | ||
*/ | ||
MultiTransformIterator.prototype._createTransformer = SingletonIterator; | ||
/* Closes the iterator when pending items are transformed. */ | ||
MultiTransformIterator.prototype._closeWhenDone = function () { | ||
// Only close if all transformers are read | ||
if (!this._transformerQueue.length) | ||
this.close(); | ||
}; | ||
/** | ||
Creates a new `ClonedIterator`. | ||
@constructor | ||
@classdesc An iterator that copies items from another iterator. | ||
@param {AsyncIterator|Readable} [source] The source this iterator copies items from | ||
@extends TransformIterator | ||
*/ | ||
function ClonedIterator(source) { | ||
if (!(this instanceof ClonedIterator)) | ||
return new ClonedIterator(source); | ||
// Although ClonedIterator inherits from TransformIterator and hence BufferedIterator, | ||
// we do not need the buffering because items arrive directly from a history buffer. | ||
// Therefore, initialize as an AsyncIterator, which does not set up buffering. | ||
AsyncIterator.call(this); | ||
this._readPosition = 0; | ||
if (source) | ||
this.source = source; | ||
} | ||
TransformIterator.subclass(ClonedIterator); | ||
// The source this iterator copies items from | ||
Object.defineProperty(ClonedIterator.prototype, 'source', { | ||
get() { | ||
return this._source; | ||
}, | ||
set(source) { | ||
set source(source) { | ||
// Validate and set the source | ||
@@ -1480,156 +1425,187 @@ let history = source && source._destination; | ||
const callbacks = propertyCallbacks[propertyName]; | ||
for (let i = 0; i < callbacks.length; i++) | ||
getSourceProperty(this, source, propertyName, callbacks[i]); | ||
for (const callback of callbacks) | ||
this._getSourceProperty(source, propertyName, callback); | ||
} | ||
}, | ||
enumerable: true, | ||
}); | ||
} | ||
// Retrieves the property with the given name from the clone or its source. | ||
ClonedIterator.prototype.getProperty = function (propertyName, callback) { | ||
const properties = this._properties, source = this._source, | ||
hasProperty = properties && (propertyName in properties); | ||
// If no callback was passed, return the property value | ||
if (!callback) | ||
return hasProperty ? properties[propertyName] : source && source.getProperty(propertyName); | ||
// Try to look up the property in this clone | ||
AsyncIterator.prototype.getProperty.call(this, propertyName, callback); | ||
// If the property is not set on this clone, it might become set on the source first | ||
if (source && !hasProperty) | ||
getSourceProperty(this, source, propertyName, callback); | ||
return undefined; | ||
}; | ||
// Retrieves the property with the given name from the source | ||
function getSourceProperty(clone, source, propertyName, callback) { | ||
source.getProperty(propertyName, value => { | ||
// Only send the source's property if it was not set on the clone in the meantime | ||
if (!clone._properties || !(propertyName in clone._properties)) | ||
callback(value); | ||
}); | ||
} | ||
// Retrieves the property with the given name from the clone or its source. | ||
getProperty(propertyName, callback) { | ||
const properties = this._properties, source = this._source, | ||
hasProperty = properties && (propertyName in properties); | ||
// If no callback was passed, return the property value | ||
if (!callback) | ||
return hasProperty ? properties[propertyName] : source && source.getProperty(propertyName); | ||
// Try to look up the property in this clone | ||
super.getProperty(propertyName, callback); | ||
// If the property is not set on this clone, it might become set on the source first | ||
if (source && !hasProperty) | ||
this._getSourceProperty(source, propertyName, callback); | ||
return undefined; | ||
} | ||
// Retrieves all properties of the iterator and its source. | ||
ClonedIterator.prototype.getProperties = function () { | ||
const base = this._source ? this._source.getProperties() : {}, properties = this._properties; | ||
for (const name in properties) | ||
base[name] = properties[name]; | ||
return base; | ||
}; | ||
// Retrieves the property with the given name from the source | ||
_getSourceProperty(source, propertyName, callback) { | ||
source.getProperty(propertyName, value => { | ||
// Only send the source's property if it was not set on the clone in the meantime | ||
if (!this._properties || !(propertyName in this._properties)) | ||
callback(value); | ||
}); | ||
} | ||
/* Generates details for a textual representation of the iterator. */ | ||
ClonedIterator.prototype._toStringDetails = function () { | ||
const source = this._source; | ||
return `{source: ${ source ? source.toString() : 'none' }}`; | ||
}; | ||
// Retrieves all properties of the iterator and its source. | ||
getProperties() { | ||
const base = this._source ? this._source.getProperties() : {}, properties = this._properties; | ||
for (const name in properties) | ||
base[name] = properties[name]; | ||
return base; | ||
} | ||
// Stores the history of a source, so it can be cloned | ||
function HistoryReader(source) { | ||
const history = []; | ||
let clones; | ||
/* Generates details for a textual representation of the iterator. */ | ||
_toStringDetails() { | ||
const source = this._source; | ||
return `{source: ${ source ? source.toString() : 'none' }}`; | ||
} | ||
// Tries to read the item at the given history position | ||
this.readAt = function (pos) { | ||
/* Tries to read an item */ | ||
read() { | ||
const source = this._source; | ||
let item = null; | ||
// Retrieve an item from history when available | ||
if (pos < history.length) | ||
item = history[pos]; | ||
// Read a new item from the source when possible | ||
else if (!source.ended && (item = source.read()) !== null) | ||
history[pos] = item; | ||
if (!this.done && source) { | ||
// Try to read an item at the current point in history | ||
const history = source._destination; | ||
if ((item = history.readAt(this._readPosition)) !== null) | ||
this._readPosition++; | ||
else | ||
this.readable = false; | ||
// Close the iterator if we are at the end of the source | ||
if (history.endsAt(this._readPosition)) | ||
this.close(); | ||
} | ||
return item; | ||
}; | ||
} | ||
// Determines whether the given position is the end of the source | ||
this.endsAt = function (pos) { | ||
return pos === history.length && source.ended; | ||
}; | ||
/* End the iterator and cleans up. */ | ||
_end(destroy) { | ||
// Unregister from a possible history reader | ||
const history = this._source && this._source._destination; | ||
if (history) | ||
history.unregister(this); | ||
// Registers a clone for history updates | ||
this.register = function (clone) { | ||
if (clones) | ||
clones.push(clone); | ||
}; | ||
// Don't call TransformIterator#_end, | ||
// as it would make the source inaccessible for other clones | ||
BufferedIterator.prototype._end.call(this, destroy); | ||
} | ||
} | ||
// Unregisters a clone for history updates | ||
this.unregister = function (clone) { | ||
let cloneIndex; | ||
if (clones && (cloneIndex = clones.indexOf(clone)) >= 0) | ||
clones.splice(cloneIndex, 1); | ||
}; | ||
// Listen to source events to trigger events in subscribed clones | ||
if (!source.ended) { | ||
clones = []; | ||
source.on('readable', clonesMakeReadable); | ||
source.on('end', clonesEnd); | ||
source.on('error', clonesEmitError); | ||
// Stores the history of a source, so it can be cloned | ||
class HistoryReader { | ||
constructor(source) { | ||
this._source = source; | ||
this._clones = null; | ||
this._history = []; | ||
// If the source can still emit items, set up cloning | ||
if (!source.ended) { | ||
// When the source becomes readable, makes all clones readable | ||
const setReadable = () => { | ||
for (const clone of this._clones) | ||
clone.readable = true; | ||
}; | ||
// When the source errors, re-emits the error | ||
const emitError = error => { | ||
for (const clone of this._clones) | ||
clone.emit('error', error); | ||
}; | ||
// When the source ends, closes all clones that are fully read | ||
const end = () => { | ||
// Close the clone if all items had been emitted | ||
for (const clone of this._clones) { | ||
if (clone._readPosition === this._history.length) | ||
clone.close(); | ||
} | ||
this._clones = null; | ||
// Remove source listeners, since no further events will be emitted | ||
source.removeListener('end', end); | ||
source.removeListener('error', emitError); | ||
source.removeListener('readable', setReadable); | ||
}; | ||
// Listen to source events to trigger events in subscribed clones | ||
this._clones = []; | ||
source.on('end', end); | ||
source.on('error', emitError); | ||
source.on('readable', setReadable); | ||
} | ||
} | ||
// When the source becomes readable, makes all clones readable | ||
function clonesMakeReadable() { | ||
for (let i = 0; i < clones.length; i++) | ||
clones[i].readable = true; | ||
// Registers a clone for history updates | ||
register(clone) { | ||
if (this._clones !== null) | ||
this._clones.push(clone); | ||
} | ||
// When the source ends, closes all clones that are fully read | ||
function clonesEnd() { | ||
for (let i = 0; i < clones.length; i++) { | ||
if (clones[i]._readPosition === history.length) | ||
clones[i].close(); | ||
} | ||
clones = null; | ||
source.removeListener('end', clonesEnd); | ||
source.removeListener('error', clonesEmitError); | ||
source.removeListener('readable', clonesMakeReadable); | ||
// Unregisters a clone for history updates | ||
unregister(clone) { | ||
if (this._clones !== null) | ||
this._clones = this._clones.filter(c => c !== clone); | ||
} | ||
// When the source errors, re-emits the error | ||
function clonesEmitError(error) { | ||
for (let i = 0; i < clones.length; i++) | ||
clones[i].emit('error', error); | ||
// Tries to read the item at the given history position | ||
readAt(pos) { | ||
let item = null; | ||
// Retrieve an item from history when available | ||
if (pos < this._history.length) | ||
item = this._history[pos]; | ||
// Read a new item from the source when possible | ||
else if (!this._source.ended && (item = this._source.read()) !== null) | ||
this._history[pos] = item; | ||
return item; | ||
} | ||
} | ||
/* Tries to read an item */ | ||
ClonedIterator.prototype.read = function () { | ||
const source = this._source; | ||
let item = null; | ||
if (!this.done && source) { | ||
// Try to read an item at the current point in history | ||
const history = source._destination; | ||
if ((item = history.readAt(this._readPosition)) !== null) | ||
this._readPosition++; | ||
else | ||
this.readable = false; | ||
// Close the iterator if we are at the end of the source | ||
if (history.endsAt(this._readPosition)) | ||
this.close(); | ||
// Determines whether the given position is the end of the source | ||
endsAt(pos) { | ||
return this._source.ended && this._history.length === pos; | ||
} | ||
return item; | ||
}; | ||
} | ||
/* End the iterator and cleans up. */ | ||
ClonedIterator.prototype._end = function (destroy) { | ||
// Unregister from a possible history reader | ||
const history = this._source && this._source._destination; | ||
if (history) | ||
history.unregister(this); | ||
/** | ||
Creates an iterator that wraps around a given iterator or readable stream. | ||
Use this to convert an iterator-like object into a full-featured AsyncIterator. | ||
After this operation, only read the returned iterator instead of the given one. | ||
@function | ||
@param {module:asynciterator.AsyncIterator|Readable} [source] The source this iterator generates items from | ||
@param {object} [options] Settings of the iterator | ||
@returns {module:asynciterator.AsyncIterator} A new iterator with the items from the given iterator | ||
*/ | ||
export function wrap(source, options) { | ||
return new TransformIterator(source, options); | ||
} | ||
// Don't call TransformIterator#_end, | ||
// as it would make the source inaccessible for other clones | ||
BufferedIterator.prototype._end.call(this, destroy); | ||
}; | ||
/** | ||
Creates an empty iterator. | ||
*/ | ||
export function empty() { | ||
return new EmptyIterator(); | ||
} | ||
// Disable buffer cleanup | ||
ClonedIterator.prototype.close = AsyncIterator.prototype.close; | ||
/** | ||
Creates an iterator with a single item. | ||
@param {object} item the item | ||
*/ | ||
export function single(item) { | ||
return new SingletonIterator(item); | ||
} | ||
/** | ||
Creates a copy of the current iterator, | ||
containing all items emitted from this point onward. | ||
Further copies can be created; they will all start from this same point. | ||
After this operation, only read the returned copies instead of the original iterator. | ||
@returns {AsyncIterator} A new iterator that contains all future items of this iterator | ||
*/ | ||
AsyncIterator.prototype.clone = function () { | ||
return new ClonedIterator(this); | ||
}; | ||
Creates an iterator for the given array. | ||
@param {Array} items the items | ||
*/ | ||
export function fromArray(items) { | ||
return new ArrayIterator(items); | ||
} | ||
// Determines whether the given object is a function | ||
@@ -1639,14 +1615,1 @@ function isFunction(object) { | ||
} | ||
// Export all submodules | ||
module.exports = AsyncIterator; | ||
AsyncIterator.AsyncIterator = AsyncIterator; | ||
AsyncIterator.EmptyIterator = AsyncIterator.empty = EmptyIterator; | ||
AsyncIterator.SingletonIterator = AsyncIterator.single = SingletonIterator; | ||
AsyncIterator.ArrayIterator = AsyncIterator.fromArray = ArrayIterator; | ||
AsyncIterator.IntegerIterator = IntegerIterator; | ||
AsyncIterator.BufferedIterator = BufferedIterator; | ||
AsyncIterator.TransformIterator = TransformIterator; | ||
AsyncIterator.SimpleTransformIterator = SimpleTransformIterator; | ||
AsyncIterator.MultiTransformIterator = MultiTransformIterator; | ||
AsyncIterator.ClonedIterator = ClonedIterator; |
{ | ||
"name": "asynciterator", | ||
"version": "3.0.0-alpha.0", | ||
"version": "3.0.0-beta.0", | ||
"description": "An asynchronous iterator library for advanced object pipelines.", | ||
"author": "Ruben Verborgh <ruben@verborgh.org>", | ||
"main": "asynciterator.js", | ||
"type": "module", | ||
"main": "./asynciterator.cjs", | ||
"exports": { | ||
"import": "./asynciterator.js", | ||
"require": "./asynciterator.cjs" | ||
}, | ||
"module": "./asynciterator.js", | ||
"sideEffects": false, | ||
"files": [ | ||
"asynciterator.js" | ||
"asynciterator.js", | ||
"asynciterator.cjs" | ||
], | ||
"scripts": { | ||
"test": "mocha", | ||
"cover": "nyc -- mocha -R dot", | ||
"build": "babel asynciterator.js > asynciterator.cjs", | ||
"prepare": "npm run build", | ||
"test": "c8 mocha", | ||
"lint": "eslint *.js test", | ||
@@ -18,20 +27,20 @@ "docs": "jsdoc -c jsdoc.json README.md *.js" | ||
"repository": "RubenVerborgh/AsyncIterator", | ||
"dependencies": { | ||
"queue-microtask": "^1.1.2" | ||
}, | ||
"devDependencies": { | ||
"@babel/cli": "^7.10.1", | ||
"@babel/core": "^7.10.2", | ||
"@babel/plugin-transform-modules-commonjs": "^7.10.1", | ||
"@babel/register": "^7.10.1", | ||
"c8": "^7.2.0", | ||
"chai": "^4.2.0", | ||
"eslint": "^5.15.1", | ||
"husky": "^4.2.5", | ||
"jaguarjs-jsdoc": "^1.1.0", | ||
"jsdoc": "^3.5.5", | ||
"mocha": "^6.0.2", | ||
"nyc": "^15.1.0", | ||
"pre-commit": "^1.2.2", | ||
"mocha": "^8.0.0", | ||
"sinon": "^7.2.7", | ||
"sinon-chai": "^3.3.0" | ||
}, | ||
"pre-commit": [ | ||
"lint", | ||
"cover" | ||
], | ||
"dependencies": { | ||
"queue-microtask": "^1.1.2" | ||
} | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
115053
5
2794
Yes
13