
About
A collection of random transforms for the Apache Beam Python SDK. Many are
simple transforms; the most useful ones are those for reading from and writing
to relational databases.
Installation
Using pip:
pip install beam-nuggets
From source:
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
pip install .
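Whichever install route you use, you can confirm that Python can locate the package with the standard library's `importlib.util.find_spec`. This is a minimal sketch: `is_importable` is a hypothetical helper, and `beam_nuggets` is the import name used in the usage examples.

```python
import importlib.util


def is_importable(module_name):
    """Return True if a top-level module can be located on the current path.

    Hypothetical helper: it only locates the module and does not import it.
    """
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    for name in ("apache_beam", "beam_nuggets"):
        status = "found" if is_importable(name) else "missing"
        print(name + ": " + status)
```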
Supported transforms
IO
Others
Documentation
See here.
Usage
Write data to an SQLite table using beam-nuggets'
relational_db.Write transform.
# write_sqlite.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db
# Records to write, one dict per table row
records = [
    {'name': 'Jan', 'num': 1},
    {'name': 'Feb', 'num': 2}
]
# SQLite database file; created if it does not exist yet
source_config = relational_db.SourceConfiguration(
    drivername='sqlite',
    database='/tmp/months_db.sqlite',
    create_if_missing=True
)
# Target table; created if missing, with 'num' as its primary key
table_config = relational_db.TableConfiguration(
    name='months',
    create_if_missing=True,
    primary_key_columns=['num']
)
with beam.Pipeline(options=PipelineOptions()) as p:
    # Turn the in-memory records into a PCollection, then write it to the DB
    months = p | "Reading month records" >> beam.Create(records)
    months | 'Writing to DB' >> relational_db.Write(
        source_config=source_config,
        table_config=table_config
    )
Execute the pipeline
python write_sqlite.py
Examine the contents
sqlite3 /tmp/months_db.sqlite 'select * from months'
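You can run the same check from Python with the standard library's sqlite3 module, which is handy for asserting that the table holds exactly the input records. This is a minimal sketch: `fetch_rows` is a hypothetical helper, and it assumes the database path and table name used above.

```python
import sqlite3
from pathlib import Path


def fetch_rows(db_path, table):
    """Return every row of `table` as a dict keyed by column name.

    Hypothetical helper. The database is opened read-only, so a wrong path
    raises sqlite3.OperationalError instead of silently creating an empty file.
    `table` is interpolated into the SQL text, so pass only trusted names.
    """
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    try:
        conn.row_factory = sqlite3.Row  # rows can be read by column name
        cursor = conn.execute('SELECT * FROM "{}"'.format(table))
        return [dict(row) for row in cursor]
    finally:
        conn.close()
```

After running the pipeline, `fetch_rows('/tmp/months_db.sqlite', 'months')` should return the two month records as dicts. SQL does not guarantee row order without ORDER BY, so sort both lists (for example by `num`) before comparing them.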
To write the same data to a PostgreSQL table instead, just create a suitable
relational_db.SourceConfiguration as follows.
source_config = relational_db.SourceConfiguration(
    drivername='postgresql+pg8000',
    host='localhost',
    port=5432,
    username='postgres',
    password='password',
    database='calendar',
    create_if_missing=True
)
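Both configurations describe a connection through the same fields. The drivername values ('sqlite', 'postgresql+pg8000') follow SQLAlchemy's dialect+driver naming. Assuming the fields are combined into an SQLAlchemy-style URL of the form `drivername://username:password@host:port/database` (an assumption about how relational_db uses them, not something this README states), the composition can be sketched with the standard library. `build_db_url` is a hypothetical helper, not part of beam-nuggets.

```python
from urllib.parse import quote


def build_db_url(drivername, database, host=None, port=None,
                 username=None, password=None):
    """Compose an SQLAlchemy-style URL: driver://user:pass@host:port/database.

    Hypothetical helper for illustration only. Credentials are percent-encoded
    so characters such as '@' or '/' in a password do not break the URL.
    """
    netloc = ""
    if username:
        netloc = quote(username, safe="")
        if password:
            netloc += ":" + quote(password, safe="")
        netloc += "@"
    if host:
        netloc += host
        if port:
            netloc += ":" + str(port)
    return "{}://{}/{}".format(drivername, netloc, database)


print(build_db_url("postgresql+pg8000", "calendar", host="localhost",
                   port=5432, username="postgres", password="password"))
# postgresql+pg8000://postgres:password@localhost:5432/calendar
print(build_db_url("sqlite", "/tmp/months_db.sqlite"))
# sqlite:////tmp/months_db.sqlite
```

Percent-encoding the credentials is the main design point: without it, an '@' inside a password would be read as the start of the host part.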
Click here
for more examples, including writing to PostgreSQL on Google Cloud Platform
using the DataflowRunner.
An example showing how to use beam-nuggets' relational_db.ReadFromDB
transform to read from a PostgreSQL database table.
from __future__ import print_function  # only needed on Python 2
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db
with beam.Pipeline(options=PipelineOptions()) as p:
    # Connection details of the PostgreSQL database to read from
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )
    # Read the rows returned by the query into a PCollection
    records = p | "Reading records from db" >> relational_db.ReadFromDB(
        source_config=source_config,
        table_name='months',
        query='select num, name from months'
    )
    # Print each record read from the table
    records | 'Writing to stdout' >> beam.Map(print)
See here for more examples.
Development
- Install
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
export BEAM_NUGGETS_ROOT=`pwd`
pip install -e .[dev]
- Make changes on dedicated dev branches
- Run tests
cd $BEAM_NUGGETS_ROOT
python -m unittest discover -v
- Generate docs
cd $BEAM_NUGGETS_ROOT
docs/generate_docs.sh
- Create a PR against master.
- After merging the accepted PR and updating the local master, upload a new
build to PyPI.
cd $BEAM_NUGGETS_ROOT
scripts/build_test_deploy.sh
Backlog
- Versioned docs?
- Summarize the investigation of using Source/Sink vs. ParDo (and GroupBy) for IO
- More nuggets: WriteToCsv
- Investigate the readiness of Splittable DoFn (SDF) ParDo, and the possibility of using it for relational_db.ReadFromDB
- Integration tests
- Failure handling in the DB IO transforms
- More nuggets: Elasticsearch, Mongo
- WriteToRelationalDB, logging
Contributions by
mohaseeb, astrocox, 2514millerj, alfredo, shivangkumar
Licence
MIT