kflow
Advanced tools
| Metadata-Version: 2.4 | ||
| Name: kflow | ||
| Version: 1.2.0 | ||
| Summary: KLog.co package for ETLs with performance optimizations | ||
| Home-page: https://github.com/teu-ai/etl | ||
@@ -20,2 +20,5 @@ Author: KLog.co Data & BI | ||
| Requires-Dist: smart_open | ||
| Requires-Dist: jinjasql | ||
| Requires-Dist: fastparquet | ||
| Requires-Dist: pyarrow | ||
| Dynamic: author | ||
@@ -32,2 +35,2 @@ Dynamic: author-email | ||
| Functions to upload, download and transform data, from different sources, usually to be used with Pandas. | ||
| High-performance functions to extract, transform, and load data from various sources. Features optimized chunked processing, connection pooling, and performance monitoring for data engineering pipelines. |
@@ -6,1 +6,4 @@ sqlalchemy | ||
| smart_open | ||
| jinjasql | ||
| fastparquet | ||
| pyarrow |
+70
-3
@@ -0,6 +1,73 @@ | ||
"""
KFlow - KLog.co ETL Package
===========================

A comprehensive Python package for Extract, Transform, Load (ETL) operations.
Designed for data engineering pipelines with support for various data sources
and destinations including AWS services, databases, and data warehouses.

Modules:
    - extract: Data extraction from various sources (SQL, APIs, files)
    - load: Data loading to warehouses and databases
    - authn: Authentication and connection management
    - tools: Utility functions and helpers
    - monitoring: Performance monitoring and logging
    - log: Advanced logging utilities

Example:
    >>> from kflow import extract, load, authn
    >>> df = extract.SQLFileToDataframe('queries/', 'my_query.sql', 'PrismaProd')
    >>> load.DataFrameToWarehouse('my_table', df)
"""

__version__ = "1.2.0"
__author__ = "KLog.co Data & BI"
__email__ = "data@klog.co"

# Import main modules for easier access.
# NOTE(review): the original listed extract, load and monitoring twice;
# the duplicates were harmless (modules are cached) but are removed here.
from . import extract
from . import load
from . import authn
from . import tools
from . import monitoring
from . import log

# Expose commonly used functions at the package top level.
from .extract import (
    SQLFileToDataframe,
    SQLFileToDataframeBatch,
    RDSPostgresTableAsDataFrame,
    WarehouseTableAsDataFrame,
    LakeFileAsDataFrame,
    PrismaTableSnapshot,
)
from .load import (
    DataFrameToWarehouse,
    DataFrameToLake,
)
from .authn import (
    getConnectionDB,
    awsClient,
)

# Explicit public API of the package.
__all__ = [
    # Modules
    'extract',
    'load',
    'authn',
    'tools',
    'monitoring',
    'log',
    # Commonly used functions
    'SQLFileToDataframe',
    'SQLFileToDataframeBatch',
    'RDSPostgresTableAsDataFrame',
    'WarehouseTableAsDataFrame',
    'LakeFileAsDataFrame',
    'PrismaTableSnapshot',
    'DataFrameToWarehouse',
    'DataFrameToLake',
    'getConnectionDB',
    'awsClient',
]
+165
-15
@@ -677,22 +677,172 @@ import io | ||
def SQLFileToDataframe(path: str, sql_filename: str, base_env: str, template_params=None,
                       bucket: str = "klog-etl", chunksize: int = None, engine_reuse=None,
                       performance_monitoring: bool = True) -> pd.DataFrame:
    """
    Execute SQL file against database and return DataFrame with optimized performance.

    Loads a SQL file from S3, optionally renders it as a Jinja2 template,
    executes it against the database identified by ``base_env`` and returns
    the result as a pandas DataFrame. Supports chunked reads for large
    result sets and optional reuse of an existing SQLAlchemy engine.

    Parameters
    ----------
    path : str
        S3 path where the SQL file is located
    sql_filename : str
        Name of the SQL file to execute
    base_env : str
        Database environment identifier for connection
    template_params : dict, optional
        Parameters for Jinja2 template rendering in SQL
    bucket : str, default "klog-etl"
        S3 bucket containing the SQL file
    chunksize : int, optional
        Number of rows to read at a time for large datasets.
        If None, reads entire result set at once.
    engine_reuse : sqlalchemy.Engine, optional
        Reuse existing database engine instead of creating new connection.
        When provided, the caller keeps ownership and the engine is NOT
        disposed by this function.
    performance_monitoring : bool, default True
        Enable performance timing and memory usage logging

    Returns
    -------
    pd.DataFrame
        Result of SQL query execution

    Raises
    ------
    Exception
        If SQL execution fails or template rendering fails
    """
    import time
    from contextlib import contextmanager

    @contextmanager
    def perf_timer(operation_name):
        """Context manager for timing operations; no-op when monitoring is off."""
        if performance_monitoring:
            start_time = time.time()
            try:
                yield
            finally:
                duration = time.time() - start_time
                logging.info(f"⏱️ {operation_name} completed in {duration:.2f} seconds")
        else:
            yield

    # Use provided engine or create a new one. Only engines created here are
    # disposed in the finally block; a reused engine belongs to the caller.
    if engine_reuse is not None:
        bd_engine = engine_reuse
        dispose_engine = False
        logging.info("🔌 Reusing provided database engine")
    else:
        with perf_timer("Database connection establishment"):
            bd_engine = authn.getConnectionDB(base_env)
        dispose_engine = True

    try:
        # Load and process SQL file
        with perf_timer(f"SQL file loading from s3://{bucket}/{path}{sql_filename}"):
            query = SQLFileToString(path, sql_filename, bucket)

        # Process template parameters if provided
        bind_params = None
        if template_params:
            with perf_timer("SQL template processing"):
                from jinjasql import JinjaSql
                j = JinjaSql()
                query, bind_params = j.prepare_query(query, template_params)
                logging.info(f"📝 Template parameters applied: {list(template_params.keys())}")
        else:
            # Escape % characters so the DBAPI driver does not treat them
            # as parameter placeholders.
            query = query.replace("%", "%%")

        # Execute query with performance monitoring
        if chunksize:
            logging.info(f"📦 Reading data in chunks of {chunksize:,} rows")
            with perf_timer(f"Chunked SQL execution ({sql_filename})"):
                chunks = []
                chunk_count = 0
                for chunk in pd.read_sql_query(query, bd_engine, params=bind_params, chunksize=chunksize):
                    chunks.append(chunk)
                    chunk_count += 1
                    if performance_monitoring and chunk_count % 10 == 0:
                        logging.info(f"📊 Processed {chunk_count} chunks...")
            if chunks:
                with perf_timer("DataFrame concatenation"):
                    df = pd.concat(chunks, ignore_index=True)
                    logging.info(f"📋 Concatenated {chunk_count} chunks into DataFrame")
            else:
                # Query returned no rows at all.
                df = pd.DataFrame()
        else:
            with perf_timer(f"SQL execution ({sql_filename})"):
                df = pd.read_sql_query(query, bd_engine, params=bind_params)

        # Log result statistics
        if performance_monitoring and not df.empty:
            memory_usage = df.memory_usage(deep=True).sum() / 1024 / 1024  # MB
            logging.info(f"📊 Query result: {len(df):,} rows, {df.shape[1]} columns, {memory_usage:.2f} MB")
            # Log column info for debugging
            logging.info(f"📝 Columns: {list(df.columns)}")
        elif df.empty:
            logging.warning(f"⚠️ Query returned empty result: {sql_filename}")

        return df

    except Exception as e:
        logging.error(f"❌ Error executing SQL file {sql_filename}: {str(e)}")
        logging.error(f"🔍 Error type: {type(e).__name__}")
        if template_params:
            logging.error(f"📝 Template parameters: {template_params}")
        raise
    finally:
        # Clean up database connection if we created it
        if dispose_engine and bd_engine:
            try:
                bd_engine.dispose()
                if performance_monitoring:
                    logging.info("🔌 Database connection disposed")
            except Exception as cleanup_error:
                logging.warning(f"⚠️ Warning during connection cleanup: {cleanup_error}")
def SQLFileToDataframeBatch(path: str, sql_filename: str, base_env: str,
                            batch_size: int = 50000, template_params=None,
                            bucket: str = "klog-etl") -> pd.DataFrame:
    """
    Optimized version for very large SQL results using chunked processing.

    This function is specifically designed for queries that return millions of rows
    and helps manage memory usage by processing data in batches. It delegates to
    ``SQLFileToDataframe`` with ``chunksize=batch_size`` and monitoring enabled.

    Parameters
    ----------
    path : str
        S3 path where the SQL file is located
    sql_filename : str
        Name of the SQL file to execute
    base_env : str
        Database environment identifier for connection
    batch_size : int, default 50000
        Number of rows to process in each batch
    template_params : dict, optional
        Parameters for Jinja2 template rendering in SQL
    bucket : str, default "klog-etl"
        S3 bucket containing the SQL file

    Returns
    -------
    pd.DataFrame
        Complete result of SQL query execution
    """
    return SQLFileToDataframe(
        path=path,
        sql_filename=sql_filename,
        base_env=base_env,
        template_params=template_params,
        bucket=bucket,
        chunksize=batch_size,
        performance_monitoring=True,
    )
@@ -699,0 +849,0 @@ |
+6
-3
| Metadata-Version: 2.4 | ||
| Name: kflow | ||
| Version: 1.2.0 | ||
| Summary: KLog.co package for ETLs with performance optimizations | ||
| Home-page: https://github.com/teu-ai/etl | ||
@@ -20,2 +20,5 @@ Author: KLog.co Data & BI | ||
| Requires-Dist: smart_open | ||
| Requires-Dist: jinjasql | ||
| Requires-Dist: fastparquet | ||
| Requires-Dist: pyarrow | ||
| Dynamic: author | ||
@@ -32,2 +35,2 @@ Dynamic: author-email | ||
| Functions to upload, download and transform data, from different sources, usually to be used with Pandas. | ||
| High-performance functions to extract, transform, and load data from various sources. Features optimized chunked processing, connection pooling, and performance monitoring for data engineering pipelines. |
+8
-5
@@ -8,8 +8,8 @@ """This module does blah blah.""" | ||
| if github_run_number is not None: | ||
| version = f"1.1.{github_run_number}" | ||
| version = f"1.2.{github_run_number}" | ||
| setup(name='kflow', | ||
| version=version if version != None else "1.1.0", | ||
| description='KLog.co package for ETLs', | ||
| long_description='Functions to upload, download and transform data, from different sources, usually to be used with Pandas.', | ||
| version=version if version != None else "1.2.0", | ||
| description='KLog.co package for ETLs with performance optimizations', | ||
| long_description='High-performance functions to extract, transform, and load data from various sources. Features optimized chunked processing, connection pooling, and performance monitoring for data engineering pipelines.', | ||
| classifiers=[ | ||
@@ -34,4 +34,7 @@ 'Development Status :: 3 - Alpha', | ||
| 'boto3', | ||
| 'smart_open' | ||
| 'smart_open', | ||
| 'jinjasql', | ||
| 'fastparquet', | ||
| 'pyarrow' | ||
| ] | ||
| ) |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
108268
8.31%2169
9.93%