Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

jararaca

Package Overview
Dependencies
Maintainers
1
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

jararaca

A simple and fast API framework for Python

  • 0.2.7
  • PyPI
  • Socket score

Maintainers
1
README.md

Jararaca Microservice Framework

Overview

Jararaca is a aio-first microservice framework that provides a set of tools to build and deploy microservices in a simple and clear way.

Features

Hexagonal Architecture

The framework is based on the hexagonal architecture, which allows you to separate the business logic from the infrastructure, making the code more testable and maintainable.

Dependency Injection

The framework uses the dependency injection pattern to manage the dependencies between the components of the application.

    app = Microservice(
        providers=[
            ProviderSpec(
                provide=Token(AuthConfig, "AUTH_CONFIG"),
                use_value=AuthConfig(
                    secret="secret",
                    identity_refresh_token_expires_delta_seconds=60 * 60 * 24 * 30,
                    identity_token_expires_delta_seconds=60 * 60,
                ),
            ),
            ProviderSpec(
                provide=Token(AppConfig, "APP_CONFIG"),
                use_factory=AppConfig.provider,
            ),
            ProviderSpec(
                provide=TokenBlackListService,
                use_value=InMemoryTokenBlackListService(),
            ),
        ],
    )

Web Server Port

The framework provides a web server that listens on a specific port and routes the requests to the appropriate handler. It uses FastAPI as the web framework.

    @Delete("/{task_id}")
    async def delete_task(self, task_id: TaskId) -> None:
        await self.tasks_crud.delete_by_id(task_id)

        await use_ws_manager().broadcast(("Task %s deleted" % task_id).encode())

Message Bus

The framework provides a topic-based message bus that allows you to send messages between the components of the application. It uses AIO Pika as the message broker worker and publisher.

    @IncomingHandler("task")
    async def process_task(self, message: Message[Identifiable[TaskSchema]]) -> None:
        name = generate_random_name()
        now = asyncio.get_event_loop().time()
        print("Processing task: ", name)

        task = message.payload()

        print("Received task: ", task)
        await asyncio.sleep(random.randint(1, 5))

        await use_publisher().publish(task, topic="task")

        then = asyncio.get_event_loop().time()
        print("Task Finished: ", name, " Time: ", then - now)

Distributed Websocket

You can setup a room-based websocket server that allows you to send messages to a specific room or broadcast messages to all connected clients. All backend instances communicates with each other using a pub/sub mechanism (such as Redis).

    @WebSocketEndpoint("/ws")
    async def ws_endpoint(self, websocket: WebSocket) -> None:
        await websocket.accept()
        counter.increment()
        await use_ws_manager().add_websocket(websocket)
        await use_ws_manager().join(["tasks"], websocket)
        await use_ws_manager().broadcast(
            ("New Connection (%d) from %s" % (counter.count, self.hostname)).encode()
        )

        print("New Connection (%d)" % counter.count)

        while True:
            try:
                await websocket.receive_text()
            except WebSocketDisconnect:
                counter.decrement()
                await use_ws_manager().remove_websocket(websocket)

                await use_ws_manager().broadcast(
                    (
                        "Connection Closed (%d) from %s"
                        % (counter.count, self.hostname)
                    ).encode()
                )
                print("Connection Closed (%d)" % counter.count)
                break

Scheduled Routine

You can setup a scheduled routine that runs a specific task at a specific time or interval.

...
    @ScheduledAction("* * * * * */3", allow_overlap=False)
    async def scheduled_task(self) -> None:
        print("Scheduled Task at ", asyncio.get_event_loop().time())

        print("sleeping")
        await asyncio.sleep(5)

        await use_publisher().publish(
            message=Identifiable(
                id=uuid4(),
                data=TaskSchema(name=generate_random_name()),
            ),
            topic="task",
        )

Observability

You can setup Observability Interceptors for logs, traces and metric collection with OpenTelemetry-based Protocols

class HelloService:
    def __init__(
        self,
        hello_rpc: Annotated[HelloRPC, Token(HelloRPC, "HELLO_RPC")],
    ):
        self.hello_rpc = hello_rpc

    @TracedFunc("ping") # Decorator for tracing
    async def ping(self) -> HelloResponse:
        return await self.hello_rpc.ping()

    @TracedFunc("hello-service")
    async def hello(
        self,
        gather: bool,
    ) -> HelloResponse:
        now = asyncio.get_event_loop().time()
        if gather:
            await asyncio.gather(*[self.random_await(a) for a in range(10)])
        else:
            for a in range(10):
                await self.random_await(a)
        return HelloResponse(
            message="Elapsed time: {}".format(asyncio.get_event_loop().time() - now)
        )

    @TracedFunc("random-await")
    async def random_await(self, index: int) -> None:
        logger.info("Random await %s", index, extra={"index": index})
        await asyncio.sleep(random.randint(1, 3))
        logger.info("Random await %s done", index, extra={"index": index})

Installation

pip install jararaca

Usage

Create a Microservice

# app.py

from jararaca import Microservice, create_http_server, create_messagebus_worker
from jararaca.presentation.http_microservice import HttpMicroservice

app = Microservice(
    providers=[
        ProviderSpec(
            provide=Token[AppConfig],
            use_factory=AppConfig.provider,
        )
    ],
    controllers=[TasksController],
    interceptors=[
        AIOSqlAlchemySessionInterceptor(
            AIOSQAConfig(
                connection_name="default",
                url="sqlite+aiosqlite:///db.sqlite3",
            )
        ),
    ],
)


# App for specific Http Configuration Context
http_app = HttpMicroservice(app)

web_app = create_http_server(app)

Run as a Web Server

uvicorn app:web_app --reload
# or
jararaca server app:app
# or
jararaca server app:http_app

Run as a Message Bus Worker

jararaca worker app:app

Run as a scheduled routine

jararaca scheduler app:app

Generate Typescript intefaces from microservice app controllers

jararaca gen-tsi app.main:app app.ts

Documentation

Documentation is under construction here.

FAQs


Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc