@adhityan/gc-nats
Advanced tools
Comparing version 0.0.22 to 0.0.23
export * from './nats'; | ||
export * from './configs'; | ||
export * from './types'; | ||
export * from './worker'; | ||
export { NatsConnectionOptions, Payload } from 'ts-nats'; |
@@ -7,4 +7,5 @@ "use strict"; | ||
__export(require("./nats")); | ||
__export(require("./configs")); | ||
__export(require("./worker")); | ||
var ts_nats_1 = require("ts-nats"); | ||
exports.Payload = ts_nats_1.Payload; | ||
//# sourceMappingURL=index.js.map |
@@ -1,11 +0,9 @@ | ||
import { NatsConnectionConfigs } from './types'; | ||
import { NatsConnectionOptions, Client } from 'ts-nats'; | ||
/** | ||
* NATS streaming server service. It has connect, publish and subscribe methods | ||
* NATS server service. It has connect, publish and subscribe methods | ||
*/ | ||
export declare class NatsService { | ||
private natsConnectionConfigs; | ||
private connection; | ||
private subscription; | ||
constructor(natsConnectionConfigs: NatsConnectionConfigs); | ||
connect(): Promise<void>; | ||
private connectionOptions; | ||
constructor(connectionOptions: NatsConnectionOptions); | ||
connect(): Promise<Client>; | ||
/** | ||
@@ -16,3 +14,3 @@ * Publishes a message to a channel, converts the message to string before publishing | ||
*/ | ||
publish(channel: string, message: {}): Promise<string>; | ||
publish(channel: string, message: any): Promise<void>; | ||
/** | ||
@@ -23,3 +21,3 @@ * subscribe, listens for messages on a channel | ||
*/ | ||
subscribe(channel: string, messageReceived: (...args: any[]) => Promise<void>, messageValidation?: ((...args: any[]) => Promise<boolean>)): Promise<void>; | ||
subscribe(connection: Client, channel: string, messageReceived: (...args: any[]) => Promise<void>, messageValidation?: ((...args: any[]) => Promise<boolean>)): Promise<void>; | ||
} |
112
lib/nats.js
@@ -11,39 +11,14 @@ "use strict"; | ||
}; | ||
var __importStar = (this && this.__importStar) || function (mod) { | ||
if (mod && mod.__esModule) return mod; | ||
var result = {}; | ||
if (mod != null) for (var k in mod) if (Object.hasOwnProperty.call(mod, k)) result[k] = mod[k]; | ||
result["default"] = mod; | ||
return result; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const stan = __importStar(require("node-nats-streaming")); | ||
const nuid = __importStar(require("nuid")); | ||
const configs_1 = require("./configs"); | ||
const ts_nats_1 = require("ts-nats"); | ||
/** | ||
* NATS streaming server service. It has connect, publish and subscribe methods | ||
* NATS server service. It has connect, publish and subscribe methods | ||
*/ | ||
class NatsService { | ||
constructor(natsConnectionConfigs) { | ||
this.subscription = undefined; | ||
this.natsConnectionConfigs = natsConnectionConfigs; | ||
/* If configs.clientId is not provided then generate one */ | ||
if (!this.natsConnectionConfigs.clientId) | ||
this.natsConnectionConfigs.clientId = nuid.next(); | ||
constructor(connectionOptions) { | ||
this.connectionOptions = connectionOptions; | ||
} | ||
/* connect; connects with the NATS Streaming Server client */ | ||
/* connect; connects with the NATS Server client */ | ||
connect() { | ||
return new Promise((resolve, reject) => { | ||
const connection = stan.connect(this.natsConnectionConfigs.clusterId, | ||
// @ts-ignore | ||
this.natsConnectionConfigs.clientId, this.natsConnectionConfigs.options); | ||
connection.on('connect', () => { | ||
this.connection = connection; | ||
resolve(); | ||
}); | ||
connection.on('error', (reason) => { | ||
console.error(`Failed to connect to NATS server with error: ${reason}`); | ||
reject(); | ||
}); | ||
}); | ||
return ts_nats_1.connect(this.connectionOptions); | ||
} | ||
@@ -57,22 +32,6 @@ /** | ||
return __awaiter(this, void 0, void 0, function* () { | ||
return new Promise((resolve, reject) => __awaiter(this, void 0, void 0, function* () { | ||
/* connect to NATS streaming server */ | ||
yield this.connect(); | ||
let stringMessage; | ||
try { | ||
stringMessage = JSON.stringify(message); | ||
this.connection.publish(channel, stringMessage, (err, guid) => { | ||
/* disconnect with NATS streaming server */ | ||
this.connection.close(); | ||
if (err) | ||
return reject(err); | ||
else | ||
return resolve(guid); | ||
}); | ||
} | ||
catch (err) { | ||
console.error(`Failed to stringify message with error: ${err}`); | ||
return reject(err); | ||
} | ||
})); | ||
const connection = yield this.connect(); | ||
connection.publish(channel, message); | ||
yield connection.flush(); | ||
connection.close(); | ||
}); | ||
@@ -85,45 +44,18 @@ } | ||
*/ | ||
subscribe(channel, messageReceived, messageValidation) { | ||
subscribe(connection, channel, messageReceived, messageValidation) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
/* subscribe to the channel */ | ||
const opts = this.connection.subscriptionOptions(); | ||
if (configs_1.natsSubscriptionConfigs.durableName) | ||
opts.setDurableName(configs_1.natsSubscriptionConfigs.durableName); | ||
if (configs_1.natsSubscriptionConfigs.maxInFlight) | ||
opts.setMaxInFlight(configs_1.natsSubscriptionConfigs.maxInFlight); | ||
if (configs_1.natsSubscriptionConfigs.deliverAllAvailable) | ||
opts.setDeliverAllAvailable(); | ||
if (configs_1.natsSubscriptionConfigs.manualAckMode) | ||
opts.setManualAckMode(configs_1.natsSubscriptionConfigs.manualAckMode); | ||
if (configs_1.natsSubscriptionConfigs.ackWaitTime) | ||
opts.setAckWait(configs_1.natsSubscriptionConfigs.ackWaitTime); | ||
/* set queue group to <channel>.queue */ | ||
const queueGroup = channel + '.queue'; | ||
/* subsbcribe to NATS streaming server channel */ | ||
this.subscription = this.connection.subscribe(channel, queueGroup, opts); | ||
/* listen for messages on subscribed channel */ | ||
this.subscription.on('message', (msg) => __awaiter(this, void 0, void 0, function* () { | ||
/* acknowledge the message */ | ||
msg.ack(); | ||
/** | ||
* unsubscribe to the channel, | ||
* as we do not want to receive another message before processing current one | ||
*/ | ||
this.subscription.unsubscribe(); | ||
/* convert message to json format and process */ | ||
const message = msg.getData(); | ||
try { | ||
const parsedMessage = JSON.parse(message); | ||
/* if a validate function is set then first validate the input then pass it forward */ | ||
if (!messageValidation || (messageValidation && (yield messageValidation(parsedMessage)))) { | ||
/* messageReceived gets the parsed message */ | ||
yield messageReceived(parsedMessage); | ||
} | ||
yield connection.subscribe(channel, (err, msg) => __awaiter(this, void 0, void 0, function* () { | ||
if (err) { | ||
console.error(`Failed to subscribe to channel ${channel}`, err); | ||
return; | ||
} | ||
catch (err) { | ||
console.error(`Failed to parse message with error: ${err}`); | ||
/* if a validate function is set then first validate the input then pass it forward */ | ||
if (!messageValidation || (messageValidation && (yield messageValidation(msg.data)))) { | ||
/* messageReceived gets the parsed message */ | ||
yield messageReceived(msg.data); | ||
/* subscribe again to receive next message */ | ||
this.subscribe(connection, channel, messageReceived, messageValidation); | ||
} | ||
/* subscribe again to receive next message */ | ||
this.subscribe(channel, messageReceived, messageValidation); | ||
})); | ||
}), { queue: channel, max: 1 }); | ||
}); | ||
@@ -130,0 +62,0 @@ } |
@@ -1,5 +0,5 @@ | ||
import { NatsConnectionConfigs } from './types'; | ||
import { NatsConnectionOptions } from 'ts-nats'; | ||
/** | ||
* StartWorker function, this creates a new connection and starts the subscription for a channel | ||
* @param natsConnectionConfigs : NATS connection settings | ||
* @param connectionOptions : NATS connection settings | ||
* @param channel : The channel/subject on which the worker will execute | ||
@@ -11,11 +11,11 @@ * @param messageReceived : This method needs to be defined in the Worker class. | ||
[key: string]: any; | ||
}>(natsConnectionConfigs: NatsConnectionConfigs, channel: string, messageReceived: (message: X) => Promise<void>, messageValidation?: (message: X) => Promise<boolean>): Promise<void>; | ||
}>(connectionOptions: NatsConnectionOptions, channel: string, messageReceived: (message: X) => Promise<void>, messageValidation?: (message: X) => Promise<boolean>): Promise<void>; | ||
/** | ||
* Worker factory | ||
* Executes the "messageReceived" method of the class whenever a message is received | ||
* @param natsConnectionConfigs : NATS connection settings | ||
* @param connectionOptions : NATS connection settings | ||
*/ | ||
export declare function Worker<X extends { | ||
[key: string]: any; | ||
}>(natsConnectionConfigs: NatsConnectionConfigs): <T extends { | ||
}>(connectionOptions: NatsConnectionOptions): <T extends { | ||
new (...args: any[]): { | ||
@@ -22,0 +22,0 @@ messageReceived(message: X): Promise<void>; |
@@ -15,3 +15,3 @@ "use strict"; | ||
* StartWorker function, this creates a new connection and starts the subscription for a channel | ||
* @param natsConnectionConfigs : NATS connection settings | ||
* @param connectionOptions : NATS connection settings | ||
* @param channel : The channel/subject on which the worker will execute | ||
@@ -21,7 +21,7 @@ * @param messageReceived : This method needs to be defined in the Worker class. | ||
*/ | ||
function startWorker(natsConnectionConfigs, channel, messageReceived, messageValidation) { | ||
function startWorker(connectionOptions, channel, messageReceived, messageValidation) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const natsService = new nats_1.NatsService(natsConnectionConfigs); | ||
yield natsService.connect(); | ||
return natsService.subscribe(channel, messageReceived, messageValidation); | ||
const natsService = new nats_1.NatsService(connectionOptions); | ||
const connection = yield natsService.connect(); | ||
return natsService.subscribe(connection, channel, messageReceived, messageValidation); | ||
}); | ||
@@ -33,5 +33,5 @@ } | ||
* Executes the "messageReceived" method of the class whenever a message is received | ||
* @param natsConnectionConfigs : NATS connection settings | ||
* @param connectionOptions : NATS connection settings | ||
*/ | ||
function Worker(natsConnectionConfigs) { | ||
function Worker(connectionOptions) { | ||
return function (constructor) { | ||
@@ -42,5 +42,5 @@ const reference = class extends constructor { | ||
if (this.messageValidation) | ||
return startWorker(natsConnectionConfigs, constructor.channel, this.messageReceived.bind(this), this.messageValidation.bind(this)); | ||
return startWorker(connectionOptions, constructor.channel, this.messageReceived.bind(this), this.messageValidation.bind(this)); | ||
else | ||
return startWorker(natsConnectionConfigs, constructor.channel, this.messageReceived.bind(this)); | ||
return startWorker(connectionOptions, constructor.channel, this.messageReceived.bind(this)); | ||
} | ||
@@ -47,0 +47,0 @@ }; |
{ | ||
"name": "@adhityan/gc-nats", | ||
"description": "Typescript Node.js client for NATS streaming server", | ||
"version": "0.0.22", | ||
"version": "0.0.23", | ||
"repository": { | ||
@@ -20,10 +20,9 @@ "type": "git", | ||
"dependencies": { | ||
"node-nats-streaming": "^0.2.6", | ||
"nuid": "^1.1.0" | ||
"ts-nats": "^1.2.4" | ||
}, | ||
"devDependencies": { | ||
"@types/node": "^12.0.1", | ||
"ts-node": "^8.1.0", | ||
"typescript": "^3.4.5" | ||
"@types/node": "^12.12.6", | ||
"ts-node": "^8.6.2", | ||
"typescript": "^3.8.2" | ||
} | ||
} |
@@ -11,7 +11,8 @@ # gc-nats | ||
import { Inject, Service } from 'typedi'; | ||
import { ResponseUtil, logger } from '@app/utils'; | ||
import { ResponseUtil } from '@app/utils'; | ||
import { NatsService } from '@buzzkey/gc-nats'; | ||
import { Logger } from '@adhityan/gc-logger'; | ||
@Service() | ||
@JsonController('/v1') | ||
@Service() | ||
export class PingController { | ||
@@ -30,3 +31,3 @@ @Inject() | ||
const messageId: string = await this.natsService.publish(channel, message); | ||
logger.debug(`published message: ${messageId}`); | ||
Logger.debug(`published message: ${messageId}`); | ||
@@ -41,16 +42,16 @@ return ResponseUtil.ok({ message: 'Ping Successful' }, res); | ||
```javascript | ||
import { logger } from '@app/utils'; | ||
import { natsConnectionConfigs } from '@app/config/nats.config'; | ||
import { Worker } from '@buzzkey/gc-nats'; | ||
import { natsConfig } from '@app/config/nats.config'; | ||
import { Logger } from '@adhityan/gc-logger'; | ||
import { Worker } from '@adhityan/gc-nats'; | ||
const subject = "ping"; | ||
@Worker(natsConnectionConfigs, subject) | ||
@Worker<T>(natsConfig) | ||
export class PingWorker { | ||
channel: string = subject; | ||
static channel: string = subject; | ||
public async messageReceived (message: string) { | ||
logger.debug('Ping successful. worker processing message: ', message); | ||
public async messageReceived (message: T) { | ||
Logger.debug('Ping successful. worker processing message: ', message); | ||
} | ||
} | ||
``` |
export * from './nats'; | ||
export * from './configs'; | ||
export * from './types'; | ||
export * from './worker'; | ||
export * from './worker'; | ||
export { NatsConnectionOptions, Payload } from 'ts-nats'; |
130
src/nats.ts
@@ -1,47 +0,20 @@ | ||
import * as stan from 'node-nats-streaming'; | ||
import * as nuid from 'nuid'; | ||
import { | ||
NatsConnectionConfigs, | ||
NatsInternalSubOptions, | ||
NatsMessageType, | ||
} from './types'; | ||
import { natsSubscriptionConfigs } from './configs'; | ||
connect, | ||
NatsConnectionOptions, | ||
Client, | ||
} from 'ts-nats'; | ||
/** | ||
* NATS streaming server service. It has connect, publish and subscribe methods | ||
* NATS server service. It has connect, publish and subscribe methods | ||
*/ | ||
export class NatsService { | ||
private natsConnectionConfigs: NatsConnectionConfigs; | ||
private connection: any | undefined; | ||
private subscription: any = undefined; | ||
private connectionOptions: NatsConnectionOptions; | ||
constructor(natsConnectionConfigs: NatsConnectionConfigs) { | ||
this.natsConnectionConfigs = natsConnectionConfigs; | ||
/* If configs.clientId is not provided then generate one */ | ||
if (!this.natsConnectionConfigs.clientId) | ||
this.natsConnectionConfigs.clientId = nuid.next(); | ||
constructor(connectionOptions: NatsConnectionOptions) { | ||
this.connectionOptions = connectionOptions; | ||
} | ||
/* connect; connects with the NATS Streaming Server client */ | ||
public connect(): Promise<void> { | ||
return new Promise((resolve, reject) => { | ||
const connection = stan.connect( | ||
this.natsConnectionConfigs.clusterId, | ||
// @ts-ignore | ||
this.natsConnectionConfigs.clientId, | ||
this.natsConnectionConfigs.options, | ||
); | ||
connection.on('connect', () => { | ||
this.connection = connection; | ||
resolve(); | ||
}); | ||
connection.on('error', (reason: string) => { | ||
console.error(`Failed to connect to NATS server with error: ${reason}`); | ||
reject(); | ||
}); | ||
}); | ||
/* connect; connects with the NATS Server client */ | ||
public connect(): Promise<Client> { | ||
return connect(this.connectionOptions); | ||
} | ||
@@ -54,22 +27,7 @@ | ||
*/ | ||
public async publish(channel: string, message: {}): Promise<string> { | ||
return new Promise(async (resolve, reject) => { | ||
/* connect to NATS streaming server */ | ||
await this.connect(); | ||
let stringMessage: string; | ||
try { | ||
stringMessage = JSON.stringify(message); | ||
this.connection.publish(channel, stringMessage, (err: string, guid: string) => { | ||
/* disconnect with NATS streaming server */ | ||
this.connection.close(); | ||
if (err) | ||
return reject(err); | ||
else | ||
return resolve(guid); | ||
}); | ||
} catch(err) { | ||
console.error(`Failed to stringify message with error: ${err}`); | ||
return reject(err); | ||
} | ||
}); | ||
public async publish(channel: string, message: any): Promise<void> { | ||
const connection = await this.connect(); | ||
connection.publish(channel, message); | ||
await connection.flush(); | ||
connection.close(); | ||
} | ||
@@ -82,53 +40,23 @@ | ||
*/ | ||
public async subscribe(channel: string, | ||
public async subscribe(connection: Client, channel: string, | ||
messageReceived: (...args: any[]) => Promise<void>, | ||
messageValidation?: ((...args: any[]) => Promise<boolean>)) { | ||
/* subscribe to the channel */ | ||
const opts: NatsInternalSubOptions = this.connection.subscriptionOptions(); | ||
if (natsSubscriptionConfigs.durableName) | ||
opts.setDurableName(natsSubscriptionConfigs.durableName); | ||
if (natsSubscriptionConfigs.maxInFlight) | ||
opts.setMaxInFlight(natsSubscriptionConfigs.maxInFlight); | ||
if (natsSubscriptionConfigs.deliverAllAvailable) | ||
opts.setDeliverAllAvailable(); | ||
if (natsSubscriptionConfigs.manualAckMode) | ||
opts.setManualAckMode(natsSubscriptionConfigs.manualAckMode); | ||
if (natsSubscriptionConfigs.ackWaitTime) | ||
opts.setAckWait(natsSubscriptionConfigs.ackWaitTime); | ||
await connection.subscribe(channel, async (err, msg) => { | ||
if (err) { | ||
console.error(`Failed to subscribe to channel ${channel}`, err); | ||
return; | ||
} | ||
/* set queue group to <channel>.queue */ | ||
const queueGroup = channel + '.queue'; | ||
/* subsbcribe to NATS streaming server channel */ | ||
this.subscription = this.connection.subscribe(channel, queueGroup, opts); | ||
/* listen for messages on subscribed channel */ | ||
this.subscription.on('message', async (msg: NatsMessageType) => { | ||
/* acknowledge the message */ | ||
msg.ack(); | ||
/** | ||
* unsubscribe to the channel, | ||
* as we do not want to receive another message before processing current one | ||
*/ | ||
this.subscription.unsubscribe(); | ||
/* convert message to json format and process */ | ||
const message = msg.getData(); | ||
try { | ||
const parsedMessage = JSON.parse(message); | ||
/* if a validate function is set then first validate the input then pass it forward */ | ||
if(!messageValidation || (messageValidation && await messageValidation(parsedMessage))) { | ||
if(!messageValidation || (messageValidation && await messageValidation(msg.data))) { | ||
/* messageReceived gets the parsed message */ | ||
await messageReceived(parsedMessage); | ||
await messageReceived(msg.data); | ||
/* subscribe again to receive next message */ | ||
this.subscribe(connection, channel, messageReceived, messageValidation); | ||
} | ||
} catch(err) { | ||
console.error(`Failed to parse message with error: ${err}`); | ||
} | ||
/* subscribe again to receive next message */ | ||
this.subscribe(channel, messageReceived, messageValidation); | ||
}); | ||
}, {queue: channel, max: 1}); | ||
} | ||
} |
@@ -1,2 +0,2 @@ | ||
import { NatsConnectionConfigs } from './types'; | ||
import { NatsConnectionOptions } from 'ts-nats'; | ||
import { NatsService } from './nats'; | ||
@@ -6,3 +6,3 @@ | ||
* StartWorker function, this creates a new connection and starts the subscription for a channel | ||
* @param natsConnectionConfigs : NATS connection settings | ||
* @param connectionOptions : NATS connection settings | ||
* @param channel : The channel/subject on which the worker will execute | ||
@@ -12,8 +12,8 @@ * @param messageReceived : This method needs to be defined in the Worker class. | ||
*/ | ||
export async function startWorker<X extends { [key: string]: any }> (natsConnectionConfigs: NatsConnectionConfigs, | ||
export async function startWorker<X extends { [key: string]: any }> (connectionOptions: NatsConnectionOptions, | ||
channel: string, messageReceived: (message: X) => Promise<void>, messageValidation?: (message: X) => Promise<boolean>) { | ||
const natsService = new NatsService(natsConnectionConfigs); | ||
await natsService.connect(); | ||
const natsService = new NatsService(connectionOptions); | ||
const connection = await natsService.connect(); | ||
return natsService.subscribe(channel, messageReceived, messageValidation); | ||
return natsService.subscribe(connection, channel, messageReceived, messageValidation); | ||
} | ||
@@ -24,5 +24,5 @@ | ||
* Executes the "messageReceived" method of the class whenever a message is received | ||
* @param natsConnectionConfigs : NATS connection settings | ||
* @param connectionOptions : NATS connection settings | ||
*/ | ||
export function Worker <X extends { [key: string]: any }> (natsConnectionConfigs: NatsConnectionConfigs) { | ||
export function Worker <X extends { [key: string]: any }> (connectionOptions: NatsConnectionOptions) { | ||
return function <T extends { new(...args: any[]): { | ||
@@ -37,5 +37,5 @@ messageReceived(message: X): Promise<void>, | ||
if(this.messageValidation) | ||
return startWorker(natsConnectionConfigs, constructor.channel, this.messageReceived.bind(this), this.messageValidation.bind(this)); | ||
return startWorker(connectionOptions, constructor.channel, this.messageReceived.bind(this), this.messageValidation.bind(this)); | ||
else | ||
return startWorker(natsConnectionConfigs, constructor.channel, this.messageReceived.bind(this)); | ||
return startWorker(connectionOptions, constructor.channel, this.messageReceived.bind(this)); | ||
} | ||
@@ -42,0 +42,0 @@ } |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
1
55
21585
15
341
+ Addedts-nats@^1.2.4
+ Addedts-nats@1.2.15(transitive)
- Removednode-nats-streaming@^0.2.6
- Removednuid@^1.1.0
- Removedgoogle-protobuf@3.21.4(transitive)
- Removednats@1.4.12(transitive)
- Removednode-nats-streaming@0.2.6(transitive)