Comparing version 1.5.2 to 1.6.0
@@ -8,2 +8,7 @@ # Changelog | ||
## [1.6.0] - 2019-04-01 | ||
### Added | ||
- Allow providing a socketFactory on client creation #263 | ||
- Add fetchTopicOffsets method #314 | ||
## [1.5.2] - 2019-04-01 | ||
@@ -10,0 +15,0 @@ ### Fixed |
{ | ||
"name": "kafkajs", | ||
"version": "1.5.2", | ||
"version": "1.6.0", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -5,0 +5,0 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", |
@@ -1,10 +0,3 @@ | ||
[](https://kafka.js.org) | ||
# <a href='https://kafka.js.org'><img src='https://raw.githubusercontent.com/tulios/kafkajs/master/logoV2.png' height='60' alt='KafkaJS' aria-label='kafka.js.org' /></a> | ||
# [KafkaJS](https://kafka.js.org) | ||
[](https://travis-ci.org/tulios/kafkajs) | ||
[](https://badge.fury.io/js/kafkajs) | ||
[](https://kafkajs-slackin.herokuapp.com/) | ||
A modern Apache Kafka client for node.js. This library is compatible with Kafka `0.10+`. | ||
@@ -15,2 +8,6 @@ Native support for Kafka `0.11` features. | ||
[](https://dev.azure.com/tulios/kafkajs/_build/latest?definitionId=1&branchName=master) | ||
[](https://badge.fury.io/js/kafkajs) | ||
[](https://kafkajs-slackin.herokuapp.com/) | ||
## Features | ||
@@ -20,2 +17,3 @@ | ||
- Consumer groups with pause, resume, and seek | ||
- Transactional support for producers and consumers | ||
- Message headers | ||
@@ -87,6 +85,8 @@ - GZIP compression | ||
Thanks to [Sebastian Norde](https://github.com/sebastiannorde) for the logo ❤️ | ||
Thanks to [Sebastian Norde](https://github.com/sebastiannorde) for the V1 logo ❤️ | ||
Thanks to [Tracy (Tan Yun)](https://medium.com/@tanyuntracy) for the V2 logo ❤️ | ||
## License | ||
See [LICENSE](https://github.com/tulios/kafkajs/blob/master/LICENSE) for more details. |
@@ -177,2 +177,39 @@ const createRetry = require('../retry') | ||
/** | ||
* @param {string} topic | ||
*/ | ||
const fetchTopicOffsets = async topic => { | ||
if (!topic || typeof topic !== 'string') { | ||
throw new KafkaJSNonRetriableError(`Invalid topic ${topic}`) | ||
} | ||
const retrier = createRetry(retry) | ||
return retrier(async (bail, retryCount, retryTime) => { | ||
try { | ||
await cluster.addTargetTopic(topic) | ||
await cluster.refreshMetadataIfNecessary() | ||
const metadata = cluster.findTopicPartitionMetadata(topic) | ||
const offsets = await cluster.fetchTopicsOffset([ | ||
{ | ||
topic, | ||
partitions: metadata.map(p => ({ partition: p.partitionId })), | ||
}, | ||
]) | ||
const { partitions } = offsets.pop() | ||
return partitions.map(({ partition, offset }) => ({ partition, offset })) | ||
} catch (e) { | ||
if (e.type === 'UNKNOWN_TOPIC_OR_PARTITION') { | ||
await cluster.refreshMetadata() | ||
throw e | ||
} | ||
bail(e) | ||
} | ||
}) | ||
} | ||
/** | ||
* @param {string} groupId | ||
@@ -526,2 +563,3 @@ * @param {string} topic | ||
fetchOffsets, | ||
fetchTopicOffsets, | ||
setOffsets, | ||
@@ -528,0 +566,0 @@ resetOffsets, |
const Connection = require('../network/connection') | ||
module.exports = ({ | ||
socketFactory, | ||
brokers, | ||
@@ -35,2 +36,3 @@ ssl, | ||
clientId, | ||
socketFactory, | ||
connectionTimeout, | ||
@@ -37,0 +39,0 @@ requestTimeout, |
@@ -42,2 +42,3 @@ const BrokerPool = require('./brokerPool') | ||
logger: rootLogger, | ||
socketFactory, | ||
brokers, | ||
@@ -66,2 +67,3 @@ ssl, | ||
instrumentationEmitter, | ||
socketFactory, | ||
brokers, | ||
@@ -68,0 +70,0 @@ ssl, |
@@ -13,2 +13,3 @@ const { | ||
const ISOLATION_LEVEL = require('./protocol/isolationLevel') | ||
const defaultSocketFactory = require('./network/socketFactory') | ||
@@ -33,2 +34,3 @@ const PRIVATE = { | ||
retry, | ||
socketFactory = defaultSocketFactory(), | ||
logLevel = INFO, | ||
@@ -51,2 +53,4 @@ logCreator = LoggerConsole, | ||
retry: this[PRIVATE.CLUSTER_RETRY], | ||
offsets: this[PRIVATE.OFFSETS], | ||
socketFactory, | ||
brokers, | ||
@@ -66,3 +70,2 @@ ssl, | ||
isolationLevel, | ||
offsets: this[PRIVATE.OFFSETS], | ||
}) | ||
@@ -69,0 +72,0 @@ } |
@@ -39,2 +39,3 @@ const createRetry = require('../retry') | ||
logger, | ||
socketFactory, | ||
rack = null, | ||
@@ -58,2 +59,3 @@ ssl = null, | ||
this.socketFactory = socketFactory | ||
this.ssl = ssl | ||
@@ -167,2 +169,3 @@ this.sasl = sasl | ||
this.socket = createSocket({ | ||
socketFactory: this.socketFactory, | ||
host: this.host, | ||
@@ -169,0 +172,0 @@ port: this.port, |
@@ -1,18 +0,14 @@ | ||
const net = require('net') | ||
const tls = require('tls') | ||
module.exports = ({ | ||
socketFactory, | ||
host, | ||
port, | ||
ssl, | ||
onConnect, | ||
onData, | ||
onEnd, | ||
onError, | ||
onTimeout, | ||
}) => { | ||
const socket = socketFactory({ host, port, ssl, onConnect }) | ||
const KEEP_ALIVE_DELAY = 60000 // in ms | ||
module.exports = ({ host, port, ssl, onConnect, onData, onEnd, onError, onTimeout }) => { | ||
const socket = ssl | ||
? tls.connect( | ||
Object.assign({ host, port }, ssl), | ||
onConnect | ||
) | ||
: net.connect( | ||
{ host, port }, | ||
onConnect | ||
) | ||
socket.setKeepAlive(true, KEEP_ALIVE_DELAY) | ||
socket.on('data', onData) | ||
@@ -19,0 +15,0 @@ socket.on('end', onEnd) |
388148
219
11478