
Product
Introducing Tier 1 Reachability: Precision CVE Triage for Enterprise Teams
Socket’s new Tier 1 Reachability filters out up to 80% of irrelevant CVEs, so security teams can focus on the vulnerabilities that matter.
A Python library for database connections and ORM queries with support for multiple database engines including SQLite, PostgreSQL, MySQL, MSSQL, Oracle, and MongoDB.
.env
file fallbackautoflush = False
expire_on_commit = False
echo = False
Note: All constructor parameters are optional and fall back to .env file variables.
pip install ddcDatabases
Note: The basic installation includes only SQlite. Database-specific drivers are optional extras that you can install as needed.
Install only the database drivers you need:
# All database drivers (recommended for development)
pip install ddcDatabases[all]
# SQL Server / MSSQL
pip install ddcDatabases[mssql]
# MySQL / MariaDB
pip install ddcDatabases[mysql]
# PostgreSQL
pip install ddcDatabases[pgsql]
# Oracle Database
pip install ddcDatabases[oracle]
# MongoDB
pip install ddcDatabases[mongodb]
# Multiple databases (example)
pip install ddcDatabases[mysql,pgsql,mongodb]
Available Database Extras:
all
- All database driversmssql
- Microsoft SQL Server (pyodbc, aioodbc)mysql
- MySQL/MariaDB (pymysql, aiomysql)pgsql
- PostgreSQL (psycopg2-binary, asyncpg)oracle
- Oracle Database (cx-oracle)mongodb
- MongoDB (pymongo)Platform Notes:
Example:
import sqlalchemy as sa
from ddcDatabases import DBUtils, Sqlite
from your_models import Model # Your SQLAlchemy model
with Sqlite(filepath="data.db") as session:
db_utils = DBUtils(session)
stmt = sa.select(Model).where(Model.id == 1)
results = db_utils.fetchall(stmt)
for row in results:
print(row)
Synchronous Example:
import sqlalchemy as sa
from ddcDatabases import DBUtils, MSSQL
from your_models import Model
kwargs = {
"host": "127.0.0.1",
"port": 1433,
"user": "sa",
"password": "password",
"database": "master",
"db_schema": "dbo",
"echo": True,
"autoflush": True,
"expire_on_commit": True,
"autocommit": True,
"connection_timeout": 30,
"pool_recycle": 3600,
"pool_size": 25,
"max_overflow": 50,
}
with MSSQL(**kwargs) as session:
stmt = sa.select(Model).where(Model.id == 1)
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt)
for row in results:
print(row)
Asynchronous Example:
import asyncio
import sqlalchemy as sa
from ddcDatabases import DBUtilsAsync, MSSQL
from your_models import Model
async def main():
async with MSSQL(**kwargs) as session:
stmt = sa.select(Model).where(Model.id == 1)
db_utils = DBUtilsAsync(session)
results = await db_utils.fetchall(stmt)
for row in results:
print(row)
asyncio.run(main())
Synchronous Example:
import sqlalchemy as sa
from ddcDatabases import DBUtils, PostgreSQL
from your_models import Model
kwargs = {
"host": "127.0.0.1",
"port": 5432,
"user": "postgres",
"password": "postgres",
"database": "postgres",
"echo": True,
"autoflush": False,
"expire_on_commit": False,
"autocommit": True,
"connection_timeout": 30,
"pool_recycle": 3600,
"pool_size": 25,
"max_overflow": 50,
}
with PostgreSQL(**kwargs) as session:
stmt = sa.select(Model).where(Model.id == 1)
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt)
for row in results:
print(row)
Asynchronous Example:
import asyncio
import sqlalchemy as sa
from ddcDatabases import DBUtilsAsync, PostgreSQL
from your_models import Model
async def main():
async with PostgreSQL(**kwargs) as session:
stmt = sa.select(Model).where(Model.id == 1)
db_utils = DBUtilsAsync(session)
results = await db_utils.fetchall(stmt)
for row in results:
print(row)
asyncio.run(main())
Synchronous Example:
import sqlalchemy as sa
from ddcDatabases import DBUtils, MySQL
kwargs = {
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"password": "root",
"database": "dev",
"echo": True,
"autoflush": False,
"expire_on_commit": False,
"autocommit": True,
"connection_timeout": 30,
"pool_recycle": 3600,
"pool_size": 25,
"max_overflow": 50,
}
with MySQL(**kwargs) as session:
stmt = sa.text("SELECT * FROM users WHERE id = 1")
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt)
for row in results:
print(row)
Asynchronous Example:
import asyncio
import sqlalchemy as sa
from ddcDatabases import DBUtilsAsync, MySQL
async def main() -> None:
async with MySQL(**kwargs) as session:
stmt = sa.text("SELECT * FROM users")
db_utils = DBUtilsAsync(session)
results = await db_utils.fetchall(stmt)
for row in results:
print(row)
asyncio.run(main())
Example with explicit credentials:
import sqlalchemy as sa
from ddcDatabases import DBUtils, Oracle
kwargs = {
"host": "127.0.0.1",
"port": 1521,
"user": "system",
"password": "oracle",
"servicename": "xe",
"echo": False,
"autoflush": False,
"expire_on_commit": False,
"autocommit": True,
"connection_timeout": 30,
"pool_recycle": 3600,
"pool_size": 25,
"max_overflow": 50,
}
with Oracle(**kwargs) as session:
stmt = sa.text("SELECT * FROM dual")
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt)
for row in results:
print(row)
Example with explicit credentials:
from ddcDatabases import MongoDB
from bson.objectid import ObjectId
kwargs = {
"host": "127.0.0.1",
"port": 27017,
"user": "admin",
"password": "admin",
"database": "admin",
"collection": "test_collection",
"sort_column": "_id",
"sort_order": "asc", # asc or desc
}
query = {"_id": ObjectId("689c9f71dd642a68cfc60477")}
with MongoDB(**kwargs, query=query) as cursor:
for each in cursor:
print(each)
Access the underlying SQLAlchemy engine for advanced operations:
Synchronous Engine:
from ddcDatabases import PostgreSQL
with PostgreSQL() as session:
engine = session.bind
# Use engine for advanced operations
Asynchronous Engine:
from ddcDatabases import PostgreSQL
async def main():
async with PostgreSQL() as session:
engine = session.bind
# Use engine for advanced operations
The DBUtils
and DBUtilsAsync
classes provide convenient methods for common database operations:
from ddcDatabases import DBUtils, DBUtilsAsync
# Synchronous utilities
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt) # Returns list of RowMapping objects
value = db_utils.fetchvalue(stmt) # Returns single value as string
db_utils.insert(stmt) # Insert into model table
db_utils.deleteall(model) # Delete all records from model
db_utils.insertbulk(model, data_list) # Bulk insert from list of dictionaries
db_utils.execute(stmt) # Execute any SQLAlchemy statement
# Asynchronous utilities (similar interface with await)
db_utils_async = DBUtilsAsync(session)
results = await db_utils_async.fetchall(stmt)
import logging
logging.getLogger('ddcDatabases').setLevel(logging.INFO)
logging.getLogger('ddcDatabases').addHandler(logging.StreamHandler())
poetry build -f wheel
poetry update --with test
poe tests
Released under the MIT License
If you find this project helpful, consider supporting development:
FAQs
Simplified Database ORM Connections
We found that ddcDatabases demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Product
Socket’s new Tier 1 Reachability filters out up to 80% of irrelevant CVEs, so security teams can focus on the vulnerabilities that matter.
Research
/Security News
Ongoing npm supply chain attack spreads to DuckDB: multiple packages compromised with the same wallet-drainer malware.
Security News
The MCP Steering Committee has launched the official MCP Registry in preview, a central hub for discovering and publishing MCP servers.