New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details
Socket
Book a DemoSign in
Socket

bribrain

Package Overview
Dependencies
Maintainers
1
Versions
18
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bribrain

BRIBrain Standardization Code

pipPyPI
Version
1.3.6
Maintainers
1

📦 BRIBrain

Python Version PyPI version License

BRIBrain adalah Python package standar yang digunakan sebagai library untuk proses data engineering di Departemen BRIBrain. Package ini menyediakan berbagai modul untuk konfigurasi Spark, ingestion data, penulisan data ke berbagai target (Hive, PostgreSQL, MySQL, HDFS, Excel), utility functions, metadata, dan notifikasi via Telegram.

📋 Daftar Isi

🚀 Instalasi

Gunakan package manager pip untuk menginstal bribrain.

pip install bribrain

⚡ Quick Start

import bribrain

# Membuat Spark Session
spark = bribrain.sparkSession()

# Mendapatkan list partisi dari tabel Hive
partitions = bribrain.get_list_partition(spark, "schema_name", "table_name")

# Menulis DataFrame ke Hive
bribrain.load_to_hive(spark, df, schema="my_schema", table="my_table")

📚 Modul

1. Config — Konfigurasi Spark Session

Modul untuk membuat dan mengkonfigurasi Spark Session.

sparkSession() — Membuat Spark Session

Membuat spark session yang dapat digunakan untuk proses data engineering.

Parameter:

ParameterTipeDefaultDeskripsi
appnamestr"Bribrain Packages SparkSession"Nama dari Spark Session
executorstr"small"Standard resource (small, medium, high, custom)
instancesint2Jumlah instance Spark Session
coresint2Jumlah cores Spark Session
memoryint4Memory Spark Session (GB)
overheadint2Overhead memory Spark Session (GB)
verboseboolFalseMenampilkan console progress
extra_configsdictNoneAdditional Spark config

Preset Executor:

ExecutorInstancesCoresMemoryOverhead
small224 GB2 GB
medium446 GB2 GB
high668 GB4 GB
customsesuai inputsesuai inputsesuai inputsesuai input

Contoh Penggunaan:

import bribrain

# Menggunakan preset small (default)
spark = bribrain.sparkSession()

# Menggunakan preset medium
spark = bribrain.sparkSession(executor="medium")

# Menggunakan custom config
spark = bribrain.sparkSession(
    appname="My App",
    executor="custom",
    instances=4,
    cores=4,
    memory=8,
    overhead=4
)

# Menambahkan extra Spark config
spark = bribrain.sparkSession(
    extra_configs={
        "spark.sql.shuffle.partitions": "200",
        "spark.sql.adaptive.enabled": "true"
    }
)

Return: pyspark.sql.session.SparkSession

template() — Membuat Template Foldering

Membuat template foldering yang digunakan sebagai standard untuk proses data engineering.

Contoh Penggunaan:

import bribrain

bribrain.template()

⚠️ Fungsi ini akan mengunduh template dari HDFS dan mengekstraknya ke /home/cdsw/.

2. Function — Utility Functions

Modul yang berisi berbagai fungsi utilitas untuk mendukung proses data engineering.

try_or() — Menangkap Error

Menangkap error dari suatu fungsi dan memberikan keluaran alternatif.

Parameter:

ParameterTipeDefaultDeskripsi
funcfunctionrequiredPython function (tambahkan lambda sebelum nama function)
defaultobjectNoneNilai keluaran jika terjadi error
expected_excException(Exception,)Tipe exception yang diharapkan

Contoh Penggunaan:

import bribrain

# Menangkap error dan mengembalikan default value
result = bribrain.try_or(lambda: int("abc"), default=0)
print(result)  # Output: 0

# Tanpa error
result = bribrain.try_or(lambda: int("123"), default=0)
print(result)  # Output: 123

Return: object — keluaran dari function atau default value jika terjadi error

set_timer() — Menetapkan Waktu Awal

Menetapkan waktu awal untuk penghitungan durasi proses.

Contoh Penggunaan:

import bribrain

bribrain.set_timer()

Return: float — waktu dalam format float

get_timer() — Mendapatkan Durasi Proses

Mendapatkan durasi proses berdasarkan waktu awal dikurangi waktu sekarang.

Parameter:

ParameterTipeDefaultDeskripsi
start_timefloatNoneTimestamp waktu awal (jika None, menggunakan set_timer())

Contoh Penggunaan:

import bribrain

bribrain.set_timer()

# ... proses yang berjalan ...

duration = bribrain.get_timer()
print(duration)  # Output: "00:05:30"

Return: string — durasi proses dengan format HH:MM:SS

show_time() — Menampilkan Waktu Saat Ini

Menampilkan waktu saat ini sesuai format yang diinginkan.

Parameter:

ParameterTipeDefaultDeskripsi
format_timestr"%Y-%m-%d %H:%M:%S"Format waktu

Contoh Penggunaan:

import bribrain

print(bribrain.show_time())
# Output: "2026-02-12 09:30:00"

print(bribrain.show_time("%d/%m/%Y"))
# Output: "12/02/2026"

Return: string — waktu saat ini sesuai format input

remove_blank_space() — Menghapus Spasi Kosong pada Kolom

Menghapus spasi kosong pada kolom tertentu. Jika kolom hanya berisi spasi, akan diubah menjadi None.

Parameter:

ParameterTipeDefaultDeskripsi
xstrrequiredNama kolom yang akan dibersihkan

Contoh Penggunaan:

import bribrain
from pyspark.sql import functions as F

df = df.withColumn("nama", bribrain.remove_blank_space("nama"))
df.show()

Return: pyspark.sql.functions.Column — kolom dengan spasi kosong dihapus

cleanse_blank_space() — Membersihkan Spasi pada Semua Kolom String

Membersihkan spasi kosong pada semua kolom bertipe string dalam DataFrame secara otomatis.

Parameter:

ParameterTipeDefaultDeskripsi
df_to_cleanpyspark.sql.DataFramerequiredDataFrame yang akan dibersihkan

Contoh Penggunaan:

import bribrain

# Sebelum: kolom string mungkin berisi "  " atau "  data  "
df_clean = bribrain.cleanse_blank_space(df)
df_clean.show()
# Sesudah: spasi di-trim, kolom yang hanya spasi menjadi None

Return: pyspark.sql.DataFrame — DataFrame dengan spasi kosong dihapus pada semua kolom string

lower_columns_name() — Mengubah Nama Kolom Menjadi Huruf Kecil

Mengubah semua nama kolom dalam DataFrame menjadi huruf kecil.

Parameter:

ParameterTipeDefaultDeskripsi
dfpyspark.sql.DataFramerequiredDataFrame yang akan diubah

Contoh Penggunaan:

import bribrain

# Sebelum: kolom = ["Nama", "UMUR", "Alamat"]
df = bribrain.lower_columns_name(df)
print(df.columns)
# Sesudah: kolom = ["nama", "umur", "alamat"]

Return: pyspark.sql.DataFrame — DataFrame dengan nama kolom huruf kecil

pandas_to_spark() — Konversi Pandas ke Spark DataFrame

Mengkonversi pandas DataFrame menjadi Spark DataFrame dengan schema otomatis.

Parameter:

ParameterTipeDefaultDeskripsi
pdfpandas.DataFramerequiredDataFrame pandas yang akan dikonversi
tz_to_utcboolTrueKonversi kolom datetime yang memiliki timezone ke UTC

Contoh Penggunaan:

import bribrain
import pandas as pd

pdf = pd.DataFrame({
    "nama": ["Andri", "Budi"],
    "umur": [28, 30]
})

spark_df = bribrain.pandas_to_spark(pdf)
spark_df.show()

Return: pyspark.sql.DataFrame

get_list_partition() — Mendapatkan List Partisi

Memperoleh list partisi dari hive table yang diurutkan dari partisi terbaru.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
schemastrrequiredNama schema dari table di Hive
tablestrrequiredNama table di Hive

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()
partitions = bribrain.get_list_partition(spark, "my_schema", "my_table")
print(partitions)
# Output: [{'year': '2026', 'month': '02'}, {'year': '2026', 'month': '01'}, ...]

Return: list — list partisi diurutkan dari partisi terbaru

get_first_partition() — Mendapatkan Partisi Pertama

Memperoleh partisi pertama dari hive table.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
schemastrrequiredNama schema dari table di Hive
tablestrrequiredNama table di Hive

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()
first = bribrain.get_first_partition(spark, "my_schema", "my_table")
print(first)
# Output: {'year': '2024', 'month': '01'}

Return: dict — partisi pertama

get_last_partition() — Mendapatkan Partisi Terakhir

Memperoleh partisi terakhir dari hive table.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
schemastrrequiredNama schema dari table di Hive
tablestrrequiredNama table di Hive

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()
last = bribrain.get_last_partition(spark, "my_schema", "my_table")
print(last)
# Output: {'year': '2026', 'month': '02'}

Return: dict — partisi terakhir

check_partition_exists() — Mengecek Keberadaan Partisi

Mengecek keberadaan partisi pada tabel HDFS dan mengembalikan metadata partisi terkait.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
target_valuestrrequiredNilai partisi yang dicari
schemastrrequiredNama schema dari table di Hive
tablestrrequiredNama table di Hive
modestrNoneMode pengecekan

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()
result = bribrain.check_partition_exists(spark, "2026-02", "my_schema", "my_table")

if result:
    print("Partisi ditemukan:", result)
else:
    print("Partisi tidak ditemukan")

Return: dict | None — detail kolom partisi atau None jika tidak ditemukan

put_to_bridrive() — Upload File ke BRIDrive

Mengirimkan file dari CDSW ke BRIDrive (Seafile).

Parameter:

ParameterTipeDefaultDeskripsi
file_pathstrrequiredPath file yang akan di upload
upload_urlstrrequiredUpload link dari Seafile (BRIDrive)

Contoh Penggunaan:

import bribrain

bribrain.put_to_bridrive(
    file_path="resources/excel/report.xlsx",
    upload_url="https://bridrive.example.com/upload-link"
)
put_to_hdfs() — Upload File ke HDFS

Mengirimkan file dari CDSW ke HDFS.

Parameter:

ParameterTipeDefaultDeskripsi
file_pathstrrequiredPath file yang akan di upload
hdfs_pathstr"/tmp/development/bribrain"Path lokasi penyimpanan di HDFS

Contoh Penggunaan:

import bribrain

bribrain.put_to_hdfs("resources/csv/data.csv")

# Dengan custom path
bribrain.put_to_hdfs("resources/csv/data.csv", hdfs_path="/tmp/my_project")
get_from_hdfs() — Download File dari HDFS

Mengambil file dari HDFS ke CDSW.

Parameter:

ParameterTipeDefaultDeskripsi
file_pathstrrequiredPath file di HDFS yang akan dipindahkan
cdsw_pathstr"/home/cdsw/"Path penyimpanan file di CDSW

Contoh Penggunaan:

import bribrain

bribrain.get_from_hdfs("/tmp/development/bribrain/data.csv")

# Dengan custom CDSW path
bribrain.get_from_hdfs("/tmp/data.csv", cdsw_path="/home/cdsw/data/")
list_file_hdfs() — Menampilkan List File di HDFS

Memperlihatkan list file yang ada pada lokasi HDFS.

Parameter:

ParameterTipeDefaultDeskripsi
hdfs_pathstr"/user/{username}/"Path HDFS yang ingin ditampilkan

Contoh Penggunaan:

import bribrain

# Menampilkan file di direktori user default
bribrain.list_file_hdfs()

# Menampilkan file di path tertentu
bribrain.list_file_hdfs("/tmp/development/bribrain")
get_formatted_bytes() — Format Ukuran File

Mengonversi ukuran file dari bytes ke format yang mudah dibaca (PB, TB, GB, MB, KB, B).

Parameter:

ParameterTipeDefaultDeskripsi
sizeintrequiredUkuran file dalam bytes

Contoh Penggunaan:

import bribrain

print(bribrain.get_formatted_bytes(1073741824))  # Output: "1.00 GB"
print(bribrain.get_formatted_bytes(5242880))      # Output: "5.00 MB"

Return: str

get_list_table() — Mendapatkan List Table

Memperoleh list table dari schema di Hive.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
schemastrrequiredNama schema di Hive
limitintNoneJumlah maksimum table
sortbool | NoneTrueTrue = A-Z, False = Z-A, None = tanpa urutan

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()
tables = bribrain.get_list_table(spark, "my_schema")
print(tables)  # Output: ['table_a', 'table_b', 'table_c']

# Ambil 5 table teratas
tables = bribrain.get_list_table(spark, "my_schema", limit=5)

Return: list | None

get_list_tables_with_metadata() — Mendapatkan List Table dengan Metadata

Memperoleh list table beserta metadata (ukuran, owner, tanggal modifikasi, dll) dari schema di HDFS.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
schemastrrequiredNama schema di Hive
ownerstr | NoneNoneFilter berdasarkan owner (skip perhitungan size untuk table lain)
sortstr | None"oldest"Urutan: "oldest", "newest", "biggest", "smallest", None

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()

# List semua table, urutkan dari terbesar
result = bribrain.get_list_tables_with_metadata(spark, "dgv", sort="biggest")
for r in result:
    print(r['table'], r['size_formatted'], r['owner'])

# Filter hanya table milik owner tertentu
result = bribrain.get_list_tables_with_metadata(spark, "dgv", owner="infabri")

Return: list[dict] — setiap dict berisi: table, owner, size, size_formatted, space_consumed, space_consumed_formatted, directory_count, file_count, modified_date, modified_date_formatted, path

drop_hive_table() — Menghapus Table dari Hive

Menghapus satu atau beberapa table dari Hive.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
schemastrrequiredNama schema di Hive
tablestr | list[str]requiredNama table atau list table yang akan dihapus
verboseboolTrueMenampilkan informasi setiap table yang dihapus

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()

# Hapus satu table
bribrain.drop_hive_table(spark, "my_schema", "old_table")

# Hapus beberapa table sekaligus
bribrain.drop_hive_table(spark, "my_schema", ["table_a", "table_b", "table_c"])

Return: list[str] | None — list table yang berhasil dihapus

3. Ingestion — Standard Ingestion ke Hive

Modul orchestrator untuk proses standard ingestion data ke Hive.

ingest_to_hive() — Orchestrator Ingestion

Orchestrator untuk proses standard ingestion data ke Hive. Mengatur seluruh alur ingestion mulai dari validasi, penambahan kolom partisi, penulisan data, hingga pencatatan metadata.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
paramsdictrequiredParameter pendukung dalam bentuk dictionary

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()

params = {
    "schema": "my_schema",
    "table": "my_table",
    "partition": "year|month",
    "mode": "append",
    # parameter lainnya sesuai kebutuhan
}

result = bribrain.ingest_to_hive(spark, params)

Return: dict — parameter hasil yang disimpan dalam bentuk dictionary

4. Load — Penulisan Data ke Berbagai Target

Modul untuk menulis/menyimpan data ke berbagai target: Hive, HDFS (CSV/Parquet/JSON/Excel), PostgreSQL, dan MySQL.

📂 Hive

load_to_hive() — Menulis DataFrame ke Hive

Melakukan penulisan PySpark DataFrame sebagai tabel Hive di HDFS.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
dfDataFramerequiredPySpark DataFrame
schemastrrequiredTarget schema di HDFS
tablestrrequiredTarget table di HDFS
partitionstrNoneKolom partisi (pisahkan dengan | jika lebih dari satu)
modestr"append"Mode penulisan (append, overwrite, error, ignore)
repartition_numberint | NoneNoneJumlah file parquet per partisi
blocksizeint | None256Ukuran per file dalam MB (64, 128, 256, 512, None)
modified_dateboolTrueMenambahkan kolom modified_date
schema_tempstr"sharingvision"Schema untuk temp table
countboolFalseMenampilkan count data tersimpan

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()

# Penulisan sederhana (append)
bribrain.load_to_hive(spark, df, schema="my_schema", table="my_table")

# Penulisan dengan partisi
bribrain.load_to_hive(
    spark, df,
    schema="my_schema",
    table="my_table",
    partition="year|month",
    mode="overwrite",
    count=True
)

# Penulisan tanpa modified_date
bribrain.load_to_hive(
    spark, df,
    schema="my_schema",
    table="my_table",
    modified_date=False,
    blocksize=128
)

Return: int | None — total record jika count=True, None jika tidak

📂 HDFS File

load_to_file() — Menulis DataFrame ke HDFS (CSV/Parquet/JSON/Excel)

Melakukan penulisan PySpark DataFrame ke HDFS dengan format file tertentu.

Parameter:

ParameterTipeDefaultDeskripsi
dfDataFramerequiredPySpark DataFrame
file_namestrrequiredHDFS path / nama file (tanpa ekstensi)
file_typestr"csv"Tipe file (csv, parquet, json, excel)
repartition_numberint1Jumlah file hasil (termasuk Excel)
delimiterstr","Pemisah antar kolom (untuk CSV)
downloadboolTrueDownload ke local CML
delete_hdfs_fileboolTrueHapus file di HDFS setelah download

Contoh Penggunaan:

import bribrain

# Menulis ke CSV
bribrain.load_to_file(df, "my_report", file_type="csv")

# Menulis ke Excel (single file)
bribrain.load_to_file(df, "my_report", file_type="excel")

# Menulis ke Excel (3 file)
bribrain.load_to_file(df, "my_report", file_type="excel", repartition_number=3)

# Menulis ke Parquet
bribrain.load_to_file(df, "my_data", file_type="parquet")

# Tanpa download ke local
bribrain.load_to_file(df, "my_report", file_type="csv", download=False)
download_hdfs_file() — Download File dari HDFS

Mengunduh semua file partisi HDFS dari direktori user ke lokal dan melakukan pembersihan opsional.

Parameter:

ParameterTipeDefaultDeskripsi
file_namestrrequiredNama file/direktori target di HDFS
file_typestrrequiredTipe file (csv, parquet, json, excel)
delete_hdfs_fileboolFalseHapus direktori di HDFS setelah download

Contoh Penggunaan:

from bribrain.load import download_hdfs_file

download_hdfs_file("my_report", "csv", delete_hdfs_file=True)

Return: list[str] — daftar path file lokal yang berhasil diunduh

load_to_excel() — Menulis DataFrame ke Excel (Multi-Sheet)

Melakukan penulisan PySpark DataFrame ke HDFS dengan format Excel. Mendukung multi-sheet — Anda dapat menulis beberapa DataFrame sekaligus sebagai sheet terpisah dalam satu file Excel.

Parameter:

ParameterTipeDefaultDeskripsi
dfDataFrame | list[DataFrame]requiredSatu DataFrame atau list DataFrame
file_namestrrequiredHDFS path / nama file (tanpa ekstensi)
sheet_namestr | list[str] | NoneNoneNama sheet atau list nama sheet
repartition_numberint1Jumlah file excel hasil
downloadboolTrueDownload ke local CML
delete_hdfs_fileboolTrueHapus file di HDFS setelah download

Aturan sheet_name:

  • None → sheet diberi nama otomatis: Sheet1, Sheet2, dst.
  • str → hanya berlaku untuk satu DataFrame
  • list[str] → jumlah harus sama dengan jumlah DataFrame

Contoh Penggunaan:

import bribrain

# Single DataFrame (backward compatible)
bribrain.load_to_excel(df, "my_report")

# Single DataFrame dengan custom sheet name
bribrain.load_to_excel(df, "my_report", sheet_name="Summary")

# Multiple DataFrame dengan custom sheet name
bribrain.load_to_excel(
    [df_revenue, df_cost, df_profit],
    "financial_report",
    sheet_name=["Revenue", "Cost", "Profit"]
)

# Multiple DataFrame dengan auto naming
bribrain.load_to_excel(
    [df1, df2],
    "my_report"
)
# Hasil sheet: Sheet1, Sheet2

🐘 PostgreSQL

load_to_postgresql() — Menulis DataFrame ke PostgreSQL

Melakukan penulisan PySpark DataFrame sebagai tabel di PostgreSQL.

Parameter:

ParameterTipeDefaultDeskripsi
dfDataFramerequiredPySpark DataFrame
databasestrrequiredTarget database
schemastrrequiredTarget schema
tablestrrequiredTarget table
ipstrrequiredTarget IP
portstrrequiredTarget port
usernamestrrequiredUsername
passwordstrrequiredPassword
modestr"append"Mode penulisan (append, overwrite, error, ignore)
strategystrNoneProses sebelum penulisan (truncate, drop, None)
ddlstrNoneQuery CREATE TABLE
countboolTrueMenampilkan count data tersimpan

Contoh Penggunaan:

import bribrain

# Append data
bribrain.load_to_postgresql(
    df,
    database="mydb",
    schema="public",
    table="my_table",
    ip="192.168.1.100",
    port="5432",
    username="admin",
    password="secret"
)

# Truncate lalu insert
bribrain.load_to_postgresql(
    df,
    database="mydb",
    schema="public",
    table="my_table",
    ip="192.168.1.100",
    port="5432",
    username="admin",
    password="secret",
    strategy="truncate"
)

# Drop lalu create ulang dengan DDL
bribrain.load_to_postgresql(
    df,
    database="mydb",
    schema="public",
    table="my_table",
    ip="192.168.1.100",
    port="5432",
    username="admin",
    password="secret",
    strategy="drop",
    ddl="""
        CREATE TABLE public.my_table (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100),
            value NUMERIC
        )
    """
)
postgresql_execute_command() — Menjalankan Query di PostgreSQL

Menjalankan query (non-SELECT) di PostgreSQL.

Parameter:

ParameterTipeDefaultDeskripsi
databasestrrequiredTarget database
ipstrrequiredTarget IP
portstrrequiredTarget port
usernamestrrequiredUsername
passwordstrrequiredPassword
querystrrequiredQuery yang akan dieksekusi

Contoh Penggunaan:

import bribrain

bribrain.postgresql_execute_command(
    database="mydb",
    ip="192.168.1.100",
    port="5432",
    username="admin",
    password="secret",
    query="UPDATE public.my_table SET status = 'active' WHERE id = 1"
)
postgresql_execute_fetchall() — Query Select di PostgreSQL

Menjalankan query SELECT di PostgreSQL dan mengembalikan hasilnya.

Parameter:

ParameterTipeDefaultDeskripsi
databasestrrequiredTarget database
ipstrrequiredTarget IP
portstrrequiredTarget port
usernamestrrequiredUsername
passwordstrrequiredPassword
querystrrequiredQuery yang akan dieksekusi

Contoh Penggunaan:

import bribrain

rows = bribrain.postgresql_execute_fetchall(
    database="mydb",
    ip="192.168.1.100",
    port="5432",
    username="admin",
    password="secret",
    query="SELECT * FROM public.my_table LIMIT 10"
)

for row in rows:
    print(row)

Return: list — data keluaran dari query

postgresql_drop_table() — Drop Table di PostgreSQL

Menghapus tabel di PostgreSQL.

Contoh Penggunaan:

import bribrain

bribrain.postgresql_drop_table(
    database="mydb",
    schema="public",
    table="my_table",
    ip="192.168.1.100",
    port="5432",
    username="admin",
    password="secret"
)
postgresql_truncate_table() — Truncate Table di PostgreSQL

Mengosongkan data tabel di PostgreSQL (tanpa menghapus strukturnya).

Contoh Penggunaan:

import bribrain

bribrain.postgresql_truncate_table(
    database="mydb",
    schema="public",
    table="my_table",
    ip="192.168.1.100",
    port="5432",
    username="admin",
    password="secret"
)

🐬 MySQL

load_to_mysql() — Menulis DataFrame ke MySQL

Melakukan penulisan PySpark DataFrame sebagai tabel di MySQL.

Parameter:

ParameterTipeDefaultDeskripsi
dfDataFramerequiredPySpark DataFrame
schemastrrequiredTarget schema
tablestrrequiredTarget table
ipstrrequiredTarget IP
portstrrequiredTarget port
usernamestrrequiredUsername
passwordstrrequiredPassword
modestr"append"Mode penulisan (append, overwrite, error, ignore)
strategystrNoneProses sebelum penulisan (truncate, drop, None)
ddlstrNoneQuery CREATE TABLE
countboolTrueMenampilkan count data tersimpan

Contoh Penggunaan:

import bribrain

bribrain.load_to_mysql(
    df,
    schema="my_database",
    table="my_table",
    ip="192.168.1.100",
    port="3306",
    username="admin",
    password="secret",
    mode="append"
)

# Dengan strategy truncate
bribrain.load_to_mysql(
    df,
    schema="my_database",
    table="my_table",
    ip="192.168.1.100",
    port="3306",
    username="admin",
    password="secret",
    strategy="truncate"
)
mysql_execute_command() — Menjalankan Query di MySQL

Menjalankan query (non-SELECT) di MySQL.

Contoh Penggunaan:

import bribrain

bribrain.mysql_execute_command(
    schema="my_database",
    ip="192.168.1.100",
    port="3306",
    username="admin",
    password="secret",
    query="UPDATE my_table SET status = 'done' WHERE id = 1"
)
mysql_execute_fetchall() — Query Select di MySQL

Menjalankan query SELECT di MySQL dan mengembalikan hasilnya.

Contoh Penggunaan:

import bribrain

rows = bribrain.mysql_execute_fetchall(
    schema="my_database",
    ip="192.168.1.100",
    port="3306",
    username="admin",
    password="secret",
    query="SELECT * FROM my_table LIMIT 10"
)

for row in rows:
    print(row)

Return: list — data keluaran dari query

mysql_drop_table() — Drop Table di MySQL

Menghapus tabel di MySQL.

Contoh Penggunaan:

import bribrain

bribrain.mysql_drop_table(
    schema="my_database",
    table="my_table",
    ip="192.168.1.100",
    port="3306",
    username="admin",
    password="secret"
)
mysql_truncate_table() — Truncate Table di MySQL

Mengosongkan data tabel di MySQL.

Contoh Penggunaan:

import bribrain

bribrain.mysql_truncate_table(
    schema="my_database",
    table="my_table",
    ip="192.168.1.100",
    port="3306",
    username="admin",
    password="secret"
)

5. Metadata — Informasi Environment

Modul untuk mengambil informasi metadata dari environment variable.

get_userid() — Mendapatkan User ID

Mengambil Hadoop user ID dari environment variable HADOOP_USER_NAME.

Contoh Penggunaan:

import bribrain

user_id = bribrain.get_userid()
print(user_id)  # Output: "bribrain_sac"

Return: str

get_username() — Mendapatkan Username

Menghasilkan nama lengkap berdasarkan Git author email. Nama dibentuk dari bagian local email (sebelum @).

Contoh Penggunaan:

import bribrain

username = bribrain.get_username()
print(username)  # Output: "Andri Ariyanto"

Return: str

get_email() — Mendapatkan Email

Mengambil Git author email dari environment variable GIT_AUTHOR_EMAIL.

Contoh Penggunaan:

import bribrain

email = bribrain.get_email()
print(email)  # Output: "user@example.com"

Return: str

get_project_url() — Mendapatkan Project URL

Mengambil URL project dari environment variable CDSW_PROJECT_URL.

import bribrain

url = bribrain.get_project_url()
print(url)

Return: str

get_project_id() — Mendapatkan Project ID

Mengambil ID project dari environment variable CDSW_PROJECT_ID.

import bribrain

project_id = bribrain.get_project_id()
print(project_id)

Return: str

get_project_num() — Mendapatkan Project Number

Mengambil nomor project dari environment variable CDSW_PROJECT_NUM.

import bribrain

project_num = bribrain.get_project_num()
print(project_num)

Return: str

get_project_name() — Mendapatkan Nama Project

Membentuk nama project dengan format yang dapat disesuaikan.

Parameter:

ParameterTipeDefaultDeskripsi
envstr | NoneNoneEnvironment (None, "dev", "prod")
casestr | NoneNoneFormat huruf (None, "lower", "upper")
delimiterstr | None"_"Karakter pemisah (menggantikan -)

Contoh Penggunaan:

import bribrain

# Default
print(bribrain.get_project_name())
# Output: "bribrain_pipeline"

# Dengan environment dan uppercase
print(bribrain.get_project_name(env="dev", case="upper"))
# Output: "DEV_BRIBRAIN_PIPELINE"

# Mempertahankan delimiter asli
print(bribrain.get_project_name(delimiter=None))
# Output: "dev-bribrain-pipeline"

Return: str

get_proxy() — Mendapatkan Konfigurasi Proxy

Mengambil konfigurasi proxy HTTP dan HTTPS dari environment variable.

Contoh Penggunaan:

import bribrain

proxies = bribrain.get_proxy()
print(proxies)
# Output: {"http": "http://proxy.company.com:8080", "https": "https://proxy.company.com:8080"}

Return: dict

📊 HDFS Table Metadata

get_schema_space() — Mendapatkan Space Quota Schema

Mengambil informasi penggunaan ruang HDFS dari suatu schema.

Parameter:

ParameterTipeDefaultDeskripsi
schemastrrequiredNama schema di Hive/HDFS

Contoh Penggunaan:

import bribrain

result = bribrain.get_schema_space("my_schema")
print(result)
# Output: {'schema': 'my_schema', 'space_quota': 1099511627776.0, ...}

Return: dict | None

get_table_count_files() — Jumlah File dalam Table

Mengambil jumlah file dari sebuah table di Hive menggunakan Hadoop FileSystem API.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
schemastrrequiredNama schema
tablestrrequiredNama table

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()
count = bribrain.get_table_count_files(spark, "dgv", "my_table")
print(count)  # Output: 42

Return: int | None

get_table_average_files() — Rata-rata File per Partisi

Mengambil rata-rata jumlah file per partisi. Mendukung nested partition (year/month/day).

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
schemastrrequiredNama schema
tablestrrequiredNama table

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()
avg = bribrain.get_table_average_files(spark, "dgv", "my_table")
print(avg)  # Output: 3.5

Return: float | None

get_partition_count_files() — Jumlah File dalam Partisi

Mengambil jumlah file dari sebuah partisi tertentu.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
schemastrrequiredNama schema
tablestrrequiredNama table
partitionstr | dictrequiredPartisi ("ds=202508" atau dict dari get_list_partition)

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()

# Dengan string
count = bribrain.get_partition_count_files(spark, "dgv", "my_table", "ds=202508")

# Dengan dict
partitions = bribrain.get_list_partition(spark, "dgv", "my_table")
count = bribrain.get_partition_count_files(spark, "dgv", "my_table", partitions[0])

Return: int | None

get_table_size() — Ukuran Table

Mengambil ukuran file dari sebuah table di Hive (dalam bytes).

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
schemastrrequiredNama schema
tablestrrequiredNama table

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()
size = bribrain.get_table_size(spark, "dgv", "my_table")
print(bribrain.get_formatted_bytes(size))  # Output: "2.50 GB"

Return: int | None

get_table_average_size() — Rata-rata Ukuran per Partisi

Mengambil rata-rata ukuran (bytes) per partisi. Mendukung nested partition.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
schemastrrequiredNama schema
tablestrrequiredNama table

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()
avg_size = bribrain.get_table_average_size(spark, "dgv", "my_table")
print(bribrain.get_formatted_bytes(avg_size))  # Output: "128.00 MB"

Return: float | None

get_partition_size() — Ukuran Partisi

Mengambil ukuran file dari sebuah partisi tertentu.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
schemastrrequiredNama schema
tablestrrequiredNama table
partitionstr | dictrequiredPartisi ("ds=202508" atau dict)

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()
size = bribrain.get_partition_size(spark, "dgv", "my_table", "ds=202508")
print(bribrain.get_formatted_bytes(size))  # Output: "256.00 MB"

Return: int | None

get_table_last_modified() — Waktu Modifikasi Terakhir

Mengambil waktu modifikasi terakhir dari sebuah table.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
schemastrrequiredNama schema
tablestrrequiredNama table

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()
dt = bribrain.get_table_last_modified(spark, "dgv", "my_table")
print(dt)  # Output: 2026-02-28 14:30:00

Return: datetime | None

get_table_last_access() — Waktu Akses Terakhir

Mengambil waktu akses terakhir dari sebuah table.

Parameter:

ParameterTipeDefaultDeskripsi
sparkSparkSessionrequiredSpark session
schemastrrequiredNama schema
tablestrrequiredNama table

Contoh Penggunaan:

import bribrain

spark = bribrain.sparkSession()
dt = bribrain.get_table_last_access(spark, "dgv", "my_table")
print(dt)  # Output: 2026-03-01 09:15:00

Return: datetime | None

6. Notification — Notifikasi Telegram

Modul untuk mengirim pesan, gambar, dan file ke Telegram via Bot API.

send_message_telegram() — Kirim Pesan Teks

Mengirim pesan teks ke Telegram. Otomatis membagi pesan panjang menjadi beberapa bagian jika melebihi batas 4096 karakter.

Parameter:

ParameterTipeDefaultDeskripsi
messagestrrequiredPesan yang akan dikirim
bot_tokenstrrequiredToken bot Telegram
chat_idstr | intrequiredChat ID tujuan
parse_modestr"HTML"Mode parsing (HTML, Markdown)
disable_web_page_previewboolTrueNonaktifkan preview link
proxiesdictNoneKonfigurasi proxy
timeoutint20Request timeout (detik)
max_lenint4096Panjang maksimum per pesan

Contoh Penggunaan:

import bribrain

BOT_TOKEN = "your-bot-token"
CHAT_ID = "-100123456789"

# Kirim pesan sederhana
bribrain.send_message_telegram(
    message="✅ Proses ETL selesai!",
    bot_token=BOT_TOKEN,
    chat_id=CHAT_ID
)

# Kirim pesan dengan HTML formatting
bribrain.send_message_telegram(
    message="<b>Status:</b> Sukses\n<i>Durasi:</i> 5 menit",
    bot_token=BOT_TOKEN,
    chat_id=CHAT_ID
)

# Menggunakan proxy
proxies = bribrain.get_proxy()
bribrain.send_message_telegram(
    message="Pesan dengan proxy",
    bot_token=BOT_TOKEN,
    chat_id=CHAT_ID,
    proxies=proxies
)
send_image_telegram() — Kirim Gambar

Mengirim gambar ke Telegram. Mendukung pengiriman dari file lokal maupun gambar hasil generate di memory (bytes / BytesIO).

Parameter:

ParameterTipeDefaultDeskripsi
bot_tokenstrrequiredToken bot Telegram
chat_idstr | intrequiredChat ID tujuan
imagestr | bytes | BytesIOrequiredPath file gambar atau objek gambar
captionstrNoneCaption gambar
parse_modestr"HTML"Mode parsing
filenamestr"image.png"Nama file gambar
proxiesdictNoneKonfigurasi proxy
timeoutint60Request timeout (detik)

Contoh Penggunaan:

import bribrain

BOT_TOKEN = "your-bot-token"
CHAT_ID = "-100123456789"

# Kirim gambar dari file lokal
bribrain.send_image_telegram(
    bot_token=BOT_TOKEN,
    chat_id=CHAT_ID,
    image="resources/chart.png",
    caption="📊 Grafik penjualan bulan ini"
)

# Kirim gambar dari BytesIO
import io
img_buffer = io.BytesIO(image_bytes)
bribrain.send_image_telegram(
    bot_token=BOT_TOKEN,
    chat_id=CHAT_ID,
    image=img_buffer,
    caption="Generated chart"
)
send_file_telegram() — Kirim File/Dokumen

Mengirim file atau dokumen ke Telegram. Cocok untuk mengirim report, log, CSV, Excel, atau file hasil generate lainnya.

Parameter:

ParameterTipeDefaultDeskripsi
bot_tokenstrrequiredToken bot Telegram
chat_idstr | intrequiredChat ID tujuan
file_objstr | bytes | BytesIOrequiredPath file atau objek file
filenamestrrequiredNama file
captionstrNoneCaption file
parse_modestr"HTML"Mode parsing
proxiesdictNoneKonfigurasi proxy
timeoutint120Request timeout (detik)

Contoh Penggunaan:

import bribrain

BOT_TOKEN = "your-bot-token"
CHAT_ID = "-100123456789"

# Kirim file Excel
bribrain.send_file_telegram(
    bot_token=BOT_TOKEN,
    chat_id=CHAT_ID,
    file_obj="resources/excel/report.xlsx",
    filename="report.xlsx",
    caption="📄 Laporan bulanan"
)

# Kirim file CSV
bribrain.send_file_telegram(
    bot_token=BOT_TOKEN,
    chat_id=CHAT_ID,
    file_obj="resources/csv/data.csv",
    filename="data.csv"
)

📦 Dependencies

PackageVersi
mysql-connector-python8.0.28
psycopg2latest
openpyxllatest
pyspark(environment)
pandas(environment)
requests(environment)

👨‍💻 Author

Andri Ariyanto

📄 License

Copyright © Andri Ariyanto — AI Department - BRI

Keywords

bribrain

FAQs

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts