Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

ali-ons

Package Overview
Dependencies
Maintainers
2
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

ali-ons - npm Package Compare versions

Comparing version 3.3.0-beta.0 to 3.3.0-beta.1

60

lib/consumer/mq_push_consumer.js

@@ -229,30 +229,8 @@ 'use strict';

}
let localRetry = false;
// 并发消费任务
const consumeTasks = [];
for (const msg of msgs) {
utils.resetRetryTopic(msg, this.consumerGroup);
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) {
try {
if (msg.reconsumeTimes < this.options.maxReconsumeTimes) {
await handler(msg, mq, pq);
}
} catch (err) {
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`;
this.emit('error', err);
if (this.messageModel === MessageModel.CLUSTERING) {
// 发送重试消息
try {
// delayLevel 为 0 代表由服务端控制重试间隔
await this.sendMessageBack(msg, 0, mq.brokerName, this.consumerGroup);
} catch (err) {
this.emit('error', err);
this.logger.error('[MQPushConsumer] send reconsume message failed, fall to local retry, msgId: %s', msg.msgId);
// 重试消息发送失败,本地重试
msg.reconsumeTimes++;
await this._sleep(5000);
localRetry = true;
}
} else {
this.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId);
}
}
consumeTasks.push(this.consumeSingleMsg(handler, msg, mq, pq));
} else {

@@ -262,3 +240,6 @@ this.logger.debug('[MQPushConsumer] message filter by tags=, msg.tags=%s', subExpression, msg.tags);

}
if (localRetry) {
// 必须全部成功
try {
await Promise.all(consumeTasks);
} catch (err) {
continue;

@@ -281,2 +262,29 @@ }

async consumeSingleMsg(handler, msg, mq, pq) {
try {
if (msg.reconsumeTimes < this.options.maxReconsumeTimes) {
await handler(msg, mq, pq);
}
} catch (err) {
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`;
this.emit('error', err);
if (this.messageModel === MessageModel.CLUSTERING) {
// 发送重试消息
try {
// delayLevel 为 0 代表由服务端控制重试间隔
await this.sendMessageBack(msg, 0, mq.brokerName, this.consumerGroup);
} catch (err) {
this.emit('error', err);
this.logger.error('[MQPushConsumer] send reconsume message failed, fall to local retry, msgId: %s', msg.msgId);
// 重试消息发送失败,本地重试
msg.reconsumeTimes++;
await this._sleep(5000);
throw err;
}
} else {
this.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId);
}
}
}
/**

@@ -283,0 +291,0 @@ * construct subscription data

{
"name": "ali-ons",
"version": "3.3.0-beta.0",
"version": "3.3.0-beta.1",
"description": "Aliyun Open Notification Service Client",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc