kflow
Advanced tools
| sqlalchemy | ||
| pandas | ||
| psycopg2-binary | ||
| boto3 | ||
| smart_open |
| from . import authn | ||
| from . import extract | ||
| from . import load | ||
| from . import tools | ||
| from . import log | ||
| from . import monitoring |
| """ | ||
| Connection monitoring utilities for kflow ETL system | ||
| """ | ||
| import logging | ||
| from datetime import datetime | ||
| from kflow import authn | ||
def get_connection_pool_stats(base_env: str = 'WarehouseProd'):
    """
    Collect usage statistics for the SQLAlchemy connection pool of *base_env*.

    Returns:
        dict: timestamp, environment, pool counters, and a coarse
        ``pool_status`` flag ('healthy' when at least one connection is
        checked in, else 'warning'); ``None`` if stats could not be read.
    """
    try:
        pool = authn.getConnectionDB(base_env).pool
        stats = {
            'timestamp': datetime.now().isoformat(),
            'environment': base_env,
            'pool_size': pool.size(),
            'checked_in': pool.checkedin(),
            'checked_out': pool.checkedout(),
            'overflow': pool.overflow(),
            'pool_status': 'healthy' if pool.checkedin() > 0 else 'warning',
        }
        logging.info(f"Connection Pool Stats for {base_env}: {stats}")
        return stats
    except Exception as exc:
        logging.error(f"Error getting connection pool stats: {str(exc)}")
        return None
def test_redshift_connection():
    """
    Smoke-test connectivity to the WarehouseProd Redshift cluster.

    Runs a trivial ``SELECT 1`` and logs the outcome.

    Returns:
        bool: True when the probe query returned 1, False otherwise
        (including any connection error).
    """
    try:
        engine = authn.getConnectionDB('WarehouseProd')
        with engine.connect() as conn:
            # Minimal round-trip: one constant-select, one fetched row.
            probe = conn.execute("SELECT 1 as test").fetchone()[0]
            if probe != 1:
                logging.error("❌ Redshift connection test failed")
                return False
            logging.info("✅ Redshift connection test successful")
            return True
    except Exception as exc:
        logging.error(f"❌ Redshift connection test failed: {str(exc)}")
        return False
def check_long_running_queries(base_env: str = 'WarehouseProd'):
    """
    Check for long-running queries that might be causing locks.

    Looks in Redshift's ``stv_recents`` for queries that have been running
    longer than 5 minutes and logs a warning for each one found.

    Args:
        base_env: environment name understood by ``authn.getConnectionDB``.

    Returns:
        list: matching rows in SELECT order
        (pid, user_name, db_name, query, starttime, duration_seconds);
        empty list when nothing is found or on error.
    """
    try:
        engine = authn.getConnectionDB(base_env)
        query = """
        SELECT
            pid,
            user_name,
            db_name,
            query,
            starttime,
            DATEDIFF(second, starttime, GETDATE()) as duration_seconds
        FROM stv_recents
        WHERE status = 'Running'
          AND DATEDIFF(second, starttime, GETDATE()) > 300
        ORDER BY starttime;
        """
        with engine.connect() as conn:
            result = conn.execute(query)
            long_queries = result.fetchall()
        if long_queries:
            logging.warning(f"Found {len(long_queries)} long-running queries (>5 minutes)")
            # Fix: the loop variable previously shadowed the SQL string `query`.
            # Row indices follow the SELECT order above: [0]=pid,
            # [3]=query text, [5]=duration_seconds.
            for row in long_queries:
                logging.warning(f"PID {row[0]}: {row[5]}s - {row[3][:100]}...")
        else:
            logging.info("No long-running queries detected")
        return long_queries
    except Exception as exc:
        logging.error(f"Error checking long-running queries: {str(exc)}")
        return []
def cleanup_idle_connections():
    """
    Close pooled connections for every cached engine in ``kflow.authn``.

    Uses ``Engine.dispose()``, which checks in and closes all pooled
    connections and rebuilds the pool in place. The previous
    ``engine.pool.recreate()`` call only *returned* a new pool object
    without attaching it to the engine, so nothing was actually cleaned up.

    Returns:
        None. Errors are logged, never raised.
    """
    try:
        from kflow.authn import _connection_pools
        for env_name, engine in _connection_pools.items():
            if hasattr(engine, 'pool'):
                # dispose() closes checked-in connections and replaces the
                # pool on the engine itself (unlike pool.recreate()).
                engine.dispose()
                logging.info(f"Cleaned up connection pool for {env_name}")
    except Exception as exc:
        logging.error(f"Error cleaning up connections: {str(exc)}")
@@ -1,4 +0,4 @@ | ||
| Metadata-Version: 2.1 | ||
| Metadata-Version: 2.4 | ||
| Name: kflow | ||
| Version: 1.0.148 | ||
| Version: 1.1.0 | ||
| Summary: KLog.co package for ETLs | ||
@@ -9,3 +9,3 @@ Home-page: https://github.com/teu-ai/etl | ||
| License: Apache | ||
| Keywords: etl data science airflow | ||
| Keywords: etl data science airflow redshift connection pooling | ||
| Classifier: Development Status :: 3 - Alpha | ||
@@ -16,3 +16,18 @@ Classifier: License :: OSI Approved :: Apache Software License | ||
| License-File: LICENSE.txt | ||
| Requires-Dist: sqlalchemy | ||
| Requires-Dist: pandas | ||
| Requires-Dist: psycopg2-binary | ||
| Requires-Dist: boto3 | ||
| Requires-Dist: smart_open | ||
| Dynamic: author | ||
| Dynamic: author-email | ||
| Dynamic: classifier | ||
| Dynamic: description | ||
| Dynamic: home-page | ||
| Dynamic: keywords | ||
| Dynamic: license | ||
| Dynamic: license-file | ||
| Dynamic: requires-dist | ||
| Dynamic: summary | ||
| Functions to upload, download and transform data, from different sources, usually to be used with Pandas. |
| LICENSE.txt | ||
| README.md | ||
| setup.py | ||
| kflow/__init__.py | ||
| kflow/authn.py | ||
@@ -8,2 +9,3 @@ kflow/extract.py | ||
| kflow/log.py | ||
| kflow/monitoring.py | ||
| kflow/tools.py | ||
@@ -13,2 +15,3 @@ kflow.egg-info/PKG-INFO | ||
| kflow.egg-info/dependency_links.txt | ||
| kflow.egg-info/requires.txt | ||
| kflow.egg-info/top_level.txt |
+84
-35
| import os | ||
| import logging | ||
| from functools import lru_cache | ||
# Connection pool cache: maps environment name -> cached SQLAlchemy engine,
# so repeated getConnectionDB() calls reuse one pool per environment
# instead of creating a new engine (and new connections) each time.
_connection_pools = {}
| @lru_cache(maxsize=5) | ||
| def _get_connection_string(base_env: str): | ||
| """Cache connection strings to avoid repeated AWS Secrets Manager calls""" | ||
| rename_secret = {"PrismaProd":"prisma_bd", | ||
| "WarehouseProd":"secret-redshift-cluster-1", | ||
| "Vekna":"vekna_cluster", | ||
| "Data_RDS":"data_rds" | ||
| } | ||
| try: | ||
| secret_name = rename_secret[base_env] | ||
| except: | ||
| raise ValueError('No se reconoce la BD solicitada') | ||
| secret_value = get_secret(secret_name) | ||
| if secret_name == 'vekna_cluster': | ||
| sql_connector="postgresql" | ||
| db_name = 'vekna' | ||
| elif secret_name == 'data_rds': | ||
| sql_connector="postgresql" | ||
| db_name = 'postgres' | ||
| elif secret_name == 'secret-redshift-cluster-1': | ||
| sql_connector="redshift+psycopg2" | ||
| db_name = 'warehouse' | ||
| elif secret_name == 'prisma_bd': | ||
| sql_connector="postgresql" | ||
| db_name = 'prisma' | ||
| else: | ||
| raise ValueError('Current BD environment not recognized') | ||
| return f"{sql_connector}://{secret_value['username']}:{secret_value['password']}@{secret_value['host']}:{secret_value['port']}/{db_name}" | ||
def getConnectionDB(base_env:str):
    """
    Return a pooled SQLAlchemy engine for *base_env*, creating it on first use.

    Engines are cached per environment in ``_connection_pools`` so repeated
    calls share one connection pool instead of churning connections.
    """
    from sqlalchemy import create_engine
    from sqlalchemy.pool import QueuePool

    # Fast path: reuse the cached engine for this environment if present.
    cached = _connection_pools.get(base_env)
    if cached is not None:
        return cached

    conn_string = _get_connection_string(base_env)

    # Options common to every environment.
    shared_kwargs = dict(
        poolclass=QueuePool,
        pool_recycle=3600,    # recycle connections after 1 hour
        pool_pre_ping=True,   # validate connections before use
        echo=False,
    )
    if base_env == 'WarehouseProd':
        # Redshift: larger pool plus server-side session settings.
        engine = create_engine(
            conn_string,
            pool_size=5,       # keep 5 persistent connections
            max_overflow=10,   # allow up to 10 additional connections
            connect_args={
                'application_name': 'kflow_etl',
                'options': '-c statement_timeout=1800000'  # 30 minute timeout
            },
            **shared_kwargs,
        )
    else:
        # PostgreSQL environments use a smaller pool.
        engine = create_engine(
            conn_string,
            pool_size=3,
            max_overflow=5,
            **shared_kwargs,
        )

    # Cache the engine for subsequent calls.
    _connection_pools[base_env] = engine
    logging.info(f"Created new connection pool for {base_env}")
    return engine
