Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoSign in
Socket

dlt-utils-lib

Package Overview
Dependencies
Maintainers
1
Versions
73
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

dlt-utils-lib - pypi Package Compare versions

Comparing version
1.1.1
to
1.1.2
+55
dlt_utils/dlt_metadata_receiver.py
def get_table_description(spark,
                          catalog_name: str,
                          table_schema: str) -> dict:
    """Return the latest AI-generated description for each table in a schema.

    Reads ``apollo.core_map.ai_table_doc_metadata`` and keeps only the most
    recent row per (table_catalog, table_schema, table_name), ranked by
    ``last_updated_time``.

    Args:
        spark: Active SparkSession (anything exposing ``.sql()`` that
            returns a DataFrame).
        catalog_name: Catalog name to filter the metadata table on.
        table_schema: Schema name to filter the metadata table on.

    Returns:
        Mapping of table_name -> description; empty dict when no rows match.
    """
    # NOTE(review): catalog_name / table_schema are interpolated straight
    # into SQL. Safe only if they come from trusted pipeline config, never
    # from user input — confirm with callers.
    bronze_table_comment = spark.sql(f'''
        SELECT table_name, description
        FROM (
            SELECT *,
                   ROW_NUMBER() OVER (PARTITION BY table_catalog, table_schema, table_name ORDER BY last_updated_time DESC) as rn
            FROM apollo.core_map.ai_table_doc_metadata
            WHERE table_catalog = '{catalog_name}'
              AND table_schema = '{table_schema}'
        ) tmp
        WHERE rn = 1
    ''')
    # Collect exactly once. The previous version also called .count(),
    # which launches a second Spark job; the count() > 0 guard was
    # redundant because a dict comprehension over an empty result is {}.
    rows = bronze_table_comment.collect()
    return {row.table_name: row.description for row in rows}
def get_column_comments(spark,
                        catalog_name: str,
                        table_schema: str) -> dict:
    """Return per-column comments for every table in a schema.

    Reads ``apollo.core_map.ai_column_doc_metadata`` and keeps only the
    most recent comment per (table_catalog, table_schema, table_name,
    column_name), ranked by ``last_updated_time``.

    Args:
        spark: Active SparkSession (anything exposing ``.sql()`` that
            returns a DataFrame).
        catalog_name: Catalog name to filter the metadata table on.
        table_schema: Schema name to filter the metadata table on.

    Returns:
        Nested mapping table_name -> {column_name: comment}; empty dict
        when no rows match.
    """
    column_rows = spark.sql(f'''
        SELECT table_name, column_name, comment
        FROM (
            SELECT *,
                   ROW_NUMBER() OVER (
                       PARTITION BY table_catalog, table_schema, table_name, column_name
                       ORDER BY last_updated_time DESC
                   ) as rn
            FROM apollo.core_map.ai_column_doc_metadata
            WHERE table_catalog = "{catalog_name}"
              AND table_schema = "{table_schema}"
        ) tmp
        WHERE rn = 1
    ''').collect()
    # Group rows into a two-level dict; setdefault creates each table's
    # inner dict on first sight, so no membership pre-check is needed.
    comments_by_table = {}
    for record in column_rows:
        comments_by_table.setdefault(record.table_name, {})[record.column_name] = record.comment
    return comments_by_table
+1
-1
Metadata-Version: 2.1
Name: dlt-utils-lib
Version: 1.1.1
Version: 1.1.2
Summary: UNKNOWN

@@ -5,0 +5,0 @@ Home-page: UNKNOWN

setup.py
dlt_utils/__init__.py
dlt_utils/dlt_metadata_receiver.py
dlt_utils/dlt_transformations.py

@@ -4,0 +5,0 @@ dlt_utils/main_cdc_utils.py

from pyspark.sql import DataFrame
from pyspark.sql.types import TimestampType, DateType
from pyspark.sql.functions import col, expr, greatest, when, datediff, lag, min as spark_min, expr, count, coalesce
from pyspark.sql.window import Window
from pyspark.sql.functions import col, greatest, when, datediff, expr

@@ -14,6 +13,10 @@

return df
def update_cdc_timestamp(df: DataFrame, time_diff_threshold: int) -> DataFrame:
# if cdc_timestamp is null or time difference is greater than threshold, set it to the max timestamp in the row
timestamp_cols = [col.name for col in df.schema.fields if isinstance(col.dataType, (TimestampType, DateType)) and col.name not in ['cdc_timestamp', 'commit_timestamp', 'dlt_timestamp']]
timestamp_cols = [col.name for col in df.schema.fields if
isinstance(col.dataType, (TimestampType, DateType)) and col.name not in ['cdc_timestamp',
'commit_timestamp',
'dlt_timestamp']]

@@ -28,22 +31,22 @@ if timestamp_cols:

max_timestamp_per_row = when(
col('last_update_date').isNotNull(),
col('last_update_date')
).otherwise(max_timestamp_per_row)
col('last_update_date').isNotNull(),
col('last_update_date')
).otherwise(max_timestamp_per_row)
# temp fix for artemis tables: if possible, take last_update_date, because the market table contains a close time
if 'version' in timestamp_cols:
max_timestamp_per_row = when(
col('version').isNotNull(),
col('version')
).otherwise(max_timestamp_per_row)
col('version').isNotNull(),
col('version')
).otherwise(max_timestamp_per_row)
else:
max_timestamp_per_row = col(timestamp_cols[0])
df = df.withColumn(
'cdc_timestamp',
when(
(col('cdc_timestamp').isNull()) |
(datediff(col('cdc_timestamp'), max_timestamp_per_row) > time_diff_threshold),
max_timestamp_per_row
).otherwise(col('cdc_timestamp'))
)
when(
(col('cdc_timestamp').isNull()) |
(datediff(col('cdc_timestamp'), max_timestamp_per_row) > time_diff_threshold),
max_timestamp_per_row
).otherwise(col('cdc_timestamp'))
)
return df

@@ -55,3 +58,6 @@

if default_value_for_removed_col['name'] in df.columns:
df = df.withColumn(default_value_for_removed_col['name'], when(col(default_value_for_removed_col['name']).isNull(), expr(default_value_for_removed_col['expr'])).otherwise(col(default_value_for_removed_col['name'])))
df = df.withColumn(default_value_for_removed_col['name'],
when(col(default_value_for_removed_col['name']).isNull(),
expr(default_value_for_removed_col['expr'])).otherwise(
col(default_value_for_removed_col['name'])))
else:

@@ -65,5 +71,1 @@ df = df.withColumn(default_value_for_removed_col['name'], expr(default_value_for_removed_col['expr']))

return df.toDF(*new_columns)
from pyspark.sql.functions import col, expr, coalesce, lit, when, current_timestamp
from .dlt_transformations import update_cdc_timestamp, apply_partitions, add_default_value_for_removed_col, rename_columns
import json
# Base code pipeline: streaming process

@@ -23,8 +23,9 @@ def base_cdc_replication_process(dlt,

bronze_table_path = f"{bucket_name}/{bronze_directory}/{s3_files_path}"
# Optional parameters
default_value_for_removed_col = table.get('default_value_for_removed_col', {})
table_description = table.get('table_description')
column_comments = table.get('column_comments')
# Execute pipeline

@@ -34,11 +35,10 @@ create_bronze_table_definition(spark=spark,

table_name=source_table_name,
files_path=bronze_table_path,
files_path=bronze_table_path,
file_format=file_format,
partitions=partition_col,
schema_exclude_columns=exclude_columns,
keys=keys,
time_diff_history_cdc_timestamp=time_diff_history_cdc_timestamp,
default_value_for_removed_col=default_value_for_removed_col
)
silver_streaming_process(dlt=dlt,

@@ -49,14 +49,26 @@ table_name=source_table_name,

exclude_columns=exclude_columns,
enable_truncate=enable_truncate)
enable_truncate=enable_truncate,
table_description=table_description,
column_comments=column_comments
)
# Define the transformation function dynamically
def create_bronze_table_definition(spark, dlt, table_name: str, files_path: str, file_format: str, partitions: dict, schema_exclude_columns: list, keys: list, time_diff_history_cdc_timestamp: int, default_value_for_removed_col: dict):
def create_bronze_table_definition(spark,
dlt,
table_name: str,
files_path: str,
file_format: str,
partitions: dict,
schema_exclude_columns: list,
time_diff_history_cdc_timestamp: int,
default_value_for_removed_col: dict
):
@dlt.table(
name=f"bronze_{table_name}",
comment="This is the bronze table.",
comment= "This is the bronze table for table {table_name}.",
temporary=False
)
def transform_cdc_to_bronze():
df = spark.read.format(file_format).load(files_path).transform(rename_columns)
df = spark.read.format(file_format).load(files_path).transform(rename_columns)
fields = [field for field in df.schema.fields if field.name not in schema_exclude_columns]

@@ -83,3 +95,10 @@ schema_string = ', '.join([f"{field.name} {field.dataType.simpleString()}" for field in fields])

# Create silver streaming data
def silver_streaming_process(dlt, table_name: str, keys: list, partitions: dict, exclude_columns: list, enable_truncate: bool = False):
def silver_streaming_process(dlt,
table_name: str,
keys: list,
partitions: dict,
exclude_columns: list,
table_description: str = None,
column_comments: dict = None,
enable_truncate: bool = False):
dlt.create_streaming_table(

@@ -89,5 +108,6 @@ name=table_name,

"delta.autoOptimize.optimizeWrite": "true",
"delta.autoOptimize.autoCompact": "true"
"delta.autoOptimize.autoCompact": "true",
"column_comments": json.dumps(column_comments) if column_comments else "{}"
},
comment = "This is the silver table with source in",
comment = table_description or "This is the silver table with source in",
partition_cols=partitions if partitions else None

@@ -94,0 +114,0 @@ )

Metadata-Version: 2.1
Name: dlt_utils_lib
Version: 1.1.1
Version: 1.1.2
Summary: UNKNOWN

@@ -5,0 +5,0 @@ Home-page: UNKNOWN