Latest Threat Research: SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains. Details
Socket
Book a DemoInstallSign in
Socket

mb_base

Package Overview
Dependencies
Maintainers
1
Versions
74
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mb_base - npm Package Compare versions

Comparing version
2.0.1
to
2.0.8
+14
mb/numpy/__init__.py
from numpy import *
from numpy import __version__
import numpy as _np
ndarray = _np.ndarray
import numpy.core as core
from numpy.core import round
import numpy.lib as lib
import numpy.polynomial as polynomial
import numpy.linalg as linag
import numpy.fft as fft
import numpy.matlib as matlib
import numpy.random as random
from matplotlib.pyplot import *
from .emb_viz import *
## file to view pca / umap / tsne embeddings in 2d or 3d with tf projector and plotly
from mb import pandas as pd
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.preprocessing import LabelEncoder
from matplotlib import pyplot as plt
import os
import numpy as np
from mb.utils.logging import logg
__all__ = ['get_emb','viz_emb','generate_sprite_images']
def get_emb(df: pd.DataFrame, emb='embeddings', emb_type='umap', dim=2,
            keep_original_emb=False, file_save=None, logger=None, **kwargs):
    """
    Reduce an embedding column to ``dim`` dimensions with PCA, t-SNE or UMAP.

    Args:
        df (pd.DataFrame or str): dataframe containing embeddings, or a file
            location loadable via ``pd.load_any_df``.
        emb (str): name of the embedding column.
        emb_type (str, optional): 'pca', 'tsne' or 'umap'. Defaults to 'umap'.
        dim (int, optional): target embedding dimension. Defaults to 2.
        keep_original_emb (bool, optional): keep the original embedding
            column. Defaults to False.
        file_save (str, optional): directory in which ``emb_res.csv`` is
            saved. Defaults to None (saved in the current directory).
        logger: optional logger passed through to ``logg``.
        **kwargs: forwarded to the underlying reducer (TSNE/UMAP).

    Returns:
        pd.DataFrame: dataframe with an ``emb_res`` column; the original
        embedding column is dropped unless ``keep_original_emb`` is True.

    Raises:
        ValueError: if ``emb_type`` is not one of 'pca', 'tsne', 'umap'.
    """
    if type(df) is not pd.DataFrame:
        logg.info('Type of df :{}'.format(str(type(df))), logger=logger)
        df = pd.load_any_df(df)
        logg.info('Loaded dataframe from path {}'.format(str(df)), logger=logger)
    logg.info('Data shape {}'.format(str(df.shape)), logger=logger)
    logg.info('Data columns {}'.format(str(df.columns)), logger=logger)
    logg.info('Performing {} on {} embeddings'.format(emb_type, emb), logger=logger)
    if emb_type == 'pca':
        pca = PCA(n_components=dim)
        pca_emb = pca.fit_transform(list(df[emb]))
        logg.info('First PCA transform result : {}'.format(str(pca_emb[0])), logger=logger)
        temp_res = list(pca_emb)
    elif emb_type == 'tsne':
        # Bug fix: perplexity/n_iter were hard-coded positionally AND **kwargs
        # was forwarded, so callers passing either keyword got a TypeError for
        # duplicate arguments. setdefault keeps the old defaults while letting
        # callers override them.
        kwargs.setdefault('perplexity', 30)
        kwargs.setdefault('n_iter', 250)
        tsne = TSNE(n_components=dim, verbose=1, **kwargs)
        df[emb] = df[emb].apply(lambda x: np.array(x))
        k1 = np.vstack(df[emb])
        tsne_emb = tsne.fit_transform(k1)
        logg.info('First TSNE transform result : {}'.format(str(tsne_emb[0])), logger=logger)
        temp_res = list(tsne_emb)
    elif emb_type == 'umap':
        try:
            import umap
        except ImportError:
            logg.info('umap not installed, installing umap', logger=logger)
            # NOTE(review): shelling out to pip at runtime is fragile (wrong
            # interpreter, no permission); consider making umap-learn a
            # declared optional dependency instead.
            os.system('pip install umap-learn')
            import umap
        # Bug fix: honour `dim` — n_components was hard-coded to 2, silently
        # ignoring the caller's requested dimension.
        umap_emb = umap.UMAP(n_components=dim, **kwargs).fit_transform(list(df[emb]))
        logg.info('First UMAP transform result : {}'.format(str(umap_emb[0])), logger=logger)
        temp_res = list(umap_emb)
    else:
        # Bug fix: an unknown emb_type previously fell through and crashed
        # later with NameError on temp_res; fail fast with a clear message.
        raise ValueError("emb_type must be one of 'pca', 'tsne' or 'umap', got {!r}".format(emb_type))
    df['emb_res'] = temp_res
    if not keep_original_emb:
        df.drop(emb, axis=1, inplace=True)
        logg.info('Dropped original embedding column', logger=logger)
    if file_save:
        df.to_csv(os.path.join(file_save, 'emb_res.csv'), index=False)
    else:
        df.to_csv('./emb_res.csv', index=False)
    return df
def viz_emb(df: pd.DataFrame, emb_column='emb_res' , target_column='taxcode', viz_type ='plt',dash_viz=False,limit = None,image_tb=None , file_save=None,
            dont_viz=False, logger=None):
    """
    Visualize 2-D embeddings with matplotlib, plotly express, or the
    TensorBoard projector.

    Args:
        df (pd.DataFrame): dataframe containing embeddings. File location or DataFrame object.
        emb_column (str): name of embedding column
        target_column (str): name of target column used to colour the points. Defaults to 'taxcode'. Can be None too.
        viz_type (str, optional): visualization type: 'plt','pe' or 'tf'. Defaults to 'plt'.
        dash_viz (bool, optional): if True (with viz_type='pe'), writes a standalone Dash app script instead of plotting. Defaults to False.
        limit (int, optional): limit number of data points to visualize. Takes random samples. Defaults to None.
        image_tb (str, optional): image location column to be used in tensorboard projector if want to create with images. Defaults to None.
        file_save (str, optional): file location to save plot. If viz_type='tf', then it wont be saved. Defaults to None.
        dont_viz (bool, optional): if True, then it wont visualize. Defaults to False.
        logger (logger, optional): logger object. Defaults to None.
    Output:
        None
    """
    # Accept a file path as well as an in-memory DataFrame.
    if type(df) != pd.DataFrame:
        logg.info('Type of df :{}'.format(str(type(df))),logger=logger)
        df = pd.load_any_df(df)
    if limit:
        df = df.sample(limit)
    assert emb_column in df.columns, f'Embedding column not found in dataframe: {df.columns}'
    # Flatten the per-row embedding vectors into one contiguous array, then
    # reshape into (n_points, 2). Only 2-D embeddings are supported here.
    emb_data = np.concatenate(np.array(df[emb_column]))
    emb_data = emb_data.reshape(-1,2) #change this for 3d
    logg.info('Embedding data shape {}'.format(str(emb_data.shape)),logger=logger)
    if target_column:
        target_data = list(df[target_column])
        # String labels must be integer-encoded before they can be used as colours.
        if type(target_data[0]) == str:
            target_data = LabelEncoder().fit_transform(target_data)
    # NOTE(review): this assert runs after target_data was already read above,
    # so a missing column raises KeyError before the assert can fire.
    assert target_column==None or target_column in df.columns, f'Target column not found in dataframe : {df.columns}'
    if file_save == None:
        file_save = './emb_plot.png'
    # Visualize the embeddings using a scatter plot
    if viz_type=='plt' and target_column:
        plt.scatter(emb_data[:, 0], emb_data[:, 1], c=target_data, cmap='viridis')
        #plt.legend()
        if dont_viz==False:
            plt.show()
        # NOTE(review): file_save is used as a directory here, but the default
        # assigned above is a .png file path — confirm the intended layout.
        if file_save:
            plt.savefig(file_save+'/emb_plot.png')
    elif viz_type=='plt' and target_column==None:
        plt.scatter(emb_data[:, 0], emb_data[:, 1])
        #plt.legend()
        if dont_viz==False:
            plt.show()
        if file_save:
            plt.savefig(file_save+'/emb_plot.png')
    elif viz_type=='pe' and target_column:
        #importing plotly and dash
        import plotly.express as px
        if dash_viz:
            # Source for a standalone Dash app; written to disk, not executed.
            # NOTE(review): the generated script hard-codes the 'menu_code',
            # 'event_id' and 'after_image_url' columns and reads
            # './emb_res.csv' — it is dataset-specific, not generic.
            code = """
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
import plotly.express as px
import pandas as pd
import numpy as np
# Create the Dash app
app = dash.Dash(__name__)
# Define the layout of the app
df=pd.read_csv('./emb_res.csv')
df['emb_res_np'] = df['emb_res'].apply(lambda x:np.fromstring(x[1:-1],sep=' '))
#emb_data = np.concatenate(np.array(df['emb_res_np']))
emb_data = np.array(df['emb_res_np'].tolist()) # Convert to 2D array
app.layout = html.Div([
    dcc.Graph(
        id='scatter-plot',
        figure=px.scatter(df , x=emb_data[:, 0], y=emb_data[:, 1], color=df['menu_code'], color_continuous_scale = 'rainbow'),
        config={'staticPlot': False}),
    html.Div([html.Img(id='selected-image', style={'width': '50%'}),
              html.Div(id='hover-data-output')])])
# Define a callback function for updating the hover data
@app.callback([Output('hover-data-output', 'children'),Output('selected-image', 'src')],
              [Input('scatter-plot', 'hoverData')])
def display_hover_data(hover_data):
    if hover_data is None:
        return ("Hover over a point to see data")
    # Extract data from hover_data
    point_index = hover_data['points'][0]['pointIndex']
    target_value = df.iloc[point_index]['event_id']
    image_url = df.iloc[point_index]['after_image_url']
    return f"Hovered over point {target_value}. Image URL: {image_url}",image_url
# Run the app in the notebook
if __name__ == '__main__':
    app.run_server(mode='inline', port=8927,host='0.0.0.0')"""
            with open(file_save + '/dash_app.py', 'w') as f:
                f.write(code)
        else:
            fig = px.scatter(x=emb_data[:, 0], y=emb_data[:, 1], color=target_data, color_continuous_scale = 'rainbow',
                             title = f"Similarity to data visualised using dim reduction")
            fig.update_layout(width = 650,height = 650)
            if dont_viz==False:
                fig.show()
            if file_save:
                fig.write_html(file_save+'/emb_plot.html',full_html=True)
    elif viz_type=='tf' and target_column:
        ##check from here
        # TensorBoard projector: dump embeddings + labels as TSV and write a
        # projector_config.pbtxt pointing at them.
        log_dir = './tp_logs'
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)
        emb_data = np.array(emb_data)
        loc_emb_data = os.path.join(log_dir,'emb_data_tf.tsv')
        np.savetxt(loc_emb_data, emb_data, delimiter='\t')
        target_data = np.array(target_data)
        loc_target_data = os.path.join(log_dir,'labels_tf.tsv')
        np.savetxt(loc_target_data,target_data,delimiter='\t')
        if image_tb is not None:
            # Optional sprite sheet so each projector point shows its image.
            loc_sprite_image = os.path.join(log_dir,'sprite_image.png')
            generate_sprite_images(df[image_tb], file_save=loc_sprite_image, img_size=28 ,logger=logger)
        from tensorboard.plugins import projector
        config = projector.ProjectorConfig()
        embedding = config.embeddings.add()
        embedding.tensor_path = loc_emb_data
        embedding.metadata_path = loc_target_data
        if image_tb is not None:
            embedding.sprite.image_path = loc_sprite_image
            # NOTE(review): sprite tiles declared 32x32 but generated at 28x28
            # above — confirm which size the projector should use.
            embedding.sprite.single_image_dim.extend([32, 32])
        with open(os.path.join(log_dir, 'projector_config.pbtxt'), 'w') as f:
            f.write(str(config))
        logg.info('Run tensorboard --logdir={} to view embeddings'.format(log_dir),logger=logger)
        logg.info('if on jupyter notebook, run below code to view embeddings in notebook',logger=logger)
        logg.info('%load_ext tensorboard',logger=logger)
        logg.info('%tensorboard --logdir={}'.format(log_dir),logger=logger)
def generate_sprite_images(img_paths, file_save=None, img_size= 28 ,logger=None):
    """
    Create a sprite image consisting of images

    Args:
        img_paths (list or pd.DataFrame): list of image paths
        file_save (str, optional): file location to save sprite image. Defaults to None. Will save in current directory.
        img_size (int, optional): square size each image is resized to. Defaults to 28.
        logger (logger, optional): logger object. Defaults to None. Currently unused in this function.
    Output:
        sprite_image (np.array): sprite image
    """
    # Imported lazily so importing this module does not require tensorflow.
    import tensorflow as tf
    if type(img_paths) is not list:
        img_paths = list(img_paths)
    #create sprite image
    # Load, decode, resize to img_size x img_size, and materialise as numpy.
    images = [tf.io.read_file(img_path) for img_path in img_paths]
    images = [tf.image.decode_image(img) for img in images]
    images = [tf.image.resize(img, (img_size, img_size)) for img in images]
    images = [img.numpy() for img in images]
    # NOTE(review): axis=1 produces one horizontal strip, not the square grid
    # the TensorBoard projector usually expects — confirm downstream use.
    sprite_image = np.concatenate(images, axis=1)
    if file_save:
        # Saved twice: np.save appends '.npy' (raw array), save_img writes the
        # viewable image at file_save itself.
        # NOTE(review): the .npy copy may be redundant — verify consumers.
        np.save(file_save,sprite_image)
        tf.keras.utils.save_img(file_save,sprite_image)
    else:
        np.save('./sprite_image',sprite_image)
        tf.keras.utils.save_img('./sprite_image.png',sprite_image)
    return sprite_image
import matplotlib.pyplot as plt
import numpy as np
from concurrent.futures import ThreadPoolExecutor
__all__ = ['dynamic_plt']
def dynamic_plt(imgs: list, labels: list = None, bboxes: list = None,
                bboxes_label: list = None, num_cols: int = 2, figsize=(16, 12),
                return_fig: bool = False, show: bool = True, save_path: str = None,
                max_workers: int = 4):
    """
    Create a dynamic grid of image subplots.

    Args:
        imgs: List of images (arrays) or paths to images
        labels: List of labels corresponding to the images (default: None)
        bboxes: List of bounding boxes corresponding to the images, each box
            as (x, y, width, height) (default: None)
        bboxes_label: List of labels corresponding to the bounding boxes
            (default: None) (Fontsize=12)
        num_cols: Number of columns for the subplot grid (default: 2)
        figsize: Size of the figure (default: (16, 12))
        return_fig: Return the figure object (default: False)
        show: Show the plot (default: True)
        save_path: Path to save the plot (default: None)
        max_workers: Maximum number of threads to use for loading images (default: 4)

    Return:
        matplotlib figure when return_fig is True, otherwise None
    """
    # Load path entries concurrently; already-loaded arrays pass through.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        imgs = list(executor.map(lambda x: plt.imread(x) if isinstance(x, str) else x, imgs))
    num_images = len(imgs)
    num_rows = int(np.ceil(num_images / num_cols))
    fig, axes = plt.subplots(num_rows, num_cols, figsize=figsize)
    # Bug fix: plt.subplots returns a bare Axes for a 1x1 grid and a 1-D array
    # when either dimension is 1. Normalise to a 2-D array so axes[row, col]
    # indexing works for every grid shape (the old code only handled
    # num_rows == 1 and crashed for single-column or single-image grids).
    axes = np.array(axes).reshape(num_rows, num_cols)
    for i, img in enumerate(imgs):
        row, col = divmod(i, num_cols)
        ax = axes[row, col]
        # Channel-first (3, H, W) arrays are converted to channel-last for imshow.
        if img.shape[0] == 3:
            img = np.moveaxis(img, 0, -1)
        ax.imshow(img)
        ax.axis('off')
        if labels:
            ax.set_title(str(labels[i]))
        if bboxes:
            img_bboxes = bboxes[i]
            img_labels = bboxes_label[i] if bboxes_label else None
            for bbox_val, bbox in enumerate(img_bboxes):
                rect = plt.Rectangle((bbox[0], bbox[1]), bbox[2], bbox[3],
                                     fill=False, edgecolor='red', linewidth=2)
                ax.add_patch(rect)
                if img_labels:
                    # Best-effort label drawing: a short or malformed label
                    # list should not abort the whole plot.
                    try:
                        ax.text(bbox[0], bbox[1], img_labels[bbox_val], color='red', fontsize=12)
                    except (IndexError, TypeError):
                        # Bug fix: narrowed from a bare `except:` so real
                        # errors (e.g. KeyboardInterrupt) are not swallowed.
                        pass
    # Remove any unused subplots
    for j in range(num_images, num_rows * num_cols):
        row, col = divmod(j, num_cols)
        fig.delaxes(axes[row, col])
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path)
    if show:
        plt.show()
    if return_fig:
        return fig
MAJOR_VERSION = 2
MINOR_VERSION = 0
PATCH_VERSION = 8
version = '{}.{}.{}'.format(MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION)
__all__ = ['MAJOR_VERSION', 'MINOR_VERSION', 'PATCH_VERSION', 'version']
+3
-2
Metadata-Version: 2.4
Name: mb_base
Version: 2.0.1
Summary: Pandas Package for mb.* packages
Version: 2.0.8
Summary: Meta Package for mb_* packages
Author: ['Malav Bateriwala']
Requires-Python: >=3.8
Requires-Dist: mb_utils
Requires-Dist: mb_pandas
Dynamic: author

@@ -9,0 +10,0 @@ Dynamic: requires-dist

+10
-11

@@ -1,11 +0,10 @@

mb/pandas/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
mb/pandas/aio.py,sha256=p50zewrROjznbAURUNOSGKHOvvA-lg7I3ifh3vEUBlM,3256
mb/pandas/convert_data.py,sha256=YIlMT7L64ZdVaHwAYCgouu7vwpRCKHspx9J_U-uSjcM,6671
mb/pandas/dfload.py,sha256=SnQwQNpdgwkXfRDT9caycpX72hvNGypLTDstYbKvwXc,4748
mb/pandas/profiler.py,sha256=Ad6-EgufMMBNKA9m6tytHmOs70A0XjWGKe4niiRq004,5880
mb/pandas/transform.py,sha256=C8-heFv5heZCJr-nBFwclRU7a0DdpKia-kjh3el3VYE,10804
mb/pandas/version.py,sha256=JJiVhzInvyc4XUc31yQR_q4UNm6rnom-oatXO_VvCfo,206
mb_base-2.0.1.dist-info/METADATA,sha256=cHkhMmzVkHid5E2h9M4HdIUlEqdT3m9cp1hsUeZFjwQ,250
mb_base-2.0.1.dist-info/WHEEL,sha256=YCfwYGOYMi5Jhw2fU4yNgwErybb2IX5PEwBKV4ZbdBo,91
mb_base-2.0.1.dist-info/top_level.txt,sha256=2T5lqIVZs7HUr0KqUMPEaPF59QXcr0ErDy6K-J88ycM,3
mb_base-2.0.1.dist-info/RECORD,,
mb/version.py,sha256=hksoK0MZZ7EqfudPmJnZVHd8LwRjL2FDUTE1W8-iGD4,206
mb/numpy/__init__.py,sha256=paJ-HHrMmhCCBXA9GVOGoWUjB82J4sJ6bczyaL0xY2E,323
mb/pandas/__init__.py,sha256=aoUqtk-zSychmd4ALxg1-Vu8Ii3Hpska_nCy6shVEqQ,45
mb/plt/__init__.py,sha256=Ya4j8QIe5BCbMesMr1W8L9LP-FBy9LZbUow0XatJczA,54
mb/plt/emb_viz.py,sha256=dWGZepldJvFC2YNrkCSIKqc0tE-Sf8d8YJ0QjYbQkXw,10999
mb/plt/utils.py,sha256=eE48STXqeradeImdnL12E1B6isgqa31koP536zjfAgw,2845
mb_base-2.0.8.dist-info/METADATA,sha256=pKUAoMvGf2_Ku072Kf-FiEHI9WytTbwHQQ4RmrKqeQs,273
mb_base-2.0.8.dist-info/WHEEL,sha256=YCfwYGOYMi5Jhw2fU4yNgwErybb2IX5PEwBKV4ZbdBo,91
mb_base-2.0.8.dist-info/top_level.txt,sha256=2T5lqIVZs7HUr0KqUMPEaPF59QXcr0ErDy6K-J88ycM,3
mb_base-2.0.8.dist-info/RECORD,,
"""
Asynchronous I/O utilities.
This module provides utilities for handling asynchronous file operations and running
async functions in a synchronous context.
"""
from typing import Any, Dict, Optional, Callable, TypeVar, Coroutine, Union
import asyncio
import aiofiles
__all__ = ['srun', 'read_text']
T = TypeVar('T')  # Generic type for the wrapped function's return value


def srun(async_func: Callable[..., Coroutine[Any, Any, T]],
         *args: Any,
         extra_context_var: Optional[Dict[str, Any]] = None,
         show_progress: bool = False,
         **kwargs: Any) -> T:
    """
    Run an async function in a synchronous context.

    The coroutine is executed on a fresh event loop via ``asyncio.run`` and
    always receives a ``context_vars`` keyword argument.

    Args:
        async_func: The async function to run
        *args: Positional arguments to pass to the function
        extra_context_var: Additional context variables passed to the function
            as the ``context_vars`` keyword argument (``{}`` when omitted)
        show_progress: Whether to show a progress bar.
            NOTE(review): currently unused — kept for interface compatibility.
        **kwargs: Keyword arguments to pass to the function

    Returns:
        The result of the async function execution

    Raises:
        Exception: a same-typed exception wrapping the original error message,
            chained to the original via ``__cause__``.
    """
    context_vars = extra_context_var or {}

    async def _run_async() -> T:
        try:
            return await async_func(*args, context_vars=context_vars, **kwargs)
        except Exception as e:
            message = f"Error in async function {async_func.__name__}: {str(e)}"
            # Bug fix: chain the original exception so its traceback survives,
            # and tolerate exception types whose constructors do not accept a
            # single message string (e.g. some OSError subclasses), which made
            # the old `raise type(e)(msg)` itself blow up.
            try:
                wrapped = type(e)(message)
            except Exception:
                wrapped = RuntimeError(message)
            raise wrapped from e

    return asyncio.run(_run_async())
async def read_text(filepath: str,
                    size: Optional[int] = None,
                    context_vars: Optional[Dict[str, Any]] = None) -> str:
    """
    Asynchronously read text from a file.

    This function provides both async and sync file reading capabilities based
    on the ``context_vars['async']`` flag.

    Args:
        filepath: Path to the file to read
        size: Number of bytes to read from the beginning of the file.
            If None, reads the entire file.
        context_vars: Dictionary of context variables. Must include 'async'
            key to determine whether to use async or sync reading.
            Defaults to async reading when omitted.

    Returns:
        str: Content read from the file

    Raises:
        ValueError: if ``filepath`` is empty.
        KeyError: if ``context_vars`` is supplied without an 'async' key.
        FileNotFoundError / IOError: re-raised with context, chained to the
            original error.

    Example:
        >>> # Async reading
        >>> content = await read_text('file.txt', context_vars={'async': True})
        >>> # Sync reading
        >>> content = await read_text('file.txt', context_vars={'async': False})
    """
    if not filepath:
        raise ValueError("filepath cannot be empty")
    context_vars = context_vars or {'async': True}
    if 'async' not in context_vars:
        raise KeyError("context_vars must contain 'async' key")
    try:
        if context_vars["async"]:
            async with aiofiles.open(filepath, mode="rt") as f:
                return await f.read(size)
        else:
            # Synchronous fallback: blocks the event loop — intended for
            # small files or non-async call sites.
            with open(filepath, mode="rt") as f:
                return f.read(size)
    except FileNotFoundError as e:
        # Bug fix: chain with `from e` so the original traceback is preserved.
        raise FileNotFoundError(f"File not found: {filepath}") from e
    except IOError as e:
        raise IOError(f"Error reading file {filepath}: {str(e)}") from e
    except Exception as e:
        # Bug fix: `type(e)(msg)` assumed every exception type accepts a
        # single message argument; fall back to RuntimeError when it doesn't,
        # and chain the original either way.
        message = f"Unexpected error reading file {filepath}: {str(e)}"
        try:
            wrapped = type(e)(message)
        except Exception:
            wrapped = RuntimeError(message)
        raise wrapped from e
"""
Data conversion utilities for pandas DataFrames.
This module provides functions for converting data between different formats,
particularly focusing on string representations of complex data structures.
"""
from typing import Union, List, Optional, Any, Type
import ast
import pandas as pd
import json
from mb.utils.logging import logg
__all__ = ['convert_string_to_list', 'convert_string_to_dict', 'convert_string_to_type']
def convert_string_to_list(df: pd.DataFrame,
                           column: str,
                           logger: Optional[Any] = None) -> pd.DataFrame:
    """
    Parse string-encoded lists in a DataFrame column into real Python lists.

    Strings are parsed with ``ast.literal_eval``; values that are already
    lists pass through, non-null scalars are wrapped in a one-element list,
    and nulls become empty lists.

    Args:
        df: Input DataFrame
        column: Name of the column containing string representations of lists
        logger: Optional logger instance for logging operations

    Returns:
        pd.DataFrame: Copy of ``df`` with the column converted to lists

    Example:
        >>> df = pd.DataFrame({'data': ['[1, 2, 3]', '[4, 5, 6]']})
        >>> result = convert_string_to_list(df, 'data')
        >>> print(result['data'].tolist())
        [[1, 2, 3], [4, 5, 6]]
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")
    if column not in df.columns:
        raise KeyError(f"Column '{column}' not found in DataFrame")
    # Work on a copy so the caller's DataFrame is never mutated.
    result = df.copy()

    def _coerce(value):
        # Same cascade as before, expressed as guard clauses.
        if isinstance(value, str):
            return ast.literal_eval(value)
        if isinstance(value, list):
            return value
        return [value] if pd.notnull(value) else []

    try:
        logg.info(f"Converting column '{column}' from string to list", logger=logger)
        result[column] = result[column].apply(_coerce)
        # Every cell must now be a list; report the offending rows otherwise.
        bad_rows = result.index[~result[column].apply(lambda v: isinstance(v, list))].tolist()
        if bad_rows:
            raise ValueError(f"Conversion failed for rows: {bad_rows}")
        logg.info(f"Successfully converted {len(result)} rows", logger=logger)
        return result
    except Exception as e:
        logg.error(f"Error converting column '{column}': {str(e)}", logger=logger)
        raise
def convert_string_to_dict(df: pd.DataFrame,
                           column: str,
                           logger: Optional[Any] = None) -> pd.DataFrame:
    """
    Parse string-encoded dictionaries in a DataFrame column into real dicts.

    Parsing tries ``ast.literal_eval`` first and falls back to ``json.loads``.
    Existing dicts pass through, nulls become ``{}``, and other non-string
    values are wrapped as ``{'value': x}``.

    Args:
        df: Input DataFrame
        column: Name of the column containing string representations of dictionaries
        logger: Optional logger instance for logging operations

    Returns:
        pd.DataFrame: Copy of ``df`` with the column converted to dictionaries

    Example:
        >>> df = pd.DataFrame({'data': ['{"a": 1}', '{"b": 2}']})
        >>> result = convert_string_to_dict(df, 'data')
        >>> print(result['data'].tolist())
        [{'a': 1}, {'b': 2}]
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")
    if column not in df.columns:
        raise KeyError(f"Column '{column}' not found in DataFrame")
    result = df.copy()

    def _to_dict(x):
        # Guard clauses for the non-string shortcuts.
        if isinstance(x, dict):
            return x
        if pd.isnull(x):
            return {}
        if not isinstance(x, str):
            return {'value': x}
        # Python-literal syntax first, then JSON, for maximum compatibility.
        try:
            return ast.literal_eval(x)
        except (ValueError, SyntaxError):
            pass
        try:
            return json.loads(x)
        except json.JSONDecodeError:
            raise ValueError(f"Could not convert value: {x}")

    try:
        logg.info(f"Converting column '{column}' from string to dict", logger=logger)
        result[column] = result[column].apply(_to_dict)
        # Every cell must now be a dict; report the offending rows otherwise.
        bad_rows = result.index[~result[column].apply(lambda v: isinstance(v, dict))].tolist()
        if bad_rows:
            raise ValueError(f"Conversion failed for rows: {bad_rows}")
        logg.info(f"Successfully converted {len(result)} rows", logger=logger)
        return result
    except Exception as e:
        logg.error(f"Error converting column '{column}': {str(e)}", logger=logger)
        raise
def convert_string_to_type(df: pd.DataFrame,
                           column: str,
                           target_type: Type,
                           logger: Optional[Any] = None) -> pd.DataFrame:
    """
    Convert string values in a DataFrame column to a specified type.

    Args:
        df: Input DataFrame
        column: Name of the column to convert
        target_type: Type to convert values to (e.g., int, float, bool)
        logger: Optional logger instance for logging operations

    Returns:
        pd.DataFrame: Copy of ``df`` with the column converted to target type

    Raises:
        TypeError: if ``df`` is not a DataFrame.
        KeyError: if ``column`` is missing.
        ValueError: if a value cannot be converted to ``target_type``.

    Example:
        >>> df = pd.DataFrame({'numbers': ['1', '2', '3']})
        >>> result = convert_string_to_type(df, 'numbers', int)
        >>> print(result['numbers'].tolist())
        [1, 2, 3]
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")
    if column not in df.columns:
        raise KeyError(f"Column '{column}' not found in DataFrame")
    # Copy so the caller's DataFrame is never mutated.
    result = df.copy()
    try:
        logg.info(f"Converting column '{column}' to {target_type.__name__}", logger=logger)

        def safe_convert(x):
            # Nulls are preserved as None rather than coerced.
            if pd.isnull(x):
                return None
            if isinstance(x, target_type):
                return x
            try:
                return target_type(x)
            except (ValueError, TypeError) as conv_err:
                raise ValueError(
                    f"Could not convert value '{x}' to {target_type.__name__}"
                ) from conv_err

        result[column] = result[column].apply(safe_convert)
        logg.info(f"Successfully converted {len(result)} rows", logger=logger)
        return result
    except Exception as e:
        logg.error(f"Error converting column '{column}': {str(e)}", logger=logger)
        # Bug fix: re-raise the original exception (matching the sibling
        # converters in this module) instead of flattening everything into a
        # generic Exception, so callers can still catch ValueError etc.
        raise
"""
DataFrame loading module with async support.
This module provides utilities for loading pandas DataFrames from various file formats
using asynchronous I/O operations for improved performance.
"""
from typing import Optional, List, Union, Any
import pandas as pd
import asyncio
import io
from ast import literal_eval
from pyarrow.parquet import ParquetFile
from mb.utils.logging import logg
__all__ = ['load_any_df']
async def read_txt(filepath: str, size: Optional[int] = None) -> str:
    """
    Read text from a file.

    NOTE(review): declared ``async`` but performs blocking I/O with the
    builtin ``open`` — it does not yield to the event loop. Kept async for
    interface compatibility with its awaiting callers.

    Args:
        filepath: Path to the file to read
        size: Optional number of bytes to read from the start of the file.
            ``None`` reads the entire file; ``0`` reads nothing.

    Returns:
        str: Content of the file

    Raises:
        FileNotFoundError: if the file does not exist (chained).
        IOError: on any other read error (chained).
    """
    try:
        with open(filepath, mode="rt") as f:
            # Bug fix: the old `if size:` treated size=0 as falsy and read
            # the whole file; test explicitly against None instead.
            if size is not None:
                return f.read(size)
            return f.read()
    except FileNotFoundError as e:
        # Chain with `from e` so the original traceback is preserved.
        raise FileNotFoundError(f"File not found: {filepath}") from e
    except IOError as e:
        raise IOError(f"Error reading file {filepath}: {str(e)}") from e
async def load_df_async(filepath: str,
                        chunk_size: int = 1024) -> pd.DataFrame:
    """
    Load a DataFrame asynchronously from a CSV or Parquet file.

    Args:
        filepath: Path to the input file (must end in '.csv' or '.parquet')
        chunk_size: Number of rows to read per chunk when streaming CSV data

    Returns:
        pd.DataFrame: Loaded DataFrame

    Raises:
        ValueError: for unsupported extensions or any underlying read error
            (chained to the original exception).
    """
    def process_csv(data: io.StringIO) -> pd.DataFrame:
        # Bug fix: honour the chunk_size parameter — it was previously
        # ignored in favour of a hard-coded chunksize of 1024.
        chunk_iter = pd.read_csv(data, chunksize=chunk_size)
        return pd.concat(list(chunk_iter), sort=False)

    def process_parquet(data: str) -> pd.DataFrame:
        try:
            # Try standard parquet reading first
            return pd.read_parquet(data)
        except Exception:
            # Fallback to pyarrow for problematic files
            pf = ParquetFile(data)
            table = pf.read()
            return table.to_pandas()

    try:
        if filepath.endswith('.csv'):
            data = await read_txt(filepath)
            df = process_csv(io.StringIO(data))
        elif filepath.endswith('.parquet'):
            df = process_parquet(filepath)
        else:
            raise ValueError(f"Unsupported file format: {filepath}")
        return df
    except Exception as e:
        # Chain so the underlying parser error remains inspectable.
        raise ValueError(f"Error loading file {filepath}: {str(e)}") from e
def load_any_df(file_path: Union[str, pd.DataFrame],
                literal_ast_columns: Optional[List[str]] = None,
                logger: Optional[Any] = None) -> pd.DataFrame:
    """
    Load a DataFrame from a CSV/Parquet path, or pass through an existing one.

    This function can load from CSV files, Parquet files, or accept an
    existing DataFrame (returned unchanged). It can additionally convert
    specified columns using ast.literal_eval.

    Args:
        file_path: Path to the file or an existing DataFrame
        literal_ast_columns: List of column names to convert using ast.literal_eval
        logger: Optional logger instance for logging operations

    Returns:
        pd.DataFrame: Loaded and processed DataFrame

    Raises:
        TypeError: if file_path is neither a str nor a DataFrame.
        ValueError: on any load or conversion failure (original message wrapped).
    """
    # Handle DataFrame input
    if isinstance(file_path, pd.DataFrame):
        return file_path
    if not isinstance(file_path, str):
        raise TypeError("file_path must be a string path or pandas DataFrame")
    try:
        # Load the DataFrame
        logg.info(f"Loading DataFrame from {file_path}",logger=logger)
        # NOTE(review): asyncio.run raises RuntimeError when called from an
        # already-running event loop (e.g. inside Jupyter) — confirm intended
        # call sites are synchronous.
        df = asyncio.run(load_df_async(file_path))
        # Remove unnamed columns (stray index columns from CSV round-trips)
        df = df.loc[:, ~df.columns.str.contains('^Unnamed')]
        # Convert specified columns using literal_eval
        if literal_ast_columns:
            for col in literal_ast_columns:
                if col not in df.columns:
                    raise KeyError(f"Column '{col}' not found in DataFrame")
                logg.info(f"Converting column '{col}' using literal_eval",logger=logger)
                try:
                    df[col] = df[col].apply(literal_eval)
                except (ValueError, SyntaxError) as e:
                    logg.error(f"Error converting column '{col}': {str(e)}",logger=logger)
                    raise ValueError(f"Error converting column '{col}' using literal_eval: {str(e)}")
        logg.info(f"Successfully loaded DataFrame with shape {df.shape}",logger=logger)
        return df
    except Exception as e:
        # All failures (including the KeyError above) surface as ValueError.
        logg.error(f"Error loading file: {str(e)}",logger=logger)
        raise ValueError(f"Error loading file: {str(e)}")
"""
DataFrame profiling utilities.
This module provides functions for generating detailed profiling reports and comparisons
of pandas DataFrames using pandas-profiling.
"""
from typing import Union, List, Optional, Any
import pandas as pd
from pathlib import Path
from mb.utils.logging import logg
__all__ = ['create_profile', 'profile_compare']
def create_profile(df: pd.DataFrame,
                   profile_name: Union[str, Path] = './pandas_profiling_report.html',
                   minimal: bool = False,
                   target: List[str] = None,
                   sample_size: int = 100000,
                   logger: Optional[Any] = None) -> None:
    """
    Create a pandas profiling report for a DataFrame.

    Args:
        df: Input DataFrame to profile
        profile_name: Output path for the HTML report
        minimal: If True, creates a minimal report for better performance.
            Forced to True when the DataFrame is sampled (see sample_size).
        target: List of target columns for correlation analysis
        sample_size: Maximum number of rows to process; larger frames are
            randomly sampled down to this size (random_state=42)
        logger: Optional logger instance for logging operations

    Raises:
        ImportError: if pandas-profiling is not installed.
        TypeError: if df is not a pandas DataFrame.
        ValueError: if any target column is missing.

    Example:
        >>> df = pd.DataFrame({'A': [1, 2, 3], 'B': ['a', 'b', 'c']})
        >>> create_profile(df, 'report.html', target=['A'])
    """
    try:
        from pandas_profiling import ProfileReport
    except ImportError:
        raise ImportError(
            "pandas-profiling is required for this function. "
            "Install it with: pip install pandas-profiling"
        )
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")
    # Validate and process target columns
    target = target or []
    if target:
        missing_cols = set(target) - set(df.columns)
        if missing_cols:
            raise ValueError(f"Target columns not found in DataFrame: {missing_cols}")
    # Sample large DataFrames
    original_size = len(df)
    if original_size > sample_size:
        logg.warning(f'DataFrame size ({original_size}) exceeds limit ({sample_size})',logger=logger)
        logg.info(f'Sampling {sample_size} rows',logger=logger)
        df = df.sample(n=sample_size, random_state=42)
        # Oversized inputs are forced onto the minimal report for performance.
        minimal = True
    try:
        logg.info('Generating profile report',logger=logger)
        if minimal:
            logg.info('Using minimal configuration for better performance',logger=logger)
        # Configure profile report
        profile = ProfileReport(
            df,
            title='Pandas Profiling Report',
            minimal=minimal,
            html={'style': {'full_width': True}},
            progress_bar=bool(logger),  # Show progress bar if logger is provided
            explorative=not minimal
        )
        # Set target columns for correlation analysis
        if target:
            logg.info(f'Analyzing correlations for target columns: {target}',logger=logger)
            # NOTE(review): writes directly to the report's config object —
            # confirm this attribute path against the installed
            # pandas-profiling version.
            profile.config.interactions.targets = target
        # Save report
        profile_path = Path(profile_name)
        profile.to_file(output_file=profile_path)
        logg.info(f'Profile report saved to: {profile_path.absolute()}',logger=logger)
    except Exception as e:
        logg.error(f'Error generating profile report: {str(e)}',logger=logger)
        raise
def profile_compare(df1: pd.DataFrame,
                    df2: pd.DataFrame,
                    profile_name: Union[str, Path] = './pandas_compare_report.html',
                    sample_size: int = 100000,
                    logger: Optional[Any] = None) -> None:
    """
    Create a comparison report between two DataFrames.

    Args:
        df1: First DataFrame to compare
        df2: Second DataFrame to compare
        profile_name: Output path for the HTML comparison report
        sample_size: Maximum number of rows to process per DataFrame;
            larger frames are randomly sampled (random_state=42)
        logger: Optional logger instance for logging operations

    Raises:
        ImportError: If pandas-profiling is not installed
        TypeError: If inputs are not pandas DataFrames

    Example:
        >>> df1 = pd.DataFrame({'A': [1, 2, 3]})
        >>> df2 = pd.DataFrame({'A': [1, 2, 4]})
        >>> profile_compare(df1, df2, 'comparison.html')
    """
    try:
        from pandas_profiling import ProfileReport
    except ImportError:
        raise ImportError(
            "pandas-profiling is required for this function. "
            "Install it with: pip install pandas-profiling"
        )
    if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame):
        raise TypeError("Both inputs must be pandas DataFrames")
    try:
        logg.info('Generating comparison report',logger=logger)
        logg.info(f'DataFrame 1 shape: {df1.shape}',logger=logger)
        logg.info(f'DataFrame 2 shape: {df2.shape}',logger=logger)
        # Generate profiles for both DataFrames.
        # Both profiles are always minimal, unlike create_profile, which only
        # goes minimal when sampling kicks in.
        profile1 = ProfileReport(
            df1.sample(n=min(len(df1), sample_size), random_state=42) if len(df1) > sample_size else df1,
            title="DataFrame 1",
            minimal=True,
            progress_bar=bool(logger)
        )
        profile2 = ProfileReport(
            df2.sample(n=min(len(df2), sample_size), random_state=42) if len(df2) > sample_size else df2,
            title="DataFrame 2",
            minimal=True,
            progress_bar=bool(logger)
        )
        # Generate comparison report
        logg.info('Comparing profiles',logger=logger)
        comparison = profile1.compare(profile2)
        comparison_path = Path(profile_name)
        comparison.to_file(comparison_path)
        logg.info(f'Comparison report saved to: {comparison_path.absolute()}',logger=logger)
    except Exception as e:
        logg.error(f'Error generating comparison report: {str(e)}',logger=logger)
        raise
"""
Transform module for pandas DataFrame operations.
This module provides utilities for DataFrame transformations, merging, and data validation.
"""
from typing import Union, List, Optional, Any
import pandas as pd
import numpy as np
import cv2
from .dfload import load_any_df
from mb.utils.logging import logg
__all__ = ['check_null', 'remove_unnamed', 'rename_columns', 'check_drop_duplicates', 'get_dftype', 'merge_chunk', 'merge_dask']
def merge_chunk(df1: pd.DataFrame,
                df2: pd.DataFrame,
                chunksize: int = 10000,
                logger: Optional[Any] = None,
                **kwargs) -> pd.DataFrame:
    """
    Merge two DataFrames in chunks to handle large datasets efficiently.

    The larger frame is split into row chunks and merged piecewise against
    the smaller frame, then the partial results are concatenated.

    Args:
        df1: First (left) DataFrame
        df2: Second (right) DataFrame
        chunksize: Number of rows per chunk for processing
        logger: Optional logger instance for logging operations
        **kwargs: Additional arguments passed to pd.merge

    Returns:
        pd.DataFrame: Merged DataFrame

    Raises:
        TypeError: if either input is not a pandas DataFrame.
        ValueError: if the frames share no columns to merge on.
    """
    if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame):
        raise TypeError("Both inputs must be pandas DataFrames")
    # Fail fast when there is nothing to merge on.
    merge_on = set(df1.columns) & set(df2.columns)
    if not merge_on:
        raise ValueError("No common columns to merge on")
    # Bug fix: the old code swapped df1/df2 so the smaller frame was always
    # the left operand — silently flipping how='left'/'right' semantics and
    # the output column order. Instead, chunk whichever frame is larger while
    # keeping the caller's left/right argument order intact.
    if df2.shape[0] >= df1.shape[0]:
        chunks = [df2[i:i + chunksize] for i in range(0, df2.shape[0], chunksize)]
        merge_one = lambda chunk: pd.merge(df1, chunk, **kwargs)
    else:
        chunks = [df1[i:i + chunksize] for i in range(0, df1.shape[0], chunksize)]
        merge_one = lambda chunk: pd.merge(chunk, df2, **kwargs)
    logg.info(f'Size of chunk: {chunksize}', logger=logger)
    logg.info(f'Number of chunks: {len(chunks)}', logger=logger)
    # Bug fix: an empty chunked frame previously raised IndexError on the
    # first-chunk access; merge directly instead.
    if not chunks:
        return pd.merge(df1, df2, **kwargs)
    parts = [merge_one(chunk) for chunk in chunks]
    if len(parts) == 1:
        return parts[0]
    return pd.concat(parts, ignore_index=True)
def merge_dask(df1: pd.DataFrame,
df2: pd.DataFrame,
logger: Optional[Any] = None,
**kwargs) -> pd.DataFrame:
"""
Merge two DataFrames using Dask for improved performance with large datasets.
Args:
df1: First DataFrame
df2: Second DataFrame
logger: Optional logger instance for logging operations
**kwargs: Additional arguments passed to dd.merge
Returns:
pd.DataFrame: Merged DataFrame
"""
try:
import dask.dataframe as dd
except ImportError:
raise ImportError("dask is required for this function. Install it with: pip install dask")
if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame):
raise TypeError("Both inputs must be pandas DataFrames")
logg.info('Converting pandas DataFrames to Dask DataFrames',logger=logger)
# Optimize number of partitions based on DataFrame size
npartitions = max(2, min(32, df1.shape[0] // 100000))
ddf1 = dd.from_pandas(df1, npartitions=npartitions)
ddf2 = dd.from_pandas(df2, npartitions=npartitions)
merged_ddf = dd.merge(ddf1, ddf2, **kwargs)
logg.info('Computing merge operation',logger=logger)
merged_df = merged_ddf.compute()
logg.info('Merged DataFrame and converted back to pandas DataFrame',logger=logger)
return merged_df
def check_null(file_path: str,
               fillna: bool = False,
               logger: Optional[Any] = None) -> pd.DataFrame:
    """
    Check and optionally handle null values in a DataFrame loaded from a file.

    Args:
        file_path: Path to the input file (loaded via load_any_df)
        fillna: If True, fills null values in numeric columns — 0 for
            integer columns, 0.0 for other numeric columns. Non-numeric
            columns are only reported, never filled.
        logger: Optional logger instance for logging operations

    Returns:
        pd.DataFrame: Processed DataFrame
    """
    logg.info(f'Loading file: {file_path}', logger=logger)
    df = load_any_df(file_path)
    logg.info(f'File shape: {df.shape}', logger=logger)
    logg.info(f'File columns: {list(df.columns)}', logger=logger)
    logg.info('Checking Null values', logger=logger)
    for column in df.columns:
        null_count = df[column].isnull().sum()
        if null_count == 0:
            continue
        logg.warning(f'Column {column} has {null_count} null values', logger=logger)
        if not fillna:
            continue
        if pd.api.types.is_numeric_dtype(df[column]):
            fill_value = 0 if pd.api.types.is_integer_dtype(df[column]) else 0.0
            logg.info(f'Filling null values with {fill_value}', logger=logger)
            # Assign back instead of inplace on a column selection:
            # `df[column].fillna(..., inplace=True)` is chained assignment,
            # which does not reliably update df under pandas copy-on-write
            # and is removed in pandas 3.0.
            df[column] = df[column].fillna(fill_value)
        else:
            logg.info(f'Skipping non-numeric column {column}', logger=logger)
    return df
def remove_unnamed(df: pd.DataFrame,
                   logger: Optional[Any] = None) -> pd.DataFrame:
    """
    Remove pandas index-artifact columns (names starting with 'Unnamed').

    Such columns typically appear when a CSV written with its index is read
    back without ``index_col=0``.

    Args:
        df: Input DataFrame
        logger: Optional logger instance for logging operations

    Returns:
        pd.DataFrame: DataFrame with unnamed columns removed

    Raises:
        TypeError: If input is not a pandas DataFrame
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")
    logg.info('Removing unnamed columns', logger=logger)
    # startswith('Unnamed') is equivalent to the previous '^Unnamed' regex
    # match, and unlike Index.str.contains it does not fail when some column
    # labels are not strings (the .str accessor yields NA for those, and
    # boolean indexing with NA raises).
    unnamed_cols = [c for c in df.columns if isinstance(c, str) and c.startswith('Unnamed')]
    if unnamed_cols:
        df = df.drop(columns=unnamed_cols)
        logg.info(f'Removed columns: {unnamed_cols}', logger=logger)
    return df
def rename_columns(df: pd.DataFrame,
                   new_column: str,
                   old_column: str,
                   logger: Optional[Any] = None) -> pd.DataFrame:
    """
    Rename a single DataFrame column.

    Args:
        df: Input DataFrame
        new_column: New column name
        old_column: Existing column name to be renamed
        logger: Optional logger instance for logging operations

    Returns:
        pd.DataFrame: DataFrame with the column renamed

    Raises:
        TypeError: If input is not a pandas DataFrame
        KeyError: If old_column doesn't exist in DataFrame
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")
    if old_column not in df.columns:
        logg.error(f'Column {old_column} not in DataFrame', logger=logger)
        raise KeyError(f'Column {old_column} not in DataFrame')
    renamed = df.rename(columns={old_column: new_column})
    logg.info(f'Column {old_column} renamed to {new_column}', logger=logger)
    return renamed
def check_drop_duplicates(df: pd.DataFrame,
                          columns: Union[str, List[str]],
                          drop: bool = False,
                          logger: Optional[Any] = None) -> pd.DataFrame:
    """
    Check and optionally remove duplicate rows based on specified columns.

    Args:
        df: Input DataFrame
        columns: Column or list of columns to check for duplicates
        drop: If True, removes duplicate rows (keeping the first occurrence)
        logger: Optional logger instance for logging operations

    Returns:
        pd.DataFrame: DataFrame with duplicates optionally removed

    Raises:
        TypeError: If input is not a pandas DataFrame
        KeyError: If any requested column is missing from the DataFrame
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")
    if isinstance(columns, str):
        columns = [columns]
    # Validate columns exist
    missing_cols = set(columns) - set(df.columns)
    if missing_cols:
        raise KeyError(f"Columns not found in DataFrame: {missing_cols}")
    logg.info(f'Checking duplicates for columns: {columns}', logger=logger)
    # keep=False marks every member of a duplicate group, so this counts all
    # rows involved in duplication (not just the surplus copies).
    duplicate_count = int(df.duplicated(subset=columns, keep=False).sum())
    if duplicate_count > 0:
        logg.warning(f'Found {duplicate_count} duplicate rows', logger=logger)
        if drop:
            logg.info('Removing duplicates', logger=logger)
            rows_before = len(df)
            df = df.drop_duplicates(subset=columns)
            # Log rows actually dropped; the old message reused
            # duplicate_count, which includes the kept first occurrences
            # and therefore overstated the removal.
            logg.info(f'Removed {rows_before - len(df)} duplicate rows', logger=logger)
        else:
            logg.info('Duplicate removal not requested (drop=False)', logger=logger)
    else:
        logg.info('No duplicates found', logger=logger)
    return df
def get_dftype(s: pd.Series) -> str:
    """
    Detect the data type of a pandas Series.

    Determines whether a series contains special types like numpy arrays,
    sparse arrays, images, or standard types like strings and timestamps.

    Args:
        s: Input pandas Series

    Returns:
        str: Detected type as string. Possible values include:
            - 'json': all values are lists/dicts
            - 'ndarray': all values are numpy arrays
            - 'SparseNdarray': all values are sparse ndarrays (only when the
              numpy module in use provides a ``SparseNdarray`` type)
            - 'Image': all values are images (only when cv2 provides ``Image``)
            - 'str': all values are strings
            - 'Timestamp': all values are pandas Timestamps
            - 'Timedelta': all values are pandas Timedeltas
            - 'float64': values not matched above but fully numeric-coercible
            - 'object': empty series, mixed, or unknown types
            - Other numeric pandas dtypes as strings (e.g. 'int64')

    Raises:
        TypeError: If input is not a pandas Series
    """
    if not isinstance(s, pd.Series):
        raise TypeError("Input must be a pandas Series")
    if len(s) == 0:
        return 'object'
    # Fast path for simple numeric dtypes
    if pd.api.types.is_numeric_dtype(s.dtype):
        return str(s.dtype)
    # Check first non-null value for type inference
    first_valid = s.first_valid_index()
    if first_valid is not None:
        val = s[first_valid]
        if isinstance(val, str):
            return 'str' if s.apply(lambda x: isinstance(x, str)).all() else 'object'
        elif isinstance(val, (list, dict)):
            return 'json' if s.apply(lambda x: isinstance(x, (list, dict))).all() else 'object'
        elif isinstance(val, np.ndarray):
            return 'ndarray' if s.apply(lambda x: isinstance(x, np.ndarray)).all() else 'object'
        elif isinstance(val, pd.Timestamp):
            return 'Timestamp' if s.apply(lambda x: isinstance(x, pd.Timestamp)).all() else 'object'
        elif isinstance(val, pd.Timedelta):
            return 'Timedelta' if s.apply(lambda x: isinstance(x, pd.Timedelta)).all() else 'object'
        # Plain numpy has no SparseNdarray attribute and cv2 has no Image
        # class, so the original direct attribute accesses raised
        # AttributeError as soon as this branch was reached (e.g. for any
        # Timestamp series, before the reorder above). Resolve the optional
        # types lazily and skip the check when the attribute is absent.
        sparse_cls = getattr(np, 'SparseNdarray', None)
        if sparse_cls is not None and isinstance(val, sparse_cls):
            return 'SparseNdarray' if s.apply(lambda x: isinstance(x, sparse_cls)).all() else 'object'
        image_cls = getattr(cv2, 'Image', None)
        if image_cls is not None and isinstance(val, image_cls):
            return 'Image' if s.apply(lambda x: isinstance(x, image_cls)).all() else 'object'
    # Fall back: report 'float64' if every value can be coerced to a number
    try:
        pd.to_numeric(s, errors='raise')
        return 'float64'
    except (ValueError, TypeError):
        pass
    return 'object'
MAJOR_VERSION = 2
MINOR_VERSION = 0
PATCH_VERSION = 1
version = '{}.{}.{}'.format(MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION)
__all__ = ['MAJOR_VERSION', 'MINOR_VERSION', 'PATCH_VERSION', 'version']