Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

dotorm

Package Overview
Dependencies
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

dotorm - npm Package Compare versions

Comparing version
2.0.5
to
2.0.6
+134
dotorm/databases/abstract/dialect.py
"""Database dialect abstraction (Strategy pattern)."""
from abc import ABC, abstractmethod
from typing import Any, Literal
# Unified cursor types for all dialects
CursorType = Literal[
"fetchall",
"fetch",
"fetchrow",
"fetchval",
"executemany",
"lastrowid", # MySQL-specific
"void", # Execute without returning results
]
class Dialect(ABC):
"""
Abstract dialect defining database-specific behavior.
Each database has its own dialect that handles:
- Placeholder conversion (%s → $1 for Postgres, %s stays for MySQL)
- Cursor method mapping (fetchall → fetch for asyncpg)
- Result conversion (Record → dict)
"""
_cursor_map: dict[str, str]
def convert_placeholders(self, stmt: str) -> str:
"""Convert %s placeholders to database-specific format."""
return stmt
def get_cursor_method(self, cursor: CursorType) -> str:
"""Map CursorType to actual driver method name."""
return self._cursor_map.get(cursor, "void")
@abstractmethod
def convert_result(self, rows: Any, cursor: CursorType) -> Any:
"""Convert raw database result to standard format (list of dicts, etc)."""
...
class PostgresDialect(Dialect):
"""PostgreSQL dialect for asyncpg driver."""
_cursor_map = {
"fetchall": "fetch",
"fetch": "fetch",
"fetchrow": "fetchrow",
"fetchval": "fetchval",
}
def convert_placeholders(self, stmt: str) -> str:
"""Convert %s to $1, $2, $3..."""
counter = 1
while "%s" in stmt:
stmt = stmt.replace("%s", f"${counter}", 1)
counter += 1
return stmt
def convert_result(self, rows: Any, cursor: CursorType) -> Any:
"""Convert asyncpg Record objects to dicts."""
if rows is None or cursor in ("void", "executemany"):
return rows
if cursor == "fetchval":
return rows # Single value
if cursor == "fetchrow":
return dict(rows) if rows else None
# fetchall/fetch
return [dict(rec) for rec in rows] if rows else []
class MySQLDialect(Dialect):
"""MySQL dialect for aiomysql driver."""
_cursor_map = {
"fetchall": "fetchall",
"fetch": "fetchall",
"fetchrow": "fetchone",
"fetchval": "fetchone",
}
def convert_result(self, rows: Any, cursor: CursorType) -> Any:
"""Convert MySQL results."""
if rows is None or cursor in ("void", "executemany", "lastrowid"):
return rows
if cursor == "fetchval":
# fetchone returns tuple, get first element
return (
rows[0] if rows and isinstance(rows, (tuple, list)) else rows
)
if cursor == "fetchrow":
return dict(rows) if rows else None
# fetchall/fetch
return [dict(rec) for rec in rows] if rows else []
class ClickHouseDialect(Dialect):
"""ClickHouse dialect for asynch driver."""
_cursor_map = {
"fetchall": "fetchall",
"fetch": "fetchall",
"fetchrow": "fetchone",
"fetchval": "fetchone",
}
def convert_result(self, rows: Any, cursor: CursorType) -> Any:
"""Convert ClickHouse results to dicts."""
if rows is None or cursor in ("void", "executemany"):
return rows
if cursor == "fetchval":
# fetchone returns tuple, get first element
if rows and isinstance(rows, (tuple, list)):
return rows[0]
return rows
if cursor == "fetchrow":
# Single row - convert to dict if tuple with column info
return dict(rows) if rows and hasattr(rows, "_fields") else rows
# fetchall/fetch - list of tuples
if rows and hasattr(rows[0], "_fields"):
return [dict(rec._asdict()) for rec in rows]
return rows if rows else []
"""
Декораторы для DotORM моделей - улучшенная версия.
@hybridmethod - декоратор для гибридных методов (работают И как classmethod И как instance).
@model - декоратор для бизнес-методов модели.
Эта версия улучшает типизацию через:
1. Generic типы с ParamSpec для точных параметров
2. @overload для корректной работы IDE
3. __slots__ для производительности
"""
from __future__ import annotations
import functools
from typing import (
TYPE_CHECKING,
TypeVar,
Generic,
Callable,
Any,
Coroutine,
overload,
ParamSpec,
Concatenate,
)
if TYPE_CHECKING:
pass
# TypeVar для типизации
_T = TypeVar("_T")
_P = ParamSpec("_P")
_R = TypeVar("_R")
_R_co = TypeVar("_R_co", covariant=True)
class hybridmethod(Generic[_T, _P, _R]):
"""
Декоратор для гибридных методов (работают И как classmethod И как instance).
При вызове из класса (Model.method(...)) автоматически создает пустой instance.
При вызове из instance (self.method(...)) использует существующий instance.
Преимущества:
- Полная обратная совместимость с существующим кодом
- Упрощенный синтаксис в @model методах
- self.__class__ всегда правильный класс
- Точные типы параметров и возвращаемого значения
- IDE автокомплит работает корректно
Примеры использования:
```python
from backend.base.system.dotorm.dotorm.decorators import hybridmethod
from typing import Self
class DotModel:
@hybridmethod
async def get(self, id: int, fields: list[str] = []) -> Self:
'''Получить запись по ID.'''
cls = self.__class__
stmt, values = cls._builder.build_get(id, fields)
record = await session.execute(stmt, values)
return record
@hybridmethod
async def search(self, filter=None, **kwargs) -> list[Self]:
'''Поиск записей.'''
cls = self.__class__
stmt, values = cls._builder.build_search(filter, **kwargs)
records = await session.execute(stmt, values)
return records
# ✅ Вариант 1: Вызов из класса (обратная совместимость)
user: User = await User.get(1) # Type: User ✅
users: list[User] = await User.search() # Type: list[User] ✅
# ✅ Вариант 2: Вызов из instance
@model
async def create_link(self, external_id: str) -> Self:
link_id = await self.create(payload=link) # Type: int ✅
return await self.get(link_id) # Type: Self ✅
# ✅ Вариант 3: Явный пустой instance
Model = ChatExternalChat()
link = await Model.create_link("ext_123")
```
Типизация:
Декоратор сохраняет точные типы через:
- Generic[_T, _P, _R] для типа класса, параметров и результата
- ParamSpec для точных типов параметров
- @overload для корректной работы IDE в обоих контекстах
"""
__slots__ = ("func", "__wrapped__", "name", "__dict__")
func: Callable[..., Coroutine[Any, Any, _R]]
__wrapped__: Callable[..., Any]
__annotations__: dict[str, Any]
name: str
def __init__(
self, func: Callable[Concatenate[_T, _P], Coroutine[Any, Any, _R]]
) -> None:
self.func = func
self.__wrapped__ = func
functools.update_wrapper(self, func)
self.__annotations__ = getattr(func, "__annotations__", {})
self.name = ""
@overload
def __get__(
self, instance: None, owner: type[_T]
) -> Callable[_P, Coroutine[Any, Any, _R]]:
"""Вызов из класса: Model.method(...)"""
...
@overload
def __get__(
self, instance: _T, owner: type[_T]
) -> Callable[_P, Coroutine[Any, Any, _R]]:
"""Вызов из instance: self.method(...)"""
...
def __get__(
self, instance: _T | None, owner: type[_T]
) -> Callable[_P, Coroutine[Any, Any, _R]]:
"""
Дескриптор протокол - возвращает bound метод.
@overload позволяет IDE понимать типы в обоих случаях:
- Model.get(1) -> IDE знает что возвращает Self
- self.get(1) -> IDE знает что возвращает Self
Args:
instance: Экземпляр класса или None (если вызов из класса)
owner: Класс владелец
Returns:
Async функция с сохраненными типами параметров и результата
"""
if instance is None:
# Вызов из класса: Model.method(...)
# Автоматически создаем пустой instance
@functools.wraps(self.func)
async def class_method(*args: _P.args, **kwargs: _P.kwargs) -> _R:
empty_instance = owner()
return await self.func(empty_instance, *args, **kwargs)
class_method.__annotations__ = self.__annotations__
return class_method
else:
# Вызов из instance: self.method(...)
# Используем существующий instance
@functools.wraps(self.func)
async def instance_method(
*args: _P.args, **kwargs: _P.kwargs
) -> _R:
return await self.func(instance, *args, **kwargs)
instance_method.__annotations__ = self.__annotations__
return instance_method
def __set_name__(self, owner: type[Any], name: str) -> None:
"""Сохраняем имя метода для отладки."""
self.name = name
def __call__(self, *args: Any, **kwargs: Any) -> Any:
"""
Fallback для прямого вызова (не используется в runtime).
Этот метод нужен для:
1. Поддержки типизации в IDE
2. Корректной работы inspect модуля
"""
return self.func(*args, **kwargs)
# def model(
# func: Callable[Concatenate[_T, _P], Coroutine[Any, Any, _R]],
# ) -> Callable[Concatenate[_T, _P], Coroutine[Any, Any, _R]]:
# """
# Декоратор для бизнес-методов модели (фабричные методы, поиск, бизнес-логика).
# Помечает метод как "метод уровня модели" - работает на уровне класса,
# а не с конкретными записями. self может быть пустым экземпляром.
# Преимущества:
# - self.__class__ всегда правильный (с расширениями при наследовании)
# - Instance метод - легко расширять через наследование
# - Работает с @hybridmethod для self.get(), self.search()
# - Правильная типизация с Self
# Примеры использования:
# ```python
#
# from typing import Self
# class ChatExternalChat(DotModel):
# @model
# async def create_link(
# self,
# external_id: str,
# connector_id: int,
# chat_id: int
# ) -> Self:
# '''Создать связь между внешним и внутренним чатом.'''
# # self - пустой экземпляр
# # self.__class__ - ChatExternalChat (или подкласс!)
# link = self.__class__(
# external_id=external_id,
# connector_id=connector_id,
# chat_id=chat_id
# )
# # self.create(), self.get() работают благодаря @hybridmethod
# link_id: int = await self.create(payload=link)
# return await self.get(link_id)
# @model
# async def find_by_external_id(
# self,
# external_id: str,
# connector_id: int
# ) -> Self | None:
# '''Найти связь по внешнему ID.'''
# results: list[Self] = await self.search(
# filter=[
# ("external_id", "=", external_id),
# ("connector_id", "=", connector_id),
# ],
# limit=1,
# )
# return results[0] if results else None
# # Использование
# Chat = ChatExternalChat() # Пустой экземпляр
# link = await Chat.create_link("ext_123", 1, 42)
# found = await Chat.find_by_external_id("ext_123", 1)
# ```
# Расширение через наследование:
# ```python
# class TelegramChat(ChatExternalChat):
# @model
# async def create_link(
# self,
# external_id: str,
# connector_id: int,
# chat_id: int,
# thread_id: int | None = None
# ) -> Self:
# '''Расширенное создание с Telegram-специфичными данными.'''
# # self.__class__ = TelegramChat автоматически!
# link = await super().create_link(external_id, connector_id, chat_id)
# if thread_id:
# link.telegram_thread_id = thread_id
# await link.update()
# return link
# # Использование - правильный тип автоматически
# Telegram = TelegramChat()
# telegram_link = await Telegram.create_link("tg_123", 1, 42, thread_id=999)
# # Type: TelegramChat ✅
# ```
# """
# @functools.wraps(func)
# async def wrapper(
# self_or_cls: _T | type[_T], *args: _P.args, **kwargs: _P.kwargs
# ) -> _R:
# # Поддержка вызова и из класса и из instance
# if isinstance(self_or_cls, type):
# # Вызов из класса: ChatExternalChat.create_link(...)
# instance: _T = self_or_cls()
# else:
# # Вызов из instance: chat.create_link(...)
# instance = self_or_cls
# return await func(instance, *args, **kwargs)
# # Помечаем метод специальным атрибутом для introspection
# wrapper._dotorm_model_method = True # type: ignore[attr-defined]
# wrapper._original_func = func # type: ignore[attr-defined]
# return wrapper
# Экспортируем декораторы
__all__ = ["hybridmethod"]
+1
-1

@@ -49,3 +49,3 @@ """

__version__ = "2.0.5ы"
__version__ = "2.0.6"

@@ -52,0 +52,0 @@ __all__ = [

@@ -5,2 +5,3 @@ """Abstract database interfaces."""

from .session import SessionAbstract
from .dialect import Dialect, PostgresDialect, MySQLDialect, ClickHouseDialect, CursorType
from .types import (

@@ -16,2 +17,7 @@ ContainerSettings,

"SessionAbstract",
"Dialect",
"PostgresDialect",
"MySQLDialect",
"ClickHouseDialect",
"CursorType",
"ContainerSettings",

@@ -18,0 +24,0 @@ "PostgresPoolSettings",

"""Abstract session interface."""
from abc import ABC, abstractmethod
from typing import Any, Callable, TYPE_CHECKING
if TYPE_CHECKING:
from .dialect import CursorType
class SessionAbstract(ABC):
"""
Abstract database session.
Subclasses must implement execute() method.
Use dialect helper for SQL transformations.
"""
@abstractmethod

@@ -11,5 +22,47 @@ async def execute(

stmt: str,
val=None,
func_prepare=None,
func_cur="fetch",
): ...
values: Any = None,
*,
prepare: Callable | None = None,
cursor: "CursorType" = "fetchall",
) -> Any:
"""
Execute SQL query.
Args:
stmt: SQL statement with %s placeholders
values: Query parameters:
- Tuple/list for single query
- List of tuples for executemany
prepare: Optional function to transform results
cursor: Fetch mode:
- "fetchall"/"fetch": Return list of dicts
- "fetchrow": Return single dict or None
- "fetchval": Return single value or None
- "executemany": Execute multiple inserts
- "lastrowid": Return last inserted row ID (MySQL only)
- "void": Execute without returning rows (INSERT/UPDATE/DELETE)
Returns:
Query results based on cursor mode
Example:
# Fetch all rows
rows = await session.execute("SELECT * FROM users WHERE active = %s", (True,))
# Fetch single row
user = await session.execute("SELECT * FROM users WHERE id = %s", (1,), cursor="fetchrow")
# Fetch single value
count = await session.execute("SELECT COUNT(*) FROM users", cursor="fetchval")
# Execute without return
await session.execute("UPDATE users SET active = %s", (False,), cursor="void")
# Execute many
await session.execute(
"INSERT INTO users (name) VALUES (%s)",
[("Alice",), ("Bob",)],
cursor="executemany"
)
"""
...

@@ -1,20 +0,83 @@

from typing import Any, Callable
"""ClickHouse session implementations."""
try:
import asynch
except ImportError:
...
from typing import Any, Callable, TYPE_CHECKING
from ..abstract.session import SessionAbstract
from ..abstract.dialect import ClickHouseDialect, CursorType
class ClickhouseSession(SessionAbstract): ...
if TYPE_CHECKING:
import asynch
from asynch.cursors import Cursor
# Shared dialect instance
_dialect = ClickHouseDialect()
class ClickhouseSession(SessionAbstract):
"""Base ClickHouse session."""
@staticmethod
async def _do_execute(
cursor: "Cursor",
stmt: str,
values: Any,
cursor_type: CursorType,
) -> Any:
"""
Execute query on cursor (shared logic).
Args:
cursor: asynch cursor
stmt: SQL with %s placeholders
values: Query values
cursor_type: Cursor type
Returns:
Raw result from asynch
"""
# executemany
if cursor_type == "executemany":
if not values:
raise ValueError("executemany requires values")
for row in values:
await cursor.execute(stmt, row)
return None
# Execute query
if values:
await cursor.execute(stmt, values)
else:
await cursor.execute(stmt)
# void - execute only (INSERT without return)
if cursor_type == "void":
return None
# fetch operations
method_name = _dialect.get_cursor_method(cursor_type)
if method_name:
method = getattr(cursor, method_name)
return await method()
return None
class NoTransactionSession(ClickhouseSession):
"""Этот класс берет соединение из пулла и выполняет запрос в нем."""
"""
Session for non-transactional queries.
Acquires connection from pool per query.
def __init__(self, pool: asynch.Pool):
self.pool = pool
Note: ClickHouse doesn't support transactions.
"""
default_pool: "asynch.Pool | None" = None
def __init__(self, pool: "asynch.Pool | None" = None) -> None:
if pool is None:
assert self.default_pool is not None
self.pool = self.default_pool
else:
self.pool = pool
async def execute(

@@ -26,19 +89,13 @@ self,

prepare: Callable | None = None,
cursor: str = "fetchall",
cursor: CursorType = "fetchall",
) -> Any:
"""
Простая реализация сессии в кликхаусе.
Выполнение запроса, и возврат соединения в пул.
"""
stmt = _dialect.convert_placeholders(stmt)
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
assert isinstance(cur, asynch.Cursor)
if values:
await cur.execute(stmt, values)
else:
await cur.execute(stmt)
rows = await cur.fetchall()
if prepare and rows:
return prepare(rows)
return rows
result = await self._do_execute(cur, stmt, values, cursor)
result = _dialect.convert_result(result, cursor)
if prepare and result:
return prepare(result)
return result
"""MySQL session implementations."""
from typing import Any, Callable
from typing import Any, Callable, TYPE_CHECKING
try:
from ..abstract.session import SessionAbstract
from ..abstract.dialect import MySQLDialect, CursorType
if TYPE_CHECKING:
import aiomysql
except ImportError:
...
from ..abstract.session import SessionAbstract
# Shared dialect instance
_dialect = MySQLDialect()
class MysqlSession(SessionAbstract): ...
class MysqlSession(SessionAbstract):
"""Base MySQL session."""
@staticmethod
async def _do_execute(
cursor: "aiomysql.Cursor",
stmt: str,
values: Any,
cursor_type: CursorType,
) -> Any:
"""
Execute query on cursor (shared logic).
Args:
cursor: aiomysql cursor
stmt: SQL with %s placeholders
values: Query values
cursor_type: Cursor type
Returns:
Raw result from aiomysql
"""
# executemany
if cursor_type == "executemany":
if not values:
raise ValueError("executemany requires values")
await cursor.executemany(stmt, values)
return None
# Execute query
if values:
await cursor.execute(stmt, values)
else:
await cursor.execute(stmt)
# void - execute only (INSERT/UPDATE/DELETE without return)
if cursor_type == "void":
return None
# lastrowid special case
if cursor_type == "lastrowid":
return cursor.lastrowid
# fetch operations
method = getattr(cursor, _dialect.get_cursor_method(cursor_type))
return await method()
class TransactionSession(MysqlSession):
"""Этот класс работает в одном соединении не закрывая его.
Пока его не закроют явно. Используется при работе в транзакции.
Паттерн unit of work."""
"""
Session for transactional queries.
Uses single connection within transaction context.
"""

@@ -33,26 +83,17 @@ def __init__(

prepare: Callable | None = None,
cursor: str = "fetchall",
cursor: CursorType = "fetchall",
) -> Any:
if values:
await self.cursor.execute(stmt, values)
else:
await self.cursor.execute(stmt)
stmt = _dialect.convert_placeholders(stmt)
result = await self._do_execute(self.cursor, stmt, values, cursor)
result = _dialect.convert_result(result, cursor)
if cursor == "lastrowid":
rows = self.cursor.lastrowid
elif cursor is not None:
rows = await getattr(self.cursor, cursor)()
else:
rows = None
if prepare and result:
return prepare(result)
return result
if prepare:
return prepare(rows)
return rows
class NoTransactionSession(MysqlSession):
"""
Session for non-transactional queries.
Uses pool with autocommit enabled.
Acquires connection from pool per query.
"""

@@ -75,18 +116,15 @@

prepare: Callable | None = None,
cursor: str = "fetchall",
cursor: CursorType = "fetchall",
) -> Any:
import aiomysql
stmt = _dialect.convert_placeholders(stmt)
async with self.pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
if values:
await cur.execute(stmt, values)
else:
await cur.execute(stmt)
result = await self._do_execute(cur, stmt, values, cursor)
result = _dialect.convert_result(result, cursor)
if cursor == "lastrowid":
rows = cur.lastrowid
else:
rows = await getattr(cur, cursor)()
if prepare:
return prepare(rows)
return rows
if prepare and result:
return prepare(result)
return result

@@ -10,3 +10,4 @@ """PostgreSQL database support."""

)
from .transaction import ContainerTransaction
from .transaction import ContainerTransaction, get_current_session
from ..abstract.dialect import CursorType, PostgresDialect

@@ -20,2 +21,5 @@ __all__ = [

"ContainerTransaction",
"get_current_session",
"CursorType",
"PostgresDialect",
]
"""PostgreSQL session implementations."""
from typing import Any, Callable
from typing import Any, Callable, TYPE_CHECKING
from ..abstract.types import PostgresPoolSettings
from ..abstract.session import SessionAbstract
from ..abstract.dialect import PostgresDialect, CursorType
try:
if TYPE_CHECKING:
import asyncpg
from asyncpg.transaction import Transaction
except ImportError:
asyncpg = None # type: ignore
Transaction = None # type: ignore
class PostgresSession(SessionAbstract): ...
# Shared dialect instance
_dialect = PostgresDialect()
class PostgresSession(SessionAbstract):
"""Base PostgreSQL session."""
@staticmethod
async def _do_execute(
conn: "asyncpg.Connection",
stmt: str,
values: Any,
cursor: CursorType,
) -> Any:
"""
Execute query on connection (shared logic).
Args:
conn: asyncpg connection
stmt: SQL with $1, $2... placeholders (already converted)
values: Query values
cursor: Cursor type
Returns:
Raw result from asyncpg
"""
# executemany
if cursor == "executemany":
if not values:
raise ValueError("executemany requires values")
# Handle [[v1, v2], [v3, v4]] or [[[v1, v2], [v3, v4]]] format
rows = (
values[0]
if isinstance(values[0], list)
and values[0]
and isinstance(values[0][0], (list, tuple))
else values
)
for row in rows:
await conn.execute(stmt, *row)
return None
# void - execute only (INSERT/UPDATE/DELETE without return)
if cursor == "void":
if values:
await conn.execute(stmt, *values)
else:
await conn.execute(stmt)
return None
# fetch operations
method = getattr(conn, _dialect.get_cursor_method(cursor))
if values:
return await method(stmt, *values)
return await method(stmt)
class TransactionSession(PostgresSession):
"""
Session for transactional queries.
Works in single connection without closing it.
Used in transaction context manager.
Uses single connection within transaction context.
"""

@@ -40,50 +90,17 @@

prepare: Callable | None = None,
cursor: str = "fetchall",
cursor: CursorType = "fetchall",
) -> Any:
# Заменить %s на $1...$n dollar-numberic
counter = 1
while "%s" in stmt:
stmt = stmt.replace("%s", "$" + str(counter), 1)
counter += 1
stmt = _dialect.convert_placeholders(stmt)
result = await self._do_execute(self.connection, stmt, values, cursor)
result = _dialect.convert_result(result, cursor)
rows_dict = []
if cursor is None:
if values:
rows = await self.connection.execute(stmt, *values)
else:
rows = await self.connection.execute(stmt)
else:
if values:
rows = await self.connection.fetch(stmt, *values)
else:
rows = await self.connection.fetch(stmt)
for rec in rows:
rows_dict.append(dict(rec))
if prepare and result:
return prepare(result)
return result
if prepare:
return prepare(rows_dict)
return rows_dict or rows
async def fetch(
self,
stmt: str,
values: Any = None,
*,
prepare: Callable | None = None,
) -> Any:
if values:
rows = await self.connection.fetch(stmt, values)
else:
rows = await self.connection.fetch(stmt)
if prepare:
return prepare(rows)
return rows
class NoTransactionSession(PostgresSession):
"""
Session for non-transactional queries.
Acquires connection from pool, executes query, releases back to pool.
Acquires connection from pool per query.
"""

@@ -106,45 +123,34 @@

prepare: Callable | None = None,
cursor: str = "fetchall",
cursor: CursorType = "fetchall",
) -> Any:
stmt = _dialect.convert_placeholders(stmt)
async with self.pool.acquire() as conn:
# Заменить %s на $1...$n dollar-numberic
counter = 1
while "%s" in stmt:
stmt = stmt.replace("%s", "$" + str(counter), 1)
counter += 1
result = await self._do_execute(conn, stmt, values, cursor)
result = _dialect.convert_result(result, cursor)
rows_dict = []
if cursor is None:
if values:
rows = await conn.execute(stmt, *values)
else:
rows = await conn.execute(stmt)
else:
# asyncpg использует fetch вместо fetchall
cursor_method = "fetch" if cursor == "fetchall" else cursor
if values:
rows = await getattr(conn, cursor_method)(stmt, *values)
else:
rows = await getattr(conn, cursor_method)(stmt)
if rows:
for rec in rows:
rows_dict.append(dict(rec))
if prepare and result:
return prepare(result)
return result
if prepare and rows_dict:
return prepare(rows_dict)
return rows_dict or rows
class NoTransactionNoPoolSession(PostgresSession):
"""
Session without pool.
Opens single connection, executes query, closes connection.
Used for administrative tasks like creating databases.
Opens connection, executes, closes. For admin tasks.
"""
@classmethod
async def get_connection(
cls, settings: PostgresPoolSettings
) -> "asyncpg.Connection":
"""Create new connection without pool."""
import asyncpg
return await asyncpg.connect(**settings.model_dump())
@classmethod
async def execute(
cls,
settings,
settings: PostgresPoolSettings,
stmt: str,

@@ -158,18 +164,14 @@ values: Any = None,

if values:
await conn.execute(stmt, values)
else:
await conn.execute(stmt)
try:
if values:
await conn.execute(stmt, values)
else:
await conn.execute(stmt)
rows = await getattr(conn, cursor)()
await conn.close()
if prepare:
return prepare(rows)
return rows
rows = await getattr(conn, cursor)()
@classmethod
async def get_connection(cls, settings: PostgresPoolSettings):
conn: "asyncpg.Connection" = await asyncpg.connect(
**settings.model_dump()
)
return conn
if prepare:
return prepare(rows)
return rows
finally:
await conn.close()
"""PostgreSQL transaction management."""
from contextvars import ContextVar
try:

@@ -13,2 +15,13 @@ import asyncpg

# Context variable для хранения текущей сессии транзакции
_current_session: ContextVar["TransactionSession | None"] = ContextVar(
"current_session", default=None
)
def get_current_session() -> "TransactionSession | None":
"""Получить текущую сессию из контекста (если есть активная транзакция)."""
return _current_session.get()
class ContainerTransaction:

@@ -21,6 +34,10 @@ """

Автоматически устанавливает текущую сессию в contextvars,
так что методы ORM могут использовать её без явной передачи.
Example:
async with ContainerTransaction(pool) as session:
await session.execute("INSERT INTO users ...")
await session.execute("INSERT INTO orders ...")
# Или без явной передачи session:
await User.create(payload=user) # session подставится из контекста
# Commits on exit

@@ -38,2 +55,3 @@ """

self.pool = pool
self._token = None

@@ -50,5 +68,12 @@ async def __aenter__(self):

# Устанавливаем текущую сессию в контекст
self._token = _current_session.set(self.session)
return self.session
async def __aexit__(self, exc_type, exc_val, exc_tb):
# Сбрасываем контекст
if self._token is not None:
_current_session.reset(self._token)
if exc_type is not None:

@@ -55,0 +80,0 @@ # Выпало исключение вызвать ролбек

@@ -373,2 +373,70 @@ """ORM field definitions."""

# class Many2manyAccessor[T: "DotModel"]:
# """
# Accessor для работы с M2M полем на экземпляре модели.
# Позволяет использовать удобный синтаксис:
# await chat.member_ids.link([user1_id, user2_id])
# await chat.member_ids.unlink([user_id])
# Вместо:
# await chat.link_many2many(field=Chat.member_ids, values=[[chat.id, user_id]])
# """
# __slots__ = ("_instance", "_field", "_data")
# def __init__(
# self,
# instance: "DotModel",
# field: "Many2many[T]",
# data: list[T] | None = None,
# ):
# self._instance = instance
# self._field = field
# self._data = data
# async def link(self, ids: list[int], session=None):
# """
# Добавить связи M2M.
# Args:
# ids: Список ID записей для связывания
# session: Сессия БД
# Example:
# await chat.member_ids.link([user1_id, user2_id])
# """
# values = [[self._instance.id, id] for id in ids]
# return await self._instance.link_many2many(
# self._field, values, session
# )
# async def unlink(self, ids: list[int], session=None):
# """
# Удалить связи M2M.
# Args:
# ids: Список ID записей для отвязывания
# session: Сессия БД
# Example:
# await chat.member_ids.unlink([user_id])
# """
# return await self._instance.unlink_many2many(self._field, ids, session)
# # Поддержка итерации по загруженным данным
# def __iter__(self):
# if self._data is None:
# return iter([])
# return iter(self._data)
# def __len__(self):
# if self._data is None:
# return 0
# return len(self._data)
# def __bool__(self):
# return self._data is not None and len(self._data) > 0
class Many2many[T: "DotModel"](Field[list[T]]):

@@ -397,3 +465,17 @@ """Many-to-many relation field."""

# def __get__(
# self, instance: "DotModel | None", owner: type
# ) -> "Many2many[T] | Many2manyAccessor[T]":
# if instance is None:
# # Доступ через класс — возвращаем дескриптор (для get_fields и т.д.)
# return self
# # Доступ через экземпляр — возвращаем accessor
# # Получаем данные если они были загружены в __dict__
# data = instance.__dict__.get(self._field_name)
# return Many2manyAccessor(instance, self, data)
# def __set_name__(self, owner: type, name: str):
# self._field_name = name
class One2many[T: "DotModel"](Field[list[T]]):

@@ -400,0 +482,0 @@ """One-to-many relation field."""

@@ -18,2 +18,3 @@ """DotModel - main ORM model class."""

from .components.dialect import POSTGRES, Dialect

@@ -26,9 +27,16 @@

# import asynch
from .databases.mysql.session import (
NoTransactionSession as MysqlNoTransactionSession,
)
# from .databases.mysql.session import (
# NoTransactionSession as MysqlNoTransactionSession,
# )
from .databases.postgres.session import (
NoTransactionSession as PostgresNoTransactionSession,
)
# from .databases.clickhouse.session import (
# NoTransactionSession as ClickhouseNoTransactionSession,
# )
from .fields import (

@@ -50,2 +58,3 @@ AttachmentOne2many,

CREATE = 3
UPDATE = 4

@@ -128,10 +137,9 @@

# use for default usage in orm (without explicit set)
_pool: ClassVar[Union["aiomysql.Pool", "asyncpg.Pool"]]
_pool: ClassVar["asyncpg.Pool | None"]
# class that implement no transaction execute
# single connection -> execute -> release connection to pool
# use for default usage in orm (without explicit set)
# _no_transaction: ClassVar[
# Type[MysqlNoTransactionSession | PostgresNoTransactionSession]
# ]
_no_transaction: ClassVar[Type]
_no_transaction: Type[PostgresNoTransactionSession] = (
PostgresNoTransactionSession
)
# base validation schema for routers endpoints

@@ -240,3 +248,22 @@ # __schema__: ClassVar[Type]

@classmethod
def _get_db_session(cls):
def _get_db_session(cls, session=None):
"""
Получить сессию БД.
Приоритет:
1. Явно переданная session
2. Сессия из контекста транзакции (contextvars)
3. NoTransaction сессия (автокоммит)
"""
if session is not None:
return session
# Проверяем контекст транзакции
from .databases.postgres.transaction import get_current_session
ctx_session = get_current_session()
if ctx_session is not None:
return ctx_session
# Fallback на NoTransaction
return cls._no_transaction(cls._pool)

@@ -280,3 +307,6 @@

def get_fields(cls) -> dict[str, Field]:
"""Основная функция, которая возвращает все поля модели."""
"""Возвращает только собственные поля класса (без унаследованных).
Для получения всех полей включая унаследованные используйте get_all_fields().
"""
return {

@@ -289,2 +319,32 @@ attr_name: attr

@classmethod
def get_all_fields(cls) -> dict[str, Field]:
"""
Возвращает все поля модели, включая унаследованные из миксинов и родительских классов.
Использует MRO (Method Resolution Order) для сбора полей из всей цепочки наследования.
Поля из дочерних классов переопределяют поля из родительских.
Returns:
dict[str, Field]: Словарь {имя_поля: объект_Field}
Example:
class AuditMixin:
created_at = Datetime()
class Lead(AuditMixin, DotModel):
name = Char()
Lead.get_all_fields() # {'created_at': Datetime, 'name': Char}
Lead.get_fields() # {'name': Char} - только собственные
"""
fields = {}
for klass in reversed(cls.__mro__):
if klass is object:
continue
for attr_name, attr in klass.__dict__.items():
if isinstance(attr, Field):
fields[attr_name] = attr
return fields
@classmethod
def get_compute_fields(cls):

@@ -531,3 +591,3 @@ """Только те поля, которые имеют связи. Ассоциации."""

fields_json[field_name] = field.json()
elif mode == JsonMode.CREATE:
elif mode == JsonMode.CREATE or mode == JsonMode.UPDATE:
fields_json[field_name] = field.id

@@ -534,0 +594,0 @@

@@ -106,4 +106,3 @@ """DDL Mixin - provides table creation functionality."""

"""Метод для создания таблицы в базе данных, основанной на атрибутах класса."""
if session is None:
session = cls._get_db_session()
session = cls._get_db_session(session)

@@ -151,3 +150,7 @@ # описание поля для создания в бд со всеми аттрибутами

# создание индекса для поля с index=True
if field.index and not field.primary_key and not field.unique:
if (
field.index
and not field.primary_key
and not field.unique
):
index_name = f"idx_{cls.__table__}_{field_name}"

@@ -201,3 +204,4 @@ index_statements.append(

field_exist = await session.execute(sql)
if field_exist == "SELECT 0":
# field_exist будет пустым списком [] если колонки нет
if not field_exist:
await session.execute(

@@ -204,0 +208,0 @@ f"""ALTER TABLE {cls.__table__} ADD COLUMN {field_declaration};"""

@@ -14,2 +14,3 @@ """Many2many ORM operations mixin."""

from ...fields import Field, Many2many, Many2one, One2many
from ...decorators import hybridmethod

@@ -53,4 +54,3 @@

fields = []
if session is None:
session = cls._get_db_session()
session = cls._get_db_session(session)
# защита, оставить только те поля, которые действительно хранятся в базе

@@ -91,9 +91,9 @@ fields_store = [

@classmethod
@hybridmethod
async def link_many2many(
cls, field: Many2many, values: list, session=None
self, field: Many2many, values: list, session=None
):
"""Link records in M2M relation."""
if session is None:
session = cls._get_db_session()
cls = self.__class__
session = cls._get_db_session(session)
query_placeholders = ", ".join(["%s"] * len(values[0]))

@@ -110,4 +110,3 @@ stmt = f"""INSERT INTO {field.many2many_table}

"""Unlink records from M2M relation."""
if session is None:
session = cls._get_db_session()
session = cls._get_db_session(session)
args: str = ",".join(["%s"] * len(ids))

@@ -114,0 +113,0 @@ stmt = f"DELETE FROM {field.many2many_table} WHERE {field.column1} in ({args})"

"""Primary ORM operations mixin."""
from typing import TYPE_CHECKING, Self
from typing import TYPE_CHECKING, Self, TypeVar
if TYPE_CHECKING:
from ..protocol import DotModelProtocol
from ...model import DotModel

@@ -14,4 +15,9 @@ _Base = DotModelProtocol

from ...model import JsonMode
from ...decorators import hybridmethod
# TypeVar for generic payload - accepts any DotModel subclass
_M = TypeVar("_M", bound="DotModel")
class OrmPrimaryMixin(_Base):

@@ -36,11 +42,10 @@ """

async def delete(self, session=None):
if session is None:
session = self._get_db_session()
session = self._get_db_session(session)
stmt = self._builder.build_delete()
return await session.execute(stmt, [self.id])
@classmethod
async def delete_bulk(cls, ids: list[int], session=None):
if session is None:
session = cls._get_db_session()
@hybridmethod
async def delete_bulk(self, ids: list[int], session=None):
cls = self.__class__
session = cls._get_db_session(session)
stmt = cls._builder.build_delete_bulk(len(ids))

@@ -51,8 +56,7 @@ return await session.execute(stmt, ids)

self,
payload: Self | None = None,
payload: "_M | None" = None,
fields=None,
session=None,
):
if session is None:
session = self._get_db_session()
session = self._get_db_session(session)
if payload is None:

@@ -69,2 +73,3 @@ payload = self

only_store=True,
mode=JsonMode.UPDATE,
)

@@ -77,2 +82,3 @@ else:

only_store=True,
mode=JsonMode.UPDATE,
)

@@ -83,11 +89,11 @@

@classmethod
@hybridmethod
async def update_bulk(
cls,
self,
ids: list[int],
payload: Self,
payload: _M,
session=None,
):
if session is None:
session = cls._get_db_session()
cls = self.__class__
session = cls._get_db_session(session)

@@ -105,6 +111,6 @@ # Сериализация в ORM слое

@classmethod
async def create(cls, payload: Self, session=None):
if session is None:
session = cls._get_db_session()
@hybridmethod
async def create(self, payload: _M, session=None) -> int:
cls = self.__class__
session = cls._get_db_session(session)

@@ -133,6 +139,6 @@ # Сериализация в ORM слое

@classmethod
async def create_bulk(cls, payload: list[Self], session=None):
if session is None:
session = cls._get_db_session()
@hybridmethod
async def create_bulk(self, payload: list[_M], session=None):
cls = self.__class__
session = cls._get_db_session(session)

@@ -162,8 +168,6 @@ # Исключаем primary_key поля

@classmethod
async def get(
cls, id, fields: list[str] = [], session=None
) -> Self | None:
if session is None:
session = cls._get_db_session()
@hybridmethod
async def get(self, id, fields: list[str] = [], session=None) -> Self:
cls = self.__class__
session = cls._get_db_session(session)

@@ -176,10 +180,11 @@ stmt, values = cls._builder.build_get(id, fields)

if not record:
return None
# return None
raise ValueError("Record not found")
assert isinstance(record, cls)
return record
@classmethod
async def table_len(cls, session=None) -> int:
if session is None:
session = cls._get_db_session()
@hybridmethod
async def table_len(self, session=None) -> int:
cls = self.__class__
session = cls._get_db_session(session)
stmt, values = cls._builder.build_table_len()

@@ -186,0 +191,0 @@

"""Relations ORM operations mixin."""
import asyncio
from typing import TYPE_CHECKING, Any, Literal, Self
from typing import TYPE_CHECKING, Any, Literal, Self, TypeVar
from ...decorators import hybridmethod
if TYPE_CHECKING:
from ..protocol import DotModelProtocol
from ...model import DotModel

@@ -13,2 +16,5 @@ _Base = DotModelProtocol

# TypeVar for generic payload - accepts any DotModel subclass
_M = TypeVar("_M", bound="DotModel")
from ...builder.request_builder import (

@@ -50,5 +56,5 @@ FilterExpression,

@classmethod
@hybridmethod
async def search(
cls,
self,
fields: list[str] = ["id"],

@@ -64,4 +70,4 @@ start: int | None = None,

) -> list[Self]:
if session is None:
session = cls._get_db_session()
cls = self.__class__
session = cls._get_db_session(session)

@@ -75,3 +81,5 @@ # Use dialect from class

prepare = cls.prepare_list_ids if not raw else None
records: list[Self] = await session.execute(stmt, values, prepare=prepare)
records: list[Self] = await session.execute(
stmt, values, prepare=prepare
)

@@ -102,4 +110,3 @@ # если есть хоть одна запись и вообще нужно читать поля связей

fields = []
if session is None:
session = cls._get_db_session()
session = cls._get_db_session(session)

@@ -314,7 +321,6 @@ dialect = cls._dialect

async def update_with_relations(
self, payload: Self, fields=[], session=None
self, payload: _M, fields=[], session=None
):
"""Update record with relations."""
if session is None:
session = self._get_db_session()
session = self._get_db_session(session)

@@ -337,4 +343,6 @@ # Handle attachments

field_obj["res_id"] = self.id
# Оборачиваем dict в объект модели
attachment_payload = field.relation_table(**field_obj)
attachment_id = await field.relation_table.create(
field_obj, session
attachment_payload, session
)

@@ -341,0 +349,0 @@ setattr(payload, name, attachment_id)

@@ -41,3 +41,3 @@ """Protocols defining what ORM mixins expect from the model class."""

@classmethod
def _get_db_session(cls) -> Any: ...
def _get_db_session(cls, session=None) -> Any: ...

@@ -44,0 +44,0 @@ # Field introspection

Metadata-Version: 2.4
Name: dotorm
Version: 2.0.5
Version: 2.0.6
Summary: Async Python ORM for PostgreSQL, MySQL and ClickHouse with dot-notation access

@@ -5,0 +5,0 @@ Project-URL: Homepage, https://github.com/shurshilov/dotorm

@@ -7,3 +7,3 @@ [build-system]

name = "dotorm"
version = "2.0.5"
version = "2.0.6"
description = "Async Python ORM for PostgreSQL, MySQL and ClickHouse with dot-notation access"

@@ -10,0 +10,0 @@ readme = "README.md"