functional-streams
Advanced tools
Comparing version 1.0.10 to 1.0.11
@@ -11,4 +11,4 @@ /// <reference types="node" /> | ||
declare class MapTransform<T, U> extends Transform { | ||
_mapFunction: (t: T) => (U | Promise<U>); | ||
constructor(mapFunction: (t: T) => (U | Promise<U>)); | ||
_mapFunction: (t: T) => U | Promise<U>; | ||
constructor(mapFunction: (t: T) => U | Promise<U>); | ||
_transform(data: T, encoding: string, callback: (error?: any, result?: any) => void): void; | ||
@@ -18,3 +18,3 @@ } | ||
_filterFunction: (t: T) => boolean | Promise<boolean>; | ||
constructor(filterFunction: (t: T) => (boolean | Promise<boolean>)); | ||
constructor(filterFunction: (t: T) => boolean | Promise<boolean>); | ||
_transform(data: T, encoding: string, callback: (error?: any, result?: any) => void): void; | ||
@@ -21,0 +21,0 @@ } |
@@ -13,3 +13,3 @@ "use strict"; | ||
if (this._buffer.length === this._batchSize) { | ||
this.push((this._buffer)); | ||
this.push(this._buffer); | ||
this._buffer = []; | ||
@@ -21,3 +21,3 @@ } | ||
if (this._buffer.length > 0) { | ||
this.push((this._buffer)); | ||
this.push(this._buffer); | ||
} | ||
@@ -29,3 +29,3 @@ callback(); | ||
function isPromise(t) { | ||
return (t && typeof t.then === 'function'); | ||
return t && typeof t.then === 'function'; | ||
} | ||
@@ -43,4 +43,4 @@ class MapTransform extends stream_1.Transform { | ||
if (isPromise(result)) { | ||
result.then((syncResult) => { | ||
this.push((syncResult)); | ||
result.then(syncResult => { | ||
this.push(syncResult); | ||
callback(); | ||
@@ -50,3 +50,3 @@ }); | ||
else { | ||
this.push((result)); | ||
this.push(result); | ||
callback(); | ||
@@ -74,3 +74,3 @@ } | ||
if (isPromise(result)) { | ||
result.then((syncResult) => { | ||
result.then(syncResult => { | ||
filter(syncResult); | ||
@@ -98,3 +98,3 @@ }); | ||
}); | ||
this.on('error', (e) => { | ||
this.on('error', e => { | ||
this._promise.reject(e); | ||
@@ -106,6 +106,6 @@ }); | ||
if (isPromise(result)) { | ||
result.then((r) => { | ||
result.then(r => { | ||
this._cumulative = r; | ||
callback(); | ||
}, (e) => { | ||
}, e => { | ||
this._errored = true; | ||
@@ -121,10 +121,8 @@ callback(e); | ||
then(f, e) { | ||
return this._promise.promise | ||
.then(f, e); | ||
return this._promise.promise.then(f, e); | ||
} | ||
catch(e) { | ||
return this._promise.promise | ||
.catch(e); | ||
return this._promise.promise.catch(e); | ||
} | ||
} | ||
exports.Reduce = ReduceTransform; |
@@ -26,6 +26,6 @@ "use strict"; | ||
mapStream.on('readable', () => { | ||
const result = mapStream.read(); | ||
if (result === null) | ||
return; | ||
dest.push(result); | ||
let result; | ||
while ((result = mapStream.read()) !== null) { | ||
dest.push(result); | ||
} | ||
}); | ||
@@ -36,4 +36,4 @@ source.forEach(sourceStream.write.bind(sourceStream)); | ||
mapStream.on('end', resolve); | ||
}) | ||
.then(() => { | ||
mapStream.on('error', reject); | ||
}).then(() => { | ||
assert.deepStrictEqual(dest, source.map(myMap)); | ||
@@ -54,6 +54,6 @@ }); | ||
filterStream.on('readable', () => { | ||
const result = filterStream.read(); | ||
if (result === null) | ||
return; | ||
dest.push(result); | ||
let result; | ||
while ((result = filterStream.read()) !== null) { | ||
dest.push(result); | ||
} | ||
}); | ||
@@ -64,4 +64,3 @@ source.forEach(sourceStream.write.bind(sourceStream)); | ||
filterStream.on('end', resolve); | ||
}) | ||
.then(() => { | ||
}).then(() => { | ||
assert.deepStrictEqual(dest, source.filter(myFilter)); | ||
@@ -82,4 +81,3 @@ }); | ||
sourceStream.end(); | ||
return reduceStream | ||
.then((result) => { | ||
return reduceStream.then(result => { | ||
assert.deepStrictEqual(result, source.reduce(myReduce, '')); | ||
@@ -98,3 +96,2 @@ }); | ||
} | ||
; | ||
return this.push(this.data[this.i++]); | ||
@@ -108,4 +105,3 @@ } | ||
sourceStream.pipe(mapStream).pipe(reduceStream); | ||
return reduceStream | ||
.then((result) => { | ||
return reduceStream.then(result => { | ||
assert.deepStrictEqual(result, '246'); | ||
@@ -167,3 +163,3 @@ }); | ||
r = yield reduceStream; | ||
assert.throws(() => r, 'expected'); | ||
assert.throws(() => r, Error, 'expected'); | ||
} | ||
@@ -174,3 +170,5 @@ catch (e) { | ||
} | ||
assert.throws(() => { throw e; }, 'expected'); | ||
assert.throws(() => { | ||
throw e; | ||
}, Error, 'expected'); | ||
} | ||
@@ -189,4 +187,3 @@ })); | ||
sourceStream.pipe(reduceStream); | ||
return reduceStream | ||
.then((r) => { | ||
return reduceStream.then((r) => { | ||
assert.throws(() => r, 'expected'); | ||
@@ -207,6 +204,6 @@ }, (e) => { | ||
batchStream.on('readable', () => { | ||
const result = batchStream.read(); | ||
if (result === null) | ||
return; | ||
dest.push(result); | ||
let result; | ||
while ((result = batchStream.read()) !== null) { | ||
dest.push(result); | ||
} | ||
}); | ||
@@ -217,4 +214,3 @@ source.forEach(sourceStream.write.bind(sourceStream)); | ||
batchStream.on('end', resolve); | ||
}) | ||
.then(() => { | ||
}).then(() => { | ||
assert.deepStrictEqual(dest, [[0, 1, 2], [3, 4, 5], [6]]); | ||
@@ -221,0 +217,0 @@ }); |
{ | ||
"name": "functional-streams", | ||
"version": "1.0.10", | ||
"version": "1.0.11", | ||
"description": "Native implementations of map, reduce, and batch, to act on streams.", | ||
@@ -5,0 +5,0 @@ "main": "build/streams.js", |
@@ -10,2 +10,2 @@ declare module 'promise-native-deferred' { | ||
export = Deferred; | ||
} | ||
} |
@@ -1,5 +0,4 @@ | ||
import {Transform, Readable, Writable} from 'stream'; | ||
import { Transform, Readable, Writable } from 'stream'; | ||
class BatchTransform<T> extends Transform { | ||
_batchSize: number; | ||
@@ -9,3 +8,3 @@ _buffer: T[]; | ||
constructor(batchSize: number) { | ||
super({objectMode: true}); | ||
super({ objectMode: true }); | ||
this._batchSize = batchSize; | ||
@@ -15,7 +14,11 @@ this._buffer = []; | ||
_transform(data: T, encoding: string, callback: (error?: any, result?: any) => void) { | ||
_transform( | ||
data: T, | ||
encoding: string, | ||
callback: (error?: any, result?: any) => void | ||
) { | ||
this._buffer.push(data); | ||
if (this._buffer.length === this._batchSize) { | ||
this.push((this._buffer)); | ||
this.push(this._buffer); | ||
this._buffer = []; | ||
@@ -27,5 +30,5 @@ } | ||
_flush(callback: (error?: any, result?: any) => void ) { | ||
_flush(callback: (error?: any, result?: any) => void) { | ||
if (this._buffer.length > 0) { | ||
this.push((this._buffer)); | ||
this.push(this._buffer); | ||
} | ||
@@ -38,11 +41,10 @@ | ||
function isPromise<T>(t: any): t is Promise<T> { | ||
return (t && typeof t.then === 'function'); | ||
return t && typeof t.then === 'function'; | ||
} | ||
class MapTransform<T, U> extends Transform { | ||
_mapFunction: (t: T) => (U | Promise<U>) | ||
constructor(mapFunction : (t: T) => (U | Promise<U>) ) { | ||
super({objectMode: true}); | ||
class MapTransform<T, U> extends Transform { | ||
_mapFunction: (t: T) => U | Promise<U>; | ||
constructor(mapFunction: (t: T) => U | Promise<U>) { | ||
super({ objectMode: true }); | ||
if (typeof mapFunction !== 'function') { | ||
@@ -53,11 +55,15 @@ throw new TypeError('mapFunction must be a function'); | ||
} | ||
_transform(data: T , encoding: string , callback: (error?: any, result?: any) => void) { | ||
const result = this._mapFunction(data) ; | ||
_transform( | ||
data: T, | ||
encoding: string, | ||
callback: (error?: any, result?: any) => void | ||
) { | ||
const result = this._mapFunction(data); | ||
if (isPromise(result)) { | ||
result.then( (syncResult) => { | ||
this.push((syncResult)); | ||
result.then(syncResult => { | ||
this.push(syncResult); | ||
callback(); | ||
}); | ||
} else { | ||
this.push((result)); | ||
this.push(result); | ||
callback(); | ||
@@ -69,7 +75,6 @@ } | ||
class FilterTransform<T> extends Transform { | ||
_filterFunction: (t: T) => boolean | Promise<boolean> | ||
constructor(filterFunction: (t:T) => (boolean | Promise<boolean>) ) { | ||
super({objectMode: true}); | ||
_filterFunction: (t: T) => boolean | Promise<boolean>; | ||
constructor(filterFunction: (t: T) => boolean | Promise<boolean>) { | ||
super({ objectMode: true }); | ||
if (typeof filterFunction !== 'function') { | ||
@@ -80,3 +85,7 @@ throw new TypeError('filterFunction must be a function'); | ||
} | ||
_transform(data: T, encoding: string, callback: (error?: any, result?: any) => void) { | ||
_transform( | ||
data: T, | ||
encoding: string, | ||
callback: (error?: any, result?: any) => void | ||
) { | ||
const result = this._filterFunction(data); | ||
@@ -90,5 +99,5 @@ | ||
}; | ||
if (isPromise(result)) { | ||
result.then( (syncResult) => { | ||
result.then(syncResult => { | ||
filter(syncResult); | ||
@@ -105,4 +114,6 @@ }); | ||
class ReduceTransform<T, R> extends Writable implements PromiseLike<R> { | ||
private _reduceFunction: (cumulative: R | undefined, newItem: T) => R | Promise<R>; | ||
private _reduceFunction: ( | ||
cumulative: R | undefined, | ||
newItem: T | ||
) => R | Promise<R>; | ||
private _cumulative: R; | ||
@@ -112,6 +123,20 @@ private _errored: boolean = false; | ||
constructor(reduceFunction: (cumulative: R, newItem: T) => R | Promise<R>, begin: R); | ||
constructor(reduceFunction: (cumulative: R | undefined, newItem: T) => R | Promise<R>); | ||
constructor(reduceFunction: (cumulative: R | undefined, newItem: T) => R | Promise<R>, begin?: R) { | ||
super({objectMode: true}); | ||
constructor( | ||
reduceFunction: (cumulative: R, newItem: T) => R | Promise<R>, | ||
begin: R | ||
); | ||
constructor( | ||
reduceFunction: ( | ||
cumulative: R | undefined, | ||
newItem: T | ||
) => R | Promise<R> | ||
); | ||
constructor( | ||
reduceFunction: ( | ||
cumulative: R | undefined, | ||
newItem: T | ||
) => R | Promise<R>, | ||
begin?: R | ||
) { | ||
super({ objectMode: true }); | ||
this._reduceFunction = reduceFunction; | ||
@@ -127,19 +152,25 @@ this._cumulative = begin!; | ||
}); | ||
this.on('error', (e) => { | ||
this.on('error', e => { | ||
this._promise.reject(e); | ||
}) | ||
}); | ||
} | ||
_write(data: T, encoding: string, callback: (error?: any, result?: any) => void) { | ||
_write( | ||
data: T, | ||
encoding: string, | ||
callback: (error?: any, result?: any) => void | ||
) { | ||
const result = this._reduceFunction(this._cumulative, data); | ||
if (isPromise(result)) { | ||
result.then( (r) => { | ||
this._cumulative = r; | ||
callback(); | ||
}, (e) => { | ||
this._errored = true; | ||
callback(e); | ||
}); | ||
result.then( | ||
r => { | ||
this._cumulative = r; | ||
callback(); | ||
}, | ||
e => { | ||
this._errored = true; | ||
callback(e); | ||
} | ||
); | ||
} else { | ||
@@ -151,19 +182,16 @@ this._cumulative = result; | ||
then<T>(f: (r: R) => T | Promise<T>, e?: (e: Error) => any) { | ||
return this._promise.promise | ||
.then(f, e); | ||
return this._promise.promise.then(f, e); | ||
} | ||
catch(e?: (e: Error) => any) { | ||
return this._promise.promise | ||
.catch(e); | ||
return this._promise.promise.catch(e); | ||
} | ||
} | ||
export {BatchTransform as Batch, MapTransform as Map, FilterTransform as Filter, ReduceTransform as Reduce}; | ||
export { | ||
BatchTransform as Batch, | ||
MapTransform as Map, | ||
FilterTransform as Filter, | ||
ReduceTransform as Reduce | ||
}; |
133
src/test.ts
@@ -5,9 +5,14 @@ ///<reference types="mocha" /> | ||
import {PassThrough, Readable} from 'stream'; | ||
import {Map as MapStream, Filter as FilterStream, Batch, Reduce as ReduceStream} from './streams'; | ||
import { PassThrough, Readable } from 'stream'; | ||
import { | ||
Map as MapStream, | ||
Filter as FilterStream, | ||
Batch, | ||
Reduce as ReduceStream | ||
} from './streams'; | ||
describe('MapStream', () => { | ||
it('should function as a normal map', () => { | ||
const source = [0,1,2,3,4,5]; | ||
const sourceStream = new PassThrough({objectMode: true}); | ||
const source = [0, 1, 2, 3, 4, 5]; | ||
const sourceStream = new PassThrough({ objectMode: true }); | ||
const dest: string[] = []; | ||
@@ -23,6 +28,7 @@ | ||
mapStream.on('readable', () => { | ||
const result = mapStream.read(); | ||
if (result === null) return; | ||
let result; | ||
dest.push(result); | ||
while ((result = mapStream.read()) !== null) { | ||
dest.push(result); | ||
} | ||
}); | ||
@@ -35,14 +41,13 @@ | ||
mapStream.on('end', resolve); | ||
}) | ||
.then( () => { | ||
mapStream.on('error', reject); | ||
}).then(() => { | ||
assert.deepStrictEqual(dest, source.map(myMap)); | ||
}); | ||
}); | ||
}); | ||
}) | ||
describe('FilterStream', () => { | ||
it('should function as a normal filter', () => { | ||
const source = [0,1,2,3,4,5]; | ||
const sourceStream = new PassThrough({objectMode: true}); | ||
const source = [0, 1, 2, 3, 4, 5]; | ||
const sourceStream = new PassThrough({ objectMode: true }); | ||
const dest: string[] = []; | ||
@@ -58,6 +63,7 @@ | ||
filterStream.on('readable', () => { | ||
const result = filterStream.read(); | ||
if (result === null) return; | ||
let result; | ||
dest.push(result); | ||
while ((result = filterStream.read()) !== null) { | ||
dest.push(result); | ||
} | ||
}); | ||
@@ -70,13 +76,12 @@ | ||
filterStream.on('end', resolve); | ||
}) | ||
.then( () => { | ||
}).then(() => { | ||
assert.deepStrictEqual(dest, source.filter(myFilter)); | ||
}); | ||
}) | ||
}) | ||
}); | ||
}); | ||
describe('ReduceStream', () => { | ||
it('should function as a normal reduce', () => { | ||
const source = [0,1,2,3,4,5]; | ||
const sourceStream = new PassThrough({objectMode: true}); | ||
const source = [0, 1, 2, 3, 4, 5]; | ||
const sourceStream = new PassThrough({ objectMode: true }); | ||
@@ -90,20 +95,16 @@ function myReduce(result: string, n: number): string { | ||
source.forEach(sourceStream.write.bind(sourceStream)); | ||
sourceStream.end(); | ||
return reduceStream | ||
.then( (result) => { | ||
return reduceStream.then(result => { | ||
assert.deepStrictEqual(result, source.reduce(myReduce, '')); | ||
}); | ||
}) | ||
}); | ||
class SourceStream extends Readable { | ||
data = [1, 2, 3] | ||
data = [1, 2, 3]; | ||
i = 0; | ||
constructor() { | ||
super({objectMode: true}); | ||
super({ objectMode: true }); | ||
} | ||
@@ -113,4 +114,4 @@ | ||
if (this.i >= this.data.length) { | ||
return this.push(null) | ||
}; | ||
return this.push(null); | ||
} | ||
return this.push(this.data[this.i++]); | ||
@@ -121,17 +122,13 @@ } | ||
it('should finish after piping from a readablestream', () => { | ||
const sourceStream = new SourceStream(); | ||
const mapStream = new MapStream((n: number) => n*2); | ||
const mapStream = new MapStream((n: number) => n * 2); | ||
const reduceStream = new ReduceStream((s, n) => s + n.toString(), ''); | ||
sourceStream.pipe(mapStream).pipe(reduceStream); | ||
return reduceStream | ||
.then( (result) => { | ||
return reduceStream.then(result => { | ||
assert.deepStrictEqual(result, '246'); | ||
}); | ||
}) | ||
}); | ||
it('should finish when awaited with typescript', async () => { | ||
const sourceStream = new SourceStream(); | ||
@@ -143,8 +140,6 @@ const reduceStream = new ReduceStream((s, n) => s + n.toString(), ''); | ||
assert.deepStrictEqual(result, '123'); | ||
}); | ||
it('should throw when awaited with typescript', async () => { | ||
const arr = ['a', 'b'] | ||
const arr = ['a', 'b']; | ||
const sourceStream = new SourceStream(); | ||
@@ -161,5 +156,4 @@ const reduceStream = new ReduceStream((s, n: number) => s + arr[n], ''); | ||
} | ||
}); | ||
}) | ||
const jsTest = eval(`(async () => { | ||
@@ -178,3 +172,3 @@ | ||
it('should finish when awaited with javascript', jsTest); | ||
it('should finish when awaited with javascript', jsTest); | ||
@@ -203,3 +197,3 @@ function wait(ms: number) { | ||
r = await reduceStream; | ||
assert.throws(() => r, 'expected'); | ||
assert.throws(() => r, Error, 'expected'); | ||
} catch (e) { | ||
@@ -209,8 +203,13 @@ if (e instanceof assert.AssertionError) { | ||
} | ||
assert.throws(() => { throw e }, 'expected'); | ||
assert.throws( | ||
() => { | ||
throw e; | ||
}, | ||
Error, | ||
'expected' | ||
); | ||
} | ||
}); | ||
it('should promise catch when an async reduce function throws an error', async() => { | ||
it('should promise catch when an async reduce function throws an error', async () => { | ||
const sourceStream = new SourceStream(); | ||
@@ -228,18 +227,18 @@ const reduceStream = new ReduceStream(async (s, n: number) => { | ||
return reduceStream | ||
.then((r: string) => { | ||
return reduceStream.then( | ||
(r: string) => { | ||
assert.throws(() => r, 'expected'); | ||
}, (e: Error) => { | ||
}, | ||
(e: Error) => { | ||
//console.error(e); | ||
//assert.throws(() => { throw e }, 'expected'); | ||
}) | ||
} | ||
); | ||
}); | ||
}); | ||
}) | ||
}) | ||
describe('BatchStream', () => { | ||
it('should batch correctly', () => { | ||
const source = [0,1,2,3,4,5,6]; | ||
const sourceStream = new PassThrough({objectMode: true}); | ||
const source = [0, 1, 2, 3, 4, 5, 6]; | ||
const sourceStream = new PassThrough({ objectMode: true }); | ||
const dest: string[] = []; | ||
@@ -251,6 +250,7 @@ | ||
batchStream.on('readable', () => { | ||
const result = batchStream.read(); | ||
if (result === null) return; | ||
let result; | ||
dest.push(result); | ||
while ((result = batchStream.read()) !== null) { | ||
dest.push(result); | ||
} | ||
}); | ||
@@ -263,7 +263,6 @@ | ||
batchStream.on('end', resolve); | ||
}) | ||
.then( () => { | ||
assert.deepStrictEqual(dest, [[0,1,2],[3,4,5],[6]]); | ||
}).then(() => { | ||
assert.deepStrictEqual(dest, [[0, 1, 2], [3, 4, 5], [6]]); | ||
}); | ||
}) | ||
}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
25271
11
744