Bowline
Easily build performant data stream processing pipelines in Python.
Bowline is a Python library that simplifies creating data pipelines that perform sequential computations on data streams.
Key features of Bowline include:
- Performance: Each Bowline Processor runs in its own process, meaning that Bowline is ideal for high-throughput and computationally heavy workloads.
- Simplicity: Bowline abstracts away the complexity of multiprocessing by handling process creation, inter-process communication, and clean process shutdown.
- Typing: Data inputs and outputs are validated with pydantic.
- Create Complex Pipelines: Bowline provides a ProcessorChain that chains Processors together, and a ProcessorGraph that enables the creation of more complex, branching pipelines.
Installation
python -m pip install bowline-streaming
Usage
The following sections describe the Processor, ProcessorChain, and ProcessorGraph classes.
Processors
Processors are the "building blocks" of the process chains.
A Processor defines the function to be executed, data input and output formats, and a processor name.
Processors are executed in a background process, and data is transferred via process-safe queues.
Example
The following example shows creating and using a Processor that adds two numbers and prints the results.
from pydantic import BaseModel

from bowline import Processor


class AddInputModel(BaseModel):
    x: int
    y: int


class AddOutputModel(BaseModel):
    result: int


def add_and_print(input: AddInputModel) -> AddOutputModel:
    result = input.x + input.y
    return AddOutputModel(result=result)


if __name__ == '__main__':
    addition_processor = Processor(target_function=add_and_print,
                                   name="add",
                                   input_model=AddInputModel,
                                   output_model=AddOutputModel)
    addition_processor.start()
    addition_processor.push_input(AddInputModel(x=2, y=2))
    addition_processor.push_input(AddInputModel(x=3, y=4))
    addition_processor.push_input(AddInputModel(x=123, y=456))
    print("Results: ")
    for _ in range(3):
        while not addition_processor.has_output():
            pass
        result = addition_processor.get_output()
        print(result)
    addition_processor.shutdown()
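The loop above polls has_output in a tight busy-wait, which is fine for a short demo but will spin a CPU core. In longer-running code you may prefer to sleep briefly between polls. A minimal sketch of such a helper, using only the Processor methods shown above (the name drain_outputs is hypothetical, not part of Bowline):

import time

def drain_outputs(processor, count, poll_interval=0.01):
    # Collect `count` results from a running Processor, yielding the CPU between polls.
    results = []
    while len(results) < count:
        if processor.has_output():
            results.append(processor.get_output())
        else:
            time.sleep(poll_interval)
    return results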
Processor Chains
Processor Chains connect multiple Processors in a chain, such that the output of a Processor is passed as the input to the next.
This allows for the definition of pipelines that can run in sequence on streaming data.
Example
The following example shows creating and using a ProcessorChain that adds two numbers, then squares the result.
from pydantic import BaseModel

from bowline import Processor
from bowline import ProcessorChain


class AddInputModel(BaseModel):
    x: int
    y: int


class AddOutputModel(BaseModel):
    result: int


class SquareOutputModel(BaseModel):
    result: int


def add_two_numbers(input: AddInputModel) -> AddOutputModel:
    result = input.x + input.y
    return AddOutputModel(result=result)


def square_number(input: AddOutputModel) -> SquareOutputModel:
    result = input.result * input.result
    return SquareOutputModel(result=result)


if __name__ == '__main__':
    add_two_numbers_processor = Processor(target_function=add_two_numbers,
                                          name="add",
                                          input_model=AddInputModel,
                                          output_model=AddOutputModel)
    square_number_processor = Processor(target_function=square_number,
                                        name="square",
                                        input_model=AddOutputModel,
                                        output_model=SquareOutputModel)
    processor_chain = ProcessorChain()
    processor_chain.add_processor(add_two_numbers_processor)
    processor_chain.add_processor(square_number_processor)
    processor_chain.start()
    processor_chain.push_input(AddInputModel(x=2, y=2))
    processor_chain.push_input(AddInputModel(x=3, y=4))
    processor_chain.push_input(AddInputModel(x=123, y=456))
    print("Results: ")
    for _ in range(3):
        while not processor_chain.has_output():
            pass
        print(processor_chain.get_output())
    processor_chain.shutdown()
Processor Graphs
Processor Graphs connect multiple Processors in a graph, such that the output of a Processor can be passed to multiple subsequent Processors.
This allows for the definition of pipelines that can run with an arbitrary number of branching processors within them.
Example
The following example creates a ProcessorGraph in which two numbers are added together, then the result of that calculation is passed to two subsequent processors: one squares the result, and the other computes its square root.
import math

from pydantic import BaseModel

from bowline import Processor, ProcessorGraph


class AddInputModel(BaseModel):
    x: int
    y: int


class AddOutputModel(BaseModel):
    result: int


class SquareOutputModel(BaseModel):
    result: int


class SquareRootOutputModel(BaseModel):
    result: float


def add_two_numbers(input: AddInputModel) -> AddOutputModel:
    result = input.x + input.y
    print(f"{input.x} + {input.y} = {result}")
    return AddOutputModel(result=result)


def square_number(input: AddOutputModel) -> SquareOutputModel:
    result = input.result * input.result
    print(f"{input.result} squared is {result}")
    return SquareOutputModel(result=result)


def square_root(input: AddOutputModel) -> SquareRootOutputModel:
    result = math.sqrt(input.result)
    print(f"The square root of {input.result} is {result}")
    return SquareRootOutputModel(result=result)


if __name__ == '__main__':
    add_two_numbers_processor = Processor(target_function=add_two_numbers,
                                          name="addition",
                                          input_model=AddInputModel,
                                          output_model=AddOutputModel)
    square_number_processor = Processor(target_function=square_number,
                                        name="square",
                                        input_model=AddOutputModel,
                                        output_model=SquareOutputModel)
    square_root_processor = Processor(target_function=square_root,
                                      name="sqrt",
                                      input_model=AddOutputModel,
                                      output_model=SquareRootOutputModel)
    processor_graph = ProcessorGraph()
    processor_graph.add_processor(add_two_numbers_processor)
    processor_graph.add_processor(square_number_processor, add_two_numbers_processor)
    processor_graph.add_processor(square_root_processor, add_two_numbers_processor)
    processor_graph.start()
    processor_graph.push_input(AddInputModel(x=2, y=2))
    processor_graph.push_input(AddInputModel(x=3, y=4))
    processor_graph.push_input(AddInputModel(x=123, y=456))
    for _ in range(6):
        while not processor_graph.has_output():
            pass
        result = processor_graph.get_output()
        print(f"Received output {result.output} from processor {result.processor}")
    processor_graph.shutdown()
Configuration-Driven Pipelines
You can also create pipelines via a yaml configuration file, for both ProcessorChains and ProcessorGraphs.
ProcessorChain
Example yaml configuration file, in which the add Processor provides input to the square Processor:
chain:
  processors:
    - add:
        target_function: simple_chain.add_two_numbers
        input_model: simple_chain.AddInputModel
        output_model: simple_chain.AddOutputModel
    - square:
        target_function: simple_chain.square_number
        input_model: simple_chain.AddOutputModel
        output_model: simple_chain.SquareOutputModel
Example code:
from bowline.utils.config import ProcessorConfig
config_file_path = "examples/chain-config.yml"
config = ProcessorConfig(config_file_path)
processor_chain = config.generate_processors()
processor_chain.start()
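Once generated, the chain can be driven just like a manually constructed ProcessorChain. A minimal sketch, assuming generate_processors returns a ready-to-use chain and that the simple_chain module referenced in the configuration defines the models from the chain example above:

from bowline.utils.config import ProcessorConfig
from simple_chain import AddInputModel  # module and models referenced by the config

config = ProcessorConfig("examples/chain-config.yml")
processor_chain = config.generate_processors()
processor_chain.start()

processor_chain.push_input(AddInputModel(x=3, y=4))
while not processor_chain.has_output():
    pass  # wait for (3 + 4) squared to come through the chain
print(processor_chain.get_output())  # expected result: 49
processor_chain.shutdown()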
ProcessorGraph
Example yaml configuration file, which defines a graph in which the addition Processor provides inputs to the square and sqrt Processors:
graph:
  processors:
    - addition:
        target_function: simple_graph.add_two_numbers
        input_model: simple_graph.AddInputModel
        output_model: simple_graph.AddOutputModel
        processors:
          - square:
              target_function: simple_graph.square_number
              input_model: simple_graph.AddOutputModel
              output_model: simple_graph.SquareOutputModel
          - sqrt:
              target_function: simple_graph.square_root
              input_model: simple_graph.AddOutputModel
              output_model: simple_graph.SquareRootOutputModel
Example code:
from bowline.utils.config import ProcessorConfig
config_file_path = "examples/graph-config.yml"
config = ProcessorConfig(config_file_path)
processor_graph = config.generate_processors()
processor_graph.start()
Considerations
Because Bowline uses multiprocessing behind the scenes, all data models must be serializable.
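In practice this means the fields of your input and output models should be picklable, since multiprocessing pickles objects to move them between processes. A quick illustration with pydantic models (these models are illustrative only, not part of Bowline):

import pickle
from typing import Callable

from pydantic import BaseModel

class NumbersModel(BaseModel):
    x: int
    label: str

class CallbackModel(BaseModel):
    # A lambda stored in a field cannot be pickled, so instances of this model
    # cannot cross the process boundary that Bowline's queues rely on.
    transform: Callable[[int], int]

pickle.dumps(NumbersModel(x=1, label="ok"))               # fine
# pickle.dumps(CallbackModel(transform=lambda v: v + 1))  # raises a pickling error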
Local Development
- Clone the repository: git clone git@github.com:scottbarnesg/bowline.git
- Install bowline as an editable package: python -m pip install -e .
Running the tests
- Install the test dependencies: python -m pip install -e .[dev]
- Run the tests: python -m pytest test/
- Or, to view log output while running tests: python -m pytest --capture=no --log-cli-level=INFO test/test_processor.py
FAQ
Why doesn't Bowline use asyncio?
Bowline is built on the multiprocessing library, so each Processor instance runs in its own process.
This means that async functionality is generally not needed (although you are welcome to implement it in the functions you want Bowline to run).
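For example, a target function can drive its own event loop with asyncio.run; from Bowline's perspective it is an ordinary synchronous function. A minimal sketch (the models mirror the earlier examples and the async work is a stand-in):

import asyncio

from pydantic import BaseModel

from bowline import Processor

class AddInputModel(BaseModel):
    x: int
    y: int

class AddOutputModel(BaseModel):
    result: int

async def add_async(x: int, y: int) -> int:
    await asyncio.sleep(0)  # placeholder for real async work (I/O, awaiting a service, etc.)
    return x + y

def add_with_asyncio(input: AddInputModel) -> AddOutputModel:
    # Bowline calls this like any other target function; the event loop lives inside it.
    return AddOutputModel(result=asyncio.run(add_async(input.x, input.y)))

addition_processor = Processor(target_function=add_with_asyncio,
                               name="async-add",
                               input_model=AddInputModel,
                               output_model=AddOutputModel)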
Bowline's primary use case is creating high-throughput, low-latency pipelines for streaming data that you want to perform computationally-heavy operations on.
If you are looking for an async-first library that runs tasks in a single process and is designed for lighter workloads, a tool like Faust may be a better fit for your use case.