Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

nanomessage-rpc

Package Overview
Dependencies
Maintainers
1
Versions
26
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nanomessage-rpc - npm Package Compare versions

Comparing version 1.2.1 to 2.0.0

lib/codec.js

4

CHANGELOG.md

@@ -9,2 +9,4 @@ # Changelog

## 2.0.0 - 2020-07-21 [YANKED]
## 1.2.1 - 2020-03-27 [YANKED]

@@ -25,2 +27,2 @@

## 1.0.0 - 2020-03-11 [YANKED]
[Unreleased]: https://github.com/geut/nanomessage-rpc/compare/v1.2.1...HEAD
[Unreleased]: https://github.com/geut/nanomessage-rpc/compare/v2.0.0...HEAD

@@ -0,16 +1,27 @@

const { EventEmitter } = require('events')
const Emittery = require('emittery')
const eos = require('end-of-stream')
const jsonCodec = require('buffer-json-encoding')
const nanomessage = require('nanomessage')
const assert = require('nanocustomassert')
const { NanoresourcePromise } = require('nanoresource-promise')
const { encodeError, decodeError, ERR_ACTION_NAME_MISSING, ERR_ACTION_RESPONSE_ERROR } = require('./lib/errors')
const kNanomessage = Symbol('rpc.nanomessage')
const kOnmessage = Symbol('rpc.onmessage')
const kSend = Symbol('rpc.send')
const kSubscribe = Symbol('rpc.subscribe')
const kActions = Symbol('rpc.actions')
const kEmittery = Symbol('rpc.emittery')
const Codec = require('./lib/codec')
const {
encodeError,
decodeError,
NRPC_ERR_ACTION_NAME_MISSING,
NRPC_ERR_ACTION_RESPONSE_ERROR,
NRPC_ERR_CLOSE,
NRPC_ERR_NOT_OPEN
} = require('./lib/errors')
const kNanomessage = Symbol('nrpc.nanomessage')
const kOnmessage = Symbol('nrpc.onmessage')
const kSend = Symbol('nrpc.send')
const kSubscribe = Symbol('nrpc.subscribe')
const kActions = Symbol('nrpc.actions')
const kEmittery = Symbol('nrpc.emittery')
const kOnCloseDestroyStream = Symbol('nrpc.onclosedestroystream')
const kFastCheckOpen = Symbol('nrpc.fastcheckopen')
class NanomessageRPC extends NanoresourcePromise {

@@ -20,8 +31,10 @@ constructor (socket, opts = {}) {

const { codec = jsonCodec } = opts
const { onCloseDestroyStream = true, onError = () => {}, valueEncoding, ...nanomessageOpts } = opts
this.socket = socket
this.ee = new EventEmitter()
this[kOnCloseDestroyStream] = onCloseDestroyStream
this[kNanomessage] = nanomessage({
codec,
...opts,
...nanomessageOpts,
valueEncoding: new Codec(valueEncoding),
onMessage: this[kOnmessage].bind(this),

@@ -34,11 +47,43 @@ send: this[kSend].bind(this),

this._onError = onError
this.ee.on('error', err => {
this._onError(err)
})
eos(socket, () => {
this.close().catch(err => {
process.nextTick(() => {
throw err
})
})
this
.close()
.catch(err => process.nextTick(() => this.ee.emit('error', err)))
})
}
get requests () {
return this[kNanomessage].requests
}
get inflightRequests () {
return this[kNanomessage].inflightRequests
}
get requestTimeout () {
return this[kNanomessage].timeout
}
get concurrency () {
return this[kNanomessage].concurrency
}
setRequestsTimeout (timeout) {
this[kNanomessage].setRequestsTimeout(timeout)
}
setConcurrency (concurrency) {
this[kNanomessage].setConcurrency(concurrency)
}
onError (cb) {
this._onError = cb
}
action (name, handler) {

@@ -55,11 +100,12 @@ this[kActions].set(name, handler)

call (name, data) {
const request = this[kNanomessage].request({ action: name, data })
const request = this[kNanomessage].request({ name, data })
const cancel = request.cancel
const promise = this.open()
const promise = this[kFastCheckOpen]()
.then(() => request)
.then((result) => {
if (result.err) {
const ErrorDecoded = decodeError(result.code, result.unformatMessage)
throw new ErrorDecoded(...result.args)
if (result.error) {
const err = result.data
const ErrorDecoded = decodeError(err.code, err.unformatMessage)
throw new ErrorDecoded(...err.args)
} else {

@@ -74,12 +120,7 @@ return result.data

async emit (name, data) {
emit (name, data) {
assert(typeof name === 'string', 'name must be a valid string')
await this.open()
if (name.startsWith('rpc-')) {
return this[kEmittery].emit(name, data)
} else {
return this[kNanomessage].send({ event: name, data })
}
return this[kFastCheckOpen]()
.then(() => this[kNanomessage].send({ name, data, event: true }))
}

@@ -103,4 +144,5 @@

_open () {
return this[kNanomessage].open()
async _open () {
await this[kNanomessage].open()
this.ee.emit('opened')
}

@@ -111,3 +153,3 @@

new Promise(resolve => {
if (this.socket.destroyed) return resolve()
if (this.socket.destroyed || !this[kOnCloseDestroyStream]) return resolve()
eos(this.socket, () => resolve())

@@ -118,12 +160,17 @@ this.socket.destroy()

])
return this[kEmittery].emit('rpc-closed')
this.ee.emit('closed')
}
async [kFastCheckOpen] () {
if (this.closed || this.closing) throw new NRPC_ERR_CLOSE()
if (this.opening) return this.open()
if (!this.opened) throw new NRPC_ERR_NOT_OPEN()
}
[kSubscribe] (ondata) {
const reader = async (data) => {
const reader = (data) => {
try {
await this[kEmittery].emit('rpc-data', data)
await ondata(data)
ondata(data)
} catch (err) {
await this[kEmittery].emit('rpc-error', err)
process.nextTick(() => this.ee.emit('error', err))
}

@@ -143,10 +190,12 @@ }

if (message.event) {
await this[kEmittery].emit(message.event, message.data)
await this[kEmittery].emit(message.name, message.data).catch((err) => {
process.nextTick(() => this.ee.emit('error', err))
})
return
}
const action = this[kActions].get(message.action)
const action = this[kActions].get(message.name)
if (!action) {
return encodeError(new ERR_ACTION_NAME_MISSING(message.action))
return encodeError(new NRPC_ERR_ACTION_NAME_MISSING(message.name))
}

@@ -156,3 +205,3 @@

const result = await action(message.data)
return { action: message.action, data: result }
return { data: result }
} catch (err) {

@@ -162,3 +211,3 @@ if (err.isNanoerror) {

}
return encodeError(new ERR_ACTION_RESPONSE_ERROR(err.message))
return encodeError(new NRPC_ERR_ACTION_RESPONSE_ERROR(err.message))
}

@@ -170,2 +219,2 @@ }

module.exports.NanomessageRPC = NanomessageRPC
module.exports.errors = { ERR_ACTION_NAME_MISSING, ERR_ACTION_RESPONSE_ERROR }
module.exports.errors = { NRPC_ERR_ACTION_NAME_MISSING, NRPC_ERR_ACTION_RESPONSE_ERROR, NRPC_ERR_CLOSE, NRPC_ERR_NOT_OPEN }

@@ -13,3 +13,3 @@ const nanoerror = require('nanoerror')

function encodeError (err) {
return { err: true, code: err.code, unformatMessage: err.unformatMessage, args: err.args }
return { error: true, data: { code: err.code, unformatMessage: err.unformatMessage, args: err.args } }
}

@@ -27,3 +27,7 @@

createError('ERR_ACTION_NAME_MISSING', 'missing action handler for: %s')
createError('ERR_ACTION_RESPONSE_ERROR', '%s')
createError('NRPC_ERR_ACTION_NAME_MISSING', 'missing action handler for: %s')
createError('NRPC_ERR_ACTION_RESPONSE_ERROR', '%s')
createError('NRPC_ERR_CLOSE', 'nanomessage-rpc was closed')
createError('NRPC_ERR_NOT_OPEN', 'nanomessage-rpc is not open')
createError('NRPC_ERR_ENCODE', 'error encoding the request: %s')
createError('NRPC_ERR_DECODE', 'error decoding the request: %s')
{
"name": "nanomessage-rpc",
"version": "1.2.1",
"version": "2.0.0",
"description": "Tiny rpc on top of nanomessage",

@@ -11,3 +11,3 @@ "main": "index.js",

"scripts": {
"start": "node index.js",
"benchmark": "node benchmark.js",
"test": "jest --passWithNoTests",

@@ -20,3 +20,2 @@ "posttest": "npm run lint",

"dependencies": {
"buffer-json-encoding": "^1.0.2",
"emittery": "^0.6.0",

@@ -26,4 +25,5 @@ "end-of-stream": "^1.4.4",

"nanoerror": "^1.1.0",
"nanomessage": "^5.3.0",
"nanoresource-promise": "^1.2.1"
"nanomessage": "^8.1.0",
"nanoresource-promise": "^2.0.0",
"varint": "^5.0.0"
},

@@ -33,6 +33,6 @@ "devDependencies": {

"@geut/xd": "^1.5.0",
"duplexify": "^4.1.1",
"jest": "^24.8.0",
"nanobench": "^2.1.1",
"standard": "^14.3.1",
"through2": "^3.0.1"
"streamx": "^2.6.4"
},

@@ -39,0 +39,0 @@ "jest": {

@@ -82,5 +82,5 @@ # nanomessage-rpc

- `timeout: 10 * 1000`: Time to wait for the response of a request.
- `concurrency: Infinity`: Defines how many requests do you want to run in concurrent.
- `codec: JSON`: Defines a [compatible codec](https://github.com/mafintosh/codecs) to encode/decode messages in nanomessage.
- `timeout: Infinity`: Time to wait for the response of a request.
- `concurrency: { incoming: 256, outgoing: 256 }`: Defines how many requests do you want to run in concurrent.
- `valueEncoding: buffer-json`: Defines an [abstract-encoding](https://github.com/mafintosh/abstract-encoding) to encode/decode messages in nanomessage.

@@ -87,0 +87,0 @@ #### `rpc.open() -> Promise`

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc