Comparing version 3.9.0 to 3.10.0-beta.0
@@ -341,2 +341,5 @@ 'use strict'; | ||
subscriptionData.subString = '*'; | ||
} else if (typeof subString === 'object') { | ||
subscriptionData.expressionType = subString.expressionType; | ||
subscriptionData.subString = subString.subString; | ||
} else { | ||
@@ -434,2 +437,3 @@ const tags = subString.split('||'); | ||
const subExpression = this.options.postSubscriptionWhenPull ? subscriptionData.subString : null; | ||
const expressionType = subscriptionData.expressionType; | ||
const subVersion = subscriptionData.subVersion; | ||
@@ -445,5 +449,5 @@ | ||
this.logger.info('[MQPushConsumer] start to pull message from queue: %s, nextOffset: %s, commitOffset: %s, subExpression: %s, subVersion: %s', | ||
messageQueue.key, pullRequest.nextOffset, commitOffset, subExpression, subVersion); | ||
const pullResult = await this.pullKernelImpl(messageQueue, subExpression, subVersion, pullRequest.nextOffset, commitOffset); | ||
this.logger.info('[MQPushConsumer] start to pull message from queue: %s, nextOffset: %s, commitOffset: %s, subExpression: %s, expressionType: %s, subVersion: %s', | ||
messageQueue.key, pullRequest.nextOffset, commitOffset, subExpression, expressionType, subVersion); | ||
const pullResult = await this.pullKernelImpl(messageQueue, subExpression, expressionType, subVersion, pullRequest.nextOffset, commitOffset); | ||
this.updatePullFromWhichNode(messageQueue, pullResult.suggestWhichBrokerId); | ||
@@ -484,3 +488,3 @@ const originOffset = pullRequest.nextOffset; | ||
async pullKernelImpl(messageQueue, subExpression, subVersion, offset, commitOffset) { | ||
async pullKernelImpl(messageQueue, subExpression, subVersion, expressionType, offset, commitOffset) { | ||
let sysFlag = PullSysFlag.buildSysFlag( // | ||
@@ -514,2 +518,6 @@ commitOffset > 0, // commitOffset | ||
}; | ||
// 拉取消息时需要设置表达式类型,未设置默认为 tag | ||
if (expressionType) { | ||
requestHeader.expressionType = expressionType; | ||
} | ||
return await this._mqClient.pullMessage(result.brokerAddr, requestHeader, this.options.consumerTimeoutMillisWhenSuspend); | ||
@@ -516,0 +524,0 @@ } |
{ | ||
"name": "ali-ons", | ||
"version": "3.9.0", | ||
"version": "3.10.0-beta.0", | ||
"description": "Aliyun Open Notification Service Client", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
@@ -51,2 +51,17 @@ ali-ons | ||
If you want to use sql filter, you can subscribe a topic with a sql expression: | ||
```js | ||
consumer.subscribe( | ||
config.topic, | ||
{ | ||
expressionType: 'SQL92', | ||
subString: 'a is not null' | ||
}, | ||
async msg => { | ||
console.log(`receive message, msgId: ${msg.msgId}, body: ${msg.body.toString()}`) | ||
} | ||
); | ||
``` | ||
For more information about sql filter, see: [Filter Messages By SQL92](https://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/) | ||
producer | ||
@@ -53,0 +68,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
159427
4219
108
1