
Security News
How Enterprise Security Is Adapting to AI-Accelerated Threats
Socket CTO Ahmad Nassri discusses why supply chain attacks now target developer machines and what AI means for the future of enterprise security.
Asyncio native pub/sub framework for Python
pip install eventiq
or
poetry add eventiq
pip install 'eventiq[broker]'
natsrabbitmqkafkaredisasyncio based python 3.8+ syntaxanyio, pydantic, typer)pydanticjson as default)*args and **kwargs in messages)import asyncio
from eventiq import Service, Middleware, CloudEvent, GenericConsumer
from eventiq.backends.nats import JetStreamBroker
class SendMessageMiddleware(Middleware):
async def after_broker_connect(self):
print(f"After service start, running with {service.broker}")
await asyncio.sleep(10)
for i in range(100):
message = CloudEvent(topic="test.topic", data={"counter": i})
await service.publish(message)
print("Published messages(s)")
broker = JetStreamBroker(url="nats://localhost:4222")
service = Service(
name="example-service",
broker=broker,
)
service.add_middleware(SendMessageMiddleware)
@service.subscribe(topic="test.topic")
async def example_run(message: CloudEvent):
print(f"Received Message {message.id} with data: {message.data}")
@service.subscribe(topic="test.topic2")
class MyConsumer(GenericConsumer[CloudEvent]):
async def process(self, message: CloudEvent):
print(f"Received Message {message.id} with data: {message.data}")
await self.publish(CloudEvent(topic="test.topic", data={"response": "ok"})
Run with
eventiq run app:service --log-level=info
eventiq run app:service --log-level=info --reload=.
StubBroker class is provided as in memory replacement for running unit tests
import os
def get_broker(**kwargs):
if os.getenv('ENV') == 'TEST':
from eventiq.backends.stub import StubBroker
return StubBroker()
else:
from eventiq.backends.rabbitmq import RabbitmqBroker
return RabbitmqBroker(**kwargs)
broker = get_broker()
Furthermore, subscribers are just regular python coroutines, so it's possible to test them simply by invocation
# main.py
@service.subscribe(topic="test.topic")
async def my_subscriber(message: CloudEvent):
return 42
# tests.py
from main import my_subscriber
async def test_my_subscriber():
result = await my_subscriber(None)
assert result == 42
Getting help:
eventiq --help
Installing shell autocompletion:
eventiq --install-completion [bash|zsh|fish|powershell|pwsh]
Basic commands
run - run servicedocs - generate AsyncAPI docssend - send message to brokerFAQs
Publish/Subscribe asyncio framework for Python
We found that eventiq demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 2 open source maintainers collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
Socket CTO Ahmad Nassri discusses why supply chain attacks now target developer machines and what AI means for the future of enterprise security.

Security News
Learn the essential steps every developer should take to stay secure on npm and reduce exposure to supply chain attacks.

Security News
Experts push back on new claims about AI-driven ransomware, warning that hype and sponsored research are distorting how the threat is understood.