mb_base
Advanced tools
| from numpy import * | ||
| from numpy import __version__ | ||
| import numpy as _np | ||
| ndarray = _np.ndarray | ||
| import numpy.core as core | ||
| from numpy.core import round | ||
| import numpy.lib as lib | ||
| import numpy.polynomial as polynomial | ||
import numpy.linalg as linalg
| import numpy.fft as fft | ||
| import numpy.matlib as matlib | ||
| import numpy.random as random |
| from matplotlib.pyplot import * | ||
| from .emb_viz import * |
| ## file to view pca / umap / tsne embeddings in 2d or 3d with tf projector and plotly | ||
| from mb import pandas as pd | ||
| from sklearn.decomposition import PCA | ||
| from sklearn.manifold import TSNE | ||
| from sklearn.preprocessing import LabelEncoder | ||
| from matplotlib import pyplot as plt | ||
| import os | ||
| import numpy as np | ||
| from mb.utils.logging import logg | ||
| __all__ = ['get_emb','viz_emb','generate_sprite_images'] | ||
| def get_emb(df: pd.DataFrame, emb= 'embeddings', emb_type='umap', dim=2,keep_original_emb=False,file_save=None, logger=None,**kwargs): | ||
| """ | ||
Reduce embeddings to 2d or 3d (pca / tsne / umap) so they can later be visualized with the TF projector or plotly
| Args: | ||
| df (pd.DataFrame): dataframe containing embeddings. File location or DataFrame object. | ||
| emb (str): name of embedding column | ||
| emb_type (str, optional): embedding type. Defaults to 'umap'. | ||
| dim (int, optional): embedding dimension. Defaults to 2. | ||
| keep_original_emb (bool, optional): keep original embedding column. Defaults to False. | ||
| file_save (str, optional): file location to save embeddings csv. Defaults to None. | ||
| Output: | ||
df (pd.DataFrame): dataframe with the reduced embeddings in a new 'emb_res' column. The original embedding column is dropped unless keep_original_emb=True.
| """ | ||
if type(df) is not pd.DataFrame:
    logg.info('Type of df :{}'.format(str(type(df))), logger=logger)
    logg.info('Loading dataframe from path {}'.format(df), logger=logger)  # log the path before it is overwritten by the loaded frame
    df = pd.load_any_df(df)
| logg.info('Data shape {}'.format(str(df.shape)),logger=logger) | ||
| logg.info('Data columns {}'.format(str(df.columns)),logger=logger) | ||
| logg.info('Performing {} on {} embeddings'.format(emb_type,emb),logger=logger) | ||
| if emb_type=='pca': | ||
| pca = PCA(n_components=dim) | ||
| pca_emb = pca.fit_transform(list(df[emb])) | ||
| logg.info('First PCA transform result : {}'.format(str(pca_emb[0])),logger=logger) | ||
| temp_res = list(pca_emb) | ||
elif emb_type=='tsne':
| tsne = TSNE(n_components=dim, verbose=1, perplexity=30, n_iter=250, **kwargs) | ||
| df[emb] = df[emb].apply(lambda x: np.array(x)) | ||
| k1 = np.vstack(df[emb]) | ||
| tsne_emb = tsne.fit_transform(k1) | ||
| logg.info('First TSNE transform result : {}'.format(str(tsne_emb[0])),logger=logger) | ||
| temp_res = list(tsne_emb) | ||
elif emb_type=='umap':
| try: | ||
| import umap | ||
| except ImportError: | ||
| logg.info('umap not installed, installing umap',logger=logger) | ||
| os.system('pip install umap-learn') | ||
| import umap | ||
umap_emb = umap.UMAP(n_components=dim, **kwargs).fit_transform(list(df[emb]))  # use the requested dimension rather than a hard-coded 2
| logg.info('First UMAP transform result : {}'.format(str(umap_emb[0])),logger=logger) | ||
| temp_res = list(umap_emb) | ||
else:
    raise ValueError(f"emb_type must be one of 'pca', 'tsne' or 'umap', got '{emb_type}'")
df['emb_res'] = temp_res
| if keep_original_emb==False: | ||
| df.drop(emb,axis=1,inplace=True) | ||
| logg.info('Dropped original embedding column',logger=logger) | ||
| if file_save: | ||
| df.to_csv(file_save + '/emb_res.csv',index=False) | ||
| else: | ||
| df.to_csv('./emb_res.csv',index=False) | ||
| return df | ||
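A minimal usage sketch for get_emb, assuming a DataFrame whose 'embeddings' column holds equal-length vectors; the column names and sizes below are illustrative only.

toy = pd.DataFrame({
    'embeddings': [np.random.rand(64) for _ in range(200)],   # 64-dim vectors
    'taxcode': np.random.randint(0, 5, size=200),             # labels used later for colouring
})
reduced = get_emb(toy, emb='embeddings', emb_type='pca', dim=2)
len(reduced['emb_res'].iloc[0])   # -> 2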
| def viz_emb(df: pd.DataFrame, emb_column='emb_res' , target_column='taxcode', viz_type ='plt',dash_viz=False,limit = None,image_tb=None , file_save=None, | ||
| dont_viz=False, logger=None): | ||
| """ | ||
Visualize embeddings in 2d or 3d with tf projector and plotly
| Args: | ||
| df (pd.DataFrame): dataframe containing embeddings. File location or DataFrame object. | ||
| emb_column (str): name of embedding column | ||
| target_column (str): name of target column. It can be used to color the embeddings. Defaults to 'taxcode'. Can be None too. | ||
viz_type (str, optional): visualization type: 'plt' (matplotlib), 'pe' (plotly express) or 'tf' (TensorBoard projector). Defaults to 'plt'.
| dash_viz (bool, optional): if True, then it will create a dash app. Defaults to False. | ||
| limit (int, optional): limit number of data points to visualize. Takes random samples. Defaults to None. | ||
image_tb (str, optional): name of the column holding image paths; used to build a sprite image for the TensorBoard projector. Defaults to None.
file_save (str, optional): file location to save plot. If viz_type='tf', then it won't be saved. Defaults to None.
dont_viz (bool, optional): if True, then it won't visualize. Defaults to False.
| logger (logger, optional): logger object. Defaults to None. | ||
| Output: | ||
| None | ||
| """ | ||
| if type(df) != pd.DataFrame: | ||
| logg.info('Type of df :{}'.format(str(type(df))),logger=logger) | ||
| df = pd.load_any_df(df) | ||
| if limit: | ||
| df = df.sample(limit) | ||
| assert emb_column in df.columns, f'Embedding column not found in dataframe: {df.columns}' | ||
| emb_data = np.concatenate(np.array(df[emb_column])) | ||
| emb_data = emb_data.reshape(-1,2) #change this for 3d | ||
| logg.info('Embedding data shape {}'.format(str(emb_data.shape)),logger=logger) | ||
assert target_column is None or target_column in df.columns, f'Target column not found in dataframe : {df.columns}'
if target_column:
    target_data = list(df[target_column])
    if type(target_data[0]) == str:
        target_data = LabelEncoder().fit_transform(target_data)
if file_save is None:
    file_save = '.'  # treated as a directory: outputs are written as <file_save>/emb_plot.png, <file_save>/emb_plot.html etc.
| # Visualize the embeddings using a scatter plot | ||
| if viz_type=='plt' and target_column: | ||
| plt.scatter(emb_data[:, 0], emb_data[:, 1], c=target_data, cmap='viridis') | ||
| #plt.legend() | ||
| if dont_viz==False: | ||
| plt.show() | ||
| if file_save: | ||
| plt.savefig(file_save+'/emb_plot.png') | ||
| elif viz_type=='plt' and target_column==None: | ||
| plt.scatter(emb_data[:, 0], emb_data[:, 1]) | ||
| #plt.legend() | ||
| if dont_viz==False: | ||
| plt.show() | ||
| if file_save: | ||
| plt.savefig(file_save+'/emb_plot.png') | ||
| elif viz_type=='pe' and target_column: | ||
| #importing plotly and dash | ||
| import plotly.express as px | ||
| if dash_viz: | ||
| code = """ | ||
| import dash | ||
| import dash_core_components as dcc | ||
| import dash_html_components as html | ||
| from dash.dependencies import Input, Output | ||
| import plotly.express as px | ||
| import pandas as pd | ||
| import numpy as np | ||
| # Create the Dash app | ||
| app = dash.Dash(__name__) | ||
| # Define the layout of the app | ||
| df=pd.read_csv('./emb_res.csv') | ||
| df['emb_res_np'] = df['emb_res'].apply(lambda x:np.fromstring(x[1:-1],sep=' ')) | ||
| #emb_data = np.concatenate(np.array(df['emb_res_np'])) | ||
| emb_data = np.array(df['emb_res_np'].tolist()) # Convert to 2D array | ||
| app.layout = html.Div([ | ||
| dcc.Graph( | ||
| id='scatter-plot', | ||
# NOTE: 'menu_code', 'event_id' and 'after_image_url' below are dataset-specific column names; adjust them for your data
figure=px.scatter(df, x=emb_data[:, 0], y=emb_data[:, 1], color=df['menu_code'], color_continuous_scale='rainbow'),
| config={'staticPlot': False}), | ||
| html.Div([html.Img(id='selected-image', style={'width': '50%'}), | ||
| html.Div(id='hover-data-output')])]) | ||
| # Define a callback function for updating the hover data | ||
| @app.callback([Output('hover-data-output', 'children'),Output('selected-image', 'src')], | ||
| [Input('scatter-plot', 'hoverData')]) | ||
| def display_hover_data(hover_data): | ||
| if hover_data is None: | ||
| return ("Hover over a point to see data") | ||
| # Extract data from hover_data | ||
| point_index = hover_data['points'][0]['pointIndex'] | ||
| target_value = df.iloc[point_index]['event_id'] | ||
| image_url = df.iloc[point_index]['after_image_url'] | ||
| return f"Hovered over point {target_value}. Image URL: {image_url}",image_url | ||
| # Run the app in the notebook | ||
| if __name__ == '__main__': | ||
| app.run_server(mode='inline', port=8927,host='0.0.0.0')""" | ||
| with open(file_save + '/dash_app.py', 'w') as f: | ||
| f.write(code) | ||
| else: | ||
| fig = px.scatter(x=emb_data[:, 0], y=emb_data[:, 1], color=target_data, color_continuous_scale = 'rainbow', | ||
| title = f"Similarity to data visualised using dim reduction") | ||
| fig.update_layout(width = 650,height = 650) | ||
| if dont_viz==False: | ||
| fig.show() | ||
| if file_save: | ||
| fig.write_html(file_save+'/emb_plot.html',full_html=True) | ||
| elif viz_type=='tf' and target_column: | ||
| ##check from here | ||
| log_dir = './tp_logs' | ||
| if not os.path.exists(log_dir): | ||
| os.makedirs(log_dir) | ||
| emb_data = np.array(emb_data) | ||
| loc_emb_data = os.path.join(log_dir,'emb_data_tf.tsv') | ||
| np.savetxt(loc_emb_data, emb_data, delimiter='\t') | ||
| target_data = np.array(target_data) | ||
| loc_target_data = os.path.join(log_dir,'labels_tf.tsv') | ||
| np.savetxt(loc_target_data,target_data,delimiter='\t') | ||
| if image_tb is not None: | ||
| loc_sprite_image = os.path.join(log_dir,'sprite_image.png') | ||
| generate_sprite_images(df[image_tb], file_save=loc_sprite_image, img_size=28 ,logger=logger) | ||
| from tensorboard.plugins import projector | ||
| config = projector.ProjectorConfig() | ||
| embedding = config.embeddings.add() | ||
| embedding.tensor_path = loc_emb_data | ||
| embedding.metadata_path = loc_target_data | ||
| if image_tb is not None: | ||
| embedding.sprite.image_path = loc_sprite_image | ||
embedding.sprite.single_image_dim.extend([28, 28])  # must match the img_size passed to generate_sprite_images above
| with open(os.path.join(log_dir, 'projector_config.pbtxt'), 'w') as f: | ||
| f.write(str(config)) | ||
| logg.info('Run tensorboard --logdir={} to view embeddings'.format(log_dir),logger=logger) | ||
| logg.info('if on jupyter notebook, run below code to view embeddings in notebook',logger=logger) | ||
| logg.info('%load_ext tensorboard',logger=logger) | ||
| logg.info('%tensorboard --logdir={}'.format(log_dir),logger=logger) | ||
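Continuing the sketch above, viz_emb consumes the 'emb_res' column produced by get_emb; the matplotlib path is the least demanding one, while 'pe' needs plotly and 'tf' writes ./tp_logs for the TensorBoard projector. Paths and column names remain illustrative.

viz_emb(reduced, emb_column='emb_res', target_column='taxcode',
        viz_type='plt', file_save='.', dont_viz=True)   # writes ./emb_plot.png without opening a window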
| def generate_sprite_images(img_paths, file_save=None, img_size= 28 ,logger=None): | ||
| """ | ||
| Create a sprite image consisting of images | ||
| Args: | ||
| img_paths (list or pd.DataFrame): list of image paths | ||
| file_save (str, optional): file location to save sprite image. Defaults to None. Will save in current directory. | ||
| img_size (int, optional): image size. Defaults to 28. | ||
| logger (logger, optional): logger object. Defaults to None. | ||
| Output: | ||
| sprite_image (np.array): sprite image | ||
| """ | ||
| import tensorflow as tf | ||
| if type(img_paths) is not list: | ||
| img_paths = list(img_paths) | ||
| #create sprite image | ||
| images = [tf.io.read_file(img_path) for img_path in img_paths] | ||
| images = [tf.image.decode_image(img) for img in images] | ||
| images = [tf.image.resize(img, (img_size, img_size)) for img in images] | ||
| images = [img.numpy() for img in images] | ||
sprite_image = np.concatenate(images, axis=1)  # single-row sprite; very large sets may exceed TensorBoard's sprite size limit and need a square grid instead
| if file_save: | ||
| np.save(file_save,sprite_image) | ||
| tf.keras.utils.save_img(file_save,sprite_image) | ||
| else: | ||
| np.save('./sprite_image',sprite_image) | ||
| tf.keras.utils.save_img('./sprite_image.png',sprite_image) | ||
| return sprite_image | ||
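An illustrative call to generate_sprite_images; the image paths are placeholders, and TensorFlow must be installed since decoding and resizing are done with tf.

paths = ['imgs/0001.png', 'imgs/0002.png']                              # placeholder image files
sprite = generate_sprite_images(paths, file_save='./sprite_image.png', img_size=28)
sprite.shape   # (28, 28 * len(paths), channels) - thumbnails laid out in a single row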
| import matplotlib.pyplot as plt | ||
| import numpy as np | ||
| from concurrent.futures import ThreadPoolExecutor | ||
| __all__ = ['dynamic_plt'] | ||
| def dynamic_plt(imgs: list,labels: list =None, bboxes: list =None , | ||
| bboxes_label: list = None,num_cols: int = 2, figsize=(16, 12), | ||
| return_fig: bool = False, show: bool = True, save_path: str = None, | ||
| max_workers: int = 4): | ||
| """ | ||
| Create dynamic plots based on the number of images and desired columns | ||
| Args: | ||
| imgs: List of images or paths to images | ||
| labels: List of labels corresponding to the images (default: None) | ||
| bboxes: List of bounding boxes corresponding to the images (default: None) | ||
| bboxes_label : List of labels corresponding to the bounding boxes (default: None) (Fontsize=12) | ||
| num_cols: Number of columns for the subplot grid (default: 2) | ||
| figsize: Size of the figure (default: (16, 12)) | ||
| return_fig: Return the figure object (default: False) | ||
| show: Show the plot (default: True) | ||
| save_path: Path to save the plot (default: None) | ||
| max_workers: Maximum number of threads to use for loading images (default: 4) | ||
Return:
    fig if return_fig is True, otherwise None
| """ | ||
| with ThreadPoolExecutor(max_workers=max_workers) as executor: | ||
| imgs = list(executor.map(lambda x: plt.imread(x) if isinstance(x, str) else x, imgs)) | ||
| num_images = len(imgs) | ||
| num_rows = int(np.ceil(num_images / num_cols)) | ||
| fig, axes = plt.subplots(num_rows, num_cols, figsize=figsize) | ||
# Ensure axes is always a 2D array, even for a single row, single column, or single subplot
axes = np.array(axes).reshape(num_rows, num_cols)
| for i, img in enumerate(imgs): | ||
| row = i // num_cols | ||
| col = i % num_cols | ||
| ax = axes[row, col] | ||
if img.ndim == 3 and img.shape[0] == 3:  # channel-first (C, H, W) image; convert to (H, W, C) for imshow
    img = np.moveaxis(img, 0, -1)
| ax.imshow(img) | ||
| ax.axis('off') | ||
| if labels: | ||
| ax.set_title(str(labels[i])) | ||
| if bboxes: | ||
| img_bboxes = bboxes[i] | ||
| if bboxes_label: | ||
| img_labels = bboxes_label[i] | ||
| else: | ||
| img_labels = None | ||
| for bbox_val, bbox in enumerate(img_bboxes): | ||
| rect = plt.Rectangle((bbox[0], bbox[1]), bbox[2], bbox[3], | ||
| fill=False, edgecolor='red', linewidth=2) | ||
| ax.add_patch(rect) | ||
if img_labels and bbox_val < len(img_labels):
    ax.text(bbox[0], bbox[1], img_labels[bbox_val], color='red', fontsize=12)
| # Remove any unused subplots | ||
| for j in range(num_images, num_rows * num_cols): | ||
| row = j // num_cols | ||
| col = j % num_cols | ||
| fig.delaxes(axes[row, col]) | ||
| plt.tight_layout() | ||
| if save_path: | ||
| plt.savefig(save_path) | ||
| if show: | ||
| plt.show() | ||
| if return_fig: | ||
| return fig |
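A small usage sketch for dynamic_plt with in-memory images and one bounding box per image; shapes, labels and coordinates are made up for illustration.

imgs = [np.random.rand(64, 64, 3) for _ in range(3)]                  # three dummy RGB images
boxes = [[[5, 5, 20, 20]], [[10, 10, 15, 30]], [[0, 0, 32, 32]]]      # one (x, y, w, h) box per image
dynamic_plt(imgs, labels=['a', 'b', 'c'], bboxes=boxes,
            bboxes_label=[['cat'], ['dog'], ['bird']],
            num_cols=2, show=False, save_path='./grid.png')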
| MAJOR_VERSION = 2 | ||
| MINOR_VERSION = 0 | ||
| PATCH_VERSION = 8 | ||
| version = '{}.{}.{}'.format(MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION) | ||
| __all__ = ['MAJOR_VERSION', 'MINOR_VERSION', 'PATCH_VERSION', 'version'] |
| Metadata-Version: 2.4 | ||
| Name: mb_base | ||
| Version: 2.0.1 | ||
| Summary: Pandas Package for mb.* packages | ||
| Version: 2.0.8 | ||
| Summary: Meta Package for mb_* packages | ||
| Author: ['Malav Bateriwala'] | ||
| Requires-Python: >=3.8 | ||
| Requires-Dist: mb_utils | ||
| Requires-Dist: mb_pandas | ||
| Dynamic: author | ||
@@ -9,0 +10,0 @@ Dynamic: requires-dist |
@@ -1,11 +0,10 @@ | ||
| mb/pandas/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 | ||
| mb/pandas/aio.py,sha256=p50zewrROjznbAURUNOSGKHOvvA-lg7I3ifh3vEUBlM,3256 | ||
| mb/pandas/convert_data.py,sha256=YIlMT7L64ZdVaHwAYCgouu7vwpRCKHspx9J_U-uSjcM,6671 | ||
| mb/pandas/dfload.py,sha256=SnQwQNpdgwkXfRDT9caycpX72hvNGypLTDstYbKvwXc,4748 | ||
| mb/pandas/profiler.py,sha256=Ad6-EgufMMBNKA9m6tytHmOs70A0XjWGKe4niiRq004,5880 | ||
| mb/pandas/transform.py,sha256=C8-heFv5heZCJr-nBFwclRU7a0DdpKia-kjh3el3VYE,10804 | ||
| mb/pandas/version.py,sha256=JJiVhzInvyc4XUc31yQR_q4UNm6rnom-oatXO_VvCfo,206 | ||
| mb_base-2.0.1.dist-info/METADATA,sha256=cHkhMmzVkHid5E2h9M4HdIUlEqdT3m9cp1hsUeZFjwQ,250 | ||
| mb_base-2.0.1.dist-info/WHEEL,sha256=YCfwYGOYMi5Jhw2fU4yNgwErybb2IX5PEwBKV4ZbdBo,91 | ||
| mb_base-2.0.1.dist-info/top_level.txt,sha256=2T5lqIVZs7HUr0KqUMPEaPF59QXcr0ErDy6K-J88ycM,3 | ||
| mb_base-2.0.1.dist-info/RECORD,, | ||
| mb/version.py,sha256=hksoK0MZZ7EqfudPmJnZVHd8LwRjL2FDUTE1W8-iGD4,206 | ||
| mb/numpy/__init__.py,sha256=paJ-HHrMmhCCBXA9GVOGoWUjB82J4sJ6bczyaL0xY2E,323 | ||
| mb/pandas/__init__.py,sha256=aoUqtk-zSychmd4ALxg1-Vu8Ii3Hpska_nCy6shVEqQ,45 | ||
| mb/plt/__init__.py,sha256=Ya4j8QIe5BCbMesMr1W8L9LP-FBy9LZbUow0XatJczA,54 | ||
| mb/plt/emb_viz.py,sha256=dWGZepldJvFC2YNrkCSIKqc0tE-Sf8d8YJ0QjYbQkXw,10999 | ||
| mb/plt/utils.py,sha256=eE48STXqeradeImdnL12E1B6isgqa31koP536zjfAgw,2845 | ||
| mb_base-2.0.8.dist-info/METADATA,sha256=pKUAoMvGf2_Ku072Kf-FiEHI9WytTbwHQQ4RmrKqeQs,273 | ||
| mb_base-2.0.8.dist-info/WHEEL,sha256=YCfwYGOYMi5Jhw2fU4yNgwErybb2IX5PEwBKV4ZbdBo,91 | ||
| mb_base-2.0.8.dist-info/top_level.txt,sha256=2T5lqIVZs7HUr0KqUMPEaPF59QXcr0ErDy6K-J88ycM,3 | ||
| mb_base-2.0.8.dist-info/RECORD,, |
| """ | ||
| Asynchronous I/O utilities. | ||
| This module provides utilities for handling asynchronous file operations and running | ||
| async functions in a synchronous context. | ||
| """ | ||
| from typing import Any, Dict, Optional, Callable, TypeVar, Coroutine, Union | ||
| import asyncio | ||
| import aiofiles | ||
| __all__ = ['srun', 'read_text'] | ||
| T = TypeVar('T') # Generic type for function return value | ||
| def srun(async_func: Callable[..., Coroutine[Any, Any, T]], | ||
| *args: Any, | ||
| extra_context_var: Optional[Dict[str, Any]] = None, | ||
| show_progress: bool = False, | ||
| **kwargs: Any) -> T: | ||
| """ | ||
| Run an async function in a synchronous context. | ||
| This function allows running asynchronous functions in a synchronous way by | ||
| handling the async/await machinery internally. | ||
| Args: | ||
| async_func: The async function to run | ||
| *args: Positional arguments to pass to the function | ||
| extra_context_var: Additional context variables to pass to the function | ||
| show_progress: Whether to show a progress bar | ||
| **kwargs: Keyword arguments to pass to the function | ||
| Returns: | ||
| The result of the async function execution | ||
| """ | ||
| context_vars = extra_context_var or {} | ||
| async def _run_async(): | ||
| try: | ||
| return await async_func(*args, context_vars=context_vars, **kwargs) | ||
except Exception as e:
    # Add context about which coroutine failed while preserving the original traceback
    raise type(e)(f"Error in async function {async_func.__name__}: {str(e)}") from e
| return asyncio.run(_run_async()) | ||
| async def read_text(filepath: str, | ||
| size: Optional[int] = None, | ||
| context_vars: Optional[Dict[str, Any]] = None) -> str: | ||
| """ | ||
| Asynchronously read text from a file. | ||
| This function provides both async and sync file reading capabilities based on | ||
| the context_vars['async'] flag. | ||
| Args: | ||
| filepath: Path to the file to read | ||
| size: Number of bytes to read from the beginning of the file. | ||
| If None, reads the entire file. | ||
| context_vars: Dictionary of context variables. Must include 'async' key | ||
| to determine whether to use async or sync reading. | ||
| Returns: | ||
| str: Content read from the file | ||
| Example: | ||
| >>> # Async reading | ||
| >>> content = await read_text('file.txt', context_vars={'async': True}) | ||
| >>> # Sync reading | ||
| >>> content = await read_text('file.txt', context_vars={'async': False}) | ||
| """ | ||
| if not filepath: | ||
| raise ValueError("filepath cannot be empty") | ||
| context_vars = context_vars or {'async': True} | ||
| if 'async' not in context_vars: | ||
| raise KeyError("context_vars must contain 'async' key") | ||
| try: | ||
| if context_vars["async"]: | ||
| async with aiofiles.open(filepath, mode="rt") as f: | ||
| return await f.read(size) | ||
| else: | ||
| with open(filepath, mode="rt") as f: | ||
| return f.read(size) | ||
| except FileNotFoundError: | ||
| raise FileNotFoundError(f"File not found: {filepath}") | ||
| except IOError as e: | ||
| raise IOError(f"Error reading file {filepath}: {str(e)}") | ||
| except Exception as e: | ||
| raise type(e)(f"Unexpected error reading file {filepath}: {str(e)}") |
| """ | ||
| Data conversion utilities for pandas DataFrames. | ||
| This module provides functions for converting data between different formats, | ||
| particularly focusing on string representations of complex data structures. | ||
| """ | ||
| from typing import Union, List, Optional, Any, Type | ||
| import ast | ||
| import pandas as pd | ||
| import json | ||
| from mb.utils.logging import logg | ||
| __all__ = ['convert_string_to_list', 'convert_string_to_dict', 'convert_string_to_type'] | ||
| def convert_string_to_list(df: pd.DataFrame, | ||
| column: str, | ||
| logger: Optional[Any] = None) -> pd.DataFrame: | ||
| """ | ||
| Convert string representations of lists in a DataFrame column to actual lists. | ||
| Args: | ||
| df: Input DataFrame | ||
| column: Name of the column containing string representations of lists | ||
| logger: Optional logger instance for logging operations | ||
| Returns: | ||
| pd.DataFrame: DataFrame with the specified column converted to lists | ||
| Example: | ||
| >>> df = pd.DataFrame({'data': ['[1, 2, 3]', '[4, 5, 6]']}) | ||
| >>> result = convert_string_to_list(df, 'data') | ||
| >>> print(result['data'].tolist()) | ||
| [[1, 2, 3], [4, 5, 6]] | ||
| """ | ||
| if not isinstance(df, pd.DataFrame): | ||
| raise TypeError("Input must be a pandas DataFrame") | ||
| if column not in df.columns: | ||
| raise KeyError(f"Column '{column}' not found in DataFrame") | ||
| # Create a copy to avoid modifying the original DataFrame | ||
| result = df.copy() | ||
| try: | ||
| logg.info(f"Converting column '{column}' from string to list", logger=logger) | ||
| # Convert strings to lists using ast.literal_eval | ||
| result[column] = result[column].apply(lambda x: ( | ||
| ast.literal_eval(x) if isinstance(x, str) else | ||
| x if isinstance(x, list) else | ||
| [x] if pd.notnull(x) else [] | ||
| )) | ||
| # Validate that all values are lists | ||
| non_list_mask = ~result[column].apply(lambda x: isinstance(x, list)) | ||
| if non_list_mask.any(): | ||
| problematic_rows = result.index[non_list_mask].tolist() | ||
| raise ValueError(f"Conversion failed for rows: {problematic_rows}") | ||
| logg.info(f"Successfully converted {len(result)} rows", logger=logger) | ||
| return result | ||
| except Exception as e: | ||
| logg.error(f"Error converting column '{column}': {str(e)}", logger=logger) | ||
| raise | ||
| def convert_string_to_dict(df: pd.DataFrame, | ||
| column: str, | ||
| logger: Optional[Any] = None) -> pd.DataFrame: | ||
| """ | ||
| Convert string representations of dictionaries in a DataFrame column to actual dictionaries. | ||
| Args: | ||
| df: Input DataFrame | ||
| column: Name of the column containing string representations of dictionaries | ||
| logger: Optional logger instance for logging operations | ||
| Returns: | ||
| pd.DataFrame: DataFrame with the specified column converted to dictionaries | ||
| Example: | ||
| >>> df = pd.DataFrame({'data': ['{"a": 1}', '{"b": 2}']}) | ||
| >>> result = convert_string_to_dict(df, 'data') | ||
| >>> print(result['data'].tolist()) | ||
| [{'a': 1}, {'b': 2}] | ||
| """ | ||
| if not isinstance(df, pd.DataFrame): | ||
| raise TypeError("Input must be a pandas DataFrame") | ||
| if column not in df.columns: | ||
| raise KeyError(f"Column '{column}' not found in DataFrame") | ||
| result = df.copy() | ||
| try: | ||
| logg.info(f"Converting column '{column}' from string to dict", logger=logger) | ||
| # Try both ast.literal_eval and json.loads for maximum compatibility | ||
| def safe_convert(x): | ||
| if isinstance(x, dict): | ||
| return x | ||
| if pd.isnull(x): | ||
| return {} | ||
| if not isinstance(x, str): | ||
| return {'value': x} | ||
| try: | ||
| return ast.literal_eval(x) | ||
| except (ValueError, SyntaxError): | ||
| try: | ||
| return json.loads(x) | ||
| except json.JSONDecodeError: | ||
| raise ValueError(f"Could not convert value: {x}") | ||
| result[column] = result[column].apply(safe_convert) | ||
| # Validate that all values are dictionaries | ||
| non_dict_mask = ~result[column].apply(lambda x: isinstance(x, dict)) | ||
| if non_dict_mask.any(): | ||
| problematic_rows = result.index[non_dict_mask].tolist() | ||
| raise ValueError(f"Conversion failed for rows: {problematic_rows}") | ||
| logg.info(f"Successfully converted {len(result)} rows", logger=logger) | ||
| return result | ||
| except Exception as e: | ||
| logg.error(f"Error converting column '{column}': {str(e)}", logger=logger) | ||
| raise | ||
| def convert_string_to_type(df: pd.DataFrame, | ||
| column: str, | ||
| target_type: Type, | ||
| logger: Optional[Any] = None) -> pd.DataFrame: | ||
| """ | ||
| Convert string values in a DataFrame column to a specified type. | ||
| Args: | ||
| df: Input DataFrame | ||
| column: Name of the column to convert | ||
| target_type: Type to convert values to (e.g., int, float, bool) | ||
| logger: Optional logger instance for logging operations | ||
| Returns: | ||
| pd.DataFrame: DataFrame with the specified column converted to target type | ||
| Example: | ||
| >>> df = pd.DataFrame({'numbers': ['1', '2', '3']}) | ||
| >>> result = convert_string_to_type(df, 'numbers', int) | ||
| >>> print(result['numbers'].tolist()) | ||
| [1, 2, 3] | ||
| """ | ||
| if not isinstance(df, pd.DataFrame): | ||
| raise TypeError("Input must be a pandas DataFrame") | ||
| if column not in df.columns: | ||
| raise KeyError(f"Column '{column}' not found in DataFrame") | ||
| result = df.copy() | ||
| try: | ||
| logg.info(f"Converting column '{column}' to {target_type.__name__}", logger=logger) | ||
| def safe_convert(x): | ||
| if pd.isnull(x): | ||
| return None | ||
| if isinstance(x, target_type): | ||
| return x | ||
| try: | ||
| return target_type(x) | ||
| except (ValueError, TypeError): | ||
| raise ValueError(f"Could not convert value '{x}' to {target_type.__name__}") | ||
| result[column] = result[column].apply(safe_convert) | ||
| logg.info(f"Successfully converted {len(result)} rows", logger=logger) | ||
| return result | ||
| except Exception as e: | ||
| logg.error(f"Error converting column '{column}': {str(e)}", logger=logger) | ||
| raise Exception(f"Error converting column '{column}': {str(e)}") |
| """ | ||
| DataFrame loading module with async support. | ||
| This module provides utilities for loading pandas DataFrames from various file formats | ||
| using asynchronous I/O operations for improved performance. | ||
| """ | ||
| from typing import Optional, List, Union, Any | ||
| import pandas as pd | ||
| import asyncio | ||
| import io | ||
| from ast import literal_eval | ||
| from pyarrow.parquet import ParquetFile | ||
| from mb.utils.logging import logg | ||
| __all__ = ['load_any_df'] | ||
| async def read_txt(filepath: str, size: Optional[int] = None) -> str: | ||
| """ | ||
Read text from a file. Declared async so load_df_async can await it, though the read itself is synchronous.
| Args: | ||
| filepath: Path to the file to read | ||
| size: Optional number of bytes to read | ||
| Returns: | ||
| str: Content of the file | ||
| """ | ||
| try: | ||
| with open(filepath, mode="rt") as f: | ||
| if size: | ||
| return f.read(size) | ||
| return f.read() | ||
| except FileNotFoundError: | ||
| raise FileNotFoundError(f"File not found: {filepath}") | ||
| except IOError as e: | ||
| raise IOError(f"Error reading file {filepath}: {str(e)}") | ||
| async def load_df_async(filepath: str, | ||
| chunk_size: int = 1024) -> pd.DataFrame: | ||
| """ | ||
| Load a DataFrame asynchronously from CSV or Parquet file. | ||
| Args: | ||
| filepath: Path to the input file | ||
| chunk_size: Number of rows to read per chunk | ||
| Returns: | ||
| pd.DataFrame: Loaded DataFrame | ||
| """ | ||
| def process_csv(data: io.StringIO) -> pd.DataFrame: | ||
| dfs = [] | ||
chunk_iter = pd.read_csv(data, chunksize=chunk_size)  # honour the chunk_size argument rather than a hard-coded value
| for chunk in chunk_iter: | ||
| dfs.append(chunk) | ||
| return pd.concat(dfs, sort=False) | ||
| def process_parquet(data: str) -> pd.DataFrame: | ||
| try: | ||
| # Try standard parquet reading first | ||
| return pd.read_parquet(data) | ||
| except Exception: | ||
| # Fallback to pyarrow for problematic files | ||
| pf = ParquetFile(data) | ||
| table = pf.read() | ||
| return table.to_pandas() | ||
| try: | ||
| if filepath.endswith('.csv'): | ||
| data = await read_txt(filepath) | ||
| df = process_csv(io.StringIO(data)) | ||
| elif filepath.endswith('.parquet'): | ||
| df = process_parquet(filepath) | ||
| else: | ||
| raise ValueError(f"Unsupported file format: {filepath}") | ||
| return df | ||
| except Exception as e: | ||
| raise ValueError(f"Error loading file {filepath}: {str(e)}") | ||
| def load_any_df(file_path: Union[str, pd.DataFrame], | ||
| literal_ast_columns: Optional[List[str]] = None, | ||
| logger: Optional[Any] = None) -> pd.DataFrame: | ||
| """ | ||
| Load a DataFrame from various sources with support for type conversion. | ||
| This function can load from CSV files, Parquet files, or accept an existing DataFrame. | ||
| It supports asynchronous loading for better performance and can convert specified | ||
| columns using ast.literal_eval. | ||
| Args: | ||
| file_path: Path to the file or an existing DataFrame | ||
| literal_ast_columns: List of column names to convert using ast.literal_eval | ||
| logger: Optional logger instance for logging operations | ||
| Returns: | ||
| pd.DataFrame: Loaded and processed DataFrame | ||
| """ | ||
| # Handle DataFrame input | ||
| if isinstance(file_path, pd.DataFrame): | ||
| return file_path | ||
| if not isinstance(file_path, str): | ||
| raise TypeError("file_path must be a string path or pandas DataFrame") | ||
| try: | ||
| # Load the DataFrame | ||
| logg.info(f"Loading DataFrame from {file_path}",logger=logger) | ||
| df = asyncio.run(load_df_async(file_path)) | ||
| # Remove unnamed columns | ||
| df = df.loc[:, ~df.columns.str.contains('^Unnamed')] | ||
| # Convert specified columns using literal_eval | ||
| if literal_ast_columns: | ||
| for col in literal_ast_columns: | ||
| if col not in df.columns: | ||
| raise KeyError(f"Column '{col}' not found in DataFrame") | ||
| logg.info(f"Converting column '{col}' using literal_eval",logger=logger) | ||
| try: | ||
| df[col] = df[col].apply(literal_eval) | ||
| except (ValueError, SyntaxError) as e: | ||
| logg.error(f"Error converting column '{col}': {str(e)}",logger=logger) | ||
| raise ValueError(f"Error converting column '{col}' using literal_eval: {str(e)}") | ||
| logg.info(f"Successfully loaded DataFrame with shape {df.shape}",logger=logger) | ||
| return df | ||
| except Exception as e: | ||
| logg.error(f"Error loading file: {str(e)}",logger=logger) | ||
| raise ValueError(f"Error loading file: {str(e)}") |
| """ | ||
| DataFrame profiling utilities. | ||
| This module provides functions for generating detailed profiling reports and comparisons | ||
| of pandas DataFrames using pandas-profiling. | ||
| """ | ||
| from typing import Union, List, Optional, Any | ||
| import pandas as pd | ||
| from pathlib import Path | ||
| from mb.utils.logging import logg | ||
| __all__ = ['create_profile', 'profile_compare'] | ||
| def create_profile(df: pd.DataFrame, | ||
| profile_name: Union[str, Path] = './pandas_profiling_report.html', | ||
| minimal: bool = False, | ||
| target: List[str] = None, | ||
| sample_size: int = 100000, | ||
| logger: Optional[Any] = None) -> None: | ||
| """ | ||
| Create a pandas profiling report for a DataFrame. | ||
| Args: | ||
| df: Input DataFrame to profile | ||
| profile_name: Output path for the HTML report | ||
| minimal: If True, creates a minimal report for better performance | ||
| target: List of target columns for correlation analysis | ||
| sample_size: Maximum number of rows to process | ||
| logger: Optional logger instance for logging operations | ||
| Example: | ||
| >>> df = pd.DataFrame({'A': [1, 2, 3], 'B': ['a', 'b', 'c']}) | ||
| >>> create_profile(df, 'report.html', target=['A']) | ||
| """ | ||
| try: | ||
| from pandas_profiling import ProfileReport | ||
| except ImportError: | ||
| raise ImportError( | ||
| "pandas-profiling is required for this function. " | ||
| "Install it with: pip install pandas-profiling" | ||
| ) | ||
| if not isinstance(df, pd.DataFrame): | ||
| raise TypeError("Input must be a pandas DataFrame") | ||
| # Validate and process target columns | ||
| target = target or [] | ||
| if target: | ||
| missing_cols = set(target) - set(df.columns) | ||
| if missing_cols: | ||
| raise ValueError(f"Target columns not found in DataFrame: {missing_cols}") | ||
| # Sample large DataFrames | ||
| original_size = len(df) | ||
| if original_size > sample_size: | ||
| logg.warning(f'DataFrame size ({original_size}) exceeds limit ({sample_size})',logger=logger) | ||
| logg.info(f'Sampling {sample_size} rows',logger=logger) | ||
| df = df.sample(n=sample_size, random_state=42) | ||
| minimal = True | ||
| try: | ||
| logg.info('Generating profile report',logger=logger) | ||
| if minimal: | ||
| logg.info('Using minimal configuration for better performance',logger=logger) | ||
| # Configure profile report | ||
| profile = ProfileReport( | ||
| df, | ||
| title='Pandas Profiling Report', | ||
| minimal=minimal, | ||
| html={'style': {'full_width': True}}, | ||
| progress_bar=bool(logger), # Show progress bar if logger is provided | ||
| explorative=not minimal | ||
| ) | ||
| # Set target columns for correlation analysis | ||
| if target: | ||
| logg.info(f'Analyzing correlations for target columns: {target}',logger=logger) | ||
| profile.config.interactions.targets = target | ||
| # Save report | ||
| profile_path = Path(profile_name) | ||
| profile.to_file(output_file=profile_path) | ||
| logg.info(f'Profile report saved to: {profile_path.absolute()}',logger=logger) | ||
| except Exception as e: | ||
| logg.error(f'Error generating profile report: {str(e)}',logger=logger) | ||
| raise | ||
| def profile_compare(df1: pd.DataFrame, | ||
| df2: pd.DataFrame, | ||
| profile_name: Union[str, Path] = './pandas_compare_report.html', | ||
| sample_size: int = 100000, | ||
| logger: Optional[Any] = None) -> None: | ||
| """ | ||
| Create a comparison report between two DataFrames. | ||
| Args: | ||
| df1: First DataFrame to compare | ||
| df2: Second DataFrame to compare | ||
| profile_name: Output path for the HTML comparison report | ||
| sample_size: Maximum number of rows to process per DataFrame | ||
| logger: Optional logger instance for logging operations | ||
| Raises: | ||
| ImportError: If pandas-profiling is not installed | ||
| TypeError: If inputs are not pandas DataFrames | ||
| Example: | ||
| >>> df1 = pd.DataFrame({'A': [1, 2, 3]}) | ||
| >>> df2 = pd.DataFrame({'A': [1, 2, 4]}) | ||
| >>> profile_compare(df1, df2, 'comparison.html') | ||
| """ | ||
| try: | ||
| from pandas_profiling import ProfileReport | ||
| except ImportError: | ||
| raise ImportError( | ||
| "pandas-profiling is required for this function. " | ||
| "Install it with: pip install pandas-profiling" | ||
| ) | ||
| if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame): | ||
| raise TypeError("Both inputs must be pandas DataFrames") | ||
| try: | ||
| logg.info('Generating comparison report',logger=logger) | ||
| logg.info(f'DataFrame 1 shape: {df1.shape}',logger=logger) | ||
| logg.info(f'DataFrame 2 shape: {df2.shape}',logger=logger) | ||
| # Generate profiles for both DataFrames | ||
| profile1 = ProfileReport( | ||
| df1.sample(n=min(len(df1), sample_size), random_state=42) if len(df1) > sample_size else df1, | ||
| title="DataFrame 1", | ||
| minimal=True, | ||
| progress_bar=bool(logger) | ||
| ) | ||
| profile2 = ProfileReport( | ||
| df2.sample(n=min(len(df2), sample_size), random_state=42) if len(df2) > sample_size else df2, | ||
| title="DataFrame 2", | ||
| minimal=True, | ||
| progress_bar=bool(logger) | ||
| ) | ||
| # Generate comparison report | ||
| logg.info('Comparing profiles',logger=logger) | ||
| comparison = profile1.compare(profile2) | ||
| comparison_path = Path(profile_name) | ||
| comparison.to_file(comparison_path) | ||
| logg.info(f'Comparison report saved to: {comparison_path.absolute()}',logger=logger) | ||
| except Exception as e: | ||
| logg.error(f'Error generating comparison report: {str(e)}',logger=logger) | ||
| raise |
| """ | ||
| Transform module for pandas DataFrame operations. | ||
| This module provides utilities for DataFrame transformations, merging, and data validation. | ||
| """ | ||
| from typing import Union, List, Optional, Any | ||
| import pandas as pd | ||
| import numpy as np | ||
| import cv2 | ||
| from .dfload import load_any_df | ||
| from mb.utils.logging import logg | ||
| __all__ = ['check_null', 'remove_unnamed', 'rename_columns', 'check_drop_duplicates', 'get_dftype', 'merge_chunk', 'merge_dask'] | ||
| def merge_chunk(df1: pd.DataFrame, | ||
| df2: pd.DataFrame, | ||
| chunksize: int = 10000, | ||
| logger: Optional[Any] = None, | ||
| **kwargs) -> pd.DataFrame: | ||
| """ | ||
| Merge two DataFrames in chunks to handle large datasets efficiently. | ||
| Args: | ||
| df1: First DataFrame | ||
| df2: Second DataFrame | ||
| chunksize: Number of rows per chunk for processing | ||
| logger: Optional logger instance for logging operations | ||
| **kwargs: Additional arguments passed to pd.merge | ||
| Returns: | ||
| pd.DataFrame: Merged DataFrame | ||
| """ | ||
| if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame): | ||
| raise TypeError("Both inputs must be pandas DataFrames") | ||
| # Optimize by using smaller DataFrame as base | ||
| if df1.shape[0] > df2.shape[0]: | ||
| df1, df2 = df2, df1 | ||
| merge_on = set(df1.columns) & set(df2.columns) | ||
| if not merge_on: | ||
| raise ValueError("No common columns to merge on") | ||
| # Create chunks | ||
| list_df = [df2[i:i+chunksize] for i in range(0, df2.shape[0], chunksize)] | ||
| logg.info(f'Size of chunk: {chunksize}',logger=logger) | ||
| logg.info(f'Number of chunks: {len(list_df)}',logger=logger) | ||
| # Process first chunk | ||
| result = pd.merge(df1, list_df[0], **kwargs) | ||
| # Process remaining chunks | ||
| for chunk in list_df[1:]: | ||
| merged_chunk = pd.merge(df1, chunk, **kwargs) | ||
| result = pd.concat([result, merged_chunk], ignore_index=True) | ||
| return result | ||
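A short sketch of merge_chunk on two frames sharing an 'id' column; the frames and chunk size are illustrative and the keyword arguments are passed through to pd.merge.

left = pd.DataFrame({'id': range(5), 'a': range(5)})
right = pd.DataFrame({'id': list(range(5)) * 2000, 'b': range(10000)})
merged = merge_chunk(left, right, chunksize=2500, on='id', how='inner')
merged.shape   # (10000, 3)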
| def merge_dask(df1: pd.DataFrame, | ||
| df2: pd.DataFrame, | ||
| logger: Optional[Any] = None, | ||
| **kwargs) -> pd.DataFrame: | ||
| """ | ||
| Merge two DataFrames using Dask for improved performance with large datasets. | ||
| Args: | ||
| df1: First DataFrame | ||
| df2: Second DataFrame | ||
| logger: Optional logger instance for logging operations | ||
| **kwargs: Additional arguments passed to dd.merge | ||
| Returns: | ||
| pd.DataFrame: Merged DataFrame | ||
| """ | ||
| try: | ||
| import dask.dataframe as dd | ||
| except ImportError: | ||
| raise ImportError("dask is required for this function. Install it with: pip install dask") | ||
| if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame): | ||
| raise TypeError("Both inputs must be pandas DataFrames") | ||
| logg.info('Converting pandas DataFrames to Dask DataFrames',logger=logger) | ||
| # Optimize number of partitions based on DataFrame size | ||
| npartitions = max(2, min(32, df1.shape[0] // 100000)) | ||
| ddf1 = dd.from_pandas(df1, npartitions=npartitions) | ||
| ddf2 = dd.from_pandas(df2, npartitions=npartitions) | ||
| merged_ddf = dd.merge(ddf1, ddf2, **kwargs) | ||
| logg.info('Computing merge operation',logger=logger) | ||
| merged_df = merged_ddf.compute() | ||
| logg.info('Merged DataFrame and converted back to pandas DataFrame',logger=logger) | ||
| return merged_df | ||
| def check_null(file_path: str, | ||
| fillna: bool = False, | ||
| logger: Optional[Any] = None) -> pd.DataFrame: | ||
| """ | ||
| Check and optionally handle null values in a DataFrame. | ||
| Args: | ||
| file_path: Path to the input file | ||
| fillna: If True, fills null values based on column type | ||
| logger: Optional logger instance for logging operations | ||
| Returns: | ||
| pd.DataFrame: Processed DataFrame | ||
| """ | ||
| logg.info(f'Loading file: {file_path}',logger=logger) | ||
| df = load_any_df(file_path) | ||
| logg.info(f'File shape: {df.shape}',logger=logger) | ||
| logg.info(f'File columns: {list(df.columns)}',logger=logger) | ||
| logg.info('Checking Null values',logger=logger) | ||
| for column in df.columns: | ||
| null_mask = df[column].isnull() | ||
| null_count = null_mask.sum() | ||
| if null_count > 0: | ||
| logg.warning(f'Column {column} has {null_count} null values',logger=logger) | ||
| if fillna: | ||
| if pd.api.types.is_numeric_dtype(df[column]): | ||
| fill_value = 0 if pd.api.types.is_integer_dtype(df[column]) else 0.0 | ||
| logg.info(f'Filling null values with {fill_value}',logger=logger) | ||
df[column] = df[column].fillna(fill_value)  # avoid inplace fillna on a column view (deprecated chained-assignment pattern)
| else: | ||
| logg.info(f'Skipping non-numeric column {column}',logger=logger) | ||
| return df | ||
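check_null takes a file path rather than a DataFrame, so a hedged sketch looks like this ('events.csv' is a placeholder); with fillna=True numeric nulls become 0 / 0.0 and non-numeric columns are left untouched.

clean = check_null('events.csv', fillna=True)   # placeholder path
clean.isnull().sum()                            # non-numeric columns may still contain nulls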
| def remove_unnamed(df: pd.DataFrame, | ||
| logger: Optional[Any] = None) -> pd.DataFrame: | ||
| """ | ||
| Remove unnamed columns from DataFrame. | ||
| Args: | ||
| df: Input DataFrame | ||
| logger: Optional logger instance for logging operations | ||
| Returns: | ||
| pd.DataFrame: DataFrame with unnamed columns removed | ||
| """ | ||
| if not isinstance(df, pd.DataFrame): | ||
| raise TypeError("Input must be a pandas DataFrame") | ||
| logg.info('Removing unnamed columns',logger=logger) | ||
| unnamed_pattern = '^Unnamed' | ||
| unnamed_cols = df.columns[df.columns.str.contains(unnamed_pattern)].tolist() | ||
| if unnamed_cols: | ||
| df = df.drop(columns=unnamed_cols) | ||
| logg.info(f'Removed columns: {unnamed_cols}',logger=logger) | ||
| return df | ||
| def rename_columns(df: pd.DataFrame, | ||
| new_column: str, | ||
| old_column: str, | ||
| logger: Optional[Any] = None) -> pd.DataFrame: | ||
| """ | ||
| Rename DataFrame columns. | ||
| Args: | ||
| df: Input DataFrame | ||
| new_column: New column name | ||
| old_column: Old column name to be renamed | ||
| logger: Optional logger instance for logging operations | ||
| Returns: | ||
| pd.DataFrame: DataFrame with renamed columns | ||
| Raises: | ||
| KeyError: If old_column doesn't exist in DataFrame | ||
| TypeError: If input is not a pandas DataFrame | ||
| """ | ||
| if not isinstance(df, pd.DataFrame): | ||
| raise TypeError("Input must be a pandas DataFrame") | ||
| if old_column not in df.columns: | ||
| logg.error(f'Column {old_column} not in DataFrame',logger=logger) | ||
| raise KeyError(f'Column {old_column} not in DataFrame') | ||
| df = df.rename(columns={old_column: new_column}) | ||
| logg.info(f'Column {old_column} renamed to {new_column}',logger=logger) | ||
| return df | ||
| def check_drop_duplicates(df: pd.DataFrame, | ||
| columns: Union[str, List[str]], | ||
| drop: bool = False, | ||
| logger: Optional[Any] = None) -> pd.DataFrame: | ||
| """ | ||
| Check and optionally remove duplicate rows based on specified columns. | ||
| Args: | ||
| df: Input DataFrame | ||
| columns: Column or list of columns to check for duplicates | ||
| drop: If True, removes duplicate rows | ||
| logger: Optional logger instance for logging operations | ||
| Returns: | ||
| pd.DataFrame: DataFrame with duplicates optionally removed | ||
| """ | ||
| if not isinstance(df, pd.DataFrame): | ||
| raise TypeError("Input must be a pandas DataFrame") | ||
| if isinstance(columns, str): | ||
| columns = [columns] | ||
| # Validate columns exist | ||
| missing_cols = set(columns) - set(df.columns) | ||
| if missing_cols: | ||
| raise KeyError(f"Columns not found in DataFrame: {missing_cols}") | ||
| logg.info(f'Checking duplicates for columns: {columns}',logger=logger) | ||
| duplicates = df[df.duplicated(subset=columns, keep=False)] | ||
| duplicate_count = len(duplicates) | ||
| if duplicate_count > 0: | ||
| logg.warning(f'Found {duplicate_count} duplicate rows',logger=logger) | ||
| if drop: | ||
| logg.info('Removing duplicates',logger=logger) | ||
| df = df.drop_duplicates(subset=columns) | ||
| logg.info(f'Removed {duplicate_count} duplicate rows',logger=logger) | ||
| else: | ||
| logg.info('Duplicate removal not requested (drop=False)',logger=logger) | ||
| else: | ||
| logg.info('No duplicates found',logger=logger) | ||
| return df | ||
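An illustrative duplicate check on a single column; drop=False only reports duplicates, drop=True also removes them.

dup_df = pd.DataFrame({'id': [1, 1, 2], 'val': ['a', 'b', 'c']})
dup_df = check_drop_duplicates(dup_df, columns='id', drop=True)   # keeps the first row per id
len(dup_df)   # -> 2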
| def get_dftype(s: pd.Series) -> str: | ||
| """ | ||
| Detect the data type of a pandas Series. | ||
| This function determines whether a series contains special types like ndarrays, | ||
| sparse arrays, images, or standard types like strings and timestamps. | ||
| Args: | ||
| s: Input pandas Series | ||
| Returns: | ||
| str: Detected type as string. Possible values include: | ||
| - 'json': For lists and dictionaries | ||
| - 'ndarray': For numpy arrays | ||
| - 'SparseNdarray': For sparse numpy arrays | ||
| - 'Image': For OpenCV images | ||
| - 'str': For string data | ||
| - 'Timestamp': For datetime data | ||
| - 'Timedelta': For time duration data | ||
| - 'object': For mixed or unknown types | ||
| - 'none': For empty series | ||
| - Other pandas dtypes as strings | ||
| """ | ||
| if not isinstance(s, pd.Series): | ||
| raise TypeError("Input must be a pandas Series") | ||
if len(s) == 0:
    return 'none'  # documented return value for an empty series
| # Fast path for simple types | ||
| if pd.api.types.is_numeric_dtype(s.dtype): | ||
| return str(s.dtype) | ||
| # Check first non-null value for type inference | ||
| first_valid = s.first_valid_index() | ||
| if first_valid is not None: | ||
| val = s[first_valid] | ||
| if isinstance(val, str): | ||
| return 'str' if s.apply(lambda x: isinstance(x, str)).all() else 'object' | ||
| elif isinstance(val, (list, dict)): | ||
| return 'json' if s.apply(lambda x: isinstance(x, (list, dict))).all() else 'object' | ||
| elif isinstance(val, np.ndarray): | ||
| return 'ndarray' if s.apply(lambda x: isinstance(x, np.ndarray)).all() else 'object' | ||
# 'SparseNdarray' and 'Image' are optional extension types, not attributes of vanilla numpy / cv2;
# guard the lookups so a plain install does not raise AttributeError here
elif getattr(np, 'SparseNdarray', None) is not None and isinstance(val, np.SparseNdarray):
    return 'SparseNdarray' if s.apply(lambda x: isinstance(x, np.SparseNdarray)).all() else 'object'
elif getattr(cv2, 'Image', None) is not None and isinstance(val, cv2.Image):
    return 'Image' if s.apply(lambda x: isinstance(x, cv2.Image)).all() else 'object'
| elif isinstance(val, pd.Timestamp): | ||
| return 'Timestamp' if s.apply(lambda x: isinstance(x, pd.Timestamp)).all() else 'object' | ||
| elif isinstance(val, pd.Timedelta): | ||
| return 'Timedelta' if s.apply(lambda x: isinstance(x, pd.Timedelta)).all() else 'object' | ||
| # Check if all values are numeric | ||
| try: | ||
| pd.to_numeric(s, errors='raise') | ||
| return 'float64' | ||
| except (ValueError, TypeError): | ||
| pass | ||
| return 'object' |
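A quick sketch of get_dftype on a few toy Series, showing the strings it returns for common cases.

get_dftype(pd.Series([1, 2, 3]))                                   # 'int64' (fast path for numeric dtypes)
get_dftype(pd.Series(['a', 'b']))                                  # 'str'
get_dftype(pd.Series([[1, 2], {'k': 1}]))                          # 'json'
get_dftype(pd.Series([np.array([1.0]), np.array([2.0])]))          # 'ndarray'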
| MAJOR_VERSION = 2 | ||
| MINOR_VERSION = 0 | ||
| PATCH_VERSION = 1 | ||
| version = '{}.{}.{}'.format(MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION) | ||
| __all__ = ['MAJOR_VERSION', 'MINOR_VERSION', 'PATCH_VERSION', 'version'] |