lib-task-scheduler
Advanced tools
Comparing version 1.0.33 to 1.0.38
/// <reference types="chai" /> | ||
import * as AWS from 'aws-sdk'; | ||
import { AwsCredentialsConfig } from './aws'; | ||
@@ -15,7 +16,12 @@ import { Logger } from '../logger'; | ||
private snsSubscribeTopic; | ||
private snsListTopics; | ||
constructor(config: SNSConfig, awsConfig: AwsCredentialsConfig, log?: Logger); | ||
private getArn(topicName); | ||
private topicFullName(topicName); | ||
private getOrCreateArn(topicName, createIfUnknown); | ||
publishEvent(topicName: string, message: Object): Promise<void>; | ||
createTopic(topicName: string): Promise<string>; | ||
findTopicArnOrThrow(topicName: string): Promise<string>; | ||
listAllTopics(): Promise<AWS.SNS.Topic[]>; | ||
listTopics(nextToken?: string): Promise<AWS.SNS.ListTopicsResponse>; | ||
subscribeQueueToTopic(topicName: string, queueArn: string): Promise<string>; | ||
} |
@@ -10,2 +10,3 @@ "use strict"; | ||
}; | ||
const _ = require('lodash'); | ||
const AWS = require('aws-sdk'); | ||
@@ -28,8 +29,17 @@ const bPromise = require('bluebird'); | ||
this.snsSubscribeTopic = bPromise.promisify(snsClient.subscribe, context); | ||
this.snsListTopics = bPromise.promisify(snsClient.listTopics, context); | ||
this.log = log || new logger_1.NoopLogger(); | ||
} | ||
getArn(topicName) { | ||
topicFullName(topicName) { | ||
return `${this.config.namePrefix}-${topicName}`; | ||
} | ||
getOrCreateArn(topicName, createIfUnknown) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
if (!this.topicArns.has(topicName)) { | ||
this.topicArns.set(topicName, yield this.createTopic(topicName)); | ||
if (createIfUnknown) { | ||
this.topicArns.set(topicName, yield this.createTopic(topicName)); | ||
} | ||
else { | ||
this.topicArns.set(topicName, yield this.findTopicArnOrThrow(topicName)); | ||
} | ||
} | ||
@@ -41,3 +51,3 @@ return this.topicArns.get(topicName); | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const topicArn = yield this.getArn(topicName); | ||
const topicArn = yield this.getOrCreateArn(topicName, true); | ||
this.log.info(`Publishing message to ${topicName}: ${JSON.stringify(message)}`); | ||
@@ -50,5 +60,6 @@ yield this.snsPublish({ | ||
} | ||
// this call is idempotent | ||
createTopic(topicName) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const fullTopicName = `${this.config.namePrefix}-${topicName}`; | ||
const fullTopicName = this.topicFullName(topicName); | ||
this.log.info(`Creating topic ${fullTopicName}`); | ||
@@ -61,7 +72,40 @@ const result = yield this.snsCreateTopic({ | ||
} | ||
findTopicArnOrThrow(topicName) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const fullTopicName = this.topicFullName(topicName); | ||
// the topic name is the last part of the Arn, which has ':' as field separator | ||
const mapped = _.map(yield this.listAllTopics(), 'TopicArn'); | ||
const foundArn = _.find(mapped, t => t.endsWith(`:${fullTopicName}`)); | ||
if (!foundArn) { | ||
throw new Error(`Unable to find arn for topic ${fullTopicName}. Arns found: ${JSON.stringify(mapped)}`); | ||
} | ||
return foundArn; | ||
}); | ||
} | ||
listAllTopics() { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
let allTopics = []; | ||
let nextToken = undefined; | ||
do { | ||
const response = yield this.listTopics(nextToken); | ||
nextToken = response.NextToken; | ||
allTopics = _.concat(allTopics, response.Topics); | ||
} while (nextToken); | ||
this.log.debug(`Found topics: ${JSON.stringify(allTopics)}`); | ||
return allTopics; | ||
}); | ||
} | ||
listTopics(nextToken = undefined) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
this.log.debug('Listing topics'); | ||
return yield this.snsListTopics({ | ||
NextToken: nextToken, | ||
}); | ||
}); | ||
} | ||
// this call is idempotent | ||
subscribeQueueToTopic(topicName, queueArn) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const topicArn = yield this.getArn(topicName); | ||
this.log.info(`Subscribing queue ${queueArn} to topic ${topicName}`); | ||
const topicArn = yield this.getOrCreateArn(topicName, false); | ||
this.log.info(`Subscribing queue ${queueArn} to topic ${topicArn}`); | ||
const result = yield this.snsSubscribeTopic({ | ||
@@ -68,0 +112,0 @@ Endpoint: queueArn, |
@@ -45,3 +45,3 @@ import * as AWS from 'aws-sdk'; | ||
deleteMessage(receiptHandle: string): Promise<void>; | ||
getArn(): Promise<string>; | ||
getOrCreateArn(): Promise<string>; | ||
} |
@@ -87,3 +87,4 @@ "use strict"; | ||
const attributes = this.awsAttrs(); | ||
this.log.info(`Creating SQS queue ${JSON.stringify(this.config)}`); | ||
this.log.info(`Creating SQS queue ${this.queueName} | ||
with attributes ${JSON.stringify(attributes)}. Config: ${JSON.stringify(this.config)}`); | ||
const result = yield this.awsCreateQueue({ | ||
@@ -161,3 +162,3 @@ QueueName: this.queueName, | ||
} | ||
getArn() { | ||
getOrCreateArn() { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
@@ -164,0 +165,0 @@ if (!this.queueArn) { |
/// <reference types="node" /> | ||
export * from './source_sink_task'; | ||
export * from './sqs_consumer_task'; | ||
export * from './sns_sqs_consumer_task'; | ||
export * from './source_sink_runner'; | ||
@@ -5,0 +6,0 @@ export * from './sqs_queue'; |
@@ -17,2 +17,3 @@ "use strict"; | ||
__export(require('./sqs_consumer_task')); | ||
__export(require('./sns_sqs_consumer_task')); | ||
__export(require('./source_sink_runner')); | ||
@@ -19,0 +20,0 @@ __export(require('./sqs_queue')); |
@@ -14,6 +14,8 @@ /// <reference types="chai" /> | ||
log?: Logger; | ||
taskLog?: Logger; | ||
} | ||
export declare abstract class SQSConsumerTask implements Task { | ||
private config; | ||
private sqsClient; | ||
private readonly sqsClient; | ||
private readonly log; | ||
constructor(config: SQSTaskConfig); | ||
@@ -20,0 +22,0 @@ execute(): Promise<boolean>; |
@@ -12,4 +12,8 @@ "use strict"; | ||
const sqs_1 = require('./clients/sqs'); | ||
const logger_1 = require('./logger'); | ||
const MAX_RETRY_INTERVAL_SECS = moment.duration(10, 'minutes').asSeconds(); | ||
const BASE_RETRY_INTERVAL_SECS = moment.duration(30, 'seconds').asSeconds(); | ||
/* | ||
* Consumes messages from an SQS queue. | ||
*/ | ||
class SQSConsumerTask { | ||
@@ -19,2 +23,3 @@ constructor(config) { | ||
this.sqsClient = new sqs_1.SQSClient(config.queueName, config.sqsConfig, config.awsCredentials, config.log); | ||
this.log = config.taskLog || new logger_1.NoopLogger(); | ||
} | ||
@@ -30,2 +35,3 @@ execute() { | ||
catch (error) { | ||
this.log.error(`Unable to handle message: ${message}`, error); | ||
yield this.backoff(sqsMessage); | ||
@@ -45,3 +51,3 @@ throw error; | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const queueArn = yield this.sqsClient.getArn(); | ||
const queueArn = yield this.sqsClient.getOrCreateArn(); | ||
return yield snsClient.subscribeQueueToTopic(topicName, queueArn); | ||
@@ -48,0 +54,0 @@ }); |
{ | ||
"name": "lib-task-scheduler", | ||
"version": "1.0.33", | ||
"version": "1.0.38", | ||
"description": "A lightweight, modular task scheduler.", | ||
@@ -5,0 +5,0 @@ "homepage": "https://github.com/convoyinc/lib-task-scheduler", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
127405
57
1856