Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

ali-ons

Package Overview
Dependencies
Maintainers
2
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

ali-ons - npm Package Compare versions

Comparing version 3.2.2 to 3.3.0-beta.0

174

lib/consumer/mq_push_consumer.js

@@ -9,2 +9,3 @@ 'use strict';

const MQClient = require('../mq_client');
const MQProducer = require('../producer/mq_producer');
const sleep = require('mz-modules/sleep');

@@ -23,2 +24,4 @@ const ClientConfig = require('../client_config');

const AllocateMessageQueueAveragely = require('./rebalance/allocate_message_queue_averagely');
const Message = require('../message/message');
const MessageConst = require('../message/message_const');

@@ -46,4 +49,6 @@ const defaultOptions = {

pullBatchSize: 32, // 拉消息,一次拉多少条
parallelConsumeLimit: 1, // 并发消费消息限制
postSubscriptionWhenPull: true, // 是否每次拉消息时,都上传订阅关系
allocateMessageQueueStrategy: new AllocateMessageQueueAveragely(), // 队列分配算法,应用可重写
maxReconsumeTimes: 16, // 最大重试次数
};

@@ -54,3 +59,6 @@

assert(options && options.consumerGroup, '[MQPushConsumer] options.consumerGroup is required');
super(Object.assign({ initMethod: 'init' }, defaultOptions, options));
const mergedOptions = Object.assign({ initMethod: 'init' }, defaultOptions, options);
assert(mergedOptions.parallelConsumeLimit <= mergedOptions.pullBatchSize,
'[MQPushConsumer] options.parallelConsumeLimit must lte pullBatchSize');
super(mergedOptions);

@@ -92,2 +100,6 @@ // @example:

get parallelConsumeLimit() {
return this.options.parallelConsumeLimit;
}
get consumerGroup() {

@@ -115,2 +127,3 @@ return this.options.consumerGroup;

this._mqClient.registerConsumer(this.consumerGroup, this);
await MQProducer.getDefaultProducer(this.options);
await this._mqClient.ready();

@@ -157,4 +170,2 @@ await this._offsetStore.load();

subscribe(topic, subExpression, handler) {
const _self = this;
if (arguments.length === 2) {

@@ -165,8 +176,14 @@ handler = subExpression;

assert(is.asyncFunction(handler), '[MQPushConsumer] handler should be a asyncFunction');
assert(!_self.subscriptions.has(topic), `[MQPushConsumer] ONLY one handler allowed for topic=${topic}`);
assert(!this.subscriptions.has(topic), `[MQPushConsumer] ONLY one handler allowed for topic=${topic}`);
const subscriptionData = _self.buildSubscriptionData(_self.consumerGroup, topic, subExpression);
const subscriptionData = this.buildSubscriptionData(this.consumerGroup, topic, subExpression);
const retryTopic = MixAll.getRetryTopic(this.consumerGroup);
const retrySubscriptionData = this.buildSubscriptionData(this.consumerGroup, retryTopic, subExpression);
this.subscriptions.set(retryTopic, {
handler,
subscriptionData: retrySubscriptionData,
});
const tagsSet = subscriptionData.tagsSet;
const needFilter = !!tagsSet.length;
_self.subscriptions.set(topic, {
this.subscriptions.set(topic, {
handler,

@@ -178,56 +195,92 @@ subscriptionData,

try {
await _self.ready();
await this.ready();
// 如果 topic 没有路由信息,先更新一下
if (!_self._topicSubscribeInfoTable.has(topic)) {
await _self._mqClient.updateAllTopicRouterInfo();
await _self._mqClient.sendHeartbeatToAllBroker();
await _self._mqClient.doRebalance();
if (!this._topicSubscribeInfoTable.has(topic)) {
await this._mqClient.updateAllTopicRouterInfo();
await this._mqClient.sendHeartbeatToAllBroker();
await this._mqClient.doRebalance();
}
while (!_self._isClosed && _self.subscriptions.has(topic)) {
const mqList = _self._topicSubscribeInfoTable.get(topic);
let hasMsg = false;
if (mqList && mqList.length) {
for (const mq of mqList) {
const item = _self._processQueueTable.get(mq.key);
if (item) {
const pq = item.processQueue;
while (pq.msgCount) {
hasMsg = true;
const msg = pq.msgList[0];
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) {
await Promise.all([ (async () => {
// 消息消费循环
while (!this._isClosed && this.subscriptions.has(topic)) {
await this._consumeMessageLoop(topic, needFilter, tagsSet, handler, subExpression);
}
})(), (async () => {
// 重试消息消费循环
while (!this._isClosed && this.subscriptions.has(retryTopic)) {
await this._consumeMessageLoop(retryTopic, needFilter, tagsSet, handler, subExpression);
}
})() ]);
} catch (err) {
this._handleError(err);
}
})();
}
async _consumeMessageLoop(topic, needFilter, tagsSet, handler, subExpression) {
const mqList = this._topicSubscribeInfoTable.get(topic);
let hasMsg = false;
if (mqList && mqList.length) {
for (const mq of mqList) {
const item = this._processQueueTable.get(mq.key);
if (item) {
const pq = item.processQueue;
while (pq.msgCount) {
hasMsg = true;
let msgs;
if (this.parallelConsumeLimit > pq.msgCount) {
msgs = pq.msgList.slice(0, pq.msgCount);
} else {
msgs = pq.msgList.slice(0, this.parallelConsumeLimit);
}
let localRetry = false;
for (const msg of msgs) {
utils.resetRetryTopic(msg, this.consumerGroup);
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) {
try {
if (msg.reconsumeTimes < this.options.maxReconsumeTimes) {
await handler(msg, mq, pq);
}
} catch (err) {
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`;
this.emit('error', err);
if (this.messageModel === MessageModel.CLUSTERING) {
// 发送重试消息
try {
await handler(msg, mq, pq);
// delayLevel 为 0 代表由服务端控制重试间隔
await this.sendMessageBack(msg, 0, mq.brokerName, this.consumerGroup);
} catch (err) {
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`;
_self.emit('error', err);
if (_self.messageModel === MessageModel.CLUSTERING) {
// TODO: support retry message
msg.reconsumeTimes++;
await _self._sleep(5000);
continue;
} else {
_self.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId);
}
this.emit('error', err);
this.logger.error('[MQPushConsumer] send reconsume message failed, fall to local retry, msgId: %s', msg.msgId);
// 重试消息发送失败,本地重试
msg.reconsumeTimes++;
await this._sleep(5000);
localRetry = true;
}
} else {
_self.logger.debug('[MQPushConsumer] message filter by tags=, msg.tags=%s', subExpression, msg.tags);
this.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId);
}
const offset = pq.remove();
if (offset >= 0) {
_self._offsetStore.updateOffset(mq, offset, true);
}
}
} else {
this.logger.debug('[MQPushConsumer] message filter by tags=, msg.tags=%s', subExpression, msg.tags);
}
}
if (localRetry) {
continue;
}
// 注意这里必须是批量确认
const offset = pq.remove(msgs.length);
if (offset >= 0) {
this._offsetStore.updateOffset(mq, offset, true);
}
}
if (!hasMsg) {
await _self.await(`topic_${topic}_changed`);
}
}
} catch (err) {
_self._handleError(err);
}
})();
}
if (!hasMsg) {
await this.await(`topic_${topic}_changed`);
}
}

@@ -651,2 +704,31 @@

async sendMessageBack(msg, delayLevel, brokerName, consumerGroup) {
const brokerAddr = brokerName ? this._mqClient.findBrokerAddressInPublish(brokerName)
: msg.storeHost;
const thatConsumerGroup = consumerGroup ? consumerGroup : this.consumerGroup;
try {
await this._mqClient.consumerSendMessageBack(
brokerAddr,
msg,
thatConsumerGroup,
delayLevel,
3000,
this.options.maxReconsumeTimes);
} catch (err) {
err.mesasge = 'sendMessageBack() occurred an exception, ' + thatConsumerGroup + ', ' + err.mesasge;
this._handleError(err);
const newMsg = new Message(MixAll.getRetryTopic(thatConsumerGroup), '', msg.body);
newMsg.flag = msg.flag;
newMsg.properties = msg.properties;
newMsg.originMessageId = msg.originMessageId || msg.msgId;
newMsg.retryTopic = msg.topic;
newMsg.properties[MessageConst.PROPERTY_RECONSUME_TIME] = String(msg.reconsumeTimes + 1);
newMsg.properties[MessageConst.PROPERTY_MAX_RECONSUME_TIMES] = this.options.maxReconsumeTimes;
newMsg.delayTimeLevel = 1 + msg.reconsumeTimes;
await (await MQProducer.getDefaultProducer()).send(newMsg);
}
}
// * viewMessage(msgId) {

@@ -653,0 +735,0 @@ // const info = MessageDecoder.decodeMessageId(msgId);

@@ -26,3 +26,4 @@ 'use strict';

exports.PROPERTY_RECONSUME_TIME = 'RECONSUME_TIME';
exports.PROPERTY_MAX_RECONSUME_TIMES = 'MAX_RECONSUME_TIMES';
exports.KEY_SEPARATOR = ' ';

@@ -71,2 +71,26 @@ 'use strict';

/**
* 原始消息 Id
* @property {String} Message#originMessageId
*/
get originMessageId() {
return this.properties && this.properties[MessageConst.PROPERTY_ORIGIN_MESSAGE_ID];
}
set originMessageId(val) {
this.properties[MessageConst.PROPERTY_ORIGIN_MESSAGE_ID] = val;
}
/**
* 重试 Topic
* @property {String} Message#retryTopic
*/
get retryTopic() {
return this.properties && this.properties[MessageConst.PROPERTY_RETRY_TOPIC];
}
set retryTopic(val) {
this.properties[MessageConst.PROPERTY_RETRY_TOPIC] = val;
}
/**
* 消息延时投递时间级别,0表示不延时,大于0表示特定延时级别(具体级别在服务器端定义)

@@ -73,0 +97,0 @@ * @property {Number} Message#delayTimeLevel

@@ -6,2 +6,3 @@ 'use strict';

exports.DEFAULT_CONSUMER_GROUP = 'DEFAULT_CONSUMER';
exports.CLIENT_INNER_PRODUCER_GROUP = 'CLIENT_INNER_PRODUCER';

@@ -15,4 +16,9 @@ exports.DEFAULT_CHARSET = 'UTF-8';

/**
* 获取 RETRY_TOPIC
* @param {string} consumerGroup consumerGroup
* @return {string} %RETRY%+consumerGroup
*/
exports.getRetryTopic = consumerGroup => {
return exports.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
};

@@ -469,2 +469,3 @@ 'use strict';

k: requestHeader.unitMode,
l: requestHeader.maxReconsumeTimes,
};

@@ -504,2 +505,32 @@ const request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);

/**
* consumer send message back
* @param {String} brokerAddr - broker address
* @param {Message} msg - message object
* @param {String} consumerGroup - consumer group
* @param {Number} delayLevel - delay level
* @param {Number} timeoutMillis - timeout in millis
* @param {Number} maxConsumeRetryTimes - max retry times
*/
async consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, timeoutMillis, maxConsumeRetryTimes) {
const requestHeader = {
offset: msg.commitLogOffset,
group: consumerGroup,
delayLevel,
originMsgId: msg.msgId,
originTopic: msg.topic,
unitMode: false,
maxReconsumeTimes: maxConsumeRetryTimes,
};
const request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
const response = await this.invoke(brokerAddr, request, timeoutMillis);
switch (response.code) {
case ResponseCode.SUCCESS:
return;
default:
this._defaultHandler(request, response);
break;
}
}
// * viewMessage(brokerAddr, phyoffset, timeoutMillis) {

