New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

nats

Package Overview
Dependencies
Maintainers
3
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nats - npm Package Compare versions

Comparing version 2.0.0-16 to 2.0.0-17

6

index.d.ts

@@ -216,3 +216,2 @@ /*

export interface Sub {
sid: number;
/**

@@ -284,2 +283,7 @@ * Unsubscribe with optional max number of messages before unsubscribing.

isDraining(): boolean;
/**
* @return the id of the subscription
*/
getID(): number
}

@@ -286,0 +290,0 @@

409

lib/nats.js

@@ -199,7 +199,15 @@ /*

function Client (opts) {
this.subs = new Subs(this)
this.reqs = new Reqs(this)
this.servers = new Servers(this)
this.reconnects = 0
this.connected = false
this.wasConnected = false
this.reconnecting = false
this.pending = []
this.pout = 0
events.EventEmitter.call(this)
this.parseOptions(opts)
this.initState()
// Select a server to connect to.
this.selectServer()
this.servers.selectServer()
this.createConnection()

@@ -254,12 +262,2 @@ }

function shuffle (array) {
for (let i = array.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1))
const temp = array[i]
array[i] = array[j]
array[j] = temp
}
return array
}
/**

@@ -354,33 +352,8 @@ * Parse the constructor/connect options.

}
// For cluster support
this.servers = []
this.servers.init()
if (Array.isArray(options.servers)) {
options.servers.forEach((server) => {
this.servers.push(new Server(new url.URL(server)))
})
// Randomize if needed
if (options.noRandomize !== true) {
shuffle(this.servers)
}
// if they gave an URL we should add it if different
if (options.url !== undefined && this.servers.indexOf(options.url) === -1) {
// Make url first element so it is attempted first
this.servers.unshift(new Server(new url.URL(options.url)))
}
} else {
if (undefined === options.url) {
options.url = DEFAULT_URI
}
this.servers.push(new Server(new url.URL(options.url)))
}
// If we are not setup for tls, but were handed a url with a tls:// prefix
// then upgrade to tls.
if (options.tls === false) {
this.servers.forEach((server) => {
if (server.url.protocol === 'tls' || server.url.protocol === 'tls:') {
options.tls = true
}
})
options.tls = this.servers.hasTLS() !== undefined
}

@@ -424,43 +397,2 @@

/**
* Properly select the next server.
* We rotate the server list as we go,
* we also pull auth from urls as needed, or
* if they were set in options use that as override.
*
* @api private
*/
Client.prototype.selectServer = function () {
const server = this.servers.shift()
// Place in client context.
this.currentServer = server
this.url = server.url
const un = server.url.username || ''
const pw = server.url.password || ''
let up = ''
if (un !== '' && pw !== '') {
up = un + ':' + pw
} else if (un !== '') {
up = un
}
if (up !== '') {
const auth = up.split(':')
if (auth.length !== 1) {
if (this.options.user === undefined) {
this.user = auth[0]
}
if (this.options.pass === undefined) {
this.pass = auth[1]
}
} else {
if (this.options.token === undefined) {
this.token = auth[0]
}
}
}
this.servers.push(server)
return server
}
Client.prototype.checkNoEchoMismatch = function () {

@@ -572,3 +504,3 @@ if ((this.info.proto === undefined || this.info.proto < 1) && this.options.noEcho) {

Client.prototype.checkNkeyMismatch = function () {
if (undefined === this.info.nonce) {
if (this.info.nonce === undefined) {
return false

@@ -580,4 +512,11 @@ }

// Treat this as a filename.
// For now we will not capture an error on file not found etc.
const contents = fs.readFileSync(this.options.userCreds).toString()
let contents = null
try {
contents = fs.readFileSync(this.options.userCreds).toString()
} catch (err) {
this.emit(Events.Error, new NatsError(BAD_CREDS_MSG, ErrorCode.BAD_CREDS, err))
this.closeStream()
return true
}
if (CREDS.exec(contents) === null) {

@@ -599,3 +538,3 @@ this.emit(Events.Error, new NatsError(BAD_CREDS_MSG, ErrorCode.BAD_CREDS))

if (undefined === this.options.nonceSigner) {
if (this.options.nonceSigner === undefined) {
this.emit(Events.Error, new NatsError(SIGNATURE_REQUIRED_MSG, ErrorCode.SIGNATURE_REQUIRED))

@@ -605,3 +544,3 @@ this.closeStream()

}
if (typeof (this.options.nonceSigner) !== 'function') {
if (typeof this.options.nonceSigner !== 'function') {
this.emit(Events.Error, new NatsError(SIGCB_NOTFUNC_MSG, ErrorCode.SIGCB_NOTFUNC))

@@ -611,3 +550,3 @@ this.closeStream()

}
if (undefined === this.options.nkey && undefined === this.options.userJWT) {
if (this.options.nkey === undefined && this.options.userJWT === undefined) {
this.emit(Events.Error, new NatsError(NKEY_OR_JWT_REQ_MSG, ErrorCode.NKEY_OR_JWT_REQ))

@@ -631,3 +570,3 @@ this.closeStream()

this.wasConnected = true
this.currentServer.didConnect = true
this.servers.getCurrent().didConnect = true

@@ -706,3 +645,3 @@ this.emit(event, this)

stream.on('close', () => {
const done = (this.closed === true || this.options.reconnect === false || this.servers.length === 0)
const done = (this.closed === true || this.options.reconnect === false || this.servers.isEmpty())
// if connected, it resets everything as partial buffers may have been sent

@@ -743,3 +682,3 @@ // this will also reset the heartbeats, but not other timers on requests or subscriptions

// If we were connected just return, close event will process
if (this.wasConnected === true && this.currentServer.didConnect === true) {
if (this.wasConnected === true && this.servers.getCurrent().didConnect === true) {
return

@@ -751,3 +690,3 @@ }

// this list. Unless overridden
if (this.wasConnected === false && this.currentServer.didConnect === false) {
if (this.wasConnected === false && this.servers.getCurrent().didConnect === false) {
// We can override this behavior with waitOnFirstConnect, which will

@@ -757,5 +696,5 @@ // treat it like a reconnect scenario.

// Pretend to move us into a reconnect state.
this.currentServer.didConnect = true
this.servers.getCurrent().didConnect = true
} else {
this.servers.splice(this.servers.length - 1, 1)
this.servers.remove(this.servers.getCurrent())
}

@@ -766,3 +705,3 @@ }

// to the server and we only have one, and close
if (this.wasConnected === false && this.servers.length === 0) {
if (this.wasConnected === false && this.servers.isEmpty()) {
this.emit(Events.Error, new NatsError(CONN_ERR_MSG_PREFIX + exception, ErrorCode.CONN_ERR, exception))

@@ -873,3 +812,3 @@ this.close()

// Create the stream
this.stream = net.createConnection(this.url.port, this.url.hostname)
this.stream = net.createConnection(this.servers.getCurrent().url.port, this.servers.getCurrent().url.hostname)
// this change makes it a bit faster on Linux, slightly worse on OS X

@@ -882,19 +821,2 @@ this.stream.setNoDelay(true)

/**
* Initialize client state.
*
* @api private
*/
Client.prototype.initState = function () {
this.ssid = 0
this.subs = new Subs(this)
this.reqs = new Reqs(this)
this.reconnects = 0
this.connected = false
this.wasConnected = false
this.reconnecting = false
this.pending = []
this.pout = 0
}
/**
* Close the connection to the server.

@@ -1129,3 +1051,3 @@ *

// Always try to read the connect_urls from info
this.processServerUpdate()
this.servers.processServerUpdate()

@@ -1186,3 +1108,3 @@ // Process first INFO

// reset the reconnects for this server
this.currentServer.reconnects = 0
this.servers.getCurrent().reconnects = 0
// invoke the callback

@@ -1282,52 +1204,2 @@ this.connectCB()

/**
* Process server updates in info object
* @api internal
*/
Client.prototype.processServerUpdate = function () {
// noinspection JSUnresolvedVariable
if (this.info.connect_urls && this.info.connect_urls.length > 0) {
// parse the infos
const tmp = {}
this.info.connect_urls.forEach((server) => {
const u = 'nats://' + server
const s = new Server(new url.URL(u))
// implicit servers are ones added via the info connect_urls
s.implicit = true
tmp[s.url.href] = s
})
// remove implicit servers that are no longer reported
const toDelete = []
this.servers.forEach((s, index) => {
const u = s.url.href
if (s.implicit && this.currentServer.url.href !== u && tmp[u] === undefined) {
// server was removed
toDelete.push(index)
}
// remove this entry from reported
delete tmp[u]
})
// perform the deletion
toDelete.reverse()
toDelete.forEach((index) => {
this.servers.splice(index, 1)
})
// remaining servers are new
const newURLs = []
for (const k in tmp) {
if (Object.hasOwnProperty.call(tmp, k)) {
this.servers.push(tmp[k])
newURLs.push(k)
}
}
if (newURLs.length) {
this.emit(InternalEvents.Servers, newURLs)
}
}
}
/**
* Process a delivered message and deliver to appropriate subscriber.

@@ -1431,16 +1303,2 @@ *

/**
* Push a new cluster server.
*
* @param {String} uri
* @api public
*/
Client.prototype.addServer = function (uri) {
this.servers.push(new Server(new url.URL(uri)))
if (this.options.noRandomize !== true) {
shuffle(this.servers)
}
}
/**
* Flush outbound queue to server and call optional callback when server has processed

@@ -1489,4 +1347,6 @@ * all data.

subs.forEach((sub) => {
sub.drain((err) => {
errs.push(err)
sub.doDrain((err) => {
if (err) {
errs.push(err)
}
drains.push(sub)

@@ -1496,2 +1356,3 @@ if (drains.length === subs.length) {

this.flush(() => {
this.emit(Events.Close)
this.close()

@@ -1512,6 +1373,6 @@ if (typeof callback === 'function') {

})
// no subscriptions
if (subs.length === 0) {
this.noMorePublishing = true
this.emit(Events.Close)
this.close()

@@ -1749,9 +1610,9 @@ if (typeof callback === 'function') {

}
this.currentServer.reconnects += 1
this.servers.getCurrent().reconnects += 1
this.reconnects += 1
this.createConnection()
if (this.currentServer.didConnect === true) {
if (this.servers.getCurrent().didConnect === true) {
this.emit(Events.Reconnecting)
}
this.currentServer.lastConnect = Date.now()
this.servers.getCurrent().lastConnect = Date.now()
}

@@ -1766,3 +1627,3 @@

// Just return if no more servers
if (this.servers.length === 0) {
if (this.servers.isEmpty()) {
return

@@ -1777,3 +1638,3 @@ }

let wait = 0
if (this.servers[0].didConnect === true) {
if (this.servers.peek().didConnect === true) {
wait = this.options.reconnectTimeWait

@@ -1785,7 +1646,7 @@ }

let maxWait = wait
for (let i = 0; i < this.servers.length; i++) {
const srv = this.selectServer()
for (let i = 0; i < this.servers.length(); i++) {
const srv = this.servers.selectServer()
if (srv.reconnects >= this.options.maxReconnectAttempts && this.options.maxReconnectAttempts !== -1) {
// remove the server - we already tried connecting max number of times
this.servers.pop()
this.servers.remove(srv)
continue

@@ -1811,3 +1672,3 @@ }

if (this.servers.length === 0) {
if (this.servers.isEmpty()) {
// we have no more servers

@@ -1918,2 +1779,8 @@ this.cleanupTimers()

}
this.doDrain(callback)
}
// internal sub drain doesn't check if the connection
// is draining as this is the method used to drain the connection
Sub.prototype.doDrain = function (callback) {
if (this.closed) {

@@ -1971,6 +1838,3 @@ if (typeof callback === 'function') {

Sub.prototype.getReceived = function () {
if (!this.closed) {
return this.received
}
return -1
return this.received
}

@@ -1993,2 +1857,6 @@

Sub.prototype.getID = function () {
return this.sid
}
Sub.prototype.subCmd = function () {

@@ -2146,1 +2014,156 @@ if (this.queue) {

}
function Servers (nc) {
this.nc = nc
this.pool = []
this.current = null
}
Servers.prototype.init = function () {
if (Array.isArray(this.nc.options.servers)) {
this.nc.options.servers.forEach((u) => {
this.pool.push(new Server(new url.URL(u)))
})
// Randomize if needed
if (this.nc.options.noRandomize !== true) {
this.shuffle()
}
// if they gave an URL we should add it if different
if (this.nc.options.url !== undefined && this.pool.indexOf(this.nc.options.url) === -1) {
// Make url first element so it is attempted first
this.pool.unshift(new Server(new url.URL(this.nc.options.url)))
}
} else {
if (this.nc.options.url === undefined) {
this.nc.options.url = DEFAULT_URI
}
this.pool.push(new Server(new url.URL(this.nc.options.url)))
}
}
Servers.prototype.selectServer = function () {
const server = this.pool.shift()
// Place in client context.
this.current = server
const un = server.url.username || ''
const pw = server.url.password || ''
let up = ''
if (un !== '' && pw !== '') {
up = un + ':' + pw
} else if (un !== '') {
up = un
}
if (up !== '') {
const auth = up.split(':')
if (auth.length !== 1) {
if (this.nc.options.user === undefined) {
this.nc.user = auth[0]
}
if (this.nc.options.pass === undefined) {
this.nc.pass = auth[1]
}
} else {
if (this.nc.options.token === undefined) {
this.nc.token = auth[0]
}
}
}
this.pool.push(server)
return server
}
Servers.prototype.processServerUpdate = function () {
// noinspection JSUnresolvedVariable
if (this.nc.info.connect_urls && this.nc.info.connect_urls.length > 0) {
// parse the infos
const tmp = {}
this.nc.info.connect_urls.forEach((server) => {
const u = 'nats://' + server
const s = new Server(new url.URL(u))
// implicit servers are ones added via the info connect_urls
s.implicit = true
tmp[s.url.href] = s
})
// remove implicit servers that are no longer reported
const toDelete = []
this.pool.forEach((s, index) => {
const u = s.url.href
if (s.implicit && this.current.url.href !== u && tmp[u] === undefined) {
// server was removed
toDelete.push(index)
}
// remove this entry from reported
delete tmp[u]
})
// perform the deletion
toDelete.reverse()
toDelete.forEach((index) => {
this.pool.splice(index, 1)
})
// remaining servers are new
const newURLs = []
for (const k in tmp) {
if (Object.hasOwnProperty.call(tmp, k)) {
this.pool.push(tmp[k])
newURLs.push(k)
}
}
if (newURLs.length) {
this.nc.emit(InternalEvents.Servers, newURLs)
}
}
}
Servers.prototype.add = function (uri) {
this.pool.push(new Server(new url.URL(uri)))
if (this.nc.options.noRandomize !== true) {
this.shuffle()
}
}
Servers.prototype.length = function () {
return this.pool.length
}
Servers.prototype.peek = function () {
return this.pool[0]
}
Servers.prototype.hasTLS = function () {
this.pool.find((s) => {
return (s.url.protocol === 'tls' || s.url.protocol === 'tls:')
})
}
Servers.prototype.remove = function (s) {
this.pool = this.pool.filter((t) => {
return t !== s
})
}
Servers.prototype.getAll = function () {
return this.pool
}
Servers.prototype.isEmpty = function () {
return this.pool.length === 0
}
Servers.prototype.getCurrent = function () {
return this.current
}
Servers.prototype.shuffle = function () {
for (let i = this.pool.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1))
const temp = this.pool[i]
this.pool[i] = this.pool[j]
this.pool[j] = temp
}
}
{
"name": "nats",
"version": "2.0.0-16",
"version": "2.0.0-17",
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system",

@@ -5,0 +5,0 @@ "keywords": [

@@ -426,3 +426,3 @@ # NATS.js - Node.js Client

nc.on('connect', (nc) => {
console.log(`connect to ${nc.currentServer.url.host}`)
console.log(`connect to ${nc.servers.getCurrent().url.host}`)
})

@@ -443,3 +443,3 @@

nc.on('reconnect', (nc) => {
console.log(`reconnect to ${nc.currentServer.url.host}`)
console.log(`reconnect to ${nc.servers.getCurrent().url.host}`)
})

@@ -446,0 +446,0 @@

@@ -6,2 +6,3 @@ - [ ] try to bind async server errors to matching callbacks

- [X] m.respond() - to reply to requests
- [X] Request object needs `cancel()`
- [X] Request object needs `cancel()`
- [ ] Implement payload (string, json, binary)
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc