NATS.js - Node.js Client
A Node.js client for the NATS messaging system.
Installation
npm install nats
npm install nats@next
Basic Usage
const NATS = require('nats')
const nc = NATS.connect()
nc.publish('foo', 'Hello World!')
nc.subscribe('foo', (err, msg) => {
if (err) {
console.error(err)
return
}
console.log('Received a message: ' + msg)
})
const sub = nc.subscribe('foo', (err, m) => {})
sub.unsubscribe()
nc.subscribe('foo', (_, m) => {
if (m.reply) {
nc.publish(m.reply, 'got ' + m.data + ' on ' + m.subject + ' in subscription id ' + m.sid)
return
}
console.log('Received a message: ' + m.data + " it wasn't a request.")
})
nc.request('request', (_, m) => {
console.log('Got a response in msg stream: ' + m.data)
})
nc.request('help', (err, m) => {
if (err && err.code === NATS.REQ_TIMEOUT) {
console.log('request timed out')
} else if (err) {
console.error('request got error', err)
} else {
console.log('Got a response for help: ' + m.data)
}
}, null, { max: 1, timeout: 1000 })
nc.subscribe('help', (_, m) => {
nc.publish(m.reply, 'I can help!')
})
nc.close()
JSON
The json
connect property makes it easier to exchange JSON data with other
clients.
const nc = NATS.connect({ json: true })
nc.on('connect', () => {
nc.on('error', (err) => {
console.log(err)
})
nc.subscribe('greeting', (_, m) => {
if (m.data && m.data.name && m.reply) {
nc.publish(m.reply, { greeting: 'hello ' + m.data.name })
}
})
nc.subscribe('unsafe', (_, m) => {
if (Object.hasOwnProperty.call(m, 'toString')) {
console.log('tricky - trying to crash me:', m.toString)
return
}
if (isNaN(m.data.amount) === false) {
}
})
nc.publish('unsafe', { toString: 'no good' })
nc.flush(function () {
nc.close()
})
})
Wildcard Subscriptions
const nc = NATS.connect({ json: true })
nc.subscribe('foo.*.baz', (_, m) => {
console.log('Msg received on [' + m.subject + '] : ' + m.data)
})
nc.subscribe('foo.bar.*', (_, m) => {
console.log('Msg received on [' + m.subject + '] : ' + m.data)
})
nc.subscribe('foo.>', (_, m) => {
console.log('Msg received on [' + m.subject + '] : ' + m.data)
})
Queue Groups
let received = 0
nc.subscribe('foo', () => {
received++
}, { queue: 'job.workers' })
Clustered Usage
const servers = ['nats://nats.io:4222', 'nats://nats.io:5222', 'nats://nats.io:6222']
let nc = NATS.connect({ servers: servers })
nc.on('connect', () => {
console.log('Connected to ' + nc.currentServer.url.host)
})
nc = NATS.connect({ noRandomize: true, servers: servers })
Draining Connections and Subscriptions
let c1 = 0
const sub = nc.subscribe('foo', () => {
c1++
if (c1 === 1) {
sub.drain((err) => {
if (err) {
console.error(err)
return
}
console.log(`subscription ${sub.sid} drained`)
})
}
}, { queue: 'q1' })
let c2 = 0
nc.subscribe('foo', () => {
c2++
if (c2 === 1) {
nc.drain((err) => {
if (err) {
console.error('error draining', err)
return
}
console.log('connection drained')
})
}
}, { queue: 'q1' })
TLS
const NATS = require('nats')
const fs = require('fs')
let nc = NATS.connect({ tls: true })
let tlsOptions = {
rejectUnauthorized: false
}
nc = NATS.connect({ tls: tlsOptions })
tlsOptions = {
ca: [fs.readFileSync('./test/certs/ca.pem')]
}
nc = NATS.connect({ tls: tlsOptions })
tlsOptions = {
key: fs.readFileSync('./test/certs/client-key.pem'),
cert: fs.readFileSync('./test/certs/client-cert.pem'),
ca: [fs.readFileSync('./test/certs/ca.pem')]
}
nc = NATS.connect({ tls: tlsOptions })
Basic Authentication
let nc = NATS.connect('nats://foo:bar@localhost:4222')
nc = NATS.connect({ url: 'nats://localhost:4222', user: 'foo', pass: 'bar' })
nc = NATS.connect('nats://mytoken@localhost:4222')
nc = NATS.connect({ url: 'nats://localhost:4222', token: 'mytoken' })
New Authentication (Nkeys and User Credentials)
See examples for more usage.
const nkeys = require('ts-nkeys')
let nc = NATS.connect('connect.ngs.global', NATS.creds('./myid.creds'))
const jwt = 'eyJ0eXAiOiLN1...'
const nkeySeed = 'SUAIBDPBAUTWCWBKIO6XHQNINK5FWJW4OHLXC3HQ2KFE4PEJUA44CNHTC4'
const sk = nkeys.fromSeed(Buffer.from(nkeySeed))
nc = NATS.connect('nats://localhost:4222', {
nkey: 'UAH42UG6PV552P5SWLWTBP3H3S5BHAVCO2IEKEXUANJXR75J63RQ5WM6',
nonceSigner: function (nonce) {
return sk.sign(nonce)
}
})
nc = NATS.connect({
userJWT: jwt,
nonceSigner: function (nonce) {
return sk.sign(nonce)
}
})
nc = NATS.connect({
userJWT: function () {
return jwt
},
nonceSigner: function (nonce) {
return sk.sign(nonce)
}
})
Advanced Usage
nc.flush((err) => {
if (err) {
console.error('error flushing', err)
return
}
console.log('round trip to the server done')
})
nc = NATS.connect({ port: 4222, yieldTime: 10 })
NATS.subscribe('foo', (err) => {
if (err && err.code === TIMEOUT_ERR) {
}
}, {timeout: 1000, expected: 1})
nc.subscribe('foo', () => {}, { max: 100 })
const sub = nc.subscribe('foo', () => {})
sub.unsubscribe(100)
NATS.connect({ encoding: 'ascii' })
NATS.connect({ preserveBuffers: true })
NATS.connect({ maxReconnectAttempts: -1, reconnectTimeWait: 250 })
Events
The nats client is an event emitter, you can listen to several kinds of events.
nc.on('error', (err) => {
console.log(err)
})
nc.on('connect', (nc) => {
console.log(`connect to ${nc.currentServer.url.host}`)
})
nc.on('disconnect', () => {
console.log('disconnect')
})
nc.on('reconnecting', () => {
console.log('reconnecting')
})
nc.on('reconnect', (nc) => {
console.log(`reconnect to ${nc.currentServer.url.host}`)
})
nc.on('close', () => {
console.log('close')
})
nc.on('unsubscribe', (sid, subject) => {
console.log('unsubscribed subscription', sid, 'for subject', subject)
})
nc.on('permission_error', (err) => {
console.error('got a permissions error', err.message)
})
See examples and benchmarks for more information.
Connect Options
The following is the list of connection options and default values.
Option | Default | Description |
---|
encoding | "utf8" | Encoding specified by the client to encode/decode data |
json | false | If true, message payloads are converted to/from JSON |
maxPingOut | 2 | Max number of pings the client will allow unanswered before raising a stale connection error |
maxReconnectAttempts | 10 | Sets the maximum number of reconnect attempts. The value of -1 specifies no limit |
name | | Optional client name |
nkey | `` | See NKeys/User Credentials |
noEcho | false | Subscriptions receive messages published by the client. Requires server support (1.2.0). If set to true, and the server does not support the feature, an error with code NO_ECHO_NOT_SUPPORTED is emitted, and the connection is aborted. Note that it is possible for this error to be emitted on reconnect when the server reconnects to a server that does not support the feature. |
noRandomize | false | If set, the order of user-specified servers is randomized. |
nonceSigner | `` | See NKeys/User Credentials. A function that takes a Buffer and returns a nkey signed signature. |
pass | | Sets the password for a connection |
pedantic | false | Turns on strict subject format checks |
pingInterval | 120000 | Number of milliseconds between client-sent pings |
preserveBuffers | false | If true, data for a message is returned as Buffer |
reconnectTimeWait | 2000 | If disconnected, the client will wait the specified number of milliseconds between reconnect attempts |
reconnect | true | If false server will not attempt reconnecting |
servers | | Array of connection url s |
timeout | node default - no timeout | Number of milliseconds the client will wait for a connection to be established. If it fails it will emit a connection_timeout event with a NatsError that provides the hostport of the server where the connection was attempted. |
tls | false | This property can be a boolean or an Object. If true the client requires a TLS connection. If false a non-tls connection is required. The value can also be an object specifying TLS certificate data. The properties ca , key , cert should contain the certificate file data. ca should be provided for self-signed certificates. key and cert are required for client provided certificates. rejectUnauthorized if true validates server's credentials |
token | | Sets a authorization token for a connection |
tokenHandler | | A function returning a token used for authentication. |
url | "nats://localhost:4222" | Connection url |
useOldRequestStyle | false | If set to true calls to request() and requestOne() will create an inbox subscription per call. |
user | | Sets the username for a connection |
userCreds | `` | See NKeys/User Credentials. Set with NATS.creds() . |
userJWT | `` | See NKeys/User Credentials. The property can be a JWT or a function that returns a JWT. |
verbose | false | Turns on +OK protocol acknowledgements |
waitOnFirstConnect | false | If true the server will fall back to a reconnect mode if it fails its first connection attempt. |
yieldTime | | If set, processing will yield at least the specified number of milliseconds to IO callbacks before processing inbound messages |
Tools
The examples, node-pub
, node-sub
, node-req
, node-reply
are now bound to bin
entries on the npm package.
You can use these while developing your own tools. After you install the nats
npm package, you'll need to add
a dependency on minimist
before you can use the tools:
npm install nats
npm install minimist
...
% npx node-sub hello &
[1] 9208
% Listening on [hello]
% npx node-pub hello world
Received "world"
Published [hello] : "world"
Supported Node Versions
Our support policy for Nodejs versions follows Nodejs release support.
We will support and build node-nats on even-numbered Nodejs versions that are current or in LTS.
Running Tests
To run the tests, you need to have a nats-server
executable in your path. Refer to the server installation guide in the NATS.io documentation. With that in place, you can run npm test
to run all tests.
License
Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.