
The library provides a standardized way to connect to a database, plus a base DAO class implementation.
To install Ash DAL from PyPI, run:
pip install ash-dal
# OR
poetry add ash-dal
To install Ash DAL directly from the GitHub repository, run:
pip install git+https://github.com/meetash/ash-dal.git@main
# OR
poetry add git+https://github.com/meetash/ash-dal.git@main
There are two options for connecting: a sync or an async database connection. The sync variant is shown first, followed by the async variant.
from ash_dal import Database, URL
from ash_dal.utils import prepare_ssl_context
from sqlalchemy import select

from models import User

ssl_context = prepare_ssl_context(
    ssl_root_dir='/tmp/certs',
    client_cert_path='client-cert.pem',
    client_key_path='client-key.pem',
    server_ca_path='server-ca.pem'
)

db_url = URL.create(
    drivername="mysql+pymysql",
    username="my_db_user",
    password="S3cret",
    host="127.0.0.1",
    port=3306,
    database="my_db",
)

read_replica_db_url = URL.create(
    drivername="mysql+pymysql",
    username="my_db_user",
    password="S3cret",
    host="127.0.0.1",
    port=3307,
    database="my_db",
)

DATABASE = Database(
    db_url=db_url,
    read_replica_url=read_replica_db_url,
    ssl_context=ssl_context,
    read_replica_ssl_context=ssl_context
)


def get_users(db: Database):
    with db.session as session:
        # Assuming db.session yields a SQLAlchemy Session, pass a select() statement to scalars()
        users = session.scalars(select(User)).all()
        return users
from ash_dal import AsyncDatabase, URL
from ash_dal.utils import prepare_ssl_context
from sqlalchemy import select

from models import User

ssl_context = prepare_ssl_context(
    ssl_root_dir='/tmp/certs',
    client_cert_path='client-cert.pem',
    client_key_path='client-key.pem',
    server_ca_path='server-ca.pem'
)

db_url = URL.create(
    drivername="mysql+aiomysql",
    username="my_db_user",
    password="S3cret",
    host="127.0.0.1",
    port=3306,
    database="my_db",
)

read_replica_db_url = URL.create(
    drivername="mysql+aiomysql",
    username="my_db_user",
    password="S3cret",
    host="127.0.0.1",
    port=3307,
    database="my_db",
)

DATABASE = AsyncDatabase(
    db_url=db_url,
    read_replica_url=read_replica_db_url,
    ssl_context=ssl_context,
    read_replica_ssl_context=ssl_context
)


async def async_get_users(db: AsyncDatabase):
    async with db.session as session:
        # Assuming db.session yields a SQLAlchemy AsyncSession, await scalars() on a select() statement
        users = await session.scalars(select(User))
        return users.all()
Just as with the sync and async Database classes, there are two variations of the base DAO class: BaseDAO and AsyncBaseDAO. The sync variant is shown first, followed by the async variant.
from dataclasses import dataclass

from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

from ash_dal import BaseDAO, Database, URL


class Base(DeclarativeBase):
    pass


class ExampleORMModel(Base):
    __tablename__ = "example_table"

    id: Mapped[int] = mapped_column(autoincrement=True, primary_key=True)
    first_name: Mapped[str] = mapped_column(String(64))
    last_name: Mapped[str] = mapped_column(String(64))
    age: Mapped[int]


@dataclass()
class ExampleEntity:
    id: int
    first_name: str
    last_name: str
    age: int


class ExampleDAO(BaseDAO[ExampleEntity]):
    __entity__ = ExampleEntity
    __model__ = ExampleORMModel


if __name__ == '__main__':
    db = Database(
        db_url=URL.create(
            drivername="mysql+pymysql",
            username="my_db_user",
            password="S3cret",
            host="127.0.0.1",
            port=3306,
            database="my_db",
        )
    )
    dao = ExampleDAO(database=db)
    entity = dao.get_by_pk(pk='some-primary-key')
import asyncio
from dataclasses import dataclass

from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

from ash_dal import AsyncBaseDAO, AsyncDatabase, URL


class Base(DeclarativeBase):
    pass


class ExampleORMModel(Base):
    __tablename__ = "example_table"

    id: Mapped[int] = mapped_column(autoincrement=True, primary_key=True)
    first_name: Mapped[str] = mapped_column(String(64))
    last_name: Mapped[str] = mapped_column(String(64))
    age: Mapped[int]


@dataclass()
class ExampleEntity:
    id: int
    first_name: str
    last_name: str
    age: int


class ExampleDAO(AsyncBaseDAO[ExampleEntity]):
    __entity__ = ExampleEntity
    __model__ = ExampleORMModel


if __name__ == '__main__':
    db = AsyncDatabase(
        db_url=URL.create(
            drivername="mysql+aiomysql",
            username="my_db_user",
            password="S3cret",
            host="127.0.0.1",
            port=3306,
            database="my_db",
        )
    )
    dao = ExampleDAO(database=db)
    entity = asyncio.run(dao.get_by_pk(pk='some-primary-key'))
The main goal of the BaseDAO class is to provide CRUD methods that are useful when building a basic CRUD API (a combined usage sketch follows the method list below).
The BaseDAO class provides the following default methods:
BaseDAO.get_by_pk(pk) - Using this method you can fetch an entity by its primary key.
entity = dao.get_by_pk(pk='AWABCD1234')
BaseDAO.all - Using this method you can fetch all entities from the database. It might be useful for fetching data from small tables where you don't actually need pagination (configs, etc.).
entities = dao.get_all()
BaseDAO.get_page(page_index, [page_size]) - Fetch a page of entities by page index. If the index is out of range, an empty page is returned. If page_size is not passed, the default page size (20) is applied.
page = dao.get_page(page_index=2, page_size=10)
for entity in page:
    # Do some stuff with entity
    ...
BaseDAO.paginate([specification, page_size]) - An iterator that returns pages of entities. A specification can be applied to fetch filtered data.
for page in dao.paginate(specification={'status': 'notified'}, page_size=15):
    # Do some stuff with page
    ...
BaseDAO.filter(specification) - Fetch entities from the database by specification. It might be useful for fetching filtered data from small tables where you don't actually need pagination (configs, etc.).
entities = dao.filter(specification={'labId': 2})
BaseDAO.create(data) - Create an entity in the database based on the passed data. Returns the created entity.
data = {'foo': 'bar'}
entity = dao.create(data=data)
BaseDAO.bulk_create(data) - Create multiple entities within one query. Unlike the previous method, this one doesn't return anything.
data = [{'foo': 'bar'}, {'foo': 'beer'}]
dao.bulk_create(data=data)
BaseDAO.update(specification, update_data) - Patch the entity (or entities) matching the specification.
update_data = {'foo': 'bar'}
is_updated = dao.update(specification={'foo': 'beer'}, update_data=update_data)
BaseDAO.delete(specification) - Remove the entity (or entities) matching the specification.
is_removed = dao.delete(specification={'id': 'some-id'})
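Putting several of these methods together, a minimal end-to-end flow might look like the sketch below. It is illustrative only: the field values and the assumption that create() returns an entity whose id can be fed back into a specification are not taken from the library's documentation; only the method names and signatures come from the list above.

def crud_walkthrough(dao: ExampleDAO) -> None:
    # Create a row and read it back by primary key.
    # Field names follow the ExampleORMModel defined earlier; the .id attribute
    # on the returned entity is an assumption about create()'s return value.
    created = dao.create(data={'first_name': 'Ada', 'last_name': 'Lovelace', 'age': 36})
    fetched = dao.get_by_pk(pk=created.id)
    print(fetched)

    # Patch rows matching a specification.
    dao.update(specification={'id': created.id}, update_data={'age': 37})

    # Iterate over filtered pages.
    for page in dao.paginate(specification={'last_name': 'Lovelace'}, page_size=10):
        for entity in page:
            print(entity)

    # Remove rows matching a specification.
    dao.delete(specification={'id': created.id})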
By default, the BaseDAO class uses the simplest pagination strategy, based on SQL OFFSET and LIMIT. The library provides one more strategy out of the box, called deferred join pagination.
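Roughly, a deferred join paginates over the primary key first and only then joins back to fetch full rows, which keeps deep OFFSETs cheaper. The sketch below shows the query shape such a strategy typically produces, written with plain SQLAlchemy Core; it illustrates the technique, not ash-dal's internal implementation, and it assumes a mapped model with an integer id primary key.

import sqlalchemy as sa

def deferred_join_select(model, page_index: int, page_size: int) -> sa.Select:
    # Step 1: paginate over the primary key only; this part of the query can be
    # satisfied from the primary-key index instead of reading full rows.
    pk_page = (
        sa.select(model.id)
        .order_by(model.id)
        .limit(page_size)
        .offset(page_index * page_size)
        .subquery()
    )
    # Step 2: join the small page of primary keys back to the table to fetch
    # the full rows for just that page.
    return sa.select(model).join(pk_page, model.id == pk_page.c.id)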
If you want to change the pagination strategy for your DAO class, you can do it by re-defining the __paginator_factory__ field inside your DAO class:
from ash_dal import AsyncBaseDAO, BaseDAO, DeferredJoinPaginator
from ash_dal.utils import DeferredJoinPaginatorFactory, AsyncDeferredJoinPaginator


class ExampleEntity:
    ...


class ExampleORMModel:
    id: int
    ...


class ExampleDAO(BaseDAO[ExampleEntity]):
    __entity__ = ExampleEntity
    __model__ = ExampleORMModel
    __paginator_factory__ = DeferredJoinPaginatorFactory(
        paginator_class=DeferredJoinPaginator,
        pk_field=ExampleORMModel.id,
    )


# OR async
class ExampleAsyncDAO(AsyncBaseDAO[ExampleEntity]):
    __entity__ = ExampleEntity
    __model__ = ExampleORMModel
    __paginator_factory__ = DeferredJoinPaginatorFactory(
        paginator_class=AsyncDeferredJoinPaginator,
        pk_field=ExampleORMModel.id,
    )
You can also define your own pagination strategy. Be aware that your paginator class should implement the IPaginator or IAsyncPaginator interface:
from ash_dal.utils.paginator.interface import IPaginator, IAsyncPaginator
from ash_dal import PaginatorPage
import typing as t


class ExampleORM:
    ...


class MyPaginator(IPaginator[ExampleORM]):
    def get_page(self, page_index: int) -> PaginatorPage[ExampleORM]:
        # Do page fetching
        ...

    def paginate(self) -> t.Iterator[PaginatorPage[ExampleORM]]:
        # Do pagination
        ...

    @property
    def size(self) -> int:
        # Get pages count
        return 10


# or async paginator
class MyAsyncPaginator(IAsyncPaginator[ExampleORM]):
    async def get_page(self, page_index: int) -> PaginatorPage[ExampleORM]:
        # Do page fetching asynchronously
        ...

    async def paginate(self) -> t.AsyncIterator[PaginatorPage[ExampleORM]]:
        # Do pagination asynchronously
        ...

    @property
    async def size(self) -> int:
        # Get pages count asynchronously
        return 10
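For illustration only, here is one way the sync interface above could be filled in with a plain offset/limit strategy. Everything in this sketch is an assumption rather than ash-dal's documented API: the constructor taking a SQLAlchemy Session and a page size is hypothetical, the way PaginatorPage is built is a guess at its signature, and ExampleORM is assumed to be a mapped model with an integer id primary key (as in the earlier examples).

import math
import typing as t

import sqlalchemy as sa
from sqlalchemy.orm import Session

from ash_dal import PaginatorPage
from ash_dal.utils.paginator.interface import IPaginator


class OffsetLimitPaginator(IPaginator[ExampleORM]):
    # Hypothetical constructor: ash-dal's built-in paginators may be constructed differently.
    def __init__(self, session: Session, page_size: int = 20):
        self._session = session
        self._page_size = page_size

    def get_page(self, page_index: int) -> PaginatorPage[ExampleORM]:
        rows = self._session.scalars(
            sa.select(ExampleORM)
            .order_by(ExampleORM.id)
            .limit(self._page_size)
            .offset(page_index * self._page_size)
        ).all()
        # Assumption: PaginatorPage can be built from a page index and its items;
        # check ash_dal's actual signature before relying on this.
        return PaginatorPage(index=page_index, items=tuple(rows))

    def paginate(self) -> t.Iterator[PaginatorPage[ExampleORM]]:
        for index in range(self.size):
            yield self.get_page(index)

    @property
    def size(self) -> int:
        # Total page count derived from a COUNT(*) over the table.
        row_count = self._session.scalar(sa.select(sa.func.count()).select_from(ExampleORM)) or 0
        return math.ceil(row_count / self._page_size)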