
Security News
MCP Steering Committee Launches Official MCP Registry in Preview
The MCP Steering Committee has launched the official MCP Registry in preview, a central hub for discovering and publishing MCP servers.
avro-registry-client
Advanced tools
A node client to interact with the avro registry. There are methods for updating your subjects and schemas, but also some convenience methods for encoding and decoding.
A node client to interact with the avro registry. There are methods for updating your subjects and schemas, but also some convenience methods for encoding and decoding.
npm install avro-registry-client
If you're planning on using the kafka consuming/producing functionality, you'll
want to install kafka-node
as well.
You should have a local copy of the confluent avro schema registry running, which means you also need kafka and maybe zookepper running locally. More documentation on how to get that running forthcoming...sometime.
If you plan on using the .decode()
method, take special note of some of the
caveats described in the description section as it may affect how you need to
restrict schemas in your schema registry.
tl;dr All your your schemas should be unique to each subject/version. This means that you should only be using record types and you should have a name/namespace property that is unique to each subject.
const kafka = require('kafka-node')
const MY_TOPIC = 'my-topic'
const kafkaProducerClient = new kafka.KafkaClient({
kafkaHost: 'localhost:9092'
})
const kafkaConsumerClient = new kafka.KafkaClient({
kafkaHost: 'localhost:9092'
})
const kafkaProducer = new kafka.Producer(kafkaProducerClient)
const kafkaConsumer = new kafka.Consumer(
kafkaConsumerClient,
[{ topic: MY_TOPIC }],
{ encoding: 'buffer', keyEncoding: 'buffer' }
)
//
;(async function run() {
const registry = require('avro-registry-client')('http://localhost:8081')
await registry.createSubjectVersion('MySubject', {
name: 'MySubject',
type: 'record',
fields: [
{
name: 'field1',
type: 'string'
}
]
})
await registry.createSubjectVersion('MySubject', {
name: 'MySubjectId',
type: 'record',
fields: [
{
name: 'id',
type: 'string'
}
]
})
const mySubjectProducer = registry.createProducer(kafkaProducer, MY_TOPIC, {
valueSubject: { subject: 'MySubject', version: 'latest' },
keySubject: { subject: 'MySubjectId', version: 'latest' }
})
registry
.createConsumer(kafkaConsumer, [
{
valueSubject: { subject: 'MySubject', version: 'latest' },
keySubject: { subject: 'MySubjectId', version: 'latest' },
handler({ key, value, message }) {
console.log(key) // { id: '123' }
console.log(value) // { field1: 'my field value' }
console.log(message) // This is the raw message from kafka-node .on('message')
}
}
])
.listen(err => {
// This gets called if there is an error handling the message
})
})()
createAvroRegistryClient (registryHost)
(default export)Creates a registry client whose methods are described below.
registryHost
(String) - The host of the avro registryconst createAvroRegistryClient = require('avro-registry-client')
const registry = createAvroRegistryClient('http://localhost:8081')
createAvroRegistryClient.COMPATIBILITY_TYPES
An object whose properties are the allowed compatibility types when setting compatibility. Note that you don't have to use this object to put into the compatibility methods' argument. You can just use the raw string value. This object is just helpful for enumeration purposes.
const createAvroRegistryClient = require('avro-registry-client')
const { COMPATIBILITY_TYPES } = createAvroRegistryClient
const registry = createAvroRegistryClient('http://localhost:8081')
registry.setGlobalCompatibility(COMPATIBILITY_TYPES.FULL).then(() => {})
createAvroRegistryClient.errors
An object whose values are the known errors that this library will produce when parsing errors from the schema registry responses.
const createAvroRegistryClient = require('avro-registry-client')
const { errors } = createAvroRegistryClient
const registry = createAvroRegistryClient('http://localhost:8081')
const invalidSchema = {
// no name property
type: 'record',
fields: [
{
name: 'field1',
type: 'string'
}
]
}
console.log(errors)
/**
* { RouteNotFoundError: [Function: RouteNotFoundError],
* SubjectNotFoundError: [Function: SubjectNotFoundError],
* VersionNotFoundError: [Function: VersionNotFoundError],
* SchemaNotFoundError: [Function: SchemaNotFoundError],
* IncompatibleSchemaError: [Function: IncompatibleSchemaError],
* InvalidSchemaError: [Function: InvalidSchemaError],
* InvalidVersionError: [Function: InvalidVersionError],
* InvalidCompatibilityLevelError: [Function: InvalidCompatibilityLevelError] }
*/
registry.createSubjectVersion('MyInvalidSchema', invalidSchema).catch(err => {
if (err instanceof errors.InvalidSchemaError) {
// do something with it
}
})
registry.createProducer(kafkaProducer, topic, options)
Creates a Kafka Producer that will encode the key and value with avro.
kafkaProducer
(kafka-node.Producer) - an instance of kafka-node.Producertopic
(String) - The topic this producer will publish tooptions
(Object) - An object that contains the keySubject and valueSubject
definitions. See example for detailed usage.(key: any, value: any, sendOptions: Object) => void
- Returns a function
that takes in three arguments: key
, value
, and sendOptions
.const kafka = require('kafka-node')
const kafkaClient = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' })
const kafkaProducer = new kafka.Producer(kafkaClient)
const producer = createProducer(kafkaProducer, 'my-topic', {
valueSubject: { subject: 'MySubject', version: 1 }, // Or pin to a specific version
keySubject: { subject: 'MySubjectId', version: 'latest' }
})
// You can also set the options this way:
const producer2 = createProducer(kafkaProducer, 'my-topic')
.valueSubject('MySubject', 1)
.keySubject('MySubjectId') // You may omit the second argument if you just want 'latest'
// Same api for producer2
producer({ id: '123' }, { field1: 'body' }).then(() => {
console.log('sent!')
})
registry.createConsumer(kafkaConsumer, messageTypes)
Creates a Kafka Consumer that will decode messages with avro.
kafkaConsumer
(kafka-node.Consumer|kafka-node.ConsumerGroup) - An instance
of a kafka-node consumer. Can be a Consumer
or ConsumerGroup
. Note that
when you configure the consumer, you will have already told it which topics to
listen to. Also note that the consumer must be configured to handle the keys
and values as buffers. See Example for details.messageTypes
(Object[]) - An array of message types that this consumer will
be able to handle. Message Type definitions consist of a valueSubject,
keySubject, and a handler. The reason this is an array is because it is
possible to receive multiple message types on a single topic. For more
details, see the note under registry.decode (possibleSchemas, message)
.const kafka = require('kafka-node')
const kafkaClient = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' })
const kafkaConsumer = new kafka.Producer(kafkaClient, [{ topic: 'my-topic' }], {
encoding: 'buffer',
keyEncoding: 'buffer'
})
registry
.createConsumer(kafkaConsumer, [{
valueSubject: { version: 'MySubject', version: 1 },
keySubject: { version: 'MySubjectId', version: 'latest' },
handler(({ key, value, message }) {
// The `key` is the avro decoded key
// The `value` is the avro decoded value
// The `message` is the raw message from kafka-node .on('message') handler
})
}])
.listen((err) => {
// This will be called on any error that occurs when there is an error
// handling the message. This will also get called if a message comes
// through that you have not setup a message type handler for.
})
// You can also handle passing in message types this way:
registry
.createConsumer(kafkaConsumer)
.messageType({
valueSubject: { version: 'MySubject', version: 1 },
keySubject: { version: 'MySubjectId', version: 'latest' },
handler({ key, value, message }) {}
})
.listen((err) => {})
// Or this way
registry
.createConsumer(kafkaConsumer)
.messageType()
.valueSubject('MySubject', 1)
.keySubject('MySubjectId') // No need to pass the second argument if you intend to use the latest version
.handler(({ key, value, message }) => {})
// Calling this breaks out of the messageType definition. Calling .messageType() also does this
.listen((err) => {})
.listen()
returns the result of calling kafkaConsumer.on('message')
, so it
just returns the kafkaConsumer so you can listen for error messages and the
like.
registry.encode (subject, version, value)
Encodes a value with the schema that has the given subject and subject version.
subject
(String) - The subject of the schema to encode withversion
(Number|String) - The version of the subject to encode with. Must
be a number or the string 'latest'
Promise< Buffer >
- The encoded messageregistry.encode('MySchema', 'latest', { field1: 'foobar' }).then(response => {
console.log(response) // <Buffer 00 00 00 00 01 ...>
})
registry.decode (possibleSchemas, message)
Decodes a value with the one of the given subjects defined in the handlerMap.
NOTE!!!!
The reason that you pass in an object (possibleSchemas) as the first argument is
because it is possible to send multiple "Subjects" of messages down a single
pipe, but the avro schema registry encoding only encodes the schema ID into the
message, which by itself isn't enough information to map it to a handler for a
specific subject. The possibleSchemas
is kind of a "seed" to help the decoder
figure out which subject it is mapped to so you can handle them differently.
For example:
You're sending PersonUpdated and PersonCreated encoded messages down a People kafka topic, and you want different behavior depending on whether you get a PersonCreated event or a PersonUpdated event. The messages themselves only have a schema ID encoded in them, which you can't use to lookup which subject the message was intended to be written with (limitations of the schema registry). We can get around that because there is a way to lookup to see if a schema (which we can get by the schema ID) exists in a given subject. The problem with this is that the schema registry allows a schema (with a schema ID) to belong to more than one subject/version, so it would be possible to map the message to the wrong subject.
For the above reason, this library is really only compatible with schema registries in which unique schemas are published to schema versions. With care, this can be done by following these tenets:
handlerMap
(Object.<String, Number|String>)message
(Buffer) - The message to decode.Promise< Any >
- The decoded value// message being something we got from kafka or wherever
registry.decode({ MySchema: 'latest' }, message).then(response => {
console.log(response)
/**
* { subject: 'MySchema',
* value: MySchema { field1: 'test' } }
*/
})
registry.primeSubjectByVersion (subject, version)
subject
(String) - The subject to primeversion
(Number|String) - The version to primePromise<null>
registry.primeSubjectByVersion('MySubject', 1).catch(err => {
console.log('Priming failed', err)
})
registry.primeSubjectByAllVersions (subject)
subject
(String) - The subject to primePromise<null>
registry.primeSubjectByAllVersions('MySubject').catch(err => {
console.log('Priming failed', err)
})
registry.getSchemaById (id)
Get a schema by its specific id (unique to all schemas within a registry)
id
(Number) - The id of the schema to fetchPromise< { schema: String } >
- An object with a "schema" property which is
the stringified JSON object of the schema.registry.getSchemaById(1).then(response => {
console.log(response) // { schema: '{"name":"MySubject","type":"record","fields":[{"name":"field1","type":"string"}]}' }
})
registry.listSubjects ()
Lists all available subjects in the registry
Promise< [ String ] >
- An array of subject namesregistry.listSubjects().then(response => {
console.log(response) // [ 'MySubject' ]
})
registry.listSubjectVersions (subject)
Lists the version numbers of the versions under a given subject
subject
(String) - The subject whose versions we want to checkPromise< [ Number ] >
- An array of version numbersregistry.listSubjectVersions('MySubject').then(response => {
console.log(response) // [ 1 ]
})
registry.deleteSubject (subject)
Deletes all of the versions from a subject
subject
(String) - The subject to delete a version fromPromise< [ Number ] >
- An array of version numbers that were deletedregistry.deleteSubject('MySubject').then(response => {
console.log(response) // [ 1 ]
})
registry.getSubjectByVersion (subject, version)
Get a schema by its subject and version number
subject
(String) - The subject to fetchversion
(Number|String) - The specific version number to fetch, or
'latest'
if you want the latest version. There is another method to fetch
the latest which just calls this function with 'latest'
as the version
parameterPromise< { subject: String, version: Number, id: Number, schema: String } >
-
An object with the properties describing the schema.registry.getSubjectByVersion('MySubject', 1).then(response => {
console.log(response)
/**
* {
* subject: 'MySubject',
* version: 1,
* id: 1,
* schema: '{"name":"MySubject","type":"record","fields":[{"name":"field1","type":"string"}]}'
* }
*/
})
registry.createSubjectVersion (subject, schema)
Registers a new version of the given subject
subject
(String) - The subject to add the schema version toschema
(Object) - The avro compliant schema to push upPromise< { id: Number } >
- An object with the unique schema id as the id
property.const schema = {
name: 'MySubject',
type: 'record',
fields: [
{
name: 'field1',
type: 'string'
}
]
}
registry.createSubjectVersion('MySubject', schema).then(response => {
console.log(response) // { id: 1 }
})
registry.checkSchemaBySubject (subject, schema)
Checks to see if a schema exists in a subject
subject
(String) - The subject to checkschema
(Object) - The avro compliant schema to checkPromise< { subject: String, version: Number, id: Number, schema: String } >
const schema = {
name: 'MySubject',
type: 'record',
fields: [
{
name: 'field1',
type: 'string'
}
]
}
registry.checkSchemaBySubject('MySubject', schema).then(response => {
console.log(response)
/**
* {
* subject: 'MySubject',
* version: 1,
* id: 1,
* schema: '{"name":"MySubject","type":"record","fields":[{"name":"field1","type":"string"}]}'
* }
*/
})
registry.deleteSubjectVersion (subject, version)
Deletes a version out of the given subject
subject
(String) - The subject to delete a version fromversion
(Number|String) - The specific version number to deletePromise< Number >
- The number is the version number that was deletedregistry.deleteSubjectVersion('MySubject', 1).then(response => {
console.log(response) // 1
})
registry.subjectVersionCompatibilityWithSchema (subject, version, schema)
Checks to see if a given schema is compatible with the schema at the given subject/version.
subject
(String) - The subject to delete a version fromversion
(Number|String) - The specific version number to deleteschema
(Object) - The schema to checkPromise< { is_compatible: Boolean } >
registry
.subjectVersionCompatibilityWithSchema('MySchema', 1, {
name: 'MySchema',
type: 'record',
fields: [
{
name: 'field2',
type: 'string'
}
]
})
.then(response => {
console.log(response) // { is_compatible: false }
})
registry.setGlobalCompatibility (compatibility)
Sets the default global compatibility level for all subjects (except for the ones that have their own set)
compatibility
(String) - Has to be 'NONE'
, 'BACKWARD'
, 'FORWARD'
, or 'FULL'
Promise< { compatibility: String } >
registry.setGlobalCompatibility('FULL').then(response => {
console.log(response) // { compatibility: 'FULL' }
})
registry.getGlobalCompatibility ()
Sets the default global compatibility level for all subjects (except for the ones that have their own set)
Promise< { compatibility: String } >
registry.getGlobalCompatibility().then(response => {
console.log(response) // { compatibility: 'FULL' }
})
registry.setSubjectCompatibility (subject, compatibility)
Sets the compatibility level for a specific subject
compatibility
(String) - Has to be 'NONE'
, 'BACKWARD'
, 'FORWARD'
, or 'FULL'
Promise< { compatibility: String } >
registry.setSubjectCompatibility('MySubject', 'FULL').then(response => {
console.log(response) // { compatibility: 'FULL' }
})
registry.getSubjectCompatibility (subject)
Sets the compatibility level for a specific subject
Promise< { compatibility: String } >
registry.getSubjectCompatibility('MySubject').then(response => {
console.log(response) // { compatibility: 'FULL' }
})
FAQs
A node client to interact with the avro registry. There are methods for updating your subjects and schemas, but also some convenience methods for encoding and decoding.
We found that avro-registry-client demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 3 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
The MCP Steering Committee has launched the official MCP Registry in preview, a central hub for discovering and publishing MCP servers.
Product
Socket’s new Pull Request Stories give security teams clear visibility into dependency risks and outcomes across scanned pull requests.
Research
/Security News
npm author Qix’s account was compromised, with malicious versions of popular packages like chalk-template, color-convert, and strip-ansi published.