Comparing version 2.0.0-27 to 2.0.0-201
@@ -15,4 +15,4 @@ /* | ||
*/ | ||
'use strict' | ||
"use strict"; | ||
module.exports = require('./lib/nats') | ||
module.exports = require("./lib/src/mod"); |
@@ -5,7 +5,5 @@ # Maintainers | ||
### Core-maintainers | ||
### Maintainers | ||
- Alberto Ricart <alberto@nats.io> [@aricart](https://github.com/aricart) | ||
- Derek Collison <derek@nats.io> [@derekcollison](https://github.com/derekcollison) | ||
### Maintainers | ||
- Ivan Kozlovic <ivan@nats.io> [@kozlovic](https://github.com/kozlovic) | ||
- Ivan Kozlovic <ivan@nats.io> [@kozlovic](https://github.com/kozlovic) |
{ | ||
"name": "nats", | ||
"version": "2.0.0-27", | ||
"version": "2.0.0-201", | ||
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", | ||
@@ -29,15 +29,26 @@ "keywords": [ | ||
"contributors": [], | ||
"main": "./index.js", | ||
"main": "index.js", | ||
"types": "index.d.ts", | ||
"files": [ | ||
"lib/", | ||
"examples/", | ||
"OWNERS.md", | ||
"CODE-OF-CONDUCT.md", | ||
"LICENSE", | ||
"MAINTAINERS.md", | ||
"nats.d.ts" | ||
], | ||
"scripts": { | ||
"depcheck": "dependency-check --no-dev package.json", | ||
"depcheck:unused": "dependency-check package.json --no-dev --entry ./**/*.js", | ||
"test:unit": "mkdir -p reports/ && NODE_ENV=test multi='spec=- xunit=reports/mocha-xunit.xml' nyc mocha --timeout 10000 --slow 750", | ||
"test": "npm run depcheck && npm run depcheck:unused && npm run lint && npm run test:typescript && npm run test:unit", | ||
"test:typescript": "./node_modules/typescript/bin/tsc --strict --noEmit tstest/main.ts", | ||
"coveralls": "nyc report --reporter=text-lcov | coveralls", | ||
"cover": "nyc report --reporter=html && open coverage/index.html", | ||
"lint": "npm run lint:js && npm run lint:ts", | ||
"lint:js": "standard './**/*.js'", | ||
"lint:ts": "tslint -c tslint.json index.d.ts 'tstest/**/*.ts'", | ||
"fmt": "standard --fix './**/*.js'" | ||
"build": "tsc", | ||
"cjs": "deno run --allow-all bin/cjs-fix-imports.ts -o nats-base-client/ ./.deps/nats.deno/nats-base-client/", | ||
"clean": "rm -Rf ./lib/* ./nats-base-client ./.deps", | ||
"clone-nbc": "mkdir -p ./.deps && cd ./.deps && git clone --branch=v1.0.0-3 https://github.com/nats-io/nats.deno.git", | ||
"fmt": "deno fmt ./src/ ./examples/ ./test/", | ||
"prepack": "npm run clone-nbc && npm run cjs && npm run build", | ||
"ava": "nyc ava --verbose -T 60000", | ||
"test": "npm run build && npm run ava", | ||
"debug-test": "node node_modules/.bin/ava --verbose -T 6500000 --match", | ||
"setup": "curl -fsSL https://deno.land/x/install/install.sh | sh", | ||
"stage": "npm run clean && npm run clone-nbc && npm run cjs && rm -Rf ./deps/ && npm run build", | ||
"cover:html": "nyc report --reporter=html && open coverage/index.html" | ||
}, | ||
@@ -48,36 +59,51 @@ "engines": { | ||
"dependencies": { | ||
"nuid": "^1.1.4", | ||
"ts-nkeys": "^1.0.16" | ||
"nkeys.js": "^1.0.0-5" | ||
}, | ||
"devDependencies": { | ||
"@types/node": "^13.7.4", | ||
"coveralls": "^3.0.11", | ||
"dependency-check": "^4.1.0", | ||
"eslint": "^6.8.0", | ||
"@types/node": "^14.6.0", | ||
"ava": "^3.11.1", | ||
"minimist": "^1.2.5", | ||
"mocha": "^7.0.1", | ||
"mocha-lcov-reporter": "1.3.0", | ||
"nyc": "^15.0.1", | ||
"should": "^13.2.3", | ||
"standard": "^14.3.3", | ||
"tslint": "^6.1.0", | ||
"typescript": "^3.8.3" | ||
"nyc": "^15.1.0", | ||
"tslint": "^6.1.3", | ||
"typescript": "^3.9.7" | ||
}, | ||
"typings": "./index.d.ts", | ||
"ava": { | ||
"failFast": false, | ||
"files": [ | ||
"./test/**/*.js", | ||
"!./test/helpers/**/*.js" | ||
] | ||
}, | ||
"nyc": { | ||
"extension": [ | ||
".ts", | ||
".js" | ||
], | ||
"include": [ | ||
"lib/**" | ||
"src/**/*.ts", | ||
"lib/src/**/*.js", | ||
"nats-base-client/**/*.ts", | ||
"lib/nats-base-client/**/*.js" | ||
], | ||
"exclude": [ | ||
"test/**", | ||
"nats-base-client/bench.ts", | ||
"nats-base-client/buffer.ts", | ||
"nats-base-client/codec.ts", | ||
"nats-base-client/databuffer.ts", | ||
"nats-base-client/muxsubscription.ts", | ||
"nats-base-client/nkeys.ts", | ||
"nats-base-client/nuid.ts", | ||
"nats-base-client/parser.ts", | ||
"nats-base-client/queued_iterator.ts", | ||
"nats-base-client/servers.ts", | ||
"nats-base-client/transport.ts", | ||
"nats-base-client/util.ts", | ||
"lib/test/**", | ||
"examples/**", | ||
"benchmark/**" | ||
] | ||
}, | ||
"bin": { | ||
"node-pub": "examples/node-pub", | ||
"node-sub": "examples/node-sub", | ||
"node-req": "examples/node-req", | ||
"node-reply": "examples/node-reply" | ||
], | ||
"sourceMap": true, | ||
"all": true | ||
} | ||
} |
847
README.md
@@ -1,531 +0,534 @@ | ||
# NATS.js - Node.js Client | ||
# NATS.js - A [NATS](http://nats.io) client for [Node.Js](https://nodejs.org/en/) | ||
A [Node.js](http://nodejs.org/) client for the [NATS messaging system](https://nats.io). | ||
[![license](https://img.shields.io/github/license/nats-io/node-nats.svg)](https://www.apache.org/licenses/LICENSE-2.0) | ||
[![Build Status](https://travis-ci.org/nats-io/nats.js.svg?branch=master)](https://travis-ci.org/nats-io/nats.js) | ||
[![Coverage Status](https://coveralls.io/repos/github/nats-io/nats.js/badge.svg?branch=master)](https://coveralls.io/github/nats-io/nats.js?branch=master) | ||
A Node.js client for the [NATS messaging system](https://nats.io). | ||
[![License](https://img.shields.io/badge/Licence-Apache%202.0-blue.svg)](./LICENSE) | ||
![NATS.js CI](https://github.com/nats-io/nats.js/workflows/NATS.js%20CI/badge.svg) | ||
[![npm](https://img.shields.io/npm/v/nats.svg)](https://www.npmjs.com/package/nats) | ||
[![npm](https://img.shields.io/npm/dt/nats.svg)](https://www.npmjs.com/package/nats) | ||
[![npm](https://img.shields.io/npm/dm/nats.svg)](https://www.npmjs.com/package/nats) | ||
[![JavaScript Style Guide](https://img.shields.io/badge/code_style-standard-brightgreen.svg)](https://standardjs.com) | ||
# Installation | ||
## Installation | ||
** :warning: NATS.js v2 is a preview** you can get the current development version by: | ||
```bash | ||
npm install nats | ||
# to install current dev version: | ||
npm install nats@next | ||
npm install nats@v2' | ||
``` | ||
The nats.js v2 client is under active development. All tests are passing. | ||
## Basic Usage | ||
The nats.js v2 client is not API compatible with previous versions of nats.js. | ||
For a migration guide, please see [the migration guide](migration.md). | ||
```javascript | ||
const NATS = require('nats') | ||
## Basics | ||
const nc = NATS.connect() | ||
// Simple publisher | ||
nc.publish('foo', 'Hello World!') | ||
### Connecting to a nats-server | ||
// Simple Subscriber - error is set if there was some error | ||
nc.subscribe('foo', (err, msg) => { | ||
if (err) { | ||
console.error(err) | ||
return | ||
} | ||
console.log('Received a message: ' + msg) | ||
}) | ||
To connect to a server you use the `connect()` function. It returns | ||
a connection that you can use to interact with the server. | ||
// Unsubscribing | ||
const sub = nc.subscribe('foo', (err, m) => { | ||
if(!err) { | ||
console.log(m) | ||
} else { | ||
console.error(err) | ||
} | ||
}) | ||
sub.unsubscribe() | ||
By default, a connection will attempt to auto-reconnect when dropped | ||
due to some networking type error. Messages that have not been | ||
sent to the server when a disconnect happens are lost. A client can queue | ||
new messages to be sent when the connection resumes. If the connection | ||
cannot be re-established the client will give up and `close` the connection. | ||
To learn when a connection closes, wait for the promise returned by the `closed()` | ||
function. If the close was due to an error, it will resolve to an error. | ||
// Subscription/Request are given two arguments: | ||
// - an error (undefined if no error) | ||
// - a message object | ||
// - the message has 4 properties: | ||
// - data - the message payload (can be undefined) | ||
// - subject - subject where the message was sent | ||
// - reply - the reply subject if this is a request (can be undefined) | ||
// - sid - the subscription id associated with the handler (same value as the return of subscribe()) | ||
nc.subscribe('foo', (_, m) => { | ||
if (m.reply) { | ||
nc.publish(m.reply, 'got ' + m.data + ' on ' + m.subject + ' in subscription id ' + m.sid) | ||
return | ||
} | ||
console.log('Received a message: ' + m.data + " it wasn't a request.") | ||
}) | ||
To disconnect from the nats-server, you call `close()` on the connection. | ||
Connections can also be closed if there's an error. For example, the | ||
server returns some run-time error. | ||
// Request, creates a subscription to handle any replies to the request | ||
// subject, and publishes the request with an optional payload. This usage | ||
// allows you to collect responses from multiple services | ||
nc.request('request', (_, m) => { | ||
console.log('Got a response in msg stream: ' + m.data) | ||
}) | ||
This first example shows basic options that you can provide to the connect | ||
function, note that you can specify multiple servers, allowing the client | ||
to cope with a server that is not available. | ||
// Request with a max option will unsubscribe after | ||
// the first max messages are received. You can also specify the number | ||
// of milliseconds you are willing to wait for the response - when a timeout | ||
// is specified, you can receive an error | ||
nc.request('help', (err, m) => { | ||
if (err && err.code === NATS.REQ_TIMEOUT) { | ||
console.log('request timed out') | ||
} else if (err) { | ||
console.error('request got error', err) | ||
} else { | ||
console.log('Got a response for help: ' + m.data) | ||
} | ||
}, null, { max: 1, timeout: 1000 }) | ||
// Replies | ||
nc.subscribe('help', (_, m) => { | ||
nc.publish(m.reply, 'I can help!') | ||
}) | ||
// Close connection | ||
nc.close() | ||
```javascript | ||
const { connect } = require("nats"); | ||
[ | ||
{}, | ||
{ servers: ["demo.nats.io:4442", "demo.nats.io:4222"] }, | ||
{ servers: "demo.nats.io:4443" }, | ||
{ port: 4222 }, | ||
{ servers: "localhost" }, | ||
] | ||
.forEach(async (v) => { | ||
await connect(v) | ||
.then((nc) => { | ||
console.log(`connected to ${nc.getServer()}`); | ||
nc.close(); | ||
}) | ||
.catch(() => { | ||
console.log(`unable to connect to ${JSON.stringify(v)}`); | ||
}); | ||
}); | ||
``` | ||
## JSON | ||
### Publish and Subscribe | ||
The basic client operations are to `subscribe` to receive messages, | ||
and publish to `send` messages. A subscription works as an async | ||
iterator where you process messages in a loop until the subscription | ||
closes. | ||
The `payload` connect property makes it easy to exchange JSON data with other | ||
clients when set to `Payload.JSON`: | ||
NATS is payload agnostic, this means that payloads are `Uint8Arrays`. | ||
You can easily send JSON or strings by using a `StringCodec` or a | ||
`JSONCodec`, or create a Codec of your own that handles the type | ||
of your data as necessary. | ||
```javascript | ||
const nc = NATS.connect({ payload: Payload.JSON }) | ||
nc.on('connect', () => { | ||
nc.on('error', (err) => { | ||
console.log(err) | ||
}) | ||
const { connect, StringCodec } = require("nats"); | ||
nc.subscribe('greeting', (_, m) => { | ||
// msg is a parsed JSON object object | ||
if (m.data && m.data.name && m.reply) { | ||
nc.publish(m.reply, { greeting: 'hello ' + m.data.name }) | ||
} | ||
}) | ||
(async () => { | ||
// create a connection to a nats-server | ||
const nc = await connect({ servers: "demo.nats.io" }); | ||
// As with all inputs from unknown sources, if you don't trust the data | ||
// you should verify it prior to accessing it. While JSON is safe because | ||
// it doesn't export functions, it is still possible for a client to | ||
// cause issues to a downstream consumer that is not written carefully | ||
nc.subscribe('unsafe', (_, m) => { | ||
// for example a client could inject a bogus `toString` property | ||
// which could cause your client to crash should you try to | ||
// concatenation with the `+` like this: | ||
// console.log("received", msg + "here"); | ||
// `TypeError: Cannot convert object to primitive value` | ||
// Note that simple `console.log(msg)` is fine. | ||
if (Object.hasOwnProperty.call(m, 'toString')) { | ||
console.log('tricky - trying to crash me:', m.toString) | ||
return | ||
} | ||
// create a codec to encode/decode payloads | ||
const sc = StringCodec(); | ||
// of course this is no different than using a value that is | ||
// expected in one format (say a number), but the client provides | ||
// a string: | ||
if (isNaN(m.data.amount) === false) { | ||
// do something with the number | ||
// create a simple subscriber and iterate over messages | ||
// matching the subscription | ||
const sub = nc.subscribe("hello"); | ||
(async () => { | ||
for await (const m of sub) { | ||
console.log(`[${sub.getProcessed()}]: ${sc.decode(m.data)}`); | ||
} | ||
// ... | ||
}) | ||
})().then(() => { | ||
console.log("subscription closed"); | ||
}); | ||
// the bad guy | ||
nc.publish('unsafe', { toString: 'no good' }) | ||
// publish two messages to the nats-server | ||
nc.publish("hello", sc.encode("world")); | ||
nc.publish("hello", sc.encode("again")); | ||
nc.flush(function () { | ||
nc.close() | ||
}) | ||
}) | ||
// we want to insure that messages that are in flight | ||
// get processed, so we are going to drain the | ||
// connection. Drain is the same as close, but makes | ||
// sure that all messages in flight get seen | ||
// by the iterator. After calling drain on the connection | ||
// the connection closes. | ||
await nc.drain(); | ||
console.log("connection ended"); | ||
})(); | ||
``` | ||
## Wildcard Subscriptions | ||
### Streams | ||
Streams are messages that are published at regular intervals. | ||
To get the messages, you simply subscribe to them. To stop | ||
getting the messages you unsubscribe. | ||
```javascript | ||
const nc = NATS.connect({ payload: Payload.JSON }) | ||
// "*" matches any token, at any level of the subject. | ||
nc.subscribe('foo.*.baz', (_, m) => { | ||
console.log('Msg received on [' + m.subject + '] : ' + m.data) | ||
}) | ||
const { connect, JSONCodec } = require("nats"); | ||
nc.subscribe('foo.bar.*', (_, m) => { | ||
console.log('Msg received on [' + m.subject + '] : ' + m.data) | ||
}) | ||
(async () => { | ||
// to create a connection to a nats-server: | ||
const nc = await connect({ servers: "demo.nats.io" }); | ||
// ">" matches any length of the tail of a subject, and can only be | ||
// the last token E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', | ||
// 'foo.foo.bar.bax.22' | ||
nc.subscribe('foo.>', (_, m) => { | ||
console.log('Msg received on [' + m.subject + '] : ' + m.data) | ||
}) | ||
``` | ||
// create a codec | ||
const jc = JSONCodec(); | ||
## Queue Groups | ||
console.info("enter the following command to get messages from the stream"); | ||
console.info( | ||
"node run examples/nats-sub.js -s demo.nats.io stream.demo", | ||
); | ||
```javascript | ||
// All subscriptions with the same queue name will form a queue group. | ||
// Each message will be delivered to only one subscriber per queue group, | ||
// queuing semantics. You can have as many queue groups as you wish. | ||
// Normal subscribers will continue to work as expected. | ||
let received = 0 | ||
nc.subscribe('foo', () => { | ||
received++ | ||
}, { queue: 'job.workers' }) | ||
const start = Date.now(); | ||
let sequence = 0; | ||
setInterval(() => { | ||
sequence++; | ||
const uptime = Date.now() - start; | ||
console.info(`publishing #${sequence}`); | ||
nc.publish("stream.demo", jc.encode({ sequence, uptime })); | ||
}, 1000); | ||
})(); | ||
``` | ||
## Clustered Usage | ||
```javascript | ||
const servers = ['nats://nats.io:4222', 'nats://nats.io:5222', 'nats://nats.io:6222'] | ||
### Wildcard Subscriptions | ||
Sometimes you want to process an event (message), based on the | ||
subject that was used to send it. In NATS this is accomplished | ||
by specifying wildcards in the subscription. Subjects that match | ||
the wildcards, are sent to the client. | ||
// Randomly connect to a server in the cluster group. | ||
// Note that because `url` is not specified, the default connection is called first | ||
// (nats://localhost:4222). If you don't want default connection, specify one of | ||
// the above the above servers as `url`: `nats.connect(servers[0], {'servers': servers});` | ||
let nc = NATS.connect({ servers: servers }) | ||
In the example below, I am using 3 different subscription | ||
to highlight that each subscription is independent. And if | ||
the subject you use matches one or more of them, they all | ||
will get a chance at processing the message. | ||
// currentServer is the URL of the connected server. | ||
nc.on('connect', () => { | ||
console.log('Connected to ' + nc.currentServer.url.host) | ||
}) | ||
```javascript | ||
const { connect, StringCodec } = require("nats"); | ||
// Preserve order when connecting to servers. | ||
nc = NATS.connect({ noRandomize: true, servers: servers }) | ||
``` | ||
(async () => { | ||
const nc = await connect({ servers: "demo.nats.io" }); | ||
const sc = StringCodec(); | ||
## Draining Connections and Subscriptions | ||
// subscriptions can have wildcard subjects | ||
// the '*' matches any string in the specified token position | ||
const s1 = nc.subscribe("help.*.system"); | ||
const s2 = nc.subscribe("help.me.*"); | ||
// the '>' matches any tokens in that position or following | ||
// '>' can only be specified at the end | ||
const s3 = nc.subscribe("help.>"); | ||
```javascript | ||
// Unsubscribing removes the subscription handler for a subscription | ||
// and cancels the subscription. Any pending messages on the client's | ||
// buffer are discarded. | ||
// | ||
// Draining is similar to unsubscribe, but the client instead | ||
// sends the unsubscribe request followed by a flush. When the flush | ||
// returns, the subscription handler is removed. Thus the client is | ||
// able to process all messages sent by the server before the subscription | ||
// handler is removed. | ||
// | ||
// Draining is particularly valuable with queue subscriptions preventing | ||
// messages from being lost. | ||
let c1 = 0 | ||
const sub = nc.subscribe('foo', () => { | ||
c1++ | ||
if (c1 === 1) { | ||
sub.drain((err) => { | ||
if (err) { | ||
console.error(err) | ||
return | ||
} | ||
console.log(`subscription ${sub.sid} drained`) | ||
}) | ||
async function printMsgs(s) { | ||
let subj = s.getSubject(); | ||
console.log(`listening for ${subj}`); | ||
const c = (13 - subj.length); | ||
const pad = "".padEnd(c); | ||
for await (const m of s) { | ||
console.log( | ||
`[${subj}]${pad} #${s.getProcessed()} - ${m.subject} ${ | ||
m.data ? " " + sc.decode(m.data) : "" | ||
}`, | ||
); | ||
} | ||
} | ||
}, { queue: 'q1' }) | ||
// It is possible to drain a connection, draining a connection: | ||
// - drains all subscriptions | ||
// - after calling drain it is impossible to make subscriptions or requests | ||
// - when all subscriptions are drained, it is impossible to publish | ||
// messages and drained connection is closed. | ||
// - finally, the callback handler is called (with possibly an error). | ||
printMsgs(s1); | ||
printMsgs(s2); | ||
printMsgs(s3); | ||
let c2 = 0 | ||
nc.subscribe('foo', () => { | ||
c2++ | ||
if (c2 === 1) { | ||
nc.drain((err) => { | ||
if (err) { | ||
console.error('error draining', err) | ||
return | ||
} | ||
console.log('connection drained') | ||
}) | ||
} | ||
}, { queue: 'q1' }) | ||
// don't exit until the client closes | ||
await nc.closed(); | ||
})(); | ||
``` | ||
## TLS | ||
### Services | ||
A service is a client that responds to requests from other clients. | ||
Now that you know how to create subscriptions, and know about wildcards, | ||
it is time to develop a service that mocks something useful. | ||
This example is a bit complicated, because we are going to use NATS | ||
not only to provide a service, but also to control the service. | ||
```javascript | ||
const NATS = require('nats') | ||
const fs = require('fs') | ||
const { connect, StringCodec } = require("nats"); | ||
// Simple TLS connect | ||
let nc = NATS.connect({ tls: true }) | ||
(async () => { | ||
// create a connection | ||
const nc = await connect({ servers: "demo.nats.io" }); | ||
// Overriding and not verifying the server | ||
let tlsOptions = { | ||
rejectUnauthorized: false | ||
} | ||
nc = NATS.connect({ tls: tlsOptions }) | ||
// nc.stream.authorized will be false | ||
// create a codec | ||
const sc = StringCodec(); | ||
// Use a specified CA for self-signed server certificates | ||
tlsOptions = { | ||
ca: [fs.readFileSync('./test/certs/ca.pem')] | ||
} | ||
nc = NATS.connect({ tls: tlsOptions }) | ||
// nc.stream.authorized should be true | ||
// A service is a subscriber that listens for messages, and responds | ||
const started = Date.now(); | ||
const sub = nc.subscribe("time"); | ||
requestHandler(sub); | ||
// Use a client certificate if the server requires | ||
tlsOptions = { | ||
key: fs.readFileSync('./test/certs/client-key.pem'), | ||
cert: fs.readFileSync('./test/certs/client-cert.pem'), | ||
ca: [fs.readFileSync('./test/certs/ca.pem')] | ||
} | ||
nc = NATS.connect({ tls: tlsOptions }) | ||
``` | ||
// If you wanted to manage a service - well NATS is awesome | ||
// for just that - setup another subscription where admin | ||
// messages can be sent | ||
const msub = nc.subscribe("admin.*"); | ||
adminHandler(msub); | ||
## Basic Authentication | ||
```javascript | ||
// Connect with username and password in the url | ||
let nc = NATS.connect('nats://foo:bar@localhost:4222') | ||
// wait for the client to close here. | ||
await nc.closed().then((err) => { | ||
let m = `connection to ${nc.getServer()} closed`; | ||
if (err) { | ||
m = `${m} with an error: ${err.message}`; | ||
} | ||
console.log(m); | ||
}); | ||
// Connect with username and password inside object | ||
nc = NATS.connect({ url: 'nats://localhost:4222', user: 'foo', pass: 'bar' }) | ||
// this implements the public service, and just prints | ||
async function requestHandler(sub) { | ||
console.log(`listening for ${sub.getSubject()} requests...`); | ||
let serviced = 0; | ||
for await (const m of sub) { | ||
serviced++; | ||
if (m.respond(sc.encode(new Date().toISOString()))) { | ||
console.info( | ||
`[${serviced}] handled ${m.data ? "- " + sc.decode(m.data) : ""}`, | ||
); | ||
} else { | ||
console.log(`[${serviced}] ignored - no reply subject`); | ||
} | ||
} | ||
} | ||
// Connect with token in url | ||
nc = NATS.connect('nats://mytoken@localhost:4222') | ||
// this implements the admin service | ||
async function adminHandler(sub) { | ||
console.log(`listening for ${sub.getSubject()} requests [uptime | stop]`); | ||
// Connect with token inside object | ||
nc = NATS.connect({ url: 'nats://localhost:4222', token: 'mytoken' }) | ||
// it would be very good to verify the origin of the request | ||
// before implementing something that allows your service to be managed. | ||
// NATS can limit which client can send or receive on what subjects. | ||
for await (const m of sub) { | ||
const chunks = m.subject.split("."); | ||
console.info(`[admin] handling ${chunks[1]}`); | ||
switch (chunks[1]) { | ||
case "uptime": | ||
// send the number of millis since up | ||
m.respond(sc.encode(`${Date.now() - started}`)); | ||
break; | ||
case "stop": | ||
m.respond(sc.encode("stopping....")); | ||
// finish requests by draining the subscription | ||
await sub.drain(); | ||
// close the connection | ||
const _ = nc.close(); | ||
break; | ||
default: | ||
console.log(`ignoring request`); | ||
} | ||
} | ||
} | ||
})(); | ||
``` | ||
## New Authentication (Nkeys and User Credentials) | ||
See examples for more usage. | ||
### Making Requests | ||
```javascript | ||
const nkeys = require('ts-nkeys') | ||
const { connect, StringCodec } = require("nats"); | ||
// Simple connect using credentials file. This loads JWT and signing key | ||
// each time that NATS connects. | ||
let nc = NATS.connect('connect.ngs.global', { credsFile: './myid.creds' }) | ||
(async () => { | ||
// create a connection | ||
const nc = await connect({ servers: "demo.nats.io:4222" }); | ||
// Manually, you need to specify the JWT, and seed and sign the challenge | ||
const jwt = 'eyJ0eXAiOiLN1...' | ||
const nkeySeed = 'SUAIBDPBAUTWCWBKIO6XHQNINK5FWJW4OHLXC3HQ2KFE4PEJUA44CNHTC4' | ||
const sk = nkeys.fromSeed(Buffer.from(nkeySeed)) | ||
// create an encoder | ||
const sc = StringCodec(); | ||
// Setting nkey and signing callback directly. | ||
nc = NATS.connect('nats://localhost:4222', { | ||
nkey: 'UAH42UG6PV552P5SWLWTBP3H3S5BHAVCO2IEKEXUANJXR75J63RQ5WM6', | ||
nonceSigner: function (nonce) { | ||
return sk.sign(nonce) | ||
} | ||
}) | ||
// a client makes a request and receives a promise for a message | ||
// by default the request times out after 1s (1000 millis) and has | ||
// no payload. | ||
await nc.request("time", sc.encode("hello!"), { timeout: 1000 }) | ||
.then((m) => { | ||
console.log(`got response: ${sc.decode(m.data)}`); | ||
}) | ||
.catch((err) => { | ||
console.log(`problem with request: ${err.message}`); | ||
}); | ||
// Setting user JWT statically. | ||
nc = NATS.connect({ | ||
userJWT: jwt, | ||
nonceSigner: function (nonce) { | ||
return sk.sign(nonce) | ||
} | ||
}) | ||
await nc.close(); | ||
})(); | ||
// Having user JWT be a function that returns the JWT. Can be useful for | ||
// loading a new JWT. | ||
nc = NATS.connect({ | ||
userJWT: function () { | ||
return jwt | ||
}, | ||
nonceSigner: function (nonce) { | ||
return sk.sign(nonce) | ||
} | ||
}) | ||
``` | ||
## Advanced Usage | ||
### Queue Groups | ||
Queue groups allow scaling of services horizontally. Subscriptions for members of a | ||
queue group are treated as a single service, that means when you send a message | ||
only a single client in a queue group will receive it. There can be multiple queue | ||
groups, and each is treated as an independent group. Non-queue subscriptions are | ||
also independent. | ||
```javascript | ||
const { | ||
connect, | ||
StringCodec, | ||
} = require("nats"); | ||
```javascript | ||
// Flush connection to server, callback fires when all messages have | ||
// been processed. | ||
nc.flush((err) => { | ||
if (err) { | ||
console.error('error flushing', err) | ||
return | ||
(async () => { | ||
async function createService( | ||
name, | ||
count = 1, | ||
queue = "" | ||
) { | ||
const conns = []; | ||
for (let i = 1; i <= count; i++) { | ||
const n = queue ? `${name}-${i}` : name; | ||
const nc = await connect( | ||
{ servers: "demo.nats.io:4222", name: `${n}` }, | ||
); | ||
nc.closed() | ||
.then((err) => { | ||
if (err) { | ||
console.error( | ||
`service ${n} exited because of error: ${err.message}`, | ||
); | ||
} | ||
}); | ||
// create a subscription - note the option for a queue, if set | ||
// any client with the same queue will be the queue group. | ||
const sub = nc.subscribe("echo", { queue: queue }); | ||
const _ = handleRequest(n, sub); | ||
console.log(`${nc.options.name} is listening for 'echo' requests...`); | ||
conns.push(nc); | ||
} | ||
return conns; | ||
} | ||
console.log('round trip to the server done') | ||
}) | ||
// If you want to make sure NATS yields during the processing | ||
// of messages, you can use an option to specify a yieldTime in ms. | ||
// During the processing of the inbound stream, NATS will yield if it | ||
// spends more than yieldTime milliseconds processing. | ||
nc = NATS.connect({ port: 4222, yieldTime: 10 }) | ||
const sc = StringCodec(); | ||
// Timeout a subscription unless a certain number of messages have been received | ||
// When a subscription times out, it automatically cancels. You can specify more | ||
// messages in the `expected` option. However that count of messages must be | ||
// received before the timeout specified. If `expected` is not specified, it | ||
// defaults to '1'. | ||
NATS.subscribe('foo', (err) => { | ||
if (err && err.code === TIMEOUT_ERR) { | ||
// didn't get the message | ||
// simple handler for service requests | ||
async function handleRequest(name, sub) { | ||
const p = 12 - name.length; | ||
const pad = "".padEnd(p); | ||
for await (const m of sub) { | ||
// respond returns true if the message had a reply subject, thus it could respond | ||
if (m.respond(m.data)) { | ||
console.log( | ||
`[${name}]:${pad} #${sub.getProcessed()} echoed ${sc.decode(m.data)}`, | ||
); | ||
} else { | ||
console.log( | ||
`[${name}]:${pad} #${sub.getProcessed()} ignoring request - no reply subject`, | ||
); | ||
} | ||
} | ||
} | ||
// do something | ||
}, {timeout: 1000, expected: 1}) | ||
// let's create two queue groups and a standalone subscriber | ||
const conns = []; | ||
conns.push(...await createService("echo", 3, "echo")); | ||
conns.push(...await createService("other-echo", 2, "other-echo")); | ||
conns.push(...await createService("standalone")); | ||
// Auto-unsubscribe after max messages received | ||
nc.subscribe('foo', () => {}, { max: 100 }) | ||
// or | ||
const sub = nc.subscribe('foo', () => {}) | ||
sub.unsubscribe(100) | ||
const a = []; | ||
conns.forEach((c) => { | ||
a.push(c.closed()); | ||
}); | ||
await Promise.all(a); | ||
})(); | ||
``` | ||
## Advanced Usage | ||
// Encodings | ||
### Authentication | ||
// By default messages received will be decoded using UTF8. To change that, | ||
// set the encoding option on the connection. | ||
Simple authentication just provides connection properties. The fundamental | ||
mechanism for authentication relies on an ["authenticator"](index.d.ts). | ||
Autheticators can be used for nkeys, and JWT authentication as well as | ||
any other. In cases where you want to dynamically obtain credentials, | ||
the authenticator is where you would provide this logic. | ||
NATS.connect({ encoding: 'ascii' }) | ||
```typescript | ||
// if the connection requires authentication, provide `user` and `pass` or | ||
// `token` options in the NatsConnectionOptions. | ||
import { connect } from "src/mod.ts"; | ||
// Binary Data | ||
const nc1 = await connect({ port: ns.port, user: "jenny", pass: "867-5309" }); | ||
const nc2 = await connect({ port: ns.port, token: "s3cret!" }); | ||
// To prevent payload conversion from a Buffer to a string, set the | ||
// `payload` option to `Payload.Binary`. Message payload return will be a Buffer. | ||
// nkeys | ||
const auth = nkeyAuthenticator(seed); | ||
const nc3 = await connect({ port: ns.port, authenticator: auth }); | ||
``` | ||
NATS.connect({ payload: Payload.Binary }) | ||
### Flush | ||
```javascript | ||
// flush sends a PING request to the servers and returns a promise | ||
// when the servers responds with a PONG. The flush guarantees that | ||
// things you published have been delivered to the servers. Typically | ||
// it is not necessary to use flush, but on tests it can be invaluable. | ||
nc.publish('foo'); | ||
nc.publish('bar'); | ||
await nc.flush(); | ||
``` | ||
// Reconnect Attempts and Time between reconnects | ||
// By default a NATS connection will try to reconnect to all servers 10 times | ||
// waiting 2 seconds between the previous reconnect to the server. If the | ||
// maximum number of retries is reached, the client will close the connection. | ||
// To change the default behaviour specify the max number of connection | ||
// attempts in `maxReconnectAttempts` (set to -1 to retry forever), and the | ||
// time in milliseconds between reconnects in `reconnectTimeWait`. | ||
NATS.connect({ maxReconnectAttempts: -1, reconnectTimeWait: 250 }) | ||
### Auto Unsubscribe | ||
```javascript | ||
// subscriptions can auto unsubscribe after a certain number of messages | ||
nc.subscribe('foo', {max:10}); | ||
``` | ||
# Events | ||
The nats client is an event emitter, you can listen to several kinds of events. | ||
### Timeout Subscriptions | ||
```javascript | ||
// emitted whenever there's an error. if you don't implement at least | ||
// the error handler, your program will crash if an error is emitted. | ||
nc.on('error', (err) => { | ||
console.log(err) | ||
}) | ||
// create subscription with a timeout, if no message arrives | ||
// within the timeout, the function running the iterator with | ||
// reject - depending on how you code it, you may need a | ||
// try/catch block. | ||
// import the connect function | ||
const { connect, ErrorCode } = require("nats"); | ||
// connect callback provides a reference to the connection as an argument | ||
nc.on('connect', (nc) => { | ||
console.log(`connect to ${nc.servers.getCurrent().url.host}`) | ||
}) | ||
(async () => { | ||
// to create a connection to a nats-server: | ||
const nc = await connect({ servers: "demo.nats.io" }); | ||
// emitted whenever the client disconnects from a server | ||
nc.on('disconnect', () => { | ||
console.log('disconnect') | ||
}) | ||
// create subscription with a timeout, if no message arrives | ||
// within the timeout, the subscription throws a timeout error | ||
const sub = nc.subscribe("hello", { timeout: 1000 }); | ||
(async () => { | ||
for await (const m of sub) { | ||
console.log(`got message #${sub.getProcessed()}`); | ||
} | ||
})().catch((err) => { | ||
if (err.code === ErrorCode.TIMEOUT) { | ||
console.log(`sub timed out!`); | ||
} else { | ||
console.log(`sub iterator got an error!`); | ||
} | ||
nc.close(); | ||
}); | ||
// emitted whenever the client is attempting to reconnect | ||
nc.on('reconnecting', () => { | ||
console.log('reconnecting') | ||
}) | ||
await nc.closed(); | ||
})(); | ||
``` | ||
// emitted whenever the client reconnects | ||
// reconnect callback provides a reference to the connection as an argument | ||
nc.on('reconnect', (nc) => { | ||
console.log(`reconnect to ${nc.servers.getCurrent().url.host}`) | ||
}) | ||
### Async vs. Callbacks | ||
// emitted when the connection is closed - once a connection is closed | ||
// the client has to create a new connection. | ||
nc.on('close', () => { | ||
console.log('close') | ||
}) | ||
Previous versions of the JavaScript NATS clients specified callbacks | ||
for message processing. This required complex handling logic when a | ||
service required coordination of operations. Callbacks are an | ||
inversion of control anti-pattern. | ||
// emitted whenever the client unsubscribes | ||
nc.on('unsubscribe', (sid, subject) => { | ||
console.log('unsubscribed subscription', sid, 'for subject', subject) | ||
}) | ||
The async APIs trivialize complex coordination and makes your code | ||
easier to maintain. With that said, there are some implications: | ||
// emitted whenever the server returns a permission error for | ||
// a publish for the current user. This sort of error | ||
// means that the client cannot publish/request | ||
// on the specific subject. Note that subscription permission | ||
// errors are delivered to the subscription's handler | ||
nc.on(pubError, (err) => { | ||
console.error('got a permissions error', err.message) | ||
}) | ||
``` | ||
- Async subscriptions buffer inbound messages. | ||
- Subscription processing delays until the runtime executes the promise related | ||
microtasks at the end of an event loop. | ||
See examples and benchmarks for more information. | ||
In a traditional callback based library, I/O happens after all data yielded by | ||
a read in the current event loop completes processing. This means that | ||
callbacks are invoked as part of processing. With async, processing is queued | ||
up in a microtask queue. At the end of the event loop, the runtime processes | ||
the microtasks, which in turn resumes your functions. As expected, this | ||
increases latency, but also provides additional liveliness. | ||
## Connect Options | ||
To reduce async latency, the NATS client allows processing a subscription | ||
in the same event loop that dispatched the message. Simply specify a `callback` | ||
in the subscription options. The signature for a callback is | ||
`(err: (Error|null), msg: Msg) => void`. When specified, the subscription | ||
iterator will never yield a message. | ||
The following is the list of connection options and default values. | ||
Note that `callback` likely shouldn't even be documented, as likely it | ||
is a workaround to an underlying application problem where you should be | ||
considering a different strategy to horizontally scale your application, | ||
or reduce pressure on the clients, such as using queue workers, | ||
or more explicitly targeting messages. | ||
| Option | Default | Description | ||
|-------- |--------- |------------ | ||
| `credsFile` | | See [NKeys/User Credentials](https://github.com/nats-io/nats.js#new-authentication-nkeys-and-user-credentials). | ||
| `encoding` | `"utf8"` | Encoding specified by the client to encode/decode data | ||
| `maxPingOut` | `2` | Max number of pings the client will allow unanswered before raising a stale connection error | ||
| `maxReconnectAttempts` | `10` | Sets the maximum number of reconnect attempts. The value of `-1` specifies no limit | ||
| `name` | | Optional client name | ||
| `nkey` | `` | See [NKeys/User Credentials](https://github.com/nats-io/nats.js#new-authentication-nkeys-and-user-credentials) | ||
| `noEcho` | `false` | Subscriptions receive messages published by the client. Requires server support (1.2.0). If set to true, and the server does not support the feature, an error with code `NO_ECHO_NOT_SUPPORTED` is emitted, and the connection is aborted. Note that it is possible for this error to be emitted on reconnect when the server reconnects to a server that does not support the feature. | ||
| `noMuxRequests` | `false` | If set to `true` calls to `request()` will create an inbox subscription per call. | ||
| `noRandomize` | `false` | If set, the order of user-specified servers is randomized. | ||
| `nonceSigner` | | See [NKeys/User Credentials](https://github.com/nats-io/nats.js#new-authentication-nkeys-and-user-credentials). A function that takes a `Buffer` and returns a nkey signed signature. | ||
| `pass` | | Sets the password for a connection | ||
| `pedantic` | `false` | Turns on strict subject format checks | ||
| `pingInterval` | `120000` | Number of milliseconds between client-sent pings | ||
| `reconnectTimeWait` | `2000` | If disconnected, the client will wait the specified number of milliseconds between reconnect attempts | ||
| `reconnect` | `true` | If false server will not attempt reconnecting | ||
| `servers` | | Array of connection `url`s | ||
| `timeout` | node default - no timeout | Number of milliseconds the client will wait for a connection to be established. If it fails it will emit a `connection_timeout` event with a NatsError that provides the hostport of the server where the connection was attempted. | ||
| `tls` | `false` | This property can be a boolean or an Object. If true the client requires a TLS connection. If false a non-tls connection is required. The value can also be an object specifying TLS certificate data. The properties `ca`, `key`, `cert` should contain the certificate file data. `ca` should be provided for self-signed certificates. `key` and `cert` are required for client provided certificates. `rejectUnauthorized` if `true` validates server's credentials | ||
| `tokenHandler` | | A function returning a `token` used for authentication. | ||
| `token` | | Sets a authorization token for a connection | ||
| `url` | `"nats://localhost:4222"` | Connection url | ||
| `userJWT` | | See [NKeys/User Credentials](https://github.com/nats-io/nats.js#new-authentication-nkeys-and-user-credentials). The property can be a JWT or a function that returns a JWT. | ||
| `user` | | Sets the username for a connection | ||
| `verbose` | `false` | Turns on `+OK` protocol acknowledgements | ||
| `waitOnFirstConnect` | `false` | If `true` the server will fall back to a reconnect mode if it fails its first connection attempt. | ||
| `yieldTime` | | If set, processing will yield at least the specified number of milliseconds to IO callbacks before processing inbound messages | ||
## Tools | ||
### Lifecycle/Informational Events | ||
Clients can get notification on various event types: | ||
- `Events.DISCONNECT` | ||
- `Events.RECONNECT` | ||
- `Events.UPDATE` | ||
- `Events.LDM` | ||
The examples, `node-pub`, `node-sub`, `node-req`, `node-reply` are now bound to `bin` entries on the npm package. | ||
You can use these while developing your own tools. After you install the `nats` npm package, you'll need to add | ||
a dependency on `minimist` before you can use the tools: | ||
The first two fire when a client disconnects and reconnects respectively. | ||
The payload will be the server where the event took place. | ||
```bash | ||
npm install nats | ||
npm install minimist | ||
... | ||
% npx node-sub hello & | ||
[1] 9208 | ||
% Listening on [hello] | ||
% npx node-pub hello world | ||
Received "world" | ||
Published [hello] : "world" | ||
``` | ||
The `UPDATE` event notifies whenever the client receives a cluster configuration | ||
update. The `ServersChanged` interface provides two arrays: `added` and `deleted` | ||
listing the servers that were added or removed. | ||
## Supported Node Versions | ||
The `LDM` event notifies that the current server has signaled that it | ||
is running in _Lame Duck Mode_ and will evict clients. Depending on the server | ||
configuration policy, the client may want to initiate an ordered shutdown, and | ||
initiate a new connection to a different server in the cluster. | ||
Our support policy for Nodejs versions follows [Nodejs release support]( https://github.com/nodejs/Release). | ||
We will support and build node-nats on even-numbered Nodejs versions that are current or in LTS. | ||
```javascript | ||
const nc = await connect(); | ||
(async () => { | ||
console.info(`connected ${nc.getServer()}`); | ||
for await (const s of nc.status()) { | ||
console.info(`${s.type}: ${s.data}`); | ||
} | ||
})().then(); | ||
nc.closed() | ||
.then((err) => { | ||
console.log( | ||
`connection closed ${err ? " with error: " + err.message : ""}`, | ||
); | ||
}); | ||
``` | ||
## Running Tests | ||
To run the tests, you need to have a `nats-server` executable in your path. Refer to the [server installation guide](https://nats-io.github.io/docs/nats_server/installation.html) in the NATS.io documentation. With that in place, you can run `npm test` to run all tests. | ||
## License | ||
Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file. | ||
To be aware of when a client closes, wait for the `closed()` promise to resolve. | ||
When it resolves, the client has finished and won't reconnect. |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Deprecated
MaintenanceThe maintainer of the package marked it as deprecated. This could indicate that a single version should not be used, or that the package is no longer maintained and any new vulnerabilities will not be fixed.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 2 instances in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 2 instances in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
314918
1
6
75
4731
535
1
3
+ Addednkeys.js@^1.0.0-5
+ Addednkeys.js@1.1.0(transitive)
- Removednuid@^1.1.4
- Removedts-nkeys@^1.0.16
- Removednuid@1.1.6(transitive)
- Removedts-nkeys@1.0.16(transitive)