@adhityan/gc-nats
Advanced tools
Comparing version 0.0.5 to 0.0.7
export * from './nats'; | ||
export * from './configs'; | ||
export * from './types'; | ||
export * from './worker'; |
@@ -8,2 +8,3 @@ "use strict"; | ||
__export(require("./configs")); | ||
__export(require("./worker")); | ||
//# sourceMappingURL=index.js.map |
@@ -9,3 +9,2 @@ import { NatsConnectionConfigs } from './types'; | ||
private subscription; | ||
messageReceivedReference: any; | ||
constructor(natsConnectionConfigs: NatsConnectionConfigs); | ||
@@ -24,28 +23,3 @@ connect(): Promise<void>; | ||
*/ | ||
subscribe(channel: string): Promise<void>; | ||
subscribe(channel: string, messageReceived: (...args: any[]) => void, messageValidation?: ((...args: any[]) => boolean)): Promise<void>; | ||
} | ||
/** | ||
* StartWorker function, this creates a new connection and starts the subscription for a channel | ||
* @param natsConnectionConfigs : NATS connection settings | ||
* @param channel : The channel/subject on which the worker will execute | ||
* @param messageReceived : This method needs to be defined in the Worker class. | ||
* It will get executed whenever a message is received | ||
*/ | ||
export declare function startWorker<X extends { | ||
[key: string]: any; | ||
}>(natsConnectionConfigs: NatsConnectionConfigs, channel: string, messageReceived: (message: X) => void): Promise<void>; | ||
/** | ||
* Worker factory | ||
* Executes the "messageReceived" method of the class whenever a message is received | ||
* @param natsConnectionConfigs : NATS connection settings | ||
* @param channel : The channel/subject on which the worker will execute | ||
*/ | ||
export declare function Worker<X extends { | ||
[key: string]: any; | ||
}>(natsConnectionConfigs: NatsConnectionConfigs, channel: string): <T extends new (...args: any[]) => { | ||
messageReceived(message: X): void; | ||
}>(constructor: T) => { | ||
new (...args: any[]): { | ||
messageReceived(message: X): void; | ||
}; | ||
} & T; |
@@ -28,3 +28,2 @@ "use strict"; | ||
this.subscription = undefined; | ||
this.messageReceivedReference = undefined; | ||
this.natsConnectionConfigs = natsConnectionConfigs; | ||
@@ -85,3 +84,3 @@ /* If configs.clientId is not provided then generate one */ | ||
*/ | ||
subscribe(channel) { | ||
subscribe(channel, messageReceived, messageValidation) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
@@ -117,4 +116,7 @@ /* subscribe to the channel */ | ||
const parsedMessage = JSON.parse(message); | ||
/* messageReceivedReference the parsed message */ | ||
yield this.messageReceivedReference(parsedMessage); | ||
/* if a validate function is set then first validate the input then pass it forward */ | ||
if (!messageValidation || (messageValidation && messageValidation(parsedMessage))) { | ||
/* messageReceived gets the parsed message */ | ||
yield messageReceived(parsedMessage); | ||
} | ||
} | ||
@@ -125,3 +127,3 @@ catch (err) { | ||
/* subscribe again to receive next message */ | ||
this.subscribe(channel); | ||
this.subscribe(channel, messageReceived, messageValidation); | ||
})); | ||
@@ -132,35 +134,2 @@ }); | ||
exports.NatsService = NatsService; | ||
/** | ||
* StartWorker function, this creates a new connection and starts the subscription for a channel | ||
* @param natsConnectionConfigs : NATS connection settings | ||
* @param channel : The channel/subject on which the worker will execute | ||
* @param messageReceived : This method needs to be defined in the Worker class. | ||
* It will get executed whenever a message is received | ||
*/ | ||
function startWorker(natsConnectionConfigs, channel, messageReceived) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const natsService = new NatsService(natsConnectionConfigs); | ||
yield natsService.connect(); | ||
natsService.messageReceivedReference = messageReceived; | ||
natsService.subscribe(channel); | ||
}); | ||
} | ||
exports.startWorker = startWorker; | ||
/** | ||
* Worker factory | ||
* Executes the "messageReceived" method of the class whenever a message is received | ||
* @param natsConnectionConfigs : NATS connection settings | ||
* @param channel : The channel/subject on which the worker will execute | ||
*/ | ||
function Worker(natsConnectionConfigs, channel) { | ||
return function (constructor) { | ||
return class extends constructor { | ||
constructor(...args) { | ||
super(...args); | ||
startWorker(natsConnectionConfigs, channel, this.messageReceived); | ||
} | ||
}; | ||
}; | ||
} | ||
exports.Worker = Worker; | ||
//# sourceMappingURL=nats.js.map |
{ | ||
"name": "@adhityan/gc-nats", | ||
"version": "0.0.5", | ||
"version": "0.0.7", | ||
"description": "Typescript Node.js client for NATS streaming server", | ||
@@ -5,0 +5,0 @@ "repository": { |
export * from './nats'; | ||
export * from './configs'; | ||
export * from './types'; | ||
export * from './types'; | ||
export * from './worker'; |
@@ -18,3 +18,2 @@ import * as stan from 'node-nats-streaming'; | ||
private subscription: any = undefined; | ||
public messageReceivedReference: any = undefined; | ||
@@ -83,3 +82,5 @@ constructor(natsConnectionConfigs: NatsConnectionConfigs) { | ||
*/ | ||
public async subscribe(channel: string) { | ||
public async subscribe(channel: string, | ||
messageReceived: (...args: any[]) => void, | ||
messageValidation?: ((...args: any[]) => boolean)) { | ||
/* subscribe to the channel */ | ||
@@ -119,4 +120,8 @@ const opts: NatsInternalSubOptions = this.connection.subscriptionOptions(); | ||
const parsedMessage = JSON.parse(message); | ||
/* messageReceivedReference the parsed message */ | ||
await this.messageReceivedReference(parsedMessage); | ||
/* if a validate function is set then first validate the input then pass it forward */ | ||
if(!messageValidation || (messageValidation && messageValidation(parsedMessage))) { | ||
/* messageReceived gets the parsed message */ | ||
await messageReceived(parsedMessage); | ||
} | ||
} catch(err) { | ||
@@ -127,38 +132,5 @@ console.error(`Failed to parse message with error: ${err}`); | ||
/* subscribe again to receive next message */ | ||
this.subscribe(channel); | ||
this.subscribe(channel, messageReceived, messageValidation); | ||
}); | ||
} | ||
} | ||
/** | ||
* StartWorker function, this creates a new connection and starts the subscription for a channel | ||
* @param natsConnectionConfigs : NATS connection settings | ||
* @param channel : The channel/subject on which the worker will execute | ||
* @param messageReceived : This method needs to be defined in the Worker class. | ||
* It will get executed whenever a message is received | ||
*/ | ||
export async function startWorker<X extends { [key: string]: any }> ( | ||
natsConnectionConfigs: NatsConnectionConfigs, channel: string, messageReceived: (message: X) => void) { | ||
const natsService = new NatsService(natsConnectionConfigs); | ||
await natsService.connect(); | ||
natsService.messageReceivedReference = messageReceived; | ||
natsService.subscribe(channel); | ||
} | ||
/** | ||
* Worker factory | ||
* Executes the "messageReceived" method of the class whenever a message is received | ||
* @param natsConnectionConfigs : NATS connection settings | ||
* @param channel : The channel/subject on which the worker will execute | ||
*/ | ||
export function Worker <X extends { [key: string]: any }> (natsConnectionConfigs: NatsConnectionConfigs, | ||
channel: string) { | ||
return function <T extends { new(...args: any[]): { messageReceived(message: X): void } }>(constructor: T) { | ||
return class extends constructor { | ||
constructor(...args: any[]) { | ||
super(...args); | ||
startWorker(natsConnectionConfigs, channel, this.messageReceived); | ||
} | ||
} | ||
} | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
33992
23
612