changefeed
Advanced tools
Comparing version 1.0.5 to 1.1.0
@@ -11,8 +11,8 @@ /* | ||
var mod_assert = require('assert'); | ||
var mod_assertplus = require('assert'); | ||
var mod_assert = require('assert-plus'); | ||
var mod_bunyan = require('bunyan'); | ||
var mod_http = require('http'); | ||
var mod_restify = require('restify'); | ||
var mod_util = require('util'); | ||
var Readable = require('stream').Readable; | ||
var sprintf = require('sprintf-js').sprintf; | ||
var Watershed = require('watershed').Watershed; | ||
@@ -22,2 +22,6 @@ | ||
var pollInterval = 27817; | ||
var attempt = 0; | ||
var HTTP_MODIFIER = 'http://%s'; | ||
var PORT_MODIFIER = ':%s'; | ||
var ROOT_PATH = '/changefeeds'; | ||
@@ -29,2 +33,7 @@ /* | ||
* var options = { | ||
* backoff: { | ||
* maxTimeout: Infinity, | ||
* minTimeout: 10, | ||
* retries: Infinity | ||
* } | ||
* log: new mod_bunyan({ | ||
@@ -35,4 +44,3 @@ * name: 'my_logger', | ||
* }), | ||
* endpoint: '127.0.0.1', | ||
* port: 80, | ||
* url: 'http://127.0.0.1', | ||
* instance: '<UUID>', | ||
@@ -49,7 +57,29 @@ * service: '<service name>', | ||
mod_assert.object(options, 'options'); | ||
mod_assert.object(options.log, 'options.log'); | ||
mod_assert.string(options.instance, 'options.instance'); | ||
mod_assert.string(options.service, 'options.service'); | ||
mod_assert.object(options.changeKind, 'options.changeKind'); | ||
mod_assert.string( | ||
options.changeKind.resource, | ||
'options.changeKind.resource'); | ||
mod_assert.arrayOfString( | ||
options.changeKind.subResources, | ||
'options.changeKind.subResources'); | ||
Readable.call(self, { objectMode: true }); | ||
// retain backwards compatibility w/ non restify-clients listeners | ||
if (options.endpoint && options.port) { | ||
self.url = sprintf(HTTP_MODIFIER, options.endpoint); | ||
if (options.port !== 80) { | ||
self.url += sprintf(PORT_MODIFIER, options.port); | ||
} | ||
} else { | ||
mod_assert.string(options.url, 'options.url'); | ||
self.url = options.url; | ||
} | ||
self.log = options.log; | ||
self.endpoint = options.endpoint; | ||
self.publisherPort = options.port; | ||
self.instance = options.instance; | ||
@@ -59,2 +89,8 @@ self.service = options.service; | ||
if (options.backoff) { | ||
self.minTimeout = options.minTimeout || 1000; | ||
self.maxTimeout = options.maxTimetout || Infinity; | ||
self.retries = options.retries || Infinity; | ||
} | ||
self.initBootstrap = true; | ||
@@ -79,5 +115,14 @@ } | ||
var wskey = shed.generateKey(); | ||
var options = { | ||
port: self.publisherPort, | ||
hostname: self.endpoint, | ||
var clientOpts = { | ||
log: self.log, | ||
url: self.url, | ||
retry: { | ||
minTimeout: self.minTimeout, | ||
maxTimeout: self.maxTimeout, | ||
retries: self.retries | ||
} | ||
}; | ||
var upgradeOpts = { | ||
path: self.path, | ||
headers: { | ||
@@ -89,55 +134,67 @@ 'connection': 'upgrade', | ||
}; | ||
var req = mod_http.request(options); | ||
req.end(); | ||
req.on('upgrade', function _upgrade(res, socket, head) { | ||
self.log.trace('_upgrade: start'); | ||
var wsc = self.wsc = shed.connect(res, socket, head, wskey); | ||
// Send registration | ||
try { | ||
wsc.send(JSON.stringify(registration)); | ||
} catch (ex) { | ||
self.log.error('ex: %s', ex.message); | ||
var client = mod_restify.createClient(clientOpts); | ||
client.get(upgradeOpts, function (err, res, socket, head) { | ||
if (err) { | ||
self.log.error('err: %j', err); | ||
self.emit('error'); | ||
} | ||
var heartbeat = setInterval(function _poll() { | ||
self.log.trace('_poll: start'); | ||
wsc.send('heartbeat'); | ||
}, pollInterval); | ||
res.once('upgradeResult', function (err2, res2, socket2, head2) { | ||
var wsc = self.wsc = shed.connect(res2, socket2, head2, wskey); | ||
wsc.on('end', function _end() { | ||
self.log.trace('_end: start'); | ||
self.emit('connection-end'); | ||
self.log.info('websocket ended'); | ||
clearInterval(heartbeat); | ||
}); | ||
// Send registration | ||
try { | ||
wsc.send(JSON.stringify(registration)); | ||
} catch (ex) { | ||
self.log.error('ex: %s', ex.message); | ||
} | ||
wsc.on('connectionReset', function _connectionReset() { | ||
self.log.trace('_connectionReset: start'); | ||
self.emit('connection-end'); | ||
self.log.info('websocket ended'); | ||
clearInterval(heartbeat); | ||
}); | ||
var heartbeat = setInterval(function _poll() { | ||
self.log.trace('_poll: start'); | ||
wsc.send('heartbeat'); | ||
}, pollInterval); | ||
// Handles publisher change feed items and bootstrap response. | ||
// The only valid response from the publisher when the listener is in | ||
// the initBootstrap state, is a bootstrap object. From that point | ||
// forward it is expected that all items are change feed items. | ||
wsc.on('text', function _recieveRegistration(text) { | ||
self.log.trace({ cfText: text }, '_recieveRegistration: start'); | ||
var item = JSON.parse(text); | ||
if (self.initBootstrap && item.hasOwnProperty('bootstrapRoute')) { | ||
self.initBootstrap = false; | ||
self.emit('bootstrap', item); | ||
} else if (!self.initBootstrap) { | ||
self.push(item); | ||
} else { | ||
self.log.error( | ||
'Invalid socket state! text: %s initBootstrap: %s', | ||
text, | ||
self.initBootstrap); | ||
self.emit('error'); | ||
} | ||
wsc.on('end', function _end() { | ||
self.log.trace('_end: start'); | ||
self.emit('connection-end'); | ||
self.log.info('websocket ended'); | ||
clearInterval(heartbeat); | ||
}); | ||
wsc.on('connectionReset', function _connectionReset() { | ||
self.log.trace('_connectionReset: start'); | ||
self.emit('connection-end'); | ||
self.log.info('websocket ended'); | ||
clearInterval(heartbeat); | ||
}); | ||
// Handles publisher change feed items and bootstrap response. | ||
// The only valid response from the publisher when the listener is | ||
// in the initBootstrap state, is a bootstrap object. From that | ||
// point forward it is expected that all items are change feed items | ||
wsc.on('text', function _recieveRegistration(text) { | ||
self.log.trace({ cfText: text }, '_recieveRegistration: start'); | ||
var item = JSON.parse(text); | ||
var isBootstrap = item.hasOwnProperty('bootstrapRoute'); | ||
if (self.initBootstrap && isBootstrap) { | ||
self.initBootstrap = false; | ||
self.emit('bootstrap', item); | ||
} else if (!self.initBootstrap) { | ||
self.push(item); | ||
} else { | ||
self.log.error( | ||
'Invalid socket state! text: %s initBootstrap: %s', | ||
text, | ||
self.initBootstrap); | ||
self.emit('error'); | ||
} | ||
}); | ||
}); | ||
}); | ||
client.on('attempt', function _attempt() { | ||
attempt = attempt + 1; | ||
self.log.warn('Changefeed unavailable -- backoff attempt: %s', attempt); | ||
}); | ||
}; | ||
@@ -144,0 +201,0 @@ |
@@ -13,2 +13,3 @@ /* | ||
var mod_assert = require('assert'); | ||
var mod_backoff = require('backoff'); | ||
var mod_bunyan = require('bunyan'); | ||
@@ -30,2 +31,7 @@ var mod_libuuid = require('libuuid'); | ||
* var options = { | ||
* backoff: { | ||
* maxTimeout: Infinity, | ||
* minTimeout: 10, | ||
* retries: Infinity | ||
* }, | ||
* log: new Bunyan({ | ||
@@ -36,2 +42,3 @@ * name: 'publisher_test', | ||
* }), | ||
* maxAge: 28800, | ||
* moray: { | ||
@@ -49,4 +56,3 @@ * bucketName: 'change_feed_bucket', | ||
* restifyServer: server, | ||
* resources: resources, | ||
* maxAge: 28800 | ||
* resources: resources | ||
* }; | ||
@@ -69,2 +75,12 @@ */ | ||
if (options.backoff) { | ||
self.minTimeout = options.backoff.minTimeout || 10; | ||
self.maxTimeout = options.backoff.maxTimeout || Infinity; | ||
self.retries = options.backoff.retries || Infinity; | ||
} else { | ||
self.minTimeout = 10; | ||
self.maxTimeout = Infinity; | ||
self.retries = Infinity; | ||
} | ||
var morayOptions = options.moray; | ||
@@ -103,2 +119,9 @@ self.morayHost = morayOptions.host; | ||
var expBackoff = mod_backoff.exponential({ | ||
initialDelay: self.minTimeout, | ||
maxDelay: self.maxTimeout | ||
}); | ||
expBackoff.failAfter(self.retries); | ||
morayClient.on('connect', function _morayConnect() { | ||
@@ -112,11 +135,27 @@ log.trace('_morayConnect: started'); | ||
}); | ||
// Auto setup the change feed moray bucket | ||
self._setupBucket(function _bucketSetupError(err) { | ||
if (err) { | ||
log.error({ err: err }, 'Bucket was not loaded'); | ||
} else { | ||
log.info('Bucket successfully loaded'); | ||
self.emit('moray-ready'); | ||
} | ||
expBackoff.on('backoff', function _backoff(number, delay) { | ||
log.warn('Backing off -- retry count: %s delay: %s', number, delay); | ||
}); | ||
expBackoff.on('ready', function _ready(number, delay) { | ||
// Auto setup the change feed moray bucket | ||
self._setupBucket(function _bucketSetupError(err) { | ||
if (err) { | ||
log.error({ err: err }, 'Bucket was not loaded'); | ||
expBackoff.backoff(); | ||
} else { | ||
log.info('Bucket successfully loaded'); | ||
self.emit('moray-ready'); | ||
expBackoff.reset(); | ||
} | ||
}); | ||
}); | ||
expBackoff.on('fail', function _fail() { | ||
log.error('backoff failed'); | ||
self.emit('moray-fail'); | ||
}); | ||
expBackoff.backoff(); | ||
}); | ||
@@ -123,0 +162,0 @@ |
{ | ||
"name": "changefeed", | ||
"description": "Change Feed Modules", | ||
"version": "1.0.5", | ||
"version": "1.1.0", | ||
"author": "Joyent (joyent.com)", | ||
@@ -9,2 +9,4 @@ "private": false, | ||
"dependencies": { | ||
"assert-plus": "0.2.0", | ||
"backoff": "2.4.1", | ||
"bunyan": "1.5.1", | ||
@@ -14,3 +16,4 @@ "libuuid": "0.1.4", | ||
"posix-getopt": "1.2.0", | ||
"restify": "4.0.3", | ||
"restify-clients": "1.1.1", | ||
"sprintf-js": "1.0.3", | ||
"vasync": "1.6.3", | ||
@@ -21,3 +24,4 @@ "vstream": "0.1.0", | ||
"devDependencies": { | ||
"tape": "4.2.2" | ||
"tape": "4.2.2", | ||
"restify": "4.0.3" | ||
}, | ||
@@ -24,0 +28,0 @@ "optionalDependencies": { |
@@ -58,2 +58,7 @@ <!-- | ||
var options = { | ||
backoff: { | ||
maxTimeout: Infinity, | ||
minTimeout: 10, | ||
retries: Infinity | ||
}, | ||
log: mod_bunyan.createLogger({ | ||
@@ -64,2 +69,3 @@ name: 'publisher_test', | ||
}), | ||
maxAge: 2000, | ||
moray: { | ||
@@ -77,4 +83,3 @@ bucketName: 'pub_change_bucket', | ||
restifyServer: server, | ||
resources: resources, | ||
maxAge: 2000 | ||
resources: resources | ||
}; | ||
@@ -92,2 +97,7 @@ | ||
var options = { | ||
backoff: { | ||
maxTimeout: Infinity, | ||
minTimeout: 10, | ||
retries: Infinity | ||
}, | ||
log: mod_bunyan.createLogger({ | ||
@@ -98,3 +108,3 @@ name: 'listener_test', | ||
}), | ||
endpoint: '127.0.0.1', | ||
url: 'http://localhost', | ||
instance: 'uuid goes here', | ||
@@ -101,0 +111,0 @@ service: 'tcns', |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
45555
9
570
128
1
12
2
+ Addedassert-plus@0.2.0
+ Addedbackoff@2.4.1
+ Addedrestify-clients@1.1.1
+ Addedsprintf-js@1.0.3
+ Addedassert-plus@0.2.0(transitive)
+ Addedbackoff@2.4.1(transitive)
+ Addedlodash@3.10.1(transitive)
+ Addedrestify-clients@1.1.1(transitive)
+ Addedrestify-errors@3.1.0(transitive)
+ Addedsemver@5.7.2(transitive)
+ Addedsprintf-js@1.0.3(transitive)
- Removedrestify@4.0.3
- Removedasn1@0.1.11(transitive)
- Removedbackoff@2.5.0(transitive)
- Removedcsv@0.4.6(transitive)
- Removedcsv-generate@0.0.6(transitive)
- Removedcsv-parse@1.3.3(transitive)
- Removedcsv-stringify@0.0.8(transitive)
- Removedctype@0.5.3(transitive)
- Removedescape-regexp-component@1.0.2(transitive)
- Removedformidable@1.2.6(transitive)
- Removedhttp-signature@0.11.0(transitive)
- Removednegotiator@0.5.3(transitive)
- Removedqs@3.1.0(transitive)
- Removedrestify@4.0.3(transitive)
- Removedsemver@4.3.6(transitive)
- Removedspdy@1.32.5(transitive)
- Removedstream-transform@0.1.2(transitive)