@arque/kafka-stream-adapter
Advanced tools
Comparing version 0.5.0 to 0.6.0
@@ -15,3 +15,4 @@ import { StreamAdapter, Subscriber } from '@arque/core'; | ||
private readonly joser; | ||
private producerPromise; | ||
private _producer; | ||
private _init; | ||
constructor(opts?: Partial<Options>); | ||
@@ -26,2 +27,3 @@ subscribe(stream: string, handle: (event: Event) => Promise<void>, opts?: { | ||
}): Promise<Subscriber>; | ||
init(): Promise<void>; | ||
private producer; | ||
@@ -28,0 +30,0 @@ sendEvents(params: { |
@@ -10,4 +10,4 @@ "use strict"; | ||
const joser_1 = require("@scaleforge/joser"); | ||
const exponential_backoff_1 = require("exponential-backoff"); | ||
const assert_1 = tslib_1.__importDefault(require("assert")); | ||
const exponential_backoff_1 = require("exponential-backoff"); | ||
class KafkaStreamAdapter { | ||
@@ -17,8 +17,7 @@ constructor(opts) { | ||
this.logger = { | ||
info: (0, debug_1.debug)('KafkaStreamAdapter:info'), | ||
error: (0, debug_1.debug)('KafkaStreamAdapter:error'), | ||
warn: (0, debug_1.debug)('KafkaStreamAdapter:warn'), | ||
verbose: (0, debug_1.debug)('KafkaStreamAdapter:verbose'), | ||
info: (0, debug_1.debug)('info:KafkaStreamAdapter'), | ||
error: (0, debug_1.debug)('error:KafkaStreamAdapter'), | ||
warn: (0, debug_1.debug)('warn:KafkaStreamAdapter'), | ||
verbose: (0, debug_1.debug)('verbose:KafkaStreamAdapter'), | ||
}; | ||
this.producerPromise = null; | ||
this.opts = { | ||
@@ -98,6 +97,16 @@ prefix: (_a = opts === null || opts === void 0 ? void 0 : opts.prefix) !== null && _a !== void 0 ? _a : 'arque', | ||
} | ||
async init() { | ||
if (!this._init) { | ||
this._init = (async () => { | ||
await this.producer(); | ||
})().catch((err) => { | ||
delete this._init; | ||
throw err; | ||
}); | ||
} | ||
await this._init; | ||
} | ||
async producer() { | ||
if (!this.producerPromise) { | ||
this.producerPromise = (async () => { | ||
this.logger.info('connecting producer'); | ||
if (!this._producer) { | ||
this._producer = (async () => { | ||
const producer = this.kafka.producer({ | ||
@@ -107,6 +116,6 @@ allowAutoTopicCreation: true, | ||
retry: { | ||
maxRetryTime: 1600, | ||
maxRetryTime: 800, | ||
factor: 0.5, | ||
initialRetryTime: 100, | ||
retries: 10, | ||
retries: 5, | ||
multiplier: 2, | ||
@@ -117,13 +126,9 @@ }, | ||
await producer.connect(); | ||
producer.on(producer.events.DISCONNECT, () => { | ||
this.producerPromise = null; | ||
}); | ||
this.logger.info('producer connected'); | ||
return producer; | ||
})().catch(err => { | ||
this.producerPromise = null; | ||
})().catch((err) => { | ||
delete this._producer; | ||
throw err; | ||
}); | ||
} | ||
return this.producerPromise; | ||
return this._producer; | ||
} | ||
@@ -150,8 +155,6 @@ async sendEvents(params, opts) { | ||
async close() { | ||
this.logger.info('closing'); | ||
if (this.producerPromise) { | ||
const producer = await this.producerPromise; | ||
if (this._producer) { | ||
const producer = await this._producer; | ||
await producer.disconnect(); | ||
} | ||
this.logger.info('closed'); | ||
} | ||
@@ -158,0 +161,0 @@ } |
{ | ||
"name": "@arque/kafka-stream-adapter", | ||
"version": "0.5.0", | ||
"version": "0.6.0", | ||
"author": "ScaleForge", | ||
@@ -24,3 +24,3 @@ "homepage": "https://github.com/ScaleForge/arque#README.md", | ||
"dependencies": { | ||
"@arque/core": "^0.5.0", | ||
"@arque/core": "^0.6.0", | ||
"@scaleforge/joser": "0.5.1", | ||
@@ -36,3 +36,3 @@ "debug": "4.3.4", | ||
}, | ||
"gitHead": "a3c5c6907d96a89e191077a3c14598f2aaca656d" | ||
"gitHead": "ac65ce278706a8ff357f716b258acd71b00312e0" | ||
} |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
470
38131
+ Added@arque/core@0.6.1(transitive)
- Removed@arque/core@0.5.0(transitive)
Updated@arque/core@^0.6.0