Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

aiopubsub

Package Overview
Dependencies
Maintainers
1
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

aiopubsub

Simple pubsub pattern for asyncio applications

  • 3.0.0
  • PyPI
  • Socket score

Maintainers
1

aiopubsub - asyncio publish-subscribe within a process

Simple publish-subscribe pattern for asyncio applications.

Why

When building big applications, separation of concerns is a great way to keep things manageable. In messaging systems, the publish-subscribe pattern is often used to decouple data producers and data consumers. We went a step ahead and designed even the internals of our applications around this pattern.

We explain our thinking and the workings of aiopubsub in detail in our article Design your app using the pub-sub pattern with aiopubsub <https://quantlane.com/blog/aiopubsub/>_. We recommend reading it before using aiopubsub in your project.

Installation

aiopubsub is only compatible with Python 3.8 and higher. There are no plans to support older versions.

aiopubsub is available on PyPI <https://pypi.org/project/aiopubsub/>_ and you can install it with:

::

pip install aiopubsub

or

::

poetry add aiopubsub

How to use it

The following comprehensive example is explained step-by-step in our article "Design your app using the pub-sub pattern with aiopubsub" <https://quantlane.com/blog/aiopubsub/>_.

.. code-block:: python

import asyncio
import dataclasses
import decimal

import aiopubsub


@dataclasses.dataclass
class Trade:
	timestamp: float
	quantity: int
	price: decimal.Decimal


async def on_trade(key: aiopubsub.Key, trade: Trade) -> None:
	print(f'Processing trade = {trade}  with key = {key}.')


async def on_nyse_trade(key: aiopubsub.Key, trade: Trade) -> None:
	print(f'Processing trade = {trade}  with key = {key} that happened in NYSE')


async def main():
	# create an aiopubsub hub
	hub = aiopubsub.Hub()

	# create a sample of data to send
	trade = Trade(timestamp = 123.5, quantity = 56, price = decimal.Decimal('1639.43'))

	# subscriber listens on every trade and calls the `on_trade` function
	subscriber = aiopubsub.Subscriber(hub, 'trades')
	subscribe_key = aiopubsub.Key('*', 'trade', '*')
	subscriber.add_async_listener(subscribe_key, on_trade)

	# publisher has a NASDAQ prefix and sends the trade that happened on Google stock
	publisher = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NASDAQ'))
	publish_key = aiopubsub.Key('trade', 'GOOGL')
	publisher.publish(publish_key, trade)

	# sleep so the event loop can process the action
	await asyncio.sleep(0.001)

	# expected output:
	# Processing trade = Trade(timestamp=123.5, quantity=56, price=Decimal('1639.43'))  with key = ('NASDAQ', 'trade', 'GOOGL').

	# sample from another stock exchange
	trade_nyse = Trade(timestamp = 127.45, quantity = 67, price = decimal.Decimal('1639.44'))

	# subscribe only for the NYSE exchange
	subscribe_key_nyse = aiopubsub.Key('NYSE', 'trade', '*')
	subscriber.add_async_listener(subscribe_key_nyse, on_nyse_trade)

	# publish NYSE trade
	publisher_nyse = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NYSE'))
	publisher_nyse.publish(aiopubsub.Key('trade', 'GOOGL'), trade_nyse)

	# sleep so the event loop can process the action
	await asyncio.sleep(0.001)

	# expected output:
	# Processing trade = Trade(timestamp=127.45, quantity=67, price=Decimal('1639.44'))  with key = ('NYSE', 'trade', 'GOOGL').
	# Processing trade = Trade(timestamp=127.45, quantity=67, price=Decimal('1639.44'))  with key = ('NYSE', 'trade', 'GOOGL') that happened in NYSE

	# clean the subscriber before the end of the program
	await subscriber.remove_all_listeners()

if __name__ == '__main__':
	asyncio.run(main())

Aiopubsub will use logwood <https://github.com/qntln/logwood>_ if it is installed, otherwise it will default to the standard logging module. Note that logwood is required to run tests.

Architecture

Hub accepts messages from Publishers and routes them to Subscribers. Each message is routed by its Key - an iterable of strings forming a hierarchic namespace. Subscribers may subscribe to wildcard keys, where any part of the key may be replaced replaced with a * (star).

addedSubscriber and removedSubscriber messages


When a new subscriber is added the Hub sends this message

.. code-block::

	{
		"key": ("key", "of", "added", "subscriber"),
		"currentSubscriberCount": 2
	}

under the key ``('Hub', 'addedSubscriber', 'key', 'of', 'added', 'subscriber')`` (the part after ``addedSubscriber``
is made of the subscribed key). Note the ``currentSubscriberCount`` field indicating how many subscribers are currently
subscribed.

When a subscriber is removed a message in the same format is sent, but this time under the key
``('Hub', 'removedSubscriber', 'key', 'of', 'added', 'subscriber')``.


Contributing
-------------
Pull requests are welcome! In particular, we are aware that the documentation could be improved.
If anything about ``aiopubsub`` is unclear, please feel free to
`simply open an issue <https://gitlab.com/quantlane/libs/aiopubsub/-/issues/new>`_ and we will do our best
to advise and explain 🙂


****

	.. image:: quantlane.png

	``fastenum`` was made by `Quantlane <https://quantlane.com>`_, a systematic trading firm.
	We design, build and run our own stock trading platform.

FAQs


Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc