Comparing version 3.0.0 to 3.1.0
@@ -10,2 +10,3 @@ /* | ||
const fs = require("fs"); | ||
const jwt = require("jsonwebtoken"); | ||
@@ -41,2 +42,6 @@ const HKEntity = require("../hkentity"); | ||
} | ||
else if(options.authSecret) | ||
{ | ||
this.options.auth = {bearer: HKDatasource.getAuthToken(options.authSecret, null)}; | ||
} | ||
@@ -1683,2 +1688,27 @@ | ||
/** | ||
* If the authSecret is passed, this method generates a signed jwt token for acessing the datasource | ||
* Otherwise, it retrieves an empty token | ||
* @param authSecret secret used to sign the jwt token (must be the same as the one used in the server) | ||
* @param expiresIn token expiration expressed in seconds or a string describing a time span zeit/m | ||
* Eg: 60, "2 days", "10h", "7d". A numeric value is interpreted as a seconds count. | ||
* If you use a string be sure you provide the time units (days, hours, etc), | ||
* otherwise milliseconds unit is used by default ("120" is equal to "120ms"). | ||
* If !expiresIn, the generated token never expires. | ||
*/ | ||
HKDatasource.getAuthToken = function (authSecret, expiresIn = 2 * 60) | ||
{ | ||
if (authSecret) | ||
{ | ||
if(!expiresIn) | ||
{ | ||
return jwt.sign({}, authSecret, {}) | ||
} | ||
return jwt.sign({}, authSecret, {expiresIn: expiresIn}) | ||
} | ||
return ''; | ||
} | ||
HKDatasource.prototype.getAuthToken = HKDatasource.getAuthToken; | ||
// ---- | ||
@@ -1685,0 +1715,0 @@ |
@@ -10,2 +10,3 @@ /* | ||
const request = require ('request-promise-native'); | ||
const ping = require ('ping'); | ||
@@ -22,2 +23,22 @@ const CONNECTION_REFUSED_ERROR = 'ECONNREFUSED'; | ||
/** | ||
* Instantiate an observer client. | ||
* This method will ask the target HKBase which observer configuration it supports (REST or RabbitMQ) | ||
* and will instantiate the appropriate client with the provided configurations. | ||
* @param {string} basePath HKBase basePath that is used to communicate with the server | ||
* @param {Object} observerOptions Observer options that are used to initialize the observer client | ||
* @param {boolean} observerOptions.isObserverService if true, the factory method will assume that it | ||
* is being called from the hkbaseObserverService and will ignore hkbaseObserverServiceUrl, hkbaseObserverServiceExternalUrl and hkbaseObserverConfiguration options | ||
* @param {string} observerOptions.hkbaseObserverServiceUrl url of the hkbaseObserverService to be used if hkbase does not inform one | ||
* @param {string} observerOptions.hkbaseObserverServiceExternalUrl external url of the hkbaseObserverService to be used if hkbaseObserverServiceUrl is not accessible to client | ||
* and hkbase does not provide one | ||
* @param {Object} observerOptions.hkbaseObserverConfiguration the ObserverConfiguration of this client, that includes which notification filters should be applied | ||
* and the desired notification format. The definition of ObserverConfiguration fields and possible filters is provided in OpenAPI/Swagger format at: | ||
* "https://github.ibm.com/keg-core/hkbase-observer/blob/main/docs/spec.yml" or acessing the hkbaseObserverServiceUrl through a browser (Swagger UI) | ||
* @param {string} observerOptions.certificate if hkbase uses RabbitMQ notification, the AMQP connection certificate can be passed if needed | ||
* @param {number} observerOptions.port if hkbase uses REST notification, the port te be used when instantiating express server for receiving callback requests can be passed | ||
* @param {string} observerOptions.address if hkbase uses REST notification, the address to be used when instantiating express server for receiving callback requests can be passed | ||
* @param {Object} hkbaseOptions options that sould be passed when making requests to hkbase (e.g., authentication) | ||
* @returns {Promise<ObserverClient>} instance of ObserverClient or one of its subclasses | ||
*/ | ||
async function createObserver (basePath, observerOptions = {}, hkbaseOptions = {}) | ||
@@ -42,3 +63,24 @@ { | ||
return new Klass (info, observerOptions); | ||
const isObserverService = observerOptions.isObserverService || false; | ||
const observerServiceParams = {}; | ||
observerServiceParams.defaultUrl = !isObserverService ? info.hkbaseObserverServiceUrl || observerOptions.hkbaseObserverServiceUrl : undefined; | ||
observerServiceParams.externalUrl = !isObserverService ? info.hkbaseObserverServiceExternalUrl || observerOptions.hkbaseObserverServiceExternalUrl : undefined; | ||
observerServiceParams.observerConfiguration = !isObserverService ? info.hkbaseObserverConfiguration || observerOptions.hkbaseObserverConfiguration : undefined; | ||
if(observerServiceParams.defaultUrl && observerServiceParams.observerConfiguration) | ||
{ | ||
let pingResult = await ping.promise.probe(observerServiceParams.defaultUrl, {timeout: 10}); | ||
if(pingResult.alive || !observerServiceParams.externalUrl) | ||
{ | ||
observerServiceParams.url = observerServiceParams.defaultUrl; | ||
} | ||
else | ||
{ | ||
observerServiceParams.url = observerServiceParams.externalUrl | ||
} | ||
let observerServiceInfo = JSON.parse(await request (`${observerServiceParams.url}/observer/info`)); | ||
observerServiceParams.heartbeatInterval = observerServiceInfo.heartbeat; | ||
} | ||
return new Klass (info, observerOptions, hkbaseOptions, observerServiceParams); | ||
} | ||
@@ -57,4 +99,3 @@ catch (err) | ||
console.error (err); | ||
console.error ('Creating a default client'); | ||
return new clients['default'](); | ||
console.error ('Could not initialize observer client'); | ||
} | ||
@@ -61,0 +102,0 @@ } |
@@ -8,4 +8,5 @@ /* | ||
module.exports.DefaultObserverClient = require('./observerclient'); | ||
module.exports.RestObserverClient = require('./restobserverclient'); | ||
module.exports.RabbitMQObserverClient = require('./rabbitmqobserverclient') | ||
module.exports.DefaultObserverClient = require('./observerclient'); | ||
module.exports.ConfigurableObserverClient = require('./configurableobserverclient'); | ||
module.exports.RestObserverClient = require('./restobserverclient'); | ||
module.exports.RabbitMQObserverClient = require('./rabbitmqobserverclient') |
@@ -7,3 +7,5 @@ /* | ||
'use strict'; | ||
/** | ||
* Abstract class with basic behaviour and API of an observer client | ||
*/ | ||
class ObserverClient | ||
@@ -13,5 +15,12 @@ { | ||
{ | ||
if (this.constructor == ObserverClient) { | ||
throw new Error("Abstract classes can't be instantiated."); | ||
} | ||
this._handlers = []; | ||
} | ||
/** | ||
* retrieves client type | ||
* @returns {string} client type | ||
*/ | ||
static getType () | ||
@@ -22,15 +31,38 @@ { | ||
/*** | ||
* Initialize client and start calling handlers when notifications are received | ||
* @returns {Promise} | ||
*/ | ||
async init () | ||
{ | ||
return Promise.resolve(); | ||
throw new Error("Abstract Method has no implementation"); | ||
} | ||
addHandler (cb) | ||
/** | ||
* Deinitilize client and stop receiving notifications | ||
* @returns {Promise} | ||
*/ | ||
async deinit() | ||
{ | ||
if (typeof (cb) === 'function') | ||
throw new Error("Abstract Method has no implementation"); | ||
} | ||
/** | ||
* Add handler function to be called when notifications are received | ||
* Can be called more than once | ||
* All added handlers are called for all notifications recieved | ||
* @param {Function} callback handler function to be called when notifications are received | ||
*/ | ||
addHandler (callback) | ||
{ | ||
if (typeof (callback) === 'function') | ||
{ | ||
this._handlers.push (cb); | ||
this._handlers.push (callback); | ||
} | ||
} | ||
/** | ||
* Calls every handler passing a notification as single parameter | ||
* @param {Object} notification notification received from HKBase or HKBase Observer Service | ||
*/ | ||
notify (notification) | ||
@@ -37,0 +69,0 @@ { |
@@ -8,5 +8,7 @@ /* | ||
const ObserverClient = require ('./observerclient') | ||
const amqp = require ('amqp-connection-manager'); | ||
const ConfigurableObserverClient = require ('./configurableobserverclient') | ||
const amqp = require ('amqp-connection-manager'); | ||
const ping = require ('ping'); | ||
async function createChannel () | ||
@@ -17,8 +19,2 @@ { | ||
this._channelWrapper = this._connectionManager.createChannel (); | ||
this._channelWrapper.addSetup((channel) => | ||
{ | ||
channel.assertQueue(this._exchangeName, this._exchangeOptions); | ||
channel.on ('error', console.log); | ||
channel.on ('close', () => this.init()); | ||
}); | ||
return Promise.resolve (); | ||
@@ -28,3 +24,3 @@ } | ||
{ | ||
console.log(err); | ||
console.error(err); | ||
return Promise.reject(); | ||
@@ -44,5 +40,19 @@ } | ||
this._connectionManager = amqp.connect (this._broker, {connectionOptions: options}); | ||
let brokerHost = new URL(this._broker).hostname; | ||
let pingResult = await ping.promise.probe(brokerHost, {timeout: 10}); | ||
if(pingResult.alive) | ||
{ | ||
this._connectionManager = amqp.connect (this._broker, {connectionOptions: options}); | ||
} | ||
else if(this._brokerExternal) | ||
{ | ||
this._connectionManager = amqp.connect (this._brokerExternal, {connectionOptions: options}); | ||
} | ||
else | ||
{ | ||
throw `Cannot reach broker host ${brokerHost} from ${this._broker}`; | ||
} | ||
await this._connectionManager._connectPromise; | ||
this._connectionManager._currentConnection.connection.on ('error', console.log); | ||
this._connectionManager._currentConnection.connection.on ('error', console.error); | ||
@@ -53,3 +63,3 @@ await createChannel.call(this); | ||
{ | ||
console.log(err); | ||
console.error(err); | ||
this._connectionManager = null; | ||
@@ -61,13 +71,29 @@ return Promise.reject(err); | ||
class RabbitMQObserverClient extends ObserverClient | ||
class RabbitMQObserverClient extends ConfigurableObserverClient | ||
{ | ||
constructor (info, options) | ||
/** | ||
* @param {Object} info observer info from hkbase | ||
* @param {string} info.broker amqp broker default address | ||
* @param {string} info.brokerExternal amqp broker external address to be used if default address is not accessible | ||
* @param {string} info.exchangeName name of the RabbitMQ exchange where hkbase will publish messages | ||
* @param {Object} info.exchangeOptions additional options to be used when connecting to hkbase echange | ||
* @param {string} info.certificate RabbitMQ connection certificate (if needed) | ||
* @param {Object} options observer initialization options | ||
* @param {string} options.certificate RabbitMQ connection certificate (if needed) | ||
* @param {Object} hkbaseOptions options to be used when communicating with hkbase | ||
* @param {Object} observerServiceParams observer service parameters (if using specialized observer) | ||
*/ | ||
constructor (info, options, hkbaseOptions, observerServiceParams) | ||
{ | ||
super (); | ||
this._broker = info.broker; | ||
this._exchangeName = info.exchangeName; | ||
this._exchangeOptions = info.exchangeOptions; | ||
this._certificate = info.certificate || options.certificate; | ||
this._connectionManager = null; | ||
this._channelWrapper = null; | ||
super (hkbaseOptions, observerServiceParams); | ||
this._broker = info.broker; | ||
this._brokerExternal = info.brokerExternal; | ||
this._exchangeName = info.exchangeName; | ||
this._defaultExchangeName = info.exchangeName; | ||
this._exchangeOptions = info.exchangeOptions; | ||
this._certificate = info.certificate || options.certificate; | ||
this._connectionManager = null; | ||
this._channelWrapper = null; | ||
this._setupFunction = null; | ||
this._queueName = null; | ||
} | ||
@@ -82,33 +108,113 @@ | ||
{ | ||
console.info(`initializing RabbitMQ observer client`); | ||
try | ||
{ | ||
await connect.call (this); | ||
this._channelWrapper.addSetup(async (channel) => | ||
let exchangeName = this._exchangeName; | ||
// if specialized configuration is set up, get specialized queueName | ||
if(this.usesSpecializedObserver()) | ||
{ | ||
const q = await channel.assertQueue ('', {exclusive: true}); | ||
channel.bindQueue (q.queue, this._exchangeName, ''); | ||
console.log(`Bound to exchange "${this._exchangeName}"`); | ||
console.log(" [*] Waiting for messages in %s.", q.queue); | ||
exchangeName = await this.registerObserver(); | ||
} | ||
else | ||
{ | ||
console.info(`registered as observer of hkbase`); | ||
} | ||
await this._init(exchangeName); | ||
} | ||
catch (err) | ||
{ | ||
console.error(err); | ||
} | ||
} | ||
channel.consume (q.queue, (msg) => | ||
async _init(exchangeName) | ||
{ | ||
await connect.call(this); | ||
this._setupFunction = async (channel) => | ||
{ | ||
this._exchangeName = exchangeName; | ||
const q = await channel.assertQueue('', { exclusive: true, autoDelete: true }); | ||
this._queueName = q.queue; | ||
channel.bindQueue(this._queueName, this._exchangeName, 'create'); | ||
console.info(`Bound to exchange "${this._exchangeName}"`); | ||
console.info(" [*] Waiting for messages in %s.", this._queueName); | ||
channel.consume(this._queueName, (msg) => | ||
{ | ||
if(!msg) return; | ||
try | ||
{ | ||
try | ||
let message = JSON.parse(msg.content.toString()); | ||
if (this._observerId && message.observerId == this._observerId) | ||
{ | ||
let notification = JSON.parse (msg.content.toString()); | ||
this.notify (notification); | ||
this.notify(message.notification); | ||
} | ||
catch (err) | ||
else if (!this._observerId) | ||
{ | ||
console.log (err); | ||
this.notify(message); | ||
} | ||
}, {noAck: true}); | ||
}); | ||
} | ||
catch (err) | ||
{ | ||
console.log(err); | ||
} | ||
catch (err) | ||
{ | ||
console.error(err); | ||
} | ||
}, { noAck: true }); | ||
} | ||
this._channelWrapper.addSetup(this._setupFunction); | ||
this._isInitialized = true; | ||
} | ||
async deinit () | ||
{ | ||
let _deinit = () => setTimeout( async () => | ||
{ | ||
if(this._isInitialized && !this._queueName) | ||
{ | ||
_deinit(); | ||
return; | ||
} | ||
console.info("Deiniting observer"); | ||
if(!this._channelWrapper) | ||
{ | ||
console.warn('Observer already deinited!'); | ||
return; | ||
} | ||
await this._channelWrapper.cancelAll(); | ||
console.info('canceled AMQP consumer'); | ||
if(this._observerId) | ||
{ | ||
await this.unregisterObserver(); | ||
} | ||
await this._channelWrapper.deleteQueue(this._queueName); | ||
console.info(`removed queue ${this._queueName}`); | ||
this._queueName = null; | ||
if(this._setupFunction) | ||
{ | ||
this._channelWrapper.removeSetup(this._setupFunction, (c) => c.close(), async () => | ||
{ | ||
await this._channelWrapper.close(); | ||
await this._connectionManager.close(); | ||
this._setupFunction = null; | ||
this._channelWrapper = null; | ||
this._connectionManager = null; | ||
this._isInitialized = false; | ||
this._exchangeName = this._defaultExchangeName; | ||
console.info("Observer deinited!"); | ||
}); | ||
} | ||
else | ||
{ | ||
await this._channelWrapper.close(); | ||
await this._connectionManager.close(); | ||
console.info("Observer deinited!"); | ||
} | ||
}, 1000); | ||
_deinit(); | ||
} | ||
} | ||
module.exports = RabbitMQObserverClient; |
@@ -8,7 +8,7 @@ /* | ||
const express = require ('express'); | ||
const bodyParser = require ('body-parser'); | ||
const request = require ('request-promise-native'); | ||
const ObserverClient = require ('./observerclient') | ||
const Notification = require ('../notification'); | ||
const express = require ('express'); | ||
const bodyParser = require ('body-parser'); | ||
const request = require ('request-promise-native'); | ||
const ConfigurableObserverClient = require ('./configurableobserverclient') | ||
const Notification = require ('../notification'); | ||
@@ -29,3 +29,2 @@ const DEFAULT_ADDR = 'http://localhost'; | ||
this.notify (notification); | ||
res.sendStatus(200); | ||
@@ -45,31 +44,44 @@ } | ||
this.notify (notification); | ||
res.sendStatus(200); | ||
} | ||
this._webServer.put ('/repository/:repoName', | ||
this._app.post ('/repository/:repoName', | ||
(req, res) => repoCb (req, res, Notification.action.CREATE)); | ||
this._webServer.delete ('/repository/:repoName', | ||
this._app.delete ('/repository/:repoName', | ||
(req, res) => repoCb (req, res, Notification.action.DELETE)); | ||
this._webServer.put ('/repository/:repoName/entity', | ||
this._app.post ('/repository/:repoName/entity', | ||
(req, res) => entitiesCb (req, res, Notification.action.CREATE)); | ||
this._webServer.post ('/repository/:repoName/entity', | ||
this._app.put ('/repository/:repoName/entity', | ||
(req, res) => entitiesCb (req, res, Notification.action.UPDATE)); | ||
this._webServer.delete ('/repository/:repoName/entity', | ||
this._app.delete ('/repository/:repoName/entity', | ||
(req, res) => entitiesCb (req, res, Notification.action.DELETE)); | ||
} | ||
class RestObserverClient extends ObserverClient | ||
class RestObserverClient extends ConfigurableObserverClient | ||
{ | ||
constructor (info, options) | ||
/** | ||
* | ||
* @param {Object} info observer info from hkbase | ||
* @param {Object} options observer initialization options | ||
* @param {string} options.baseUrl base URL of hkbase to be used for registering observer | ||
* @param {number} options.port port te be used when instantiating express server for receiving callback requests | ||
* @param {string} options.address address to be used when instantiating express server for receiving callback requests | ||
* @param {Object} hkbaseOptions options to be used when communicating with hkbase | ||
* @param {Object} observerServiceParams observer service parameters (if using specialized observer) | ||
*/ | ||
constructor (info, options, hkbaseOptions, observerServiceParams) | ||
{ | ||
super (); | ||
super (hkbaseOptions, observerServiceParams); | ||
this._baseUrl = options.baseUrl; | ||
this._webServer = express (); | ||
this._app = express (); | ||
this._port = options.port || 0; | ||
this._address = options.address || DEFAULT_ADDR; | ||
this._observerId = null; | ||
this._listeningPath = null; | ||
this._server = null; | ||
@@ -81,3 +93,3 @@ if (!this._baseUrl.endsWith('/')) | ||
this._webServer.use (bodyParser.json()); | ||
this._app.use (bodyParser.json()); | ||
} | ||
@@ -90,15 +102,29 @@ | ||
init () | ||
async init () | ||
{ | ||
console.info(`initializing REST observer client`); | ||
return new Promise ((resolve, reject) => | ||
{ | ||
setupEndpoints.call (this); | ||
let server = this._webServer.listen (this._port, | ||
this._server = this._app.listen (this._port, | ||
async () => | ||
{ | ||
this._port = server.address().port; | ||
this._port = this._server.address().port; | ||
console.info(`Express Server initialized at port ${this._port} for receiving callback requests of HKBase notifications`); | ||
try | ||
{ | ||
let listeningPath = encodeURIComponent(`${this._address}:${this._port}`); | ||
await request (`${this._baseUrl}observer/${listeningPath}`, {method: 'put'}) | ||
let listeningPath = `${this._address}:${this._port}`; | ||
this._listeningPath = listeningPath; | ||
if(this.usesSpecializedObserver()) | ||
{ | ||
this._observerConfiguration.callbackEndpoint = listeningPath; | ||
await this.registerObserver(); | ||
} | ||
else | ||
{ | ||
let params = {method: 'put'}; | ||
this.setHKBaseOptions(params); | ||
await request (`${this._baseUrl}observer/${encodeURIComponent(listeningPath)}`, params); | ||
console.info(`registered ${listeningPath} as observer of hkbase`); | ||
} | ||
resolve (); | ||
@@ -110,3 +136,3 @@ } | ||
{ | ||
console.log('observer already registered'); | ||
console.warn('observer already registered'); | ||
resolve (); | ||
@@ -116,3 +142,3 @@ } | ||
{ | ||
console.log(err); | ||
console.error(err); | ||
reject (err); | ||
@@ -126,4 +152,30 @@ } | ||
async deinit () | ||
{ | ||
console.info('Deiniting observer'); | ||
if(this._observerId) | ||
{ | ||
this.unregisterObserver(); | ||
} | ||
else if (this._listeningPath) | ||
{ | ||
let params = {method: 'delete'}; | ||
this.setHKBaseOptions(params); | ||
await request (`${this._baseUrl}observer/${encodeURIComponent(this._listeningPath)}`, params); | ||
console.info(`unregistered ${this._listeningPath} as observer of hkbase`); | ||
} | ||
this._listeningPath = null; | ||
if(this._server) | ||
{ | ||
await this._server.close(); | ||
console.info(`Express Server at port ${this._port} was stopped`); | ||
this._port = null; | ||
} | ||
this._server = null; | ||
this._app = null; | ||
console.info('Observer deinited'); | ||
} | ||
} | ||
module.exports = RestObserverClient; |
@@ -8,8 +8,9 @@ /* | ||
module.exports.object = { | ||
const object = { | ||
REPOSITORY: 'repository', | ||
ENTITIES: 'entities' | ||
}; | ||
module.exports.object = object; | ||
module.exports.action = { | ||
const action = { | ||
CREATE: 'create', | ||
@@ -19,1 +20,8 @@ DELETE: 'delete', | ||
}; | ||
module.exports.action = action; | ||
const httpMethodByAction = {}; | ||
httpMethodByAction[action.CREATE] = 'POST'; | ||
httpMethodByAction[action.DELETE] = 'DELETE'; | ||
httpMethodByAction[action.UPDATE] = 'PUT'; | ||
module.exports.httpMethodByAction = httpMethodByAction; |
{ | ||
"name": "hklib", | ||
"version": "3.0.0", | ||
"version": "3.1.0", | ||
"main": "index.js", | ||
@@ -42,3 +42,3 @@ "author": "IBM Research", | ||
"amqplib": "^0.8.0", | ||
"amqp-connection-manager": "~3.2.2", | ||
"amqp-connection-manager": "~3.7.0", | ||
"body-parser": "^1.19.0", | ||
@@ -49,3 +49,6 @@ "express": "^4.17.1", | ||
"request-promise-native": "^1.0.8", | ||
"shortid": "^2.2.14" | ||
"shortid": "^2.2.14", | ||
"ninja-util": "^1.2.3", | ||
"jsonwebtoken": "^8.5.1", | ||
"ping": "^0.4.1" | ||
}, | ||
@@ -52,0 +55,0 @@ "license": "MIT", |
130521
32
4638
11
+ Addedjsonwebtoken@^8.5.1
+ Addedninja-util@^1.2.3
+ Addedping@^0.4.1
+ Addedamqp-connection-manager@3.7.0(transitive)
+ Addedbuffer-equal-constant-time@1.0.1(transitive)
+ Addedbusboy@1.6.0(transitive)
+ Addedecdsa-sig-formatter@1.0.11(transitive)
+ Addedexpress-fileupload@1.5.1(transitive)
+ Addedexpress-query-boolean@2.0.0(transitive)
+ Addedjsonwebtoken@8.5.1(transitive)
+ Addedjwa@1.4.1(transitive)
+ Addedjws@3.2.2(transitive)
+ Addedlodash.includes@4.3.0(transitive)
+ Addedlodash.isboolean@3.0.3(transitive)
+ Addedlodash.isinteger@4.0.4(transitive)
+ Addedlodash.isnumber@3.0.3(transitive)
+ Addedlodash.isplainobject@4.0.6(transitive)
+ Addedlodash.isstring@4.0.1(transitive)
+ Addedlodash.once@4.1.1(transitive)
+ Addedninja-util@1.3.1(transitive)
+ Addedping@0.4.4(transitive)
+ Addedsemver@5.7.2(transitive)
+ Addedstreamsearch@1.1.0(transitive)
+ Addedws@7.5.10(transitive)
- Removedamqp-connection-manager@3.2.4(transitive)