@@ -506,0 +537,0 @@ // const requestHeader = {

@@ -117,3 +117,3 @@ 'use strict';

* @param {String} group - consumer group name
* @param {Comsumer} consumer - consumer instance
* @param {Consumer} consumer - consumer instance
* @return {void}

@@ -120,0 +120,0 @@ */

@@ -81,2 +81,3 @@ 'use strict';

// this._topicPublishInfoTable.set(this.createTopicKey, new TopicPublishInfo());
await MQProducer.getDefaultProducer(this.options);
this._mqClient.registerProducer(this.producerGroup, this);

@@ -299,2 +300,7 @@ await this._mqClient.ready();

}
const maxReconsumeTimes = msg.properties[MessageConst.PROPERTY_MAX_RECONSUME_TIMES];
if (maxReconsumeTimes) {
requestHeader.maxReconsumeTimes = parseInt(maxReconsumeTimes, 10);
delete msg.properties[MessageConst.PROPERTY_MAX_RECONSUME_TIMES];
}
}

@@ -328,2 +334,18 @@

}
/**
* 获取默认 Producer 实例
* @param {Object} options 配置
* @return {MQProducer} 默认 Producer 实例
*/
static async getDefaultProducer(options = {}) {
if (!MQProducer.defaultProducer) {
const opts = Object.assign({}, options);
opts.producerGroup = MixAll.CLIENT_INNER_PRODUCER_GROUP;
MQProducer.defaultProducer = new MQProducer(opts);
await MQProducer.defaultProducer.ready();
return MQProducer.defaultProducer;
}
return MQProducer.defaultProducer;
}
}

@@ -330,0 +352,0 @@

@@ -6,2 +6,3 @@ 'use strict';

const zlib = require('zlib');
const MixAll = require('../mix_all');
const is = require('is-type-of');

@@ -57,2 +58,10 @@

// 重置重试消息的 Topic
exports.resetRetryTopic = function(msg, consumerGroup) {
const groupTopic = MixAll.getRetryTopic(consumerGroup);
if (msg.retryTopic === groupTopic) {
msg.topic = msg.retryTopic;
}
};
function zeroize(value, length) {

@@ -59,0 +68,0 @@ if (!length) {

{
"name": "ali-ons",
"version": "3.2.2",
"version": "3.3.0-beta.0",
"description": "Aliyun Open Notification Service Client",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc