Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

ali-ons

Package Overview
Dependencies
Maintainers
1
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

ali-ons - npm Package Compare versions

Comparing version 1.0.0 to 2.0.0

lib/store/local_memory.js

9

History.md
2.0.0 / 2017-09-29
==================
* feat: add local memory store
* refactor: new consumer api
* fix: consumer offset update issue
1.0.0 / 2017-03-14
==================
* feat: implement ali-ons base on rocketmq #1
* feat: implement ali-ons base on rocketmq #1

287

lib/consumer/mq_push_consumer.js

@@ -10,2 +10,3 @@ 'use strict';

const MQClient = require('../mq_client');
const sleep = require('mz-modules/sleep');
const ClientConfig = require('../client_config');

@@ -20,3 +21,3 @@ const ProcessQueue = require('../process_queue');

const LocalFileOffsetStore = require('../store/local_file');
// const MessageDecoder = require('../message/message_decoder');
const LocalMemoryOffsetStore = require('../store/local_memory');
const RemoteBrokerOffsetStore = require('../store/remote_broker');

@@ -27,5 +28,7 @@ const AllocateMessageQueueAveragely = require('./rebalance/allocate_message_queue_averagely');

logger,
persistent: false, // 是否持久化消费进度
isBroadcast: false, // 是否是广播模式(默认集群消费模式)
brokerSuspendMaxTimeMillis: 1000 * 15, // 长轮询模式,Consumer连接在Broker挂起最长时间
pullTimeDelayMillsWhenException: 3000, // 拉消息异常时,延迟一段时间再拉
pullTimeDelayMillsWhenFlowControl: 5000, // 进入流控逻辑,延迟一段时间再拉
consumerTimeoutMillisWhenSuspend: 1000 * 30, // 长轮询模式,Consumer超时时间(必须要大于brokerSuspendMaxTimeMillis)

@@ -41,3 +44,3 @@ consumerGroup: MixAll.DEFAULT_CONSUMER_GROUP,

consumeTimestamp: utils.timeMillisToHumanString(Date.now() - 1000 * 60 * 30),
pullThresholdForQueue: 1000, // 本地队列消息数超过此阀值,开始流控
pullThresholdForQueue: 500, // 本地队列消息数超过此阀值,开始流控
pullInterval: 0, // 拉取消息的频率, 如果为了降低拉取速度,可以设置大于0的值

@@ -61,5 +64,7 @@ consumeMessageBatchMaxSize: 1, // 消费一批消息,最大数

this._subscriptions = new Map();
this._handles = new Map();
this._topicSubscribeInfoTable = new Map();
this._processQueueTable = new Map();
this._inited = false;
this._closePromise = this.await('close');

@@ -71,8 +76,6 @@ if (this.messageModel === MessageModel.CLUSTERING) {

this._mqClient = MQClient.getAndCreateMQClient(this);
this._offsetStore = this.options.isBroadcast ?
new LocalFileOffsetStore(this._mqClient, this.consumerGroup) :
new RemoteBrokerOffsetStore(this._mqClient, this.consumerGroup);
this._offsetStore = this.newOffsetStoreInstance();
this._mqClient.on('error', err => this._error(err));
this._offsetStore.on('error', err => this._error(err));
this._mqClient.on('error', err => this._handleError(err));
this._offsetStore.on('error', err => this._handleError(err));
}

@@ -88,2 +91,6 @@

get processQueueTable() {
return this._processQueueTable;
}
get consumerGroup() {

@@ -132,7 +139,17 @@ return this.options.consumerGroup;

yield this._mqClient.close();
this.removeAllListeners();
this.logger.info('[mq:consumer] consumer closed');
this.emit('close');
}.bind(this));
}
newOffsetStoreInstance() {
if (this.messageModel === MessageModel.BROADCASTING) {
if (this.options.persistent) {
return new LocalFileOffsetStore(this._mqClient, this.consumerGroup);
}
return new LocalMemoryOffsetStore(this._mqClient, this.consumerGroup);
}
return new RemoteBrokerOffsetStore(this._mqClient, this.consumerGroup);
}
/**

@@ -142,15 +159,76 @@ * subscribe

* @param {String} subExpression - tag
* @param {Function} handler - message handler
* @return {void}
*/
subscribe(topic, subExpression) {
subscribe(topic, subExpression, handler) {
if (arguments.length === 2) {
handler = subExpression;
subExpression = null;
}
assert(is.generatorFunction(handler), '[MQPushConsumer] handler should a generatorFunction');
assert(!this.subscriptions.has(topic), `[MQPushConsumer] ONLY one handler allowed for topic=${topic}`);
const subscriptionData = this.buildSubscriptionData(this.consumerGroup, topic, subExpression);
this.subscriptions.set(topic, subscriptionData);
const tagsSet = subscriptionData.tagsSet;
const needFilter = !!tagsSet.length;
this.subscriptions.set(topic, {
handler,
subscriptionData,
});
if (this._inited) {
co(function* () {
co(function* () {
yield this.ready();
// 如果 topic 没有路由信息,先更新一下
if (!this._topicSubscribeInfoTable.has(topic)) {
yield this._mqClient.updateAllTopicRouterInfo();
yield this._mqClient.sendHeartbeatToAllBroker();
yield this._mqClient.doRebalance();
}.bind(this)).catch(err => this._error(err));
}
}
while (this._inited && this.subscriptions.has(topic)) {
const mqList = this._topicSubscribeInfoTable.get(topic);
let hasMsg = false;
if (mqList && mqList.length) {
for (const mq of mqList) {
const item = this._processQueueTable.get(mq.key);
if (item) {
const pq = item.processQueue;
while (pq.msgCount) {
hasMsg = true;
const msg = pq.msgList[0];
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) {
try {
yield handler(msg, mq, pq);
} catch (err) {
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`;
this.emit('error', err);
if (this.messageModel === MessageModel.CLUSTERING) {
// TODO: support retry message
msg.reconsumeTimes++;
yield this._sleep(5000);
continue;
} else {
this.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId);
}
}
} else {
this.logger.debug('[MQPushConsumer] message filter by tags=, msg.tags=%s', subExpression, msg.tags);
}
const offset = pq.remove();
if (offset >= 0) {
this._offsetStore.updateOffset(mq, offset, true);
}
}
}
}
}
if (!hasMsg) {
yield Promise.race([
this._closePromise,
this.await(`topic_${topic}_changed`),
]);
}
}
}.bind(this)).catch(err => this._handleError(err));
}

@@ -178,12 +256,8 @@

const tags = subString.split('||');
if (tags && tags.length) {
for (let tag of tags) {
tag = tag.trim();
if (tag) {
subscriptionData.tagsSet.push(tag);
subscriptionData.codeSet.push(utils.hashCode(tag));
}
for (let tag of tags) {
tag = tag.trim();
if (tag) {
subscriptionData.tagsSet.push(tag);
subscriptionData.codeSet.push(utils.hashCode(tag));
}
} else {
throw new Error('[mq:consumer] subString split error');
}

@@ -211,6 +285,6 @@ }

co(function* () {
while (this._processQueueTable.has(messageQueue.key)) {
while (this._inited && this._processQueueTable.has(messageQueue.key)) {
try {
yield this.executePullRequestImmediately(messageQueue);
yield sleep(this.options.pullInterval);
yield this._sleep(this.options.pullInterval);
} catch (err) {

@@ -220,4 +294,4 @@ if (this._inited) {

err.message = `[mq:consumer] pull message for queue: ${messageQueue.key}, occurred error: ${err.message}`;
this.logger.error(err);
yield sleep(this.options.pullTimeDelayMillsWhenException);
this._handleError(err);
yield this._sleep(this.options.pullTimeDelayMillsWhenException);
}

@@ -245,8 +319,16 @@ }

}
// flow control
const size = processQueue.msgCount;
if (size > this.options.pullThresholdForQueue) {
yield this._sleep(this.options.pullTimeDelayMillsWhenFlowControl);
return;
}
processQueue.lastPullTimestamp = Date.now();
const subscriptionData = this.subscriptions.get(messageQueue.topic);
const data = this.subscriptions.get(messageQueue.topic);
const subscriptionData = data && data.subscriptionData;
if (!subscriptionData) {
this.logger.warn('[mq:consumer] execute pull request, but subscriptionData not found, topic: %s, queueId: %s', messageQueue.topic, messageQueue.queueId);
yield sleep(this.options.pullTimeDelayMillsWhenException);
yield this._sleep(this.options.pullTimeDelayMillsWhenException);
return;

@@ -274,37 +356,12 @@ }

case PullStatus.FOUND:
{
let msgList = pullResult.msgFoundList;
this.logger.info('[mq:consumer] new messages found, size: %d', msgList.length);
if (subscriptionData.tagsSet && subscriptionData.tagsSet.length && !subscriptionData.classFilterMode) {
msgList = msgList.filter(function(msg) {
if (msg.tags && subscriptionData.tagsSet.indexOf(msg.tags) >= 0) {
return true;
}
return false;
});
}
this.logger.info('[mq:consumer] after filter by tags %d messages remaining.', msgList.length);
if (msgList && msgList.length) {
// todo: long ?
const firstMsgOffset = Number(msgList[0].queueOffset);
const lastMsgOffset = Number(msgList[msgList.length - 1].queueOffset);
{
const pullRT = Date.now() - processQueue.lastPullTimestamp;
this.logger.info('[MQPushConsumer] pull message success, found new message size: %d, topic: %s, consumerGroup: %s, cost: %dms.',
pullResult.msgFoundList.length, messageQueue.topic, this.consumerGroup, pullRT);
yield new Promise((resolve, reject) => {
const timer = setTimeout(() => {
reject(new Error(`Message process timeout for topic: ${subscriptionData.topic}, from ${firstMsgOffset} to ${lastMsgOffset}.`));
}, 3000);
// emit message event
this.emit('message', msgList, () => {
resolve();
clearTimeout(timer);
});
});
// update offset
this._offsetStore.updateOffset(messageQueue, lastMsgOffset + 1);
this.logger.info('[mq:consumer] process message successfully for topic: %s at queue: %s from %d to %d', subscriptionData.topic, messageQueue.queueId, firstMsgOffset, lastMsgOffset);
}
break;
}
// submit to consumer
processQueue.putMessage(pullResult.msgFoundList);
this.emit(`topic_${messageQueue.topic}_changed`);
break;
}
case PullStatus.NO_NEW_MSG:

@@ -316,8 +373,4 @@ case PullStatus.NO_MATCHED_MSG:

case PullStatus.OFFSET_ILLEGAL:
this.logger.warn('[mq:consumer] the pull request offset illegal, message queue => %s, the originOffset => %j, pullResult => %j', messageQueue.key, originOffset, pullResult);
pullRequest.processQueue.droped = true;
yield sleep(10000);
this.logger.warn('[mq:consumer] the pull request offset illegal, message queue => %s, the originOffset => %d, pullResult => %j', messageQueue.key, originOffset, pullResult);
this._offsetStore.updateOffset(messageQueue, pullRequest.nextOffset);
yield this._offsetStore.persist(messageQueue);
yield this.removeProcessQueue(messageQueue);
break;

@@ -327,4 +380,2 @@ default:

}
// update pull request
this._processQueueTable.set(messageQueue.key, pullRequest);
}

@@ -435,2 +486,3 @@

let changed;
let allocateResult = mqSet;
if (this.options.isBroadcast) {

@@ -446,3 +498,3 @@ changed = yield this.updateProcessQueueTableInRebalance(topic, mqSet);

const allocateResult = this.allocateMessageQueueStrategy.allocate(this.consumerGroup, this._mqClient.clientId, mqSet, cidAll);
allocateResult = this.allocateMessageQueueStrategy.allocate(this.consumerGroup, this._mqClient.clientId, mqSet, cidAll);
this.logger.info('[mq:consumer] allocate queue for group: %s, clientId: %s, result: %j', this.consumerGroup, this._mqClient.clientId, allocateResult);

@@ -453,4 +505,4 @@ changed = yield this.updateProcessQueueTableInRebalance(topic, allocateResult);

if (changed) {
this.logger.info('[mq:consumer] do rebalance and message queue changed, topic: %s, mqSet: %j', topic, mqSet);
this.emit('messageQueueChanged', topic, mqSet);
this.logger.info('[mq:consumer] do rebalance and message queue changed, topic: %s, mqSet: %j', topic, allocateResult);
this.emit(`topic_${topic}_queue_changed`);
}

@@ -470,6 +522,2 @@ }

const obj = this._processQueueTable.get(key);
if (!obj) {
this._processQueueTable.delete(key);
}
const messageQueue = obj.messageQueue;

@@ -482,12 +530,10 @@ const processQueue = obj.processQueue;

processQueue.droped = true;
if (yield this.removeUnnecessaryMessageQueue(messageQueue, processQueue)) {
changed = true;
this._processQueueTable.delete(key);
}
}
} else if (processQueue.isPullExpired && this.consumeType === ConsumeType.CONSUME_PASSIVELY) {
processQueue.droped = true;
if (yield this.removeUnnecessaryMessageQueue(messageQueue, processQueue)) {
yield this.removeProcessQueue(messageQueue);
changed = true;
this._processQueueTable.delete(key);
} else if (processQueue.isPullExpired && this.consumeType === ConsumeType.CONSUME_PASSIVELY) {
processQueue.droped = true;
yield this.removeProcessQueue(messageQueue);
changed = true;
this.logger.warn('[MQPushConsumer] BUG doRebalance, %s, remove unnecessary mq=%s, because pull is pause, so try to fixed it',
this.consumerGroup, messageQueue.key);
}

@@ -586,3 +632,3 @@ }

err.mesasge = 'computePullFromWhere() occurred an exception, ' + err.mesasge;
this.logger.error(err);
this._handleError(err);
return -1;

@@ -601,6 +647,5 @@ }

if (processQueue) {
const droped = processQueue.droped;
processQueue.droped = true;
yield this.removeUnnecessaryMessageQueue(messageQueue, processQueue);
this.logger.info('[mq:consumer] Fix Offset, %s, remove unnecessary messageQueue, %j Droped: %s', this.consumerGroup, messageQueue, droped);
this.logger.info('[mq:consumer] remove unnecessary messageQueue, %s, Droped: %s', messageQueue.key, processQueue.droped);
}

@@ -612,3 +657,3 @@ }

* @param {MessageQueue} messageQueue - message queue
* @return {Boolean} success or not
* @return {void}
*/

@@ -619,3 +664,2 @@ * removeUnnecessaryMessageQueue(messageQueue) {

// todo: consume later ?
return true;
}

@@ -628,46 +672,13 @@

_error(err) {
setImmediate(() => {
err.message = 'MQPushConsumer occurred an error' + err.message;
this.emit('error', err);
});
_handleError(err) {
err.message = 'MQPushConsumer occurred an error ' + err.message;
this.emit('error', err);
}
on(event, listener) {
let newListener = listener;
if (event === 'message' && is.function(listener) && listener.length === 1) {
newListener = (message, done) => {
listener(message);
done();
};
newListener.__oldListener = listener;
}
return super.on(event, newListener);
_sleep(timeout) {
return Promise.race([
this._closePromise,
sleep(timeout),
]);
}
once(event, listener) {
let newListener = listener;
if (event === 'message' && is.function(listener) && listener.length === 1) {
newListener = (message, done) => {
listener(message);
done();
};
newListener.__oldListener = listener;
}
return super.once(event, newListener);
}
removeListener(event, listener) {
let newListener = listener;
if (event === 'message' && is.function(listener) && listener.length === 1) {
const listeners = this.listeners('message');
for (let i = 0, len = listeners.length; i < len; i++) {
if (listeners[i].__oldListener === listener) {
newListener = listeners[i];
break;
}
}
}
return super.removeListener(event, newListener);
}
}

@@ -688,11 +699,1 @@

}
function sleep(interval) {
return callback => {
if (interval <= 0) {
setImmediate(callback);
} else {
setTimeout(callback, interval);
}
};
}

@@ -38,4 +38,4 @@ 'use strict';

mod > 0 && index < mod ?
mqLen / cidLen + 1 :
mqLen / cidLen;
mqLen / cidLen + 1 :
mqLen / cidLen;
averageSize = Math.floor(averageSize); // 取整

@@ -42,0 +42,0 @@ const startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;

@@ -95,5 +95,5 @@ 'use strict';

// 6 QUEUEOFFSET
msgExt.queueOffset = byteBuffer.getLong();
msgExt.queueOffset = byteBuffer.getLong().toNumber();
// 7 PHYSICALOFFSET
msgExt.commitLogOffset = byteBuffer.getLong();
msgExt.commitLogOffset = byteBuffer.getLong().toNumber();
// 8 SYSFLAG

@@ -103,3 +103,3 @@ const sysFlag = byteBuffer.getInt();

// 9 BORNTIMESTAMP
msgExt.bornTimestamp = byteBuffer.getLong();
msgExt.bornTimestamp = byteBuffer.getLong().toNumber();
// 10 BORNHOST

@@ -111,3 +111,3 @@ const host = new Buffer(4);

// 11 STORETIMESTAMP
msgExt.storeTimestamp = byteBuffer.getLong();
msgExt.storeTimestamp = byteBuffer.getLong().toNumber();
// 12 STOREHOST

@@ -124,3 +124,3 @@ host.fill(0);

// 14 Prepared Transaction Offset
msgExt.preparedTransactionOffset = byteBuffer.getLong();
msgExt.preparedTransactionOffset = byteBuffer.getLong().toNumber();
// 15 BODY

@@ -127,0 +127,0 @@ const bodyLen = byteBuffer.getInt();

@@ -31,3 +31,3 @@ 'use strict';

this.storeHost = null;
this.reconsumeTimes = null;
this.reconsumeTimes = 0;
this.preparedTransactionOffset = null; // long

@@ -34,0 +34,0 @@ this.topic = topic;

@@ -31,3 +31,3 @@ 'use strict';

* @param {Object} options
* - {HttpClient} urllib - http request client
* - {HttpClient} httpclient - http request client
* - {Object} [logger] - log module

@@ -85,6 +85,6 @@ * - {Number} [responseTimeout] - tcp response timeout

case ResponseCode.SUCCESS:
{
const responseHeader = response.decodeCommandCustomHeader();
return responseHeader && responseHeader.value;
}
{
const responseHeader = response.decodeCommandCustomHeader();
return responseHeader && responseHeader.value;
}
default:

@@ -111,15 +111,15 @@ this._defaultHandler(request, response);

case ResponseCode.SUCCESS:
{
const body = response.body;
if (body) {
this.logger.info('[mq:client_api] get Topic [%s] RouteInfoFromNameServer: %s', topic, body.toString());
// JSON.parse dose not work here
const routerInfoData = JSON2.parse(body.toString());
// sort
routerInfoData.queueDatas.sort(compare);
routerInfoData.brokerDatas.sort(compare);
return routerInfoData;
}
break;
{
const body = response.body;
if (body) {
this.logger.info('[mq:client_api] get Topic [%s] RouteInfoFromNameServer: %s', topic, body.toString());
// JSON.parse dose not work here
const routerInfoData = JSON2.parse(body.toString());
// sort
routerInfoData.queueDatas.sort(compare);
routerInfoData.brokerDatas.sort(compare);
return routerInfoData;
}
break;
}
case ResponseCode.TOPIC_NOT_EXIST:

@@ -177,13 +177,13 @@ this.logger.info('[mq:client_api] get Topic [%s] RouteInfoFromNameServer is not exist value', topic);

case ResponseCode.SUCCESS:
{
const body = response.body;
if (body) {
const routerInfoData = JSON2.parse(body.toString());
// sort
routerInfoData.queueDatas.sort(compare);
routerInfoData.brokerDatas.sort(compare);
return routerInfoData;
}
break;
{
const body = response.body;
if (body) {
const routerInfoData = JSON2.parse(body.toString());
// sort
routerInfoData.queueDatas.sort(compare);
routerInfoData.brokerDatas.sort(compare);
return routerInfoData;
}
break;
}
case ResponseCode.TOPIC_NOT_EXIST:

@@ -253,6 +253,6 @@ this.logger.warn('[mq:client_api] get Topic [%s] RouteInfoFromNameServer is not exist value', topic);

case ResponseCode.SUCCESS:
{
const responseHeader = response.decodeCommandCustomHeader();
return Number(responseHeader.offset.toString());
}
{
const responseHeader = response.decodeCommandCustomHeader();
return Number(responseHeader.offset.toString());
}
default:

@@ -283,7 +283,7 @@ this._defaultHandler(request, response);

case ResponseCode.SUCCESS:
{
const responseHeader = response.decodeCommandCustomHeader();
// todo:
return responseHeader && Number(responseHeader.offset);
}
{
const responseHeader = response.decodeCommandCustomHeader();
// todo:
return responseHeader && Number(responseHeader.offset);
}
default:

@@ -315,7 +315,7 @@ this._defaultHandler(request, response);

case ResponseCode.SUCCESS:
{
const responseHeader = response.decodeCommandCustomHeader();
// todo:
return responseHeader && Number(responseHeader.offset);
}
{
const responseHeader = response.decodeCommandCustomHeader();
// todo:
return responseHeader && Number(responseHeader.offset);
}
default:

@@ -497,3 +497,3 @@ this._defaultHandler(request, response);

messageQueue,
queueOffset: responseHeader.queueOffset,
queueOffset: Number(responseHeader.queueOffset),
};

@@ -500,0 +500,0 @@ }

@@ -224,3 +224,3 @@ 'use strict';

data.error.message = `[mq:client] updateAllTopicRouterInfo occurred error, ${data.error.message}`;
this.logger.error(data.error);
this.emit('error', data.error);
}

@@ -244,3 +244,3 @@ });

defaultMQProducer.defaultTopicQueueNums < data.readQueueNums ?
defaultMQProducer.defaultTopicQueueNums : data.readQueueNums;
defaultMQProducer.defaultTopicQueueNums : data.readQueueNums;
data.readQueueNums = queueNums;

@@ -446,3 +446,6 @@ data.writeQueueNums = queueNums;

for (const topic of consumer.subscriptions.keys()) {
subscriptionDataSet.push(consumer.subscriptions.get(topic));
const data = consumer.subscriptions.get(topic);
if (data && data.subscriptionData) {
subscriptionDataSet.push(data.subscriptionData);
}
}

@@ -449,0 +452,0 @@

'use strict';
const Base = require('sdk-base');
const bsInsert = require('binary-search-insert');
const comparator = (a, b) => a.queueOffset - b.queueOffset;
const pullMaxIdleTime = 120000;
class ProcessQueue {
constructor() {
this.msgs = [];
class ProcessQueue extends Base {
constructor(options = {}) {
super(options);
// 当前Q是否被rebalance丢弃
this.msgList = [];
this.droped = false;
this.lastPullTimestamp = Date.now();
// 最后一次消费的时间戳
this.lastConsumeTimestamp = Date.now();
// 是否从Broker锁定
this.locked = false;
// 最后一次锁定成功时间戳
this.lastLockTimestamp = Date.now();
}
get maxSpan() {
const msgCount = this.msgCount;
if (msgCount) {
return this.msgList[msgCount - 1].queueOffset - this.msgList[0].queueOffset;
}
return 0;
}
get msgCount() {
return this.msgList.length;
}
get isPullExpired() {
return Date.now() - this.lastPullTimestamp > pullMaxIdleTime;
}
putMessage() {}
removeMessage() {}
takeMessags() {}
putMessage(msgs) {
for (const msg of msgs) {
bsInsert(this.msgList, comparator, msg);
}
this.queueOffsetMax = this.msgList[this.msgCount - 1].queueOffset;
}
remove(count = 1) {
this.msgList.splice(0, count);
return this.msgCount ? this.msgList[0].queueOffset : this.queueOffsetMax + 1;
}
clear() {
this.msgList = [];
}
}
module.exports = ProcessQueue;

@@ -11,3 +11,3 @@ 'use strict';

const ClientConfig = require('../client_config');
const PermName = require('../protocol/perm_name');
// const PermName = require('../protocol/perm_name');
const MessageConst = require('../message/message_const');

@@ -151,33 +151,33 @@ const TopicPublishInfo = require('./topic_publish_info');

*/
* createTopic(key, newTopic, queueNum, topicSysFlag) {
yield this.ready();
// * createTopic(key, newTopic, queueNum, topicSysFlag) {
// yield this.ready();
topicSysFlag = topicSysFlag || 0;
const topicRouteData = yield this._mqClient.getTopicRouteInfoFromNameServer(key, 1000 * 3);
const brokerDataList = topicRouteData.brokerDatas;
if (brokerDataList && brokerDataList.length) {
// 排序原因:即使没有配置顺序消息模式,默认队列的顺序同配置的一致。
brokerDataList.sort(compare);
// topicSysFlag = topicSysFlag || 0;
// const topicRouteData = yield this._mqClient.getTopicRouteInfoFromNameServer(key, 1000 * 3);
// const brokerDataList = topicRouteData.brokerDatas;
// if (brokerDataList && brokerDataList.length) {
// // 排序原因:即使没有配置顺序消息模式,默认队列的顺序同配置的一致。
// brokerDataList.sort(compare);
yield brokerDataList.map(brokerData => {
const addr = brokerData.brokerAddrs[MixAll.MASTER_ID];
if (addr) {
const topicConfig = {
topicName: newTopic,
readQueueNums: queueNum,
writeQueueNums: queueNum,
topicSysFlag,
perm: PermName.PERM_READ | PermName.PERM_WRITE,
topicFilterType: 'SINGLE_TAG',
order: false,
};
this.logger.info('[mq:producer] execute createTopic at broker: %s, topic: %s', addr, newTopic);
return this._mqClient.createTopic(addr, key, topicConfig, 1000 * 3);
}
return null;
});
} else {
throw new Error('Not found broker, maybe key is wrong');
}
}
// yield brokerDataList.map(brokerData => {
// const addr = brokerData.brokerAddrs[MixAll.MASTER_ID];
// if (addr) {
// const topicConfig = {
// topicName: newTopic,
// readQueueNums: queueNum,
// writeQueueNums: queueNum,
// topicSysFlag,
// perm: PermName.PERM_READ | PermName.PERM_WRITE,
// topicFilterType: 'SINGLE_TAG',
// order: false,
// };
// this.logger.info('[mq:producer] execute createTopic at broker: %s, topic: %s', addr, newTopic);
// return this._mqClient.createTopic(addr, key, topicConfig, 1000 * 3);
// }
// return null;
// });
// } else {
// throw new Error('Not found broker, maybe key is wrong');
// }
// }

@@ -335,11 +335,11 @@ /**

// Helper
// ---------------
function compare(routerA, routerB) {
if (routerA.brokerName > routerB.brokerName) {
return 1;
} else if (routerA.brokerName < routerB.brokerName) {
return -1;
}
return 0;
}
// // Helper
// // ---------------
// function compare(routerA, routerB) {
// if (routerA.brokerName > routerB.brokerName) {
// return 1;
// } else if (routerA.brokerName < routerB.brokerName) {
// return -1;
// }
// return 0;
// }

@@ -10,3 +10,3 @@ 'use strict';

logger,
responseTimeout: 3000,
responseTimeout: 30000,
};

@@ -19,3 +19,3 @@

* @param {Object} options
* - {HttpClient} urllib - http request client
* - {HttpClient} httpclient - http request client
* - {Object} [logger] - log module

@@ -27,3 +27,3 @@ * - {Number} [responseTimeout] - tcp response timeout

assert(options.onsAddr, '[RemotingClient] options.onsAddr is required');
assert(options.urllib, '[RemotingClient] options.urllib is required');
assert(options.httpclient, '[RemotingClient] options.httpclient is required');
super(Object.assign({ initMethod: 'init' }, defaultOptions, options));

@@ -37,6 +37,6 @@

/**
* @property {HttpClient} RemotingClient#urllib
* @property {HttpClient} RemotingClient#httpclient
*/
get urllib() {
return this.options.urllib;
get httpclient() {
return this.options.httpclient;
}

@@ -121,3 +121,3 @@

* updateNameServerAddressList() {
const ret = yield this.urllib.request(this.options.onsAddr);
const ret = yield this.httpclient.request(this.options.onsAddr);
if (ret.status === 200) {

@@ -124,0 +124,0 @@ const addrs = ret.data.toString().trim();

'use strict';
exports.LocalFileOffsetStore = require('./local_file');
exports.LocalMemoryOffsetStore = require('./local_memory');
exports.RemoteBrokerOffsetStore = require('./remote_broker');

@@ -66,5 +66,6 @@ 'use strict';

}
if (increaseOnly || prev < offset) {
this.offsetTable.set(messageQueue.key, offset);
if (prev >= offset && increaseOnly) {
return;
}
this.offsetTable.set(messageQueue.key, offset);
this.logger.info('[mq:LocalFileOffsetStore] update offset for messageQueue: %s, current offset: %d, prev offset: %s, increaseOnly: %s', messageQueue.key, offset, prev, increaseOnly);

@@ -88,21 +89,21 @@ }

case ReadOffsetType.READ_FROM_MEMORY:
{
const offset = this.offsetTable.get(messageQueue.key);
if (!is.nullOrUndefined(offset)) {
return offset;
}
break;
{
const offset = this.offsetTable.get(messageQueue.key);
if (!is.nullOrUndefined(offset)) {
return offset;
}
break;
}
case ReadOffsetType.READ_FROM_STORE:
{
const data = yield this.readLocalOffset();
if (data && data.offsetTable) {
const offset = data.offsetTable[messageQueue.key];
if (is.number(offset)) {
this.updateOffset(messageQueue, offset, false);
return offset;
}
{
const data = yield this.readLocalOffset();
if (data && data.offsetTable) {
const offset = data.offsetTable[messageQueue.key];
if (is.number(offset)) {
this.updateOffset(messageQueue, offset, false);
return offset;
}
break;
}
break;
}
default:

@@ -109,0 +110,0 @@ break;

@@ -54,5 +54,6 @@ 'use strict';

}
if (increaseOnly || prev < offset) {
this.offsetTable.set(messageQueue.key, offset);
if (prev >= offset && increaseOnly) {
return;
}
this.offsetTable.set(messageQueue.key, offset);
this.logger.info('[mq:RemoteBrokerOffsetStore] update offset for messageQueue: %s, current offset: %d, prev offset: %s, increaseOnly: %s', messageQueue.key, offset, prev, increaseOnly);

@@ -76,15 +77,15 @@ }

case ReadOffsetType.READ_FROM_MEMORY:
{
const offset = this.offsetTable.get(messageQueue.key);
if (!is.nullOrUndefined(offset)) {
return offset;
}
break;
{
const offset = this.offsetTable.get(messageQueue.key);
if (!is.nullOrUndefined(offset)) {
return offset;
}
break;
}
case ReadOffsetType.READ_FROM_STORE:
{
const brokerOffset = yield this.fetchConsumeOffsetFromBroker(messageQueue);
this.updateOffset(messageQueue, brokerOffset, false);
return brokerOffset;
}
{
const brokerOffset = yield this.fetchConsumeOffsetFromBroker(messageQueue);
this.updateOffset(messageQueue, brokerOffset, false);
return brokerOffset;
}
default:

@@ -91,0 +92,0 @@ break;

{
"name": "ali-ons",
"version": "1.0.0",
"version": "2.0.0",
"description": "Aliyun Open Notification Service Client",

@@ -10,3 +10,4 @@ "main": "./lib/index.js",

"scripts": {
"autod": "autod",
"autod": "autod --check",
"pkgfiles": "egg-bin pkgfiles --check",
"lint": "eslint . --ext .js",

@@ -16,3 +17,3 @@ "test": "npm run lint && npm run test-local",

"cov": "TEST_TIMEOUT=60000 egg-bin cov",
"ci": "npm run lint && npm run cov",
"ci": "npm run autod && npm run pkgfiles && npm run lint && npm run cov",
"contributors": "contributors -f plain -o AUTHORS"

@@ -37,36 +38,32 @@ },

"JSON2": "^0.1.0",
"address": "^1.0.1",
"byte": "^1.1.5",
"bytes": "^2.4.0",
"chalk": "^1.1.3",
"address": "^1.0.3",
"binary-search-insert": "^1.0.3",
"byte": "^1.2.0",
"bytes": "^3.0.0",
"chalk": "^2.1.0",
"co": "^4.6.0",
"co-gather": "^0.0.1",
"debug": "^2.6.2",
"egg-logger": "^1.5.0",
"is-type-of": "^1.0.0",
"debug": "^3.1.0",
"egg-logger": "^1.6.0",
"is-type-of": "^1.2.0",
"long": "^3.2.0",
"mkdirp": "^0.5.1",
"mz-modules": "^2.0.0",
"osenv": "^0.1.4",
"sdk-base": "^3.1.0",
"tcp-base": "^2.0.0",
"utility": "^1.11.0"
"sdk-base": "^3.3.0",
"tcp-base": "^3.0.0",
"utility": "^1.12.0"
},
"devDependencies": {
"autod": "^2.7.1",
"autod": "^2.9.0",
"contributors": "^0.5.1",
"egg-bin": "^2.4.0",
"egg-ci": "^1.5.0",
"eslint": "^3.17.1",
"eslint-config-egg": "^3.2.0",
"mm": "^2.1.0",
"pedding": "^1.1.0",
"rimraf": "^2.6.1",
"urllib": "^2.21.0"
"egg-bin": "^4.3.3",
"eslint": "^4.7.2",
"eslint-config-egg": "^5.1.1",
"mm": "^2.2.0",
"urllib": "^2.25.0"
},
"engines": {
"node": ">= 6.0.0"
},
"ci": {
"version": "6, 7"
}
}

@@ -35,19 +35,17 @@ ali-ons

const httpclient = require('urllib');
const Consumer = require('ali-ons').Consumer;
const consumer = new Consumer({
httpclient,
accessKey: 'your-accesskey',
secretKey: 'your-secretkey',
consumerGroup: 'your-consumer-group',
isBroadcast: false, // default is false, that mean messages will be pushed to consumer cluster only once.
// isBroadcast: true,
});
consumer.subscribe('your-topic', '*');
consumer.on('message', (msgs, done) => {
msgs.forEach(msg => console.log(`receive message, msgId: ${msg.msgId}, body: ${msg.body.toString()}`));
done();
consumer.subscribe(config.topic, '*', function*(msg) {
console.log(`receive message, msgId: ${msg.msgId}, body: ${msg.body.toString()}`)
});
consumer.on('error', err => console.log(err.stack));
consumer.ready(() => console.log('consumer is ready'));
consumer.on('error', err => console.log(err));
```

@@ -54,0 +52,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc