
python-schema-registry-client
Python REST client for interacting with a Confluent Schema Registry server to manage Avro and JSON schema resources.
python 3.8+
pip install python-schema-registry-client
If you want the Faust functionality:
pip install python-schema-registry-client[faust]
Note that this will automatically add a dependency on the faust-streaming fork of faust. If you want to use the old faust version, simply install it manually and then install python-schema-registry-client without the faust extra enabled; the functionality will be the same.
Documentation: https://marcosschroh.github.io/python-schema-registry-client.io
from schema_registry.client import SchemaRegistryClient, schema
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}
avro_schema = schema.AvroSchema(deployment_schema)
schema_id = client.register("test-deployment", avro_schema)
Or async:
import asyncio

from schema_registry.client import AsyncSchemaRegistryClient, schema

deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}


async def main():
    async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")
    avro_schema = schema.AvroSchema(deployment_schema)
    # `await` is only valid inside a coroutine
    return await async_client.register("test-deployment", avro_schema)


schema_id = asyncio.run(main())
from schema_registry.client import SchemaRegistryClient, schema
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
deployment_schema = {
    "definitions": {
        "JsonDeployment": {
            "type": "object",
            "required": ["image", "replicas", "port"],
            "properties": {
                "image": {"type": "string"},
                "replicas": {"type": "integer"},
                "port": {"type": "integer"},
            },
        }
    },
    "$ref": "#/definitions/JsonDeployment",
}
json_schema = schema.JsonSchema(deployment_schema)
schema_id = client.register("test-deployment", json_schema)
Or async:
import asyncio

from schema_registry.client import AsyncSchemaRegistryClient, schema

deployment_schema = {
    "definitions": {
        "JsonDeployment": {
            "type": "object",
            "required": ["image", "replicas", "port"],
            "properties": {
                "image": {"type": "string"},
                "replicas": {"type": "integer"},
                "port": {"type": "integer"},
            },
        }
    },
    "$ref": "#/definitions/JsonDeployment",
}


async def main():
    async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")
    json_schema = schema.JsonSchema(deployment_schema)
    # `await` is only valid inside a coroutine
    return await async_client.register("test-deployment", json_schema)


schema_id = asyncio.run(main())
You can generate the Avro schema directly from a Python class using dataclasses-avroschema and use it in the API to register schemas, check versions, and test compatibility:
import dataclasses
import typing

from dataclasses_avroschema import AvroModel, types
from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="http://127.0.0.1:8081")


@dataclasses.dataclass
class UserAdvance(AvroModel):
    name: str
    age: int
    pets: typing.List[str] = dataclasses.field(default_factory=lambda: ["dog", "cat"])
    accounts: typing.Dict[str, int] = dataclasses.field(default_factory=lambda: {"key": 1})
    has_car: bool = False
    favorite_colors: types.Enum = types.Enum(["BLUE", "YELLOW", "GREEN"], default="BLUE")
    country: str = "Argentina"
    address: typing.Optional[str] = None


# subject name taken from the sample output shown after check_version
subject = "dataclasses-avroschema-subject-2"

# register the schema
schema_id = client.register(subject, UserAdvance.avro_schema())
print(schema_id)
# >>> 12
result = client.check_version(subject, UserAdvance.avro_schema())
print(result)
# >>> SchemaVersion(subject='dataclasses-avroschema-subject-2', schema_id=12, schema=1, version={"type":"record" ...')
compatibility = client.test_compatibility(subject, UserAdvance.avro_schema())
print(compatibility)
# >>> True
You can generate the JSON schema directly from a Python class using pydantic and use it in the API to register schemas, check versions, and test compatibility:
import typing
from enum import Enum

from pydantic import BaseModel
from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="http://127.0.0.1:8081")


class ColorEnum(str, Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


class UserAdvance(BaseModel):
    name: str
    age: int
    pets: typing.List[str] = ["dog", "cat"]
    accounts: typing.Dict[str, int] = {"key": 1}
    has_car: bool = False
    favorite_colors: ColorEnum = ColorEnum.BLUE
    country: str = "Argentina"
    address: typing.Optional[str] = None


# subject name taken from the sample output shown after check_version
subject = "pydantic-jsonschema-subject"

# register the schema
schema_id = client.register(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(schema_id)
# >>> 12
result = client.check_version(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(result)
# >>> SchemaVersion(subject='pydantic-jsonschema-subject', schema_id=12, schema=1, version=<schema_registry.client.schema.JsonSchema object at 0x7f40354550a0>)
compatibility = client.test_compatibility(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(compatibility)
# >>> True
You can use AvroMessageSerializer to encode/decode messages in Avro:
from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import AvroMessageSerializer
client = SchemaRegistryClient("http://127.0.0.1:8081")
avro_message_serializer = AvroMessageSerializer(client)
avro_user_schema = schema.AvroSchema({
    "type": "record",
    "namespace": "com.example",
    "name": "AvroUsers",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
})

# We want to encode the user_record with avro_user_schema
user_record = {
    "first_name": "my_first_name",
    "last_name": "my_last_name",
    "age": 20,
}

# Encode the record
message_encoded = avro_message_serializer.encode_record_with_schema(
    "user", avro_user_schema, user_record)
print(message_encoded)
# >>> b'\x00\x00\x00\x00\x01\x1amy_first_name\x18my_last_name('
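The encoded bytes above appear to follow the Confluent wire format: a zero "magic" byte, a 4-byte big-endian schema ID, then the Avro binary body. The following is a minimal hand-rolled sketch, using only the standard library, that takes the sample output apart to show where each field lives. The helper names (`split_frame`, `read_long`, `read_string`) are hypothetical and not part of this package, and the decoder only handles the `string` and `int` fields of the `AvroUsers` schema; in real code the serializer itself should do the decoding.

```python
import struct


def split_frame(message: bytes):
    """Split a framed message into (schema_id, payload). Hypothetical helper."""
    # ">bI": 1-byte magic marker followed by a 4-byte big-endian schema ID
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError("unexpected magic byte")
    return schema_id, message[5:]


def read_long(buf: bytes, pos: int):
    """Read one Avro zigzag varint starting at pos; return (value, new_pos)."""
    raw = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear marks the last byte
            break
        shift += 7
    # undo zigzag encoding
    return (raw >> 1) ^ -(raw & 1), pos


def read_string(buf: bytes, pos: int):
    """Read an Avro string: varint length followed by UTF-8 bytes."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


# Sample bytes copied from the output above
message = b"\x00\x00\x00\x00\x01\x1amy_first_name\x18my_last_name("

schema_id, payload = split_frame(message)

# Fields are read in the order they are declared in the schema
pos = 0
first_name, pos = read_string(payload, pos)
last_name, pos = read_string(payload, pos)
age, pos = read_long(payload, pos)

decoded = {"first_name": first_name, "last_name": last_name, "age": age}
print(schema_id, decoded)
```

The schema ID in the header is what lets a consumer look up the right schema in the registry before decoding the body.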
Or with JSON schemas:
from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import JsonMessageSerializer
client = SchemaRegistryClient("http://127.0.0.1:8081")
json_message_serializer = JsonMessageSerializer(client)
json_schema = schema.JsonSchema({
    "definitions": {
        "record:python.test.basic.basic": {
            "description": "basic schema for tests",
            "type": "object",
            "required": ["number", "name"],
            "properties": {
                "number": {
                    "oneOf": [
                        {"type": "integer"},
                        {"type": "null"},
                    ]
                },
                "name": {
                    "oneOf": [
                        {"type": "string"},
                    ]
                },
            },
        }
    },
    "$ref": "#/definitions/record:python.test.basic.basic",
})

# Encode the record
basic_record = {
    "number": 10,
    "name": "a_name",
}

message_encoded = json_message_serializer.encode_record_with_schema(
    "basic", json_schema, basic_record)
print(message_encoded)
# >>> b'\x00\x00\x00\x00\x02{"number": 10, "name": "a_name"}'
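The JSON output above is consistent with the same framing: a zero magic byte, a 4-byte big-endian schema ID, then the record as UTF-8 JSON text. As a rough sketch, assuming that layout, the frame can be rebuilt and taken apart with the standard library alone. `frame_json` and `unframe_json` are hypothetical names used for illustration, not functions of this package, and the sketch performs no schema validation.

```python
import json
import struct

MAGIC_BYTE = 0


def frame_json(schema_id: int, record: dict) -> bytes:
    """Prefix the JSON-encoded record with magic byte and schema ID."""
    header = struct.pack(">bI", MAGIC_BYTE, schema_id)
    return header + json.dumps(record).encode("utf-8")


def unframe_json(message: bytes):
    """Return (schema_id, record) from a framed JSON message."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte")
    return schema_id, json.loads(message[5:])


# Schema ID 2 and the record are taken from the sample output above
framed = frame_json(2, {"number": 10, "name": "a_name"})
print(framed)

schema_id, record = unframe_json(framed)
print(schema_id, record)
```

Unlike this sketch, the real serializer also checks the record against the registered schema before encoding.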
Usually, we have a situation like this: our producers/consumers have to serialize/deserialize messages every time they send to or receive from Kafka topics. For example, imagine a Faust application receiving messages encoded with an Avro schema that we want to deserialize; the schema server can supply the schema needed to do that. In this scenario, the MessageSerializer is perfect.
Another use case is an application dedicated to administering Avro schemas (register, update compatibilities, delete old schemas, etc.); for that, the SchemaRegistryClient is perfect.
Poetry is needed to install the dependencies and develop locally:
poetry install --all-extras
./scripts/format
./scripts/test
For commit messages we use commitizen to standardize the commit format.
Note: The tests are run against the Schema Server using docker compose, so you will need Docker and Docker Compose installed.
In a terminal run docker-compose up. Then in a different terminal run the tests:
./scripts/test
All additional args will be passed to pytest, for example:
./scripts/test ./tests/client/
To perform tests using the Python shell, you can run the project using docker-compose.
Run docker-compose up. The schema registry server will then run on http://127.0.0.1:8081, and you can interact with it using the SchemaRegistryClient.
Open a Python interpreter (get a Python shell by typing python in your command line).
Play with the schema server:
from schema_registry.client import SchemaRegistryClient, schema
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
# do some operations with the client...
deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}
avro_schema = schema.AvroSchema(deployment_schema)
client.register("test-deployment", avro_schema)
# >>>> Out[5]: 1
Then, you can check the schema in your browser at http://127.0.0.1:8081/schemas/ids/1
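The same check can be done from Python. The Schema Registry REST API returns the schema for an ID as a JSON object whose "schema" field holds the schema as an escaped JSON string, so it has to be decoded twice. The sketch below builds the URL and parses a response body; the helper names are hypothetical, and `sample_body` is hand-written to resemble a response rather than captured from a running server.

```python
import json


def schema_by_id_url(base_url: str, schema_id: int) -> str:
    """Build the lookup URL for a schema ID. Hypothetical helper."""
    return f"{base_url.rstrip('/')}/schemas/ids/{schema_id}"


def parse_schema_response(body: str) -> dict:
    """Decode the outer JSON object, then the schema string inside it."""
    return json.loads(json.loads(body)["schema"])


# Hand-written sample body (an assumption, not real server output)
sample_body = json.dumps({
    "schema": json.dumps({
        "type": "record",
        "namespace": "com.kubertenes",
        "name": "AvroDeployment",
        "fields": [
            {"name": "image", "type": "string"},
            {"name": "replicas", "type": "int"},
            {"name": "port", "type": "int"},
        ],
    })
})

url = schema_by_id_url("http://127.0.0.1:8081", 1)
parsed = parse_schema_response(sample_body)
print(url)
print(parsed["name"], [field["name"] for field in parsed["fields"]])
```

With the registry running, the body could be fetched with `urllib.request.urlopen(url).read().decode()` and passed to `parse_schema_response`.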