Research
Security News
Malicious npm Packages Inject SSH Backdoors via Typosquatted Libraries
Socket’s threat research team has detected six malicious npm packages typosquatting popular libraries to insert SSH backdoors.
Write code like this:
Subscriber
async with Client("test.mosquitto.org") as client:
async with client.messages() as messages:
await client.subscribe("humidity/#")
async for message in messages:
print(message.payload)
Publisher
async with Client("test.mosquitto.org") as client:
await client.publish("humidity/outside", payload=0.38)
asyncio-mqtt combines the stability of the time-proven paho-mqtt library with a modern, asyncio-based interface.
MqttError
)on_unsubscribe
, on_disconnect
, etc.)async
codeThe whole thing is less than 700 lines of code.
asyncio-mqtt can be installed via pip install asyncio-mqtt
. It requires Python 3.7+ to run. The only dependency is paho-mqtt.
If you can't wait for the latest version and want to install directly from GitHub, use:
pip install git+https://github.com/sbtinstruments/asyncio-mqtt
Since Python 3.8, the default asyncio event loop is the ProactorEventLoop
. Said loop doesn't support the add_reader
method that is required by asyncio-mqtt. Please switch to an event loop that supports the add_reader
method such as the built-in SelectorEventLoop
:
# Change to the "Selector" event loop
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# Run your async application as usual
asyncio.run(main())
Let's make the example from before more interesting:
You can configure quite a few things when initializing the client. These are all the possible parameters together with their default values. See paho-mqtt's documentation for more information about the individual parameters.
import asyncio_mqtt as aiomqtt
import paho.mqtt as mqtt
aiomqtt.Client(
hostname="test.mosquitto.org", # The only non-optional parameter
port=1883,
username=None,
password=None,
logger=None,
client_id=None,
tls_context=None,
tls_params=None,
proxy=None,
protocol=None,
will=None,
clean_session=None,
transport="tcp",
keepalive=60,
bind_address="",
bind_port=0,
clean_start=mqtt.client.MQTT_CLEAN_START_FIRST_ONLY,
properties=None,
message_retry_set=20,
socket_options=(),
max_concurrent_outgoing_calls=None,
websocket_path=None,
websocket_headers=None,
)
Imagine you're measuring temperature and humidity on the outside and inside, and our topics look like this: temperature/outside
. You want to receive all types of measurements but handle them differently. asyncio-mqtt provides Topic.matches()
to make this easy:
import asyncio
import asyncio_mqtt as aiomqtt
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
async with client.messages() as messages:
await client.subscribe("#")
async for message in messages:
if message.topic.matches("humidity/outside"):
print(f"[humidity/outside] {message.payload}")
if message.topic.matches("+/inside"):
print(f"[+/inside] {message.payload}")
if message.topic.matches("temperature/#"):
print(f"[temperature/#] {message.payload}")
asyncio.run(main())
Note that in our example, messages to temperature/inside
are handled twice!
In many cases, you'll want to send and receive messages in different locations in your code. You could create a new client each time, but
You can share the connection by passing the Client
instance to all functions that need it:
import asyncio
import asyncio_mqtt as aiomqtt
async def publish_humidity(client):
await client.publish("humidity/outside", payload=0.38)
async def publish_temperature(client):
await client.publish("temperature/outside", payload=28.3)
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await publish_humidity(client)
await publish_temperature(client)
asyncio.run(main())
Most web frameworks take control over the "main" function, which makes it difficult to figure out where to create and connect the Client
and how to share this connection.
Some frameworks like Starlette directly support lifespan context managers, with which you can safely set up a global client instance that you can then pass to functions that need it, just like before:
import asyncio
import asyncio_mqtt as aiomqtt
import starlette.applications
import contextlib
client = None
@contextlib.asynccontextmanager
async def lifespan(app):
global client
async with aiomqtt.Client("test.mosquitto.org") as c:
client = c
yield
app = starlette.applications.Starlette(
routes=[],
lifespan=lifespan,
)
FastAPI (which is built upon Starlette) doesn't expose that API yet, but there are multiple open PRs to add it. In the meantime, you can work around it via FastAPI's dependency injection.
connect
/disconnect
manually?Managing a connection by calling connect
and disconnect
directly is a bit tricky. For example, when you're disconnecting the client, you'd have to make sure that there's no other task that still relies on the connection. There are many similar situations where something can easily go wrong.
Context managers take care of all connection and disconnection logic for you, in a way that makes it very difficult to shoot yourself in the foot. They are a lot easier and less error-prone to use than connect
/disconnect
.
Supporting both context managers and manual connect
/disconnect
would add a lot of complexity to asyncio-mqtt. To keep maintainer burden manageable, we focus only on the preferred option: context managers.
If you run the basic example for subscribing and listening for messages, you'll notice that the program doesn't finish until you stop it. Waiting for messages blocks the execution of everything that comes afterwards. If you want to run other code after starting your listener (e.g. handling HTTP requests in a web framework) you don't want this.
To solve this, you can use asyncio's create_task
without await
ing the created task. The concept is similar to starting a new thread without join
ing it in a multithreaded application.
import asyncio
import asyncio_mqtt as aiomqtt
async def listen():
async with aiomqtt.Client("test.mosquitto.org") as client:
async with client.messages() as messages:
await client.subscribe("humidity/#")
async for message in messages:
print(message.payload)
async def main():
# Wait for messages in (unawaited) asyncio task
loop = asyncio.get_event_loop()
task = loop.create_task(listen())
# This will still run!
print("Magic!")
# If you don't await the task here the program will simply finish.
# However, if you're using an async web framework you usually don't have to await
# the task, as the framework runs in an endless loop.
await task
asyncio.run(main())
You can reconnect when the connection to the broker is lost by wrapping your code in a try/except
-block and listening for MqttError
s.
import asyncio
import asyncio_mqtt as aiomqtt
async def main():
reconnect_interval = 5 # In seconds
while True:
try:
async with aiomqtt.Client("test.mosquitto.org") as client:
async with client.messages() as messages:
await client.subscribe("humidity/#")
async for message in messages:
print(message.payload.decode())
except aiomqtt.MqttError as error:
print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
await asyncio.sleep(reconnect_interval)
asyncio.run(main())
You can configure TLS via the TLSParameters
class. The parameters are directly passed through to paho-mqtt's tls_set
function. See paho-mqtt's documentation for more information about the individual parameters.
import asyncio
import asyncio_mqtt as aiomqtt
import ssl
tls_params = aiomqtt.TLSParameters(
ca_certs=None,
certfile=None,
keyfile=None,
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS,
ciphers=None,
)
async def main():
async with aiomqtt.Client("test.mosquitto.org", tls_params=tls_params) as client:
await client.publish("humidity/outside", payload=0.38)
asyncio.run(main())
You can configure proxying via the ProxySettings
class. The parameters are directly passed through to paho-mqtt's proxy_set
functionality. Both SOCKS and HTTP proxies are supported. Note that proxying is an extra feature (even in paho-mqtt) that requires the PySocks
dependency. See paho-mqtt's documentation for more information about the individual parameters.
import asyncio
import asyncio_mqtt as aiomqtt
import socks
proxy_params = aiomqtt.ProxySettings(
proxy_type=socks.HTTP,
proxy_addr="www.example.com",
proxy_rdns=True,
proxy_username=None,
proxy_password=None,
)
async def main():
async with aiomqtt.Client("test.mosquitto.org", proxy=proxy_params) as client:
await client.publish("humidity/outside", payload=0.38)
asyncio.run(main())
Note that the underlying paho-mqtt library is dual-licensed. One of the licenses is the so-called Eclipse Distribution License v1.0. It is almost word-for-word identical to the BSD 3-clause License. The only differences are:
This project adheres to Semantic Versioning.
Expect API changes until we reach version 1.0.0
. After 1.0.0
, breaking changes will only occur in major release (e.g., 2.0.0
, 3.0.0
, etc.).
Please refer to the CHANGELOG document. It adheres to the principles of Keep a Changelog.
Is asyncio-mqtt not what you are looking for? Try another client:
FAQs
Idiomatic asyncio wrapper around paho-mqtt
We found that asyncio-mqtt demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 3 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Security News
Socket’s threat research team has detected six malicious npm packages typosquatting popular libraries to insert SSH backdoors.
Security News
MITRE's 2024 CWE Top 25 highlights critical software vulnerabilities like XSS, SQL Injection, and CSRF, reflecting shifts due to a refined ranking methodology.
Security News
In this segment of the Risky Business podcast, Feross Aboukhadijeh and Patrick Gray discuss the challenges of tracking malware discovered in open source softare.