pyknot

A Python-based message bus system designed for message handling and crafted to integrate with dependency injection frameworks.

  • 1.0.0
  • PyPI

py-knot

This repository provides a message bus system designed to integrate with python-dependency-injector. It dispatches commands and queries to their handlers and supports dependency injection, which helps keep application code decoupled, organized, and maintainable.

Installation

The source code is currently hosted on GitHub at: https://github.com/Retr0327/py-knot

Binary installers for the latest released version are available at the Python Package Index (PyPI).

  • pip

    pip install pyknot
    
  • poetry

    poetry add pyknot
    

Usage

1. Fundamental Implementation

  1. Define messages:

    Create specific command or query messages by extending the Command or Query classes. Note that the package is installed as pyknot but imported as knot.

    from dataclasses import dataclass
    from knot import Command
    
    ReturnType = dict[str, int]
    
    
    @dataclass(slots=True)
    class TestCommand(Command[ReturnType]):
        a: int
        b: int
    
  2. Define handlers:

    Implement CommandHandler or QueryHandler for handling defined messages.

    from dataclasses import dataclass
    from knot import Command, CommandHandler
    
    ReturnType = dict[str, int]
    
    
    @dataclass(slots=True)
    class TestCommand(Command[ReturnType]):
        a: int
        b: int
    
    
    class TestCommandHandler(CommandHandler[TestCommand]):
        def handle(self, message: TestCommand) -> ReturnType:
            ... # do something
    
  3. Register handlers:

    Collect the handlers with the register_handlers function and pass the result to MessageBus.

    from dataclasses import dataclass
    from knot import (
        Command,
        CommandHandler,
        MessageBus,
        register_handlers,
    )
    
    ReturnType = dict[str, int]
    
    
    @dataclass(slots=True)
    class TestCommand(Command[ReturnType]):
        a: int
        b: int
    
    
    class TestCommandHandler(CommandHandler[TestCommand]):
        def handle(self, message: TestCommand) -> ReturnType:
            ... # do something
    
    
    messages = register_handlers((TestCommandHandler,))
    message_bus = MessageBus(messages=messages)
    
  4. Dispatch messages:

    Use MessageBus to dispatch messages within your application.

    from dataclasses import dataclass
    from knot import (
        Command,
        CommandHandler,
        MessageBus,
        register_handlers,
    )
    
    ReturnType = dict[str, int]
    
    
    @dataclass(slots=True)
    class TestCommand(Command[ReturnType]):
        a: int
        b: int
    
    
    class TestCommandHandler(CommandHandler[TestCommand]):
        def handle(self, message: TestCommand) -> ReturnType:
            ... # do something
    
    
    messages = register_handlers((TestCommandHandler,))
    message_bus = MessageBus(messages=messages)
    message_bus.dispatch(TestCommand(a=1, b=2))
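
The four steps above follow a common pattern: a message is plain data, a handler owns the logic for one message type, and the bus looks up the handler by the type of the message it receives. The sketch below illustrates that pattern with the standard library only. It is not py-knot's implementation, and `AddCommand`, `AddCommandHandler`, and `SimpleBus` are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class AddCommand:
    """Hypothetical message: plain data describing what should happen."""

    a: int
    b: int


class AddCommandHandler:
    """Hypothetical handler: owns the logic for exactly one message type."""

    def handle(self, message: AddCommand) -> dict[str, int]:
        return {"a": message.a, "b": message.b, "sum": message.a + message.b}


class SimpleBus:
    """Illustrative stand-in for a message bus; not py-knot's MessageBus."""

    def __init__(self, handlers: dict[type, Callable[[], Any]]) -> None:
        # Maps a message type to a zero-argument callable that builds its handler.
        self._handlers = handlers

    def dispatch(self, message: Any) -> Any:
        try:
            factory = self._handlers[type(message)]
        except KeyError:
            name = type(message).__name__
            raise LookupError(f"no handler registered for {name}") from None
        # Build the handler, then let it process the message.
        return factory().handle(message)


bus = SimpleBus({AddCommand: AddCommandHandler})
result = bus.dispatch(AddCommand(a=1, b=2))
print(result)  # {'a': 1, 'b': 2, 'sum': 3}
```

Because callers only ever see the bus and the message, a handler can be replaced or moved without touching the code that dispatches to it.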
    

2. Integration with Dependency Injection

Please make sure python-dependency-injector is installed.

  1. Set Up Dependency Injection Container

    from dataclasses import dataclass
    from dependency_injector import (
        containers,
        providers,
    )
    
    from knot import (
        Command,
        CommandHandler,
        MessageBus,
        register_handlers,
    )
    from knot.plugins.di import handlers_to_factories
    
    ReturnType = dict[str, int]
    
    
    @dataclass(slots=True)
    class TestCommand(Command[ReturnType]):
        a: int
        b: int
    
    
    class TestCommandHandler(CommandHandler[TestCommand]):
        def handle(self, message: TestCommand) -> ReturnType:
            return message.as_dict()
    
    
    messages = register_handlers((TestCommandHandler,))
    
    
    class MessageBusContainer(containers.DeclarativeContainer):
        message_bus = providers.Singleton(
            MessageBus,
            messages=providers.Dict(handlers_to_factories(messages)),
        )
    

    This container manages your dependencies, including the message bus. The key function here is handlers_to_factories, which transforms the handlers into factory providers that are compatible with the dependency injection framework.

  2. Wire Dependencies and Dispatch Command

    Wire the container, then inject the message bus wherever you need it and dispatch commands through it.

    
    from dependency_injector.wiring import inject, Provide
    
    @inject
    def test(
        message_bus: MessageBus = Provide["message_bus"],
    ):
        return message_bus.dispatch(TestCommand(a=1, b=2))
    
    if __name__ == "__main__":
        container = MessageBusContainer()
        container.wire(modules=[__name__])
        test()
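
One reason to hand the bus factories instead of handler instances is that a handler can then receive its own dependencies when it is built. The sketch below shows that general idea with the standard library only, using `functools.partial` as a stand-in for a factory provider. It makes no claim about how handlers_to_factories works internally, and `Greeter`, `to_factories`, and `dispatch` are hypothetical names that are not part of py-knot or python-dependency-injector.

```python
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable


@dataclass(frozen=True)
class GreetCommand:
    name: str


class Greeter:
    """Hypothetical dependency that the handler needs."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix

    def greet(self, name: str) -> str:
        return f"{self.prefix}, {name}!"


class GreetCommandHandler:
    def __init__(self, greeter: Greeter) -> None:
        self._greeter = greeter

    def handle(self, message: GreetCommand) -> str:
        return self._greeter.greet(message.name)


def to_factories(
    handlers: dict[type, type], **dependencies: Any
) -> dict[type, Callable[[], Any]]:
    """Hypothetical helper: wrap each handler class in a zero-argument factory."""
    return {msg: partial(cls, **dependencies) for msg, cls in handlers.items()}


def dispatch(factories: dict[type, Callable[[], Any]], message: Any) -> Any:
    # A fresh handler is built on every dispatch, with its dependencies supplied.
    handler = factories[type(message)]()
    return handler.handle(message)


factories = to_factories(
    {GreetCommand: GreetCommandHandler}, greeter=Greeter("Hello")
)
greeting = dispatch(factories, GreetCommand(name="Ada"))
print(greeting)  # Hello, Ada!
```

In a real container the dependencies would come from providers rather than keyword arguments, but the shape is the same: the bus asks a factory for a handler and never constructs one itself.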
    

3. Extension as a Celery Plugin

Please ensure that your application is already configured to work with Celery.

py-knot can be extended to work seamlessly with Celery, enabling the dispatching of messages to a task queue. This extension is particularly useful for applications that require asynchronous processing or distributed task execution.

  1. Extend MessageBus for Celery Integration

    Change:

    from knot import MessageBus
    

    to

    from knot.plugins.celery import MessageBus
    
  2. Wire queue_dispatcher_module to the DI Container

    Wiring queue_dispatcher_module to the DI container makes the dispatch_queue method accessible as a dependency.

    from dataclasses import dataclass
    
    from dependency_injector import (
        containers,
        providers,
    )
    from dependency_injector.wiring import (
        Provide,
        inject,
    )
    
    from knot import (
        Command,
        CommandHandler,
        register_handlers,
    )
    from knot.plugins.celery import (
        MessageBus,
        queue_dispatcher_module,
    )
    from knot.plugins.di import handlers_to_factories
    
    ReturnType = dict[str, int]
    
    
    @dataclass(slots=True)
    class TestCommand(Command[ReturnType]):
        a: int
        b: int
    
    
    class TestCommandHandler(CommandHandler[TestCommand]):
        def handle(self, message: TestCommand) -> ReturnType:
            return message.as_dict()
    
    
    messages = register_handlers((TestCommandHandler,))
    
    
    class MessageBusContainer(containers.DeclarativeContainer):
        message_bus = providers.Singleton(
            MessageBus,
            messages=providers.Dict(handlers_to_factories(messages)),
        )
    
    
    @inject
    def test(
        message_bus: MessageBus = Provide["message_bus"],
    ):
        return message_bus.dispatch_queue(TestCommand(a=1, b=2))
    
    
    if __name__ == "__main__":
        container = MessageBusContainer()
        container.wire(modules=[__name__, queue_dispatcher_module])
        test()
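
The difference between dispatching directly and dispatching to a task queue is that the caller returns as soon as the message is enqueued, and a worker handles it later. The sketch below illustrates that idea with the standard library, using `queue.Queue` and a thread as a stand-in for Celery and its workers. It is not how py-knot or Celery is implemented, and `ResizeCommand`, `handle_resize`, `enqueue`, and `worker` are hypothetical names.

```python
import queue
import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class ResizeCommand:
    width: int
    height: int


def handle_resize(message: ResizeCommand) -> int:
    """Hypothetical handler logic: compute the area of the requested size."""
    return message.width * message.height


task_queue: queue.Queue = queue.Queue()
results: list[int] = []


def worker() -> None:
    # Pull messages off the queue until the None sentinel arrives.
    while True:
        message = task_queue.get()
        if message is None:
            break
        results.append(handle_resize(message))


def enqueue(message: ResizeCommand) -> None:
    # Returns immediately; the worker thread does the actual handling.
    task_queue.put(message)


thread = threading.Thread(target=worker)
thread.start()
enqueue(ResizeCommand(width=3, height=4))
enqueue(ResizeCommand(width=5, height=2))
task_queue.put(None)  # tell the worker to stop
thread.join()
print(results)  # [12, 10]
```

With Celery the queue lives in a broker and the worker runs in a separate process, so messages must also be serializable, which this in-process sketch does not need to address.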
    
    

Contact Me

If you have any suggestions or questions, please do not hesitate to email me at retr0327.dev@gmail.com.
