DLT with Debug
Running DLT workflows from interactive notebooks.
About The Project
Delta Live Tables (DLT) is a great way to design data pipelines while focusing only on the core business logic.
It makes the life of data engineers easier, but while development workflows are streamlined in DLT, debugging and
inspecting how the data looks after each transformation step in a typical DLT pipeline is painful and cumbersome,
because the DLT package is not available in an interactive environment.
Enter dlt-with-debug, a lightweight decorator utility which allows developers to do interactive
pipeline development with a single, unified source code for both the DLT run and a non-DLT interactive notebook run.
(back to top)
Built With
(back to top)
Installation
Install it with pip in your Databricks notebook
PyPI
%pip install dlt-with-debug
(back to top)
Prerequisites
(back to top)
Usage
Note:
- Use the dlt.create_table() API instead of dlt.table(), as dlt.table() sometimes gets mixed up with spark.table() in the global namespace.
- Always pass the globals() namespace to the dltwithdebug decorator, like this: @dltwithdebug(globals()) (see the sketch just below).
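For example, a minimal table definition that follows both notes above might look like this. This is only an illustrative sketch: the function name my_first_table and its query are hypothetical, and spark is assumed to be the SparkSession available in a Databricks notebook.

from dlt_with_debug import dltwithdebug, pipeline_id, showoutput

if pipeline_id:
    import dlt                                   # real DLT module inside a pipeline run
else:
    from dlt_with_debug import dlt               # drop-in shim for interactive notebook runs

@dlt.create_table(comment="A tiny illustrative table")   # use dlt.create_table(), not dlt.table()
@dltwithdebug(globals())                                  # always pass globals() to the decorator
def my_first_table():
    return spark.range(10)                                # any DataFrame-returning query works here

showoutput(my_first_table)                                # inspect the result interactively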
(back to top)
Sample DLT with debug
DLT pipeline example
Code:
Cmd 1
%pip install -e git+https://github.com/souvik-databricks/dlt-with-debug.git
Cmd 2
from pyspark.sql.functions import *
from pyspark.sql.types import *
from dlt_with_debug import dltwithdebug, pipeline_id, showoutput
if pipeline_id:
    import dlt
else:
    from dlt_with_debug import dlt
Cmd 3
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
Cmd 4
@dlt.create_table(
    comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
    table_properties={
        "quality": "bronze"
    }
)
@dltwithdebug(globals())
def clickstream_raw():
    return (
        spark.read.option("inferSchema", "true").json(json_path)
    )
Cmd 5
showoutput(clickstream_raw)

Cmd 6
@dlt.create_table(
    comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
    table_properties={
        "quality": "silver"
    }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
@dlt.expect_all({'valid_prev_page_id': "previous_page_id IS NOT NULL"})
@dltwithdebug(globals())
def clickstream_clean():
    return (
        dlt.read("clickstream_raw")
        .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
        .withColumn("click_count", expr("CAST(n AS INT)"))
        .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
        .withColumnRenamed("curr_title", "current_page_title")
        .withColumnRenamed("prev_title", "previous_page_title")
        .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
    )
Cmd 7
showoutput(clickstream_clean)

It is important to note that here we can also see how many records each expectation will affect.
(back to top)
Same sample DLT with debug
The same pipeline executed as part of a Delta Live Tables run

Below we can see that the expectation results also match up with the expectation metrics that we got from dltwithdebug earlier
with showoutput(clickstream_clean)

(back to top)
Quick API guide
Table syntax
@dlt.create_table(
    name="<name>",
    comment="<comment>",
    spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
    table_properties={"<key>" : "<value>", "<key>" : "<value>"},
    path="<storage-location-path>",
    partition_cols=["<partition-column>", "<partition-column>"],
    schema="schema-definition",
    temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
@dltwithdebug(globals())
def <function-name>():
    return (<query>)
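As a concrete illustration, a filled-in table definition might look like the sketch below. The table name daily_orders, its columns, and the upstream table orders_raw are hypothetical placeholders, not part of the library.

@dlt.create_table(
    name="daily_orders",
    comment="Cleaned orders, partitioned by order_date.",
    table_properties={"quality": "silver"},
    partition_cols=["order_date"],
    temporary=False)
@dlt.expect_or_drop("valid_amount", "amount > 0")    # drop rows that fail the expectation
@dltwithdebug(globals())
def daily_orders():
    return dlt.read("orders_raw").select("order_id", "order_date", "amount")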
View syntax
@dlt.create_view(
    name="<name>",
    comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
@dltwithdebug(globals())
def <function-name>():
    return (<query>)
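For instance, a simple view over the clickstream_clean table from the sample above could be written as follows. The aggregation itself is only illustrative and assumes the pyspark.sql.functions imports from Cmd 2.

@dlt.create_view(
    name="top_pages",
    comment="Most-clicked current pages, for quick inspection.")
@dltwithdebug(globals())
def top_pages():
    return (
        dlt.read("clickstream_clean")
        .groupBy("current_page_title")
        .agg(sum("click_count").alias("total_clicks"))    # sum comes from pyspark.sql.functions
        .orderBy(col("total_clicks").desc())
    )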
Getting results syntax
showoutput(function_name)
Import syntax
from dlt_with_debug import dltwithdebug, pipeline_id, showoutput
if pipeline_id:
    import dlt
else:
    from dlt_with_debug import dlt
(back to top)
Upcoming functionality
As of now, the DLT APIs listed in the Quick API guide above are covered for interactive use.
Limitation
DLT with Debug is a fully Python-based utility, and as such it doesn't support the spark.table("LIVE.func_name") syntax.
So instead of spark.table("LIVE.func_name"), use dlt.read("func_name") or dlt.read_stream("func_name"), as in the sketch below.
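For example, referring to the clickstream_raw table from the sample above, the change looks like this:

# Instead of:
#   df = spark.table("LIVE.clickstream_raw")
# write:
df = dlt.read("clickstream_raw")    # or dlt.read_stream("clickstream_raw") for streaming reads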
License
Distributed under the MIT License.
(back to top)