Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

nats

Package Overview
Dependencies
Maintainers
3
Versions
195
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nats - npm Package Compare versions

Comparing version 2.0.0-27 to 2.0.0-201

examples/bench.js

4

index.js

@@ -15,4 +15,4 @@ /*

*/
'use strict'
"use strict";
module.exports = require('./lib/nats')
module.exports = require("./lib/src/mod");

@@ -5,7 +5,5 @@ # Maintainers

### Core-maintainers
### Maintainers
- Alberto Ricart <alberto@nats.io> [@aricart](https://github.com/aricart)
- Derek Collison <derek@nats.io> [@derekcollison](https://github.com/derekcollison)
### Maintainers
- Ivan Kozlovic <ivan@nats.io> [@kozlovic](https://github.com/kozlovic)
- Ivan Kozlovic <ivan@nats.io> [@kozlovic](https://github.com/kozlovic)
{
"name": "nats",
"version": "2.0.0-27",
"version": "2.0.0-201",
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system",

@@ -29,15 +29,26 @@ "keywords": [

"contributors": [],
"main": "./index.js",
"main": "index.js",
"types": "index.d.ts",
"files": [
"lib/",
"examples/",
"OWNERS.md",
"CODE-OF-CONDUCT.md",
"LICENSE",
"MAINTAINERS.md",
"nats.d.ts"
],
"scripts": {
"depcheck": "dependency-check --no-dev package.json",
"depcheck:unused": "dependency-check package.json --no-dev --entry ./**/*.js",
"test:unit": "mkdir -p reports/ && NODE_ENV=test multi='spec=- xunit=reports/mocha-xunit.xml' nyc mocha --timeout 10000 --slow 750",
"test": "npm run depcheck && npm run depcheck:unused && npm run lint && npm run test:typescript && npm run test:unit",
"test:typescript": "./node_modules/typescript/bin/tsc --strict --noEmit tstest/main.ts",
"coveralls": "nyc report --reporter=text-lcov | coveralls",
"cover": "nyc report --reporter=html && open coverage/index.html",
"lint": "npm run lint:js && npm run lint:ts",
"lint:js": "standard './**/*.js'",
"lint:ts": "tslint -c tslint.json index.d.ts 'tstest/**/*.ts'",
"fmt": "standard --fix './**/*.js'"
"build": "tsc",
"cjs": "deno run --allow-all bin/cjs-fix-imports.ts -o nats-base-client/ ./.deps/nats.deno/nats-base-client/",
"clean": "rm -Rf ./lib/* ./nats-base-client ./.deps",
"clone-nbc": "mkdir -p ./.deps && cd ./.deps && git clone --branch=v1.0.0-3 https://github.com/nats-io/nats.deno.git",
"fmt": "deno fmt ./src/ ./examples/ ./test/",
"prepack": "npm run clone-nbc && npm run cjs && npm run build",
"ava": "nyc ava --verbose -T 60000",
"test": "npm run build && npm run ava",
"debug-test": "node node_modules/.bin/ava --verbose -T 6500000 --match",
"setup": "curl -fsSL https://deno.land/x/install/install.sh | sh",
"stage": "npm run clean && npm run clone-nbc && npm run cjs && rm -Rf ./deps/ && npm run build",
"cover:html": "nyc report --reporter=html && open coverage/index.html"
},

@@ -48,36 +59,51 @@ "engines": {

"dependencies": {
"nuid": "^1.1.4",
"ts-nkeys": "^1.0.16"
"nkeys.js": "^1.0.0-5"
},
"devDependencies": {
"@types/node": "^13.7.4",
"coveralls": "^3.0.11",
"dependency-check": "^4.1.0",
"eslint": "^6.8.0",
"@types/node": "^14.6.0",
"ava": "^3.11.1",
"minimist": "^1.2.5",
"mocha": "^7.0.1",
"mocha-lcov-reporter": "1.3.0",
"nyc": "^15.0.1",
"should": "^13.2.3",
"standard": "^14.3.3",
"tslint": "^6.1.0",
"typescript": "^3.8.3"
"nyc": "^15.1.0",
"tslint": "^6.1.3",
"typescript": "^3.9.7"
},
"typings": "./index.d.ts",
"ava": {
"failFast": false,
"files": [
"./test/**/*.js",
"!./test/helpers/**/*.js"
]
},
"nyc": {
"extension": [
".ts",
".js"
],
"include": [
"lib/**"
"src/**/*.ts",
"lib/src/**/*.js",
"nats-base-client/**/*.ts",
"lib/nats-base-client/**/*.js"
],
"exclude": [
"test/**",
"nats-base-client/bench.ts",
"nats-base-client/buffer.ts",
"nats-base-client/codec.ts",
"nats-base-client/databuffer.ts",
"nats-base-client/muxsubscription.ts",
"nats-base-client/nkeys.ts",
"nats-base-client/nuid.ts",
"nats-base-client/parser.ts",
"nats-base-client/queued_iterator.ts",
"nats-base-client/servers.ts",
"nats-base-client/transport.ts",
"nats-base-client/util.ts",
"lib/test/**",
"examples/**",
"benchmark/**"
]
},
"bin": {
"node-pub": "examples/node-pub",
"node-sub": "examples/node-sub",
"node-req": "examples/node-req",
"node-reply": "examples/node-reply"
],
"sourceMap": true,
"all": true
}
}

@@ -1,531 +0,534 @@

# NATS.js - Node.js Client
# NATS.js - A [NATS](http://nats.io) client for [Node.Js](https://nodejs.org/en/)
A [Node.js](http://nodejs.org/) client for the [NATS messaging system](https://nats.io).
[![license](https://img.shields.io/github/license/nats-io/node-nats.svg)](https://www.apache.org/licenses/LICENSE-2.0)
[![Build Status](https://travis-ci.org/nats-io/nats.js.svg?branch=master)](https://travis-ci.org/nats-io/nats.js)
[![Coverage Status](https://coveralls.io/repos/github/nats-io/nats.js/badge.svg?branch=master)](https://coveralls.io/github/nats-io/nats.js?branch=master)
A Node.js client for the [NATS messaging system](https://nats.io).
[![License](https://img.shields.io/badge/Licence-Apache%202.0-blue.svg)](./LICENSE)
![NATS.js CI](https://github.com/nats-io/nats.js/workflows/NATS.js%20CI/badge.svg)
[![npm](https://img.shields.io/npm/v/nats.svg)](https://www.npmjs.com/package/nats)
[![npm](https://img.shields.io/npm/dt/nats.svg)](https://www.npmjs.com/package/nats)
[![npm](https://img.shields.io/npm/dm/nats.svg)](https://www.npmjs.com/package/nats)
[![JavaScript Style Guide](https://img.shields.io/badge/code_style-standard-brightgreen.svg)](https://standardjs.com)
# Installation
## Installation
** :warning: NATS.js v2 is a preview** you can get the current development version by:
```bash
npm install nats
# to install current dev version:
npm install nats@next
npm install nats@v2'
```
The nats.js v2 client is under active development. All tests are passing.
## Basic Usage
The nats.js v2 client is not API compatible with previous versions of nats.js.
For a migration guide, please see [the migration guide](migration.md).
```javascript
const NATS = require('nats')
## Basics
const nc = NATS.connect()
// Simple publisher
nc.publish('foo', 'Hello World!')
### Connecting to a nats-server
// Simple Subscriber - error is set if there was some error
nc.subscribe('foo', (err, msg) => {
if (err) {
console.error(err)
return
}
console.log('Received a message: ' + msg)
})
To connect to a server you use the `connect()` function. It returns
a connection that you can use to interact with the server.
// Unsubscribing
const sub = nc.subscribe('foo', (err, m) => {
if(!err) {
console.log(m)
} else {
console.error(err)
}
})
sub.unsubscribe()
By default, a connection will attempt to auto-reconnect when dropped
due to some networking type error. Messages that have not been
sent to the server when a disconnect happens are lost. A client can queue
new messages to be sent when the connection resumes. If the connection
cannot be re-established the client will give up and `close` the connection.
To learn when a connection closes, wait for the promise returned by the `closed()`
function. If the close was due to an error, it will resolve to an error.
// Subscription/Request are given two arguments:
// - an error (undefined if no error)
// - a message object
// - the message has 4 properties:
// - data - the message payload (can be undefined)
// - subject - subject where the message was sent
// - reply - the reply subject if this is a request (can be undefined)
// - sid - the subscription id associated with the handler (same value as the return of subscribe())
nc.subscribe('foo', (_, m) => {
if (m.reply) {
nc.publish(m.reply, 'got ' + m.data + ' on ' + m.subject + ' in subscription id ' + m.sid)
return
}
console.log('Received a message: ' + m.data + " it wasn't a request.")
})
To disconnect from the nats-server, you call `close()` on the connection.
Connections can also be closed if there's an error. For example, the
server returns some run-time error.
// Request, creates a subscription to handle any replies to the request
// subject, and publishes the request with an optional payload. This usage
// allows you to collect responses from multiple services
nc.request('request', (_, m) => {
console.log('Got a response in msg stream: ' + m.data)
})
This first example shows basic options that you can provide to the connect
function, note that you can specify multiple servers, allowing the client
to cope with a server that is not available.
// Request with a max option will unsubscribe after
// the first max messages are received. You can also specify the number
// of milliseconds you are willing to wait for the response - when a timeout
// is specified, you can receive an error
nc.request('help', (err, m) => {
if (err && err.code === NATS.REQ_TIMEOUT) {
console.log('request timed out')
} else if (err) {
console.error('request got error', err)
} else {
console.log('Got a response for help: ' + m.data)
}
}, null, { max: 1, timeout: 1000 })
// Replies
nc.subscribe('help', (_, m) => {
nc.publish(m.reply, 'I can help!')
})
// Close connection
nc.close()
```javascript
const { connect } = require("nats");
[
{},
{ servers: ["demo.nats.io:4442", "demo.nats.io:4222"] },
{ servers: "demo.nats.io:4443" },
{ port: 4222 },
{ servers: "localhost" },
]
.forEach(async (v) => {
await connect(v)
.then((nc) => {
console.log(`connected to ${nc.getServer()}`);
nc.close();
})
.catch(() => {
console.log(`unable to connect to ${JSON.stringify(v)}`);
});
});
```
## JSON
### Publish and Subscribe
The basic client operations are to `subscribe` to receive messages,
and publish to `send` messages. A subscription works as an async
iterator where you process messages in a loop until the subscription
closes.
The `payload` connect property makes it easy to exchange JSON data with other
clients when set to `Payload.JSON`:
NATS is payload agnostic, this means that payloads are `Uint8Arrays`.
You can easily send JSON or strings by using a `StringCodec` or a
`JSONCodec`, or create a Codec of your own that handles the type
of your data as necessary.
```javascript
const nc = NATS.connect({ payload: Payload.JSON })
nc.on('connect', () => {
nc.on('error', (err) => {
console.log(err)
})
const { connect, StringCodec } = require("nats");
nc.subscribe('greeting', (_, m) => {
// msg is a parsed JSON object object
if (m.data && m.data.name && m.reply) {
nc.publish(m.reply, { greeting: 'hello ' + m.data.name })
}
})
(async () => {
// create a connection to a nats-server
const nc = await connect({ servers: "demo.nats.io" });
// As with all inputs from unknown sources, if you don't trust the data
// you should verify it prior to accessing it. While JSON is safe because
// it doesn't export functions, it is still possible for a client to
// cause issues to a downstream consumer that is not written carefully
nc.subscribe('unsafe', (_, m) => {
// for example a client could inject a bogus `toString` property
// which could cause your client to crash should you try to
// concatenation with the `+` like this:
// console.log("received", msg + "here");
// `TypeError: Cannot convert object to primitive value`
// Note that simple `console.log(msg)` is fine.
if (Object.hasOwnProperty.call(m, 'toString')) {
console.log('tricky - trying to crash me:', m.toString)
return
}
// create a codec to encode/decode payloads
const sc = StringCodec();
// of course this is no different than using a value that is
// expected in one format (say a number), but the client provides
// a string:
if (isNaN(m.data.amount) === false) {
// do something with the number
// create a simple subscriber and iterate over messages
// matching the subscription
const sub = nc.subscribe("hello");
(async () => {
for await (const m of sub) {
console.log(`[${sub.getProcessed()}]: ${sc.decode(m.data)}`);
}
// ...
})
})().then(() => {
console.log("subscription closed");
});
// the bad guy
nc.publish('unsafe', { toString: 'no good' })
// publish two messages to the nats-server
nc.publish("hello", sc.encode("world"));
nc.publish("hello", sc.encode("again"));
nc.flush(function () {
nc.close()
})
})
// we want to insure that messages that are in flight
// get processed, so we are going to drain the
// connection. Drain is the same as close, but makes
// sure that all messages in flight get seen
// by the iterator. After calling drain on the connection
// the connection closes.
await nc.drain();
console.log("connection ended");
})();
```
## Wildcard Subscriptions
### Streams
Streams are messages that are published at regular intervals.
To get the messages, you simply subscribe to them. To stop
getting the messages you unsubscribe.
```javascript
const nc = NATS.connect({ payload: Payload.JSON })
// "*" matches any token, at any level of the subject.
nc.subscribe('foo.*.baz', (_, m) => {
console.log('Msg received on [' + m.subject + '] : ' + m.data)
})
const { connect, JSONCodec } = require("nats");
nc.subscribe('foo.bar.*', (_, m) => {
console.log('Msg received on [' + m.subject + '] : ' + m.data)
})
(async () => {
// to create a connection to a nats-server:
const nc = await connect({ servers: "demo.nats.io" });
// ">" matches any length of the tail of a subject, and can only be
// the last token E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz',
// 'foo.foo.bar.bax.22'
nc.subscribe('foo.>', (_, m) => {
console.log('Msg received on [' + m.subject + '] : ' + m.data)
})
```
// create a codec
const jc = JSONCodec();
## Queue Groups
console.info("enter the following command to get messages from the stream");
console.info(
"node run examples/nats-sub.js -s demo.nats.io stream.demo",
);
```javascript
// All subscriptions with the same queue name will form a queue group.
// Each message will be delivered to only one subscriber per queue group,
// queuing semantics. You can have as many queue groups as you wish.
// Normal subscribers will continue to work as expected.
let received = 0
nc.subscribe('foo', () => {
received++
}, { queue: 'job.workers' })
const start = Date.now();
let sequence = 0;
setInterval(() => {
sequence++;
const uptime = Date.now() - start;
console.info(`publishing #${sequence}`);
nc.publish("stream.demo", jc.encode({ sequence, uptime }));
}, 1000);
})();
```
## Clustered Usage
```javascript
const servers = ['nats://nats.io:4222', 'nats://nats.io:5222', 'nats://nats.io:6222']
### Wildcard Subscriptions
Sometimes you want to process an event (message), based on the
subject that was used to send it. In NATS this is accomplished
by specifying wildcards in the subscription. Subjects that match
the wildcards, are sent to the client.
// Randomly connect to a server in the cluster group.
// Note that because `url` is not specified, the default connection is called first
// (nats://localhost:4222). If you don't want default connection, specify one of
// the above the above servers as `url`: `nats.connect(servers[0], {'servers': servers});`
let nc = NATS.connect({ servers: servers })
In the example below, I am using 3 different subscription
to highlight that each subscription is independent. And if
the subject you use matches one or more of them, they all
will get a chance at processing the message.
// currentServer is the URL of the connected server.
nc.on('connect', () => {
console.log('Connected to ' + nc.currentServer.url.host)
})
```javascript
const { connect, StringCodec } = require("nats");
// Preserve order when connecting to servers.
nc = NATS.connect({ noRandomize: true, servers: servers })
```
(async () => {
const nc = await connect({ servers: "demo.nats.io" });
const sc = StringCodec();
## Draining Connections and Subscriptions
// subscriptions can have wildcard subjects
// the '*' matches any string in the specified token position
const s1 = nc.subscribe("help.*.system");
const s2 = nc.subscribe("help.me.*");
// the '>' matches any tokens in that position or following
// '>' can only be specified at the end
const s3 = nc.subscribe("help.>");
```javascript
// Unsubscribing removes the subscription handler for a subscription
// and cancels the subscription. Any pending messages on the client's
// buffer are discarded.
//
// Draining is similar to unsubscribe, but the client instead
// sends the unsubscribe request followed by a flush. When the flush
// returns, the subscription handler is removed. Thus the client is
// able to process all messages sent by the server before the subscription
// handler is removed.
//
// Draining is particularly valuable with queue subscriptions preventing
// messages from being lost.
let c1 = 0
const sub = nc.subscribe('foo', () => {
c1++
if (c1 === 1) {
sub.drain((err) => {
if (err) {
console.error(err)
return
}
console.log(`subscription ${sub.sid} drained`)
})
async function printMsgs(s) {
let subj = s.getSubject();
console.log(`listening for ${subj}`);
const c = (13 - subj.length);
const pad = "".padEnd(c);
for await (const m of s) {
console.log(
`[${subj}]${pad} #${s.getProcessed()} - ${m.subject} ${
m.data ? " " + sc.decode(m.data) : ""
}`,
);
}
}
}, { queue: 'q1' })
// It is possible to drain a connection, draining a connection:
// - drains all subscriptions
// - after calling drain it is impossible to make subscriptions or requests
// - when all subscriptions are drained, it is impossible to publish
// messages and drained connection is closed.
// - finally, the callback handler is called (with possibly an error).
printMsgs(s1);
printMsgs(s2);
printMsgs(s3);
let c2 = 0
nc.subscribe('foo', () => {
c2++
if (c2 === 1) {
nc.drain((err) => {
if (err) {
console.error('error draining', err)
return
}
console.log('connection drained')
})
}
}, { queue: 'q1' })
// don't exit until the client closes
await nc.closed();
})();
```
## TLS
### Services
A service is a client that responds to requests from other clients.
Now that you know how to create subscriptions, and know about wildcards,
it is time to develop a service that mocks something useful.
This example is a bit complicated, because we are going to use NATS
not only to provide a service, but also to control the service.
```javascript
const NATS = require('nats')
const fs = require('fs')
const { connect, StringCodec } = require("nats");
// Simple TLS connect
let nc = NATS.connect({ tls: true })
(async () => {
// create a connection
const nc = await connect({ servers: "demo.nats.io" });
// Overriding and not verifying the server
let tlsOptions = {
rejectUnauthorized: false
}
nc = NATS.connect({ tls: tlsOptions })
// nc.stream.authorized will be false
// create a codec
const sc = StringCodec();
// Use a specified CA for self-signed server certificates
tlsOptions = {
ca: [fs.readFileSync('./test/certs/ca.pem')]
}
nc = NATS.connect({ tls: tlsOptions })
// nc.stream.authorized should be true
// A service is a subscriber that listens for messages, and responds
const started = Date.now();
const sub = nc.subscribe("time");
requestHandler(sub);
// Use a client certificate if the server requires
tlsOptions = {
key: fs.readFileSync('./test/certs/client-key.pem'),
cert: fs.readFileSync('./test/certs/client-cert.pem'),
ca: [fs.readFileSync('./test/certs/ca.pem')]
}
nc = NATS.connect({ tls: tlsOptions })
```
// If you wanted to manage a service - well NATS is awesome
// for just that - setup another subscription where admin
// messages can be sent
const msub = nc.subscribe("admin.*");
adminHandler(msub);
## Basic Authentication
```javascript
// Connect with username and password in the url
let nc = NATS.connect('nats://foo:bar@localhost:4222')
// wait for the client to close here.
await nc.closed().then((err) => {
let m = `connection to ${nc.getServer()} closed`;
if (err) {
m = `${m} with an error: ${err.message}`;
}
console.log(m);
});
// Connect with username and password inside object
nc = NATS.connect({ url: 'nats://localhost:4222', user: 'foo', pass: 'bar' })
// this implements the public service, and just prints
async function requestHandler(sub) {
console.log(`listening for ${sub.getSubject()} requests...`);
let serviced = 0;
for await (const m of sub) {
serviced++;
if (m.respond(sc.encode(new Date().toISOString()))) {
console.info(
`[${serviced}] handled ${m.data ? "- " + sc.decode(m.data) : ""}`,
);
} else {
console.log(`[${serviced}] ignored - no reply subject`);
}
}
}
// Connect with token in url
nc = NATS.connect('nats://mytoken@localhost:4222')
// this implements the admin service
async function adminHandler(sub) {
console.log(`listening for ${sub.getSubject()} requests [uptime | stop]`);
// Connect with token inside object
nc = NATS.connect({ url: 'nats://localhost:4222', token: 'mytoken' })
// it would be very good to verify the origin of the request
// before implementing something that allows your service to be managed.
// NATS can limit which client can send or receive on what subjects.
for await (const m of sub) {
const chunks = m.subject.split(".");
console.info(`[admin] handling ${chunks[1]}`);
switch (chunks[1]) {
case "uptime":
// send the number of millis since up
m.respond(sc.encode(`${Date.now() - started}`));
break;
case "stop":
m.respond(sc.encode("stopping...."));
// finish requests by draining the subscription
await sub.drain();
// close the connection
const _ = nc.close();
break;
default:
console.log(`ignoring request`);
}
}
}
})();
```
## New Authentication (Nkeys and User Credentials)
See examples for more usage.
### Making Requests
```javascript
const nkeys = require('ts-nkeys')
const { connect, StringCodec } = require("nats");
// Simple connect using credentials file. This loads JWT and signing key
// each time that NATS connects.
let nc = NATS.connect('connect.ngs.global', { credsFile: './myid.creds' })
(async () => {
// create a connection
const nc = await connect({ servers: "demo.nats.io:4222" });
// Manually, you need to specify the JWT, and seed and sign the challenge
const jwt = 'eyJ0eXAiOiLN1...'
const nkeySeed = 'SUAIBDPBAUTWCWBKIO6XHQNINK5FWJW4OHLXC3HQ2KFE4PEJUA44CNHTC4'
const sk = nkeys.fromSeed(Buffer.from(nkeySeed))
// create an encoder
const sc = StringCodec();
// Setting nkey and signing callback directly.
nc = NATS.connect('nats://localhost:4222', {
nkey: 'UAH42UG6PV552P5SWLWTBP3H3S5BHAVCO2IEKEXUANJXR75J63RQ5WM6',
nonceSigner: function (nonce) {
return sk.sign(nonce)
}
})
// a client makes a request and receives a promise for a message
// by default the request times out after 1s (1000 millis) and has
// no payload.
await nc.request("time", sc.encode("hello!"), { timeout: 1000 })
.then((m) => {
console.log(`got response: ${sc.decode(m.data)}`);
})
.catch((err) => {
console.log(`problem with request: ${err.message}`);
});
// Setting user JWT statically.
nc = NATS.connect({
userJWT: jwt,
nonceSigner: function (nonce) {
return sk.sign(nonce)
}
})
await nc.close();
})();
// Having user JWT be a function that returns the JWT. Can be useful for
// loading a new JWT.
nc = NATS.connect({
userJWT: function () {
return jwt
},
nonceSigner: function (nonce) {
return sk.sign(nonce)
}
})
```
## Advanced Usage
### Queue Groups
Queue groups allow scaling of services horizontally. Subscriptions for members of a
queue group are treated as a single service, that means when you send a message
only a single client in a queue group will receive it. There can be multiple queue
groups, and each is treated as an independent group. Non-queue subscriptions are
also independent.
```javascript
const {
connect,
StringCodec,
} = require("nats");
```javascript
// Flush connection to server, callback fires when all messages have
// been processed.
nc.flush((err) => {
if (err) {
console.error('error flushing', err)
return
(async () => {
async function createService(
name,
count = 1,
queue = ""
) {
const conns = [];
for (let i = 1; i <= count; i++) {
const n = queue ? `${name}-${i}` : name;
const nc = await connect(
{ servers: "demo.nats.io:4222", name: `${n}` },
);
nc.closed()
.then((err) => {
if (err) {
console.error(
`service ${n} exited because of error: ${err.message}`,
);
}
});
// create a subscription - note the option for a queue, if set
// any client with the same queue will be the queue group.
const sub = nc.subscribe("echo", { queue: queue });
const _ = handleRequest(n, sub);
console.log(`${nc.options.name} is listening for 'echo' requests...`);
conns.push(nc);
}
return conns;
}
console.log('round trip to the server done')
})
// If you want to make sure NATS yields during the processing
// of messages, you can use an option to specify a yieldTime in ms.
// During the processing of the inbound stream, NATS will yield if it
// spends more than yieldTime milliseconds processing.
nc = NATS.connect({ port: 4222, yieldTime: 10 })
const sc = StringCodec();
// Timeout a subscription unless a certain number of messages have been received
// When a subscription times out, it automatically cancels. You can specify more
// messages in the `expected` option. However that count of messages must be
// received before the timeout specified. If `expected` is not specified, it
// defaults to '1'.
NATS.subscribe('foo', (err) => {
if (err && err.code === TIMEOUT_ERR) {
// didn't get the message
// simple handler for service requests
async function handleRequest(name, sub) {
const p = 12 - name.length;
const pad = "".padEnd(p);
for await (const m of sub) {
// respond returns true if the message had a reply subject, thus it could respond
if (m.respond(m.data)) {
console.log(
`[${name}]:${pad} #${sub.getProcessed()} echoed ${sc.decode(m.data)}`,
);
} else {
console.log(
`[${name}]:${pad} #${sub.getProcessed()} ignoring request - no reply subject`,
);
}
}
}
// do something
}, {timeout: 1000, expected: 1})
// let's create two queue groups and a standalone subscriber
const conns = [];
conns.push(...await createService("echo", 3, "echo"));
conns.push(...await createService("other-echo", 2, "other-echo"));
conns.push(...await createService("standalone"));
// Auto-unsubscribe after max messages received
nc.subscribe('foo', () => {}, { max: 100 })
// or
const sub = nc.subscribe('foo', () => {})
sub.unsubscribe(100)
const a = [];
conns.forEach((c) => {
a.push(c.closed());
});
await Promise.all(a);
})();
```
## Advanced Usage
// Encodings
### Authentication
// By default messages received will be decoded using UTF8. To change that,
// set the encoding option on the connection.
Simple authentication just provides connection properties. The fundamental
mechanism for authentication relies on an ["authenticator"](index.d.ts).
Autheticators can be used for nkeys, and JWT authentication as well as
any other. In cases where you want to dynamically obtain credentials,
the authenticator is where you would provide this logic.
NATS.connect({ encoding: 'ascii' })
```typescript
// if the connection requires authentication, provide `user` and `pass` or
// `token` options in the NatsConnectionOptions.
import { connect } from "src/mod.ts";
// Binary Data
const nc1 = await connect({ port: ns.port, user: "jenny", pass: "867-5309" });
const nc2 = await connect({ port: ns.port, token: "s3cret!" });
// To prevent payload conversion from a Buffer to a string, set the
// `payload` option to `Payload.Binary`. Message payload return will be a Buffer.
// nkeys
const auth = nkeyAuthenticator(seed);
const nc3 = await connect({ port: ns.port, authenticator: auth });
```
NATS.connect({ payload: Payload.Binary })
### Flush
```javascript
// flush sends a PING request to the servers and returns a promise
// when the servers responds with a PONG. The flush guarantees that
// things you published have been delivered to the servers. Typically
// it is not necessary to use flush, but on tests it can be invaluable.
nc.publish('foo');
nc.publish('bar');
await nc.flush();
```
// Reconnect Attempts and Time between reconnects
// By default a NATS connection will try to reconnect to all servers 10 times
// waiting 2 seconds between the previous reconnect to the server. If the
// maximum number of retries is reached, the client will close the connection.
// To change the default behaviour specify the max number of connection
// attempts in `maxReconnectAttempts` (set to -1 to retry forever), and the
// time in milliseconds between reconnects in `reconnectTimeWait`.
NATS.connect({ maxReconnectAttempts: -1, reconnectTimeWait: 250 })
### Auto Unsubscribe
```javascript
// subscriptions can auto unsubscribe after a certain number of messages
nc.subscribe('foo', {max:10});
```
# Events
The nats client is an event emitter, you can listen to several kinds of events.
### Timeout Subscriptions
```javascript
// emitted whenever there's an error. if you don't implement at least
// the error handler, your program will crash if an error is emitted.
nc.on('error', (err) => {
console.log(err)
})
// create subscription with a timeout, if no message arrives
// within the timeout, the function running the iterator with
// reject - depending on how you code it, you may need a
// try/catch block.
// import the connect function
const { connect, ErrorCode } = require("nats");
// connect callback provides a reference to the connection as an argument
nc.on('connect', (nc) => {
console.log(`connect to ${nc.servers.getCurrent().url.host}`)
})
(async () => {
// to create a connection to a nats-server:
const nc = await connect({ servers: "demo.nats.io" });
// emitted whenever the client disconnects from a server
nc.on('disconnect', () => {
console.log('disconnect')
})
// create subscription with a timeout, if no message arrives
// within the timeout, the subscription throws a timeout error
const sub = nc.subscribe("hello", { timeout: 1000 });
(async () => {
for await (const m of sub) {
console.log(`got message #${sub.getProcessed()}`);
}
})().catch((err) => {
if (err.code === ErrorCode.TIMEOUT) {
console.log(`sub timed out!`);
} else {
console.log(`sub iterator got an error!`);
}
nc.close();
});
// emitted whenever the client is attempting to reconnect
nc.on('reconnecting', () => {
console.log('reconnecting')
})
await nc.closed();
})();
```
// emitted whenever the client reconnects
// reconnect callback provides a reference to the connection as an argument
nc.on('reconnect', (nc) => {
console.log(`reconnect to ${nc.servers.getCurrent().url.host}`)
})
### Async vs. Callbacks
// emitted when the connection is closed - once a connection is closed
// the client has to create a new connection.
nc.on('close', () => {
console.log('close')
})
Previous versions of the JavaScript NATS clients specified callbacks
for message processing. This required complex handling logic when a
service required coordination of operations. Callbacks are an
inversion of control anti-pattern.
// emitted whenever the client unsubscribes
nc.on('unsubscribe', (sid, subject) => {
console.log('unsubscribed subscription', sid, 'for subject', subject)
})
The async APIs trivialize complex coordination and makes your code
easier to maintain. With that said, there are some implications:
// emitted whenever the server returns a permission error for
// a publish for the current user. This sort of error
// means that the client cannot publish/request
// on the specific subject. Note that subscription permission
// errors are delivered to the subscription's handler
nc.on(pubError, (err) => {
console.error('got a permissions error', err.message)
})
```
- Async subscriptions buffer inbound messages.
- Subscription processing delays until the runtime executes the promise related
microtasks at the end of an event loop.
See examples and benchmarks for more information.
In a traditional callback based library, I/O happens after all data yielded by
a read in the current event loop completes processing. This means that
callbacks are invoked as part of processing. With async, processing is queued
up in a microtask queue. At the end of the event loop, the runtime processes
the microtasks, which in turn resumes your functions. As expected, this
increases latency, but also provides additional liveliness.
## Connect Options
To reduce async latency, the NATS client allows processing a subscription
in the same event loop that dispatched the message. Simply specify a `callback`
in the subscription options. The signature for a callback is
`(err: (Error|null), msg: Msg) => void`. When specified, the subscription
iterator will never yield a message.
The following is the list of connection options and default values.
Note that `callback` likely shouldn't even be documented, as likely it
is a workaround to an underlying application problem where you should be
considering a different strategy to horizontally scale your application,
or reduce pressure on the clients, such as using queue workers,
or more explicitly targeting messages.
| Option | Default | Description
|-------- |--------- |------------
| `credsFile` | | See [NKeys/User Credentials](https://github.com/nats-io/nats.js#new-authentication-nkeys-and-user-credentials).
| `encoding` | `"utf8"` | Encoding specified by the client to encode/decode data
| `maxPingOut` | `2` | Max number of pings the client will allow unanswered before raising a stale connection error
| `maxReconnectAttempts` | `10` | Sets the maximum number of reconnect attempts. The value of `-1` specifies no limit
| `name` | | Optional client name
| `nkey` | `` | See [NKeys/User Credentials](https://github.com/nats-io/nats.js#new-authentication-nkeys-and-user-credentials)
| `noEcho` | `false` | Subscriptions receive messages published by the client. Requires server support (1.2.0). If set to true, and the server does not support the feature, an error with code `NO_ECHO_NOT_SUPPORTED` is emitted, and the connection is aborted. Note that it is possible for this error to be emitted on reconnect when the server reconnects to a server that does not support the feature.
| `noMuxRequests` | `false` | If set to `true` calls to `request()` will create an inbox subscription per call.
| `noRandomize` | `false` | If set, the order of user-specified servers is randomized.
| `nonceSigner` | | See [NKeys/User Credentials](https://github.com/nats-io/nats.js#new-authentication-nkeys-and-user-credentials). A function that takes a `Buffer` and returns a nkey signed signature.
| `pass` | | Sets the password for a connection
| `pedantic` | `false` | Turns on strict subject format checks
| `pingInterval` | `120000` | Number of milliseconds between client-sent pings
| `reconnectTimeWait` | `2000` | If disconnected, the client will wait the specified number of milliseconds between reconnect attempts
| `reconnect` | `true` | If false server will not attempt reconnecting
| `servers` | | Array of connection `url`s
| `timeout` | node default - no timeout | Number of milliseconds the client will wait for a connection to be established. If it fails it will emit a `connection_timeout` event with a NatsError that provides the hostport of the server where the connection was attempted.
| `tls` | `false` | This property can be a boolean or an Object. If true the client requires a TLS connection. If false a non-tls connection is required. The value can also be an object specifying TLS certificate data. The properties `ca`, `key`, `cert` should contain the certificate file data. `ca` should be provided for self-signed certificates. `key` and `cert` are required for client provided certificates. `rejectUnauthorized` if `true` validates server's credentials
| `tokenHandler` | | A function returning a `token` used for authentication.
| `token` | | Sets a authorization token for a connection
| `url` | `"nats://localhost:4222"` | Connection url
| `userJWT` | | See [NKeys/User Credentials](https://github.com/nats-io/nats.js#new-authentication-nkeys-and-user-credentials). The property can be a JWT or a function that returns a JWT.
| `user` | | Sets the username for a connection
| `verbose` | `false` | Turns on `+OK` protocol acknowledgements
| `waitOnFirstConnect` | `false` | If `true` the server will fall back to a reconnect mode if it fails its first connection attempt.
| `yieldTime` | | If set, processing will yield at least the specified number of milliseconds to IO callbacks before processing inbound messages
## Tools
### Lifecycle/Informational Events
Clients can get notification on various event types:
- `Events.DISCONNECT`
- `Events.RECONNECT`
- `Events.UPDATE`
- `Events.LDM`
The examples, `node-pub`, `node-sub`, `node-req`, `node-reply` are now bound to `bin` entries on the npm package.
You can use these while developing your own tools. After you install the `nats` npm package, you'll need to add
a dependency on `minimist` before you can use the tools:
The first two fire when a client disconnects and reconnects respectively.
The payload will be the server where the event took place.
```bash
npm install nats
npm install minimist
...
% npx node-sub hello &
[1] 9208
% Listening on [hello]
% npx node-pub hello world
Received "world"
Published [hello] : "world"
```
The `UPDATE` event notifies whenever the client receives a cluster configuration
update. The `ServersChanged` interface provides two arrays: `added` and `deleted`
listing the servers that were added or removed.
## Supported Node Versions
The `LDM` event notifies that the current server has signaled that it
is running in _Lame Duck Mode_ and will evict clients. Depending on the server
configuration policy, the client may want to initiate an ordered shutdown, and
initiate a new connection to a different server in the cluster.
Our support policy for Nodejs versions follows [Nodejs release support]( https://github.com/nodejs/Release).
We will support and build node-nats on even-numbered Nodejs versions that are current or in LTS.
```javascript
const nc = await connect();
(async () => {
console.info(`connected ${nc.getServer()}`);
for await (const s of nc.status()) {
console.info(`${s.type}: ${s.data}`);
}
})().then();
nc.closed()
.then((err) => {
console.log(
`connection closed ${err ? " with error: " + err.message : ""}`,
);
});
```
## Running Tests
To run the tests, you need to have a `nats-server` executable in your path. Refer to the [server installation guide](https://nats-io.github.io/docs/nats_server/installation.html) in the NATS.io documentation. With that in place, you can run `npm test` to run all tests.
## License
Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.
To be aware of when a client closes, wait for the `closed()` promise to resolve.
When it resolves, the client has finished and won't reconnect.
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc