Security News
Research
Data Theft Repackaged: A Case Study in Malicious Wrapper Packages on npm
The Socket Research Team breaks down a malicious wrapper package that uses obfuscation to harvest credentials and exfiltrate sensitive data.
Asyncio RabbitMQ client that handles all the edge cases.
pip install carehare
Consumer (async for
)::
try:
async with carehare.connect("amqps://guest:guest@localhost") as connection:
try:
await connection.queue_declare("my-queue", exclusive=True)
async with connection.acking_consumer("my-queue") as consumer:
async for message in consumer:
print(repr(message))
# ... if we raise an exception here, we won't ack.
#
# ... if we `break` from this loop, we won't ack.
#
# Call `consumer.close()` before (or instead of) `break`
# to abort iteration.
except carehare.ChannelClosedByServer:
logger.info("RabbitMQ told this one consumer to go away")
except carehare.ConnectionClosed:
# Either RabbitMQ is telling us an error (and the outer context
# manager will throw it), or we called connection.close()
# ourselves (so we want to close).
pass
except carehare.ConnectionClosedByServer:
# str(error) will give the RabbitMQ error message
logger.error("RabbitMQ closed our connection")
except carehare.ConnectionClosedByHeartbeatMonitor:
logger.error("RabbitMQ went away")
Consumer (next_delivery
)::
try:
async with carehare.connect("amqps://guest:guest@localhost") as connection:
try:
await connection.queue_declare("my-queue", exclusive=True)
async with connection.acking_consumer("my-queue") as consumer:
while True:
message, delivery_tag = await consumer.next_delivery()
# You must ack() (with no await). If RabbitMQ doesn't
# receive this ack, it may deliver the same message to
# another client.
consumer.ack(delivery_tag)
if message.startswith(b"okay, go away now"):
break
except carehare.ChannelClosedByServer:
logger.info("RabbitMQ told this one consumer to go away")
except carehare.ConnectionClosed:
# Either RabbitMQ is telling us an error (and the outer context
# manager will throw it), or we called connection.close()
# ourselves (so we want to close).
pass
except carehare.ConnectionClosedByServer:
# str(error) will give the RabbitMQ error message
logger.error("RabbitMQ closed our connection")
except carehare.ConnectionClosedByHeartbeatMonitor:
logger.error("RabbitMQ went away")
Publisher::
try:
async with carehare.connect("amqps://guest:guest@localhost") as connection:
try:
await connection.publish(b"Hello, world!", routing_key="my-queue")
except carehare.ServerSentNack:
logger.warn("Failed to publish message")
except carehare.ChannelClosedByServer:
# str(err) will give the RabbitMQ error message -- for instance,
# "404 NOT_FOUND" if the exchange does not exist
logger.error("Problem with the exchange")
except carehare.ConnectionClosedByServer:
# str(error) will give the RabbitMQ error message
logger.error("RabbitMQ closed our connection")
except carehare.ConnectionClosedByHeartbeatMonitor:
logger.error("RabbitMQ went away")
carehare
is designed to turn RabbitMQ's asynchronous error system into
understandable Python exceptions.
Channels
Carehare doesn't let you control RabbitMQ Channels. They aren't Pythonic. (In
RabbitMQ, an exception on a channel closes the channel -- and cancels all its
pending operations.)
Instead, carehare uses channels to handle errors. For instance, Queue.Declare
costs three operations: Channel.Open, Queue.Declare, Channel.Close. Since the
operation has its own channel, it won't interfere with other operations if it
causes an exception.
There's a speed-up for publishing: we lazily open a Channel per *exchange*.
Error codes like "not found", "access refused" and "not implemented" will make
carehare raise an exception on all pending publishes on the same exchange. Don't
worry: a normal "Nack" ("message wasn't delivered") will only make your single
message fail.
Exceptions
"Exceptions" are hardly exceptional: as a programmer, they are your job. These ones are designed to help you solve them.
Connection methods return asyncio.Future
objects. You must await each one
and handle its errors.
Even though you're using Python async context managers, exceptions can't
happen everywhere. Carehare will only raise when you await
a response
from RabbitMQ.
In particular, consumer.ack()
will never raise! You must call it from the
main event loop, but you won't await it.
To code safely, catch these exceptions religiously:
carehare.ChannelClosedByServer
: RabbitMQ did not like the command you
just ran. Read the exception message for details. After you receive this
message, you may continue using the RabbitMQ connection.carehare.ConnectionClosed
: When the connection shuts down, every pending
Future
will raise this. Only the actual Connection
context manager
will raise the underlying exception: a carehare.ConnectionClosedByServer
with a descriptive error message.Carehare won't raise asyncio.Cancelled
.
Back-pressure
The core logic is synchronous. It's simpler to reason about. The downside:
neither RabbitMQ nor users will wait for buffers to empty before sending more
data.
Use application-level logic to make sure you don't run out of memory:
* Consuming? Don't worry. Use ``prefetch_count`` to limit the number of messages
RabbitMQ sends. Always ack: carehare won't permit ``no-ack``.
* Publishing? Carehare forces "publisher confirms", so each publish returns a
``Future``. Your application is responsible for not calling publish() too many
times simultaneously. Use an ``asyncio.Semaphore`` or reason about your
specific use (for instance, "my server will host max 100 clients, and each
client can only publish one message at a time").
Comparison to other async RabbitMQ clients
Other clients tend to try and achieve "RabbitMQ in Python". They fail when it comes to exceptions:
aiormq
: If your code generates an error during consume, aiormq (4.1.1)
will catch it and ignore it -- stalling your program.aioamqp
: If your connection produces an unexpected error, aioamqp will
catch it and ignore it -- stalling your program. Also, the latest release was
in 2019.This author believes it's too confusing to model RabbitMQ's API in Python. Instead, carehare models your intent in Python.
You'll need Python 3.8+ and a RabbitMQ server.
If you have Docker, here's how to start a development server::
test-server/prepare-certs.sh # Create SSL certificates used in tests
docker run --rm -it \
-p 5671:5671 \
-p 5672:5672 \
-p 15672:15672 \
-v "/$(pwd)"/test-server:/test-server \
-e RABBITMQ_SSL_CACERTFILE=/test-server/ca.cert \
-e RABBITMQ_SSL_CERTFILE=/test-server/server.cert \
-e RABBITMQ_SSL_KEYFILE=/test-server/server.key \
-e RABBITMQ_SSL_VERIFY=verify_peer \
-e RABBITMQ_SSL_FAIL_IF_NO_PEER_CERT=true \
-e RABBITMQ_CONFIG_FILE=/test-server/rabbitmq \
rabbitmq:3.8.11-management-alpine
During testing, see the RabbitMQ management interface at http://localhost:15672.
To add features and fix bugs
First, start a development RabbitMQ server (see above).
Now take on the development cycle:
#. ``tox`` # to ensure tests pass.
#. Write new tests in ``tests/`` and make sure they fail.
#. Write new code in ``carehare/`` to make the tests pass.
#. Submit a pull request.
To deploy
~~~~~~~~~
Use `semver <https://semver.org/>`_.
#. ``git push`` and make sure Travis tests all pass.
#. ``git tag vX.X.X``
#. ``git push --tags``
TravisCI will push to PyPi.
FAQs
asyncio RabbitMQ client
We found that carehare demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Research
The Socket Research Team breaks down a malicious wrapper package that uses obfuscation to harvest credentials and exfiltrate sensitive data.
Research
Security News
Attackers used a malicious npm package typosquatting a popular ESLint plugin to steal sensitive data, execute commands, and exploit developer systems.
Security News
The Ultralytics' PyPI Package was compromised four times in one weekend through GitHub Actions cache poisoning and failure to rotate previously compromised API tokens.