Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

nats

Package Overview
Dependencies
Maintainers
3
Versions
195
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nats - npm Package Compare versions

Comparing version 2.0.0-2 to 2.0.0-4

59

index.d.ts

@@ -114,4 +114,5 @@ /*

/** First argument will be an error if an error occurred (such as a timeout) or null.
* Message argument is the received message.
/** [[Client.subscribe]] callbacks. First argument will be an error if an error occurred (such as a timeout) or null.
* Message argument is the received message (which should be treated as debug information when an error is provided).
*
*/

@@ -122,6 +123,2 @@ export interface MsgCallback {

export interface DrainSubCallback {
(err: NatsError | null, sid: number): void;
}
export interface Callback {

@@ -141,4 +138,7 @@ (err: NatsError | null): void;

sid: number;
/** Number of bytes in the payload */
size: number;
/**
* Publishes a reply. Note this method can throw if the connection is closed.
* @param data
*/
respond(data?: any): void;
}

@@ -178,19 +178,5 @@

*/
subscribe(subject: string, callback: MsgCallback, opts?: SubscriptionOptions): number;
subscribe(subject: string, callback: MsgCallback, opts?: SubscriptionOptions): Subscription | undefined;
/**
* Unsubscribe to a given subscription id, with optional max number of messages before unsubscribing.
*/
unsubscribe(sid: number, max?: number):void;
/**
* Draining a subscription is similar to unsubscribe but inbound pending messages are
* not discarded. When the last in-flight message is processed, the subscription handler
* is removed.
* @param sid
* @param callback
*/
drainSubscription(sid: number, callback?:DrainSubCallback):void;
/**
* Drains all subscriptions. If an opt_callback is provided, the callback

@@ -215,3 +201,3 @@ * is called if there's an error with an error argument.

*/
request(subject: string, callback: MsgCallback, data?: any, options?: RequestOptions): number;
request(subject: string, callback: MsgCallback, data?: any, options?: RequestOptions): Request | undefined;

@@ -224,2 +210,27 @@ /**

export interface Subscription {
sid: number;
/**
* Unsubscribe with optional max number of messages before unsubscribing.
*/
unsubscribe(max?: number): void;
/**
* Draining a subscription is similar to unsubscribe but inbound pending messages are
* not discarded. When the last in-flight message is processed, the subscription handler
* is removed.
* @param sid
* @param callback
*/
drain(callback?: Callback): void;
}
export interface Request {
sid: number;
/**
* Unsubscribe with optional max number of messages before unsubscribing.
*/
cancel(): void;
}
declare class NatsError implements Error {

@@ -226,0 +237,0 @@ public name: string;

@@ -93,2 +93,3 @@ /*

ErrorCode.CONN_CLOSED = 'CONN_CLOSED'
ErrorCode.DISCONNECT_ERR = 'DISCONNECT'
ErrorCode.CONN_DRAINING = 'CONN_DRAINING'

@@ -127,2 +128,3 @@ ErrorCode.CONN_ERR = 'CONN_ERR'

const CONN_TIMEOUT_MSG = 'Connection timeout'
const DISCONNECT_MSG = 'Client disconnected, flush was reset'
const INVALID_ENCODING_MSG_PREFIX = 'Invalid Encoding:'

@@ -680,7 +682,29 @@ const NKEY_OR_JWT_REQ_MSG = 'An Nkey or User JWT callback needs to be defined.'

stream.on('close', () => {
const done = (this.closed === true || this.options.reconnect === false || this.servers.length === 0)
// if connected, it resets everything as partial buffers may have been sent
// this will also reset the heartbeats, but not other timers on requests or subscriptions
const pongs = this.pongs
this.closeStream()
if (stream.bytesRead > 0) {
// if the client will reconnect, re-setup pongs/pending to sending commands
if (!done) {
this.pongs = []
this.pending = []
this.pSize = 0
}
// now we tell them that we bailed
if (pongs) {
pongs.forEach((cb) => {
if (typeof cb === 'function') {
try {
cb(new NatsError(DISCONNECT_MSG, ErrorCode.DISCONNECT_ERR))
} catch (_) {
// don't fail
}
}
})
}
this.emit('disconnect')
}
if (this.closed === true || this.options.reconnect === false || this.servers.length === 0) {
if (done) {
this.cleanupTimers()

@@ -805,37 +829,5 @@ this.emit('close')

Client.prototype.createConnection = function () {
// Commands may have been queued during reconnect. Discard everything except:
// 1) ping requests with a pong callback
// 2) publish requests
//
// Rationale: CONNECT and SUBs are written directly upon connecting, any PONG
// response is no longer relevant, and any UNSUB will be accounted for when we
// sync our SUBs. Without this, users of the client may miss state transitions
// via callbacks, would have to track the client's internal connection state,
// and may have to double buffer messages (which we are already doing) if they
// wanted to ensure their messages reach the server.
const pong = []
const pend = []
let pSize = 0
if (this.pending !== null) {
let pongIndex = 0
this.pending.forEach((cmd) => {
const cmdLen = Buffer.isBuffer(cmd) ? cmd.length : Buffer.byteLength(cmd)
if (cmd === PING_REQUEST && this.pongs !== null && pongIndex < this.pongs.length) {
// filter out any useless ping requests (no pong callback, nop flush)
const p = this.pongs[pongIndex++]
if (p !== undefined) {
pend.push(cmd)
pSize += cmdLen
pong.push(p)
}
} else if (cmd.length > 3 && cmd[0] === 'P' && cmd[1] === 'U' && cmd[2] === 'B') {
pend.push(cmd)
pSize += cmdLen
}
})
}
this.pongs = pong
this.pending = pend
this.pSize = pSize
this.pongs = this.pongs || []
this.pending = this.pending || []
this.pSize = this.pSize || 0
this.pstate = AWAITING_CONTROL

@@ -941,2 +933,4 @@

this.inbound = null
// if we are not connected, let's not queue up heartbeats
this.cancelHeartbeat()
}

@@ -1018,3 +1012,3 @@

if (this.closed || this.pending === null) {
if (this.closed) {
return

@@ -1370,2 +1364,3 @@ }

if (sub.callback) {
let err = null
let msg = this.payload.msg

@@ -1380,8 +1375,8 @@ if (this.options.json) {

} catch (e) {
msg = e
err = e
}
}
try {
const v = { data: msg, reply: this.payload.reply, subject: this.payload.subj, sid: this.payload.sid, size: this.payload.size }
sub.callback(undefined, v)
const v = new Msg(this, this.payload.subj, this.payload.reply, msg, this.payload.sid)
sub.callback(err, v)
} catch (error) {

@@ -1480,4 +1475,6 @@ this.emit('error', error)

const errs = []
subs.forEach((sub) => {
this.drainSubscription(sub.sid, () => {
this.drainSubscription(sub.sid, (err) => {
errs.push(err)
drains.push(sub)

@@ -1489,3 +1486,10 @@ if (drains.length === subs.length) {

if (typeof callback === 'function') {
callback()
errs.forEach((e, i) => {
errs[i] = e.toString()
})
let e = null
if (errs.length > 0) {
e = new Error('errors while draining:\n' + errs.join(('\n')))
}
callback(e)
}

@@ -1663,3 +1667,3 @@ })

* @param {Function} callback - callback arguments are data, reply subject (may be undefined), and subscription id
* @return {Number}
* @return {Subscription | undefined}
* @api public

@@ -1670,3 +1674,3 @@ */

if (this.handledSubArgErrors(subject, callback, opts)) {
return 0
return undefined
}

@@ -1703,13 +1707,8 @@

}
return this.ssid
sub.s = new Subscription(this, this.ssid)
return sub.s
}
/**
* Unsubscribe to a given Subscriber Id, with optional max parameter.
* Unsubscribing to a subscription that already yielded the specified number of messages
* will clear any pending timeout callbacks.
*
* @param {Number} sid
* @param {Number} [max]
* @api public
* @api private
*/

@@ -1754,7 +1753,3 @@ Client.prototype.unsubscribe = function (sid, max) {

/**
* Draining a subscription is similar to unsubscribe but inbound pending messages are
* not discarded. When the last in-flight message is processed, the subscription handler
* is removed.
* @param {Number} sid
* @param {Function} [callback]
* @private
*/

@@ -1784,3 +1779,3 @@ Client.prototype.drainSubscription = function (sid, callback) {

this.sendCommand(proto.join(SPC))
this.flush(() => {
this.flush((err) => {
if (sub.timeout) {

@@ -1793,3 +1788,3 @@ clearTimeout(sub.timeout)

if (typeof callback === 'function') {
callback()
callback(err)
}

@@ -1817,3 +1812,3 @@ })

* @param {Function} [callback]
* @return {Number}
* @return {Request}
* @api public

@@ -1832,5 +1827,5 @@ */

const inbox = this.createInbox()
const sid = this.subscribe(inbox, callback, opts)
const sub = this.subscribe(inbox, callback, opts)
this.doPublish(subject, inbox, data, callback)
return sid
return new Request(sub.nc, sub.sid)
} else {

@@ -1847,3 +1842,4 @@ const conf = this.initMuxRequestDetails(callback, opts.max)

}
return conf.id
conf.r = new Request(this, conf.id)
return conf.r
}

@@ -2055,1 +2051,35 @@ }

}
function Subscription (nc, sid) {
this.nc = nc
this.sid = sid
}
Subscription.prototype.unsubscribe = function (max) {
this.nc.unsubscribe(this.sid, max)
}
Subscription.prototype.drain = function (callback) {
this.nc.drainSubscription(this.sid, callback)
}
function Request (nc, sid) {
this.nc = nc
this.sid = sid
}
Request.prototype.cancel = function () {
this.nc.unsubscribe(this.sid)
}
function Msg (nc, subject, reply, data, sid) {
this.nc = nc
this.subject = subject
this.reply = reply
this.data = data
this.sid = sid
}
Msg.prototype.respond = function (data) {
this.nc.publish(this.reply, data)
}

@@ -43,2 +43,15 @@ # NATS.js 2.0 API Changes

- Old subscription API returned a number, the new API returns a subscription object. The subscription object provides a `unsubscribe()` and `drain()`:
```
const sub = nc.subscribe(subj, (err, m) => {
// do something with the message
})
// unsubscribe after 10 messages
sub.unsubscribe(10)
// drain the subscription
sub.drain(() => {
})
```
## Changes to `request`

@@ -64,2 +77,6 @@ - The previous request signature was: `request(subject: string, msg: any, options: RequestOptions, callback: Function): number;`

- `request()` now returns an object
- Requests can be cancelled before they timeout by invoking `cancel()` on the request.

@@ -89,6 +106,17 @@ ## Message Callbacks

sid: number;
size: number; // number of bytes in a payload before any decoding
}
```
- Message allows to respond to requests:
```
nc.subscribe(subj, (err, m) => {
if(err) {
console.error(err)
return
}
// echo back to the client - note this can throw if the connection is closed
m.respond(m.data)
}, {queue: 'queue})
```

@@ -95,0 +123,0 @@ ## `timeout`

{
"name": "nats",
"version": "2.0.0-2",
"version": "2.0.0-4",
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system",

@@ -45,17 +45,17 @@ "keywords": [

"dependencies": {
"nuid": "^1.0.0",
"ts-nkeys": "^1.0.8"
"nuid": "^1.1.2",
"ts-nkeys": "^1.0.16"
},
"devDependencies": {
"@types/node": "^12.6.2",
"coveralls": "^3.0.5",
"@types/node": "^13.7.4",
"coveralls": "^3.0.9",
"dependency-check": "^4.1.0",
"eslint": "^6.7.2",
"eslint": "^6.8.0",
"minimist": "^1.2.0",
"mocha": "^6.1.4",
"mocha": "^7.0.1",
"mocha-lcov-reporter": "1.3.0",
"nyc": "^14.1.1",
"nyc": "^15.0.0",
"should": "^13.2.3",
"standard": "^14.3.1",
"typescript": "^3.5.3"
"typescript": "^3.8.2"
},

@@ -62,0 +62,0 @@ "typings": "./index.d.ts",

@@ -18,2 +18,5 @@ # NATS.js - Node.js Client

npm install nats
# to install current dev version:
npm install nats@next
```

@@ -32,3 +35,3 @@

// Simple Subscriber - error is set if there was some error
nc.subscribe('foo', function (err, msg) {
nc.subscribe('foo', (err, msg) => {
if (err) {

@@ -42,4 +45,4 @@ console.error(err)

// Unsubscribing
const sid = nc.subscribe('foo', function (err, m) {})
nc.unsubscribe(sid)
const sub = nc.subscribe('foo', (err, m) => {})
sub.unsubscribe()

@@ -85,3 +88,3 @@

// Replies
nc.subscribe('help', function (_, m) {
nc.subscribe('help', (_, m) => {
nc.publish(m.reply, 'I can help!')

@@ -217,6 +220,6 @@ })

let c1 = 0
const sid1 = nc.subscribe('foo', () => {
const sub = nc.subscribe('foo', () => {
c1++
if (c1 === 1) {
nc.drainSubscription(sid1, (err, sid) => {
sub.drain((err) => {
if (err) {

@@ -226,3 +229,3 @@ console.error(err)

}
console.log('subscription', sid, 'drained')
console.log(`subscription ${sub.sid} drained`)
})

@@ -349,3 +352,7 @@ }

// been processed.
nc.flush(() => {
nc.flush((err) => {
if (err) {
console.error('error flushing', err)
return
}
console.log('round trip to the server done')

@@ -375,4 +382,8 @@ })

nc.subscribe('foo', () => {}, { max: 100 })
// or
const sub = nc.subscribe('foo', () => {})
sub.unsubscribe(100)
// Encodings

@@ -438,3 +449,3 @@

// the client has to create a new connection.
nc.on('close', function () {
nc.on('close', () => {
console.log('close')

@@ -444,3 +455,3 @@ })

// emitted whenever the client unsubscribes
nc.on('unsubscribe', function (sid, subject) {
nc.on('unsubscribe', (sid, subject) => {
console.log('unsubscribed subscription', sid, 'for subject', subject)

@@ -453,3 +464,3 @@ })

// on the specific subject
nc.on('permission_error', function (err) {
nc.on('permission_error', (err) => {
console.error('got a permissions error', err.message)

@@ -456,0 +467,0 @@ })

- [ ] try to bind async server errors to matching callbacks
- [ ] normalize names with nats.ts so that the exports are the same
- [ ] subscription objects
- [ ] move to ava.js
- [X] subscription objects
- [ ] move to ava.js
- [X] m.respond() - to reply to requests
- [X] Request object needs `cancel()`
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc