Comparing version 2.7.0 to 2.7.1
@@ -179,6 +179,6 @@ 'use strict'; | ||
Client.prototype.metadataRequest = function (topicNames) { | ||
var self = this, buffer; | ||
var self = this, buffer, correlationId = self.correlationId++; | ||
buffer = self.protocol.write().MetadataRequest({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId, | ||
@@ -189,3 +189,3 @@ topicNames: topicNames || [] | ||
return Promise.any(self.initialBrokers.map(function (connection) { | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return connection.send(correlationId, buffer).then(function (responseBuffer) { | ||
return self.protocol.read(responseBuffer).MetadataResponse().result; | ||
@@ -288,4 +288,5 @@ }); | ||
return Promise.all(_.map(requests, function (topics, leader) { | ||
var correlationId = self.correlationId++; | ||
var buffer = self.protocol.write().ProduceRequest({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId, | ||
@@ -297,3 +298,3 @@ requiredAcks: self.options.requiredAcks, | ||
return self.brokerConnections[leader].send(buffer, self.options.requiredAcks === 0).then(function (responseBuffer) { | ||
return self.brokerConnections[leader].send(correlationId, buffer, self.options.requiredAcks === 0).then(function (responseBuffer) { | ||
if (self.options.requiredAcks !== 0) { | ||
@@ -322,3 +323,3 @@ // TODO: ThrottleTime is returned in V1 so we should change the return value soon | ||
return Promise.all(_.map(requests, function (topics, leader) { | ||
var buffer; | ||
var buffer, correlationId = self.correlationId++; | ||
// fake LeaderNotAvailable for all topics with no leader | ||
@@ -330,3 +331,3 @@ if (leader === -1 || !self.brokerConnections[leader]) { | ||
buffer = self.protocol.write().FetchRequest({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId, | ||
@@ -338,3 +339,3 @@ maxWaitTime: self.options.maxWaitTime, | ||
return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) { | ||
return self.brokerConnections[leader].send(correlationId, buffer).then(function (responseBuffer) { | ||
// TODO: ThrottleTime is returned in V1 so we should change the return value soon | ||
@@ -374,4 +375,5 @@ // [ topics, throttleTime ] or { topics, throttleTime } | ||
return Promise.all(_.map(requests, function (topics, leader) { | ||
var correlationId = self.correlationId++; | ||
var buffer = self.protocol.write().OffsetRequest({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId, | ||
@@ -381,3 +383,3 @@ topics: topics | ||
return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) { | ||
return self.brokerConnections[leader].send(correlationId, buffer).then(function (responseBuffer) { | ||
return self.protocol.read(responseBuffer).OffsetResponse().result.topics; | ||
@@ -395,4 +397,5 @@ }); | ||
return Promise.all(_.map(requests, function (topics, leader) { | ||
var correlationId = self.correlationId++; | ||
var buffer = self.protocol.write().OffsetCommitRequestV0({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId, | ||
@@ -403,3 +406,3 @@ groupId: groupId, | ||
return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) { | ||
return self.brokerConnections[leader].send(correlationId, buffer).then(function (responseBuffer) { | ||
return self.protocol.read(responseBuffer).OffsetCommitResponse().result.topics; | ||
@@ -418,4 +421,5 @@ }); | ||
return Promise.all(_.map(requests, function (topics, leader) { | ||
var correlationId = self.correlationId++; | ||
var buffer = self.protocol.write().OffsetFetchRequest({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId, | ||
@@ -427,3 +431,3 @@ apiVersion: 0, | ||
return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) { | ||
return self.brokerConnections[leader].send(correlationId, buffer).then(function (responseBuffer) { | ||
return self.protocol.read(responseBuffer).OffsetFetchResponse().result.topics; | ||
@@ -441,4 +445,5 @@ }); | ||
return self._findGroupCoordinator(groupId).then(function (connection) { | ||
var correlationId = self.correlationId++; | ||
var buffer = self.protocol.write().OffsetFetchRequest({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId, | ||
@@ -450,3 +455,3 @@ apiVersion: 1, | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return connection.send(correlationId, buffer).then(function (responseBuffer) { | ||
return self.protocol.read(responseBuffer).OffsetFetchResponse().result.topics; | ||
@@ -474,3 +479,3 @@ }); | ||
Client.prototype._findGroupCoordinator = function (groupId) { | ||
var self = this, buffer; | ||
var self = this, buffer, correlationId = self.correlationId++; | ||
@@ -482,3 +487,3 @@ if (self.groupCoordinators[groupId] && !self.groupCoordinators[groupId].isRejected()) { | ||
buffer = self.protocol.write().GroupCoordinatorRequest({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId, | ||
@@ -489,3 +494,3 @@ groupId: groupId | ||
self.groupCoordinators[groupId] = Promise.any(self.initialBrokers.map(function (connection) { | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return connection.send(correlationId, buffer).then(function (responseBuffer) { | ||
var result = self.protocol.read(responseBuffer).GroupCoordinatorResponse().result; | ||
@@ -518,4 +523,5 @@ if (result.error) { | ||
return self._findGroupCoordinator(groupId).then(function (connection) { | ||
var correlationId = self.correlationId++; | ||
var buffer = self.protocol.write().JoinConsumerGroupRequest({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId, | ||
@@ -528,3 +534,3 @@ groupId: groupId, | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return connection.send(correlationId, buffer).then(function (responseBuffer) { | ||
var result = self.protocol.read(responseBuffer).JoinConsumerGroupResponse().result; | ||
@@ -543,4 +549,5 @@ if (result.error) { | ||
return self._findGroupCoordinator(groupId).then(function (connection) { | ||
var correlationId = self.correlationId++; | ||
var buffer = self.protocol.write().HeartbeatRequest({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId, | ||
@@ -552,3 +559,3 @@ groupId: groupId, | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return connection.send(correlationId, buffer).then(function (responseBuffer) { | ||
var result = self.protocol.read(responseBuffer).HeartbeatResponse().result; | ||
@@ -567,4 +574,5 @@ if (result.error) { | ||
return self._findGroupCoordinator(groupId).then(function (connection) { | ||
var correlationId = self.correlationId++; | ||
var buffer = self.protocol.write().SyncConsumerGroupRequest({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId, | ||
@@ -577,3 +585,3 @@ groupId: groupId, | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return connection.send(correlationId, buffer).then(function (responseBuffer) { | ||
var result = self.protocol.read(responseBuffer).SyncConsumerGroupResponse().result; | ||
@@ -592,4 +600,5 @@ if (result.error) { | ||
return self._findGroupCoordinator(groupId).then(function (connection) { | ||
var correlationId = self.correlationId++; | ||
var buffer = self.protocol.write().LeaveGroupRequest({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId, | ||
@@ -600,3 +609,3 @@ groupId: groupId, | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return connection.send(correlationId, buffer).then(function (responseBuffer) { | ||
var result = self.protocol.read(responseBuffer).LeaveGroupResponse().result; | ||
@@ -616,4 +625,5 @@ if (result.error) { | ||
return self._findGroupCoordinator(groupId).then(function (connection) { | ||
var correlationId = self.correlationId++; | ||
var buffer = self.protocol.write().OffsetCommitRequestV2({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId, | ||
@@ -627,3 +637,3 @@ groupId: groupId, | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return connection.send(correlationId, buffer).then(function (responseBuffer) { | ||
return self.protocol.read(responseBuffer).OffsetCommitResponse().result.topics; | ||
@@ -637,7 +647,7 @@ }); | ||
Client.prototype.listGroupsRequest = function () { | ||
var self = this, buffer; | ||
var self = this, buffer, correlationId = self.correlationId++; | ||
return self._waitMetadata().then(function () { | ||
buffer = self.protocol.write().ListGroupsRequest({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId | ||
@@ -647,3 +657,3 @@ }).result; | ||
return Promise.map(_.values(self.brokerConnections), function (connection) { | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return connection.send(correlationId, buffer).then(function (responseBuffer) { | ||
return self.protocol.read(responseBuffer).ListGroupResponse().result.groups; | ||
@@ -656,7 +666,7 @@ }); | ||
Client.prototype.describeGroupRequest = function (groupId) { | ||
var self = this; | ||
var self = this, correlationId = self.correlationId++; | ||
return self._findGroupCoordinator(groupId).then(function (connection) { | ||
var buffer = self.protocol.write().DescribeGroupRequest({ | ||
correlationId: self.correlationId++, | ||
correlationId: correlationId, | ||
clientId: self.options.clientId, | ||
@@ -666,3 +676,3 @@ groups: [groupId] | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return connection.send(correlationId, buffer).then(function (responseBuffer) { | ||
return self.protocol.read(responseBuffer).DescribeGroupResponse().result.groups[0]; | ||
@@ -669,0 +679,0 @@ }); |
@@ -6,2 +6,3 @@ 'use strict'; | ||
var NoKafkaConnectionError = require('./errors').NoKafkaConnectionError; | ||
var _ = require('lodash'); | ||
@@ -19,3 +20,3 @@ function Connection(options) { | ||
this.queue = []; | ||
this.queue = {}; | ||
} | ||
@@ -95,7 +96,7 @@ | ||
this.queue.forEach(function (t) { | ||
_.each(this.queue, function (t) { | ||
t.reject(err); | ||
}); | ||
this.queue = []; | ||
this.queue = {}; | ||
}; | ||
@@ -123,3 +124,3 @@ | ||
*/ | ||
Connection.prototype.send = function (data, noresponse) { | ||
Connection.prototype.send = function (correlationId, data, noresponse) { | ||
var self = this, buffer = new Buffer(4 + data.length); | ||
@@ -132,6 +133,6 @@ | ||
return new Promise(function (resolve, reject) { | ||
self.queue.push({ | ||
self.queue[correlationId] = { | ||
resolve: resolve, | ||
reject: reject | ||
}); | ||
}; | ||
@@ -141,3 +142,4 @@ self.socket.write(buffer); | ||
if (noresponse === true) { | ||
self.queue.shift().resolve(); | ||
self.queue[correlationId].resolve(); | ||
delete self.queue[correlationId]; | ||
} | ||
@@ -157,3 +159,3 @@ }); | ||
Connection.prototype._receive = function (data) { | ||
var length; | ||
var length, correlationId; | ||
@@ -188,4 +190,7 @@ if (!this.connected) { | ||
this.queue.shift().resolve(data.slice(4, length + 4)); | ||
correlationId = data.readInt32BE(4); | ||
this.queue[correlationId].resolve(data.slice(4, length + 4)); | ||
delete this.queue[correlationId]; | ||
if (data.length > 4 + length) { | ||
@@ -192,0 +197,0 @@ this._receive(data.slice(length + 4)); |
@@ -9,3 +9,3 @@ { | ||
}, | ||
"version": "2.7.0", | ||
"version": "2.7.1", | ||
"main": "./lib/index.js", | ||
@@ -12,0 +12,0 @@ "keywords": [ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
243851
4374