aiopubsub - asyncio publish-subscribe within a process
Simple publish-subscribe pattern for asyncio applications.
Why
When building big applications, separation of concerns is a great way to keep things manageable.
In messaging systems, the publish-subscribe pattern is often used to decouple data producers and data
consumers. We went a step ahead and designed even the internals of our applications around this pattern.
We explain our thinking and the workings of aiopubsub
in detail in our article
Design your app using the pub-sub pattern with aiopubsub <https://quantlane.com/blog/aiopubsub/>
_.
We recommend reading it before using aiopubsub
in your project.
Installation
aiopubsub
is only compatible with Python 3.8 and higher. There are no plans to support older versions.
aiopubsub
is available on PyPI <https://pypi.org/project/aiopubsub/>
_ and you can install it with:
::
pip install aiopubsub
or
::
poetry add aiopubsub
How to use it
The following comprehensive example is explained step-by-step
in our article
"Design your app using the pub-sub pattern with aiopubsub" <https://quantlane.com/blog/aiopubsub/>
_.
.. code-block:: python
import asyncio
import dataclasses
import decimal
import aiopubsub
@dataclasses.dataclass
class Trade:
timestamp: float
quantity: int
price: decimal.Decimal
async def on_trade(key: aiopubsub.Key, trade: Trade) -> None:
print(f'Processing trade = {trade} with key = {key}.')
async def on_nyse_trade(key: aiopubsub.Key, trade: Trade) -> None:
print(f'Processing trade = {trade} with key = {key} that happened in NYSE')
async def main():
# create an aiopubsub hub
hub = aiopubsub.Hub()
# create a sample of data to send
trade = Trade(timestamp = 123.5, quantity = 56, price = decimal.Decimal('1639.43'))
# subscriber listens on every trade and calls the `on_trade` function
subscriber = aiopubsub.Subscriber(hub, 'trades')
subscribe_key = aiopubsub.Key('*', 'trade', '*')
subscriber.add_async_listener(subscribe_key, on_trade)
# publisher has a NASDAQ prefix and sends the trade that happened on Google stock
publisher = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NASDAQ'))
publish_key = aiopubsub.Key('trade', 'GOOGL')
publisher.publish(publish_key, trade)
# sleep so the event loop can process the action
await asyncio.sleep(0.001)
# expected output:
# Processing trade = Trade(timestamp=123.5, quantity=56, price=Decimal('1639.43')) with key = ('NASDAQ', 'trade', 'GOOGL').
# sample from another stock exchange
trade_nyse = Trade(timestamp = 127.45, quantity = 67, price = decimal.Decimal('1639.44'))
# subscribe only for the NYSE exchange
subscribe_key_nyse = aiopubsub.Key('NYSE', 'trade', '*')
subscriber.add_async_listener(subscribe_key_nyse, on_nyse_trade)
# publish NYSE trade
publisher_nyse = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NYSE'))
publisher_nyse.publish(aiopubsub.Key('trade', 'GOOGL'), trade_nyse)
# sleep so the event loop can process the action
await asyncio.sleep(0.001)
# expected output:
# Processing trade = Trade(timestamp=127.45, quantity=67, price=Decimal('1639.44')) with key = ('NYSE', 'trade', 'GOOGL').
# Processing trade = Trade(timestamp=127.45, quantity=67, price=Decimal('1639.44')) with key = ('NYSE', 'trade', 'GOOGL') that happened in NYSE
# clean the subscriber before the end of the program
await subscriber.remove_all_listeners()
if __name__ == '__main__':
asyncio.run(main())
Aiopubsub will use logwood <https://github.com/qntln/logwood>
_ if it is installed, otherwise it will default
to the standard logging module. Note that logwood
is required to run tests.
Architecture
Hub accepts messages from Publishers and routes them to Subscribers. Each message is routed by its
Key - an iterable of strings forming a hierarchic namespace. Subscribers may subscribe to wildcard keys,
where any part of the key may be replaced replaced with a *
(star).
addedSubscriber
and removedSubscriber
messages
When a new subscriber is added the Hub sends this message
.. code-block::
{
"key": ("key", "of", "added", "subscriber"),
"currentSubscriberCount": 2
}
under the key ``('Hub', 'addedSubscriber', 'key', 'of', 'added', 'subscriber')`` (the part after ``addedSubscriber``
is made of the subscribed key). Note the ``currentSubscriberCount`` field indicating how many subscribers are currently
subscribed.
When a subscriber is removed a message in the same format is sent, but this time under the key
``('Hub', 'removedSubscriber', 'key', 'of', 'added', 'subscriber')``.
Contributing
-------------
Pull requests are welcome! In particular, we are aware that the documentation could be improved.
If anything about ``aiopubsub`` is unclear, please feel free to
`simply open an issue <https://gitlab.com/quantlane/libs/aiopubsub/-/issues/new>`_ and we will do our best
to advise and explain 🙂
****
.. image:: quantlane.png
``fastenum`` was made by `Quantlane <https://quantlane.com>`_, a systematic trading firm.
We design, build and run our own stock trading platform.