functional-streams
Advanced tools
Comparing version 1.0.4 to 1.0.5
@@ -23,6 +23,8 @@ /// <reference types="node" /> | ||
private _cumulative; | ||
private _promise; | ||
constructor(reduceFunction: (cumulative: R | undefined, newItem: T) => R | Promise<R>, begin?: R); | ||
_write(data: T, encoding: string, callback: (error?: any, result?: any) => void): void; | ||
then<T>(f: (r: R) => T | Promise<T>): Promise<T>; | ||
then<T>(f: (r: R) => T | Promise<T>, e?: (e: Error) => any): Promise<any>; | ||
catch(e?: (e: Error) => any): Promise<any>; | ||
} | ||
export { BatchTransform as Batch, MapTransform as Map, FilterTransform as Filter, ReduceTransform as Reduce }; |
@@ -79,2 +79,3 @@ "use strict"; | ||
exports.Filter = FilterTransform; | ||
const Deferred = require("promise-native-deferred"); | ||
class ReduceTransform extends stream_1.Writable { | ||
@@ -85,2 +86,9 @@ constructor(reduceFunction, begin) { | ||
this._cumulative = begin; | ||
this._promise = new Deferred(); | ||
this.on('finish', () => { | ||
this._promise.resolve(this._cumulative); | ||
}); | ||
this.on('error', (e) => { | ||
this._promise.reject(e); | ||
}); | ||
} | ||
@@ -100,11 +108,11 @@ _write(data, encoding, callback) { | ||
} | ||
then(f) { | ||
return new Promise((resolve, reject) => { | ||
this.on('finish', () => { | ||
resolve(this._cumulative); | ||
}); | ||
}) | ||
.then(f); | ||
then(f, e) { | ||
return this._promise.promise | ||
.then(f, e); | ||
} | ||
catch(e) { | ||
return this._promise.promise | ||
.catch(e); | ||
} | ||
} | ||
exports.Reduce = ReduceTransform; |
@@ -114,2 +114,17 @@ "use strict"; | ||
})); | ||
it('should throw when awaited with typescript', () => __awaiter(this, void 0, void 0, function* () { | ||
const arr = ['a', 'b']; | ||
const sourceStream = new SourceStream(); | ||
const reduceStream = new streams_1.Reduce((s, n) => s + arr[n], ''); | ||
sourceStream.pipe(reduceStream); | ||
try { | ||
const result = yield reduceStream; | ||
assert.fail('no error', 'error', 'should have thrown', '=='); | ||
} | ||
catch (e) { | ||
// all good; | ||
console.log('caught'); | ||
return; | ||
} | ||
})); | ||
const jsTest = eval(`(async () => { | ||
@@ -116,0 +131,0 @@ |
{ | ||
"name": "functional-streams", | ||
"version": "1.0.4", | ||
"version": "1.0.5", | ||
"description": "Native implementations of map, reduce, and batch, to act on streams.", | ||
@@ -35,3 +35,6 @@ "main": "build/streams.js", | ||
"typescript": "^2.4.1" | ||
}, | ||
"dependencies": { | ||
"promise-native-deferred": "^3.0.0" | ||
} | ||
} |
@@ -94,13 +94,24 @@ import {Transform, Readable, Writable} from 'stream'; | ||
import Deferred = require('promise-native-deferred'); | ||
class ReduceTransform<T, R> extends Writable implements PromiseLike<R> { | ||
private _reduceFunction: (cumulative: R | undefined, newItem: T) => R | Promise<R>; | ||
private _cumulative: R | undefined; | ||
private _cumulative: R; | ||
private _promise: Deferred<R>; | ||
constructor(reduceFunction: (cumulative: R | undefined, newItem: T) => R | Promise<R>, begin?: R) { | ||
super({objectMode: true}); | ||
this._reduceFunction = reduceFunction; | ||
this._cumulative = begin; | ||
this._cumulative = begin!; | ||
this._promise = new Deferred<R>(); | ||
this.on('finish', () => { | ||
this._promise.resolve(this._cumulative); | ||
}); | ||
this.on('error', (e) => { | ||
this._promise.reject(e); | ||
}) | ||
@@ -124,11 +135,12 @@ } | ||
then<T>(f: (r: R) => T | Promise<T>) { | ||
return new Promise((resolve, reject) => { | ||
this.on('finish', () => { | ||
resolve(this._cumulative); | ||
}) | ||
}) | ||
.then(f); | ||
then<T>(f: (r: R) => T | Promise<T>, e?: (e: Error) => any) { | ||
return this._promise.promise | ||
.then(f, e); | ||
} | ||
catch(e?: (e: Error) => any) { | ||
return this._promise.promise | ||
.catch(e); | ||
} | ||
} | ||
@@ -135,0 +147,0 @@ |
@@ -136,2 +136,19 @@ ///<reference types="mocha" /> | ||
}); | ||
it('should throw when awaited with typescript', async () => { | ||
const arr = ['a', 'b'] | ||
const sourceStream = new SourceStream(); | ||
const reduceStream = new ReduceStream((s, n: number) => s + arr[n], ''); | ||
sourceStream.pipe(reduceStream); | ||
try { | ||
const result = await reduceStream; | ||
assert.fail('no error', 'error', 'should have thrown', '=='); | ||
} catch (e) { | ||
// all good; | ||
return; | ||
} | ||
}) | ||
@@ -138,0 +155,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
21632
10
587
1
+ Addedpromise-native-deferred@3.0.0(transitive)