Comparing version 0.0.2 to 0.0.3
86
index.js
@@ -27,11 +27,55 @@ /* | ||
assert(readable) | ||
assert(writable) | ||
this._options = options | ||
this._readable = readable | ||
this._writable = writable | ||
stream.Duplex.call(this, options) | ||
this.hook(writable, readable) | ||
this._firstPayloadMethod = 'write' | ||
this.on('finish', function() { | ||
if (this._writable) | ||
return this._writable.end() | ||
this._firstPayloadMethod = 'end' | ||
}) | ||
} | ||
util.inherits(ReaDuplexer, stream.Duplex) | ||
ReaDuplexer.prototype.hookWritable = function hookWritable(writable) { | ||
var that = this | ||
, dummyWritable = new stream.Writable(options) | ||
assert(!this._readable, 'already hooked to a Writable') | ||
assert(writable) | ||
this._writable = writable | ||
writable.on('drain', function() { | ||
that.emit('drain') | ||
}) | ||
writable.on('error', function(err) { | ||
that.emit('error', err) | ||
}) | ||
if (this._firstPayload) { | ||
this._writable[this._firstPayloadMethod]( | ||
this._firstPayload.chunk | ||
, this._firstPayload.enc | ||
, this._firstPayload.cb) | ||
delete this._firstPayload | ||
} | ||
return this | ||
} | ||
ReaDuplexer.prototype.hookReadable = function hookReadable(readable) { | ||
var that = this | ||
, dummyWritable = new stream.Writable(this._options) | ||
assert(!this._readable, 'already hooked to a Readable') | ||
assert(readable) | ||
this._readable = readable | ||
dummyWritable._write = function dummyWrite(chunk, enc, cb) { | ||
@@ -48,7 +92,3 @@ if (that.push(chunk, enc)) | ||
writable.on('drain', function() { | ||
that.emit('drain') | ||
}) | ||
;[readable, writable, dummyWritable].forEach(function(stream) { | ||
;[readable, dummyWritable].forEach(function(stream) { | ||
stream.on('error', function(err) { | ||
@@ -59,13 +99,17 @@ that.emit('error', err) | ||
this.on('finish', function() { | ||
writable.end() | ||
}) | ||
readable.pipe(dummyWritable) | ||
stream.Duplex.call(this, options) | ||
return this | ||
} | ||
util.inherits(ReaDuplexer, stream.Duplex) | ||
ReaDuplexer.prototype.hook = function hook(writable, readable) { | ||
if (writable) | ||
this.hookWritable(writable) | ||
if (readable) | ||
this.hookReadable(readable) | ||
return this | ||
} | ||
ReaDuplexer.prototype._read = function read(n) { | ||
@@ -79,5 +123,13 @@ if (this._lastReadCallback) | ||
ReaDuplexer.prototype._write = function write(chunk, enc, cb) { | ||
return this._writable.write(chunk, enc, cb) | ||
if (this._writable) | ||
return this._writable.write(chunk, enc, cb) | ||
// we are in delayed open | ||
this._firstPayload = { | ||
chunk: chunk | ||
, enc: enc | ||
, cb: cb | ||
} | ||
} | ||
module.exports = ReaDuplexer |
{ | ||
"name": "reduplexer", | ||
"version": "0.0.2", | ||
"version": "0.0.3", | ||
"description": "Another Stream2 duplexer", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -14,7 +14,7 @@ # reduplexer(writable, readable, options) | ||
```js | ||
var cp = require('child_process') | ||
, duplex = require('reduplexer') | ||
, grep = cp.exec('grep Stream') | ||
var cp = require('child_process') | ||
, duplexer = require('reduplexer') | ||
, grep = cp.exec('grep Stream') | ||
duplex(grep.stdin, grep.stdout, { objectMode: true }) | ||
duplexer(grep.stdin, grep.stdout, { objectMode: true }) | ||
``` | ||
@@ -26,2 +26,44 @@ | ||
## API | ||
* <a href="#reduplexer"><code><b>reduplexer()</b></code></a> | ||
* <a href="#hookWritable"><code>duplex.<b>hookWritable()</b></code></a> | ||
* <a href="#hookReadable"><code>duplex.<b>hookReadable()</b></code></a> | ||
* <a href="#hook"><code>duplex.<b>hook()</b></code></a> | ||
------------------------------------------------------- | ||
<a name="reduplexer"></a> | ||
### reduplexer(writable, readable, options) | ||
Create a [`Duplex`][3] stream based on `writable` and `readable` using | ||
the given options. | ||
`writable` and `readable` can be null, and in that case they can be | ||
'hooked' later. | ||
------------------------------------------------------- | ||
<a name="hookWritable"></a> | ||
### duplex.hookWritable(writable) | ||
Hooks a `Writable` stream. It will throw if a `Writable` is already hooked. | ||
------------------------------------------------------- | ||
<a name="hookReadable"></a> | ||
### duplex.hookReadable(writable) | ||
Hooks a `Readable` stream. It will throw if a `Readable` stream is already hooked. | ||
------------------------------------------------------- | ||
<a name="hook"></a> | ||
### duplex.hook(writable, readable) | ||
Shortcut for: | ||
```js | ||
duplex.hookWritable(writable) | ||
duplex.hookReadable(readable) | ||
``` | ||
But it will not throw if any of the two are missing. | ||
## Tests | ||
@@ -28,0 +70,0 @@ |
110
test.js
@@ -93,1 +93,111 @@ | ||
}) | ||
test('late hook', function(t) { | ||
t.plan(2) | ||
var writable = new stream.Writable() | ||
, readable = new stream.Readable() | ||
// nothing here, let's hook them up later | ||
, instance = duplexer() | ||
writable._write = function(chunk, enc, cb) { | ||
t.equal(chunk.toString(), 'writable') | ||
cb() | ||
} | ||
readable._read = function(n) { | ||
this.push('readable') | ||
this.push(null) | ||
} | ||
instance.on('data', function(chunk) { | ||
t.equal(chunk.toString(), 'readable') | ||
}) | ||
instance.end('writable') | ||
// separate hooks for writable | ||
instance.hookWritable(writable) | ||
// and readable | ||
instance.hookReadable(readable) | ||
}) | ||
test('shortcut hook', function(t) { | ||
t.plan(2) | ||
var writable = new stream.Writable() | ||
, readable = new stream.Readable() | ||
// nothing here, let's hook them up later | ||
, instance = duplexer() | ||
writable._write = function(chunk, enc, cb) { | ||
t.equal(chunk.toString(), 'writable') | ||
cb() | ||
} | ||
readable._read = function(n) { | ||
this.push('readable') | ||
this.push(null) | ||
} | ||
instance.on('data', function(chunk) { | ||
t.equal(chunk.toString(), 'readable') | ||
}) | ||
instance.end('writable') | ||
// single hook for both! | ||
instance.hook(writable, readable) | ||
}) | ||
test('double hook', function(t) { | ||
t.plan(2) | ||
var writable = new stream.Writable() | ||
, readable = new stream.Readable() | ||
// nothing here, let's hook them up later | ||
, instance = duplexer() | ||
writable._write = function(chunk, enc, cb) { | ||
t.equal(chunk.toString(), 'writable') | ||
cb() | ||
} | ||
readable._read = function(n) { | ||
} | ||
instance.hook(writable, readable) | ||
t.test('writable', function(t) { | ||
var thrown = false | ||
try { | ||
instance.hookWritable(writable) | ||
} catch(err) { | ||
thrown = true | ||
} | ||
t.assert(thrown, 'must have thrown') | ||
t.end() | ||
}) | ||
t.test('readable', function(t) { | ||
var thrown = false | ||
try { | ||
instance.hookReadable(readable) | ||
} catch(err) { | ||
thrown = true | ||
} | ||
t.assert(thrown, 'must have thrown') | ||
t.end() | ||
}) | ||
t.end() | ||
}) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
241
92
217054