📦 BRIBrain

BRIBrain adalah Python package standar yang digunakan sebagai library untuk proses data engineering di Departemen BRIBrain. Package ini menyediakan berbagai modul untuk konfigurasi Spark, ingestion data, penulisan data ke berbagai target (Hive, PostgreSQL, MySQL, HDFS, Excel), utility functions, metadata, dan notifikasi via Telegram.
📋 Daftar Isi
🚀 Instalasi
Gunakan package manager pip untuk menginstal bribrain.
pip install bribrain
⚡ Quick Start
import bribrain
spark = bribrain.sparkSession()
partitions = bribrain.get_list_partition(spark, "schema_name", "table_name")
bribrain.load_to_hive(spark, df, schema="my_schema", table="my_table")
📚 Modul
1. Config — Konfigurasi Spark Session
Modul untuk membuat dan mengkonfigurasi Spark Session.
sparkSession() — Membuat Spark Session
Membuat spark session yang dapat digunakan untuk proses data engineering.
Parameter:
appname | str | "Bribrain Packages SparkSession" | Nama dari Spark Session |
executor | str | "small" | Standard resource (small, medium, high, custom) |
instances | int | 2 | Jumlah instance Spark Session |
cores | int | 2 | Jumlah cores Spark Session |
memory | int | 4 | Memory Spark Session (GB) |
overhead | int | 2 | Overhead memory Spark Session (GB) |
verbose | bool | False | Menampilkan console progress |
extra_configs | dict | None | Additional Spark config |
Preset Executor:
small | 2 | 2 | 4 GB | 2 GB |
medium | 4 | 4 | 6 GB | 2 GB |
high | 6 | 6 | 8 GB | 4 GB |
custom | sesuai input | sesuai input | sesuai input | sesuai input |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
spark = bribrain.sparkSession(executor="medium")
spark = bribrain.sparkSession(
appname="My App",
executor="custom",
instances=4,
cores=4,
memory=8,
overhead=4
)
spark = bribrain.sparkSession(
extra_configs={
"spark.sql.shuffle.partitions": "200",
"spark.sql.adaptive.enabled": "true"
}
)
Return: pyspark.sql.session.SparkSession
template() — Membuat Template Foldering
Membuat template foldering yang digunakan sebagai standard untuk proses data engineering.
Contoh Penggunaan:
import bribrain
bribrain.template()
⚠️ Fungsi ini akan mengunduh template dari HDFS dan mengekstraknya ke /home/cdsw/.
2. Function — Utility Functions
Modul yang berisi berbagai fungsi utilitas untuk mendukung proses data engineering.
try_or() — Menangkap Error
Menangkap error dari suatu fungsi dan memberikan keluaran alternatif.
Parameter:
func | function | required | Python function (tambahkan lambda sebelum nama function) |
default | object | None | Nilai keluaran jika terjadi error |
expected_exc | Exception | (Exception,) | Tipe exception yang diharapkan |
Contoh Penggunaan:
import bribrain
result = bribrain.try_or(lambda: int("abc"), default=0)
print(result)
result = bribrain.try_or(lambda: int("123"), default=0)
print(result)
Return: object — keluaran dari function atau default value jika terjadi error
set_timer() — Menetapkan Waktu Awal
Menetapkan waktu awal untuk penghitungan durasi proses.
Contoh Penggunaan:
import bribrain
bribrain.set_timer()
Return: float — waktu dalam format float
get_timer() — Mendapatkan Durasi Proses
Mendapatkan durasi proses berdasarkan waktu awal dikurangi waktu sekarang.
Parameter:
start_time | float | None | Timestamp waktu awal (jika None, menggunakan set_timer()) |
Contoh Penggunaan:
import bribrain
bribrain.set_timer()
duration = bribrain.get_timer()
print(duration)
Return: string — durasi proses dengan format HH:MM:SS
show_time() — Menampilkan Waktu Saat Ini
Menampilkan waktu saat ini sesuai format yang diinginkan.
Parameter:
format_time | str | "%Y-%m-%d %H:%M:%S" | Format waktu |
Contoh Penggunaan:
import bribrain
print(bribrain.show_time())
print(bribrain.show_time("%d/%m/%Y"))
Return: string — waktu saat ini sesuai format input
remove_blank_space() — Menghapus Spasi Kosong pada Kolom
Menghapus spasi kosong pada kolom tertentu. Jika kolom hanya berisi spasi, akan diubah menjadi None.
Parameter:
x | str | required | Nama kolom yang akan dibersihkan |
Contoh Penggunaan:
import bribrain
from pyspark.sql import functions as F
df = df.withColumn("nama", bribrain.remove_blank_space("nama"))
df.show()
Return: pyspark.sql.functions.Column — kolom dengan spasi kosong dihapus
cleanse_blank_space() — Membersihkan Spasi pada Semua Kolom String
Membersihkan spasi kosong pada semua kolom bertipe string dalam DataFrame secara otomatis.
Parameter:
df_to_clean | pyspark.sql.DataFrame | required | DataFrame yang akan dibersihkan |
Contoh Penggunaan:
import bribrain
df_clean = bribrain.cleanse_blank_space(df)
df_clean.show()
Return: pyspark.sql.DataFrame — DataFrame dengan spasi kosong dihapus pada semua kolom string
lower_columns_name() — Mengubah Nama Kolom Menjadi Huruf Kecil
Mengubah semua nama kolom dalam DataFrame menjadi huruf kecil.
Parameter:
df | pyspark.sql.DataFrame | required | DataFrame yang akan diubah |
Contoh Penggunaan:
import bribrain
df = bribrain.lower_columns_name(df)
print(df.columns)
Return: pyspark.sql.DataFrame — DataFrame dengan nama kolom huruf kecil
pandas_to_spark() — Konversi Pandas ke Spark DataFrame
Mengkonversi pandas DataFrame menjadi Spark DataFrame dengan schema otomatis.
Parameter:
pdf | pandas.DataFrame | required | DataFrame pandas yang akan dikonversi |
tz_to_utc | bool | True | Konversi kolom datetime yang memiliki timezone ke UTC |
Contoh Penggunaan:
import bribrain
import pandas as pd
pdf = pd.DataFrame({
"nama": ["Andri", "Budi"],
"umur": [28, 30]
})
spark_df = bribrain.pandas_to_spark(pdf)
spark_df.show()
Return: pyspark.sql.DataFrame
get_list_partition() — Mendapatkan List Partisi
Memperoleh list partisi dari hive table yang diurutkan dari partisi terbaru.
Parameter:
spark | SparkSession | required | Spark session |
schema | str | required | Nama schema dari table di Hive |
table | str | required | Nama table di Hive |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
partitions = bribrain.get_list_partition(spark, "my_schema", "my_table")
print(partitions)
Return: list — list partisi diurutkan dari partisi terbaru
get_first_partition() — Mendapatkan Partisi Pertama
Memperoleh partisi pertama dari hive table.
Parameter:
spark | SparkSession | required | Spark session |
schema | str | required | Nama schema dari table di Hive |
table | str | required | Nama table di Hive |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
first = bribrain.get_first_partition(spark, "my_schema", "my_table")
print(first)
Return: dict — partisi pertama
get_last_partition() — Mendapatkan Partisi Terakhir
Memperoleh partisi terakhir dari hive table.
Parameter:
spark | SparkSession | required | Spark session |
schema | str | required | Nama schema dari table di Hive |
table | str | required | Nama table di Hive |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
last = bribrain.get_last_partition(spark, "my_schema", "my_table")
print(last)
Return: dict — partisi terakhir
check_partition_exists() — Mengecek Keberadaan Partisi
Mengecek keberadaan partisi pada tabel HDFS dan mengembalikan metadata partisi terkait.
Parameter:
spark | SparkSession | required | Spark session |
target_value | str | required | Nilai partisi yang dicari |
schema | str | required | Nama schema dari table di Hive |
table | str | required | Nama table di Hive |
mode | str | None | Mode pengecekan |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
result = bribrain.check_partition_exists(spark, "2026-02", "my_schema", "my_table")
if result:
print("Partisi ditemukan:", result)
else:
print("Partisi tidak ditemukan")
Return: dict | None — detail kolom partisi atau None jika tidak ditemukan
put_to_bridrive() — Upload File ke BRIDrive
Mengirimkan file dari CDSW ke BRIDrive (Seafile).
Parameter:
file_path | str | required | Path file yang akan di upload |
upload_url | str | required | Upload link dari Seafile (BRIDrive) |
Contoh Penggunaan:
import bribrain
bribrain.put_to_bridrive(
file_path="resources/excel/report.xlsx",
upload_url="https://bridrive.example.com/upload-link"
)
put_to_hdfs() — Upload File ke HDFS
Mengirimkan file dari CDSW ke HDFS.
Parameter:
file_path | str | required | Path file yang akan di upload |
hdfs_path | str | "/tmp/development/bribrain" | Path lokasi penyimpanan di HDFS |
Contoh Penggunaan:
import bribrain
bribrain.put_to_hdfs("resources/csv/data.csv")
bribrain.put_to_hdfs("resources/csv/data.csv", hdfs_path="/tmp/my_project")
get_from_hdfs() — Download File dari HDFS
Mengambil file dari HDFS ke CDSW.
Parameter:
file_path | str | required | Path file di HDFS yang akan dipindahkan |
cdsw_path | str | "/home/cdsw/" | Path penyimpanan file di CDSW |
Contoh Penggunaan:
import bribrain
bribrain.get_from_hdfs("/tmp/development/bribrain/data.csv")
bribrain.get_from_hdfs("/tmp/data.csv", cdsw_path="/home/cdsw/data/")
list_file_hdfs() — Menampilkan List File di HDFS
Memperlihatkan list file yang ada pada lokasi HDFS.
Parameter:
hdfs_path | str | "/user/{username}/" | Path HDFS yang ingin ditampilkan |
Contoh Penggunaan:
import bribrain
bribrain.list_file_hdfs()
bribrain.list_file_hdfs("/tmp/development/bribrain")
get_formatted_bytes() — Format Ukuran File
Mengonversi ukuran file dari bytes ke format yang mudah dibaca (PB, TB, GB, MB, KB, B).
Parameter:
size | int | required | Ukuran file dalam bytes |
Contoh Penggunaan:
import bribrain
print(bribrain.get_formatted_bytes(1073741824))
print(bribrain.get_formatted_bytes(5242880))
Return: str
get_list_table() — Mendapatkan List Table
Memperoleh list table dari schema di Hive.
Parameter:
spark | SparkSession | required | Spark session |
schema | str | required | Nama schema di Hive |
limit | int | None | Jumlah maksimum table |
sort | bool | None | True | True = A-Z, False = Z-A, None = tanpa urutan |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
tables = bribrain.get_list_table(spark, "my_schema")
print(tables)
tables = bribrain.get_list_table(spark, "my_schema", limit=5)
Return: list | None
get_list_tables_with_metadata() — Mendapatkan List Table dengan Metadata
Memperoleh list table beserta metadata (ukuran, owner, tanggal modifikasi, dll) dari schema di HDFS.
Parameter:
spark | SparkSession | required | Spark session |
schema | str | required | Nama schema di Hive |
owner | str | None | None | Filter berdasarkan owner (skip perhitungan size untuk table lain) |
sort | str | None | "oldest" | Urutan: "oldest", "newest", "biggest", "smallest", None |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
result = bribrain.get_list_tables_with_metadata(spark, "dgv", sort="biggest")
for r in result:
print(r['table'], r['size_formatted'], r['owner'])
result = bribrain.get_list_tables_with_metadata(spark, "dgv", owner="infabri")
Return: list[dict] — setiap dict berisi: table, owner, size, size_formatted, space_consumed, space_consumed_formatted, directory_count, file_count, modified_date, modified_date_formatted, path
drop_hive_table() — Menghapus Table dari Hive
Menghapus satu atau beberapa table dari Hive.
Parameter:
spark | SparkSession | required | Spark session |
schema | str | required | Nama schema di Hive |
table | str | list[str] | required | Nama table atau list table yang akan dihapus |
verbose | bool | True | Menampilkan informasi setiap table yang dihapus |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
bribrain.drop_hive_table(spark, "my_schema", "old_table")
bribrain.drop_hive_table(spark, "my_schema", ["table_a", "table_b", "table_c"])
Return: list[str] | None — list table yang berhasil dihapus
3. Ingestion — Standard Ingestion ke Hive
Modul orchestrator untuk proses standard ingestion data ke Hive.
ingest_to_hive() — Orchestrator Ingestion
Orchestrator untuk proses standard ingestion data ke Hive. Mengatur seluruh alur ingestion mulai dari validasi, penambahan kolom partisi, penulisan data, hingga pencatatan metadata.
Parameter:
spark | SparkSession | required | Spark session |
params | dict | required | Parameter pendukung dalam bentuk dictionary |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
params = {
"schema": "my_schema",
"table": "my_table",
"partition": "year|month",
"mode": "append",
}
result = bribrain.ingest_to_hive(spark, params)
Return: dict — parameter hasil yang disimpan dalam bentuk dictionary
4. Load — Penulisan Data ke Berbagai Target
Modul untuk menulis/menyimpan data ke berbagai target: Hive, HDFS (CSV/Parquet/JSON/Excel), PostgreSQL, dan MySQL.
📂 Hive
load_to_hive() — Menulis DataFrame ke Hive
Melakukan penulisan PySpark DataFrame sebagai tabel Hive di HDFS.
Parameter:
spark | SparkSession | required | Spark session |
df | DataFrame | required | PySpark DataFrame |
schema | str | required | Target schema di HDFS |
table | str | required | Target table di HDFS |
partition | str | None | Kolom partisi (pisahkan dengan | jika lebih dari satu) |
mode | str | "append" | Mode penulisan (append, overwrite, error, ignore) |
repartition_number | int | None | None | Jumlah file parquet per partisi |
blocksize | int | None | 256 | Ukuran per file dalam MB (64, 128, 256, 512, None) |
modified_date | bool | True | Menambahkan kolom modified_date |
schema_temp | str | "sharingvision" | Schema untuk temp table |
count | bool | False | Menampilkan count data tersimpan |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
bribrain.load_to_hive(spark, df, schema="my_schema", table="my_table")
bribrain.load_to_hive(
spark, df,
schema="my_schema",
table="my_table",
partition="year|month",
mode="overwrite",
count=True
)
bribrain.load_to_hive(
spark, df,
schema="my_schema",
table="my_table",
modified_date=False,
blocksize=128
)
Return: int | None — total record jika count=True, None jika tidak
📂 HDFS File
load_to_file() — Menulis DataFrame ke HDFS (CSV/Parquet/JSON/Excel)
Melakukan penulisan PySpark DataFrame ke HDFS dengan format file tertentu.
Parameter:
df | DataFrame | required | PySpark DataFrame |
file_name | str | required | HDFS path / nama file (tanpa ekstensi) |
file_type | str | "csv" | Tipe file (csv, parquet, json, excel) |
repartition_number | int | 1 | Jumlah file hasil (termasuk Excel) |
delimiter | str | "," | Pemisah antar kolom (untuk CSV) |
download | bool | True | Download ke local CML |
delete_hdfs_file | bool | True | Hapus file di HDFS setelah download |
Contoh Penggunaan:
import bribrain
bribrain.load_to_file(df, "my_report", file_type="csv")
bribrain.load_to_file(df, "my_report", file_type="excel")
bribrain.load_to_file(df, "my_report", file_type="excel", repartition_number=3)
bribrain.load_to_file(df, "my_data", file_type="parquet")
bribrain.load_to_file(df, "my_report", file_type="csv", download=False)
download_hdfs_file() — Download File dari HDFS
Mengunduh semua file partisi HDFS dari direktori user ke lokal dan melakukan pembersihan opsional.
Parameter:
file_name | str | required | Nama file/direktori target di HDFS |
file_type | str | required | Tipe file (csv, parquet, json, excel) |
delete_hdfs_file | bool | False | Hapus direktori di HDFS setelah download |
Contoh Penggunaan:
from bribrain.load import download_hdfs_file
download_hdfs_file("my_report", "csv", delete_hdfs_file=True)
Return: list[str] — daftar path file lokal yang berhasil diunduh
load_to_excel() — Menulis DataFrame ke Excel (Multi-Sheet)
Melakukan penulisan PySpark DataFrame ke HDFS dengan format Excel. Mendukung multi-sheet — Anda dapat menulis beberapa DataFrame sekaligus sebagai sheet terpisah dalam satu file Excel.
Parameter:
df | DataFrame | list[DataFrame] | required | Satu DataFrame atau list DataFrame |
file_name | str | required | HDFS path / nama file (tanpa ekstensi) |
sheet_name | str | list[str] | None | None | Nama sheet atau list nama sheet |
repartition_number | int | 1 | Jumlah file excel hasil |
download | bool | True | Download ke local CML |
delete_hdfs_file | bool | True | Hapus file di HDFS setelah download |
Aturan sheet_name:
None → sheet diberi nama otomatis: Sheet1, Sheet2, dst.
str → hanya berlaku untuk satu DataFrame
list[str] → jumlah harus sama dengan jumlah DataFrame
Contoh Penggunaan:
import bribrain
bribrain.load_to_excel(df, "my_report")
bribrain.load_to_excel(df, "my_report", sheet_name="Summary")
bribrain.load_to_excel(
[df_revenue, df_cost, df_profit],
"financial_report",
sheet_name=["Revenue", "Cost", "Profit"]
)
bribrain.load_to_excel(
[df1, df2],
"my_report"
)
🐘 PostgreSQL
load_to_postgresql() — Menulis DataFrame ke PostgreSQL
Melakukan penulisan PySpark DataFrame sebagai tabel di PostgreSQL.
Parameter:
df | DataFrame | required | PySpark DataFrame |
database | str | required | Target database |
schema | str | required | Target schema |
table | str | required | Target table |
ip | str | required | Target IP |
port | str | required | Target port |
username | str | required | Username |
password | str | required | Password |
mode | str | "append" | Mode penulisan (append, overwrite, error, ignore) |
strategy | str | None | Proses sebelum penulisan (truncate, drop, None) |
ddl | str | None | Query CREATE TABLE |
count | bool | True | Menampilkan count data tersimpan |
Contoh Penggunaan:
import bribrain
bribrain.load_to_postgresql(
df,
database="mydb",
schema="public",
table="my_table",
ip="192.168.1.100",
port="5432",
username="admin",
password="secret"
)
bribrain.load_to_postgresql(
df,
database="mydb",
schema="public",
table="my_table",
ip="192.168.1.100",
port="5432",
username="admin",
password="secret",
strategy="truncate"
)
bribrain.load_to_postgresql(
df,
database="mydb",
schema="public",
table="my_table",
ip="192.168.1.100",
port="5432",
username="admin",
password="secret",
strategy="drop",
ddl="""
CREATE TABLE public.my_table (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
value NUMERIC
)
"""
)
postgresql_execute_command() — Menjalankan Query di PostgreSQL
Menjalankan query (non-SELECT) di PostgreSQL.
Parameter:
database | str | required | Target database |
ip | str | required | Target IP |
port | str | required | Target port |
username | str | required | Username |
password | str | required | Password |
query | str | required | Query yang akan dieksekusi |
Contoh Penggunaan:
import bribrain
bribrain.postgresql_execute_command(
database="mydb",
ip="192.168.1.100",
port="5432",
username="admin",
password="secret",
query="UPDATE public.my_table SET status = 'active' WHERE id = 1"
)
postgresql_execute_fetchall() — Query Select di PostgreSQL
Menjalankan query SELECT di PostgreSQL dan mengembalikan hasilnya.
Parameter:
database | str | required | Target database |
ip | str | required | Target IP |
port | str | required | Target port |
username | str | required | Username |
password | str | required | Password |
query | str | required | Query yang akan dieksekusi |
Contoh Penggunaan:
import bribrain
rows = bribrain.postgresql_execute_fetchall(
database="mydb",
ip="192.168.1.100",
port="5432",
username="admin",
password="secret",
query="SELECT * FROM public.my_table LIMIT 10"
)
for row in rows:
print(row)
Return: list — data keluaran dari query
postgresql_drop_table() — Drop Table di PostgreSQL
Menghapus tabel di PostgreSQL.
Contoh Penggunaan:
import bribrain
bribrain.postgresql_drop_table(
database="mydb",
schema="public",
table="my_table",
ip="192.168.1.100",
port="5432",
username="admin",
password="secret"
)
postgresql_truncate_table() — Truncate Table di PostgreSQL
Mengosongkan data tabel di PostgreSQL (tanpa menghapus strukturnya).
Contoh Penggunaan:
import bribrain
bribrain.postgresql_truncate_table(
database="mydb",
schema="public",
table="my_table",
ip="192.168.1.100",
port="5432",
username="admin",
password="secret"
)
🐬 MySQL
load_to_mysql() — Menulis DataFrame ke MySQL
Melakukan penulisan PySpark DataFrame sebagai tabel di MySQL.
Parameter:
df | DataFrame | required | PySpark DataFrame |
schema | str | required | Target schema |
table | str | required | Target table |
ip | str | required | Target IP |
port | str | required | Target port |
username | str | required | Username |
password | str | required | Password |
mode | str | "append" | Mode penulisan (append, overwrite, error, ignore) |
strategy | str | None | Proses sebelum penulisan (truncate, drop, None) |
ddl | str | None | Query CREATE TABLE |
count | bool | True | Menampilkan count data tersimpan |
Contoh Penggunaan:
import bribrain
bribrain.load_to_mysql(
df,
schema="my_database",
table="my_table",
ip="192.168.1.100",
port="3306",
username="admin",
password="secret",
mode="append"
)
bribrain.load_to_mysql(
df,
schema="my_database",
table="my_table",
ip="192.168.1.100",
port="3306",
username="admin",
password="secret",
strategy="truncate"
)
mysql_execute_command() — Menjalankan Query di MySQL
Menjalankan query (non-SELECT) di MySQL.
Contoh Penggunaan:
import bribrain
bribrain.mysql_execute_command(
schema="my_database",
ip="192.168.1.100",
port="3306",
username="admin",
password="secret",
query="UPDATE my_table SET status = 'done' WHERE id = 1"
)
mysql_execute_fetchall() — Query Select di MySQL
Menjalankan query SELECT di MySQL dan mengembalikan hasilnya.
Contoh Penggunaan:
import bribrain
rows = bribrain.mysql_execute_fetchall(
schema="my_database",
ip="192.168.1.100",
port="3306",
username="admin",
password="secret",
query="SELECT * FROM my_table LIMIT 10"
)
for row in rows:
print(row)
Return: list — data keluaran dari query
mysql_drop_table() — Drop Table di MySQL
Menghapus tabel di MySQL.
Contoh Penggunaan:
import bribrain
bribrain.mysql_drop_table(
schema="my_database",
table="my_table",
ip="192.168.1.100",
port="3306",
username="admin",
password="secret"
)
mysql_truncate_table() — Truncate Table di MySQL
Mengosongkan data tabel di MySQL.
Contoh Penggunaan:
import bribrain
bribrain.mysql_truncate_table(
schema="my_database",
table="my_table",
ip="192.168.1.100",
port="3306",
username="admin",
password="secret"
)
5. Metadata — Informasi Environment
Modul untuk mengambil informasi metadata dari environment variable.
get_userid() — Mendapatkan User ID
Mengambil Hadoop user ID dari environment variable HADOOP_USER_NAME.
Contoh Penggunaan:
import bribrain
user_id = bribrain.get_userid()
print(user_id)
Return: str
get_username() — Mendapatkan Username
Menghasilkan nama lengkap berdasarkan Git author email. Nama dibentuk dari bagian local email (sebelum @).
Contoh Penggunaan:
import bribrain
username = bribrain.get_username()
print(username)
Return: str
get_email() — Mendapatkan Email
Mengambil Git author email dari environment variable GIT_AUTHOR_EMAIL.
Contoh Penggunaan:
import bribrain
email = bribrain.get_email()
print(email)
Return: str
get_project_url() — Mendapatkan Project URL
Mengambil URL project dari environment variable CDSW_PROJECT_URL.
import bribrain
url = bribrain.get_project_url()
print(url)
Return: str
get_project_id() — Mendapatkan Project ID
Mengambil ID project dari environment variable CDSW_PROJECT_ID.
import bribrain
project_id = bribrain.get_project_id()
print(project_id)
Return: str
get_project_num() — Mendapatkan Project Number
Mengambil nomor project dari environment variable CDSW_PROJECT_NUM.
import bribrain
project_num = bribrain.get_project_num()
print(project_num)
Return: str
get_project_name() — Mendapatkan Nama Project
Membentuk nama project dengan format yang dapat disesuaikan.
Parameter:
env | str | None | None | Environment (None, "dev", "prod") |
case | str | None | None | Format huruf (None, "lower", "upper") |
delimiter | str | None | "_" | Karakter pemisah (menggantikan -) |
Contoh Penggunaan:
import bribrain
print(bribrain.get_project_name())
print(bribrain.get_project_name(env="dev", case="upper"))
print(bribrain.get_project_name(delimiter=None))
Return: str
get_proxy() — Mendapatkan Konfigurasi Proxy
Mengambil konfigurasi proxy HTTP dan HTTPS dari environment variable.
Contoh Penggunaan:
import bribrain
proxies = bribrain.get_proxy()
print(proxies)
Return: dict
📊 HDFS Table Metadata
get_schema_space() — Mendapatkan Space Quota Schema
Mengambil informasi penggunaan ruang HDFS dari suatu schema.
Parameter:
schema | str | required | Nama schema di Hive/HDFS |
Contoh Penggunaan:
import bribrain
result = bribrain.get_schema_space("my_schema")
print(result)
Return: dict | None
get_table_count_files() — Jumlah File dalam Table
Mengambil jumlah file dari sebuah table di Hive menggunakan Hadoop FileSystem API.
Parameter:
spark | SparkSession | required | Spark session |
schema | str | required | Nama schema |
table | str | required | Nama table |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
count = bribrain.get_table_count_files(spark, "dgv", "my_table")
print(count)
Return: int | None
get_table_average_files() — Rata-rata File per Partisi
Mengambil rata-rata jumlah file per partisi. Mendukung nested partition (year/month/day).
Parameter:
spark | SparkSession | required | Spark session |
schema | str | required | Nama schema |
table | str | required | Nama table |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
avg = bribrain.get_table_average_files(spark, "dgv", "my_table")
print(avg)
Return: float | None
get_partition_count_files() — Jumlah File dalam Partisi
Mengambil jumlah file dari sebuah partisi tertentu.
Parameter:
spark | SparkSession | required | Spark session |
schema | str | required | Nama schema |
table | str | required | Nama table |
partition | str | dict | required | Partisi ("ds=202508" atau dict dari get_list_partition) |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
count = bribrain.get_partition_count_files(spark, "dgv", "my_table", "ds=202508")
partitions = bribrain.get_list_partition(spark, "dgv", "my_table")
count = bribrain.get_partition_count_files(spark, "dgv", "my_table", partitions[0])
Return: int | None
get_table_size() — Ukuran Table
Mengambil ukuran file dari sebuah table di Hive (dalam bytes).
Parameter:
spark | SparkSession | required | Spark session |
schema | str | required | Nama schema |
table | str | required | Nama table |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
size = bribrain.get_table_size(spark, "dgv", "my_table")
print(bribrain.get_formatted_bytes(size))
Return: int | None
get_table_average_size() — Rata-rata Ukuran per Partisi
Mengambil rata-rata ukuran (bytes) per partisi. Mendukung nested partition.
Parameter:
spark | SparkSession | required | Spark session |
schema | str | required | Nama schema |
table | str | required | Nama table |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
avg_size = bribrain.get_table_average_size(spark, "dgv", "my_table")
print(bribrain.get_formatted_bytes(avg_size))
Return: float | None
get_partition_size() — Ukuran Partisi
Mengambil ukuran file dari sebuah partisi tertentu.
Parameter:
spark | SparkSession | required | Spark session |
schema | str | required | Nama schema |
table | str | required | Nama table |
partition | str | dict | required | Partisi ("ds=202508" atau dict) |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
size = bribrain.get_partition_size(spark, "dgv", "my_table", "ds=202508")
print(bribrain.get_formatted_bytes(size))
Return: int | None
get_table_last_modified() — Waktu Modifikasi Terakhir
Mengambil waktu modifikasi terakhir dari sebuah table.
Parameter:
spark | SparkSession | required | Spark session |
schema | str | required | Nama schema |
table | str | required | Nama table |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
dt = bribrain.get_table_last_modified(spark, "dgv", "my_table")
print(dt)
Return: datetime | None
get_table_last_access() — Waktu Akses Terakhir
Mengambil waktu akses terakhir dari sebuah table.
Parameter:
spark | SparkSession | required | Spark session |
schema | str | required | Nama schema |
table | str | required | Nama table |
Contoh Penggunaan:
import bribrain
spark = bribrain.sparkSession()
dt = bribrain.get_table_last_access(spark, "dgv", "my_table")
print(dt)
Return: datetime | None
6. Notification — Notifikasi Telegram
Modul untuk mengirim pesan, gambar, dan file ke Telegram via Bot API.
send_message_telegram() — Kirim Pesan Teks
Mengirim pesan teks ke Telegram. Otomatis membagi pesan panjang menjadi beberapa bagian jika melebihi batas 4096 karakter.
Parameter:
message | str | required | Pesan yang akan dikirim |
bot_token | str | required | Token bot Telegram |
chat_id | str | int | required | Chat ID tujuan |
parse_mode | str | "HTML" | Mode parsing (HTML, Markdown) |
disable_web_page_preview | bool | True | Nonaktifkan preview link |
proxies | dict | None | Konfigurasi proxy |
timeout | int | 20 | Request timeout (detik) |
max_len | int | 4096 | Panjang maksimum per pesan |
Contoh Penggunaan:
import bribrain
BOT_TOKEN = "your-bot-token"
CHAT_ID = "-100123456789"
bribrain.send_message_telegram(
message="✅ Proses ETL selesai!",
bot_token=BOT_TOKEN,
chat_id=CHAT_ID
)
bribrain.send_message_telegram(
message="<b>Status:</b> Sukses\n<i>Durasi:</i> 5 menit",
bot_token=BOT_TOKEN,
chat_id=CHAT_ID
)
proxies = bribrain.get_proxy()
bribrain.send_message_telegram(
message="Pesan dengan proxy",
bot_token=BOT_TOKEN,
chat_id=CHAT_ID,
proxies=proxies
)
send_image_telegram() — Kirim Gambar
Mengirim gambar ke Telegram. Mendukung pengiriman dari file lokal maupun gambar hasil generate di memory (bytes / BytesIO).
Parameter:
bot_token | str | required | Token bot Telegram |
chat_id | str | int | required | Chat ID tujuan |
image | str | bytes | BytesIO | required | Path file gambar atau objek gambar |
caption | str | None | Caption gambar |
parse_mode | str | "HTML" | Mode parsing |
filename | str | "image.png" | Nama file gambar |
proxies | dict | None | Konfigurasi proxy |
timeout | int | 60 | Request timeout (detik) |
Contoh Penggunaan:
import bribrain
BOT_TOKEN = "your-bot-token"
CHAT_ID = "-100123456789"
bribrain.send_image_telegram(
bot_token=BOT_TOKEN,
chat_id=CHAT_ID,
image="resources/chart.png",
caption="📊 Grafik penjualan bulan ini"
)
import io
img_buffer = io.BytesIO(image_bytes)
bribrain.send_image_telegram(
bot_token=BOT_TOKEN,
chat_id=CHAT_ID,
image=img_buffer,
caption="Generated chart"
)
send_file_telegram() — Kirim File/Dokumen
Mengirim file atau dokumen ke Telegram. Cocok untuk mengirim report, log, CSV, Excel, atau file hasil generate lainnya.
Parameter:
bot_token | str | required | Token bot Telegram |
chat_id | str | int | required | Chat ID tujuan |
file_obj | str | bytes | BytesIO | required | Path file atau objek file |
filename | str | required | Nama file |
caption | str | None | Caption file |
parse_mode | str | "HTML" | Mode parsing |
proxies | dict | None | Konfigurasi proxy |
timeout | int | 120 | Request timeout (detik) |
Contoh Penggunaan:
import bribrain
BOT_TOKEN = "your-bot-token"
CHAT_ID = "-100123456789"
bribrain.send_file_telegram(
bot_token=BOT_TOKEN,
chat_id=CHAT_ID,
file_obj="resources/excel/report.xlsx",
filename="report.xlsx",
caption="📄 Laporan bulanan"
)
bribrain.send_file_telegram(
bot_token=BOT_TOKEN,
chat_id=CHAT_ID,
file_obj="resources/csv/data.csv",
filename="data.csv"
)
📦 Dependencies
mysql-connector-python | 8.0.28 |
psycopg2 | latest |
openpyxl | latest |
pyspark | (environment) |
pandas | (environment) |
requests | (environment) |
👨💻 Author
Andri Ariyanto
📄 License
Copyright © Andri Ariyanto — AI Department - BRI