RedisMPX
RedisMPX is a Redis Pub/Sub multiplexer library written in multiple languages and live coded on Twitch.
Abstract
When bridging multiple application instances through Redis Pub/Sub it's easy to end up needing
support for multiplexing. RedisMPX streamlines this process in a consistent way across multiple
languages by offering a consistent set of features that cover the most common use cases.
The library works under the assumption that you are going to create separate subscriptions
for each client connected to your service (e.g. WebSockets clients):
- ChannelSubscription allows you to add and remove individual Redis
PubSub channels similarly to how a multi-room chat application would need to.
- PatternSubscription allows you to subscribe to a single Redis Pub/Sub pattern.
- PromiseSubscription allows you to create a networked promise system.
Installation
Requires Python 3.7+, based on aio-libs/aioredis,
an AsyncIO Redis client.
pip install redismpx
Features
- Simple channel subscriptions
- Pattern subscriptions
- Networked promise system
- Automatic reconnection with exponetial backoff + jitter
Documentation
Usage
from redismpx import Multiplexer
mpx = Multiplexer('redis://localhost')
async def my_on_message(channel: bytes, message: bytes):
await websocket.send(f"ch: {channel} msg: {message}")
def my_on_disconnect(error: Exception):
print("oh no!")
def my_on_activation(name: bytes):
print("activated:", name)
channel_sub = mpx.new_channel_subcription(
my_on_message, my_on_disconnect, None)
pattern_sub = mpx.new_pattern_subscription("hello-*",
my_on_message, None, my_on_activation)
promise_sub = mpx.new_promise_subscription("hello-")
ChannelSubscription
channel_sub = mpx.new_channel_subcription(
lambda ch, msg: print(f"Message @ {ch}: {msg}"),
lambda e: print(f"Network Error: {type(e)}: {e}"),
lambda s: print(f"Subscription now active: {s}"))
channel_sub.add("chan1")
channel_sub.add("chan2")
channel_sub.add("chan3")
channel_sub.remove("chan2")
channel_sub.close()
PatternSubscription
pattern_sub = mpx.new_pattern_subcription(
"notifications:*",
lambda ch, msg: print(f"Message @ {ch}: {msg}"),
lambda e: print(f"Network Error: {type(e)}: {e}"),
lambda s: print(f"Subscription now active: {s}"))
pattern_sub.close()
PromiseSubscription
promise_sub = mpx.new_promise_subscription("hello-")
await promise_sub.wait_for_activation()
try:
promise = promise_sub.new_promise("world", 10)
except redismpx.InactiveSubscription:
promise = await promise_sub.wait_for_new_promise("world", 10)
try:
result = await promise
print(result)
except asyncio.TimeoutError:
except asyncio.CancelledError:
promise_sub.close()
WebSocket Example
This is a more realistic example of how to use RedisMPX.
Code
This code is also available in examples/channel.py.
import asyncio
import aioredis
from starlette.applications import Starlette
from starlette.routing import WebSocketRoute
from redismpx import Multiplexer
mpx = Multiplexer('redis://localhost')
pub_conn = None
async def handle_ws(ws):
global pub_conn
await ws.accept()
if pub_conn is None:
pub_conn = await aioredis.create_redis('redis://localhost')
async def on_message(channel: bytes, message: bytes):
await ws.send_text(f"ch: [{channel.decode()}] msg: [{message.decode()}]\n")
sub = mpx.new_channel_subscription(on_message,
lambda e: print(f"Network Error: {type(e)}: {e}"),
lambda s: print(f"Subscription now active: {s}"))
await ws.send_text('# Use +channel to join a channel, -channel to leave.')
await ws.send_text('# Sending !channel will send the next message to said channel.')
await ws.send_text('# To see a message sent to a given channel, you must have joined it beforehand.')
while True:
msg = None
try:
msg = await ws.receive_text()
except:
print('ws disconnected')
sub.close()
return
prefix, chan = msg[0], msg[1:]
if prefix == "+":
sub.add(chan)
elif prefix == "-":
sub.remove(chan)
elif prefix == '!':
await pub_conn.publish(chan, await ws.receive_text())
app = Starlette(debug=True, routes=[
WebSocketRoute('/ws', endpoint=handle_ws),
])
Dependences
pip install redismpx aioredis starlette uvcorn
Launching the example
$ uvicorn websocket:app
Interacting with the example
The application works like a simple WebSocket chat application that
expects commands from the user.
- Sending
+hello
will subscribe you to channel hello
, while -hello
will do the opposite.
- Sending
!hello
will broadcast the next message you send to hello
.
- You can use whatever channel name you like.
To send those commands you can use a browser:
let ws = new WebSocket("ws://localhost:8000/ws")
ws.onmessage = (x) => console.log("message:", x.data)
ws.send("+test")
ws.send("!test")
ws.send("hello world!")
A more handy way of interacting with websockets are command-line clients: