
Security News
NVD Concedes Inability to Keep Pace with Surging CVE Disclosures in 2025
Security experts warn that recent classification changes obscure the true scope of the NVD backlog as CVE volume hits all-time highs.
Asyncio native pub/sub framework for Python
pip install eventiq
or
poetry add eventiq
pip install 'eventiq[broker]'
nats
rabbitmq
kafka
redis
asyncio
based python 3.8+ syntaxanyio
, pydantic
, typer
)pydantic
json
as default)*args
and **kwargs
in messages)import asyncio
from eventiq import Service, Middleware, CloudEvent, GenericConsumer
from eventiq.backends.nats import JetStreamBroker
class SendMessageMiddleware(Middleware):
async def after_broker_connect(self):
print(f"After service start, running with {service.broker}")
await asyncio.sleep(10)
for i in range(100):
message = CloudEvent(topic="test.topic", data={"counter": i})
await service.publish(message)
print("Published messages(s)")
broker = JetStreamBroker(url="nats://localhost:4222")
service = Service(
name="example-service",
broker=broker,
)
service.add_middleware(SendMessageMiddleware)
@service.subscribe(topic="test.topic")
async def example_run(message: CloudEvent):
print(f"Received Message {message.id} with data: {message.data}")
@service.subscribe(topic="test.topic2")
class MyConsumer(GenericConsumer[CloudEvent]):
async def process(self, message: CloudEvent):
print(f"Received Message {message.id} with data: {message.data}")
await self.publish(CloudEvent(topic="test.topic", data={"response": "ok"})
Run with
eventiq run app:service --log-level=info
eventiq run app:service --log-level=info --reload=.
StubBroker
class is provided as in memory replacement for running unit tests
import os
def get_broker(**kwargs):
if os.getenv('ENV') == 'TEST':
from eventiq.backends.stub import StubBroker
return StubBroker()
else:
from eventiq.backends.rabbitmq import RabbitmqBroker
return RabbitmqBroker(**kwargs)
broker = get_broker()
Furthermore, subscribers are just regular python coroutines, so it's possible to test them simply by invocation
# main.py
@service.subscribe(topic="test.topic")
async def my_subscriber(message: CloudEvent):
return 42
# tests.py
from main import my_subscriber
async def test_my_subscriber():
result = await my_subscriber(None)
assert result == 42
Getting help:
eventiq --help
Installing shell autocompletion:
eventiq --install-completion [bash|zsh|fish|powershell|pwsh]
Basic commands
run
- run servicedocs
- generate AsyncAPI docssend
- send message to brokerFAQs
Publish/Subscribe asyncio framework for Python
We found that eventiq demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 2 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Security experts warn that recent classification changes obscure the true scope of the NVD backlog as CVE volume hits all-time highs.
Security Fundamentals
Attackers use obfuscation to hide malware in open source packages. Learn how to spot these techniques across npm, PyPI, Maven, and more.
Security News
Join Socket for exclusive networking events, rooftop gatherings, and one-on-one meetings during BSidesSF and RSA 2025 in San Francisco.