@ovotech/castle
Advanced tools
Comparing version 0.5.2 to 0.6.0
/// <reference types="node" /> | ||
import { ProducerRecord } from 'kafkajs'; | ||
import { AvroProducerRecordSchema } from '@ovotech/avro-kafkajs'; | ||
import { CastleConsumerConfig, CastleConfig, Castle, CastleSender, CastleEachMessagePayload, CastleEachBatchPayload, FinalCastleConsumerConfig, OptionalCastleConsumerConfig } from './types'; | ||
import { ProducerRecord, ProducerConfig } from 'kafkajs'; | ||
import { AvroKafka, AvroProducer, AvroProducerRecordSchema } from '@ovotech/avro-kafkajs'; | ||
import { CastleConsumerConfig, CastleConfig, Castle, CastleConsumer, CastleSender, CastleEachMessagePayload, CastleEachBatchPayload, FinalCastleConsumerConfig, OptionalCastleConsumerConfig, CastleParts } from './types'; | ||
export declare const produce: <T>(config: Omit<ProducerRecord, 'messages'> & AvroProducerRecordSchema) => CastleSender<T, string | Buffer | null | undefined>; | ||
@@ -10,2 +10,7 @@ export declare const toFinalCastleConsumerConfig: (config: CastleConsumerConfig) => FinalCastleConsumerConfig; | ||
export declare const consumeEachBatch: <TValue, TContext extends {} = {}, TKey = Buffer>(config: (payload: CastleEachBatchPayload<TValue, TKey> & TContext) => Promise<void>) => (payload: CastleEachBatchPayload<TValue, TKey> & TContext) => Promise<void>; | ||
export declare const createKafka: (config: CastleConfig) => AvroKafka; | ||
export declare const createProducer: (kafka: AvroKafka, config?: ProducerConfig | undefined) => AvroProducer; | ||
export declare const createConsumers: (kafka: AvroKafka, config: CastleConsumerConfig[]) => CastleConsumer[]; | ||
export declare const toCastleParts: (config: CastleConfig) => CastleParts; | ||
export declare const createCastleFromParts: (parts: CastleParts) => Castle; | ||
export declare const createCastle: (config: CastleConfig) => Castle; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.createCastle = exports.consumeEachBatch = exports.consumeEachMessage = exports.optionalConsumers = exports.toFinalCastleConsumerConfig = exports.produce = void 0; | ||
exports.createCastle = exports.createCastleFromParts = exports.toCastleParts = exports.createConsumers = exports.createProducer = exports.createKafka = exports.consumeEachBatch = exports.consumeEachMessage = exports.optionalConsumers = exports.toFinalCastleConsumerConfig = exports.produce = void 0; | ||
const kafkajs_1 = require("kafkajs"); | ||
@@ -33,7 +33,24 @@ const avro_kafkajs_1 = require("@ovotech/avro-kafkajs"); | ||
exports.consumeEachBatch = (config) => config; | ||
exports.createCastle = (config) => { | ||
exports.createKafka = (config) => { | ||
const schemaRegistry = new avro_kafkajs_1.SchemaRegistry(config.schemaRegistry); | ||
return new avro_kafkajs_1.AvroKafka(schemaRegistry, new kafkajs_1.Kafka(config.kafka), config.topicsAlias); | ||
}; | ||
exports.createProducer = (kafka, config) => kafka.producer(config); | ||
exports.createConsumers = (kafka, config) => config.map((consumerConfig) => { | ||
const finalConfig = exports.toFinalCastleConsumerConfig(consumerConfig); | ||
const instance = kafka.consumer(finalConfig); | ||
return { instance, config: finalConfig }; | ||
}); | ||
exports.toCastleParts = (config) => { | ||
var _a; | ||
const kafka = exports.createKafka(config); | ||
return { | ||
kafka, | ||
producer: exports.createProducer(kafka, config.producer), | ||
consumers: exports.createConsumers(kafka, (_a = config.consumers) !== null && _a !== void 0 ? _a : []), | ||
}; | ||
}; | ||
exports.createCastleFromParts = (parts) => { | ||
const servicesStatus = new Map(); | ||
const schemaRegistry = new avro_kafkajs_1.SchemaRegistry(config.schemaRegistry); | ||
const kafka = new avro_kafkajs_1.AvroKafka(schemaRegistry, new kafkajs_1.Kafka(config.kafka), config.topicsAlias); | ||
const producer = kafka.producer(config.producer); | ||
const { producer, consumers, kafka } = parts; | ||
servicesStatus.set(producer, false); | ||
@@ -43,10 +60,7 @@ producer.on('producer.connect', () => servicesStatus.set(producer, true)); | ||
producer.on('producer.network.request', () => servicesStatus.set(producer, true)); | ||
const consumers = (config.consumers || []).map((config) => { | ||
const finalConfig = exports.toFinalCastleConsumerConfig(config); | ||
const instance = kafka.consumer(finalConfig); | ||
servicesStatus.set(instance, false); | ||
instance.on('consumer.connect', () => servicesStatus.set(instance, true)); | ||
instance.on('consumer.disconnect', () => servicesStatus.set(instance, false)); | ||
return { instance, config: finalConfig }; | ||
}); | ||
for (const consumer of consumers) { | ||
servicesStatus.set(consumer.instance, false); | ||
consumer.instance.on('consumer.connect', () => servicesStatus.set(consumer.instance, true)); | ||
consumer.instance.on('consumer.disconnect', () => servicesStatus.set(consumer.instance, false)); | ||
} | ||
const run = async () => { | ||
@@ -72,1 +86,2 @@ await Promise.all(consumers.map(async ({ instance, config }) => { | ||
}; | ||
exports.createCastle = (config) => exports.createCastleFromParts(exports.toCastleParts(config)); |
@@ -1,4 +0,4 @@ | ||
export { createCastle, produce, consumeEachMessage, consumeEachBatch, optionalConsumers, } from './castle'; | ||
export { createCastle, produce, consumeEachMessage, consumeEachBatch, optionalConsumers, createKafka, createProducer, createConsumers, createCastleFromParts, } from './castle'; | ||
export { CastleEachMessagePayload, CastleEachBatchPayload, Resolver, Middleware, CastleConsumerConfig, CastleConsumer, CastleConfig, Castle, } from './types'; | ||
export { describeCastle } from './describe'; | ||
export { createLogging, toLogCreator, LoggingContext, LoggerOptions, Logger, } from './middlewares/logging'; |
@@ -9,2 +9,6 @@ "use strict"; | ||
Object.defineProperty(exports, "optionalConsumers", { enumerable: true, get: function () { return castle_1.optionalConsumers; } }); | ||
Object.defineProperty(exports, "createKafka", { enumerable: true, get: function () { return castle_1.createKafka; } }); | ||
Object.defineProperty(exports, "createProducer", { enumerable: true, get: function () { return castle_1.createProducer; } }); | ||
Object.defineProperty(exports, "createConsumers", { enumerable: true, get: function () { return castle_1.createConsumers; } }); | ||
Object.defineProperty(exports, "createCastleFromParts", { enumerable: true, get: function () { return castle_1.createCastleFromParts; } }); | ||
var describe_1 = require("./describe"); | ||
@@ -11,0 +15,0 @@ Object.defineProperty(exports, "describeCastle", { enumerable: true, get: function () { return describe_1.describeCastle; } }); |
@@ -41,2 +41,7 @@ import { AvroProducer, AvroEachMessagePayload, AvroEachBatchPayload, SchemaRegistryConfig, AvroConsumer, AvroConsumerRun, AvroMessage, AvroKafka, TopicsAlias } from '@ovotech/avro-kafkajs'; | ||
} | ||
export interface CastleParts { | ||
kafka: AvroKafka; | ||
producer: AvroProducer; | ||
consumers: CastleConsumer[]; | ||
} | ||
export interface CastleService { | ||
@@ -43,0 +48,0 @@ connect(): Promise<void>; |
{ | ||
"name": "@ovotech/castle", | ||
"version": "0.5.2", | ||
"version": "0.6.0", | ||
"main": "dist/index.js", | ||
@@ -48,3 +48,3 @@ "types": "dist/index.d.ts", | ||
}, | ||
"gitHead": "5111becfcf0c060268c5607ad466383730f0eaec" | ||
"gitHead": "09e86adb1f765cb5c6eb91de60cb6ae5e8e9f356" | ||
} |
@@ -293,2 +293,55 @@ # Castle | ||
### Splitting the castle | ||
Sometimes you would want to use the producer / consumers instances independantly, before you assemble the main castle instance. | ||
For example if you also have a service that uses castle to produce messages, but the castle instance needs to use that service too. You can split the castle instance, creating the producer first, passing it to where its needed, then combining the rest into a castle sintance. | ||
> [examples/parts.ts](examples/parts.ts) | ||
```typescript | ||
import { | ||
produce, | ||
consumeEachMessage, | ||
describeCastle, | ||
createKafka, | ||
createProducer, | ||
createConsumers, | ||
createCastleFromParts, | ||
} from '@ovotech/castle'; | ||
import { Event, EventSchema } from './avro'; | ||
// Define producers as pure functions | ||
// With statically setting the typescript types and avro schemas | ||
const mySender = produce<Event>({ topic: 'my-topic-1', schema: EventSchema }); | ||
// Define consumers as pure functions | ||
// With statically setting which types it will accept | ||
const eachEvent = consumeEachMessage<Event>(async ({ message }) => { | ||
console.log(message.value); | ||
}); | ||
const main = async () => { | ||
const kafka = createKafka({ | ||
schemaRegistry: { uri: 'http://localhost:8081' }, | ||
kafka: { brokers: ['localhost:29092'] }, | ||
}); | ||
const producer = createProducer(kafka); | ||
const consumers = createConsumers(kafka, [ | ||
{ topic: 'my-topic-1', groupId: 'my-group-1', eachMessage: eachEvent }, | ||
]); | ||
const castle = createCastleFromParts({ kafka, producer, consumers }); | ||
// Start all consumers and producers | ||
await castle.start(); | ||
console.log(describeCastle(castle)); | ||
// You can use the stand alone producer elsewhere | ||
await mySender(producer, [{ value: { field1: 'my-string' } }]); | ||
}; | ||
main(); | ||
``` | ||
## Running the tests | ||
@@ -295,0 +348,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
32971
356
375