Comparing version 3.3.0-beta.1 to 3.3.0-beta.2
@@ -172,8 +172,9 @@ 'use strict'; | ||
const subscriptionData = this.buildSubscriptionData(this.consumerGroup, topic, subExpression); | ||
const retryTopic = MixAll.getRetryTopic(this.consumerGroup); | ||
const retrySubscriptionData = this.buildSubscriptionData(this.consumerGroup, retryTopic, subExpression); | ||
this.subscriptions.set(retryTopic, { | ||
handler, | ||
subscriptionData: retrySubscriptionData, | ||
}); | ||
if (this.messageModel === MessageModel.CLUSTERING) { | ||
const retryTopic = MixAll.getRetryTopic(this.consumerGroup); | ||
const retrySubscriptionData = this.buildSubscriptionData(this.consumerGroup, retryTopic); | ||
this.subscriptions.set(retryTopic, { | ||
subscriptionData: retrySubscriptionData, | ||
}); | ||
} | ||
const tagsSet = subscriptionData.tagsSet; | ||
@@ -196,14 +197,22 @@ const needFilter = !!tagsSet.length; | ||
await Promise.all([ (async () => { | ||
const loops = []; | ||
loops.push((async () => { | ||
// 消息消费循环 | ||
while (!this._isClosed && this.subscriptions.has(topic)) { | ||
await this._consumeMessageLoop(topic, needFilter, tagsSet, handler, subExpression); | ||
await this._consumeMessageLoop(topic, needFilter, tagsSet, subExpression); | ||
} | ||
})(), (async () => { | ||
// 重试消息消费循环 | ||
while (!this._isClosed && this.subscriptions.has(retryTopic)) { | ||
await this._consumeMessageLoop(retryTopic, needFilter, tagsSet, handler, subExpression); | ||
} | ||
})() ]); | ||
})()); | ||
if (this.messageModel === MessageModel.CLUSTERING) { | ||
const retryTopic = MixAll.getRetryTopic(this.consumerGroup); | ||
loops.push((async () => { | ||
// 重试消息消费循环 | ||
while (!this._isClosed && this.subscriptions.has(retryTopic)) { | ||
await this._consumeMessageLoop(retryTopic, needFilter, tagsSet, subExpression); | ||
} | ||
})()); | ||
} | ||
await Promise.all(loops); | ||
} catch (err) { | ||
@@ -215,3 +224,3 @@ this._handleError(err); | ||
async _consumeMessageLoop(topic, needFilter, tagsSet, handler, subExpression) { | ||
async _consumeMessageLoop(topic, needFilter, tagsSet, subExpression) { | ||
const mqList = this._topicSubscribeInfoTable.get(topic); | ||
@@ -236,2 +245,4 @@ let hasMsg = false; | ||
utils.resetRetryTopic(msg, this.consumerGroup); | ||
// 用恢复后的 topic 获取 handler | ||
const handler = this._subscriptions.get(msg.topic).handler; | ||
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) { | ||
@@ -410,4 +421,4 @@ consumeTasks.push(this.consumeSingleMsg(handler, msg, mq, pq)); | ||
const pullRT = Date.now() - processQueue.lastPullTimestamp; | ||
this.logger.info('[MQPushConsumer] pull message success, found new message size: %d, topic: %s, consumerGroup: %s, cost: %dms.', | ||
pullResult.msgFoundList.length, messageQueue.topic, this.consumerGroup, pullRT); | ||
this.logger.info('[MQPushConsumer] pull message success, found new message size: %d, topic: %s, consumerGroup: %s, messageQueue: %s cost: %dms.', | ||
pullResult.msgFoundList.length, messageQueue.topic, this.consumerGroup, messageQueue.key, pullRT); | ||
@@ -486,3 +497,6 @@ // submit to consumer | ||
correctTagsOffset(pullRequest) { | ||
this._offsetStore.updateOffset(pullRequest.messageQueue, pullRequest.nextOffset, true); | ||
// 仅当已拉下的消息消费完的情况下才更新 offset | ||
if (pullRequest.processQueue.msgCount === 0) { | ||
this._offsetStore.updateOffset(pullRequest.messageQueue, pullRequest.nextOffset, true); | ||
} | ||
} | ||
@@ -489,0 +503,0 @@ |
@@ -60,3 +60,3 @@ 'use strict'; | ||
const groupTopic = MixAll.getRetryTopic(consumerGroup); | ||
if (msg.retryTopic === groupTopic) { | ||
if (msg.topic === groupTopic) { | ||
msg.topic = msg.retryTopic; | ||
@@ -63,0 +63,0 @@ } |
{ | ||
"name": "ali-ons", | ||
"version": "3.3.0-beta.1", | ||
"version": "3.3.0-beta.2", | ||
"description": "Aliyun Open Notification Service Client", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
148585
4098