Comparing version 2.1.12 to 2.1.13
{ | ||
"name": "sqns", | ||
"version": "2.1.12", | ||
"version": "2.1.13", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -26,3 +26,3 @@ "use strict"; | ||
this.MessageSystemAttribute = item.MessageSystemAttribute || {}; | ||
this.priority = item.priority || EventItem.PRIORITY.DEFAULT; | ||
this.priority = isNaN(item.priority) ? EventItem.PRIORITY.DEFAULT : item.priority; | ||
this.state = item.state || EventState.PENDING; | ||
@@ -29,0 +29,0 @@ this.eventTime = item.eventTime; |
@@ -53,6 +53,9 @@ "use strict"; | ||
log.info('Adding scheduler job for event master.'); | ||
this.job = schedule.scheduleJob(cronInterval, () => this.queueNames | ||
.filter((queueName) => !this.queueConfigs[queueName].sending) | ||
.forEach((queueName) => this | ||
.requestEventsToAddInQueueAsynchronous(this.queueConfigs[queueName], this.queueConfigs[queueName].cloneBaseParams))); | ||
this.job = schedule.scheduleJob(cronInterval, () => { | ||
log.info('Executing Manage Job Interval'); | ||
const queuesNotSendingEvent = this.queueNames.filter((queueName) => !this.queueConfigs[queueName].sending); | ||
log.info('Queues to start event sending:', queuesNotSendingEvent); | ||
queuesNotSendingEvent.forEach((queueName) => this | ||
.requestEventsToAddInQueueAsynchronous(this.queueConfigs[queueName], this.queueConfigs[queueName].cloneBaseParams)); | ||
}); | ||
} | ||
@@ -59,0 +62,0 @@ requestEventsToAddInQueueAsynchronous(queueConfig_, itemListParams) { |
@@ -143,5 +143,8 @@ "use strict"; | ||
log.info('Adding scheduler job for event slave.'); | ||
this.job = schedule.scheduleJob(cronInterval, () => this.queueNames | ||
.filter((queueName) => !this.queueConfigs[queueName].polling) | ||
.forEach((queueName) => this.checkIfMoreItemsCanBeProcessed(this.queueConfigs[queueName]))); | ||
this.job = schedule.scheduleJob(cronInterval, () => { | ||
log.info('Executing Worker Job Interval'); | ||
const queuesNotPollingEvent = this.queueNames.filter((queueName) => !this.queueConfigs[queueName].polling); | ||
log.info('Queues to start event polling:', queuesNotPollingEvent); | ||
queuesNotPollingEvent.forEach((queueName) => this.checkIfMoreItemsCanBeProcessed(this.queueConfigs[queueName])); | ||
}); | ||
} | ||
@@ -152,8 +155,11 @@ checkIfMoreItemsCanBeProcessed(workerQueueConfig_) { | ||
if (workerQueueConfig.config.count >= workerQueueConfig.config.MAX_COUNT) { | ||
log.info('Queue:', workerQueueConfig.queueName, 'already maximum task running.'); | ||
return; | ||
} | ||
while (workerQueueConfig.config.count < workerQueueConfig.config.MAX_COUNT && workerQueueConfig.hasMore) { | ||
log.info('Queue:', workerQueueConfig.queueName, 'Processing new event.'); | ||
this.requestEventToProcessAsynchronous(workerQueueConfig); | ||
} | ||
if (!workerQueueConfig.config.count && !workerQueueConfig.hasMore) { | ||
log.info('Queue:', workerQueueConfig.queueName, 'No events to process reset status.'); | ||
workerQueueConfig.polling = false; | ||
@@ -160,0 +166,0 @@ workerQueueConfig.hasMore = true; |
@@ -136,6 +136,9 @@ "use strict"; | ||
async sendMessage(queue, MessageBody, MessageAttribute, MessageSystemAttribute, DelaySeconds = '0', MessageDeduplicationId) { | ||
var _a; | ||
var _a, _b, _c; | ||
this.storageToQueueWorker.setUpIntervalForQueue(queue); | ||
const deliveryPolicy = delivery_policy_helper_1.DeliveryPolicyHelper | ||
.verifyAndGetChannelDeliveryPolicy((_a = MessageAttribute === null || MessageAttribute === void 0 ? void 0 : MessageAttribute.DeliveryPolicy) === null || _a === void 0 ? void 0 : _a.StringValue); | ||
const priority = isNaN(Number((_b = MessageAttribute === null || MessageAttribute === void 0 ? void 0 : MessageAttribute.Priority) === null || _b === void 0 ? void 0 : _b.StringValue)) | ||
? event_item_1.EventItem.PRIORITY.DEFAULT | ||
: Math.max(Math.min(Math.floor(Number((_c = MessageAttribute === null || MessageAttribute === void 0 ? void 0 : MessageAttribute.Priority) === null || _c === void 0 ? void 0 : _c.StringValue)), event_item_1.EventItem.PRIORITY.DEFAULT), 0); | ||
const eventItem = new event_item_1.EventItem({ | ||
@@ -150,2 +153,3 @@ id: undefined, | ||
maxReceiveCount: queue.getMaxReceiveCount(), | ||
priority, | ||
eventTime: new Date(new Date().getTime() + (Number(DelaySeconds) * 1000)), | ||
@@ -152,0 +156,0 @@ }); |
@@ -0,3 +1,3 @@ | ||
import { BASE_CONFIG } from '../../../../typings/common'; | ||
import { QueueStorageToQueueConfigListener } from '../../../../typings/config'; | ||
import { BASE_CONFIG } from '../../../../typings/typings'; | ||
import { Queue } from '../../common/model/queue'; | ||
@@ -4,0 +4,0 @@ export declare class QueueStorageToQueueScheduler { |
@@ -34,3 +34,6 @@ "use strict"; | ||
log.info(`Adding scheduler job for queueARN: ${queue.arn}`); | ||
this._job = schedule.scheduleJob(cronInterval || '*/5 * * * * *', () => this.startProcessingOfQueue()); | ||
this._job = schedule.scheduleJob(cronInterval || '*/5 * * * * *', () => { | ||
log.info('Executing Manage Job Interval'); | ||
this.startProcessingOfQueue(); | ||
}); | ||
} | ||
@@ -46,4 +49,6 @@ cancel() { | ||
if (this.config.sending) { | ||
log.info('Queues:', this.config.queues, 'already fetching events.'); | ||
return; | ||
} | ||
log.info('Queues:', this.config.queues, 'start fetching events.'); | ||
this.findEventsToAddInQueueAsynchronous(this.config.queues.map((each) => each), this.config.cloneBaseParams); | ||
@@ -62,2 +67,3 @@ } | ||
if (!hasMoreData) { | ||
log.info('Queues:', this.config.queues, 'No more data to fetch, resetting.'); | ||
this.config.sending = false; | ||
@@ -64,0 +70,0 @@ return; |
@@ -29,3 +29,3 @@ import { EventState } from '../src/sqns/common/model/event-item'; | ||
maxReceiveCount: number; | ||
priority?: number; | ||
priority: number; | ||
sentTime?: Date; | ||
@@ -32,0 +32,0 @@ firstSentTime?: Date; |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
671052
8235