Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

no-kafka

Package Overview
Dependencies
Maintainers
1
Versions
98
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

no-kafka - npm Package Compare versions

Comparing version 2.7.0 to 2.7.1

80

lib/client.js

@@ -179,6 +179,6 @@ 'use strict';

Client.prototype.metadataRequest = function (topicNames) {
var self = this, buffer;
var self = this, buffer, correlationId = self.correlationId++;
buffer = self.protocol.write().MetadataRequest({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId,

@@ -189,3 +189,3 @@ topicNames: topicNames || []

return Promise.any(self.initialBrokers.map(function (connection) {
return connection.send(buffer).then(function (responseBuffer) {
return connection.send(correlationId, buffer).then(function (responseBuffer) {
return self.protocol.read(responseBuffer).MetadataResponse().result;

@@ -288,4 +288,5 @@ });

return Promise.all(_.map(requests, function (topics, leader) {
var correlationId = self.correlationId++;
var buffer = self.protocol.write().ProduceRequest({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId,

@@ -297,3 +298,3 @@ requiredAcks: self.options.requiredAcks,

return self.brokerConnections[leader].send(buffer, self.options.requiredAcks === 0).then(function (responseBuffer) {
return self.brokerConnections[leader].send(correlationId, buffer, self.options.requiredAcks === 0).then(function (responseBuffer) {
if (self.options.requiredAcks !== 0) {

@@ -322,3 +323,3 @@ // TODO: ThrottleTime is returned in V1 so we should change the return value soon

return Promise.all(_.map(requests, function (topics, leader) {
var buffer;
var buffer, correlationId = self.correlationId++;
// fake LeaderNotAvailable for all topics with no leader

@@ -330,3 +331,3 @@ if (leader === -1 || !self.brokerConnections[leader]) {

buffer = self.protocol.write().FetchRequest({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId,

@@ -338,3 +339,3 @@ maxWaitTime: self.options.maxWaitTime,

return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) {
return self.brokerConnections[leader].send(correlationId, buffer).then(function (responseBuffer) {
// TODO: ThrottleTime is returned in V1 so we should change the return value soon

@@ -374,4 +375,5 @@ // [ topics, throttleTime ] or { topics, throttleTime }

return Promise.all(_.map(requests, function (topics, leader) {
var correlationId = self.correlationId++;
var buffer = self.protocol.write().OffsetRequest({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId,

@@ -381,3 +383,3 @@ topics: topics

return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) {
return self.brokerConnections[leader].send(correlationId, buffer).then(function (responseBuffer) {
return self.protocol.read(responseBuffer).OffsetResponse().result.topics;

@@ -395,4 +397,5 @@ });

return Promise.all(_.map(requests, function (topics, leader) {
var correlationId = self.correlationId++;
var buffer = self.protocol.write().OffsetCommitRequestV0({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId,

@@ -403,3 +406,3 @@ groupId: groupId,

return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) {
return self.brokerConnections[leader].send(correlationId, buffer).then(function (responseBuffer) {
return self.protocol.read(responseBuffer).OffsetCommitResponse().result.topics;

@@ -418,4 +421,5 @@ });

return Promise.all(_.map(requests, function (topics, leader) {
var correlationId = self.correlationId++;
var buffer = self.protocol.write().OffsetFetchRequest({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId,

@@ -427,3 +431,3 @@ apiVersion: 0,

return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) {
return self.brokerConnections[leader].send(correlationId, buffer).then(function (responseBuffer) {
return self.protocol.read(responseBuffer).OffsetFetchResponse().result.topics;

@@ -441,4 +445,5 @@ });

return self._findGroupCoordinator(groupId).then(function (connection) {
var correlationId = self.correlationId++;
var buffer = self.protocol.write().OffsetFetchRequest({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId,

@@ -450,3 +455,3 @@ apiVersion: 1,

return connection.send(buffer).then(function (responseBuffer) {
return connection.send(correlationId, buffer).then(function (responseBuffer) {
return self.protocol.read(responseBuffer).OffsetFetchResponse().result.topics;

@@ -474,3 +479,3 @@ });

Client.prototype._findGroupCoordinator = function (groupId) {
var self = this, buffer;
var self = this, buffer, correlationId = self.correlationId++;

@@ -482,3 +487,3 @@ if (self.groupCoordinators[groupId] && !self.groupCoordinators[groupId].isRejected()) {

buffer = self.protocol.write().GroupCoordinatorRequest({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId,

@@ -489,3 +494,3 @@ groupId: groupId

self.groupCoordinators[groupId] = Promise.any(self.initialBrokers.map(function (connection) {
return connection.send(buffer).then(function (responseBuffer) {
return connection.send(correlationId, buffer).then(function (responseBuffer) {
var result = self.protocol.read(responseBuffer).GroupCoordinatorResponse().result;

@@ -518,4 +523,5 @@ if (result.error) {

return self._findGroupCoordinator(groupId).then(function (connection) {
var correlationId = self.correlationId++;
var buffer = self.protocol.write().JoinConsumerGroupRequest({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId,

@@ -528,3 +534,3 @@ groupId: groupId,

return connection.send(buffer).then(function (responseBuffer) {
return connection.send(correlationId, buffer).then(function (responseBuffer) {
var result = self.protocol.read(responseBuffer).JoinConsumerGroupResponse().result;

@@ -543,4 +549,5 @@ if (result.error) {

return self._findGroupCoordinator(groupId).then(function (connection) {
var correlationId = self.correlationId++;
var buffer = self.protocol.write().HeartbeatRequest({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId,

@@ -552,3 +559,3 @@ groupId: groupId,

return connection.send(buffer).then(function (responseBuffer) {
return connection.send(correlationId, buffer).then(function (responseBuffer) {
var result = self.protocol.read(responseBuffer).HeartbeatResponse().result;

@@ -567,4 +574,5 @@ if (result.error) {

return self._findGroupCoordinator(groupId).then(function (connection) {
var correlationId = self.correlationId++;
var buffer = self.protocol.write().SyncConsumerGroupRequest({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId,

@@ -577,3 +585,3 @@ groupId: groupId,

return connection.send(buffer).then(function (responseBuffer) {
return connection.send(correlationId, buffer).then(function (responseBuffer) {
var result = self.protocol.read(responseBuffer).SyncConsumerGroupResponse().result;

@@ -592,4 +600,5 @@ if (result.error) {

return self._findGroupCoordinator(groupId).then(function (connection) {
var correlationId = self.correlationId++;
var buffer = self.protocol.write().LeaveGroupRequest({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId,

@@ -600,3 +609,3 @@ groupId: groupId,

return connection.send(buffer).then(function (responseBuffer) {
return connection.send(correlationId, buffer).then(function (responseBuffer) {
var result = self.protocol.read(responseBuffer).LeaveGroupResponse().result;

@@ -616,4 +625,5 @@ if (result.error) {

return self._findGroupCoordinator(groupId).then(function (connection) {
var correlationId = self.correlationId++;
var buffer = self.protocol.write().OffsetCommitRequestV2({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId,

@@ -627,3 +637,3 @@ groupId: groupId,

return connection.send(buffer).then(function (responseBuffer) {
return connection.send(correlationId, buffer).then(function (responseBuffer) {
return self.protocol.read(responseBuffer).OffsetCommitResponse().result.topics;

@@ -637,7 +647,7 @@ });

Client.prototype.listGroupsRequest = function () {
var self = this, buffer;
var self = this, buffer, correlationId = self.correlationId++;
return self._waitMetadata().then(function () {
buffer = self.protocol.write().ListGroupsRequest({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId

@@ -647,3 +657,3 @@ }).result;

return Promise.map(_.values(self.brokerConnections), function (connection) {
return connection.send(buffer).then(function (responseBuffer) {
return connection.send(correlationId, buffer).then(function (responseBuffer) {
return self.protocol.read(responseBuffer).ListGroupResponse().result.groups;

@@ -656,7 +666,7 @@ });

Client.prototype.describeGroupRequest = function (groupId) {
var self = this;
var self = this, correlationId = self.correlationId++;
return self._findGroupCoordinator(groupId).then(function (connection) {
var buffer = self.protocol.write().DescribeGroupRequest({
correlationId: self.correlationId++,
correlationId: correlationId,
clientId: self.options.clientId,

@@ -666,3 +676,3 @@ groups: [groupId]

return connection.send(buffer).then(function (responseBuffer) {
return connection.send(correlationId, buffer).then(function (responseBuffer) {
return self.protocol.read(responseBuffer).DescribeGroupResponse().result.groups[0];

@@ -669,0 +679,0 @@ });

@@ -6,2 +6,3 @@ 'use strict';

var NoKafkaConnectionError = require('./errors').NoKafkaConnectionError;
var _ = require('lodash');

@@ -19,3 +20,3 @@ function Connection(options) {

this.queue = [];
this.queue = {};
}

@@ -95,7 +96,7 @@

this.queue.forEach(function (t) {
_.each(this.queue, function (t) {
t.reject(err);
});
this.queue = [];
this.queue = {};
};

@@ -123,3 +124,3 @@

*/
Connection.prototype.send = function (data, noresponse) {
Connection.prototype.send = function (correlationId, data, noresponse) {
var self = this, buffer = new Buffer(4 + data.length);

@@ -132,6 +133,6 @@

return new Promise(function (resolve, reject) {
self.queue.push({
self.queue[correlationId] = {
resolve: resolve,
reject: reject
});
};

@@ -141,3 +142,4 @@ self.socket.write(buffer);

if (noresponse === true) {
self.queue.shift().resolve();
self.queue[correlationId].resolve();
delete self.queue[correlationId];
}

@@ -157,3 +159,3 @@ });

Connection.prototype._receive = function (data) {
var length;
var length, correlationId;

@@ -188,4 +190,7 @@ if (!this.connected) {

this.queue.shift().resolve(data.slice(4, length + 4));
correlationId = data.readInt32BE(4);
this.queue[correlationId].resolve(data.slice(4, length + 4));
delete this.queue[correlationId];
if (data.length > 4 + length) {

@@ -192,0 +197,0 @@ this._receive(data.slice(length + 4));

@@ -9,3 +9,3 @@ {

},
"version": "2.7.0",
"version": "2.7.1",
"main": "./lib/index.js",

@@ -12,0 +12,0 @@ "keywords": [

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc