Buelon
A scripting language for simply managing very large numbers of I/O-heavy workloads, such as API calls for your ETL, ELT, or any program that needs Python and/or SQL.
Installation
pip install buelon
That's it!
This installs the CLI command bue. Check the installation by running bue --version or bue -v.
Note:
This package uses Cython, and you may need to install python3-dev using sudo apt-get install python3-dev [more commands and information].
If you would like to use this repository without Cython, you may git clone it instead: the project does not technically depend on the compiled scripts, although they do provide a significant performance boost.
Quick Start
- Get example template:
bue example
(warning: this command will overwrite .env)
- Start bucket server, hub and 3 workers:
bue demo
- Upload script and wait for results:
python3 example.py
Production Start
Security: Make sure the bucket, hub and workers are reachable on a private network only. (You will need a web server or something similar on the same private network to access this tool using bue upload.)
- Run bucket server:
bue bucket -b 0.0.0.0:61535
- Run hub:
bue hub -b 0.0.0.0:65432 -k localhost:61535
- Run worker(s):
bue worker -b localhost:65432 -k localhost:61535
- Upload code:
bue upload -b localhost:65432 -f ./example.bue
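For scripted deployments, the four commands above can be assembled programmatically. The following is a minimal sketch and not part of Buelon: the helper name production_commands and its parameters are hypothetical, and it uses only the subcommands and the -b, -k and -f flags shown above. It only builds the argument lists; actually launching them is left to your process manager.

```python
from typing import Dict, List


def production_commands(
    bucket_addr: str = "localhost:61535",
    hub_addr: str = "localhost:65432",
    bind_host: str = "0.0.0.0",
    script: str = "./example.bue",
) -> Dict[str, List[str]]:
    """Build argv lists for the production commands (hypothetical helper).

    bucket_addr and hub_addr are the addresses clients connect to;
    bind_host is the interface the bucket server and hub listen on.
    """
    bucket_port = bucket_addr.rsplit(":", 1)[1]
    hub_port = hub_addr.rsplit(":", 1)[1]
    return {
        "bucket": ["bue", "bucket", "-b", f"{bind_host}:{bucket_port}"],
        "hub": ["bue", "hub", "-b", f"{bind_host}:{hub_port}", "-k", bucket_addr],
        "worker": ["bue", "worker", "-b", hub_addr, "-k", bucket_addr],
        "upload": ["bue", "upload", "-b", hub_addr, "-f", script],
    }


if __name__ == "__main__":
    # Print each command in the order it should be started.
    for name, argv in production_commands().items():
        print(f"{name}: {' '.join(argv)}")
```

Keeping the addresses in one place makes it harder for the hub, the workers and the upload step to drift apart when ports change.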
Supported Languages
Learn by Example
(see below for example.py contents)
$ production-small
!0

accounts:
    python
    accounts
    example.py

request:
    python
    request_report
    example.py

status:
    python
    $ testing-small
    get_status
    example.py

download:
    python
    !9
    get_report
    example.py

manipulate_data:
    sqlite3
    some_table
    `
SELECT
    *,
    CASE
        WHEN sales = 0
        THEN 0.0
        ELSE spend / sales
    END AS acos
FROM some_table
`

py_transform:
    python
    $ testing-heavy
    transform_data
    example.py

upload:
    python
    upload_to_db
    example.py

accounts_pipe = | accounts
api_pipe = request | status | download | manipulate_data | py_transform | upload

for account in accounts_pipe():
    api_pipe(account)
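The SQL in the manipulate_data step can be tried outside Buelon. The sketch below runs the same query with Python's built-in sqlite3 module against an in-memory table. The function name add_acos and the sample rows are made up for illustration, and how Buelon itself loads the previous step's rows into some_table is not shown here.

```python
import sqlite3
from typing import Dict, List

# Same query as the manipulate_data step in the script.
ACOS_QUERY = """
SELECT
    *,
    CASE
        WHEN sales = 0
        THEN 0.0
        ELSE spend / sales
    END AS acos
FROM some_table
"""


def add_acos(rows: List[Dict[str, float]]) -> List[Dict[str, float]]:
    """Load rows into an in-memory some_table and return them with an acos column."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row  # lets each row be converted to a dict
    try:
        conn.execute(
            "CREATE TABLE some_table (sales REAL, spend REAL, clicks INTEGER)"
        )
        conn.executemany(
            "INSERT INTO some_table (sales, spend, clicks) "
            "VALUES (:sales, :spend, :clicks)",
            rows,
        )
        return [dict(row) for row in conn.execute(ACOS_QUERY)]
    finally:
        conn.close()


if __name__ == "__main__":
    sample = [
        {"sales": 0, "spend": 5, "clicks": 0},
        {"sales": 20, "spend": 5, "clicks": 26},
    ]
    for row in add_acos(sample):
        print(row)
```

The sales and spend columns are declared REAL in this sketch because SQLite performs integer division when both operands are integers, which would truncate spend / sales.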
example.py
import time
import random
import uuid
import logging
from typing import List, Dict, Union

from buelon.core.step import Result, StepStatus

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def accounts(*args) -> List[Dict[str, Union[int, str]]]:
    """Returns a list of sample account dictionaries.

    Returns:
        List[Dict[str, Union[int, str]]]: A list of dictionaries containing account information.
    """
    account_list = [
        {'id': 0, 'name': 'Account 1'},
        {'id': 2, 'name': 'Account 2'},
        {'id': 3, 'name': 'Account 4'},
    ]
    logger.info(f"Retrieved {len(account_list)} accounts")
    return account_list


def request_report(config: Dict[str, Union[int, str]]) -> Dict[str, Union[Dict, uuid.UUID, float]]:
    """Simulates a report request for a given account.

    Args:
        config (Dict[str, Union[int, str]]): A dictionary containing account information.

    Returns:
        Dict[str, Union[Dict, uuid.UUID, float]]: A dictionary with account data and request details.
    """
    account_id = config['id']
    request = {
        'report_id': uuid.uuid4(),
        'time': time.time(),
        'account_id': account_id
    }
    logger.info(f"Requested report for account ID: {account_id}, Report ID: {request['report_id']}")
    return {
        'account': config,
        'request': request
    }


def get_status(config: Dict[str, Union[Dict, uuid.UUID, float]]) -> Union[Dict, Result]:
    """Checks the status of a report request.

    Args:
        config (Dict[str, Union[Dict, uuid.UUID, float]]): A dictionary containing request information.

    Returns:
        Union[Dict, Result]: Either the input config if successful, or a Result object if pending.
    """
    requested_time = config['request']['time']
    account_id = config['account']['id']
    # The simulated report becomes ready 10 to 15 seconds after it was requested.
    status = 'success' if requested_time + random.randint(10, 15) < time.time() else 'pending'
    if status == 'pending':
        logger.info(f"Report status for account ID {account_id} is pending")
        return Result(status=StepStatus.pending)
    logger.info(f"Report status for account ID {account_id} is success")
    return config


def get_report(config: Dict[str, Union[Dict, uuid.UUID, float]]) -> Union[Dict, Result]:
    """Retrieves a report or simulates an error.

    Args:
        config (Dict[str, Union[Dict, uuid.UUID, float]]): A dictionary containing request configuration.

    Returns:
        Union[Dict, Result]: Either a dictionary with report data or a Result object for reset.

    Raises:
        ValueError: If an unexpected error occurs.
    """
    account_id = config['account']['id']
    # Roughly one call in eleven simulates a timeout response.
    if random.randint(0, 10) == 0:
        report_data = {'status': 'error', 'msg': 'timeout error'}
    else:
        report_data = [
            {'sales': i * 10, 'spend': i % 10, 'clicks': i * 13}
            for i in range(random.randint(25, 100))
        ]
    if not isinstance(report_data, list):
        if isinstance(report_data, dict):
            if (report_data.get('status') == 'error'
                    and report_data.get('msg') == 'timeout error'):
                logger.warning(f"Timeout error for account ID {account_id}. Resetting.")
                return Result(status=StepStatus.reset)
        error_msg = f'Unexpected error: {report_data}'
        logger.error(f"Error getting report for account ID {account_id}: {error_msg}")
        raise ValueError(error_msg)
    logger.info(f"Successfully retrieved report for account ID {account_id} with {len(report_data)} rows")
    return {
        'config': config,
        'table_data': report_data
    }


def transform_data(data: Dict[str, Union[Dict, List[Dict]]]) -> None:
    """Transforms the report data by adding account information to each row.

    Args:
        data (Dict[str, Union[Dict, List[Dict]]]): A dictionary containing config and table data.
    """
    config = data['config']
    table_data = data['table_data']
    account_name = config['account']['name']
    for row in table_data:
        row['account'] = account_name
    logger.info(f"Transformed {len(table_data)} rows of data for account: {account_name}")


def upload_to_db(data: Dict[str, Union[Dict, List[Dict]]]) -> None:
    """Handles table upload to database.

    Args:
        data (Dict[str, Union[Dict, List[Dict]]]): A dictionary containing table data to be uploaded.
    """
    table_data = data['table_data']
    account_name = data['config']['account']['name']
    logger.info(f"Uploaded {len(table_data)} rows to the database for account: {account_name}")
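In example.py, get_status and get_report signal "not ready yet" and "start over" by returning a Result with StepStatus.pending or StepStatus.reset. How Buelon's hub and workers act on these statuses is not described here, so the following is only a rough, self-contained illustration of the idea. StepStatus and Result are local stand-ins (not the classes from buelon.core.step), run_pipeline is a hypothetical single-process runner, and its behavior (retry the same step on pending, restart from the first step on reset) is an assumption.

```python
import enum
from dataclasses import dataclass
from typing import Any, Callable, List


class StepStatus(enum.Enum):
    """Local stand-in for buelon.core.step.StepStatus (only the values used here)."""
    success = "success"
    pending = "pending"
    reset = "reset"


@dataclass
class Result:
    """Local stand-in for buelon.core.step.Result."""
    status: StepStatus = StepStatus.success


def run_pipeline(steps: List[Callable[[Any], Any]], initial: Any,
                 max_attempts: int = 100) -> Any:
    """Run steps in order, feeding each step's return value to the next.

    Hypothetical runner, not Buelon's scheduler. Assumptions: a pending
    Result re-runs the same step with the same input, and a reset Result
    restarts from the first step with the initial input.
    """
    index, value, attempts = 0, initial, 0
    while index < len(steps):
        if attempts >= max_attempts:
            raise RuntimeError("pipeline did not finish within max_attempts")
        attempts += 1
        outcome = steps[index](value)
        if isinstance(outcome, Result) and outcome.status is StepStatus.pending:
            continue  # try the same step again with the same input
        if isinstance(outcome, Result) and outcome.status is StepStatus.reset:
            index, value = 0, initial  # start over from the first step
            continue
        index, value = index + 1, outcome
    return value


if __name__ == "__main__":
    polls = {"count": 0}

    def request(account: dict) -> dict:
        return {"account": account}

    def status(config: dict):
        # Pretend the report becomes ready on the third poll.
        polls["count"] += 1
        return config if polls["count"] >= 3 else Result(status=StepStatus.pending)

    print(run_pipeline([request, status], {"id": 0, "name": "Account 1"}))
```

A real scheduler would normally wait or run other jobs between polls instead of retrying immediately; the loop here only shows the control flow.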
Known Defects
Currently, the error handling for this scripting language is not the best.
When a script is run, it is built into Python, so from that point on Python's own error handling, which is very good, applies.
Because of the language's current simplicity, improving this is not marked as a high priority.
Future Plans
If this project sees some love, or I just find more free time, I'd like to support more languages, even compiled languages such as Rust, Go and C++, allowing teams that write in different languages to work on the same program.
Better bue script error handling.
Possibly rebuild in Rust once more mature, for better performance.
License