ESORM - Python ElasticSearch ORM based on Pydantic

Some ideas come from the Pydastic library, which is similar but not as advanced (yet).

💾 Installation

pip install pyesorm

🚀 Features

  • Pydantic model representation of ElasticSearch documents
  • Automatic mapping and index creation
  • CRUD operations
  • Full async support (no sync version at all)
  • Mapping to and from ElasticSearch types
  • Support for nested documents
  • Custom id field
  • Context for bulk operations
  • IDE autocompletion and type checking support (tested with PyCharm)
  • Everything in the source code is documented and annotated
  • TypedDicts for ElasticSearch queries and aggregations
  • Docstring support for fields
  • Shard routing support
  • Lazy properties
  • Supports Python 3.8 and later (tested with 3.8 through 3.12)
  • Support for ElasticSearch 8.x and 7.x
  • Watcher support (you may need an ElasticSearch subscription license for this)
  • Pagination and sorting
  • FastAPI integration

Not all ElasticSearch features are supported yet; pull requests are welcome.

Supported ElasticSearch versions

ESORM is tested with ElasticSearch 7.x and 8.x.

Supported Python versions

Tested with Python 3.8 through 3.12.

📖 Usage

Define a model

You can use all Pydantic model features, because ESModel is a subclass of pydantic.BaseModel.

Python basic types

from esorm import ESModel


class User(ESModel):
    name: str
    age: int

This is how Python types are converted to ES types:

| Python type       | ES type |
|-------------------|---------|
| str               | text    |
| int               | long    |
| float             | double  |
| bool              | boolean |
| datetime.datetime | date    |
| datetime.date     | date    |
| datetime.time     | date    |
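
The table above can be read as a simple lookup. The following is a minimal sketch of that idea using only the standard library; it is not ESORM's actual implementation, and the names PY_TO_ES and mapping_for are hypothetical, introduced here for illustration only.

```python
import datetime
from typing import Any, Dict

# Python type -> ElasticSearch type, copied from the table above
PY_TO_ES: Dict[type, str] = {
    str: 'text',
    int: 'long',
    float: 'double',
    bool: 'boolean',
    datetime.datetime: 'date',
    datetime.date: 'date',
    datetime.time: 'date',
}


def mapping_for(annotations: Dict[str, type]) -> Dict[str, Any]:
    """Build an ES-style 'properties' mapping from field annotations.

    Hypothetical helper: ESORM derives mappings from the model class itself.
    """
    return {
        'properties': {
            name: {'type': PY_TO_ES[py_type]}
            for name, py_type in annotations.items()
        }
    }


# The User model above has a str field and an int field
user_mapping = mapping_for({'name': str, 'age': int})
print(user_mapping)
```

The lookup is done on the exact type, so bool resolves to boolean even though bool is a subclass of int in Python.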

ESORM field types

You can specify ElasticSearch-specific field types using the esorm.fields module.

from esorm import ESModel
from esorm.fields import keyword, text, byte, geo_point


class User(ESModel):
    name: text
    email: keyword
    age: byte
    location: geo_point
    ...

The supported fields are:

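| Field name            | ES type    |
|-----------------------|------------|
| keyword               | keyword    |
| text                  | text       |
| binary                | binary     |
| byte                  | byte       |
| short                 | short      |
| integer or int32      | integer    |
| long or int64         | long       |
| float16 or half_float | half_float |
| float32               | float      |
| double                | double     |
| boolean               | boolean    |
| geo_point             | geo_point  |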

Nested documents

from esorm import ESModel
from esorm.fields import keyword, text, byte


class User(ESModel):
    name: text
    email: keyword
    age: byte = 18


class Post(ESModel):
    title: text
    content: text
    writer: User  # User is a nested document

Id field

You can specify the id field in the model settings:

from esorm import ESModel
from esorm.fields import keyword, text, byte


class User(ESModel):
    class ESConfig:
        id_field = 'email'

    name: text
    email: keyword
    age: byte = 18

This way the field specified in id_field will be removed from the document and used as the document _id in the index.
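
The effect described above can be pictured with plain dictionaries. This is only a sketch of the behaviour, assuming the id value is stored as a string; split_id is a hypothetical helper and not part of ESORM.

```python
from typing import Any, Dict, Tuple


def split_id(doc: Dict[str, Any], id_field: str) -> Tuple[str, Dict[str, Any]]:
    """Return (_id, source), with the id field removed from the source.

    Hypothetical helper that mimics what the id_field setting is described to do.
    """
    source = dict(doc)  # copy, so the caller's dict is not modified
    doc_id = str(source.pop(id_field))
    return doc_id, source


doc_id, source = split_id(
    {'name': 'John Doe', 'email': 'john@example.com', 'age': 18},
    id_field='email',
)
print(doc_id)   # the value that would become the document _id
print(source)   # the remaining fields that would be stored in the document
```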

If you specify a field named id in your model, it will be used as the document _id in the index (it will automatically override the id_field setting):

from esorm import ESModel


class User(ESModel):
    id: int  # This will be used as the document _id in the index
    name: str

You can also create an __id__ property in your model to return a custom id:

from esorm import ESModel
from esorm.fields import keyword, text, byte


class User(ESModel):
    name: text
    email: keyword
    age: byte = 18

    @property
    def __id__(self) -> str:
        return self.email

NOTE: The return type annotation of __id__ is important, and it must be declared as a property.

Model Settings

You can specify model settings using the ESConfig inner class.

from typing import Optional, List, Dict, Any
from esorm import ESModel


class User(ESModel):
    class ESConfig:
        """ ESModel Config """
        # The index name
        index: Optional[str] = None
        # The name of the 'id' field
        id_field: Optional[str] = None
        # Default sort
        default_sort: Optional[List[Dict[str, Dict[str, str]]]] = None
        # ElasticSearch index settings (https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html)
        settings: Optional[Dict[str, Any]] = None
        # Maximum recursion depth of lazy properties
        lazy_property_max_recursion_depth: int = 1

ESModelTimestamp

You can use the ESModelTimestamp class to add created_at and updated_at fields to your model:

from esorm import ESModelTimestamp


class User(ESModelTimestamp):
    name: str
    age: int

These fields are automatically set to the current datetime when you create or update a document. The created_at field is set only when you create a document. The updated_at field is set when you create or update a document.
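
The timestamp rule can be illustrated without ElasticSearch. The sketch below uses a standard library dataclass as a stand-in; TimestampedDoc and touch are hypothetical names, and the use of UTC is an assumption made here, not something the text above states.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class TimestampedDoc:
    """Hypothetical stand-in for a model with created_at / updated_at."""
    name: str
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    def touch(self) -> None:
        """Simulate a save: set created_at once, refresh updated_at every time."""
        now = datetime.now(timezone.utc)  # assumption: timestamps in UTC
        if self.created_at is None:
            self.created_at = now
        self.updated_at = now


doc = TimestampedDoc(name='John Doe')
doc.touch()                      # first save: both fields are set
first_created = doc.created_at
doc.touch()                      # later update: only updated_at changes
```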

Describe fields

You can use the usual Pydantic field description, but you can also use docstrings like this:

from esorm import ESModel
from esorm.fields import TextField


class User(ESModel):
    name: str = 'John Doe'
    """ The name of the user """
    age: int = 18
    """ The age of the user """

    # This is the usual Pydantic way, but I think docstrings are more intuitive and readable
    address: str = TextField(description="The address of the user")

The documentation is useful if you create an API and want to generate documentation from the model. It can be used in FastAPI, for example.

Connecting to ElasticSearch

You can connect with a simple connection string:

from esorm import connect


async def es_init():
    await connect('localhost:9200')

You can also connect to multiple hosts if you have a cluster:

from esorm import connect


async def es_init():
    await connect(['localhost:9200', 'localhost:9201'])

You can wait for the node or cluster to be ready (recommended):

from esorm import connect


async def es_init():
    await connect('localhost:9200', wait=True)

This will ping the node at 2-second intervals until it is ready, which can take a long time.
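
The waiting behaviour amounts to a polling loop. The following is a minimal asyncio sketch of such a loop, not ESORM's code: wait_until_ready and its max_attempts parameter are hypothetical, and the fake ping replaces a real ElasticSearch node so the example runs on its own.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def wait_until_ready(
    ping: Callable[[], Awaitable[bool]],
    interval: float = 2.0,
    max_attempts: Optional[int] = None,  # assumption: an optional upper bound
) -> int:
    """Call ping() until it returns True; return the number of attempts."""
    attempts = 0
    while True:
        attempts += 1
        if await ping():
            return attempts
        if max_attempts is not None and attempts >= max_attempts:
            raise TimeoutError(f'not ready after {attempts} attempts')
        await asyncio.sleep(interval)


async def demo() -> int:
    answers = iter([False, False, True])  # fake node: ready on the third ping

    async def fake_ping() -> bool:
        return next(answers)

    # interval=0 keeps the demo fast; the text above describes 2 seconds
    return await wait_until_ready(fake_ping, interval=0)


attempts_needed = asyncio.run(demo())
print(attempts_needed)
```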

You can pass any arguments that AsyncElasticsearch supports:

from esorm import connect


async def es_init():
    await connect('localhost:9200', wait=True, sniff_on_start=True, sniff_on_connection_fail=True)

Client

The connect function is a wrapper around the AsyncElasticsearch constructor. It creates and stores a global proxy to an AsyncElasticsearch instance, which the model operations use to communicate with ElasticSearch. You can import this proxy client and use it the same way as an AsyncElasticsearch instance:

from esorm import es


async def es_init():
    await es.ping()

Create index templates

You can create index templates easily:

from esorm import model as esorm_model


# Create index template
async def prepare_es():
    await esorm_model.create_index_template('default_template',
                                            prefix_name='esorm_',
                                            shards=3,
                                            auto_expand_replicas='1-5')

This template will be applied to all indices with the (default) esorm_ prefix.

All indices created by ESORM have a prefix, which you can modify globally if you want:

from esorm.model import set_default_index_prefix

set_default_index_prefix('custom_prefix_')

The default prefix is esorm_.
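
ElasticSearch index templates select indices through wildcard patterns, so a prefix-based template can be thought of as the pattern "<prefix>*". The sketch below illustrates that matching with the standard library; it assumes this is the pattern a prefix template produces, and both template_applies and the index name esorm_user are hypothetical examples.

```python
from fnmatch import fnmatchcase

DEFAULT_PREFIX = 'esorm_'  # the default prefix mentioned above


def template_applies(prefix: str, index_name: str) -> bool:
    """True if a template with pattern '<prefix>*' would cover the index name.

    Hypothetical helper; assumes a single trailing-wildcard pattern.
    """
    return fnmatchcase(index_name, prefix + '*')


print(template_applies(DEFAULT_PREFIX, 'esorm_user'))
print(template_applies(DEFAULT_PREFIX, 'custom_prefix_user'))
```

One consequence worth noting: if you change the global prefix, a template created for the old prefix would no longer match newly created indices.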

Create indices and mappings

You can create indices and mappings automatically from your models:

from esorm import setup_mappings


# Create indices and mappings
async def prepare_es():
    import models  # Import your models
    # The models argument is not required, but passing it prevents an unused import warning
    await setup_mappings(models)

First you must create (import) all model classes. Model classes will be registered into a global registry. Then you can call setup_mappings function to create indices and mappings for all registered models.

IMPORTANT: This method will ignore mapping errors if you already have an index with the same name. It can add new fields to existing indices, but it cannot modify or delete existing fields. For that you need to reindex your data. This is an ElasticSearch limitation.
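
To see in advance which model changes can be applied in place, you could compare the existing mapping with the desired one. The following is a sketch under the assumption that both are available as plain "properties" dictionaries; diff_properties is a hypothetical helper, not an ESORM function.

```python
from typing import Any, Dict, List, Tuple

Properties = Dict[str, Dict[str, Any]]


def diff_properties(existing: Properties, desired: Properties) -> Tuple[Properties, List[str]]:
    """Split desired fields into ones that can be added and ones that conflict."""
    addable: Properties = {}
    conflicts: List[str] = []
    for name, definition in desired.items():
        if name not in existing:
            addable[name] = definition   # new field: can be added in place
        elif existing[name] != definition:
            conflicts.append(name)       # changed field: needs a reindex
    return addable, conflicts


existing = {'name': {'type': 'text'}, 'age': {'type': 'long'}}
desired = {
    'name': {'type': 'keyword'},     # type changed
    'age': {'type': 'long'},         # unchanged
    'country': {'type': 'keyword'},  # new
}
addable, conflicts = diff_properties(existing, desired)
print(addable)
print(conflicts)
```

Fields that were removed from the model do not show up in either result, which matches the limitation above: they simply stay in the index.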

CRUD: Create

from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def create_user():
    # Create a new user 
    user = User(name='John Doe', age=25)
    # Save the user to ElasticSearch
    new_user_id = await user.save()
    print(new_user_id)

CRUD: Read

from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def get_user(user_id: str):
    user = await User.get(user_id)
    print(user.name)

CRUD: Update

from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def update_user(user_id: str):
    user = await User.get(user_id)
    user.name = 'Jane Doe'
    await user.save()

CRUD: Delete

from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def delete_user(user_id: str):
    user = await User.get(user_id)
    await user.delete()

Bulk operations

Bulk operations can be much faster than single operations if you have a lot of documents to create, update or delete.

You can use a context manager for bulk operations:

from typing import List
from esorm import ESModel, ESBulk


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def bulk_create_users():
    async with ESBulk() as bulk:
        # Creating or modifying models
        for i in range(10):
            user = User(name=f'User {i}', age=i)
            await bulk.save(user)


async def bulk_delete_users(users: List[User]):
    async with ESBulk(wait_for=True) as bulk:  # Here we wait for the bulk operation to finish
        # Deleting models
        for user in users:
            await bulk.delete(user)

The wait_for argument is optional. If it is True, the context will wait for the bulk operation to finish.
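
The speed-up comes from sending many operations in one request. ElasticSearch's bulk API takes them as newline-delimited JSON: an action line, followed by a source line for operations that carry a document. The sketch below builds such a body with the standard library; bulk_body and the index name esorm_user are hypothetical, and how ESBulk assembles its requests internally is not stated here.

```python
import json
from typing import Any, Dict, List, Optional, Tuple

# (action, index, id, source); source is None for deletes
Operation = Tuple[str, str, str, Optional[Dict[str, Any]]]


def bulk_body(operations: List[Operation]) -> str:
    """Serialize operations into the newline-delimited JSON used by _bulk."""
    lines: List[str] = []
    for action, index, doc_id, source in operations:
        # Action and metadata line
        lines.append(json.dumps({action: {'_index': index, '_id': doc_id}}))
        # Source line, only for actions that carry a document
        if source is not None:
            lines.append(json.dumps(source))
    return '\n'.join(lines) + '\n'  # the body must end with a newline


body = bulk_body([
    ('index', 'esorm_user', '1', {'name': 'User 0', 'age': 0}),
    ('delete', 'esorm_user', '2', None),
])
print(body)
```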

Search

You can search for documents using the search method, where an ES query is specified as a dictionary. Pass res_dict=True to get the result as a dictionary instead of a list, keyed by document id: await User.search(query, res_dict=True).

If you only need one result, you can use the search_one method.

from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def search_users():
    # Search for users at least 18 years old
    users = await User.search(
        query={
            'bool': {
                'must': [{
                    'range': {
                        'age': {
                            'gte': 18
                        }
                    }
                }]
            }
        }
    )
    for user in users:
        print(user.name)


async def search_one_user():
    # Search a user named John Doe
    user = await User.search_one(
        query={
            'bool': {
                'must': [{
                    'match': {
                        'name': {
                            'query': 'John Doe'
                        }
                    }
                }]
            }
        }
    )
    print(user.name)

Queries are type checked, because they are annotated as TypedDicts. You can use IDE autocompletion and type checking.
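
Because queries are plain dictionaries, small helper functions can cut down on nesting when you build them repeatedly. The helpers below are hypothetical conveniences, not part of ESORM; they produce exactly the two query dictionaries used in the example above.

```python
from typing import Any, Dict

Query = Dict[str, Any]


def range_gte(field: str, value: Any) -> Query:
    """Range clause: field >= value."""
    return {'range': {field: {'gte': value}}}


def match(field: str, text: str) -> Query:
    """Full-text match clause."""
    return {'match': {field: {'query': text}}}


def bool_must(*clauses: Query) -> Query:
    """Bool query in which every clause must match."""
    return {'bool': {'must': list(clauses)}}


adults_query = bool_must(range_gte('age', 18))
john_query = bool_must(match('name', 'John Doe'))
print(adults_query)
print(john_query)
```

The resulting dictionaries can be passed as the query argument, for example await User.search(query=adults_query).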

You can search for documents using the search_by_fields method, where you specify field names and values as a dictionary. It also has a res_dict argument and a search_one_by_fields variant.

from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def search_users():
    # Search for users whose age is 18
    users = await User.search_by_fields({'age': 18})
    for user in users:
        print(user.name)
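
A field and value dictionary like the one above has to be turned into an ES query at some point. The sketch below shows one plausible translation, a bool query with one term clause per field. This is an assumption for illustration only: fields_to_query is a hypothetical helper, and the query ESORM actually generates may differ.

```python
from typing import Any, Dict


def fields_to_query(fields: Dict[str, Any]) -> Dict[str, Any]:
    """Turn {'field': value, ...} into a bool query with one term clause each.

    Hypothetical translation; not taken from ESORM's source.
    """
    return {
        'bool': {
            'must': [
                {'term': {name: {'value': value}}}
                for name, value in fields.items()
            ]
        }
    }


age_query = fields_to_query({'age': 18})
print(age_query)
```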

Aggregations

You can use the aggregate method to get aggregations, specifying an ES aggregation query as a dictionary. It also accepts a normal ES query to filter which documents are aggregated. Both the aggs parameter and the query parameter are type checked, because they are annotated as TypedDicts, so IDE autocompletion and type checking work here too.

from esorm import ESModel

# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int
    country: str
    
async def aggregate_avg():
    # Get average age of users
    aggs_def = {
        'avg_age': {
            'avg': {
                'field': 'age'
            }
        }
    }
    aggs = await User.aggregate(aggs_def)
    print(aggs['avg_age']['value'])
    
async def aggregate_avg_by_country(country = 'Hungary'):
    # Get average age of users by country
    aggs_def = {
        'avg_age': {
            'avg': {
                'field': 'age'
            }
        }
    }
    query = {
        'bool': {
            'must': [{
                'match': {
                    'country': {
                        'query': country
                    }
                }
            }]
        }
    }
    aggs = await User.aggregate(aggs_def, query)
    print(aggs['avg_age']['value'])
    
    
async def aggregate_terms():
    # Get number of users by country
    aggs_def = {
        'countries': {
            'terms': {
                'field': 'country'
            }
        }
    }
    aggs = await User.aggregate(aggs_def)
    for bucket in aggs['countries']['buckets']:
        print(bucket['key'], bucket['doc_count'])
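
A terms aggregation result is often easier to work with as a flat dictionary. The sketch below converts the buckets into a key to count mapping; terms_to_counts is a hypothetical helper, and sample_aggs is made-up data that follows the bucket structure used in the example above.

```python
from typing import Any, Dict


def terms_to_counts(aggs: Dict[str, Any], name: str) -> Dict[str, int]:
    """Flatten a terms aggregation into {bucket key: document count}."""
    return {
        bucket['key']: bucket['doc_count']
        for bucket in aggs[name]['buckets']
    }


# Made-up result, shaped like the return value used in aggregate_terms()
sample_aggs = {
    'countries': {
        'buckets': [
            {'key': 'Hungary', 'doc_count': 3},
            {'key': 'Austria', 'doc_count': 1},
        ]
    }
}
counts = terms_to_counts(sample_aggs, 'countries')
print(counts)
```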

Pagination and sorting

You can use the Pagination and Sort classes to decorate your models. They simply wrap your models and add pagination and sorting functionality to them.

Pagination

from esorm.model import ESModel, Pagination


class User(ESModel):
    id: int  # This will be used as the document _id in the index
    name: str
    age: int


async def get_users(page: int = 1, page_size: int = 10):
    # First create the decorator itself
    pagination = Pagination(page=page, page_size=page_size)

    # Then decorate your model; search methods are coroutines, so await them
    res = await pagination(User).search_by_fields({'age': 18})

    # Here the result has at most page_size items (10 by default)
    return res

Sorting

It is similar to pagination:

from esorm.model import ESModel, Sort


class User(ESModel):
    id: int  # This will be used as the document _id in the index
    name: str
    age: int
    
    
async def get_users():
    # First create the decorator itself
    sort = Sort(sort=[
        {'age': {'order': 'desc'}},
        {'name': {'order': 'asc'}}
    ])

    # Then decorate your model; search methods are coroutines, so await them
    res = await sort(User).search_by_fields({'age': 18})

    # Here the result is sorted by age descending, then by name ascending
    return res


async def get_user_sorted_by_name():
    # You can also use this simplified syntax
    sort = Sort(sort='name')

    # Then decorate your model
    res = await sort(User).all()

    # Here the result is sorted by name
    return res
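
In ElasticSearch terms, paging and sorting end up as the from, size and sort fields of a search request. The sketch below shows that arithmetic with the standard library. It assumes 1-based page numbers and that the short string form means ascending order (the ElasticSearch default for field sorts); paging_params and normalize_sort are hypothetical helpers, not ESORM functions.

```python
from typing import Dict, List, Union

SortList = List[Dict[str, Dict[str, str]]]


def paging_params(page: int, page_size: int) -> Dict[str, int]:
    """Translate a 1-based page number into ES 'from' and 'size'."""
    if page < 1 or page_size < 1:
        raise ValueError('page and page_size must be at least 1')
    return {'from': (page - 1) * page_size, 'size': page_size}


def normalize_sort(sort: Union[str, SortList]) -> SortList:
    """Expand the short string form into the explicit list form."""
    if isinstance(sort, str):
        return [{sort: {'order': 'asc'}}]  # assumption: ascending by default
    return sort


first_page = paging_params(page=1, page_size=10)
third_page = paging_params(page=3, page_size=10)
by_name = normalize_sort('name')
print(first_page, third_page, by_name)
```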

🧪 Testing

For testing you can use test.sh in the root directory. It is a script for running the tests on multiple Python interpreters in virtual environments. At the top of the file you can specify which Python interpreters to test against. The ES versions are specified in the tests/docker-compose.yml file.

If you already have a virtual environment, simply use pytest to run the tests.

🛡 License

This project is licensed under the terms of the Mozilla Public License 2.0 (MPL 2.0).

📃 Citation

If you use this project in your research, please cite it using the following BibTeX entry:

@misc{esorm,
  author = {Adam Wallner},
  title = {ESORM: ElasticSearch Object Relational Mapper},
  year = {2023},
  publisher = {GitHub},
  journal = {GitHub repository},
  howpublished = {\url{https://github.com/wallneradam/esorm}},
} 
