nanomessage-rpc
Advanced tools
Comparing version 1.2.1 to 2.0.0
@@ -9,2 +9,4 @@ # Changelog | ||
## 2.0.0 - 2020-07-21 [YANKED] | ||
## 1.2.1 - 2020-03-27 [YANKED] | ||
@@ -25,2 +27,2 @@ | ||
## 1.0.0 - 2020-03-11 [YANKED] | ||
[Unreleased]: https://github.com/geut/nanomessage-rpc/compare/v1.2.1...HEAD | ||
[Unreleased]: https://github.com/geut/nanomessage-rpc/compare/v2.0.0...HEAD |
135
index.js
@@ -0,16 +1,27 @@ | ||
const { EventEmitter } = require('events') | ||
const Emittery = require('emittery') | ||
const eos = require('end-of-stream') | ||
const jsonCodec = require('buffer-json-encoding') | ||
const nanomessage = require('nanomessage') | ||
const assert = require('nanocustomassert') | ||
const { NanoresourcePromise } = require('nanoresource-promise') | ||
const { encodeError, decodeError, ERR_ACTION_NAME_MISSING, ERR_ACTION_RESPONSE_ERROR } = require('./lib/errors') | ||
const kNanomessage = Symbol('rpc.nanomessage') | ||
const kOnmessage = Symbol('rpc.onmessage') | ||
const kSend = Symbol('rpc.send') | ||
const kSubscribe = Symbol('rpc.subscribe') | ||
const kActions = Symbol('rpc.actions') | ||
const kEmittery = Symbol('rpc.emittery') | ||
const Codec = require('./lib/codec') | ||
const { | ||
encodeError, | ||
decodeError, | ||
NRPC_ERR_ACTION_NAME_MISSING, | ||
NRPC_ERR_ACTION_RESPONSE_ERROR, | ||
NRPC_ERR_CLOSE, | ||
NRPC_ERR_NOT_OPEN | ||
} = require('./lib/errors') | ||
const kNanomessage = Symbol('nrpc.nanomessage') | ||
const kOnmessage = Symbol('nrpc.onmessage') | ||
const kSend = Symbol('nrpc.send') | ||
const kSubscribe = Symbol('nrpc.subscribe') | ||
const kActions = Symbol('nrpc.actions') | ||
const kEmittery = Symbol('nrpc.emittery') | ||
const kOnCloseDestroyStream = Symbol('nrpc.onclosedestroystream') | ||
const kFastCheckOpen = Symbol('nrpc.fastcheckopen') | ||
class NanomessageRPC extends NanoresourcePromise { | ||
@@ -20,8 +31,10 @@ constructor (socket, opts = {}) { | ||
const { codec = jsonCodec } = opts | ||
const { onCloseDestroyStream = true, onError = () => {}, valueEncoding, ...nanomessageOpts } = opts | ||
this.socket = socket | ||
this.ee = new EventEmitter() | ||
this[kOnCloseDestroyStream] = onCloseDestroyStream | ||
this[kNanomessage] = nanomessage({ | ||
codec, | ||
...opts, | ||
...nanomessageOpts, | ||
valueEncoding: new Codec(valueEncoding), | ||
onMessage: this[kOnmessage].bind(this), | ||
@@ -34,11 +47,43 @@ send: this[kSend].bind(this), | ||
this._onError = onError | ||
this.ee.on('error', err => { | ||
this._onError(err) | ||
}) | ||
eos(socket, () => { | ||
this.close().catch(err => { | ||
process.nextTick(() => { | ||
throw err | ||
}) | ||
}) | ||
this | ||
.close() | ||
.catch(err => process.nextTick(() => this.ee.emit('error', err))) | ||
}) | ||
} | ||
get requests () { | ||
return this[kNanomessage].requests | ||
} | ||
get inflightRequests () { | ||
return this[kNanomessage].inflightRequests | ||
} | ||
get requestTimeout () { | ||
return this[kNanomessage].timeout | ||
} | ||
get concurrency () { | ||
return this[kNanomessage].concurrency | ||
} | ||
setRequestsTimeout (timeout) { | ||
this[kNanomessage].setRequestsTimeout(timeout) | ||
} | ||
setConcurrency (concurrency) { | ||
this[kNanomessage].setConcurrency(concurrency) | ||
} | ||
onError (cb) { | ||
this._onError = cb | ||
} | ||
action (name, handler) { | ||
@@ -55,11 +100,12 @@ this[kActions].set(name, handler) | ||
call (name, data) { | ||
const request = this[kNanomessage].request({ action: name, data }) | ||
const request = this[kNanomessage].request({ name, data }) | ||
const cancel = request.cancel | ||
const promise = this.open() | ||
const promise = this[kFastCheckOpen]() | ||
.then(() => request) | ||
.then((result) => { | ||
if (result.err) { | ||
const ErrorDecoded = decodeError(result.code, result.unformatMessage) | ||
throw new ErrorDecoded(...result.args) | ||
if (result.error) { | ||
const err = result.data | ||
const ErrorDecoded = decodeError(err.code, err.unformatMessage) | ||
throw new ErrorDecoded(...err.args) | ||
} else { | ||
@@ -74,12 +120,7 @@ return result.data | ||
async emit (name, data) { | ||
emit (name, data) { | ||
assert(typeof name === 'string', 'name must be a valid string') | ||
await this.open() | ||
if (name.startsWith('rpc-')) { | ||
return this[kEmittery].emit(name, data) | ||
} else { | ||
return this[kNanomessage].send({ event: name, data }) | ||
} | ||
return this[kFastCheckOpen]() | ||
.then(() => this[kNanomessage].send({ name, data, event: true })) | ||
} | ||
@@ -103,4 +144,5 @@ | ||
_open () { | ||
return this[kNanomessage].open() | ||
async _open () { | ||
await this[kNanomessage].open() | ||
this.ee.emit('opened') | ||
} | ||
@@ -111,3 +153,3 @@ | ||
new Promise(resolve => { | ||
if (this.socket.destroyed) return resolve() | ||
if (this.socket.destroyed || !this[kOnCloseDestroyStream]) return resolve() | ||
eos(this.socket, () => resolve()) | ||
@@ -118,12 +160,17 @@ this.socket.destroy() | ||
]) | ||
return this[kEmittery].emit('rpc-closed') | ||
this.ee.emit('closed') | ||
} | ||
async [kFastCheckOpen] () { | ||
if (this.closed || this.closing) throw new NRPC_ERR_CLOSE() | ||
if (this.opening) return this.open() | ||
if (!this.opened) throw new NRPC_ERR_NOT_OPEN() | ||
} | ||
[kSubscribe] (ondata) { | ||
const reader = async (data) => { | ||
const reader = (data) => { | ||
try { | ||
await this[kEmittery].emit('rpc-data', data) | ||
await ondata(data) | ||
ondata(data) | ||
} catch (err) { | ||
await this[kEmittery].emit('rpc-error', err) | ||
process.nextTick(() => this.ee.emit('error', err)) | ||
} | ||
@@ -143,10 +190,12 @@ } | ||
if (message.event) { | ||
await this[kEmittery].emit(message.event, message.data) | ||
await this[kEmittery].emit(message.name, message.data).catch((err) => { | ||
process.nextTick(() => this.ee.emit('error', err)) | ||
}) | ||
return | ||
} | ||
const action = this[kActions].get(message.action) | ||
const action = this[kActions].get(message.name) | ||
if (!action) { | ||
return encodeError(new ERR_ACTION_NAME_MISSING(message.action)) | ||
return encodeError(new NRPC_ERR_ACTION_NAME_MISSING(message.name)) | ||
} | ||
@@ -156,3 +205,3 @@ | ||
const result = await action(message.data) | ||
return { action: message.action, data: result } | ||
return { data: result } | ||
} catch (err) { | ||
@@ -162,3 +211,3 @@ if (err.isNanoerror) { | ||
} | ||
return encodeError(new ERR_ACTION_RESPONSE_ERROR(err.message)) | ||
return encodeError(new NRPC_ERR_ACTION_RESPONSE_ERROR(err.message)) | ||
} | ||
@@ -170,2 +219,2 @@ } | ||
module.exports.NanomessageRPC = NanomessageRPC | ||
module.exports.errors = { ERR_ACTION_NAME_MISSING, ERR_ACTION_RESPONSE_ERROR } | ||
module.exports.errors = { NRPC_ERR_ACTION_NAME_MISSING, NRPC_ERR_ACTION_RESPONSE_ERROR, NRPC_ERR_CLOSE, NRPC_ERR_NOT_OPEN } |
@@ -13,3 +13,3 @@ const nanoerror = require('nanoerror') | ||
function encodeError (err) { | ||
return { err: true, code: err.code, unformatMessage: err.unformatMessage, args: err.args } | ||
return { error: true, data: { code: err.code, unformatMessage: err.unformatMessage, args: err.args } } | ||
} | ||
@@ -27,3 +27,7 @@ | ||
createError('ERR_ACTION_NAME_MISSING', 'missing action handler for: %s') | ||
createError('ERR_ACTION_RESPONSE_ERROR', '%s') | ||
createError('NRPC_ERR_ACTION_NAME_MISSING', 'missing action handler for: %s') | ||
createError('NRPC_ERR_ACTION_RESPONSE_ERROR', '%s') | ||
createError('NRPC_ERR_CLOSE', 'nanomessage-rpc was closed') | ||
createError('NRPC_ERR_NOT_OPEN', 'nanomessage-rpc is not open') | ||
createError('NRPC_ERR_ENCODE', 'error encoding the request: %s') | ||
createError('NRPC_ERR_DECODE', 'error decoding the request: %s') |
{ | ||
"name": "nanomessage-rpc", | ||
"version": "1.2.1", | ||
"version": "2.0.0", | ||
"description": "Tiny rpc on top of nanomessage", | ||
@@ -11,3 +11,3 @@ "main": "index.js", | ||
"scripts": { | ||
"start": "node index.js", | ||
"benchmark": "node benchmark.js", | ||
"test": "jest --passWithNoTests", | ||
@@ -20,3 +20,2 @@ "posttest": "npm run lint", | ||
"dependencies": { | ||
"buffer-json-encoding": "^1.0.2", | ||
"emittery": "^0.6.0", | ||
@@ -26,4 +25,5 @@ "end-of-stream": "^1.4.4", | ||
"nanoerror": "^1.1.0", | ||
"nanomessage": "^5.3.0", | ||
"nanoresource-promise": "^1.2.1" | ||
"nanomessage": "^8.1.0", | ||
"nanoresource-promise": "^2.0.0", | ||
"varint": "^5.0.0" | ||
}, | ||
@@ -33,6 +33,6 @@ "devDependencies": { | ||
"@geut/xd": "^1.5.0", | ||
"duplexify": "^4.1.1", | ||
"jest": "^24.8.0", | ||
"nanobench": "^2.1.1", | ||
"standard": "^14.3.1", | ||
"through2": "^3.0.1" | ||
"streamx": "^2.6.4" | ||
}, | ||
@@ -39,0 +39,0 @@ "jest": { |
@@ -82,5 +82,5 @@ # nanomessage-rpc | ||
- `timeout: 10 * 1000`: Time to wait for the response of a request. | ||
- `concurrency: Infinity`: Defines how many requests do you want to run in concurrent. | ||
- `codec: JSON`: Defines a [compatible codec](https://github.com/mafintosh/codecs) to encode/decode messages in nanomessage. | ||
- `timeout: Infinity`: Time to wait for the response of a request. | ||
- `concurrency: { incoming: 256, outgoing: 256 }`: Defines how many requests do you want to run in concurrent. | ||
- `valueEncoding: buffer-json`: Defines an [abstract-encoding](https://github.com/mafintosh/abstract-encoding) to encode/decode messages in nanomessage. | ||
@@ -87,0 +87,0 @@ #### `rpc.open() -> Promise` |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
17843
7
342
1
+ Addedvarint@^5.0.0
+ Addedfastq@1.17.1(transitive)
+ Addednanomessage@8.4.0(transitive)
+ Addedreusify@1.0.4(transitive)
+ Addedvarint@5.0.2(transitive)
- Removedbuffer-json-encoding@^1.0.2
- Removedbuffer-json@2.0.0(transitive)
- Removedbuffer-json-encoding@1.0.2(transitive)
- Removedeventemitter3@4.0.7(transitive)
- Removedhyperid@2.3.1(transitive)
- Removedinherits@2.0.4(transitive)
- Removednanomessage@5.4.0(transitive)
- Removednanoresource@1.3.0(transitive)
- Removednanoresource-promise@1.2.2(transitive)
- Removedp-finally@1.0.0(transitive)
- Removedp-queue@6.6.2(transitive)
- Removedp-timeout@3.2.0(transitive)
- Removeduuid@8.3.2(transitive)
- Removeduuid-parse@1.1.0(transitive)
Updatednanomessage@^8.1.0
Updatednanoresource-promise@^2.0.0