Latest Threat Research: SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains. Details
Socket
Book a Demo | Install | Sign in
Socket

kflow

Package Overview
Dependencies
Maintainers
1
Versions
102
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kflow - npm Package Compare versions

Comparing version
1.1.0
to
1.2.0
+6
-3
kflow.egg-info/PKG-INFO
Metadata-Version: 2.4
Name: kflow
Version: 1.1.0
Summary: KLog.co package for ETLs
Version: 1.2.0
Summary: KLog.co package for ETLs with performance optimizations
Home-page: https://github.com/teu-ai/etl

@@ -20,2 +20,5 @@ Author: KLog.co Data & BI

Requires-Dist: smart_open
Requires-Dist: jinjasql
Requires-Dist: fastparquet
Requires-Dist: pyarrow
Dynamic: author

@@ -32,2 +35,2 @@ Dynamic: author-email

Functions to upload, download and transform data, from different sources, usually to be used with Pandas.
High-performance functions to extract, transform, and load data from various sources. Features optimized chunked processing, connection pooling, and performance monitoring for data engineering pipelines.

@@ -6,1 +6,4 @@ sqlalchemy

smart_open
jinjasql
fastparquet
pyarrow

@@ -0,6 +1,73 @@

"""
KFlow - KLog.co ETL Package
===========================
A comprehensive Python package for Extract, Transform, Load (ETL) operations.
Designed for data engineering pipelines with support for various data sources
and destinations including AWS services, databases, and data warehouses.
Modules:
- extract: Data extraction from various sources (SQL, APIs, files)
- load: Data loading to warehouses and databases
- authn: Authentication and connection management
- tools: Utility functions and helpers
- monitoring: Performance monitoring and logging
- log: Advanced logging utilities
Example:
>>> from kflow import extract, load, authn
>>> df = extract.SQLFileToDataframe('queries/', 'my_query.sql', 'PrismaProd')
>>> load.DataFrameToWarehouse('my_table', df)
"""
__version__ = "1.2.0"
__author__ = "KLog.co Data & BI"
__email__ = "data@klog.co"
# Import main modules for easier access
from . import extract
from . import load
from . import authn
from . import extract
from . import load
from . import tools
from . import monitoring
from . import log
from . import monitoring
# Expose commonly used functions
from .extract import (
SQLFileToDataframe,
SQLFileToDataframeBatch,
RDSPostgresTableAsDataFrame,
WarehouseTableAsDataFrame,
LakeFileAsDataFrame,
PrismaTableSnapshot
)
from .load import (
DataFrameToWarehouse,
DataFrameToLake
)
from .authn import (
getConnectionDB,
awsClient
)
__all__ = [
'extract',
'load',
'authn',
'tools',
'monitoring',
'log',
# Commonly used functions
'SQLFileToDataframe',
'SQLFileToDataframeBatch',
'RDSPostgresTableAsDataFrame',
'WarehouseTableAsDataFrame',
'LakeFileAsDataFrame',
'PrismaTableSnapshot',
'DataFrameToWarehouse',
'DataFrameToLake',
'getConnectionDB',
'awsClient'
]

@@ -677,22 +677,172 @@ import io

def SQLFileToDataframe(path: str, sql_filename: str, base_env: str, template_params=None,
                       bucket: str = "klog-etl", chunksize: int = None, engine_reuse=None,
                       performance_monitoring: bool = True) -> pd.DataFrame:
    """
    Execute SQL file against database and return DataFrame with optimized performance.

    Parameters
    ----------
    path : str
        S3 path where the SQL file is located
    sql_filename : str
        Name of the SQL file to execute
    base_env : str
        Database environment identifier for connection
    template_params : dict, optional
        Parameters for Jinja2 template rendering in SQL
    bucket : str, default "klog-etl"
        S3 bucket containing the SQL file
    chunksize : int, optional
        Number of rows to read at a time for large datasets.
        If None, reads entire result set at once.
    engine_reuse : sqlalchemy.Engine, optional
        Reuse existing database engine instead of creating new connection
    performance_monitoring : bool, default True
        Enable performance timing and memory usage logging

    Returns
    -------
    pd.DataFrame
        Result of SQL query execution

    Raises
    ------
    Exception
        If SQL execution fails or template rendering fails
    """
    import time
    from contextlib import contextmanager

    @contextmanager
    def perf_timer(operation_name):
        """Context manager for timing operations"""
        if performance_monitoring:
            start_time = time.time()
            try:
                yield
            finally:
                duration = time.time() - start_time
                logging.info(f"⏱️ {operation_name} completed in {duration:.2f} seconds")
        else:
            yield

    # Use provided engine or create a new one
    if engine_reuse is not None:
        bd_engine = engine_reuse
        # Caller owns the engine; never dispose it here.
        dispose_engine = False
        logging.info("🔌 Reusing provided database engine")
    else:
        with perf_timer("Database connection establishment"):
            bd_engine = authn.getConnectionDB(base_env)
        dispose_engine = True

    try:
        # Load the SQL text from S3
        with perf_timer(f"SQL file loading from s3://{bucket}/{path}{sql_filename}"):
            query = SQLFileToString(path, sql_filename, bucket)

        # Process template parameters if provided
        bind_params = None
        if template_params:
            with perf_timer("SQL template processing"):
                from jinjasql import JinjaSql
                j = JinjaSql()
                query, bind_params = j.prepare_query(query, template_params)
                logging.info(f"📝 Template parameters applied: {list(template_params.keys())}")
        else:
            # Escape % characters so they are not treated as paramstyle markers
            query = query.replace("%", "%%")

        # Execute query with performance monitoring
        if chunksize:
            logging.info(f"📦 Reading data in chunks of {chunksize:,} rows")
            with perf_timer(f"Chunked SQL execution ({sql_filename})"):
                chunks = []
                chunk_count = 0
                for chunk in pd.read_sql_query(query, bd_engine, params=bind_params, chunksize=chunksize):
                    chunks.append(chunk)
                    chunk_count += 1
                    if performance_monitoring and chunk_count % 10 == 0:
                        logging.info(f"📊 Processed {chunk_count} chunks...")
            if chunks:
                with perf_timer("DataFrame concatenation"):
                    df = pd.concat(chunks, ignore_index=True)
                logging.info(f"📋 Concatenated {chunk_count} chunks into DataFrame")
            else:
                # Query returned zero rows: produce an empty frame instead of failing concat
                df = pd.DataFrame()
        else:
            with perf_timer(f"SQL execution ({sql_filename})"):
                df = pd.read_sql_query(query, bd_engine, params=bind_params)

        # Log result statistics
        if performance_monitoring and not df.empty:
            memory_usage = df.memory_usage(deep=True).sum() / 1024 / 1024  # MB
            logging.info(f"📊 Query result: {len(df):,} rows, {df.shape[1]} columns, {memory_usage:.2f} MB")
            # Log column info for debugging
            logging.info(f"📝 Columns: {list(df.columns)}")
        elif df.empty:
            logging.warning(f"⚠️ Query returned empty result: {sql_filename}")

        return df

    except Exception as e:
        logging.error(f"❌ Error executing SQL file {sql_filename}: {str(e)}")
        logging.error(f"🔍 Error type: {type(e).__name__}")
        if template_params:
            logging.error(f"📝 Template parameters: {template_params}")
        raise
    finally:
        # Clean up the database connection only if this function created it
        if dispose_engine and bd_engine:
            try:
                bd_engine.dispose()
                if performance_monitoring:
                    logging.info("🔌 Database connection disposed")
            except Exception as cleanup_error:
                logging.warning(f"⚠️ Warning during connection cleanup: {cleanup_error}")
def SQLFileToDataframeBatch(path: str, sql_filename: str, base_env: str,
                            batch_size: int = 50000, template_params=None,
                            bucket: str = "klog-etl") -> pd.DataFrame:
    """
    Optimized version for very large SQL results using chunked processing.

    This function is specifically designed for queries that return millions of rows
    and helps manage memory usage by processing data in batches.

    Parameters
    ----------
    path : str
        S3 path where the SQL file is located
    sql_filename : str
        Name of the SQL file to execute
    base_env : str
        Database environment identifier for connection
    batch_size : int, default 50000
        Number of rows to process in each batch
    template_params : dict, optional
        Parameters for Jinja2 template rendering in SQL
    bucket : str, default "klog-etl"
        S3 bucket containing the SQL file

    Returns
    -------
    pd.DataFrame
        Complete result of SQL query execution
    """
    # Thin wrapper: delegate to SQLFileToDataframe with chunked reads enabled
    # so the two code paths stay in sync.
    return SQLFileToDataframe(
        path=path,
        sql_filename=sql_filename,
        base_env=base_env,
        template_params=template_params,
        bucket=bucket,
        chunksize=batch_size,
        performance_monitoring=True
    )

@@ -699,0 +849,0 @@

Metadata-Version: 2.4
Name: kflow
Version: 1.1.0
Summary: KLog.co package for ETLs
Version: 1.2.0
Summary: KLog.co package for ETLs with performance optimizations
Home-page: https://github.com/teu-ai/etl

@@ -20,2 +20,5 @@ Author: KLog.co Data & BI

Requires-Dist: smart_open
Requires-Dist: jinjasql
Requires-Dist: fastparquet
Requires-Dist: pyarrow
Dynamic: author

@@ -32,2 +35,2 @@ Dynamic: author-email

Functions to upload, download and transform data, from different sources, usually to be used with Pandas.
High-performance functions to extract, transform, and load data from various sources. Features optimized chunked processing, connection pooling, and performance monitoring for data engineering pipelines.

@@ -8,8 +8,8 @@ """This module does blah blah."""

if github_run_number is not None:
version = f"1.1.{github_run_number}"
version = f"1.2.{github_run_number}"
setup(name='kflow',
version=version if version != None else "1.1.0",
description='KLog.co package for ETLs',
long_description='Functions to upload, download and transform data, from different sources, usually to be used with Pandas.',
version=version if version != None else "1.2.0",
description='KLog.co package for ETLs with performance optimizations',
long_description='High-performance functions to extract, transform, and load data from various sources. Features optimized chunked processing, connection pooling, and performance monitoring for data engineering pipelines.',
classifiers=[

@@ -34,4 +34,7 @@ 'Development Status :: 3 - Alpha',

'boto3',
'smart_open'
'smart_open',
'jinjasql',
'fastparquet',
'pyarrow'
]
)