Message Queue Tasks
This module provides a framework for handling and processing asynchronous tasks through message queues.
Overview
- Utilizes the aio_pika library for AMQP message processing.
- Offers easy-to-use decorators and methods for defining tasks, producing and consuming them.
- Built with asynchronous patterns to allow seamless scalability.
Requirements
- Python 3.11+
- aio_pika library
- RabbitMQ or other AMQP broker
Usage
1. Defining a Task
To define a task, instantiate the MqTasks object and use the task decorator:
import mqtasks
mq_tasks = MqTasks(amqp_connection="amqp://localhost", queue_name="my_queue")
@mq_tasks.task(name="my_task")
def my_task_function(ctx: MqTaskContext):
pass
mq_tasks.run()
2. Sending a Task
Instantiate MqTasksClient and use it to send a task:
import asyncio
import mqtasks
client = MqTasksClient(
loop=asyncio.get_event_loop(),
amqp_connection="amqp://localhost"
)
channel = await client.queue(queue_name="my_queue")
result = await channel.request_task_async(task_name="my_task", body={"message": "hello world"})
Examples:
How to start example:
- Work dir:
./example
- Start RabbitMQ
docker pull rabbitmq:management
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
- Access the RabbitMQ management web UI:
- By default, the credentials are:
- Username: guest
- Password: guest
- Server
- Client
Release
make a release/x.x.x branch, up the version and commit
change the minor version 0.X.0
./scripts/release.sh minor
change the patch version 0.0.X
./scripts/release.sh patch
merge the release/x.x.x branch into the master and the develop branch
./scripts/merge.sh
License
MIT License