changefeed
Advanced tools
Comparing version 1.1.3 to 1.1.4
@@ -12,2 +12,3 @@ /* | ||
var mod_assert = require('assert-plus'); | ||
var mod_backoff = require('backoff'); | ||
var mod_bunyan = require('bunyan'); | ||
@@ -36,3 +37,3 @@ var mod_restify = require('restify-clients'); | ||
* retries: Infinity | ||
* } | ||
* }, | ||
* log: new mod_bunyan({ | ||
@@ -85,9 +86,4 @@ * name: 'my_logger', | ||
self.changeKind = options.changeKind; | ||
self.backoff_opts = options.backoff; | ||
if (options.backoff) { | ||
self.minTimeout = options.minTimeout || 1000; | ||
self.maxTimeout = options.maxTimetout || Infinity; | ||
self.retries = options.retries || Infinity; | ||
} | ||
self.initBootstrap = true; | ||
@@ -105,2 +101,3 @@ } | ||
var self = this; | ||
var log = self.log; | ||
var registration = { | ||
@@ -111,12 +108,8 @@ instance: self.instance, | ||
}; | ||
self.log.trace({ cfRegistration: registration }, 'register: start'); | ||
log.trace({ cfRegistration: registration }, 'register: start'); | ||
var wskey = shed.generateKey(); | ||
var clientOpts = { | ||
log: self.log, | ||
log: log, | ||
url: self.url, | ||
retry: { | ||
minTimeout: self.minTimeout, | ||
maxTimeout: self.maxTimeout, | ||
retries: self.retries | ||
} | ||
retry: self.backoff_opts | ||
}; | ||
@@ -134,68 +127,92 @@ | ||
var client = mod_restify.createClient(clientOpts); | ||
client.get(upgradeOpts, function (err, res, socket, head) { | ||
if (err) { | ||
self.log.error('err: %j', err); | ||
self.emit('error'); | ||
var expBackoff_opts = self.backoff_opts ? { | ||
initialDelay: self.backoff_opts.minTimeout, | ||
maxDelay: self.maxTimeout | ||
} : null; | ||
var expBackoff = mod_backoff.exponential(expBackoff_opts); | ||
if (self.backoff_opts) { | ||
log.info('cf: backoff enabled'); | ||
expBackoff.failAfter(self.backoff_opts.retries); | ||
} | ||
expBackoff.on('backoff', function _backoff(number, delay) { | ||
if (number > 0) { | ||
log.warn('Backing off -- retry count: %s delay: %s', number, delay); | ||
} | ||
}); | ||
expBackoff.on('ready', function _ready(number, delay) { | ||
client.get(upgradeOpts, function (err, res, socket, head) { | ||
if (err) { | ||
log.error('err: %j', err); | ||
self.emit('error'); | ||
} | ||
res.once('upgradeResult', function (err2, res2, socket2, head2) { | ||
var wsc = self.wsc = shed.connect(res2, socket2, head2, wskey); | ||
res.once('upgradeResult', function (err2, res2, socket2, head2) { | ||
var wsc = self.wsc = shed.connect(res2, socket2, head2, wskey); | ||
// Send registration | ||
try { | ||
wsc.send(JSON.stringify(registration)); | ||
} catch (ex) { | ||
self.log.error('ex: %s', ex.message); | ||
} | ||
// Send registration | ||
try { | ||
wsc.send(JSON.stringify(registration)); | ||
} catch (ex) { | ||
log.error('ex: %s', ex.message); | ||
} | ||
var heartbeat = setInterval(function _poll() { | ||
self.log.trace('cf: _poll: start'); | ||
wsc.send('heartbeat'); | ||
}, pollInterval); | ||
var heartbeat = setInterval(function _poll() { | ||
log.trace('cf: _poll: start'); | ||
wsc.send('heartbeat'); | ||
}, pollInterval); | ||
wsc.on('end', function _end() { | ||
self.log.trace('cf: _end: start'); | ||
self.emit('connection-end'); | ||
self.log.info('cf: websocket ended'); | ||
clearInterval(heartbeat); | ||
}); | ||
wsc.on('end', function _end() { | ||
log.trace('cf: _end: start'); | ||
self.emit('connection-end'); | ||
log.info('cf: websocket ended'); | ||
clearInterval(heartbeat); | ||
if (self.backoff_opts) { | ||
expBackoff.backoff(); | ||
} | ||
}); | ||
wsc.on('connectionReset', function _connectionReset() { | ||
self.log.trace('cf: _connectionReset: start'); | ||
self.emit('connection-end'); | ||
self.log.info('cf: websocket ended'); | ||
clearInterval(heartbeat); | ||
}); | ||
wsc.on('connectionReset', function _connectionReset() { | ||
log.trace('cf: _connectionReset: start'); | ||
self.emit('connection-end'); | ||
log.info('cf: websocket ended'); | ||
clearInterval(heartbeat); | ||
}); | ||
// Handles publisher change feed items and bootstrap response. | ||
// The only valid response from the publisher when the listener is | ||
// in the initBootstrap state, is a bootstrap object. From that | ||
// point forward it is expected that all items are change feed items | ||
wsc.on('text', function _receivedText(text) { | ||
self.log.trace({ cfText: text }, 'cf: _receivedText: start'); | ||
var item = JSON.parse(text); | ||
var isBootstrap = item.hasOwnProperty('bootstrapRoute'); | ||
if (self.initBootstrap && isBootstrap) { | ||
self.log.trace('cf: bootstrap'); | ||
self.initBootstrap = false; | ||
self.emit('bootstrap', item); | ||
} else if (!self.initBootstrap) { | ||
self.log.trace({ cfItem: item }, | ||
'cf: change item received'); | ||
self.push(item); | ||
} else { | ||
self.log.error( | ||
'Invalid socket state! text: %s initBootstrap: %s', | ||
text, | ||
self.initBootstrap); | ||
self.emit('error'); | ||
} | ||
// Handles publisher change feed items and bootstrap response. | ||
// The only valid response from the publisher when the listener | ||
// is in the initBootstrap state, is a bootstrap object. From | ||
// that point forward it is expected that all items are change | ||
// feed items | ||
wsc.on('text', function _receivedText(text) { | ||
log.trace( | ||
{ cfText: text }, | ||
'cf: _receivedText: start'); | ||
var item = JSON.parse(text); | ||
var isBootstrap = item.hasOwnProperty('bootstrapRoute'); | ||
if (self.initBootstrap && isBootstrap) { | ||
log.trace('cf: bootstrap'); | ||
self.initBootstrap = false; | ||
self.emit('bootstrap', item); | ||
expBackoff.reset(); | ||
} else if (!self.initBootstrap) { | ||
log.trace({ cfItem: item }, 'cf: change item received'); | ||
self.push(item); | ||
} else { | ||
log.error( | ||
'Invalid socket state! text: %s initBootstrap: %s', | ||
text, | ||
self.initBootstrap); | ||
self.emit('error'); | ||
} | ||
}); | ||
}); | ||
}); | ||
}); | ||
client.on('attempt', function _attempt() { | ||
attempt = attempt + 1; | ||
self.log.warn('Changefeed unavailable -- backoff attempt: %s', attempt); | ||
client.on('attempt', function _attempt() { | ||
attempt = attempt + 1; | ||
log.warn('cf: restify backoff attempt: %s', attempt); | ||
}); | ||
}); | ||
expBackoff.backoff(); | ||
}; | ||
@@ -202,0 +219,0 @@ |
{ | ||
"name": "changefeed", | ||
"description": "Change Feed Modules", | ||
"version": "1.1.3", | ||
"version": "1.1.4", | ||
"author": "Joyent (joyent.com)", | ||
@@ -6,0 +6,0 @@ "private": false, |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
46752
596