🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more
Socket
Sign inDemoInstall
Socket

fast-rdb

Package Overview
Dependencies
Maintainers
1
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

fast-rdb

A high-performance, generic CRUD operations package combining SQLAlchemy async ORM with Redis caching. Features type-safe CRUD operations, automatic cache invalidation, pagination support, and Pydantic integration for schema validation. Ideal for FastAPI backends and read-heavy applications requiring optimized database access.

0.0.2
PyPI
Maintainers
1

FastRDB

A high-performance, generic CRUD operations package combining SQLAlchemy async ORM with Redis caching. Features type-safe CRUD operations, automatic cache invalidation, pagination support, and Pydantic integration for schema validation. Ideal for FastAPI backends and read-heavy applications requiring optimized database access.

Features

  • Type-Safe CRUD Operations: Built with Python type hints and generics for robust type checking
  • Redis Caching: Integrated Redis caching with automatic cache invalidation
  • Async Support: Fully asynchronous operations using SQLAlchemy async ORM
  • Pagination: Built-in pagination support with customizable limits and page numbers
  • Pydantic Integration: Seamless integration with Pydantic for schema validation
  • Generic Base Class: Extensible base class for creating custom CRUD operations
  • Automatic Cache Management: Smart cache invalidation on data modifications

Installation

pip install fast-rdb

Requirements

  • Python 3.10 or higher
  • SQLAlchemy
  • Redis
  • Pydantic
  • orjson

Quick Start

from fast_rdb import CRUDBase
from sqlalchemy.ext.asyncio import AsyncSession
from redis.asyncio import Redis
from pydantic import BaseModel
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import IntegrityError, NoResultFound

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)

class UserCreate(BaseModel):
    name: str
    email: str

class UserUpdate(BaseModel):
    name: str | None = None
    email: str | None = None

class UserResponse(BaseModel):
    id: int
    name: str
    email: str

class UserCRUD(CRUDBase[User, UserCreate, UserUpdate, UserResponse]):
    def __init__(self):
        super().__init__(
            model=User,
            response_schema=UserResponse,
            pattern="user:{id}",
            list_pattern="users:page:{page}:limit:{limit}",
            exp=3600,
            invalidate_pattern_prefix="user:*"
        )

# Usage
async def main():
    db = AsyncSession(...)  # Your SQLAlchemy async session
    redis = Redis(...)      # Your Redis client
    
    user_crud = UserCRUD()
    
    # Create a user
    try:
        new_user = await user_crud.create(
            db=db,
            redis=redis,
            obj_in=UserCreate(name="John Doe", email="john@example.com")
        )
    except IntegrityError:
        raise DuplicateValueException(detail="User already exists")
    
    # Get a user
    user = await user_crud.get(db=db, redis=redis, id=1)
    if not user:
        raise NotFoundException(detail="User not found")
    
    # Update a user
    try:
        updated_user = await user_crud.update(
            db=db,
            redis=redis,
            obj_in=UserUpdate(name="Jane Doe"),
            id=1
        )
    except NoResultFound:
        raise NotFoundException(detail="User not found")
    
    # Delete a user
    try:
        await user_crud.delete(db=db, redis=redis, id=1)
    except NoResultFound:
        raise NotFoundException(detail="User not found")
    
    # Get paginated users
    users = await user_crud.get_multi(
        db=db,
        redis=redis,
        limit=10,
        page=1,
        order_by="name"
    )
    return user_crud.paginate(data=users, limit=10, page=1)

Detailed API Reference

CRUDBase Class

The main class providing CRUD operations with Redis caching.

Initialization Parameters

  • model (Type[ModelType]): SQLAlchemy model class that represents your database table
  • response_schema (Type[ResponseSchemaType]): Pydantic model for response serialization
  • pattern (str): Redis key pattern for single record caching (e.g., "user:{id}")
  • list_pattern (str): Redis key pattern for list caching (e.g., "users:page:{page}:limit:{limit}")
  • exp (int): Redis key expiration time in seconds
  • invalidate_pattern_prefix (str): Pattern prefix for cache invalidation (e.g., "user:*")

Methods

create
async def create(
    db: AsyncSession,
    redis: Redis,
    obj_in: CreateSchemaType,
    **kwargs: Any
) -> ResponseSchemaType

Creates a new record in the database and caches it in Redis.

  • db: SQLAlchemy async session
  • redis: Redis client instance
  • obj_in: Pydantic model with data to create
  • **kwargs: Additional filters for the record
  • Returns: Created record as ResponseSchemaType
  • Raises: IntegrityError if record already exists
get
async def get(
    db: AsyncSession,
    redis: Redis,
    **kwargs: Any
) -> ResponseSchemaType | None

Retrieves a single record from cache or database.

  • db: SQLAlchemy async session
  • redis: Redis client instance
  • **kwargs: Filters to identify the record
  • Returns: Record as ResponseSchemaType or None if not found
get_multi
async def get_multi(
    db: AsyncSession,
    redis: Redis,
    limit: int = 10,
    page: int = 1,
    order_by: Optional[str] = None,
    ascending: bool = True,
    **kwargs: Any
) -> List[ResponseSchemaType]

Retrieves multiple records with pagination and optional ordering.

  • db: SQLAlchemy async session
  • redis: Redis client instance
  • limit: Number of records per page
  • page: Page number
  • order_by: Field to order by
  • ascending: Sort order
  • **kwargs: Additional filters
  • Returns: List of records
update
async def update(
    db: AsyncSession,
    redis: Redis,
    obj_in: UpdateSchemaType,
    **matches: Any
) -> ResponseSchemaType

Updates an existing record.

  • db: SQLAlchemy async session
  • redis: Redis client instance
  • obj_in: Pydantic model with update data
  • **matches: Filters to identify the record
  • Returns: Updated record
  • Raises: NoResultFound if record doesn't exist
delete
async def delete(
    db: AsyncSession,
    redis: Redis,
    **kwargs: Any
) -> None

Deletes a record from database and cache.

  • db: SQLAlchemy async session
  • redis: Redis client instance
  • **kwargs: Filters to identify the record
  • Raises: NoResultFound if record doesn't exist
create_multi
async def create_multi(
    db: AsyncSession,
    instances: list[CreateSchemaType]
) -> Sequence[ModelType]

Creates multiple records in a single transaction.

  • db: SQLAlchemy async session
  • instances: List of Pydantic models to create
  • Returns: List of created records
  • Raises: IntegrityError if any record already exists
paginate
@staticmethod
def paginate(
    data: List[Any],
    limit: int,
    page: int
) -> PaginatedResponse[Any]

Helper method to create paginated responses.

  • data: List of items to paginate
  • limit: Items per page
  • page: Current page number
  • Returns: PaginatedResponse object

Error Handling

The package integrates with common database exceptions:

  • IntegrityError: Raised when trying to create a duplicate record
  • NoResultFound: Raised when a record is not found during update or delete operations

Example error handling:

try:
    result = await user_crud.get(db=db, redis=redis, id=1)
    if not result:
        raise NotFoundException(detail="User not found")
    return result
except NoResultFound:
    raise NotFoundException(detail="User not found")

License

This project is licensed under the MIT License - see the LICENSE file for details.

Author

  • Biisal (biisal.int@gmail.com)

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Keywords

fastapi

FAQs

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts