async-dramatiq

Dramatiq with Asyncio support and some other goodies

  • 0.1.8
  • PyPI

Quick Start

Background

Dramatiq is a background task-processing library for Python with a focus on simplicity, reliability and performance.

This package, async-dramatiq, extends Dramatiq to provide the following:

  1. Support for asyncio (issue #238)
  2. Message scheduling support (scheduling cookbook)

Setup

To give your actors async support, add the AsyncMiddleware to your broker.

RabbitMQ Broker
import dramatiq
from dramatiq.brokers.rabbitmq import RabbitmqBroker

from async_dramatiq.middleware import AsyncMiddleware

rabbitmq_broker = RabbitmqBroker(host="rabbitmq")
rabbitmq_broker.add_middleware(AsyncMiddleware())  # <--- Here
dramatiq.set_broker(rabbitmq_broker)
Redis Broker
import dramatiq
from dramatiq.brokers.redis import RedisBroker

from async_dramatiq.middleware import AsyncMiddleware

redis_broker = RedisBroker(host="redis")
redis_broker.add_middleware(AsyncMiddleware())  # <--- Here
dramatiq.set_broker(redis_broker)

Running

The Scheduler

Scheduling is handled by APScheduler. See run_scheduler.py for an example of running the scheduler.

Dramatiq Worker

For more details, see the official Dramatiq guide, or docker-compose.yaml for a concrete example.

Example

Play around with worker-heartbeat-example, a working, fully featured example implementation.


Async Middleware

AsyncMiddleware starts an AsyncWorker, a dedicated thread that runs the event loop. This single event loop is shared across all Worker threads.
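The idea of one event loop shared by several worker threads can be sketched with the standard library alone. This is not the library's implementation: LoopThread is a hypothetical stand-in for AsyncWorker, and double is a made-up coroutine. The sketch only shows how synchronous threads can hand coroutines to a loop that runs in its own thread.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


class LoopThread(threading.Thread):
    """Hypothetical stand-in for AsyncWorker: a thread that owns one event loop."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self.event_loop = asyncio.new_event_loop()

    def run(self) -> None:
        # Run the loop until stop() is requested from another thread.
        self.event_loop.run_forever()

    def stop(self) -> None:
        self.event_loop.call_soon_threadsafe(self.event_loop.stop)
        self.join()
        self.event_loop.close()


async def double(x: int) -> int:
    """Made-up coroutine standing in for an async actor body."""
    await asyncio.sleep(0.01)
    return 2 * x


def run_on_shared_loop(loop_thread: LoopThread, x: int) -> int:
    # Called from a synchronous worker thread: submit the coroutine to the
    # shared loop and block this thread until the result is available.
    future = asyncio.run_coroutine_threadsafe(double(x), loop_thread.event_loop)
    return future.result(timeout=5)


def demo() -> list[int]:
    loop_thread = LoopThread()
    loop_thread.start()
    try:
        # Four synchronous "worker" threads share the single event loop.
        with ThreadPoolExecutor(max_workers=4) as pool:
            return list(pool.map(lambda x: run_on_shared_loop(loop_thread, x), range(4)))
    finally:
        loop_thread.stop()
```

Because every coroutine runs on the same loop, async resources such as connection pools can be created once and reused by all workers.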

Startup and Shutdown Events

AsyncMiddleware provides two hooks for standing up and tearing down shared async resources:

  1. before_async_worker_thread_startup, called before the event loop is started
  2. after_async_worker_thread_shutdown, called after the event loop is stopped

Example
from typing import Any

from dramatiq.brokers.rabbitmq import RabbitmqBroker

from async_dramatiq.middleware import AsyncMiddleware
from async_dramatiq.worker import AsyncWorker  # assumed module path; adjust if AsyncWorker lives elsewhere

async def startup() -> None:
    """This function should contain your resource initialization code."""
    pass

async def shutdown() -> None:
    """This function should contain your resource teardown code."""
    pass


class MyAsyncMiddleware(AsyncMiddleware):
    def before_async_worker_thread_startup(
        self, _: RabbitmqBroker, thread: AsyncWorker, **kwargs: Any
    ) -> None:
        # Initialize shared resources on the worker's loop before it starts serving actors.
        thread.event_loop.run_until_complete(startup())

    def after_async_worker_thread_shutdown(
        self, _: RabbitmqBroker, thread: AsyncWorker, **kwargs: Any
    ) -> None:
        # Tear down shared resources, then release the loop itself.
        thread.event_loop.run_until_complete(shutdown())
        thread.event_loop.close()
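The ordering these hooks rely on can be tried without a broker. The following is a minimal sketch using only asyncio; the resources registry, the events log, and the job coroutine are hypothetical names made up for the illustration, and the middle step merely stands in for the period in which the loop serves actors.

```python
import asyncio

# Hypothetical shared state, used only for this illustration.
resources: dict[str, str] = {}
events: list[str] = []


async def startup() -> None:
    resources["http_session"] = "open"
    events.append("startup")


async def shutdown() -> None:
    resources.pop("http_session", None)
    events.append("shutdown")


async def job() -> None:
    # A job can rely on the resource that startup() created.
    events.append(f"job saw session {resources['http_session']}")


def run_lifecycle() -> list[str]:
    events.clear()
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(startup())   # before the loop serves jobs
        loop.run_until_complete(job())       # stands in for normal operation
        loop.run_until_complete(shutdown())  # after the loop stops serving jobs
    finally:
        loop.close()
    return list(events)
```

Running setup and teardown on the same loop that executes the actors matters for resources that bind to a specific event loop when they are created.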

Async Actor

The async actor, async_actor, is a thin wrapper around the Dramatiq actor that adds scheduling options such as interval and cron jobs.

Interval Jobs

Run a job at a fixed interval.

from datetime import timedelta

from async_dramatiq import async_actor  # assumed import path

@async_actor(interval=timedelta(seconds=5))
def run_every_5_seconds() -> None:
    pass
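To make the interval semantics concrete, here is a small standard-library sketch of when such a job would fire. It assumes the first run happens one full interval after the start time; next_run_times is a hypothetical helper and is not part of async-dramatiq or its scheduler.

```python
from datetime import datetime, timedelta


def next_run_times(start: datetime, interval: timedelta, count: int) -> list[datetime]:
    """Return the first `count` fire times after `start` for a fixed interval."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    return [start + interval * n for n in range(1, count + 1)]


start = datetime(2024, 1, 1, 12, 0, 0)
runs = next_run_times(start, timedelta(seconds=5), 3)
```

With a five-second interval starting at 12:00:00, the three computed runs fall at 12:00:05, 12:00:10, and 12:00:15.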

Cron Jobs

Run a job on a crontab schedule (see https://crontab.guru/).

@async_actor(interval="0 0 * * *")
def run_at_midnight() -> None:
    pass
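For readers new to crontab syntax, the expression "0 0 * * *" reads as minute 0, hour 0, any day of the month, any month, any weekday. The sketch below is a deliberately tiny matcher written for illustration only: it handles just "*" and plain integers, requires every field to match, and is not how the scheduler parses expressions. The helper names are hypothetical.

```python
from datetime import datetime, timedelta


def field_matches(field: str, value: int) -> bool:
    """Match one crontab field; this sketch supports only '*' and plain integers."""
    return field == "*" or int(field) == value


def cron_matches(expr: str, moment: datetime) -> bool:
    minute, hour, day, month, weekday = expr.split()
    cron_weekday = (moment.weekday() + 1) % 7  # cron counts Sunday as 0
    return (
        field_matches(minute, moment.minute)
        and field_matches(hour, moment.hour)
        and field_matches(day, moment.day)
        and field_matches(month, moment.month)
        and field_matches(weekday, cron_weekday)
    )


def next_cron_run(expr: str, after: datetime, limit_minutes: int = 366 * 24 * 60) -> datetime:
    """Step forward minute by minute until the expression matches."""
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(limit_minutes):
        if cron_matches(expr, candidate):
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError(f"no match for {expr!r} within {limit_minutes} minutes")


next_midnight = next_cron_run("0 0 * * *", datetime(2024, 1, 1, 15, 30))
```

Starting from 15:30 on 1 January 2024, the next match is midnight at the start of 2 January. Ranges, lists, steps, and names are left out of the sketch, so use a real cron parser for anything beyond a quick sanity check.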
