Apache Flink Stateful Functions
Stateful Functions is an API that simplifies building distributed stateful applications with a runtime built for serverless architectures.
It brings together the benefits of stateful stream processing - the processing of large datasets with low latency and bounded resource constraints -
along with a runtime for modeling stateful entities that supports location transparency, concurrency, scaling, and resiliency.
It is designed to work with modern architectures, such as cloud-native deployments and popular event-driven FaaS platforms
like AWS Lambda and Knative, and to provide out-of-the-box consistent state and messaging while preserving the serverless
experience and elasticity of these platforms.
Stateful Functions is developed under the umbrella of Apache Flink.
This README is meant as a brief walkthrough of the StateFun Python SDK and how to set things up
to get started with Stateful Functions in Python.
For detailed documentation, please visit the official docs.
For code examples, please take a look at the examples.
Python SDK Overview
Background
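The JVM-based Stateful Functions implementation has a RequestReply
extension (a protocol and an implementation) that allows calling into any HTTP endpoint that implements that protocol. Although it is possible to implement this protocol independently, this is a minimal library for the Python programming language that:
- Allows users to define and declare their functions in a convenient way.
- Dispatches an invocation request sent from the JVM-based Stateful Functions implementation to the appropriate function previously declared.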
A Mini-Tutorial
Define and Declare a Function
from statefun import *

functions = StatefulFunctions()

@functions.bind(typename="demo/greeter")
def greet(context, message):
    print(f"Hey {message.as_string()}!")
This code declares a function of type demo/greeter
and binds it to the StatefulFunctions instance.
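To make the idea of binding more concrete, here is a simplified, stdlib-only model of what a decorator like bind does conceptually: it records the function in a registry keyed by its typename, so that an incoming invocation for that typename can later be routed to it. The Registry class, its dispatch method, and the string-only message type are hypothetical illustrations, not the SDK's actual implementation.

```python
from typing import Callable, Dict

Handler = Callable[[str], str]


class Registry:
    """Hypothetical, simplified stand-in for the binding logic of StatefulFunctions."""

    def __init__(self) -> None:
        # Maps a typename such as "demo/greeter" to the function bound to it.
        self._functions: Dict[str, Handler] = {}

    def bind(self, typename: str) -> Callable[[Handler], Handler]:
        def decorator(fn: Handler) -> Handler:
            self._functions[typename] = fn
            return fn  # the function stays directly callable after binding

        return decorator

    def dispatch(self, typename: str, message: str) -> str:
        # Route an incoming message to whichever function was bound to its typename.
        fn = self._functions.get(typename)
        if fn is None:
            raise KeyError(f"no function bound to {typename!r}")
        return fn(message)


functions = Registry()


@functions.bind(typename="demo/greeter")
def greet(message: str) -> str:
    return f"Hey {message}!"


print(functions.dispatch("demo/greeter", "Alice"))  # Hey Alice!
```

Returning the original function from the decorator keeps it usable as a normal Python function, which is convenient for local testing.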
Registering and accessing persisted state
You can register persistent state that is managed by the Stateful Functions workers
for state consistency and fault tolerance. State values are read and written through the storage attribute of the context parameter:
from statefun import *

functions = StatefulFunctions()

@functions.bind(
    typename="demo/greeter",
    specs=[ValueSpec(name="seen", type=IntType)])
def greet(context, message):
    seen = context.storage.seen or 0
    seen += 1
    context.storage.seen = seen
    print(f"Hey {message.as_string()} I've seen you {seen} times")
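State declared with a ValueSpec is scoped to a single function instance, identified by its address (the typename plus an id), so two greeter instances each keep their own seen counter. The sketch below models that scoping with a plain dictionary. InMemoryStateStore and the SimpleNamespace-based context are hypothetical stand-ins; in the real system the state is owned by the Stateful Functions runtime on the Flink side and shipped with each invocation, not kept in the Python process.

```python
from types import SimpleNamespace
from typing import Dict, Optional, Tuple

Address = Tuple[str, str]  # (typename, id)


class InMemoryStateStore:
    """Hypothetical store holding one 'seen' value per function address."""

    def __init__(self) -> None:
        self._values: Dict[Address, Optional[int]] = {}

    def invoke(self, typename: str, target_id: str, name: str) -> str:
        address = (typename, target_id)
        # Load this address's state; missing state reads as None,
        # which is why the function uses `context.storage.seen or 0`.
        storage = SimpleNamespace(seen=self._values.get(address))
        context = SimpleNamespace(storage=storage)
        greeting = greet(context, name)
        # Persist whatever the function wrote back, for this address only.
        self._values[address] = storage.seen
        return greeting


def greet(context, name: str) -> str:
    # Same counter logic as the example above, returning the text instead of printing it.
    seen = context.storage.seen or 0
    seen += 1
    context.storage.seen = seen
    return f"Hey {name} I've seen you {seen} times"


store = InMemoryStateStore()
store.invoke("demo/greeter", "alice", "Alice")
print(store.invoke("demo/greeter", "alice", "Alice"))  # Hey Alice I've seen you 2 times
print(store.invoke("demo/greeter", "bob", "Bob"))      # Hey Bob I've seen you 1 times
```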
Expose with a Request Reply Handler
handler = RequestReplyHandler(functions)
Using the Handler with your Favorite HTTP Serving Framework
For example, using Flask:
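from flask import Flask, request, make_response

app = Flask(__name__)

@app.route('/statefun', methods=['POST'])
def handle():
    # Forward the raw request body to the RequestReplyHandler created above
    response_data = handler.handle_sync(request.data)
    response = make_response(response_data)
    response.headers.set('Content-Type', 'application/octet-stream')
    return response

if __name__ == "__main__":
    app.run()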
This creates an HTTP server that accepts requests from the Stateful Functions cluster and
dispatches them to the handler.
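The handler is not tied to Flask: any framework that can read the raw POST body and return bytes with an application/octet-stream content type should work. As a dependency-free sketch, the same endpoint can be served with the standard library's http.server module. Here the handling callable is passed in as a parameter so the sketch runs without the SDK installed; the make_server helper is a hypothetical name, and in a real deployment you would pass handler.handle_sync.

```python
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Callable


def make_server(handle_sync: Callable[[bytes], bytes],
                host: str = "127.0.0.1", port: int = 8000) -> ThreadingHTTPServer:
    """Build an HTTP server that forwards POST /statefun bodies to handle_sync."""

    class StatefunRequestHandler(BaseHTTPRequestHandler):
        def do_POST(self) -> None:
            if self.path != "/statefun":
                self.send_error(404)
                return
            # Read exactly the request body sent by the Stateful Functions cluster.
            length = int(self.headers.get("Content-Length", 0))
            request_data = self.rfile.read(length)
            response_data = handle_sync(request_data)
            self.send_response(200)
            self.send_header("Content-Type", "application/octet-stream")
            self.send_header("Content-Length", str(len(response_data)))
            self.end_headers()
            self.wfile.write(response_data)

    return ThreadingHTTPServer((host, port), StatefunRequestHandler)
```

To serve real traffic, create the handler as in the Request Reply Handler step and call make_server(handler.handle_sync).serve_forever(). A threading server lets slow invocations overlap instead of blocking each other.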
Composing the Module YAML File
The remaining step is to declare this function type in a module.yaml file:
functions:
  - function:
      meta:
        kind: http
        type: demo/greeter
      spec:
        endpoint: http://<end point url>/statefun
Testing
- Create a virtual environment
python3 -m venv venv
source venv/bin/activate
- Install dependencies
pip3 install .
- Run unit tests
python3 -m unittest tests
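The same unittest tooling can also check a function's logic without a running cluster by handing it mock objects in place of the context and message. The sketch below is a hypothetical example, not one of this repository's existing tests: it copies the stateful greeter from the tutorial (so it needs no SDK import), uses unittest.mock.MagicMock for the context and message, and captures the printed greeting with contextlib.redirect_stdout.

```python
import io
import unittest
from contextlib import redirect_stdout
from unittest.mock import MagicMock


def greet(context, message):
    # Copy of the tutorial's stateful greeter, kept free of SDK imports for this sketch.
    seen = context.storage.seen or 0
    seen += 1
    context.storage.seen = seen
    print(f"Hey {message.as_string()} I've seen you {seen} times")


class GreeterTest(unittest.TestCase):
    def test_first_visit_initializes_counter(self):
        context = MagicMock()
        context.storage.seen = None  # unset state reads as None
        message = MagicMock()
        message.as_string.return_value = "Alice"

        out = io.StringIO()
        with redirect_stdout(out):
            greet(context, message)

        self.assertEqual(context.storage.seen, 1)
        self.assertEqual(out.getvalue(), "Hey Alice I've seen you 1 times\n")

    def test_existing_counter_is_incremented(self):
        context = MagicMock()
        context.storage.seen = 4
        message = MagicMock()
        message.as_string.return_value = "Bob"

        with redirect_stdout(io.StringIO()):
            greet(context, message)

        self.assertEqual(context.storage.seen, 5)
```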
Contributing
There are multiple ways to enhance the Stateful Functions API for different types of applications; the runtime and operations will also evolve with the developments in Apache Flink.
You can learn more about how to contribute on the Apache Flink website. For code contributions, please carefully read the Contributing Code section and check the Stateful Functions component in Jira for an overview of ongoing community work.
License
The code in this repository is licensed under the Apache Software License 2.