ESORM - Python ElasticSearch ORM based on Pydantic
ESORM is an ElasticSearch Object Relational Mapper (or Object Document Mapper, ODM, if you prefer)
for Python, based on Pydantic. It is a high-level library for managing ElasticSearch documents
in Python. It is fully async and uses annotations and type hints for type checking and IDE autocompletion.
💾 Installation
pip install pyesorm
🚀 Features
- Pydantic model representation of ElasticSearch documents
- Automatic mapping and index creation
- CRUD operations
- Full async support (no sync version at all)
- Mapping to and from ElasticSearch types
- Support for nested documents
- Automatic optimistic concurrency control
- Custom id field
- Context for bulk operations
- Supported IDE autocompletion and type checking (PyCharm tested)
- Everything in the source code is documented and annotated
- TypedDicts for ElasticSearch queries and aggregations
- Docstring support for fields
- Shard routing support
- Lazy properties
- Support >= Python 3.8 (tested with 3.8 through 3.12)
- Support for ElasticSearch 8.x and 7.x
- Watcher support (You may need ElasticSearch subscription license for this)
- Pagination and sorting
- FastAPI integration
Not all ElasticSearch features are supported yet; pull requests are welcome.
Supported ElasticSearch versions
It is tested with ElasticSearch 7.x and 8.x.
Supported Python versions
Tested with Python 3.8 through 3.12.
📖 Usage
Define a model
You can use all Pydantic model features, because ESModel is a subclass of pydantic.BaseModel.
(Strictly speaking, it is a subclass of ESBaseModel; see more below.)
ESModel extends the Pydantic BaseModel with ElasticSearch-specific features. It serializes and deserializes
documents to and from ElasticSearch types and handles ElasticSearch operations in the background.
Python basic types
from esorm import ESModel


class User(ESModel):
    name: str
    age: int
This is how the python types are converted to ES types:
Python type | ES type | Comment |
---|---|---|
str | text | |
int | long | |
float | double | |
bool | boolean | |
datetime.datetime | date | |
datetime.date | date | |
datetime.time | date | Stored as 1970-01-01 + time |
typing.Literal | keyword | |
UUID | keyword | |
Path | keyword | |
IntEnum | integer | |
Enum | keyword | also StrEnum |
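The `datetime.time` row can be illustrated with the standard library alone. This is a minimal sketch of the "1970-01-01 + time" convention from the table, not ESORM's own code; the helper names `time_to_es_datetime` and `es_datetime_to_time` are made up for this example.

```python
from datetime import date, datetime, time

# The table states that times are stored as 1970-01-01 + time
EPOCH_DATE = date(1970, 1, 1)


def time_to_es_datetime(t: time) -> datetime:
    """Anchor a time of day to 1970-01-01 so it fits a `date` field."""
    return datetime.combine(EPOCH_DATE, t)


def es_datetime_to_time(dt: datetime) -> time:
    """Drop the placeholder date again when reading the value back."""
    return dt.time()


stored = time_to_es_datetime(time(14, 30, 15))
print(stored.isoformat())  # 1970-01-01T14:30:15
print(es_datetime_to_time(stored))  # 14:30:15
```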
Some special Pydantic types are also supported:
Pydantic type | ES type | Comment |
---|---|---|
URL | keyword | |
IPvAnyAddress | ip | |
ESORM field types
You can specify ElasticSearch special fields using the esorm.fields module.
from esorm import ESModel
from esorm.fields import keyword, text, byte, geo_point


class User(ESModel):
    name: text
    email: keyword
    age: byte
    location: geo_point
    ...
The supported fields are:
Field name | ES type |
---|---|
keyword | keyword |
text | text |
binary | binary |
byte | byte |
short | short |
integer or int32 | integer |
long or int64 | long |
unsigned_long or uint64 | unsigned_long |
float16 or half_float | half_float |
float32 | float |
double | double |
boolean | boolean |
geo_point | geo_point |
The binary field accepts base64 encoded strings. However, if you provide bytes to it, they
will be automatically converted to a base64 string during serialization. When you retrieve the
field, it will always be a base64 encoded string. You can easily convert it back to bytes using
the bytes() method: binary_field.bytes().
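The round trip described above can be reproduced with Python's `base64` module. This sketch only shows the encoding itself, under the assumption that the field uses standard base64; the helpers `to_base64` and `from_base64` are hypothetical names and are not part of ESORM.

```python
import base64


def to_base64(raw: bytes) -> str:
    """Encode raw bytes as an ASCII base64 string, as stored in the document."""
    return base64.b64encode(raw).decode("ascii")


def from_base64(encoded: str) -> bytes:
    """Decode a base64 string back to the original bytes."""
    return base64.b64decode(encoded)


payload = b"\x00\x01 binary payload"
encoded = to_base64(payload)

# The stored value is a plain string; decoding restores the bytes
restored = from_base64(encoded)
print(type(encoded).__name__, restored == payload)
```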
You can also use Annotated types to specify the ES type, like Pydantic PositiveInt, NegativeInt and similar.
geo_point
You can use the geo_point field type for location data:
from esorm import ESModel
from esorm.fields import geo_point


class Place(ESModel):
    name: str
    location: geo_point


async def create_place():
    place = Place(name='Budapest', location=geo_point(lat=47.4979, long=19.0402))
    # ESORM is fully async, so save() has to be awaited
    await place.save()
Nested documents
from esorm import ESModel
from esorm.fields import keyword, text, byte


class User(ESModel):
    name: text
    email: keyword
    age: byte = 18


class Post(ESModel):
    title: text
    content: text
    writer: User  # Stored as a nested document inside each post
List primitive fields
You can use lists of primitive fields:
from typing import List
from esorm import ESModel


class User(ESModel):
    emails: List[str]
    favorite_ids: List[int]
    ...
ESBaseModel
ESBaseModel is the base of ESModel.
Use it for abstract models
from esorm import ESModel, ESBaseModel
from esorm.fields import keyword, text, byte


class BaseUser(ESBaseModel):
    class ESConfig:
        id_field = 'email'

    name: text
    email: keyword


class UserExtended(BaseUser, ESModel):
    age: byte = 18


async def create_user():
    user = UserExtended(
        name='John Doe',
        email="john@example.com",
        age=25
    )
    await user.save()
Use it for nested documents
ESBaseModel is also useful for nested documents, because a model based on it is not included in the
ElasticSearch index as a separate index of its own.
from esorm import ESModel, ESBaseModel
from esorm.fields import keyword, text, byte


class User(ESBaseModel):
    name: text
    email: keyword
    age: byte = 18


class Post(ESModel):
    title: text
    content: text
    writer: User
Id field
You can specify the id field in the model settings:
from esorm import ESModel
from esorm.fields import keyword, text, byte


class User(ESModel):
    class ESConfig:
        id_field = 'email'

    name: text
    email: keyword
    age: byte = 18
This way the field specified in id_field will be removed from the document and used as the document _id
in the index.
If you specify a field named id in your model, it will be used as the document _id in the index
(it will automatically override the id_field setting):
from esorm import ESModel


class User(ESModel):
    id: int
    name: str
You can also create an __id__ property in your model to return a custom id:
from esorm import ESModel
from esorm.fields import keyword, text, byte


class User(ESModel):
    name: text
    email: keyword
    age: byte = 18

    @property
    def __id__(self) -> str:
        return self.email
NOTE: the return type annotation of __id__ is important, and __id__ must be declared as a property.
Model Settings
You can specify model settings using the ESConfig child class.
from typing import Optional, List, Dict, Any
from esorm import ESModel


class User(ESModel):
    class ESConfig:
        """ ESModel Config """
        index: Optional[str] = None
        id_field: Optional[str] = None
        default_sort: Optional[List[Dict[str, Dict[str, str]]]] = None
        settings: Optional[Dict[str, Any]] = None
        lazy_property_max_recursion_depth: int = 1
ESModelTimestamp
You can use the ESModelTimestamp class to add created_at and updated_at fields to your model:
from esorm import ESModelTimestamp


class User(ESModelTimestamp):
    name: str
    age: int
These fields are automatically set to the current datetime when you create or update a document.
The created_at field is set only when you create a document. The updated_at field is set
both when you create and when you update a document.
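The timestamp rule can be sketched without ElasticSearch at all. The example below is an illustration of the behaviour described here, not ESORM's implementation; `TimestampedDoc` and its `touch` method are hypothetical names, and the use of UTC is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class TimestampedDoc:
    """Toy document that mimics the created_at / updated_at rule."""
    name: str
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    def touch(self) -> None:
        """Call on every save: set created_at once, updated_at always."""
        now = datetime.now(timezone.utc)
        if self.created_at is None:
            self.created_at = now  # first save only
        self.updated_at = now  # every save


doc = TimestampedDoc(name="John Doe")
doc.touch()  # simulated create
created_first = doc.created_at
doc.touch()  # simulated update
print(doc.created_at == created_first, doc.updated_at >= created_first)
```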
Describe fields
You can use the usual Pydantic field description, but you can also use docstrings like this:
from esorm import ESModel
from esorm.fields import TextField


class User(ESModel):
    name: str = 'John Doe'
    """ The name of the user """
    age: int = 18
    """ The age of the user """
    address: str = TextField(description="The address of the user")
The documentation is useful if you create an API and want to generate documentation from the model.
It can be used in FastAPI, for example.
Aliases
You can specify aliases for fields:
from esorm import ESModel
from esorm.fields import keyword, Field


class User(ESModel):
    full_name: keyword = Field(alias='fullName')
This is good for renaming fields in the model without changing the ElasticSearch field name.
Connecting to ElasticSearch
You can connect with a simple connection string:
from esorm import connect


async def es_init():
    await connect('localhost:9200')
You can also connect to multiple hosts if you have a cluster:
from esorm import connect


async def es_init():
    await connect(['localhost:9200', 'localhost:9201'])
You can wait for the node or cluster to be ready (recommended):
from esorm import connect


async def es_init():
    await connect('localhost:9200', wait=True)
This pings the node at 2-second intervals until it is ready, which can take a long time.
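The waiting behaviour amounts to a polling loop. The sketch below shows the general pattern with plain asyncio; it is not ESORM's implementation, and `wait_until_ready`, `FakeNode` and the `max_attempts` parameter are hypothetical names introduced for illustration. The interval is shortened in the demo so that it finishes quickly.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def wait_until_ready(
    ping: Callable[[], Awaitable[bool]],
    interval: float = 2.0,
    max_attempts: Optional[int] = None,
) -> int:
    """Call ping() repeatedly until it returns True; return the attempt count."""
    attempts = 0
    while True:
        attempts += 1
        if await ping():
            return attempts
        if max_attempts is not None and attempts >= max_attempts:
            raise TimeoutError(f"node not ready after {attempts} attempts")
        # Sleep between pings so the node is not flooded with requests
        await asyncio.sleep(interval)


class FakeNode:
    """Stand-in for a node that becomes ready after a few pings."""

    def __init__(self, ready_after: int) -> None:
        self.ready_after = ready_after
        self.calls = 0

    async def ping(self) -> bool:
        self.calls += 1
        return self.calls >= self.ready_after


attempts = asyncio.run(wait_until_ready(FakeNode(3).ping, interval=0.01))
print(attempts)  # 3
```

An upper bound such as `max_attempts` is worth considering in deployment scripts, since an unbounded wait can hide a misconfigured host.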
You can pass any arguments that AsyncElasticsearch supports:
from esorm import connect


async def es_init():
    await connect('localhost:9200', wait=True, sniff_on_start=True, sniff_on_connection_fail=True)
Client
The connect function is a wrapper for the AsyncElasticsearch constructor. It creates and stores
a global instance of a proxy to an AsyncElasticsearch instance. The model operations use this
instance to communicate with ElasticSearch. You can retrieve the proxy client instance and use it
the same way as an AsyncElasticsearch instance:
from esorm import es


async def es_init():
    await es.ping()
Create index templates
You can create index templates easily:
from esorm import model as esorm_model


async def prepare_es():
    await esorm_model.create_index_template(
        'default_template',
        prefix_name='esorm_',
        shards=3,
        auto_expand_replicas='1-5',
    )
This template is applied to all indices with the esorm_ prefix, which is the default.
All indices created by ESORM have a prefix, which you can modify globally if you want:
from esorm.model import set_default_index_prefix

set_default_index_prefix('custom_prefix_')
The default prefix is esorm_.
Create indices and mappings
You can create indices and mappings automatically from your models:
from esorm import setup_mappings


async def prepare_es():
    # Importing the module registers its model classes
    import models
    await setup_mappings(models)
First you must create (import) all model classes. Model classes are registered in a global registry.
Then you can call the setup_mappings function to create indices and mappings for all registered models.
IMPORTANT: This method ignores mapping errors if you already have an index with the same name. It can extend
existing indices with new fields, but it cannot modify or delete fields. For that you need to reindex your
ES database. This is an ElasticSearch limitation.
Model instances
When you get a model instance from ElasticSearch by the search or get methods, the following private
attributes are filled automatically:
Attribute | Description |
---|---|
_id | The ES id of the document |
_routing | The routing value of the document |
_version | Version of the document |
_primary_term | The primary term of the document |
_seq_no | The sequence number of the document |
CRUD: Create
from esorm import ESModel


class User(ESModel):
    name: str
    age: int


async def create_user():
    user = User(name='John Doe', age=25)
    # save() returns the id of the stored document
    new_user_id = await user.save()
    print(new_user_id)
CRUD: Read
from esorm import ESModel


class User(ESModel):
    name: str
    age: int


async def get_user(user_id: str):
    user = await User.get(user_id)
    print(user.name)
CRUD: Update
On update, race conditions are checked automatically (with the help of the _primary_term and _seq_no fields).
This way an optimistic locking mechanism is implemented.
from esorm import ESModel


class User(ESModel):
    name: str
    age: int


async def update_user(user_id: str):
    user = await User.get(user_id)
    user.name = 'Jane Doe'
    await user.save()
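To make the optimistic locking idea concrete, here is a small in-memory simulation. It is only a sketch of the general mechanism (a write is rejected when the sequence number or primary term it was based on is stale), not ESORM or ElasticSearch code. `InMemoryIndex`, `StoredDoc` and `ConflictError` are hypothetical names; the `if_seq_no` and `if_primary_term` parameter names mirror the ones ElasticSearch uses for conditional writes.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


class ConflictError(Exception):
    """Raised when the stored document changed since it was read."""


@dataclass
class StoredDoc:
    source: Dict[str, Any]
    seq_no: int
    primary_term: int


class InMemoryIndex:
    """Toy stand-in for an index that tracks sequence numbers."""

    def __init__(self) -> None:
        self._docs: Dict[str, StoredDoc] = {}
        self._next_seq_no = 0
        self._primary_term = 1

    def get(self, doc_id: str) -> StoredDoc:
        return self._docs[doc_id]

    def index(
        self,
        doc_id: str,
        source: Dict[str, Any],
        if_seq_no: Optional[int] = None,
        if_primary_term: Optional[int] = None,
    ) -> StoredDoc:
        current = self._docs.get(doc_id)
        if if_seq_no is not None:
            # Conditional write: reject it if someone else wrote in between
            if (
                current is None
                or current.seq_no != if_seq_no
                or current.primary_term != if_primary_term
            ):
                raise ConflictError(f"document {doc_id!r} was modified concurrently")
        stored = StoredDoc(dict(source), self._next_seq_no, self._primary_term)
        self._next_seq_no += 1
        self._docs[doc_id] = stored
        return stored


index = InMemoryIndex()
index.index("u1", {"name": "John Doe"})

# Two clients read the same version of the document
first = index.get("u1")
second = index.get("u1")

# The first update succeeds and bumps the sequence number
index.index("u1", {"name": "Jane Doe"}, first.seq_no, first.primary_term)

# The second update still carries the old sequence number, so it is rejected
try:
    index.index("u1", {"name": "Jim Doe"}, second.seq_no, second.primary_term)
    conflict = False
except ConflictError:
    conflict = True

print(conflict, index.get("u1").source)
```

In application code the usual reaction to such a conflict is to reload the document, reapply the change and save again.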
CRUD: Delete
from esorm import ESModel


class User(ESModel):
    name: str
    age: int


async def delete_user(user_id: str):
    user = await User.get(user_id)
    await user.delete()
Bulk operations
Bulk operations can be much faster than single operations if you have a lot of documents to
create, update or delete.
You can use a context manager for bulk operations:
from typing import List
from esorm import ESModel, ESBulk


class User(ESModel):
    name: str
    age: int


async def bulk_create_users():
    async with ESBulk() as bulk:
        for i in range(10):
            user = User(name=f'User {i}', age=i)
            await bulk.save(user)


async def bulk_delete_users(users: List[User]):
    async with ESBulk(wait_for=True) as bulk:
        for user in users:
            await bulk.delete(user)
The wait_for argument is optional. If it is True, the context will wait for the bulk operation to finish.
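Bulk requests are faster because many operations travel in one request body. The sketch below builds such a body in the newline-delimited JSON format of the ElasticSearch bulk API (an action line, followed by a source line for index actions). It does not show how ESBulk works internally; `BulkBuffer` and the index name `esorm_user` are assumptions made for illustration.

```python
import json
from typing import Any, Dict, List


class BulkBuffer:
    """Collects bulk actions and renders them as one NDJSON body."""

    def __init__(self, index: str) -> None:
        self.index = index
        self._lines: List[str] = []

    def save(self, doc_id: str, source: Dict[str, Any]) -> None:
        # An index action is two lines: the action metadata, then the document
        self._lines.append(json.dumps({"index": {"_index": self.index, "_id": doc_id}}))
        self._lines.append(json.dumps(source))

    def delete(self, doc_id: str) -> None:
        # A delete action has no source line
        self._lines.append(json.dumps({"delete": {"_index": self.index, "_id": doc_id}}))

    def body(self) -> str:
        # Every line, including the last one, ends with a newline
        return "".join(line + "\n" for line in self._lines)


buffer = BulkBuffer("esorm_user")
for i in range(3):
    buffer.save(str(i), {"name": f"User {i}", "age": i})
buffer.delete("0")

payload = buffer.body()
print(payload)
```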
Search
General search
You can search for documents using the search method, where an ES query can be specified as a dictionary.
You can use the res_dict=True argument to get the result as a dictionary instead of a list. The key will be the
id of the document: await User.search(query, res_dict=True).
If you only need one result, you can use the search_one method.
from esorm import ESModel


class User(ESModel):
    name: str
    age: int


async def search_users():
    users = await User.search(
        query={
            'bool': {
                'must': [{
                    'range': {
                        'age': {
                            'gte': 18
                        }
                    }
                }]
            }
        }
    )
    for user in users:
        print(user.name)


async def search_one_user():
    user = await User.search_one(
        query={
            'bool': {
                'must': [{
                    'match': {
                        'name': {
                            'query': 'John Doe'
                        }
                    }
                }]
            }
        }
    )
    print(user.name)
Queries are type checked, because they are annotated as TypedDicts. You can use IDE autocompletion and type checking.
Search with field value terms (dictionary search)
You can search for documents using the search_by_fields method, where you can specify a field and a value.
It also has a res_dict argument and a search_one_by_fields variant.
from esorm import ESModel


class User(ESModel):
    name: str
    age: int


async def search_users():
    users = await User.search_by_fields({'age': 18})
    for user in users:
        print(user.name)
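A field/value dictionary like the one above corresponds naturally to a boolean query of term clauses. The helper below shows one plausible translation using the standard ElasticSearch `bool` and `term` query syntax. Whether ESORM generates exactly this query is an assumption, and `fields_to_query` is a hypothetical name.

```python
from typing import Any, Dict


def fields_to_query(fields: Dict[str, Any]) -> Dict[str, Any]:
    """Build a bool query that requires an exact match on every field."""
    return {
        "bool": {
            "must": [
                {"term": {name: {"value": value}}}
                for name, value in fields.items()
            ]
        }
    }


query = fields_to_query({"age": 18, "country": "Hungary"})
print(query)
```

A dictionary built this way could be passed to the general search method as its query.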
Aggregations
You can use the aggregate method to get aggregations.
You can specify an ES aggregation query as a dictionary. It also accepts normal ES queries,
so you can filter which documents you want to aggregate.
Both the aggs parameter and the query parameter are type checked, because they are annotated as TypedDicts.
You can use IDE autocompletion and type checking.
from esorm import ESModel


class User(ESModel):
    name: str
    age: int
    country: str


async def aggregate_avg():
    aggs_def = {
        'avg_age': {
            'avg': {
                'field': 'age'
            }
        }
    }
    aggs = await User.aggregate(aggs_def)
    print(aggs['avg_age']['value'])


async def aggregate_avg_by_country(country: str = 'Hungary'):
    aggs_def = {
        'avg_age': {
            'avg': {
                'field': 'age'
            }
        }
    }
    # Only documents matching this query are aggregated
    query = {
        'bool': {
            'must': [{
                'match': {
                    'country': {
                        'query': country
                    }
                }
            }]
        }
    }
    aggs = await User.aggregate(aggs_def, query)
    print(aggs['avg_age']['value'])


async def aggregate_terms():
    aggs_def = {
        'countries': {
            'terms': {
                'field': 'country'
            }
        }
    }
    aggs = await User.aggregate(aggs_def)
    for bucket in aggs['countries']['buckets']:
        print(bucket['key'], bucket['doc_count'])
Pagination and sorting
You can use the Pagination and Sort classes to decorate your models. They simply wrap your models
and add pagination and sorting functionality to them.
You can add a callback parameter to the Pagination class, which will be invoked after the search with
the total number of documents found.
from esorm.model import ESModel, Pagination


class User(ESModel):
    id: int
    name: str
    age: int


async def get_users(page: int = 1, page_size: int = 10):
    # Assumed to be awaited by ESORM; use a plain function if your version expects one
    async def pagination_callback(total: int):
        print(f'Total users: {total}')

    # First create the decorator, passing the callback described above
    pagination = Pagination(page=page, page_size=page_size, callback=pagination_callback)
    # Then decorate the model and search as usual
    res = await pagination(User).search_by_fields({'age': 18})
    return res
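ElasticSearch itself pages results with the `from` and `size` request parameters, so a page number has to be converted to an offset at some point. The helper below sketches that arithmetic, assuming 1-based page numbers as suggested by the default `page = 1` above; `page_to_from_size` is a hypothetical name and does not claim to be ESORM's internal code.

```python
from typing import Dict


def page_to_from_size(page: int, page_size: int) -> Dict[str, int]:
    """Convert a 1-based page number to ElasticSearch from/size values."""
    if page < 1:
        raise ValueError("page must be 1 or greater")
    if page_size < 1:
        raise ValueError("page_size must be 1 or greater")
    # Skip all documents that belong to the previous pages
    return {"from": (page - 1) * page_size, "size": page_size}


print(page_to_from_size(1, 10))  # {'from': 0, 'size': 10}
print(page_to_from_size(3, 10))  # {'from': 20, 'size': 10}
```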
Sorting
Sorting works similarly to pagination:
from esorm.model import ESModel, Sort


class User(ESModel):
    id: int
    name: str
    age: int


async def get_users():
    # Sort by age descending, then by name ascending
    sort = Sort(sort=[
        {'age': {'order': 'desc'}},
        {'name': {'order': 'asc'}}
    ])
    res = await sort(User).search_by_fields({'age': 18})
    return res


async def get_user_sorted_by_name():
    # A single field name can be given as a string
    sort = Sort(sort='name')
    res = await sort(User).all()
    return res
🧪 Testing
For testing you can use test.sh in the root directory. It is a script for running
tests on multiple Python interpreters in virtual environments. At the top of the file you can specify
which Python interpreters you want to test. The ES versions are specified in the tests/docker-compose.yml
file.
If you already have a virtual environment, simply use pytest to run the tests.
🛡 License
This project is licensed under the terms of the Mozilla Public License 2.0 (MPL 2.0) license.
📃 Citation
If you use this project in your research, please cite it using the following BibTeX entry:
@misc{esorm,
author = {Adam Wallner},
title = {ESORM: ElasticSearch Object Relational Mapper},
year = {2023},
publisher = {GitHub},
journal = {GitHub repository},
howpublished = {\url{https://github.com/wallneradam/esorm}},
}