Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

changefeed

Package Overview
Dependencies
Maintainers
3
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

changefeed - npm Package Compare versions

Comparing version 1.0.5 to 1.1.0

CHANGES.md

167

lib/listener.js

@@ -11,8 +11,8 @@ /*

var mod_assert = require('assert');
var mod_assertplus = require('assert');
var mod_assert = require('assert-plus');
var mod_bunyan = require('bunyan');
var mod_http = require('http');
var mod_restify = require('restify');
var mod_util = require('util');
var Readable = require('stream').Readable;
var sprintf = require('sprintf-js').sprintf;
var Watershed = require('watershed').Watershed;

@@ -22,2 +22,6 @@

var pollInterval = 27817;
var attempt = 0;
var HTTP_MODIFIER = 'http://%s';
var PORT_MODIFIER = ':%s';
var ROOT_PATH = '/changefeeds';

@@ -29,2 +33,7 @@ /*

* var options = {
* backoff: {
* maxTimeout: Infinity,
* minTimeout: 10,
* retries: Infinity
* }
* log: new mod_bunyan({

@@ -35,4 +44,3 @@ * name: 'my_logger',

* }),
* endpoint: '127.0.0.1',
* port: 80,
* url: 'http://127.0.0.1',
* instance: '<UUID>',

@@ -49,7 +57,29 @@ * service: '<service name>',

mod_assert.object(options, 'options');
mod_assert.object(options.log, 'options.log');
mod_assert.string(options.instance, 'options.instance');
mod_assert.string(options.service, 'options.service');
mod_assert.object(options.changeKind, 'options.changeKind');
mod_assert.string(
options.changeKind.resource,
'options.changeKind.resource');
mod_assert.arrayOfString(
options.changeKind.subResources,
'options.changeKind.subResources');
Readable.call(self, { objectMode: true });
// retain backwards compatibility w/ non restify-clients listeners
if (options.endpoint && options.port) {
self.url = sprintf(HTTP_MODIFIER, options.endpoint);
if (options.port !== 80) {
self.url += sprintf(PORT_MODIFIER, options.port);
}
} else {
mod_assert.string(options.url, 'options.url');
self.url = options.url;
}
self.log = options.log;
self.endpoint = options.endpoint;
self.publisherPort = options.port;
self.instance = options.instance;

@@ -59,2 +89,8 @@ self.service = options.service;

if (options.backoff) {
self.minTimeout = options.minTimeout || 1000;
self.maxTimeout = options.maxTimetout || Infinity;
self.retries = options.retries || Infinity;
}
self.initBootstrap = true;

@@ -79,5 +115,14 @@ }

var wskey = shed.generateKey();
var options = {
port: self.publisherPort,
hostname: self.endpoint,
var clientOpts = {
log: self.log,
url: self.url,
retry: {
minTimeout: self.minTimeout,
maxTimeout: self.maxTimeout,
retries: self.retries
}
};
var upgradeOpts = {
path: self.path,
headers: {

@@ -89,55 +134,67 @@ 'connection': 'upgrade',

};
var req = mod_http.request(options);
req.end();
req.on('upgrade', function _upgrade(res, socket, head) {
self.log.trace('_upgrade: start');
var wsc = self.wsc = shed.connect(res, socket, head, wskey);
// Send registration
try {
wsc.send(JSON.stringify(registration));
} catch (ex) {
self.log.error('ex: %s', ex.message);
var client = mod_restify.createClient(clientOpts);
client.get(upgradeOpts, function (err, res, socket, head) {
if (err) {
self.log.error('err: %j', err);
self.emit('error');
}
var heartbeat = setInterval(function _poll() {
self.log.trace('_poll: start');
wsc.send('heartbeat');
}, pollInterval);
res.once('upgradeResult', function (err2, res2, socket2, head2) {
var wsc = self.wsc = shed.connect(res2, socket2, head2, wskey);
wsc.on('end', function _end() {
self.log.trace('_end: start');
self.emit('connection-end');
self.log.info('websocket ended');
clearInterval(heartbeat);
});
// Send registration
try {
wsc.send(JSON.stringify(registration));
} catch (ex) {
self.log.error('ex: %s', ex.message);
}
wsc.on('connectionReset', function _connectionReset() {
self.log.trace('_connectionReset: start');
self.emit('connection-end');
self.log.info('websocket ended');
clearInterval(heartbeat);
});
var heartbeat = setInterval(function _poll() {
self.log.trace('_poll: start');
wsc.send('heartbeat');
}, pollInterval);
// Handles publisher change feed items and bootstrap response.
// The only valid response from the publisher when the listener is in
// the initBootstrap state, is a bootstrap object. From that point
// forward it is expected that all items are change feed items.
wsc.on('text', function _recieveRegistration(text) {
self.log.trace({ cfText: text }, '_recieveRegistration: start');
var item = JSON.parse(text);
if (self.initBootstrap && item.hasOwnProperty('bootstrapRoute')) {
self.initBootstrap = false;
self.emit('bootstrap', item);
} else if (!self.initBootstrap) {
self.push(item);
} else {
self.log.error(
'Invalid socket state! text: %s initBootstrap: %s',
text,
self.initBootstrap);
self.emit('error');
}
wsc.on('end', function _end() {
self.log.trace('_end: start');
self.emit('connection-end');
self.log.info('websocket ended');
clearInterval(heartbeat);
});
wsc.on('connectionReset', function _connectionReset() {
self.log.trace('_connectionReset: start');
self.emit('connection-end');
self.log.info('websocket ended');
clearInterval(heartbeat);
});
// Handles publisher change feed items and bootstrap response.
// The only valid response from the publisher when the listener is
// in the initBootstrap state, is a bootstrap object. From that
// point forward it is expected that all items are change feed items
wsc.on('text', function _recieveRegistration(text) {
self.log.trace({ cfText: text }, '_recieveRegistration: start');
var item = JSON.parse(text);
var isBootstrap = item.hasOwnProperty('bootstrapRoute');
if (self.initBootstrap && isBootstrap) {
self.initBootstrap = false;
self.emit('bootstrap', item);
} else if (!self.initBootstrap) {
self.push(item);
} else {
self.log.error(
'Invalid socket state! text: %s initBootstrap: %s',
text,
self.initBootstrap);
self.emit('error');
}
});
});
});
client.on('attempt', function _attempt() {
attempt = attempt + 1;
self.log.warn('Changefeed unavailable -- backoff attempt: %s', attempt);
});
};

@@ -144,0 +201,0 @@

@@ -13,2 +13,3 @@ /*

var mod_assert = require('assert');
var mod_backoff = require('backoff');
var mod_bunyan = require('bunyan');

@@ -30,2 +31,7 @@ var mod_libuuid = require('libuuid');

* var options = {
* backoff: {
* maxTimeout: Infinity,
* minTimeout: 10,
* retries: Infinity
* },
* log: new Bunyan({

@@ -36,2 +42,3 @@ * name: 'publisher_test',

* }),
* maxAge: 28800,
* moray: {

@@ -49,4 +56,3 @@ * bucketName: 'change_feed_bucket',

* restifyServer: server,
* resources: resources,
* maxAge: 28800
* resources: resources
* };

@@ -69,2 +75,12 @@ */

if (options.backoff) {
self.minTimeout = options.backoff.minTimeout || 10;
self.maxTimeout = options.backoff.maxTimeout || Infinity;
self.retries = options.backoff.retries || Infinity;
} else {
self.minTimeout = 10;
self.maxTimeout = Infinity;
self.retries = Infinity;
}
var morayOptions = options.moray;

@@ -103,2 +119,9 @@ self.morayHost = morayOptions.host;

var expBackoff = mod_backoff.exponential({
initialDelay: self.minTimeout,
maxDelay: self.maxTimeout
});
expBackoff.failAfter(self.retries);
morayClient.on('connect', function _morayConnect() {

@@ -112,11 +135,27 @@ log.trace('_morayConnect: started');

});
// Auto setup the change feed moray bucket
self._setupBucket(function _bucketSetupError(err) {
if (err) {
log.error({ err: err }, 'Bucket was not loaded');
} else {
log.info('Bucket successfully loaded');
self.emit('moray-ready');
}
expBackoff.on('backoff', function _backoff(number, delay) {
log.warn('Backing off -- retry count: %s delay: %s', number, delay);
});
expBackoff.on('ready', function _ready(number, delay) {
// Auto setup the change feed moray bucket
self._setupBucket(function _bucketSetupError(err) {
if (err) {
log.error({ err: err }, 'Bucket was not loaded');
expBackoff.backoff();
} else {
log.info('Bucket successfully loaded');
self.emit('moray-ready');
expBackoff.reset();
}
});
});
expBackoff.on('fail', function _fail() {
log.error('backoff failed');
self.emit('moray-fail');
});
expBackoff.backoff();
});

@@ -123,0 +162,0 @@

{
"name": "changefeed",
"description": "Change Feed Modules",
"version": "1.0.5",
"version": "1.1.0",
"author": "Joyent (joyent.com)",

@@ -9,2 +9,4 @@ "private": false,

"dependencies": {
"assert-plus": "0.2.0",
"backoff": "2.4.1",
"bunyan": "1.5.1",

@@ -14,3 +16,4 @@ "libuuid": "0.1.4",

"posix-getopt": "1.2.0",
"restify": "4.0.3",
"restify-clients": "1.1.1",
"sprintf-js": "1.0.3",
"vasync": "1.6.3",

@@ -21,3 +24,4 @@ "vstream": "0.1.0",

"devDependencies": {
"tape": "4.2.2"
"tape": "4.2.2",
"restify": "4.0.3"
},

@@ -24,0 +28,0 @@ "optionalDependencies": {

@@ -58,2 +58,7 @@ <!--

var options = {
backoff: {
maxTimeout: Infinity,
minTimeout: 10,
retries: Infinity
},
log: mod_bunyan.createLogger({

@@ -64,2 +69,3 @@ name: 'publisher_test',

}),
maxAge: 2000,
moray: {

@@ -77,4 +83,3 @@ bucketName: 'pub_change_bucket',

restifyServer: server,
resources: resources,
maxAge: 2000
resources: resources
};

@@ -92,2 +97,7 @@

var options = {
backoff: {
maxTimeout: Infinity,
minTimeout: 10,
retries: Infinity
},
log: mod_bunyan.createLogger({

@@ -98,3 +108,3 @@ name: 'listener_test',

}),
endpoint: '127.0.0.1',
url: 'http://localhost',
instance: 'uuid goes here',

@@ -101,0 +111,0 @@ service: 'tcns',

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc