kover

Kover is a model-oriented, strictly typed MongoDB driver supporting local mongod and replica sets. Battle tests are still required*
This library was inspired by <a href="https://github.com/sakal/aiomongo">this project</a>; I like it very much, though it's 8 years old.
Kover is linted by Ruff and supports pyright strict type checking mode.
import asyncio
from kover import AuthCredentials, Kover
async def main():
    """Connect to MongoDB and print up to ten documents from ``client.db.test``.

    The credentials below are placeholders; replace them with real values.
    """
    credentials = AuthCredentials(username="...", password="...")
    client = await Kover.make_client(credentials=credentials)
    # find() is chained with limit(10) and collected into a list by to_list().
    found = await client.db.test.find().limit(10).to_list()
    print(found)
if __name__ == "__main__":
    # Run the example coroutine only when executed as a script.
    asyncio.run(main())
The main reason I created this project is that Motor, the official async wrapper for MongoDB, uses a ThreadPoolExecutor and is just a wrapper around pymongo. In general that's slower than pure asyncio and looks less clean.
- 02.12.24 UPDATE: pymongo added async support, but it's kind of messy and unclear. pymongo's code looks dirty and has lots of unnecessary things. Kover is almost 1.5-2 times faster than pymongo.
Status
It is still missing some features,
e.g. the bulk write API and compression,
but it's already very cool!
Dependencies
- All platforms.
- python 3.10
- MongoDB 6.0+ (not sure about older versions)
- pydantic 2.10.6 or later
Features
Almost all features from pymongo. All auth types are supported. Integration with Pydantic is supported.
This lib was built for new mongod versions. All features that were marked as DEPRECATED in the docs
were NOT added. See the docs for reference. The kover.bson package was copied entirely from the pymongo source code. pymongo.saslprep was also copied for internal purposes; I do not own these files.
Cursors
If you just need a list:
items = await db.test.find().limit(1000).batch_size(50).to_list()
or
async with db.test.find().limit(1000).batch_size(50) as cursor:
async for item in cursor:
print(item)
If the collection has a specific schema:
from kover import Document
# Typed model passed to find(cls=...) in the cursor example.
class User(Document):
    uuid: UUID
    name: str
    age: int
async with db.test.find(cls=User).limit(1000) as cursor:
async for item in cursor:
print(item)
Schema Validation
Some people say that MongoDB is dirty because you can insert any document into a collection. Kover fixes that! Use pydantic.Field
here if you need it.
As of 2.0, Document is a Pydantic model.
import asyncio
from enum import Enum
import logging
from typing import Annotated
from pydantic import ValidationError
from kover import (
AuthCredentials,
Document,
Kover,
OperationFailure,
SchemaGenerator,
)
from kover.metadata import SchemaMetadata
# Set up root logging at INFO level, then obtain this module's logger.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
class UserType(Enum):
    """Closed set of user roles; each member's value equals its name."""

    ADMIN = "ADMIN"
    USER = "USER"
    CREATOR = "CREATOR"
# Nested document model; ``age`` carries schema metadata with minimum=18.
class Friend(Document):
    name: str
    age: Annotated[int, SchemaMetadata(minimum=18)]
# Document model whose fields carry SchemaMetadata (descriptions, minimum)
# that SchemaGenerator consumes when generating the collection validator.
class User(Document):
    name: Annotated[str, SchemaMetadata(description="must be a string")]
    age: Annotated[
        int,
        SchemaMetadata(
            description="age must be int and more that 18", minimum=18,
        ),
    ]
    user_type: Annotated[
        UserType,
        SchemaMetadata(description="can only be one of the enum values"),
    ]
    # Optional nested document.
    friend: Friend | None
async def main() -> None:
    """Install a schema validator on ``db.test`` and insert two users.

    The first user satisfies the schema generated from ``User``; the second
    has ``age=15``, below the declared minimum of 18, so its insert is
    expected to fail with an ``OperationFailure``.

    Raises:
        SystemExit: if constructing the second ``User`` raises pydantic's
            ``ValidationError``; the exit payload is ``e.errors()``.
    """
    credentials = AuthCredentials.from_environ()
    client = await Kover.make_client(credentials=credentials)

    # Generate the schema from the model and attach it as the validator.
    generator = SchemaGenerator()
    schema = generator.generate(User)

    collection = await client.db.test.create_if_not_exists()
    await collection.set_validator(schema)

    valid_user = User(
        name="John Doe",
        age=20,
        user_type=UserType.USER,
        friend=Friend(name="dima", age=18),
    )

    object_id = await collection.insert(valid_user)
    log.info(f"{object_id}, added!")

    try:
        # age=15 is below the schema's minimum of 18.
        invalid_user = User(
            name="Rick",
            age=15,
            user_type=UserType.ADMIN,
            friend=Friend(name="roma", age=25),
        )
    except ValidationError as e:
        raise SystemExit(e.errors()) from e

    try:
        await collection.insert(invalid_user)
    except OperationFailure as e:
        msg: str = e.message["errmsg"]
        log.info(f"got Error: {msg}")
        # 121 is MongoDB's DocumentValidationFailure error code.
        assert e.code == 121
if __name__ == "__main__":
    # Run the example coroutine only when executed as a script.
    asyncio.run(main())
Transactions
import asyncio
import logging
from typing import TYPE_CHECKING
from kover.bson import ObjectId
from kover import AuthCredentials, Kover
if TYPE_CHECKING:
from kover import xJsonT
# Set up root logging at INFO level, then obtain this module's logger.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
async def main() -> None:
    """Insert the same document twice inside one transaction and log the outcome.

    Both inserts use the same ``_id``, so the second one is expected to fail.
    The transaction's recorded exception and state are logged after the
    ``async with`` block, followed by the collection's contents.
    """
    credentials = AuthCredentials.from_environ()
    client = await Kover.make_client(credentials=credentials)
    session = await client.start_session()

    doc: xJsonT = {"_id": ObjectId(), "name": "John", "age": 30}
    collection = await client.db.test.create_if_not_exists()

    async with session.start_transaction() as transaction:
        await collection.insert(doc, transaction=transaction)
        # Same ``_id`` again -- presumably a duplicate-key failure that the
        # transaction context records rather than propagates; confirm.
        await collection.insert(doc, transaction=transaction)

    # Read after the block: statements placed after a failing insert inside
    # the ``async with`` body would never execute.
    exc = transaction.exception
    log.info(f"{exc}, {type(exc)}")
    log.info(f"trx state: {transaction.state}")

    found = await collection.find().to_list()
    log.info(found)
if __name__ == "__main__":
    # Run the example coroutine only when executed as a script.
    asyncio.run(main())
GridFS
import asyncio
import logging
from kover import AuthCredentials, Kover
from kover.gridfs import GridFS
# Set up root logging at INFO level, then obtain this module's logger.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
async def main() -> None:
    """Store, read back, list and delete a file through GridFS.

    Uses the ``files`` database: puts ``b"Hello World!"``, fetches it by its
    id, logs the file together with its content, logs the number of stored
    files, and finally deletes the file.
    """
    credentials = AuthCredentials.from_environ()
    client = await Kover.make_client(credentials=credentials)

    database = client.get_database("files")
    # NOTE(review): indexed() presumably ensures the GridFS indexes exist
    # before use -- confirm against kover.gridfs.
    fs = await GridFS(database).indexed()

    file_id = await fs.put(b"Hello World!")
    file, binary = await fs.get_by_file_id(file_id)

    # Logger.info treats its first argument as a %-format string; passing
    # ``file`` as the message with an extra positional argument triggers a
    # formatting error inside logging, so use an explicit format string.
    log.info("%s %s", file, binary.read())

    files = await fs.list()
    log.info(f"total files: {len(files)}")

    deleted = await fs.delete(file_id)
    log.info(f"is file deleted? {deleted}")
if __name__ == "__main__":
    # Run the example coroutine only when executed as a script.
    asyncio.run(main())
Updating/Deleting docs
import asyncio
import logging
from typing import TYPE_CHECKING
from kover import (
AuthCredentials,
Delete,
Document,
Kover,
Update,
)
if TYPE_CHECKING:
from kover.bson import ObjectId
# Set up root logging at INFO level, then obtain this module's logger.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
# Minimal document model used by the update/delete example.
class User(Document):
    name: str
    age: int
async def main() -> None:
    """Insert a user, rename it with an ``Update``, then remove it with a ``Delete``.

    Logs the value returned by ``collection.delete``.
    """
    credentials = AuthCredentials.from_environ()
    kover = await Kover.make_client(credentials=credentials)

    collection = kover.db.get_collection("test")

    user = User(name="John", age=23)
    # The inserted document's id; it identifies a user, not a file.
    user_id: ObjectId = await collection.insert(user)

    # Match the inserted document by ``_id`` and set its name.
    update = Update({"_id": user_id}, {"$set": {"name": "Wick"}})
    await collection.update(update)

    # limit=1 is passed so the delete targets a single document.
    delete = Delete({"_id": user_id}, limit=1)
    n = await collection.delete(delete)

    log.info(f"documents deleted: {n}")
if __name__ == "__main__":
    # Run the example coroutine only when executed as a script.
    asyncio.run(main())
If you found a bug, go ahead and open an issue, or even better a pull request, thx ❤️