# nanomessage (aka nm)
Simple module that helps you build a request-response abstraction on top of any other solution (e.g. streams).
## Install
```bash
$ npm install nanomessage
```
## Usage
```javascript
import WebSocket from 'ws'
import { Nanomessage } from 'nanomessage'

const server = new WebSocket.Server({ port: 3000 })

server.on('connection', function connection (ws) {
  const nm = new Nanomessage({
    subscribe (ondata) {
      ws.on('message', ondata)
    },
    send (msg) {
      ws.send(msg)
    },
    onMessage (msg, opts) {
      console.log(msg)
      return 'pong from Alice'
    }
  })

  nm.open().catch(err => console.error(err))
})

const ws = new WebSocket('ws://127.0.0.1:3000')

const Bob = new Nanomessage({
  async open () {
    if (ws.readyState === 0) {
      await new Promise(resolve => ws.once('open', resolve))
    }
  },
  subscribe (ondata) {
    ws.on('message', ondata)
  },
  send (msg) {
    ws.send(msg)
  }
})

;(async () => {
  await Bob.open()
  console.log(await Bob.request('ping from Bob'))
})()
```
## API
#### `const nm = new Nanomessage(options)`
Create a new nanomessage.
Options include:
- `send: (chunk: Buffer, info: Object) => (Promise|undefined)`: Defines how to send the messages provided by nanomessage to the low-level solution.
- `subscribe: (onData: buf => Promise) => UnsubscribeFunction`: Defines how to read data from the low-level solution.
- `onMessage: (msg: *, info: Object) => Promise<Response>`: Async handler to process the incoming requests.
- `open: () => Promise`: Defines a function to run before the nanomessage instance is opened.
- `close: () => Promise`: Defines a function to run after the nanomessage instance is closed.
- `timeout: null`: Time to wait for the response of a request. Disabled by default.
- `concurrency: { incoming: 256, outgoing: 256 }`: Defines how many requests you want to run (outgoing) and process (incoming) concurrently.
- `valueEncoding: buffer-json`: Defines a compatible codec to encode/decode messages in nanomessage.
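As a minimal sketch (assuming `ws` is an already-connected WebSocket-like object), these options wire together like this:

```javascript
const { Nanomessage } = require('nanomessage')

const nm = new Nanomessage({
  subscribe (onData) {
    ws.on('message', onData)
    // Returning an unsubscribe function lets nanomessage detach on close.
    return () => ws.off('message', onData)
  },
  send (chunk) {
    ws.send(chunk)
  },
  onMessage (msg, info) {
    return 'pong'
  },
  timeout: 5000,                              // fail a request after 5 seconds
  concurrency: { incoming: 64, outgoing: 64 } // limit parallel requests
})
```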
`info` is an object with:

- `info.id: Number`: Incremental request ID.
- `info.data: *`: Plain data to send.
- `info.ephemeral: Boolean`: `true` if the message is ephemeral.
- `info.response: Boolean`: `true` if the message is a response.
- `info.responseData: *`: Plain data to respond with.
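For example, an `onMessage` handler might branch on these fields (a sketch; the payloads are illustrative):

```javascript
const nm = new Nanomessage({
  // ...subscribe and send as above...
  onMessage (msg, info) {
    if (info.ephemeral) {
      // Ephemeral messages don't expect a response.
      console.log('notification', info.id, msg)
      return
    }
    // Regular requests: whatever is returned becomes the response.
    return { echo: msg, id: info.id }
  }
})
```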
You can also extend from this prototype if you prefer:
```javascript
const { Nanomessage } = require('nanomessage')

class CustomNanomessage extends Nanomessage {
  constructor (...args) {
    super(...args)
  }

  _subscribe (onData) {}

  async _send (chunk, info) {}

  async _onMessage (msg, info) {}

  async _open () {
    await super._open()
  }

  async _close () {
    await super._close()
  }
}
```
#### `nm.requests: Array<Requests>`
Get the current list of requests (inflight and pending).
#### `nm.inflightRequests: Number`

Number of requests currently being processed in the queue.
#### `nm.requestTimeout: Number`
Get the current request timeout.
#### `nm.concurrency: { incoming: Number, outgoing: Number }`
Get the current concurrency.
#### `nm.setRequestsTimeout(Number)`

Changes the timeout for future requests.
#### `nm.setConcurrency(Number | { incoming: Number, outgoing: Number })`

Updates the number of concurrent operations allowed for incoming and outgoing requests.
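A quick sketch of adjusting both at runtime:

```javascript
nm.setRequestsTimeout(10000)                       // 10s timeout for future requests
nm.setConcurrency(128)                             // same limit for incoming and outgoing
nm.setConcurrency({ incoming: 512, outgoing: 64 }) // or tune them separately
```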
#### `nm.open() => Promise`

Opens nanomessage and starts listening for incoming data.
#### `nm.close() => Promise`

Closes nanomessage and unsubscribes from incoming data.
#### `nm.request(data, [opts]) => Promise<Response>`

Sends a request and waits for a response. `data` can be any serializable type supported by your codec.

- `opts.timeout: Number`: Define a custom timeout for the current request.
- `opts.signal: AbortSignal`: Set an abort signal object to cancel the request.
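A sketch of both options together (the payload is illustrative; `AbortController` is the standard global available in modern Node.js):

```javascript
const controller = new AbortController()

// Abort manually if the request is no longer needed.
setTimeout(() => controller.abort(), 1000)

const response = await nm.request({ query: 'status' }, {
  timeout: 2000,            // per-request timeout override
  signal: controller.signal // cancels the pending request on abort
})
```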
#### `nm.send(data) => Promise<Response>`

Sends an ephemeral message. `data` can be any serializable type supported by your codec.
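For example (the payload is illustrative):

```javascript
// Fire-and-forget: the remote onMessage receives it with info.ephemeral === true.
await nm.send({ event: 'user-connected' })
```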
#### `nm.processIncomingMessage(buf: Buffer) => Promise`

Gives direct access to the incoming message handler. It's recommended to use the subscription model instead.
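As a sketch, if you don't pass a `subscribe` option you could feed raw data in yourself (the `ws` listener here is an assumption):

```javascript
ws.on('message', (buf) => {
  nm.processIncomingMessage(buf).catch(err => console.error(err))
})
```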
#### `nm.setMessageHandler(handler) => Nanomessage`

Defines a request handler, overriding the previous one.
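A sketch of swapping the handler at runtime:

```javascript
nm.setMessageHandler((msg, info) => {
  // Replaces the previous handler for subsequent requests.
  return `echo: ${msg}`
})
```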
## Issues
:bug: If you find an issue, we encourage you to report it on GitHub. Please specify your OS and the steps to reproduce it.
## Contributing
:busts_in_silhouette: Ideas and contributions to the project are welcome. You must follow this guideline.
## License
MIT © A GEUT project