Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

lib-task-scheduler

Package Overview
Dependencies
Maintainers
1
Versions
70
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

lib-task-scheduler - npm Package Compare versions

Comparing version 1.0.33 to 1.0.38

dist/src/sns_sqs_consumer_task.d.ts

8

dist/src/clients/sns.d.ts
/// <reference types="chai" />
import * as AWS from 'aws-sdk';
import { AwsCredentialsConfig } from './aws';

@@ -15,7 +16,12 @@ import { Logger } from '../logger';

private snsSubscribeTopic;
private snsListTopics;
constructor(config: SNSConfig, awsConfig: AwsCredentialsConfig, log?: Logger);
private getArn(topicName);
private topicFullName(topicName);
private getOrCreateArn(topicName, createIfUnknown);
publishEvent(topicName: string, message: Object): Promise<void>;
createTopic(topicName: string): Promise<string>;
findTopicArnOrThrow(topicName: string): Promise<string>;
listAllTopics(): Promise<AWS.SNS.Topic[]>;
listTopics(nextToken?: string): Promise<AWS.SNS.ListTopicsResponse>;
subscribeQueueToTopic(topicName: string, queueArn: string): Promise<string>;
}

@@ -10,2 +10,3 @@ "use strict";

};
const _ = require('lodash');
const AWS = require('aws-sdk');

@@ -28,8 +29,17 @@ const bPromise = require('bluebird');

this.snsSubscribeTopic = bPromise.promisify(snsClient.subscribe, context);
this.snsListTopics = bPromise.promisify(snsClient.listTopics, context);
this.log = log || new logger_1.NoopLogger();
}
getArn(topicName) {
topicFullName(topicName) {
return `${this.config.namePrefix}-${topicName}`;
}
getOrCreateArn(topicName, createIfUnknown) {
return __awaiter(this, void 0, void 0, function* () {
if (!this.topicArns.has(topicName)) {
this.topicArns.set(topicName, yield this.createTopic(topicName));
if (createIfUnknown) {
this.topicArns.set(topicName, yield this.createTopic(topicName));
}
else {
this.topicArns.set(topicName, yield this.findTopicArnOrThrow(topicName));
}
}

@@ -41,3 +51,3 @@ return this.topicArns.get(topicName);

return __awaiter(this, void 0, void 0, function* () {
const topicArn = yield this.getArn(topicName);
const topicArn = yield this.getOrCreateArn(topicName, true);
this.log.info(`Publishing message to ${topicName}: ${JSON.stringify(message)}`);

@@ -50,5 +60,6 @@ yield this.snsPublish({

}
// this call is idempotent
createTopic(topicName) {
return __awaiter(this, void 0, void 0, function* () {
const fullTopicName = `${this.config.namePrefix}-${topicName}`;
const fullTopicName = this.topicFullName(topicName);
this.log.info(`Creating topic ${fullTopicName}`);

@@ -61,7 +72,40 @@ const result = yield this.snsCreateTopic({

}
findTopicArnOrThrow(topicName) {
return __awaiter(this, void 0, void 0, function* () {
const fullTopicName = this.topicFullName(topicName);
// the topic name is the last part of the Arn, which has ':' as field separator
const mapped = _.map(yield this.listAllTopics(), 'TopicArn');
const foundArn = _.find(mapped, t => t.endsWith(`:${fullTopicName}`));
if (!foundArn) {
throw new Error(`Unable to find arn for topic ${fullTopicName}. Arns found: ${JSON.stringify(mapped)}`);
}
return foundArn;
});
}
listAllTopics() {
return __awaiter(this, void 0, void 0, function* () {
let allTopics = [];
let nextToken = undefined;
do {
const response = yield this.listTopics(nextToken);
nextToken = response.NextToken;
allTopics = _.concat(allTopics, response.Topics);
} while (nextToken);
this.log.debug(`Found topics: ${JSON.stringify(allTopics)}`);
return allTopics;
});
}
listTopics(nextToken = undefined) {
return __awaiter(this, void 0, void 0, function* () {
this.log.debug('Listing topics');
return yield this.snsListTopics({
NextToken: nextToken,
});
});
}
// this call is idempotent
subscribeQueueToTopic(topicName, queueArn) {
return __awaiter(this, void 0, void 0, function* () {
const topicArn = yield this.getArn(topicName);
this.log.info(`Subscribing queue ${queueArn} to topic ${topicName}`);
const topicArn = yield this.getOrCreateArn(topicName, false);
this.log.info(`Subscribing queue ${queueArn} to topic ${topicArn}`);
const result = yield this.snsSubscribeTopic({

@@ -68,0 +112,0 @@ Endpoint: queueArn,

2

dist/src/clients/sqs.d.ts

@@ -45,3 +45,3 @@ import * as AWS from 'aws-sdk';

deleteMessage(receiptHandle: string): Promise<void>;
getArn(): Promise<string>;
getOrCreateArn(): Promise<string>;
}

@@ -87,3 +87,4 @@ "use strict";

const attributes = this.awsAttrs();
this.log.info(`Creating SQS queue ${JSON.stringify(this.config)}`);
this.log.info(`Creating SQS queue ${this.queueName}
with attributes ${JSON.stringify(attributes)}. Config: ${JSON.stringify(this.config)}`);
const result = yield this.awsCreateQueue({

@@ -161,3 +162,3 @@ QueueName: this.queueName,

}
getArn() {
getOrCreateArn() {
return __awaiter(this, void 0, void 0, function* () {

@@ -164,0 +165,0 @@ if (!this.queueArn) {

/// <reference types="node" />
export * from './source_sink_task';
export * from './sqs_consumer_task';
export * from './sns_sqs_consumer_task';
export * from './source_sink_runner';

@@ -5,0 +6,0 @@ export * from './sqs_queue';

@@ -17,2 +17,3 @@ "use strict";

__export(require('./sqs_consumer_task'));
__export(require('./sns_sqs_consumer_task'));
__export(require('./source_sink_runner'));

@@ -19,0 +20,0 @@ __export(require('./sqs_queue'));

@@ -14,6 +14,8 @@ /// <reference types="chai" />

log?: Logger;
taskLog?: Logger;
}
export declare abstract class SQSConsumerTask implements Task {
private config;
private sqsClient;
private readonly sqsClient;
private readonly log;
constructor(config: SQSTaskConfig);

@@ -20,0 +22,0 @@ execute(): Promise<boolean>;

@@ -12,4 +12,8 @@ "use strict";

const sqs_1 = require('./clients/sqs');
const logger_1 = require('./logger');
const MAX_RETRY_INTERVAL_SECS = moment.duration(10, 'minutes').asSeconds();
const BASE_RETRY_INTERVAL_SECS = moment.duration(30, 'seconds').asSeconds();
/*
* Consumes messages from an SQS queue.
*/
class SQSConsumerTask {

@@ -19,2 +23,3 @@ constructor(config) {

this.sqsClient = new sqs_1.SQSClient(config.queueName, config.sqsConfig, config.awsCredentials, config.log);
this.log = config.taskLog || new logger_1.NoopLogger();
}

@@ -30,2 +35,3 @@ execute() {

catch (error) {
this.log.error(`Unable to handle message: ${message}`, error);
yield this.backoff(sqsMessage);

@@ -45,3 +51,3 @@ throw error;

return __awaiter(this, void 0, void 0, function* () {
const queueArn = yield this.sqsClient.getArn();
const queueArn = yield this.sqsClient.getOrCreateArn();
return yield snsClient.subscribeQueueToTopic(topicName, queueArn);

@@ -48,0 +54,0 @@ });

{
"name": "lib-task-scheduler",
"version": "1.0.33",
"version": "1.0.38",
"description": "A lightweight, modular task scheduler.",

@@ -5,0 +5,0 @@ "homepage": "https://github.com/convoyinc/lib-task-scheduler",

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc