Python Testing for Databricks
Installation
Add a databricks-labs-pytester dependency to your pyproject.toml file (or legacy requirements.txt file). You can
also install it directly from the command line:
pip install databricks-labs-pytester
If you use hatch as a build system, make sure to add databricks-labs-pytester as
a test-time dependency and not as a compile-time dependency. Otherwise, your wheels will
transitively depend on pytest, which is usually not something you need.
[project]
name = "name-of-your-project"
dependencies = [
    "databricks-sdk~=0.30",
]

[tool.hatch.envs.default]
dependencies = [
    "black~=24.3.0",
    "coverage[toml]~=7.4.4",
    "mypy~=1.9.0",
    "pylint~=3.2.2",
    "pylint-pytest==2.0.0a0",
    "databricks-labs-pylint~=0.4.0",
    "databricks-labs-pytester~=0.2",
    "pytest~=8.3.3",
    "pytest-cov~=4.1.0",
    "pytest-mock~=3.14.0",
    "pytest-timeout~=2.3.1",
    "pytest-xdist~=3.5.0",
    "python-lsp-server>=1.9.0",
    "ruff~=0.3.4",
    "types-PyYAML~=6.0.12",
    "types-requests~=2.31.0",
]
[back to top]
Ecosystem
Built on top of Databricks SDK for Python, this library is part of the Databricks Labs Python ecosystem, which includes the following projects:
See this video for a quick overview of the Databricks Labs Python ecosystem.
[back to top]
PyTest Fixtures
PyTest Fixtures are a powerful way to manage test setup and teardown in Python. This library provides
a set of fixtures to help you write integration tests for Databricks. These fixtures were incubated
within the Unity Catalog Automated Migrations project
for more than a year and are now available for other projects to simplify integration testing with Databricks.
[back to top]
Logging
This library is built on years of debugging integration tests for Databricks and its ecosystem.
That's why it comes with a built-in logger that traces creation and deletion of dummy entities through links in
the Databricks Workspace UI. If you run the following code:
from databricks.sdk.service.workspace import ObjectType

def test_new_user(make_user, ws):
    new_user = make_user()
    home_dir = ws.workspace.get_status(f"/Users/{new_user.user_name}")
    assert home_dir.object_type == ObjectType.DIRECTORY
You will see the following output, where the first line is clickable and will take you to the user's profile in the Databricks Workspace UI:
12:30:53 INFO [d.l.p.fixtures.baseline] Created dummy-xwuq-...@example.com: https://.....azuredatabricks.net/#settings/workspace/identity-and-access/users/735...
12:30:53 DEBUG [d.l.p.fixtures.baseline] added workspace user fixture: User(active=True, display_name='dummy-xwuq-...@example.com', ...)
12:30:58 DEBUG [d.l.p.fixtures.baseline] clearing 1 workspace user fixtures
12:30:58 DEBUG [d.l.p.fixtures.baseline] removing workspace user fixture: User(active=True, display_name='dummy-xwuq-...@example.com', ...)
You may need to add the following to your conftest.py file to enable this:
import logging
from databricks.labs.blueprint.logger import install_logger
install_logger()
logging.getLogger('databricks.labs.pytester').setLevel(logging.DEBUG)
[back to top]
debug_env_name
fixture
Specify the name of the debug environment. By default, it is set to .env,
which will try to find a file named .env in any of the parent directories of the current working directory
and load the environment variables from it via the debug_env fixture.
Alternatively, if you are concerned about the risk of .env files getting checked into version control,
we recommend using the ~/.databricks/debug-env.json file to store different sets of environment variables.
The file cannot be checked into version control by design, because it is stored in the user's home directory.
This file is used for local debugging and integration tests in IDEs like PyCharm, VSCode, and IntelliJ IDEA
while developing Databricks Platform Automation Stack, which includes Databricks SDKs for Python, Go, and Java,
as well as Databricks Terraform Provider and Databricks CLI. This file enables multi-environment and multi-cloud
testing with a single set of integration tests.
The file is typically structured as follows:
$ cat ~/.databricks/debug-env.json
{
  "ws": {
    "CLOUD_ENV": "azure",
    "DATABRICKS_HOST": "....azuredatabricks.net",
    "DATABRICKS_CLUSTER_ID": "0708-200540-...",
    "DATABRICKS_WAREHOUSE_ID": "33aef...",
    ...
  },
  "acc": {
    "CLOUD_ENV": "aws",
    "DATABRICKS_HOST": "accounts.cloud.databricks.com",
    "DATABRICKS_CLIENT_ID": "....",
    "DATABRICKS_CLIENT_SECRET": "....",
    ...
  }
}
And you can load it in your conftest.py file as follows:
import pytest

@pytest.fixture
def debug_env_name():
    return "ws"
This will load the ws environment from the ~/.databricks/debug-env.json file.
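To make the file layout more concrete, here is a minimal sketch of reading one named environment from a JSON file shaped like debug-env.json. The helper name load_debug_env is hypothetical and this is not the plugin's actual implementation; the debug_env fixture does the loading for you.

```python
import json
from pathlib import Path


def load_debug_env(path: Path, env_name: str) -> dict[str, str]:
    """Return the variables of one named environment from a debug-env style file.

    Hypothetical helper for illustration only; not part of databricks-labs-pytester.
    """
    with path.open(encoding="utf-8") as f:
        all_envs = json.load(f)
    if env_name not in all_envs:
        raise KeyError(f"{env_name} not found in {path}")
    # Copy the selected section so callers cannot mutate the parsed document.
    return dict(all_envs[env_name])
```

For example, load_debug_env(Path.home() / ".databricks" / "debug-env.json", "ws") would return the variables listed under the ws key.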
If any of the environment variables are not found, the env_or_skip fixture
will gracefully skip the execution of tests.
See also debug_env.
[back to top]
debug_env
fixture
Loads environment variables specified in the debug_env_name fixture from a file
for local debugging in IDEs. Otherwise, the tests run with the default environment variables
specified in the CI/CD pipeline.
See also acc, env_or_skip, ws, debug_env_name, is_in_debug.
[back to top]
env_or_skip
fixture
Fixture to get environment variables or skip tests.
It is useful for skipping tests when the required environment variables are not set.
In the following example, test_something only runs if the environment variable
SOME_EXTERNAL_SERVICE_TOKEN is set:
def test_something(env_or_skip):
    token = env_or_skip("SOME_EXTERNAL_SERVICE_TOKEN")
    assert token is not None
See also acc, make_run_as, make_udf, sql_backend, debug_env, is_in_debug.
[back to top]
ws
fixture
Create and provide a Databricks WorkspaceClient object.
This fixture initializes a Databricks WorkspaceClient object, which can be used
to interact with the Databricks workspace API. The created instance of WorkspaceClient
is shared across all test functions within the test session.
See detailed documentation for the list
of environment variables that can be used to authenticate the WorkspaceClient.
In your test functions, include this fixture as an argument to use the WorkspaceClient:
def test_workspace_operations(ws):
    # clusters.list() returns an iterator, so materialize it before calling len()
    clusters = list(ws.clusters.list())
    assert len(clusters) >= 0
See also log_workspace_link, make_alert_permissions, make_authorization_permissions, make_catalog, make_cluster, make_cluster_permissions, make_cluster_policy, make_cluster_policy_permissions, make_dashboard_permissions, make_directory, make_directory_permissions, make_experiment, make_experiment_permissions, make_feature_table, make_feature_table_permissions, make_group, make_instance_pool, make_instance_pool_permissions, make_job, make_job_permissions, make_lakeview_dashboard_permissions, make_model, make_notebook, make_notebook_permissions, make_pipeline, make_pipeline_permissions, make_query, make_query_permissions, make_registered_model_permissions, make_repo, make_repo_permissions, make_run_as, make_secret_scope, make_secret_scope_acl, make_serving_endpoint, make_serving_endpoint_permissions, make_storage_credential, make_udf, make_user, make_volume, make_warehouse, make_warehouse_permissions, make_workspace_file, make_workspace_file_path_permissions, make_workspace_file_permissions, spark, sql_backend, debug_env, product_info.
[back to top]
make_run_as
fixture
This fixture provides a function to create an account service principal via the acc fixture and
assign it to a workspace. The service principal is removed after the test is complete. The service principal is
created with a random display name and assigned to the workspace with the default permissions.
Use the account_groups argument to assign the service principal to account groups, which have the required
permissions to perform a specific action.
Example:
def test_run_as_lower_privilege_user(make_run_as, ws):
    run_as = make_run_as(account_groups=['account.group.name'])
    through_query = next(run_as.sql_fetch_all("SELECT CURRENT_USER() AS my_name"))
    me = ws.current_user.me()
    assert me.user_name != through_query.my_name
Returned object has the following properties:
- ws: Workspace client that is authenticated as the ephemeral service principal.
- sql_backend: SQL backend that is authenticated as the ephemeral service principal.
- sql_exec: Function to execute a SQL statement on behalf of the ephemeral service principal.
- sql_fetch_all: Function to fetch all rows from a SQL statement on behalf of the ephemeral service principal.
- display_name: Display name of the ephemeral service principal.
- application_id: Application ID of the ephemeral service principal.
If you want to have other fixtures available in the context of the ephemeral service principal, you can override
the ws fixture on the file level, which makes all workspace fixtures provided by this
plugin run as the lower-privilege ephemeral service principal. You cannot combine it with the account-admin-level
principal you're using to create the ephemeral principal.
Example:
from pytest import fixture

@fixture
def ws(make_run_as):
    run_as = make_run_as(account_groups=['account.group.used.for.all.tests.in.this.file'])
    return run_as.ws

def test_creating_notebook_on_behalf_of_ephemeral_principal(make_notebook):
    notebook = make_notebook()
    assert notebook.exists()
This fixture currently doesn't work with Databricks Metadata Service authentication on Azure Databricks.
See also acc, ws, make_random, env_or_skip, log_account_link, is_in_debug.
[back to top]
acc
fixture
Create and provide a Databricks AccountClient object.
This fixture initializes a Databricks AccountClient object, which can be used
to interact with the Databricks account API. The created instance of AccountClient
is shared across all test functions within the test session.
Requires the DATABRICKS_ACCOUNT_ID environment variable to be set. If DATABRICKS_HOST
points to a workspace host, the fixture automatically determines the account host from it.
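As an illustration of what deriving an account host from a workspace host can look like, here is a minimal sketch. The function name account_host_for and the suffix checks are assumptions made for this example, not the plugin's actual logic; only the three account console hostnames are the publicly known ones.

```python
def account_host_for(workspace_host: str) -> str:
    """Guess the accounts console host for a given workspace host.

    Hypothetical helper: inferring the cloud from the hostname suffix is an
    assumption for illustration, not how the acc fixture is known to work.
    """
    host = workspace_host.removeprefix("https://").rstrip("/")
    if host.endswith(".azuredatabricks.net"):
        return "https://accounts.azuredatabricks.net"
    if host.endswith(".gcp.databricks.com"):
        return "https://accounts.gcp.databricks.com"
    # Anything else is treated as an AWS-hosted workspace.
    return "https://accounts.cloud.databricks.com"
```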
See detailed documentation for the list
of environment variables that can be used to authenticate the AccountClient.
In your test functions, include this fixture as an argument to use the AccountClient:
def test_listing_workspaces(acc):
    # wrap in list() so len() works even if an iterator is returned
    workspaces = list(acc.workspaces.list())
    assert len(workspaces) >= 1
See also log_account_link, make_acc_group, make_run_as, debug_env, product_info, env_or_skip.
[back to top]
spark
fixture
Get Databricks Connect Spark session. Requires the databricks-connect package to be installed.
Usage:
def test_databricks_connect(spark):
    rows = spark.sql("SELECT 1").collect()
    assert rows[0][0] == 1
See also ws.
[back to top]
sql_backend
fixture
Create and provide a SQL backend for executing statements.
Requires the environment variable DATABRICKS_WAREHOUSE_ID to be set.
See also make_schema, make_table, make_udf, sql_exec, sql_fetch_all, ws, env_or_skip.
[back to top]
sql_exec
fixture
Execute a SQL statement without returning any results.
See also sql_backend.
[back to top]
sql_fetch_all
fixture
Fetch all rows from a SQL statement.
See also sql_backend.
[back to top]
make_random
fixture
Fixture to generate random strings.
This fixture provides a function to generate random strings of a specified length.
The generated strings are created using a character set consisting of uppercase letters,
lowercase letters, and digits.
To generate a random string with default length of 16 characters:
random_string = make_random()
assert len(random_string) == 16
To generate a random string with a specified length:
random_string = make_random(k=8)
assert len(random_string) == 8
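For intuition, a string with the properties described above can be produced with the standard library alone. This is a sketch under that description, not the fixture's source; the name make_random_string is hypothetical.

```python
import random
import string


def make_random_string(k: int = 16) -> str:
    """Return k random characters drawn from uppercase, lowercase and digits.

    Illustrative stand-in for what make_random() returns; not the fixture's code.
    """
    charset = string.ascii_uppercase + string.ascii_lowercase + string.digits
    return "".join(random.choices(charset, k=k))
```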
See also make_acc_group, make_catalog, make_cluster, make_cluster_policy, make_directory, make_experiment, make_feature_table, make_group, make_instance_pool, make_job, make_model, make_notebook, make_pipeline, make_query, make_repo, make_run_as, make_schema, make_secret_scope, make_serving_endpoint, make_table, make_udf, make_user, make_volume, make_warehouse, make_workspace_file.
[back to top]
make_instance_pool
fixture
Create a Databricks instance pool and clean it up after the test. Returns a function to create instance pools.
Use the instance_pool_id attribute of the returned object to get the ID of the pool.
Keyword Arguments:
- instance_pool_name (str, optional): The name of the instance pool. If not provided, a random name will be generated.
- node_type_id (str, optional): The node type ID of the instance pool. If not provided, a node type with local disk and 16GB memory will be used.
- other arguments are passed to the WorkspaceClient.instance_pools.create method.
Usage:
def test_instance_pool(make_instance_pool):
    logger.info(f"created {make_instance_pool()}")
See also ws, make_random, watchdog_remove_after.
[back to top]
make_instance_pool_permissions
fixture
No description yet.
See also ws.
[back to top]
make_job
fixture
Create a Databricks job and clean it up after the test. Returns a function to create jobs, which returns
a Job instance.
Keyword Arguments:
- name (str, optional): The name of the job. If not provided, a random name will be generated.
- path (str, optional): The path to the notebook or file used in the job. If not provided, a random notebook or file will be created.
- notebook_path (str, optional): [DEPRECATED: Use path instead] The path to the notebook. If not provided, a random notebook will be created.
- content (str | bytes, optional): The content of the notebook or file used in the job. If not provided, default content of make_notebook will be used.
- task_type (type[NotebookTask] | type[SparkPythonTask], optional): The type of task. If not provided, type[NotebookTask] will be used.
- instance_pool_id (str, optional): The instance pool id to add to the job cluster. If not provided, no instance pool will be used.
- spark_conf (dict, optional): The Spark configuration of the job. If not provided, Spark configuration is not explicitly set.
- libraries (list, optional): The list of libraries to install on the job.
- tags (list[str], optional): A list of job tags. If not provided, no additional tags will be set on the job.
- tasks (list[Task], optional): A list of job tasks. If not provided, a single notebook task will be
  created, along with a disposable notebook. The latest Spark version and a single-worker cluster will be used to run
  this ephemeral job.
Usage:
def test_job(make_job):
    logger.info(f"created {make_job()}")
See also ws, make_random, make_notebook, make_workspace_file, watchdog_remove_after.
[back to top]
make_job_permissions
fixture
No description yet.
See also ws.
[back to top]
make_cluster
fixture
Creates a Databricks cluster, waits for it to start, and cleans it up after the test.
Returns a function to create clusters. You can get the cluster_id attribute from the returned object.
Keyword Arguments:
- single_node (bool, optional): Whether to create a single-node cluster. Defaults to False.
- cluster_name (str, optional): The name of the cluster. If not provided, a random name will be generated.
- spark_version (str, optional): The Spark version of the cluster. If not provided, the latest version will be used.
- autotermination_minutes (int, optional): The number of minutes before the cluster is automatically terminated. Defaults to 10.
Usage:
def test_cluster(make_cluster):
    logger.info(f"created {make_cluster(single_node=True)}")
See also ws, make_random, watchdog_remove_after.
[back to top]
make_cluster_permissions
fixture
No description yet.
See also ws.
[back to top]
make_cluster_policy
fixture
Create a Databricks cluster policy and clean it up after the test. Returns a function to create cluster policies,
which returns a CreatePolicyResponse instance.
Keyword Arguments:
- name (str, optional): The name of the cluster policy. If not provided, a random name will be generated.
Usage:
def test_cluster_policy(make_cluster_policy):
    logger.info(f"created {make_cluster_policy()}")
See also ws, make_random, watchdog_purge_suffix.
[back to top]
make_cluster_policy_permissions
fixture
No description yet.
See also ws.
[back to top]
make_pipeline
fixture
Create a Delta Live Tables pipeline and clean it up after the test. Returns a function to create pipelines,
which returns a CreatePipelineResponse instance.
Keyword Arguments:
- name (str, optional): The name of the pipeline. If not provided, a random name will be generated.
- libraries (list, optional): The list of libraries to install on the pipeline. If not provided, a random disposable notebook will be created.
- clusters (list, optional): The list of clusters to use for the pipeline. If not provided, a single node cluster will be created with 16GB memory and local disk.
Usage:
from databricks.sdk.service.iam import PermissionLevel

def test_pipeline(make_pipeline, make_pipeline_permissions, make_group):
    group = make_group()
    pipeline = make_pipeline()
    make_pipeline_permissions(
        object_id=pipeline.pipeline_id,
        permission_level=PermissionLevel.CAN_MANAGE,
        group_name=group.display_name,
    )
See also ws, make_random, make_notebook, watchdog_remove_after, watchdog_purge_suffix.
[back to top]
make_warehouse
fixture
Create a Databricks warehouse and clean it up after the test. Returns a function to create warehouses.
Keyword Arguments:
- warehouse_name (str, optional): The name of the warehouse. If not provided, a random name will be generated.
- warehouse_type (CreateWarehouseRequestWarehouseType, optional): The type of the warehouse. Defaults to PRO.
- cluster_size (str, optional): The size of the cluster. Defaults to 2X-Small.
Usage:
def test_warehouse_has_remove_after_tag(ws, make_warehouse):
    new_warehouse = make_warehouse()
    created_warehouse = ws.warehouses.get(new_warehouse.response.id)
    warehouse_tags = created_warehouse.tags.as_dict()
    assert warehouse_tags["custom_tags"][0]["key"] == "RemoveAfter"
See also ws, make_random, watchdog_remove_after.
[back to top]
make_group
fixture
This fixture provides a function to manage Databricks workspace groups. Groups can be created with specified
members and roles, and they will be deleted after the test is complete. Deals with eventual consistency issues by
retrying the creation process for 30 seconds and then waiting for up to 3 minutes for the group to be provisioned.
Returns an instance of Group.
Keyword arguments:
- members (list of strings): A list of user IDs to add to the group.
- roles (list of strings): A list of roles to assign to the group.
- display_name (str): The display name of the group.
- entitlements (list of strings): A list of entitlements to assign to the group.
The following example creates a group with a single member and independently verifies that the group was created:
def test_new_group(make_group, make_user, ws):
    user = make_user()
    group = make_group(members=[user.id])
    loaded = ws.groups.get(group.id)
    assert group.display_name == loaded.display_name
    assert group.members == loaded.members
See also ws, make_random, watchdog_purge_suffix.
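The retry behavior described for make_group (keep trying for a bounded time, then give up) follows a common pattern for eventually consistent APIs. Below is a minimal, standard-library sketch of that pattern. The helper retry_for is hypothetical and the plugin's own retry logic may differ.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_for(action: Callable[[], T], timeout: float, interval: float = 1.0) -> T:
    """Call action until it succeeds or timeout seconds have elapsed.

    Hypothetical helper illustrating retry-with-deadline; not the plugin's code.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            return action()
        except Exception:
            # Re-raise the last error once the deadline has passed.
            if time.monotonic() >= deadline:
                raise
            time.sleep(interval)
```

With this shape, a call such as retry_for(create_group, timeout=30) would mirror the 30-second creation retry mentioned above, where create_group stands for any callable that may fail transiently.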
[back to top]
make_acc_group
fixture
This fixture provides a function to manage Databricks account groups. Groups can be created with
specified members and roles, and they will be deleted after the test is complete.
Has the same arguments and behavior as the make_group fixture but uses the account
client instead of the workspace client.
Example usage:
def test_new_account_group(make_acc_group, acc):
    group = make_acc_group()
    loaded = acc.groups.get(group.id)
    assert group.display_name == loaded.display_name
See also acc, make_random, watchdog_purge_suffix.
[back to top]
make_user
fixture
This fixture returns a function that creates a Databricks workspace user
and removes it after the test is complete. In case of random naming conflicts,
the fixture will retry the creation process for 30 seconds. Returns an instance
of User.
Usage:
from databricks.sdk.service.workspace import ObjectType

def test_new_user(make_user, ws):
    new_user = make_user()
    home_dir = ws.workspace.get_status(f"/Users/{new_user.user_name}")
    assert home_dir.object_type == ObjectType.DIRECTORY
See also ws, make_random, watchdog_purge_suffix.
[back to top]
make_pipeline_permissions
fixture
No description yet.
See also ws.
[back to top]
make_notebook
fixture
Returns a function to create Databricks Notebooks and clean them up after the test.
The function returns an os.PathLike object.
Keyword arguments:
- path (str, optional): The path of the notebook. Defaults to a dummy-* notebook in the current user's home folder.
- content (str | bytes | io.BinaryIO, optional): The content of the notebook. Defaults to print(1) for Python and SELECT 1 for SQL.
- language (Language, optional): The language of the notebook. Defaults to Language.PYTHON.
- encoding (str, optional): The file encoding. Defaults to sys.getdefaultencoding().
- format (ImportFormat, optional): The format of the notebook. Defaults to ImportFormat.SOURCE.
- overwrite (bool, optional): Whether to overwrite the notebook if it already exists. Defaults to False.
This example creates a notebook and verifies that print(1) is in the content:
def test_creates_some_notebook(make_notebook):
    notebook = make_notebook()
    assert "print(1)" in notebook.read_text()
See also make_job, make_pipeline, ws, make_random, watchdog_purge_suffix.
[back to top]
make_notebook_permissions
fixture
No description yet.
See also ws.
[back to top]
make_workspace_file
fixture
Returns a function to create Databricks workspace files and clean them up after the test.
The function returns an os.PathLike object.
Keyword arguments:
- path (str, optional): The path of the file. Defaults to a dummy-* file in the current user's home folder.
- content (str | bytes, optional): The content of the file. Defaults to print(1) for Python and SELECT 1 for SQL.
- language (Language, optional): The language of the file. Defaults to Language.PYTHON.
- encoding (str, optional): The file encoding. Defaults to sys.getdefaultencoding().
This example creates a workspace file and verifies that the workspace path is an existing file with contents print(1):
def test_create_file(make_workspace_file):
    workspace_file = make_workspace_file()
    assert workspace_file.is_file()
    assert "print(1)" in workspace_file.read_text()
TODO:
Merge functionality with make_notebook if WorkspacePath supports creating notebooks.
See also make_job, ws, make_random, watchdog_purge_suffix.
[back to top]
make_directory
fixture
Returns a function to create Databricks Workspace Folders and clean them up after the test.
The function returns an os.PathLike object.
Keyword arguments:
- path (str, optional): The path of the folder. Defaults to a dummy-* folder in the current user's home folder.
This example creates a folder and verifies that it contains a notebook:
def test_creates_some_folder_with_a_notebook(make_directory, make_notebook):
    folder = make_directory()
    notebook = make_notebook(path=folder / 'foo.py')
    files = [_.name for _ in folder.iterdir()]
    assert ['foo.py'] == files
    assert notebook.parent == folder
See also make_experiment, ws, make_random, watchdog_purge_suffix.
[back to top]
make_directory_permissions
fixture
No description yet.
See also ws.
[back to top]
make_repo
fixture
Returns a function to create Databricks Repos and clean them up after the test.
The function returns a RepoInfo object.
Keyword arguments:
- url (str, optional): The URL of the repository.
- provider (str, optional): The provider of the repository.
- path (str, optional): The path of the repository. Defaults to /Repos/{current_user}/sdk-{random}-{purge_suffix}.
Usage:
def test_repo(make_repo):
    logger.info(f"created {make_repo()}")
See also ws, make_random, watchdog_purge_suffix.
[back to top]
make_repo_permissions
fixture
No description yet.
See also ws.
[back to top]
make_workspace_file_permissions
fixture
No description yet.
See also ws.
[back to top]
make_workspace_file_path_permissions
fixture
No description yet.
See also ws.
[back to top]
make_secret_scope
fixture
This fixture provides a function to create secret scopes. The created secret scope will be
deleted after the test is complete. Returns the name of the secret scope.
To create a secret scope and use it within a test function:
def test_secret_scope_creation(make_secret_scope):
    secret_scope_name = make_secret_scope()
    assert secret_scope_name.startswith("dummy-")
See also ws, make_random.
[back to top]
make_secret_scope_acl
fixture
This fixture provides a function to manage access control lists (ACLs) for secret scopes.
ACLs define permissions for principals (users or groups) on specific secret scopes.
Arguments:
- scope: The name of the secret scope.
- principal: The name of the principal (user or group).
- permission: The permission level for the principal on the secret scope.
Returns a tuple containing the secret scope name and the principal name.
To manage secret scope ACLs using the make_secret_scope_acl fixture:
from databricks.sdk.service.workspace import AclPermission

def test_secret_scope_acl_management(make_user, make_secret_scope, make_secret_scope_acl):
    scope_name = make_secret_scope()
    principal_name = make_user().display_name
    permission = AclPermission.READ
    acl_info = make_secret_scope_acl(
        scope=scope_name,
        principal=principal_name,
        permission=permission,
    )
    assert acl_info == (scope_name, principal_name)
See also ws.
[back to top]
make_authorization_permissions
fixture
No description yet.
See also ws.
[back to top]
make_udf
fixture
Create a UDF and return its info. Remove it after the test. Returns an instance of FunctionInfo.
Keyword Arguments:
- catalog_name (str): The name of the catalog where the UDF will be created. Default is hive_metastore.
- schema_name (str): The name of the schema where the UDF will be created. Default is a random string.
- name (str): The name of the UDF. Default is a random string.
- hive_udf (bool): If True, the UDF will be created as a Hive UDF. Default is False.
Usage:
def test_make_some_udfs(make_schema, make_udf):
    schema_a = make_schema(catalog_name="hive_metastore")
    make_udf(schema_name=schema_a.name)
    make_udf(schema_name=schema_a.name, hive_udf=True)
See also ws, env_or_skip, sql_backend, make_schema, make_random.
[back to top]
make_catalog
fixture
Create a catalog and return its info. Remove it after the test.
Returns an instance of CatalogInfo.
Keyword Arguments:
- name (str): The name of the catalog. Default is a random string.
Usage:
def test_catalog_fixture(make_catalog, make_schema, make_table):
    from_catalog = make_catalog()
    from_schema = make_schema(catalog_name=from_catalog.name)
    from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    logger.info(f"Created new table: {from_table_1}")
See also make_volume, ws, make_random, watchdog_remove_after.
[back to top]
make_schema
fixture
Create a schema and return its info. Remove it after the test. Returns an instance of SchemaInfo.
Keyword Arguments:
- catalog_name (str): The name of the catalog where the schema will be created. Default is hive_metastore.
- name (str): The name of the schema. Default is a random string.
- location (str): The path to the location if it should be a managed schema.
Usage:
def test_catalog_fixture(make_catalog, make_schema, make_table):
    from_catalog = make_catalog()
    from_schema = make_schema(catalog_name=from_catalog.name)
    from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    logger.info(f"Created new table: {from_table_1}")
See also make_table, make_udf, make_volume, sql_backend, make_random, watchdog_remove_after.
[back to top]
make_table
fixture
Create a table and return its info. Remove it after the test. Returns an instance of TableInfo.
Keyword Arguments:
- catalog_name (str): The name of the catalog where the table will be created. Default is hive_metastore.
- schema_name (str): The name of the schema where the table will be created. Default is a random string.
- name (str): The name of the table. Default is a random string.
- ctas (str): The CTAS statement to create the table. Default is None.
- non_delta (bool): If True, the table will be created as a non-delta table. Default is False.
- external (bool): If True, the table will be created as an external table. Default is False.
- external_csv (str): The location of the external CSV table. Default is None.
- external_delta (str): The location of the external Delta table. Default is None.
- view (bool): If True, the table will be created as a view. Default is False.
- tbl_properties (dict): The table properties. Default is None.
- hiveserde_ddl (str): The DDL statement to create the table. Default is None.
- storage_override (str): The storage location override. Default is None.
- columns (list): The list of columns. Default is None.
Usage:
def test_catalog_fixture(make_catalog, make_schema, make_table):
    from_catalog = make_catalog()
    from_schema = make_schema(catalog_name=from_catalog.name)
    from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    logger.info(f"Created new table: {from_table_1}")
See also make_query, sql_backend, make_schema, make_random, watchdog_remove_after.
[back to top]
make_storage_credential
fixture
Create a storage credential and return its info. Remove it after the test. Returns an instance of StorageCredentialInfo.
Keyword Arguments:
- credential_name (str): The name of the storage credential. Default is a random string.
- application_id (str): The application ID for the Azure service principal. Default is an empty string.
- client_secret (str): The client secret for the Azure service principal. Default is an empty string.
- directory_id (str): The directory ID for the Azure service principal. Default is an empty string.
- aws_iam_role_arn (str): The ARN of the AWS IAM role. Default is an empty string.
- read_only (bool): If True, the storage credential will be read-only. Default is False.
Usage:
def test_storage_credential(env_or_skip, make_storage_credential, make_random):
    random = make_random(6).lower()
    credential_name = f"dummy-{random}"
    make_storage_credential(
        credential_name=credential_name,
        aws_iam_role_arn=env_or_skip("TEST_UBER_ROLE_ID"),
    )
See also ws, watchdog_remove_after.
[back to top]
make_volume
fixture
Create a volume and return its info. Remove it after the test. Returns an instance of VolumeInfo.
Keyword Arguments:
- catalog_name (str): The name of the catalog where the schema and the volume are.
- schema_name (str): The name of the schema where the volume is.
- name (str): The name of the volume.
- comment (str, optional): The comment attached to the volume.
Usage:
def test_volume_creation(make_catalog, make_schema, make_volume, make_random):
    catalog = make_catalog()
    schema = make_schema(catalog_name=catalog.name)
    volume_name = f"dummy_vol_{make_random(6).lower()}"
    volume = make_volume(
        catalog_name=catalog.name,
        schema_name=schema.name,
        name=volume_name,
    )
See also ws, make_catalog, make_schema, make_random.
[back to top]
product_info
fixture
No description yet.
See also acc, ws.
[back to top]
make_model
fixture
Returns a function to create Databricks Models and clean them up after the test.
The function returns a GetModelResponse object.
Keyword arguments:
- model_name (str, optional): The name of the model. Defaults to dummy-*.
Usage:
from databricks.sdk.service.iam import PermissionLevel

def test_models(make_group, make_model, make_registered_model_permissions):
    group = make_group()
    model = make_model()
    make_registered_model_permissions(
        object_id=model.id,
        permission_level=PermissionLevel.CAN_MANAGE,
        group_name=group.display_name,
    )
See also make_serving_endpoint, ws, make_random, watchdog_remove_after.
[back to top]
make_experiment
fixture
Returns a function to create Databricks Experiments and clean them up after the test.
The function returns a CreateExperimentResponse object.
Keyword arguments:
- path (str, optional): The path of the experiment. Defaults to a dummy-* experiment in the current user's home folder.
- experiment_name (str, optional): The name of the experiment. Defaults to dummy-*.
Usage:
from databricks.sdk.service.iam import PermissionLevel

def test_experiments(make_group, make_experiment, make_experiment_permissions):
    group = make_group()
    experiment = make_experiment()
    make_experiment_permissions(
        object_id=experiment.experiment_id,
        permission_level=PermissionLevel.CAN_MANAGE,
        group_name=group.display_name,
    )
See also ws, make_random, make_directory, watchdog_purge_suffix.
[back to top]
make_experiment_permissions
fixture
No description yet.
See also ws.
[back to top]
make_warehouse_permissions
fixture
No description yet.
See also ws.
[back to top]
make_lakeview_dashboard_permissions
fixture
No description yet.
See also ws.
[back to top]
log_workspace_link
fixture
Returns a function to log a workspace link.
See also ws.
[back to top]
log_account_link
fixture
Returns a function to log an account link.
See also make_run_as, acc.
[back to top]
make_dashboard_permissions
fixture
No description yet.
See also ws.
[back to top]
make_alert_permissions
fixture
No description yet.
See also ws.
[back to top]
make_query
fixture
Create a query and remove it after the test is done. Returns the LegacyQuery object.
Keyword Arguments:
- sql_query: The query to be stored. Default is SELECT * FROM <newly created random table>.
Usage:
from databricks.sdk.service.sql import PermissionLevel

def test_permissions_for_redash(
    make_user,
    make_query,
    make_query_permissions,
):
    user = make_user()
    query = make_query()
    make_query_permissions(
        object_id=query.id,
        permission_level=PermissionLevel.CAN_EDIT,
        user_name=user.display_name,
    )
See also ws, make_table, make_random, watchdog_remove_after.
[back to top]
make_query_permissions
fixture
No description yet.
See also ws.
[back to top]
make_registered_model_permissions
fixture
No description yet.
See also ws.
[back to top]
make_serving_endpoint
fixture
Returns a function to create Databricks Serving Endpoints and clean them up after the test.
The function returns a ServingEndpointDetailed object.
Under the covers, this fixture also creates a model to serve on a small workload size.
Usage:
def test_endpoints(make_group, make_serving_endpoint, make_serving_endpoint_permissions):
    group = make_group()
    endpoint = make_serving_endpoint()
    make_serving_endpoint_permissions(
        object_id=endpoint.response.id,
        permission_level=PermissionLevel.CAN_QUERY,
        group_name=group.display_name,
    )
See also ws, make_random, make_model, watchdog_remove_after.
[back to top]
make_serving_endpoint_permissions
fixture
No description yet.
See also ws.
[back to top]
make_feature_table
fixture
No description yet.
See also ws, make_random.
[back to top]
make_feature_table_permissions
fixture
No description yet.
See also ws.
[back to top]
watchdog_remove_after
fixture
Purge time for test objects, representing the (UTC-based) hour from which objects may be purged.
See also make_catalog, make_cluster, make_instance_pool, make_job, make_model, make_pipeline, make_query, make_schema, make_serving_endpoint, make_storage_credential, make_table, make_warehouse, watchdog_purge_suffix.
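To illustrate what an hour-granular, UTC-based purge time can look like, here is a small sketch. The function name purge_hour, the one-hour default lifetime, and the YYYYMMDDHH string format are all assumptions made for this example; the fixture's real format may differ.

```python
from datetime import datetime, timedelta, timezone


def purge_hour(now: datetime, keep_for: timedelta = timedelta(hours=1)) -> str:
    """Format the UTC hour from which an object may be purged as YYYYMMDDHH.

    The default lifetime and the string format are assumptions for illustration.
    """
    # Normalize to UTC first so the result does not depend on the caller's timezone.
    return (now.astimezone(timezone.utc) + keep_for).strftime("%Y%m%d%H")
```

For instance, an object created at 23:30 UTC on 2024-09-01 would get the value 2024090200 under these assumptions.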
[back to top]
watchdog_purge_suffix
fixture
HEX-encoded purge time suffix for test objects.
See also make_acc_group, make_cluster_policy, make_directory, make_experiment, make_group, make_notebook, make_pipeline, make_repo, make_user, make_workspace_file, watchdog_remove_after.
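A hex-encoded suffix is a compact way to embed a purge time in an object name. The sketch below assumes the purge time is a numeric YYYYMMDDHH string and uses an "ra" prefix as a marker. Both the input format and the prefix are assumptions for illustration, not a documented format of this fixture.

```python
def purge_suffix(purge_hour: str) -> str:
    """Encode a numeric YYYYMMDDHH purge hour as a short hexadecimal suffix.

    The "ra" prefix (read as "remove after") is an assumed marker.
    """
    return f"ra{int(purge_hour):x}"


def decode_purge_suffix(suffix: str) -> str:
    """Reverse purge_suffix: strip the prefix and convert the hex back to decimal."""
    return str(int(suffix.removeprefix("ra"), 16))
```

Because the encoding is reversible, a cleanup job could recover the purge hour from an object name without any extra metadata.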
[back to top]
is_in_debug
fixture
Returns true if the test is running from a debugger in an IDE, otherwise false.
The following IDEs are supported: IntelliJ IDEA (including Community Edition),
PyCharm (including Community Edition), and Visual Studio Code.
See also debug_env, env_or_skip, make_run_as.
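One plausible way to detect an IDE-launched test run is to look at the name of the script that started the process. The sketch below assumes that the JetBrains and Visual Studio Code pytest launchers are named _jb_pytest_runner.py and testlauncher.py. Those names and the helper looks_like_ide_run are assumptions for illustration, and the fixture itself may use a different check.

```python
import os
import sys

# Assumed script names of IDE pytest launchers; not taken from this plugin's docs.
_IDE_RUNNERS = {"_jb_pytest_runner.py", "testlauncher.py"}


def looks_like_ide_run(argv0: str = "") -> bool:
    """Return True if the launching script looks like an IDE test runner.

    Falls back to sys.argv[0] when no script path is passed explicitly.
    """
    script = argv0 or sys.argv[0]
    return os.path.basename(script) in _IDE_RUNNERS
```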
[back to top]
Project Support
Please note that this project is provided for your exploration only and is not
formally supported by Databricks with Service Level Agreements (SLAs). It is
provided AS-IS, and we do not make any guarantees of any kind. Please do not
submit a support ticket relating to any issues arising from the use of this project.
Any issues discovered through the use of this project should be filed as GitHub
Issues on this repository.
They will be reviewed as time permits, but no formal SLAs for support exist.