Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

kafka-node

Package Overview
Dependencies
Maintainers
1
Versions
113
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafka-node - npm Package Compare versions

Comparing version 0.2.10 to 0.2.11

2

example/consumer.js

@@ -11,3 +11,3 @@ 'use strict';

var client = new Client();
var client = new Client('localhost:2181');
var topics = [

@@ -14,0 +14,0 @@ {topic: topic, partition: 1},

@@ -9,3 +9,3 @@ 'use strict';

var topic = argv.topic || 'topic1';
var client = new Client();
var client = new Client('localhost:2181');
var topics = [ { topic: topic }];

@@ -12,0 +12,0 @@ var options = { autoCommit: false, fromBeginning: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024*1024 };

var kafka = require('../kafka'),
Producer = kafka.Producer,
Client = kafka.Client,
client = new Client();
client = new Client('localhost:2181');

@@ -16,2 +16,6 @@ var argv = require('optimist').argv;

producer.on('error', function (err) {
console.log('error', err)
})
function send(message) {

@@ -18,0 +22,0 @@ for (var i = 0; i < count; i++) {

@@ -40,2 +40,3 @@ 'use strict';

this.ready = false;
this.paused = this.options.paused;
this.id = nextId();

@@ -159,3 +160,3 @@ this.payloads = this.buildPayloads(topics);

Consumer.prototype.fetch = function () {
if (!this.ready) return;
if (!this.ready || this.paused) return;
this.client.sendFetchRequest(this, this.payloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes);

@@ -253,2 +254,11 @@ }

Consumer.prototype.pause = function () {
this.paused = true;
}
Consumer.prototype.resume = function () {
this.paused = false;
this.fetch();
}
module.exports = Consumer;

@@ -425,21 +425,7 @@ 'use strict';

HighLevelConsumer.prototype.fetch = function () {
if (!this.ready) {
if (!this.ready || this.rebalancing || this.paused) {
return;
}
if (this.rebalancing) return;
// If paused request 0 data - need to pause in this way otherwise the connection is dropped
if (this.paused) {
// Build empty payloads on demand
var empty_payloads = this.topicPayloads.map(function (j) {
var k = { topic: j.topic, partition: j.partition || 0, offset: j.offset || 0, maxBytes: 0, metadata: 'm'};
return k;
});
this.client.sendFetchRequest(this, empty_payloads, this.options.fetchMaxWaitMs, 0);
}
else {
this.client.sendFetchRequest(this, this.topicPayloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes, this.options.maxTickMessages);
}
this.client.sendFetchRequest(this, this.topicPayloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes, this.options.maxTickMessages);
}

@@ -548,4 +534,5 @@

this.paused = false;
this.fetch();
}
module.exports = HighLevelConsumer;
{
"name": "kafka-node",
"description": "node client for Apache kafka, only support kafka 0.8 and above",
"version": "0.2.10",
"version": "0.2.11",
"main": "kafka.js",

@@ -6,0 +6,0 @@ "dependencies": {

@@ -67,2 +67,4 @@ Kafka-node

});
producer.on('error', function (err) {})
```

@@ -280,2 +282,8 @@

### pause()
Pause the consumer
### resume()
Resume the consumer
### close(force, cb)

@@ -282,0 +290,0 @@ * `force`: **Boolean**, if set true, it force commit current offset before close, default false

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc