Comparing version 3.3.0-beta.0 to 3.3.0-beta.1
@@ -229,30 +229,8 @@ 'use strict'; | ||
} | ||
let localRetry = false; | ||
// 并发消费任务 | ||
const consumeTasks = []; | ||
for (const msg of msgs) { | ||
utils.resetRetryTopic(msg, this.consumerGroup); | ||
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) { | ||
try { | ||
if (msg.reconsumeTimes < this.options.maxReconsumeTimes) { | ||
await handler(msg, mq, pq); | ||
} | ||
} catch (err) { | ||
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`; | ||
this.emit('error', err); | ||
if (this.messageModel === MessageModel.CLUSTERING) { | ||
// 发送重试消息 | ||
try { | ||
// delayLevel 为 0 代表由服务端控制重试间隔 | ||
await this.sendMessageBack(msg, 0, mq.brokerName, this.consumerGroup); | ||
} catch (err) { | ||
this.emit('error', err); | ||
this.logger.error('[MQPushConsumer] send reconsume message failed, fall to local retry, msgId: %s', msg.msgId); | ||
// 重试消息发送失败,本地重试 | ||
msg.reconsumeTimes++; | ||
await this._sleep(5000); | ||
localRetry = true; | ||
} | ||
} else { | ||
this.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId); | ||
} | ||
} | ||
consumeTasks.push(this.consumeSingleMsg(handler, msg, mq, pq)); | ||
} else { | ||
@@ -262,3 +240,6 @@ this.logger.debug('[MQPushConsumer] message filter by tags=, msg.tags=%s', subExpression, msg.tags); | ||
} | ||
if (localRetry) { | ||
// 必须全部成功 | ||
try { | ||
await Promise.all(consumeTasks); | ||
} catch (err) { | ||
continue; | ||
@@ -281,2 +262,29 @@ } | ||
async consumeSingleMsg(handler, msg, mq, pq) { | ||
try { | ||
if (msg.reconsumeTimes < this.options.maxReconsumeTimes) { | ||
await handler(msg, mq, pq); | ||
} | ||
} catch (err) { | ||
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`; | ||
this.emit('error', err); | ||
if (this.messageModel === MessageModel.CLUSTERING) { | ||
// 发送重试消息 | ||
try { | ||
// delayLevel 为 0 代表由服务端控制重试间隔 | ||
await this.sendMessageBack(msg, 0, mq.brokerName, this.consumerGroup); | ||
} catch (err) { | ||
this.emit('error', err); | ||
this.logger.error('[MQPushConsumer] send reconsume message failed, fall to local retry, msgId: %s', msg.msgId); | ||
// 重试消息发送失败,本地重试 | ||
msg.reconsumeTimes++; | ||
await this._sleep(5000); | ||
throw err; | ||
} | ||
} else { | ||
this.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId); | ||
} | ||
} | ||
} | ||
/** | ||
@@ -283,0 +291,0 @@ * construct subscription data |
{ | ||
"name": "ali-ons", | ||
"version": "3.3.0-beta.0", | ||
"version": "3.3.0-beta.1", | ||
"description": "Aliyun Open Notification Service Client", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
4086
0
148058