Security News
Research
Data Theft Repackaged: A Case Study in Malicious Wrapper Packages on npm
The Socket Research Team breaks down a malicious wrapper package that uses obfuscation to harvest credentials and exfiltrate sensitive data.
databricks-labs-blueprint
Advanced tools
Baseline for Databricks Labs projects written in Python. Sources are validated with mypy
and pylint
. See Contributing instructions if you would like to improve this project.
pathlib.Path
-like interfaces
@dataclass
configuration@dataclass
configurationSerdeError
with as_dict()
and from_dict()
databricks labs ...
Router
You can install this project via pip
:
pip install databricks-labs-blueprint
This library contains a proven set of building blocks, tested in production through UCX and projects.
pathlib.Path
-like interfacesThis library exposes subclasses of pathlib
from Python's standard
library that work with Databricks Workspace paths. These classes provide a more intuitive and Pythonic way to work
with Databricks Workspace paths than the standard str
paths. The classes are designed to be drop-in replacements
for pathlib.Path
and provide additional functionality for working with Databricks Workspace paths.
This code initializes a client to interact with a Databricks workspace, creates
a relative workspace path (~/some-folder/foo/bar/baz
), verifies the path is not absolute, and then demonstrates
that converting this relative path to an absolute path is not implemented and raises an error. Subsequently,
it expands the relative path to the user's home directory and creates the specified directory if it does not
already exist.
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.paths import WorkspacePath
name = 'some-folder'
ws = WorkspaceClient()
wsp = WorkspacePath(ws, f"~/{name}/foo/bar/baz")
assert not wsp.is_absolute()
wsp.absolute() # raises NotImplementedError
with_user = wsp.expanduser()
with_user.mkdir()
user_name = ws.current_user.me().user_name
wsp_check = WorkspacePath(ws, f"/Users/{user_name}/{name}/foo/bar/baz")
assert wsp_check.is_dir()
wsp_check.parent.rmdir() # raises BadRequest
wsp_check.parent.rmdir(recursive=True)
assert not wsp_check.exists()
This code expands the ~
symbol to the full path of the user's home directory, computes the relative path from this
home directory to the previously created directory (~/some-folder/foo/bar/baz
), and verifies it matches the expected
relative path (some-folder/foo/bar/baz
). It then confirms that the expanded path is absolute, checks that
calling absolute()
on this path returns the path itself, and converts the path to a FUSE-compatible path
format (/Workspace/username@example.com/some-folder/foo/bar/baz
).
from pathlib import Path
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.paths import WorkspacePath
name = 'some-folder'
ws = WorkspaceClient()
wsp = WorkspacePath(ws, f"~/{name}/foo/bar/baz")
with_user = wsp.expanduser()
home = WorkspacePath(ws, "~").expanduser()
relative_name = with_user.relative_to(home)
assert relative_name.as_posix() == f"{name}/foo/bar/baz"
assert with_user.is_absolute()
assert with_user.absolute() == with_user
assert with_user.as_fuse() == Path("/Workspace") / with_user.as_posix()
as_uri()
method returns a browser-accessible URI for the workspace path. This example retrieves the current user's username
from the Databricks workspace client, constructs a browser-accessible URI for the previously created directory
(~/some-folder/foo/bar/baz) by formatting the host URL and encoding the username, and then verifies that the URI
generated by the with_user path object matches the constructed browser URI:
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.paths import WorkspacePath
name = 'some-folder'
ws = WorkspaceClient()
wsp = WorkspacePath(ws, f"~/{name}/foo/bar/baz")
with_user = wsp.expanduser()
user_name = ws.current_user.me().user_name
browser_uri = f'{ws.config.host}#workspace/Users/{user_name.replace("@", "%40")}/{name}/foo/bar/baz'
assert with_user.as_uri() == browser_uri
read/write_text()
, read/write_bytes()
, and glob()
MethodsThis code creates a WorkspacePath
object for the path ~/some-folder/a/b/c
, expands it to the full user path,
and creates the directory along with any necessary parent directories. It then creates a file named hello.txt
within
this directory, writes "Hello, World!" to it, and verifies the content. The code lists all .txt
files in the directory
and ensures there is exactly one file, which is hello.txt
. Finally, it deletes hello.txt
and confirms that the file
no longer exists.
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.paths import WorkspacePath
name = 'some-folder'
ws = WorkspaceClient()
wsp = WorkspacePath(ws, f"~/{name}/a/b/c")
with_user = wsp.expanduser()
with_user.mkdir(parents=True)
hello_txt = with_user / "hello.txt"
hello_txt.write_text("Hello, World!")
assert hello_txt.read_text() == "Hello, World!"
files = list(with_user.glob("**/*.txt"))
assert len(files) == 1
assert hello_txt == files[0]
assert files[0].name == "hello.txt"
with_user.joinpath("hello.txt").unlink()
assert not hello_txt.exists()
read_bytes()
method works as expected:
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.paths import WorkspacePath
name = 'some-folder'
ws = WorkspaceClient()
wsp = WorkspacePath(ws, f"~/{name}")
with_user = wsp.expanduser()
with_user.mkdir(parents=True)
hello_bin = with_user.joinpath("hello.bin")
hello_bin.write_bytes(b"Hello, World!")
assert hello_bin.read_bytes() == b"Hello, World!"
with_user.joinpath("hello.bin").unlink()
assert not hello_bin.exists()
This code creates a WorkspacePath object for the path ~/some-folder, expands it to the full user path, and creates the directory along with any necessary parent directories. It then creates a file named hello.txt within this directory and writes "Hello, World!" to it. The code then renames the file to hello2.txt, verifies that hello.txt no longer exists, and checks that the content of hello2.txt is "Hello, World!".
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.paths import WorkspacePath
name = 'some-folder'
ws = WorkspaceClient()
wsp = WorkspacePath(ws, f"~/{name}")
with_user = wsp.expanduser()
with_user.mkdir(parents=True)
hello_txt = with_user / "hello.txt"
hello_txt.write_text("Hello, World!")
hello_txt.replace(with_user / "hello2.txt")
assert not hello_txt.exists()
assert (with_user / "hello2.txt").read_text() == "Hello, World!"
This code initializes a Databricks WorkspaceClient, creates a WorkspacePath object for the path ~/some-folder, and defines two items within this folder: a text file (a.txt) and a Python notebook (b). It creates the notebook with specified content and writes "Hello, World!" to the text file. The code then retrieves all files in the folder, asserts there are exactly two files, and verifies the suffix and content of each file. Specifically, it checks that a.txt has a .txt suffix and b has a .py suffix, with the notebook containing the expected code.
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.paths import WorkspacePath
ws = WorkspaceClient()
folder = WorkspacePath(ws, "~/some-folder")
txt_file = folder / "a.txt"
py_notebook = folder / "b" # notebooks have no file extension
make_notebook(path=py_notebook, content="display(spark.range(10))")
txt_file.write_text("Hello, World!")
files = {_.name: _ for _ in folder.glob("**/*")}
assert len(files) == 2
assert files["a.txt"].suffix == ".txt"
assert files["b"].suffix == ".py" # suffix is determined from ObjectInfo
assert files["b"].read_text() == "# Databricks notebook source\ndisplay(spark.range(10))"
Your command-line apps do need testable interactivity, which is provided by from databricks.labs.blueprint.tui import Prompts
. Here are some examples of it:
It is also integrated with our command router.
Use prompts.question()
as a bit more involved than input()
builtin:
from databricks.labs.blueprint.tui import Prompts
prompts = Prompts()
answer = prompts.question('Enter a year', default='2024', valid_number=True)
print(answer)
Optional arguments are:
default
(str) - use given value if user didn't input anythingmax_attempts
(int, default 10) - number of attempts to throw exception after invalid or empty inputvalid_number
(bool) - input has to be a valid numbervalid_regex
(bool) - input has to be a valid regular expressionvalidate
- function that takes a string and returns boolean, like lambda x: 'awesome' in x
, that could be used to further validate input.Use prompts.confirm()
to guard any optional or destructive actions of your app:
if prompts.confirm('Destroy database?'):
print('DESTROYING DATABASE')
Use to select a value from a list:
answer = prompts.choice('Select a language', ['Python', 'Rust', 'Go', 'Java'])
print(answer)
Use to select a value from the dictionary by showing users sorted dictionary keys:
answer = prompts.choice_from_dict('Select a locale', {
'Українська': 'ua',
'English': 'en'
})
print(f'Locale is: {answer}')
Use to select multiple items from dictionary
answer = prompts.multiple_choice_from_dict(
'What projects are written in Python? Select [DONE] when ready.', {
'Databricks Labs UCX': 'ucx',
'Databricks SDK for Python': 'sdk-py',
'Databricks SDK for Go': 'sdk-go',
'Databricks CLI': 'cli',
})
print(f'Answer is: {answer}')
Use MockPrompts
with regular expressions as keys and values as answers. The longest key takes precedence.
from databricks.labs.blueprint.tui import MockPrompts
def test_ask_for_int():
prompts = MockPrompts({r".*": ""})
res = prompts.question("Number of threads", default="8", valid_number=True)
assert "8" == res
There's a basic logging configuration available for Python SDK, but the default output is not pretty and is relatively inconvenient to read. Here's how make output from Python's standard logging facility more enjoyable to read:
from databricks.labs.blueprint.logger import install_logger
install_logger()
import logging
logging.root.setLevel("DEBUG") # use only for development or demo purposes
logger = logging.getLogger("name.of.your.module")
logger.debug("This is a debug message")
logger.info("This is an table message")
logger.warning("This is a warning message")
logger.error("This is an error message", exc_info=KeyError(123))
logger.critical("This is a critical message")
Here are the assumptions made by this formatter:
src/databricks/labs/ucx/migration/something.py
, which translate into databricks.labs.ucx.migration.something
fully-qualified Python module names, that get reflected into __name__
top-level code environment special variable, that you idiomatically use with logging as logger.getLogger(__name__)
. This log formatter shortens the full module path to a more readable d.l.u.migration.something
, which is easier to consume from a terminal screen or a notebook.MainThread
, because the overwhelming majority of Python applications are single-threaded.Here's how the output would look like on dark terminal backgrounds, including those from GitHub Actions:
And here's how things will appear when executed from Databricks Runtime as part of notebook or a workflow:
Just place the following code in your wheel's top-most __init__.py
file:
from databricks.labs.blueprint.logger import install_logger
install_logger(level="INFO")
And place this idiomatic
# ... insert this into the top of your file
from databricks.labs.blueprint.entrypoint import get_logger
logger = get_logger(__file__)
# ... top of the file insert end
... and you'll be able to benefit from the readable console stderr formatting everywhere
Each time you'd need to turn on debug logging, just invoke logging.root.setLevel("DEBUG")
(even in notebook).
console_script
EntrypointsWhen you invoke Python as an entry point to your wheel (also known as console_scripts
), __name__
top-level code environment would always be equal to __main__
. But you really want to get the logger to be named after your Python module and not just __main__
(see rendering in Databricks notebooks).
If you create a dist/logger.py
file with the following contents:
from databricks.labs.blueprint.entrypoint import get_logger, run_main
logger = get_logger(__file__)
def main(first_arg, second_arg, *other):
logger.info(f'First arg is: {first_arg}')
logger.info(f'Second arg is: {second_arg}')
logger.info(f'Everything else is: {other}')
logger.debug('... and this message is only shown when you are debugging from PyCharm IDE')
if __name__ == '__main__':
run_main(main)
... and invoke it with python dist/logger.py Hello world, my name is Serge
, you should get back the following output.
13:46:42 INFO [dist.logger] First arg is: Hello
13:46:42 INFO [dist.logger] Second arg is: world,
13:46:42 INFO [dist.logger] Everything else is: ('my', 'name', 'is', 'Serge')
Everything is made easy thanks to run_main(fn)
helper.
Python applies global interpreter lock (GIL) for compute-intensive tasks, though IO-intensive tasks, like calling Databricks APIs through Databricks SDK for Python, are not subject to GIL. It's quite a common task to perform multiple different API calls in parallel, though it is overwhelmingly difficult to do multi-threading right. concurrent.futures import ThreadPoolExecutor
is great, but sometimes we want something even more high level. This library helps you navigate the most common road bumps.
This library helps you filtering out empty results from background tasks, so that the downstream code is generally simpler. We're also handling the thread pool namind, so that the name of the list of tasks properly gets into log messages. After all background tasks completed their execution, we log something like Finished 'task group name' tasks: 50% results available (2/4). Took 0:00:00.000604
.
from databricks.labs.blueprint.parallel import Threads
def not_really_but_fine():
logger.info("did something, but returned None")
def doing_something():
logger.info("doing something important")
return f'result from {doing_something.__name__}'
logger.root.setLevel('DEBUG')
tasks = [not_really_but_fine, not_really_but_fine, doing_something, doing_something]
results, errors = Threads.gather("task group name", tasks)
assert ['result from doing_something', 'result from doing_something'] == results
assert [] == errors
This will log the following messages:
14:20:15 DEBUG [d.l.blueprint.parallel] Starting 4 tasks in 20 threads
14:20:15 INFO [dist.logger][task_group_name_0] did something, but returned None
14:20:15 INFO [dist.logger][task_group_name_1] did something, but returned None
14:20:15 INFO [dist.logger][task_group_name_1] doing something important
14:20:15 INFO [dist.logger][task_group_name_1] doing something important
14:20:15 INFO [d.l.blueprint.parallel][task_group_name_1] task group name 4/4, rps: 7905.138/sec
14:20:15 INFO [d.l.blueprint.parallel] Finished 'task group name' tasks: 50% results available (2/4). Took 0:00:00.000604
Inspired by Go Language's idiomatic error handling approach, this library allows for collecting errors from all of the background tasks and handle them separately. For all other cases, we recommend using strict failures
from databricks.sdk.errors import NotFound
from databricks.labs.blueprint.parallel import Threads
def works():
return True
def fails():
raise NotFound("something is not right")
tasks = [works, fails, works, fails, works, fails, works, fails]
results, errors = Threads.gather("doing some work", tasks)
assert [True, True, True, True] == results
assert 4 == len(errors)
This will log the following messages:
14:08:31 ERROR [d.l.blueprint.parallel][doing_some_work_0] doing some work task failed: something is not right: ...
...
14:08:31 ERROR [d.l.blueprint.parallel][doing_some_work_3] doing some work task failed: something is not right: ...
14:08:31 ERROR [d.l.blueprint.parallel] More than half 'doing some work' tasks failed: 50% results available (4/8). Took 0:00:00.001011
Use Threads.strict(...)
to raise ManyError
with the summary of all failed tasks:
from databricks.sdk.errors import NotFound
from databricks.labs.blueprint.parallel import Threads
def works():
return True
def fails():
raise NotFound("something is not right")
tasks = [works, fails, works, fails, works, fails, works, fails]
results = Threads.strict("doing some work", tasks)
# this line won't get executed
assert [True, True, True, True] == results
This will log the following messages:
...
14:11:46 ERROR [d.l.blueprint.parallel] More than half 'doing some work' tasks failed: 50% results available (4/8). Took 0:00:00.001098
...
databricks.labs.blueprint.parallel.ManyError: Detected 4 failures: NotFound: something is not right
There always needs to be a location, where you put application code, artifacts, and configuration.
The Installation
class is used to manage the ~/.{product}
folder on WorkspaceFS to track typed files.
It provides methods for serializing and deserializing objects of a specific type, as well as managing the storage location
for those objects. The class includes methods for loading and saving objects, uploading and downloading
files, and managing the installation folder.
The Installation
class can be helpful for unit testing by allowing you to mock the file system and control
the behavior of the load
and save
methods.
See unit testing for more details.
The install_folder
method returns the path to the installation folder on WorkspaceFS. The installation folder
is used to store typed files that are managed by the Installation
class. Publishing wheels
update the version.json
file in the install folder.
When integration testing, you may want to have a random installation folder for each test execution.
If an install_folder
argument is provided to the constructor of the Installation
class, it will be used
as the installation folder. Otherwise, the installation folder will be determined based on the current user's
username. Specifically, the installation folder will be /Users/{user_name}/.{product}
, where {user_name}
is the username of the current user and {product}
is the name of the product
associated with the installation. Here is an example of how you can use the install_folder
method:
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.installation import Installation
# Create an Installation object for the "blueprint" product
install = Installation(WorkspaceClient(), "blueprint")
# Print the path to the installation folder
print(install.install_folder())
# Output: /Users/{user_name}/.blueprint
In this example, the Installation
object is created for the "blueprint" product. The install_folder
method
is then called to print the path to the installation folder. The output will be /Users/{user_name}/.blueprint
,
where {user_name}
is the username of the current user.
You can also provide an install_folder
argument to the constructor to specify a custom installation folder.
Here is an example of how you can do this:
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.installation import Installation
# Create an Installation object for the "blueprint" product with a custom installation folder
install = Installation(WorkspaceClient(), "blueprint", install_folder="/my/custom/folder")
# Print the path to the installation folder
print(install.install_folder())
# Output: /my/custom/folder
In this example, the Installation
object is created for the "blueprint" product with a custom installation
folder of /my/custom/folder
. The install_folder
method is then called to print the path to the installation
folder. The output will be /my/custom/folder
.
Installation.current(ws, product)
returns the Installation
object for the given product in the current workspace.
If the installation is not found, a NotFound
error is raised. If assume_user
argument is True, the method
will assume that the installation is in the user's home directory and return it if found. If False, the method
will only return an installation that is in the /Applications
directory.
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.installation import Installation
ws = WorkspaceClient()
# current user installation
installation = Installation.assume_user_home(ws, "blueprint")
assert "/Users/foo/.blueprint" == installation.install_folder()
assert not installation.is_global()
# workspace global installation
installation = Installation.current(ws, "blueprint")
assert "/Applications/blueprint" == installation.install_folder()
assert installation.is_global()
Installation.existing(ws, product)
Returns a collection of all existing installations for the given product in the current workspace.
This method searches for installations in the root /Applications directory and home directories of all users in the workspace.
Let's say, users foo@example.com
and bar@example.com
installed blueprint
product in their home folders. The following
code will print /Workspace/bar@example.com/.blueprint
and /Workspace/foo@example.com/.blueprint
:
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.installation import Installation
ws = WorkspaceClient()
global_install = Installation.assume_global(ws, 'blueprint')
global_install.upload("some.bin", b"...")
user_install = Installation.assume_user_home(ws, 'blueprint')
user_install.upload("some.bin", b"...")
for blueprint in Installation.existing(ws, "blueprint"):
print(blueprint.install_folder())
@dataclass
configurationThe save(obj)
method saves a dataclass instance of type T
to a file on WorkspaceFS. If no filename
is provided,
the name of the type_ref
class will be used as the filename. Any missing parent directories are created automatically.
If the object has a __version__
attribute, the method will add a version
field to the serialized object
with the value of the __version__
attribute. See configuration format evolution
for more details. save(obj)
works with JSON and YAML configurations without the need to supply filename
keyword
attribute. When you need to save CSV files, the filename
attribute is required. If you need to
upload arbitrary and untyped files, use the upload()
method.
Here is an example of how you can use the save
method:
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.installation import Installation
install = Installation(WorkspaceClient(), "blueprint")
@dataclass
class MyClass:
field1: str
field2: str
obj = MyClass('value1', 'value2')
install.save(obj)
# Verify that the object was saved correctly
loaded_obj = install.load(MyClass)
assert loaded_obj == obj
In this example, the Installation
object is created for the "blueprint" product. A dataclass object of type
MyClass
is then created and saved to a file using the save
method. The object is then loaded from the file
using the load
method and compared to the original object to verify that
it was saved correctly.
You may need to upload a CSV file to Databricks Workspace, so that it's easier editable from a Databricks Workspace UI or tools like Google Sheets or Microsoft Excel. If non-technical humands don't need to edit application state, use dataclasses for configuration. CSV files currently don't support format evolution.
The following example will save workspaces.csv
file with two records and a header:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.provisioning import Workspace
from databricks.labs.blueprint.installation import Installation
installation = Installation(WorkspaceClient(), "blueprint")
installation.save([
Workspace(workspace_id=1234, workspace_name="first"),
Workspace(workspace_id=1235, workspace_name="second"),
], filename="workspaces.csv")
# ~ $ databricks workspace export /Users/foo@example.com/.blueprint/workspaces.csv
# ... workspace_id,workspace_name
# ... 1234,first
# ... 1235,second
@dataclass
configurationThe load(type_ref[, filename])
method loads an object of type type_ref
from a file on WorkspaceFS. If no filename
is
provided, the __file__
attribute of type_ref
will be used as the filename, otherwise the library will figure out the name
based on a class name.
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.installation import Installation
@dataclass
class SomeConfig: # <-- auto-detected filename is `some-config.json`
version: str
ws = WorkspaceClient()
installation = Installation.current(ws, "blueprint")
cfg = installation.load(SomeConfig)
installation.save(SomeConfig("0.1.2"))
installation.assert_file_written("some-config.json", {"version": "0.1.2"})
SerdeError
with as_dict()
and from_dict()
In the rare circumstances when you cannot use @dataclass or you get SerdeError
that you cannot explain, you can implement from_dict(cls, raw: dict) -> 'T'
and as_dict(self) -> dict
methods on the class:
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.installation import Installation
class SomePolicy:
def __init__(self, a, b):
self._a = a
self._b = b
def as_dict(self) -> dict:
return {"a": self._a, "b": self._b}
@classmethod
def from_dict(cls, raw: dict):
return cls(raw.get("a"), raw.get("b"))
def __eq__(self, o):
assert isinstance(o, SomePolicy)
return self._a == o._a and self._b == o._b
policy = SomePolicy(1, 2)
installation = Installation.current(WorkspaceClient(), "blueprint")
installation.save(policy, filename="backups/policy-123.json")
load = installation.load(SomePolicy, filename="backups/policy-123.json")
assert load == policy
As time progresses, your application evolves. So does the configuration file format with it. This library provides a common utility to seamlessly evolve configuration file format across versions, providing callbacks to convert from older versions to newer. If you need to migrate configuration or database state of the entire application, use the application state migrations.
If the type has a __version__
attribute, the method will check that the version of the object in the file
matches the expected version. If the versions do not match, the method will attempt to migrate the object to
the expected version using a method named v{actual_version}_migrate
on the type_ref
class. If the migration
is successful, the method will return the migrated object. If the migration is not successful, the method will
raise an IllegalState
exception. Let's say, we have /Users/foo@example.com/.blueprint/config.yml
file with
only the initial: 999
as content, which is from older installations of the blueprint
product:
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.installation import Installation
@dataclass
class EvolvedConfig:
__file__ = "config.yml"
__version__ = 3
initial: int
added_in_v1: int
added_in_v2: int
@staticmethod
def v1_migrate(raw: dict) -> dict:
raw["added_in_v1"] = 111
raw["version"] = 2
return raw
@staticmethod
def v2_migrate(raw: dict) -> dict:
raw["added_in_v2"] = 222
raw["version"] = 3
return raw
installation = Installation.current(WorkspaceClient(), "blueprint")
cfg = installation.load(EvolvedConfig)
assert 999 == cfg.initial
assert 111 == cfg.added_in_v1 # <-- added by v1_migrate()
assert 222 == cfg.added_in_v2 # <-- added by v2_migrate()
The upload(filename, raw_bytes)
and upload_dbfs(filename, raw_bytes)
methods upload raw bytes to a file on
WorkspaceFS (or DBFS) with the given filename
, creating any missing directories where required. This method
is used to upload files that are not typed, i.e., they do not use the @dataclass
decorator.
installation = Installation(ws, "blueprint")
target = installation.upload("wheels/foo.whl", b"abc")
assert "/Users/foo/.blueprint/wheels/foo.whl" == target
The most common example is a wheel, which we already integrate with Installation
framework.
You can use files()
method to recursively list all files in the install folder.
You can create a MockInstallation
object and use it to override the default installation folder and the contents
of the files in that folder. This allows you to test the of your code in different scenarios, such as when a file
is not found or when the contents of a file do not match the expected format.
For example, you have the following WorkspaceConfig
class that is serialized into config.yml
on your workspace:
@dataclass
class WorkspaceConfig:
__file__ = "config.yml"
__version__ = 2
inventory_database: str
connect: Config | None = None
workspace_group_regex: str | None = None
include_group_names: list[str] | None = None
num_threads: int | None = 10
database_to_catalog_mapping: dict[str, str] | None = None
log_level: str | None = "INFO"
workspace_start_path: str = "/"
Here's the only code necessary to verify that specific content got written:
from databricks.labs.blueprint.installation import MockInstallation
installation = MockInstallation()
installation.save(WorkspaceConfig(inventory_database="some_blueprint"))
installation.assert_file_written("config.yml", {
"version": 2,
"inventory_database": "some_blueprint",
"log_level": "INFO",
"num_threads": 10,
"workspace_start_path": "/",
})
This method is far superior than directly comparing raw bytes content via mock:
ws.workspace.upload.assert_called_with(
"/Users/foo/.blueprint/config.yml",
yaml.dump(
{
"version": 2,
"num_threads": 10,
"inventory_database": "some_blueprint",
"include_group_names": ["foo", "bar"],
"workspace_start_path": "/",
"log_level": "INFO",
}
).encode("utf8"),
format=ImportFormat.AUTO,
overwrite=True,
)
And it's even better if you use PyTest, where we have even deeper integration.
If you are using PyTest, then add this to your conftest.py
, so that
the assertions are more readable:
import pytest
pytest.register_assert_rewrite('databricks.labs.blueprint.installation')
As time goes by, your applications evolve as well, requiring the addition of new columns to database schemas, changes of the database state, or some migrations of configured workflows. This utility allows you to do seamless upgrades from version X to version Z through version Y. Idiomatic usage in your deployment automation is as follows:
from ... import Config
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.upgrades import Upgrades
from databricks.labs.blueprint.wheels import ProductInfo
product_info = ProductInfo.from_class(Config)
ws = WorkspaceClient(product=product_info.product_name(), product_version=product_info.version())
installation = product_info.current_installation(ws)
config = installation.load(Config)
upgrades = Upgrades(product_info, installation)
upgrades.apply(ws)
The upgrade process loads the version of the product that is about to be installed from __about__.py
file that
declares the __version__
variable. This version is compares with the version currently installed on
the Databricks Workspace by loading it from the version.json
file in the installation folder. This file is kept
up-to-date automatically if you use the databricks.labs.blueprint.wheels.WheelsV2.
If those versions are different, the process looks for the upgrades
folder next to __about__.py
file and
computes a difference for the upgrades in need to be rolled out. Every upgrade script in that directory has to
start with a valid SemVer identifier, followed by the alphanumeric description of the change,
like v0.0.1_add_service.py
. Each script has to expose a function that takes Installation
and
WorkspaceClient
arguments to perform the relevant upgrades. Here's the example:
from ... import Config
import logging, dataclasses
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.installation import Installation
upgrade_logger = logging.getLogger(__name__)
def upgrade(installation: Installation, ws: WorkspaceClient):
upgrade_logger.info(f"creating new automated service user for the installation")
config = installation.load(Config)
service_principal = ws.service_principals.create(display_name='blueprint-service')
new_config = dataclasses.replace(config, application_id=service_principal.application_id)
installation.save(new_config)
To prevent the same upgrade script from being applies twice, we use applied-upgrades.json
file in
the installation directory. At the moment, there's no downgrade(installation, ws)
, but it can easily be added in
the future versions of this library.
We recommend deploying applications as wheels, which are part of the application installation. But versioning, testing, and deploying those is often a tedious process.
When you deploy your Python app as a wheel, every time it has to have a different version. This library detects __about__.py
file automatically anywhere in the project root and reads __version__
variable from it. We support SemVer versioning scheme. Publishing wheels update version.json
file in the install folder.
from databricks.labs.blueprint.wheels import ProductInfo
product_info = ProductInfo(__file__)
version = product_info.released_version()
logger.info(f'Version is: {version}')
When you develop your wheel and iterate on testing it, it's often required to upload a file with different name each time you build it. We use git describe --tags
command to fetch the latest SemVer-compatible tag (e.g. v0.0.2
) and append the number of commits with timestamp to it. For example, if the released version is v0.0.1
, then the unreleased version would be something like 0.0.2+120240105144650
. We verify that this version is compatible with both SemVer and PEP 440. Publishing wheels update version.json
file in the install folder.
product_info = ProductInfo(__file__)
version = product_info.unreleased_version()
is_git = product_info.is_git_checkout()
is_unreleased = product_info.is_unreleased_version()
logger.info(f'Version is: {version}')
logger.info(f'Git checkout: {is_git}')
logger.info(f'Is unreleased: {is_unreleased}')
Library can infer the name of application by taking the directory name when __about__.py
file is located within the current project. See released version detection for more details.
ProductInfo.for_testing(klass)
creates a new ProductInfo
object with a random product_name
.
from databricks.labs.blueprint.wheels import ProductInfo
product_info = ProductInfo(__file__)
logger.info(f'Product name is: {product_info.product_name()}')
ProductInfo
with integration testsWhen you're integration testing your installations, you may want to have different installation folders for each test execution. ProductInfo.for_testing(klass)
helps you with this:
from ... import ConfigurationClass
from databricks.labs.blueprint.wheels import ProductInfo
first = ProductInfo.for_testing(ConfigurationClass)
second = ProductInfo.for_testing(ConfigurationClass)
assert first.product_name() != second.product_name()
Before you execute a wheel on Databricks, you have to build it and upload it. This library provides detects released or unreleased version of the wheel, copies it over to a temporary folder, changes the __about__.py
file with the right version, and builds the wheel in the temporary location, so that it's not polluted with build artifacts. Wheels
is a context manager, so it removes all temporary files and folders ather with
block finishes. This library is successfully used to concurrently test wheels on Shared Databricks Clusters through notebook-scoped libraries. Before you deploy the new version of the wheel, it is highly advised that you perform application state upgrades.
Every call wheels.upload_to_wsfs()
updates version.json
file in the install folder, which holds version
field with the current wheel version. There's also wheel
field, that contains the path to the current wheel file on WorkspaceFS.
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.wheels import ProductInfo
w = WorkspaceClient()
product_info = ProductInfo(__file__)
installation = product_info.current_installation(w)
with product_info.wheels(w) as wheels:
remote_wheel = wheels.upload_to_wsfs()
logger.info(f'Uploaded to {remote_wheel}')
This will print something like:
15:08:44 INFO [dist.logger] Uploaded to /Users/serge.smertin@databricks.com/.blueprint/wheels/databricks_labs_blueprint-0.0.2+120240105150840-py3-none-any.whl
You can also do wheels.upload_to_dbfs()
, though you're not able to set any access control over it.
Python wheel may have dependencies that are not included in the wheel itself. These dependencies are usually other Python packages that your wheel relies on. During installation on regular Databricks Workspaces, these dependencies get automatically fetched from Python Package Index.
Some Databricks Workspaces are configured with extra layers of network security, that block all access to Public Internet, including Python Package Index. To ensure installations working on these kinds of workspaces, developers need to explicitly upload all upstream dependencies for their applications to work correctly.
The upload_wheel_dependencies(prefixes)
method can be used to upload these dependencies to Databricks Workspace. This method takes a list of prefixes as an argument. It will upload all the dependencies of the wheel that have names starting with any of the provided prefixes.
Here is an example of how you can use this method:
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.wheels import ProductInfo
ws = WorkspaceClient()
product_info = ProductInfo(__file__)
installation = product_info.current_installation(ws)
with product_info.wheels(ws) as wheels:
wheel_paths = wheels.upload_wheel_dependencies(['databricks_sdk', 'pandas'])
for path in wheel_paths:
print(f'Uploaded dependency to {path}')
In this example, the upload_wheel_dependencies(['databricks_sdk', 'pandas'])
call will upload all the dependencies of the wheel that have names starting with 'databricks_sdk' or 'pandas'. This method excludes any platform specific dependencies (i.e. ending with -none-any.whl
). Also the main wheel file is not uploaded. The method returns a list of paths to the uploaded dependencies on WorkspaceFS.
databricks labs ...
RouterThis library contains common utilities for Databricks CLI entrypoints defined in labs.yml
file. Here's the example metadata for a tool named blueprint
with a single me
command and flag named --greeting
, that has Hello
as default value:
---
name: blueprint
description: Common libraries for Databricks Labs
install:
script: src/databricks/labs/blueprint/__init__.py
entrypoint: src/databricks/labs/blueprint/__main__.py
min_python: 3.10
commands:
- name: me
description: shows current username
flags:
- name: greeting
default: Hello
description: Greeting prefix
And here's the content for src/databricks/labs/blueprint/__main__.py
file, that executes databricks labs blueprint me
command with databricks.sdk.WorkspaceClient
automatically injected into an argument with magical name w
:
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.entrypoint import get_logger
from databricks.labs.blueprint.cli import App
app = App(__file__)
logger = get_logger(__file__)
@app.command
def me(w: WorkspaceClient, greeting: str):
"""Shows current username"""
logger.info(f"{greeting}, {w.current_user.me().user_name}!")
if "__main__" == __name__:
app()
As you may have noticed, there were only workspace-level commands, but you can also nave native account-level command support. You need to specify the is_account
property when declaring it in labs.yml
file:
commands:
# ...
- name: workspaces
is_account: true
description: shows current workspaces
and @app.command(is_account=True)
will get you databricks.sdk.AccountClient
injected into a
argument:
from databricks.sdk import AccountClient
@app.command(is_account=True)
def workspaces(a: AccountClient):
"""Shows workspaces"""
for ws in a.workspaces.list():
logger.info(f"Workspace: {ws.workspace_name} ({ws.workspace_id})")
If your command needs some terminal interactivity, simply add prompts: Prompts
argument to your command:
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.entrypoint import get_logger
from databricks.labs.blueprint.cli import App
from databricks.labs.blueprint.tui import Prompts
app = App(__file__)
logger = get_logger(__file__)
@app.command
def me(w: WorkspaceClient, prompts: Prompts):
"""Shows current username"""
if prompts.confirm("Are you sure?"):
logger.info(f"Hello, {w.current_user.me().user_name}!")
if "__main__" == __name__:
app()
Invoking Sparksession using Databricks Connect
from databricks.sdk import WorkspaceClient
from databricks.connect import DatabricksSession
@app.command
def example(w: WorkspaceClient):
"""Building Spark Session using Databricks Connect"""
spark = DatabricksSession.builder().sdk_config(w.config).getOrCreate()
spark.sql("SHOW TABLES")
This tooling makes it easier to start new projects. First, install the CLI:
databricks labs install blueprint
After, create new project in a designated directory:
databricks labs blueprint init-project --target /path/to/folder
This library is used in the following projects:
Please note that this project is provided for your exploration only and is not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS, and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of this project.
Any issues discovered through the use of this project should be filed as GitHub Issues on this repository. They will be reviewed as time permits, but no formal SLAs for support exist.
FAQs
Common libraries for Databricks Labs
We found that databricks-labs-blueprint demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Research
The Socket Research Team breaks down a malicious wrapper package that uses obfuscation to harvest credentials and exfiltrate sensitive data.
Research
Security News
Attackers used a malicious npm package typosquatting a popular ESLint plugin to steal sensitive data, execute commands, and exploit developer systems.
Security News
The Ultralytics' PyPI Package was compromised four times in one weekend through GitHub Actions cache poisoning and failure to rotate previously compromised API tokens.