Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

functional-streams

Package Overview
Dependencies
Maintainers
1
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

functional-streams - npm Package Compare versions

Comparing version 1.0.10 to 1.0.11

.prettierrc

6

build/streams.d.ts

@@ -11,4 +11,4 @@ /// <reference types="node" />

declare class MapTransform<T, U> extends Transform {
_mapFunction: (t: T) => (U | Promise<U>);
constructor(mapFunction: (t: T) => (U | Promise<U>));
_mapFunction: (t: T) => U | Promise<U>;
constructor(mapFunction: (t: T) => U | Promise<U>);
_transform(data: T, encoding: string, callback: (error?: any, result?: any) => void): void;

@@ -18,3 +18,3 @@ }

_filterFunction: (t: T) => boolean | Promise<boolean>;
constructor(filterFunction: (t: T) => (boolean | Promise<boolean>));
constructor(filterFunction: (t: T) => boolean | Promise<boolean>);
_transform(data: T, encoding: string, callback: (error?: any, result?: any) => void): void;

@@ -21,0 +21,0 @@ }

@@ -13,3 +13,3 @@ "use strict";

if (this._buffer.length === this._batchSize) {
this.push((this._buffer));
this.push(this._buffer);
this._buffer = [];

@@ -21,3 +21,3 @@ }

if (this._buffer.length > 0) {
this.push((this._buffer));
this.push(this._buffer);
}

@@ -29,3 +29,3 @@ callback();

function isPromise(t) {
return (t && typeof t.then === 'function');
return t && typeof t.then === 'function';
}

@@ -43,4 +43,4 @@ class MapTransform extends stream_1.Transform {

if (isPromise(result)) {
result.then((syncResult) => {
this.push((syncResult));
result.then(syncResult => {
this.push(syncResult);
callback();

@@ -50,3 +50,3 @@ });

else {
this.push((result));
this.push(result);
callback();

@@ -74,3 +74,3 @@ }

if (isPromise(result)) {
result.then((syncResult) => {
result.then(syncResult => {
filter(syncResult);

@@ -98,3 +98,3 @@ });

});
this.on('error', (e) => {
this.on('error', e => {
this._promise.reject(e);

@@ -106,6 +106,6 @@ });

if (isPromise(result)) {
result.then((r) => {
result.then(r => {
this._cumulative = r;
callback();
}, (e) => {
}, e => {
this._errored = true;

@@ -121,10 +121,8 @@ callback(e);

then(f, e) {
return this._promise.promise
.then(f, e);
return this._promise.promise.then(f, e);
}
catch(e) {
return this._promise.promise
.catch(e);
return this._promise.promise.catch(e);
}
}
exports.Reduce = ReduceTransform;

@@ -26,6 +26,6 @@ "use strict";

mapStream.on('readable', () => {
const result = mapStream.read();
if (result === null)
return;
dest.push(result);
let result;
while ((result = mapStream.read()) !== null) {
dest.push(result);
}
});

@@ -36,4 +36,4 @@ source.forEach(sourceStream.write.bind(sourceStream));

mapStream.on('end', resolve);
})
.then(() => {
mapStream.on('error', reject);
}).then(() => {
assert.deepStrictEqual(dest, source.map(myMap));

@@ -54,6 +54,6 @@ });

filterStream.on('readable', () => {
const result = filterStream.read();
if (result === null)
return;
dest.push(result);
let result;
while ((result = filterStream.read()) !== null) {
dest.push(result);
}
});

@@ -64,4 +64,3 @@ source.forEach(sourceStream.write.bind(sourceStream));

filterStream.on('end', resolve);
})
.then(() => {
}).then(() => {
assert.deepStrictEqual(dest, source.filter(myFilter));

@@ -82,4 +81,3 @@ });

sourceStream.end();
return reduceStream
.then((result) => {
return reduceStream.then(result => {
assert.deepStrictEqual(result, source.reduce(myReduce, ''));

@@ -98,3 +96,2 @@ });

}
;
return this.push(this.data[this.i++]);

@@ -108,4 +105,3 @@ }

sourceStream.pipe(mapStream).pipe(reduceStream);
return reduceStream
.then((result) => {
return reduceStream.then(result => {
assert.deepStrictEqual(result, '246');

@@ -167,3 +163,3 @@ });

r = yield reduceStream;
assert.throws(() => r, 'expected');
assert.throws(() => r, Error, 'expected');
}

@@ -174,3 +170,5 @@ catch (e) {

}
assert.throws(() => { throw e; }, 'expected');
assert.throws(() => {
throw e;
}, Error, 'expected');
}

@@ -189,4 +187,3 @@ }));

sourceStream.pipe(reduceStream);
return reduceStream
.then((r) => {
return reduceStream.then((r) => {
assert.throws(() => r, 'expected');

@@ -207,6 +204,6 @@ }, (e) => {

batchStream.on('readable', () => {
const result = batchStream.read();
if (result === null)
return;
dest.push(result);
let result;
while ((result = batchStream.read()) !== null) {
dest.push(result);
}
});

@@ -217,4 +214,3 @@ source.forEach(sourceStream.write.bind(sourceStream));

batchStream.on('end', resolve);
})
.then(() => {
}).then(() => {
assert.deepStrictEqual(dest, [[0, 1, 2], [3, 4, 5], [6]]);

@@ -221,0 +217,0 @@ });

{
"name": "functional-streams",
"version": "1.0.10",
"version": "1.0.11",
"description": "Native implementations of map, reduce, and batch, to act on streams.",

@@ -5,0 +5,0 @@ "main": "build/streams.js",

@@ -10,2 +10,2 @@ declare module 'promise-native-deferred' {

export = Deferred;
}
}

@@ -1,5 +0,4 @@

import {Transform, Readable, Writable} from 'stream';
import { Transform, Readable, Writable } from 'stream';
class BatchTransform<T> extends Transform {
_batchSize: number;

@@ -9,3 +8,3 @@ _buffer: T[];

constructor(batchSize: number) {
super({objectMode: true});
super({ objectMode: true });
this._batchSize = batchSize;

@@ -15,7 +14,11 @@ this._buffer = [];

_transform(data: T, encoding: string, callback: (error?: any, result?: any) => void) {
_transform(
data: T,
encoding: string,
callback: (error?: any, result?: any) => void
) {
this._buffer.push(data);
if (this._buffer.length === this._batchSize) {
this.push((this._buffer));
this.push(this._buffer);
this._buffer = [];

@@ -27,5 +30,5 @@ }

_flush(callback: (error?: any, result?: any) => void ) {
_flush(callback: (error?: any, result?: any) => void) {
if (this._buffer.length > 0) {
this.push((this._buffer));
this.push(this._buffer);
}

@@ -38,11 +41,10 @@

function isPromise<T>(t: any): t is Promise<T> {
return (t && typeof t.then === 'function');
return t && typeof t.then === 'function';
}
class MapTransform<T, U> extends Transform {
_mapFunction: (t: T) => (U | Promise<U>)
constructor(mapFunction : (t: T) => (U | Promise<U>) ) {
super({objectMode: true});
class MapTransform<T, U> extends Transform {
_mapFunction: (t: T) => U | Promise<U>;
constructor(mapFunction: (t: T) => U | Promise<U>) {
super({ objectMode: true });
if (typeof mapFunction !== 'function') {

@@ -53,11 +55,15 @@ throw new TypeError('mapFunction must be a function');

}
_transform(data: T , encoding: string , callback: (error?: any, result?: any) => void) {
const result = this._mapFunction(data) ;
_transform(
data: T,
encoding: string,
callback: (error?: any, result?: any) => void
) {
const result = this._mapFunction(data);
if (isPromise(result)) {
result.then( (syncResult) => {
this.push((syncResult));
result.then(syncResult => {
this.push(syncResult);
callback();
});
} else {
this.push((result));
this.push(result);
callback();

@@ -69,7 +75,6 @@ }

class FilterTransform<T> extends Transform {
_filterFunction: (t: T) => boolean | Promise<boolean>
constructor(filterFunction: (t:T) => (boolean | Promise<boolean>) ) {
super({objectMode: true});
_filterFunction: (t: T) => boolean | Promise<boolean>;
constructor(filterFunction: (t: T) => boolean | Promise<boolean>) {
super({ objectMode: true });
if (typeof filterFunction !== 'function') {

@@ -80,3 +85,7 @@ throw new TypeError('filterFunction must be a function');

}
_transform(data: T, encoding: string, callback: (error?: any, result?: any) => void) {
_transform(
data: T,
encoding: string,
callback: (error?: any, result?: any) => void
) {
const result = this._filterFunction(data);

@@ -90,5 +99,5 @@

};
if (isPromise(result)) {
result.then( (syncResult) => {
result.then(syncResult => {
filter(syncResult);

@@ -105,4 +114,6 @@ });

class ReduceTransform<T, R> extends Writable implements PromiseLike<R> {
private _reduceFunction: (cumulative: R | undefined, newItem: T) => R | Promise<R>;
private _reduceFunction: (
cumulative: R | undefined,
newItem: T
) => R | Promise<R>;
private _cumulative: R;

@@ -112,6 +123,20 @@ private _errored: boolean = false;

constructor(reduceFunction: (cumulative: R, newItem: T) => R | Promise<R>, begin: R);
constructor(reduceFunction: (cumulative: R | undefined, newItem: T) => R | Promise<R>);
constructor(reduceFunction: (cumulative: R | undefined, newItem: T) => R | Promise<R>, begin?: R) {
super({objectMode: true});
constructor(
reduceFunction: (cumulative: R, newItem: T) => R | Promise<R>,
begin: R
);
constructor(
reduceFunction: (
cumulative: R | undefined,
newItem: T
) => R | Promise<R>
);
constructor(
reduceFunction: (
cumulative: R | undefined,
newItem: T
) => R | Promise<R>,
begin?: R
) {
super({ objectMode: true });
this._reduceFunction = reduceFunction;

@@ -127,19 +152,25 @@ this._cumulative = begin!;

});
this.on('error', (e) => {
this.on('error', e => {
this._promise.reject(e);
})
});
}
_write(data: T, encoding: string, callback: (error?: any, result?: any) => void) {
_write(
data: T,
encoding: string,
callback: (error?: any, result?: any) => void
) {
const result = this._reduceFunction(this._cumulative, data);
if (isPromise(result)) {
result.then( (r) => {
this._cumulative = r;
callback();
}, (e) => {
this._errored = true;
callback(e);
});
result.then(
r => {
this._cumulative = r;
callback();
},
e => {
this._errored = true;
callback(e);
}
);
} else {

@@ -151,19 +182,16 @@ this._cumulative = result;

then<T>(f: (r: R) => T | Promise<T>, e?: (e: Error) => any) {
return this._promise.promise
.then(f, e);
return this._promise.promise.then(f, e);
}
catch(e?: (e: Error) => any) {
return this._promise.promise
.catch(e);
return this._promise.promise.catch(e);
}
}
export {BatchTransform as Batch, MapTransform as Map, FilterTransform as Filter, ReduceTransform as Reduce};
export {
BatchTransform as Batch,
MapTransform as Map,
FilterTransform as Filter,
ReduceTransform as Reduce
};

@@ -5,9 +5,14 @@ ///<reference types="mocha" />

import {PassThrough, Readable} from 'stream';
import {Map as MapStream, Filter as FilterStream, Batch, Reduce as ReduceStream} from './streams';
import { PassThrough, Readable } from 'stream';
import {
Map as MapStream,
Filter as FilterStream,
Batch,
Reduce as ReduceStream
} from './streams';
describe('MapStream', () => {
it('should function as a normal map', () => {
const source = [0,1,2,3,4,5];
const sourceStream = new PassThrough({objectMode: true});
const source = [0, 1, 2, 3, 4, 5];
const sourceStream = new PassThrough({ objectMode: true });
const dest: string[] = [];

@@ -23,6 +28,7 @@

mapStream.on('readable', () => {
const result = mapStream.read();
if (result === null) return;
let result;
dest.push(result);
while ((result = mapStream.read()) !== null) {
dest.push(result);
}
});

@@ -35,14 +41,13 @@

mapStream.on('end', resolve);
})
.then( () => {
mapStream.on('error', reject);
}).then(() => {
assert.deepStrictEqual(dest, source.map(myMap));
});
});
});
})
describe('FilterStream', () => {
it('should function as a normal filter', () => {
const source = [0,1,2,3,4,5];
const sourceStream = new PassThrough({objectMode: true});
const source = [0, 1, 2, 3, 4, 5];
const sourceStream = new PassThrough({ objectMode: true });
const dest: string[] = [];

@@ -58,6 +63,7 @@

filterStream.on('readable', () => {
const result = filterStream.read();
if (result === null) return;
let result;
dest.push(result);
while ((result = filterStream.read()) !== null) {
dest.push(result);
}
});

@@ -70,13 +76,12 @@

filterStream.on('end', resolve);
})
.then( () => {
}).then(() => {
assert.deepStrictEqual(dest, source.filter(myFilter));
});
})
})
});
});
describe('ReduceStream', () => {
it('should function as a normal reduce', () => {
const source = [0,1,2,3,4,5];
const sourceStream = new PassThrough({objectMode: true});
const source = [0, 1, 2, 3, 4, 5];
const sourceStream = new PassThrough({ objectMode: true });

@@ -90,20 +95,16 @@ function myReduce(result: string, n: number): string {

source.forEach(sourceStream.write.bind(sourceStream));
sourceStream.end();
return reduceStream
.then( (result) => {
return reduceStream.then(result => {
assert.deepStrictEqual(result, source.reduce(myReduce, ''));
});
})
});
class SourceStream extends Readable {
data = [1, 2, 3]
data = [1, 2, 3];
i = 0;
constructor() {
super({objectMode: true});
super({ objectMode: true });
}

@@ -113,4 +114,4 @@

if (this.i >= this.data.length) {
return this.push(null)
};
return this.push(null);
}
return this.push(this.data[this.i++]);

@@ -121,17 +122,13 @@ }

it('should finish after piping from a readablestream', () => {
const sourceStream = new SourceStream();
const mapStream = new MapStream((n: number) => n*2);
const mapStream = new MapStream((n: number) => n * 2);
const reduceStream = new ReduceStream((s, n) => s + n.toString(), '');
sourceStream.pipe(mapStream).pipe(reduceStream);
return reduceStream
.then( (result) => {
return reduceStream.then(result => {
assert.deepStrictEqual(result, '246');
});
})
});
it('should finish when awaited with typescript', async () => {
const sourceStream = new SourceStream();

@@ -143,8 +140,6 @@ const reduceStream = new ReduceStream((s, n) => s + n.toString(), '');

assert.deepStrictEqual(result, '123');
});
it('should throw when awaited with typescript', async () => {
const arr = ['a', 'b']
const arr = ['a', 'b'];
const sourceStream = new SourceStream();

@@ -161,5 +156,4 @@ const reduceStream = new ReduceStream((s, n: number) => s + arr[n], '');

}
});
})
const jsTest = eval(`(async () => {

@@ -178,3 +172,3 @@

it('should finish when awaited with javascript', jsTest);
it('should finish when awaited with javascript', jsTest);

@@ -203,3 +197,3 @@ function wait(ms: number) {

r = await reduceStream;
assert.throws(() => r, 'expected');
assert.throws(() => r, Error, 'expected');
} catch (e) {

@@ -209,8 +203,13 @@ if (e instanceof assert.AssertionError) {

}
assert.throws(() => { throw e }, 'expected');
assert.throws(
() => {
throw e;
},
Error,
'expected'
);
}
});
it('should promise catch when an async reduce function throws an error', async() => {
it('should promise catch when an async reduce function throws an error', async () => {
const sourceStream = new SourceStream();

@@ -228,18 +227,18 @@ const reduceStream = new ReduceStream(async (s, n: number) => {

return reduceStream
.then((r: string) => {
return reduceStream.then(
(r: string) => {
assert.throws(() => r, 'expected');
}, (e: Error) => {
},
(e: Error) => {
//console.error(e);
//assert.throws(() => { throw e }, 'expected');
})
}
);
});
});
})
})
describe('BatchStream', () => {
it('should batch correctly', () => {
const source = [0,1,2,3,4,5,6];
const sourceStream = new PassThrough({objectMode: true});
const source = [0, 1, 2, 3, 4, 5, 6];
const sourceStream = new PassThrough({ objectMode: true });
const dest: string[] = [];

@@ -251,6 +250,7 @@

batchStream.on('readable', () => {
const result = batchStream.read();
if (result === null) return;
let result;
dest.push(result);
while ((result = batchStream.read()) !== null) {
dest.push(result);
}
});

@@ -263,7 +263,6 @@

batchStream.on('end', resolve);
})
.then( () => {
assert.deepStrictEqual(dest, [[0,1,2],[3,4,5],[6]]);
}).then(() => {
assert.deepStrictEqual(dest, [[0, 1, 2], [3, 4, 5], [6]]);
});
})
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc