AioPika broker for taskiq
This lirary provides you with aio-pika broker for taskiq.
Usage:
from taskiq_aio_pika import AioPikaBroker
broker = AioPikaBroker()
@broker.task
async def test() -> None:
print("nothing")
Non-obvious things
You can send delayed messages and set priorities to messages using labels.
Delays
Default retries
To send delayed message, you have to specify
delay label. You can do it with task
decorator,
or by using kicker.
In this type of delay we are using additional queue with expiration
parameter and after with time message will be deleted from delay
queue and sent to the main taskiq queue.
For example:
broker = AioPikaBroker()
@broker.task(delay=3)
async def delayed_task() -> int:
return 1
async def main():
await broker.startup()
await delayed_task.kiq()
await delayed_task.kicker().with_labels(delay=4).kiq()
await delayed_task.kicker().with_labels(delay=None).kiq()
Retries with rabbitmq-delayed-message-exchange
plugin
To send delayed message you can install rabbitmq-delayed-message-exchange
plugin https://github.com/rabbitmq/rabbitmq-delayed-message-exchange.
And you need to configure you broker.
There is delayed_message_exchange_plugin
AioPikaBroker
parameter and it must be True
to turn on delayed message functionality.
The delay plugin can handle tasks with different delay times well, and the delay based on dead letter queue is suitable for tasks with the same delay time.
For example:
broker = AioPikaBroker(
delayed_message_exchange_plugin=True,
)
@broker.task(delay=3)
async def delayed_task() -> int:
return 1
async def main():
await broker.startup()
await delayed_task.kiq()
await delayed_task.kicker().with_labels(delay=4).kiq()
Priorities
You can define priorities for messages using priority
label.
Messages with higher priorities are delivered faster.
But to use priorities you need to define max_priority
of the main queue, by passing max_priority
parameter in broker's init.
This parameter sets maximum priority for the queue and
declares it as the prority queue.
Before doing so please read the documentation about what
downsides you get by using prioritized queues.
broker = AioPikaBroker(max_priority=10)
@broker.task(priority=2)
async def prio_task() -> int:
return 1
async def main():
await broker.startup()
await prio_task.kiq()
await prio_task.kicker().with_labels(priority=4).kiq()
await prio_task.kicker().with_labels(priority=None).kiq()
Configuration
AioPikaBroker parameters:
url
- url to rabbitmq. If None, "amqp://guest:guest@localhost:5672" is used.result_backend
- custom result backend.task_id_generator
- custom task_id genertaor.exchange_name
- name of exchange that used to send messages.exchange_type
- type of the exchange. Used only if declare_exchange
is True.queue_name
- queue that used to get incoming messages.routing_key
- that used to bind that queue to the exchange.declare_exchange
- whether you want to declare new exchange if it doesn't exist.max_priority
- maximum priority for messages.delay_queue_name
- custom delay queue name.
This queue is used to deliver messages with delays.dead_letter_queue_name
- custom dead letter queue name.
This queue is used to receive negatively acknowleged messages from the main queue.qos
- number of messages that worker can prefetch.declare_queues
- whether you want to declare queues even on
client side. May be useful for message persistance.