
huzzy-rabbit
A robust, asynchronous RabbitMQ client built with aio_pika for Python applications. This class provides connection management, auto-reconnection, message publishing, queue declarations, binding, and RPC support. It handles disconnections gracefully and ensures reliable message delivery.
The client is built on asyncio.
Install the required dependencies:
pip install huzzy-rabbit
The class is self-contained and doesn't require additional setup beyond the dependencies.
Initialize the Client:
from huzzy_rabbit.rabbit_mq import RabbitMQ
rabbitmq = RabbitMQ()
Connect to RabbitMQ:
import asyncio

async def main():
    await rabbitmq.connect("amqp://guest:guest@localhost/")

asyncio.run(main())
Publish a Message:
async def publish_example():
    message_data = {"key": "value", "action": "test"}
    await rabbitmq.publish(
        message_dict=message_data,
        exchange_name="my_exchange",
        routing_key="my_routing_key",
    )
Close the Connection (when done):
await rabbitmq.close()
Connect to your RabbitMQ server with retry logic:
await rabbitmq.connect(
    connection_url="amqp://user:password@host:5672/",
    max_retries=5,  # Optional: default is 3
)
The client uses connect_robust for automatic reconnection on network issues. The reconnect_interval is set to 5 seconds by default.
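The retry behaviour around connect() can be pictured with a small standalone sketch. This is not the library's code: connect_with_retries, base_delay, and the injectable sleep parameter are hypothetical names used only for illustration, and the doubling delay is an assumption based on the README's mention of exponential backoff.

```python
import asyncio


async def connect_with_retries(connect, max_retries=3, base_delay=1.0, sleep=asyncio.sleep):
    """Await connect() up to max_retries times, doubling the delay after each failure.

    Hypothetical helper: `connect` is any coroutine function that raises
    ConnectionError on failure. `sleep` is injectable so tests can skip real waits.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            return await connect()
        except ConnectionError as exc:
            last_error = exc
            # No point in waiting after the final attempt.
            if attempt < max_retries - 1:
                await sleep(base_delay * 2 ** attempt)
    raise ConnectionError(f"gave up after {max_retries} attempts") from last_error
```

With max_retries=5 and base_delay=1.0, the waits between attempts would be 1, 2, 4, and 8 seconds. A real client would typically also cap the delay.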
Before any operation, the client automatically calls ensure_connection() internally. You can call it manually if needed:
await rabbitmq.ensure_connection()
This checks if the connection/channel is active and reconnects/reopens as necessary. Exchanges are re-declared after reconnection.
Publish JSON-serializable messages to an exchange:
# Declare the exchange first (if it does not exist yet)
await rabbitmq.declare_exchange("my_exchange")

# Publish with default settings
message_data = {"user_id": 123, "event": "signup"}
await rabbitmq.publish(
    message_dict=message_data,
    exchange_name="my_exchange",
    routing_key="user.events",
    routing_action="process",  # Optional: adds to message headers
)

# Or use a pre-built Message object
from aio_pika import Message

custom_message = Message(b"Raw body", headers={"custom": "header"})
await rabbitmq.publish(
    message=custom_message,
    exchange_name="my_exchange",
    routing_key="user.events",
)
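To make the dictionary-to-message step concrete, here is a minimal sketch of how a JSON-serializable dict could be turned into a message body and headers. build_payload is a hypothetical helper, not part of huzzy-rabbit, and the "action" header name is an assumption taken from the README's notes on publishing.

```python
import json


def build_payload(message_dict, routing_action=None):
    """Serialize a dict to UTF-8 JSON bytes and build optional headers.

    Hypothetical helper for illustration only. It assumes routing_action
    ends up under an "action" header key.
    """
    body = json.dumps(message_dict).encode("utf-8")
    headers = {"action": routing_action} if routing_action is not None else {}
    return body, headers


body, headers = build_payload({"user_id": 123, "event": "signup"}, routing_action="process")
print(body)     # JSON-encoded bytes
print(headers)  # {'action': 'process'}
```

Anything that json.dumps cannot serialize (for example datetime objects) would need to be converted before publishing.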
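- Messages are published with persistent delivery (DeliveryMode.PERSISTENT).
- routing_action is added to the message headers as an "action" key for routing logic.
- Declare the exchange beforehand with declare_exchange.

Declare a durable direct exchange: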
exchange = await rabbitmq.declare_exchange("my_exchange")
- Type: DIRECT (default).
- Durable: True (survives broker restarts).
- Declared exchanges are kept in rabbitmq.exchanges and re-declared on reconnection.

Declare a queue and bind it to an exchange:
await rabbitmq.declare_queue(
    queue_name="my_queue",
    exchange_name="my_exchange",
    routing_key="my_routing_key",  # Optional: defaults to queue_name
)
This creates a durable queue and binds it to the exchange.
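The binding rules can be illustrated without a broker. In the sketch below, resolve_routing_key mirrors the documented default (the routing key falls back to the queue name), and direct_route models how a direct exchange delivers only to queues whose binding key equals the message's routing key. Both function names are hypothetical and exist only for this example.

```python
def resolve_routing_key(queue_name, routing_key=None):
    """Return the binding key, falling back to the queue name when none is given."""
    return routing_key if routing_key is not None else queue_name


def direct_route(bindings, routing_key):
    """Return the queues a direct exchange would deliver to.

    `bindings` is a list of (queue_name, binding_key) pairs. A direct
    exchange matches on exact equality of binding key and routing key.
    """
    return [queue for queue, key in bindings if key == routing_key]


bindings = [
    ("my_queue", resolve_routing_key("my_queue", "my_routing_key")),
    ("audit_queue", resolve_routing_key("audit_queue")),  # key defaults to "audit_queue"
]
print(direct_route(bindings, "my_routing_key"))  # ['my_queue']
print(direct_route(bindings, "audit_queue"))     # ['audit_queue']
```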
Set up a queue with a consumer callback:
import json

async def message_listener(message):
    async with message.process():  # Acknowledge after processing
        data = json.loads(message.body.decode())
        print(f"Received: {data}")
        # Process your message here

await rabbitmq.declare_queue_and_bind(
    queue_name="my_queue",
    exchange_name="my_exchange",
    app_listener=message_listener,
    routing_key="my_routing_key",
)
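A listener of this shape can be exercised without a running broker by passing it a stand-in message. The sketch below is an assumption-laden illustration: FakeMessage is a hypothetical test double that imitates only the body attribute and the process() async context manager, not the full aio_pika message interface.

```python
import asyncio
import json
from contextlib import asynccontextmanager


class FakeMessage:
    """Hypothetical stand-in for an incoming message, for tests only."""

    def __init__(self, payload):
        self.body = json.dumps(payload).encode()
        self.acked = False

    @asynccontextmanager
    async def process(self):
        yield self
        # Reached only when the listener body finished without raising.
        self.acked = True


received = []


async def message_listener(message):
    async with message.process():
        received.append(json.loads(message.body.decode()))


async def run_listener_once(payload):
    """Feed one fake message through the listener and return it for inspection."""
    message = FakeMessage(payload)
    await message_listener(message)
    return message
```

Running asyncio.run(run_listener_once({"key": "value"})) returns a message whose acked flag is set, and the decoded payload is appended to received. If the listener raises, acked stays False, which loosely mirrors a message that was not acknowledged.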
- The callback receives a DeliveredMessage.
- Use message.process() for manual acknowledgments.

For request-response patterns:
import json
import uuid
from aio_pika import Message

async def response_listener(message):
    # Process request and send reply
    correlation_id = message.correlation_id
    # ... process logic ...
    reply_body = json.dumps({"result": "success"}).encode()
    reply = Message(reply_body, correlation_id=correlation_id)
    # reply_channel is not defined in this snippet; it is assumed to be an
    # object with a publish(message, routing_key=...) method created elsewhere
    await reply_channel.publish(reply, routing_key=message.reply_to)

# Client side: Send RPC request
correlation_id = str(uuid.uuid4())
await rabbitmq.remote_procedure_call(
    queue_name="rpc_reply_queue",
    on_response=response_listener,  # Server-side handler
    correlation_id=correlation_id,
    message_dict={"request": "data"},
)
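Matching replies to requests by correlation_id is commonly done with a table of pending futures. The following is a generic sketch of that pattern, not the library's internals; PendingReplies and its methods are hypothetical names.

```python
import asyncio
import uuid


class PendingReplies:
    """Hypothetical registry that pairs each correlation_id with a future."""

    def __init__(self):
        self._futures = {}

    def register(self):
        """Create a new correlation_id and the future that will hold its reply."""
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self._futures[correlation_id] = future
        return correlation_id, future

    def resolve(self, correlation_id, result):
        """Deliver a reply. Returns False for unknown or already-resolved ids."""
        future = self._futures.pop(correlation_id, None)
        if future is None or future.done():
            return False
        future.set_result(result)
        return True


async def demo():
    pending = PendingReplies()
    correlation_id, future = pending.register()
    # A reply carrying an unknown id is ignored rather than misdelivered.
    assert pending.resolve("unknown-id", {"result": "ignored"}) is False
    # In a real client this call would happen inside the reply-queue consumer.
    pending.resolve(correlation_id, {"result": "success"})
    return await asyncio.wait_for(future, timeout=1.0)
```

Calling asyncio.run(demo()) returns the reply registered under the matching id. The timeout on wait_for keeps a caller from waiting forever when no reply arrives.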
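- Uses correlation_id for matching responses.
- The reply_to queue is auto-declared.

- Methods such as publish catch ChannelClosed and AMQPConnectionError, reconnect, and retry.
- Logging goes through Python's logging module. Configure your logger for production.
- Connection attempts are retried up to max_retries times with exponential backoff.

If a disconnection occurs (e.g., broker restart), the client will reconnect, reopen the channel, and re-declare exchanges, as described for ensure_connection() above.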
Customize via instance attributes:
rabbitmq = RabbitMQ()
rabbitmq.reconnect_interval = 10 # Seconds between reconnect attempts
- Call await rabbitmq.close() in shutdown hooks.
- Mock aio_pika for unit tests or use a local RabbitMQ container.
- Use amqps:// URLs with certificates for encrypted connections.
- Exchanges are declared as DIRECT by default (adjust declare_exchange for others).

Fork the repo, make changes, and submit a PR. Ensure tests pass and add type hints.
MIT License. See LICENSE for details.
For issues or questions, open a GitHub issue.
FAQs
We found that huzzy-rabbit demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.