mqlight-dev
Advanced tools
Comparing version 0.1.2014032000 to 0.1.2014041000
805
mqlight.js
@@ -24,3 +24,3 @@ /* %Z% %W% %I% %E% %U% */ | ||
var proton = require('./lib/' + _system + '/proton'); | ||
} catch(_) { | ||
} catch (_) { | ||
if ('MODULE_NOT_FOUND' === _.code) { | ||
@@ -36,6 +36,8 @@ throw new Error('mqlight.js is not currently supported on ' + _system); | ||
var uuid = require('node-uuid'); | ||
} catch(_) { | ||
} catch (_) { | ||
var uuid = require(require.resolve('npm') + '/../../node_modules/request/node_modules/node-uuid'); | ||
} | ||
var url = require('url'); | ||
var validClientIdChars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789%/._'; | ||
@@ -50,25 +52,33 @@ /** @constant {number} */ | ||
/** | ||
* Creates an MQ Light client instance. | ||
* | ||
* Constructs a new Client object in the disconnected state. | ||
* <p> | ||
* Options: | ||
* - **host**, (String, default: localhost), the remote hostname to which we will connect. | ||
* - **port**, (Number, default: 5672), the remote tcp port to connect to. | ||
* - **clientId (String, default: AUTO_[0-9a-f]{7}), a unique identifier for this client. | ||
* <ul> | ||
* <li>service - Required; when an instance of String this is a URL to connect to. When an instance of Array this is an array of URLs to connect to - each will be tried in turn | ||
* until either a connection is successfully established to one of the URLs, or all of the URLs have been tried. When an instance of Function is specified for this argument, then | ||
* function is invoked each time the client wants to establish a connection (e.g. for any of the state transitions, on the state diagram shown earlier on this page, which lead to | ||
* the 'connected' state). The function must return either an instance of String or Array, which are treated in the manner described previously.</li> | ||
* <li>id - Optional; an identifier that is used to identify this client. Two different instances of Client can have the same id, however only one instance can be connected | ||
* to the MQ Light service at a given moment in time. If two instances of Client have the same id and both try to connect then the first instance to establish its connection | ||
* is diconnected in favour of the second instance. If this property is not specified then the client will generate a probabalistically unique ID.</li> | ||
* <li>user - Optional; the user name to use for authentication to the MQ Light service.</li> | ||
* <li>password - Optional; the password to use for authentication.</li> | ||
* </ul> | ||
* | ||
* @param {Object} [options] (optional) map of options for the client. | ||
* @param {Object} | ||
* options - (optional) map of options for the client. | ||
* @returns {Object} The created Client object. | ||
*/ | ||
exports.createClient = function(options) { | ||
var opt = (typeof options == 'object') ? options : {}; | ||
var client = new Client(opt.host, opt.port, opt.clientId, opt.userName, opt.password); | ||
// FIXME: make this actually check driver/engine connection state | ||
process.nextTick(function() { | ||
client.emit('connected', true); | ||
}); | ||
var client = new Client(opt.service, opt.id, opt.user, opt.password); | ||
process.setMaxListeners(0); | ||
process.once('exit', function() { | ||
if (client) { | ||
client.send(); | ||
client.close(); | ||
if (client.messenger) client.messenger.send(); | ||
if (client && client.getState() == 'connected') { | ||
client.disconnect(); | ||
} | ||
}); | ||
return client; | ||
@@ -78,26 +88,132 @@ }; | ||
/** | ||
* Function to take a single service URL, or array of service URLs, validate them, returning an array of service URLs. | ||
* | ||
* @param {String|Array} | ||
* service - Required; when an instance of String this is a URL to connect to. When an instance of Array this is an array of URLs to connect to | ||
* @returns {Array} Valid service URLs, with port number added as appropriate. | ||
* @throws TypeError | ||
* If service is not a string or array type. | ||
* @throws Error | ||
* if an unsupported or invalid URL specified. | ||
*/ | ||
var generateServiceList = function(service) { | ||
// Validate the parameter list length | ||
if (arguments.length > 1) { | ||
throw new Error('Too many arguments'); | ||
} | ||
// Ensure the service is an Array | ||
var inputServiceList = []; | ||
if (!service) { | ||
throw new Error("service is undefined"); | ||
} else if (service instanceof Function) { | ||
throw new TypeError("service cannot be a function"); | ||
} else if (service instanceof Array) { | ||
if (service.length === 0) { | ||
throw new Error("service array is empty"); | ||
} | ||
inputServiceList = service; | ||
} else if (typeof service === 'string') { | ||
inputServiceList[0] = service; | ||
} else { | ||
throw new TypeError("service must be a string or array type"); | ||
} | ||
// Validate the list of URLs for the service, inserting default values as necessary | ||
// Expected format for each URL is: amqp://host:port or amqps://host:port (port is optional, defaulting to 5672) | ||
var serviceList = []; | ||
for ( var i = 0; i < inputServiceList.length; i++) { | ||
var serviceUrl = url.parse(inputServiceList[i]); | ||
var protocol = serviceUrl.protocol; | ||
var host = serviceUrl.hostname; | ||
var port = serviceUrl.port; | ||
var path = serviceUrl.path; | ||
var auth = serviceUrl.auth; | ||
var msg; | ||
// check for auth details | ||
if ( auth ) { | ||
msg = "Unsupported URL, auth details e.g user:pass@localhost should be supplied as options for createClient"; | ||
throw new Error(msg); | ||
} | ||
// Check we are trying to use the amqp protocol | ||
if ( !protocol || (protocol !== "amqp:" && protocol !== "amqps:" ) ) { | ||
msg = "Unsupported URL '" + inputServiceList[i] + "' specified for service. Only the amqp or amqps protocol are supported."; | ||
throw new Error(msg); | ||
} | ||
// Check we have a hostname | ||
if ( !host ) { | ||
msg = "Unsupported URL ' "+ inputServiceList[i] + "' specified for service. Must supply a hostname."; | ||
throw new Error(msg); | ||
} | ||
// Set default port if not supplied | ||
if ( !port ) { | ||
port = "5672"; | ||
} | ||
// Check for no path | ||
if ( path ) { | ||
msg = "Unsupported URL '" + inputServiceList[i] + "' paths ("+ path +" )can't be part of a service URL."; | ||
throw new Error(msg); | ||
} | ||
serviceList[i] = protocol + "//" + host + ":" + port; | ||
} | ||
return serviceList; | ||
}; | ||
/** | ||
* Represents an MQ Light client instance. | ||
* | ||
* @param {string} host - (optional) the remote host to which we will connect. | ||
* @param {number} [port] - (optional) the remote tcp port to connect to. | ||
* @param {string} [clientId] - (optional) unique identifier for this client. | ||
* @param {String|Array|Function} | ||
* service - Required; when an instance of String this is a URL to connect to. When an instance of Array this is an array of URLs to connect to - each will be tried in | ||
* turn until either a connection is successfully established to one of the URLs, or all of the URLs have been tried. When an instance of Function is specified for this | ||
* argument, then function is invoked each time the client wants to establish a connection. The function must return either an instance of String or Array, which are | ||
* treated in the manner described previously. | ||
* @param {String} | ||
* id - Optional; an identifier that is used to identify this client. To different instances of Client can have the same id, however only one instance can be subscribed to | ||
* any particular topic at a given moment in time. If two instances of Client have the same id and both try to subscribe to the same topic pattern (or topic pattern and | ||
* share name) then the first instance to establish its subscription be unsubscribed from the topic, in favour of the second instance. If this property is not specified | ||
* then the client will generate a probabalistically unique ID. | ||
* @param {String} | ||
* user - Optional; the user name to use for authentication to the MQ Light service. | ||
* @param {String} | ||
* password - Optional; the password to use for authentication. | ||
* @throws {TypeError} | ||
* If one of the specified parameters in of the wrong type. | ||
* @throws {RangeError} | ||
* If the specified id is too long. | ||
* @throws {Error} | ||
* If service is not specified or one of the parameters is incorrectly formatted. | ||
* @constructor | ||
*/ | ||
var Client = function(host, port, clientId, userName, password) { | ||
var Client = function(service, id, user, password) { | ||
EventEmitter.call(this); | ||
if (!host) host = "localhost"; | ||
if (!port) port = 5672; | ||
if (!clientId) clientId = "AUTO_" + uuid.v4().substring(0, 7); | ||
if (!password) password = ""; | ||
if (clientId.length > 48) { | ||
var msg = "Client identifier '" + clientId + "' is longer than the " + | ||
"maximum ID length of 48."; | ||
throw new Error(msg); | ||
// Ensure the service is an Array or Function | ||
var serviceList, serviceFunction; | ||
if (service instanceof Function) { | ||
serviceFunction = service; | ||
} else { | ||
serviceList = generateServiceList(service); | ||
} | ||
/* currently client ids are restricted to a fixed char set, reject those not in it*/ | ||
var i; | ||
for (i in clientId) { | ||
if (validClientIdChars.indexOf(clientId[i]) == -1) { | ||
var err = "Client Identifier '" + clientId + "' contains invalid char: " + | ||
clientId[i]; | ||
// If client id has not been specified then generate an id | ||
if (!id) id = "AUTO_" + uuid.v4().substring(0, 7); | ||
// If the client id is incorrectly formatted then throw an error | ||
if (id.length > 48) { | ||
var msg = "Client identifier '" + id + "' is longer than the maximum ID length of 48."; | ||
throw new RangeError(msg); | ||
} | ||
// If client id is not a string then throw an error | ||
if (typeof id !== 'string') { | ||
throw new TypeError("Client identifier must be a string type"); | ||
} | ||
// currently client ids are restricted to a fixed char set, reject those not in it | ||
for ( var i in id) { | ||
if (validClientIdChars.indexOf(id[i]) == -1) { | ||
var err = "Client Identifier '" + id + "' contains invalid char: " + id[i]; | ||
throw new Error(err); | ||
@@ -107,10 +223,20 @@ } | ||
this.brokerUrl = "amqp://" + host + ':' + port; | ||
this.clientId = clientId; | ||
if (userName){ | ||
this.messenger = new proton.ProtonMessenger(clientId, userName, password); | ||
} else { | ||
this.messenger = new proton.ProtonMessenger(clientId); | ||
// Validate user and password parameters, when specified | ||
if (user && typeof user !== 'string') { | ||
throw new TypeError("user must be a string type"); | ||
} | ||
this.messenger.start(); | ||
if (password && typeof password !== 'string') { | ||
throw new TypeError("password must be a string type"); | ||
} | ||
// Save the required data as client fields | ||
this.serviceFunction = serviceFunction; | ||
this.serviceList = serviceList; | ||
this.id = id; | ||
this.user = user; | ||
this.password = password; | ||
// Set the initial state to disconnected | ||
this.state = 'disconnected'; | ||
this.service = undefined; | ||
}; | ||
@@ -120,158 +246,525 @@ util.inherits(Client, EventEmitter); | ||
/** | ||
* @callback sendCallback | ||
* @param {string} err - an error message if a problem occurred. | ||
* @param {ProtonMessage} message - the message that was sent. | ||
* @callback connectCallback | ||
* @param {String} | ||
* err - an error message if a problem occurred. | ||
*/ | ||
/** | ||
* Sends the given MQ Light message object to its address. | ||
* Attempts to connect the client to the MQ Light service - as per the options specified when the client object was created by the mqlight.createClient() method. Connects to the MQ | ||
* Light service. | ||
* <p> | ||
* This method is asynchronous and calls the optional callback function when: a) the client has successfully connected to the MQ Light service, or b) the client.disconnect() method | ||
* has been invoked before a successful connection could be established, or c) the client could not connect to the MQ Light service. The callback function should accept a single | ||
* argument which will be set to undefined if the client connects successfully or an Error object if the client cannot connect to the MQ Light service or is disconnected before a | ||
* connection can be established. | ||
* <p> | ||
* Calling this method will result in either the 'connected' event being emitted or an 'error' event being emitted (if a connection cannot be established). These events are | ||
* guaranteed to be dispatched on a subsequent pass through the event loop - so, to avoid missing an event, the corresponding listeners must be registered either prior to calling | ||
* client.connect() or on the same tick as calling client.connect(). | ||
* <p> | ||
* If this method is invoked while the client is in 'connecting', 'connected' or 'retrying' states then the method will complete without performing any work or changing the state | ||
* of the client. If this method is invoked while the client is in 'disconnecting' state then it's effect will be deferred until the client has transitioned into 'disconnected' | ||
* state. | ||
* | ||
* @param {string} topic - the topic to which the message will be sent. | ||
* @param {Object} message - the message body to be sent. | ||
* @param {Object} [options] (optional) map of additional options for the send. | ||
* @param {sendCallback} cb - (optional) callback to be notified of | ||
* errors and completion. | ||
* @param {connectCallback} | ||
* callback - (optional) callback to be notified of errors and completion. | ||
* @returns {Object} The instance of client that it is invoked on - allowing for chaining of other method calls on the client object. | ||
* @throws {TypeError} | ||
* If callback is specified and is not a function. | ||
*/ | ||
Client.prototype.send = function(topic, message, options, cb) { | ||
var messenger = this.messenger; | ||
var callback = (typeof options === 'function') ? options : cb; | ||
try { | ||
if (!message) { | ||
throw new Error('Cannot send undefined'); | ||
} else if (message instanceof Function) { | ||
throw new Error('Cannot send a function'); | ||
Client.prototype.connect = function(callback) { | ||
// Validate the parameter list length | ||
if (arguments.length > 1) { | ||
throw new Error('Too many arguments'); | ||
} | ||
// Performs the connect | ||
var performConnect = function(client, callback) { | ||
var currentState = client.getState(); | ||
// if we are not disconnected or disconnecting return with the client object | ||
if ( currentState !== "disconnected" ){ | ||
if ( currentState === "disconnecting" ){ | ||
process.nextTick(function() { | ||
stillDisconnecting(client, callback); | ||
}); | ||
} else { | ||
process.nextTick(function() { | ||
if (callback) { | ||
callback(undefined); | ||
} | ||
}); | ||
return client; | ||
} | ||
} | ||
client.state = "connecting"; | ||
if (client.user) { | ||
var password = !client.password ? "" : client.password; | ||
client.messenger = new proton.ProtonMessenger(client.id, client.user, password); | ||
} else { | ||
var protonMsg = new proton.ProtonMessage(); | ||
protonMsg.address = this.brokerUrl; | ||
if (topic) protonMsg.address += '/' + topic; | ||
if (typeof message === 'string') { | ||
protonMsg.body = message; | ||
protonMsg.contentType = 'text/plain'; | ||
} else if (message instanceof Buffer) { | ||
protonMsg.body = message; | ||
protonMsg.contentType = 'application/octet-stream'; | ||
client.messenger = new proton.ProtonMessenger(client.id); | ||
} | ||
client.messenger.start(); | ||
// Obtain the list of services for connect | ||
var serviceList; | ||
try { | ||
if (client.serviceFunction) { | ||
var service = client.serviceFunction(); | ||
serviceList = generateServiceList(service); | ||
} else { | ||
protonMsg.body = JSON.stringify(message); | ||
protonMsg.contentType = 'application/json'; | ||
serviceList = client.serviceList; | ||
} | ||
messenger.put(protonMsg); | ||
// setup a timer to trigger the callback once the msg has been sent | ||
var untilSendComplete = function(protonMsg, callback) { | ||
messenger.send(); | ||
if (messenger.hasSent(protonMsg)) { | ||
messenger.send(); | ||
process.nextTick(function() { | ||
callback(undefined, protonMsg); | ||
}); | ||
return; | ||
} catch (e) { | ||
//if there is an error getting the service list disconnect | ||
client.disconnect(); | ||
var err = new Error(e.message); | ||
process.nextTick(function() { | ||
if (callback) { | ||
callback(err); | ||
} | ||
// if msg not yet sent and still running, check again in a second or so | ||
if (!messenger.stopped) { | ||
setImmediate(untilSendComplete, protonMsg, callback); | ||
} | ||
}; | ||
// if a callback is set, start the timer to trigger it | ||
if (callback) { | ||
setImmediate(untilSendComplete, protonMsg, callback); | ||
} | ||
client.emit('error', err); | ||
}); | ||
return; | ||
} | ||
} catch (e) { | ||
var client = this; | ||
var err = new Error(e.message); | ||
// TODO need to somehow actually connect selecting an available service from the list | ||
client.service = serviceList[0]; | ||
// Indicate that we're connected | ||
client.state = 'connected'; | ||
process.nextTick(function() { | ||
client.emit('connected', true); | ||
}); | ||
if (callback) { | ||
process.nextTick(function() { | ||
callback(undefined); | ||
}); | ||
} | ||
return; | ||
}; | ||
if (callback && !(callback instanceof Function)) { | ||
throw new TypeError("callback must be a function"); | ||
} | ||
var client = this; | ||
var stillDisconnecting = function(client, callback){ | ||
if ( client.getState() === "disconnecting" ){ | ||
process.nextTick(function() { | ||
stillDisconnecting(client, callback); | ||
}); | ||
} else { | ||
process.nextTick(function() { | ||
performConnect(client, callback); | ||
}); | ||
} | ||
}; | ||
process.nextTick(function() { | ||
performConnect(client, callback); | ||
}); | ||
return client; | ||
}; | ||
/** | ||
* @callback disconnectCallback | ||
* @param {String} | ||
* err - an error message if a problem occurred. | ||
*/ | ||
/** | ||
* Disconnects the client from the MQ Light service, implicitly closing any subscriptions that the client has open. The 'disconnected' event will be emitted once the client has | ||
* disconnected. | ||
* <p> | ||
* This method works asynchronously, and will invoke the optional callback once the client has disconnected. The callback function should accept a single Error argument, although | ||
* there is currently no situation where this will be set to any other value than undefined. | ||
* <p> | ||
* Calling client.disconnect() when the client is in 'disconnecting' or 'disconnected' state has no effect. Calling client.disconnect() from any other state results in the client | ||
* disconnecting and the 'disconnected' event being generated. | ||
* | ||
* @param {disconnectCallback} | ||
* callback - (optional) callback to be notified of errors and completion. | ||
* @returns {Object} The instance of client that it is invoked on - allowing for chaining of other method calls on the client object. | ||
* @throws {TypeError} | ||
* If callback is specified and is not a function. | ||
*/ | ||
Client.prototype.disconnect = function(callback) { | ||
var client = this; | ||
// Validate the parameter list length | ||
if (arguments.length > 1) { | ||
throw new Error('Too many arguments'); | ||
} | ||
// Performs the disconnect | ||
var performDisconnect = function(client, callback) { | ||
client.state = 'disconnecting'; | ||
if (client.messenger) { | ||
client.messenger.stop(); | ||
delete client.messenger; | ||
client.messenger = undefined; | ||
} | ||
// Indicate that we've disconnected | ||
client.state = 'disconnected'; | ||
process.nextTick(function() { | ||
client.emit('disconnected', true); | ||
}); | ||
if (callback) { | ||
process.nextTick(function() { | ||
callback(undefined); | ||
}); | ||
} | ||
return; | ||
}; | ||
if (callback && !(callback instanceof Function)) { | ||
throw new TypeError("callback must be a function"); | ||
} | ||
//just return if already disconnected or in the process of disconnecting | ||
if ( client.getState() === "disconnected" || client.getState() === "disconnecting" ){ | ||
process.nextTick(function() { | ||
if (callback) { | ||
callback(err, protonMsg); | ||
callback(undefined); | ||
} | ||
if (err) client.emit('error', err); | ||
}); | ||
return client; | ||
} | ||
process.nextTick(function() { | ||
performDisconnect(client, callback); | ||
}); | ||
return client; | ||
}; | ||
/** | ||
* Disconnects this Client from the messaging server and frees the system | ||
* resources that it uses. Calling this method also implicitly closes any | ||
* Destination objects that have been created using the client's | ||
* {@linkClient#createDestination} method. | ||
* @returns {String} The identifier associated with the client. This will either be: a) the identifier supplied as the id property of the options object supplied to the | ||
* mqlight.createClient() method, or b) an automatically generated identifier if the id property was not specified when the client was created. | ||
*/ | ||
Client.prototype.close = function() { | ||
this.messenger.stop(); | ||
Client.prototype.getId = function() { | ||
var id = this.id; | ||
return id; | ||
}; | ||
/** | ||
* @callback destCallback | ||
* @param {string} err - an error message if a problem occurred. | ||
* @param {string} address - the address that was subscribed to. | ||
* @returns {String} The URL of the service to which the client is currently connected (when the client is in 'connected') - otherwise (for all other client | ||
* states) undefined is returned. | ||
*/ | ||
Client.prototype.getService = function() { | ||
if (this.state === 'connected') { | ||
var service = this.service; | ||
return service; | ||
} else { | ||
return undefined; | ||
} | ||
}; | ||
/** | ||
* Create a {@link Destination} and associates it with a <code>pattern</code>. | ||
* @returns {String} The current state of the client - can will be one of the following string values: 'connected', 'connecting', 'disconnected', 'disconnecting', or 'retrying'. | ||
*/ | ||
Client.prototype.getState = function() { | ||
var state = this.state; | ||
return state; | ||
}; | ||
/** | ||
* @returns {Boolean} <code>true</code> if a connection has been made (i.e. state is connected), <code>false</code> otherwise. | ||
*/ | ||
Client.prototype.hasConnected = function() { | ||
return this.state === 'connected'; | ||
}; | ||
/** | ||
* @callback sendCallback | ||
* @param {String} | ||
* err - an error message if a problem occurred. message - the message that was sent. ????? | ||
*/ | ||
/** | ||
* Sends a message to the MQ Light service. | ||
* | ||
* The <code>pattern</code> is matched against the <code>address</code> | ||
* attribute of messages sent to the IBM MQ Light messaging service to | ||
* determine whether a particular message will be delivered to a particular | ||
* <code>Destination</code>. | ||
* | ||
* @param pattern used to match against the <code>address</code> attribute of | ||
* messages to determine if a copy of the message should be delivered to the | ||
* <code>Destination</code>. | ||
* @param {Object} [options] (optional) map of additional options for the | ||
* destination. | ||
* @param {destCallback} cb - (optional) callback to be notified of errors | ||
* @return a {@link Destination} which will emit 'message' events on arrival. | ||
* @param {String} | ||
* topic - Identifies which subscriptions receive the message - based on the pattern argument supplied when the subscription is created. | ||
* @param {Object} | ||
* data - The message body to be sent. Any object or javascript primitive type although certain types receive special treatment: String and Buffer objects are treated as | ||
* immutable as they pass through the MQ Light service. E.g. if the sender sends a String, the receiver receives a String. undefined and Function objects will be rejected | ||
* with an error. | ||
* @param {Object} | ||
* options (Optional) Used to specify options that affect how the MQ Light service processes the message. | ||
* @param {sendCallback} | ||
* callback - (Optional) callback to be notified of errors and completion. The callback function accepts a single Error argument which is used to indicate whether the | ||
* message was successfully delivered to the MQ Light service. The callback may be omitted if a qos of 0 (at most once) is used - however it must be present if a qos of 1 | ||
* (at least once) is specified, otherwise | ||
* @throws {TypeError} | ||
* If one of the specified parameters is of the wrong type. | ||
* @throws {Error} | ||
* If the topic or data parameter is undefined. | ||
*/ | ||
Client.prototype.createDestination = function(pattern, options, cb) { | ||
var messenger = this.messenger; | ||
var address = this.brokerUrl + '/' + pattern; | ||
var emitter = new EventEmitter(); | ||
var callback = (typeof options === 'function') ? options : cb; | ||
Client.prototype.send = function(topic, data, options, callback) { | ||
try { | ||
messenger.subscribe(address); | ||
} catch (e) { | ||
var err = new Error(e.message); | ||
// Validate the parameter list length | ||
if (arguments.length > 4) { | ||
throw new Error('Too many arguments'); | ||
} | ||
// Validate the passed parameters | ||
if (topic === undefined) { | ||
throw new Error('Cannot send to undefined topic'); | ||
} else if (typeof topic !== 'string') { | ||
throw new TypeError('topic must be a string type'); | ||
} | ||
if (data === undefined) { | ||
throw new Error('Cannot send undefined data'); | ||
} else if (data instanceof Function) { | ||
throw new TypeError('Cannot send a function'); | ||
} | ||
// Validate the remaining optional parameters, assigning local variables to the appropriate parameter | ||
var optionsOption, callbackOption; | ||
if (options) { | ||
if (options instanceof Function) { | ||
callbackOption = options; | ||
} else { | ||
if (options instanceof Object) { | ||
optionsOption = options; | ||
} else { | ||
throw new TypeError('options must be an object type'); | ||
} | ||
} | ||
if (callback) { | ||
if (callbackOption) { | ||
throw new TypeError('Invalid forth argument, callback already matched for third argument'); | ||
} | ||
if (callback instanceof Function) { | ||
callbackOption = callback; | ||
} else { | ||
throw new TypeError('callback must be a function type'); | ||
} | ||
} | ||
} | ||
setImmediate(function() { | ||
if (callback) { | ||
callback(err, address); | ||
// Ensure we have attempted a connect | ||
if (!this.hasConnected()) throw new Error('not connected'); | ||
// Send the data as a message to the specified topic | ||
var messenger = this.messenger; | ||
var protonMsg; | ||
try { | ||
protonMsg = new proton.ProtonMessage(); | ||
protonMsg.address = this.getService(); | ||
if (topic) { | ||
// need to encode the topic component but / have meaning that shouldn't be encoded | ||
var topicLevels = topic.split("/"); | ||
var encodedTopicLevels = topicLevels.map(function(tLevel) { | ||
return encodeURIComponent(tLevel); | ||
}); | ||
var encodedTopic = encodedTopicLevels.join("/"); | ||
protonMsg.address += '/' + encodedTopic; | ||
} | ||
if (typeof data === 'string') { | ||
protonMsg.body = data; | ||
protonMsg.contentType = 'text/plain'; | ||
} else if (data instanceof Buffer) { | ||
protonMsg.body = data; | ||
protonMsg.contentType = 'application/octet-stream'; | ||
} else { | ||
protonMsg.body = JSON.stringify(data); | ||
protonMsg.contentType = 'application/json'; | ||
} | ||
messenger.put(protonMsg); | ||
messenger.send(); | ||
// setup a timer to trigger the callback once the msg has been sent, or immediately if no message to be sent | ||
var untilSendComplete = function(protonMsg, sendCallback) { | ||
if (messenger.hasSent(protonMsg)) { | ||
if (sendCallback) { | ||
var message = { | ||
address : decodeURIComponent(protonMsg.address), | ||
contentType : protonMsg.contentType, | ||
body : protonMsg.body | ||
}; | ||
setImmediate(sendCallback, undefined, message); | ||
} | ||
return; | ||
} | ||
if (err) emitter.emit('error', err); | ||
// if msg not yet sent and still running, check again in a second or so | ||
if (!messenger.stopped) { | ||
messenger.send(); | ||
setImmediate(untilSendComplete, protonMsg, callbackOption); | ||
} | ||
}; | ||
// start the timer to trigger it to keep sending until msg has sent | ||
setImmediate(untilSendComplete, protonMsg, callbackOption); | ||
} catch (e) { | ||
var client = this; | ||
var err = new Error(e.message); | ||
client.disconnect(); | ||
process.nextTick(function() { | ||
if (callbackOption) { | ||
callbackOption(err, protonMsg); | ||
} | ||
if (err) client.emit('error', err); | ||
}); | ||
} | ||
}; | ||
if (!err) { | ||
var check_for_messages = function() { | ||
var messages = messenger.receive(50); | ||
if (messages.length > 0) { | ||
for (var i=0, tot=messages.length; i < tot; i++) { | ||
var protonMsg = messages[i]; | ||
var message = { address: protonMsg.address, | ||
contentType: protonMsg.contentType, | ||
body: protonMsg.body | ||
}; | ||
/** | ||
* @callback destCallback | ||
* @param {String} | ||
* err - an error message if a problem occurred. | ||
* @param {String} | ||
* address - the address that was subscribed to. | ||
*/ | ||
// if body is a JSON'ified object, try to parse it back to a js obj | ||
if (message.contentType === 'application/json') { | ||
try { | ||
var obj = JSON.parse(message.body); | ||
message.body = obj; | ||
} catch(_) {console.log(_);} | ||
} | ||
emitter.emit('message', message); | ||
} | ||
/** | ||
* Constructs a subscription object and starts the emission of message events each time a message arrives, at the MQ Light service, that matches pattern. | ||
* | ||
* @param {String} | ||
* pattern used to match against the <code>address</code> attribute of messages to determine if a copy of the message should be delivered to the <code>Destination</code>. | ||
* @param {String} | ||
* share. (Optional) Specifies whether to create or join a shared subscription for which messages are anycast amongst the present subscribers. If this argument is omitted | ||
* then the subscription will be unshared (e.g. private to the client). | ||
* @param {Object} | ||
* [options] (optional) The options argument accepts an object with properties to set. | ||
* @param {destCallback} | ||
* callback - (optional) Invoked when the subscription request has been processed. A single Error parameter is passed to this function to indicate whether the subscription | ||
* request was successful, and if not: why not. | ||
* @returns {@link Client} the instance of the client this was called on which will emit 'message' events on arrival. | ||
* @throws {TypeError} | ||
* If one of the specified parameters is of the wrong type. | ||
* @throws {Error} | ||
* If the pattern parameter is undefined. | ||
*/ | ||
Client.prototype.subscribe = function(pattern, share, options, callback) { | ||
// Validate the parameter list length | ||
if (arguments.length > 4) { | ||
throw new Error('Too many arguments'); | ||
} | ||
// Validate the pattern parameter | ||
if (pattern === undefined) { | ||
throw new Error('Cannot subscribe to undefined pattern'); | ||
} else if (typeof pattern !== 'string') { | ||
throw new TypeError('pattern must be a string type'); | ||
} | ||
// Validate the remaining optional parameters, assigning local variables to the appropriate parameter | ||
var shareOption, optionsOption, callbackOption; | ||
if (share) { | ||
if (typeof share === 'string') { | ||
shareOption = "share:" + share + ":"; | ||
} else if (share instanceof Function) { | ||
shareOption = "private:"; | ||
callbackOption = share; | ||
} else if (share instanceof Object) { | ||
shareOption = "private:"; | ||
optionsOption = share; | ||
} else { | ||
throw new TypeError('share must be a string type'); | ||
} | ||
if (options) { | ||
if (callbackOption) { | ||
throw new TypeError('Invalid third argument, callback already matched for second argument'); | ||
} | ||
if (options instanceof Function) { | ||
callbackOption = options; | ||
} else { | ||
if (optionsOption) { | ||
throw new TypeError('Invalid third argument, options already matched for second argument'); | ||
} | ||
if (!messenger.stopped) { | ||
setImmediate(check_for_messages); | ||
if (options instanceof Object) { | ||
optionsOption = options; | ||
} else { | ||
throw new TypeError('options must be an object type'); | ||
} | ||
}; | ||
process.nextTick(function() { | ||
if (!messenger.stopped) { | ||
check_for_messages(); | ||
} | ||
if (callback) { | ||
if (callbackOption) { | ||
throw new TypeError('Invalid forth argument, callback already matched for third argument'); | ||
} | ||
}); | ||
if (callback instanceof Function) { | ||
callbackOption = callback; | ||
} else { | ||
throw new TypeError('callback must be a function type'); | ||
} | ||
} | ||
} | ||
} else { | ||
shareOption = "private:"; | ||
} | ||
return emitter; | ||
// Ensure we have attempted a connect | ||
if (!this.hasConnected()) throw new Error('not connected'); | ||
// Subscribe using the specified pattern and share options | ||
var messenger = this.messenger; | ||
var address = this.getService() + '/' + shareOption + pattern; | ||
var client = this; | ||
var err; | ||
try { | ||
messenger.subscribe(address); | ||
} catch (e) { | ||
err = new Error(e.message); | ||
} | ||
setImmediate(function() { | ||
if (callbackOption) { | ||
callbackOption(err, address); | ||
} | ||
if (err){ | ||
client.emit('error', err); | ||
client.disconnect(); | ||
} | ||
}); | ||
if (!err) { | ||
var check_for_messages = function() { | ||
var messages = messenger.receive(50); | ||
if (messages.length > 0) { | ||
for ( var i = 0, tot = messages.length; i < tot; i++) { | ||
var protonMsg = messages[i]; | ||
var message = { | ||
address : protonMsg.address, | ||
contentType : protonMsg.contentType, | ||
body : protonMsg.body | ||
}; | ||
// if body is a JSON'ified object, try to parse it back to a js obj | ||
if (message.contentType === 'application/json') { | ||
try { | ||
var obj = JSON.parse(message.body); | ||
message.body = obj; | ||
} catch (_) { | ||
console.log(_); | ||
} | ||
} | ||
client.emit('message', message); | ||
} | ||
} | ||
if (!messenger.stopped) { | ||
setImmediate(check_for_messages); | ||
} | ||
}; | ||
process.nextTick(function() { | ||
if (!messenger.stopped) { | ||
check_for_messages(); | ||
} | ||
}); | ||
} | ||
return client; | ||
}; | ||
/* ------------------------------------------------------------------------- */ |
{ | ||
"name": "mqlight-dev", | ||
"version": "0.1.2014032000", | ||
"description": "IBM MQ Light Client Module unstable development stream", | ||
"version": "0.1.2014041000", | ||
"description": "IBM MQ Light Client Module", | ||
"main": "mqlight.js", | ||
@@ -6,0 +6,0 @@ "private": false, |
@@ -24,3 +24,3 @@ /* %Z% %W% %I% %E% %U% */ | ||
var nopt = require('nopt'); | ||
} catch(_) { | ||
} catch (_) { | ||
var nopt = require(require.resolve('npm') + '/../../node_modules/nopt'); | ||
@@ -31,3 +31,5 @@ } | ||
var types = {}; | ||
var shorthands = { h: ["--help"] }; | ||
var shorthands = { | ||
h : [ "--help" ] | ||
}; | ||
var parsed = nopt(types, shorthands, process.argv, 2); | ||
@@ -63,3 +65,3 @@ var remain = parsed.argv.remain; | ||
if (hostname.indexOf('/') > -1) { | ||
topic = hostname.substring(hostname.indexOf('/')+1); | ||
topic = hostname.substring(hostname.indexOf('/') + 1); | ||
hostname = hostname.substring(0, hostname.indexOf('/')); | ||
@@ -76,18 +78,30 @@ } else { | ||
var service = "amqp://" + hostname + ":" + port; | ||
// connect client to broker | ||
var opts = { host: hostname, port: port, clientId: "recv.js"}; | ||
var opts = { | ||
service : service, | ||
id : "recv.js" | ||
}; | ||
var client = mqlight.createClient(opts); | ||
// Make the connection | ||
client.connect(function(err) { | ||
if (err) { | ||
console.log(err); | ||
} | ||
}); | ||
// once connection is acquired, receive messages from the required topic | ||
client.on('connected', function() { | ||
console.log("Connected to " + hostname + ":" + port + " using client-id " + | ||
client.clientId); | ||
console.log("Connected to %s using client-id %s", service, client.getId()); | ||
// now subscribe to topic for publications | ||
var destination = client.createDestination(topic, function(err, address) { | ||
var destination = client.subscribe(topic, function(err, address) { | ||
if (err) { | ||
console.error('Problem with createDestination request: ' + err.message); | ||
console.error('Problem with subscribe request: ' + err.message); | ||
process.exit(0); | ||
} | ||
if (address) { | ||
console.log("Subscribing to " + address); | ||
console.log("Subscribed to %s", address); | ||
} | ||
@@ -99,6 +113,5 @@ }); | ||
destination.on('message', function(msg) { | ||
console.log('# received message (' + (++i) + ')'); | ||
console.log('# received message (%d)', (++i)); | ||
console.log(msg); | ||
}); | ||
}); | ||
@@ -24,3 +24,3 @@ /* %Z% %W% %I% %E% %U% */ | ||
var nopt = require('nopt'); | ||
} catch(_) { | ||
} catch (_) { | ||
var nopt = require(require.resolve('npm') + '/../../node_modules/nopt'); | ||
@@ -30,4 +30,11 @@ } | ||
// parse the commandline arguments | ||
var types = { address: String, delay: Number }; | ||
var shorthands = { a: ["--address"], d: ["--delay"], h: ["--help"] }; | ||
var types = { | ||
address : String, | ||
delay : Number | ||
}; | ||
var shorthands = { | ||
a : [ "--address" ], | ||
d : [ "--delay" ], | ||
h : [ "--help" ] | ||
}; | ||
var parsed = nopt(types, shorthands, process.argv, 2); | ||
@@ -60,3 +67,3 @@ | ||
if (hostname.indexOf('/') > -1) { | ||
topic = hostname.substring(hostname.indexOf('/')+1); | ||
topic = hostname.substring(hostname.indexOf('/') + 1); | ||
hostname = hostname.substring(0, hostname.indexOf('/')); | ||
@@ -73,4 +80,9 @@ } else { | ||
var service = "amqp://" + hostname + ":" + port; | ||
// create client to connect to broker with | ||
var opts = { host: hostname, port: port, clientId: "send.js"}; | ||
var opts = { | ||
service : service, | ||
id : "send.js" | ||
}; | ||
var client = mqlight.createClient(opts); | ||
@@ -80,3 +92,3 @@ | ||
var remain = parsed.argv.remain; | ||
var data = (remain.length > 0) ? remain : ["Hello World!"]; | ||
var data = (remain.length > 0) ? remain : [ "Hello World!" ]; | ||
@@ -86,6 +98,13 @@ // insert a delay between sends if requested | ||
// Make the connection | ||
client.connect(function(err) { | ||
if (err) { | ||
console.log(err); | ||
} | ||
}); | ||
// once connection is acquired, send messages | ||
client.on('connected', function() { | ||
console.log("Connected to " + hostname + ":" + port + " using client-id " + | ||
client.clientId); | ||
console.log("Connected to %s using client-id %s", service, client.getId()); | ||
console.log("Publishing to: %s", topic); | ||
@@ -105,14 +124,14 @@ // queue all messages for sending | ||
} | ||
}); | ||
// if there are more messages pending, send the next in <delay> seconds time | ||
if (data.length > ++i) { | ||
if (delay > 0) { | ||
setTimeout(sendNextMessage, delay); | ||
// if there are more messages pending, send the next in <delay> seconds | ||
if (data.length > ++i) { | ||
if (delay > 0) { | ||
setTimeout(sendNextMessage, delay); | ||
} else { | ||
setImmediate(sendNextMessage); | ||
} | ||
} else { | ||
setImmediate(sendNextMessage); | ||
// wait a short time before exiting | ||
setTimeout(process.exit, 1500, 0); | ||
} | ||
} else { | ||
// wait a short time before exiting | ||
setTimeout(process.exit, 1500, 0); | ||
} | ||
}); | ||
}; | ||
@@ -122,2 +141,1 @@ | ||
}); | ||
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
2480443
910
3