⚡🗞️ FastAPI Websocket Pub/Sub
A fast and durable Pub/Sub channel over Websockets.
The easiest way to create a live publish / subscribe multi-cast over the web.
Supports, and is tested on, Python >= 3.7
As seen at PyCon IL 2021 and EuroPython 2021
Installation 🛠️
pip install fastapi_websocket_pubsub
Intro
The classic pub/sub pattern, made easily accessible and scalable over the web and across your cloud in realtime, while enjoying the benefits of FastAPI (e.g. dependency injection).
FastAPI + WebSockets + PubSub == ⚡💪 ❤️
- Subscribe
- Publish
- Directly from server code to connected clients.
app = FastAPI()
endpoint = PubSubEndpoint()
endpoint.register_route(app, path="/pubsub")
# publish() is a coroutine, so await it from async code (e.g. a route handler)
await endpoint.publish(["my_event_topic"], data=["my", "data", 1])
- From client to client (through the servers)
async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
    # Publish through the client; the server relays the event to other subscribers
    await client.publish(["my_event_topic"], data=["my", "data", 1])
- Across server instances (using broadcaster and a backend medium (e.g. Redis, Kafka, ...))
- No matter which server a client connects to - it will get the messages it subscribes to
app = FastAPI()
endpoint = PubSubEndpoint(broadcaster="postgres://localhost:5432/")
@app.websocket("/pubsub")
async def websocket_rpc_endpoint(websocket: WebSocket):
    await endpoint.main_loop(websocket)
See examples/pubsub_broadcaster_server_example.py for a full usage example.
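To illustrate why a client receives its messages regardless of which server it is connected to, here is a toy, in-process model of the idea. It is only a sketch: `ToyBackend` and `ToyServer` are hypothetical stand-ins for the shared medium and the server instances, and they do not reflect how the library or its broadcaster is actually implemented.

```python
import asyncio
from collections import defaultdict


class ToyBackend:
    """Hypothetical stand-in for a shared medium such as Redis or Kafka."""

    def __init__(self):
        self.instances = []

    async def broadcast(self, topic, data):
        # Relay the event to every attached server instance.
        for instance in self.instances:
            await instance.deliver_local(topic, data)


class ToyServer:
    """Hypothetical stand-in for one server instance with its own clients."""

    def __init__(self, backend):
        self.backend = backend
        self.subscribers = defaultdict(list)  # topic -> list of async callbacks
        backend.instances.append(self)

    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)

    async def publish(self, topic, data):
        # Publishing goes through the backend so that every instance sees the event.
        await self.backend.broadcast(topic, data)

    async def deliver_local(self, topic, data):
        # Notify only the subscribers connected to this instance.
        for callback in self.subscribers[topic]:
            await callback(data)


async def demo():
    backend = ToyBackend()
    server_a, server_b = ToyServer(backend), ToyServer(backend)
    received = []

    async def on_event(data):
        received.append(data)

    # The subscriber is attached to server B, but the event is published on server A.
    server_b.subscribe("my_event_topic", on_event)
    await server_a.publish("my_event_topic", ["my", "data", 1])
    return received


received = asyncio.run(demo())
print(received)  # prints [['my', 'data', 1]]
```

In the real setup the backend is an external service and the relay happens over the network, but the routing idea is the same: publish to the shared medium, deliver locally on each instance.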
Usage example (server publishing following HTTP trigger):
In the code below, a client connects to the server and subscribes to a topic named "triggered".
Aside from the PubSub websocket, the server also exposes a regular HTTP route, which triggers publication of the event.
Server:
import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.routing import APIRouter
from fastapi_websocket_pubsub import PubSubEndpoint
app = FastAPI()
endpoint = PubSubEndpoint()
endpoint.register_route(app, "/pubsub")
@app.get("/trigger")
async def trigger_events():
    # Upon request, publish the event to all subscribers of "triggered"
    await endpoint.publish(["triggered"])
Client:
from fastapi_websocket_pubsub import PubSubClient
async def on_trigger(data):
    print("Trigger URL was accessed")

async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
    client.subscribe("triggered", on_trigger)
More Examples
What can I do with this?
The combination of Websockets and bi-directional Pub/Sub is ideal for building realtime data propagation solutions at scale over the web.
- Update mechanism
- Remote control mechanism
- Data processing
- Distributed computing
- Realtime communications over the web
Foundations:
Logging
fastapi-websocket-pubsub uses fastapi-websocket-rpc for logging config.
It provides a helper logging module to control how it produces logs for you.
See fastapi_websocket_rpc/logger.py.
Use logging_config.set_mode or the 'WS_RPC_LOGGING' environment variable to choose the logging method you prefer.
Alternatively, override it completely via the standard logging config (e.g. 'logging.config.dictConfig'); all logger names start with 'fastapi.ws_rpc.pubsub'.
Example:
from fastapi_websocket_rpc.logger import logging_config, LoggingModes
logging_config.set_mode(LoggingModes.UVICORN)
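For the full-override route, a minimal sketch using only the standard library might look like the following. It assumes the library's loggers sit under 'fastapi.ws_rpc.pubsub' in the dotted logger hierarchy, so that configuring that name covers them; the formatter, handler name, and WARNING level are illustrative choices, not library defaults.

```python
import logging
import logging.config

logging.config.dictConfig({
    "version": 1,
    # Leave loggers created by other libraries untouched.
    "disable_existing_loggers": False,
    "formatters": {
        "plain": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "plain"},
    },
    "loggers": {
        # Prefix taken from the text above; child loggers inherit this level.
        "fastapi.ws_rpc.pubsub": {
            "handlers": ["console"],
            "level": "WARNING",
            "propagate": False,
        },
    },
})

pubsub_logger = logging.getLogger("fastapi.ws_rpc.pubsub")
```

With this in place, records below WARNING from loggers under that prefix are dropped, and the rest go to the console handler only.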
Pull requests - welcome!
- Please include tests for new features