node-nats-streaming
Advanced tools
Comparing version 0.2.0 to 0.2.2
1187
lib/stan.js
@@ -23,3 +23,3 @@ /* | ||
*/ | ||
var util = require('util'), | ||
const util = require('util'), | ||
nats = require('nats'), | ||
@@ -36,31 +36,31 @@ timers = require('timers'), | ||
*/ | ||
var VERSION = '0.2.0', | ||
DEFAULT_PORT = 4222, | ||
DEFAULT_PRE = 'nats://localhost:', | ||
DEFAULT_URI = DEFAULT_PRE + DEFAULT_PORT, | ||
DEFAULT_DISCOVER_PREFIX = '_STAN.discover', | ||
DEFAULT_ACK_PREFIX = '_STAN.acks', | ||
DEFAULT_CONNECT_WAIT = 1000 * 2, | ||
const VERSION = '0.2.2', | ||
DEFAULT_PORT = 4222, | ||
DEFAULT_PRE = 'nats://localhost:', | ||
DEFAULT_URI = DEFAULT_PRE + DEFAULT_PORT, | ||
DEFAULT_DISCOVER_PREFIX = '_STAN.discover', | ||
DEFAULT_ACK_PREFIX = '_STAN.acks', | ||
DEFAULT_CONNECT_WAIT = 1000 * 2, | ||
DEFAULT_MAX_IN_FLIGHT = 16384, | ||
DEFAULT_ACK_WAIT = 30 * 1000, | ||
DEFAULT_MAX_IN_FLIGHT = 16384, | ||
DEFAULT_ACK_WAIT = 30 * 1000, | ||
BAD_SUBJECT = 'stan: subject must be supplied', | ||
BAD_CLUSTER_ID = 'stan: cluster ID must be supplied', | ||
BAD_CLIENT_ID = 'stan: client ID must be supplied', | ||
MAX_FLIGHT_LIMIT_REACHED = 'stan: max in flight reached.', | ||
CONN_CLOSED = 'stan: Connection closed', | ||
BAD_SUBSCRIPTION = 'stan: invalid subscription', | ||
BINARY_ENCODING_REQUIRED = 'stan: NATS connection encoding must be \'binary\'.', | ||
NO_SERVER_SUPPORT = 'stan: not supported by server', | ||
ACK_TIMEOUT = 'stan: publish ack timeout', | ||
CONNECT_REQ_TIMEOUT = 'stan: connect request timeout', | ||
CLOSE_REQ_TIMEOUT = 'stan: close request timeout', | ||
SUB_REQ_TIMEOUT = 'stan: subscribe request timeout', | ||
UNSUB_REQ_TIMEOUT = 'stan: unsubscribe request timeout', | ||
BAD_SUBJECT = 'stan: subject must be supplied', | ||
BAD_CLUSTER_ID = 'stan: cluster ID must be supplied', | ||
BAD_CLIENT_ID = 'stan: client ID must be supplied', | ||
MAX_FLIGHT_LIMIT_REACHED = 'stan: max in flight reached.', | ||
CONN_CLOSED = 'stan: Connection closed', | ||
BAD_SUBSCRIPTION = 'stan: invalid subscription', | ||
BINARY_ENCODING_REQUIRED = 'stan: NATS connection encoding must be \'binary\'.', | ||
NO_SERVER_SUPPORT = 'stan: not supported by server', | ||
ACK_TIMEOUT = 'stan: publish ack timeout', | ||
CONNECT_REQ_TIMEOUT = 'stan: connect request timeout', | ||
CLOSE_REQ_TIMEOUT = 'stan: close request timeout', | ||
SUB_REQ_TIMEOUT = 'stan: subscribe request timeout', | ||
UNSUB_REQ_TIMEOUT = 'stan: unsubscribe request timeout', | ||
PROTOCOL_ONE = 1, | ||
DEFAULT_PING_INTERVAL = 5 * 1000, | ||
DEFAULT_PING_MAXOUT = 3, | ||
MAX_PINGS_EXCEEDED = 'stan: connection lost due to PING failure'; | ||
PROTOCOL_ONE = 1, | ||
DEFAULT_PING_INTERVAL = 5 * 1000, | ||
DEFAULT_PING_MAXOUT = 3, | ||
MAX_PINGS_EXCEEDED = 'stan: connection lost due to PING failure'; | ||
@@ -76,24 +76,24 @@ | ||
function Stan(clusterID, clientID, opts) { | ||
events.EventEmitter.call(this); | ||
if(typeof clusterID !== 'string' || clusterID.length < 1) { | ||
throw new Error(BAD_CLUSTER_ID); | ||
} | ||
if(typeof clientID !== 'string' || clientID.length < 1) { | ||
throw new Error(BAD_CLIENT_ID); | ||
} | ||
this.clusterID = clusterID; | ||
this.clientID = clientID; | ||
this.ackSubject = DEFAULT_ACK_PREFIX + "." + nuid.next(); // publish acks | ||
events.EventEmitter.call(this); | ||
if (typeof clusterID !== 'string' || clusterID.length < 1) { | ||
throw new Error(BAD_CLUSTER_ID); | ||
} | ||
if (typeof clientID !== 'string' || clientID.length < 1) { | ||
throw new Error(BAD_CLIENT_ID); | ||
} | ||
this.clusterID = clusterID; | ||
this.clientID = clientID; | ||
this.ackSubject = DEFAULT_ACK_PREFIX + "." + nuid.next(); // publish acks | ||
// these are set by stan | ||
this.pubPrefix = null; // publish prefix appended to subject | ||
this.subRequests = null; // subject for subscription requests | ||
this.unsubRequests = null; // subject for unsubscribe requests | ||
this.subCloseRequests = null; // subject for subscription close requests | ||
this.closeRequests = null; // subject for close requests | ||
// these are set by stan | ||
this.pubPrefix = null; // publish prefix appended to subject | ||
this.subRequests = null; // subject for subscription requests | ||
this.unsubRequests = null; // subject for unsubscribe requests | ||
this.subCloseRequests = null; // subject for subscription close requests | ||
this.closeRequests = null; // subject for close requests | ||
this.parseOptions(opts); | ||
this.initState(); | ||
this.createConnection(); | ||
return this; | ||
this.parseOptions(opts); | ||
this.initState(); | ||
this.createConnection(); | ||
return this; | ||
} | ||
@@ -113,3 +113,3 @@ | ||
exports.connect = function(clusterID, clientID, opts) { | ||
return new Stan(clusterID, clientID, opts); | ||
return new Stan(clusterID, clientID, opts); | ||
}; | ||
@@ -123,3 +123,3 @@ | ||
Stan.prototype.isClosed = function() { | ||
return this.nc === undefined; | ||
return this.nc === undefined; | ||
}; | ||
@@ -133,66 +133,65 @@ | ||
Stan.prototype.parseOptions = function(opts) { | ||
var options = this.options = { | ||
url: DEFAULT_URI, | ||
connectTimeout: DEFAULT_CONNECT_WAIT, | ||
ackTimeout: DEFAULT_ACK_WAIT, | ||
discoverPrefix: DEFAULT_DISCOVER_PREFIX, | ||
maxPubAcksInflight: DEFAULT_MAX_IN_FLIGHT, | ||
stanEncoding: 'utf8', | ||
stanPingInterval: DEFAULT_PING_INTERVAL, | ||
stanMaxPingOut: DEFAULT_PING_MAXOUT, | ||
maxReconnectAttempts: -1 | ||
}; | ||
const options = this.options = { | ||
url: DEFAULT_URI, | ||
connectTimeout: DEFAULT_CONNECT_WAIT, | ||
ackTimeout: DEFAULT_ACK_WAIT, | ||
discoverPrefix: DEFAULT_DISCOVER_PREFIX, | ||
maxPubAcksInflight: DEFAULT_MAX_IN_FLIGHT, | ||
stanEncoding: 'utf8', | ||
stanPingInterval: DEFAULT_PING_INTERVAL, | ||
stanMaxPingOut: DEFAULT_PING_MAXOUT, | ||
maxReconnectAttempts: -1 | ||
}; | ||
if (opts === undefined) { | ||
options.url = DEFAULT_URI; | ||
} else if ('number' === typeof opts) { | ||
options.url = DEFAULT_PRE + opts; | ||
} else if ('string' === typeof opts) { | ||
options.url = sanitizeUrl(opts); | ||
} else if ('object' === typeof opts) { | ||
if (opts.port !== undefined) { | ||
options.url = DEFAULT_PRE + opts.port; | ||
} | ||
if (opts === undefined) { | ||
options.url = DEFAULT_URI; | ||
} else if ('number' === typeof opts) { | ||
options.url = DEFAULT_PRE + opts; | ||
} else if ('string' === typeof opts) { | ||
options.url = sanitizeUrl(opts); | ||
} else if ('object' === typeof opts) { | ||
if (opts.port !== undefined) { | ||
options.url = DEFAULT_PRE + opts.port; | ||
} | ||
this.assignOption(opts, 'discoverPrefix'); | ||
this.assignOption(opts, 'nc'); | ||
this.assignOption(opts, 'connectTimeout'); | ||
this.assignOption(opts, 'ackTimeout'); | ||
this.assignOption(opts, 'maxPubAcksInflight'); | ||
this.assignOption(opts, 'stanEncoding'); | ||
this.assignOption(opts, 'stanPingInterval'); | ||
this.assignOption(opts, 'stanMaxPingOut'); | ||
this.assignOption(opts, 'discoverPrefix'); | ||
this.assignOption(opts, 'nc'); | ||
this.assignOption(opts, 'connectTimeout'); | ||
this.assignOption(opts, 'ackTimeout'); | ||
this.assignOption(opts, 'maxPubAcksInflight'); | ||
this.assignOption(opts, 'stanEncoding'); | ||
this.assignOption(opts, 'stanPingInterval'); | ||
this.assignOption(opts, 'stanMaxPingOut'); | ||
// node-nats does takes a bunch of other options | ||
// we simply forward them, as node-nats is used | ||
// underneath. | ||
this.assignOption(opts, 'url'); | ||
this.assignOption(opts, 'uri', 'url'); | ||
this.assignOption(opts, 'user'); | ||
this.assignOption(opts, 'pass'); | ||
this.assignOption(opts, 'token'); | ||
this.assignOption(opts, 'password', 'pass'); | ||
this.assignOption(opts, 'verbose'); | ||
this.assignOption(opts, 'pedantic'); | ||
this.assignOption(opts, 'reconnect'); | ||
this.assignOption(opts, 'maxReconnectAttempts'); | ||
this.assignOption(opts, 'reconnectTimeWait'); | ||
this.assignOption(opts, 'servers'); | ||
this.assignOption(opts, 'urls', 'servers'); | ||
this.assignOption(opts, 'noRandomize'); | ||
this.assignOption(opts, 'NoRandomize', 'noRandomize'); | ||
this.assignOption(opts, 'dontRandomize', 'noRandomize'); | ||
this.assignOption(opts, 'encoding'); | ||
this.assignOption(opts, 'tls'); | ||
this.assignOption(opts, 'secure', 'tls'); | ||
this.assignOption(opts, 'name'); | ||
this.assignOption(opts, 'client', 'name'); | ||
this.assignOption(opts, 'yieldTime'); | ||
this.assignOption(opts, 'waitOnFirstConnect'); | ||
this.assignOption(opts, 'preserveBuffers'); | ||
this.assignOption(opts, 'pingInterval'); | ||
this.assignOption(opts, 'maxPingOut'); | ||
this.assignOption(opts, 'useOldRequestStyle'); | ||
} | ||
// node-nats does takes a bunch of other options | ||
// we simply forward them, as node-nats is used | ||
// underneath. | ||
this.assignOption(opts, 'url'); | ||
this.assignOption(opts, 'uri', 'url'); | ||
this.assignOption(opts, 'user'); | ||
this.assignOption(opts, 'pass'); | ||
this.assignOption(opts, 'token'); | ||
this.assignOption(opts, 'password', 'pass'); | ||
this.assignOption(opts, 'verbose'); | ||
this.assignOption(opts, 'pedantic'); | ||
this.assignOption(opts, 'reconnect'); | ||
this.assignOption(opts, 'maxReconnectAttempts'); | ||
this.assignOption(opts, 'reconnectTimeWait'); | ||
this.assignOption(opts, 'servers'); | ||
this.assignOption(opts, 'urls', 'servers'); | ||
this.assignOption(opts, 'noRandomize'); | ||
this.assignOption(opts, 'NoRandomize', 'noRandomize'); | ||
this.assignOption(opts, 'dontRandomize', 'noRandomize'); | ||
this.assignOption(opts, 'encoding'); | ||
this.assignOption(opts, 'tls'); | ||
this.assignOption(opts, 'secure', 'tls'); | ||
this.assignOption(opts, 'name'); | ||
this.assignOption(opts, 'client', 'name'); | ||
this.assignOption(opts, 'yieldTime'); | ||
this.assignOption(opts, 'waitOnFirstConnect'); | ||
this.assignOption(opts, 'preserveBuffers'); | ||
this.assignOption(opts, 'pingInterval'); | ||
this.assignOption(opts, 'maxPingOut'); | ||
this.assignOption(opts, 'useOldRequestStyle'); | ||
} | ||
}; | ||
@@ -202,11 +201,11 @@ | ||
function sanitizeUrl(host) { | ||
if ((/^.*:\/\/.*/).exec(host) === null) { | ||
// Does not have a scheme. | ||
host = 'nats://' + host; | ||
} | ||
var u = url.parse(host); | ||
if (u.port === null || u.port == '') { | ||
host += ":" + DEFAULT_PORT; | ||
} | ||
return host; | ||
if ((/^.*:\/\/.*/).exec(host) === null) { | ||
// Does not have a scheme. | ||
host = 'nats://' + host; | ||
} | ||
const u = url.parse(host); | ||
if (u.port === null || u.port == '') { | ||
host += ":" + DEFAULT_PORT; | ||
} | ||
return host; | ||
} | ||
@@ -222,8 +221,8 @@ | ||
Stan.prototype.assignOption = function(opts, prop, assign) { | ||
if (assign === undefined) { | ||
assign = prop; | ||
} | ||
if (opts[prop] !== undefined) { | ||
this.options[assign] = opts[prop]; | ||
} | ||
if (assign === undefined) { | ||
assign = prop; | ||
} | ||
if (opts[prop] !== undefined) { | ||
this.options[assign] = opts[prop]; | ||
} | ||
}; | ||
@@ -236,9 +235,9 @@ | ||
Stan.prototype.initState = function() { | ||
this.pubAckMap = {}; | ||
this.pubAckOutstanding = 0; | ||
this.subMap = {}; | ||
this.pubAckMap = {}; | ||
this.pubAckOutstanding = 0; | ||
this.subMap = {}; | ||
}; | ||
Buffer.prototype.toByteArray = function() { | ||
return Array.prototype.slice.call(this, 0); | ||
return Array.prototype.slice.call(this, 0); | ||
}; | ||
@@ -279,129 +278,132 @@ | ||
Stan.prototype.createConnection = function() { | ||
var that = this; | ||
if(typeof this.options.nc === 'object') { | ||
if(this.options.nc.encoding !== 'binary') { | ||
throw new Error(BINARY_ENCODING_REQUIRED); | ||
} else { | ||
this.nc = this.options.nc; | ||
if (typeof this.options.nc === 'object') { | ||
if (this.options.nc.encoding !== 'binary') { | ||
throw new Error(BINARY_ENCODING_REQUIRED); | ||
} else { | ||
this.nc = this.options.nc; | ||
} | ||
} | ||
} | ||
if (this.nc === undefined) { | ||
var encoding = this.options.encoding; | ||
if(encoding && encoding !== 'binary') { | ||
throw new Error(BINARY_ENCODING_REQUIRED); | ||
} else { | ||
this.options.encoding = 'binary'; | ||
if (this.nc === undefined) { | ||
const encoding = this.options.encoding; | ||
if (encoding && encoding !== 'binary') { | ||
throw new Error(BINARY_ENCODING_REQUIRED); | ||
} else { | ||
this.options.encoding = 'binary'; | ||
} | ||
this.nc = nats.connect(this.options); | ||
this.ncOwned = true; | ||
} | ||
this.nc = nats.connect(this.options); | ||
this.ncOwned = true; | ||
} | ||
this.nc.on('connect', function() { | ||
// heartbeat processing | ||
var hbInbox = nats.createInbox(); | ||
that.hbSubscription = that.nc.subscribe(hbInbox, function(msg, reply) { | ||
that.nc.publish(reply); | ||
}); | ||
this.nc.on('connect', () => { | ||
// heartbeat processing | ||
const hbInbox = nats.createInbox(); | ||
this.hbSubscription = this.nc.subscribe(hbInbox, (msg, reply) => { | ||
this.nc.publish(reply); | ||
}); | ||
that.pingInbox = nats.createInbox(); | ||
that.pingSubscription = that.nc.subscribe(that.pingInbox, function(msg) { | ||
if (msg) { | ||
var pingResponse = proto.PingResponse.deserializeBinary(Buffer.from(msg, 'binary').toByteArray()); | ||
var err = pingResponse.getError(); | ||
if (err) { | ||
that.closeWithError('connection_lost', err); | ||
return; | ||
} | ||
} | ||
that.pingOut = 0; | ||
}); | ||
this.pingInbox = nats.createInbox(); | ||
this.pingSubscription = this.nc.subscribe(this.pingInbox, (msg) => { | ||
if (msg) { | ||
const pingResponse = proto.PingResponse.deserializeBinary(Buffer.from(msg, 'binary').toByteArray()); | ||
const err = pingResponse.getError(); | ||
if (err) { | ||
this.closeWithError('connection_lost', err); | ||
return; | ||
} | ||
} | ||
this.pingOut = 0; | ||
}); | ||
that.ackSubscription = that.nc.subscribe(that.ackSubject, that.processAck()); | ||
this.ackSubscription = this.nc.subscribe(this.ackSubject, this.processAck()); | ||
var discoverSubject = that.options.discoverPrefix + '.' + that.clusterID; | ||
//noinspection JSUnresolvedFunction | ||
that.connId = Buffer.from(nuid.next(), "utf8"); | ||
var req = new proto.ConnectRequest(); | ||
req.setClientId(that.clientID); | ||
req.setHeartbeatInbox(hbInbox); | ||
req.setProtocol(PROTOCOL_ONE); | ||
req.setConnId(that.connId); | ||
req.setPingInterval(Math.ceil(that.options.stanPingInterval / 1000)); | ||
req.setPingMaxOut(that.options.stanMaxPingOut); | ||
const discoverSubject = this.options.discoverPrefix + '.' + this.clusterID; | ||
//noinspection JSUnresolvedFunction | ||
this.connId = Buffer.from(nuid.next(), "utf8"); | ||
const req = new proto.ConnectRequest(); | ||
req.setClientId(this.clientID); | ||
req.setHeartbeatInbox(hbInbox); | ||
req.setProtocol(PROTOCOL_ONE); | ||
req.setConnId(this.connId); | ||
req.setPingInterval(Math.ceil(this.options.stanPingInterval / 1000)); | ||
req.setPingMaxOut(this.options.stanMaxPingOut); | ||
this.nc.requestOne(discoverSubject, Buffer.from(req.serializeBinary()), this.options.connectTimeout, (msg) => { | ||
if (msg instanceof nats.NatsError) { | ||
let err = msg; | ||
if (msg.code === nats.REQ_TIMEOUT) { | ||
err = CONNECT_REQ_TIMEOUT; | ||
} | ||
this.closeWithError('error', err); | ||
return; | ||
} | ||
// fixme: hardcoded timeout | ||
that.nc.requestOne(discoverSubject, Buffer.from(req.serializeBinary()), 2*1000, function(msg) { | ||
if(msg instanceof nats.NatsError) { | ||
that.closeWithError('error', msg); | ||
return; | ||
} | ||
const cr = proto.ConnectResponse.deserializeBinary(Buffer.from(msg, 'binary').toByteArray()); | ||
if (cr.getError() !== "") { | ||
this.closeWithError('error', cr.getError()); | ||
return; | ||
} | ||
this.pubPrefix = cr.getPubPrefix(); | ||
this.subRequests = cr.getSubRequests(); | ||
this.unsubRequests = cr.getUnsubRequests(); | ||
this.subCloseRequests = cr.getSubCloseRequests(); | ||
this.closeRequests = cr.getCloseRequests(); | ||
var cr = proto.ConnectResponse.deserializeBinary(Buffer.from(msg, 'binary').toByteArray()); | ||
if (cr.getError() !== "") { | ||
that.closeWithError('error', cr.getError()); | ||
return; | ||
} | ||
that.pubPrefix = cr.getPubPrefix(); | ||
that.subRequests = cr.getSubRequests(); | ||
that.unsubRequests = cr.getUnsubRequests(); | ||
that.subCloseRequests = cr.getSubCloseRequests(); | ||
that.closeRequests = cr.getCloseRequests(); | ||
let unsubPingSub = true; | ||
if (cr.getProtocol() >= PROTOCOL_ONE) { | ||
if (cr.getPingInterval() !== 0) { | ||
unsubPingSub = false; | ||
var unsubPingSub = true; | ||
if (cr.getProtocol() >= PROTOCOL_ONE) { | ||
if (cr.getPingInterval() !== 0) { | ||
unsubPingSub = false; | ||
this.pingRequests = cr.getPingRequests(); | ||
this.stanPingInterval = cr.getPingInterval() * 1000; | ||
this.stanMaxPingOut = cr.getPingMaxOut(); | ||
that.pingRequests = cr.getPingRequests(); | ||
that.stanPingInterval = cr.getPingInterval() * 1000; | ||
that.stanMaxPingOut = cr.getPingMaxOut(); | ||
const ping = new proto.Ping(); | ||
ping.setConnId(this.connId); | ||
this.pingBytes = Buffer.from(ping.serializeBinary()); | ||
var ping = new proto.Ping(); | ||
ping.setConnId(that.connId); | ||
that.pingBytes = Buffer.from(ping.serializeBinary()); | ||
that.pingOut = 0; | ||
that.pingTimer = setTimeout(function pingFun() { | ||
that.pingOut++; | ||
if (that.pingOut > that.stanMaxPingOut) { | ||
that.closeWithError('connection_lost', new Error(MAX_PINGS_EXCEEDED)); | ||
return; | ||
this.pingOut = 0; | ||
const that = this; | ||
this.pingTimer = setTimeout(function pingFun() { | ||
that.pingOut++; | ||
if (that.pingOut > that.stanMaxPingOut) { | ||
that.closeWithError('connection_lost', new Error(MAX_PINGS_EXCEEDED)); | ||
return; | ||
} | ||
that.nc.publish(that.pingRequests, that.pingBytes, that.pingInbox); | ||
that.pingTimer = setTimeout(pingFun, that.stanPingInterval); | ||
}, this.stanPingInterval); | ||
} | ||
} | ||
that.nc.publish(that.pingRequests, that.pingBytes, that.pingInbox); | ||
that.pingTimer = setTimeout(pingFun, that.stanPingInterval); | ||
}, that.stanPingInterval); | ||
} | ||
} | ||
if (unsubPingSub) { | ||
that.nc.unsubscribe(that.pingSubscription); | ||
that.pingSubscription = null; | ||
} | ||
if (unsubPingSub) { | ||
this.nc.unsubscribe(this.pingSubscription); | ||
this.pingSubscription = null; | ||
} | ||
that.emit('connect', that); | ||
this.emit('connect', this); | ||
}); | ||
}); | ||
}); | ||
this.nc.on('close', function() { | ||
that.emit('close'); | ||
}); | ||
this.nc.on('close', () => { | ||
// insure we cleaned up | ||
this.cleanupOnClose(); | ||
this.emit('close'); | ||
}); | ||
this.nc.on('disconnect', function() { | ||
that.emit('disconnect'); | ||
}); | ||
this.nc.on('disconnect', () => { | ||
this.emit('disconnect'); | ||
}); | ||
this.nc.on('reconnect', function() { | ||
that.emit('reconnect', that); | ||
}); | ||
this.nc.on('reconnect', () => { | ||
this.emit('reconnect', this); | ||
}); | ||
this.nc.on('reconnecting', function() { | ||
that.emit('reconnecting'); | ||
}); | ||
this.nc.on('reconnecting', () => { | ||
this.emit('reconnecting'); | ||
}); | ||
this.nc.on('error', function(msg) { | ||
that.emit('error', msg); | ||
}); | ||
this.nc.on('error', (msg) => { | ||
this.emit('error', msg); | ||
}); | ||
}; | ||
@@ -418,11 +420,11 @@ | ||
Stan.prototype.closeWithError = function(event, error) { | ||
if (this.nc === undefined || this.clientID === undefined) { | ||
return; | ||
} | ||
this.cleanupOnClose(error); | ||
if (this.ncOwned) { | ||
this.nc.close(); | ||
} | ||
this.emit(event, error); | ||
this.emit('close'); | ||
if (this.nc === undefined || this.clientID === undefined) { | ||
return; | ||
} | ||
this.cleanupOnClose(error); | ||
if (this.ncOwned) { | ||
this.nc.close(); | ||
} | ||
this.emit(event, error); | ||
this.emit('close'); | ||
}; | ||
@@ -437,34 +439,33 @@ | ||
Stan.prototype.cleanupOnClose = function(err) { | ||
// remove the ping timer | ||
if (this.pingTimer) { | ||
timers.clearTimeout(this.pingTimer); | ||
delete this.pingTimer; | ||
} | ||
// remove the ping timer | ||
if (this.pingTimer) { | ||
timers.clearTimeout(this.pingTimer); | ||
delete this.pingTimer; | ||
} | ||
// if we don't own the connection, we unsub to insure | ||
// that a subsequent reconnect will properly clean up. | ||
// Otherwise the close() will take care of it. | ||
if(!this.ncOwned && this.nc) { | ||
if (this.ackSubscription) { | ||
this.nc.unsubscribe(this.ackSubscription); | ||
this.ackSubscription = null; | ||
// if we don't own the connection, we unsub to insure | ||
// that a subsequent reconnect will properly clean up. | ||
// Otherwise the close() will take care of it. | ||
if (!this.ncOwned && this.nc) { | ||
if (this.ackSubscription) { | ||
this.nc.unsubscribe(this.ackSubscription); | ||
this.ackSubscription = null; | ||
} | ||
if (this.pingSubscription) { | ||
this.nc.unsubscribe(this.pingSubscription); | ||
this.pingSubscription = null; | ||
} | ||
if (this.hbSubscription) { | ||
this.nc.unsubscribe(this.hbSubscription); | ||
this.hbSubscription = null; | ||
} | ||
} | ||
if(this.pingSubscription) { | ||
this.nc.unsubscribe(this.pingSubscription); | ||
this.pingSubscription = null; | ||
for (const guid in this.pubAckMap) { | ||
if (this.pubAckMap.hasOwnProperty(guid)) { | ||
const a = this.removeAck(guid); | ||
if (a && a.ah && typeof a.ah === 'function') { | ||
a.ah(err, guid); | ||
} | ||
} | ||
} | ||
if(this.hbSubscription) { | ||
this.nc.unsubscribe(this.hbSubscription); | ||
this.hbSubscription = null; | ||
} | ||
} | ||
for (var guid in this.pubAckMap) { | ||
if(!this.pubAckMap.hasOwnProperty(guid)) { | ||
continue; | ||
} | ||
var a = this.removeAck(guid); | ||
if (a && a.ah && typeof a.ah === 'function') { | ||
a.ah(err, guid); | ||
} | ||
} | ||
}; | ||
@@ -478,40 +479,38 @@ | ||
Stan.prototype.close = function() { | ||
if (this.nc === undefined || this.clientID === undefined) { | ||
return; | ||
} | ||
this.cleanupOnClose(new Error(CONN_CLOSED)); | ||
var that = this; | ||
//noinspection JSUnresolvedFunction | ||
if(this.nc && this.closeRequests) { | ||
var req = new proto.CloseRequest(); | ||
req.setClientId(this.clientID); | ||
this.nc.requestOne(this.closeRequests, Buffer.from(req.serializeBinary()), {}, that.options.connectTimeout, function (msgOrError) { | ||
var nc = that.nc; | ||
delete that.nc; | ||
var closeError = null; | ||
//noinspection JSUnresolvedVariable | ||
if (msgOrError instanceof nats.NatsError) { | ||
// if we get an error here, we simply show it in the close notification as there's not much we can do here. | ||
closeError = msgOrError; | ||
} else { | ||
var cr = proto.CloseResponse.deserializeBinary(Buffer.from(msgOrError, 'binary').toByteArray()); | ||
var err = cr.getError(); | ||
if (err && err.length > 0) { | ||
// if the protocol returned an error there's nothing for us to handle, pass it as an arg to close notification. | ||
closeError = new Error(err); | ||
if (this.nc === undefined || this.clientID === undefined) { | ||
return; | ||
} | ||
this.cleanupOnClose(new Error(CONN_CLOSED)); | ||
//noinspection JSUnresolvedFunction | ||
if (this.nc && this.closeRequests) { | ||
const req = new proto.CloseRequest(); | ||
req.setClientId(this.clientID); | ||
this.nc.requestOne(this.closeRequests, Buffer.from(req.serializeBinary()), {}, this.options.connectTimeout, (msgOrError) => { | ||
const nc = this.nc; | ||
delete this.nc; | ||
let closeError = null; | ||
//noinspection JSUnresolvedVariable | ||
if (msgOrError instanceof nats.NatsError) { | ||
// if we get an error here, we simply show it in the close notification as there's not much we can do here. | ||
closeError = msgOrError; | ||
} else { | ||
const cr = proto.CloseResponse.deserializeBinary(Buffer.from(msgOrError, 'binary').toByteArray()); | ||
const err = cr.getError(); | ||
if (err && err.length > 0) { | ||
// if the protocol returned an error there's nothing for us to handle, pass it as an arg to close notification. | ||
closeError = new Error(err); | ||
} | ||
} | ||
if (nc && this.ncOwned) { | ||
nc.close(); | ||
} | ||
this.emit('close', closeError); | ||
}); | ||
} else { | ||
if (this.nc && this.ncOwned) { | ||
this.nc.close(); | ||
delete this.nc; | ||
} | ||
} | ||
// go closes always | ||
if (nc && that.ncOwned) { | ||
nc.close(); | ||
} | ||
that.emit('close', closeError); | ||
}); | ||
} else { | ||
if (this.nc && this.ncOwned) { | ||
this.nc.close(); | ||
delete that.nc; | ||
this.emit('close'); | ||
} | ||
that.emit('close'); | ||
} | ||
}; | ||
@@ -525,13 +524,12 @@ | ||
Stan.prototype.processAck = function() { | ||
var that = this; | ||
return function(msg) { | ||
//noinspection JSUnresolvedVariable | ||
var pa = proto.PubAck.deserializeBinary(Buffer.from(msg, 'binary').toByteArray()); | ||
var guid = pa.getGuid(); | ||
var a = that.removeAck(guid); | ||
if(a && a.ah) { | ||
var err = pa.getError(); | ||
a.ah(err === '' ? undefined : err, guid); | ||
} | ||
}; | ||
return (msg) => { | ||
//noinspection JSUnresolvedVariable | ||
const pa = proto.PubAck.deserializeBinary(Buffer.from(msg, 'binary').toByteArray()); | ||
const guid = pa.getGuid(); | ||
const a = this.removeAck(guid); | ||
if (a && a.ah) { | ||
const err = pa.getError(); | ||
a.ah(err === '' ? undefined : err, guid); | ||
} | ||
}; | ||
}; | ||
@@ -546,12 +544,12 @@ | ||
Stan.prototype.removeAck = function(guid) { | ||
var a = this.pubAckMap[guid]; | ||
if (a !== undefined) { | ||
delete this.pubAckMap[guid]; | ||
this.pubAckOutstanding--; | ||
if (a.t !== undefined) { | ||
//noinspection JSUnresolvedFunction | ||
timers.clearTimeout(a.t); | ||
const a = this.pubAckMap[guid]; | ||
if (a !== undefined) { | ||
delete this.pubAckMap[guid]; | ||
this.pubAckOutstanding--; | ||
if (a.t !== undefined) { | ||
//noinspection JSUnresolvedFunction | ||
timers.clearTimeout(a.t); | ||
} | ||
} | ||
} | ||
return a; | ||
return a; | ||
}; | ||
@@ -571,60 +569,59 @@ | ||
Stan.prototype.publish = function(subject, data, ackHandler) { | ||
if (this.nc === undefined) { | ||
if (util.isFunction(ackHandler)) { | ||
ackHandler(new Error(CONN_CLOSED)); | ||
return; | ||
} else { | ||
throw new Error(CONN_CLOSED); | ||
if (this.nc === undefined) { | ||
if (util.isFunction(ackHandler)) { | ||
ackHandler(new Error(CONN_CLOSED)); | ||
return; | ||
} else { | ||
throw new Error(CONN_CLOSED); | ||
} | ||
} | ||
} | ||
if(this.pubAckOutstanding > this.options.maxPubAcksInflight) { | ||
// we have many pending publish messages, fail it. | ||
if(util.isFunction(ackHandler)) { | ||
ackHandler(new Error(MAX_FLIGHT_LIMIT_REACHED)); | ||
} else { | ||
throw new Error(MAX_FLIGHT_LIMIT_REACHED); | ||
if (this.pubAckOutstanding > this.options.maxPubAcksInflight) { | ||
// we have many pending publish messages, fail it. | ||
if (util.isFunction(ackHandler)) { | ||
ackHandler(new Error(MAX_FLIGHT_LIMIT_REACHED)); | ||
} else { | ||
throw new Error(MAX_FLIGHT_LIMIT_REACHED); | ||
} | ||
} | ||
} | ||
var subj = this.pubPrefix + '.' + subject; | ||
var peGUID = nuid.next(); | ||
//noinspection JSUnresolvedFunction | ||
var pe = new proto.PubMsg(); | ||
pe.setClientId(this.clientID); | ||
pe.setConnId(this.connId); | ||
pe.setGuid(peGUID); | ||
pe.setSubject(subject); | ||
var buf; | ||
if(typeof data === 'string') { | ||
buf = Buffer.from(data, 'utf8'); | ||
data = new Uint8Array(buf); | ||
} else if(Buffer.prototype.isPrototypeOf(data)) { | ||
buf = Buffer.from(data, 'utf8'); | ||
data = new Uint8Array(buf); | ||
} else if(Buffer.prototype.isPrototypeOf(Uint8Array)) { | ||
// we already handle this | ||
} | ||
const subj = this.pubPrefix + '.' + subject; | ||
const peGUID = nuid.next(); | ||
//noinspection JSUnresolvedFunction | ||
const pe = new proto.PubMsg(); | ||
pe.setClientId(this.clientID); | ||
pe.setConnId(this.connId); | ||
pe.setGuid(peGUID); | ||
pe.setSubject(subject); | ||
let buf; | ||
if (typeof data === 'string') { | ||
buf = Buffer.from(data, 'utf8'); | ||
data = new Uint8Array(buf); | ||
} else if (Buffer.prototype.isPrototypeOf(data)) { | ||
buf = Buffer.from(data, 'utf8'); | ||
data = new Uint8Array(buf); | ||
} else if (Buffer.prototype.isPrototypeOf(Uint8Array)) { | ||
// we already handle this | ||
} | ||
pe.setData(data); | ||
pe.setData(data); | ||
var ack = {}; | ||
ack.ah = ackHandler; | ||
this.pubAckMap[peGUID] = ack; | ||
const ack = {}; | ||
ack.ah = ackHandler; | ||
this.pubAckMap[peGUID] = ack; | ||
var that = this; | ||
var bytes = Buffer.from(pe.serializeBinary()); | ||
this.nc.publish(subj, bytes, this.ackSubject); | ||
this.pubAckOutstanding++; | ||
const bytes = Buffer.from(pe.serializeBinary()); | ||
this.nc.publish(subj, bytes, this.ackSubject); | ||
this.pubAckOutstanding++; | ||
// all acks are received in ackSubject, so not possible to reuse nats.timeout | ||
//noinspection JSUnresolvedFunction | ||
ack.t = timers.setTimeout(function() { | ||
var _ack = that.removeAck(peGUID); | ||
if(_ack && _ack.ah !== undefined) { | ||
_ack.ah(new Error(ACK_TIMEOUT), peGUID); | ||
} | ||
}, this.options.ackTimeout); | ||
// all acks are received in ackSubject, so not possible to reuse nats.timeout | ||
//noinspection JSUnresolvedFunction | ||
ack.t = timers.setTimeout(() => { | ||
const a = this.removeAck(peGUID); | ||
if (a && a.ah !== undefined) { | ||
a.ah(new Error(ACK_TIMEOUT), peGUID); | ||
} | ||
}, this.options.ackTimeout); | ||
return peGUID; | ||
return peGUID; | ||
}; | ||
@@ -645,80 +642,76 @@ | ||
Stan.prototype.subscribe = function(subject, qGroup, options) { | ||
var that = this; | ||
const args = {}; | ||
if (typeof qGroup === 'string') { | ||
args.qGroup = qGroup; | ||
} else if (typeof qGroup === 'object') { | ||
args.options = qGroup; | ||
} | ||
if (typeof options === 'object') { | ||
args.options = options; | ||
} | ||
if (!args.options) { | ||
args.options = new SubscriptionOptions(); | ||
} | ||
var args = {}; | ||
if (typeof qGroup === 'string') { | ||
args.qGroup = qGroup; | ||
} | ||
else if (typeof qGroup === 'object') { | ||
args.options = qGroup; | ||
} | ||
if (typeof options === 'object') { | ||
args.options = options; | ||
} | ||
if (!args.options) { | ||
args.options = new SubscriptionOptions(); | ||
} | ||
// in node-nats there's no Subscription object... | ||
const retVal = new Subscription(this, subject, args.qGroup, nats.createInbox(), args.options, args.callback); | ||
// in node-nats there's no Subscription object... | ||
var retVal = new Subscription(this, subject, args.qGroup, nats.createInbox(), args.options, args.callback); | ||
if (typeof subject !== 'string' || subject.length === 0) { | ||
process.nextTick(() => { | ||
this.emit('error', new Error(BAD_SUBJECT)); | ||
}); | ||
return retVal; | ||
} | ||
if (typeof subject !== 'string' || subject.length === 0) { | ||
process.nextTick(function() { | ||
that.emit('error', new Error(BAD_SUBJECT)); | ||
}); | ||
return retVal; | ||
} | ||
if (this.isClosed()) { | ||
process.nextTick(() => { | ||
this.emit('error', new Error(CONN_CLOSED)); | ||
}); | ||
return retVal; | ||
} | ||
if(this.isClosed()) { | ||
process.nextTick(function() { | ||
that.emit('error', new Error(CONN_CLOSED)); | ||
}); | ||
return retVal; | ||
} | ||
this.subMap[retVal.inbox] = retVal; | ||
retVal.inboxSub = this.nc.subscribe(retVal.inbox, this.processMsg()); | ||
const sr = new proto.SubscriptionRequest(); | ||
sr.setClientId(this.clientID); | ||
sr.setSubject(subject); | ||
sr.setQGroup(retVal.qGroup || ''); | ||
sr.setInbox(retVal.inbox); | ||
sr.setMaxInFlight(retVal.opts.maxInFlight); | ||
sr.setAckWaitInSecs(retVal.opts.ackWait / 1000); | ||
sr.setStartPosition(retVal.opts.startPosition); | ||
sr.setDurableName(retVal.opts.durableName || ''); | ||
this.subMap[retVal.inbox] = retVal; | ||
retVal.inboxSub = this.nc.subscribe(retVal.inbox, this.processMsg()); | ||
var sr = new proto.SubscriptionRequest(); | ||
sr.setClientId(this.clientID); | ||
sr.setSubject(subject); | ||
sr.setQGroup(retVal.qGroup || ''); | ||
sr.setInbox(retVal.inbox); | ||
sr.setMaxInFlight(retVal.opts.maxInFlight); | ||
sr.setAckWaitInSecs(retVal.opts.ackWait / 1000); | ||
sr.setStartPosition(retVal.opts.startPosition); | ||
sr.setDurableName(retVal.opts.durableName || ''); | ||
switch (sr.getStartPosition()) { | ||
case proto.StartPosition.TIME_DELTA_START: | ||
sr.setStartTimeDelta(retVal.opts.startTime); | ||
break; | ||
case proto.StartPosition.SEQUENCE_START: | ||
sr.setStartSequence(retVal.opts.startSequence); | ||
break; | ||
} | ||
switch (sr.getStartPosition()) { | ||
case proto.StartPosition.TIME_DELTA_START: | ||
sr.setStartTimeDelta(retVal.opts.startTime); | ||
break; | ||
case proto.StartPosition.SEQUENCE_START: | ||
sr.setStartSequence(retVal.opts.startSequence); | ||
break; | ||
} | ||
this.nc.requestOne(this.subRequests, Buffer.from(sr.serializeBinary()), this.options.connectTimeout, (msg) => { | ||
if (msg instanceof nats.NatsError) { | ||
if (msg.code === nats.REQ_TIMEOUT) { | ||
this.emit('timeout', SUB_REQ_TIMEOUT); | ||
} else { | ||
this.emit('error', msg); | ||
} | ||
return; | ||
} | ||
//noinspection JSUnresolvedVariable | ||
const r = proto.SubscriptionResponse.deserializeBinary(Buffer.from(msg, 'binary').toByteArray()); | ||
const err = r.getError(); | ||
if (err && err.length !== 0) { | ||
retVal.emit('error', new Error(err)); | ||
this.nc.unsubscribe(retVal.inboxSub); | ||
retVal.emit('unsubscribed'); | ||
return; | ||
} | ||
retVal.ackInbox = r.getAckInbox(); | ||
retVal.emit('ready'); | ||
}); | ||
// fixme: hardcoded timeout | ||
this.nc.requestOne(this.subRequests, Buffer.from(sr.serializeBinary()), 2*1000, function(msg) { | ||
if(msg instanceof nats.NatsError) { | ||
if(msg.code === nats.REQ_TIMEOUT) { | ||
that.emit('timeout', msg); | ||
} else { | ||
that.emit('error', msg); | ||
} | ||
return; | ||
} | ||
//noinspection JSUnresolvedVariable | ||
var r = proto.SubscriptionResponse.deserializeBinary(Buffer.from(msg, 'binary').toByteArray()); | ||
var err = r.getError(); | ||
if (err && err.length !== 0) { | ||
retVal.emit('error', new Error(err)); | ||
that.nc.unsubscribe(retVal.inboxSub); | ||
retVal.emit('unsubscribed'); | ||
return; | ||
} | ||
retVal.ackInbox = r.getAckInbox(); | ||
retVal.emit('ready'); | ||
}); | ||
return retVal; | ||
return retVal; | ||
}; | ||
@@ -740,9 +733,9 @@ | ||
function Subscription(stanConnection, subject, qGroup, inbox, opts) { | ||
this.stanConnection = stanConnection; | ||
this.subject = subject; | ||
this.qGroup = qGroup; | ||
this.inbox = inbox; | ||
this.opts = opts; | ||
this.ackInbox = undefined; | ||
this.inboxSub = undefined; | ||
this.stanConnection = stanConnection; | ||
this.subject = subject; | ||
this.qGroup = qGroup; | ||
this.inbox = inbox; | ||
this.opts = opts; | ||
this.ackInbox = undefined; | ||
this.inboxSub = undefined; | ||
} | ||
@@ -790,3 +783,3 @@ | ||
Subscription.prototype.isClosed = function() { | ||
return this.stanConnection === undefined; | ||
return this.stanConnection === undefined; | ||
}; | ||
@@ -800,3 +793,3 @@ | ||
Subscription.prototype.unsubscribe = function() { | ||
closeOrUnsubscribe(this, false); | ||
this.closeOrUnsubscribe(false); | ||
}; | ||
@@ -815,62 +808,64 @@ | ||
Subscription.prototype.close = function() { | ||
closeOrUnsubscribe(this, true); | ||
this.closeOrUnsubscribe(true); | ||
}; | ||
function closeOrUnsubscribe(that, doClose) { | ||
if(that.isClosed()) { | ||
that.emit('error', new Error(BAD_SUBSCRIPTION)); | ||
return; | ||
} | ||
/** | ||
* @param doClose | ||
* @private | ||
*/ | ||
Subscription.prototype.closeOrUnsubscribe = function(doClose) { | ||
if (this.isClosed()) { | ||
this.emit('error', new Error(BAD_SUBSCRIPTION)); | ||
return; | ||
} | ||
var sc = that.stanConnection; | ||
delete that.stanConnection; | ||
delete sc.subMap[that.inbox]; | ||
const sc = this.stanConnection; | ||
delete this.stanConnection; | ||
delete sc.subMap[this.inbox]; | ||
if(sc.isClosed()) { | ||
that.emit('error', new Error(CONN_CLOSED)); | ||
return; | ||
} | ||
if (sc.isClosed()) { | ||
this.emit('error', new Error(CONN_CLOSED)); | ||
return; | ||
} | ||
var reqSubject = sc.unsubRequests; | ||
if(doClose) { | ||
reqSubject = sc.subCloseRequests; | ||
if(!reqSubject) { | ||
that.emit('error', new Error(NO_SERVER_SUPPORT)); | ||
let reqSubject = sc.unsubRequests; | ||
if (doClose) { | ||
reqSubject = sc.subCloseRequests; | ||
if (!reqSubject) { | ||
this.emit('error', new Error(NO_SERVER_SUPPORT)); | ||
} | ||
} | ||
} | ||
sc.nc.unsubscribe(that.inboxSub); | ||
//noinspection JSUnresolvedFunction | ||
var ur = new proto.UnsubscribeRequest(); | ||
ur.setClientId(sc.clientID); | ||
ur.setSubject(that.subject); | ||
ur.setInbox(that.ackInbox); | ||
sc.nc.unsubscribe(this.inboxSub); | ||
//noinspection JSUnresolvedFunction | ||
const ur = new proto.UnsubscribeRequest(); | ||
ur.setClientId(sc.clientID); | ||
ur.setSubject(this.subject); | ||
ur.setInbox(this.ackInbox); | ||
// fixme: hardcoded timeout | ||
sc.nc.requestOne(reqSubject, Buffer.from(ur.serializeBinary()), 2*1000, function (msg) { | ||
var err; | ||
if(msg instanceof nats.NatsError) { | ||
var type = doClose ? CLOSE_REQ_TIMEOUT : UNSUB_REQ_TIMEOUT; | ||
err = new nats.NatsError(type, type, msg); | ||
if(msg.code === nats.REQ_TIMEOUT) { | ||
that.emit('timeout', err); | ||
} else { | ||
that.emit('error', err); | ||
} | ||
return; | ||
} | ||
//noinspection JSUnresolvedVariable | ||
var r = proto.SubscriptionResponse.deserializeBinary(Buffer.from(msg, 'binary').toByteArray()); | ||
err = r.getError(); | ||
if(err && err.length > 0) { | ||
that.emit('error', new Error(r.getError())); | ||
} else { | ||
that.emit(doClose ? 'closed' : 'unsubscribed'); | ||
} | ||
}); | ||
sc.nc.requestOne(reqSubject, Buffer.from(ur.serializeBinary()), sc.options.connectTimeout, (msg) => { | ||
let err; | ||
if (msg instanceof nats.NatsError) { | ||
const type = doClose ? CLOSE_REQ_TIMEOUT : UNSUB_REQ_TIMEOUT; | ||
err = new nats.NatsError(type, type, msg); | ||
if (msg.code === nats.REQ_TIMEOUT) { | ||
this.emit('timeout', err); | ||
} else { | ||
this.emit('error', err); | ||
} | ||
return; | ||
} | ||
//noinspection JSUnresolvedVariable | ||
const r = proto.SubscriptionResponse.deserializeBinary(Buffer.from(msg, 'binary').toByteArray()); | ||
err = r.getError(); | ||
if (err && err.length > 0) { | ||
this.emit('error', new Error(r.getError())); | ||
} else { | ||
this.emit(doClose ? 'closed' : 'unsubscribed'); | ||
} | ||
}); | ||
}; | ||
} | ||
/** | ||
@@ -882,19 +877,18 @@ * Internal function to process in-bound messages. | ||
Stan.prototype.processMsg = function() { | ||
// curry | ||
var that = this; | ||
return function(rawMsg, reply, subject, sid) { | ||
var sub = that.subMap[subject]; | ||
try { | ||
//noinspection JSUnresolvedVariable | ||
var m = proto.MsgProto.deserializeBinary(Buffer.from(rawMsg, 'binary').toByteArray()); | ||
if (sub === undefined || !that.nc) { | ||
return; | ||
} | ||
var msg = new Message(that, m, sub); | ||
sub.emit('message', msg); | ||
msg.maybeAutoAck(); | ||
} catch (error) { | ||
sub.emit('error', error); | ||
} | ||
}; | ||
// curry | ||
return (rawMsg, reply, subject, sid) => { | ||
const sub = this.subMap[subject]; | ||
try { | ||
//noinspection JSUnresolvedVariable | ||
const m = proto.MsgProto.deserializeBinary(Buffer.from(rawMsg, 'binary').toByteArray()); | ||
if (sub === undefined || !this.nc) { | ||
return; | ||
} | ||
const msg = new Message(this, m, sub); | ||
sub.emit('message', msg); | ||
msg.maybeAutoAck(); | ||
} catch (error) { | ||
sub.emit('error', error); | ||
} | ||
}; | ||
}; | ||
@@ -910,5 +904,5 @@ | ||
function Message(stanClient, msg, subscription) { | ||
this.stanClient = stanClient; | ||
this.msg = msg; | ||
this.subscription = subscription; | ||
this.stanClient = stanClient; | ||
this.msg = msg; | ||
this.subscription = subscription; | ||
} | ||
@@ -921,3 +915,3 @@ | ||
Message.prototype.getSequence = function() { | ||
return this.msg.getSequence(); | ||
return this.msg.getSequence(); | ||
}; | ||
@@ -930,3 +924,3 @@ | ||
Message.prototype.getSubject = function() { | ||
return this.msg.getSubject(); | ||
return this.msg.getSubject(); | ||
}; | ||
@@ -939,3 +933,3 @@ | ||
Message.prototype.getRawData = function() { | ||
return Buffer.from(this.msg.getData(), 'binary'); | ||
return Buffer.from(this.msg.getData(), 'binary'); | ||
}; | ||
@@ -950,8 +944,8 @@ | ||
Message.prototype.getData = function() { | ||
var bytes = this.msg.getData(); | ||
var encoding = this.stanClient.options.stanEncoding; | ||
if(encoding !== 'binary') { | ||
bytes = bytes.length > 0 ? Buffer.from(bytes, encoding).toString() : ''; | ||
} | ||
return bytes; | ||
let bytes = this.msg.getData(); | ||
const encoding = this.stanClient.options.stanEncoding; | ||
if (encoding !== 'binary') { | ||
bytes = bytes.length > 0 ? Buffer.from(bytes, encoding).toString() : ''; | ||
} | ||
return bytes; | ||
}; | ||
@@ -966,3 +960,3 @@ | ||
Message.prototype.getTimestampRaw = function() { | ||
return this.msg.getTimestamp(); | ||
return this.msg.getTimestamp(); | ||
}; | ||
@@ -975,3 +969,3 @@ | ||
Message.prototype.getTimestamp = function() { | ||
return new Date(this.getTimestampRaw()/1000000); | ||
return new Date(this.getTimestampRaw() / 1000000); | ||
}; | ||
@@ -984,3 +978,3 @@ | ||
Message.prototype.isRedelivered = function() { | ||
return this.msg.getRedelivered(); | ||
return this.msg.getRedelivered(); | ||
}; | ||
@@ -992,4 +986,4 @@ | ||
*/ | ||
Message.prototype.getCrc32= function() { | ||
return this.msg.getCrc32(); | ||
Message.prototype.getCrc32 = function() { | ||
return this.msg.getCrc32(); | ||
}; | ||
@@ -1004,5 +998,5 @@ | ||
Message.prototype.maybeAutoAck = function() { | ||
if(! this.subscription.opts.manualAcks) { | ||
this.ack(); | ||
} | ||
if (!this.subscription.opts.manualAcks) { | ||
this.ack(); | ||
} | ||
}; | ||
@@ -1015,8 +1009,8 @@ | ||
Message.prototype.ack = function() { | ||
if(!this.subscription.isClosed()) { | ||
var ack = new proto.Ack(); | ||
ack.setSubject(this.getSubject()); | ||
ack.setSequence(this.getSequence()); | ||
this.stanClient.nc.publish(this.subscription.ackInbox, Buffer.from(ack.serializeBinary())); | ||
} | ||
if (!this.subscription.isClosed()) { | ||
const ack = new proto.Ack(); | ||
ack.setSubject(this.getSubject()); | ||
ack.setSequence(this.getSequence()); | ||
this.stanClient.nc.publish(this.subscription.ackInbox, Buffer.from(ack.serializeBinary())); | ||
} | ||
}; | ||
@@ -1029,7 +1023,7 @@ | ||
Message.prototype.getClientID = function() { | ||
return this.msg.getConnId(); | ||
return this.msg.getConnId(); | ||
}; | ||
Message.prototype.getConnectionID = function() { | ||
return this.msg.getClientId(); | ||
return this.msg.getClientId(); | ||
}; | ||
@@ -1046,16 +1040,16 @@ | ||
function SubscriptionOptions(durableName, maxInFlight, ackWait, startPosition, startSequence, startTime, manualAcks) { | ||
// DurableName, if set will survive client restarts. | ||
this.durableName = durableName; | ||
// Controls the number of messages the cluster will have inflight without an ACK. | ||
this.maxInFlight = maxInFlight || DEFAULT_MAX_IN_FLIGHT; | ||
// Controls the time the cluster will wait for an ACK for a given message. | ||
this.ackWait = ackWait || DEFAULT_ACK_WAIT; | ||
// StartPosition enum from proto. | ||
this.startPosition = startPosition; | ||
// Optional start sequence number. | ||
this.startSequence = startSequence; | ||
// Optional start time. | ||
this.startTime = startTime; | ||
// Option to do Manual Acks | ||
this.manualAcks = manualAcks; | ||
// DurableName, if set will survive client restarts. | ||
this.durableName = durableName; | ||
// Controls the number of messages the cluster will have inflight without an ACK. | ||
this.maxInFlight = maxInFlight || DEFAULT_MAX_IN_FLIGHT; | ||
// Controls the time the cluster will wait for an ACK for a given message. | ||
this.ackWait = ackWait || DEFAULT_ACK_WAIT; | ||
// StartPosition enum from proto. | ||
this.startPosition = startPosition; | ||
// Optional start sequence number. | ||
this.startSequence = startSequence; | ||
// Optional start time. | ||
this.startTime = startTime; | ||
// Option to do Manual Acks | ||
this.manualAcks = manualAcks; | ||
} | ||
@@ -1068,3 +1062,3 @@ | ||
Stan.prototype.subscriptionOptions = function() { | ||
return new SubscriptionOptions(); | ||
return new SubscriptionOptions(); | ||
}; | ||
@@ -1077,20 +1071,20 @@ | ||
SubscriptionOptions.prototype.setMaxInFlight = function(n) { | ||
this.maxInFlight = n; | ||
return this; | ||
this.maxInFlight = n; | ||
return this; | ||
}; | ||
SubscriptionOptions.prototype.setAckWait = function(millis) { | ||
this.ackWait = millis; | ||
return this; | ||
this.ackWait = millis; | ||
return this; | ||
}; | ||
SubscriptionOptions.prototype.setStartAt = function(startPosition) { | ||
this.startPosition = startPosition; | ||
return this; | ||
this.startPosition = startPosition; | ||
return this; | ||
}; | ||
SubscriptionOptions.prototype.setStartAtSequence = function(sequence) { | ||
this.startPosition = proto.StartPosition.SEQUENCE_START; | ||
this.startSequence = sequence; | ||
return this; | ||
this.startPosition = proto.StartPosition.SEQUENCE_START; | ||
this.startSequence = sequence; | ||
return this; | ||
}; | ||
@@ -1104,6 +1098,6 @@ | ||
SubscriptionOptions.prototype.setStartTime = function(date) { | ||
this.startPosition = proto.StartPosition.TIME_DELTA_START; | ||
// server expects values in ns | ||
this.startTime = (Date.now() - date.valueOf()) * 1000000; | ||
return this; | ||
this.startPosition = proto.StartPosition.TIME_DELTA_START; | ||
// server expects values in ns | ||
this.startTime = (Date.now() - date.valueOf()) * 1000000; | ||
return this; | ||
}; | ||
@@ -1116,7 +1110,7 @@ | ||
SubscriptionOptions.prototype.setStartAtTimeDelta = function(millis) { | ||
this.startPosition = proto.StartPosition.TIME_DELTA_START; | ||
//noinspection JSUnresolvedFunction | ||
// server expects values in ns | ||
this.startTime = millis * 1000000; | ||
return this; | ||
this.startPosition = proto.StartPosition.TIME_DELTA_START; | ||
//noinspection JSUnresolvedFunction | ||
// server expects values in ns | ||
this.startTime = millis * 1000000; | ||
return this; | ||
}; | ||
@@ -1129,4 +1123,4 @@ | ||
SubscriptionOptions.prototype.setStartWithLastReceived = function() { | ||
this.startPosition = proto.StartPosition.LAST_RECEIVED; | ||
return this; | ||
this.startPosition = proto.StartPosition.LAST_RECEIVED; | ||
return this; | ||
}; | ||
@@ -1139,4 +1133,4 @@ | ||
SubscriptionOptions.prototype.setDeliverAllAvailable = function() { | ||
this.startPosition = proto.StartPosition.FIRST; | ||
return this; | ||
this.startPosition = proto.StartPosition.FIRST; | ||
return this; | ||
}; | ||
@@ -1149,4 +1143,4 @@ | ||
SubscriptionOptions.prototype.setManualAckMode = function(tf) { | ||
this.manualAcks = tf; | ||
return this; | ||
this.manualAcks = tf; | ||
return this; | ||
}; | ||
@@ -1160,13 +1154,4 @@ | ||
SubscriptionOptions.prototype.setDurableName = function(durableName) { | ||
this.durableName = durableName; | ||
return this; | ||
this.durableName = durableName; | ||
return this; | ||
}; | ||
{ | ||
"name": "node-nats-streaming", | ||
"version": "0.2.0", | ||
"version": "0.2.2", | ||
"description": "Node.js client for NATS Streaming, a lightweight, high-performance cloud native messaging system", | ||
@@ -32,10 +32,12 @@ "keywords": [ | ||
"scripts": { | ||
"coveralls": "cat ./reports/coverage/lcov.info | coveralls", | ||
"depcheck": "dependency-check . lib/* lib/pb/*", | ||
"depcheck:unused": "dependency-check ./package.json --unused --no-dev lib/*.js", | ||
"cover": "nyc report --reporter=html && open coverage/index.html", | ||
"coveralls": "npm run test && nyc report --reporter=text-lcov | coveralls", | ||
"depcheck": "dependency-check . lib/*", | ||
"depcheck:unused": "dependency-check ./package.json --unused --no-dev lib/*", | ||
"fmt": "js-beautify -n --config crockford.jscsrc -r lib/stan.js test/*.js test/support/*.js examples/* bench/*.js", | ||
"gen": "protoc --js_out=import_style=commonjs,binary:. lib/pb/protocol.proto", | ||
"lint": "jshint --reporter node_modules/jshint-stylish lib/*.js test/*.js test/support/*.js examples/*.js", | ||
"npm-publish": "npm publish https://github.com/nats-io/node-nats-streaming.git --access public", | ||
"lint": "eslint ./lib/stan.js ./examples ./bench ./test", | ||
"test": "npm run depcheck && npm run depcheck:unused && npm run lint && npm run test:unit", | ||
"test:unit": "mkdir -p reports/ && NODE_ENV=test multi='spec=- xunit=reports/mocha-xunit.xml' istanbul cover _mocha -- -R mocha-multi --exit --timeout 10000 --slow 750 && istanbul check-coverage" | ||
"test:unit": "mkdir -p reports/ && NODE_ENV=test multi='spec=- xunit=reports/mocha-xunit.xml' nyc mocha --timeout 10000 --slow 750", | ||
"wtf": "wtfnode /usr/local/bin/_mocha --timeout 10000 --slow 750" | ||
}, | ||
@@ -46,18 +48,27 @@ "engines": { | ||
"dependencies": { | ||
"google-protobuf": "^3.6.1", | ||
"nats": "^1.2.2", | ||
"nuid": "^1.0.0" | ||
"google-protobuf": "^3.7.1", | ||
"nats": "^1.2.4", | ||
"nuid": "^1.1.0" | ||
}, | ||
"devDependencies": { | ||
"coveralls": "^3.0.2", | ||
"coveralls": "^3.0.3", | ||
"dependency-check": "2.6.x", | ||
"istanbul": "^0.4.5", | ||
"jshint": "^2.10.1", | ||
"jshint-stylish": "^2.2.1", | ||
"eslint": "^5.10.0", | ||
"js-beautify": "^1.9.1", | ||
"minimist": "^1.2.0", | ||
"mocha": "^5.2.0", | ||
"mocha-lcov-reporter": "1.2.x", | ||
"mocha-multi": "^1.0.1", | ||
"should": "^11.2.1" | ||
"nyc": "^13.3.0", | ||
"should": "^13.2.3" | ||
}, | ||
"typings": "./index.d.ts" | ||
"typings": "./index.d.ts", | ||
"nyc": { | ||
"exclude": [ | ||
"test/**", | ||
"examples/**", | ||
"bench/**", | ||
"lib/pb/**" | ||
] | ||
} | ||
} |
Sorry, the diff of this file is too big to display
1006296
28
4468
10
Updatedgoogle-protobuf@^3.7.1
Updatednats@^1.2.4
Updatednuid@^1.1.0