Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

message-hub-rest

Package Overview
Dependencies
Maintainers
1
Versions
11
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

message-hub-rest - npm Package Compare versions

Comparing version 2.0.0-alpha.0 to 2.0.0

367

lib/messagehub.js
/**
* Copyright 2015 IBM
* Copyright 2015, 2018 IBM
*

@@ -18,3 +18,3 @@ * Licensed under the Apache License, Version 2.0 (the "License");

* Licensed Materials - Property of IBM
* © Copyright IBM Corp. 2015
* © Copyright IBM Corp. 2015, 2018
*/

@@ -213,365 +213,2 @@ 'use strict';

/**
* Client.prototype.produce
* Produces a message on the specified topic.
*
* @param topic The topic name for the new messages to be produced on.
* @param message The message object to be pushed to the service.
* @returns Response object generated by the service.
*/
Client.prototype.produce = function(topic, message) {
if(typeof(topic) === 'string' && topic.length > 0 && message) {
var requestOptions = {
host: this.url.hostname,
port: this.url.port,
path: '/topics/' + topic,
method: 'POST',
headers: {
'X-Auth-Token': this.apiKey,
'Content-Type': CONTENT_TYPE
},
};
var input = null;
if(message instanceof MessageList) {
input = message.messages;
} else if(Array.isArray(message)) {
input = new MessageList(message).messages;
} else if(typeof(message) === 'object') {
if(!message.records) {
input = new MessageList([JSON.stringify(message)]).messages;
} else {
input = message;
}
} else {
input = new MessageList([message]).messages;
}
return Utils.request(requestOptions, { https: this.config.https }, input);
} else {
var deferred = Q.defer();
if(!message) {
deferred.reject(new ReferenceError('Provided message object cannot be undefined.'));
} else if(typeof(topic) !== 'string') {
deferred.reject(new TypeError('Provided topic parameter must be of type "string".'));
} else {
deferred.reject(new Error('Provided topic parameter must have length greater than zero.'));
}
return deferred.promise;
}
};
/**
* Client.prototype.consume
* Configures a consumer instance of the specified name.
*
* @param groupName The name of the consumer group. If the group doesn't exist, one is created.
* @param instanceName The name of the consumer group instance.
* @param options Options provided to configure the consumer group.
* @returns Consumer group instance object.
*/
Client.prototype.consume = function(groupName, instanceName, options) {
return new Client.ConsumerInstance(this, groupName, instanceName, options, false).configure();
};
/**
* Client.ConsumerInstance (Constructor)
* Constructs a new ConsumerInstance object. Usually not created directly,
* it is recommended to use Client.prototype.consume.
*
* @param client The Message Hub client object associated with this consumer instance.
* @param groupName The name of the consumer group the instance is attached to.
* @param instanceName The name to assign to the consumer instance.
* @param options Options provided to configure the consumer group.
* @param configure Flag used to automatically configure the instance.
*/
var ConsumerInstance = Client.ConsumerInstance = function(client, groupName, instanceName, options, configure) {
if(!(client instanceof Client)) {
throw new TypeError('Provided client parameter must be an instance of Client.');
}
configure = typeof configure === 'boolean' ? configure : true;
this.client = client;
this.groupName = groupName;
this.instanceName = instanceName;
this.options = options;
// a map to keep track of topic -> request promise. we will use this to
// ensure that only one request per consumer isntance per topic is pending
// at a given time - avoids kafka errors
this._topicGetMap = {};
if(configure) {
this.configure();
}
};
/**
* Client.ConsumerInstance.prototype.configure
* Configures the consumer instance by sending a request to
* the Kafka REST service.
*/
ConsumerInstance.prototype.configure = function() {
var deferred = Q.defer();
var instance = this;
var requestOptions = {
host: this.client.url.hostname,
port: this.client.url.port,
path: "/consumers/" + this.groupName,
method: 'POST',
headers: {
'X-Auth-Token': this.client.apiKey,
'Content-Type': CONTENT_TYPE
},
};
var configOptions = {
https: this.client.config.https,
ignoredErrorCodes: [409],
};
if(!this.groupName) {
deferred.reject(new ReferenceError('Provided groupName parameter cannot be undefined.'));
} else if(!(typeof(this.groupName) === 'string' && this.groupName.length > 0)) {
deferred.reject(new TypeError('Provided groupName parameter must be a non-zero length string.'));
}
if(!this.instanceName) {
deferred.reject(new ReferenceError('Provided instanceName parameter cannot be undefined.'));
} else if(!(typeof(this.groupName) === 'string' && this.instanceName.length > 0)) {
deferred.reject(new TypeError('Provided instanceName parameter must be a non-zero length string.'));
}
// Generate the message to be sent to the server. Copy all
// properties to the message defined in the opts variable.
var message = {
'id': this.instanceName,
'format': 'binary',
};
if(this.options !== undefined) {
for(var option in this.options) {
if(!message.hasOwnProperty(option)) {
message[option] = this.options[option];
}
}
}
if(deferred.promise.inspect().state !== 'rejected')
{
deferred.resolve();
return Utils.request(requestOptions, configOptions, message)
.then(function(responseData) {
instance.client.consumerInstances[instance.groupName + instance.instanceName] = instance;
return [instance, responseData];
});
} else {
return deferred.promise;
}
};
/**
* Client.ConsumerInstance.prototype.get
* Retrieves messages from the provided topic name.
*
* @param topicName The topic to retrieve messages from.
* @param toValue Unwraps base64 encoded messages, if set to true.
* @returns Promise object which will be fulfilled when the request
* to the service resolves.
*/
ConsumerInstance.prototype.get = function(topicName, toValue) {
// if there's already a promise pending for a given topic, return that instead
// of issuing a new get. this prevents the Kafka error: `Another request is in
// progress for consumer "test:test1". Request may be retried when response is
// received for the previous request.
if (this._topicGetMap[topicName]) {
return this._topicGetMap[topicName];
}
// if we don't have a request already "out", proceed with business as usual
if(toValue !== undefined && typeof(toValue) !== 'boolean') {
console.warn('Provided parameter toValue is not a boolean, defaulting to true.');
toValue = true;
} else if(toValue === undefined) {
// Silently default to true
toValue = true;
}
var req = Utils.request({
host: this.client.url.hostname,
port: this.client.url.port,
path: '/consumers/' + this.groupName + '/instances/' + this.instanceName + '/topics/' + topicName,
method: 'GET',
headers: {
'X-Auth-Token': this.client.apiKey,
'Accept': CONTENT_TYPE
},
}, {
https: this.client.config.https,
});
// Convert the response to pure values without Kafka metadata.
var prom;
if(toValue) {
prom = req.then(function(data) {
var output = [];
for(var index in data) {
output.push(new Buffer(data[index].value, 'base64').toString('utf8'));
}
return output;
});
} else {
prom = req;
}
// Keep track of the promise in our topic map, make sure to clean up when
// request is complete
this._topicGetMap[topicName] = prom.then(function() {
delete this._topicGetMap[topicName];
// return all arguments to ensure proper promise chaining
return arguments;
}.bind(this));
return prom;
};
/**
* Client.ConsumerInstance.prototype.remove
* Removes the current consumer instance from the server.
*/
ConsumerInstance.prototype.remove = function() {
var client = this.client;
var groupName = this.groupName;
var instanceName = this.instanceName;
return Utils.request({
host: this.client.url.hostname,
port: this.client.url.port,
path: '/consumers/' + this.groupName + '/instances/' + this.instanceName,
method: 'DELETE',
headers: {
'X-Auth-Token': this.client.apiKey,
'Accept': CONTENT_TYPE,
},
}, {
https: this.client.config.https,
acceptedResponseCodes: [200, 204],
}).then(function(response) {
delete client.consumerInstances[groupName + instanceName];
// Defer response to next handler
return response;
});
};
/**
* Client.MessageList (Constructor)
* Constructs a new instances of the MessageList class. An initial
* array of values can be provided to pre-populate the list with
* messages.
*
* @param init Array of values to be added to the list of messages.
* @returns An instance of MessageList, which allows for chaining other methods.
*/
var MessageList = Client.MessageList = function(init) {
this.messages = { 'records': [] };
// Push initial values into the records array. If the
// provided value is undefined, just ignore it.
if(init !== undefined) {
if(Array.isArray(init)) {
for(var index in init) {
this.push(init[index]);
}
} else {
throw new TypeError('The provided init variable must be an array.');
}
}
this.__defineGetter__('length', function() {
return this.messages.records.length;
});
};
/**
* Client.MessageList.prototype.push
* Convenience wrapper to add messages to 'messages.records'. Also converts all values
* to base64 strings so they can be sent through the service.
*
* @param message The message to be added to the list.
* @returns The current MessageList instance, which allows for chaining other methods.
*/
MessageList.prototype.push = function(message) {
if(message) {
// Convert inputs to base64 strings
if(Buffer.isBuffer(message)) {
message = message.toString('base64');
} else if(typeof message === 'object') {
message = new Buffer(JSON.stringify(message)).toString('base64');
} else {
message = new Buffer(message).toString('base64');
}
this.messages.records.push({ 'value': message });
// Return object instance to allow chaining.
return this;
} else {
throw new ReferenceError('Provided message is undefined.');
}
};
/**
* Client.MessageList.prototype.pop
* Convenience wrapper for 'messages.records.pop()', but returns the current
* MessageList instance to allow chaining of methods.
*
* @returns Current MessageList instance.
*/
MessageList.prototype.pop = function() {
this.messages.records.pop();
return this;
};
/**
* Client.MessageList.prototype.get
* Retrieves a message from the message list, converting it back to
* its original representation (i.e. JSON string -> object)
*
* @param index The index of the list to retrieve.
* @returns Original representation of value stored in records array.
*/
MessageList.prototype.get = function(index) {
if(typeof index !== 'number') {
throw new TypeError('The index provided must be a number.');
} else {
index = index | 0;
}
if(this.messages && this.messages.records) {
if(index >= 0 && index < this.messages.records.length) {
var base64String = this.messages.records[index].value;
var originalString = new Buffer(base64String, 'base64').toString();
var result;
// Attempt to convert to JSON
try {
result = JSON.parse(originalString);
} catch(e) {
// SyntaxError thrown if the string it not JSON. In
// this case, just assign the converted string to the result.
result = originalString;
}
return result;
} else {
throw new RangeError('Index ' + index + ' is out of bounds. The number of records in the list is ' + this.messages.records.length + '.');
}
} else {
throw new ReferenceError('The messages object is undefined or null.');
}
}
module.exports = Client;

5

lib/utils.js
/**
* Copyright 2015 IBM
* Copyright 2015, 2018 IBM
*

@@ -18,3 +18,3 @@ * Licensed under the Apache License, Version 2.0 (the "License");

* Licensed Materials - Property of IBM
* © Copyright IBM Corp. 2015
* © Copyright IBM Corp. 2015, 2018
*/

@@ -154,4 +154,3 @@ 'use strict';

//console.log(error);
return error;
};
{
"name": "message-hub-rest",
"version": "2.0.0-alpha.0",
"version": "2.0.0",
"description": "Node.js module for connecting to the Kafka REST interface of IBM Message Hub.",

@@ -11,3 +11,3 @@ "main": "lib/messagehub.js",

"engines": {
"node" : ">=0.12"
"node": ">=0.12"
},

@@ -14,0 +14,0 @@ "scripts": {

## IBM Message Hub REST API Client Module
IBM Message Hub is a scalable, distributed, high throughput message bus to unite your on-premise and off-premise cloud technologies. You can wire micro-services together using open protocols, connect stream data to analytics to realise powerful insight and feed event data to multiple applications to react in real time.
This Node.js module provides a high-level API by which you can interact with the REST API exposed by the Message Hub service.
This Node.js module provides a high-level API by which you can interact with the REST API exposed by the Message Hub service.
__Note__:
From version `2.0.0` onwards, the consume and produce APIs have been removed as the Message Hub Enterprise offering does not support them. Customers should instead use [node-rdkafka](https://www.npmjs.com/package/rdkafka) for Kafka API-level messaging.
## Getting Started

@@ -28,3 +32,2 @@ ### Prerequisites

var instance = new MessageHub(services);
var consumerInstance;
var topicName = 'mytopic';

@@ -34,36 +37,7 @@

.then(function(response) {
return instance.consume('my_consumer_group', 'my_consumer_instance', { 'auto.offset.reset': 'largest' });
console.log('Topic created.');
})
.then(function(response) {
consumerInstance = response[0];
})
.fail(function(error) {
throw new Error(error);
});
var receivedMessages = 0;
var produceInterval = setInterval(function() {
var list = new MessageHub.MessageList([
"This is the message text"
]);
instance.produce('mytopic', list.messages)
.then(function() {
return consumerInstance.get('mytopic');
})
.then(function(data) {
console.log(data);
receivedMessages++;
if(receivedMessages >= 3) {
clearInterval(produceInterval);
return consumerInstance.remove();
}
})
.fail(function(error) {
throw new Error(error);
});
}, 1000);
```

@@ -100,73 +74,1 @@

Returns a Promise object which will be fulfilled when the request to the service resolves.
### MessageHub.prototype.produce(topic, message)
Produces a message on the specified topic.
* `topic` - (String) (required), the topic name for the new messages to be produced on.
* `message` - (String|Array|MessageHub.MessageList|Object) (required), the message object to be pushed to the service.
Returns a Promise object which will be fulfilled when the request to the service resolves.
### MessageHub.prototype.consume(groupName, instanceName, [options])
Configures a consumer instance of the specified name.
* `groupName` - (String) (required), the name of the consumer group. If the group doesn't exist, one is created.
* `instanceName` - (String) (required), the name of the consumer group instance.
* `options` - (Object) (optional), additional options which can be provided to configure the consumer group.
Returns an instance of `MessageHub.ConsumerInstance`.
### MessageHub.ConsumerInstance(client, groupName, instanceName, [options], [configure])
Constructs a new ConsumerInstance object. Usually not created directly, it is recommended to use `Client.prototype.consume`.
* `groupName` - (String) (required), the name of the consumer group. If the group doesn't exist, one is created.
* `instanceName` - (String) (required), the name of the consumer group instance.
* `options` - (Object) (optional), additional options which can be provided to configure the consumer group.
* `configure` - (Boolean) (optional), flag used to automatically configure the instance. Defaults to true.
Returns an instance of `MessageHub.ConsumerInstance`.
### MessageHub.ConsumerInstance.prototype.configure()
Configures the consumer instance by sending a request to the Kafka REST service.
Returns a Promise object which will be fulfilled when the request to the service resolves.
### MessageHub.ConsumerInstance.prototype.get(topicName [toValue])
Retrieves a message from the provided topic name.
* `topicName` - (String) (required), the topic to retrieve messages from.
* `toValue` - (Boolean) (optional), unwraps base64 encoded messages, if true. Defaults to true.
Returns a Promise object which will be fulfilled when the request to the service resolves.
### MessageHub.ConsumerInstance.prototype.remove()
Removes the current consumer instance from the server.
Returns a Promise object which will be fulfilled when the request to the service resolves.
### MessageHub.MessageList([init])
Constructs a new instances of the MessageList class. An initial array of values can be provided to pre-populate the list with messages.
* `init` - (Array<String>) (optional), array of values to be added to the list of messages.
Returns an instance of `MessageHub.MessageList`, which allows for chaining of other methods.
### MessageHub.MessageList.prototype.length
Returns the number of messages in the message list.
### MessageHub.MessageList.prototype.messages
Returns the list of messages added to the `MessageList` instance.
### MessageHub.MessageList.prototype.push(message)
Convenience wrapper to add messages to 'messages.records'. Also converts all values to base64 strings so they can be sent through the service.
* `message` - (String) (required), the message to be added to the list.
Returns the current `MessageHub.MessageList` instance, which allows for chaining other methods.
### MessageHub.MessageList.prototype.pop()
Convenience wrapper for 'messages.records.pop()', but returns the current MessageList instance to allow chaining of methods.
Returns the current `MessageHub.MessageList` instance, which allows for chaining other methods.
### MessageHub.MessageList.prototype.get(index)
Retrieves a message from the message list, converting it back to its original representation (i.e. JSON string -> object)
* `index` - (number) (required) The index of the list to retrieve.
Returns the original representation of value stored in records array.

@@ -28,47 +28,6 @@ /**

describe('[Client] API', function() {
var instance;
var message;
beforeEach(function() {
instance = new MessageHub(services);
message = {
'records': [{ 'value': new Buffer('Test string').toString('base64') }]
};
});
it('prototype.produce', function() {
Expect(instance.hasOwnProperty('produce'));
Expect(instance.produce('mytopic', message).constructor.name).to.eql('Promise');
});
it('prototype.consume', function() {
Expect(instance.hasOwnProperty('consume'));
Expect(instance.consume('test_consumer_group', 'test_consumer_instance', { })
.constructor.name).to.eql('Promise');
});
it('Client.ConsumerInstance', function() {
Expect(MessageHub.ConsumerInstance).to.be.a('function');
});
it('Client.MessageList', function() {
Expect(MessageHub.MessageList).to.be.a('function');
});
});
describe('[Client] Functionality', function() {
it('Provides relevant service functions on prototype chain', function() {
var instance = new MessageHub(services);
Expect(instance.produce).to.be.a('function');
Expect(instance.consume).to.be.a('function');
});
it('Provides helper classes on module.exports', function() {
Expect(MessageHub).to.be.a('function');
Expect(MessageHub.MessageList).to.be.a('function');
Expect(MessageHub.ConsumerInstance).to.be.a('function');
});

@@ -75,0 +34,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc