bare-channel
Advanced tools
Comparing version 3.0.5 to 4.0.0
96
index.js
@@ -1,2 +0,4 @@ | ||
const events = require('bare-events') | ||
/* global Bare */ | ||
const EventEmitter = require('bare-events') | ||
const structuredClone = require('bare-structured-clone') | ||
const FIFO = require('fast-fifo') | ||
@@ -13,3 +15,7 @@ const binding = require('./binding') | ||
this.destroyed = false | ||
this.handle = handle | ||
binding.channelRef(this.handle) | ||
} | ||
@@ -22,2 +28,4 @@ | ||
destroy () { | ||
if (this.destroyed) return | ||
this.destroyed = true | ||
binding.channelDestroy(this.handle) | ||
@@ -31,8 +39,7 @@ } | ||
class Port extends events.EventEmitter { | ||
class Port extends EventEmitter { | ||
constructor (channel) { | ||
super() | ||
this.closed = false | ||
this.remoteClosed = false | ||
this._channel = channel | ||
@@ -54,2 +61,5 @@ this._closing = null | ||
this.closed = false | ||
this.remoteClosed = false | ||
this.handle = binding.portInit(channel.handle, this, | ||
@@ -61,2 +71,4 @@ this._ondrain, | ||
) | ||
Port._ports.add(this) | ||
} | ||
@@ -76,16 +88,11 @@ | ||
async send (value) { | ||
if (typeof value === 'string') value = Buffer.from(value) | ||
async read () { | ||
do { | ||
if (this._buffer.length) return this._buffer.shift() | ||
} while (await this._wait()) | ||
while (true) { | ||
while (this._drainedPromise !== null) await this._drainedPromise | ||
if (this._closing !== null) return false | ||
if (binding.portWrite(this.handle, value)) break | ||
if (this._drainedPromise === null) this._drainedPromise = new Promise(this._drainedQueue) | ||
} | ||
return null | ||
} | ||
recvSync () { | ||
readSync () { | ||
while (true) { | ||
@@ -104,8 +111,21 @@ if (this._closing !== null) return null | ||
async recv () { | ||
do { | ||
if (this._buffer.length) return this._buffer.shift() | ||
} while (await this._wait()) | ||
async write (value, opts = {}) { | ||
const serialized = structuredClone.serializeWithTransfer(value, opts.transfer) | ||
return null | ||
const state = { start: 0, end: 0, buffer: null } | ||
structuredClone.preencode(state, serialized) | ||
const data = state.buffer = Buffer.allocUnsafe(state.end) | ||
structuredClone.encode(state, serialized) | ||
while (true) { | ||
while (this._drainedPromise !== null) await this._drainedPromise | ||
if (this._closing !== null) return false | ||
if (binding.portWrite(this.handle, data)) break | ||
if (this._drainedPromise === null) this._drainedPromise = new Promise(this._drainedQueue) | ||
} | ||
} | ||
@@ -127,6 +147,6 @@ | ||
async _close () { | ||
await Promise.resolve() // force one tick to avoid re-entry | ||
await Promise.resolve() // Avoid re-entry | ||
this.emit('closing') | ||
// drain any pending writes | ||
while (this._drainedPromise !== null) await this._drainedPromise | ||
@@ -136,7 +156,5 @@ | ||
// wait for the remote to signal end also | ||
if (!this.remoteClosed) await new Promise((resolve) => { this._onremoteclose = resolve }) | ||
this._onremoteclose = null | ||
// now destroy | ||
const destroyed = new Promise((resolve) => { this._ondestroyed = resolve }) | ||
@@ -147,2 +165,4 @@ binding.portDestroy(this.handle) | ||
Port._ports.delete(this) | ||
this.closed = true | ||
@@ -152,2 +172,11 @@ this.emit('close') | ||
ref () { | ||
binding.portRef(this.handle) | ||
} | ||
unref () { | ||
if (Bare.exiting) return // Unref'ed ports during exit is unsafe | ||
binding.portUnref(this.handle) | ||
} | ||
_wait () { | ||
@@ -174,5 +203,10 @@ if (this._backpressured) this._onflush() | ||
while (this._buffer.length < MAX_BUFFER) { | ||
const value = binding.portRead(this.handle) | ||
if (value === null) return | ||
this._buffer.push(ArrayBuffer.isView(value) ? Buffer.coerce(value) : value) | ||
const data = binding.portRead(this.handle) | ||
if (data === null) return | ||
const state = { start: 0, end: data.byteLength, buffer: data } | ||
const value = structuredClone.deserializeWithTransfer(structuredClone.decode(state)) | ||
this._buffer.push(value) | ||
this._onactive() | ||
@@ -198,3 +232,3 @@ } | ||
if (this._onremoteclose !== null) this._onremoteclose() | ||
else this.close() // run in bg | ||
else this.close() | ||
} | ||
@@ -205,2 +239,8 @@ | ||
} | ||
static _ports = new Set() | ||
} | ||
Bare.on('exit', async () => { | ||
for (const port of Port._ports) port.ref() | ||
}) |
{ | ||
"name": "bare-channel", | ||
"version": "3.0.5", | ||
"version": "4.0.0", | ||
"description": "Inter-thread messaging for JavaScript", | ||
@@ -29,2 +29,3 @@ "main": "index.js", | ||
"bare-events": "^2.0.0", | ||
"bare-structured-clone": "^1.0.2", | ||
"fast-fifo": "^1.3.2" | ||
@@ -31,0 +32,0 @@ }, |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
170
296517
3
+ Addedbare-structured-clone@^1.0.2
+ Addedb4a@1.6.6(transitive)
+ Addedbare-structured-clone@1.2.5(transitive)
+ Addedbare-type@1.0.4(transitive)
+ Addedbits-to-bytes@1.3.0(transitive)
+ Addedcompact-encoding@2.15.0(transitive)
+ Addedcompact-encoding-bitfield@1.0.0(transitive)