
Security News
Potemkin Understanding in LLMs: New Study Reveals Flaws in AI Benchmarks
New research reveals that LLMs often fake understanding, passing benchmarks but failing to apply concepts or stay internally consistent.
Asyncio native pub/sub framework for Python
pip install eventiq
or
poetry add eventiq
pip install 'eventiq[broker]'
nats
rabbitmq
kafka
redis
asyncio
based python 3.8+ syntaxanyio
, pydantic
, typer
)pydantic
json
as default)*args
and **kwargs
in messages)import asyncio
from eventiq import Service, Middleware, CloudEvent, GenericConsumer
from eventiq.backends.nats import JetStreamBroker
class SendMessageMiddleware(Middleware):
async def after_broker_connect(self):
print(f"After service start, running with {service.broker}")
await asyncio.sleep(10)
for i in range(100):
message = CloudEvent(topic="test.topic", data={"counter": i})
await service.publish(message)
print("Published messages(s)")
broker = JetStreamBroker(url="nats://localhost:4222")
service = Service(
name="example-service",
broker=broker,
)
service.add_middleware(SendMessageMiddleware)
@service.subscribe(topic="test.topic")
async def example_run(message: CloudEvent):
print(f"Received Message {message.id} with data: {message.data}")
@service.subscribe(topic="test.topic2")
class MyConsumer(GenericConsumer[CloudEvent]):
async def process(self, message: CloudEvent):
print(f"Received Message {message.id} with data: {message.data}")
await self.publish(CloudEvent(topic="test.topic", data={"response": "ok"})
Run with
eventiq run app:service --log-level=info
eventiq run app:service --log-level=info --reload=.
StubBroker
class is provided as in memory replacement for running unit tests
import os
def get_broker(**kwargs):
if os.getenv('ENV') == 'TEST':
from eventiq.backends.stub import StubBroker
return StubBroker()
else:
from eventiq.backends.rabbitmq import RabbitmqBroker
return RabbitmqBroker(**kwargs)
broker = get_broker()
Furthermore, subscribers are just regular python coroutines, so it's possible to test them simply by invocation
# main.py
@service.subscribe(topic="test.topic")
async def my_subscriber(message: CloudEvent):
return 42
# tests.py
from main import my_subscriber
async def test_my_subscriber():
result = await my_subscriber(None)
assert result == 42
Getting help:
eventiq --help
Installing shell autocompletion:
eventiq --install-completion [bash|zsh|fish|powershell|pwsh]
Basic commands
run
- run servicedocs
- generate AsyncAPI docssend
- send message to brokerFAQs
Publish/Subscribe asyncio framework for Python
We found that eventiq demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 2 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
New research reveals that LLMs often fake understanding, passing benchmarks but failing to apply concepts or stay internally consistent.
Security News
Django has updated its security policies to reject AI-generated vulnerability reports that include fabricated or unverifiable content.
Security News
ECMAScript 2025 introduces Iterator Helpers, Set methods, JSON modules, and more in its latest spec update approved by Ecma in June 2025.