Comparing version 3.2.2 to 3.3.0-beta.0
@@ -9,2 +9,3 @@ 'use strict'; | ||
const MQClient = require('../mq_client'); | ||
const MQProducer = require('../producer/mq_producer'); | ||
const sleep = require('mz-modules/sleep'); | ||
@@ -23,2 +24,4 @@ const ClientConfig = require('../client_config'); | ||
const AllocateMessageQueueAveragely = require('./rebalance/allocate_message_queue_averagely'); | ||
const Message = require('../message/message'); | ||
const MessageConst = require('../message/message_const'); | ||
@@ -46,4 +49,6 @@ const defaultOptions = { | ||
pullBatchSize: 32, // 拉消息,一次拉多少条 | ||
parallelConsumeLimit: 1, // 并发消费消息限制 | ||
postSubscriptionWhenPull: true, // 是否每次拉消息时,都上传订阅关系 | ||
allocateMessageQueueStrategy: new AllocateMessageQueueAveragely(), // 队列分配算法,应用可重写 | ||
maxReconsumeTimes: 16, // 最大重试次数 | ||
}; | ||
@@ -54,3 +59,6 @@ | ||
assert(options && options.consumerGroup, '[MQPushConsumer] options.consumerGroup is required'); | ||
super(Object.assign({ initMethod: 'init' }, defaultOptions, options)); | ||
const mergedOptions = Object.assign({ initMethod: 'init' }, defaultOptions, options); | ||
assert(mergedOptions.parallelConsumeLimit <= mergedOptions.pullBatchSize, | ||
'[MQPushConsumer] options.parallelConsumeLimit must lte pullBatchSize'); | ||
super(mergedOptions); | ||
@@ -92,2 +100,6 @@ // @example: | ||
get parallelConsumeLimit() { | ||
return this.options.parallelConsumeLimit; | ||
} | ||
get consumerGroup() { | ||
@@ -115,2 +127,3 @@ return this.options.consumerGroup; | ||
this._mqClient.registerConsumer(this.consumerGroup, this); | ||
await MQProducer.getDefaultProducer(this.options); | ||
await this._mqClient.ready(); | ||
@@ -157,4 +170,2 @@ await this._offsetStore.load(); | ||
subscribe(topic, subExpression, handler) { | ||
const _self = this; | ||
if (arguments.length === 2) { | ||
@@ -165,8 +176,14 @@ handler = subExpression; | ||
assert(is.asyncFunction(handler), '[MQPushConsumer] handler should be a asyncFunction'); | ||
assert(!_self.subscriptions.has(topic), `[MQPushConsumer] ONLY one handler allowed for topic=${topic}`); | ||
assert(!this.subscriptions.has(topic), `[MQPushConsumer] ONLY one handler allowed for topic=${topic}`); | ||
const subscriptionData = _self.buildSubscriptionData(_self.consumerGroup, topic, subExpression); | ||
const subscriptionData = this.buildSubscriptionData(this.consumerGroup, topic, subExpression); | ||
const retryTopic = MixAll.getRetryTopic(this.consumerGroup); | ||
const retrySubscriptionData = this.buildSubscriptionData(this.consumerGroup, retryTopic, subExpression); | ||
this.subscriptions.set(retryTopic, { | ||
handler, | ||
subscriptionData: retrySubscriptionData, | ||
}); | ||
const tagsSet = subscriptionData.tagsSet; | ||
const needFilter = !!tagsSet.length; | ||
_self.subscriptions.set(topic, { | ||
this.subscriptions.set(topic, { | ||
handler, | ||
@@ -178,56 +195,92 @@ subscriptionData, | ||
try { | ||
await _self.ready(); | ||
await this.ready(); | ||
// 如果 topic 没有路由信息,先更新一下 | ||
if (!_self._topicSubscribeInfoTable.has(topic)) { | ||
await _self._mqClient.updateAllTopicRouterInfo(); | ||
await _self._mqClient.sendHeartbeatToAllBroker(); | ||
await _self._mqClient.doRebalance(); | ||
if (!this._topicSubscribeInfoTable.has(topic)) { | ||
await this._mqClient.updateAllTopicRouterInfo(); | ||
await this._mqClient.sendHeartbeatToAllBroker(); | ||
await this._mqClient.doRebalance(); | ||
} | ||
while (!_self._isClosed && _self.subscriptions.has(topic)) { | ||
const mqList = _self._topicSubscribeInfoTable.get(topic); | ||
let hasMsg = false; | ||
if (mqList && mqList.length) { | ||
for (const mq of mqList) { | ||
const item = _self._processQueueTable.get(mq.key); | ||
if (item) { | ||
const pq = item.processQueue; | ||
while (pq.msgCount) { | ||
hasMsg = true; | ||
const msg = pq.msgList[0]; | ||
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) { | ||
await Promise.all([ (async () => { | ||
// 消息消费循环 | ||
while (!this._isClosed && this.subscriptions.has(topic)) { | ||
await this._consumeMessageLoop(topic, needFilter, tagsSet, handler, subExpression); | ||
} | ||
})(), (async () => { | ||
// 重试消息消费循环 | ||
while (!this._isClosed && this.subscriptions.has(retryTopic)) { | ||
await this._consumeMessageLoop(retryTopic, needFilter, tagsSet, handler, subExpression); | ||
} | ||
})() ]); | ||
} catch (err) { | ||
this._handleError(err); | ||
} | ||
})(); | ||
} | ||
async _consumeMessageLoop(topic, needFilter, tagsSet, handler, subExpression) { | ||
const mqList = this._topicSubscribeInfoTable.get(topic); | ||
let hasMsg = false; | ||
if (mqList && mqList.length) { | ||
for (const mq of mqList) { | ||
const item = this._processQueueTable.get(mq.key); | ||
if (item) { | ||
const pq = item.processQueue; | ||
while (pq.msgCount) { | ||
hasMsg = true; | ||
let msgs; | ||
if (this.parallelConsumeLimit > pq.msgCount) { | ||
msgs = pq.msgList.slice(0, pq.msgCount); | ||
} else { | ||
msgs = pq.msgList.slice(0, this.parallelConsumeLimit); | ||
} | ||
let localRetry = false; | ||
for (const msg of msgs) { | ||
utils.resetRetryTopic(msg, this.consumerGroup); | ||
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) { | ||
try { | ||
if (msg.reconsumeTimes < this.options.maxReconsumeTimes) { | ||
await handler(msg, mq, pq); | ||
} | ||
} catch (err) { | ||
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`; | ||
this.emit('error', err); | ||
if (this.messageModel === MessageModel.CLUSTERING) { | ||
// 发送重试消息 | ||
try { | ||
await handler(msg, mq, pq); | ||
// delayLevel 为 0 代表由服务端控制重试间隔 | ||
await this.sendMessageBack(msg, 0, mq.brokerName, this.consumerGroup); | ||
} catch (err) { | ||
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`; | ||
_self.emit('error', err); | ||
if (_self.messageModel === MessageModel.CLUSTERING) { | ||
// TODO: support retry message | ||
msg.reconsumeTimes++; | ||
await _self._sleep(5000); | ||
continue; | ||
} else { | ||
_self.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId); | ||
} | ||
this.emit('error', err); | ||
this.logger.error('[MQPushConsumer] send reconsume message failed, fall to local retry, msgId: %s', msg.msgId); | ||
// 重试消息发送失败,本地重试 | ||
msg.reconsumeTimes++; | ||
await this._sleep(5000); | ||
localRetry = true; | ||
} | ||
} else { | ||
_self.logger.debug('[MQPushConsumer] message filter by tags=, msg.tags=%s', subExpression, msg.tags); | ||
this.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId); | ||
} | ||
const offset = pq.remove(); | ||
if (offset >= 0) { | ||
_self._offsetStore.updateOffset(mq, offset, true); | ||
} | ||
} | ||
} else { | ||
this.logger.debug('[MQPushConsumer] message filter by tags=, msg.tags=%s', subExpression, msg.tags); | ||
} | ||
} | ||
if (localRetry) { | ||
continue; | ||
} | ||
// 注意这里必须是批量确认 | ||
const offset = pq.remove(msgs.length); | ||
if (offset >= 0) { | ||
this._offsetStore.updateOffset(mq, offset, true); | ||
} | ||
} | ||
if (!hasMsg) { | ||
await _self.await(`topic_${topic}_changed`); | ||
} | ||
} | ||
} catch (err) { | ||
_self._handleError(err); | ||
} | ||
})(); | ||
} | ||
if (!hasMsg) { | ||
await this.await(`topic_${topic}_changed`); | ||
} | ||
} | ||
@@ -651,2 +704,31 @@ | ||
async sendMessageBack(msg, delayLevel, brokerName, consumerGroup) { | ||
const brokerAddr = brokerName ? this._mqClient.findBrokerAddressInPublish(brokerName) | ||
: msg.storeHost; | ||
const thatConsumerGroup = consumerGroup ? consumerGroup : this.consumerGroup; | ||
try { | ||
await this._mqClient.consumerSendMessageBack( | ||
brokerAddr, | ||
msg, | ||
thatConsumerGroup, | ||
delayLevel, | ||
3000, | ||
this.options.maxReconsumeTimes); | ||
} catch (err) { | ||
err.mesasge = 'sendMessageBack() occurred an exception, ' + thatConsumerGroup + ', ' + err.mesasge; | ||
this._handleError(err); | ||
const newMsg = new Message(MixAll.getRetryTopic(thatConsumerGroup), '', msg.body); | ||
newMsg.flag = msg.flag; | ||
newMsg.properties = msg.properties; | ||
newMsg.originMessageId = msg.originMessageId || msg.msgId; | ||
newMsg.retryTopic = msg.topic; | ||
newMsg.properties[MessageConst.PROPERTY_RECONSUME_TIME] = String(msg.reconsumeTimes + 1); | ||
newMsg.properties[MessageConst.PROPERTY_MAX_RECONSUME_TIMES] = this.options.maxReconsumeTimes; | ||
newMsg.delayTimeLevel = 1 + msg.reconsumeTimes; | ||
await (await MQProducer.getDefaultProducer()).send(newMsg); | ||
} | ||
} | ||
// * viewMessage(msgId) { | ||
@@ -653,0 +735,0 @@ // const info = MessageDecoder.decodeMessageId(msgId); |
@@ -26,3 +26,4 @@ 'use strict'; | ||
exports.PROPERTY_RECONSUME_TIME = 'RECONSUME_TIME'; | ||
exports.PROPERTY_MAX_RECONSUME_TIMES = 'MAX_RECONSUME_TIMES'; | ||
exports.KEY_SEPARATOR = ' '; |
@@ -71,2 +71,26 @@ 'use strict'; | ||
/** | ||
* 原始消息 Id | ||
* @property {String} Message#originMessageId | ||
*/ | ||
get originMessageId() { | ||
return this.properties && this.properties[MessageConst.PROPERTY_ORIGIN_MESSAGE_ID]; | ||
} | ||
set originMessageId(val) { | ||
this.properties[MessageConst.PROPERTY_ORIGIN_MESSAGE_ID] = val; | ||
} | ||
/** | ||
* 重试 Topic | ||
* @property {String} Message#retryTopic | ||
*/ | ||
get retryTopic() { | ||
return this.properties && this.properties[MessageConst.PROPERTY_RETRY_TOPIC]; | ||
} | ||
set retryTopic(val) { | ||
this.properties[MessageConst.PROPERTY_RETRY_TOPIC] = val; | ||
} | ||
/** | ||
* 消息延时投递时间级别,0表示不延时,大于0表示特定延时级别(具体级别在服务器端定义) | ||
@@ -73,0 +97,0 @@ * @property {Number} Message#delayTimeLevel |
@@ -6,2 +6,3 @@ 'use strict'; | ||
exports.DEFAULT_CONSUMER_GROUP = 'DEFAULT_CONSUMER'; | ||
exports.CLIENT_INNER_PRODUCER_GROUP = 'CLIENT_INNER_PRODUCER'; | ||
@@ -15,4 +16,9 @@ exports.DEFAULT_CHARSET = 'UTF-8'; | ||
/** | ||
* 获取 RETRY_TOPIC | ||
* @param {string} consumerGroup consumerGroup | ||
* @return {string} %RETRY%+consumerGroup | ||
*/ | ||
exports.getRetryTopic = consumerGroup => { | ||
return exports.RETRY_GROUP_TOPIC_PREFIX + consumerGroup; | ||
}; |
@@ -469,2 +469,3 @@ 'use strict'; | ||
k: requestHeader.unitMode, | ||
l: requestHeader.maxReconsumeTimes, | ||
}; | ||
@@ -504,2 +505,32 @@ const request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2); | ||
/** | ||
* consumer send message back | ||
* @param {String} brokerAddr - broker address | ||
* @param {Message} msg - message object | ||
* @param {String} consumerGroup - consumer group | ||
* @param {Number} delayLevel - delay level | ||
* @param {Number} timeoutMillis - timeout in millis | ||
* @param {Number} maxConsumeRetryTimes - max retry times | ||
*/ | ||
async consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, timeoutMillis, maxConsumeRetryTimes) { | ||
const requestHeader = { | ||
offset: msg.commitLogOffset, | ||
group: consumerGroup, | ||
delayLevel, | ||
originMsgId: msg.msgId, | ||
originTopic: msg.topic, | ||
unitMode: false, | ||
maxReconsumeTimes: maxConsumeRetryTimes, | ||
}; | ||
const request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader); | ||
const response = await this.invoke(brokerAddr, request, timeoutMillis); | ||
switch (response.code) { | ||
case ResponseCode.SUCCESS: | ||
return; | ||
default: | ||
this._defaultHandler(request, response); | ||
break; | ||
} | ||
} | ||
// * viewMessage(brokerAddr, phyoffset, timeoutMillis) { | ||
@@ -506,0 +537,0 @@ // const requestHeader = { |
@@ -117,3 +117,3 @@ 'use strict'; | ||
* @param {String} group - consumer group name | ||
* @param {Comsumer} consumer - consumer instance | ||
* @param {Consumer} consumer - consumer instance | ||
* @return {void} | ||
@@ -120,0 +120,0 @@ */ |
@@ -81,2 +81,3 @@ 'use strict'; | ||
// this._topicPublishInfoTable.set(this.createTopicKey, new TopicPublishInfo()); | ||
await MQProducer.getDefaultProducer(this.options); | ||
this._mqClient.registerProducer(this.producerGroup, this); | ||
@@ -299,2 +300,7 @@ await this._mqClient.ready(); | ||
} | ||
const maxReconsumeTimes = msg.properties[MessageConst.PROPERTY_MAX_RECONSUME_TIMES]; | ||
if (maxReconsumeTimes) { | ||
requestHeader.maxReconsumeTimes = parseInt(maxReconsumeTimes, 10); | ||
delete msg.properties[MessageConst.PROPERTY_MAX_RECONSUME_TIMES]; | ||
} | ||
} | ||
@@ -328,2 +334,18 @@ | ||
} | ||
/** | ||
* 获取默认 Producer 实例 | ||
* @param {Object} options 配置 | ||
* @return {MQProducer} 默认 Producer 实例 | ||
*/ | ||
static async getDefaultProducer(options = {}) { | ||
if (!MQProducer.defaultProducer) { | ||
const opts = Object.assign({}, options); | ||
opts.producerGroup = MixAll.CLIENT_INNER_PRODUCER_GROUP; | ||
MQProducer.defaultProducer = new MQProducer(opts); | ||
await MQProducer.defaultProducer.ready(); | ||
return MQProducer.defaultProducer; | ||
} | ||
return MQProducer.defaultProducer; | ||
} | ||
} | ||
@@ -330,0 +352,0 @@ |
@@ -6,2 +6,3 @@ 'use strict'; | ||
const zlib = require('zlib'); | ||
const MixAll = require('../mix_all'); | ||
const is = require('is-type-of'); | ||
@@ -57,2 +58,10 @@ | ||
// 重置重试消息的 Topic | ||
exports.resetRetryTopic = function(msg, consumerGroup) { | ||
const groupTopic = MixAll.getRetryTopic(consumerGroup); | ||
if (msg.retryTopic === groupTopic) { | ||
msg.topic = msg.retryTopic; | ||
} | ||
}; | ||
function zeroize(value, length) { | ||
@@ -59,0 +68,0 @@ if (!length) { |
{ | ||
"name": "ali-ons", | ||
"version": "3.2.2", | ||
"version": "3.3.0-beta.0", | ||
"description": "Aliyun Open Notification Service Client", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
148088
4079
1
1