duplexer2
Advanced tools
Comparing version 0.1.2 to 0.1.3
58
index.js
@@ -5,31 +5,13 @@ "use strict"; | ||
var Duplex = stream.Duplex; | ||
var Readable = stream.Readable; | ||
var DuplexWrapper = exports.DuplexWrapper = function DuplexWrapper(options, writable, readable) { | ||
if (readable === undefined) { | ||
function DuplexWrapper(options, writable, readable) { | ||
if (typeof readable === "undefined") { | ||
readable = writable; | ||
writable = options; | ||
options = {}; | ||
} else { | ||
options = options || {}; | ||
options = null; | ||
} | ||
Duplex.call(this, options); | ||
stream.Duplex.call(this, options); | ||
if (options.bubbleErrors === undefined) { | ||
this._bubbleErrors = true; | ||
} else { | ||
if (typeof options.bubbleErrors !== "boolean") { | ||
throw new TypeError( | ||
String(options.bubbleErrors) + | ||
" is not a Boolean value. `bubbleErrors` option of duplexer2 must be Boolean (`true` by default)." | ||
); | ||
} | ||
this._bubbleErrors = options.bubbleErrors; | ||
} | ||
this._shouldRead = false; | ||
if (typeof readable.read !== "function") { | ||
readable = (new Readable()).wrap(readable); | ||
readable = (new stream.Readable()).wrap(readable); | ||
} | ||
@@ -39,2 +21,3 @@ | ||
this._readable = readable; | ||
this._waiting = false; | ||
@@ -51,5 +34,6 @@ var self = this; | ||
readable.on("data", function(e) { | ||
if (!self.push(e)) { | ||
readable.pause(); | ||
readable.on("readable", function() { | ||
if (self._waiting) { | ||
self._waiting = false; | ||
self._read(); | ||
} | ||
@@ -59,15 +43,15 @@ }); | ||
readable.once("end", function() { | ||
return self.push(null); | ||
self.push(null); | ||
}); | ||
if (this._bubbleErrors) { | ||
if (!options || typeof options.bubbleErrors === "undefined" || options.bubbleErrors) { | ||
writable.on("error", function(err) { | ||
return self.emit("error", err); | ||
self.emit("error", err); | ||
}); | ||
readable.on("error", function(err) { | ||
return self.emit("error", err); | ||
self.emit("error", err); | ||
}); | ||
} | ||
}; | ||
} | ||
@@ -81,3 +65,11 @@ DuplexWrapper.prototype = Object.create(stream.Duplex.prototype, {constructor: {value: DuplexWrapper}}); | ||
DuplexWrapper.prototype._read = function _read() { | ||
this._readable.resume(); | ||
var buf; | ||
var reads = 0; | ||
while ((buf = this._readable.read()) !== null) { | ||
this.push(buf); | ||
reads++; | ||
} | ||
if (reads === 0) { | ||
this._waiting = true; | ||
} | ||
}; | ||
@@ -88,1 +80,3 @@ | ||
}; | ||
module.exports.DuplexWrapper = DuplexWrapper; |
{ | ||
"name": "duplexer2", | ||
"version": "0.1.2", | ||
"version": "0.1.3", | ||
"description": "Like duplexer but using streams3", | ||
@@ -9,5 +9,3 @@ "files": [ | ||
"scripts": { | ||
"pretest": "eslint --config @shinnn/node-legacy --rule 'quotes: [2, double]' example.js --rule 'no-underscore-dangle: 0' index.js test.js", | ||
"test": "node --strong_mode --harmony_destructuring --harmony_rest_parameters --harmony_spreadcalls --throw-deprecation --trace-sync-io --track-heap-objects test.js | tap-spec", | ||
"coverage": "node --strong_mode --harmony_destructuring --harmony_rest_parameters --harmony_spreadcalls node_modules/.bin/istanbul cover test.js" | ||
"test": "mocha -R tap" | ||
}, | ||
@@ -29,10 +27,4 @@ "repository": "deoxxa/duplexer2", | ||
"devDependencies": { | ||
"@shinnn/eslint-config-node-legacy": "^1.0.0", | ||
"eslint": "^1.7.1", | ||
"istanbul": "^0.3.22", | ||
"object-assign": "^4.0.1", | ||
"tap-spec": "^4.1.0", | ||
"tape": "^4.2.1", | ||
"through": "^2.3.8" | ||
"mocha": "^2.2.5" | ||
} | ||
} |
@@ -6,27 +6,49 @@ # duplexer2 [![Build Status](https://travis-ci.org/deoxxa/duplexer2.svg?branch=master)](https://travis-ci.org/deoxxa/duplexer2) [![Coverage Status](https://coveralls.io/repos/deoxxa/duplexer2/badge.svg?branch=master&service=github)](https://coveralls.io/github/deoxxa/duplexer2?branch=master) | ||
```javascript | ||
const duplexer2 = require("."); | ||
const {Readable, Writable} = require("stream"); | ||
var stream = require("stream"); | ||
const writable = new Writable({ | ||
write(data, enc, cb) { | ||
if (readable.push(data)) { | ||
cb(); | ||
return; | ||
} | ||
readable.once("drain", cb); | ||
var duplexer2 = require("duplexer2"); | ||
var writable = new stream.Writable({objectMode: true}), | ||
readable = new stream.Readable({objectMode: true}); | ||
writable._write = function _write(input, encoding, done) { | ||
if (readable.push(input)) { | ||
return done(); | ||
} else { | ||
readable.once("drain", done); | ||
} | ||
}); | ||
}; | ||
const readable = new Readable({read() {/* no-op */}}); | ||
readable._read = function _read(n) { | ||
// no-op | ||
}; | ||
// simulate the readable thing closing after a bit | ||
writable.once("finish", () => setTimeout(() => readable.push(null), 300)); | ||
writable.once("finish", function() { | ||
setTimeout(function() { | ||
readable.push(null); | ||
}, 500); | ||
}); | ||
const duplex = duplexer2({}, writable, readable) | ||
.on("data", data => console.log("got data", data.toString())) | ||
.on("finish", () => console.log("got finish event")) | ||
.on("end", () => console.log("got end event")); | ||
var duplex = duplexer2(writable, readable); | ||
duplex.write("oh, hi there", () => console.log("finished writing")); | ||
duplex.end(() => console.log("finished ending")); | ||
duplex.on("data", function(e) { | ||
console.log("got data", JSON.stringify(e)); | ||
}); | ||
duplex.on("finish", function() { | ||
console.log("got finish event"); | ||
}); | ||
duplex.on("end", function() { | ||
console.log("got end event"); | ||
}); | ||
duplex.write("oh, hi there", function() { | ||
console.log("finished writing"); | ||
}); | ||
duplex.end(function() { | ||
console.log("finished ending"); | ||
}); | ||
``` | ||
@@ -33,0 +55,0 @@ |
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
1
116
6396
59
1