Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs-stream

Package Overview
Dependencies
Maintainers
1
Versions
12
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs-stream - npm Package Compare versions

Comparing version 0.0.6 to 0.0.7

7

CHANGELOG.md

@@ -5,2 +5,9 @@ # Changelog

### [0.0.7](https://github.com/kambing86/kafkajs-stream/compare/v0.0.6...v0.0.7) (2019-12-18)
### Bug Fixes
* add crash retry for ConsumerStream ([a583544](https://github.com/kambing86/kafkajs-stream/commit/a583544ac4566a5a1fbe075ea16eba082a44b668))
### [0.0.6](https://github.com/kambing86/kafkajs-stream/compare/v0.0.5...v0.0.6) (2019-12-16)

@@ -7,0 +14,0 @@

9

dist/ConsumerStream.d.ts
/// <reference types="node" />
import { ConsumerConfig, Kafka } from "kafkajs";
import { Readable } from "stream";
import { ConsumerConfig, Kafka } from 'kafkajs';
import { Readable } from 'stream';
export declare class ConsumerStream extends Readable {

@@ -13,2 +13,4 @@ constructor(kafka: Kafka, options: {

private consumer;
private kafka;
private config?;
private topic;

@@ -18,5 +20,8 @@ private connected;

private paused;
private init;
_read(): void;
private start;
private onCrash;
private run;
_destroy(error: Error | null): void;
}

@@ -16,4 +16,13 @@ "use strict";

super();
this.consumer = kafka.consumer(options.config);
this.onCrash = (err) => __awaiter(this, void 0, void 0, function* () {
console.error(err);
this.init();
yield this.start();
});
this.kafka = kafka;
this.config = options.config;
this.topic = options.topic;
this.init();
}
init() {
this.connected = false;

@@ -26,17 +35,3 @@ this.started = false;

try {
if (!this.connected) {
this.connected = true;
yield this.consumer.connect();
yield this.consumer.subscribe(this.topic);
this.consumer.on("consumer.crash", err => {
this.destroy(err);
});
}
if (!this.started) {
this.started = true;
yield this.run();
}
if (this.paused) {
this.paused = false;
}
yield this.start();
}

@@ -48,2 +43,20 @@ catch (e) {

}
start() {
return __awaiter(this, void 0, void 0, function* () {
if (!this.connected) {
this.connected = true;
this.consumer = this.kafka.consumer(this.config);
yield this.consumer.connect();
yield this.consumer.subscribe(this.topic);
this.consumer.on('consumer.crash', this.onCrash);
}
if (!this.started) {
this.started = true;
yield this.run();
}
if (this.paused) {
this.paused = false;
}
});
}
run() {

@@ -68,3 +81,3 @@ return __awaiter(this, void 0, void 0, function* () {

}
})
}),
});

@@ -71,0 +84,0 @@ });

@@ -1,2 +0,2 @@

export { ConsumerStream } from "./ConsumerStream";
export { ProducerStream } from "./ProducerStream";
export { ConsumerStream } from './ConsumerStream';
export { ProducerStream } from './ProducerStream';
/// <reference types="node" />
import { CompressionTypes, Kafka, ProducerConfig } from "kafkajs";
import { Writable } from "stream";
import { CompressionTypes, Kafka, ProducerConfig } from 'kafkajs';
import { Writable } from 'stream';
export declare class ProducerStream extends Writable {

@@ -5,0 +5,0 @@ constructor(kafka: Kafka, options: {

@@ -25,3 +25,3 @@ "use strict";

}
if (typeof encoding === "string") {
if (typeof encoding === 'string') {
return super.write(value, encoding, callback);

@@ -41,4 +41,3 @@ }

compression: this.compression,
// @ts-ignore Buffer.from with Uint8Array | Buffer | string has issue
messages: chunks.map(({ chunk }) => ({ value: Buffer.from(chunk) }))
messages: chunks.map(({ chunk }) => ({ value: Buffer.from(chunk) })),
});

@@ -45,0 +44,0 @@ callback(null);

/// <reference types="node" />
import { ConsumerConfig, Kafka } from "kafkajs";
import { Readable } from "stream";
import { ConsumerConfig, Kafka } from 'kafkajs';
import { Readable } from 'stream';
export declare class ConsumerStream extends Readable {

@@ -13,2 +13,4 @@ constructor(kafka: Kafka, options: {

private consumer;
private kafka;
private config?;
private topic;

@@ -18,5 +20,8 @@ private connected;

private paused;
private init;
_read(): void;
private start;
private onCrash;
private run;
_destroy(error: Error | null): void;
}

@@ -1,7 +0,16 @@

import { Readable } from "stream";
import { Readable } from 'stream';
export class ConsumerStream extends Readable {
constructor(kafka, options) {
super();
this.consumer = kafka.consumer(options.config);
this.onCrash = async (err) => {
console.error(err);
this.init();
await this.start();
};
this.kafka = kafka;
this.config = options.config;
this.topic = options.topic;
this.init();
}
init() {
this.connected = false;

@@ -14,17 +23,3 @@ this.started = false;

try {
if (!this.connected) {
this.connected = true;
await this.consumer.connect();
await this.consumer.subscribe(this.topic);
this.consumer.on("consumer.crash", err => {
this.destroy(err);
});
}
if (!this.started) {
this.started = true;
await this.run();
}
if (this.paused) {
this.paused = false;
}
await this.start();
}

@@ -36,2 +31,18 @@ catch (e) {

}
async start() {
if (!this.connected) {
this.connected = true;
this.consumer = this.kafka.consumer(this.config);
await this.consumer.connect();
await this.consumer.subscribe(this.topic);
this.consumer.on('consumer.crash', this.onCrash);
}
if (!this.started) {
this.started = true;
await this.run();
}
if (this.paused) {
this.paused = false;
}
}
async run() {

@@ -55,3 +66,3 @@ await this.consumer.run({

}
}
},
});

@@ -58,0 +69,0 @@ }

@@ -1,2 +0,2 @@

export { ConsumerStream } from "./ConsumerStream";
export { ProducerStream } from "./ProducerStream";
export { ConsumerStream } from './ConsumerStream';
export { ProducerStream } from './ProducerStream';

@@ -1,3 +0,3 @@

export { ConsumerStream } from "./ConsumerStream";
export { ProducerStream } from "./ProducerStream";
export { ConsumerStream } from './ConsumerStream';
export { ProducerStream } from './ProducerStream';
//# sourceMappingURL=index.js.map
/// <reference types="node" />
import { CompressionTypes, Kafka, ProducerConfig } from "kafkajs";
import { Writable } from "stream";
import { CompressionTypes, Kafka, ProducerConfig } from 'kafkajs';
import { Writable } from 'stream';
export declare class ProducerStream extends Writable {

@@ -5,0 +5,0 @@ constructor(kafka: Kafka, options: {

@@ -1,2 +0,2 @@

import { Writable } from "stream";
import { Writable } from 'stream';
export class ProducerStream extends Writable {

@@ -14,3 +14,3 @@ constructor(kafka, options) {

}
if (typeof encoding === "string") {
if (typeof encoding === 'string') {
return super.write(value, encoding, callback);

@@ -30,4 +30,3 @@ }

compression: this.compression,
// @ts-ignore Buffer.from with Uint8Array | Buffer | string has issue
messages: chunks.map(({ chunk }) => ({ value: Buffer.from(chunk) }))
messages: chunks.map(({ chunk }) => ({ value: Buffer.from(chunk) })),
});

@@ -34,0 +33,0 @@ callback(null);

{
"name": "kafkajs-stream",
"version": "0.0.6",
"version": "0.0.7",
"description": "Stream for kafkajs",

@@ -34,2 +34,3 @@ "main": "dist/index.js",

"kafkajs": "^1.11.0",
"prettier": "^1.19.1",
"standard-version": "^7.0.1",

@@ -36,0 +37,0 @@ "typescript": "^3.7.3"

@@ -18,14 +18,13 @@ # Stream for Kafka.js in Node.js

```ts
import fs from "fs";
import http from "http";
import gracefulShutdown from "http-graceful-shutdown";
import { CompressionCodecs, CompressionTypes, Kafka } from "kafkajs";
import SnappyCodec from "kafkajs-snappy";
import { times } from "lodash";
import { from } from "rxjs";
import { rxToStream } from "rxjs-stream";
import app from "./app";
import fs from 'fs';
import http from 'http';
import gracefulShutdown from 'http-graceful-shutdown';
import { CompressionCodecs, CompressionTypes, Kafka } from 'kafkajs';
import SnappyCodec from 'kafkajs-snappy';
import { ConsumerStream, ProducerStream } from 'kafkajs-stream';
import { range } from 'rxjs';
import { rxToStream } from 'rxjs-stream';
import { map } from 'rxjs/operators';
import app from './app';
import { ConsumerStream, ProducerStream } from "kafkajs-stream";
const { PORT = 4000 } = process.env;

@@ -35,9 +34,9 @@

const kafka = new Kafka({
clientId: "my-app",
brokers: ["bitnami-kafka:9092"]
clientId: 'my-app',
brokers: ['bitnami-kafka:9092'],
});
// Producing
const producerStream = new ProducerStream(kafka, { topic: "test-topic" });
const number$ = from(times(200000, i => `${i.toString()},`));
const producerStream = new ProducerStream(kafka, { topic: 'test-topic' });
const number$ = range(0, 200000).pipe(map(i => `${i.toString()},`));
rxToStream(number$).pipe(producerStream);

@@ -47,14 +46,20 @@

const consumerStream = new ConsumerStream(kafka, {
config: { groupId: "test-group" },
topic: { topic: "test-topic", fromBeginning: true }
config: { groupId: 'test-group' },
topic: { topic: 'test-topic', fromBeginning: true },
});
const producer2Stream = new ProducerStream(kafka, { topic: "test-topic-2" });
const producer2Stream = new ProducerStream(kafka, { topic: 'test-topic-2' });
consumerStream.pipe(producer2Stream);
consumerStream.on('error', err => {
console.error('consumerStream', err);
});
const consumerStream2 = new ConsumerStream(kafka, {
config: { groupId: "test-group-2" },
topic: { topic: "test-topic-2", fromBeginning: true }
config: { groupId: 'test-group-2' },
topic: { topic: 'test-topic-2', fromBeginning: true },
});
const writeStream = fs.createWriteStream("./testWrite.txt");
const writeStream = fs.createWriteStream('./testWrite.txt');
consumerStream2.pipe(writeStream);
consumerStream2.on('error', err => {
console.error('consumerStream2', err);
});

@@ -73,6 +78,6 @@ (async () => {

await new Promise(resolve => writeStream.end(resolve));
}
},
});
server.on("listening", () => {
server.on('listening', () => {
console.log(`application is listening on port ${PORT}`);

@@ -79,0 +84,0 @@ });

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc