Comparing version 3.3.0-beta.4 to 3.3.0-beta.5
@@ -127,2 +127,20 @@ 'use strict'; | ||
this._inited = true; | ||
// 订阅重试 TOPIC | ||
if (this.messageModel === MessageModel.CLUSTERING) { | ||
const retryTopic = MixAll.getRetryTopic(this.consumerGroup); | ||
this.subscribe(retryTopic, '*', async msg => { | ||
const originTopic = msg.retryTopic; | ||
const originMsgId = msg.originMessageId; | ||
const handler = this._subscriptions.get(msg.topic); | ||
if (!MixAll.isRetryTopic(originTopic) && handler) { | ||
await handler(msg); | ||
} else { | ||
this.logger.warn('[MQPushConsumer] retry message no handler, originTopic: %s, originMsgId: %s, msgId: %s', | ||
originTopic, | ||
originMsgId, | ||
msg.msgId); | ||
} | ||
}); | ||
} | ||
} | ||
@@ -173,9 +191,2 @@ | ||
const subscriptionData = this.buildSubscriptionData(this.consumerGroup, topic, subExpression); | ||
if (this.messageModel === MessageModel.CLUSTERING) { | ||
const retryTopic = MixAll.getRetryTopic(this.consumerGroup); | ||
const retrySubscriptionData = this.buildSubscriptionData(this.consumerGroup, retryTopic); | ||
this.subscriptions.set(retryTopic, { | ||
subscriptionData: retrySubscriptionData, | ||
}); | ||
} | ||
const tagsSet = subscriptionData.tagsSet; | ||
@@ -207,11 +218,2 @@ const needFilter = !!tagsSet.length; | ||
if (this.messageModel === MessageModel.CLUSTERING) { | ||
const retryTopic = MixAll.getRetryTopic(this.consumerGroup); | ||
loops.push((async () => { | ||
// 重试消息消费循环 | ||
while (!this._isClosed && this.subscriptions.has(retryTopic)) { | ||
await this._consumeMessageLoop(retryTopic, needFilter, tagsSet, subExpression); | ||
} | ||
})()); | ||
} | ||
@@ -737,9 +739,15 @@ await Promise.all(loops); | ||
const newMsg = new Message(MixAll.getRetryTopic(thatConsumerGroup), '', msg.body); | ||
newMsg.flag = msg.flag; | ||
newMsg.properties = msg.properties; | ||
newMsg.originMessageId = msg.originMessageId || msg.msgId; | ||
newMsg.retryTopic = msg.topic; | ||
let newMsg; | ||
if (MixAll.isRetryTopic(msg.topic)) { | ||
newMsg = msg; | ||
} else { | ||
newMsg = new Message(MixAll.getRetryTopic(thatConsumerGroup), '', msg.body); | ||
newMsg.flag = msg.flag; | ||
newMsg.properties = msg.properties; | ||
newMsg.originMessageId = msg.originMessageId || msg.msgId; | ||
newMsg.retryTopic = msg.topic; | ||
newMsg.properties[MessageConst.PROPERTY_MAX_RECONSUME_TIMES] = this.options.maxReconsumeTimes; | ||
} | ||
newMsg.properties[MessageConst.PROPERTY_RECONSUME_TIME] = String(msg.reconsumeTimes + 1); | ||
newMsg.properties[MessageConst.PROPERTY_MAX_RECONSUME_TIMES] = this.options.maxReconsumeTimes; | ||
newMsg.delayTimeLevel = 3 + msg.reconsumeTimes; | ||
@@ -746,0 +754,0 @@ await (await MQProducer.getDefaultProducer()).send(newMsg); |
@@ -23,1 +23,10 @@ 'use strict'; | ||
}; | ||
/** | ||
* 判断是否为 RETRY_TOPIC | ||
* @param {String} topic topic | ||
* @return {boolean} ret | ||
*/ | ||
exports.isRetryTopic = topic => { | ||
return topic && topic.startsWith(exports.RETRY_GROUP_TOPIC_PREFIX); | ||
}; |
{ | ||
"name": "ali-ons", | ||
"version": "3.3.0-beta.4", | ||
"version": "3.3.0-beta.5", | ||
"description": "Aliyun Open Notification Service Client", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
148836
4112