DHuolib um produto do DHuo.data
Dhuolib é uma biblioteca projetada para gerenciar o ciclo de vida de modelos de machine learning de forma rápida e eficaz. Com a Dhuolib, é possível implementar e gerenciar ciclos de deploy, controle de versões e gerenciamento de modelos em predição de maneira simples e intuitiva. Este documento tem como objetivo demonstrar essas funcionalidades, facilitando o uso da ferramenta e potencializando sua eficiência.
==versão: 0.7.7==
https://pypi.org/project/dhuodata-lib/
Projetos referência
Funcionalidades
Nesta sessão serão abordadas as principais funcionalidades presentes na dhuolib
Análise Exploratória / Persistencia / Aquisição de Dados
A análise exploratória de dados envolve a persistência e aquisição de dados de forma eficiente. Utiliza métodos para inserção direta de dados, atualização de tabelas com DataFrames, recuperação paginada de registros e conversão de resultados em DataFrames, facilitando o trabalho dos desenvolvedores na predição de modelos de machine learning.
Class: GenericRepository
A classe GenericRepository Simplifica o uso e o acesso ao datalake. Possui diversos métodos, incluindo inserção e busca de dados. Seu objetivo é facilitar o trabalho dos desenvolvedores no processo de predição de modelos de machine learning, fornecendo uma interface simplificada para a interação com os dados.
init(self, db_connection)
O repositorio é iniciado passando uma conexão com o banco de dados como parametro no construtor
- Parameters:
- db_connection: Uma instancia do DatabaseConnection que provê a conexão e controle de acesso com o datalake
create_table_by_dataframe(self, table_name: str, df: pd.DataFrame)
Cria uma tabela baseada em um dataframe existente
- Parameters:
- table_name: Nome da tabela onde o dado sera inserido.
- dataframe: Um dataframe representando a tabela e os dados a serem inseridos
- Returns:
- Retorna o numero de itens inseridos na tabela
Example:
df = pd.DataFrame({"name": ["example"], "value": [42]})
number_lines = repo.create_table_by_dataframe("my_table", data)
update_table_by_dataframe(self, table_name: str, dataframe: pd.DataFrame, if_exists: str = "append", is_update_version: bool)
Atualiza uma tabela adicionando ou substituindo registros usando um DataFrame do pandas.
-
Parâmetros:
-
table_name: O nome da tabela a ser atualizada.
-
dataframe: Um DataFrame do pandas contendo os registros a serem inseridos.
-
if_exists: Especifica o comportamento se a tabela já existir, podendo ser replace ou append. O padrão é "append" .
-
is_update_version: Faz update da versão do dado a ser inserido na tabela que vai receber o resultado da predição . A table precisa ter três colunas ==PREDICT, CREATED_AT e VERSION.==
-
Example:
df = pd.DataFrame({"name": ["example"], "value": [42]})
repo.update_table_by_dataframe("my_table", df)
to_dataframe(self, table_name: str = None, filter_clause: str = None, list_columns: list = None)
Converte os resultados da consulta em um DataFrame do pandas.
Criação de Experimentos, Ciclo de vida do modelo e Predição
Class: DhuolibExperimentClient
DhuolibExperimentClient interage com o serviço Dhuolib para gerenciar experimentos, executar modelos, criar modelos e fazer previsões. Inclui métodos para criar e executar experimentos, criar modelos e fazer previsões em lote.
init(self, service_uri: str, token: str)
Inicializa o cliente com um endpoint de serviço.
- Parâmetros:
- service_uri ==obrigatório==: O uri da api de serviço Dhuolib
- token ==obrigatório==: token de autenticação
- Lança:
- ValueError: Se service_uri não for fornecido.
create_experiment(self, experiment_name: str, experiment_tags: dict = None) -> dict
Cria um novo experimento com o nome e tags especificados.
- Parâmetros:
- experiment_name ==obrigatório==: O nome do experimento.
- experiment_tags: Dicionário opcional de tags para o experimento.
- Retorna:
- Um dicionário contendo os detalhes do experimento criado ou uma mensagem de erro.
Exemplo:
experiment_response = dholib_client.create_experiment(
experiment_name="iris-classification", experiment_tags={"tag": "iris"}
)
execute_run_for_experiment(self, type_model: str, experiment_name: str, modelpkl_path: str, requirements_path:str, tags: dict) -> dict
Executa um experimento com o modelo e requisitos especificados.
-
Parâmetros:
- type_model ==obrigatório==: O tipo do modelo.
- experiment_name ==obrigatório==: O nome do experimento.
- modelpkl_path ==obrigatório==: O caminho para o arquivo pickle do modelo.
- requirements_path ==obrigatório==: O caminho para o arquivo de requisitos.
- tags: O parametro tags é utilizado para facilitar o processo de busca de execuções para um determinado experimento
-
Retorna:
- Um dict com os seguintes valores:
- run_id : id de execução do run
- model_uri: model_uri que sera utilizado para criar um modelo para esse run executado
-
Exemplo:
experiment_run = dholib_client.execute_run_for_experiment(
type_model="lightgbm",
experiment_name="lighhtgbm-iris-classification053",
tags={"version": "v2", "priority": "P2"},
modelpkl_path="{path}/iris.pkl",
requirements_path="{path}/requirements.txt"
)
create_model(self, model_params) -> dict
Cria um novo modelo com os parâmetros especificados.
- Parâmetros:
- modelname ==obrigatório==: Nome que sera dado para identificar o modelo
- stage : O estado do modelo podendo ser NONE, STAGING, PRODUCTION e ARCHIVED
- run_id: O run id da execução do experimento
- model_uri==obrigatório==: O model path da execução do experimento
- tags: O parametro tags é utilizado para facilitar o processo de busca de modelos
- Retorna:
- Um dicionário contendo os detalhes do modelo criado ou uma mensagem de erro.
- model_version: O numero que identifica a versão do modelo
- model_version_name: O nome dessa versão do modelo
- run_id: O identificador unico da execução do experimento para um pkl especifico
- previous_stage: O estado anterior do modelo
- current_stage: O estado atual do modelo
- last_updated_timestamp: data demontrando o tempo da ultima alteração do modelo
Exemplo:
dholib_client.create_model(modelname="iris-classification",
stage="Production",
run_id=experiment_run["run_id"],
model_uri=experiment_run["model_uri"],
tags={"type_model": "lightgbm"})
search_experiments(filter_string: str = "", max_results: int = 10, page_token: str = "" ) -> dict:
Busca por experimentos atraves de um filtro passada por parametro
- Parâmetros:
- max_results: Quantidade de experimentos que são retornados . Valor default é 10
- page_token: Token code que deve ser passado para buscar novos valores paginados
- filter_string: String de consulta de filtro (por exemplo, "name = 'my_experiment'"), por padrão, busca todos os experimentos. Os seguintes identificadores, comparadores e operadores lógicos são suportados.
- Retorna uma lista de experimentos com os seguintes parâmetros:
- experiment_id: Identificador do experimento.
- experiment_name: Nome do experimento.
- lifecycle_stage: Estágio do ciclo de vida do experimento.
- creation_time: Hora de criação do experimento.
- last_update_time: Hora da última atualização do experimento.
- tags: Tags associadas ao experimento.
results = dholib_client.search_experiments(
filter_string="tags.version LIKE 'v%'", max_results=10
)
results = dholib_client.search_experiments(
filter_string="tags.version='v1'", max_results=2
)
print("All Experiments:")
for result in results["experiments"]:
print(result)
search_runs(filter_string: str = "", max_results: int = 10, page_token: str = "", experiment_name: str = "") -> dict:
Busca por execuções através de um filtro passado por parâmetro
- Parâmetros:
- max_results: Número máximo de resultados a serem retornados.
- page_token: Token code que deve ser passado para buscar novos valores paginados.
- experiment_name: Nome do experimento para filtrar as execuções.
- filter_string:
- Retorna uma lista de execuções com os seguintes parâmetros:
- run_id: Identificador da execução.
- experiment_id: Identificador do experimento.
- status: Status da execução.
- start_time: Hora de início da execução.
- end_time: Hora de término da execução.
- metrics: Métricas da execução.
- params: Parâmetros da execução.
- tags: Tags da execução.
dholib_client = DhuolibExperimentMLClient(service_uri)
print("Runs:")
results = dholib_client.search_runs(
experiment_name="wine-regression",
filter_string="tags.type_model='keras'",
max_results=2,
)
for result in results["runs"]:
print(result)
search_models(filter_string: str = "", max_results: int = 10, page_token: str = "") -> dict:
Busca por modelos através de um filtro passado por parâmetro
- Parâmetros:
-
max_results: Número máximo de resultados a serem retornados.
-
page_token: Token da página para busca paginada.
-
filter_string: String de consulta de filtro (por exemplo, "name = 'a_model_name' and tag.key = 'value1'"), por padrão, busca todas as versões de modelos. Os seguintes identificadores, comparadores e operadores lógicos são suportados.
Identificadores:
- name: Nome do modelo.
- source_path: Caminho da fonte da versão do modelo.
- run_id: ID da execução do mlflow que gerou a versão do modelo.
- tags.<tag_key>: Tag da versão do modelo. Se tag_key contiver espaços, deve ser envolvida por crases (por exemplo, "tags.
extra key
").
Comparadores:
- =: Igual a.
- !=: Diferente de.
- LIKE: Correspondência de padrão sensível a maiúsculas e minúsculas.
- ILIKE: Correspondência de padrão insensível a maiúsculas e minúsculas.
- IN: Em uma lista de valores. Apenas o identificador run_id suporta o comparador IN.
Operadores Lógicos:
- AND: Combina duas subconsultas e retorna True se ambas forem True.
- Retorna uma lista de modelos com os seguintes parâmetros:
- model_id: Identificador do modelo.
- model_name: Nome do modelo.
- creation_time: Hora de criação do modelo.
- last_update_time: Hora da última atualização do modelo.
- tags: Tags associadas ao modelo.
results = dholib_client.search_models(
filter_string="tags.version='v1'", max_results=2
)
transition_model(self, model_name: str, version: str, stage: str) -> dict:
Faz a transição dos modelos para diferentes ambientes por padrão o valor é None. Existem outros ambientes tambem Production, Stagging ou Archive.
- Parâmetros:
- model_name ==obrigatório==: O nome do modelo
- version ==obrigatório==: A versão do modelo que se deseja modar de ambiente
- stage ==obrigatório==: Qual ambiente que se quer transicionar
- Retorna:
- Uma messagem dcom indicando que o modelo foi transicionado ou uma mensagem de erro.
download_pkl(self, batch_params: dict) -> model
Faz o download de um determinado pkl.
- Parâmetros:
- batch_params ==obrigatório==: Um dicionário contendo os parâmetros da previsão em lote.
- Retorna:
- O load do pkl que foi baixado do servidor.
Exemplo:
batch_params = {
"modelname": "iris-classification-lightgbm",
"stage": "Production",
"experiment_name": "iris-classification",
"type_model": "lightgbm",
"run_id": "",
"batch_model_dir": "iris.pkl",
}
response = client.download_pkl(batch_params)
prediction_batch_with_dataframe(self, batch_params: dict, df: pd.DataFrame) -> dict
Faz uma previsão em lote usando um DataFrame do pandas.
- Parâmetros:
- batch_params ==obrigatório==: Um dicionário contendo os parâmetros da previsão em lote.
- df ==obrigatório==: Um DataFrame do pandas contendo os dados para a previsão.
- Retorna:
- Um dicionário contendo os resultados da previsão em lote ou uma mensagem de erro.
Exemplo:
batch_params = {
"modelname": "iris-classification-lightgbm",
"stage": "Production",
"experiment_name": "iris-classification",
"type_model": "lightgbm",
"run_id": "121",
"batch_model_dir": "iris.pkl",
}
response = client.prediction_batch_with_dataframe(batch_params, dataframe)
Exemplo de Uso
import pickle
import lightgbm as lgb
from dhuolib.clients.experiment import DhuolibExperimentMLClient
from dhuolib.repository import DatabaseConnection, GenericRepository
def get_repository(config_file_name):
if not config_file_name:
raise ValueError("config_file_name is required")
db = DatabaseConnection(config_file_name=config_file_name)
repository = GenericRepository(db_connection=db)
return repository
def train():
service_uri = "https://dhuo-data-api-data-service-stg.br.engineering"
dholib_client = DhuolibExperimentMLClient(
service_uri=service_uri,
token="{token}"
)
repository = get_repository(
config_file_name="{path}/config/database.json"
)
df_iris_train = repository.to_dataframe(table_name="IRIS_TRAIN")
print(df_iris_train.columns)
X = df_iris_train[["sepal_length", "sepal_width", "petal_length", "petal_width"]]
y = df_iris_train["class"]
clf = lgb.LGBMClassifier()
clf.fit(X, y)
with open("iris.pkl", "wb") as f:
pickle.dump(clf, f)
dholib_client.create_experiment(
experiment_name="lightgbm-iris", experiment_tags={"tag": "iris"}
)
experiment_run = dholib_client.execute_run_for_experiment(
type_model="lightgbm",
experiment_name="lightgbm-iris",
tags={"version": "v2", "priority": "P2"},
modelpkl_path="{path}/iris.pkl",
requirements_path="{path}/requirements.txt",
)
print(experiment_run)
result = dholib_client.create_model(
modelname="iris-classification",
stage="Production",
run_id=experiment_run["run_id"],
model_uri=experiment_run["model_uri"],
tags={"type_model": "lightgbm"},
)
print(result)
if __name__ == "__main__":
train()
Class: DhuolibPlatformClient
DhuolibPlatformClient interage com o serviço Dhuolib para gerenciar projetos em lote, implantar scripts, verificar o status do pipeline, criar clusters e executar pipelines em lote.
init(self, service_uri=None, project_name=None)
Inicializa o cliente com um endpoint de serviço e um nome de projeto opcional.
- Parâmetros:
- service_endpoint==obrigatório==: O endpoint do serviço Dhuolib.
- project_name: Nome opcional do projeto.
- token ==obrigatório==: token de autenticação
- Lança:
- ValueError: Se o projeto já existir.
- ConnectionError: Se houver um erro de conexão.
create_batch_project(self, project_name: str)
Cria um novo projeto em lote com o nome especificado.
deploy_batch_project(self, script_filename: str, requirements_filename: str)
Implanta um projeto em lote com o script e requisitos especificados.
- Parâmetros:
- script_filename ==obrigatório==: O nome do arquivo do script.
- requirements_filename ==obrigatório==: O nome do arquivo de requisitos.
- Retorna:
- A resposta do serviço Dhuolib ou uma mensagem de erro.
- Lança:
- ValueError: Se project_name, script_filename ou requirements_filename não foram fornecidos
- FileNotFoundError: Se os arquivos especificados não forem encontrados.
Exemplo:
response = dholib_platform.deploy_batch_project(
script_filename="{path}/script.py",
requirements_filename="{path}/requirements.txt"
)
pipeline_status_report(self)
Gera um relatório de status do pipeline para o projeto em lote.
create_cluster(self, cluster_size: int)
Cria um cluster com o tamanho especificado para o projeto em lote.
batch_run(self)
Executa o pipeline em lote para o projeto.
schedule_batch_run(self, project_name: str, schedule_interval: str):
Cria o agendamento de execução de um processamento em batch
-
Parâmetros:
- project_name ==obrigatório==: O nome do projeto que foi utilizado anteriormente
- schedule_interval ==obrigatório==: O padrão de schedule a ser atribuido para o projeto ==schedule_interval="*/5 * * * *"==
-
Lança:
- ValueError: Se o project_name não for fornecido.
-
Retorna:
- Retorna um json com uma mensagem indicando se a schedule foi ou não criada
-
Exemplo:
dholib_platform.schedule_batch_run(
project_name="lightgbm-iris",
schedule_interval="*/5 * * * *"
)
remove_schedule(self, project_name: str):
Deleta o agendamento de execução de um determinado projeto
-
Parâmetros:
- project_name ==obrigatório==: O nome do projeto que foi utilizado anteriormente
-
Lança:
- ValueError: Se o project_name não for fornecido.
-
Retorna:
- Retorna um json com uma mensagem indicando se a schedule foi ou não criada
dholib_platform.remove_schedule(project_name="lightgbm-iris")
Iris-t com lightgbm
Para utilizar o Dhuolib para predição em batch usando o Data Lake, é necessário ter duas fontes de dados. A primeira fonte contém as features com os valores para treinamento e a segunda fonte contém os valores que serão usados para a inferência. Exemplos dessas fontes podem ser as tabelas: IRIS_TRAIN para treinamento e IRIS_DATA_FOR_INFERENCY para inferência.
Os dados para inferência e os dados de treinamento podem ser substituídos por DataFrames pandas obtidos de outras fontes. Ao salvar os dados no Data Lake, é importante ter em mente que a tabela deve conter, no mínimo, três colunas obrigatórias: ==PREDICT, CREATED_AT e VERSION.==
CREATE TABLE "DHUODATA"."IRIS_OUTPUT"
(
"SEPAL_LENGTH" NUMBER,
"SEPAL_WIDTH" NUMBER,
"PETAL_LENGTH" NUMBER,
"PETAL_WIDTH" NUMBER,
"PREDICT" NUMBER(*,0),
"VERSION" NUMBER(*,0),
"CREATED_AT" TIMESTAMP (6)
);
Exemplo de Colunas Necessárias:
- ==PREDICT:== Coluna que armazenará as previsões geradas pelo modelo.
- ==CREATED_AT==: Coluna que registrará a data e hora em que a previsão foi feita.
- ==VERSION:== Coluna que indicará a versão do modelo utilizado para a previsão.
Ao garantir que essas colunas estejam presentes, você assegura a integridade e a rastreabilidade dos dados no Data Lake, facilitando o monitoramento e a manutenção do pipeline de predição.
Na primeira etapa é importante criar o código de treino para gerar o pickle
Apos o pickle ser gerado é necessário cadastrar o pickle
- dholib_client.create_experiment - crie o experimento
- dholib_client.run_experiment - execute o experimento logando ele no nosso sistema
- dholib_client.create_model - crie o modelo. O modelo uma vez criado é apto para ser posto em produção. O modelo pode ter 4 estados None, Production, Stagging ou Archive
- None: Este é o estado padrão de um modelo quando ele é registrado pela primeira vez. Ele indica que o modelo ainda não foi atribuído a nenhum dos outros estados.
- Staging: Este estado é usado para modelos que estão em um ambiente de teste. Eles foram testados em um ambiente isolado e estão prontos para serem movidos para produção.
- Production: Este estado é usado para modelos que estão atualmente em uso em um ambiente de produção.
- Archived: Este estado é usado para modelos que não estão mais em uso. Eles são mantidos para fins de registro e não devem ser usados para inferência.
Treino:
import pickle
import lightgbm as lgb
from dhuolib.clients.experiment import DhuolibExperimentMLClient
from dhuolib.repository import DatabaseConnection, GenericRepository
def get_repository(config_file_name):
if not config_file_name:
raise ValueError("config_file_name is required")
db = DatabaseConnection(config_file_name=config_file_name)
repository = GenericRepository(db_connection=db)
return repository
def train():
service_uri = ""
dholib_client = DhuolibExperimentMLClient(service_uri=service_uri)
repository = get_repository(
config_file_name="{path}/database.json"
)
df_iris_train = repository.to_dataframe(table_name="IRIS_TRAIN")
print(df_iris_train.columns)
X = df_iris_train[["sepal_length", "sepal_width", "petal_length", "petal_width"]]
y = df_iris_train["class"]
clf = lgb.LGBMClassifier()
clf.fit(X, y)
with open("iris.pkl", "wb") as f:
pickle.dump(clf, f)
dholib_client.create_experiment(
experiment_name="lightgbm-iris", experiment_tags={"tag": "iris"}
)
experiment_run = dholib_client.execute_run_for_experiment(
type_model="lightgbm",
experiment_name="lightgbm-iris",
tags={"version": "v2", "priority": "P2"},
modelpkl_path=f"{path}/iris.pkl",
requirements_path=f"{path}/requirements.txt",
)
print(experiment_run)
result = dholib_client.create_model(
modelname="iris-classification",
stage="Production",
run_id=experiment_run["run_id"],
model_uri=experiment_run["model_uri"],
tags={"type_model": "lightgbm"},
)
print(result)
if __name__ == "__main__":
train()
Inferência
Esse é o script de predição. Para que o fluxo em lote seja executado é necessario fazer um deploy desse arquivo junto com o requirements na dhuolib. Ao fazer isso o script estará disponivel para o processamento em batch.
Obs: O nome deve ser sempre script.py
from dhuolib.clients.experiment import DhuolibExperimentMLClient
from dhuolib.repository import DatabaseConnection, GenericRepository
def get_repository(config_file_name):
if not config_file_name:
raise ValueError("config_file_name is required")
db = DatabaseConnection(config_file_name=config_file_name)
repository = GenericRepository(db_connection=db)
return repository
def predict():
service_uri = ""
dholib_client = DhuolibExperimentMLClient(service_uri=service_uri)
repository = get_repository(
config_file_name="{path}/database.json"
)
df_iris = repository.to_dataframe(
table_name="IRIS_FEATURE",
list_columns=["SEPAL_LENGTH", "SEPAL_WIDTH", "PETAL_LENGTH", "PETAL_WIDTH"],
)
print(df_iris.head(10))
batch_params = {
"modelname": "iris-classification-lightgbm",
"stage": "Production",
"experiment_name": "lightgbm-iris",
"type_model": "lightgbm",
"run_id": "",
"batch_model_dir": "iris.pkl",
}
predicts = dholib_client.prediction_batch_with_dataframe(
batch_params=batch_params, df=df_iris
)
df_iris["predict"] = predicts
repository.update_table_by_dataframe(
table_name="iris_output", df_predict=df_iris, is_update_version=True
)
if __name__ == "__main__":
predict()
Fazendo o deploy e executando a solução
-
dholib_platform.create_batch_project - Crie o projeto
from dhuolib.clients.platform import DhuolibPlatformClient
dholib_platform = DhuolibPlatformClient(service_endpoint="http://{endpoint}", project_name="lightgbm-iris")
dholib_platform.create_batch_project('lightgbm-iris')
-
dholib_platform.deploy_batch_project - Faça o deploy do projeto. Lembre de passar o caminho do script e o caminho do requirements
response = dholib_platform.deploy_batch_project(
script_filename="{path}/script.py",
requirements_filename="{path}/requirements.txt"
)
-
dholib_platform.pipeline_status_report - Verifique os status da execução
dholib_platform.pipeline_status_report()
Executando o projeto remotamente
from dhuolib.clients.platform import DhuolibPlatformClient
dholib_platform = DhuolibPlatformClient(
service_uri="http://dhuo-data-api-data-service.br.engineering",
project_name="lightgbm-iris",
)
-
dholib_platform.create_cluster - Crie o cluster. O cluster possui 3 niveis de capacidade. SMALL(1), MEDIUM(2) ou LARGE(3).
response_create_cluster = dholib_platform.create_cluster(1)
-
dholib_platform.batch_run - Execute o script do projeto
response_batch_run = dholib_platform.batch_run()