Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@ovotech/castle

Package Overview
Dependencies
Maintainers
180
Versions
31
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@ovotech/castle - npm Package Compare versions

Comparing version 0.5.2 to 0.6.0

11

dist/castle.d.ts
/// <reference types="node" />
import { ProducerRecord } from 'kafkajs';
import { AvroProducerRecordSchema } from '@ovotech/avro-kafkajs';
import { CastleConsumerConfig, CastleConfig, Castle, CastleSender, CastleEachMessagePayload, CastleEachBatchPayload, FinalCastleConsumerConfig, OptionalCastleConsumerConfig } from './types';
import { ProducerRecord, ProducerConfig } from 'kafkajs';
import { AvroKafka, AvroProducer, AvroProducerRecordSchema } from '@ovotech/avro-kafkajs';
import { CastleConsumerConfig, CastleConfig, Castle, CastleConsumer, CastleSender, CastleEachMessagePayload, CastleEachBatchPayload, FinalCastleConsumerConfig, OptionalCastleConsumerConfig, CastleParts } from './types';
export declare const produce: <T>(config: Omit<ProducerRecord, 'messages'> & AvroProducerRecordSchema) => CastleSender<T, string | Buffer | null | undefined>;

@@ -10,2 +10,7 @@ export declare const toFinalCastleConsumerConfig: (config: CastleConsumerConfig) => FinalCastleConsumerConfig;

export declare const consumeEachBatch: <TValue, TContext extends {} = {}, TKey = Buffer>(config: (payload: CastleEachBatchPayload<TValue, TKey> & TContext) => Promise<void>) => (payload: CastleEachBatchPayload<TValue, TKey> & TContext) => Promise<void>;
export declare const createKafka: (config: CastleConfig) => AvroKafka;
export declare const createProducer: (kafka: AvroKafka, config?: ProducerConfig | undefined) => AvroProducer;
export declare const createConsumers: (kafka: AvroKafka, config: CastleConsumerConfig[]) => CastleConsumer[];
export declare const toCastleParts: (config: CastleConfig) => CastleParts;
export declare const createCastleFromParts: (parts: CastleParts) => Castle;
export declare const createCastle: (config: CastleConfig) => Castle;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.createCastle = exports.consumeEachBatch = exports.consumeEachMessage = exports.optionalConsumers = exports.toFinalCastleConsumerConfig = exports.produce = void 0;
exports.createCastle = exports.createCastleFromParts = exports.toCastleParts = exports.createConsumers = exports.createProducer = exports.createKafka = exports.consumeEachBatch = exports.consumeEachMessage = exports.optionalConsumers = exports.toFinalCastleConsumerConfig = exports.produce = void 0;
const kafkajs_1 = require("kafkajs");

@@ -33,7 +33,24 @@ const avro_kafkajs_1 = require("@ovotech/avro-kafkajs");

exports.consumeEachBatch = (config) => config;
exports.createCastle = (config) => {
exports.createKafka = (config) => {
const schemaRegistry = new avro_kafkajs_1.SchemaRegistry(config.schemaRegistry);
return new avro_kafkajs_1.AvroKafka(schemaRegistry, new kafkajs_1.Kafka(config.kafka), config.topicsAlias);
};
exports.createProducer = (kafka, config) => kafka.producer(config);
exports.createConsumers = (kafka, config) => config.map((consumerConfig) => {
const finalConfig = exports.toFinalCastleConsumerConfig(consumerConfig);
const instance = kafka.consumer(finalConfig);
return { instance, config: finalConfig };
});
exports.toCastleParts = (config) => {
var _a;
const kafka = exports.createKafka(config);
return {
kafka,
producer: exports.createProducer(kafka, config.producer),
consumers: exports.createConsumers(kafka, (_a = config.consumers) !== null && _a !== void 0 ? _a : []),
};
};
exports.createCastleFromParts = (parts) => {
const servicesStatus = new Map();
const schemaRegistry = new avro_kafkajs_1.SchemaRegistry(config.schemaRegistry);
const kafka = new avro_kafkajs_1.AvroKafka(schemaRegistry, new kafkajs_1.Kafka(config.kafka), config.topicsAlias);
const producer = kafka.producer(config.producer);
const { producer, consumers, kafka } = parts;
servicesStatus.set(producer, false);

@@ -43,10 +60,7 @@ producer.on('producer.connect', () => servicesStatus.set(producer, true));

producer.on('producer.network.request', () => servicesStatus.set(producer, true));
const consumers = (config.consumers || []).map((config) => {
const finalConfig = exports.toFinalCastleConsumerConfig(config);
const instance = kafka.consumer(finalConfig);
servicesStatus.set(instance, false);
instance.on('consumer.connect', () => servicesStatus.set(instance, true));
instance.on('consumer.disconnect', () => servicesStatus.set(instance, false));
return { instance, config: finalConfig };
});
for (const consumer of consumers) {
servicesStatus.set(consumer.instance, false);
consumer.instance.on('consumer.connect', () => servicesStatus.set(consumer.instance, true));
consumer.instance.on('consumer.disconnect', () => servicesStatus.set(consumer.instance, false));
}
const run = async () => {

@@ -72,1 +86,2 @@ await Promise.all(consumers.map(async ({ instance, config }) => {

};
exports.createCastle = (config) => exports.createCastleFromParts(exports.toCastleParts(config));

@@ -1,4 +0,4 @@

export { createCastle, produce, consumeEachMessage, consumeEachBatch, optionalConsumers, } from './castle';
export { createCastle, produce, consumeEachMessage, consumeEachBatch, optionalConsumers, createKafka, createProducer, createConsumers, createCastleFromParts, } from './castle';
export { CastleEachMessagePayload, CastleEachBatchPayload, Resolver, Middleware, CastleConsumerConfig, CastleConsumer, CastleConfig, Castle, } from './types';
export { describeCastle } from './describe';
export { createLogging, toLogCreator, LoggingContext, LoggerOptions, Logger, } from './middlewares/logging';

@@ -9,2 +9,6 @@ "use strict";

Object.defineProperty(exports, "optionalConsumers", { enumerable: true, get: function () { return castle_1.optionalConsumers; } });
Object.defineProperty(exports, "createKafka", { enumerable: true, get: function () { return castle_1.createKafka; } });
Object.defineProperty(exports, "createProducer", { enumerable: true, get: function () { return castle_1.createProducer; } });
Object.defineProperty(exports, "createConsumers", { enumerable: true, get: function () { return castle_1.createConsumers; } });
Object.defineProperty(exports, "createCastleFromParts", { enumerable: true, get: function () { return castle_1.createCastleFromParts; } });
var describe_1 = require("./describe");

@@ -11,0 +15,0 @@ Object.defineProperty(exports, "describeCastle", { enumerable: true, get: function () { return describe_1.describeCastle; } });

@@ -41,2 +41,7 @@ import { AvroProducer, AvroEachMessagePayload, AvroEachBatchPayload, SchemaRegistryConfig, AvroConsumer, AvroConsumerRun, AvroMessage, AvroKafka, TopicsAlias } from '@ovotech/avro-kafkajs';

}
export interface CastleParts {
kafka: AvroKafka;
producer: AvroProducer;
consumers: CastleConsumer[];
}
export interface CastleService {

@@ -43,0 +48,0 @@ connect(): Promise<void>;

{
"name": "@ovotech/castle",
"version": "0.5.2",
"version": "0.6.0",
"main": "dist/index.js",

@@ -48,3 +48,3 @@ "types": "dist/index.d.ts",

},
"gitHead": "5111becfcf0c060268c5607ad466383730f0eaec"
"gitHead": "09e86adb1f765cb5c6eb91de60cb6ae5e8e9f356"
}

@@ -293,2 +293,55 @@ # Castle

### Splitting the castle
Sometimes you would want to use the producer / consumers instances independantly, before you assemble the main castle instance.
For example if you also have a service that uses castle to produce messages, but the castle instance needs to use that service too. You can split the castle instance, creating the producer first, passing it to where its needed, then combining the rest into a castle sintance.
> [examples/parts.ts](examples/parts.ts)
```typescript
import {
produce,
consumeEachMessage,
describeCastle,
createKafka,
createProducer,
createConsumers,
createCastleFromParts,
} from '@ovotech/castle';
import { Event, EventSchema } from './avro';
// Define producers as pure functions
// With statically setting the typescript types and avro schemas
const mySender = produce<Event>({ topic: 'my-topic-1', schema: EventSchema });
// Define consumers as pure functions
// With statically setting which types it will accept
const eachEvent = consumeEachMessage<Event>(async ({ message }) => {
console.log(message.value);
});
const main = async () => {
const kafka = createKafka({
schemaRegistry: { uri: 'http://localhost:8081' },
kafka: { brokers: ['localhost:29092'] },
});
const producer = createProducer(kafka);
const consumers = createConsumers(kafka, [
{ topic: 'my-topic-1', groupId: 'my-group-1', eachMessage: eachEvent },
]);
const castle = createCastleFromParts({ kafka, producer, consumers });
// Start all consumers and producers
await castle.start();
console.log(describeCastle(castle));
// You can use the stand alone producer elsewhere
await mySender(producer, [{ value: { field1: 'my-string' } }]);
};
main();
```
## Running the tests

@@ -295,0 +348,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc