Comparing version 2.0.0-15 to 2.0.0-16
612
lib/nats.js
@@ -872,3 +872,4 @@ /* | ||
this.ssid = 0 | ||
this.subs = {} | ||
this.subs = new Subs(this) | ||
this.reqs = new Reqs(this) | ||
this.reconnects = 0 | ||
@@ -907,21 +908,8 @@ this.connected = false | ||
this.cancelHeartbeat() | ||
if (this.respmux && this.respmux.requestMap) { | ||
for (const p in this.respmux.requestMap) { | ||
if (Object.hasOwnProperty.call(this.respmux.requestMap, p)) { | ||
this.cancelMuxRequest(p) | ||
} | ||
} | ||
} | ||
if (this.subs) { | ||
for (const p in this.subs) { | ||
if (Object.hasOwnProperty.call(this.subs, p)) { | ||
const sub = this.subs[p] | ||
if (sub.timeout) { | ||
clearTimeout(sub.timeout) | ||
delete sub.timeout | ||
} | ||
} | ||
} | ||
this.subs.clearTimers() | ||
} | ||
if (this.reqs) { | ||
this.reqs.clearTimers() | ||
} | ||
} | ||
@@ -1058,24 +1046,6 @@ | ||
let protos = '' | ||
for (const sid in this.subs) { | ||
if (Object.hasOwnProperty.call(this.subs, sid)) { | ||
const sub = this.subs[sid] | ||
let proto | ||
if (sub.queue) { | ||
proto = [SUB, sub.subject, sub.queue, sid + CR_LF] | ||
} else { | ||
proto = [SUB, sub.subject, sid + CR_LF] | ||
} | ||
protos += proto.join(SPC) | ||
if (sub.max) { | ||
const max = sub.max - sub.received | ||
if (max > 0) { | ||
proto = [UNSUB, sid, max + CR_LF] | ||
} else { | ||
proto = [UNSUB, sid + CR_LF] | ||
} | ||
protos += proto.join(SPC) | ||
} | ||
} | ||
} | ||
this.subs.getAll().forEach((s) => { | ||
protos += s.subCmd() | ||
protos += s.reUnsubCmd() | ||
}) | ||
if (protos.length > 0) { | ||
@@ -1351,3 +1321,3 @@ this.stream.write(protos) | ||
Client.prototype.processMsg = function () { | ||
const sub = this.subs[this.payload.sid] | ||
const sub = this.subs.get(this.payload.sid) | ||
if (sub !== undefined) { | ||
@@ -1366,6 +1336,5 @@ sub.received += 1 | ||
if (sub.received === sub.max) { | ||
delete this.subs[this.payload.sid] | ||
this.emit(Events.Unsubscribe, { sid: this.payload.sid, subject: sub.subject, queue: sub.queue }) | ||
sub.unsubscribe() | ||
} else if (sub.received > sub.max) { | ||
this.unsubscribe(this.payload.sid) | ||
sub.unsubscribe() | ||
sub.callback = null | ||
@@ -1429,13 +1398,10 @@ } | ||
const m = srvError.match(/^'Permissions Violation for (Subscription|Publish) to "(.+)"'/) | ||
let sub | ||
if (m) { | ||
switch (m[1]) { | ||
case 'Subscription': | ||
for (const p in this.subs) { | ||
if (Object.hasOwnProperty.call(this.subs, p)) { | ||
const sub = this.subs[p] | ||
if (sub.subject === m[2]) { | ||
sub.callback(err) | ||
sub.s.unsubscribe() | ||
} | ||
} | ||
sub = this.subs.getBySubject(m[2]) | ||
if (sub) { | ||
sub.callback(err) | ||
sub.unsubscribe() | ||
} | ||
@@ -1503,15 +1469,8 @@ break | ||
this.draining = true | ||
const subs = [] | ||
const subs = this.subs.getAll() | ||
const drains = [] | ||
for (const sid in this.subs) { | ||
if (Object.hasOwnProperty.call(this.subs, sid)) { | ||
const sub = this.subs[sid] | ||
sub.sid = sid | ||
subs.push(sub) | ||
} | ||
} | ||
const errs = [] | ||
subs.forEach((sub) => { | ||
this.drainSubscription(sub.sid, (err) => { | ||
sub.drain((err) => { | ||
errs.push(err) | ||
@@ -1709,112 +1668,6 @@ drains.push(sub) | ||
} | ||
this.ssid += 1 | ||
const sub = { | ||
ssid: this.ssid, | ||
subject: subject, | ||
callback: callback, | ||
received: 0 | ||
} | ||
this.subs[this.ssid] = sub | ||
let proto | ||
if (typeof opts.queue === 'string') { | ||
this.subs[this.ssid].queue = opts.queue | ||
proto = [SUB, subject, opts.queue, this.ssid + CR_LF] | ||
} else { | ||
proto = [SUB, subject, this.ssid + CR_LF] | ||
} | ||
this.sendCommand(proto.join(SPC)) | ||
this.emit(Events.Subscribe, { sid: this.ssid, subject: subject, queue: opts.queue }) | ||
if (opts.max) { | ||
this.unsubscribe(this.ssid, opts.max) | ||
} | ||
sub.s = new Sub(this, this.ssid) | ||
if (opts.timeout) { | ||
sub.s.setTimeout(opts.timeout, opts.expected) | ||
} | ||
return sub.s | ||
return this.subs.addSubscription(subject, callback, opts) | ||
} | ||
/** | ||
* @api private | ||
*/ | ||
Client.prototype.unsubscribe = function (sid, max) { | ||
if (!sid || this.closed) { | ||
return | ||
} | ||
// in the case of new muxRequest, it is possible they want perform | ||
// an unsubscribe with the returned 'sid'. Intercept that and clear | ||
// the request configuration. Mux requests are always negative numbers | ||
if (sid < 0) { | ||
this.cancelMuxRequest(sid) | ||
return | ||
} | ||
let proto | ||
if (max) { | ||
proto = [UNSUB, sid, max + CR_LF] | ||
} else { | ||
proto = [UNSUB, sid + CR_LF] | ||
} | ||
this.sendCommand(proto.join(SPC)) | ||
const sub = this.subs[sid] | ||
if (sub === undefined) { | ||
return | ||
} | ||
sub.max = max | ||
if (sub.max === undefined || (sub.received >= sub.max)) { | ||
// remove any timeouts that may be pending | ||
if (sub.timeout) { | ||
clearTimeout(sub.timeout) | ||
sub.timeout = null | ||
} | ||
delete this.subs[sid] | ||
this.emit(Events.Unsubscribe, { sid: this.ssid, subject: sub.subject, queue: sub.queue }) | ||
} | ||
} | ||
/** | ||
* @private | ||
*/ | ||
Client.prototype.drainSubscription = function (sid, callback) { | ||
if (this.handledClosedOrDraining(callback)) { | ||
return | ||
} | ||
const sub = this.subs[sid] | ||
if (sub === undefined) { | ||
if (typeof callback === 'function') { | ||
callback() | ||
} | ||
return | ||
} | ||
if (sub.draining) { | ||
if (typeof callback === 'function') { | ||
callback(new NatsError(SUB_DRAINING_MSG, ErrorCode.SUB_DRAINING)) | ||
} else { | ||
throw (new NatsError(SUB_DRAINING_MSG, ErrorCode.SUB_DRAINING)) | ||
} | ||
return | ||
} | ||
sub.draining = true | ||
const proto = [UNSUB, sid + CR_LF] | ||
this.sendCommand(proto.join(SPC)) | ||
this.flush((err) => { | ||
if (sub.timeout) { | ||
clearTimeout(sub.timeout) | ||
sub.timeout = null | ||
} | ||
delete this.subs[sid] | ||
if (typeof callback === 'function') { | ||
callback(err) | ||
} | ||
this.emit(Events.Unsubscribe, { sid: this.ssid, subject: sub.subject, queue: sub.queue }) | ||
}) | ||
} | ||
/** | ||
* Publish a message with an implicit inbox listener as the reply. Message is optional. | ||
@@ -1843,3 +1696,3 @@ * This should be treated as a subscription. You can optionally indicate how many | ||
if (this.handledSubArgErrors(subject, callback, opts)) { | ||
return 0 | ||
return null | ||
} | ||
@@ -1853,17 +1706,8 @@ if (!opts.max) { | ||
const sub = this.subscribe(inbox, callback, opts) | ||
this.doPublish(subject, inbox, data, callback) | ||
return new Req(sub.nc, sub.sid) | ||
this.doPublish(subject, inbox, data) | ||
return new Req(sub.nc, sub.sid, inbox, callback, opts, sub) | ||
} else { | ||
const conf = this.initMuxRequestDetails(callback, opts.max) | ||
this.doPublish(subject, conf.inbox, data, callback) | ||
if (opts.timeout) { | ||
conf.timeout = setTimeout(() => { | ||
if (conf.callback) { | ||
conf.callback(new NatsError(REQ_TIMEOUT_MSG_PREFIX + conf.id, ErrorCode.REQ_TIMEOUT)) | ||
} | ||
this.cancelMuxRequest(conf.token) | ||
}, opts.timeout) | ||
} | ||
conf.r = new Req(this, conf.id) | ||
return conf.r | ||
const req = this.reqs.addRequest(callback, opts) | ||
this.doPublish(subject, req.subject, data) | ||
return req | ||
} | ||
@@ -1873,119 +1717,2 @@ } | ||
/** | ||
* Strips the prefix of the request reply to derive the token. | ||
* This is internal and only used by the new requestOne. | ||
* | ||
* @api private | ||
*/ | ||
Client.prototype.extractToken = function (subject) { | ||
return subject.substr(this.respmux.inboxPrefixLen) | ||
} | ||
/** | ||
* Creates a subscription for the global inbox in the new requestOne. | ||
* Request tokens, timer, and callbacks are tracked here. | ||
* | ||
* @api private | ||
*/ | ||
Client.prototype.createResponseMux = function () { | ||
if (!this.respmux) { | ||
const inbox = this.createInbox() | ||
const ginbox = inbox + '.*' | ||
const sid = this.subscribe(ginbox, (err, msg) => { | ||
const token = this.extractToken(msg.subject) | ||
const conf = this.getMuxRequestConfig(token) | ||
if (conf) { | ||
if (conf.callback) { | ||
conf.callback(err, msg) | ||
} | ||
if (Object.hasOwnProperty.call(conf, 'expected')) { | ||
conf.received++ | ||
if (conf.received >= conf.expected) { | ||
this.cancelMuxRequest(token) | ||
this.emit(Events.Unsubscribe, { sid: sid, subject: msg.subject }) | ||
} | ||
} | ||
} | ||
}) | ||
this.respmux = {} | ||
this.respmux.inbox = inbox | ||
this.respmux.inboxPrefixLen = inbox.length + 1 | ||
this.respmux.subscriptionID = sid | ||
this.respmux.requestMap = {} | ||
this.respmux.nextID = -1 | ||
this.respmux.length = 0 | ||
} | ||
return this.respmux.inbox | ||
} | ||
/** | ||
* Stores the request callback and other details | ||
* | ||
* @api private | ||
*/ | ||
Client.prototype.initMuxRequestDetails = function (callback, expected) { | ||
const ginbox = this.createResponseMux() | ||
const token = nuid.next() | ||
const inbox = ginbox + '.' + token | ||
const conf = { | ||
token: token, | ||
callback: callback, | ||
inbox: inbox, | ||
id: this.respmux.nextID--, | ||
received: 0 | ||
} | ||
if (expected > 0) { | ||
conf.expected = expected | ||
} | ||
this.respmux.requestMap[token] = conf | ||
this.respmux.length++ | ||
return conf | ||
} | ||
/** | ||
* Returns the mux request configuration | ||
* @param token | ||
* @returns Object | ||
*/ | ||
Client.prototype.getMuxRequestConfig = function (token) { | ||
// if the token is a number, we have a fake sid, find the request | ||
if (typeof token === 'number') { | ||
let entry = null | ||
for (const p in this.respmux.requestMap) { | ||
if (Object.hasOwnProperty.call(this.respmux.requestMap, p)) { | ||
const v = this.respmux.requestMap[p] | ||
if (v.id === token) { | ||
entry = v | ||
break | ||
} | ||
} | ||
} | ||
if (entry) { | ||
// noinspection JSUnresolvedVariable | ||
token = entry.token | ||
} | ||
} | ||
return this.respmux.requestMap[token] | ||
} | ||
/** | ||
* Cancels the mux request | ||
* | ||
* @api private | ||
*/ | ||
Client.prototype.cancelMuxRequest = function (token) { | ||
const conf = this.getMuxRequestConfig(token) | ||
if (conf) { | ||
if (conf.timeout) { | ||
clearTimeout(conf.timeout) | ||
} | ||
// the token could be sid, so use the one in the conf | ||
delete this.respmux.requestMap[conf.token] | ||
this.respmux.length-- | ||
} | ||
return conf | ||
} | ||
/** | ||
* Report number of outstanding subscriptions on this connection. | ||
@@ -1997,3 +1724,3 @@ * | ||
Client.prototype.numSubscriptions = function () { | ||
return Object.keys(this.subs).length | ||
return this.subs.length | ||
} | ||
@@ -2081,26 +1808,128 @@ | ||
function Sub (nc, sid) { | ||
function Subs (nc) { | ||
this.sids = 0 | ||
this.nc = nc | ||
this.sidToSub = {} | ||
this.length = 0 | ||
} | ||
Subs.prototype.addSubscription = function (subject, callback, opts) { | ||
this.sids++ | ||
const sid = this.sids | ||
const sub = new Sub(this.nc, sid, subject, callback, opts) | ||
this.sidToSub[sid] = sub | ||
this.length++ | ||
this.nc.sendCommand(sub.subCmd()) | ||
if (sub.getMax() > 0) { | ||
sub.unsubscribe(sub.getMax()) | ||
} | ||
this.nc.emit(Events.Subscribe, { sid: sub.sid, subject: sub.subject, queue: sub.queue }) | ||
return sub | ||
} | ||
Subs.prototype.remove = function (sub) { | ||
if (sub) { | ||
if (sub.timeout) { | ||
clearTimeout(sub.timeout) | ||
sub.timeout = null | ||
} | ||
delete this.sidToSub[sub.sid] | ||
sub.closed = true | ||
this.length-- | ||
this.nc.emit(Events.Unsubscribe, { sid: sub.sid, subject: sub.subject, queue: sub.queue }) | ||
} | ||
} | ||
Subs.prototype.get = function (sid) { | ||
return this.sidToSub[sid] | ||
} | ||
Subs.prototype.getAll = function () { | ||
const a = [] | ||
for (const p in this.sidToSub) { | ||
if (Object.hasOwnProperty.call(this.sidToSub, p)) { | ||
const sub = this.sidToSub[p] | ||
a.push(sub) | ||
} | ||
} | ||
return a | ||
} | ||
Subs.prototype.getBySubject = function (subj) { | ||
return this.getAll().find((e) => { | ||
return subj === e.subject | ||
}) | ||
} | ||
Subs.prototype.clearTimers = function () { | ||
this.getAll().forEach((s) => { | ||
s.cancelTimeout() | ||
}) | ||
} | ||
function Sub (nc, sid, subject, callback, opts) { | ||
this.nc = nc | ||
this.closed = false | ||
this.sid = sid | ||
this.subject = subject | ||
this.callback = callback | ||
this.received = 0 | ||
this.queue = opts.queue || null | ||
this.max = opts.max || undefined | ||
if (opts.timeout) { | ||
this.setTimeout(opts.timeout, opts.expected) | ||
} | ||
} | ||
Sub.prototype.unsubscribe = function (max) { | ||
this.nc.unsubscribe(this.sid, max) | ||
if (this.nc.closed) { | ||
return | ||
} | ||
if (!this.closed) { | ||
this.max = max | ||
this.nc.sendCommand(this.unsubCmd()) | ||
if (this.max === undefined || (this.received >= this.max)) { | ||
this.nc.subs.remove(this) | ||
} | ||
} | ||
} | ||
Sub.prototype.drain = function (callback) { | ||
this.nc.drainSubscription(this.sid, callback) | ||
if (this.nc.handledClosedOrDraining(callback)) { | ||
return | ||
} | ||
if (this.closed) { | ||
if (typeof callback === 'function') { | ||
callback() | ||
} | ||
return | ||
} | ||
if (this.draining) { | ||
if (typeof callback === 'function') { | ||
callback(new NatsError(SUB_DRAINING_MSG, ErrorCode.SUB_DRAINING)) | ||
} else { | ||
throw (new NatsError(SUB_DRAINING_MSG, ErrorCode.SUB_DRAINING)) | ||
} | ||
return | ||
} | ||
this.draining = true | ||
this.nc.sendCommand(this.unsubCmd()) | ||
this.nc.flush((err) => { | ||
this.nc.subs.remove(this) | ||
if (typeof callback === 'function') { | ||
callback(err) | ||
} | ||
}) | ||
} | ||
Sub.prototype.hasTimeout = function () { | ||
const sub = this.nc.subs[this.sid] | ||
return sub && sub.timeout | ||
return !this.closed && this.timeout | ||
} | ||
Sub.prototype.cancelTimeout = function () { | ||
const sub = this.nc.subs[this.sid] | ||
if (sub && sub.timeout) { | ||
clearTimeout(sub.timeout) | ||
delete sub.timeout | ||
delete sub.expected | ||
if (!this.closed && this.timeout) { | ||
clearTimeout(this.timeout) | ||
delete this.timeout | ||
delete this.expected | ||
return true | ||
@@ -2113,8 +1942,7 @@ } | ||
this.cancelTimeout() | ||
const sub = this.nc.subs[this.sid] | ||
if (sub) { | ||
sub.expected = (max || 1) | ||
sub.timeout = setTimeout(() => { | ||
if (!this.closed) { | ||
this.expected = (max || 1) | ||
this.timeout = setTimeout(() => { | ||
this.unsubscribe() | ||
this.nc.dispatchError(sub.callback, new NatsError(TIMEOUT_MSG, ErrorCode.TIMEOUT_ERR)) | ||
this.nc.dispatchError(this.callback, new NatsError(TIMEOUT_MSG, ErrorCode.TIMEOUT_ERR)) | ||
}, millis) | ||
@@ -2127,5 +1955,4 @@ return true | ||
Sub.prototype.getReceived = function () { | ||
const sub = this.nc.subs[this.sid] | ||
if (sub) { | ||
return sub.received | ||
if (!this.closed) { | ||
return this.received | ||
} | ||
@@ -2136,5 +1963,4 @@ return -1 | ||
Sub.prototype.getMax = function () { | ||
const sub = this.nc.subs[this.sid] | ||
if (sub) { | ||
return sub.max || -1 | ||
if (!this.closed) { | ||
return this.max || -1 | ||
} | ||
@@ -2145,18 +1971,148 @@ return -1 | ||
Sub.prototype.isCancelled = function () { | ||
const sub = this.nc.subs[this.sid] | ||
return sub === undefined | ||
return this.closed | ||
} | ||
Sub.prototype.isDraining = function () { | ||
const sub = this.nc.subs[this.sid] | ||
return sub && sub.draining | ||
return !this.closed && this.draining | ||
} | ||
function Req (nc, sid) { | ||
Sub.prototype.subCmd = function () { | ||
if (this.queue) { | ||
return [SUB, this.subject, this.queue, this.sid + CR_LF].join(SPC) | ||
} | ||
return [SUB, this.subject, this.sid + CR_LF].join(SPC) | ||
} | ||
Sub.prototype.reUnsubCmd = function () { | ||
if (this.max) { | ||
const max = this.max - this.received | ||
if (max > 0) { | ||
return [UNSUB, this.sid, max + CR_LF].join(SPC) | ||
} | ||
return [UNSUB, this.sid + CR_LF].join(SPC) | ||
} | ||
return '' | ||
} | ||
Sub.prototype.unsubCmd = function () { | ||
if (this.max) { | ||
return [UNSUB, this.sid, this.max + CR_LF].join(SPC) | ||
} | ||
return [UNSUB, this.sid + CR_LF].join(SPC) | ||
} | ||
function Reqs (nc) { | ||
this.sids = 0 | ||
this.nc = nc | ||
this.inbox = this.nc.createInbox() | ||
this.inboxPrefixLen = this.inbox.length + 1 | ||
this.tokenToReq = {} | ||
this.length = 0 | ||
this.sub = null | ||
} | ||
Reqs.prototype.init = function () { | ||
if (this.sub === null) { | ||
this.sub = this.nc.subscribe(`${this.inbox}.*`, (err, m) => { | ||
const token = this.extractToken(m.subject) | ||
const req = this.get(token) | ||
if (req) { | ||
req.received++ | ||
if (req.callback && typeof req.callback === 'function') { | ||
req.callback(err, m) | ||
} | ||
if (Object.hasOwnProperty.call(req, 'max')) { | ||
if (req.received >= req.max) { | ||
req.cancel() | ||
} | ||
} | ||
} | ||
}) | ||
} | ||
} | ||
Reqs.prototype.clearTimers = function () { | ||
this.getAll().forEach((s) => { | ||
s.cancel() | ||
}) | ||
} | ||
Reqs.prototype.addRequest = function (callback, opts) { | ||
// insure we have a subscription | ||
this.init() | ||
this.sids-- | ||
const token = nuid.next() | ||
const subject = `${this.inbox}.${token}` | ||
const sid = this.sids | ||
const req = new Req(this.nc, sid, subject, token, callback, opts) | ||
this.tokenToReq[token] = req | ||
this.length++ | ||
if (opts.timeout) { | ||
req.timeout = setTimeout(() => { | ||
if (req.callback) { | ||
req.callback(new NatsError(REQ_TIMEOUT_MSG_PREFIX + req.sid, ErrorCode.REQ_TIMEOUT)) | ||
} | ||
this.nc.reqs.remove(req) | ||
}, opts.timeout) | ||
} | ||
return req | ||
} | ||
Reqs.prototype.remove = function (req) { | ||
if (req) { | ||
if (req.timeout) { | ||
clearTimeout(req.timeout) | ||
req.timeout = null | ||
} | ||
delete this.tokenToReq[req.token] | ||
req.closed = true | ||
this.length-- | ||
this.nc.emit(Events.Unsubscribe, { sid: req.sid, subject: req.subject }) | ||
} | ||
} | ||
Reqs.prototype.extractToken = function (s) { | ||
return s.substr(this.inboxPrefixLen) | ||
} | ||
Reqs.prototype.get = function (token) { | ||
return this.tokenToReq[token] | ||
} | ||
Reqs.prototype.getAll = function () { | ||
const a = [] | ||
for (const p in this.tokenToReq) { | ||
if (Object.hasOwnProperty.call(this.tokenToReq, p)) { | ||
const req = this.tokenToReq[p] | ||
a.push(req) | ||
} | ||
} | ||
return a | ||
} | ||
function Req (nc, sid, subject, token, callback, opts, sub) { | ||
this.nc = nc | ||
this.sid = sid | ||
this.closed = false | ||
this.subject = subject | ||
this.token = token | ||
this.callback = callback | ||
this.received = 0 | ||
this.max = opts.max || undefined | ||
this.sub = sub | ||
} | ||
Req.prototype.cancel = function () { | ||
this.nc.unsubscribe(this.sid) | ||
if (this.nc.closed) { | ||
return | ||
} | ||
if (!this.closed) { | ||
if (this.sub) { | ||
this.sub.unsubscribe() | ||
} else { | ||
this.nc.reqs.remove(this) | ||
} | ||
} | ||
} | ||
@@ -2163,0 +2119,0 @@ |
{ | ||
"name": "nats", | ||
"version": "2.0.0-15", | ||
"version": "2.0.0-16", | ||
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
115765
2323