databricks-labs-pytester
debug_env_name fixture
debug_env fixture
env_or_skip fixture
ws fixture
make_run_as fixture
acc fixture
spark fixture
sql_backend fixture
sql_exec fixture
sql_fetch_all fixture
make_random fixture
make_instance_pool fixture
make_instance_pool_permissions fixture
make_job fixture
make_job_permissions fixture
make_cluster fixture
make_cluster_permissions fixture
make_cluster_policy fixture
make_cluster_policy_permissions fixture
make_pipeline fixture
make_warehouse fixture
make_group fixture
make_acc_group fixture
make_user fixture
make_pipeline_permissions fixture
make_notebook fixture
make_notebook_permissions fixture
make_workspace_file fixture
make_directory fixture
make_directory_permissions fixture
make_repo fixture
make_repo_permissions fixture
make_workspace_file_permissions fixture
make_workspace_file_path_permissions fixture
make_secret_scope fixture
make_secret_scope_acl fixture
make_authorization_permissions fixture
make_udf fixture
make_catalog fixture
make_schema fixture
make_table fixture
make_storage_credential fixture
make_volume fixture
product_info fixture
make_model fixture
make_experiment fixture
make_experiment_permissions fixture
make_warehouse_permissions fixture
make_lakeview_dashboard_permissions fixture
log_workspace_link fixture
log_account_link fixture
make_dashboard_permissions fixture
make_alert_permissions fixture
make_query fixture
make_query_permissions fixture
make_registered_model_permissions fixture
make_serving_endpoint fixture
make_serving_endpoint_permissions fixture
make_feature_table fixture
make_feature_table_permissions fixture
watchdog_remove_after fixture
watchdog_purge_suffix fixture
is_in_debug fixture
Add a databricks-labs-pytester dependency to your pyproject.toml file (or legacy requirements.txt file). You can also install it directly from the command line:
pip install databricks-labs-pytester
If you use hatch as a build system, make sure to add databricks-labs-pytester as a test-time dependency and not as a compile-time dependency. Otherwise your wheels will transitively depend on pytest, which is not usually something you need.
[project]
name = "name-of-your-project"
# ...
dependencies = [
"databricks-sdk~=0.30",
# ... dependencies required for your code to execute
]
[tool.hatch.envs.default]
dependencies = [
# ... dependencies required to test/validate/format your code:
"black~=24.3.0",
"coverage[toml]~=7.4.4",
"mypy~=1.9.0",
"pylint~=3.2.2",
"pylint-pytest==2.0.0a0",
"databricks-labs-pylint~=0.4.0",
"databricks-labs-pytester~=0.2", # <= this library
"pytest~=8.3.3",
"pytest-cov~=4.1.0",
"pytest-mock~=3.14.0",
"pytest-timeout~=2.3.1",
"pytest-xdist~=3.5.0",
"python-lsp-server>=1.9.0",
"ruff~=0.3.4",
"types-PyYAML~=6.0.12",
"types-requests~=2.31.0",
]
Built on top of Databricks SDK for Python, this library is part of the Databricks Labs Python ecosystem, which includes the following projects:
See this video for a quick overview of the Databricks Labs Python ecosystem.
PyTest Fixtures are a powerful way to manage test setup and teardown in Python. This library provides a set of fixtures to help you write integration tests for Databricks. These fixtures were incubated within the Unity Catalog Automated Migrations project for more than a year and are now available for other projects to simplify integration testing with Databricks.
This library is built on years of debugging integration tests for Databricks and its ecosystem.
That's why it comes with a built-in logger that traces creation and deletion of dummy entities through links in the Databricks Workspace UI. If you run the following code:
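from databricks.sdk.service.workspace import ObjectType
def test_new_user(make_user, ws):
    new_user = make_user()
    # every workspace user gets a home directory under /Users
    home_dir = ws.workspace.get_status(f"/Users/{new_user.user_name}")
    assert home_dir.object_type == ObjectType.DIRECTORY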
You will see the following output, where the first line is clickable and will take you to the user's profile in the Databricks Workspace UI:
12:30:53 INFO [d.l.p.fixtures.baseline] Created dummy-xwuq-...@example.com: https://.....azuredatabricks.net/#settings/workspace/identity-and-access/users/735...
12:30:53 DEBUG [d.l.p.fixtures.baseline] added workspace user fixture: User(active=True, display_name='dummy-xwuq-...@example.com', ...)
12:30:58 DEBUG [d.l.p.fixtures.baseline] clearing 1 workspace user fixtures
12:30:58 DEBUG [d.l.p.fixtures.baseline] removing workspace user fixture: User(active=True, display_name='dummy-xwuq-...@example.com', ...)
You may need to add the following to your conftest.py file to enable this:
import logging
from databricks.labs.blueprint.logger import install_logger
install_logger()
logging.getLogger('databricks.labs.pytester').setLevel(logging.DEBUG)
debug_env_name fixture
Specify the name of the debug environment. By default, it is set to .env, which will try to find a file named .env in any of the parent directories of the current working directory and load the environment variables from it via the debug_env fixture.
Alternatively, if you are concerned about the risk of .env files getting checked into version control, we recommend using the ~/.databricks/debug-env.json file to store different sets of environment variables. The file cannot be checked into version control by design, because it is stored in the user's home directory.
This file is used for local debugging and integration tests in IDEs like PyCharm, VSCode, and IntelliJ IDEA while developing Databricks Platform Automation Stack, which includes Databricks SDKs for Python, Go, and Java, as well as Databricks Terraform Provider and Databricks CLI. This file enables multi-environment and multi-cloud testing with a single set of integration tests.
The file is typically structured as follows:
$ cat ~/.databricks/debug-env.json
{
  "ws": {
    "CLOUD_ENV": "azure",
    "DATABRICKS_HOST": "....azuredatabricks.net",
    "DATABRICKS_CLUSTER_ID": "0708-200540-...",
    "DATABRICKS_WAREHOUSE_ID": "33aef...",
    ...
  },
  "acc": {
    "CLOUD_ENV": "aws",
    "DATABRICKS_HOST": "accounts.cloud.databricks.com",
    "DATABRICKS_CLIENT_ID": "....",
    "DATABRICKS_CLIENT_SECRET": "....",
    ...
  }
}
And you can load it in your conftest.py file as follows:
import pytest
@pytest.fixture
def debug_env_name():
    return "ws"
This will load the ws environment from the ~/.databricks/debug-env.json file.
If any of the environment variables are not found, the env_or_skip fixture will gracefully skip the execution of tests.
See also debug_env.
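To make the file layout above concrete, here is a minimal sketch of how one named set of variables could be read from a debug-env.json style file. The load_debug_env helper is hypothetical and only illustrates the idea; it is not the code this library uses.

```python
import json
from pathlib import Path


def load_debug_env(name: str, path: Path) -> dict[str, str]:
    """Return the variables stored under `name` in a debug-env.json style file.

    Hypothetical helper for illustration; not part of databricks-labs-pytester.
    """
    with path.open() as f:
        all_envs = json.load(f)
    if name not in all_envs:
        raise KeyError(f"{path} has no environment named {name!r}")
    return all_envs[name]
```

Calling load_debug_env("ws", Path.home() / ".databricks" / "debug-env.json") would return the dictionary stored under the "ws" key.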
debug_env fixture
Loads environment variables specified in the debug_env_name fixture from a file for local debugging in IDEs, otherwise allowing the tests to run with the default environment variables specified in the CI/CD pipeline.
See also acc, env_or_skip, ws, debug_env_name, is_in_debug.
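The lookup of a .env file in parent directories can be pictured with the sketch below. The find_in_parents helper is hypothetical, and whether the starting directory itself is searched is an assumption made here, not something this document states.

```python
from pathlib import Path
from typing import Optional


def find_in_parents(start: Path, filename: str = ".env") -> Optional[Path]:
    """Return the nearest `filename` in `start` or any of its parents, or None.

    Hypothetical helper; including `start` itself in the search is an assumption.
    """
    start = start.resolve()
    for folder in [start, *start.parents]:
        candidate = folder / filename
        if candidate.is_file():
            return candidate
    return None
```

With this shape, find_in_parents(Path.cwd()) walks from the current working directory up to the filesystem root and stops at the first match.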
env_or_skip fixture
Fixture to get environment variables or skip tests.
It is extremely useful to skip tests if the required environment variables are not set.
In the following example, test_something would only run if the environment variable SOME_EXTERNAL_SERVICE_TOKEN is set:
def test_something(env_or_skip):
    token = env_or_skip("SOME_EXTERNAL_SERVICE_TOKEN")
    assert token is not None
See also acc, make_run_as, make_udf, sql_backend, debug_env, is_in_debug.
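The "get or skip" behaviour can be approximated with the standard library alone. The sketch below uses a hypothetical get_env_or_skip function and unittest.SkipTest as the skip signal; a pytest-based implementation would more typically call pytest.skip, and the actual fixture may differ.

```python
import os
import unittest
from typing import Mapping


def get_env_or_skip(name: str, env: Mapping[str, str] = os.environ) -> str:
    """Return the value of `name`, or signal a skip if it is not set.

    Hypothetical helper; `env` is injectable so the lookup is easy to test.
    """
    if name not in env:
        # unittest.SkipTest is the standard library's skip signal
        raise unittest.SkipTest(f"Environment variable {name} is missing")
    return env[name]
```

Passing the environment as an argument keeps the helper free of global state, which is a design choice of this sketch rather than of the library.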
ws fixture
Create and provide a Databricks WorkspaceClient object.
This fixture initializes a Databricks WorkspaceClient object, which can be used to interact with the Databricks workspace API. The created instance of WorkspaceClient is shared across all test functions within the test session.
See detailed documentation for the list of environment variables that can be used to authenticate the WorkspaceClient.
In your test functions, include this fixture as an argument to use the WorkspaceClient:
def test_workspace_operations(ws):
    # clusters.list() returns an iterator, so materialize it before calling len()
    clusters = list(ws.clusters.list())
    assert len(clusters) >= 0
See also log_workspace_link, make_alert_permissions, make_authorization_permissions, make_catalog, make_cluster, make_cluster_permissions, make_cluster_policy, make_cluster_policy_permissions, make_dashboard_permissions, make_directory, make_directory_permissions, make_experiment, make_experiment_permissions, make_feature_table, make_feature_table_permissions, make_group, make_instance_pool, make_instance_pool_permissions, make_job, make_job_permissions, make_lakeview_dashboard_permissions, make_model, make_notebook, make_notebook_permissions, make_pipeline, make_pipeline_permissions, make_query, make_query_permissions, make_registered_model_permissions, make_repo, make_repo_permissions, make_run_as, make_secret_scope, make_secret_scope_acl, make_serving_endpoint, make_serving_endpoint_permissions, make_storage_credential, make_udf, make_user, make_volume, make_warehouse, make_warehouse_permissions, make_workspace_file, make_workspace_file_path_permissions, make_workspace_file_permissions, spark, sql_backend, debug_env, product_info.
make_run_as fixture
This fixture provides a function to create an account service principal via the acc fixture and assign it to a workspace. The service principal is removed after the test is complete. The service principal is created with a random display name and assigned to the workspace with the default permissions.
Use the account_groups argument to assign the service principal to account groups, which have the required permissions to perform a specific action.
Example:
def test_run_as_lower_privilege_user(make_run_as, ws):
    run_as = make_run_as(account_groups=['account.group.name'])
    through_query = next(run_as.sql_fetch_all("SELECT CURRENT_USER() AS my_name"))
    me = ws.current_user.me()
    assert me.user_name != through_query.my_name
Returned object has the following properties:
ws: Workspace client that is authenticated as the ephemeral service principal.
sql_backend: SQL backend that is authenticated as the ephemeral service principal.
sql_exec: Function to execute a SQL statement on behalf of the ephemeral service principal.
sql_fetch_all: Function to fetch all rows from a SQL statement on behalf of the ephemeral service principal.
display_name: Display name of the ephemeral service principal.
application_id: Application ID of the ephemeral service principal.
You can also override the ws fixture on the file level, which makes all workspace fixtures provided by this plugin run as the lower-privilege ephemeral service principal. You cannot combine it with the account-admin-level principal you're using to create the ephemeral principal.
Example:
from pytest import fixture
@fixture
def ws(make_run_as):
    run_as = make_run_as(account_groups=['account.group.used.for.all.tests.in.this.file'])
    return run_as.ws
def test_creating_notebook_on_behalf_of_ephemeral_principal(make_notebook):
    notebook = make_notebook()
    assert notebook.exists()
This fixture currently doesn't work with Databricks Metadata Service authentication on Azure Databricks.
See also acc, ws, make_random, env_or_skip, log_account_link, is_in_debug.
acc fixture
Create and provide a Databricks AccountClient object.
This fixture initializes a Databricks AccountClient object, which can be used to interact with the Databricks account API. The created instance of AccountClient is shared across all test functions within the test session.
Requires the DATABRICKS_ACCOUNT_ID environment variable to be set. If DATABRICKS_HOST points to a workspace host, the fixture automatically determines the account host from it.
See detailed documentation for the list of environment variables that can be used to authenticate the AccountClient.
In your test functions, include this fixture as an argument to use the AccountClient:
def test_listing_workspaces(acc):
    # workspaces.list() returns an iterator, so materialize it before calling len()
    workspaces = list(acc.workspaces.list())
    assert len(workspaces) >= 1
See also log_account_link, make_acc_group, make_run_as, debug_env, product_info, env_or_skip.
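As a rough illustration of deriving an account host from a workspace host, the sketch below maps the workspace domain suffix to the public account console of the matching cloud. The account_host_for function and the suffix-matching approach are assumptions for illustration; the fixture may determine the host differently, and other deployments (for example government clouds) are not covered.

```python
def account_host_for(workspace_host: str) -> str:
    """Guess the account console URL for a workspace host.

    Hypothetical helper; covers only the public Azure, GCP and AWS domains.
    """
    hostname = workspace_host.removeprefix("https://").rstrip("/")
    if hostname.endswith(".azuredatabricks.net"):
        return "https://accounts.azuredatabricks.net"
    if hostname.endswith(".gcp.databricks.com"):
        return "https://accounts.gcp.databricks.com"
    # fall back to the AWS account console
    return "https://accounts.cloud.databricks.com"
```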
spark fixture
Get Databricks Connect Spark session. Requires the databricks-connect package to be installed.
Usage:
def test_databricks_connect(spark):
    rows = spark.sql("SELECT 1").collect()
    assert rows[0][0] == 1
See also ws.
sql_backend fixture
Create and provide a SQL backend for executing statements.
Requires the environment variable DATABRICKS_WAREHOUSE_ID to be set.
See also make_schema, make_table, make_udf, sql_exec, sql_fetch_all, ws, env_or_skip.
sql_exec fixture
Execute a SQL statement without returning any results.
See also sql_backend.
sql_fetch_all fixture
Fetch all rows from a SQL statement.
See also sql_backend.
make_random fixture
Fixture to generate random strings.
This fixture provides a function to generate random strings of a specified length. The generated strings are created using a character set consisting of uppercase letters, lowercase letters, and digits.
To generate a random string with default length of 16 characters:
random_string = make_random()
assert len(random_string) == 16
To generate a random string with a specified length:
random_string = make_random(k=8)
assert len(random_string) == 8
See also make_acc_group, make_catalog, make_cluster, make_cluster_policy, make_directory, make_experiment, make_feature_table, make_group, make_instance_pool, make_job, make_model, make_notebook, make_pipeline, make_query, make_repo, make_run_as, make_schema, make_secret_scope, make_serving_endpoint, make_table, make_udf, make_user, make_volume, make_warehouse, make_workspace_file.
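A generator matching the description above (uppercase letters, lowercase letters and digits, 16 characters by default) could look like the following sketch. The random_string name is hypothetical and the fixture's internals may differ.

```python
import random
import string


def random_string(k: int = 16) -> str:
    """Return `k` random characters drawn from letters and digits.

    Hypothetical stand-in for the function that make_random provides.
    """
    charset = string.ascii_uppercase + string.ascii_lowercase + string.digits
    return "".join(random.choices(charset, k=k))
```

The random module is fine for naming disposable test resources; it is not suitable for secrets.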
make_instance_pool fixture
Create a Databricks instance pool and clean it up after the test. Returns a function to create instance pools.
Use the instance_pool_id attribute of the returned object to get the ID of the pool.
Keyword Arguments:
instance_pool_name (str, optional): The name of the instance pool. If not provided, a random name will be generated.
node_type_id (str, optional): The node type ID of the instance pool. If not provided, a node type with local disk and 16GB memory will be used.
Other keyword arguments are passed to the WorkspaceClient.instance_pools.create method.
Usage:
def test_instance_pool(make_instance_pool):
    logger.info(f"created {make_instance_pool()}")
See also ws, make_random, watchdog_remove_after.
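Many fixtures in this list follow the same "return a function that creates things, then clean up after the test" shape. The sketch below shows one way to express that pattern with a plain generator; make_factory, create and remove are hypothetical names, and this is not presented as the library's implementation.

```python
from typing import Any, Callable, Iterator


def make_factory(
    create: Callable[..., Any],
    remove: Callable[[Any], None],
) -> Iterator[Callable[..., Any]]:
    """Yield a creator function and remove everything it created afterwards."""
    created = []

    def inner(**kwargs):
        item = create(**kwargs)
        created.append(item)  # remember the item for teardown
        return item

    try:
        yield inner  # the test body runs while the generator is suspended here
    finally:
        for item in created:
            remove(item)
```

Inside a function decorated with pytest.fixture, writing yield from make_factory(create, remove) would hand the creator function to the test and run the cleanup loop once the test finishes.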
make_instance_pool_permissions fixture
No description yet.
See also ws.
make_job fixture
Create a Databricks job and clean it up after the test. Returns a function to create jobs, which returns a Job instance.
Keyword Arguments:
name (str, optional): The name of the job. If not provided, a random name will be generated.
path (str, optional): The path to the notebook or file used in the job. If not provided, a random notebook or file will be created.
notebook_path (str, optional): [Deprecated: use path instead] The path to the notebook. If not provided, a random notebook will be created.
content (str | bytes, optional): The content of the notebook or file used in the job. If not provided, the default content of make_notebook will be used.
task_type (type[NotebookTask] | type[SparkPythonTask], optional): The type of task. If not provided, type[NotebookTask] will be used.
instance_pool_id (str, optional): The instance pool id to add to the job cluster. If not provided, no instance pool will be used.
spark_conf (dict, optional): The Spark configuration of the job. If not provided, Spark configuration is not explicitly set.
libraries (list, optional): The list of libraries to install on the job.
tags (list[str], optional): A list of job tags. If not provided, no additional tags will be set on the job.
tasks (list[Task], optional): A list of job tasks. If not provided, a single notebook task will be created, along with a disposable notebook. The latest Spark version and a single-worker cluster will be used to run this ephemeral job.
Usage:
def test_job(make_job):
    logger.info(f"created {make_job()}")
See also ws, make_random, make_notebook, make_workspace_file, watchdog_remove_after.
make_job_permissions fixture
No description yet.
See also ws.
make_cluster fixture
Creates a Databricks cluster, waits for it to start, and cleans it up after the test. Returns a function to create clusters. You can get the cluster_id attribute from the returned object.
Keyword Arguments:
single_node (bool, optional): Whether to create a single-node cluster. Defaults to False.
cluster_name (str, optional): The name of the cluster. If not provided, a random name will be generated.
spark_version (str, optional): The Spark version of the cluster. If not provided, the latest version will be used.
autotermination_minutes (int, optional): The number of minutes before the cluster is automatically terminated. Defaults to 10.
Usage:
def test_cluster(make_cluster):
    logger.info(f"created {make_cluster(single_node=True)}")
See also ws, make_random, watchdog_remove_after.
make_cluster_permissions fixture
No description yet.
See also ws.
make_cluster_policy fixture
Create a Databricks cluster policy and clean it up after the test. Returns a function to create cluster policies, which returns a CreatePolicyResponse instance.
Keyword Arguments:
name (str, optional): The name of the cluster policy. If not provided, a random name will be generated.
Usage:
def test_cluster_policy(make_cluster_policy):
    logger.info(f"created {make_cluster_policy()}")
See also ws, make_random, watchdog_purge_suffix.
make_cluster_policy_permissions fixture
No description yet.
See also ws.
make_pipeline fixture
Create a Delta Live Tables pipeline and clean it up after the test. Returns a function to create pipelines, which returns a CreatePipelineResponse instance.
Keyword Arguments:
name (str, optional): The name of the pipeline. If not provided, a random name will be generated.
libraries (list, optional): The list of libraries to install on the pipeline. If not provided, a random disposable notebook will be created.
clusters (list, optional): The list of clusters to use for the pipeline. If not provided, a single node cluster will be created with 16GB memory and local disk.
Usage:
from databricks.sdk.service.iam import PermissionLevel
def test_pipeline(make_pipeline, make_pipeline_permissions, make_group):
    group = make_group()
    pipeline = make_pipeline()
    make_pipeline_permissions(
        object_id=pipeline.pipeline_id,
        permission_level=PermissionLevel.CAN_MANAGE,
        group_name=group.display_name,
    )
See also ws, make_random, make_notebook, watchdog_remove_after, watchdog_purge_suffix.
make_warehouse fixture
Create a Databricks warehouse and clean it up after the test. Returns a function to create warehouses.
Keyword Arguments:
warehouse_name (str, optional): The name of the warehouse. If not provided, a random name will be generated.
warehouse_type (CreateWarehouseRequestWarehouseType, optional): The type of the warehouse. Defaults to PRO.
cluster_size (str, optional): The size of the cluster. Defaults to 2X-Small.
Usage:
def test_warehouse_has_remove_after_tag(ws, make_warehouse):
    new_warehouse = make_warehouse()
    created_warehouse = ws.warehouses.get(new_warehouse.response.id)
    warehouse_tags = created_warehouse.tags.as_dict()
    assert warehouse_tags["custom_tags"][0]["key"] == "RemoveAfter"
See also ws, make_random, watchdog_remove_after.
make_group fixture
This fixture provides a function to manage Databricks workspace groups. Groups can be created with specified members and roles, and they will be deleted after the test is complete. Deals with eventual consistency issues by retrying the creation process for 30 seconds and then waiting for up to 3 minutes for the group to be provisioned.
Returns an instance of Group.
Keyword arguments:
members (list of strings): A list of user IDs to add to the group.
roles (list of strings): A list of roles to assign to the group.
display_name (str): The display name of the group.
entitlements (list of strings): A list of entitlements to assign to the group.
The following example creates a group with a single member and independently verifies that the group was created:
def test_new_group(make_group, make_user, ws):
    user = make_user()
    group = make_group(members=[user.id])
    loaded = ws.groups.get(group.id)
    assert group.display_name == loaded.display_name
    assert group.members == loaded.members
See also ws, make_random, watchdog_purge_suffix.
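Retrying until a deadline, as described for group creation, can be sketched with the standard library as follows. The retry_until helper, its parameters and its defaults are hypothetical; the fixture's own retry mechanism may be implemented differently.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_until(
    func: Callable[[], T],
    timeout: float,
    interval: float = 1.0,
    retry_on: type = Exception,
) -> T:
    """Call `func` until it succeeds or `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return func()
        except retry_on:
            if time.monotonic() >= deadline:
                raise  # out of time: surface the last error
            time.sleep(interval)
```

For example, retry_until(lambda: ws.groups.get(group_id), timeout=180) would keep polling for up to three minutes, where ws and group_id stand for a workspace client and a group ID from the surrounding test.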
make_acc_group fixture
This fixture provides a function to manage Databricks account groups. Groups can be created with specified members and roles, and they will be deleted after the test is complete.
Has the same arguments and behavior as the make_group fixture but uses the account client instead of the workspace client.
Example usage:
def test_new_account_group(make_acc_group, acc):
    group = make_acc_group()
    loaded = acc.groups.get(group.id)
    assert group.display_name == loaded.display_name
See also acc, make_random, watchdog_purge_suffix.
make_user fixture
This fixture returns a function that creates a Databricks workspace user and removes it after the test is complete. In case of random naming conflicts, the fixture will retry the creation process for 30 seconds. Returns an instance of User.
Usage:
from databricks.sdk.service.workspace import ObjectType
def test_new_user(make_user, ws):
    new_user = make_user()
    home_dir = ws.workspace.get_status(f"/Users/{new_user.user_name}")
    assert home_dir.object_type == ObjectType.DIRECTORY
See also ws, make_random, watchdog_purge_suffix.
make_pipeline_permissions fixture
No description yet.
See also ws.
make_notebook fixture
Returns a function to create Databricks Notebooks and clean them up after the test. The function returns an os.PathLike object.
Keyword arguments:
path (str, optional): The path of the notebook. Defaults to a dummy-* notebook in the current user's home folder.
content (str | bytes | io.BinaryIO, optional): The content of the notebook. Defaults to print(1) for Python and SELECT 1 for SQL.
language (Language, optional): The language of the notebook. Defaults to Language.PYTHON.
encoding (str, optional): The file encoding. Defaults to sys.getdefaultencoding().
format (ImportFormat, optional): The format of the notebook. Defaults to ImportFormat.SOURCE.
overwrite (bool, optional): Whether to overwrite the notebook if it already exists. Defaults to False.
This example creates a notebook and verifies that print(1) is in the content:
def test_creates_some_notebook(make_notebook):
    notebook = make_notebook()
    assert "print(1)" in notebook.read_text()
See also make_job, make_pipeline, ws, make_random, watchdog_purge_suffix.
make_notebook_permissions fixture
No description yet.
See also ws.
make_workspace_file fixture
Returns a function to create a Databricks workspace file and clean it up after the test. The function returns an os.PathLike object.
Keyword arguments:
path (str, optional): The path of the file. Defaults to a dummy-* file in the current user's home folder.
content (str | bytes, optional): The content of the file. Defaults to print(1) for Python and SELECT 1 for SQL.
language (Language, optional): The language of the file. Defaults to Language.PYTHON.
encoding (str, optional): The file encoding. Defaults to sys.getdefaultencoding().
This example creates a file and verifies that the workspace path is an existing file with contents print(1):
def test_create_file(make_workspace_file):
    workspace_file = make_workspace_file()
    assert workspace_file.is_file()
    assert "print(1)" in workspace_file.read_text()
TODO: Merge functionality with make_notebook if WorkspacePath supports creating notebooks.
See also make_job, ws, make_random, watchdog_purge_suffix.
make_directory fixture
Returns a function to create Databricks Workspace Folders and clean them up after the test. The function returns an os.PathLike object.
Keyword arguments:
path (str, optional): The path of the folder. Defaults to a dummy-* folder in the current user's home folder.
This example creates a folder and verifies that it contains a notebook:
def test_creates_some_folder_with_a_notebook(make_directory, make_notebook):
    folder = make_directory()
    notebook = make_notebook(path=folder / 'foo.py')
    files = [_.name for _ in folder.iterdir()]
    assert ['foo.py'] == files
    assert notebook.parent == folder
See also make_experiment, ws, make_random, watchdog_purge_suffix.
make_directory_permissions fixture
No description yet.
See also ws.
make_repo fixture
Returns a function to create Databricks Repos and clean them up after the test. The function returns a RepoInfo object.
Keyword arguments:
url (str, optional): The URL of the repository.
provider (str, optional): The provider of the repository.
path (str, optional): The path of the repository. Defaults to /Repos/{current_user}/sdk-{random}-{purge_suffix}.
Usage:
def test_repo(make_repo):
    logger.info(f"created {make_repo()}")
See also ws, make_random, watchdog_purge_suffix.
make_repo_permissions fixture
No description yet.
See also ws.
make_workspace_file_permissions fixture
No description yet.
See also ws.
make_workspace_file_path_permissions fixture
No description yet.
See also ws.
make_secret_scope fixture
This fixture provides a function to create secret scopes. The created secret scope will be deleted after the test is complete. Returns the name of the secret scope.
To create a secret scope and use it within a test function:
def test_secret_scope_creation(make_secret_scope):
    secret_scope_name = make_secret_scope()
    assert secret_scope_name.startswith("dummy-")
See also ws, make_random.
make_secret_scope_acl fixture
This fixture provides a function to manage access control lists (ACLs) for secret scopes. ACLs define permissions for principals (users or groups) on specific secret scopes.
Arguments:
scope: The name of the secret scope.
principal: The name of the principal (user or group).
permission: The permission level for the principal on the secret scope.
Returns a tuple containing the secret scope name and the principal name.
To manage secret scope ACLs using the make_secret_scope_acl fixture:
from databricks.sdk.service.workspace import AclPermission
def test_secret_scope_acl_management(make_user, make_secret_scope, make_secret_scope_acl):
    scope_name = make_secret_scope()
    principal_name = make_user().display_name
    permission = AclPermission.READ
    acl_info = make_secret_scope_acl(
        scope=scope_name,
        principal=principal_name,
        permission=permission,
    )
    assert acl_info == (scope_name, principal_name)
See also ws.
make_authorization_permissions fixture
No description yet.
See also ws.
make_udf fixture
Create a UDF and return its info. Remove it after the test. Returns an instance of FunctionInfo.
Keyword Arguments:
catalog_name (str): The name of the catalog where the UDF will be created. Default is hive_metastore.
schema_name (str): The name of the schema where the UDF will be created. Default is a random string.
name (str): The name of the UDF. Default is a random string.
hive_udf (bool): If True, the UDF will be created as a Hive UDF. Default is False.
Usage:
def test_make_some_udfs(make_schema, make_udf):
    schema_a = make_schema(catalog_name="hive_metastore")
    make_udf(schema_name=schema_a.name)
    make_udf(schema_name=schema_a.name, hive_udf=True)
See also ws, env_or_skip, sql_backend, make_schema, make_random.
make_catalog fixture
Create a catalog and return its info. Remove it after the test. Returns an instance of CatalogInfo.
Keyword Arguments:
name (str): The name of the catalog. Default is a random string.
Usage:
def test_catalog_fixture(make_catalog, make_schema, make_table):
    from_catalog = make_catalog()
    from_schema = make_schema(catalog_name=from_catalog.name)
    from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    logger.info(f"Created new table: {from_table_1}")
See also make_volume, ws, make_random, watchdog_remove_after.
make_schema fixture
Create a schema and return its info. Remove it after the test. Returns an instance of SchemaInfo.
Keyword Arguments:
catalog_name (str): The name of the catalog where the schema will be created. Default is hive_metastore.
name (str): The name of the schema. Default is a random string.
location (str): The path to the location if it should be a managed schema.
Usage:
def test_catalog_fixture(make_catalog, make_schema, make_table):
    from_catalog = make_catalog()
    from_schema = make_schema(catalog_name=from_catalog.name)
    from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    logger.info(f"Created new table: {from_table_1}")
See also make_table, make_udf, make_volume, sql_backend, make_random, watchdog_remove_after.
make_table fixture
Create a table and return its info. Remove it after the test. Returns an instance of TableInfo.
Keyword Arguments:
catalog_name (str): The name of the catalog where the table will be created. Default is hive_metastore.
schema_name (str): The name of the schema where the table will be created. Default is a random string.
name (str): The name of the table. Default is a random string.
ctas (str): The CTAS statement to create the table. Default is None.
non_delta (bool): If True, the table will be created as a non-delta table. Default is False.
external (bool): If True, the table will be created as an external table. Default is False.
external_csv (str): The location of the external CSV table. Default is None.
external_delta (str): The location of the external Delta table. Default is None.
view (bool): If True, the table will be created as a view. Default is False.
tbl_properties (dict): The table properties. Default is None.
hiveserde_ddl (str): The DDL statement to create the table. Default is None.
storage_override (str): The storage location override. Default is None.
columns (list): The list of columns. Default is None.
Usage:
def test_catalog_fixture(make_catalog, make_schema, make_table):
    from_catalog = make_catalog()
    from_schema = make_schema(catalog_name=from_catalog.name)
    from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    logger.info(f"Created new table: {from_table_1}")
See also make_query, sql_backend, make_schema, make_random, watchdog_remove_after.
make_storage_credential fixture
Create a storage credential and return its info. Remove it after the test. Returns an instance of StorageCredentialInfo.
Keyword Arguments:
credential_name (str): The name of the storage credential. Default is a random string.
application_id (str): The application ID for the Azure service principal. Default is an empty string.
client_secret (str): The client secret for the Azure service principal. Default is an empty string.
directory_id (str): The directory ID for the Azure service principal. Default is an empty string.
aws_iam_role_arn (str): The ARN of the AWS IAM role. Default is an empty string.
read_only (bool): If True, the storage credential will be read-only. Default is False.
Usage:
def test_storage_credential(env_or_skip, make_storage_credential, make_random):
    random = make_random(6).lower()
    credential_name = f"dummy-{random}"
    make_storage_credential(
        credential_name=credential_name,
        aws_iam_role_arn=env_or_skip("TEST_UBER_ROLE_ID"),
    )
See also ws, watchdog_remove_after.
make_volume fixture
Create a volume and return its info. Remove it after the test. Returns an instance of VolumeInfo.
Keyword Arguments:
catalog_name (str): The name of the catalog where the schema and the volume are.
schema_name (str): The name of the schema where the volume is.
name (str): The name of the volume.
comment (str, optional): The comment attached to the volume.
Usage:
def test_volume_creation(make_catalog, make_schema, make_volume, make_random):
    # Create a catalog
    catalog = make_catalog()
    # Create a schema in the catalog
    schema = make_schema(catalog_name=catalog.name)
    # Generate a random name for the volume
    volume_name = f"dummy_vol_{make_random(6).lower()}"
    # Create the volume
    volume = make_volume(
        catalog_name=catalog.name,
        schema_name=schema.name,
        name=volume_name
    )
See also ws, make_catalog, make_schema, make_random.
product_info fixture
No description yet.
make_model
fixtureReturns a function to create Databricks Models and clean them up after the test.
The function returns a GetModelResponse
object.
Keyword arguments:
model_name
(str, optional): The name of the model. Defaults to dummy-*
.Usage:
from databricks.sdk.service.iam import PermissionLevel

def test_models(make_group, make_model, make_registered_model_permissions):
    group = make_group()
    model = make_model()
    make_registered_model_permissions(
        object_id=model.id,
        permission_level=PermissionLevel.CAN_MANAGE,
        group_name=group.display_name,
    )
See also make_serving_endpoint, ws, make_random, watchdog_remove_after.
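Fixtures described as returning a function that creates objects and cleans them up after the test follow a factory-with-teardown pattern. The sketch below illustrates that pattern using only the standard library. The names `cleanup_factory`, `create`, and `remove` are hypothetical and chosen for illustration; this is not the package's actual implementation.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator, List


@contextmanager
def cleanup_factory(
    create: Callable[..., Any],
    remove: Callable[[Any], None],
) -> Iterator[Callable[..., Any]]:
    """Yield a function that creates objects and remembers them for cleanup.

    Hypothetical helper: it only illustrates the pattern.
    """
    created: List[Any] = []

    def make(**kwargs: Any) -> Any:
        item = create(**kwargs)
        created.append(item)
        return item

    try:
        yield make
    finally:
        # Remove in reverse creation order so dependent objects go first.
        for item in reversed(created):
            try:
                remove(item)
            except Exception as err:
                # One failed removal should not block the remaining ones.
                print(f"ignoring cleanup error for {item!r}: {err}")


# Stand-ins for a real API: "creating" returns the name, "removing" records it.
removed: List[str] = []
with cleanup_factory(lambda **kw: kw["name"], removed.append) as make_thing:
    make_thing(name="dummy-a")
    make_thing(name="dummy-b")
```

In a pytest suite the same shape would typically live in a fixture that yields the inner function, so teardown runs even when the test body fails.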
make_experiment fixture
Returns a function to create Databricks Experiments and clean them up after the test.
The function returns a CreateExperimentResponse object.
Keyword arguments:
path (str, optional): The path of the experiment. Defaults to a dummy-* experiment in the current user's home folder.
experiment_name (str, optional): The name of the experiment. Defaults to dummy-*.
Usage:
from databricks.sdk.service.iam import PermissionLevel

def test_experiments(make_group, make_experiment, make_experiment_permissions):
    group = make_group()
    experiment = make_experiment()
    make_experiment_permissions(
        object_id=experiment.experiment_id,
        permission_level=PermissionLevel.CAN_MANAGE,
        group_name=group.display_name,
    )
See also ws, make_random, make_directory, watchdog_purge_suffix.
make_experiment_permissions fixture
No description yet.
See also ws.
make_warehouse_permissions fixture
No description yet.
See also ws.
make_lakeview_dashboard_permissions fixture
No description yet.
See also ws.
log_workspace_link fixture
Returns a function to log a workspace link.
See also ws.
log_account_link fixture
Returns a function to log an account link.
See also make_run_as, acc.
make_dashboard_permissions fixture
No description yet.
See also ws.
make_alert_permissions fixture
No description yet.
See also ws.
make_query fixture
Create a query and remove it after the test is done. Returns the LegacyQuery object.
Keyword Arguments:
sql_query: The query to be stored. Default is SELECT * FROM <newly created random table>.
Usage:
from databricks.sdk.service.sql import PermissionLevel

def test_permissions_for_redash(
    make_user,
    make_query,
    make_query_permissions,
):
    user = make_user()
    query = make_query()
    make_query_permissions(
        object_id=query.id,
        permission_level=PermissionLevel.CAN_EDIT,
        user_name=user.display_name,
    )
See also ws, make_table, make_random, watchdog_remove_after.
make_query_permissions fixture
No description yet.
See also ws.
make_registered_model_permissions fixture
No description yet.
See also ws.
make_serving_endpoint fixture
Returns a function to create Databricks Serving Endpoints and clean them up after the test.
The function returns a ServingEndpointDetailed object.
Under the covers, this fixture also creates a model to serve on a small workload size.
Usage:
from databricks.sdk.service.iam import PermissionLevel

def test_endpoints(make_group, make_serving_endpoint, make_serving_endpoint_permissions):
    group = make_group()
    endpoint = make_serving_endpoint()
    make_serving_endpoint_permissions(
        object_id=endpoint.response.id,
        permission_level=PermissionLevel.CAN_QUERY,
        group_name=group.display_name,
    )
See also ws, make_random, make_model, watchdog_remove_after.
make_serving_endpoint_permissions fixture
No description yet.
See also ws.
make_feature_table fixture
No description yet.
See also ws, make_random.
make_feature_table_permissions fixture
No description yet.
See also ws.
watchdog_remove_after fixture
Purge time for test objects, representing the (UTC-based) hour from which objects may be purged.
See also make_catalog, make_cluster, make_instance_pool, make_job, make_model, make_pipeline, make_query, make_schema, make_serving_endpoint, make_storage_credential, make_table, make_warehouse, watchdog_purge_suffix.
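To make the idea of a UTC-based purge hour concrete, here is a minimal sketch of how such a value could be computed. The function name `purge_hour`, the one-hour default retention, and the `YYYYMMDDHH` string format are all assumptions made for illustration; the fixture's actual format and timeout are not stated here.

```python
from datetime import datetime, timedelta, timezone


def purge_hour(now: datetime, keep_for: timedelta = timedelta(hours=1)) -> str:
    """Return the UTC hour (as YYYYMMDDHH) from which an object may be purged.

    Hypothetical helper; `now` must be a timezone-aware datetime.
    """
    deadline = now.astimezone(timezone.utc) + keep_for
    # Truncate to the hour, then move to the start of the following hour,
    # so an object is never eligible before its retention period has passed.
    deadline = deadline.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return deadline.strftime("%Y%m%d%H")


example = purge_hour(datetime(2024, 1, 1, 10, 30, tzinfo=timezone.utc))
```

Rounding to a whole hour keeps the marker short and lets a cleanup job compare it against the current hour with a plain string or integer comparison.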
watchdog_purge_suffix fixture
HEX-encoded purge time suffix for test objects.
See also make_acc_group, make_cluster_policy, make_directory, make_experiment, make_group, make_notebook, make_pipeline, make_repo, make_user, make_workspace_file, watchdog_remove_after.
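A hex-encoded time suffix is useful for objects that can only carry the purge time in their name. The sketch below shows one way to encode and decode such a suffix. The `ra` prefix, the use of a Unix timestamp, and both function names are assumptions for illustration, not a description of the fixture's actual output.

```python
from datetime import datetime, timezone

ASSUMED_PREFIX = "ra"  # hypothetical marker, read as "remove after"


def encode_purge_suffix(deadline: datetime) -> str:
    """Encode a timezone-aware deadline as a short hex suffix."""
    return f"{ASSUMED_PREFIX}{int(deadline.timestamp()):x}"


def decode_purge_suffix(suffix: str) -> datetime:
    """Recover the UTC deadline from a suffix made by encode_purge_suffix."""
    seconds = int(suffix[len(ASSUMED_PREFIX):], 16)
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


deadline = datetime(2024, 1, 1, 12, tzinfo=timezone.utc)
suffix = encode_purge_suffix(deadline)
object_name = f"dummy-abc-{suffix}"  # the suffix travels with the object name
```

Because the suffix is part of the name, a cleanup job can decide whether an object is stale without reading any tags or metadata.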
is_in_debug fixture
Returns true if the test is running from a debugger in an IDE, otherwise false.
The following IDEs are supported: IntelliJ IDEA (including Community Edition), PyCharm (including Community Edition), and Visual Studio Code.
See also debug_env, env_or_skip, make_run_as.
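One plausible way to detect an IDE-launched test run is to look at the script that started the Python process. The sketch below is a heuristic under stated assumptions: the launcher file names in `ASSUMED_IDE_LAUNCHERS` and the function name `launched_from_ide` are illustrative and not taken from this README, so the fixture may use a different check.

```python
import os
import sys
from typing import Optional

# Assumed names of the scripts that IDE test runners use to start pytest.
ASSUMED_IDE_LAUNCHERS = {
    "_jb_pytest_runner.py",   # assumed: JetBrains IDEs
    "testlauncher.py",        # assumed: Visual Studio Code
    "run_pytest_script.py",   # assumed: Visual Studio Code
}


def launched_from_ide(argv0: Optional[str] = None) -> bool:
    """Return True if the process entry script looks like an IDE test launcher."""
    script = sys.argv[0] if argv0 is None else argv0
    return os.path.basename(script) in ASSUMED_IDE_LAUNCHERS


from_ide = launched_from_ide("/opt/ide/helpers/_jb_pytest_runner.py")
from_cli = launched_from_ide("/usr/bin/pytest")
```

A test can use a flag like this to choose between interactive credentials during local debugging and environment-provided credentials in CI.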
Please note that this project is provided for your exploration only and is not formally supported by Databricks with Service Level Agreements (SLAs). It is provided AS-IS, and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of this project.
Any issues discovered through the use of this project should be filed as GitHub Issues on this repository. They will be reviewed as time permits, but no formal SLAs for support exist.