Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

nats

Package Overview
Dependencies
Maintainers
3
Versions
195
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nats - npm Package Compare versions

Comparing version 2.0.0-15 to 2.0.0-16

612

lib/nats.js

@@ -872,3 +872,4 @@ /*

this.ssid = 0
this.subs = {}
this.subs = new Subs(this)
this.reqs = new Reqs(this)
this.reconnects = 0

@@ -907,21 +908,8 @@ this.connected = false

this.cancelHeartbeat()
if (this.respmux && this.respmux.requestMap) {
for (const p in this.respmux.requestMap) {
if (Object.hasOwnProperty.call(this.respmux.requestMap, p)) {
this.cancelMuxRequest(p)
}
}
}
if (this.subs) {
for (const p in this.subs) {
if (Object.hasOwnProperty.call(this.subs, p)) {
const sub = this.subs[p]
if (sub.timeout) {
clearTimeout(sub.timeout)
delete sub.timeout
}
}
}
this.subs.clearTimers()
}
if (this.reqs) {
this.reqs.clearTimers()
}
}

@@ -1058,24 +1046,6 @@

let protos = ''
for (const sid in this.subs) {
if (Object.hasOwnProperty.call(this.subs, sid)) {
const sub = this.subs[sid]
let proto
if (sub.queue) {
proto = [SUB, sub.subject, sub.queue, sid + CR_LF]
} else {
proto = [SUB, sub.subject, sid + CR_LF]
}
protos += proto.join(SPC)
if (sub.max) {
const max = sub.max - sub.received
if (max > 0) {
proto = [UNSUB, sid, max + CR_LF]
} else {
proto = [UNSUB, sid + CR_LF]
}
protos += proto.join(SPC)
}
}
}
this.subs.getAll().forEach((s) => {
protos += s.subCmd()
protos += s.reUnsubCmd()
})
if (protos.length > 0) {

@@ -1351,3 +1321,3 @@ this.stream.write(protos)

Client.prototype.processMsg = function () {
const sub = this.subs[this.payload.sid]
const sub = this.subs.get(this.payload.sid)
if (sub !== undefined) {

@@ -1366,6 +1336,5 @@ sub.received += 1

if (sub.received === sub.max) {
delete this.subs[this.payload.sid]
this.emit(Events.Unsubscribe, { sid: this.payload.sid, subject: sub.subject, queue: sub.queue })
sub.unsubscribe()
} else if (sub.received > sub.max) {
this.unsubscribe(this.payload.sid)
sub.unsubscribe()
sub.callback = null

@@ -1429,13 +1398,10 @@ }

const m = srvError.match(/^'Permissions Violation for (Subscription|Publish) to "(.+)"'/)
let sub
if (m) {
switch (m[1]) {
case 'Subscription':
for (const p in this.subs) {
if (Object.hasOwnProperty.call(this.subs, p)) {
const sub = this.subs[p]
if (sub.subject === m[2]) {
sub.callback(err)
sub.s.unsubscribe()
}
}
sub = this.subs.getBySubject(m[2])
if (sub) {
sub.callback(err)
sub.unsubscribe()
}

@@ -1503,15 +1469,8 @@ break

this.draining = true
const subs = []
const subs = this.subs.getAll()
const drains = []
for (const sid in this.subs) {
if (Object.hasOwnProperty.call(this.subs, sid)) {
const sub = this.subs[sid]
sub.sid = sid
subs.push(sub)
}
}
const errs = []
subs.forEach((sub) => {
this.drainSubscription(sub.sid, (err) => {
sub.drain((err) => {
errs.push(err)

@@ -1709,112 +1668,6 @@ drains.push(sub)

}
this.ssid += 1
const sub = {
ssid: this.ssid,
subject: subject,
callback: callback,
received: 0
}
this.subs[this.ssid] = sub
let proto
if (typeof opts.queue === 'string') {
this.subs[this.ssid].queue = opts.queue
proto = [SUB, subject, opts.queue, this.ssid + CR_LF]
} else {
proto = [SUB, subject, this.ssid + CR_LF]
}
this.sendCommand(proto.join(SPC))
this.emit(Events.Subscribe, { sid: this.ssid, subject: subject, queue: opts.queue })
if (opts.max) {
this.unsubscribe(this.ssid, opts.max)
}
sub.s = new Sub(this, this.ssid)
if (opts.timeout) {
sub.s.setTimeout(opts.timeout, opts.expected)
}
return sub.s
return this.subs.addSubscription(subject, callback, opts)
}
/**
* @api private
*/
Client.prototype.unsubscribe = function (sid, max) {
if (!sid || this.closed) {
return
}
// in the case of new muxRequest, it is possible they want perform
// an unsubscribe with the returned 'sid'. Intercept that and clear
// the request configuration. Mux requests are always negative numbers
if (sid < 0) {
this.cancelMuxRequest(sid)
return
}
let proto
if (max) {
proto = [UNSUB, sid, max + CR_LF]
} else {
proto = [UNSUB, sid + CR_LF]
}
this.sendCommand(proto.join(SPC))
const sub = this.subs[sid]
if (sub === undefined) {
return
}
sub.max = max
if (sub.max === undefined || (sub.received >= sub.max)) {
// remove any timeouts that may be pending
if (sub.timeout) {
clearTimeout(sub.timeout)
sub.timeout = null
}
delete this.subs[sid]
this.emit(Events.Unsubscribe, { sid: this.ssid, subject: sub.subject, queue: sub.queue })
}
}
/**
* @private
*/
Client.prototype.drainSubscription = function (sid, callback) {
if (this.handledClosedOrDraining(callback)) {
return
}
const sub = this.subs[sid]
if (sub === undefined) {
if (typeof callback === 'function') {
callback()
}
return
}
if (sub.draining) {
if (typeof callback === 'function') {
callback(new NatsError(SUB_DRAINING_MSG, ErrorCode.SUB_DRAINING))
} else {
throw (new NatsError(SUB_DRAINING_MSG, ErrorCode.SUB_DRAINING))
}
return
}
sub.draining = true
const proto = [UNSUB, sid + CR_LF]
this.sendCommand(proto.join(SPC))
this.flush((err) => {
if (sub.timeout) {
clearTimeout(sub.timeout)
sub.timeout = null
}
delete this.subs[sid]
if (typeof callback === 'function') {
callback(err)
}
this.emit(Events.Unsubscribe, { sid: this.ssid, subject: sub.subject, queue: sub.queue })
})
}
/**
* Publish a message with an implicit inbox listener as the reply. Message is optional.

@@ -1843,3 +1696,3 @@ * This should be treated as a subscription. You can optionally indicate how many

if (this.handledSubArgErrors(subject, callback, opts)) {
return 0
return null
}

@@ -1853,17 +1706,8 @@ if (!opts.max) {

const sub = this.subscribe(inbox, callback, opts)
this.doPublish(subject, inbox, data, callback)
return new Req(sub.nc, sub.sid)
this.doPublish(subject, inbox, data)
return new Req(sub.nc, sub.sid, inbox, callback, opts, sub)
} else {
const conf = this.initMuxRequestDetails(callback, opts.max)
this.doPublish(subject, conf.inbox, data, callback)
if (opts.timeout) {
conf.timeout = setTimeout(() => {
if (conf.callback) {
conf.callback(new NatsError(REQ_TIMEOUT_MSG_PREFIX + conf.id, ErrorCode.REQ_TIMEOUT))
}
this.cancelMuxRequest(conf.token)
}, opts.timeout)
}
conf.r = new Req(this, conf.id)
return conf.r
const req = this.reqs.addRequest(callback, opts)
this.doPublish(subject, req.subject, data)
return req
}

@@ -1873,119 +1717,2 @@ }

/**
* Strips the prefix of the request reply to derive the token.
* This is internal and only used by the new requestOne.
*
* @api private
*/
Client.prototype.extractToken = function (subject) {
return subject.substr(this.respmux.inboxPrefixLen)
}
/**
* Creates a subscription for the global inbox in the new requestOne.
* Request tokens, timer, and callbacks are tracked here.
*
* @api private
*/
Client.prototype.createResponseMux = function () {
if (!this.respmux) {
const inbox = this.createInbox()
const ginbox = inbox + '.*'
const sid = this.subscribe(ginbox, (err, msg) => {
const token = this.extractToken(msg.subject)
const conf = this.getMuxRequestConfig(token)
if (conf) {
if (conf.callback) {
conf.callback(err, msg)
}
if (Object.hasOwnProperty.call(conf, 'expected')) {
conf.received++
if (conf.received >= conf.expected) {
this.cancelMuxRequest(token)
this.emit(Events.Unsubscribe, { sid: sid, subject: msg.subject })
}
}
}
})
this.respmux = {}
this.respmux.inbox = inbox
this.respmux.inboxPrefixLen = inbox.length + 1
this.respmux.subscriptionID = sid
this.respmux.requestMap = {}
this.respmux.nextID = -1
this.respmux.length = 0
}
return this.respmux.inbox
}
/**
* Stores the request callback and other details
*
* @api private
*/
Client.prototype.initMuxRequestDetails = function (callback, expected) {
const ginbox = this.createResponseMux()
const token = nuid.next()
const inbox = ginbox + '.' + token
const conf = {
token: token,
callback: callback,
inbox: inbox,
id: this.respmux.nextID--,
received: 0
}
if (expected > 0) {
conf.expected = expected
}
this.respmux.requestMap[token] = conf
this.respmux.length++
return conf
}
/**
* Returns the mux request configuration
* @param token
* @returns Object
*/
Client.prototype.getMuxRequestConfig = function (token) {
// if the token is a number, we have a fake sid, find the request
if (typeof token === 'number') {
let entry = null
for (const p in this.respmux.requestMap) {
if (Object.hasOwnProperty.call(this.respmux.requestMap, p)) {
const v = this.respmux.requestMap[p]
if (v.id === token) {
entry = v
break
}
}
}
if (entry) {
// noinspection JSUnresolvedVariable
token = entry.token
}
}
return this.respmux.requestMap[token]
}
/**
* Cancels the mux request
*
* @api private
*/
Client.prototype.cancelMuxRequest = function (token) {
const conf = this.getMuxRequestConfig(token)
if (conf) {
if (conf.timeout) {
clearTimeout(conf.timeout)
}
// the token could be sid, so use the one in the conf
delete this.respmux.requestMap[conf.token]
this.respmux.length--
}
return conf
}
/**
* Report number of outstanding subscriptions on this connection.

@@ -1997,3 +1724,3 @@ *

Client.prototype.numSubscriptions = function () {
return Object.keys(this.subs).length
return this.subs.length
}

@@ -2081,26 +1808,128 @@

function Sub (nc, sid) {
function Subs (nc) {
this.sids = 0
this.nc = nc
this.sidToSub = {}
this.length = 0
}
Subs.prototype.addSubscription = function (subject, callback, opts) {
this.sids++
const sid = this.sids
const sub = new Sub(this.nc, sid, subject, callback, opts)
this.sidToSub[sid] = sub
this.length++
this.nc.sendCommand(sub.subCmd())
if (sub.getMax() > 0) {
sub.unsubscribe(sub.getMax())
}
this.nc.emit(Events.Subscribe, { sid: sub.sid, subject: sub.subject, queue: sub.queue })
return sub
}
Subs.prototype.remove = function (sub) {
if (sub) {
if (sub.timeout) {
clearTimeout(sub.timeout)
sub.timeout = null
}
delete this.sidToSub[sub.sid]
sub.closed = true
this.length--
this.nc.emit(Events.Unsubscribe, { sid: sub.sid, subject: sub.subject, queue: sub.queue })
}
}
Subs.prototype.get = function (sid) {
return this.sidToSub[sid]
}
Subs.prototype.getAll = function () {
const a = []
for (const p in this.sidToSub) {
if (Object.hasOwnProperty.call(this.sidToSub, p)) {
const sub = this.sidToSub[p]
a.push(sub)
}
}
return a
}
Subs.prototype.getBySubject = function (subj) {
return this.getAll().find((e) => {
return subj === e.subject
})
}
Subs.prototype.clearTimers = function () {
this.getAll().forEach((s) => {
s.cancelTimeout()
})
}
function Sub (nc, sid, subject, callback, opts) {
this.nc = nc
this.closed = false
this.sid = sid
this.subject = subject
this.callback = callback
this.received = 0
this.queue = opts.queue || null
this.max = opts.max || undefined
if (opts.timeout) {
this.setTimeout(opts.timeout, opts.expected)
}
}
Sub.prototype.unsubscribe = function (max) {
this.nc.unsubscribe(this.sid, max)
if (this.nc.closed) {
return
}
if (!this.closed) {
this.max = max
this.nc.sendCommand(this.unsubCmd())
if (this.max === undefined || (this.received >= this.max)) {
this.nc.subs.remove(this)
}
}
}
Sub.prototype.drain = function (callback) {
this.nc.drainSubscription(this.sid, callback)
if (this.nc.handledClosedOrDraining(callback)) {
return
}
if (this.closed) {
if (typeof callback === 'function') {
callback()
}
return
}
if (this.draining) {
if (typeof callback === 'function') {
callback(new NatsError(SUB_DRAINING_MSG, ErrorCode.SUB_DRAINING))
} else {
throw (new NatsError(SUB_DRAINING_MSG, ErrorCode.SUB_DRAINING))
}
return
}
this.draining = true
this.nc.sendCommand(this.unsubCmd())
this.nc.flush((err) => {
this.nc.subs.remove(this)
if (typeof callback === 'function') {
callback(err)
}
})
}
Sub.prototype.hasTimeout = function () {
const sub = this.nc.subs[this.sid]
return sub && sub.timeout
return !this.closed && this.timeout
}
Sub.prototype.cancelTimeout = function () {
const sub = this.nc.subs[this.sid]
if (sub && sub.timeout) {
clearTimeout(sub.timeout)
delete sub.timeout
delete sub.expected
if (!this.closed && this.timeout) {
clearTimeout(this.timeout)
delete this.timeout
delete this.expected
return true

@@ -2113,8 +1942,7 @@ }

this.cancelTimeout()
const sub = this.nc.subs[this.sid]
if (sub) {
sub.expected = (max || 1)
sub.timeout = setTimeout(() => {
if (!this.closed) {
this.expected = (max || 1)
this.timeout = setTimeout(() => {
this.unsubscribe()
this.nc.dispatchError(sub.callback, new NatsError(TIMEOUT_MSG, ErrorCode.TIMEOUT_ERR))
this.nc.dispatchError(this.callback, new NatsError(TIMEOUT_MSG, ErrorCode.TIMEOUT_ERR))
}, millis)

@@ -2127,5 +1955,4 @@ return true

Sub.prototype.getReceived = function () {
const sub = this.nc.subs[this.sid]
if (sub) {
return sub.received
if (!this.closed) {
return this.received
}

@@ -2136,5 +1963,4 @@ return -1

Sub.prototype.getMax = function () {
const sub = this.nc.subs[this.sid]
if (sub) {
return sub.max || -1
if (!this.closed) {
return this.max || -1
}

@@ -2145,18 +1971,148 @@ return -1

Sub.prototype.isCancelled = function () {
const sub = this.nc.subs[this.sid]
return sub === undefined
return this.closed
}
Sub.prototype.isDraining = function () {
const sub = this.nc.subs[this.sid]
return sub && sub.draining
return !this.closed && this.draining
}
function Req (nc, sid) {
Sub.prototype.subCmd = function () {
if (this.queue) {
return [SUB, this.subject, this.queue, this.sid + CR_LF].join(SPC)
}
return [SUB, this.subject, this.sid + CR_LF].join(SPC)
}
Sub.prototype.reUnsubCmd = function () {
if (this.max) {
const max = this.max - this.received
if (max > 0) {
return [UNSUB, this.sid, max + CR_LF].join(SPC)
}
return [UNSUB, this.sid + CR_LF].join(SPC)
}
return ''
}
Sub.prototype.unsubCmd = function () {
if (this.max) {
return [UNSUB, this.sid, this.max + CR_LF].join(SPC)
}
return [UNSUB, this.sid + CR_LF].join(SPC)
}
function Reqs (nc) {
this.sids = 0
this.nc = nc
this.inbox = this.nc.createInbox()
this.inboxPrefixLen = this.inbox.length + 1
this.tokenToReq = {}
this.length = 0
this.sub = null
}
Reqs.prototype.init = function () {
if (this.sub === null) {
this.sub = this.nc.subscribe(`${this.inbox}.*`, (err, m) => {
const token = this.extractToken(m.subject)
const req = this.get(token)
if (req) {
req.received++
if (req.callback && typeof req.callback === 'function') {
req.callback(err, m)
}
if (Object.hasOwnProperty.call(req, 'max')) {
if (req.received >= req.max) {
req.cancel()
}
}
}
})
}
}
Reqs.prototype.clearTimers = function () {
this.getAll().forEach((s) => {
s.cancel()
})
}
Reqs.prototype.addRequest = function (callback, opts) {
// insure we have a subscription
this.init()
this.sids--
const token = nuid.next()
const subject = `${this.inbox}.${token}`
const sid = this.sids
const req = new Req(this.nc, sid, subject, token, callback, opts)
this.tokenToReq[token] = req
this.length++
if (opts.timeout) {
req.timeout = setTimeout(() => {
if (req.callback) {
req.callback(new NatsError(REQ_TIMEOUT_MSG_PREFIX + req.sid, ErrorCode.REQ_TIMEOUT))
}
this.nc.reqs.remove(req)
}, opts.timeout)
}
return req
}
Reqs.prototype.remove = function (req) {
if (req) {
if (req.timeout) {
clearTimeout(req.timeout)
req.timeout = null
}
delete this.tokenToReq[req.token]
req.closed = true
this.length--
this.nc.emit(Events.Unsubscribe, { sid: req.sid, subject: req.subject })
}
}
Reqs.prototype.extractToken = function (s) {
return s.substr(this.inboxPrefixLen)
}
Reqs.prototype.get = function (token) {
return this.tokenToReq[token]
}
Reqs.prototype.getAll = function () {
const a = []
for (const p in this.tokenToReq) {
if (Object.hasOwnProperty.call(this.tokenToReq, p)) {
const req = this.tokenToReq[p]
a.push(req)
}
}
return a
}
function Req (nc, sid, subject, token, callback, opts, sub) {
this.nc = nc
this.sid = sid
this.closed = false
this.subject = subject
this.token = token
this.callback = callback
this.received = 0
this.max = opts.max || undefined
this.sub = sub
}
Req.prototype.cancel = function () {
this.nc.unsubscribe(this.sid)
if (this.nc.closed) {
return
}
if (!this.closed) {
if (this.sub) {
this.sub.unsubscribe()
} else {
this.nc.reqs.remove(this)
}
}
}

@@ -2163,0 +2119,0 @@

{
"name": "nats",
"version": "2.0.0-15",
"version": "2.0.0-16",
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system",

@@ -5,0 +5,0 @@ "keywords": [

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc