dlt-utils-lib
Advanced tools
def get_table_description(spark,
                          catalog_name: str,
                          table_schema: str) -> dict:
    """Return the latest AI-generated description per table in a schema.

    Queries ``apollo.core_map.ai_table_doc_metadata`` and keeps, for each
    table, only the row with the most recent ``last_updated_time``
    (deduplicated via ROW_NUMBER over the catalog/schema/table partition).

    Args:
        spark: active SparkSession used to run the SQL.
        catalog_name: value matched against ``table_catalog``.
        table_schema: value matched against ``table_schema``.

    Returns:
        dict mapping table_name -> description; empty dict when no rows match.
    """
    # NOTE(review): catalog/schema are interpolated straight into the SQL
    # string — assumed to come from trusted pipeline config, not end users.
    # Escape or parameterize if that assumption ever changes.
    bronze_table_comment = spark.sql(f'''
        SELECT table_name, description
        FROM (
            SELECT *,
                   ROW_NUMBER() OVER (
                       PARTITION BY table_catalog, table_schema, table_name
                       ORDER BY last_updated_time DESC
                   ) AS rn
            FROM apollo.core_map.ai_table_doc_metadata
            WHERE table_catalog = '{catalog_name}'
              AND table_schema = '{table_schema}'
        ) tmp
        WHERE rn = 1
    ''')
    # Collect once: the original ran count() AND collect(), triggering two
    # Spark jobs; a dict comprehension over an empty result is already {}.
    return {row.table_name: row.description
            for row in bronze_table_comment.collect()}
def get_column_comments(spark,
                        catalog_name: str,
                        table_schema: str) -> dict:
    """Return the latest AI-generated comment per column in a schema.

    Queries ``apollo.core_map.ai_column_doc_metadata`` and keeps, for each
    (table, column) pair, only the row with the most recent
    ``last_updated_time`` (deduplicated via ROW_NUMBER).

    Args:
        spark: active SparkSession used to run the SQL.
        catalog_name: value matched against ``table_catalog``.
        table_schema: value matched against ``table_schema``.

    Returns:
        Nested dict: {table_name: {column_name: comment}}; empty dict when
        no rows match.
    """
    # NOTE(review): identifiers are interpolated straight into the SQL
    # string — assumed trusted config values; escape if user-supplied.
    bronze_column_comment = spark.sql(f'''
        SELECT table_name, column_name, comment
        FROM (
            SELECT *,
                   ROW_NUMBER() OVER (
                       PARTITION BY table_catalog, table_schema, table_name, column_name
                       ORDER BY last_updated_time DESC
                   ) AS rn
            FROM apollo.core_map.ai_column_doc_metadata
            WHERE table_catalog = "{catalog_name}"
              AND table_schema = "{table_schema}"
        ) tmp
        WHERE rn = 1
    ''')
    # Group rows by table. setdefault replaces the manual "if table not in
    # result" membership check; iterating an empty collect() naturally
    # yields {}, so no separate empty-rows guard is needed.
    result: dict = {}
    for row in bronze_column_comment.collect():
        result.setdefault(row.table_name, {})[row.column_name] = row.comment
    return result
| Metadata-Version: 2.1 | ||
| Name: dlt-utils-lib | ||
| Version: 1.1.1 | ||
| Version: 1.1.2 | ||
| Summary: UNKNOWN | ||
@@ -5,0 +5,0 @@ Home-page: UNKNOWN |
| setup.py | ||
| dlt_utils/__init__.py | ||
| dlt_utils/dlt_metadata_receiver.py | ||
| dlt_utils/dlt_transformations.py | ||
@@ -4,0 +5,0 @@ dlt_utils/main_cdc_utils.py |
| from pyspark.sql import DataFrame | ||
| from pyspark.sql.types import TimestampType, DateType | ||
| from pyspark.sql.functions import col, expr, greatest, when, datediff, lag, min as spark_min, expr, count, coalesce | ||
| from pyspark.sql.window import Window | ||
| from pyspark.sql.functions import col, greatest, when, datediff, expr | ||
@@ -14,6 +13,10 @@ | ||
| return df | ||
| def update_cdc_timestamp(df: DataFrame, time_diff_threshold: int) -> DataFrame: | ||
| # if cdc_timestamp is null or time difference is greater than threshold, set it to the max timestamp in the row | ||
| timestamp_cols = [col.name for col in df.schema.fields if isinstance(col.dataType, (TimestampType, DateType)) and col.name not in ['cdc_timestamp', 'commit_timestamp', 'dlt_timestamp']] | ||
| timestamp_cols = [col.name for col in df.schema.fields if | ||
| isinstance(col.dataType, (TimestampType, DateType)) and col.name not in ['cdc_timestamp', | ||
| 'commit_timestamp', | ||
| 'dlt_timestamp']] | ||
@@ -28,22 +31,22 @@ if timestamp_cols: | ||
| max_timestamp_per_row = when( | ||
| col('last_update_date').isNotNull(), | ||
| col('last_update_date') | ||
| ).otherwise(max_timestamp_per_row) | ||
| col('last_update_date').isNotNull(), | ||
| col('last_update_date') | ||
| ).otherwise(max_timestamp_per_row) | ||
| # temp fix for artemis table; if possible take last_update_date instead, because the market table contains close time | ||
| if 'version' in timestamp_cols: | ||
| max_timestamp_per_row = when( | ||
| col('version').isNotNull(), | ||
| col('version') | ||
| ).otherwise(max_timestamp_per_row) | ||
| col('version').isNotNull(), | ||
| col('version') | ||
| ).otherwise(max_timestamp_per_row) | ||
| else: | ||
| max_timestamp_per_row = col(timestamp_cols[0]) | ||
| df = df.withColumn( | ||
| 'cdc_timestamp', | ||
| when( | ||
| (col('cdc_timestamp').isNull()) | | ||
| (datediff(col('cdc_timestamp'), max_timestamp_per_row) > time_diff_threshold), | ||
| max_timestamp_per_row | ||
| ).otherwise(col('cdc_timestamp')) | ||
| ) | ||
| when( | ||
| (col('cdc_timestamp').isNull()) | | ||
| (datediff(col('cdc_timestamp'), max_timestamp_per_row) > time_diff_threshold), | ||
| max_timestamp_per_row | ||
| ).otherwise(col('cdc_timestamp')) | ||
| ) | ||
| return df | ||
@@ -55,3 +58,6 @@ | ||
| if default_value_for_removed_col['name'] in df.columns: | ||
| df = df.withColumn(default_value_for_removed_col['name'], when(col(default_value_for_removed_col['name']).isNull(), expr(default_value_for_removed_col['expr'])).otherwise(col(default_value_for_removed_col['name']))) | ||
| df = df.withColumn(default_value_for_removed_col['name'], | ||
| when(col(default_value_for_removed_col['name']).isNull(), | ||
| expr(default_value_for_removed_col['expr'])).otherwise( | ||
| col(default_value_for_removed_col['name']))) | ||
| else: | ||
@@ -65,5 +71,1 @@ df = df.withColumn(default_value_for_removed_col['name'], expr(default_value_for_removed_col['expr'])) | ||
| return df.toDF(*new_columns) | ||
| from pyspark.sql.functions import col, expr, coalesce, lit, when, current_timestamp | ||
| from .dlt_transformations import update_cdc_timestamp, apply_partitions, add_default_value_for_removed_col, rename_columns | ||
| import json | ||
| # Base code pipeline: streaming process | ||
@@ -23,8 +23,9 @@ def base_cdc_replication_process(dlt, | ||
| bronze_table_path = f"{bucket_name}/{bronze_directory}/{s3_files_path}" | ||
| # Optional parameters | ||
| default_value_for_removed_col = table.get('default_value_for_removed_col', {}) | ||
| table_description = table.get('table_description') | ||
| column_comments = table.get('column_comments') | ||
| # Execute pipeline | ||
@@ -34,11 +35,10 @@ create_bronze_table_definition(spark=spark, | ||
| table_name=source_table_name, | ||
| files_path=bronze_table_path, | ||
| files_path=bronze_table_path, | ||
| file_format=file_format, | ||
| partitions=partition_col, | ||
| schema_exclude_columns=exclude_columns, | ||
| keys=keys, | ||
| time_diff_history_cdc_timestamp=time_diff_history_cdc_timestamp, | ||
| default_value_for_removed_col=default_value_for_removed_col | ||
| ) | ||
| silver_streaming_process(dlt=dlt, | ||
@@ -49,14 +49,26 @@ table_name=source_table_name, | ||
| exclude_columns=exclude_columns, | ||
| enable_truncate=enable_truncate) | ||
| enable_truncate=enable_truncate, | ||
| table_description=table_description, | ||
| column_comments=column_comments | ||
| ) | ||
| # Define the transformation function dynamically | ||
| def create_bronze_table_definition(spark, dlt, table_name: str, files_path: str, file_format: str, partitions: dict, schema_exclude_columns: list, keys: list, time_diff_history_cdc_timestamp: int, default_value_for_removed_col: dict): | ||
| def create_bronze_table_definition(spark, | ||
| dlt, | ||
| table_name: str, | ||
| files_path: str, | ||
| file_format: str, | ||
| partitions: dict, | ||
| schema_exclude_columns: list, | ||
| time_diff_history_cdc_timestamp: int, | ||
| default_value_for_removed_col: dict | ||
| ): | ||
| @dlt.table( | ||
| name=f"bronze_{table_name}", | ||
| comment="This is the bronze table.", | ||
| comment= "This is the bronze table for table {table_name}.", | ||
| temporary=False | ||
| ) | ||
| def transform_cdc_to_bronze(): | ||
| df = spark.read.format(file_format).load(files_path).transform(rename_columns) | ||
| df = spark.read.format(file_format).load(files_path).transform(rename_columns) | ||
| fields = [field for field in df.schema.fields if field.name not in schema_exclude_columns] | ||
@@ -83,3 +95,10 @@ schema_string = ', '.join([f"{field.name} {field.dataType.simpleString()}" for field in fields]) | ||
| # Create silver streaming data | ||
| def silver_streaming_process(dlt, table_name: str, keys: list, partitions: dict, exclude_columns: list, enable_truncate: bool = False): | ||
| def silver_streaming_process(dlt, | ||
| table_name: str, | ||
| keys: list, | ||
| partitions: dict, | ||
| exclude_columns: list, | ||
| table_description: str = None, | ||
| column_comments: dict = None, | ||
| enable_truncate: bool = False): | ||
| dlt.create_streaming_table( | ||
@@ -89,5 +108,6 @@ name=table_name, | ||
| "delta.autoOptimize.optimizeWrite": "true", | ||
| "delta.autoOptimize.autoCompact": "true" | ||
| "delta.autoOptimize.autoCompact": "true", | ||
| "column_comments": json.dumps(column_comments) if column_comments else "{}" | ||
| }, | ||
| comment = "This is the silver table with source in", | ||
| comment = table_description or "This is the silver table with source in", | ||
| partition_cols=partitions if partitions else None | ||
@@ -94,0 +114,0 @@ ) |
+1
-1
| Metadata-Version: 2.1 | ||
| Name: dlt_utils_lib | ||
| Version: 1.1.1 | ||
| Version: 1.1.2 | ||
| Summary: UNKNOWN | ||
@@ -5,0 +5,0 @@ Home-page: UNKNOWN |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
15959
22.79%15
7.14%319
29.15%