| def _get_s3_local_env(): | ||
@@ -33,37 +117,2 @@ | ||
def getConnectionDB(base_env:str):
    """
    Create a fresh (non-pooled) SQLAlchemy engine for *base_env*.

    Resolves the environment name to an AWS Secrets Manager secret, fetches
    the credentials via ``get_secret()``, and builds the connection URL.
    Every call creates a brand-new engine — no pooling or caching.

    Raises:
        ValueError: if the environment or secret name is not recognized.
    """
    # Map public environment names to AWS Secrets Manager secret names.
    rename_secret = {"PrismaProd":"prisma_bd",
                     "WarehouseProd":"secret-redshift-cluster-1",
                     "Vekna":"vekna_cluster",
                     "Data_RDS":"data_rds"
                     }
    try:
        secret_name = rename_secret[base_env]
    # NOTE(review): bare except — only a KeyError can occur on this lookup.
    except:
        raise ValueError('No se reconoce la BD solicitada')
    secret_value = get_secret(secret_name)
    from sqlalchemy import create_engine
    # Choose the SQLAlchemy dialect and target database for each secret.
    if secret_name == 'vekna_cluster':
        sql_connector="postgresql"
        db_name = 'vekna'
    elif secret_name == 'data_rds':
        sql_connector="postgresql"
        db_name = 'postgres'
    elif secret_name == 'secret-redshift-cluster-1':
        sql_connector="redshift+psycopg2"
        db_name = 'warehouse'
    elif secret_name == 'prisma_bd':
        sql_connector="postgresql"
        db_name = 'prisma'
    else:
        # Defensive: unreachable while rename_secret only maps known secrets.
        raise ValueError('Current BD environment not recognized')
    conn_bd = f"{sql_connector}://{secret_value['username']}:{secret_value['password']}@{secret_value['host']}:{secret_value['port']}/{db_name}"
    return create_engine(conn_bd)
| def awsClient(type_auth:str='client',service:str='s3'): | ||
@@ -70,0 +119,0 @@ """ |
+4
-1
@@ -216,2 +216,3 @@ import logging | ||
| base_env:str ='WarehouseProd', | ||
| warehouse_engine = None, | ||
| to_lake_args = None, | ||
@@ -242,3 +243,5 @@ to_sql_kwargs = {"index":False, 'if_exists':'append', 'schema':'public', "method":'multi','chunksize':5000} | ||
| warehouse_engine = authn.getConnectionDB(base_env) | ||
| # Use provided engine or create new one | ||
| if warehouse_engine is None: | ||
| warehouse_engine = authn.getConnectionDB(base_env) | ||
@@ -245,0 +248,0 @@ if to_lake_args: |
+18
-3
@@ -1,4 +0,4 @@ | ||
| Metadata-Version: 2.1 | ||
| Metadata-Version: 2.4 | ||
| Name: kflow | ||
| Version: 1.0.148 | ||
| Version: 1.1.0 | ||
| Summary: KLog.co package for ETLs | ||
@@ -9,3 +9,3 @@ Home-page: https://github.com/teu-ai/etl | ||
| License: Apache | ||
| Keywords: etl data science airflow | ||
| Keywords: etl data science airflow redshift connection pooling | ||
| Classifier: Development Status :: 3 - Alpha | ||
@@ -16,3 +16,18 @@ Classifier: License :: OSI Approved :: Apache Software License | ||
| License-File: LICENSE.txt | ||
| Requires-Dist: sqlalchemy | ||
| Requires-Dist: pandas | ||
| Requires-Dist: psycopg2-binary | ||
| Requires-Dist: boto3 | ||
| Requires-Dist: smart_open | ||
| Dynamic: author | ||
| Dynamic: author-email | ||
| Dynamic: classifier | ||
| Dynamic: description | ||
| Dynamic: home-page | ||
| Dynamic: keywords | ||
| Dynamic: license | ||
| Dynamic: license-file | ||
| Dynamic: requires-dist | ||
| Dynamic: summary | ||
| Functions to upload, download and transform data, from different sources, usually to be used with Pandas. |
+11
-4
@@ -8,6 +8,6 @@ """This module does blah blah.""" | ||
| if github_run_number is not None: | ||
| version = f"1.0.{github_run_number}" | ||
| version = f"1.1.{github_run_number}" | ||
| setup(name='kflow', | ||
| version=version if version != None else "1.0.0", | ||
| version=version if version != None else "1.1.0", | ||
| description='KLog.co package for ETLs', | ||
@@ -21,3 +21,3 @@ long_description='Functions to upload, download and transform data, from different sources, usually to be used with Pandas.', | ||
| ], | ||
| keywords='etl data science airflow', | ||
| keywords='etl data science airflow redshift connection pooling', | ||
| url='https://github.com/teu-ai/etl', | ||
@@ -29,3 +29,10 @@ author='KLog.co Data & BI', | ||
| packages=['kflow'], | ||
| include_package_data=True | ||
| include_package_data=True, | ||
| install_requires=[ | ||
| 'sqlalchemy', | ||
| 'pandas', | ||
| 'psycopg2-binary', | ||
| 'boto3', | ||
| 'smart_open' | ||
| ] | ||
| ) |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
99957
6.86%17
21.43%1973
8.23%