Comparing version 1.0.0 to 2.0.0
2.0.0 / 2017-09-29 | ||
================== | ||
* feat: add local memory store | ||
* refactor: new consumer api | ||
* fix: consumer offset update issue | ||
1.0.0 / 2017-03-14 | ||
================== | ||
* feat: implement ali-ons base on rocketmq #1 | ||
* feat: implement ali-ons base on rocketmq #1 |
@@ -10,2 +10,3 @@ 'use strict'; | ||
const MQClient = require('../mq_client'); | ||
const sleep = require('mz-modules/sleep'); | ||
const ClientConfig = require('../client_config'); | ||
@@ -20,3 +21,3 @@ const ProcessQueue = require('../process_queue'); | ||
const LocalFileOffsetStore = require('../store/local_file'); | ||
// const MessageDecoder = require('../message/message_decoder'); | ||
const LocalMemoryOffsetStore = require('../store/local_memory'); | ||
const RemoteBrokerOffsetStore = require('../store/remote_broker'); | ||
@@ -27,5 +28,7 @@ const AllocateMessageQueueAveragely = require('./rebalance/allocate_message_queue_averagely'); | ||
logger, | ||
persistent: false, // 是否持久化消费进度 | ||
isBroadcast: false, // 是否是广播模式(默认集群消费模式) | ||
brokerSuspendMaxTimeMillis: 1000 * 15, // 长轮询模式,Consumer连接在Broker挂起最长时间 | ||
pullTimeDelayMillsWhenException: 3000, // 拉消息异常时,延迟一段时间再拉 | ||
pullTimeDelayMillsWhenFlowControl: 5000, // 进入流控逻辑,延迟一段时间再拉 | ||
consumerTimeoutMillisWhenSuspend: 1000 * 30, // 长轮询模式,Consumer超时时间(必须要大于brokerSuspendMaxTimeMillis) | ||
@@ -41,3 +44,3 @@ consumerGroup: MixAll.DEFAULT_CONSUMER_GROUP, | ||
consumeTimestamp: utils.timeMillisToHumanString(Date.now() - 1000 * 60 * 30), | ||
pullThresholdForQueue: 1000, // 本地队列消息数超过此阀值,开始流控 | ||
pullThresholdForQueue: 500, // 本地队列消息数超过此阀值,开始流控 | ||
pullInterval: 0, // 拉取消息的频率, 如果为了降低拉取速度,可以设置大于0的值 | ||
@@ -61,5 +64,7 @@ consumeMessageBatchMaxSize: 1, // 消费一批消息,最大数 | ||
this._subscriptions = new Map(); | ||
this._handles = new Map(); | ||
this._topicSubscribeInfoTable = new Map(); | ||
this._processQueueTable = new Map(); | ||
this._inited = false; | ||
this._closePromise = this.await('close'); | ||
@@ -71,8 +76,6 @@ if (this.messageModel === MessageModel.CLUSTERING) { | ||
this._mqClient = MQClient.getAndCreateMQClient(this); | ||
this._offsetStore = this.options.isBroadcast ? | ||
new LocalFileOffsetStore(this._mqClient, this.consumerGroup) : | ||
new RemoteBrokerOffsetStore(this._mqClient, this.consumerGroup); | ||
this._offsetStore = this.newOffsetStoreInstance(); | ||
this._mqClient.on('error', err => this._error(err)); | ||
this._offsetStore.on('error', err => this._error(err)); | ||
this._mqClient.on('error', err => this._handleError(err)); | ||
this._offsetStore.on('error', err => this._handleError(err)); | ||
} | ||
@@ -88,2 +91,6 @@ | ||
get processQueueTable() { | ||
return this._processQueueTable; | ||
} | ||
get consumerGroup() { | ||
@@ -132,7 +139,17 @@ return this.options.consumerGroup; | ||
yield this._mqClient.close(); | ||
this.removeAllListeners(); | ||
this.logger.info('[mq:consumer] consumer closed'); | ||
this.emit('close'); | ||
}.bind(this)); | ||
} | ||
newOffsetStoreInstance() { | ||
if (this.messageModel === MessageModel.BROADCASTING) { | ||
if (this.options.persistent) { | ||
return new LocalFileOffsetStore(this._mqClient, this.consumerGroup); | ||
} | ||
return new LocalMemoryOffsetStore(this._mqClient, this.consumerGroup); | ||
} | ||
return new RemoteBrokerOffsetStore(this._mqClient, this.consumerGroup); | ||
} | ||
/** | ||
@@ -142,15 +159,76 @@ * subscribe | ||
* @param {String} subExpression - tag | ||
* @param {Function} handler - message handler | ||
* @return {void} | ||
*/ | ||
subscribe(topic, subExpression) { | ||
subscribe(topic, subExpression, handler) { | ||
if (arguments.length === 2) { | ||
handler = subExpression; | ||
subExpression = null; | ||
} | ||
assert(is.generatorFunction(handler), '[MQPushConsumer] handler should a generatorFunction'); | ||
assert(!this.subscriptions.has(topic), `[MQPushConsumer] ONLY one handler allowed for topic=${topic}`); | ||
const subscriptionData = this.buildSubscriptionData(this.consumerGroup, topic, subExpression); | ||
this.subscriptions.set(topic, subscriptionData); | ||
const tagsSet = subscriptionData.tagsSet; | ||
const needFilter = !!tagsSet.length; | ||
this.subscriptions.set(topic, { | ||
handler, | ||
subscriptionData, | ||
}); | ||
if (this._inited) { | ||
co(function* () { | ||
co(function* () { | ||
yield this.ready(); | ||
// 如果 topic 没有路由信息,先更新一下 | ||
if (!this._topicSubscribeInfoTable.has(topic)) { | ||
yield this._mqClient.updateAllTopicRouterInfo(); | ||
yield this._mqClient.sendHeartbeatToAllBroker(); | ||
yield this._mqClient.doRebalance(); | ||
}.bind(this)).catch(err => this._error(err)); | ||
} | ||
} | ||
while (this._inited && this.subscriptions.has(topic)) { | ||
const mqList = this._topicSubscribeInfoTable.get(topic); | ||
let hasMsg = false; | ||
if (mqList && mqList.length) { | ||
for (const mq of mqList) { | ||
const item = this._processQueueTable.get(mq.key); | ||
if (item) { | ||
const pq = item.processQueue; | ||
while (pq.msgCount) { | ||
hasMsg = true; | ||
const msg = pq.msgList[0]; | ||
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) { | ||
try { | ||
yield handler(msg, mq, pq); | ||
} catch (err) { | ||
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`; | ||
this.emit('error', err); | ||
if (this.messageModel === MessageModel.CLUSTERING) { | ||
// TODO: support retry message | ||
msg.reconsumeTimes++; | ||
yield this._sleep(5000); | ||
continue; | ||
} else { | ||
this.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId); | ||
} | ||
} | ||
} else { | ||
this.logger.debug('[MQPushConsumer] message filter by tags=, msg.tags=%s', subExpression, msg.tags); | ||
} | ||
const offset = pq.remove(); | ||
if (offset >= 0) { | ||
this._offsetStore.updateOffset(mq, offset, true); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
if (!hasMsg) { | ||
yield Promise.race([ | ||
this._closePromise, | ||
this.await(`topic_${topic}_changed`), | ||
]); | ||
} | ||
} | ||
}.bind(this)).catch(err => this._handleError(err)); | ||
} | ||
@@ -178,12 +256,8 @@ | ||
const tags = subString.split('||'); | ||
if (tags && tags.length) { | ||
for (let tag of tags) { | ||
tag = tag.trim(); | ||
if (tag) { | ||
subscriptionData.tagsSet.push(tag); | ||
subscriptionData.codeSet.push(utils.hashCode(tag)); | ||
} | ||
for (let tag of tags) { | ||
tag = tag.trim(); | ||
if (tag) { | ||
subscriptionData.tagsSet.push(tag); | ||
subscriptionData.codeSet.push(utils.hashCode(tag)); | ||
} | ||
} else { | ||
throw new Error('[mq:consumer] subString split error'); | ||
} | ||
@@ -211,6 +285,6 @@ } | ||
co(function* () { | ||
while (this._processQueueTable.has(messageQueue.key)) { | ||
while (this._inited && this._processQueueTable.has(messageQueue.key)) { | ||
try { | ||
yield this.executePullRequestImmediately(messageQueue); | ||
yield sleep(this.options.pullInterval); | ||
yield this._sleep(this.options.pullInterval); | ||
} catch (err) { | ||
@@ -220,4 +294,4 @@ if (this._inited) { | ||
err.message = `[mq:consumer] pull message for queue: ${messageQueue.key}, occurred error: ${err.message}`; | ||
this.logger.error(err); | ||
yield sleep(this.options.pullTimeDelayMillsWhenException); | ||
this._handleError(err); | ||
yield this._sleep(this.options.pullTimeDelayMillsWhenException); | ||
} | ||
@@ -245,8 +319,16 @@ } | ||
} | ||
// flow control | ||
const size = processQueue.msgCount; | ||
if (size > this.options.pullThresholdForQueue) { | ||
yield this._sleep(this.options.pullTimeDelayMillsWhenFlowControl); | ||
return; | ||
} | ||
processQueue.lastPullTimestamp = Date.now(); | ||
const subscriptionData = this.subscriptions.get(messageQueue.topic); | ||
const data = this.subscriptions.get(messageQueue.topic); | ||
const subscriptionData = data && data.subscriptionData; | ||
if (!subscriptionData) { | ||
this.logger.warn('[mq:consumer] execute pull request, but subscriptionData not found, topic: %s, queueId: %s', messageQueue.topic, messageQueue.queueId); | ||
yield sleep(this.options.pullTimeDelayMillsWhenException); | ||
yield this._sleep(this.options.pullTimeDelayMillsWhenException); | ||
return; | ||
@@ -274,37 +356,12 @@ } | ||
case PullStatus.FOUND: | ||
{ | ||
let msgList = pullResult.msgFoundList; | ||
this.logger.info('[mq:consumer] new messages found, size: %d', msgList.length); | ||
if (subscriptionData.tagsSet && subscriptionData.tagsSet.length && !subscriptionData.classFilterMode) { | ||
msgList = msgList.filter(function(msg) { | ||
if (msg.tags && subscriptionData.tagsSet.indexOf(msg.tags) >= 0) { | ||
return true; | ||
} | ||
return false; | ||
}); | ||
} | ||
this.logger.info('[mq:consumer] after filter by tags %d messages remaining.', msgList.length); | ||
if (msgList && msgList.length) { | ||
// todo: long ? | ||
const firstMsgOffset = Number(msgList[0].queueOffset); | ||
const lastMsgOffset = Number(msgList[msgList.length - 1].queueOffset); | ||
{ | ||
const pullRT = Date.now() - processQueue.lastPullTimestamp; | ||
this.logger.info('[MQPushConsumer] pull message success, found new message size: %d, topic: %s, consumerGroup: %s, cost: %dms.', | ||
pullResult.msgFoundList.length, messageQueue.topic, this.consumerGroup, pullRT); | ||
yield new Promise((resolve, reject) => { | ||
const timer = setTimeout(() => { | ||
reject(new Error(`Message process timeout for topic: ${subscriptionData.topic}, from ${firstMsgOffset} to ${lastMsgOffset}.`)); | ||
}, 3000); | ||
// emit message event | ||
this.emit('message', msgList, () => { | ||
resolve(); | ||
clearTimeout(timer); | ||
}); | ||
}); | ||
// update offset | ||
this._offsetStore.updateOffset(messageQueue, lastMsgOffset + 1); | ||
this.logger.info('[mq:consumer] process message successfully for topic: %s at queue: %s from %d to %d', subscriptionData.topic, messageQueue.queueId, firstMsgOffset, lastMsgOffset); | ||
} | ||
break; | ||
} | ||
// submit to consumer | ||
processQueue.putMessage(pullResult.msgFoundList); | ||
this.emit(`topic_${messageQueue.topic}_changed`); | ||
break; | ||
} | ||
case PullStatus.NO_NEW_MSG: | ||
@@ -316,8 +373,4 @@ case PullStatus.NO_MATCHED_MSG: | ||
case PullStatus.OFFSET_ILLEGAL: | ||
this.logger.warn('[mq:consumer] the pull request offset illegal, message queue => %s, the originOffset => %j, pullResult => %j', messageQueue.key, originOffset, pullResult); | ||
pullRequest.processQueue.droped = true; | ||
yield sleep(10000); | ||
this.logger.warn('[mq:consumer] the pull request offset illegal, message queue => %s, the originOffset => %d, pullResult => %j', messageQueue.key, originOffset, pullResult); | ||
this._offsetStore.updateOffset(messageQueue, pullRequest.nextOffset); | ||
yield this._offsetStore.persist(messageQueue); | ||
yield this.removeProcessQueue(messageQueue); | ||
break; | ||
@@ -327,4 +380,2 @@ default: | ||
} | ||
// update pull request | ||
this._processQueueTable.set(messageQueue.key, pullRequest); | ||
} | ||
@@ -435,2 +486,3 @@ | ||
let changed; | ||
let allocateResult = mqSet; | ||
if (this.options.isBroadcast) { | ||
@@ -446,3 +498,3 @@ changed = yield this.updateProcessQueueTableInRebalance(topic, mqSet); | ||
const allocateResult = this.allocateMessageQueueStrategy.allocate(this.consumerGroup, this._mqClient.clientId, mqSet, cidAll); | ||
allocateResult = this.allocateMessageQueueStrategy.allocate(this.consumerGroup, this._mqClient.clientId, mqSet, cidAll); | ||
this.logger.info('[mq:consumer] allocate queue for group: %s, clientId: %s, result: %j', this.consumerGroup, this._mqClient.clientId, allocateResult); | ||
@@ -453,4 +505,4 @@ changed = yield this.updateProcessQueueTableInRebalance(topic, allocateResult); | ||
if (changed) { | ||
this.logger.info('[mq:consumer] do rebalance and message queue changed, topic: %s, mqSet: %j', topic, mqSet); | ||
this.emit('messageQueueChanged', topic, mqSet); | ||
this.logger.info('[mq:consumer] do rebalance and message queue changed, topic: %s, mqSet: %j', topic, allocateResult); | ||
this.emit(`topic_${topic}_queue_changed`); | ||
} | ||
@@ -470,6 +522,2 @@ } | ||
const obj = this._processQueueTable.get(key); | ||
if (!obj) { | ||
this._processQueueTable.delete(key); | ||
} | ||
const messageQueue = obj.messageQueue; | ||
@@ -482,12 +530,10 @@ const processQueue = obj.processQueue; | ||
processQueue.droped = true; | ||
if (yield this.removeUnnecessaryMessageQueue(messageQueue, processQueue)) { | ||
changed = true; | ||
this._processQueueTable.delete(key); | ||
} | ||
} | ||
} else if (processQueue.isPullExpired && this.consumeType === ConsumeType.CONSUME_PASSIVELY) { | ||
processQueue.droped = true; | ||
if (yield this.removeUnnecessaryMessageQueue(messageQueue, processQueue)) { | ||
yield this.removeProcessQueue(messageQueue); | ||
changed = true; | ||
this._processQueueTable.delete(key); | ||
} else if (processQueue.isPullExpired && this.consumeType === ConsumeType.CONSUME_PASSIVELY) { | ||
processQueue.droped = true; | ||
yield this.removeProcessQueue(messageQueue); | ||
changed = true; | ||
this.logger.warn('[MQPushConsumer] BUG doRebalance, %s, remove unnecessary mq=%s, because pull is pause, so try to fixed it', | ||
this.consumerGroup, messageQueue.key); | ||
} | ||
@@ -586,3 +632,3 @@ } | ||
err.mesasge = 'computePullFromWhere() occurred an exception, ' + err.mesasge; | ||
this.logger.error(err); | ||
this._handleError(err); | ||
return -1; | ||
@@ -601,6 +647,5 @@ } | ||
if (processQueue) { | ||
const droped = processQueue.droped; | ||
processQueue.droped = true; | ||
yield this.removeUnnecessaryMessageQueue(messageQueue, processQueue); | ||
this.logger.info('[mq:consumer] Fix Offset, %s, remove unnecessary messageQueue, %j Droped: %s', this.consumerGroup, messageQueue, droped); | ||
this.logger.info('[mq:consumer] remove unnecessary messageQueue, %s, Droped: %s', messageQueue.key, processQueue.droped); | ||
} | ||
@@ -612,3 +657,3 @@ } | ||
* @param {MessageQueue} messageQueue - message queue | ||
* @return {Boolean} success or not | ||
* @return {void} | ||
*/ | ||
@@ -619,3 +664,2 @@ * removeUnnecessaryMessageQueue(messageQueue) { | ||
// todo: consume later ? | ||
return true; | ||
} | ||
@@ -628,46 +672,13 @@ | ||
_error(err) { | ||
setImmediate(() => { | ||
err.message = 'MQPushConsumer occurred an error' + err.message; | ||
this.emit('error', err); | ||
}); | ||
_handleError(err) { | ||
err.message = 'MQPushConsumer occurred an error ' + err.message; | ||
this.emit('error', err); | ||
} | ||
on(event, listener) { | ||
let newListener = listener; | ||
if (event === 'message' && is.function(listener) && listener.length === 1) { | ||
newListener = (message, done) => { | ||
listener(message); | ||
done(); | ||
}; | ||
newListener.__oldListener = listener; | ||
} | ||
return super.on(event, newListener); | ||
_sleep(timeout) { | ||
return Promise.race([ | ||
this._closePromise, | ||
sleep(timeout), | ||
]); | ||
} | ||
once(event, listener) { | ||
let newListener = listener; | ||
if (event === 'message' && is.function(listener) && listener.length === 1) { | ||
newListener = (message, done) => { | ||
listener(message); | ||
done(); | ||
}; | ||
newListener.__oldListener = listener; | ||
} | ||
return super.once(event, newListener); | ||
} | ||
removeListener(event, listener) { | ||
let newListener = listener; | ||
if (event === 'message' && is.function(listener) && listener.length === 1) { | ||
const listeners = this.listeners('message'); | ||
for (let i = 0, len = listeners.length; i < len; i++) { | ||
if (listeners[i].__oldListener === listener) { | ||
newListener = listeners[i]; | ||
break; | ||
} | ||
} | ||
} | ||
return super.removeListener(event, newListener); | ||
} | ||
} | ||
@@ -688,11 +699,1 @@ | ||
} | ||
function sleep(interval) { | ||
return callback => { | ||
if (interval <= 0) { | ||
setImmediate(callback); | ||
} else { | ||
setTimeout(callback, interval); | ||
} | ||
}; | ||
} |
@@ -38,4 +38,4 @@ 'use strict'; | ||
mod > 0 && index < mod ? | ||
mqLen / cidLen + 1 : | ||
mqLen / cidLen; | ||
mqLen / cidLen + 1 : | ||
mqLen / cidLen; | ||
averageSize = Math.floor(averageSize); // 取整 | ||
@@ -42,0 +42,0 @@ const startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod; |
@@ -95,5 +95,5 @@ 'use strict'; | ||
// 6 QUEUEOFFSET | ||
msgExt.queueOffset = byteBuffer.getLong(); | ||
msgExt.queueOffset = byteBuffer.getLong().toNumber(); | ||
// 7 PHYSICALOFFSET | ||
msgExt.commitLogOffset = byteBuffer.getLong(); | ||
msgExt.commitLogOffset = byteBuffer.getLong().toNumber(); | ||
// 8 SYSFLAG | ||
@@ -103,3 +103,3 @@ const sysFlag = byteBuffer.getInt(); | ||
// 9 BORNTIMESTAMP | ||
msgExt.bornTimestamp = byteBuffer.getLong(); | ||
msgExt.bornTimestamp = byteBuffer.getLong().toNumber(); | ||
// 10 BORNHOST | ||
@@ -111,3 +111,3 @@ const host = new Buffer(4); | ||
// 11 STORETIMESTAMP | ||
msgExt.storeTimestamp = byteBuffer.getLong(); | ||
msgExt.storeTimestamp = byteBuffer.getLong().toNumber(); | ||
// 12 STOREHOST | ||
@@ -124,3 +124,3 @@ host.fill(0); | ||
// 14 Prepared Transaction Offset | ||
msgExt.preparedTransactionOffset = byteBuffer.getLong(); | ||
msgExt.preparedTransactionOffset = byteBuffer.getLong().toNumber(); | ||
// 15 BODY | ||
@@ -127,0 +127,0 @@ const bodyLen = byteBuffer.getInt(); |
@@ -31,3 +31,3 @@ 'use strict'; | ||
this.storeHost = null; | ||
this.reconsumeTimes = null; | ||
this.reconsumeTimes = 0; | ||
this.preparedTransactionOffset = null; // long | ||
@@ -34,0 +34,0 @@ this.topic = topic; |
@@ -31,3 +31,3 @@ 'use strict'; | ||
* @param {Object} options | ||
* - {HttpClient} urllib - http request client | ||
* - {HttpClient} httpclient - http request client | ||
* - {Object} [logger] - log module | ||
@@ -85,6 +85,6 @@ * - {Number} [responseTimeout] - tcp response timeout | ||
case ResponseCode.SUCCESS: | ||
{ | ||
const responseHeader = response.decodeCommandCustomHeader(); | ||
return responseHeader && responseHeader.value; | ||
} | ||
{ | ||
const responseHeader = response.decodeCommandCustomHeader(); | ||
return responseHeader && responseHeader.value; | ||
} | ||
default: | ||
@@ -111,15 +111,15 @@ this._defaultHandler(request, response); | ||
case ResponseCode.SUCCESS: | ||
{ | ||
const body = response.body; | ||
if (body) { | ||
this.logger.info('[mq:client_api] get Topic [%s] RouteInfoFromNameServer: %s', topic, body.toString()); | ||
// JSON.parse dose not work here | ||
const routerInfoData = JSON2.parse(body.toString()); | ||
// sort | ||
routerInfoData.queueDatas.sort(compare); | ||
routerInfoData.brokerDatas.sort(compare); | ||
return routerInfoData; | ||
} | ||
break; | ||
{ | ||
const body = response.body; | ||
if (body) { | ||
this.logger.info('[mq:client_api] get Topic [%s] RouteInfoFromNameServer: %s', topic, body.toString()); | ||
// JSON.parse dose not work here | ||
const routerInfoData = JSON2.parse(body.toString()); | ||
// sort | ||
routerInfoData.queueDatas.sort(compare); | ||
routerInfoData.brokerDatas.sort(compare); | ||
return routerInfoData; | ||
} | ||
break; | ||
} | ||
case ResponseCode.TOPIC_NOT_EXIST: | ||
@@ -177,13 +177,13 @@ this.logger.info('[mq:client_api] get Topic [%s] RouteInfoFromNameServer is not exist value', topic); | ||
case ResponseCode.SUCCESS: | ||
{ | ||
const body = response.body; | ||
if (body) { | ||
const routerInfoData = JSON2.parse(body.toString()); | ||
// sort | ||
routerInfoData.queueDatas.sort(compare); | ||
routerInfoData.brokerDatas.sort(compare); | ||
return routerInfoData; | ||
} | ||
break; | ||
{ | ||
const body = response.body; | ||
if (body) { | ||
const routerInfoData = JSON2.parse(body.toString()); | ||
// sort | ||
routerInfoData.queueDatas.sort(compare); | ||
routerInfoData.brokerDatas.sort(compare); | ||
return routerInfoData; | ||
} | ||
break; | ||
} | ||
case ResponseCode.TOPIC_NOT_EXIST: | ||
@@ -253,6 +253,6 @@ this.logger.warn('[mq:client_api] get Topic [%s] RouteInfoFromNameServer is not exist value', topic); | ||
case ResponseCode.SUCCESS: | ||
{ | ||
const responseHeader = response.decodeCommandCustomHeader(); | ||
return Number(responseHeader.offset.toString()); | ||
} | ||
{ | ||
const responseHeader = response.decodeCommandCustomHeader(); | ||
return Number(responseHeader.offset.toString()); | ||
} | ||
default: | ||
@@ -283,7 +283,7 @@ this._defaultHandler(request, response); | ||
case ResponseCode.SUCCESS: | ||
{ | ||
const responseHeader = response.decodeCommandCustomHeader(); | ||
// todo: | ||
return responseHeader && Number(responseHeader.offset); | ||
} | ||
{ | ||
const responseHeader = response.decodeCommandCustomHeader(); | ||
// todo: | ||
return responseHeader && Number(responseHeader.offset); | ||
} | ||
default: | ||
@@ -315,7 +315,7 @@ this._defaultHandler(request, response); | ||
case ResponseCode.SUCCESS: | ||
{ | ||
const responseHeader = response.decodeCommandCustomHeader(); | ||
// todo: | ||
return responseHeader && Number(responseHeader.offset); | ||
} | ||
{ | ||
const responseHeader = response.decodeCommandCustomHeader(); | ||
// todo: | ||
return responseHeader && Number(responseHeader.offset); | ||
} | ||
default: | ||
@@ -497,3 +497,3 @@ this._defaultHandler(request, response); | ||
messageQueue, | ||
queueOffset: responseHeader.queueOffset, | ||
queueOffset: Number(responseHeader.queueOffset), | ||
}; | ||
@@ -500,0 +500,0 @@ } |
@@ -224,3 +224,3 @@ 'use strict'; | ||
data.error.message = `[mq:client] updateAllTopicRouterInfo occurred error, ${data.error.message}`; | ||
this.logger.error(data.error); | ||
this.emit('error', data.error); | ||
} | ||
@@ -244,3 +244,3 @@ }); | ||
defaultMQProducer.defaultTopicQueueNums < data.readQueueNums ? | ||
defaultMQProducer.defaultTopicQueueNums : data.readQueueNums; | ||
defaultMQProducer.defaultTopicQueueNums : data.readQueueNums; | ||
data.readQueueNums = queueNums; | ||
@@ -446,3 +446,6 @@ data.writeQueueNums = queueNums; | ||
for (const topic of consumer.subscriptions.keys()) { | ||
subscriptionDataSet.push(consumer.subscriptions.get(topic)); | ||
const data = consumer.subscriptions.get(topic); | ||
if (data && data.subscriptionData) { | ||
subscriptionDataSet.push(data.subscriptionData); | ||
} | ||
} | ||
@@ -449,0 +452,0 @@ |
'use strict'; | ||
const Base = require('sdk-base'); | ||
const bsInsert = require('binary-search-insert'); | ||
const comparator = (a, b) => a.queueOffset - b.queueOffset; | ||
const pullMaxIdleTime = 120000; | ||
class ProcessQueue { | ||
constructor() { | ||
this.msgs = []; | ||
class ProcessQueue extends Base { | ||
constructor(options = {}) { | ||
super(options); | ||
// 当前Q是否被rebalance丢弃 | ||
this.msgList = []; | ||
this.droped = false; | ||
this.lastPullTimestamp = Date.now(); | ||
// 最后一次消费的时间戳 | ||
this.lastConsumeTimestamp = Date.now(); | ||
// 是否从Broker锁定 | ||
this.locked = false; | ||
// 最后一次锁定成功时间戳 | ||
this.lastLockTimestamp = Date.now(); | ||
} | ||
get maxSpan() { | ||
const msgCount = this.msgCount; | ||
if (msgCount) { | ||
return this.msgList[msgCount - 1].queueOffset - this.msgList[0].queueOffset; | ||
} | ||
return 0; | ||
} | ||
get msgCount() { | ||
return this.msgList.length; | ||
} | ||
get isPullExpired() { | ||
return Date.now() - this.lastPullTimestamp > pullMaxIdleTime; | ||
} | ||
putMessage() {} | ||
removeMessage() {} | ||
takeMessags() {} | ||
putMessage(msgs) { | ||
for (const msg of msgs) { | ||
bsInsert(this.msgList, comparator, msg); | ||
} | ||
this.queueOffsetMax = this.msgList[this.msgCount - 1].queueOffset; | ||
} | ||
remove(count = 1) { | ||
this.msgList.splice(0, count); | ||
return this.msgCount ? this.msgList[0].queueOffset : this.queueOffsetMax + 1; | ||
} | ||
clear() { | ||
this.msgList = []; | ||
} | ||
} | ||
module.exports = ProcessQueue; |
@@ -11,3 +11,3 @@ 'use strict'; | ||
const ClientConfig = require('../client_config'); | ||
const PermName = require('../protocol/perm_name'); | ||
// const PermName = require('../protocol/perm_name'); | ||
const MessageConst = require('../message/message_const'); | ||
@@ -151,33 +151,33 @@ const TopicPublishInfo = require('./topic_publish_info'); | ||
*/ | ||
* createTopic(key, newTopic, queueNum, topicSysFlag) { | ||
yield this.ready(); | ||
// * createTopic(key, newTopic, queueNum, topicSysFlag) { | ||
// yield this.ready(); | ||
topicSysFlag = topicSysFlag || 0; | ||
const topicRouteData = yield this._mqClient.getTopicRouteInfoFromNameServer(key, 1000 * 3); | ||
const brokerDataList = topicRouteData.brokerDatas; | ||
if (brokerDataList && brokerDataList.length) { | ||
// 排序原因:即使没有配置顺序消息模式,默认队列的顺序同配置的一致。 | ||
brokerDataList.sort(compare); | ||
// topicSysFlag = topicSysFlag || 0; | ||
// const topicRouteData = yield this._mqClient.getTopicRouteInfoFromNameServer(key, 1000 * 3); | ||
// const brokerDataList = topicRouteData.brokerDatas; | ||
// if (brokerDataList && brokerDataList.length) { | ||
// // 排序原因:即使没有配置顺序消息模式,默认队列的顺序同配置的一致。 | ||
// brokerDataList.sort(compare); | ||
yield brokerDataList.map(brokerData => { | ||
const addr = brokerData.brokerAddrs[MixAll.MASTER_ID]; | ||
if (addr) { | ||
const topicConfig = { | ||
topicName: newTopic, | ||
readQueueNums: queueNum, | ||
writeQueueNums: queueNum, | ||
topicSysFlag, | ||
perm: PermName.PERM_READ | PermName.PERM_WRITE, | ||
topicFilterType: 'SINGLE_TAG', | ||
order: false, | ||
}; | ||
this.logger.info('[mq:producer] execute createTopic at broker: %s, topic: %s', addr, newTopic); | ||
return this._mqClient.createTopic(addr, key, topicConfig, 1000 * 3); | ||
} | ||
return null; | ||
}); | ||
} else { | ||
throw new Error('Not found broker, maybe key is wrong'); | ||
} | ||
} | ||
// yield brokerDataList.map(brokerData => { | ||
// const addr = brokerData.brokerAddrs[MixAll.MASTER_ID]; | ||
// if (addr) { | ||
// const topicConfig = { | ||
// topicName: newTopic, | ||
// readQueueNums: queueNum, | ||
// writeQueueNums: queueNum, | ||
// topicSysFlag, | ||
// perm: PermName.PERM_READ | PermName.PERM_WRITE, | ||
// topicFilterType: 'SINGLE_TAG', | ||
// order: false, | ||
// }; | ||
// this.logger.info('[mq:producer] execute createTopic at broker: %s, topic: %s', addr, newTopic); | ||
// return this._mqClient.createTopic(addr, key, topicConfig, 1000 * 3); | ||
// } | ||
// return null; | ||
// }); | ||
// } else { | ||
// throw new Error('Not found broker, maybe key is wrong'); | ||
// } | ||
// } | ||
@@ -335,11 +335,11 @@ /** | ||
// Helper | ||
// --------------- | ||
function compare(routerA, routerB) { | ||
if (routerA.brokerName > routerB.brokerName) { | ||
return 1; | ||
} else if (routerA.brokerName < routerB.brokerName) { | ||
return -1; | ||
} | ||
return 0; | ||
} | ||
// // Helper | ||
// // --------------- | ||
// function compare(routerA, routerB) { | ||
// if (routerA.brokerName > routerB.brokerName) { | ||
// return 1; | ||
// } else if (routerA.brokerName < routerB.brokerName) { | ||
// return -1; | ||
// } | ||
// return 0; | ||
// } |
@@ -10,3 +10,3 @@ 'use strict'; | ||
logger, | ||
responseTimeout: 3000, | ||
responseTimeout: 30000, | ||
}; | ||
@@ -19,3 +19,3 @@ | ||
* @param {Object} options | ||
* - {HttpClient} urllib - http request client | ||
* - {HttpClient} httpclient - http request client | ||
* - {Object} [logger] - log module | ||
@@ -27,3 +27,3 @@ * - {Number} [responseTimeout] - tcp response timeout | ||
assert(options.onsAddr, '[RemotingClient] options.onsAddr is required'); | ||
assert(options.urllib, '[RemotingClient] options.urllib is required'); | ||
assert(options.httpclient, '[RemotingClient] options.httpclient is required'); | ||
super(Object.assign({ initMethod: 'init' }, defaultOptions, options)); | ||
@@ -37,6 +37,6 @@ | ||
/** | ||
* @property {HttpClient} RemotingClient#urllib | ||
* @property {HttpClient} RemotingClient#httpclient | ||
*/ | ||
get urllib() { | ||
return this.options.urllib; | ||
get httpclient() { | ||
return this.options.httpclient; | ||
} | ||
@@ -121,3 +121,3 @@ | ||
* updateNameServerAddressList() { | ||
const ret = yield this.urllib.request(this.options.onsAddr); | ||
const ret = yield this.httpclient.request(this.options.onsAddr); | ||
if (ret.status === 200) { | ||
@@ -124,0 +124,0 @@ const addrs = ret.data.toString().trim(); |
'use strict'; | ||
exports.LocalFileOffsetStore = require('./local_file'); | ||
exports.LocalMemoryOffsetStore = require('./local_memory'); | ||
exports.RemoteBrokerOffsetStore = require('./remote_broker'); |
@@ -66,5 +66,6 @@ 'use strict'; | ||
} | ||
if (increaseOnly || prev < offset) { | ||
this.offsetTable.set(messageQueue.key, offset); | ||
if (prev >= offset && increaseOnly) { | ||
return; | ||
} | ||
this.offsetTable.set(messageQueue.key, offset); | ||
this.logger.info('[mq:LocalFileOffsetStore] update offset for messageQueue: %s, current offset: %d, prev offset: %s, increaseOnly: %s', messageQueue.key, offset, prev, increaseOnly); | ||
@@ -88,21 +89,21 @@ } | ||
case ReadOffsetType.READ_FROM_MEMORY: | ||
{ | ||
const offset = this.offsetTable.get(messageQueue.key); | ||
if (!is.nullOrUndefined(offset)) { | ||
return offset; | ||
} | ||
break; | ||
{ | ||
const offset = this.offsetTable.get(messageQueue.key); | ||
if (!is.nullOrUndefined(offset)) { | ||
return offset; | ||
} | ||
break; | ||
} | ||
case ReadOffsetType.READ_FROM_STORE: | ||
{ | ||
const data = yield this.readLocalOffset(); | ||
if (data && data.offsetTable) { | ||
const offset = data.offsetTable[messageQueue.key]; | ||
if (is.number(offset)) { | ||
this.updateOffset(messageQueue, offset, false); | ||
return offset; | ||
} | ||
{ | ||
const data = yield this.readLocalOffset(); | ||
if (data && data.offsetTable) { | ||
const offset = data.offsetTable[messageQueue.key]; | ||
if (is.number(offset)) { | ||
this.updateOffset(messageQueue, offset, false); | ||
return offset; | ||
} | ||
break; | ||
} | ||
break; | ||
} | ||
default: | ||
@@ -109,0 +110,0 @@ break; |
@@ -54,5 +54,6 @@ 'use strict'; | ||
} | ||
if (increaseOnly || prev < offset) { | ||
this.offsetTable.set(messageQueue.key, offset); | ||
if (prev >= offset && increaseOnly) { | ||
return; | ||
} | ||
this.offsetTable.set(messageQueue.key, offset); | ||
this.logger.info('[mq:RemoteBrokerOffsetStore] update offset for messageQueue: %s, current offset: %d, prev offset: %s, increaseOnly: %s', messageQueue.key, offset, prev, increaseOnly); | ||
@@ -76,15 +77,15 @@ } | ||
case ReadOffsetType.READ_FROM_MEMORY: | ||
{ | ||
const offset = this.offsetTable.get(messageQueue.key); | ||
if (!is.nullOrUndefined(offset)) { | ||
return offset; | ||
} | ||
break; | ||
{ | ||
const offset = this.offsetTable.get(messageQueue.key); | ||
if (!is.nullOrUndefined(offset)) { | ||
return offset; | ||
} | ||
break; | ||
} | ||
case ReadOffsetType.READ_FROM_STORE: | ||
{ | ||
const brokerOffset = yield this.fetchConsumeOffsetFromBroker(messageQueue); | ||
this.updateOffset(messageQueue, brokerOffset, false); | ||
return brokerOffset; | ||
} | ||
{ | ||
const brokerOffset = yield this.fetchConsumeOffsetFromBroker(messageQueue); | ||
this.updateOffset(messageQueue, brokerOffset, false); | ||
return brokerOffset; | ||
} | ||
default: | ||
@@ -91,0 +92,0 @@ break; |
{ | ||
"name": "ali-ons", | ||
"version": "1.0.0", | ||
"version": "2.0.0", | ||
"description": "Aliyun Open Notification Service Client", | ||
@@ -10,3 +10,4 @@ "main": "./lib/index.js", | ||
"scripts": { | ||
"autod": "autod", | ||
"autod": "autod --check", | ||
"pkgfiles": "egg-bin pkgfiles --check", | ||
"lint": "eslint . --ext .js", | ||
@@ -16,3 +17,3 @@ "test": "npm run lint && npm run test-local", | ||
"cov": "TEST_TIMEOUT=60000 egg-bin cov", | ||
"ci": "npm run lint && npm run cov", | ||
"ci": "npm run autod && npm run pkgfiles && npm run lint && npm run cov", | ||
"contributors": "contributors -f plain -o AUTHORS" | ||
@@ -37,36 +38,32 @@ }, | ||
"JSON2": "^0.1.0", | ||
"address": "^1.0.1", | ||
"byte": "^1.1.5", | ||
"bytes": "^2.4.0", | ||
"chalk": "^1.1.3", | ||
"address": "^1.0.3", | ||
"binary-search-insert": "^1.0.3", | ||
"byte": "^1.2.0", | ||
"bytes": "^3.0.0", | ||
"chalk": "^2.1.0", | ||
"co": "^4.6.0", | ||
"co-gather": "^0.0.1", | ||
"debug": "^2.6.2", | ||
"egg-logger": "^1.5.0", | ||
"is-type-of": "^1.0.0", | ||
"debug": "^3.1.0", | ||
"egg-logger": "^1.6.0", | ||
"is-type-of": "^1.2.0", | ||
"long": "^3.2.0", | ||
"mkdirp": "^0.5.1", | ||
"mz-modules": "^2.0.0", | ||
"osenv": "^0.1.4", | ||
"sdk-base": "^3.1.0", | ||
"tcp-base": "^2.0.0", | ||
"utility": "^1.11.0" | ||
"sdk-base": "^3.3.0", | ||
"tcp-base": "^3.0.0", | ||
"utility": "^1.12.0" | ||
}, | ||
"devDependencies": { | ||
"autod": "^2.7.1", | ||
"autod": "^2.9.0", | ||
"contributors": "^0.5.1", | ||
"egg-bin": "^2.4.0", | ||
"egg-ci": "^1.5.0", | ||
"eslint": "^3.17.1", | ||
"eslint-config-egg": "^3.2.0", | ||
"mm": "^2.1.0", | ||
"pedding": "^1.1.0", | ||
"rimraf": "^2.6.1", | ||
"urllib": "^2.21.0" | ||
"egg-bin": "^4.3.3", | ||
"eslint": "^4.7.2", | ||
"eslint-config-egg": "^5.1.1", | ||
"mm": "^2.2.0", | ||
"urllib": "^2.25.0" | ||
}, | ||
"engines": { | ||
"node": ">= 6.0.0" | ||
}, | ||
"ci": { | ||
"version": "6, 7" | ||
} | ||
} |
@@ -35,19 +35,17 @@ ali-ons | ||
const httpclient = require('urllib'); | ||
const Consumer = require('ali-ons').Consumer; | ||
const consumer = new Consumer({ | ||
httpclient, | ||
accessKey: 'your-accesskey', | ||
secretKey: 'your-secretkey', | ||
consumerGroup: 'your-consumer-group', | ||
isBroadcast: false, // default is false, that mean messages will be pushed to consumer cluster only once. | ||
// isBroadcast: true, | ||
}); | ||
consumer.subscribe('your-topic', '*'); | ||
consumer.on('message', (msgs, done) => { | ||
msgs.forEach(msg => console.log(`receive message, msgId: ${msg.msgId}, body: ${msg.body.toString()}`)); | ||
done(); | ||
consumer.subscribe(config.topic, '*', function*(msg) { | ||
console.log(`receive message, msgId: ${msg.msgId}, body: ${msg.body.toString()}`) | ||
}); | ||
consumer.on('error', err => console.log(err.stack)); | ||
consumer.ready(() => console.log('consumer is ready')); | ||
consumer.on('error', err => console.log(err)); | ||
``` | ||
@@ -54,0 +52,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
137020
7
40
3906
0
18
80
+ Addedbinary-search-insert@^1.0.3
+ Addedmz-modules@^2.0.0
+ Addedansi-styles@3.2.1(transitive)
+ Addedbalanced-match@1.0.2(transitive)
+ Addedbinary-search-insert@1.0.3(transitive)
+ Addedbrace-expansion@1.1.11(transitive)
+ Addedbytes@3.1.2(transitive)
+ Addedchalk@2.4.2(transitive)
+ Addedcolor-convert@1.9.3(transitive)
+ Addedcolor-name@1.1.3(transitive)
+ Addedconcat-map@0.0.1(transitive)
+ Addeddebug@3.2.7(transitive)
+ Addedend-of-stream@1.4.4(transitive)
+ Addedfs.realpath@1.0.0(transitive)
+ Addedglob@7.2.3(transitive)
+ Addedhas-flag@3.0.0(transitive)
+ Addedinflight@1.0.6(transitive)
+ Addedinherits@2.0.4(transitive)
+ Addedko-sleep@1.1.4(transitive)
+ Addedminimatch@3.1.2(transitive)
+ Addedms@2.1.3(transitive)
+ Addedmz-modules@2.1.0(transitive)
+ Addedonce@1.4.0(transitive)
+ Addedpath-is-absolute@1.0.1(transitive)
+ Addedpump@3.0.2(transitive)
+ Addedrimraf@2.7.1(transitive)
+ Addedsupports-color@5.5.0(transitive)
+ Addedtcp-base@3.2.0(transitive)
+ Addedwrappy@1.0.2(transitive)
- Removedbytes@2.5.0(transitive)
- Removedtcp-base@2.0.0(transitive)
Updatedaddress@^1.0.3
Updatedbyte@^1.2.0
Updatedbytes@^3.0.0
Updatedchalk@^2.1.0
Updateddebug@^3.1.0
Updatedegg-logger@^1.6.0
Updatedis-type-of@^1.2.0
Updatedsdk-base@^3.3.0
Updatedtcp-base@^3.0.0
Updatedutility@^1.12.0