Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@graphy/core.iso.stream

Package Overview
Dependencies
Maintainers
2
Versions
25
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@graphy/core.iso.stream - npm Package Compare versions

Comparing version 3.0.6 to 3.1.0

472

main.js

@@ -14,13 +14,130 @@ const stream = require('stream');

until(s_event) {
until(s_event, b_return_stream) {
return new Promise((fk_until, fe_until) => {
this.on(s_event, fk_until);
// convert error to rejected promise
this.on('error', (e_stream) => {
fe_until(e_stream);
});
// special cases returns `this`
if(b_return_stream) {
this.once(s_event, (...a_args) => {
fk_until(this, ...a_args);
});
}
else {
this.once(s_event, fk_until);
}
});
}
bucket(s_encoding='utf8') {
let g_readable = this._readableState;
// object mode
if(g_readable.objectMode) {
// async operation
return new Promise((fk_bucket, fe_bucket) => {
let a_data = [];
// pipe to writable
this.pipe(new stream.Writable({
write(w_event, s_write_encoding, fk_write) {
a_data.push(w_event);
fk_write();
},
writev(a_chunks, fk_writev) {
a_data.push(...a_chunks);
fk_writev();
},
}))
// error
.on('error', (e_stream) => {
fe_bucket(e_stream);
})
// wait for it to finish
.on('finish', () => {
fk_bucket(a_data);
});
});
}
// utf8-encoded strings
else if('utf8' === s_encoding || 'utf-8' === s_encoding) {
// async operation
return new Promise((fk_bucket, fe_bucket) => {
let s_data = '';
// set encoding
this.setEncoding(s_encoding);
// pipe to writable
this.pipe(new stream.Writable({
decodeStrings: false,
write(s_chunk, s_write_encoding, fk_write) {
s_data += s_chunk;
fk_write();
},
writev(a_chunks, fk_writev) {
s_data += a_chunks.join('');
fk_writev();
},
}))
// error
.on('error', (e_stream) => {
fe_bucket(e_stream);
})
// wait for it to finish
.on('finish', () => {
fk_bucket(s_data);
});
});
}
// buffer
else if('buffer' === s_encoding) {
// async operation
return new Promise((fk_bucket, fe_bucket) => {
let ab_data = Buffer.from([]);
// pipe to writable
this.pipe(new stream.Writable({
decodeStrings: true,
write(ab_chunk, s_write_encoding, fk_write) {
ab_data = Buffer.concat([ab_data, ab_chunk], ab_data.length+ab_chunk.length);
fk_write();
},
}))
// error
.on('error', (e_stream) => {
fe_bucket(e_stream);
})
// wait for it to finish
.on('finish', () => {
fk_bucket(ab_data);
});
});
}
}
}
class Writable extends stream.Writable {
until(s_event) {
until(s_event, b_return_stream) {
return new Promise((fk_until, fe_until) => {
this.on(s_event, fk_until);
// convert error to rejected promise
this.on('error', (e_stream) => {
fe_until(e_stream);
});
// special cases returns `this`
if(b_return_stream) {
this.once(s_event, (...a_args) => {
fk_until(this, ...a_args);
});
}
else {
this.once(s_event, fk_until);
}
});

@@ -30,10 +147,326 @@ }

class Duplex extends stream.Duplex {
until(s_event, b_return_stream) {
return new Promise((fk_until, fe_until) => {
// convert error to rejected promise
this.on('error', (e_stream) => {
fe_until(e_stream);
});
// special cases returns `this`
if(b_return_stream) {
this.once(s_event, (...a_args) => {
fk_until(this, ...a_args);
});
}
else {
this.once(s_event, fk_until);
}
});
}
bucket(s_encoding='utf8') {
let g_readable = this._readableState;
// object mode
if(g_readable.objectMode) {
// async operation
return new Promise((fk_bucket, fe_bucket) => {
let a_data = [];
// pipe to writable
this.pipe(new stream.Writable({
write(w_event, s_write_encoding, fk_write) {
a_data.push(w_event);
fk_write();
},
writev(a_chunks, fk_writev) {
a_data.push(...a_chunks);
fk_writev();
},
}))
// error
.on('error', (e_stream) => {
fe_bucket(e_stream);
})
// wait for it to finish
.on('finish', () => {
fk_bucket(a_data);
});
});
}
// utf8-encoded strings
else if('utf8' === s_encoding || 'utf-8' === s_encoding) {
// async operation
return new Promise((fk_bucket, fe_bucket) => {
let s_data = '';
// set encoding
this.setEncoding(s_encoding);
// pipe to writable
this.pipe(new stream.Writable({
decodeStrings: false,
write(s_chunk, s_write_encoding, fk_write) {
s_data += s_chunk;
fk_write();
},
writev(a_chunks, fk_writev) {
s_data += a_chunks.join('');
fk_writev();
},
}))
// error
.on('error', (e_stream) => {
fe_bucket(e_stream);
})
// wait for it to finish
.on('finish', () => {
fk_bucket(s_data);
});
});
}
// buffer
else if('buffer' === s_encoding) {
// async operation
return new Promise((fk_bucket, fe_bucket) => {
let ab_data = Buffer.from([]);
// pipe to writable
this.pipe(new stream.Writable({
decodeStrings: true,
write(ab_chunk, s_write_encoding, fk_write) {
ab_data = Buffer.concat([ab_data, ab_chunk], ab_data.length+ab_chunk.length);
fk_write();
},
}))
// error
.on('error', (e_stream) => {
fe_bucket(e_stream);
})
// wait for it to finish
.on('finish', () => {
fk_bucket(ab_data);
});
});
}
}
}
class Transform extends stream.Transform {
until(s_event) {
until(s_event, b_return_stream) {
return new Promise((fk_until, fe_until) => {
this.on(s_event, fk_until);
// convert error to rejected promise
this.on('error', (e_stream) => {
fe_until(e_stream);
});
// special cases returns `this`
if(b_return_stream) {
this.once(s_event, (...a_args) => {
fk_until(this, ...a_args);
});
}
else {
this.once(s_event, fk_until);
}
});
}
bucket(s_encoding='utf8') {
let g_readable = this._readableState;
// object mode
if(g_readable.objectMode) {
// async operation
return new Promise((fk_bucket, fe_bucket) => {
let a_data = [];
// pipe to writable
this.pipe(new stream.Writable({
write(w_event, s_write_encoding, fk_write) {
a_data.push(w_event);
fk_write();
},
writev(a_chunks, fk_writev) {
a_data.push(...a_chunks);
fk_writev();
},
}))
// error
.on('error', (e_stream) => {
fe_bucket(e_stream);
})
// wait for it to finish
.on('finish', () => {
fk_bucket(a_data);
});
});
}
// utf8-encoded strings
else if('utf8' === s_encoding || 'utf-8' === s_encoding) {
// async operation
return new Promise((fk_bucket, fe_bucket) => {
let s_data = '';
// set encoding
this.setEncoding(s_encoding);
// pipe to writable
this.pipe(new stream.Writable({
decodeStrings: false,
write(s_chunk, s_write_encoding, fk_write) {
s_data += s_chunk;
fk_write();
},
writev(a_chunks, fk_writev) {
s_data += a_chunks.join('');
fk_writev();
},
}))
// error
.on('error', (e_stream) => {
fe_bucket(e_stream);
})
// wait for it to finish
.on('finish', () => {
fk_bucket(s_data);
});
});
}
// buffer
else if('buffer' === s_encoding) {
// async operation
return new Promise((fk_bucket, fe_bucket) => {
let ab_data = Buffer.from([]);
// pipe to writable
this.pipe(new stream.Writable({
decodeStrings: true,
write(ab_chunk, s_write_encoding, fk_write) {
ab_data = Buffer.concat([ab_data, ab_chunk], ab_data.length+ab_chunk.length);
fk_write();
},
}))
// error
.on('error', (e_stream) => {
fe_bucket(e_stream);
})
// wait for it to finish
.on('finish', () => {
fk_bucket(ab_data);
});
});
}
}
demolish(e_destroy) {
// do not allow to push
this.push = (z_chunk) => {
// ignore eof signals from node core
if(null === z_chunk) return;
// anything else is bad
throw new Error(`[ERR_STREAM_DESTROYED]: Cannot push after stream was destroyed`);
};
// do not allow to emit 'end'
this.emit = function(s_event, ...a_args) {
if('end' === s_event) return;
Object.getPrototypeOf(this).emit.apply(this, [s_event, ...a_args]);
};
// an error was given, destroy the stream as well
if(e_destroy) {
return this.destroy(e_destroy);
}
}
}
// eslint-disable-next-line no-new-func
const b_is_node = (new Function(/* syntax: js */ `try {return this===global;}catch(e){return false;}`))();
// node.js
if(b_is_node) {
// patch for node < v10
if((+(/^v(\d+)/.exec(process.version)[1])) < 10) {
// override destroy methods
Transform.prototype.destroy = Duplex.prototype.destroy = function(e_destroy, fke_destroy) {
this._readableState.destroyed = true;
this._writableState.destroyed = true;
let f_emit_close = () => {
if(!this._writableState.emitClose) return;
if(!this._readableState.emitClose) return;
this.emit('close');
};
this._destroy(e_destroy || null, (e_destroy_re) => {
if(!fke_destroy && e_destroy_re) {
process.nextTick(() => {
this.emit('error', e_destroy_re);
f_emit_close();
});
this._writableState.errorEmitted = true;
}
else {
process.nextTick(f_emit_close);
if(fke_destroy) fke_destroy(e_destroy_re);
}
});
return this;
};
// override default _destroy implementations
Transform.prototype._destroy = Duplex.prototype._destroy = (e_destroy, fke_destroy) => fke_destroy(e_destroy);
}
}
class Quads_To_Other extends Transform {
constructor() {
super({
writableObjectMode: true,
readableObjectMode: true,
});
// forward prefix and comment events
this.on('pipe', (ds_src) => {
ds_src
.on('prefix', (...a_args) => {
this.emit('prefix', ...a_args);
})
.on('comment', (...a_args) => {
this.emit('comment', ...a_args);
});
});
}
}
class Quads_To_JSON_Transform extends Quads_To_Other {
// serializse json
_transform(g_quad, s_encoding, fk_transform) {
fk_transform(null, JSON.stringify(g_quad.isolate())+'\n');
}
}
class Quads_To_Writable extends Quads_To_Other {
_transform(g_quad, s_encoding, fk_transform) {
fk_transform(null, {
type: 'quad',
value: g_quad,
});
}
}
module.exports = {

@@ -43,3 +476,30 @@ ...stream,

Writable,
Duplex,
Transform,
// create a transform from quad objects into JSON strings for trivial serialization
quads_to_json() {
return new Quads_To_JSON_Transform();
},
// create a transform from quad objects into writable data events
quads_to_writable() {
return new Quads_To_Writable();
},
// create a simple, single-event readable stream
source(w_push, s_encoding=null) {
// encoding not explicit, string given; assume utf8
if(!s_encoding && 'string' === typeof w_push) s_encoding = 'utf8';
// readable
return new Readable({
objectMode: !s_encoding && 'string' !== typeof w_push && !Buffer.isBuffer(w_push),
read() {
this.push(w_push, s_encoding);
this.push(null);
},
});
},
};

9

package.json
{
"name": "@graphy/core.iso.stream",
"version": "3.0.6",
"version": "3.1.0",
"description": "Provides isomorphic stream interface for node.js / browser and adds `.until`, a promisified version of the `.on` event listener",

@@ -25,3 +25,6 @@ "keywords": [

"main": "main.js",
"dependencies": {}
}
"dependencies": {},
"engines": {
"node": ">=8.4.0"
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc