@ovotech/avro-stream
Advanced tools
Comparing version 1.2.0 to 1.3.0
@@ -15,3 +15,3 @@ "use strict"; | ||
const type = this.typeForSchema(schema); | ||
const transformedMessage = Object.assign({}, message, { schema, value: type.fromBuffer(buffer) }); | ||
const transformedMessage = { ...message, schema, value: type.fromBuffer(buffer) }; | ||
callback(undefined, transformedMessage); | ||
@@ -18,0 +18,0 @@ } |
@@ -13,3 +13,3 @@ "use strict"; | ||
typeForSchema(schema) { | ||
return avsc_1.Type.forSchema(schema, Object.assign({ registry: {} }, this.schemaOptions)); | ||
return avsc_1.Type.forSchema(schema, { registry: {}, ...this.schemaOptions }); | ||
} | ||
@@ -16,0 +16,0 @@ } |
@@ -11,3 +11,6 @@ "use strict"; | ||
const schemaId = await this.resolver.toId(request.topic, request.schema); | ||
callback(undefined, Object.assign({}, request, { messages: request.messages.map(message => message_1.constructMessage({ schemaId, buffer: type.toBuffer(message) })) })); | ||
callback(undefined, { | ||
...request, | ||
messages: request.messages.map(message => message_1.constructMessage({ schemaId, buffer: type.toBuffer(message) })), | ||
}); | ||
} | ||
@@ -14,0 +17,0 @@ catch (error) { |
@@ -7,5 +7,6 @@ export { AvroDeserializer } from './AvroDeserializer'; | ||
export { AvroSerializerError } from './AvroSerializerError'; | ||
export { SchemaResolver, AvroMessage, AvroProduceRequest } from './types'; | ||
export { SchemaResolver, AvroMessage, AvroProduceRequest, AvroTopicSenderOptions } from './types'; | ||
export { constructMessage, deconstructMessage } from './message'; | ||
export { MockAvroSerializer } from './MockAvroSerializer'; | ||
export { AvroTopicSender } from './AvroTopicSender'; | ||
export { MockSchemaRegistryResolver } from './MockSchemaRegistryResolver'; |
@@ -20,4 +20,6 @@ "use strict"; | ||
exports.MockAvroSerializer = MockAvroSerializer_1.MockAvroSerializer; | ||
var AvroTopicSender_1 = require("./AvroTopicSender"); | ||
exports.AvroTopicSender = AvroTopicSender_1.AvroTopicSender; | ||
var MockSchemaRegistryResolver_1 = require("./MockSchemaRegistryResolver"); | ||
exports.MockSchemaRegistryResolver = MockSchemaRegistryResolver_1.MockSchemaRegistryResolver; | ||
//# sourceMappingURL=index.js.map |
@@ -0,1 +1,2 @@ | ||
/// <reference types="node" /> | ||
import { Schema } from 'avsc'; | ||
@@ -7,2 +8,8 @@ import { Message, ProduceRequest } from 'kafka-node'; | ||
} | ||
export interface AvroTopicSenderOptions { | ||
schema: Schema; | ||
topic: string; | ||
partition?: number; | ||
key?: string | Buffer; | ||
} | ||
export interface AvroProduceRequest<TValue = any> extends ProduceRequest { | ||
@@ -9,0 +16,0 @@ schema: Schema; |
{ | ||
"name": "@ovotech/avro-stream", | ||
"description": "Serialize/deserialize kafka-node streams with avro data, using confluent schema-registry to hold the schemas", | ||
"version": "1.2.0", | ||
"version": "1.3.0", | ||
"main": "dist/index.js", | ||
@@ -38,3 +38,3 @@ "source": "src/index.ts", | ||
"tslint-config-prettier": "^1.18.0", | ||
"typescript": "^3.3.4000", | ||
"typescript": "^3.4.3", | ||
"uuid": "^3.3.2" | ||
@@ -45,3 +45,3 @@ }, | ||
}, | ||
"gitHead": "2d8eaa2657ba67c49eb42651e591df63e1814065" | ||
"gitHead": "cc0e4c6d447c2779688410b3fea65061c081cde6" | ||
} |
@@ -132,2 +132,34 @@ # Avro Stream | ||
### AvroTopicSender | ||
You can use an `AvroTopicSender` to produce ad-hock kafka messages. | ||
```typescript | ||
import { AvroSerializer, AvroTopicSender } from '@ovotech/avro-stream'; | ||
import { ProducerStream } from 'kafka-node'; | ||
interface Message { | ||
accountId: string; | ||
} | ||
const sender = new AvroTopicSender<Message>({ | ||
topic: 'test-topic-1', | ||
partition: 0, | ||
key: 'key-1', | ||
schema: { | ||
type: 'record', | ||
name: 'TestSchema1', | ||
fields: [{ name: 'accountId', type: 'string' }], | ||
}, | ||
}); | ||
const producerStream = new ProducerStream({ kafkaClient: { kafkaHost: 'localhost:29092' } }); | ||
const serializer = new AvroSerializer('http://localhost:8081'); | ||
sender.pipe(serializer).pipe(producerStream); | ||
sender.send({ accountId: '222' }, { accountId: '111' }); | ||
sender.close(); | ||
``` | ||
## Mocks | ||
@@ -134,0 +166,0 @@ |
@@ -7,5 +7,6 @@ export { AvroDeserializer } from './AvroDeserializer'; | ||
export { AvroSerializerError } from './AvroSerializerError'; | ||
export { SchemaResolver, AvroMessage, AvroProduceRequest } from './types'; | ||
export { SchemaResolver, AvroMessage, AvroProduceRequest, AvroTopicSenderOptions } from './types'; | ||
export { constructMessage, deconstructMessage } from './message'; | ||
export { MockAvroSerializer } from './MockAvroSerializer'; | ||
export { AvroTopicSender } from './AvroTopicSender'; | ||
export { MockSchemaRegistryResolver } from './MockSchemaRegistryResolver'; |
@@ -9,2 +9,9 @@ import { Schema } from 'avsc'; | ||
export interface AvroTopicSenderOptions { | ||
schema: Schema; | ||
topic: string; | ||
partition?: number; | ||
key?: string | Buffer; | ||
} | ||
export interface AvroProduceRequest<TValue = any> extends ProduceRequest { | ||
@@ -11,0 +18,0 @@ schema: Schema; |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
38134
51
491
224