mqlight-dev
Advanced tools
Comparing version 0.1.2014060401 to 0.1.2014060800
370
mqlight.js
@@ -64,2 +64,3 @@ /* %Z% %W% %I% %E% %U% */ | ||
var url = require('url'); | ||
var http = require('http'); | ||
@@ -117,2 +118,7 @@ var validClientIdChars = | ||
/** The connection retry interval in milliseconds. */ | ||
CONNECT_RETRY_INTERVAL = 10000; | ||
if (process.env.NODE_ENV === 'unittest') CONNECT_RETRY_INTERVAL = 0; | ||
/** | ||
@@ -273,3 +279,83 @@ * Constructs a new Client object in the disconnected state. | ||
/** | ||
* Function to take a single HTTP URL and using the JSON retrieved from it to | ||
* return an array of service URLs. | ||
* | ||
* @param {String} | ||
* serviceUrl - Required; an HTTP address to retrieve service info | ||
* from. | ||
* @return {Array} a list of AMQP service URLs retrieved from the URL. | ||
* @throws TypeError | ||
* If serviceUrl is not a string. | ||
* @throws Error | ||
* if an unsupported or invalid URL specified. | ||
*/ | ||
var getHttpServiceFunction = function(serviceUrl) { | ||
log.entry('getHttpServiceFunction', log.NO_CLIENT_ID); | ||
log.log('parms', log.NO_CLIENT_ID, 'serviceUrl:', serviceUrl); | ||
if (typeof serviceUrl !== 'string') { | ||
throw new TypeError('serviceUrl must be a string type'); | ||
} | ||
var httpServiceFunction = function(callback) { | ||
log.entry('httpServiceFunction', log.NO_CLIENT_ID); | ||
var req = http.request(serviceUrl, function(res) { | ||
log.entry('httpServiceFunction.req.callback', log.NO_CLIENT_ID); | ||
var data = ''; | ||
res.setEncoding('utf8'); | ||
res.on('data', function(chunk) { | ||
data += chunk; | ||
}); | ||
res.on('end', function() { | ||
log.entry('httpServiceFunction.req.on.end.callback', log.NO_CLIENT_ID); | ||
if (res.statusCode === 200) { | ||
try { | ||
var obj = JSON.parse(data); | ||
log.entry('httpServiceFunction.callback', log.NO_CLIENT_ID); | ||
log.log('parms', log.NO_CLIENT_ID, 'service:', obj.service); | ||
callback(undefined, obj.service); | ||
log.exit('httpServiceFunction.callback', log.NO_CLIENT_ID, null); | ||
} catch (err) { | ||
log.log('error', log.NO_CLIENT_ID, err); | ||
log.entry('httpServiceFunction.callback', log.NO_CLIENT_ID); | ||
log.log('parms', log.NO_CLIENT_ID, 'err:', err); | ||
callback(err); | ||
log.exit('httpServiceFunction.callback', log.NO_CLIENT_ID, null); | ||
} | ||
} else { | ||
var err = new Error(data); | ||
log.log('error', log.NO_CLIENT_ID, err); | ||
log.entry('httpServiceFunction.callback', log.NO_CLIENT_ID); | ||
log.log('parms', log.NO_CLIENT_ID, 'err:', err); | ||
callback(err); | ||
log.exit('httpServiceFunction.callback', log.NO_CLIENT_ID, null); | ||
} | ||
log.exit('httpServiceFunction.req.on.end.callback', log.NO_CLIENT_ID, | ||
null); | ||
}); | ||
log.exit('httpServiceFunction.req.callback', log.NO_CLIENT_ID, null); | ||
}).on('error', function(err) { | ||
log.log('error', log.NO_CLIENT_ID, err); | ||
log.entry('httpServiceFunction.callback', log.NO_CLIENT_ID); | ||
log.log('parms', log.NO_CLIENT_ID, 'err:', err); | ||
callback(err); | ||
log.exit('httpServiceFunction.callback', log.NO_CLIENT_ID, null); | ||
}); | ||
req.setTimeout(5000); | ||
req.end(); | ||
log.exit('httpServiceFunction', log.NO_CLIENT_ID, null); | ||
}; | ||
log.exit('getHttpServiceFunction', log.NO_CLIENT_ID, httpServiceFunction); | ||
return httpServiceFunction; | ||
}; | ||
/** | ||
@@ -326,3 +412,9 @@ * Represents an MQ Light client instance. | ||
serviceFunction = service; | ||
} else { | ||
} else if (typeof service === 'string') { | ||
var serviceUrl = url.parse(service); | ||
if (serviceUrl.protocol === 'http:' || serviceUrl.protocol === 'https:') { | ||
serviceFunction = getHttpServiceFunction(service); | ||
} | ||
} | ||
if (!serviceFunction) { | ||
serviceList = generateServiceList(service); | ||
@@ -377,2 +469,4 @@ } | ||
this.service = undefined; | ||
//the first connect, set to false after connect and back to true on disconnect | ||
this.firstConnect = true; | ||
@@ -448,5 +542,6 @@ // List of message subscriptions | ||
if (currentState === 'disconnecting') { | ||
process.nextTick(function() { | ||
setImmediate(function() { | ||
stillDisconnecting(client, callback); | ||
}); | ||
return; | ||
} else { | ||
@@ -468,73 +563,21 @@ process.nextTick(function() { | ||
// Obtain the list of services for connect | ||
var serviceList; | ||
try { | ||
if (client.serviceFunction) { | ||
var serviceFunction = client.serviceFunction(); | ||
serviceList = generateServiceList(serviceFunction); | ||
} else { | ||
serviceList = client.serviceList; | ||
} | ||
} catch (err) { | ||
// if there is an error getting the service list then ensure state is | ||
// disconnected | ||
log.log('error', client.id, err); | ||
client.disconnect(); | ||
process.nextTick(function() { | ||
if (callback) { | ||
log.entry('Client.connect.performConnect.callback', client.id); | ||
// Obtain the list of services for connect and connect to one of the | ||
// services, retrying until a connection can be established | ||
if (client.serviceFunction instanceof Function) { | ||
client.serviceFunction(function(err, service) { | ||
if (err) { | ||
log.entry('Client.connect.performConnect.serviceFunction.callback', | ||
client.id); | ||
callback(err); | ||
log.exit('Client.connect.performConnect.callback', client.id, null); | ||
log.exit('Client.connect.performConnect.serviceFunction.callback', | ||
client.id, null); | ||
} else { | ||
client.serviceList = generateServiceList(service); | ||
client.connectToService(callback); | ||
} | ||
log.log('emit', client.id, 'error', err); | ||
client.emit('error', err); | ||
}); | ||
log.exit('Client.connect.performConnect', client.id, null); | ||
return; | ||
} else { | ||
client.connectToService(callback); | ||
} | ||
// Connect to one of the listed services | ||
try { | ||
// TODO - select a service (for now just select the first one) | ||
var service = serviceList[0]; | ||
client.messenger.connect(service); | ||
client.service = service; | ||
} catch (err) { | ||
// if there is an error connecting to the service then ensure state is | ||
// disconnected | ||
log.log('error', client.id, err); | ||
client.disconnect(); | ||
process.nextTick(function() { | ||
if (callback) { | ||
log.entry('Client.connect.performConnect.callback', client.id); | ||
callback(err); | ||
log.exit('Client.connect.performConnect.callback', client.id, null); | ||
} | ||
log.log('emit', client.id, 'error', err); | ||
client.emit('error', err); | ||
}); | ||
log.exit('Client.connect.performConnect', client.id, null); | ||
return; | ||
} | ||
// Indicate that we're connected | ||
client.state = 'connected'; | ||
process.nextTick(function() { | ||
log.log('emit', client.id, 'connected'); | ||
client.emit('connected'); | ||
}); | ||
if (callback) { | ||
if (!(callback instanceof Function)) { | ||
throw new TypeError('callback must be a function'); | ||
} | ||
process.nextTick(function() { | ||
log.entry('Client.connect.performConnect.callback', client.id); | ||
callback.apply(client); | ||
log.exit('Client.connect.performConnect.callback', client.id, null); | ||
}); | ||
} | ||
log.exit('Client.connect.performConnect', client.id, null); | ||
@@ -550,3 +593,3 @@ return; | ||
if (client.getState() === 'disconnecting') { | ||
process.nextTick(function() { | ||
setImmediate(function() { | ||
stillDisconnecting(client, callback); | ||
@@ -571,5 +614,119 @@ }); | ||
/** | ||
* Function to connect to the service, trys each available service | ||
* in turn. If none can connect it emits an error, waits and | ||
* attempts to connect again. Callback happens once a successful | ||
* connect/reconnect occurs. | ||
* @param {connectCallback} | ||
* - callback called when connect/reconnect happens | ||
*/ | ||
Client.prototype.connectToService = function(callback) { | ||
var client = this; | ||
log.entry('Client.connectToService', client.id); | ||
if (client.getState() === 'diconnecting' || | ||
client.getState() === 'diconnected') { | ||
if (callback) { | ||
log.entry('Client.connectToService.callback', client.id); | ||
callback(new Error('connect aborted due to disconnect')); | ||
log.exit('Client.connectToService.callback', client.id, null); | ||
} | ||
log.exit('Client.connectToService', client.id, null); | ||
return; | ||
} | ||
var connected = false; | ||
var error; | ||
// Try each service in turn until we can successfully connect, or exhaust | ||
// the list | ||
var serviceList = client.serviceList; | ||
if (!error) { | ||
for (var i = 0; i < serviceList.length; i++) { | ||
try { | ||
var service = serviceList[i]; | ||
log.log('data', client.id, 'attempting connect to: ' + service); | ||
var rc = client.messenger.connect(service); | ||
if (rc) { | ||
error = new Error(client.messenger.getLastErrorText()); | ||
log.log('data', client.id, 'failed to connect to: ' + service + | ||
' due to error: ' + error); | ||
} else { | ||
log.log('data', client.id, 'successfully connected to: ' + | ||
service); | ||
client.service = service; | ||
connected = true; | ||
break; | ||
} | ||
} catch (err) { | ||
// Should not get here. | ||
// Means that messenger.connect has been called in an invalid way | ||
error = err; | ||
log.ffdc('Client.connectToService', 'ffdc001', client.id, err); | ||
throw err; | ||
} | ||
} | ||
} | ||
// If we've successfully connected then we're done, otherwise we'll retry | ||
if (connected) { | ||
// Indicate that we're connected | ||
client.state = 'connected'; | ||
var statusClient; | ||
if (client.firstConnect) { | ||
statusClient = 'connected'; | ||
client.firstConnect = false; | ||
} else { | ||
statusClient = 'reconnected'; | ||
} | ||
process.nextTick(function() { | ||
log.log('emit', client.id, statusClient); | ||
client.emit(statusClient); | ||
}); | ||
if (callback) { | ||
process.nextTick(function() { | ||
log.entry('Client.connectToService.callback', client.id); | ||
callback.apply(client); | ||
log.exit('Client.connectToService.callback', client.id, null); | ||
}); | ||
} | ||
} else { | ||
// We've tried all services without success. Pause for a while before | ||
// trying again | ||
// TODO 10 seconds is an arbitrary value, need to review if this is | ||
// appropriate. Timeout should be adjusted based on reconnect algo. | ||
log.log('emit', client.id, 'error', error); | ||
client.emit('error', error); | ||
client.state = 'retrying'; | ||
log.log('data', client.id, 'trying connect again after 10 seconds'); | ||
var retry = function() { client.connectToService(callback); }; | ||
// if client is using serviceFunction, re-generate the list of services | ||
// TODO: merge these copy & paste | ||
if (client.serviceFunction instanceof Function) { | ||
client.serviceFunction(function(err, service) { | ||
if (err) { | ||
log.log('emit', client.id, 'error', err); | ||
client.emit('error', err); | ||
} else { | ||
client.serviceList = generateServiceList(service); | ||
setTimeout(retry, CONNECT_RETRY_INTERVAL); | ||
} | ||
}); | ||
} else { | ||
setTimeout(retry, CONNECT_RETRY_INTERVAL); | ||
} | ||
} | ||
log.exit('Client.connectToService', client.id, null); | ||
return; | ||
}; | ||
/** | ||
* @param {function(object)} | ||
* disconnectCallback - callback, passed an error object if someting | ||
* disconnectCallback - callback, passed an error object if something | ||
* goes wrong. | ||
@@ -626,2 +783,3 @@ * @param {String} | ||
client.emit('disconnected'); | ||
client.firstConnect = true; | ||
}); | ||
@@ -674,2 +832,53 @@ if (callback) { | ||
/** | ||
* Reconnects the client to the MQ Light service, implicitly closing any | ||
* subscriptions that the client has open. The 'reconnected' event will be | ||
* emitted once the client has reconnected. | ||
* <p> | ||
* TODO: Flesh this out for reconnects after a connection is broken. | ||
* | ||
* @return {Object} The instance of client that it is invoked on - allowing | ||
* for chaining of other method calls on the client object. | ||
*/ | ||
Client.prototype.reconnect = function() { | ||
var client = this; | ||
log.entry('Client.reconnect', client.id); | ||
// stop the messenger to free the object then attempt a reconnect | ||
client.messenger.stop(); | ||
// clear the subscriptions list, if the cause of the reconnect happens during | ||
// check for messages we need a 0 length so it will check once reconnected. | ||
// TODO: need to resubscribe to the existing subs so this logic may change | ||
while (client.subscriptions.length > 0) { | ||
client.subscriptions.pop(); | ||
} | ||
// also clear any left over outstanding sends | ||
while (client.outstandingSends.length > 0) { | ||
client.outstandingSends.pop(); | ||
} | ||
// if client is using serviceFunction, re-generate the list of services | ||
// TODO: merge these copy & paste | ||
if (client.serviceFunction instanceof Function) { | ||
client.serviceFunction(function(err, service) { | ||
if (err) { | ||
log.log('emit', client.id, 'error', err); | ||
client.emit('error', err); | ||
} else { | ||
client.serviceList = generateServiceList(service); | ||
client.connectToService(undefined); | ||
} | ||
}); | ||
} else { | ||
setImmediate(function() { | ||
client.connectToService.apply(client, undefined); | ||
}); | ||
} | ||
log.exit('Client.reconnect', client.id, client); | ||
return client; | ||
}; | ||
/** | ||
* @return {String} The identifier associated with the client. This will | ||
@@ -963,3 +1172,2 @@ * either be: a) the identifier supplied as the id property of the options | ||
if (index >= 0) client.outstandingSends.splice(index, 1); | ||
client.disconnect(); | ||
process.nextTick(function() { | ||
@@ -971,6 +1179,5 @@ if (callback) { | ||
} | ||
if (err) { | ||
log.log('emit', client.id, 'error', err); | ||
client.emit('error', err); | ||
} | ||
log.log('emit', client.id, 'error', err); | ||
client.emit('error', err); | ||
client.reconnect(); | ||
}); | ||
@@ -1103,6 +1310,9 @@ } | ||
} else { | ||
process.nextTick(function() { | ||
log.log('emit', client.id, 'message', data, delivery); | ||
log.log('emit', client.id, 'message', data, delivery); | ||
try { | ||
client.emit('message', data, delivery); | ||
}); | ||
} catch (err) { | ||
log.log('emit', client.id, 'error', err); | ||
client.emit('error', err); | ||
} | ||
} | ||
@@ -1123,6 +1333,6 @@ if (qos === exports.QOS_AT_MOST_ONCE) { | ||
log.log('error', client.id, err); | ||
client.disconnect(); | ||
process.nextTick(function() { | ||
log.log('emit', client.id, 'error', err); | ||
client.emit('error', err); | ||
client.reconnect(); | ||
}); | ||
@@ -1328,3 +1538,3 @@ } | ||
client.emit('error', err); | ||
client.disconnect(); | ||
client.reconnect(); | ||
} | ||
@@ -1331,0 +1541,0 @@ }); |
{ | ||
"name": "mqlight-dev", | ||
"version": "0.1.2014060401", | ||
"version": "0.1.2014060800", | ||
"description": "IBM MQ Light Client Module", | ||
@@ -5,0 +5,0 @@ "main": "mqlight.js", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
8734226
2366
22