What is nats?
The 'nats' npm package is a client library for the NATS messaging system, which is a high-performance, lightweight, and open-source messaging system for cloud-native applications, IoT messaging, and microservices architectures. It provides publish-subscribe, request-reply, and distributed queueing functionalities.
What are nats's main functionalities?
Publish-Subscribe
This feature allows you to publish messages to a subject and have multiple subscribers receive those messages. The code sample demonstrates how to set up a simple publish-subscribe system where a message is published to the 'updates' subject and received by a subscriber.
const { connect, StringCodec } = require('nats');
(async () => {
const nc = await connect({ servers: 'demo.nats.io:4222' });
const sc = StringCodec();
// Subscriber
const sub = nc.subscribe('updates');
(async () => {
for await (const m of sub) {
console.log(`Received a message: ${sc.decode(m.data)}`);
}
})();
// Publisher
nc.publish('updates', sc.encode('Hello, NATS!'));
})();
Request-Reply
This feature allows you to send a request and receive a reply, enabling synchronous communication between services. The code sample shows how to set up a responder that listens for requests on the 'help' subject and a requester that sends a request and waits for a reply.
const { connect, StringCodec } = require('nats');
(async () => {
const nc = await connect({ servers: 'demo.nats.io:4222' });
const sc = StringCodec();
// Responder
nc.subscribe('help', {
callback: (err, msg) => {
if (err) {
console.error(err);
} else {
msg.respond(sc.encode('I can help!'));
}
}
});
// Requester
const msg = await nc.request('help', sc.encode('Need assistance'), { timeout: 1000 });
console.log(`Received reply: ${sc.decode(msg.data)}`);
})();
Distributed Queueing
This feature allows you to distribute tasks among multiple workers, ensuring that each task is processed by only one worker. The code sample demonstrates how to set up two workers that listen on the 'tasks' subject and a publisher that sends tasks to be processed.
const { connect, StringCodec } = require('nats');
(async () => {
const nc = await connect({ servers: 'demo.nats.io:4222' });
const sc = StringCodec();
// Worker 1
nc.subscribe('tasks', { queue: 'workers' }, (err, msg) => {
if (err) {
console.error(err);
} else {
console.log(`Worker 1 received: ${sc.decode(msg.data)}`);
}
});
// Worker 2
nc.subscribe('tasks', { queue: 'workers' }, (err, msg) => {
if (err) {
console.error(err);
} else {
console.log(`Worker 2 received: ${sc.decode(msg.data)}`);
}
});
// Publisher
nc.publish('tasks', sc.encode('Task 1'));
nc.publish('tasks', sc.encode('Task 2'));
})();
Other packages similar to nats
amqplib
The 'amqplib' package is a client for RabbitMQ, a widely-used message broker that supports multiple messaging protocols. Compared to NATS, RabbitMQ offers more advanced features like message persistence, complex routing, and transactions, but it is generally heavier and more complex to set up and manage.
kafka-node
The 'kafka-node' package is a client for Apache Kafka, a distributed streaming platform. Kafka is designed for high-throughput, fault-tolerant, and scalable messaging. It is more suitable for large-scale data streaming and log aggregation compared to NATS, which is more lightweight and easier to use for simple messaging needs.
mqtt
The 'mqtt' package is a client for the MQTT protocol, which is designed for lightweight, low-bandwidth, and low-latency communication, often used in IoT applications. While NATS is also lightweight, MQTT is specifically optimized for constrained environments and offers features like last will and testament (LWT) messages.
NATS - Node.js Client
A Node.js client for the NATS messaging system.
Installation
npm install nats
Basic Usage
var NATS = require('nats');
var nats = NATS.connect();
nats.publish('foo', 'Hello World!');
nats.subscribe('foo', function(msg) {
console.log('Received a message: ' + msg);
});
var sid = nats.subscribe('foo', function(msg) {});
nats.unsubscribe(sid);
var sid = nats.request('request', function(response) {
console.log('Got a response in msg stream: ' + response);
});
nats.request('help', null, {'max':1}, function(response) {
console.log('Got a response for help: ' + response);
});
nats.requestOne('help', null, {}, 1000, function(response) {
if(response instanceof NATS.NatsError && response.code === NATS.REQ_TIMEOUT) {
console.log('Request for help timed out.');
return;
}
console.log('Got a response for help: ' + response);
});
nats.subscribe('help', function(request, replyTo) {
nats.publish(replyTo, 'I can help!');
});
nats.close();
Wildcard Subscriptions
nats.subscribe('foo.*.baz', function(msg, reply, subject) {
console.log('Msg received on [' + subject + '] : ' + msg);
});
nats.subscribe('foo.bar.*', function(msg, reply, subject) {
console.log('Msg received on [' + subject + '] : ' + msg);
});
nats.subscribe('foo.>', function(msg, reply, subject) {
console.log('Msg received on [' + subject + '] : ' + msg);
});
Queue Groups
nats.subscribe('foo', {'queue':'job.workers'}, function() {
received += 1;
});
Clustered Usage
var nats = require('nats');
var servers = ['nats://nats.io:4222', 'nats://nats.io:5222', 'nats://nats.io:6222'];
var nc = nats.connect({'servers': servers});
console.log("Connected to " + nc.currentServer.url.host);
nc = nats.connect({'dontRandomize': true, 'servers':servers});
TLS
var nats = require('nats');
var fs = require('fs');
var nc = nats.connect({port: TLSPORT, tls: true});
var tlsOptions = {
rejectUnauthorized: false,
};
var nc = nats.connect({port: TLSPORT, tls: tlsOptions});
var tlsOptions = {
ca: [ fs.readFileSync('./test/certs/ca.pem') ]
};
var nc = nats.connect({port: TLSPORT, tls: tlsOptions});
var tlsOptions = {
key: fs.readFileSync('./test/certs/client-key.pem'),
cert: fs.readFileSync('./test/certs/client-cert.pem'),
ca: [ fs.readFileSync('./test/certs/ca.pem') ]
};
var nc = nats.connect({port: TLSPORT, tls: tlsOptions});
Authentication
var nc = NATS.connect("nats://foo:bar@localhost:4222");
var nc = NATS.connect({'url':"nats://localhost:4222", 'user':'foo', 'pass':'bar'});
var nc = NATS.connect("nats://mytoken@localhost:4222");
var nc = NATS.connect({'url':"nats://localhost:4222", 'token':'mytoken'});
Advanced Usage
nats.publish('foo', 'You done?', function() {
console.log('msg processed!');
});
nats.flush(function() {
console.log('All clear!');
});
var nc = nats.connect({port: PORT, yieldTime: 10});
var sid = nats.subscribe('foo', function() {
received += 1;
});
nats.timeout(sid, timeout_ms, expected, function() {
timeout = true;
});
nats.subscribe('foo', {'max':MAX_WANTED});
nats.unsubscribe(sid, MAX_WANTED);
var nats = require('nats');
var nc1 = nats.connect();
var nc2 = nats.connect();
nc1.subscribe('foo');
nc2.publish('foo');
nc = nats.connect({'servers':servers, 'encoding': 'ascii'});
nc = nats.connect({'preserveBuffers': true});
nc = nats.connect({'maxReconnectAttempts': -1, 'reconnectTimeWait': 250});
nc.on('error', function(err) {
console.log(err);
});
nc.on('connect', function(nc) {
console.log('connected');
});
nc.on('disconnect', function() {
console.log('disconnect');
});
nc.on('reconnecting', function() {
console.log('reconnecting');
});
nc.on('reconnect', function(nc) {
console.log('reconnect');
});
nc.on('close', function() {
console.log('close');
});
See examples and benchmarks for more information.
Connect Options
The following is the list of connection options and default values.
Option | Aliases | Default | Description |
---|
encoding | | "utf8" | Encoding specified by the client to encode/decode data |
json | | false | If true, message payloads are converted to/from JSON |
maxPingOut | | 2 | Max number of pings the client will allow unanswered before rasing a stale connection error |
maxReconnectAttempts | | 10 | Sets the maximun number of reconnect attempts. The value of -1 specifies no limit |
name | client | | Optional client name |
noRandomize | dontRandomize , NoRandomize | false | If set, the order of user-specified servers is randomized. |
pass | password | | Sets the password for a connection |
pedantic | | false | Turns on strict subject format checks |
pingInterval | | 120000 | Number of milliseconds between client-sent pings |
preserveBuffers | | false | If true, data for a message is returned as Buffer |
reconnect | | true | If false server will not attempt reconnecting |
reconnectTimeWait | | 2000 | If disconnected, the client will wait the specified number of milliseconds between reconnect attempts |
servers | urls | | Array of connection url s |
tls | secure | false | This property can be a boolean or an Object. If true the client requires a TLS connection. If false a non-tls connection is required. The value can also be an object specifying TLS certificate data. The properties ca , key , cert should contain the certificate file data. ca should be provided for self-signed certificates. key and cert are required for client provided certificates. rejectUnauthorized if true validates server's credentials |
token | | | Sets a authorization token for a connection |
url | uri | "nats://localhost:4222" | Connection url |
useOldRequestStyle | | false | If set to true calls to request() and requestOne() will create an inbox subscription per call. |
user | | | Sets the username for a connection |
verbose | | false | Turns on +OK protocol acknowledgements |
waitOnFirstConnect | | false | If true the server will fall back to a reconnect mode if it fails its first connection attempt. |
yieldTime | | | If set, processing will yield at least the specified number of milliseconds to IO callbacks before processing inbound messages |
Supported Node Versions
Support policy for Nodejs versions follows
Nodejs release support.
We will support and build node-nats on even Nodejs versions that are current
or in maintenance.
License
Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.