
Security News
Open Source CAI Framework Handles Pen Testing Tasks up to 3,600× Faster Than Humans
CAI is a new open source AI framework that automates penetration testing tasks like scanning and exploitation up to 3,600× faster than humans.
Client with high level of abstraction for manipulation of messages in the event bus RabbitMQ.
Documentation: https://nutes-uepb.github.io/amqp-client-python
Source Code: https://github.com/nutes-uepb/amqp-client-python
version | compatible with |
---|---|
0.2.0 | 0.2.0 |
0.1.14 | ~0.1.12 |
# basic configuration
from amqp_client_python import (
AsyncEventbusRabbitMQ,
Config, Options
)
config = Config(Options("queue", "rpc_queue", "rpc_exchange"))
eventbus = AsyncEventbusRabbitMQ(config)
# publish
eventbus.publish("rpc_exchange", "routing.key", "message_content")
# subscribe
async def subscribe_handler(body) -> None:
print(body, type(body), flush=True) # handle messages
await eventbus.subscribe("rpc_exchange", "routing.key", subscribe_handler)
# rpc_publish
response = await eventbus.rpc_client("rpc_exchange", "user.find", "message_content")
# provider
async def rpc_provider_handler(body) -> bytes:
print(f"body: {body}")
return b"content"
await eventbus.provide_resource("user.find", rpc_provider_handler)
from amqp_client_python import (
EventbusRabbitMQ,
Config, Options
)
from amqp_client_python.event import IntegrationEvent, IntegrationEventHandler
from examples.default import queue, rpc_queue, rpc_exchange, rpc_routing_key
class ExampleEvent(IntegrationEvent):
EVENT_NAME: str = "ExampleEvent"
ROUTING_KEY: str = rpc_routing_key
def __init__(self, event_type: str, message = []) -> None:
super().__init__(self.EVENT_NAME, event_type)
self.message = message
self.routing_key = self.ROUTING_KEY
class ExampleEventHandler(IntegrationEventHandler):
def handle(self, body) -> None:
print(body,"subscribe")
config = Config(Options(queue, rpc_queue, rpc_exchange))
eventbus = EventbusRabbitMQ(config=config)
class ExampleEvent(IntegrationEvent):
EVENT_NAME: str = "ExampleEvent"
def __init__(self, event_type: str, message = []) -> None:
super().__init__(self.EVENT_NAME, event_type)
self.message = message
from time import sleep
from random import randint
def handle(*body):
print(body[0], "rpc_provider")
return f"{body[0]}".encode("utf-8")
subscribe_event = ExampleEvent(rpc_exchange)
publish_event = ExampleEvent(rpc_exchange, ["message"])
subscribe_event_handle = ExampleEventHandler()
eventbus.subscribe(subscribe_event, subscribe_event_handle, rpc_routing_key)
eventbus.provide_resource(rpc_routing_key+"2", handle)
count = 0
running = True
from concurrent.futures import TimeoutError
while running:
try:
count += 1
if str(count) != eventbus.rpc_client(rpc_exchange, rpc_routing_key+"2", [f"{count}"]).decode("utf-8"):
running = False
#eventbus.publish(publish_event, rpc_routing_key, "message_content")
#running = False
except TimeoutError as err:
print("timeout!!!: ", str(err))
except KeyboardInterrupt:
running=False
except BaseException as err:
print("Err:", err)
from amqp_client_python import EventbusWrapperRabbitMQ, Config, Options
config = Config(Options("queue", "rpc_queue", "rpc_exchange"))
eventbus = EventbusWrapperRabbitMQ(config=config)
async def subscribe_handler(body) -> None:
print(f"{body}", type(body), flush=True)
async def rpc_provider_handler(body) -> bytes:
print(f"handle - {body}", type(body), flush=True)
return f"{body}".encode("utf-8")
# rpc_provider
eventbus.provide_resource("user.find", rpc_provider_handler).result()
# subscribe
eventbus.subscribe("rpc_exchange", "routing.key", subscribe_handler).result()
count = 0
running = True
while running:
try:
count += 1
# rpc_client call
eventbus.rpc_client("rpc_exchange", "user.find", count).result().decode("utf-8")
# publish
eventbus.publish("rpc_exchange", "routing.key", "message_content").result()
#running = False
except KeyboardInterrupt:
running=False
except BaseException as err:
print("Err:", err)
The library is provided by NUTES-UEPB.
When using EventbusRabbitMQ Should not use rpc call inside of rpc provider or subscribe handlers, it may block the ioloop #/obs: fixed on other kinds of eventbus, will be removed on nexts releases
FAQs
Python AMQP Client Library
We found that amqp-client-python demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 2 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
CAI is a new open source AI framework that automates penetration testing tasks like scanning and exploitation up to 3,600× faster than humans.
Security News
Deno 2.4 brings back bundling, improves dependency updates and telemetry, and makes the runtime more practical for real-world JavaScript projects.
Security News
CVEForecast.org uses machine learning to project a record-breaking surge in vulnerability disclosures in 2025.