ff-storage

A comprehensive storage package for Fenixflow applications, providing async connection pools, sync database connections, object storage abstractions, automatic schema synchronization, and model utilities. Supports PostgreSQL, MySQL, Microsoft SQL Server, local filesystem storage, S3-compatible services, and Azure Blob Storage.
Created by Ben Moag at Fenixflow
🔥 Version 4.6.3 - Latest Release
What's New in v4.6.x:
- v4.6.3 - Fixed false positive schema drift for float types + added PK/FK introspection to schema sync
- v4.6.2 - Documentation updates
- v4.6.1 - Fixed JSONB field serialization in update() methods (NoneStrategy & CopyOnChangeStrategy)
- v4.6.0 - Fixed false positive schema drift for SQL function defaults (case sensitivity: now() vs NOW())
- v4.5.0 - Field introspection methods (get_base_fields(), get_system_fields(), get_user_fields()) and computed field exclusion from DB operations (see the sketch below)
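A rough sketch of how the v4.5.0 introspection helpers can be used. This is a hypothetical usage example: it assumes the helpers are callable on the model class and return lists of field names, which is not spelled out in this README.

from ff_storage import PydanticModel, Field

class Product(PydanticModel):
    __table_name__ = "products"
    name: str = Field(max_length=255)
    price: float = 0.0

# Hypothetical usage: base/system fields come from PydanticModel itself,
# user fields are the ones declared on Product.
print(Product.get_base_fields())
print(Product.get_system_fields())
print(Product.get_user_fields())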
🎯 Version 4.4.0 - Multi-Tenant Permissive Scope
Flexible multi-tenant access with separate tenant_id and tenant_ids parameters:
- 🔒 Strict Scope (tenant_id): Single UUID - forces tenant_id on writes, strict isolation for broker/UW operations
- 🌐 Permissive Scope (tenant_ids): List of UUIDs - validates writes, enables admin cross-tenant queries
- ✅ Clear Semantics - Different behavior for single vs multi-tenant use cases
repo = PydanticRepository(Product, db_pool, tenant_id=org_id)
repo_admin = PydanticRepository(Product, db_pool, tenant_ids=[tenant1, tenant2])
🎉 Version 3.0.0 - Pydantic ORM & Temporal Data Management
Major features in v3.0.0: Production-ready Pydantic ORM with built-in temporal data management!
- 🔥 Pydantic Models - Type-safe models with automatic schema generation
- ⏱️ Temporal Strategies - Choose from 3 strategies: none, copy_on_change (audit trail), scd2 (time travel)
- 🎯 Multi-Tenant by Default - Automatic tenant_id injection and filtering
- 📝 Audit Trails - Field-level change tracking with copy_on_change
- ⏰ Time Travel - Query historical data with scd2 strategy
- 🔧 Rich Field Metadata - Complete SQL control (FK, CHECK, defaults, partial indexes)
- 🚀 Auto-Sync Schema - SchemaManager now creates auxiliary tables (audit tables)
📚 v3.0.0 Documentation | ⚡ Quickstart Guide | 🎯 Strategy Selection
Backwards Compatible: All v2 features work unchanged. v3 is fully opt-in.
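To make the feature list above concrete, here is a minimal sketch that combines the model definition and the tenant-scoped repository patterns shown elsewhere in this README. It assumes PydanticRepository is importable from ff_storage alongside the other model utilities, and that db_pool and org_id are your connection pool and tenant UUID.

from ff_storage import PydanticModel, Field, PydanticRepository

class Invoice(PydanticModel):
    __table_name__ = "invoices"
    __schema__ = "public"
    number: str = Field(max_length=50)
    amount: float = 0.0
    status: str = Field(default="open", max_length=20)

# Tenant-scoped repository: tenant_id is injected on writes and applied as a filter on reads.
repo = PydanticRepository(Invoice, db_pool, tenant_id=org_id)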
Version 2.0.0 - Schema Sync System
Added in 2.0.0: Terraform-like automatic schema synchronization! Define schema in model classes and let SchemaManager handle migrations automatically.
Important: v3.3.0 fixes critical false positive detection bugs in schema sync. Upgrade from v2.x/v3.2.x immediately.
Breaking Change in 2.0.0: Removed file-based migrations (MigrationManager). Use SchemaManager for automatic schema sync from model definitions.
New in 1.1.0: Added Azure Blob Storage backend with support for both Azurite (local development) and production Azure Blob Storage.
Quick Start
Installation
From PyPI
pip install ff-storage
From GitLab
pip install git+https://gitlab.com/fenixflow/fenix-packages.git
Async Pool (FastAPI, Production)
from ff_storage.db import PostgresPool
pool = PostgresPool(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    port=5432,
    min_size=10,
    max_size=20
)
await pool.connect()
results = await pool.fetch_all("SELECT id, title, status FROM documents WHERE status = $1", "active")
print(results[0]['title'])
user = await pool.fetch_one("SELECT id, name, email FROM users WHERE id = $1", 123)
await pool.disconnect()
Sync Connection (Scripts, Simple Apps)
from ff_storage.db import Postgres
db = Postgres(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    port=5432
)
db.connect()
results = db.read_query("SELECT id, title, status FROM documents WHERE status = %(status)s", {"status": "active"})
print(results[0]['title'])
db.close_connection()
FastAPI Integration
from fastapi import FastAPI
from ff_storage.db import PostgresPool
app = FastAPI()
app.state.db = PostgresPool(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    min_size=10,
    max_size=20
)

@app.on_event("startup")
async def startup():
    await app.state.db.connect()

@app.on_event("shutdown")
async def shutdown():
    await app.state.db.disconnect()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    user = await app.state.db.fetch_one(
        "SELECT * FROM users WHERE id = $1", user_id
    )
    return user
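On newer FastAPI versions the on_event hooks are deprecated in favor of a lifespan handler; the equivalent wiring looks like this (same pool parameters as above):

from contextlib import asynccontextmanager
from fastapi import FastAPI
from ff_storage.db import PostgresPool

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create and open the pool on startup, close it on shutdown
    app.state.db = PostgresPool(
        dbname="fenix_db",
        user="fenix",
        password="password",
        host="localhost",
        min_size=10,
        max_size=20
    )
    await app.state.db.connect()
    yield
    await app.state.db.disconnect()

app = FastAPI(lifespan=lifespan)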
Migration Guide
v3.2.x → v3.3.0 (Backward Compatible)
No action required - Internal architecture improvements only.
Changes:
- Schema normalization now centralized (eliminates false positives)
- WHERE clause parser added (fixes precedence bugs)
- All public APIs unchanged
Impact: If using schema sync, upgrade immediately to eliminate false positives causing index recreation on every run.
v2.x → v3.0.0 (Breaking - Pydantic ORM)
See docs/quickstart_v3.md for full migration guide.
v0.x → v1.0.0 (Breaking - Async Pools)
Pools are now async - all *Pool classes require await. Use direct connections for sync code (Postgres, MySQL, SQLServer - unchanged).
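In practice the change is just awaiting the pool methods, or switching to a direct connection when the calling code is synchronous (both APIs are shown in the Quick Start above):

# v1.0.0+: pool methods are coroutines and must be awaited
await pool.connect()
rows = await pool.fetch_all("SELECT * FROM documents")
await pool.disconnect()

# Synchronous code: use a direct connection instead (API unchanged)
db = Postgres(dbname="fenix_db", user="fenix", password="password", host="localhost", port=5432)
db.connect()
rows = db.read_query("SELECT * FROM documents")
db.close_connection()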
Features
Database Operations
- Async Connection Pools: High-performance async pools for PostgreSQL, MySQL, and SQL Server
- Sync Direct Connections: Simple sync connections for scripts and non-async code
- Multi-Database Support: Uniform interface across PostgreSQL, MySQL, and Microsoft SQL Server
- Transaction Management: Built-in support for transactions with rollback
- Batch Operations: Execute many queries efficiently
- Query Builder: SQL query construction utilities
Schema Sync System (v2.0.0+, Fixed in v3.3.0)
- Production-Ready: v3.3.0 fixes critical false positive detection bugs
- Normalization Framework: Centralized schema comparison (eliminates index churn)
- WHERE Clause Parser: SQL AST parsing with proper precedence handling
- Terraform-like Migrations: Define schema in code, auto-sync on startup
- Automatic Detection: Detects schema changes from model definitions
- Safe by Default: Additive changes auto-apply, destructive changes require explicit approval
- Dry Run Mode: Preview changes without applying them
- Transaction-Wrapped: All changes in single atomic transaction
- Provider Detection: Auto-detects PostgreSQL, MySQL, or SQL Server
Object Storage
- Multiple Backends: Local filesystem, S3/S3-compatible services, and Azure Blob Storage
- Async Operations: Non-blocking I/O for better performance
- Streaming Support: Handle large files without memory overhead
- Atomic Writes: Safe file operations with temp file + rename
- Metadata Management: Store and retrieve metadata with objects
Core Components
Database Connections
PostgreSQL with Connection Pooling
from ff_storage import PostgresPool
db = PostgresPool(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    port=5432,
    pool_size=20
)
db.connect()
try:
    results = db.read_query("SELECT id, title, status FROM documents WHERE status = %(status)s", {"status": "active"})
    print(results[0]['title'])
    new_id = db.execute_query(
        "INSERT INTO documents (title) VALUES (%(title)s) RETURNING id",
        {"title": "New Document"}
    )
    db.begin_transaction()
    try:
        db.execute("UPDATE documents SET status = %(status)s WHERE id = %(id)s", {"status": "archived", "id": 123})
        db.execute("INSERT INTO audit_log (action) VALUES (%(action)s)", {"action": "archive"})
        db.commit_transaction()
    except Exception:
        db.rollback_transaction()
        raise
finally:
    db.close_connection()
MySQL with Connection Pooling
from ff_storage import MySQLPool
db = MySQLPool(
    dbname="fenix_db",
    user="root",
    password="password",
    host="localhost",
    port=3306,
    pool_size=10
)
db.connect()
results = db.read_query("SELECT id, title, status FROM documents WHERE status = %(status)s", {"status": "active"})
print(results[0]['title'])
db.close_connection()
Microsoft SQL Server with Connection Pooling
from ff_storage import SQLServerPool
db = SQLServerPool(
    dbname="fenix_db",
    user="sa",
    password="YourPassword123",
    host="localhost",
    port=1433,
    driver="ODBC Driver 18 for SQL Server",
    pool_size=10
)
db.connect()
try:
    results = db.read_query("SELECT id, title, status FROM documents WHERE status = ?", {"status": "active"})
    print(results[0]['title'])
    new_id = db.execute_query(
        "INSERT INTO documents (title) OUTPUT INSERTED.id VALUES (?)",
        {"title": "New Document"}
    )
    if db.table_exists("users", schema="dbo"):
        columns = db.get_table_columns("users", schema="dbo")
finally:
    db.close_connection()
Object Storage
Local Filesystem Storage
from ff_storage import LocalObjectStorage
import asyncio
async def main():
    storage = LocalObjectStorage("/var/data/documents")

    # Write with metadata (pdf_bytes is the file content you want to store)
    await storage.write(
        "reports/2025/quarterly.pdf",
        pdf_bytes,
        metadata={"content-type": "application/pdf", "author": "system"}
    )

    data = await storage.read("reports/2025/quarterly.pdf")
    exists = await storage.exists("reports/2025/quarterly.pdf")
    files = await storage.list_keys(prefix="reports/2025/")
    await storage.delete("reports/2025/quarterly.pdf")

asyncio.run(main())
S3-Compatible Storage
from ff_storage import S3ObjectStorage
import asyncio
async def main():
    # AWS S3
    s3 = S3ObjectStorage(
        bucket="fenix-documents",
        region="us-east-1"
    )

    # Or an S3-compatible service such as MinIO
    s3 = S3ObjectStorage(
        bucket="fenix-documents",
        endpoint_url="http://localhost:9000",
        access_key="minioadmin",
        secret_key="minioadmin"
    )

    await s3.write("docs/report.pdf", pdf_bytes)

    # Stream large objects in chunks instead of loading them into memory
    async for chunk in s3.read_stream("large_file.bin", chunk_size=8192):
        await process_chunk(chunk)

    await s3.write("huge_file.bin", huge_data)

asyncio.run(main())
Azure Blob Storage
from ff_storage import AzureBlobObjectStorage
from azure.identity import DefaultAzureCredential
import asyncio
async def main():
    # Azurite (local development)
    storage = AzureBlobObjectStorage(
        container_name="fenix-documents",
        connection_string="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
    )

    # Production with a connection string and an optional key prefix
    storage = AzureBlobObjectStorage(
        container_name="fenix-documents",
        connection_string="DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=...;EndpointSuffix=core.windows.net",
        prefix="documents/"
    )

    # Production with an account URL
    storage = AzureBlobObjectStorage(
        container_name="fenix-documents",
        account_url="https://mystorageaccount.blob.core.windows.net"
    )

    # Production with an account URL and an explicit Azure credential
    storage = AzureBlobObjectStorage(
        container_name="fenix-documents",
        account_url="https://mystorageaccount.blob.core.windows.net",
        credential=DefaultAzureCredential()
    )

    await storage.write(
        "reports/2025/quarterly.pdf",
        pdf_bytes,
        metadata={"content-type": "application/pdf", "author": "system"}
    )
    data = await storage.read("reports/2025/quarterly.pdf")

    # Stream large blobs in chunks
    async for chunk in storage.read_stream("large_file.bin", chunk_size=8192):
        await process_chunk(chunk)

    exists = await storage.exists("reports/2025/quarterly.pdf")
    files = await storage.list_keys(prefix="reports/2025/")
    metadata = await storage.get_metadata("reports/2025/quarterly.pdf")
    print(metadata["content-type"])
    await storage.delete("reports/2025/quarterly.pdf")

asyncio.run(main())
Note: Azure Blob Storage has restrictions on metadata keys (must be valid C# identifiers). The implementation automatically converts hyphens to underscores (e.g., content-type becomes content_type) when storing and converts them back when retrieving.
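For example, a metadata key written with a hyphen comes back with the hyphen, even though it is stored in Azure with an underscore:

await storage.write("docs/report.pdf", pdf_bytes, metadata={"content-type": "application/pdf"})
# Stored in Azure as "content_type", returned as "content-type"
metadata = await storage.get_metadata("docs/report.pdf")
assert metadata["content-type"] == "application/pdf"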
Schema Sync (Terraform-like Migrations)
from ff_storage import Postgres, SchemaManager, PydanticModel, Field
class Document(PydanticModel):
    __table_name__ = "documents"
    __schema__ = "public"

    title: str = Field(max_length=255)
    content: str | None = None
    status: str = Field(default="draft", max_length=50)

db = Postgres(dbname="mydb", user="user", password="pass", host="localhost", port=5432)
db.connect()

manager = SchemaManager(db)

# Preview changes without applying them
print("Preview of changes:")
manager.sync_schema(
    models=[Document],
    allow_destructive=False,
    dry_run=True
)

# Apply additive changes
changes_applied = manager.sync_schema(
    models=[Document],
    allow_destructive=False,
    dry_run=False
)
print(f"Applied {changes_applied} schema changes")
Features:
- Automatic Detection: Detects new tables, missing columns, and indexes
- Safe by Default: Additive changes (CREATE, ADD) auto-apply; destructive changes (DROP) require explicit flag
- Dry Run Mode: Preview all changes before applying
- Transaction-Wrapped: All changes in a single atomic transaction
- Provider-Agnostic: Works with PostgreSQL (full support), MySQL/SQL Server (stubs for future implementation)
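For example, dropping a column that was removed from a model is only applied when allow_destructive=True is passed explicitly; previewing with dry_run first is a sensible habit (same sync_schema() call as above):

# Preview first, then apply with destructive changes (DROPs) explicitly allowed
manager.sync_schema(models=[Document], allow_destructive=True, dry_run=True)
manager.sync_schema(models=[Document], allow_destructive=True, dry_run=False)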
Advanced Features
Transaction Management
async def transfer_ownership(db, doc_id, new_owner_id):
    db.begin_transaction()
    try:
        db.execute("UPDATE documents SET owner_id = %(owner_id)s WHERE id = %(id)s",
                   {"owner_id": new_owner_id, "id": doc_id})
        db.execute("INSERT INTO audit_log (action, doc_id, user_id) VALUES (%(action)s, %(doc_id)s, %(user_id)s)",
                   {"action": "transfer", "doc_id": doc_id, "user_id": new_owner_id})
        db.commit_transaction()
    except Exception:
        db.rollback_transaction()
        raise
Connection Pool Monitoring
pool = PostgresPool(...)
open_connections = pool.get_open_connections()
print(f"Open connections: {open_connections}")
pool.close_all_connections()
Query Builder Utilities
from ff_storage.db.sql import build_insert, build_update, build_select
query, params = build_insert("documents", {
    "title": "New Doc",
    "status": "draft"
})

query, params = build_update(
    "documents",
    {"status": "published"},
    {"id": doc_id}
)

query, params = build_select(
    "documents",
    columns=["id", "title"],
    where={"status": "published", "author_id": user_id}
)
Error Handling
from ff_storage.exceptions import StorageError, DatabaseError
try:
    db.connect()
    results = db.read_query("SELECT * FROM documents")
except DatabaseError as e:
    print(f"Database error: {e}")
except StorageError as e:
    print(f"Storage error: {e}")
finally:
    db.close_connection()
Testing
# Run the full test suite
pytest tests/

# Run with coverage
pytest --cov=ff_storage tests/

# Run a specific test module
pytest tests/test_postgres.py

# Verbose output
pytest -v tests/
Configuration
Environment Variables
# Database
export DB_HOST=localhost
export DB_PORT=5432
export DB_NAME=fenix_db
export DB_USER=fenix
export DB_PASSWORD=secret

# AWS S3
export AWS_ACCESS_KEY_ID=your-key
export AWS_SECRET_ACCESS_KEY=your-secret
export AWS_DEFAULT_REGION=us-east-1

# Local filesystem storage
export STORAGE_PATH=/var/data/documents
Configuration File
import os

from ff_storage import PostgresPool, S3ObjectStorage

DATABASE = {
    "dbname": os.getenv("DB_NAME", "fenix_db"),
    "user": os.getenv("DB_USER", "fenix"),
    "password": os.getenv("DB_PASSWORD"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", 5432)),
    "pool_size": 20
}

STORAGE = {
    "bucket": os.getenv("S3_BUCKET", "fenix-documents"),
    "region": os.getenv("AWS_DEFAULT_REGION", "us-east-1")
}

db = PostgresPool(**DATABASE)
storage = S3ObjectStorage(**STORAGE)
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
MIT License - See LICENSE file for details.
Author
Created and maintained by Ben Moag at Fenixflow
For more information, visit the GitLab repository.