message-hub-rest
Advanced tools
Comparing version 2.0.0-alpha.0 to 2.0.0
/** | ||
* Copyright 2015 IBM | ||
* Copyright 2015, 2018 IBM | ||
* | ||
@@ -18,3 +18,3 @@ * Licensed under the Apache License, Version 2.0 (the "License"); | ||
* Licensed Materials - Property of IBM | ||
* © Copyright IBM Corp. 2015 | ||
* © Copyright IBM Corp. 2015, 2018 | ||
*/ | ||
@@ -213,365 +213,2 @@ 'use strict'; | ||
/** | ||
* Client.prototype.produce | ||
* Produces a message on the specified topic. | ||
* | ||
* @param topic The topic name for the new messages to be produced on. | ||
* @param message The message object to be pushed to the service. | ||
* @returns Response object generated by the service. | ||
*/ | ||
Client.prototype.produce = function(topic, message) { | ||
if(typeof(topic) === 'string' && topic.length > 0 && message) { | ||
var requestOptions = { | ||
host: this.url.hostname, | ||
port: this.url.port, | ||
path: '/topics/' + topic, | ||
method: 'POST', | ||
headers: { | ||
'X-Auth-Token': this.apiKey, | ||
'Content-Type': CONTENT_TYPE | ||
}, | ||
}; | ||
var input = null; | ||
if(message instanceof MessageList) { | ||
input = message.messages; | ||
} else if(Array.isArray(message)) { | ||
input = new MessageList(message).messages; | ||
} else if(typeof(message) === 'object') { | ||
if(!message.records) { | ||
input = new MessageList([JSON.stringify(message)]).messages; | ||
} else { | ||
input = message; | ||
} | ||
} else { | ||
input = new MessageList([message]).messages; | ||
} | ||
return Utils.request(requestOptions, { https: this.config.https }, input); | ||
} else { | ||
var deferred = Q.defer(); | ||
if(!message) { | ||
deferred.reject(new ReferenceError('Provided message object cannot be undefined.')); | ||
} else if(typeof(topic) !== 'string') { | ||
deferred.reject(new TypeError('Provided topic parameter must be of type "string".')); | ||
} else { | ||
deferred.reject(new Error('Provided topic parameter must have length greater than zero.')); | ||
} | ||
return deferred.promise; | ||
} | ||
}; | ||
/** | ||
* Client.prototype.consume | ||
* Configures a consumer instance of the specified name. | ||
* | ||
* @param groupName The name of the consumer group. If the group doesn't exist, one is created. | ||
* @param instanceName The name of the consumer group instance. | ||
* @param options Options provided to configure the consumer group. | ||
* @returns Consumer group instance object. | ||
*/ | ||
Client.prototype.consume = function(groupName, instanceName, options) { | ||
return new Client.ConsumerInstance(this, groupName, instanceName, options, false).configure(); | ||
}; | ||
/** | ||
* Client.ConsumerInstance (Constructor) | ||
* Constructs a new ConsumerInstance object. Usually not created directly, | ||
* it is recommended to use Client.prototype.consume. | ||
* | ||
* @param client The Message Hub client object associated with this consumer instance. | ||
* @param groupName The name of the consumer group the instance is attached to. | ||
* @param instanceName The name to assign to the consumer instance. | ||
* @param options Options provided to configure the consumer group. | ||
* @param configure Flag used to automatically configure the instance. | ||
*/ | ||
var ConsumerInstance = Client.ConsumerInstance = function(client, groupName, instanceName, options, configure) { | ||
if(!(client instanceof Client)) { | ||
throw new TypeError('Provided client parameter must be an instance of Client.'); | ||
} | ||
configure = typeof configure === 'boolean' ? configure : true; | ||
this.client = client; | ||
this.groupName = groupName; | ||
this.instanceName = instanceName; | ||
this.options = options; | ||
// a map to keep track of topic -> request promise. we will use this to | ||
// ensure that only one request per consumer isntance per topic is pending | ||
// at a given time - avoids kafka errors | ||
this._topicGetMap = {}; | ||
if(configure) { | ||
this.configure(); | ||
} | ||
}; | ||
/** | ||
* Client.ConsumerInstance.prototype.configure | ||
* Configures the consumer instance by sending a request to | ||
* the Kafka REST service. | ||
*/ | ||
ConsumerInstance.prototype.configure = function() { | ||
var deferred = Q.defer(); | ||
var instance = this; | ||
var requestOptions = { | ||
host: this.client.url.hostname, | ||
port: this.client.url.port, | ||
path: "/consumers/" + this.groupName, | ||
method: 'POST', | ||
headers: { | ||
'X-Auth-Token': this.client.apiKey, | ||
'Content-Type': CONTENT_TYPE | ||
}, | ||
}; | ||
var configOptions = { | ||
https: this.client.config.https, | ||
ignoredErrorCodes: [409], | ||
}; | ||
if(!this.groupName) { | ||
deferred.reject(new ReferenceError('Provided groupName parameter cannot be undefined.')); | ||
} else if(!(typeof(this.groupName) === 'string' && this.groupName.length > 0)) { | ||
deferred.reject(new TypeError('Provided groupName parameter must be a non-zero length string.')); | ||
} | ||
if(!this.instanceName) { | ||
deferred.reject(new ReferenceError('Provided instanceName parameter cannot be undefined.')); | ||
} else if(!(typeof(this.groupName) === 'string' && this.instanceName.length > 0)) { | ||
deferred.reject(new TypeError('Provided instanceName parameter must be a non-zero length string.')); | ||
} | ||
// Generate the message to be sent to the server. Copy all | ||
// properties to the message defined in the opts variable. | ||
var message = { | ||
'id': this.instanceName, | ||
'format': 'binary', | ||
}; | ||
if(this.options !== undefined) { | ||
for(var option in this.options) { | ||
if(!message.hasOwnProperty(option)) { | ||
message[option] = this.options[option]; | ||
} | ||
} | ||
} | ||
if(deferred.promise.inspect().state !== 'rejected') | ||
{ | ||
deferred.resolve(); | ||
return Utils.request(requestOptions, configOptions, message) | ||
.then(function(responseData) { | ||
instance.client.consumerInstances[instance.groupName + instance.instanceName] = instance; | ||
return [instance, responseData]; | ||
}); | ||
} else { | ||
return deferred.promise; | ||
} | ||
}; | ||
/** | ||
* Client.ConsumerInstance.prototype.get | ||
* Retrieves messages from the provided topic name. | ||
* | ||
* @param topicName The topic to retrieve messages from. | ||
* @param toValue Unwraps base64 encoded messages, if set to true. | ||
* @returns Promise object which will be fulfilled when the request | ||
* to the service resolves. | ||
*/ | ||
ConsumerInstance.prototype.get = function(topicName, toValue) { | ||
// if there's already a promise pending for a given topic, return that instead | ||
// of issuing a new get. this prevents the Kafka error: `Another request is in | ||
// progress for consumer "test:test1". Request may be retried when response is | ||
// received for the previous request. | ||
if (this._topicGetMap[topicName]) { | ||
return this._topicGetMap[topicName]; | ||
} | ||
// if we don't have a request already "out", proceed with business as usual | ||
if(toValue !== undefined && typeof(toValue) !== 'boolean') { | ||
console.warn('Provided parameter toValue is not a boolean, defaulting to true.'); | ||
toValue = true; | ||
} else if(toValue === undefined) { | ||
// Silently default to true | ||
toValue = true; | ||
} | ||
var req = Utils.request({ | ||
host: this.client.url.hostname, | ||
port: this.client.url.port, | ||
path: '/consumers/' + this.groupName + '/instances/' + this.instanceName + '/topics/' + topicName, | ||
method: 'GET', | ||
headers: { | ||
'X-Auth-Token': this.client.apiKey, | ||
'Accept': CONTENT_TYPE | ||
}, | ||
}, { | ||
https: this.client.config.https, | ||
}); | ||
// Convert the response to pure values without Kafka metadata. | ||
var prom; | ||
if(toValue) { | ||
prom = req.then(function(data) { | ||
var output = []; | ||
for(var index in data) { | ||
output.push(new Buffer(data[index].value, 'base64').toString('utf8')); | ||
} | ||
return output; | ||
}); | ||
} else { | ||
prom = req; | ||
} | ||
// Keep track of the promise in our topic map, make sure to clean up when | ||
// request is complete | ||
this._topicGetMap[topicName] = prom.then(function() { | ||
delete this._topicGetMap[topicName]; | ||
// return all arguments to ensure proper promise chaining | ||
return arguments; | ||
}.bind(this)); | ||
return prom; | ||
}; | ||
/** | ||
* Client.ConsumerInstance.prototype.remove | ||
* Removes the current consumer instance from the server. | ||
*/ | ||
ConsumerInstance.prototype.remove = function() { | ||
var client = this.client; | ||
var groupName = this.groupName; | ||
var instanceName = this.instanceName; | ||
return Utils.request({ | ||
host: this.client.url.hostname, | ||
port: this.client.url.port, | ||
path: '/consumers/' + this.groupName + '/instances/' + this.instanceName, | ||
method: 'DELETE', | ||
headers: { | ||
'X-Auth-Token': this.client.apiKey, | ||
'Accept': CONTENT_TYPE, | ||
}, | ||
}, { | ||
https: this.client.config.https, | ||
acceptedResponseCodes: [200, 204], | ||
}).then(function(response) { | ||
delete client.consumerInstances[groupName + instanceName]; | ||
// Defer response to next handler | ||
return response; | ||
}); | ||
}; | ||
/** | ||
* Client.MessageList (Constructor) | ||
* Constructs a new instances of the MessageList class. An initial | ||
* array of values can be provided to pre-populate the list with | ||
* messages. | ||
* | ||
* @param init Array of values to be added to the list of messages. | ||
* @returns An instance of MessageList, which allows for chaining other methods. | ||
*/ | ||
var MessageList = Client.MessageList = function(init) { | ||
this.messages = { 'records': [] }; | ||
// Push initial values into the records array. If the | ||
// provided value is undefined, just ignore it. | ||
if(init !== undefined) { | ||
if(Array.isArray(init)) { | ||
for(var index in init) { | ||
this.push(init[index]); | ||
} | ||
} else { | ||
throw new TypeError('The provided init variable must be an array.'); | ||
} | ||
} | ||
this.__defineGetter__('length', function() { | ||
return this.messages.records.length; | ||
}); | ||
}; | ||
/** | ||
* Client.MessageList.prototype.push | ||
* Convenience wrapper to add messages to 'messages.records'. Also converts all values | ||
* to base64 strings so they can be sent through the service. | ||
* | ||
* @param message The message to be added to the list. | ||
* @returns The current MessageList instance, which allows for chaining other methods. | ||
*/ | ||
MessageList.prototype.push = function(message) { | ||
if(message) { | ||
// Convert inputs to base64 strings | ||
if(Buffer.isBuffer(message)) { | ||
message = message.toString('base64'); | ||
} else if(typeof message === 'object') { | ||
message = new Buffer(JSON.stringify(message)).toString('base64'); | ||
} else { | ||
message = new Buffer(message).toString('base64'); | ||
} | ||
this.messages.records.push({ 'value': message }); | ||
// Return object instance to allow chaining. | ||
return this; | ||
} else { | ||
throw new ReferenceError('Provided message is undefined.'); | ||
} | ||
}; | ||
/** | ||
* Client.MessageList.prototype.pop | ||
* Convenience wrapper for 'messages.records.pop()', but returns the current | ||
* MessageList instance to allow chaining of methods. | ||
* | ||
* @returns Current MessageList instance. | ||
*/ | ||
MessageList.prototype.pop = function() { | ||
this.messages.records.pop(); | ||
return this; | ||
}; | ||
/** | ||
* Client.MessageList.prototype.get | ||
* Retrieves a message from the message list, converting it back to | ||
* its original representation (i.e. JSON string -> object) | ||
* | ||
* @param index The index of the list to retrieve. | ||
* @returns Original representation of value stored in records array. | ||
*/ | ||
MessageList.prototype.get = function(index) { | ||
if(typeof index !== 'number') { | ||
throw new TypeError('The index provided must be a number.'); | ||
} else { | ||
index = index | 0; | ||
} | ||
if(this.messages && this.messages.records) { | ||
if(index >= 0 && index < this.messages.records.length) { | ||
var base64String = this.messages.records[index].value; | ||
var originalString = new Buffer(base64String, 'base64').toString(); | ||
var result; | ||
// Attempt to convert to JSON | ||
try { | ||
result = JSON.parse(originalString); | ||
} catch(e) { | ||
// SyntaxError thrown if the string it not JSON. In | ||
// this case, just assign the converted string to the result. | ||
result = originalString; | ||
} | ||
return result; | ||
} else { | ||
throw new RangeError('Index ' + index + ' is out of bounds. The number of records in the list is ' + this.messages.records.length + '.'); | ||
} | ||
} else { | ||
throw new ReferenceError('The messages object is undefined or null.'); | ||
} | ||
} | ||
module.exports = Client; |
/** | ||
* Copyright 2015 IBM | ||
* Copyright 2015, 2018 IBM | ||
* | ||
@@ -18,3 +18,3 @@ * Licensed under the Apache License, Version 2.0 (the "License"); | ||
* Licensed Materials - Property of IBM | ||
* © Copyright IBM Corp. 2015 | ||
* © Copyright IBM Corp. 2015, 2018 | ||
*/ | ||
@@ -154,4 +154,3 @@ 'use strict'; | ||
//console.log(error); | ||
return error; | ||
}; |
{ | ||
"name": "message-hub-rest", | ||
"version": "2.0.0-alpha.0", | ||
"version": "2.0.0", | ||
"description": "Node.js module for connecting to the Kafka REST interface of IBM Message Hub.", | ||
@@ -11,3 +11,3 @@ "main": "lib/messagehub.js", | ||
"engines": { | ||
"node" : ">=0.12" | ||
"node": ">=0.12" | ||
}, | ||
@@ -14,0 +14,0 @@ "scripts": { |
110
README.md
## IBM Message Hub REST API Client Module | ||
IBM Message Hub is a scalable, distributed, high throughput message bus to unite your on-premise and off-premise cloud technologies. You can wire micro-services together using open protocols, connect stream data to analytics to realise powerful insight and feed event data to multiple applications to react in real time. | ||
This Node.js module provides a high-level API by which you can interact with the REST API exposed by the Message Hub service. | ||
This Node.js module provides a high-level API by which you can interact with the REST API exposed by the Message Hub service. | ||
__Note__: | ||
From version `2.0.0` onwards, the consume and produce APIs have been removed as the Message Hub Enterprise offering does not support them. Customers should instead use [node-rdkafka](https://www.npmjs.com/package/rdkafka) for Kafka API-level messaging. | ||
## Getting Started | ||
@@ -28,3 +32,2 @@ ### Prerequisites | ||
var instance = new MessageHub(services); | ||
var consumerInstance; | ||
var topicName = 'mytopic'; | ||
@@ -34,36 +37,7 @@ | ||
.then(function(response) { | ||
return instance.consume('my_consumer_group', 'my_consumer_instance', { 'auto.offset.reset': 'largest' }); | ||
console.log('Topic created.'); | ||
}) | ||
.then(function(response) { | ||
consumerInstance = response[0]; | ||
}) | ||
.fail(function(error) { | ||
throw new Error(error); | ||
}); | ||
var receivedMessages = 0; | ||
var produceInterval = setInterval(function() { | ||
var list = new MessageHub.MessageList([ | ||
"This is the message text" | ||
]); | ||
instance.produce('mytopic', list.messages) | ||
.then(function() { | ||
return consumerInstance.get('mytopic'); | ||
}) | ||
.then(function(data) { | ||
console.log(data); | ||
receivedMessages++; | ||
if(receivedMessages >= 3) { | ||
clearInterval(produceInterval); | ||
return consumerInstance.remove(); | ||
} | ||
}) | ||
.fail(function(error) { | ||
throw new Error(error); | ||
}); | ||
}, 1000); | ||
``` | ||
@@ -100,73 +74,1 @@ | ||
Returns a Promise object which will be fulfilled when the request to the service resolves. | ||
### MessageHub.prototype.produce(topic, message) | ||
Produces a message on the specified topic. | ||
* `topic` - (String) (required), the topic name for the new messages to be produced on. | ||
* `message` - (String|Array|MessageHub.MessageList|Object) (required), the message object to be pushed to the service. | ||
Returns a Promise object which will be fulfilled when the request to the service resolves. | ||
### MessageHub.prototype.consume(groupName, instanceName, [options]) | ||
Configures a consumer instance of the specified name. | ||
* `groupName` - (String) (required), the name of the consumer group. If the group doesn't exist, one is created. | ||
* `instanceName` - (String) (required), the name of the consumer group instance. | ||
* `options` - (Object) (optional), additional options which can be provided to configure the consumer group. | ||
Returns an instance of `MessageHub.ConsumerInstance`. | ||
### MessageHub.ConsumerInstance(client, groupName, instanceName, [options], [configure]) | ||
Constructs a new ConsumerInstance object. Usually not created directly, it is recommended to use `Client.prototype.consume`. | ||
* `groupName` - (String) (required), the name of the consumer group. If the group doesn't exist, one is created. | ||
* `instanceName` - (String) (required), the name of the consumer group instance. | ||
* `options` - (Object) (optional), additional options which can be provided to configure the consumer group. | ||
* `configure` - (Boolean) (optional), flag used to automatically configure the instance. Defaults to true. | ||
Returns an instance of `MessageHub.ConsumerInstance`. | ||
### MessageHub.ConsumerInstance.prototype.configure() | ||
Configures the consumer instance by sending a request to the Kafka REST service. | ||
Returns a Promise object which will be fulfilled when the request to the service resolves. | ||
### MessageHub.ConsumerInstance.prototype.get(topicName [toValue]) | ||
Retrieves a message from the provided topic name. | ||
* `topicName` - (String) (required), the topic to retrieve messages from. | ||
* `toValue` - (Boolean) (optional), unwraps base64 encoded messages, if true. Defaults to true. | ||
Returns a Promise object which will be fulfilled when the request to the service resolves. | ||
### MessageHub.ConsumerInstance.prototype.remove() | ||
Removes the current consumer instance from the server. | ||
Returns a Promise object which will be fulfilled when the request to the service resolves. | ||
### MessageHub.MessageList([init]) | ||
Constructs a new instances of the MessageList class. An initial array of values can be provided to pre-populate the list with messages. | ||
* `init` - (Array<String>) (optional), array of values to be added to the list of messages. | ||
Returns an instance of `MessageHub.MessageList`, which allows for chaining of other methods. | ||
### MessageHub.MessageList.prototype.length | ||
Returns the number of messages in the message list. | ||
### MessageHub.MessageList.prototype.messages | ||
Returns the list of messages added to the `MessageList` instance. | ||
### MessageHub.MessageList.prototype.push(message) | ||
Convenience wrapper to add messages to 'messages.records'. Also converts all values to base64 strings so they can be sent through the service. | ||
* `message` - (String) (required), the message to be added to the list. | ||
Returns the current `MessageHub.MessageList` instance, which allows for chaining other methods. | ||
### MessageHub.MessageList.prototype.pop() | ||
Convenience wrapper for 'messages.records.pop()', but returns the current MessageList instance to allow chaining of methods. | ||
Returns the current `MessageHub.MessageList` instance, which allows for chaining other methods. | ||
### MessageHub.MessageList.prototype.get(index) | ||
Retrieves a message from the message list, converting it back to its original representation (i.e. JSON string -> object) | ||
* `index` - (number) (required) The index of the list to retrieve. | ||
Returns the original representation of value stored in records array. |
@@ -28,47 +28,6 @@ /** | ||
describe('[Client] API', function() { | ||
var instance; | ||
var message; | ||
beforeEach(function() { | ||
instance = new MessageHub(services); | ||
message = { | ||
'records': [{ 'value': new Buffer('Test string').toString('base64') }] | ||
}; | ||
}); | ||
it('prototype.produce', function() { | ||
Expect(instance.hasOwnProperty('produce')); | ||
Expect(instance.produce('mytopic', message).constructor.name).to.eql('Promise'); | ||
}); | ||
it('prototype.consume', function() { | ||
Expect(instance.hasOwnProperty('consume')); | ||
Expect(instance.consume('test_consumer_group', 'test_consumer_instance', { }) | ||
.constructor.name).to.eql('Promise'); | ||
}); | ||
it('Client.ConsumerInstance', function() { | ||
Expect(MessageHub.ConsumerInstance).to.be.a('function'); | ||
}); | ||
it('Client.MessageList', function() { | ||
Expect(MessageHub.MessageList).to.be.a('function'); | ||
}); | ||
}); | ||
describe('[Client] Functionality', function() { | ||
it('Provides relevant service functions on prototype chain', function() { | ||
var instance = new MessageHub(services); | ||
Expect(instance.produce).to.be.a('function'); | ||
Expect(instance.consume).to.be.a('function'); | ||
}); | ||
it('Provides helper classes on module.exports', function() { | ||
Expect(MessageHub).to.be.a('function'); | ||
Expect(MessageHub.MessageList).to.be.a('function'); | ||
Expect(MessageHub.ConsumerInstance).to.be.a('function'); | ||
}); | ||
@@ -75,0 +34,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
1
57966
11
1251
72