New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@adhityan/gc-nats

Package Overview
Dependencies
Maintainers
1
Versions
43
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@adhityan/gc-nats - npm Package Compare versions

Comparing version 0.0.22 to 0.0.23

3

lib/index.d.ts
export * from './nats';
export * from './configs';
export * from './types';
export * from './worker';
export { NatsConnectionOptions, Payload } from 'ts-nats';

@@ -7,4 +7,5 @@ "use strict";

__export(require("./nats"));
__export(require("./configs"));
__export(require("./worker"));
var ts_nats_1 = require("ts-nats");
exports.Payload = ts_nats_1.Payload;
//# sourceMappingURL=index.js.map

@@ -1,11 +0,9 @@

import { NatsConnectionConfigs } from './types';
import { NatsConnectionOptions, Client } from 'ts-nats';
/**
* NATS streaming server service. It has connect, publish and subscribe methods
* NATS server service. It has connect, publish and subscribe methods
*/
export declare class NatsService {
private natsConnectionConfigs;
private connection;
private subscription;
constructor(natsConnectionConfigs: NatsConnectionConfigs);
connect(): Promise<void>;
private connectionOptions;
constructor(connectionOptions: NatsConnectionOptions);
connect(): Promise<Client>;
/**

@@ -16,3 +14,3 @@ * Publishes a message to a channel, converts the message to string before publishing

*/
publish(channel: string, message: {}): Promise<string>;
publish(channel: string, message: any): Promise<void>;
/**

@@ -23,3 +21,3 @@ * subscribe, listens for messages on a channel

*/
subscribe(channel: string, messageReceived: (...args: any[]) => Promise<void>, messageValidation?: ((...args: any[]) => Promise<boolean>)): Promise<void>;
subscribe(connection: Client, channel: string, messageReceived: (...args: any[]) => Promise<void>, messageValidation?: ((...args: any[]) => Promise<boolean>)): Promise<void>;
}

@@ -11,39 +11,14 @@ "use strict";

};
var __importStar = (this && this.__importStar) || function (mod) {
if (mod && mod.__esModule) return mod;
var result = {};
if (mod != null) for (var k in mod) if (Object.hasOwnProperty.call(mod, k)) result[k] = mod[k];
result["default"] = mod;
return result;
};
Object.defineProperty(exports, "__esModule", { value: true });
const stan = __importStar(require("node-nats-streaming"));
const nuid = __importStar(require("nuid"));
const configs_1 = require("./configs");
const ts_nats_1 = require("ts-nats");
/**
* NATS streaming server service. It has connect, publish and subscribe methods
* NATS server service. It has connect, publish and subscribe methods
*/
class NatsService {
constructor(natsConnectionConfigs) {
this.subscription = undefined;
this.natsConnectionConfigs = natsConnectionConfigs;
/* If configs.clientId is not provided then generate one */
if (!this.natsConnectionConfigs.clientId)
this.natsConnectionConfigs.clientId = nuid.next();
constructor(connectionOptions) {
this.connectionOptions = connectionOptions;
}
/* connect; connects with the NATS Streaming Server client */
/* connect; connects with the NATS Server client */
connect() {
return new Promise((resolve, reject) => {
const connection = stan.connect(this.natsConnectionConfigs.clusterId,
// @ts-ignore
this.natsConnectionConfigs.clientId, this.natsConnectionConfigs.options);
connection.on('connect', () => {
this.connection = connection;
resolve();
});
connection.on('error', (reason) => {
console.error(`Failed to connect to NATS server with error: ${reason}`);
reject();
});
});
return ts_nats_1.connect(this.connectionOptions);
}

@@ -57,22 +32,6 @@ /**

return __awaiter(this, void 0, void 0, function* () {
return new Promise((resolve, reject) => __awaiter(this, void 0, void 0, function* () {
/* connect to NATS streaming server */
yield this.connect();
let stringMessage;
try {
stringMessage = JSON.stringify(message);
this.connection.publish(channel, stringMessage, (err, guid) => {
/* disconnect with NATS streaming server */
this.connection.close();
if (err)
return reject(err);
else
return resolve(guid);
});
}
catch (err) {
console.error(`Failed to stringify message with error: ${err}`);
return reject(err);
}
}));
const connection = yield this.connect();
connection.publish(channel, message);
yield connection.flush();
connection.close();
});

@@ -85,45 +44,18 @@ }

*/
subscribe(channel, messageReceived, messageValidation) {
subscribe(connection, channel, messageReceived, messageValidation) {
return __awaiter(this, void 0, void 0, function* () {
/* subscribe to the channel */
const opts = this.connection.subscriptionOptions();
if (configs_1.natsSubscriptionConfigs.durableName)
opts.setDurableName(configs_1.natsSubscriptionConfigs.durableName);
if (configs_1.natsSubscriptionConfigs.maxInFlight)
opts.setMaxInFlight(configs_1.natsSubscriptionConfigs.maxInFlight);
if (configs_1.natsSubscriptionConfigs.deliverAllAvailable)
opts.setDeliverAllAvailable();
if (configs_1.natsSubscriptionConfigs.manualAckMode)
opts.setManualAckMode(configs_1.natsSubscriptionConfigs.manualAckMode);
if (configs_1.natsSubscriptionConfigs.ackWaitTime)
opts.setAckWait(configs_1.natsSubscriptionConfigs.ackWaitTime);
/* set queue group to <channel>.queue */
const queueGroup = channel + '.queue';
/* subsbcribe to NATS streaming server channel */
this.subscription = this.connection.subscribe(channel, queueGroup, opts);
/* listen for messages on subscribed channel */
this.subscription.on('message', (msg) => __awaiter(this, void 0, void 0, function* () {
/* acknowledge the message */
msg.ack();
/**
* unsubscribe to the channel,
* as we do not want to receive another message before processing current one
*/
this.subscription.unsubscribe();
/* convert message to json format and process */
const message = msg.getData();
try {
const parsedMessage = JSON.parse(message);
/* if a validate function is set then first validate the input then pass it forward */
if (!messageValidation || (messageValidation && (yield messageValidation(parsedMessage)))) {
/* messageReceived gets the parsed message */
yield messageReceived(parsedMessage);
}
yield connection.subscribe(channel, (err, msg) => __awaiter(this, void 0, void 0, function* () {
if (err) {
console.error(`Failed to subscribe to channel ${channel}`, err);
return;
}
catch (err) {
console.error(`Failed to parse message with error: ${err}`);
/* if a validate function is set then first validate the input then pass it forward */
if (!messageValidation || (messageValidation && (yield messageValidation(msg.data)))) {
/* messageReceived gets the parsed message */
yield messageReceived(msg.data);
/* subscribe again to receive next message */
this.subscribe(connection, channel, messageReceived, messageValidation);
}
/* subscribe again to receive next message */
this.subscribe(channel, messageReceived, messageValidation);
}));
}), { queue: channel, max: 1 });
});

@@ -130,0 +62,0 @@ }

@@ -1,5 +0,5 @@

import { NatsConnectionConfigs } from './types';
import { NatsConnectionOptions } from 'ts-nats';
/**
* StartWorker function, this creates a new connection and starts the subscription for a channel
* @param natsConnectionConfigs : NATS connection settings
* @param connectionOptions : NATS connection settings
* @param channel : The channel/subject on which the worker will execute

@@ -11,11 +11,11 @@ * @param messageReceived : This method needs to be defined in the Worker class.

[key: string]: any;
}>(natsConnectionConfigs: NatsConnectionConfigs, channel: string, messageReceived: (message: X) => Promise<void>, messageValidation?: (message: X) => Promise<boolean>): Promise<void>;
}>(connectionOptions: NatsConnectionOptions, channel: string, messageReceived: (message: X) => Promise<void>, messageValidation?: (message: X) => Promise<boolean>): Promise<void>;
/**
* Worker factory
* Executes the "messageReceived" method of the class whenever a message is received
* @param natsConnectionConfigs : NATS connection settings
* @param connectionOptions : NATS connection settings
*/
export declare function Worker<X extends {
[key: string]: any;
}>(natsConnectionConfigs: NatsConnectionConfigs): <T extends {
}>(connectionOptions: NatsConnectionOptions): <T extends {
new (...args: any[]): {

@@ -22,0 +22,0 @@ messageReceived(message: X): Promise<void>;

@@ -15,3 +15,3 @@ "use strict";

* StartWorker function, this creates a new connection and starts the subscription for a channel
* @param natsConnectionConfigs : NATS connection settings
* @param connectionOptions : NATS connection settings
* @param channel : The channel/subject on which the worker will execute

@@ -21,7 +21,7 @@ * @param messageReceived : This method needs to be defined in the Worker class.

*/
function startWorker(natsConnectionConfigs, channel, messageReceived, messageValidation) {
function startWorker(connectionOptions, channel, messageReceived, messageValidation) {
return __awaiter(this, void 0, void 0, function* () {
const natsService = new nats_1.NatsService(natsConnectionConfigs);
yield natsService.connect();
return natsService.subscribe(channel, messageReceived, messageValidation);
const natsService = new nats_1.NatsService(connectionOptions);
const connection = yield natsService.connect();
return natsService.subscribe(connection, channel, messageReceived, messageValidation);
});

@@ -33,5 +33,5 @@ }

* Executes the "messageReceived" method of the class whenever a message is received
* @param natsConnectionConfigs : NATS connection settings
* @param connectionOptions : NATS connection settings
*/
function Worker(natsConnectionConfigs) {
function Worker(connectionOptions) {
return function (constructor) {

@@ -42,5 +42,5 @@ const reference = class extends constructor {

if (this.messageValidation)
return startWorker(natsConnectionConfigs, constructor.channel, this.messageReceived.bind(this), this.messageValidation.bind(this));
return startWorker(connectionOptions, constructor.channel, this.messageReceived.bind(this), this.messageValidation.bind(this));
else
return startWorker(natsConnectionConfigs, constructor.channel, this.messageReceived.bind(this));
return startWorker(connectionOptions, constructor.channel, this.messageReceived.bind(this));
}

@@ -47,0 +47,0 @@ };

{
"name": "@adhityan/gc-nats",
"description": "Typescript Node.js client for NATS streaming server",
"version": "0.0.22",
"version": "0.0.23",
"repository": {

@@ -20,10 +20,9 @@ "type": "git",

"dependencies": {
"node-nats-streaming": "^0.2.6",
"nuid": "^1.1.0"
"ts-nats": "^1.2.4"
},
"devDependencies": {
"@types/node": "^12.0.1",
"ts-node": "^8.1.0",
"typescript": "^3.4.5"
"@types/node": "^12.12.6",
"ts-node": "^8.6.2",
"typescript": "^3.8.2"
}
}

@@ -11,7 +11,8 @@ # gc-nats

import { Inject, Service } from 'typedi';
import { ResponseUtil, logger } from '@app/utils';
import { ResponseUtil } from '@app/utils';
import { NatsService } from '@buzzkey/gc-nats';
import { Logger } from '@adhityan/gc-logger';
@Service()
@JsonController('/v1')
@Service()
export class PingController {

@@ -30,3 +31,3 @@ @Inject()

const messageId: string = await this.natsService.publish(channel, message);
logger.debug(`published message: ${messageId}`);
Logger.debug(`published message: ${messageId}`);

@@ -41,16 +42,16 @@ return ResponseUtil.ok({ message: 'Ping Successful' }, res);

```javascript
import { logger } from '@app/utils';
import { natsConnectionConfigs } from '@app/config/nats.config';
import { Worker } from '@buzzkey/gc-nats';
import { natsConfig } from '@app/config/nats.config';
import { Logger } from '@adhityan/gc-logger';
import { Worker } from '@adhityan/gc-nats';
const subject = "ping";
@Worker(natsConnectionConfigs, subject)
@Worker<T>(natsConfig)
export class PingWorker {
channel: string = subject;
static channel: string = subject;
public async messageReceived (message: string) {
logger.debug('Ping successful. worker processing message: ', message);
public async messageReceived (message: T) {
Logger.debug('Ping successful. worker processing message: ', message);
}
}
```
export * from './nats';
export * from './configs';
export * from './types';
export * from './worker';
export * from './worker';
export { NatsConnectionOptions, Payload } from 'ts-nats';

@@ -1,47 +0,20 @@

import * as stan from 'node-nats-streaming';
import * as nuid from 'nuid';
import {
NatsConnectionConfigs,
NatsInternalSubOptions,
NatsMessageType,
} from './types';
import { natsSubscriptionConfigs } from './configs';
connect,
NatsConnectionOptions,
Client,
} from 'ts-nats';
/**
* NATS streaming server service. It has connect, publish and subscribe methods
* NATS server service. It has connect, publish and subscribe methods
*/
export class NatsService {
private natsConnectionConfigs: NatsConnectionConfigs;
private connection: any | undefined;
private subscription: any = undefined;
private connectionOptions: NatsConnectionOptions;
constructor(natsConnectionConfigs: NatsConnectionConfigs) {
this.natsConnectionConfigs = natsConnectionConfigs;
/* If configs.clientId is not provided then generate one */
if (!this.natsConnectionConfigs.clientId)
this.natsConnectionConfigs.clientId = nuid.next();
constructor(connectionOptions: NatsConnectionOptions) {
this.connectionOptions = connectionOptions;
}
/* connect; connects with the NATS Streaming Server client */
public connect(): Promise<void> {
return new Promise((resolve, reject) => {
const connection = stan.connect(
this.natsConnectionConfigs.clusterId,
// @ts-ignore
this.natsConnectionConfigs.clientId,
this.natsConnectionConfigs.options,
);
connection.on('connect', () => {
this.connection = connection;
resolve();
});
connection.on('error', (reason: string) => {
console.error(`Failed to connect to NATS server with error: ${reason}`);
reject();
});
});
/* connect; connects with the NATS Server client */
public connect(): Promise<Client> {
return connect(this.connectionOptions);
}

@@ -54,22 +27,7 @@

*/
public async publish(channel: string, message: {}): Promise<string> {
return new Promise(async (resolve, reject) => {
/* connect to NATS streaming server */
await this.connect();
let stringMessage: string;
try {
stringMessage = JSON.stringify(message);
this.connection.publish(channel, stringMessage, (err: string, guid: string) => {
/* disconnect with NATS streaming server */
this.connection.close();
if (err)
return reject(err);
else
return resolve(guid);
});
} catch(err) {
console.error(`Failed to stringify message with error: ${err}`);
return reject(err);
}
});
public async publish(channel: string, message: any): Promise<void> {
const connection = await this.connect();
connection.publish(channel, message);
await connection.flush();
connection.close();
}

@@ -82,53 +40,23 @@

*/
public async subscribe(channel: string,
public async subscribe(connection: Client, channel: string,
messageReceived: (...args: any[]) => Promise<void>,
messageValidation?: ((...args: any[]) => Promise<boolean>)) {
/* subscribe to the channel */
const opts: NatsInternalSubOptions = this.connection.subscriptionOptions();
if (natsSubscriptionConfigs.durableName)
opts.setDurableName(natsSubscriptionConfigs.durableName);
if (natsSubscriptionConfigs.maxInFlight)
opts.setMaxInFlight(natsSubscriptionConfigs.maxInFlight);
if (natsSubscriptionConfigs.deliverAllAvailable)
opts.setDeliverAllAvailable();
if (natsSubscriptionConfigs.manualAckMode)
opts.setManualAckMode(natsSubscriptionConfigs.manualAckMode);
if (natsSubscriptionConfigs.ackWaitTime)
opts.setAckWait(natsSubscriptionConfigs.ackWaitTime);
await connection.subscribe(channel, async (err, msg) => {
if (err) {
console.error(`Failed to subscribe to channel ${channel}`, err);
return;
}
/* set queue group to <channel>.queue */
const queueGroup = channel + '.queue';
/* subsbcribe to NATS streaming server channel */
this.subscription = this.connection.subscribe(channel, queueGroup, opts);
/* listen for messages on subscribed channel */
this.subscription.on('message', async (msg: NatsMessageType) => {
/* acknowledge the message */
msg.ack();
/**
* unsubscribe to the channel,
* as we do not want to receive another message before processing current one
*/
this.subscription.unsubscribe();
/* convert message to json format and process */
const message = msg.getData();
try {
const parsedMessage = JSON.parse(message);
/* if a validate function is set then first validate the input then pass it forward */
if(!messageValidation || (messageValidation && await messageValidation(parsedMessage))) {
if(!messageValidation || (messageValidation && await messageValidation(msg.data))) {
/* messageReceived gets the parsed message */
await messageReceived(parsedMessage);
await messageReceived(msg.data);
/* subscribe again to receive next message */
this.subscribe(connection, channel, messageReceived, messageValidation);
}
} catch(err) {
console.error(`Failed to parse message with error: ${err}`);
}
/* subscribe again to receive next message */
this.subscribe(channel, messageReceived, messageValidation);
});
}, {queue: channel, max: 1});
}
}

@@ -1,2 +0,2 @@

import { NatsConnectionConfigs } from './types';
import { NatsConnectionOptions } from 'ts-nats';
import { NatsService } from './nats';

@@ -6,3 +6,3 @@

* StartWorker function, this creates a new connection and starts the subscription for a channel
* @param natsConnectionConfigs : NATS connection settings
* @param connectionOptions : NATS connection settings
* @param channel : The channel/subject on which the worker will execute

@@ -12,8 +12,8 @@ * @param messageReceived : This method needs to be defined in the Worker class.

*/
export async function startWorker<X extends { [key: string]: any }> (natsConnectionConfigs: NatsConnectionConfigs,
export async function startWorker<X extends { [key: string]: any }> (connectionOptions: NatsConnectionOptions,
channel: string, messageReceived: (message: X) => Promise<void>, messageValidation?: (message: X) => Promise<boolean>) {
const natsService = new NatsService(natsConnectionConfigs);
await natsService.connect();
const natsService = new NatsService(connectionOptions);
const connection = await natsService.connect();
return natsService.subscribe(channel, messageReceived, messageValidation);
return natsService.subscribe(connection, channel, messageReceived, messageValidation);
}

@@ -24,5 +24,5 @@

* Executes the "messageReceived" method of the class whenever a message is received
* @param natsConnectionConfigs : NATS connection settings
* @param connectionOptions : NATS connection settings
*/
export function Worker <X extends { [key: string]: any }> (natsConnectionConfigs: NatsConnectionConfigs) {
export function Worker <X extends { [key: string]: any }> (connectionOptions: NatsConnectionOptions) {
return function <T extends { new(...args: any[]): {

@@ -37,5 +37,5 @@ messageReceived(message: X): Promise<void>,

if(this.messageValidation)
return startWorker(natsConnectionConfigs, constructor.channel, this.messageReceived.bind(this), this.messageValidation.bind(this));
return startWorker(connectionOptions, constructor.channel, this.messageReceived.bind(this), this.messageValidation.bind(this));
else
return startWorker(natsConnectionConfigs, constructor.channel, this.messageReceived.bind(this));
return startWorker(connectionOptions, constructor.channel, this.messageReceived.bind(this));
}

@@ -42,0 +42,0 @@ }

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc