Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

ali-ons

Package Overview
Dependencies
Maintainers
2
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

ali-ons - npm Package Compare versions

Comparing version 3.3.0-beta.1 to 3.3.0-beta.2

50

lib/consumer/mq_push_consumer.js

@@ -172,8 +172,9 @@ 'use strict';

const subscriptionData = this.buildSubscriptionData(this.consumerGroup, topic, subExpression);
const retryTopic = MixAll.getRetryTopic(this.consumerGroup);
const retrySubscriptionData = this.buildSubscriptionData(this.consumerGroup, retryTopic, subExpression);
this.subscriptions.set(retryTopic, {
handler,
subscriptionData: retrySubscriptionData,
});
if (this.messageModel === MessageModel.CLUSTERING) {
const retryTopic = MixAll.getRetryTopic(this.consumerGroup);
const retrySubscriptionData = this.buildSubscriptionData(this.consumerGroup, retryTopic);
this.subscriptions.set(retryTopic, {
subscriptionData: retrySubscriptionData,
});
}
const tagsSet = subscriptionData.tagsSet;

@@ -196,14 +197,22 @@ const needFilter = !!tagsSet.length;

await Promise.all([ (async () => {
const loops = [];
loops.push((async () => {
// 消息消费循环
while (!this._isClosed && this.subscriptions.has(topic)) {
await this._consumeMessageLoop(topic, needFilter, tagsSet, handler, subExpression);
await this._consumeMessageLoop(topic, needFilter, tagsSet, subExpression);
}
})(), (async () => {
// 重试消息消费循环
while (!this._isClosed && this.subscriptions.has(retryTopic)) {
await this._consumeMessageLoop(retryTopic, needFilter, tagsSet, handler, subExpression);
}
})() ]);
})());
if (this.messageModel === MessageModel.CLUSTERING) {
const retryTopic = MixAll.getRetryTopic(this.consumerGroup);
loops.push((async () => {
// 重试消息消费循环
while (!this._isClosed && this.subscriptions.has(retryTopic)) {
await this._consumeMessageLoop(retryTopic, needFilter, tagsSet, subExpression);
}
})());
}
await Promise.all(loops);
} catch (err) {

@@ -215,3 +224,3 @@ this._handleError(err);

async _consumeMessageLoop(topic, needFilter, tagsSet, handler, subExpression) {
async _consumeMessageLoop(topic, needFilter, tagsSet, subExpression) {
const mqList = this._topicSubscribeInfoTable.get(topic);

@@ -236,2 +245,4 @@ let hasMsg = false;

utils.resetRetryTopic(msg, this.consumerGroup);
// 用恢复后的 topic 获取 handler
const handler = this._subscriptions.get(msg.topic).handler;
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) {

@@ -410,4 +421,4 @@ consumeTasks.push(this.consumeSingleMsg(handler, msg, mq, pq));

const pullRT = Date.now() - processQueue.lastPullTimestamp;
this.logger.info('[MQPushConsumer] pull message success, found new message size: %d, topic: %s, consumerGroup: %s, cost: %dms.',
pullResult.msgFoundList.length, messageQueue.topic, this.consumerGroup, pullRT);
this.logger.info('[MQPushConsumer] pull message success, found new message size: %d, topic: %s, consumerGroup: %s, messageQueue: %s cost: %dms.',
pullResult.msgFoundList.length, messageQueue.topic, this.consumerGroup, messageQueue.key, pullRT);

@@ -486,3 +497,6 @@ // submit to consumer

correctTagsOffset(pullRequest) {
this._offsetStore.updateOffset(pullRequest.messageQueue, pullRequest.nextOffset, true);
// 仅当已拉下的消息消费完的情况下才更新 offset
if (pullRequest.processQueue.msgCount === 0) {
this._offsetStore.updateOffset(pullRequest.messageQueue, pullRequest.nextOffset, true);
}
}

@@ -489,0 +503,0 @@

@@ -60,3 +60,3 @@ 'use strict';

const groupTopic = MixAll.getRetryTopic(consumerGroup);
if (msg.retryTopic === groupTopic) {
if (msg.topic === groupTopic) {
msg.topic = msg.retryTopic;

@@ -63,0 +63,0 @@ }

{
"name": "ali-ons",
"version": "3.3.0-beta.1",
"version": "3.3.0-beta.2",
"description": "Aliyun Open Notification Service Client",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc