Comparing version 2.0.0-2 to 2.0.0-4
@@ -114,4 +114,5 @@ /* | ||
/** First argument will be an error if an error occurred (such as a timeout) or null. | ||
* Message argument is the received message. | ||
/** [[Client.subscribe]] callbacks. First argument will be an error if an error occurred (such as a timeout) or null. | ||
* Message argument is the received message (which should be treated as debug information when an error is provided). | ||
* | ||
*/ | ||
@@ -122,6 +123,2 @@ export interface MsgCallback { | ||
export interface DrainSubCallback { | ||
(err: NatsError | null, sid: number): void; | ||
} | ||
export interface Callback { | ||
@@ -141,4 +138,7 @@ (err: NatsError | null): void; | ||
sid: number; | ||
/** Number of bytes in the payload */ | ||
size: number; | ||
/** | ||
* Publishes a reply. Note this method can throw if the connection is closed. | ||
* @param data | ||
*/ | ||
respond(data?: any): void; | ||
} | ||
@@ -178,19 +178,5 @@ | ||
*/ | ||
subscribe(subject: string, callback: MsgCallback, opts?: SubscriptionOptions): number; | ||
subscribe(subject: string, callback: MsgCallback, opts?: SubscriptionOptions): Subscription | undefined; | ||
/** | ||
* Unsubscribe to a given subscription id, with optional max number of messages before unsubscribing. | ||
*/ | ||
unsubscribe(sid: number, max?: number):void; | ||
/** | ||
* Draining a subscription is similar to unsubscribe but inbound pending messages are | ||
* not discarded. When the last in-flight message is processed, the subscription handler | ||
* is removed. | ||
* @param sid | ||
* @param callback | ||
*/ | ||
drainSubscription(sid: number, callback?:DrainSubCallback):void; | ||
/** | ||
* Drains all subscriptions. If an opt_callback is provided, the callback | ||
@@ -215,3 +201,3 @@ * is called if there's an error with an error argument. | ||
*/ | ||
request(subject: string, callback: MsgCallback, data?: any, options?: RequestOptions): number; | ||
request(subject: string, callback: MsgCallback, data?: any, options?: RequestOptions): Request | undefined; | ||
@@ -224,2 +210,27 @@ /** | ||
export interface Subscription { | ||
sid: number; | ||
/** | ||
* Unsubscribe with optional max number of messages before unsubscribing. | ||
*/ | ||
unsubscribe(max?: number): void; | ||
/** | ||
* Draining a subscription is similar to unsubscribe but inbound pending messages are | ||
* not discarded. When the last in-flight message is processed, the subscription handler | ||
* is removed. | ||
* @param sid | ||
* @param callback | ||
*/ | ||
drain(callback?: Callback): void; | ||
} | ||
export interface Request { | ||
sid: number; | ||
/** | ||
* Unsubscribe with optional max number of messages before unsubscribing. | ||
*/ | ||
cancel(): void; | ||
} | ||
declare class NatsError implements Error { | ||
@@ -226,0 +237,0 @@ public name: string; |
156
lib/nats.js
@@ -93,2 +93,3 @@ /* | ||
ErrorCode.CONN_CLOSED = 'CONN_CLOSED' | ||
ErrorCode.DISCONNECT_ERR = 'DISCONNECT' | ||
ErrorCode.CONN_DRAINING = 'CONN_DRAINING' | ||
@@ -127,2 +128,3 @@ ErrorCode.CONN_ERR = 'CONN_ERR' | ||
const CONN_TIMEOUT_MSG = 'Connection timeout' | ||
const DISCONNECT_MSG = 'Client disconnected, flush was reset' | ||
const INVALID_ENCODING_MSG_PREFIX = 'Invalid Encoding:' | ||
@@ -680,7 +682,29 @@ const NKEY_OR_JWT_REQ_MSG = 'An Nkey or User JWT callback needs to be defined.' | ||
stream.on('close', () => { | ||
const done = (this.closed === true || this.options.reconnect === false || this.servers.length === 0) | ||
// if connected, it resets everything as partial buffers may have been sent | ||
// this will also reset the heartbeats, but not other timers on requests or subscriptions | ||
const pongs = this.pongs | ||
this.closeStream() | ||
if (stream.bytesRead > 0) { | ||
// if the client will reconnect, re-setup pongs/pending to sending commands | ||
if (!done) { | ||
this.pongs = [] | ||
this.pending = [] | ||
this.pSize = 0 | ||
} | ||
// now we tell them that we bailed | ||
if (pongs) { | ||
pongs.forEach((cb) => { | ||
if (typeof cb === 'function') { | ||
try { | ||
cb(new NatsError(DISCONNECT_MSG, ErrorCode.DISCONNECT_ERR)) | ||
} catch (_) { | ||
// don't fail | ||
} | ||
} | ||
}) | ||
} | ||
this.emit('disconnect') | ||
} | ||
if (this.closed === true || this.options.reconnect === false || this.servers.length === 0) { | ||
if (done) { | ||
this.cleanupTimers() | ||
@@ -805,37 +829,5 @@ this.emit('close') | ||
Client.prototype.createConnection = function () { | ||
// Commands may have been queued during reconnect. Discard everything except: | ||
// 1) ping requests with a pong callback | ||
// 2) publish requests | ||
// | ||
// Rationale: CONNECT and SUBs are written directly upon connecting, any PONG | ||
// response is no longer relevant, and any UNSUB will be accounted for when we | ||
// sync our SUBs. Without this, users of the client may miss state transitions | ||
// via callbacks, would have to track the client's internal connection state, | ||
// and may have to double buffer messages (which we are already doing) if they | ||
// wanted to ensure their messages reach the server. | ||
const pong = [] | ||
const pend = [] | ||
let pSize = 0 | ||
if (this.pending !== null) { | ||
let pongIndex = 0 | ||
this.pending.forEach((cmd) => { | ||
const cmdLen = Buffer.isBuffer(cmd) ? cmd.length : Buffer.byteLength(cmd) | ||
if (cmd === PING_REQUEST && this.pongs !== null && pongIndex < this.pongs.length) { | ||
// filter out any useless ping requests (no pong callback, nop flush) | ||
const p = this.pongs[pongIndex++] | ||
if (p !== undefined) { | ||
pend.push(cmd) | ||
pSize += cmdLen | ||
pong.push(p) | ||
} | ||
} else if (cmd.length > 3 && cmd[0] === 'P' && cmd[1] === 'U' && cmd[2] === 'B') { | ||
pend.push(cmd) | ||
pSize += cmdLen | ||
} | ||
}) | ||
} | ||
this.pongs = pong | ||
this.pending = pend | ||
this.pSize = pSize | ||
this.pongs = this.pongs || [] | ||
this.pending = this.pending || [] | ||
this.pSize = this.pSize || 0 | ||
this.pstate = AWAITING_CONTROL | ||
@@ -941,2 +933,4 @@ | ||
this.inbound = null | ||
// if we are not connected, let's not queue up heartbeats | ||
this.cancelHeartbeat() | ||
} | ||
@@ -1018,3 +1012,3 @@ | ||
if (this.closed || this.pending === null) { | ||
if (this.closed) { | ||
return | ||
@@ -1370,2 +1364,3 @@ } | ||
if (sub.callback) { | ||
let err = null | ||
let msg = this.payload.msg | ||
@@ -1380,8 +1375,8 @@ if (this.options.json) { | ||
} catch (e) { | ||
msg = e | ||
err = e | ||
} | ||
} | ||
try { | ||
const v = { data: msg, reply: this.payload.reply, subject: this.payload.subj, sid: this.payload.sid, size: this.payload.size } | ||
sub.callback(undefined, v) | ||
const v = new Msg(this, this.payload.subj, this.payload.reply, msg, this.payload.sid) | ||
sub.callback(err, v) | ||
} catch (error) { | ||
@@ -1480,4 +1475,6 @@ this.emit('error', error) | ||
const errs = [] | ||
subs.forEach((sub) => { | ||
this.drainSubscription(sub.sid, () => { | ||
this.drainSubscription(sub.sid, (err) => { | ||
errs.push(err) | ||
drains.push(sub) | ||
@@ -1489,3 +1486,10 @@ if (drains.length === subs.length) { | ||
if (typeof callback === 'function') { | ||
callback() | ||
errs.forEach((e, i) => { | ||
errs[i] = e.toString() | ||
}) | ||
let e = null | ||
if (errs.length > 0) { | ||
e = new Error('errors while draining:\n' + errs.join(('\n'))) | ||
} | ||
callback(e) | ||
} | ||
@@ -1663,3 +1667,3 @@ }) | ||
* @param {Function} callback - callback arguments are data, reply subject (may be undefined), and subscription id | ||
* @return {Number} | ||
* @return {Subscription | undefined} | ||
* @api public | ||
@@ -1670,3 +1674,3 @@ */ | ||
if (this.handledSubArgErrors(subject, callback, opts)) { | ||
return 0 | ||
return undefined | ||
} | ||
@@ -1703,13 +1707,8 @@ | ||
} | ||
return this.ssid | ||
sub.s = new Subscription(this, this.ssid) | ||
return sub.s | ||
} | ||
/** | ||
* Unsubscribe to a given Subscriber Id, with optional max parameter. | ||
* Unsubscribing to a subscription that already yielded the specified number of messages | ||
* will clear any pending timeout callbacks. | ||
* | ||
* @param {Number} sid | ||
* @param {Number} [max] | ||
* @api public | ||
* @api private | ||
*/ | ||
@@ -1754,7 +1753,3 @@ Client.prototype.unsubscribe = function (sid, max) { | ||
/** | ||
* Draining a subscription is similar to unsubscribe but inbound pending messages are | ||
* not discarded. When the last in-flight message is processed, the subscription handler | ||
* is removed. | ||
* @param {Number} sid | ||
* @param {Function} [callback] | ||
* @private | ||
*/ | ||
@@ -1784,3 +1779,3 @@ Client.prototype.drainSubscription = function (sid, callback) { | ||
this.sendCommand(proto.join(SPC)) | ||
this.flush(() => { | ||
this.flush((err) => { | ||
if (sub.timeout) { | ||
@@ -1793,3 +1788,3 @@ clearTimeout(sub.timeout) | ||
if (typeof callback === 'function') { | ||
callback() | ||
callback(err) | ||
} | ||
@@ -1817,3 +1812,3 @@ }) | ||
* @param {Function} [callback] | ||
* @return {Number} | ||
* @return {Request} | ||
* @api public | ||
@@ -1832,5 +1827,5 @@ */ | ||
const inbox = this.createInbox() | ||
const sid = this.subscribe(inbox, callback, opts) | ||
const sub = this.subscribe(inbox, callback, opts) | ||
this.doPublish(subject, inbox, data, callback) | ||
return sid | ||
return new Request(sub.nc, sub.sid) | ||
} else { | ||
@@ -1847,3 +1842,4 @@ const conf = this.initMuxRequestDetails(callback, opts.max) | ||
} | ||
return conf.id | ||
conf.r = new Request(this, conf.id) | ||
return conf.r | ||
} | ||
@@ -2055,1 +2051,35 @@ } | ||
} | ||
function Subscription (nc, sid) { | ||
this.nc = nc | ||
this.sid = sid | ||
} | ||
Subscription.prototype.unsubscribe = function (max) { | ||
this.nc.unsubscribe(this.sid, max) | ||
} | ||
Subscription.prototype.drain = function (callback) { | ||
this.nc.drainSubscription(this.sid, callback) | ||
} | ||
function Request (nc, sid) { | ||
this.nc = nc | ||
this.sid = sid | ||
} | ||
Request.prototype.cancel = function () { | ||
this.nc.unsubscribe(this.sid) | ||
} | ||
function Msg (nc, subject, reply, data, sid) { | ||
this.nc = nc | ||
this.subject = subject | ||
this.reply = reply | ||
this.data = data | ||
this.sid = sid | ||
} | ||
Msg.prototype.respond = function (data) { | ||
this.nc.publish(this.reply, data) | ||
} |
@@ -43,2 +43,15 @@ # NATS.js 2.0 API Changes | ||
- Old subscription API returned a number, the new API returns a subscription object. The subscription object provides a `unsubscribe()` and `drain()`: | ||
``` | ||
const sub = nc.subscribe(subj, (err, m) => { | ||
// do something with the message | ||
}) | ||
// unsubscribe after 10 messages | ||
sub.unsubscribe(10) | ||
// drain the subscription | ||
sub.drain(() => { | ||
}) | ||
``` | ||
## Changes to `request` | ||
@@ -64,2 +77,6 @@ - The previous request signature was: `request(subject: string, msg: any, options: RequestOptions, callback: Function): number;` | ||
- `request()` now returns an object | ||
- Requests can be cancelled before they timeout by invoking `cancel()` on the request. | ||
@@ -89,6 +106,17 @@ ## Message Callbacks | ||
sid: number; | ||
size: number; // number of bytes in a payload before any decoding | ||
} | ||
``` | ||
- Message allows to respond to requests: | ||
``` | ||
nc.subscribe(subj, (err, m) => { | ||
if(err) { | ||
console.error(err) | ||
return | ||
} | ||
// echo back to the client - note this can throw if the connection is closed | ||
m.respond(m.data) | ||
}, {queue: 'queue}) | ||
``` | ||
@@ -95,0 +123,0 @@ ## `timeout` |
{ | ||
"name": "nats", | ||
"version": "2.0.0-2", | ||
"version": "2.0.0-4", | ||
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", | ||
@@ -45,17 +45,17 @@ "keywords": [ | ||
"dependencies": { | ||
"nuid": "^1.0.0", | ||
"ts-nkeys": "^1.0.8" | ||
"nuid": "^1.1.2", | ||
"ts-nkeys": "^1.0.16" | ||
}, | ||
"devDependencies": { | ||
"@types/node": "^12.6.2", | ||
"coveralls": "^3.0.5", | ||
"@types/node": "^13.7.4", | ||
"coveralls": "^3.0.9", | ||
"dependency-check": "^4.1.0", | ||
"eslint": "^6.7.2", | ||
"eslint": "^6.8.0", | ||
"minimist": "^1.2.0", | ||
"mocha": "^6.1.4", | ||
"mocha": "^7.0.1", | ||
"mocha-lcov-reporter": "1.3.0", | ||
"nyc": "^14.1.1", | ||
"nyc": "^15.0.0", | ||
"should": "^13.2.3", | ||
"standard": "^14.3.1", | ||
"typescript": "^3.5.3" | ||
"typescript": "^3.8.2" | ||
}, | ||
@@ -62,0 +62,0 @@ "typings": "./index.d.ts", |
@@ -18,2 +18,5 @@ # NATS.js - Node.js Client | ||
npm install nats | ||
# to install current dev version: | ||
npm install nats@next | ||
``` | ||
@@ -32,3 +35,3 @@ | ||
// Simple Subscriber - error is set if there was some error | ||
nc.subscribe('foo', function (err, msg) { | ||
nc.subscribe('foo', (err, msg) => { | ||
if (err) { | ||
@@ -42,4 +45,4 @@ console.error(err) | ||
// Unsubscribing | ||
const sid = nc.subscribe('foo', function (err, m) {}) | ||
nc.unsubscribe(sid) | ||
const sub = nc.subscribe('foo', (err, m) => {}) | ||
sub.unsubscribe() | ||
@@ -85,3 +88,3 @@ | ||
// Replies | ||
nc.subscribe('help', function (_, m) { | ||
nc.subscribe('help', (_, m) => { | ||
nc.publish(m.reply, 'I can help!') | ||
@@ -217,6 +220,6 @@ }) | ||
let c1 = 0 | ||
const sid1 = nc.subscribe('foo', () => { | ||
const sub = nc.subscribe('foo', () => { | ||
c1++ | ||
if (c1 === 1) { | ||
nc.drainSubscription(sid1, (err, sid) => { | ||
sub.drain((err) => { | ||
if (err) { | ||
@@ -226,3 +229,3 @@ console.error(err) | ||
} | ||
console.log('subscription', sid, 'drained') | ||
console.log(`subscription ${sub.sid} drained`) | ||
}) | ||
@@ -349,3 +352,7 @@ } | ||
// been processed. | ||
nc.flush(() => { | ||
nc.flush((err) => { | ||
if (err) { | ||
console.error('error flushing', err) | ||
return | ||
} | ||
console.log('round trip to the server done') | ||
@@ -375,4 +382,8 @@ }) | ||
nc.subscribe('foo', () => {}, { max: 100 }) | ||
// or | ||
const sub = nc.subscribe('foo', () => {}) | ||
sub.unsubscribe(100) | ||
// Encodings | ||
@@ -438,3 +449,3 @@ | ||
// the client has to create a new connection. | ||
nc.on('close', function () { | ||
nc.on('close', () => { | ||
console.log('close') | ||
@@ -444,3 +455,3 @@ }) | ||
// emitted whenever the client unsubscribes | ||
nc.on('unsubscribe', function (sid, subject) { | ||
nc.on('unsubscribe', (sid, subject) => { | ||
console.log('unsubscribed subscription', sid, 'for subject', subject) | ||
@@ -453,3 +464,3 @@ }) | ||
// on the specific subject | ||
nc.on('permission_error', function (err) { | ||
nc.on('permission_error', (err) => { | ||
console.error('got a permissions error', err.message) | ||
@@ -456,0 +467,0 @@ }) |
- [ ] try to bind async server errors to matching callbacks | ||
- [ ] normalize names with nats.ts so that the exports are the same | ||
- [ ] subscription objects | ||
- [ ] move to ava.js | ||
- [X] subscription objects | ||
- [ ] move to ava.js | ||
- [X] m.respond() - to reply to requests | ||
- [X] Request object needs `cancel()` |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
113297
2233
527
Updatednuid@^1.1.2
Updatedts-nkeys@^1.0.16