@graphy/core.iso.stream
Advanced tools
Comparing version 3.0.6 to 3.1.0
472
main.js
@@ -14,13 +14,130 @@ const stream = require('stream'); | ||
until(s_event) { | ||
until(s_event, b_return_stream) { | ||
return new Promise((fk_until, fe_until) => { | ||
this.on(s_event, fk_until); | ||
// convert error to rejected promise | ||
this.on('error', (e_stream) => { | ||
fe_until(e_stream); | ||
}); | ||
// special cases returns `this` | ||
if(b_return_stream) { | ||
this.once(s_event, (...a_args) => { | ||
fk_until(this, ...a_args); | ||
}); | ||
} | ||
else { | ||
this.once(s_event, fk_until); | ||
} | ||
}); | ||
} | ||
bucket(s_encoding='utf8') { | ||
let g_readable = this._readableState; | ||
// object mode | ||
if(g_readable.objectMode) { | ||
// async operation | ||
return new Promise((fk_bucket, fe_bucket) => { | ||
let a_data = []; | ||
// pipe to writable | ||
this.pipe(new stream.Writable({ | ||
write(w_event, s_write_encoding, fk_write) { | ||
a_data.push(w_event); | ||
fk_write(); | ||
}, | ||
writev(a_chunks, fk_writev) { | ||
a_data.push(...a_chunks); | ||
fk_writev(); | ||
}, | ||
})) | ||
// error | ||
.on('error', (e_stream) => { | ||
fe_bucket(e_stream); | ||
}) | ||
// wait for it to finish | ||
.on('finish', () => { | ||
fk_bucket(a_data); | ||
}); | ||
}); | ||
} | ||
// utf8-encoded strings | ||
else if('utf8' === s_encoding || 'utf-8' === s_encoding) { | ||
// async operation | ||
return new Promise((fk_bucket, fe_bucket) => { | ||
let s_data = ''; | ||
// set encoding | ||
this.setEncoding(s_encoding); | ||
// pipe to writable | ||
this.pipe(new stream.Writable({ | ||
decodeStrings: false, | ||
write(s_chunk, s_write_encoding, fk_write) { | ||
s_data += s_chunk; | ||
fk_write(); | ||
}, | ||
writev(a_chunks, fk_writev) { | ||
s_data += a_chunks.join(''); | ||
fk_writev(); | ||
}, | ||
})) | ||
// error | ||
.on('error', (e_stream) => { | ||
fe_bucket(e_stream); | ||
}) | ||
// wait for it to finish | ||
.on('finish', () => { | ||
fk_bucket(s_data); | ||
}); | ||
}); | ||
} | ||
// buffer | ||
else if('buffer' === s_encoding) { | ||
// async operation | ||
return new Promise((fk_bucket, fe_bucket) => { | ||
let ab_data = Buffer.from([]); | ||
// pipe to writable | ||
this.pipe(new stream.Writable({ | ||
decodeStrings: true, | ||
write(ab_chunk, s_write_encoding, fk_write) { | ||
ab_data = Buffer.concat([ab_data, ab_chunk], ab_data.length+ab_chunk.length); | ||
fk_write(); | ||
}, | ||
})) | ||
// error | ||
.on('error', (e_stream) => { | ||
fe_bucket(e_stream); | ||
}) | ||
// wait for it to finish | ||
.on('finish', () => { | ||
fk_bucket(ab_data); | ||
}); | ||
}); | ||
} | ||
} | ||
} | ||
class Writable extends stream.Writable { | ||
until(s_event) { | ||
until(s_event, b_return_stream) { | ||
return new Promise((fk_until, fe_until) => { | ||
this.on(s_event, fk_until); | ||
// convert error to rejected promise | ||
this.on('error', (e_stream) => { | ||
fe_until(e_stream); | ||
}); | ||
// special cases returns `this` | ||
if(b_return_stream) { | ||
this.once(s_event, (...a_args) => { | ||
fk_until(this, ...a_args); | ||
}); | ||
} | ||
else { | ||
this.once(s_event, fk_until); | ||
} | ||
}); | ||
@@ -30,10 +147,326 @@ } | ||
class Duplex extends stream.Duplex { | ||
until(s_event, b_return_stream) { | ||
return new Promise((fk_until, fe_until) => { | ||
// convert error to rejected promise | ||
this.on('error', (e_stream) => { | ||
fe_until(e_stream); | ||
}); | ||
// special cases returns `this` | ||
if(b_return_stream) { | ||
this.once(s_event, (...a_args) => { | ||
fk_until(this, ...a_args); | ||
}); | ||
} | ||
else { | ||
this.once(s_event, fk_until); | ||
} | ||
}); | ||
} | ||
bucket(s_encoding='utf8') { | ||
let g_readable = this._readableState; | ||
// object mode | ||
if(g_readable.objectMode) { | ||
// async operation | ||
return new Promise((fk_bucket, fe_bucket) => { | ||
let a_data = []; | ||
// pipe to writable | ||
this.pipe(new stream.Writable({ | ||
write(w_event, s_write_encoding, fk_write) { | ||
a_data.push(w_event); | ||
fk_write(); | ||
}, | ||
writev(a_chunks, fk_writev) { | ||
a_data.push(...a_chunks); | ||
fk_writev(); | ||
}, | ||
})) | ||
// error | ||
.on('error', (e_stream) => { | ||
fe_bucket(e_stream); | ||
}) | ||
// wait for it to finish | ||
.on('finish', () => { | ||
fk_bucket(a_data); | ||
}); | ||
}); | ||
} | ||
// utf8-encoded strings | ||
else if('utf8' === s_encoding || 'utf-8' === s_encoding) { | ||
// async operation | ||
return new Promise((fk_bucket, fe_bucket) => { | ||
let s_data = ''; | ||
// set encoding | ||
this.setEncoding(s_encoding); | ||
// pipe to writable | ||
this.pipe(new stream.Writable({ | ||
decodeStrings: false, | ||
write(s_chunk, s_write_encoding, fk_write) { | ||
s_data += s_chunk; | ||
fk_write(); | ||
}, | ||
writev(a_chunks, fk_writev) { | ||
s_data += a_chunks.join(''); | ||
fk_writev(); | ||
}, | ||
})) | ||
// error | ||
.on('error', (e_stream) => { | ||
fe_bucket(e_stream); | ||
}) | ||
// wait for it to finish | ||
.on('finish', () => { | ||
fk_bucket(s_data); | ||
}); | ||
}); | ||
} | ||
// buffer | ||
else if('buffer' === s_encoding) { | ||
// async operation | ||
return new Promise((fk_bucket, fe_bucket) => { | ||
let ab_data = Buffer.from([]); | ||
// pipe to writable | ||
this.pipe(new stream.Writable({ | ||
decodeStrings: true, | ||
write(ab_chunk, s_write_encoding, fk_write) { | ||
ab_data = Buffer.concat([ab_data, ab_chunk], ab_data.length+ab_chunk.length); | ||
fk_write(); | ||
}, | ||
})) | ||
// error | ||
.on('error', (e_stream) => { | ||
fe_bucket(e_stream); | ||
}) | ||
// wait for it to finish | ||
.on('finish', () => { | ||
fk_bucket(ab_data); | ||
}); | ||
}); | ||
} | ||
} | ||
} | ||
class Transform extends stream.Transform { | ||
until(s_event) { | ||
until(s_event, b_return_stream) { | ||
return new Promise((fk_until, fe_until) => { | ||
this.on(s_event, fk_until); | ||
// convert error to rejected promise | ||
this.on('error', (e_stream) => { | ||
fe_until(e_stream); | ||
}); | ||
// special cases returns `this` | ||
if(b_return_stream) { | ||
this.once(s_event, (...a_args) => { | ||
fk_until(this, ...a_args); | ||
}); | ||
} | ||
else { | ||
this.once(s_event, fk_until); | ||
} | ||
}); | ||
} | ||
bucket(s_encoding='utf8') { | ||
let g_readable = this._readableState; | ||
// object mode | ||
if(g_readable.objectMode) { | ||
// async operation | ||
return new Promise((fk_bucket, fe_bucket) => { | ||
let a_data = []; | ||
// pipe to writable | ||
this.pipe(new stream.Writable({ | ||
write(w_event, s_write_encoding, fk_write) { | ||
a_data.push(w_event); | ||
fk_write(); | ||
}, | ||
writev(a_chunks, fk_writev) { | ||
a_data.push(...a_chunks); | ||
fk_writev(); | ||
}, | ||
})) | ||
// error | ||
.on('error', (e_stream) => { | ||
fe_bucket(e_stream); | ||
}) | ||
// wait for it to finish | ||
.on('finish', () => { | ||
fk_bucket(a_data); | ||
}); | ||
}); | ||
} | ||
// utf8-encoded strings | ||
else if('utf8' === s_encoding || 'utf-8' === s_encoding) { | ||
// async operation | ||
return new Promise((fk_bucket, fe_bucket) => { | ||
let s_data = ''; | ||
// set encoding | ||
this.setEncoding(s_encoding); | ||
// pipe to writable | ||
this.pipe(new stream.Writable({ | ||
decodeStrings: false, | ||
write(s_chunk, s_write_encoding, fk_write) { | ||
s_data += s_chunk; | ||
fk_write(); | ||
}, | ||
writev(a_chunks, fk_writev) { | ||
s_data += a_chunks.join(''); | ||
fk_writev(); | ||
}, | ||
})) | ||
// error | ||
.on('error', (e_stream) => { | ||
fe_bucket(e_stream); | ||
}) | ||
// wait for it to finish | ||
.on('finish', () => { | ||
fk_bucket(s_data); | ||
}); | ||
}); | ||
} | ||
// buffer | ||
else if('buffer' === s_encoding) { | ||
// async operation | ||
return new Promise((fk_bucket, fe_bucket) => { | ||
let ab_data = Buffer.from([]); | ||
// pipe to writable | ||
this.pipe(new stream.Writable({ | ||
decodeStrings: true, | ||
write(ab_chunk, s_write_encoding, fk_write) { | ||
ab_data = Buffer.concat([ab_data, ab_chunk], ab_data.length+ab_chunk.length); | ||
fk_write(); | ||
}, | ||
})) | ||
// error | ||
.on('error', (e_stream) => { | ||
fe_bucket(e_stream); | ||
}) | ||
// wait for it to finish | ||
.on('finish', () => { | ||
fk_bucket(ab_data); | ||
}); | ||
}); | ||
} | ||
} | ||
demolish(e_destroy) { | ||
// do not allow to push | ||
this.push = (z_chunk) => { | ||
// ignore eof signals from node core | ||
if(null === z_chunk) return; | ||
// anything else is bad | ||
throw new Error(`[ERR_STREAM_DESTROYED]: Cannot push after stream was destroyed`); | ||
}; | ||
// do not allow to emit 'end' | ||
this.emit = function(s_event, ...a_args) { | ||
if('end' === s_event) return; | ||
Object.getPrototypeOf(this).emit.apply(this, [s_event, ...a_args]); | ||
}; | ||
// an error was given, destroy the stream as well | ||
if(e_destroy) { | ||
return this.destroy(e_destroy); | ||
} | ||
} | ||
} | ||
// eslint-disable-next-line no-new-func | ||
const b_is_node = (new Function(/* syntax: js */ `try {return this===global;}catch(e){return false;}`))(); | ||
// node.js | ||
if(b_is_node) { | ||
// patch for node < v10 | ||
if((+(/^v(\d+)/.exec(process.version)[1])) < 10) { | ||
// override destroy methods | ||
Transform.prototype.destroy = Duplex.prototype.destroy = function(e_destroy, fke_destroy) { | ||
this._readableState.destroyed = true; | ||
this._writableState.destroyed = true; | ||
let f_emit_close = () => { | ||
if(!this._writableState.emitClose) return; | ||
if(!this._readableState.emitClose) return; | ||
this.emit('close'); | ||
}; | ||
this._destroy(e_destroy || null, (e_destroy_re) => { | ||
if(!fke_destroy && e_destroy_re) { | ||
process.nextTick(() => { | ||
this.emit('error', e_destroy_re); | ||
f_emit_close(); | ||
}); | ||
this._writableState.errorEmitted = true; | ||
} | ||
else { | ||
process.nextTick(f_emit_close); | ||
if(fke_destroy) fke_destroy(e_destroy_re); | ||
} | ||
}); | ||
return this; | ||
}; | ||
// override default _destroy implementations | ||
Transform.prototype._destroy = Duplex.prototype._destroy = (e_destroy, fke_destroy) => fke_destroy(e_destroy); | ||
} | ||
} | ||
class Quads_To_Other extends Transform { | ||
constructor() { | ||
super({ | ||
writableObjectMode: true, | ||
readableObjectMode: true, | ||
}); | ||
// forward prefix and comment events | ||
this.on('pipe', (ds_src) => { | ||
ds_src | ||
.on('prefix', (...a_args) => { | ||
this.emit('prefix', ...a_args); | ||
}) | ||
.on('comment', (...a_args) => { | ||
this.emit('comment', ...a_args); | ||
}); | ||
}); | ||
} | ||
} | ||
class Quads_To_JSON_Transform extends Quads_To_Other { | ||
// serializse json | ||
_transform(g_quad, s_encoding, fk_transform) { | ||
fk_transform(null, JSON.stringify(g_quad.isolate())+'\n'); | ||
} | ||
} | ||
class Quads_To_Writable extends Quads_To_Other { | ||
_transform(g_quad, s_encoding, fk_transform) { | ||
fk_transform(null, { | ||
type: 'quad', | ||
value: g_quad, | ||
}); | ||
} | ||
} | ||
module.exports = { | ||
@@ -43,3 +476,30 @@ ...stream, | ||
Writable, | ||
Duplex, | ||
Transform, | ||
// create a transform from quad objects into JSON strings for trivial serialization | ||
quads_to_json() { | ||
return new Quads_To_JSON_Transform(); | ||
}, | ||
// create a transform from quad objects into writable data events | ||
quads_to_writable() { | ||
return new Quads_To_Writable(); | ||
}, | ||
// create a simple, single-event readable stream | ||
source(w_push, s_encoding=null) { | ||
// encoding not explicit, string given; assume utf8 | ||
if(!s_encoding && 'string' === typeof w_push) s_encoding = 'utf8'; | ||
// readable | ||
return new Readable({ | ||
objectMode: !s_encoding && 'string' !== typeof w_push && !Buffer.isBuffer(w_push), | ||
read() { | ||
this.push(w_push, s_encoding); | ||
this.push(null); | ||
}, | ||
}); | ||
}, | ||
}; |
{ | ||
"name": "@graphy/core.iso.stream", | ||
"version": "3.0.6", | ||
"version": "3.1.0", | ||
"description": "Provides isomorphic stream interface for node.js / browser and adds `.until`, a promisified version of the `.on` event listener", | ||
@@ -25,3 +25,6 @@ "keywords": [ | ||
"main": "main.js", | ||
"dependencies": {} | ||
} | ||
"dependencies": {}, | ||
"engines": { | ||
"node": ">=8.4.0" | ||
} | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Uses eval
Supply chain riskPackage uses dynamic code execution (e.g., eval()), which is a dangerous practice. This can prevent the code from running in certain environments and increases the risk that the code may contain exploits or malicious behavior.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
12000
439
2