
ff-storage
Fenixflow storage package with Pydantic ORM, temporal data management, and state-based schema sync
A comprehensive storage package for Fenixflow applications, providing async connection pools, direct database connections, object storage abstractions, automatic schema synchronization, and model utilities for modern Python applications. Supports PostgreSQL, MySQL, Microsoft SQL Server, local filesystem storage, S3-compatible services, and Azure Blob Storage.
Created by Ben Moag at Fenixflow
CRITICAL UPDATE in v3.3.0: Fixes production bug causing false positives in schema drift detection!
If you're using schema sync (v2.0+), upgrade immediately - v3.2.x had a critical bug causing unnecessary schema changes.
Major features in v3.0.0: Production-ready Pydantic ORM with built-in temporal data management!
📚 v3.0.0 Documentation | ⚡ Quickstart Guide | 🎯 Strategy Selection
Backwards Compatible: All v2 features work unchanged. v3 is fully opt-in.
Added in 2.0.0: Terraform-like automatic schema synchronization! Define schema in model classes and let SchemaManager handle migrations automatically.
Important: v3.3.0 fixes critical false positive detection bugs in schema sync. Upgrade from v2.x/v3.2.x immediately.
Breaking Change in 2.0.0: Removed file-based migrations (MigrationManager). Use SchemaManager for automatic schema sync from model definitions.
New in 1.1.0: Added Azure Blob Storage backend with support for both Azurite (local development) and production Azure Blob Storage.
pip install ff-storage
pip install git+https://gitlab.com/fenixflow/fenix-packages.git#subdirectory=ff-storage
from ff_storage.db import PostgresPool
# Create async connection pool
pool = PostgresPool(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    port=5432,
    min_size=10,
    max_size=20
)
# Connect once at startup
await pool.connect()
# Use many times - pool handles connections internally
# Returns dictionaries by default for easy access
results = await pool.fetch_all("SELECT id, title, status FROM documents WHERE status = $1", "active")
# results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]
print(results[0]['title']) # Access by column name - intuitive!
# Fetch single row
user = await pool.fetch_one("SELECT id, name, email FROM users WHERE id = $1", 123)
# user = {'id': 123, 'name': 'Alice', 'email': 'alice@example.com'}
# Disconnect once at shutdown
await pool.disconnect()
from ff_storage.db import Postgres
# Create direct connection
db = Postgres(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    port=5432
)
# Connect and query - returns dicts by default
db.connect()
results = db.read_query("SELECT id, title, status FROM documents WHERE status = %(status)s", {"status": "active"})
# results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]
print(results[0]['title']) # Easy access by column name
db.close_connection()
from fastapi import FastAPI
from ff_storage.db import PostgresPool
app = FastAPI()
# Create pool once
app.state.db = PostgresPool(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    min_size=10,
    max_size=20
)

@app.on_event("startup")
async def startup():
    await app.state.db.connect()

@app.on_event("shutdown")
async def shutdown():
    await app.state.db.disconnect()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # Pool handles connection automatically
    user = await app.state.db.fetch_one(
        "SELECT * FROM users WHERE id = $1", user_id
    )
    return user
No action required - Internal architecture improvements only.
Impact: If using schema sync, upgrade immediately to eliminate false positives causing index recreation on every run.
See docs/quickstart_v3.md for full migration guide.
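One way to verify the fix after upgrading is to run schema sync as a dry run against an already-synced database and confirm nothing is reported. A minimal sketch, reusing the SchemaManager API and the Document model from the schema sync example later in this README, and assuming sync_schema also returns the change count in dry-run mode:
from ff_storage.db import Postgres, SchemaManager

# Assumed setup: a connected database and the Document model defined in the
# schema sync example further down in this README.
db = Postgres(dbname="fenix_db", user="fenix", password="password", host="localhost", port=5432)
db.connect()

manager = SchemaManager(db)
# Assumption: with dry_run=True, sync_schema returns the number of changes it would apply.
pending = manager.sync_schema(models=[Document], allow_destructive=False, dry_run=True)
print(f"Pending schema changes: {pending}")  # expect 0 on an already-synced database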
Pools are now async - all *Pool classes require await. Use direct connections for sync code (Postgres, MySQL, SQLServer - unchanged).
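As a quick contrast, here is a minimal sketch built only from the examples in this README (no additional API), showing the same query through the awaited pool and through the unchanged synchronous direct connection:
from ff_storage.db import PostgresPool, Postgres

# Async pool (v3): every pool call is awaited.
async def pooled_query():
    pool = PostgresPool(dbname="fenix_db", user="fenix", password="password", host="localhost", port=5432)
    await pool.connect()
    rows = await pool.fetch_all("SELECT id, title FROM documents WHERE status = $1", "active")
    await pool.disconnect()
    return rows

# Sync direct connection: unchanged, no await.
def direct_query():
    db = Postgres(dbname="fenix_db", user="fenix", password="password", host="localhost", port=5432)
    db.connect()
    rows = db.read_query("SELECT id, title FROM documents WHERE status = %(status)s", {"status": "active"})
    db.close_connection()
    return rows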
from ff_storage import PostgresPool
# Initialize pool
db = PostgresPool(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    port=5432,
    pool_size=20
)
# Use connection from pool - returns dicts by default
db.connect()
try:
    # Execute queries - returns list of dicts
    results = db.read_query("SELECT id, title, status FROM documents WHERE status = %(status)s", {"status": "active"})
    # results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]
    print(results[0]['title'])  # Easy access by column name

    # Execute with RETURNING
    new_id = db.execute_query(
        "INSERT INTO documents (title) VALUES (%(title)s) RETURNING id",
        {"title": "New Document"}
    )
    # new_id = [{'id': 123}]

    # Transaction example
    db.begin_transaction()
    try:
        db.execute("UPDATE documents SET status = %(status)s WHERE id = %(id)s", {"status": "archived", "id": 123})
        db.execute("INSERT INTO audit_log (action) VALUES (%(action)s)", {"action": "archive"})
        db.commit_transaction()
    except Exception:
        db.rollback_transaction()
        raise
finally:
    # Return connection to pool
    db.close_connection()
from ff_storage import MySQLPool
# Initialize pool
db = MySQLPool(
    dbname="fenix_db",
    user="root",
    password="password",
    host="localhost",
    port=3306,
    pool_size=10
)
# Similar usage pattern as PostgreSQL - returns dicts by default
db.connect()
results = db.read_query("SELECT id, title, status FROM documents WHERE status = %(status)s", {"status": "active"})
# results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]
print(results[0]['title']) # Easy access by column name
db.close_connection()
from ff_storage import SQLServerPool
# Initialize pool
db = SQLServerPool(
    dbname="fenix_db",
    user="sa",
    password="YourPassword123",
    host="localhost",
    port=1433,
    driver="ODBC Driver 18 for SQL Server",
    pool_size=10
)
# Connect and execute queries - returns dicts by default
db.connect()
try:
    # Read query - returns list of dicts (qmark placeholders take positional parameters)
    results = db.read_query("SELECT id, title, status FROM documents WHERE status = ?", ("active",))
    # results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]
    print(results[0]['title'])  # Easy access by column name

    # Execute with OUTPUT clause
    new_id = db.execute_query(
        "INSERT INTO documents (title) OUTPUT INSERTED.id VALUES (?)",
        ("New Document",)
    )
    # new_id = [{'id': 123}]

    # Check table existence
    if db.table_exists("users", schema="dbo"):
        columns = db.get_table_columns("users", schema="dbo")
finally:
    db.close_connection()
from ff_storage import LocalObjectStorage
import asyncio
async def main():
    # Initialize local storage
    storage = LocalObjectStorage("/var/data/documents")

    # Write file with metadata
    await storage.write(
        "reports/2025/quarterly.pdf",
        pdf_bytes,
        metadata={"content-type": "application/pdf", "author": "system"}
    )
    # Read file
    data = await storage.read("reports/2025/quarterly.pdf")
    # Check existence
    exists = await storage.exists("reports/2025/quarterly.pdf")
    # List files with prefix
    files = await storage.list_keys(prefix="reports/2025/")
    # Delete file
    await storage.delete("reports/2025/quarterly.pdf")

asyncio.run(main())
from ff_storage import S3ObjectStorage
import asyncio
async def main():
    # AWS S3
    s3 = S3ObjectStorage(
        bucket="fenix-documents",
        region="us-east-1"
    )
    # Or MinIO/other S3-compatible
    s3 = S3ObjectStorage(
        bucket="fenix-documents",
        endpoint_url="http://localhost:9000",
        access_key="minioadmin",
        secret_key="minioadmin"
    )

    # Write file
    await s3.write("docs/report.pdf", pdf_bytes)
    # Stream large files
    async for chunk in s3.read_stream("large_file.bin", chunk_size=8192):
        await process_chunk(chunk)
    # Multipart upload for large files (automatic)
    await s3.write("huge_file.bin", huge_data)  # Automatically uses multipart if > 5MB

asyncio.run(main())
from ff_storage import AzureBlobObjectStorage
from azure.identity import DefaultAzureCredential
import asyncio
async def main():
    # Azurite (local development with connection string)
    storage = AzureBlobObjectStorage(
        container_name="fenix-documents",
        connection_string="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
    )
    # Production with connection string (access key)
    storage = AzureBlobObjectStorage(
        container_name="fenix-documents",
        connection_string="DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=...;EndpointSuffix=core.windows.net",
        prefix="documents/"  # Optional prefix for all keys
    )
    # Production with Managed Identity (DefaultAzureCredential)
    storage = AzureBlobObjectStorage(
        container_name="fenix-documents",
        account_url="https://mystorageaccount.blob.core.windows.net"
    )
    # Production with custom credential
    storage = AzureBlobObjectStorage(
        container_name="fenix-documents",
        account_url="https://mystorageaccount.blob.core.windows.net",
        credential=DefaultAzureCredential()
    )

    # Write file with metadata
    await storage.write(
        "reports/2025/quarterly.pdf",
        pdf_bytes,
        metadata={"content-type": "application/pdf", "author": "system"}
    )
    # Read file
    data = await storage.read("reports/2025/quarterly.pdf")
    # Stream large files
    async for chunk in storage.read_stream("large_file.bin", chunk_size=8192):
        await process_chunk(chunk)
    # Check existence
    exists = await storage.exists("reports/2025/quarterly.pdf")
    # List blobs with prefix
    files = await storage.list_keys(prefix="reports/2025/")
    # Get metadata
    metadata = await storage.get_metadata("reports/2025/quarterly.pdf")
    print(metadata["content-type"])
    # Delete blob
    await storage.delete("reports/2025/quarterly.pdf")

asyncio.run(main())
Note: Azure Blob Storage has restrictions on metadata keys (must be valid C# identifiers). The implementation automatically converts hyphens to underscores (e.g., content-type becomes content_type) when storing and converts them back when retrieving.
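For example, a minimal sketch of the round trip (the account URL and container name are placeholders; the point is that the caller always sees the hyphenated key):
from ff_storage import AzureBlobObjectStorage
import asyncio

async def metadata_roundtrip():
    storage = AzureBlobObjectStorage(
        container_name="fenix-documents",
        account_url="https://mystorageaccount.blob.core.windows.net"  # placeholder account
    )
    await storage.write("notes/example.txt", b"hello", metadata={"content-type": "text/plain"})
    meta = await storage.get_metadata("notes/example.txt")
    # Stored in Azure as content_type, returned here with the hyphen restored
    assert meta["content-type"] == "text/plain"

asyncio.run(metadata_roundtrip())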
from ff_storage.db import Postgres, SchemaManager
from ff_storage.db.models import BaseModel
# Define your model with schema in code
class Document(BaseModel):
    __table_name__ = "documents"
    __schema__ = "public"

    @classmethod
    def create_table_sql(cls):
        return """
        CREATE TABLE IF NOT EXISTS public.documents (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            title VARCHAR(255) NOT NULL,
            content TEXT,
            status VARCHAR(50) DEFAULT 'draft',
            created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
            updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
        );
        CREATE INDEX IF NOT EXISTS idx_documents_status
            ON public.documents(status);
        CREATE INDEX IF NOT EXISTS idx_documents_created_at
            ON public.documents(created_at DESC);
        """

# Connect to database
db = Postgres(dbname="mydb", user="user", password="pass", host="localhost", port=5432)
db.connect()

# Create schema manager (auto-detects PostgreSQL)
manager = SchemaManager(db)

# Dry run to preview changes
print("Preview of changes:")
manager.sync_schema(
    models=[Document],
    allow_destructive=False,
    dry_run=True
)

# Apply changes automatically
changes_applied = manager.sync_schema(
    models=[Document],
    allow_destructive=False,  # Safe by default
    dry_run=False
)
print(f"Applied {changes_applied} schema changes")
Features: state-based comparison of model definitions against the live database schema, dry-run previews of pending changes, and destructive operations blocked unless explicitly allowed.
from ff_storage.db.models import BaseModel, BaseModelWithDates
from dataclasses import dataclass
from typing import Optional
import uuid
@dataclass
class Document(BaseModelWithDates):
    title: str
    content: str
    status: str = "draft"
    author_id: Optional[uuid.UUID] = None

# Automatic UUID and timestamp handling
doc = Document(
    title="Quarterly Report",
    content="...",
    status="published"
)
# doc.id = UUID automatically generated
# doc.created_at = current timestamp
# doc.updated_at = current timestamp
# Manual transaction handling with explicit begin/commit/rollback
def transfer_ownership(db, doc_id, new_owner_id):
    db.begin_transaction()
    try:
        # Multiple operations in single transaction
        db.execute("UPDATE documents SET owner_id = %(owner_id)s WHERE id = %(id)s",
                   {"owner_id": new_owner_id, "id": doc_id})
        db.execute("INSERT INTO audit_log (action, doc_id, user_id) VALUES (%(action)s, %(doc_id)s, %(user_id)s)",
                   {"action": "transfer", "doc_id": doc_id, "user_id": new_owner_id})
        db.commit_transaction()
    except Exception:
        db.rollback_transaction()
        raise
# Check pool statistics
pool = PostgresPool(...)
open_connections = pool.get_open_connections()
print(f"Open connections: {open_connections}")
# Graceful shutdown
pool.close_all_connections()
from ff_storage.db.sql import build_insert, build_update, build_select
# Build INSERT query
query, params = build_insert("documents", {
    "title": "New Doc",
    "status": "draft"
})

# Build UPDATE query
query, params = build_update(
    "documents",
    {"status": "published"},
    {"id": doc_id}
)

# Build SELECT with conditions
query, params = build_select(
    "documents",
    columns=["id", "title"],
    where={"status": "published", "author_id": user_id}
)
from ff_storage.exceptions import StorageError, DatabaseError
try:
    db.connect()
    results = db.read_query("SELECT * FROM documents")
except DatabaseError as e:
    print(f"Database error: {e}")
except StorageError as e:
    print(f"Storage error: {e}")
finally:
    db.close_connection()
# Run tests
pytest tests/
# With coverage
pytest --cov=ff_storage tests/
# Run specific test file
pytest tests/test_postgres.py
# Run with verbose output
pytest -v tests/
# Database
export DB_HOST=localhost
export DB_PORT=5432
export DB_NAME=fenix_db
export DB_USER=fenix
export DB_PASSWORD=secret
# S3 Storage
export AWS_ACCESS_KEY_ID=your-key
export AWS_SECRET_ACCESS_KEY=your-secret
export AWS_DEFAULT_REGION=us-east-1
# Local Storage
export STORAGE_PATH=/var/data/documents
# config.py
import os

from ff_storage import PostgresPool, S3ObjectStorage

# Database configuration
DATABASE = {
    "dbname": os.getenv("DB_NAME", "fenix_db"),
    "user": os.getenv("DB_USER", "fenix"),
    "password": os.getenv("DB_PASSWORD"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", 5432)),
    "pool_size": 20
}

# Storage configuration
STORAGE = {
    "bucket": os.getenv("S3_BUCKET", "fenix-documents"),
    "region": os.getenv("AWS_DEFAULT_REGION", "us-east-1")
}

# Initialize
db = PostgresPool(**DATABASE)
storage = S3ObjectStorage(**STORAGE)
Contributions are welcome! Please feel free to submit a Pull Request.
MIT License - See LICENSE file for details.
Created and maintained by Ben Moag at Fenixflow
For more information, visit the GitLab repository.