ease-ali-kafka
基于 node-rdkafka 的阿里云MQ(Kafka) Node.js 客户端. 主要是简化配置和使用方式.
npm install ease-ali-kafka
使用方法
引入模块
const Kafka = require('ease-ali-kafka');
初始化
Kafka.init({
bootstrap: 'kafka-ons-internet.aliyun.com:8080',
username: '你的阿里云帐号 access key',
password: '你的阿里云帐号 secret 后 10 位',
consumerID: '在阿里云上配置的 consumer ID',
reportTimeout: 3000,
keyEncode: 'base64',
producerPollInterval: 200,
retries: 2,
});
发送消息
Kafka.sendKafkaMessage('TOPIC_NAME', {
id: 123,
message: 'hello'
}).then(report => console.log('send success, report is ', report))
.catch(err => console.error('send failed. ', err));
.sendKafkaMessage(topicName, content)
return: Promise<report>
topicName
: {String} 目标topic名称
content
: {String|Object|Buffer} 要发送的内容, 可以直接写 Object.
report
: {Object} 消息回执. 其中主要包含字段 key
{Buffer}, topic
{String}, partition
{Number}, offset
{Number}
如因网络问题, 发送后3秒内未收到回执, 则会报 timeout 错误.
订阅并消费消息
Kafka.subscribe('TOPIC_NAME', function (data, commit, next) {
console.log('consuming data', data);
commit();
next();
})
.subscribe(topicName, handler)
return: undefined
topicName
: {String} 目标topic名称
handler
: {Function|AsyncFunction} 处理消息的函数. 有多种消费方式
Kafka.subscribe(topic, function (data) {
});
Kafka.subscribe(topic, function (data, commit) {
});
Kafka.subscribe(topic, function (data, commit, next) {
})
data
: {Buffer} 收到的消息内容, Buffer 类型. 通过 ._metadata
属性可以获取相关元数据
commit
: {Function} 执行该函数则给 Kafka 发送消息成功消费的回执
next
: {Function} 执行该函数则开始处理下一个消息. 否则即使有新的消息待处理, 也会等待当前消息处理完成才会处理下一个消息
支持订阅多个不同的 topic.
License
MIT