Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

changefeed

Package Overview
Dependencies
Maintainers
3
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

changefeed - npm Package Compare versions

Comparing version 1.1.3 to 1.1.4

155

lib/listener.js

@@ -12,2 +12,3 @@ /*

var mod_assert = require('assert-plus');
var mod_backoff = require('backoff');
var mod_bunyan = require('bunyan');

@@ -36,3 +37,3 @@ var mod_restify = require('restify-clients');

* retries: Infinity
* }
* },
* log: new mod_bunyan({

@@ -85,9 +86,4 @@ * name: 'my_logger',

self.changeKind = options.changeKind;
self.backoff_opts = options.backoff;
if (options.backoff) {
self.minTimeout = options.minTimeout || 1000;
self.maxTimeout = options.maxTimetout || Infinity;
self.retries = options.retries || Infinity;
}
self.initBootstrap = true;

@@ -105,2 +101,3 @@ }

var self = this;
var log = self.log;
var registration = {

@@ -111,12 +108,8 @@ instance: self.instance,

};
self.log.trace({ cfRegistration: registration }, 'register: start');
log.trace({ cfRegistration: registration }, 'register: start');
var wskey = shed.generateKey();
var clientOpts = {
log: self.log,
log: log,
url: self.url,
retry: {
minTimeout: self.minTimeout,
maxTimeout: self.maxTimeout,
retries: self.retries
}
retry: self.backoff_opts
};

@@ -134,68 +127,92 @@

var client = mod_restify.createClient(clientOpts);
client.get(upgradeOpts, function (err, res, socket, head) {
if (err) {
self.log.error('err: %j', err);
self.emit('error');
var expBackoff_opts = self.backoff_opts ? {
initialDelay: self.backoff_opts.minTimeout,
maxDelay: self.maxTimeout
} : null;
var expBackoff = mod_backoff.exponential(expBackoff_opts);
if (self.backoff_opts) {
log.info('cf: backoff enabled');
expBackoff.failAfter(self.backoff_opts.retries);
}
expBackoff.on('backoff', function _backoff(number, delay) {
if (number > 0) {
log.warn('Backing off -- retry count: %s delay: %s', number, delay);
}
});
expBackoff.on('ready', function _ready(number, delay) {
client.get(upgradeOpts, function (err, res, socket, head) {
if (err) {
log.error('err: %j', err);
self.emit('error');
}
res.once('upgradeResult', function (err2, res2, socket2, head2) {
var wsc = self.wsc = shed.connect(res2, socket2, head2, wskey);
res.once('upgradeResult', function (err2, res2, socket2, head2) {
var wsc = self.wsc = shed.connect(res2, socket2, head2, wskey);
// Send registration
try {
wsc.send(JSON.stringify(registration));
} catch (ex) {
self.log.error('ex: %s', ex.message);
}
// Send registration
try {
wsc.send(JSON.stringify(registration));
} catch (ex) {
log.error('ex: %s', ex.message);
}
var heartbeat = setInterval(function _poll() {
self.log.trace('cf: _poll: start');
wsc.send('heartbeat');
}, pollInterval);
var heartbeat = setInterval(function _poll() {
log.trace('cf: _poll: start');
wsc.send('heartbeat');
}, pollInterval);
wsc.on('end', function _end() {
self.log.trace('cf: _end: start');
self.emit('connection-end');
self.log.info('cf: websocket ended');
clearInterval(heartbeat);
});
wsc.on('end', function _end() {
log.trace('cf: _end: start');
self.emit('connection-end');
log.info('cf: websocket ended');
clearInterval(heartbeat);
if (self.backoff_opts) {
expBackoff.backoff();
}
});
wsc.on('connectionReset', function _connectionReset() {
self.log.trace('cf: _connectionReset: start');
self.emit('connection-end');
self.log.info('cf: websocket ended');
clearInterval(heartbeat);
});
wsc.on('connectionReset', function _connectionReset() {
log.trace('cf: _connectionReset: start');
self.emit('connection-end');
log.info('cf: websocket ended');
clearInterval(heartbeat);
});
// Handles publisher change feed items and bootstrap response.
// The only valid response from the publisher when the listener is
// in the initBootstrap state, is a bootstrap object. From that
// point forward it is expected that all items are change feed items
wsc.on('text', function _receivedText(text) {
self.log.trace({ cfText: text }, 'cf: _receivedText: start');
var item = JSON.parse(text);
var isBootstrap = item.hasOwnProperty('bootstrapRoute');
if (self.initBootstrap && isBootstrap) {
self.log.trace('cf: bootstrap');
self.initBootstrap = false;
self.emit('bootstrap', item);
} else if (!self.initBootstrap) {
self.log.trace({ cfItem: item },
'cf: change item received');
self.push(item);
} else {
self.log.error(
'Invalid socket state! text: %s initBootstrap: %s',
text,
self.initBootstrap);
self.emit('error');
}
// Handles publisher change feed items and bootstrap response.
// The only valid response from the publisher when the listener
// is in the initBootstrap state, is a bootstrap object. From
// that point forward it is expected that all items are change
// feed items
wsc.on('text', function _receivedText(text) {
log.trace(
{ cfText: text },
'cf: _receivedText: start');
var item = JSON.parse(text);
var isBootstrap = item.hasOwnProperty('bootstrapRoute');
if (self.initBootstrap && isBootstrap) {
log.trace('cf: bootstrap');
self.initBootstrap = false;
self.emit('bootstrap', item);
expBackoff.reset();
} else if (!self.initBootstrap) {
log.trace({ cfItem: item }, 'cf: change item received');
self.push(item);
} else {
log.error(
'Invalid socket state! text: %s initBootstrap: %s',
text,
self.initBootstrap);
self.emit('error');
}
});
});
});
});
client.on('attempt', function _attempt() {
attempt = attempt + 1;
self.log.warn('Changefeed unavailable -- backoff attempt: %s', attempt);
client.on('attempt', function _attempt() {
attempt = attempt + 1;
log.warn('cf: restify backoff attempt: %s', attempt);
});
});
expBackoff.backoff();
};

@@ -202,0 +219,0 @@

{
"name": "changefeed",
"description": "Change Feed Modules",
"version": "1.1.3",
"version": "1.1.4",
"author": "Joyent (joyent.com)",

@@ -6,0 +6,0 @@ "private": false,

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc