🚀 Socket Launch Week Day 5:Introducing Repository Access Permissions and Custom Roles.Learn more
Sign In

bare-pipe

Package Overview
Dependencies
Maintainers
1
Versions
48
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bare-pipe - npm Package Compare versions

Comparing version
4.1.5
to
4.2.0
+74
-15
binding.c

@@ -27,2 +27,3 @@ #include <assert.h>

js_ref_t *on_read;
js_ref_t *on_handle;
js_ref_t *on_close;

@@ -248,2 +249,31 @@

size_t pending = uv_pipe_pending_count((uv_pipe_t *) stream);
while (pending > 0 && !pipe->exiting && !pipe->closing) {
uv_handle_type type = uv_pipe_pending_type((uv_pipe_t *) stream);
js_value_t *on_handle;
err = js_get_reference_value(env, pipe->on_handle, &on_handle);
assert(err == 0);
js_value_t *argv[1];
err = js_create_uint32(env, (uint32_t) type, &argv[0]);
assert(err == 0);
js_call_function(env, ctx, on_handle, 1, argv, NULL);
size_t next = uv_pipe_pending_count((uv_pipe_t *) stream);
if (next >= pending) break;
pending = next;
}
if (pipe->exiting || pipe->closing) {
err = js_close_handle_scope(env, scope);
assert(err == 0);
return;
}
js_value_t *on_read;

@@ -320,2 +350,5 @@ err = js_get_reference_value(env, pipe->on_read, &on_read);

err = js_delete_reference(env, pipe->on_handle);
assert(err == 0);
err = js_delete_reference(env, pipe->on_close);

@@ -358,4 +391,4 @@ assert(err == 0);

size_t argc = 8;
js_value_t *argv[8];
size_t argc = 10;
js_value_t *argv[10];

@@ -365,3 +398,3 @@ err = js_get_callback_info(env, info, &argc, argv, NULL, NULL);

assert(argc == 8);
assert(argc == 10);

@@ -378,4 +411,8 @@ uv_loop_t *loop;

err = uv_pipe_init(loop, &pipe->handle, 0);
bool ipc;
err = js_get_value_bool(env, argv[1], &ipc);
assert(err == 0);
err = uv_pipe_init(loop, &pipe->handle, ipc ? 1 : 0);
if (err < 0) {

@@ -395,23 +432,26 @@ err = js_throw_error(env, uv_err_name(err), uv_strerror(err));

err = js_create_reference(env, argv[1], 1, &pipe->ctx);
err = js_create_reference(env, argv[2], 1, &pipe->ctx);
assert(err == 0);
err = js_create_reference(env, argv[2], 1, &pipe->on_connection);
err = js_create_reference(env, argv[3], 1, &pipe->on_connection);
assert(err == 0);
err = js_create_reference(env, argv[3], 1, &pipe->on_connect);
err = js_create_reference(env, argv[4], 1, &pipe->on_connect);
assert(err == 0);
err = js_create_reference(env, argv[4], 1, &pipe->on_write);
err = js_create_reference(env, argv[5], 1, &pipe->on_write);
assert(err == 0);
err = js_create_reference(env, argv[5], 1, &pipe->on_end);
err = js_create_reference(env, argv[6], 1, &pipe->on_end);
assert(err == 0);
err = js_create_reference(env, argv[6], 1, &pipe->on_read);
err = js_create_reference(env, argv[7], 1, &pipe->on_read);
assert(err == 0);
err = js_create_reference(env, argv[7], 1, &pipe->on_close);
err = js_create_reference(env, argv[8], 1, &pipe->on_handle);
assert(err == 0);
err = js_create_reference(env, argv[9], 1, &pipe->on_close);
assert(err == 0);
err = js_add_deferred_teardown_callback(env, bare_pipe__on_teardown, (void *) pipe, &pipe->teardown);

@@ -580,4 +620,4 @@ assert(err == 0);

size_t argc = 2;
js_value_t *argv[2];
size_t argc = 3;
js_value_t *argv[3];

@@ -587,3 +627,3 @@ err = js_get_callback_info(env, info, &argc, argv, NULL, NULL);

assert(argc == 2);
assert(argc == 3);

@@ -594,2 +634,13 @@ bare_pipe_t *pipe;

uv_stream_t *send_handle = NULL;
js_value_type_t send_handle_type;
err = js_typeof(env, argv[2], &send_handle_type);
assert(err == 0);
if (send_handle_type != js_null && send_handle_type != js_undefined) {
err = js_get_arraybuffer_info(env, argv[2], (void **) &send_handle, NULL);
assert(err == 0);
}
js_value_t *arr = argv[1];

@@ -619,3 +670,7 @@

err = uv_write(req, (uv_stream_t *) &pipe->handle, bufs, bufs_len, bare_pipe__on_write);
if (send_handle) {
err = uv_write2(req, (uv_stream_t *) &pipe->handle, bufs, bufs_len, send_handle, bare_pipe__on_write);
} else {
err = uv_write(req, (uv_stream_t *) &pipe->handle, bufs, bufs_len, bare_pipe__on_write);
}

@@ -852,2 +907,6 @@ free(bufs);

V("WRITABLE", bare_pipe_writable)
V("UV_NAMED_PIPE", UV_NAMED_PIPE)
V("UV_TCP", UV_TCP)
V("UV_UDP", UV_UDP)
#undef V

@@ -854,0 +913,0 @@

import EventEmitter, { EventMap } from 'bare-events'
import Buffer, { BufferEncoding } from 'bare-buffer'
import { Duplex, DuplexEvents } from 'bare-stream'

@@ -6,4 +7,13 @@ import PipeError from './lib/errors'

declare const ipcHandle: unique symbol
declare const ipcAccept: unique symbol
interface IPCAcceptable {
readonly [ipcHandle]: unknown
[ipcAccept]?(): void
}
interface PipeEvents extends DuplexEvents {
connect: []
handle: [type: number]
}

@@ -14,2 +24,3 @@

eagerOpen?: boolean
ipc?: boolean
readBufferSize?: number

@@ -22,3 +33,3 @@ }

interface Pipe<M extends PipeEvents = PipeEvents> extends Duplex<M> {
interface Pipe<M extends PipeEvents = PipeEvents> extends Duplex<M>, IPCAcceptable {
readonly connecting: boolean

@@ -36,2 +47,14 @@ readonly pending: boolean

write(
chunk: Buffer | string,
encoding: BufferEncoding,
handle?: IPCAcceptable,
cb?: (err: Error | null) => void
): boolean
write(chunk: Buffer | string, encoding: BufferEncoding, cb?: (err: Error | null) => void): boolean
write(chunk: Buffer | string, handle?: IPCAcceptable, cb?: (err: Error | null) => void): boolean
write(chunk: Buffer | string, cb?: (err: Error | null) => void): boolean
accept<T extends IPCAcceptable>(target: T): T
ref(): this

@@ -55,2 +78,3 @@ unref(): this

allowHalfOpen?: boolean
ipc?: boolean
pauseOnConnect?: boolean

@@ -109,2 +133,3 @@ readBufferSize?: number

export {
type IPCAcceptable,
type PipeEvents,

@@ -111,0 +136,0 @@ type PipeOptions,

+151
-10

@@ -10,2 +10,5 @@ const EventEmitter = require('bare-events')

const ipcHandle = Symbol.for('bare.ipc.handle')
const ipcAccept = Symbol.for('bare.ipc.accept')
module.exports = exports = class Pipe extends Duplex {

@@ -18,3 +21,8 @@ constructor(path, opts = {}) {

const { readBufferSize = defaultReadBufferSize, allowHalfOpen = true, eagerOpen = true } = opts
const {
readBufferSize = defaultReadBufferSize,
allowHalfOpen = true,
eagerOpen = true,
ipc = false
} = opts

@@ -26,2 +34,3 @@ super({ eagerOpen })

this._allowHalfOpen = allowHalfOpen
this._ipc = ipc

@@ -33,5 +42,10 @@ this._fd = -1

this._pendingWrite = null
this._pendingWriteSegments = null
this._pendingWriteIdx = 0
this._pendingFinal = null
this._pendingDestroy = null
this._handleQueue = []
this._handleQueueSize = 0
this._buffer = Buffer.alloc(readBufferSize)

@@ -41,2 +55,3 @@

this._buffer,
ipc,
this,

@@ -48,2 +63,3 @@ noop,

this._onread,
this._onhandle,
this._onclose

@@ -83,2 +99,6 @@ )

get [ipcHandle]() {
return this._handle
}
open(fd, opts = {}, onconnect) {

@@ -159,2 +179,39 @@ if (typeof opts === 'function') {

write(chunk, encoding, handle, cb) {
if (typeof encoding === 'function') {
cb = encoding
encoding = undefined
handle = null
} else if (typeof encoding === 'object' && encoding !== null) {
if (typeof handle === 'function') cb = handle
handle = encoding
encoding = undefined
} else if (typeof handle === 'function') {
cb = handle
handle = null
}
if (handle) this._handleQueueSize++
this._handleQueue.push(handle || null)
if (encoding) return super.write(chunk, encoding, cb)
return super.write(chunk, cb)
}
accept(target) {
const handle = target[ipcHandle]
if (handle === undefined) {
throw errors.INVALID_IPC_TARGET('Target does not implement the IPC handle protocol')
}
binding.accept(this._handle, handle)
if (typeof target[ipcAccept] === 'function') target[ipcAccept]()
return target
}
ref() {

@@ -172,2 +229,6 @@ binding.ref(this._handle)

[ipcAccept]() {
this._onaccept()
}
_open(cb) {

@@ -188,10 +249,59 @@ if (this._state & constants.state.CONNECTED) return cb(null)

_writev(batch, cb) {
this._pendingWrite = [cb, batch]
this._pendingWrite = cb
binding.writev(
this._handle,
batch.map(({ chunk }) => chunk)
)
if (this._handleQueueSize === 0) {
this._handleQueue = []
this._pendingWriteSegments = null
binding.writev(
this._handle,
batch.map(({ chunk }) => chunk),
null
)
return
}
const handles = this._handleQueue.splice(0, batch.length)
// Fill any holes in the batch for messages that don't carry a handle. Each
// message that does carry a handle decrements the handle queue size.
for (let i = 0; i < batch.length; i++) {
if (handles[i] === undefined) handles[i] = null
else if (handles[i] !== null) this._handleQueueSize--
}
const segments = []
let i = 0
while (i < batch.length) {
if (handles[i] !== null) {
segments.push({ chunks: [batch[i]], handle: handles[i] })
i++
} else {
const start = i
while (i < batch.length && handles[i] === null) i++
segments.push({ chunks: batch.slice(start, i), handle: null })
}
}
this._pendingWriteSegments = segments
this._pendingWriteIdx = 0
this._writeNextSegment()
}
_writeNextSegment() {
const segment = this._pendingWriteSegments[this._pendingWriteIdx]
const chunks = []
for (let i = 0; i < segment.chunks.length; i++) {
chunks.push(segment.chunks[i].chunk)
}
let sendHandle = null
if (segment.handle !== null) sendHandle = segment.handle[ipcHandle]
binding.writev(this._handle, chunks, sendHandle)
}
_final(cb) {

@@ -232,4 +342,21 @@ if (this._state & constants.state.READABLE && this._state & constants.state.WRITABLE) {

if (this._pendingWrite === null) return
const cb = this._pendingWrite[0]
if (this._pendingWriteSegments === null) {
const cb = this._pendingWrite
this._pendingWrite = null
cb(err)
return
}
this._pendingWriteIdx++
if (err === null && this._pendingWriteIdx < this._pendingWriteSegments.length) {
this._writeNextSegment()
return
}
const cb = this._pendingWrite
this._pendingWrite = null
this._pendingWriteSegments = null
this._pendingWriteIdx = 0
cb(err)

@@ -266,2 +393,7 @@ }

_onaccept() {
this._state |= constants.state.CONNECTED | constants.state.READABLE | constants.state.WRITABLE
this._continueOpen()
}
_onread(err, read) {

@@ -289,2 +421,6 @@ if (err) {

_onhandle(type) {
this.emit('handle', type)
}
_onwrite(err) {

@@ -339,3 +475,4 @@ this._continueWrite(err)

allowHalfOpen = true,
pauseOnConnect = false
pauseOnConnect = false,
ipc = false
} = opts

@@ -348,2 +485,3 @@

this._pauseOnConnect = pauseOnConnect
this._ipc = ipc

@@ -398,2 +536,3 @@ this._path = null

empty,
this._ipc,
this,

@@ -405,2 +544,3 @@ this._onconnection,

noop,
noop,
this._onclose

@@ -475,3 +615,4 @@ )

allowHalfOpen: this._allowHalfOpen,
eagerOpen: !this._pauseOnConnect
eagerOpen: !this._pauseOnConnect,
ipc: this._ipc
})

@@ -483,3 +624,3 @@

pipe._path = this._path
pipe._state |= constants.state.CONNECTED | constants.state.READABLE | constants.state.WRITABLE
pipe._onaccept()

@@ -486,0 +627,0 @@ this._connections.add(pipe)

@@ -13,4 +13,9 @@ declare const constants: {

}
handle: {
NAMED_PIPE: number
TCP: number
UDP: number
}
}
export = constants

@@ -0,1 +1,3 @@

const binding = require('../binding')
module.exports = {

@@ -12,3 +14,8 @@ state: {

UNREFED: 0x100
},
handle: {
NAMED_PIPE: binding.UV_NAMED_PIPE,
TCP: binding.UV_TCP,
UDP: binding.UV_UDP
}
}

@@ -26,2 +26,6 @@ module.exports = class PipeError extends Error {

}
static INVALID_IPC_TARGET(msg) {
return new PipeError(msg, PipeError.INVALID_IPC_TARGET)
}
}
+6
-2
{
"name": "bare-pipe",
"version": "4.1.5",
"version": "4.2.0",
"description": "Native I/O pipes for JavaScript",

@@ -31,3 +31,5 @@ "exports": {

"scripts": {
"test": "prettier . --check && bare test.js"
"format": "prettier --write . && lunte --fix",
"lint": "prettier --check . && lunte",
"test": "brittle-bare --coverage test.js"
},

@@ -53,4 +55,6 @@ "repository": {

"bare-path": "^3.0.0",
"bare-tcp": "^2.4.1",
"brittle": "^3.2.1",
"cmake-bare": "^1.1.6",
"lunte": "^1.8.0",
"prettier": "^3.4.1",

@@ -57,0 +61,0 @@ "prettier-config-holepunch": "^2.0.0"

+193
-0

@@ -19,4 +19,197 @@ # bare-pipe

## API
#### `const pipe = new Pipe([path][, options])`
Create a new pipe. If `path` is a number, it is treated as a file descriptor to open. If it is a string, it is treated as a path to connect to.
Options include:
```js
options = {
readBufferSize: 65536,
allowHalfOpen: true,
eagerOpen: true,
ipc: false
}
```
Set `ipc: true` to enable handle passing over the pipe. See [IPC handle passing](#ipc-handle-passing).
#### `pipe.connecting`
Whether the pipe is currently connecting.
#### `pipe.pending`
Whether the pipe has not yet connected.
#### `pipe.readyState`
The current state of the pipe. One of `'open'`, `'readOnly'`, `'writeOnly'`, or `'opening'`.
#### `pipe.open(fd[, options][, onconnect])`
Open the pipe on the given file descriptor.
#### `pipe.connect(path[, options][, onconnect])`
Connect the pipe to `path`. `onconnect` is called when the connection is established.
#### `pipe.write(chunk[, encoding][, handle][, cb])`
Write `chunk` to the pipe. If `handle` is given and the pipe was created with `ipc: true`, the handle is transferred to the receiver alongside the chunk. `handle` must implement the [`IPCAcceptable`](#ipc-handle-passing) protocol.
#### `pipe.accept(target)`
Accept a pending handle into `target`. `target` must implement the [`IPCAcceptable`](#ipc-handle-passing) protocol. Call this synchronously from the `'handle'` event listener. Throws `INVALID_IPC_TARGET` if `target` does not implement the protocol.
#### `pipe.ref()`
Ref the pipe, preventing the process from exiting.
#### `pipe.unref()`
Unref the pipe, allowing the process to exit.
#### `event: 'connect'`
Emitted when the pipe connects.
#### `event: 'handle'`
Emitted on the receiving side for each pending handle when the pipe was created with `ipc: true`. The argument is the handle type, one of `Pipe.constants.handle.NAMED_PIPE`, `TCP`, or `UDP`. The listener must call `pipe.accept(target)` synchronously to claim the handle. Multiple handles arriving in a single read are emitted in arrival order before the corresponding `'data'` event.
#### `const server = Pipe.createServer([options][, onconnection])`
Create a new pipe server. `server` extends <https://github.com/holepunchto/bare-events>.
Options include:
```js
options = {
readBufferSize: 65536,
allowHalfOpen: true,
pauseOnConnect: false,
ipc: false
}
```
These options are applied to each incoming pipe.
#### `server.listening`
Whether the server is listening.
#### `server.address()`
Returns the bound path, or `null` if the server is not listening.
#### `server.listen(path[, backlog[, options]][, onlistening])`
Start listening for connections on `path`. `backlog` defaults to `511`.
#### `server.close([onclose])`
Close the server. No new connections will be accepted. The server emits `close` after all existing connections have ended.
#### `server.ref()`
Ref the server, preventing the process from exiting.
#### `server.unref()`
Unref the server, allowing the process to exit.
#### `event: 'listening'`
Emitted when the server starts listening.
#### `event: 'connection'`
Emitted when a new connection is received. The argument is a `Pipe`.
#### `event: 'close'`
Emitted when the server closes.
#### `event: 'error'`
Emitted when an error occurs.
#### `const pipe = Pipe.createConnection(path[, options][, onconnect])`
Create a new pipe and connect it to `path`. Shorthand for `new Pipe(options).connect(path, options, onconnect)`.
#### `Pipe.pipe()`
Returns `[read, write]`, a pair of file descriptors connected to each other.
#### `Pipe.constants`
Object containing internal state constants and handle type constants:
```js
Pipe.constants.handle.NAMED_PIPE
Pipe.constants.handle.TCP
Pipe.constants.handle.UDP
```
## IPC handle passing
Pipes created with `ipc: true` can transfer libuv handles (named pipes, TCP sockets, UDP sockets) to a peer alongside the byte stream. The peer receives a `'handle'` event for each transferred handle, in arrival order, before the corresponding `'data'` event.
Sender:
```js
const left = new Pipe(fd, { ipc: true })
const socket = tcp.createConnection(port)
socket.on('connect', () => {
left.write(Buffer.from('here'), socket)
})
```
Receiver:
```js
const right = new Pipe(fd, { ipc: true })
right.on('handle', (type) => {
if (type === Pipe.constants.handle.TCP) {
const socket = new tcp.Socket()
right.accept(socket)
socket.on('data', console.log)
}
})
```
### `IPCAcceptable` protocol
Any object passed to `pipe.accept(target)` or `pipe.write(chunk, handle, ...)` must implement two well-known symbols:
```js
const ipcHandle = Symbol.for('bare.ipc.handle')
const ipcAccept = Symbol.for('bare.ipc.accept')
class MyTarget {
get [ipcHandle]() {
return this._handle // An ArrayBuffer backing a libuv `uv_*_t` struct
}
[ipcAccept]() {
// Optional: Called synchronously after the handle has been transferred
}
}
```
- `Symbol.for('bare.ipc.handle')` (required): A getter returning the underlying libuv handle (typically an `ArrayBuffer` whose first bytes are a `uv_stream_t` / `uv_udp_t`).
- `Symbol.for('bare.ipc.accept')` (optional): A method invoked synchronously after the handle has been transferred. Use it to initialize per-handle state (e.g. address lookup).
`Pipe`, `bare-tcp`'s `Socket`, and any compatible package implement this protocol natively, so a `bare-tcp` socket can be passed and received via `bare-pipe` IPC without any glue code.
TypeScript users can import the `IPCAcceptable` interface from `bare-pipe` to type the protocol.
## License
Apache-2.0

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet