Comparing version 3.4.1 to 3.5.0
31
index.js
@@ -11,3 +11,3 @@ const b4a = require('b4a') | ||
class Channel { | ||
constructor (mux, info, userData, protocol, aliases, id, handshake, messages, onopen, onclose, ondestroy) { | ||
constructor (mux, info, userData, protocol, aliases, id, handshake, messages, onopen, onclose, ondestroy, ondrain) { | ||
this.userData = userData | ||
@@ -27,2 +27,3 @@ this.protocol = protocol | ||
this.ondestroy = ondestroy | ||
this.ondrain = ondrain | ||
@@ -43,2 +44,6 @@ this._handshake = handshake | ||
get drained () { | ||
return this._mux.drained | ||
} | ||
open (handshake) { | ||
@@ -225,3 +230,5 @@ const id = this._mux._free.length > 0 | ||
return mux.stream.write(state.buffer) | ||
mux.drained = mux.stream.write(state.buffer) | ||
return mux.drained | ||
} | ||
@@ -257,2 +264,3 @@ } | ||
this.corked = 0 | ||
this.drained = true | ||
@@ -275,2 +283,3 @@ this._alloc = alloc || (typeof stream.alloc === 'function' ? stream.alloc.bind(stream) : b4a.allocUnsafe) | ||
this.stream.on('data', this._ondata.bind(this)) | ||
this.stream.on('drain', this._ondrain.bind(this)) | ||
this.stream.on('end', this._onend.bind(this)) | ||
@@ -326,3 +335,3 @@ this.stream.on('error', noop) // we handle this in "close" | ||
createChannel ({ userData = null, protocol, aliases = [], id = null, unique = true, handshake = null, messages = [], onopen = noop, onclose = noop, ondestroy = noop }) { | ||
createChannel ({ userData = null, protocol, aliases = [], id = null, unique = true, handshake = null, messages = [], onopen = noop, onclose = noop, ondestroy = noop, ondrain = noop }) { | ||
if (this.stream.destroyed) return null | ||
@@ -334,3 +343,3 @@ | ||
if (info.incoming.length === 0) { | ||
return new Channel(this, info, userData, protocol, aliases, id, handshake, messages, onopen, onclose, ondestroy) | ||
return new Channel(this, info, userData, protocol, aliases, id, handshake, messages, onopen, onclose, ondestroy, ondrain) | ||
} | ||
@@ -344,3 +353,3 @@ | ||
const session = new Channel(this, info, userData, protocol, aliases, id, handshake, messages, onopen, onclose, ondestroy) | ||
const session = new Channel(this, info, userData, protocol, aliases, id, handshake, messages, onopen, onclose, ondestroy, ondrain) | ||
@@ -388,3 +397,3 @@ session._remoteId = remoteId | ||
this.stream.write(state.buffer) | ||
this.drained = this.stream.write(state.buffer) | ||
} | ||
@@ -428,2 +437,10 @@ | ||
_ondrain () { | ||
this.drained = true | ||
for (const s of this._local) { | ||
if (s !== null) s._track(s.ondrain(s)) | ||
} | ||
} | ||
_onend () { // TODO: support half open mode for the users who wants that here | ||
@@ -645,3 +662,3 @@ this.stream.end() | ||
this.stream.write(buffer) | ||
this.drained = this.stream.write(buffer) | ||
} | ||
@@ -648,0 +665,0 @@ |
{ | ||
"name": "protomux", | ||
"version": "3.4.1", | ||
"version": "3.5.0", | ||
"description": "Multiplex multiple message oriented protocols over a stream", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -111,2 +111,14 @@ # protomux | ||
#### `const opened = mux.opened({ protocol, id })` | ||
Boolean that indicates if the channel is opened. | ||
#### `mux.pair({ protocol, id }, callback)` | ||
Register a callback to be called everytime a new channel is requested. | ||
#### `mux.unpair({ protocol, id })` | ||
Unregisters the pair callback. | ||
#### `channel.open([handshake])` | ||
@@ -118,3 +130,3 @@ | ||
Add a message. Options include: | ||
Add/register a message type for a certain encoding. Options include: | ||
@@ -121,0 +133,0 @@ ``` js |
49
test.js
@@ -433,4 +433,53 @@ const Protomux = require('./') | ||
test('drain', function (t) { | ||
t.plan(7) | ||
const mux1 = new Protomux(new SecretStream(true)) | ||
const mux2 = new Protomux(new SecretStream(false)) | ||
t.ok(mux1.drained) | ||
t.ok(mux2.drained) | ||
replicate(mux1, mux2) | ||
const a = mux1.createChannel({ | ||
protocol: 'foo', | ||
messages: [ | ||
{ encoding: c.string } | ||
] | ||
}) | ||
t.ok(a.drained) | ||
a.open() | ||
const b = mux2.createChannel({ | ||
protocol: 'foo', | ||
messages: [ | ||
{ encoding: c.string } | ||
], | ||
ondrain (c) { | ||
t.is(c, b) | ||
t.ok(mux1.drained) | ||
} | ||
}) | ||
t.ok(b.drained) | ||
b.open() | ||
while (true) { | ||
const drained = b.messages[0].send('hello world') | ||
if (b.drained !== drained) t.fail('Drained property should be equal as in channel') | ||
if (mux2.drained !== drained) t.fail('Drained property should be equal as in mux') | ||
if (!drained) { | ||
t.pass() | ||
break | ||
} | ||
} | ||
}) | ||
function replicate (a, b) { | ||
a.stream.rawStream.pipe(b.stream.rawStream).pipe(a.stream.rawStream) | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
31628
882
180