Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

kflow

Package Overview
Dependencies
Maintainers
1
Versions
102
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kflow - npm Package Compare versions

Comparing version
1.0.148
to
1.1.0
+5
kflow.egg-info/requires.txt
sqlalchemy
pandas
psycopg2-binary
boto3
smart_open
from . import authn
from . import extract
from . import load
from . import tools
from . import log
from . import monitoring
"""
Connection monitoring utilities for kflow ETL system
"""
import logging
from datetime import datetime
from kflow import authn
def get_connection_pool_stats(base_env: str = 'WarehouseProd'):
    """Collect a snapshot of the SQLAlchemy connection-pool counters.

    Args:
        base_env: Environment name forwarded to ``authn.getConnectionDB``.

    Returns:
        dict with timestamp, environment and pool counters, or ``None``
        when the pool could not be inspected (best-effort monitoring).
    """
    try:
        pool = authn.getConnectionDB(base_env).pool
        snapshot = {
            'timestamp': datetime.now().isoformat(),
            'environment': base_env,
            'pool_size': pool.size(),
            'checked_in': pool.checkedin(),
            'checked_out': pool.checkedout(),
            'overflow': pool.overflow(),
            # No idle connections available is treated as a warning sign.
            'pool_status': 'healthy' if pool.checkedin() > 0 else 'warning',
        }
        logging.info(f"Connection Pool Stats for {base_env}: {snapshot}")
        return snapshot
    except Exception as exc:
        logging.error(f"Error getting connection pool stats: {str(exc)}")
        return None
def test_redshift_connection():
    """Probe the WarehouseProd Redshift engine with a trivial query.

    Returns:
        ``True`` when ``SELECT 1`` round-trips successfully, ``False`` on
        any failure (connection error or unexpected result).
    """
    try:
        engine = authn.getConnectionDB('WarehouseProd')
        with engine.connect() as conn:
            # Cheapest possible round-trip to prove connectivity.
            row = conn.execute("SELECT 1 as test").fetchone()
            if row[0] != 1:
                logging.error("❌ Redshift connection test failed")
                return False
            logging.info("✅ Redshift connection test successful")
            return True
    except Exception as exc:
        logging.error(f"❌ Redshift connection test failed: {str(exc)}")
        return False
def check_long_running_queries(base_env: str = 'WarehouseProd',
                               min_duration_seconds: int = 300):
    """List queries on the cluster that have been running too long.

    Fixes the original implementation, which reused the name ``query`` for
    both the SQL text and each result row, and generalizes the hard-coded
    300-second threshold into a parameter (default unchanged).

    Args:
        base_env: Environment name forwarded to ``authn.getConnectionDB``.
        min_duration_seconds: Only queries running longer than this many
            seconds are reported. Cast to ``int`` before interpolation so
            the SQL cannot be injected into.

    Returns:
        List of result rows (pid, user_name, db_name, query, starttime,
        duration_seconds); empty list when none are found or on error.
    """
    try:
        engine = authn.getConnectionDB(base_env)
        threshold = int(min_duration_seconds)
        sql = f"""
        SELECT
            pid,
            user_name,
            db_name,
            query,
            starttime,
            DATEDIFF(second, starttime, GETDATE()) as duration_seconds
        FROM stv_recents
        WHERE status = 'Running'
        AND DATEDIFF(second, starttime, GETDATE()) > {threshold}
        ORDER BY starttime;
        """
        with engine.connect() as conn:
            long_queries = conn.execute(sql).fetchall()
        if long_queries:
            logging.warning(
                f"Found {len(long_queries)} long-running queries (>{threshold}s)")
            for row in long_queries:
                # row[0]=pid, row[3]=query text, row[5]=duration seconds
                logging.warning(f"PID {row[0]}: {row[5]}s - {row[3][:100]}...")
        else:
            logging.info("No long-running queries detected")
        return long_queries
    except Exception as e:
        logging.error(f"Error checking long-running queries: {str(e)}")
        return []
def cleanup_idle_connections():
    """Dispose every cached engine's pool, closing its idle connections.

    Bug fix: the original called ``engine.pool.recreate()``, which merely
    *returns* a new Pool object without attaching it to the engine — the
    old pool and its connections stayed in use. ``Engine.dispose()`` is
    the documented way to close checked-in connections and install a
    fresh pool on the engine.

    Errors are logged and swallowed (best-effort maintenance task).
    """
    try:
        from kflow.authn import _connection_pools
        for env_name, engine in _connection_pools.items():
            if hasattr(engine, 'pool'):
                # dispose() closes idle connections and replaces the pool.
                engine.dispose()
                logging.info(f"Cleaned up connection pool for {env_name}")
    except Exception as e:
        logging.error(f"Error cleaning up connections: {str(e)}")
+18
-3

@@ -1,4 +0,4 @@

Metadata-Version: 2.1
Metadata-Version: 2.4
Name: kflow
Version: 1.0.148
Version: 1.1.0
Summary: KLog.co package for ETLs

@@ -9,3 +9,3 @@ Home-page: https://github.com/teu-ai/etl

License: Apache
Keywords: etl data science airflow
Keywords: etl data science airflow redshift connection pooling
Classifier: Development Status :: 3 - Alpha

@@ -16,3 +16,18 @@ Classifier: License :: OSI Approved :: Apache Software License

License-File: LICENSE.txt
Requires-Dist: sqlalchemy
Requires-Dist: pandas
Requires-Dist: psycopg2-binary
Requires-Dist: boto3
Requires-Dist: smart_open
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: home-page
Dynamic: keywords
Dynamic: license
Dynamic: license-file
Dynamic: requires-dist
Dynamic: summary
Functions to upload, download and transform data, from different sources, usually to be used with Pandas.
LICENSE.txt
README.md
setup.py
kflow/__init__.py
kflow/authn.py

@@ -8,2 +9,3 @@ kflow/extract.py

kflow/log.py
kflow/monitoring.py
kflow/tools.py

@@ -13,2 +15,3 @@ kflow.egg-info/PKG-INFO

kflow.egg-info/dependency_links.txt
kflow.egg-info/requires.txt
kflow.egg-info/top_level.txt
+84
-35
import os
import logging
from functools import lru_cache
# Module-level cache of SQLAlchemy Engine objects keyed by environment
# name (e.g. 'WarehouseProd'); populated lazily by getConnectionDB().
_connection_pools = {}
@lru_cache(maxsize=5)
def _get_connection_string(base_env: str):
"""Cache connection strings to avoid repeated AWS Secrets Manager calls"""
rename_secret = {"PrismaProd":"prisma_bd",
"WarehouseProd":"secret-redshift-cluster-1",
"Vekna":"vekna_cluster",
"Data_RDS":"data_rds"
}
try:
secret_name = rename_secret[base_env]
except:
raise ValueError('No se reconoce la BD solicitada')
secret_value = get_secret(secret_name)
if secret_name == 'vekna_cluster':
sql_connector="postgresql"
db_name = 'vekna'
elif secret_name == 'data_rds':
sql_connector="postgresql"
db_name = 'postgres'
elif secret_name == 'secret-redshift-cluster-1':
sql_connector="redshift+psycopg2"
db_name = 'warehouse'
elif secret_name == 'prisma_bd':
sql_connector="postgresql"
db_name = 'prisma'
else:
raise ValueError('Current BD environment not recognized')
return f"{sql_connector}://{secret_value['username']}:{secret_value['password']}@{secret_value['host']}:{secret_value['port']}/{db_name}"
def getConnectionDB(base_env: str):
    """Return a pooled SQLAlchemy engine for the requested environment.

    Engines are created at most once per environment and cached in the
    module-level ``_connection_pools`` dict, so repeated calls reuse the
    same connection pool instead of churning connections.
    """
    from sqlalchemy import create_engine
    from sqlalchemy.pool import QueuePool

    # Fast path: hand back the engine built on a previous call.
    cached = _connection_pools.get(base_env)
    if cached is not None:
        return cached

    conn_string = _get_connection_string(base_env)

    # Pool sizing differs between the Redshift warehouse and the
    # plain-PostgreSQL targets; recycle/pre-ping settings are shared.
    if base_env == 'WarehouseProd':
        pool_kwargs = dict(
            pool_size=5,        # persistent connections
            max_overflow=10,    # burst capacity on top of pool_size
            connect_args={
                'application_name': 'kflow_etl',
                'options': '-c statement_timeout=1800000',  # 30 minutes
            },
        )
    else:
        pool_kwargs = dict(pool_size=3, max_overflow=5)

    engine = create_engine(
        conn_string,
        poolclass=QueuePool,
        pool_recycle=3600,   # retire connections after 1 hour
        pool_pre_ping=True,  # validate connections before use
        echo=False,
        **pool_kwargs,
    )

    _connection_pools[base_env] = engine
    logging.info(f"Created new connection pool for {base_env}")
    return engine
def _get_s3_local_env():

@@ -33,37 +117,2 @@

def getConnectionDB(base_env: str):
    """Create a new (un-pooled) SQLAlchemy engine for an environment.

    Args:
        base_env: Logical environment name — one of 'PrismaProd',
            'WarehouseProd', 'Vekna' or 'Data_RDS'.

    Returns:
        A freshly created SQLAlchemy ``Engine``.

    Raises:
        ValueError: if ``base_env`` is not a recognized environment.
    """
    rename_secret = {"PrismaProd": "prisma_bd",
                     "WarehouseProd": "secret-redshift-cluster-1",
                     "Vekna": "vekna_cluster",
                     "Data_RDS": "data_rds"}
    try:
        secret_name = rename_secret[base_env]
    except KeyError:
        # Narrowed from a bare `except:`, which also swallowed unrelated
        # errors (KeyboardInterrupt included) as "unknown database".
        raise ValueError('No se reconoce la BD solicitada') from None
    secret_value = get_secret(secret_name)
    from sqlalchemy import create_engine
    if secret_name == 'vekna_cluster':
        sql_connector = "postgresql"
        db_name = 'vekna'
    elif secret_name == 'data_rds':
        sql_connector = "postgresql"
        db_name = 'postgres'
    elif secret_name == 'secret-redshift-cluster-1':
        sql_connector = "redshift+psycopg2"
        db_name = 'warehouse'
    elif secret_name == 'prisma_bd':
        sql_connector = "postgresql"
        db_name = 'prisma'
    else:
        raise ValueError('Current BD environment not recognized')
    conn_bd = (f"{sql_connector}://{secret_value['username']}:{secret_value['password']}"
               f"@{secret_value['host']}:{secret_value['port']}/{db_name}")
    return create_engine(conn_bd)
def awsClient(type_auth:str='client',service:str='s3'):

@@ -70,0 +119,0 @@ """

@@ -216,2 +216,3 @@ import logging

base_env:str ='WarehouseProd',
warehouse_engine = None,
to_lake_args = None,

@@ -242,3 +243,5 @@ to_sql_kwargs = {"index":False, 'if_exists':'append', 'schema':'public', "method":'multi','chunksize':5000}

warehouse_engine = authn.getConnectionDB(base_env)
# Use provided engine or create new one
if warehouse_engine is None:
warehouse_engine = authn.getConnectionDB(base_env)

@@ -245,0 +248,0 @@ if to_lake_args:

@@ -1,4 +0,4 @@

Metadata-Version: 2.1
Metadata-Version: 2.4
Name: kflow
Version: 1.0.148
Version: 1.1.0
Summary: KLog.co package for ETLs

@@ -9,3 +9,3 @@ Home-page: https://github.com/teu-ai/etl

License: Apache
Keywords: etl data science airflow
Keywords: etl data science airflow redshift connection pooling
Classifier: Development Status :: 3 - Alpha

@@ -16,3 +16,18 @@ Classifier: License :: OSI Approved :: Apache Software License

License-File: LICENSE.txt
Requires-Dist: sqlalchemy
Requires-Dist: pandas
Requires-Dist: psycopg2-binary
Requires-Dist: boto3
Requires-Dist: smart_open
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: home-page
Dynamic: keywords
Dynamic: license
Dynamic: license-file
Dynamic: requires-dist
Dynamic: summary
Functions to upload, download and transform data, from different sources, usually to be used with Pandas.

@@ -8,6 +8,6 @@ """This module does blah blah."""

if github_run_number is not None:
version = f"1.0.{github_run_number}"
version = f"1.1.{github_run_number}"
setup(name='kflow',
version=version if version != None else "1.0.0",
version=version if version != None else "1.1.0",
description='KLog.co package for ETLs',

@@ -21,3 +21,3 @@ long_description='Functions to upload, download and transform data, from different sources, usually to be used with Pandas.',

],
keywords='etl data science airflow',
keywords='etl data science airflow redshift connection pooling',
url='https://github.com/teu-ai/etl',

@@ -29,3 +29,10 @@ author='KLog.co Data & BI',

packages=['kflow'],
include_package_data=True
include_package_data=True,
install_requires=[
'sqlalchemy',
'pandas',
'psycopg2-binary',
'boto3',
'smart_open'
]
)