
sql-testing-library
A powerful Python framework for unit testing SQL queries with mock data injection
Test SQL queries across BigQuery, Snowflake, Redshift, Athena, Trino, and DuckDB with type-safe mock data, pytest integration, and automatic table resolution. Perfect for data engineering, ETL pipeline testing, and analytics validation.
Quick Links: Installation | Quick Start | Documentation | Examples | PyPI Package
SQL testing in data engineering can be challenging, especially when working with large datasets and complex queries across multiple database platforms. This library was born from real-world production needs at scale and addresses the pain points of testing SQL reliably across those platforms.
For more details on our journey and the engineering challenges we solved, read the full story: "Our Journey to Building a Scalable SQL Testing Library for Athena"
- Tests are declared with the @sql_test decorator
- Generated SQL can be logged per test with log_sql=True or globally with the SQL_TEST_LOG_ALL=true environment variable

The library supports different data types across database engines. All checkmarks indicate comprehensive test coverage with verified functionality.
| Data Type | Python Type | BigQuery | Athena | Redshift | Trino | Snowflake | DuckDB |
|---|---|---|---|---|---|---|---|
| String | str | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Integer | int | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Float | float | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Boolean | bool | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Date | date | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Datetime | datetime | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Decimal | Decimal | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Optional | Optional[T] | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Data Type | Python Type | BigQuery | Athena | Redshift | Trino | Snowflake | DuckDB |
|---|---|---|---|---|---|---|---|
| String Array | List[str] | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Integer Array | List[int] | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Decimal Array | List[Decimal] | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Optional Array | Optional[List[T]] | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Map/Dict | Dict[K, V] | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Struct/Record | dataclass | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Nested Arrays | List[List[T]] | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Arrays of Structs | List[dataclass] | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| 3D Arrays | List[List[List[T]]] | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Arrays of Arrays of Structs | List[List[dataclass]] | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
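The complex types above can be used directly as fields on the dataclasses you pass to a mock table. Below is a minimal sketch assuming a hypothetical Event row type and EventsMockTable; the field and table names are illustrative, not part of the library:

from dataclasses import dataclass
from typing import Dict, List, Optional

from sql_testing_library.mock_table import BaseMockTable

@dataclass
class Event:  # hypothetical row type
    event_id: int
    tags: List[str]                     # string array column
    properties: Dict[str, str]          # map/dict column
    scores: Optional[List[int]] = None  # optional integer array column

class EventsMockTable(BaseMockTable):  # hypothetical mock table
    def get_database_name(self) -> str:
        return "test_db"

    def get_table_name(self) -> str:
        return "events"

events_table = EventsMockTable([
    Event(1, ["signup", "mobile"], {"country": "US"}, [10, 20]),
    Event(2, ["login"], {"country": "CA"}),
])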
Engine-specific notes:

- BigQuery: uses [] array literals; uses scientific notation for large decimals; dict/map types are stored as JSON strings; struct types are supported using STRUCT syntax with named fields (dataclasses and Pydantic models); does not support nested arrays (database limitation).
- Athena: uses ARRAY[] and MAP(ARRAY[], ARRAY[]) syntax; supports struct types using ROW with named fields (dataclasses and Pydantic models); full support for deeply nested types including nested arrays, arrays of structs, and 3D arrays.
- Trino: supports struct types using ROW with named fields (dataclasses and Pydantic models); full support for deeply nested types including nested arrays, arrays of structs, and 3D arrays.
- DuckDB: supports struct types using STRUCT syntax with named fields (dataclasses and Pydantic models); full support for deeply nested types including nested arrays (2D, 3D+), arrays of structs, and arrays of arrays of structs.

The library supports two execution modes for mock data injection. CTE Mode is the default and is used automatically unless Physical Tables mode is explicitly requested or required due to query size limits.
| Execution Mode | Description | BigQuery | Athena | Redshift | Trino | Snowflake | DuckDB |
|---|---|---|---|---|---|---|---|
| CTE Mode | Mock data injected as Common Table Expressions | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Physical Tables | Mock data created as temporary tables | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Physical Tables mode is enabled with use_physical_tables=True. The table below summarizes how each database handles table creation and cleanup:

| Database | Table Type | Schema/Location | Cleanup Method | Cleanup Timing |
|---|---|---|---|---|
| BigQuery | Standard tables | Project dataset | Library executes client.delete_table() | After each test |
| Athena | External tables | S3-backed external tables | Library executes DROP TABLE (⚠️ S3 data remains) | After each test |
| Redshift | Temporary tables | Session-specific temp schema | Database automatic | Session end |
| Trino | Memory tables | memory.default schema | Library executes DROP TABLE | After each test |
| Snowflake | Temporary tables | Session-specific temp schema | Database automatic | Session end |
| DuckDB | Temporary tables | Database-specific temp schema | Library executes DROP TABLE | After each test |
Library-Managed Cleanup (BigQuery, Athena, Trino, DuckDB):
- BigQuery: client.delete_table()
- Athena: DROP TABLE IF EXISTS (⚠️ S3 data files remain and require separate cleanup)
- Trino: DROP TABLE IF EXISTS
- DuckDB: DROP TABLE IF EXISTS

Database-Managed Cleanup (Redshift, Snowflake):
- Redshift: CREATE TEMPORARY TABLE - automatically dropped when the session ends
- Snowflake: CREATE TEMPORARY TABLE - automatically dropped when the session ends

Why the Difference?
Q: Why does Athena require "manual" cleanup while others are automatic?
A: Athena creates external tables backed by S3 data. The library automatically calls DROP TABLE after each test, which removes the table metadata from AWS Glue catalog. However, the actual S3 data files remain and must be cleaned up separately - either manually or through S3 lifecycle policies. This two-step cleanup process is why it's considered "manual" compared to true temporary tables.
Q: What does "explicit cleanup" mean for Trino?
A: Trino's memory catalog doesn't automatically clean up tables when sessions end. The library explicitly calls DROP TABLE IF EXISTS after each test to remove the tables. Like Athena, if a test fails catastrophically, some tables might persist until the Trino server restarts.
Q: What is the TTL (Time To Live) for BigQuery tables?
A: BigQuery tables created by the library are standard tables without TTL - they persist until explicitly deleted. The library immediately calls client.delete_table() after each test. If you want to set TTL as a safety net, you can configure it at the dataset level (e.g., 24 hours) to auto-delete any orphaned tables.
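As a safety net, a default table expiration can be set on the test dataset so any orphaned tables are removed automatically. A minimal sketch using the google-cloud-bigquery client; the project and dataset IDs are placeholders:

from google.cloud import bigquery

client = bigquery.Client(project="test-project")            # placeholder project ID
dataset = client.get_dataset("test-project.test_dataset")   # placeholder dataset ID

# Any table created in this dataset (including orphaned temp_ tables) expires after 24 hours
dataset.default_table_expiration_ms = 24 * 60 * 60 * 1000
client.update_dataset(dataset, ["default_table_expiration_ms"])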
Q: Which databases leave artifacts if tests crash?
Q: How to manually clean up orphaned tables?
-- BigQuery: List and delete tables with temp prefix
SELECT table_name FROM `project.dataset.INFORMATION_SCHEMA.TABLES`
WHERE table_name LIKE 'temp_%';
-- Athena: List and drop tables with temp prefix
SHOW TABLES LIKE 'temp_%';
DROP TABLE temp_table_name;
-- Trino: List and drop tables with temp prefix
SHOW TABLES FROM memory.default LIKE 'temp_%';
DROP TABLE memory.default.temp_table_name;
-- DuckDB: List and drop tables with temp prefix
SHOW TABLES;
DROP TABLE temp_table_name;
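For BigQuery, the same cleanup can also be scripted with the Python client instead of SQL. A minimal sketch, assuming the library's temp_ prefix and placeholder project/dataset IDs:

from google.cloud import bigquery

client = bigquery.Client(project="test-project")  # placeholder project ID

# Delete any leftover tables whose names start with the temp_ prefix
for table in client.list_tables("test-project.test_dataset"):  # placeholder dataset ID
    if table.table_id.startswith("temp_"):
        client.delete_table(table.reference, not_found_ok=True)
        print(f"Deleted {table.full_table_id}")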
Q: How to handle S3 cleanup for Athena tables?
Athena external tables store data in S3. When DROP TABLE is called, only the table metadata is removed from AWS Glue catalog - S3 data files remain. Here are cleanup options:
Option 1: S3 Lifecycle Policy (Recommended)
{
"Rules": [
{
"ID": "DeleteSQLTestingTempFiles",
"Status": "Enabled",
"Filter": {
"Prefix": "temp_"
},
"Expiration": {
"Days": 1
}
}
]
}
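The policy above can be applied programmatically with boto3 (or the equivalent aws s3api put-bucket-lifecycle-configuration CLI call). A minimal sketch with a placeholder bucket name:

import boto3

s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="your-athena-results-bucket",  # placeholder bucket name
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "DeleteSQLTestingTempFiles",
                "Status": "Enabled",
                "Filter": {"Prefix": "temp_"},
                "Expiration": {"Days": 1},
            }
        ]
    },
)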
Option 2: Manual S3 Cleanup
# List temp files in your Athena results bucket
aws s3 ls s3://your-athena-results-bucket/ --recursive | grep temp_
# Delete objects with the temp_ prefix (note: aws s3 rm does not filter by age)
aws s3 rm s3://your-athena-results-bucket/ --recursive --exclude "*" --include "temp_*"
Option 3: Automated Cleanup Script
#!/bin/bash
# Delete S3 objects older than 1 day with the temp_ prefix (GNU date syntax)
CUTOFF=$(date -u -d '1 day ago' +%Y-%m-%dT%H:%M:%SZ)
aws s3api list-objects-v2 --bucket your-athena-results-bucket --prefix "temp_" \
  --query "Contents[?LastModified<='${CUTOFF}'].Key" --output text | \
  tr '\t' '\n' | xargs -I {} aws s3 rm "s3://your-athena-results-bucket/{}"
| Database | CTE Query Size Limit | Physical Tables Threshold |
|---|---|---|
| BigQuery | ~1MB (estimated) | Large dataset or complex CTEs |
| Athena | 256KB | Automatically switches at 256KB |
| Redshift | 16MB | Automatically switches at 16MB |
| Trino | 16MB (estimated) | Large dataset or complex CTEs |
| Snowflake | 1MB | Automatically switches at 1MB |
| DuckDB | 32MB (estimated) | Large dataset or complex CTEs |
# Default: CTE Mode (recommended for most use cases)
@sql_test(mock_tables=[...], result_class=ResultClass)
def test_default_mode():
return TestCase(query="SELECT * FROM table")
# Explicit CTE Mode
@sql_test(mock_tables=[...], result_class=ResultClass)
def test_explicit_cte():
return TestCase(
query="SELECT * FROM table",
use_physical_tables=False # Explicit CTE mode
)
# Explicit Physical Tables Mode
@sql_test(mock_tables=[...], result_class=ResultClass)
def test_physical_tables():
return TestCase(
query="SELECT * FROM table",
use_physical_tables=True # Force physical tables
)
# Physical Tables with Custom Parallel Settings
@sql_test(
mock_tables=[...],
result_class=ResultClass,
use_physical_tables=True,
max_workers=4 # Customize parallel execution
)
def test_with_custom_parallelism():
return TestCase(query="SELECT * FROM table")
Notes:
When using use_physical_tables=True with multiple mock tables, the library can create and cleanup tables in parallel for better performance.
Default Behavior: parallel_table_creation=True - mock tables are created in parallel by default.
Customization:
# Disable parallel creation
@sql_test(use_physical_tables=True, parallel_table_creation=False)
# Custom worker count
@sql_test(use_physical_tables=True, max_workers=2)
# In SQLTestCase directly
TestCase(
query="...",
use_physical_tables=True,
parallel_table_creation=True, # Default
max_workers=4 # Custom worker limit
)
Default Behavior: parallel_table_cleanup=True - mock tables are dropped in parallel by default.
Customization:
# Disable parallel cleanup
@sql_test(use_physical_tables=True, parallel_table_cleanup=False)
# Custom worker count for both creation and cleanup
@sql_test(use_physical_tables=True, max_workers=2)
# In SQLTestCase directly
TestCase(
query="...",
use_physical_tables=True,
parallel_table_creation=True, # Default
parallel_table_cleanup=True, # Default
max_workers=4 # Custom worker limit for both operations
)
Performance Benefits: parallel creation and cleanup reduce per-test overhead when a test uses many mock tables.

Installation
# Install with BigQuery support
pip install sql-testing-library[bigquery]
# Install with Athena support
pip install sql-testing-library[athena]
# Install with Redshift support
pip install sql-testing-library[redshift]
# Install with Trino support
pip install sql-testing-library[trino]
# Install with Snowflake support
pip install sql-testing-library[snowflake]
# Install with DuckDB support
pip install sql-testing-library[duckdb]
# Or install with all database adapters
pip install sql-testing-library[all]
# Install base dependencies
poetry install
# Install with specific database support
poetry install --with bigquery
poetry install --with athena
poetry install --with redshift
poetry install --with trino
poetry install --with snowflake
poetry install --with duckdb
# Install with all database adapters and dev tools
poetry install --with bigquery,athena,redshift,trino,snowflake,duckdb,dev
Configure the library by adding a [sql_testing] section to pytest.ini:

[sql_testing]
adapter = bigquery # Use 'bigquery', 'athena', 'redshift', 'trino', 'snowflake', or 'duckdb'
# BigQuery configuration
[sql_testing.bigquery]
project_id = <my-test-project>
dataset_id = <test_dataset>
credentials_path = <path to credentials json>
# Athena configuration
# [sql_testing.athena]
# database = <test_database>
# s3_output_location = s3://my-athena-results/
# region = us-west-2
# aws_access_key_id = <optional> # Optional: if not using default credentials
# aws_secret_access_key = <optional> # Optional: if not using default credentials
# Redshift configuration
# [sql_testing.redshift]
# host = <redshift-host.example.com>
# database = <test_database>
# user = <redshift_user>
# password = <redshift_password>
# port = <5439> # Optional: default port is 5439
# Trino configuration
# [sql_testing.trino]
# host = <trino-host.example.com>
# port = <8080> # Optional: default port is 8080
# user = <trino_user>
# catalog = <memory> # Optional: default catalog is 'memory'
# schema = <default> # Optional: default schema is 'default'
# http_scheme = <http> # Optional: default is 'http', use 'https' for secure connections
#
# # Authentication configuration (choose one method)
# # For Basic Authentication:
# auth_type = basic
# password = <trino_password>
#
# # For JWT Authentication:
# # auth_type = jwt
# # token = <jwt_token>
# Snowflake configuration
# [sql_testing.snowflake]
# account = <account-identifier>
# user = <snowflake_user>
# database = <test_database>
# schema = <PUBLIC> # Optional: default schema is 'PUBLIC'
# warehouse = <compute_wh> # Required: specify a warehouse
# role = <role_name> # Optional: specify a role
#
# # Authentication (choose one):
# # Option 1: Key-pair authentication (recommended for MFA)
# private_key_path = </path/to/private_key.pem>
# # Or use environment variable SNOWFLAKE_PRIVATE_KEY
#
# # Option 2: Password authentication (for accounts without MFA)
# password = <snowflake_password>
# DuckDB configuration
# [sql_testing.duckdb]
# database = <path/to/database.duckdb> # Optional: defaults to in-memory database
Each database adapter uses a different concept for organizing tables and queries. Understanding the database context - the minimum qualification needed to uniquely identify a table - is crucial for writing mock tables and queries:
| Adapter | Database Context Format | Components | Mock Table Example | Query Example |
|---|---|---|---|---|
| BigQuery | {project_id}.{dataset_id} | project + dataset | "test-project.test_dataset" | SELECT * FROM test-project.test_dataset.users |
| Athena | {database} | database only | "test_db" | SELECT * FROM test_db.customers |
| Redshift | {database} | database only | "test_db" | SELECT * FROM test_db.orders |
| Snowflake | {database}.{schema} | database + schema | "test_db.public" | SELECT * FROM test_db.public.products |
| Trino | {catalog}.{schema} | catalog + schema | "memory.default" | SELECT * FROM memory.default.inventory |
| DuckDB | {database} | database only | "test_db" | SELECT * FROM test_db.analytics |
Mock Tables: Use hardcoded database contexts in your test mock tables. Don't rely on environment variables - this makes tests predictable and consistent.
default_namespace Parameter: This parameter serves as a namespace resolution context that qualifies unqualified table names in your SQL queries. When creating TestCase instances, use the same database context format as your mock tables.
Query References: Your SQL queries can use either:
- Fully qualified: SELECT * FROM test-project.test_dataset.users
- Unqualified: SELECT * FROM users (qualified using default_namespace)

Case Sensitivity:
"test_db.public" even if your SQL uses FROM CUSTOMERSdefault_namespace ParameterThe default_namespace parameter is not where your SQL executes - it's the namespace prefix used to resolve unqualified table names in your queries.
How it works:
1. Your query uses unqualified table names: SELECT * FROM users JOIN orders ON users.id = orders.user_id
2. default_namespace is set to "test-project.test_dataset"
3. The library qualifies the names: users → test-project.test_dataset.users, orders → test-project.test_dataset.orders
4. The qualified names are matched against the mock tables' get_qualified_name() values

Alternative parameter names under consideration:
- default_namespace ✅ (most clear about purpose)
- table_context
- namespace_prefix

Example showing the difference:
# Option 1: Fully qualified table names (default_namespace not used for resolution)
TestCase(
query="SELECT * FROM test-project.test_dataset.users",
default_namespace="test-project.test_dataset", # For consistency, but not used
)
# Option 2: Unqualified table names (default_namespace used for resolution)
TestCase(
query="SELECT * FROM users", # Unqualified
default_namespace="test-project.test_dataset", # Qualifies to: test-project.test_dataset.users
)
# BigQuery Mock Table (Recommended: Use BigQueryMockTable for clearer three-part naming)
from sql_testing_library import BigQueryMockTable
class UsersMockTable(BigQueryMockTable):
project_name = "test-project"
dataset_name = "test_dataset"
table_name = "users"
# BigQuery Mock Table (Alternative: Use BaseMockTable with combined project.dataset)
class UsersMockTableAlternative(BaseMockTable):
def get_database_name(self) -> str:
return "test-project.test_dataset" # project.dataset format
def get_table_name(self) -> str:
return "users"
# Athena Mock Table
class CustomerMockTable(BaseMockTable):
def get_database_name(self) -> str:
return "test_db" # database only
def get_table_name(self) -> str:
return "customers"
# Snowflake Mock Table
class ProductsMockTable(BaseMockTable):
def get_database_name(self) -> str:
return "test_db.public" # database.schema format (lowercase)
def get_table_name(self) -> str:
return "products"
# DuckDB Mock Table
class AnalyticsMockTable(BaseMockTable):
def get_database_name(self) -> str:
return "test_db" # database only
def get_table_name(self) -> str:
return "analytics"
from dataclasses import dataclass
from datetime import date
from pydantic import BaseModel
from sql_testing_library import sql_test, TestCase
from sql_testing_library.mock_table import BaseMockTable
@dataclass
class User:
user_id: int
name: str
email: str
class UserResult(BaseModel):
user_id: int
name: str
class UsersMockTable(BaseMockTable):
def get_database_name(self) -> str:
return "sqltesting_db"
def get_table_name(self) -> str:
return "users"
# Pattern 1: Define all test data in the decorator
@sql_test(
mock_tables=[
UsersMockTable([
User(1, "Alice", "alice@example.com"),
User(2, "Bob", "bob@example.com")
])
],
result_class=UserResult
)
def test_pattern_1():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="sqltesting_db"
)
# Pattern 2: Define all test data in the TestCase
@sql_test() # Empty decorator
def test_pattern_2():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="sqltesting_db",
mock_tables=[
UsersMockTable([
User(1, "Alice", "alice@example.com"),
User(2, "Bob", "bob@example.com")
])
],
result_class=UserResult
)
# Pattern 3: Mix and match between decorator and TestCase
@sql_test(
mock_tables=[
UsersMockTable([
User(1, "Alice", "alice@example.com"),
User(2, "Bob", "bob@example.com")
])
]
)
def test_pattern_3():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="sqltesting_db",
result_class=UserResult
)
The library supports struct/record types using Python dataclasses or Pydantic models for Athena, Trino, and BigQuery:
from dataclasses import dataclass
from decimal import Decimal
from pydantic import BaseModel
from sql_testing_library import sql_test, TestCase
from sql_testing_library.mock_table import BaseMockTable
# Define nested structs using dataclasses
@dataclass
class Address:
street: str
city: str
state: str
zip_code: str
@dataclass
class Employee:
id: int
name: str
salary: Decimal
address: Address # Nested struct
is_active: bool = True
# Or use Pydantic models
class AddressPydantic(BaseModel):
street: str
city: str
state: str
zip_code: str
class EmployeeResultPydantic(BaseModel):
id: int
name: str
city: str # Extracted from nested struct
# Mock table with struct data
class EmployeesMockTable(BaseMockTable):
def get_database_name(self) -> str:
return "test_db"
def get_table_name(self) -> str:
return "employees"
# Test with struct types
@sql_test(
adapter_type="athena", # or "trino", "bigquery", or "duckdb"
mock_tables=[
EmployeesMockTable([
Employee(
id=1,
name="Alice Johnson",
salary=Decimal("120000.00"),
address=Address(
street="123 Tech Lane",
city="San Francisco",
state="CA",
zip_code="94105"
),
is_active=True
),
Employee(
id=2,
name="Bob Smith",
salary=Decimal("95000.00"),
address=Address(
street="456 Oak Ave",
city="New York",
state="NY",
zip_code="10001"
),
is_active=False
)
])
],
result_class=EmployeeResultPydantic
)
def test_struct_with_dot_notation():
return TestCase(
query="""
SELECT
id,
name,
address.city as city -- Access nested field with dot notation
FROM employees
WHERE address.state = 'CA' -- Use struct fields in WHERE clause
""",
default_namespace="test_db"
)
# You can also query entire structs
@sql_test(
adapter_type="trino", # or "athena", "bigquery", or "duckdb"
mock_tables=[EmployeesMockTable([...])],
result_class=dict # Returns full struct as dict
)
def test_query_full_struct():
return TestCase(
query="SELECT id, name, address FROM employees",
default_namespace="test_db"
)
Struct Type Features:
- Access nested fields with struct.field syntax in queries
- Arrays of structs: List[StructType] works with array operations

SQL Type Mapping:
- Structs map to ROW(field1 type1, field2 type2, ...) in Athena and Trino

# Run all tests
pytest test_users.py
# Run only SQL tests (using the sql_test marker)
pytest -m sql_test
# Exclude SQL tests
pytest -m "not sql_test"
# Run a specific test
pytest test_users.py::test_user_query
# If using Poetry
poetry run pytest test_users.py::test_user_query
The error No [sql_testing] section found in pytest.ini, setup.cfg, or tox.ini typically appears when an IDE such as PyCharm or VS Code, or another development environment, runs pytest from a different working directory.
Problem: The library looks for configuration files (pytest.ini, setup.cfg, tox.ini) in the current working directory, but IDEs may run tests from a different location.
Set the SQL_TESTING_PROJECT_ROOT environment variable to point to your project root directory:
In PyCharm:
- Add the environment variable SQL_TESTING_PROJECT_ROOT = /path/to/your/project/root (where your pytest.ini is located) to your test run configuration

In VS Code:
Add to your .vscode/settings.json:
{
"python.testing.pytestArgs": [
"--rootdir=/path/to/your/project/root"
],
"python.envFile": "${workspaceFolder}/.env"
}
Create a .env file in your project root:
SQL_TESTING_PROJECT_ROOT=/path/to/your/project/root
Create a conftest.py file in your project root directory:
"""
PyTest configuration file to ensure SQL Testing Library can find config
"""
import os
import pytest
def pytest_configure(config):
"""Ensure SQL_TESTING_PROJECT_ROOT is set for IDE compatibility"""
if not os.environ.get('SQL_TESTING_PROJECT_ROOT'):
# Set to current working directory where conftest.py is located
project_root = os.path.dirname(os.path.abspath(__file__))
os.environ['SQL_TESTING_PROJECT_ROOT'] = project_root
print(f"Setting SQL_TESTING_PROJECT_ROOT to: {project_root}")
This automatically sets the project root when pytest runs, regardless of the IDE or working directory.
Create a setup.cfg file alongside your pytest.ini:
[tool:pytest]
testpaths = tests
[sql_testing]
adapter = bigquery
[sql_testing.bigquery]
project_id = your-project-id
dataset_id = your_dataset
credentials_path = /path/to/credentials.json
In PyCharm:
- Set the working directory of your test run configuration to the project root (where pytest.ini is located)

In VS Code:
Ensure your workspace is opened at the project root level where pytest.ini exists.
To verify your configuration is working, run this Python snippet:
from sql_testing_library._pytest_plugin import SQLTestDecorator
decorator = SQLTestDecorator()
try:
project_root = decorator._get_project_root()
print(f"Project root: {project_root}")
config_parser = decorator._get_config_parser()
print(f"Config sections: {config_parser.sections()}")
if 'sql_testing' in config_parser:
adapter = config_parser.get('sql_testing', 'adapter')
print(f"✅ Configuration found! Adapter: {adapter}")
else:
print("❌ No sql_testing section found")
except Exception as e:
print(f"❌ Error: {e}")
PyCharm: Often runs pytest from the project parent directory instead of the project root.
- Fix: set SQL_TESTING_PROJECT_ROOT or add the conftest.py shown above

VS Code: May not respect the pytest.ini location when using the Python extension.
- Fix: use a .env file or set python.testing.pytestArgs in settings

Jupyter Notebooks: Running tests in notebooks may not find configuration files.
- Fix: set the SQL_TESTING_PROJECT_ROOT environment variable in the notebook

Docker/Containers: Configuration files may not be mounted or accessible.
- Fix: mount pytest.ini (or setup.cfg/tox.ini) into the container and set SQL_TESTING_PROJECT_ROOT
The library supports flexible ways to configure your tests:
- Pattern 1: Put mock tables and the result class in the @sql_test decorator, with only query and default_namespace in TestCase.
- Pattern 2: Use an empty @sql_test() decorator and define everything in the TestCase return value.
- Pattern 3: Mix and match between the decorator and TestCase.

Important notes:
The adapter specified in the [sql_testing] section acts as the default adapter for all tests. When you don't specify an adapter_type in your test, this default is used.
[sql_testing]
adapter = snowflake # This becomes the default for all tests
You can override the default adapter for individual tests:
# Use BigQuery adapter for this test
@sql_test(
adapter_type="bigquery",
mock_tables=[...],
result_class=UserResult
)
def test_bigquery_query():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="sqltesting_db"
)
# Use Athena adapter for this test
@sql_test(
adapter_type="athena",
mock_tables=[...],
result_class=UserResult
)
def test_athena_query():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="test_db"
)
# Use Redshift adapter for this test
@sql_test(
adapter_type="redshift",
mock_tables=[...],
result_class=UserResult
)
def test_redshift_query():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="test_db"
)
# Use Trino adapter for this test
@sql_test(
adapter_type="trino",
mock_tables=[...],
result_class=UserResult
)
def test_trino_query():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="test_db"
)
# Use Snowflake adapter for this test
@sql_test(
adapter_type="snowflake",
mock_tables=[...],
result_class=UserResult
)
def test_snowflake_query():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="test_db"
)
# Use DuckDB adapter for this test
@sql_test(
adapter_type="duckdb",
mock_tables=[...],
result_class=UserResult
)
def test_duckdb_query():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="test_db"
)
The adapter_type parameter will use the configuration from the corresponding section in pytest.ini, such as [sql_testing.bigquery], [sql_testing.athena], [sql_testing.redshift], [sql_testing.trino], [sql_testing.snowflake], or [sql_testing.duckdb].
Default Adapter Behavior:
- If adapter_type is not specified in the test, the library uses the adapter from the [sql_testing] section's adapter setting
- If no adapter is configured in the [sql_testing] section, it defaults to "bigquery"
- Each adapter reads its settings from the corresponding [sql_testing.<adapter_name>] section

BigQueryMockTable

Use the BigQueryMockTable class for explicit three-part naming (project.dataset.table). BigQuery uses a three-part naming scheme (project.dataset.table) which doesn't fit naturally into the two-part database.table model used by most databases. The BigQueryMockTable class provides explicit support for BigQuery's naming convention.
Using BaseMockTable for BigQuery requires awkwardly cramming the project and dataset together:
# Awkward: Combines project.dataset into database_name
class UsersMockTable(BaseMockTable):
def get_database_name(self) -> str:
return "test-project.test_dataset" # Confusing!
def get_table_name(self) -> str:
return "users"
BigQueryMockTable makes BigQuery's three-part naming explicit and clear:
from sql_testing_library import BigQueryMockTable
class UsersMockTable(BigQueryMockTable):
project_name = "test-project"
dataset_name = "test_dataset"
table_name = "users"
Basic Usage:
from sql_testing_library import BigQueryMockTable
class UsersMockTable(BigQueryMockTable):
project_name = "my-project"
dataset_name = "analytics"
table_name = "users"
class OrdersMockTable(BigQueryMockTable):
project_name = "my-project"
dataset_name = "analytics"
table_name = "orders"
Avoid Repetition with Inheritance:
# Base class for all tables in the same project
class MyProjectTable(BigQueryMockTable):
project_name = "my-project"
# Subclasses only specify dataset and table
class UsersTable(MyProjectTable):
dataset_name = "analytics"
table_name = "users"
class OrdersTable(MyProjectTable):
dataset_name = "analytics"
table_name = "orders"
Available Methods:
table = UsersMockTable([...])
# BigQuery-specific methods
table.get_project_name() # "my-project"
table.get_dataset_name() # "analytics"
table.get_fully_qualified_name() # "my-project.analytics.users"
# Backwards compatible methods (from BaseMockTable)
table.get_database_name() # "my-project.analytics"
table.get_table_name() # "users"
table.get_qualified_name() # "my-project.analytics.users"
table.get_cte_alias() # "my_project_analytics__users"
Benefits:
- Makes BigQuery's three-part naming explicit
- Fully backwards compatible with the BaseMockTable methods

Default Behavior:
- Tests without an explicit adapter_type use the [sql_testing] section's adapter setting.
- If no adapter is configured in the [sql_testing] section, it defaults to "bigquery".
- Adapter settings are read from the matching [sql_testing.<adapter>] section.
- If that section is missing, settings fall back to the [sql_testing] section for backward compatibility.

The project includes a Makefile for common development tasks:
# Install all dependencies
make install
# Run unit tests
make test
# Run linting and type checking
make lint
# Format code
make format
# Run all checks (lint + format check + tests)
make check
# See all available commands
make help
| Command | Description |
|---|---|
make install | Install all dependencies with poetry |
make test | Run unit tests with coverage |
make test-unit | Run unit tests (excludes integration tests) |
make test-integration | Run integration tests (requires DB credentials) |
make test-all | Run all tests (unit + integration) |
make test-tox | Run tests across all Python versions (3.9-3.12) |
make lint | Run ruff and mypy checks |
make format | Format code with black and ruff |
make check | Run all checks (lint + format + tests) |
make clean | Remove build artifacts and cache files |
make build | Build distribution packages |
make docs | Build documentation |
The project supports Python 3.9-3.12. You can test across all versions using:
# Using tox (automatically tests all Python versions)
make test-tox
# Or directly with tox
tox
# Test specific Python version
tox -e py39 # Python 3.9
tox -e py310 # Python 3.10
tox -e py311 # Python 3.11
tox -e py312 # Python 3.12
The project uses comprehensive tooling (ruff, black, and mypy) to ensure code quality.
To set up the development environment:
Install development dependencies:
# Using make
make install
# Or directly with poetry
poetry install --all-extras
Set up pre-commit hooks:
./scripts/setup-hooks.sh
This ensures code is automatically formatted, linted, and type-checked on commit.
For more information on code quality standards, see docs/linting.md.
The library includes comprehensive GitHub Actions workflows for automated testing across multiple database platforms:
Automatically runs on every PR and merge to master:
- Unit tests in tests/ (free)
- Integration tests in tests/integration/ (minimal cost)

Athena Integration Tests

Required Setup:
- Required secrets: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_ATHENA_OUTPUT_LOCATION
- Optional: AWS_ATHENA_DATABASE, AWS_REGION

Validation:
python scripts/validate-athena-setup.py
BigQuery Integration Tests

Required Setup:
- Required secrets: GCP_SA_KEY, GCP_PROJECT_ID

Validation:
python scripts/validate-bigquery-setup.py
Redshift Integration Tests

Required Setup:
- Required secrets: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, REDSHIFT_ADMIN_PASSWORD
- Optional: AWS_REGION, REDSHIFT_NAMESPACE, REDSHIFT_WORKGROUP

Validation:
python scripts/validate-redshift-setup.py
Manual Testing:
# Create Redshift cluster (automatically configures security groups for connectivity)
python scripts/manage-redshift-cluster.py create
# Get connection details and psql command
python scripts/manage-redshift-cluster.py endpoint
# Run integration tests
poetry run pytest tests/integration/test_redshift_integration.py -v
# Clean up resources (automatically waits for proper deletion order)
python scripts/manage-redshift-cluster.py destroy
Trino Integration Tests

Required Setup:
- Docker (the integration tests manage Trino containers automatically)
Manual Testing:
# Run integration tests (automatically manages Docker containers)
poetry run pytest tests/integration/test_trino_integration.py -v
Snowflake Integration Tests

Required Setup:
- Required secrets: SNOWFLAKE_ACCOUNT, SNOWFLAKE_USER, SNOWFLAKE_PASSWORD
- Optional: SNOWFLAKE_DATABASE, SNOWFLAKE_WAREHOUSE, SNOWFLAKE_ROLE

Validation:
python scripts/validate-snowflake-setup.py
Manual Testing:
# Run integration tests
poetry run pytest tests/integration/test_snowflake_integration.py -v
Across these integrations, the library automatically manages the creation and cleanup of test resources.
For detailed usage and configuration options, see the documentation.
SQL Testing Library works seamlessly with Mocksmith for automatic test data generation. Mocksmith can reduce your test setup code by ~70% while providing more realistic test data.
Install mocksmith with: pip install mocksmith[mock,pydantic]
# Without Mocksmith - Manual data creation
customers = []
for i in range(100):
customers.append(Customer(
id=i + 1,
name=f"Customer {i + 1}",
email=f"customer{i + 1}@test.com",
balance=Decimal(str(random.uniform(0, 10000)))
))
# With Mocksmith - Automatic realistic data
from mocksmith import mockable, Varchar, Integer, Money
@mockable
@dataclass
class Customer:
id: Integer()
name: Varchar(100)
email: Varchar(255)
balance: Money()
customers = [Customer.mock() for _ in range(100)]
See the Mocksmith Integration Guide for detailed usage patterns.
The library has a few known limitations that are planned to be addressed in future updates:
✅ Fully Supported (All Major Adapters):
- Nested arrays: List[List[int]], List[List[List[int]]]
- Arrays of structs: List[Address] where Address is a dataclass
- Arrays of arrays of structs: List[List[OrderItem]]
- Map types: Dict[str, str], Dict[str, int]
- See tests/integration/test_deeply_nested_types_integration.py for comprehensive examples

Implementation Details:
- JSON string values are parsed via _parse_json_if_string() in BaseTypeConverter for Redshift/Snowflake

🚧 Database Limitations:
- BigQuery does not support nested arrays (ARRAY<ARRAY<T>> constructs). Struct types and arrays of structs work fine.