mqlight-dev
Advanced tools
Comparing version 0.1.2014051301 to 0.1.2014060401
854
mqlight.js
@@ -20,66 +20,20 @@ /* %Z% %W% %I% %E% %U% */ | ||
/* | ||
* Set up logging to stderr. The level of output is configured by the | ||
* value of the MQLIGHT_NODE_LOG environment variable. The default is | ||
* 'ffdc'. | ||
/** | ||
* Set up logging (to stderr by default). The level of output is | ||
* configured by the value of the MQLIGHT_NODE_LOG environment | ||
* variable. The default is 'ffdc'. | ||
*/ | ||
var moment = require('moment'); | ||
var NO_CLIENT_ID = '*'; | ||
log = function(lvl, id, message) { | ||
if (logger.levels[logger.level] <= logger.levels[lvl]) { | ||
logger.heading = moment(new Date()).format('HH:mm:ss.SSS') + | ||
' [' + process.pid + ']'; | ||
logger.log.apply(this, arguments); | ||
} | ||
}; | ||
log = require('./mqlight-log'); | ||
var logger = require('npmlog'); | ||
logger.addLevel('all', -Infinity, { inverse: true }, 'all '); | ||
logger.addLevel('debug', 800, { fg: 'green', bg: 'black' }, 'debug '); | ||
logger.addLevel('detail', 1000, { fg: 'blue', bg: 'black' }, 'detail '); | ||
logger.addLevel('data', 1500, { fg: 'blue', bg: 'black' }, 'data '); | ||
logger.addLevel('parms', 3000, { fg: 'yellow', bg: 'black' }, 'parms '); | ||
logger.addLevel('exit', 3000, { fg: 'yellow', bg: 'black' }, 'exit '); | ||
logger.addLevel('entry', 3000, { fg: 'yellow', bg: 'black' }, 'entry '); | ||
logger.addLevel('entry_exit', 3000, { fg: 'yellow', bg: 'black' }, | ||
'entry_exit '); | ||
logger.addLevel('ffdc', 10000, { fg: 'red', bg: 'black' }, 'ffdc '); | ||
/** | ||
* The logging level can be set programmatically by calling | ||
* log.setLevel(level) | ||
* An ffdc can be generated programmatically by calling | ||
* log.debug() | ||
*/ | ||
exports.log = log; | ||
/** @const {string} */ | ||
logger.level = process.env.MQLIGHT_NODE_LOG || 'ffdc'; | ||
log('debug', NO_CLIENT_ID, 'logger.level =', logger.level); | ||
if (logger.levels[logger.level] <= logger.levels.data) { | ||
log('debug', NO_CLIENT_ID, 'Setting PN_TRACE_FRM'); | ||
/** @const {string} */ | ||
process.env.PN_TRACE_FRM = '1'; | ||
if (logger.levels[logger.level] <= logger.levels.detail) { | ||
log('debug', NO_CLIENT_ID, 'Setting PN_TRACE_RAW'); | ||
/** @const {string} */ | ||
process.env.PN_TRACE_RAW = '1'; | ||
} | ||
} | ||
/* | ||
* Set up a signal handler that will cause an ffdc to be generated when | ||
* the signal is caught. Set the environment variable MQLIGHT_NODE_NO_HANDLER | ||
* to stop the signal handler being registered. | ||
*/ | ||
var pkg = require('./package.json'); | ||
var os = require('os'); | ||
var isWin = (os.platform() === 'win32'); | ||
if (!process.env.MQLIGHT_NODE_NO_HANDLER) { | ||
if (isWin) { | ||
log('debug', NO_CLIENT_ID, 'Registering signal handler for SIGBREAK'); | ||
process.on('SIGBREAK', function() { | ||
ffdc('SIGBREAK', 255, null, null); | ||
}); | ||
} else { | ||
log('debug', NO_CLIENT_ID, 'Registering signal handler for SIGUSR2'); | ||
process.on('SIGUSR2', function() { | ||
ffdc('SIGUSR2', 255, null, null); | ||
}); | ||
} | ||
} | ||
var _system = os.platform() + '-' + process.arch; | ||
@@ -129,11 +83,36 @@ if (process.env.NODE_ENV === 'unittest') { | ||
/** Proton Messenger status values (returned from ProtonMessenger.Status()) */ | ||
PN_STATUS_UNKNOWN = 0; /** The status unknown. */ | ||
PN_STATUS_PENDING = 1; /** The message is in flight. */ | ||
PN_STATUS_ACCEPTED = 2; /** The message was accepted. */ | ||
PN_STATUS_REJECTED = 3; /** The message was rejected. */ | ||
PN_STATUS_RELEASED = 4; /** The message was released. */ | ||
PN_STATUS_MODIFIED = 5; /** The message was modified. */ | ||
PN_STATUS_ABORTED = 6; /** The message was aborted. */ | ||
PN_STATUS_SETTLED = 7; /** The remote party has settled the message. */ | ||
/** The status unknown. */ | ||
PN_STATUS_UNKNOWN = 0; | ||
/** The message is in flight. */ | ||
PN_STATUS_PENDING = 1; | ||
/** The message was accepted. */ | ||
PN_STATUS_ACCEPTED = 2; | ||
/** The message was rejected. */ | ||
PN_STATUS_REJECTED = 3; | ||
/** The message was released. */ | ||
PN_STATUS_RELEASED = 4; | ||
/** The message was modified. */ | ||
PN_STATUS_MODIFIED = 5; | ||
/** The message was aborted. */ | ||
PN_STATUS_ABORTED = 6; | ||
/** The remote party has settled the message. */ | ||
PN_STATUS_SETTLED = 7; | ||
/** | ||
@@ -181,3 +160,3 @@ * Constructs a new Client object in the disconnected state. | ||
exports.createClient = function(options) { | ||
log('entry', NO_CLIENT_ID, 'createClient >'); | ||
log.entry('createClient', log.NO_CLIENT_ID); | ||
@@ -198,4 +177,3 @@ if (!options) throw TypeError('options object missing'); | ||
log('exit', client.id, 'createClient <', client); | ||
log.exit('createClient', client.id, client); | ||
return client; | ||
@@ -220,4 +198,4 @@ }; | ||
var generateServiceList = function(service) { | ||
log('entry', NO_CLIENT_ID, 'generateServiceList >'); | ||
log('parms', NO_CLIENT_ID, 'service:', service); | ||
log.entry('generateServiceList', log.NO_CLIENT_ID); | ||
log.log('parms', log.NO_CLIENT_ID, 'service:', service); | ||
@@ -293,4 +271,3 @@ // Validate the parameter list length | ||
log('exit', NO_CLIENT_ID, 'generateServiceList <', serviceList); | ||
log.exit('generateServiceList', log.NO_CLIENT_ID, serviceList); | ||
return serviceList; | ||
@@ -339,7 +316,8 @@ }; | ||
var Client = function(service, id, user, password) { | ||
log('entry', NO_CLIENT_ID, 'constructor >'); | ||
log('parms', NO_CLIENT_ID, 'service:', service); | ||
log('parms', NO_CLIENT_ID, 'id:', id); | ||
log('parms', NO_CLIENT_ID, 'user:', user); | ||
log('parms', NO_CLIENT_ID, 'password:', password ? '********' : password); | ||
log.entry('Client.constructor', log.NO_CLIENT_ID); | ||
log.log('parms', log.NO_CLIENT_ID, 'service:', service); | ||
log.log('parms', log.NO_CLIENT_ID, 'id:', id); | ||
log.log('parms', log.NO_CLIENT_ID, 'user:', user); | ||
log.log('parms', log.NO_CLIENT_ID, | ||
'password:', password ? '********' : password); | ||
@@ -387,2 +365,3 @@ EventEmitter.call(this); | ||
log.entry('proton.createMessenger', this.id); | ||
// Initialize ProtonMessenger with auth details | ||
@@ -397,2 +376,3 @@ if (user) { | ||
} | ||
log.exit('proton.createMessenger', this.id, null); | ||
@@ -402,7 +382,11 @@ // Set the initial state to disconnected | ||
this.service = undefined; | ||
// List of message subscriptions that the application is expected to call message.settleDelivery() for | ||
this.manualSettleSubscriptions = new Array(); | ||
log('exit', this.id, 'constructor <', this); | ||
// List of message subscriptions | ||
this.subscriptions = []; | ||
// List of outstanding send operations waiting to be accepted, settled, etc | ||
// by the listener. | ||
this.outstandingSends = []; | ||
log.exit('Client.constructor', this.id, this); | ||
}; | ||
@@ -455,3 +439,3 @@ util.inherits(Client, EventEmitter); | ||
Client.prototype.connect = function(callback) { | ||
log('entry', this.id, 'connect >'); | ||
log.entry('Client.connect', this.id); | ||
@@ -464,2 +448,3 @@ if (callback && (typeof callback !== 'function')) { | ||
var performConnect = function(client, callback) { | ||
log.entry('Client.connect.performConnect', client.id); | ||
@@ -476,5 +461,9 @@ var currentState = client.getState(); | ||
if (callback) { | ||
log.entry('Client.connect.performConnect.callback', client.id); | ||
callback(undefined); | ||
log.exit('Client.connect.performConnect.callback', client.id, null); | ||
} | ||
}); | ||
log.exit('Client.connect.performConnect', client.id, client); | ||
return client; | ||
@@ -495,13 +484,18 @@ } | ||
} | ||
} catch (e) { | ||
} catch (err) { | ||
// if there is an error getting the service list then ensure state is | ||
// disconnected | ||
log.log('error', client.id, err); | ||
client.disconnect(); | ||
var err = new Error(e.message); | ||
process.nextTick(function() { | ||
if (callback) { | ||
log.entry('Client.connect.performConnect.callback', client.id); | ||
callback(err); | ||
log.exit('Client.connect.performConnect.callback', client.id, null); | ||
} | ||
log.log('emit', client.id, 'error', err); | ||
client.emit('error', err); | ||
}); | ||
log.exit('Client.connect.performConnect', client.id, null); | ||
return; | ||
@@ -516,13 +510,18 @@ } | ||
client.service = service; | ||
} catch (e) { | ||
} catch (err) { | ||
// if there is an error connecting to the service then ensure state is | ||
// disconnected | ||
log.log('error', client.id, err); | ||
client.disconnect(); | ||
var err = new Error(e.message); | ||
process.nextTick(function() { | ||
if (callback) { | ||
log.entry('Client.connect.performConnect.callback', client.id); | ||
callback(err); | ||
log.exit('Client.connect.performConnect.callback', client.id, null); | ||
} | ||
log.log('emit', client.id, 'error', err); | ||
client.emit('error', err); | ||
}); | ||
log.exit('Client.connect.performConnect', client.id, null); | ||
return; | ||
@@ -534,2 +533,3 @@ } | ||
process.nextTick(function() { | ||
log.log('emit', client.id, 'connected'); | ||
client.emit('connected'); | ||
@@ -543,88 +543,9 @@ }); | ||
process.nextTick(function() { | ||
log.entry('Client.connect.performConnect.callback', client.id); | ||
callback.apply(client); | ||
log.exit('Client.connect.performConnect.callback', client.id, null); | ||
}); | ||
} | ||
// Function to check for messages, outputting the contents of each to the | ||
// event emitter | ||
var messenger = client.messenger; | ||
var check_for_messages = function() { | ||
if (client.state !== 'connected') { | ||
return; | ||
} | ||
try { | ||
var messages = messenger.receive(50); | ||
if (messages.length > 0) { | ||
for (var i = 0, tot = messages.length; i < tot; i++) { | ||
var protonMsg = messages[i]; | ||
// if body is a JSON'ified object, try to parse it back to a js obj | ||
var data; | ||
if (protonMsg.contentType === 'application/json') { | ||
try { | ||
data = JSON.parse(protonMsg.body); | ||
} catch (_) { | ||
console.warn(_); | ||
} | ||
} else { | ||
data = protonMsg.body; | ||
} | ||
var topic = url.parse(protonMsg.address).path.substring(1); | ||
var index = client.manualSettleSubscriptions.indexOf(protonMsg.address); | ||
var autoSettle = index < 0; | ||
var delivery = { | ||
message : { | ||
properties : { | ||
contentType : protonMsg.contentType | ||
}, | ||
topic : topic, | ||
settleDelivery : autoSettle ? function() { | ||
log('entry', this.id, 'message.settleDelivery (noop version) >'); | ||
log('data', this.id, 'delivery:', delivery); | ||
log('exit', this.id, 'message.settleDelivery (noop version) <'); | ||
} : function() { | ||
log('entry', this.id, 'message.settleDelivery >'); | ||
log('data', this.id, 'delivery:', delivery); | ||
messenger.settle(protonMsg); | ||
protonMsg.destroy(); | ||
log('exit', this.id, 'message.settleDelivery <'); | ||
} | ||
} | ||
}; | ||
var linkAddress = protonMsg.linkAddress; | ||
if (linkAddress) { | ||
delivery.subscription = {}; | ||
var split = linkAddress.split(':', 3); | ||
if (linkAddress.indexOf('share:') === 0) { | ||
delivery.subscription.share = split[1]; | ||
delivery.subscription.pattern = split[2]; | ||
} else { | ||
delivery.subscription.pattern = split[1]; | ||
} | ||
} | ||
client.emit('message', data, delivery); | ||
if (autoSettle) { | ||
messenger.settle(protonMsg); | ||
protonMsg.destroy(); | ||
} | ||
} | ||
} | ||
} catch (e) { | ||
var err = new Error(e.message); | ||
client.disconnect(); | ||
process.nextTick(function() { | ||
if (err) client.emit('error', err); | ||
}); | ||
} | ||
if (client.state === 'connected') { | ||
setImmediate(check_for_messages); | ||
} | ||
}; | ||
// Setup the check for messages such that each received messages is output | ||
// to the event emitter | ||
process.nextTick(function() { | ||
check_for_messages(); | ||
}); | ||
log.exit('Client.connect.performConnect', client.id, null); | ||
return; | ||
@@ -636,2 +557,4 @@ }; | ||
var stillDisconnecting = function(client, callback) { | ||
log.entry('stillDisconnecting', client.id); | ||
if (client.getState() === 'disconnecting') { | ||
@@ -646,2 +569,4 @@ process.nextTick(function() { | ||
} | ||
log.exit('stillDisconnecting', client.id, null); | ||
}; | ||
@@ -653,4 +578,3 @@ | ||
log('exit', this.id, 'connect <', client); | ||
log.exit('Client.connect', client.id, client); | ||
return client; | ||
@@ -692,3 +616,3 @@ }; | ||
Client.prototype.disconnect = function(callback) { | ||
log('entry', this.id, 'disconnect >'); | ||
log.entry('Client.disconnect', this.id); | ||
@@ -699,18 +623,33 @@ var client = this; | ||
var performDisconnect = function(client, callback) { | ||
log.entry('Client.disconnect.performDisconnect', client.id); | ||
client.state = 'disconnecting'; | ||
if (client.messenger) { | ||
client.messenger.stop(); | ||
} | ||
// Indicate that we've disconnected | ||
client.state = 'disconnected'; | ||
process.nextTick(function() { | ||
client.emit('disconnected'); | ||
}); | ||
if (callback) { | ||
// Only disconnect when all outstanding send operations are complete | ||
if (client.outstandingSends.length === 0) { | ||
if (client.messenger) { | ||
client.messenger.stop(); | ||
} | ||
// Indicate that we've disconnected | ||
client.state = 'disconnected'; | ||
process.nextTick(function() { | ||
callback.apply(client); | ||
log.log('emit', client.id, 'disconnected'); | ||
client.emit('disconnected'); | ||
}); | ||
if (callback) { | ||
process.nextTick(function() { | ||
log.entry('Client.disconnect.performDisconnect.callback', client.id); | ||
callback.apply(client); | ||
log.exit('Client.disconnect.performDisconnect.callback', client.id, | ||
null); | ||
}); | ||
} | ||
log.exit('Client.disconnect.performDisconnect', client.id, null); | ||
return; | ||
} | ||
return; | ||
// try disconnect again | ||
setImmediate(performDisconnect, client, callback); | ||
}; | ||
@@ -727,5 +666,9 @@ | ||
if (callback) { | ||
log.entry('Client.disconnect.callback', client.id); | ||
callback.apply(client); | ||
log.exit('Client.disconnect.callback', client.id, null); | ||
} | ||
}); | ||
log.exit('Client.disconnect', client.id, client); | ||
return client; | ||
@@ -738,4 +681,3 @@ } | ||
log('exit', this.id, 'disconnect <', client); | ||
log.exit('Client.disconnect', client.id, client); | ||
return client; | ||
@@ -780,2 +722,3 @@ }; | ||
var state = this.state; | ||
log.log('data', this.id, 'Client.getState:', state); | ||
return state; | ||
@@ -835,3 +778,3 @@ }; | ||
Client.prototype.send = function(topic, data, options, callback) { | ||
log('entry', this.id, 'send >'); | ||
log.entry('Client.send', this.id); | ||
@@ -844,3 +787,3 @@ // Validate the passed parameters | ||
} | ||
log('parms', this.id, 'topic:', topic); | ||
log.log('parms', this.id, 'topic:', topic); | ||
if (data === undefined) { | ||
@@ -851,45 +794,46 @@ throw new TypeError('Cannot send undefined data'); | ||
} | ||
log('parms', this.id, 'data:', data); | ||
log.log('parms', this.id, 'data:', data); | ||
// Validate the remaining optional parameters, assigning local variables to | ||
// the appropriate parameter | ||
var callbackOption; | ||
if (options) { | ||
if (options instanceof Function) { | ||
callbackOption = options; | ||
// If the last argument is a Function then it must be a callback, and not | ||
// options | ||
if (arguments.length === 3) { | ||
if (arguments[2] instanceof Function) { | ||
callback = options; | ||
options = undefined; | ||
} | ||
} | ||
// Validate the options parameter, when specified | ||
if (options !== undefined) { | ||
if (typeof options == 'object') { | ||
log.log('parms', this.id, 'options:', options); | ||
} else { | ||
if (options instanceof Object) { | ||
log('parms', this.id, 'options:', options); | ||
} else { | ||
throw new TypeError('options must be an object type not a ' + | ||
(typeof options) + ')'); | ||
} | ||
throw new TypeError('options must be an object type not a ' + | ||
(typeof options) + ')'); | ||
} | ||
} | ||
var qos = exports.QOS_AT_MOST_ONCE; | ||
if (options) { | ||
if (options.qos) { | ||
if (options.qos == exports.QOS_AT_MOST_ONCE) { | ||
if ('qos' in options) { | ||
if (options.qos === exports.QOS_AT_MOST_ONCE) { | ||
qos = exports.QOS_AT_MOST_ONCE; | ||
} else if (options.qos == exports.QOS_AT_LEAST_ONCE) { | ||
} else if (options.qos === exports.QOS_AT_LEAST_ONCE) { | ||
qos = exports.QOS_AT_LEAST_ONCE; | ||
} else { | ||
throw new TypeError("options:qos value '" + options.qos + "' is invalid " + | ||
"must evaluate to 0 or 1"); | ||
throw new TypeError("options:qos value '" + options.qos + | ||
"' is invalid must evaluate to 0 or 1"); | ||
} | ||
} | ||
} | ||
// Validate the callback parameter, when specified | ||
// (and must be specified for QoS of ALO) | ||
if (callback) { | ||
if (callbackOption) { | ||
throw new TypeError('Invalid forth argument, callback already matched' + | ||
'for third argument'); | ||
} | ||
if (callback instanceof Function) { | ||
callbackOption = callback; | ||
} else { | ||
if (!(callback instanceof Function)) { | ||
throw new TypeError('callback must be a function type'); | ||
} | ||
} else if (qos === exports.QOS_AT_LEAST_ONCE) { | ||
throw new TypeError('callback must be specified when options:qos value ' + | ||
'of 1 (at least once) is specified'); | ||
} | ||
@@ -905,3 +849,5 @@ | ||
try { | ||
log.entry('proton.createMessage', client.id); | ||
protonMsg = proton.createMessage(); | ||
log.exit('proton.createMessage', client.id, protonMsg); | ||
protonMsg.address = this.getService(); | ||
@@ -931,72 +877,284 @@ if (topic) { | ||
// Record that a send operation is in progress | ||
var localMessageId = uuid.v4(); | ||
client.outstandingSends.push(localMessageId); | ||
// setup a timer to trigger the callback once the msg has been sent, or | ||
// immediately if no message to be sent | ||
var untilSendComplete = function(protonMsg, sendCallback) { | ||
var untilSendComplete = function(protonMsg, localMessageId, sendCallback) { | ||
log.entry('Client.send.utilSendComplete', client.id); | ||
try { | ||
var complete = false; | ||
switch(messenger.status(protonMsg)) { | ||
case PN_STATUS_ACCEPTED: | ||
case PN_STATUS_SETTLED: | ||
messenger.settle(protonMsg); | ||
complete = true; | ||
break; | ||
}; | ||
if (complete) { | ||
var err, index; | ||
if (!messenger.stopped) { // if still connected | ||
var status = messenger.status(protonMsg); | ||
switch (status) { | ||
case PN_STATUS_ACCEPTED: | ||
case PN_STATUS_SETTLED: | ||
messenger.settle(protonMsg); | ||
complete = true; | ||
break; | ||
case PN_STATUS_REJECTED: | ||
complete = true; | ||
err = new Error('send failed - message was rejected'); | ||
break; | ||
case PN_STATUS_RELEASED: | ||
complete = true; | ||
err = new Error('send failed - message was released'); | ||
break; | ||
case PN_STATUS_MODIFIED: | ||
complete = true; | ||
err = new Error('send failed - message was modified'); | ||
break; | ||
case PN_STATUS_ABORTED: | ||
complete = true; | ||
err = new Error('send failed - message was aborted'); | ||
break; | ||
} | ||
// If complete then invoke the callback, when specified | ||
if (complete) { | ||
index = client.outstandingSends.indexOf(localMessageId); | ||
if (index >= 0) client.outstandingSends.splice(index, 1); | ||
if (sendCallback) { | ||
var body = protonMsg.body; | ||
setImmediate(function() { | ||
log.entry('Client.send.utilSendComplete.callback', client.id); | ||
sendCallback.apply(client, [err, topic, body, options]); | ||
log.exit('Client.send.utilSendComplete.callback', client.id, | ||
null); | ||
}); | ||
} | ||
protonMsg.destroy(); | ||
log.exit('Client.send.utilSendComplete', client.id, null); | ||
return; | ||
} | ||
// message not sent yet, so check again in a second or so | ||
messenger.send(); | ||
setImmediate(untilSendComplete, protonMsg, localMessageId, | ||
sendCallback); | ||
} else { | ||
// TODO Not sure we can actually get here (so FFDC?) | ||
index = client.outstandingSends.indexOf(localMessageId); | ||
if (index >= 0) client.outstandingSends.splice(index, 1); | ||
if (sendCallback) { | ||
var body = protonMsg.body; | ||
var decoded = decodeURIComponent(protonMsg.address); | ||
var topic = url.parse(decoded).path.substring(1); | ||
var delivery = { | ||
message: { | ||
properties: { | ||
contentType: protonMsg.contentType | ||
}, | ||
topic: topic | ||
} | ||
}; | ||
setImmediate(function() { | ||
// TODO: defect 59405 might mean we change what gets passed into | ||
// the callback... | ||
sendCallback.apply(client, [undefined, body, delivery]); | ||
//sendCallback.apply(client); | ||
}); | ||
err = new Error('send may have not completed due to disconnect'); | ||
log.entry('Client.send.utilSendComplete.callback', client.id); | ||
sendCallback.apply(client, [err, topic, protonMsg.body, options]); | ||
log.exit('Client.send.utilSendComplete.callback', client.id, null); | ||
} | ||
protonMsg.destroy(); | ||
log.exit('Client.send.utilSendComplete', client.id, null); | ||
return; | ||
} | ||
// if msg not yet sent and still running, check again in a second or so | ||
if (!messenger.stopped) { | ||
messenger.send(); | ||
setImmediate(untilSendComplete, protonMsg, callbackOption); | ||
} | ||
} catch (e) { | ||
var err = new Error(e.message); | ||
log.log('error', client.id, e); | ||
//error condition so won't retry send remove from list of unsent | ||
index = client.outstandingSends.indexOf(localMessageId); | ||
if (index >= 0) client.outstandingSends.splice(index, 1); | ||
client.disconnect(); | ||
process.nextTick(function() { | ||
if (callbackOption) { | ||
callbackOption(err, protonMsg); | ||
if (sendCallback) { | ||
log.entry('Client.send.utilSendComplete.callback', client.id); | ||
sendCallback.apply(client, [e, topic, protonMsg.body, options]); | ||
log.exit('Client.send.utilSendComplete.callback', client.id, null); | ||
} | ||
if (err) client.emit('error', err); | ||
if (e) { | ||
log.log('emit', client.id, 'error', e); | ||
client.emit('error', e); | ||
} | ||
}); | ||
} | ||
log.exit('Client.send.utilSendComplete', client.id, null); | ||
}; | ||
// start the timer to trigger it to keep sending until msg has sent | ||
setImmediate(untilSendComplete, protonMsg, callbackOption); | ||
} catch (e) { | ||
var err = new Error(e.message); | ||
setImmediate(untilSendComplete, protonMsg, localMessageId, callback); | ||
} catch (err) { | ||
log.log('error', client.id, err); | ||
//error condition so won't retry send need to remove it from list of unsent | ||
index = client.outstandingSends.indexOf(localMessageId); | ||
if (index >= 0) client.outstandingSends.splice(index, 1); | ||
client.disconnect(); | ||
process.nextTick(function() { | ||
if (callbackOption) { | ||
callbackOption(err, protonMsg); | ||
if (callback) { | ||
log.entry('Client.send.callback', client.id); | ||
callback(err, protonMsg); | ||
log.exit('Client.send.callback', client.id, null); | ||
} | ||
if (err) client.emit('error', err); | ||
if (err) { | ||
log.log('emit', client.id, 'error', err); | ||
client.emit('error', err); | ||
} | ||
}); | ||
} | ||
log('exit', this.id, 'send <'); | ||
log.exit('Client.send', this.id, null); | ||
}; | ||
/** | ||
* Function to force the client to check for messages, outputting the contents | ||
* of any that have arrived to the client event emitter. | ||
* | ||
* @throws {Error} | ||
* If a listener hasn't been reqistered for the 'malformed' event and | ||
* one needs to be emitted. | ||
*/ | ||
Client.prototype.checkForMessages = function() { | ||
var client = this; | ||
log.entryLevel('entry_often', 'checkForMessages', client.id); | ||
var messenger = client.messenger; | ||
if (client.state !== 'connected' || client.subscriptions.length === 0) { | ||
log.exitLevel('entry_often', 'checkForMessages', client.id); | ||
return; | ||
} | ||
try { | ||
var messages = messenger.receive(50); | ||
if (messages.length > 0) { | ||
log.log('debug', client.id, 'received %d messages', messages.length); | ||
for (var msg = 0, tot = messages.length; msg < tot; msg++) { | ||
log.log('debug', client.id, 'processing message %d', msg); | ||
var protonMsg = messages[msg]; | ||
// if body is a JSON'ified object, try to parse it back to a js obj | ||
var data; | ||
if (protonMsg.contentType === 'application/json') { | ||
try { | ||
data = JSON.parse(protonMsg.body); | ||
} catch (_) { | ||
log.log('error', client.id, _); | ||
console.warn(_); | ||
} | ||
} else { | ||
data = protonMsg.body; | ||
} | ||
var topic = url.parse(protonMsg.address).path.substring(1); | ||
var autoConfirm = true; | ||
var qos = exports.QOS_AT_MOST_ONCE; | ||
for (var i = 0; i < client.subscriptions.length; i++) { | ||
if (client.subscriptions[i].address === protonMsg.address) { | ||
qos = client.subscriptions[i].qos; | ||
if (qos === exports.QOS_AT_LEAST_ONCE) { | ||
autoConfirm = client.subscriptions[i].autoConfirm; | ||
} | ||
break; | ||
} | ||
} | ||
var delivery = { | ||
message: { | ||
properties: { | ||
contentType: protonMsg.contentType | ||
}, | ||
topic: topic, | ||
confirmDelivery: autoConfirm ? function() { | ||
log.entry('message.confirmDelivery.auto', this.id); | ||
log.log('data', this.id, 'delivery:', delivery); | ||
log.exit('message.confirmDelivery.auto', this.id, null); | ||
} : function() { | ||
log.entry('message.confirmDelivery', this.id); | ||
log.log('data', this.id, 'delivery:', delivery); | ||
if (protonMsg) { | ||
messenger.settle(protonMsg); | ||
protonMsg.destroy(); | ||
protonMsg = undefined; | ||
} | ||
log.exit('message.confirmDelivery', this.id, null); | ||
} | ||
} | ||
}; | ||
var linkAddress = protonMsg.linkAddress; | ||
if (linkAddress) { | ||
delivery.destination = {}; | ||
var split = linkAddress.split(':', 3); | ||
if (linkAddress.indexOf('share:') === 0) { | ||
delivery.destination.share = split[1]; | ||
delivery.destination.topicPattern = split[2]; | ||
} else { | ||
delivery.destination.topicPattern = split[1]; | ||
} | ||
} | ||
var da = protonMsg.deliveryAnnotations; | ||
var malformed = {}; | ||
malformed.MQMD = {}; | ||
for (var an = 0; da && (an < da.length); ++an) { | ||
if (da[an] && da[an].key) { | ||
switch (da[an].key) { | ||
case 'x-opt-message-malformed-condition': | ||
malformed.condition = da[an].value; | ||
break; | ||
case 'x-opt-message-malformed-description': | ||
malformed.description = da[an].value; | ||
break; | ||
case 'x-opt-message-malformed-MQMD.CodedCharSetId': | ||
malformed.MQMD.CodedCharSetId = Number(da[an].value); | ||
break; | ||
case 'x-opt-message-malformed-MQMD.Format': | ||
malformed.MQMD.Format = da[an].value; | ||
break; | ||
default: | ||
break; | ||
} | ||
} | ||
} | ||
if (malformed.condition) { | ||
if (client.listeners('malformed').length > 0) { | ||
delivery.malformed = malformed; | ||
log.log('emit', client.id, | ||
'malformed', protonMsg.body, delivery); | ||
client.emit('malformed', protonMsg.body, delivery); | ||
} else { | ||
protonMsg.destroy(); | ||
throw new Error('No listener for "malformed" event.'); | ||
} | ||
} else { | ||
process.nextTick(function() { | ||
log.log('emit', client.id, 'message', data, delivery); | ||
client.emit('message', data, delivery); | ||
}); | ||
} | ||
if (qos === exports.QOS_AT_MOST_ONCE) { | ||
messenger.accept(protonMsg); | ||
messenger.settle(protonMsg); | ||
protonMsg.destroy(); | ||
} else { | ||
if (autoConfirm) { | ||
messenger.settle(protonMsg); | ||
protonMsg.destroy(); | ||
} | ||
} | ||
} | ||
} | ||
} catch (err) { | ||
log.log('error', client.id, err); | ||
client.disconnect(); | ||
process.nextTick(function() { | ||
log.log('emit', client.id, 'error', err); | ||
client.emit('error', err); | ||
}); | ||
} | ||
log.exitLevel('entry_often', 'checkForMessages', client.id); | ||
setImmediate(function() { | ||
if (client.state === 'connected') { | ||
client.checkForMessages.apply(client); | ||
} | ||
}); | ||
}; | ||
/** | ||
* @param {function(object)} | ||
* destCallback - callback, invoked with an Error object if something | ||
* destCallback - callback, invoked with an Error object if | ||
* something | ||
* goes wrong. | ||
@@ -1006,3 +1164,3 @@ * @param {String} | ||
* @param {String} | ||
* pattern - the pattern that was subscribed to. | ||
* topicPattern - the topic pattern that was subscribed to | ||
*/ | ||
@@ -1013,8 +1171,9 @@ | ||
* Constructs a subscription object and starts the emission of message events | ||
* each time a message arrives, at the MQ Light service, that matches pattern. | ||
* each time a message arrives, at the MQ Light service, that matches | ||
* topic pattern. | ||
* | ||
* @param {String} | ||
* pattern used to match against the <code>address</code> attribute of | ||
* messages to determine if a copy of the message should be delivered | ||
* to the <code>Destination</code>. | ||
* topicPattern used to match against the <code>address</code> | ||
* attribute of messages to determine if a copy of the message should | ||
* be delivered to the <code>Destination</code>. | ||
* @param {String} | ||
@@ -1038,16 +1197,17 @@ * share (Optional) Specifies whether to create or join a shared | ||
* @throws {Error} | ||
* If the pattern parameter is undefined. | ||
* If the topic pattern parameter is undefined. | ||
*/ | ||
Client.prototype.subscribe = function(pattern, share, options, callback) { | ||
log('entry', this.id, 'subscribe >'); | ||
log('parms', this.id, 'pattern:', pattern); | ||
Client.prototype.subscribe = function(topicPattern, share, options, callback) { | ||
log.entry('Client.subscribe', this.id); | ||
log.log('parms', this.id, 'topicPattern:', topicPattern); | ||
// Must accept at least one option - and first option is always a pattern. | ||
// Must accept at least one option - and first option is always a | ||
// topicPattern. | ||
if (arguments.length === 0) { | ||
throw new TypeError("You must specify a 'pattern' argument"); | ||
throw new TypeError("You must specify a 'topicPattern' argument"); | ||
} | ||
if (!pattern) { | ||
throw new TypeError("You must specify a 'pattern' argument"); | ||
if (!topicPattern) { | ||
throw new TypeError("You must specify a 'topicPattern' argument"); | ||
} | ||
pattern = String(pattern); | ||
topicPattern = String(topicPattern); | ||
@@ -1084,2 +1244,3 @@ // Two or three arguments are the interesting cases - the rules we use to | ||
var originalShareValue = share; | ||
if (share) { | ||
@@ -1096,30 +1257,41 @@ share = String(share); | ||
// Validate the options parameter, when specified | ||
if (options !== undefined) { | ||
if (typeof options == 'object') { | ||
log.log('parms', this.id, 'options:', options); | ||
} else { | ||
throw new TypeError('options must be an object type not a ' + | ||
(typeof options) + ')'); | ||
} | ||
} | ||
var qos = exports.QOS_AT_MOST_ONCE; | ||
var autoSettle = true; | ||
var autoConfirm = true; | ||
if (options) { | ||
if (options.qos) { | ||
if (options.qos == exports.QOS_AT_MOST_ONCE) { | ||
if ('qos' in options) { | ||
if (options.qos === exports.QOS_AT_MOST_ONCE) { | ||
qos = exports.QOS_AT_MOST_ONCE; | ||
} else if (options.qos == exports.QOS_AT_LEAST_ONCE) { | ||
} else if (options.qos === exports.QOS_AT_LEAST_ONCE) { | ||
qos = exports.QOS_AT_LEAST_ONCE; | ||
} else { | ||
throw new TypeError("options:qos value '" + options.qos + "' is invalid " + | ||
"must evaluate to 0 or 1"); | ||
throw new TypeError("options:qos value '" + options.qos + | ||
"' is invalid must evaluate to 0 or 1"); | ||
} | ||
} | ||
if (options.autoSettle === true) { | ||
autoSettle = true; | ||
} else if (options.autoSettle === false) { | ||
autoSettle = false; | ||
} else if (options.autoSettle == undefined) { | ||
autoSettle = true; | ||
} else { | ||
throw new TypeError("options:autoSettle value '" + options.autoSettle + "' is invalid " + | ||
"must evaluate to true or false"); | ||
if ('autoConfirm' in options) { | ||
if (options.autoConfirm === true) { | ||
autoConfirm = true; | ||
} else if (options.autoConfirm === false) { | ||
autoConfirm = false; | ||
} else { | ||
throw new TypeError("options:autoConfirm value '" + | ||
options.autoConfirm + | ||
"' is invalid must evaluate to true or false"); | ||
} | ||
} | ||
} | ||
log('parms', this.id, 'share:', share); | ||
log('parms', this.id, 'options:', options); | ||
log.log('parms', this.id, 'share:', share); | ||
log.log('parms', this.id, 'options:', options); | ||
if (callback && !(callback instanceof Function)) { | ||
@@ -1132,20 +1304,33 @@ throw new TypeError('callback must be a function type'); | ||
// Subscribe using the specified pattern and share options | ||
// Subscribe using the specified topic pattern and share options | ||
var messenger = this.messenger; | ||
var address = this.getService() + '/' + share + pattern; | ||
var address = this.getService() + '/' + share + topicPattern; | ||
var client = this; | ||
// If manual settle required then add address to manual settle list, otherwise ensure manual settle list does not contain the address | ||
var index = client.manualSettleSubscriptions.indexOf(this.getService() + '/' + pattern); | ||
if (qos === exports.QOS_AT_LEAST_ONCE && !autoSettle) { | ||
if (index < 0) client.manualSettleSubscriptions.push(this.getService() + '/' + pattern); | ||
} else { | ||
if (index >= 0) client.manualSettleSubscriptions.splice(index, 1); | ||
} | ||
var err; | ||
try { | ||
messenger.subscribe(address, qos); | ||
// If this is the first subscription to be added, schedule a request to | ||
// start the polling loop to check for messages arriving | ||
if (client.subscriptions.length === 0) { | ||
process.nextTick(function() { | ||
client.checkForMessages(); | ||
}); | ||
} | ||
// Add address to list of subscriptions, replacing any existing entry | ||
var subscriptionAddress = this.getService() + '/' + topicPattern; | ||
for (var i = 0; i < client.subscriptions.length; i++) { | ||
if (client.subscriptions[i].address === subscriptionAddress) { | ||
client.subscriptions.splice(i, 1); | ||
break; | ||
} | ||
} | ||
client.subscriptions.push({ address: subscriptionAddress, | ||
qos: qos, autoConfirm: autoConfirm }); | ||
} catch (e) { | ||
err = new Error(e.message); | ||
log.log('error', client.id, e); | ||
err = e; | ||
} | ||
@@ -1155,5 +1340,10 @@ | ||
if (callback) { | ||
callback.apply(client, [err, pattern]); | ||
log.entry('Client.subscribe.callback', client.id); | ||
log.log('parms', client.id, 'err:', err, 'topicPattern:', topicPattern, | ||
'originalShareValue:', originalShareValue); | ||
callback.apply(client, [err, topicPattern, originalShareValue]); | ||
log.exit('Client.subscribe.callback', client.id, null); | ||
} | ||
if (err) { | ||
log.log('emit', client.id, 'error', err); | ||
client.emit('error', err); | ||
@@ -1164,64 +1354,6 @@ client.disconnect(); | ||
log('exit', this.id, 'subscribe <', client); | ||
log.exit('Client.subscribe', client.id, client); | ||
return client; | ||
}; | ||
var ffdcSequence = 0; | ||
ffdc = function(fnc, probeId, client, data) { | ||
var clientId = client ? client.id : NO_CLIENT_ID; | ||
log('ffdc', clientId, '+--------------------------------------' + | ||
'---------------------------------------+'); | ||
log('ffdc', clientId, '| IBM MQ Light Node.js Client Module - ' + | ||
'First Failure Data Capture'); | ||
log('ffdc', clientId, '| =====================================' + | ||
'=========================='); | ||
log('ffdc', clientId, '|'); | ||
log('ffdc', clientId, '| Date/Time :- ' + | ||
moment(new Date()).format('ddd MMMM DD YYYY HH:mm:ss.SSS Z')); | ||
log('ffdc', clientId, '| Host Name :- ' + os.hostname()); | ||
log('ffdc', clientId, '| Operating System :- ' + os.type(), os.release()); | ||
log('ffdc', clientId, '| Architecture :- ' + os.platform(), os.arch()); | ||
log('ffdc', clientId, '| Node Version :- ' + process.version); | ||
log('ffdc', clientId, '| Node Path :- ' + process.execPath); | ||
log('ffdc', clientId, '| Node Arguments :- ' + process.execArgs); | ||
log('ffdc', clientId, '| Program Arguments :- ' + process.argv); | ||
if (!isWin) { | ||
log('ffdc', clientId, '| User Id :- ' + process.getuid()); | ||
log('ffdc', clientId, '| Group Id :- ' + process.getgid()); | ||
} | ||
log('ffdc', clientId, '| Name :- ' + pkg.name); | ||
log('ffdc', clientId, '| Version :- ' + pkg.version); | ||
log('ffdc', clientId, '| Description :- ' + pkg.description); | ||
log('ffdc', clientId, '| Installation Path :- ' + __dirname); | ||
log('ffdc', clientId, '| Uptime :- ' + process.uptime()); | ||
log('ffdc', clientId, '| Function :- ' + fnc); | ||
log('ffdc', clientId, '| Probe Id :- ' + probeId); | ||
log('ffdc', clientId, '| FDCSequenceNumber :- ' + ffdcSequence++); | ||
log('ffdc', clientId, '+--------------------------------------' + | ||
'---------------------------------------+'); | ||
log('ffdc', clientId, ''); | ||
log('ffdc', clientId, new Error().stack); | ||
if (client) { | ||
log('ffdc', clientId, ''); | ||
log('ffdc', clientId, 'Client'); | ||
log('ffdc', clientId, client); | ||
} | ||
if (data) { | ||
log('ffdc', clientId, ''); | ||
log('ffdc', clientId, 'Data'); | ||
log('ffdc', clientId, data); | ||
} | ||
log('ffdc', clientId, ''); | ||
log('ffdc', clientId, 'Memory Usage'); | ||
log('ffdc', clientId, process.memoryUsage()); | ||
if ((ffdcSequence === 1) || (probeId === 255)) { | ||
log('ffdc', clientId, ''); | ||
log('ffdc', clientId, 'Environment Variables'); | ||
log('ffdc', clientId, process.env); | ||
} | ||
log('ffdc', clientId, ''); | ||
}; | ||
/* ------------------------------------------------------------------------- */ |
{ | ||
"name": "mqlight-dev", | ||
"version": "0.1.2014051301", | ||
"description": "IBM MQ Light Client Module", | ||
"main": "mqlight.js", | ||
"gypfile": true, | ||
"dependencies": { | ||
"node-uuid": "~1.4.0", | ||
"nopt": "~2.2.0", | ||
"npmlog": "0.0.6", | ||
"moment": "2.5.0" | ||
}, | ||
"devDependencies": { | ||
"nodeunit": "~0.8.6" | ||
}, | ||
"engines": { | ||
"node": "~0.10.x" | ||
}, | ||
"licenses": [ | ||
{ | ||
"type": "proprietary", | ||
"url": "http://www14.software.ibm.com/cgi-bin/weblap/lap.pl?popup=Y&li_formnum=L-ACRR-9FLFNA" | ||
"name": "mqlight-dev", | ||
"version": "0.1.2014060401", | ||
"description": "IBM MQ Light Client Module", | ||
"main": "mqlight.js", | ||
"gypfile": true, | ||
"bin": { | ||
"mqlight-debug": "./bin/mqlight-debug.js" | ||
}, | ||
"dependencies": { | ||
"node-uuid": "~1.4.0", | ||
"nopt": "~2.2.0", | ||
"npmlog": "0.0.6", | ||
"moment": "2.5.0" | ||
}, | ||
"devDependencies": { | ||
"nodeunit": "~0.8.6" | ||
}, | ||
"engines": { | ||
"node": "~0.10.x" | ||
}, | ||
"licenses": [{ | ||
"type": "proprietary", | ||
"url": "http://www14.software.ibm.com/cgi-bin/weblap/lap.pl?popup=Y&li_formnum=L-ACRR-9FLFNA" | ||
}], | ||
"scripts": { | ||
"postinstall": "node bin/mqlight-postinstall.js", | ||
"test": "./node_modules/.bin/nodeunit --config tests/nodeunit.json tests" | ||
} | ||
], | ||
"scripts": { | ||
"test": "./node_modules/.bin/nodeunit --config tests/nodeunit.json tests" | ||
}, | ||
"readme": "# node-mqlight (beta)\n\nMQ Light is designed to allow applications to exchange discrete pieces of\ninformation in the form of messages. This might sound a lot like TCP/IP\nnetworking, and MQ Light does use TCP/IP under the covers, but MQ Light takes\naway much of the complexity and provides a higher level set of abstractions to\nbuild your applications with.\n\nThis Node.js module provides the high-level API by which you can interact\nwith the MQ Light runtime.\n\nSee https://www.ibmdw.net/messaging/mq-light/ for more details.\n\nCurrent Features:\n\n* Send and receive arbitrary String, Buffer and JSON objects between Node.js\n applications using an at-most-once quality of service.\n* Includes samples to demonstrate API usage.\n\n## Getting Started\n\nInstall it in node.js:\n\n```\nnpm install mqlight\n\nOR\n\nnpm install https://ibm.biz/node-mqlight\n```\n\n```javascript\nvar mqlight = require('mqlight');\n```\n\nThen create some clients to send and receive messages:\n\n```javascript\nvar recvClient = mqlight.createClient({\n service: 'amqp://localhost',\n id: 'recv_client_1'\n});\n\nvar address = 'public';\nrecvClient.on('connected', function() {\n recvClient.subscribe(address);\n recvClient.on('message', function(data, delivery) {\n console.log('Recv: %s', data);\n });\n});\n\nrecvClient.connect();\n\nvar sendClient = mqlight.createClient({\n service: 'amqp://localhost',\n id: 'send_client_1'\n});\n\nvar topic = 'public';\nsendClient.on('connected', function() {\n sendClient.send(topic, 'Hello World!', function (err, data) {\n console.log('Sent: %s', data);\n sendClient.disconnect();\n });\n});\n\nsendClient.connect();\n\n```\n\n## API\n\n### mqlight.createClient([`options`])\n\nCreates an MQ Light client instance.\n\n* `options`, (Object) options for the client. Properties include:\n\n * **service**, (String) (required), the URL for the service to connect to.\n * **id** (String, default: AUTO_[0-9a-f]{7}), a unique identifier for\n this client. A client with a duplicate `id` will be prevented from\n connecting to the messaging service.\n\nReturns a `Client` object representing the client instance. The client is an\nevent emitter and listeners can be registered for the following events:\n`connect`, `disconnect`, `error`, and `message`.\n\n### mqlight.Client.connect([`callback`])\n\nConnects the MQ Light client instance to the service.\n* `callback` - (Function) (optional) callback to be notified of errors &\n completion\n\n### mqlight.Client.send(`topic`, `message`, [`options`], [`callback`])\n\nSends the given MQ Light message object to the specified topic. String and\nBuffer messages will be sent and received as-is. Any other Object will be\nconverted to JSON before sending and automatically parsed back into the same\nObject type when received.\n\n* `topic` - (String) the topic to which the message will be sent.\n* `message` - (String | Buffer | Object) the message body to be sent\n* `options` - (Object) (optional) map of additional options for the send.\n There are no options that can be set in this beta.\n* `callback` - (Function) (optional) callback to be notified of errors &\n completion\n\n### mqlight.Client.subscribe(`pattern`, [`share`], [`options`], [`callback`])\n\nCreate a `Destination` and associates it with a `pattern`.\n\nThe `pattern` argument is matched against the `topic` that messages are\nsent to, allowing the messaging service to determine whether a paricular\nmessage will be delivered to a particular `Destination`, and hence\n`subscription`.\n\n* `pattern` - (String) used to match against the `topic` specified when a\n message is sent to the messaging service.\n* `share` - (String) (optional) name for creating or joining a shared\n subscription for which messages are anycast between connected subscribers. If\n omitted defaults to unshared (e.g. private to the client).\n* `options` - (Object) (optional) map of additional options for the destination.\n There are no options that can be set in this beta.\n* `callback` - (Function) callback to be notified of errors & completion.\n\nReturns the `Client` object that the subscribe was called on. `message` events\nwill be emitted when messages arrive.\n\n### mqlight.Client.getId()\n\nReturns the identifier associated with the client. This will either be what\nwas passed in on the `Client.createClient` call or an autogenerated id.\n\n### mqlight.Client.getService()\n\nReturns the URL of the service to which the client is currently connected\nto, or undefined if not connected.\n\n### mqlight.Client.getState()\n\nReturns the current state of the client, which will be one of:\n'connected', 'connecting', 'disconnected' or 'disconnecting'.\n\n### mqlight.Client.disconnect([callback])\n\nDisconnects this Client from the messaging server and frees the system\nresources that it uses. Calling this method also implicitly closes any\nsubscriptions that have been created using the client's\n`Client.subscribe` method.\n\n### Event: 'message'\n\nEmitted when a message is delivered from a destination matching one of the\nclient's subscriptions.\n\n* `data` - (String | Buffer | Object) the message body.\n* `delivery` - (Object) additional information about why the event was emitted.\n Properties include:\n * **message**, (Object) additional information about the message. Properties\n include:\n * **topic**, (Object) the topic that the message was sent to.\n * **subscription**, (Object) information about the `Client.subscribe` method\n call that caused the client to receive this message (note: this isn't\n implemented yet!)\n\n### Event: 'connect'\n\nThis event is emitted when a client successfully connects to the messaging\nservice.\n\n### Event: 'disconnect'\n\nThis event is emitted when a client disconnects from the messaging service,\neither explicitly, or because the connection between the client and the\nservice is interrupted.\n\n### Event: 'error'\n\nEmitted when an error is detected that prevents or interrupts a client's\nconnection to the messaging service.\n\n* `error` (Error) the error.\n\n## Samples\n\nTo run the samples, install the module via npm and navigate to the\n`mqlight/samples/` folder.\n\nUsage:\n\nReceiver Example:\n\n```\nUsage: recv.js [options] <address>\n address: amqp://<domain>/<name>\n (default amqp://localhost/public)\n\nOptions:\n -h, --help show this help message and exit\n```\n\nSender Example:\n\n```\nUsage: send.js [options] <msg_1> ... <msg_n>\n\nOptions:\n -h, --help show this help message and exit\n -a ADDRESS, --address=ADDRESS\n address: amqp://<domain>/<name>\n (default amqp://localhost/public)\n -d NUM, --delay=NUM add a NUM seconds time delay between each request\n```\n\n## Release notes\n\n### 0.1.0\n\n* Initial beta release.\n* Support for sending and receiving 'at-most-once' messages.\n* Support for wildcard subscriptions.\n* Support for shared subscriptions.\n\n", | ||
"readmeFilename": "README.md", | ||
"_id": "mqlight@0.1.2014051301", | ||
"dist": { | ||
"shasum": "01c3fd3ab49cfe446638fdb3a086f2dbe1b4e089" | ||
}, | ||
"_resolved": "/home/david/node-mqlight-Linux-x64-0.1.tar.gz", | ||
"_from": "/home/david/node-mqlight-Linux-x64-0.1.tar.gz" | ||
} |
@@ -12,3 +12,3 @@ # node-mqlight (beta) | ||
See https://www.ibmdw.net/messaging/mq-light/ for more details. | ||
See https://developer.ibm.com/messaging/mq-light/ for more details. | ||
@@ -23,2 +23,33 @@ Current Features: | ||
### Prerequisites | ||
You will need a Node.js 0.10 runtime environment to use the MQ Light API | ||
module. This can be installed from http://nodejs.org/download/, or by using | ||
your operating system's package manager. | ||
The following are the currently supported platform architectures: | ||
* 64-bit or 32-bit runtime on Windows (x64 or x86) | ||
* 64-bit runtime on Linux (x64) | ||
You will currently receive an error if you attempt to use any other | ||
combination. | ||
Before using MQ Light on Linux, you will also need the 0.9.8 version of an | ||
OpenSSL package. This version of the package is not installed by default, so to | ||
use the module you will need to install it. For example: | ||
* To install the package on Ubuntu, run: ``sudo apt-get install libssl0.9.8`` | ||
* To install the package on RedHat, run: ``sudo yum install openssl098e`` | ||
Additionally, you will also need to make sure you have the libuuid package | ||
installed. For example: | ||
* To check whether you have the package on Ubuntu, run: ``dpkg -l libuuid1`` | ||
* To check whether you have the package on RedHat, run: ``rpm -qa | grep | ||
libuuid`` | ||
### Usage | ||
Install it in node.js: | ||
@@ -106,3 +137,6 @@ | ||
* `options` - (Object) (optional) map of additional options for the send. | ||
There are no options that can be set in this beta. | ||
Supported options are: | ||
* **qos**, (Number) The quality of service to use when sending the message. | ||
0 is used to denote at most once (the default) and 1 is used for at least | ||
once. | ||
* `callback` - (Function) (optional) callback to be notified of errors & | ||
@@ -126,3 +160,11 @@ completion | ||
* `options` - (Object) (optional) map of additional options for the destination. | ||
There are no options that can be set in this beta. | ||
Supported options are: | ||
* **autoConfirm**, (Boolean) When qos option is specified with a value of 1: | ||
true (the default) denotes received messages will be automatically | ||
confirmed (settled). | ||
false denotes received messages will only be confirmed when the associated | ||
'message' events's delivery.message.confirmDelivery() method is called. | ||
* **qos**, (Number) The quality of service to use for delivering messages to | ||
the subscription. Valid values are: 0 to denote at most once (the default) | ||
and 1 is used for at least once. | ||
* `callback` - (Function) callback to be notified of errors & completion. | ||
@@ -165,6 +207,11 @@ | ||
include: | ||
* **properties** (Object) Map of properties for the message. Properties are: | ||
* **contentType** (String) The content of the `data` argument. Values are: | ||
'text/plain' - `data` will be a String. | ||
'application/octet-stream' - `data` will be a Buffer. | ||
'application/json' - `data` will be a JSON Object. | ||
* **topic**, (Object) the topic that the message was sent to. | ||
* **subscription**, (Object) information about the `Client.subscribe` method | ||
call that caused the client to receive this message (note: this isn't | ||
implemented yet!) | ||
* **confirmDelivery**, (Function) A method that can be used to confirm | ||
(settle) the delivery of a at least once quality of service (qos:1) | ||
message. This method does not expect any arguments. | ||
@@ -220,2 +267,16 @@ ### Event: 'connect' | ||
## Feedback | ||
You can help shape the product we release by trying out the beta code and | ||
leaving your | ||
[feedback](https://developer.ibm.com/community/groups/service/html/communityview?communityUuid=00a6a6d0-9601-44cb-a2a2-b0b26811790a). | ||
### Reporting bugs | ||
If you think you've found a bug, please leave us | ||
[feedback](https://developer.ibm.com/community/groups/service/html/communityview?communityUuid=00a6a6d0-9601-44cb-a2a2-b0b26811790a). | ||
To help us fix the bug a log might be helpful. You can get a log by setting the | ||
environment variable `MQLIGHT_NODE_LOG` to `debug` and by collecting the output | ||
that goes to stderr when you run your application. | ||
## Release notes | ||
@@ -222,0 +283,0 @@ |
@@ -87,10 +87,2 @@ /* %Z% %W% %I% %E% %U% */ | ||
// Make the connection | ||
client.connect(function(err) { | ||
if (err) { | ||
console.error(err.message); | ||
process.exit(1); | ||
} | ||
}); | ||
// once connection is acquired, receive messages from the required topic | ||
@@ -118,2 +110,25 @@ client.on('connected', function() { | ||
}); | ||
client.on('malformed', function(data, delivery) { | ||
console.log('*** received malformed message (%d)', (++i)); | ||
console.log(data); | ||
console.log(delivery); | ||
}); | ||
}); | ||
client.on('error', function(error) { | ||
console.log('*** error ***'); | ||
if (error) { | ||
if (error.message) console.log('message: '+error.message); | ||
else if (error.stack) console.log(error.stack); | ||
} | ||
console.log('exiting.'); | ||
process.exit(1); | ||
}); | ||
// Make the connection | ||
client.connect(function(err) { | ||
if (err) { | ||
console.error(err.message); | ||
process.exit(1); | ||
} | ||
}); |
@@ -90,10 +90,2 @@ /* %Z% %W% %I% %E% %U% */ | ||
// Make the connection | ||
client.connect(function(err) { | ||
if (err) { | ||
console.error(err.message); | ||
process.exit(1); | ||
} | ||
}); | ||
// once connection is acquired, send messages | ||
@@ -108,3 +100,3 @@ client.on('connected', function() { | ||
var body = messages[i]; | ||
client.send(topic, body, function(err, data, delivery) { | ||
client.send(topic, body, function(err, topic, data, options) { | ||
if (err) { | ||
@@ -117,3 +109,2 @@ console.error('Problem with send request: %s', err.message); | ||
console.log(data); | ||
console.log(delivery); | ||
} | ||
@@ -135,1 +126,19 @@ // if there are more messages pending, send the next in <delay> seconds | ||
}); | ||
client.on('error', function(error) { | ||
console.log('*** error ***'); | ||
if (error) { | ||
if (error.message) console.log('message: '+error.message); | ||
else if (error.stack) console.log(error.stack); | ||
} | ||
console.log('exiting.'); | ||
process.exit(1); | ||
}); | ||
// Make the connection | ||
client.connect(function(err) { | ||
if (err) { | ||
console.error(err.message); | ||
process.exit(1); | ||
} | ||
}); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Install scripts
Supply chain riskInstall scripts are run when the package is installed. The majority of malware in npm is hidden in install scripts.
Found 1 instance in 1 package
Native code
Supply chain riskContains native code (e.g., compiled binaries or shared libraries). Including native code can obscure malicious behavior.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
Shell access
Supply chain riskThis module accesses the system shell. Accessing the system shell increases the risk of executing arbitrary code.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 7 instances in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 2 instances in 1 package
8715358
53
2180
285
1
21
10