operetta (PyPI, version 0.0.15)
Design Python services right


Operetta

A lightweight framework for building Python applications that is not tied to a specific transport protocol. It is built on top of aiomisc (service lifecycle, entrypoint) and dishka (dependency injection). On top of that, the following integrations are available:

  • AIOHTTP: declarative handlers with DI for request body, query, and path params; automatic OpenAPI generation with Swagger and Redoc.
  • PostgreSQL via asyncpg: a database adapter and DI provider for a connection pool.
  • PostgreSQL with HA via hasql: a pool with balancing, failover and the same adapter layer.
  • Error monitoring via Sentry using sentry-sdk.
  • Prometheus metrics via prometheus-client: expose metrics over HTTP for scraping.

Highlights

  • Services as units of functionality: each service starts/stops via aiomisc and may provide DI providers.
  • Single DI container (dishka) for the whole app; separate scopes for APP and REQUEST.
  • AIOHTTP integration:
    • Handler parameter annotations: FromBody[T], FromQuery[T], FromPath[T].
    • Automatic parsing and validation via mashumaro; friendly error details.
    • Unified JSON envelope for responses.
    • OpenAPI generation with static assets for Swagger/Redoc.
  • PostgreSQL integrations (asyncpg/hasql): interface adapter PostgresDatabaseAdapter + transactional PostgresTransactionDatabaseAdapter for repositories and units of work.
  • Sentry integration: simple, configurable initialization of sentry-sdk.
  • Prometheus integration: scrape metrics from a built-in HTTP endpoint; zero dependencies on third-party web frameworks.
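As an illustration of what FromBody parsing does conceptually, here is a simplified stand-in using only the standard library. This is not operetta's API (the real implementation uses mashumaro); parse_body is a hypothetical helper that sketches the "parse a dict into a dataclass, with friendly error details" behavior:

```python
from dataclasses import dataclass, fields


@dataclass
class CreateUserBody:
    name: str
    email: str


def parse_body(cls, data: dict):
    # Reject missing or unknown keys with a descriptive error,
    # loosely mirroring the framework's validation behavior.
    expected = {f.name for f in fields(cls)}
    missing = expected - data.keys()
    unknown = data.keys() - expected
    if missing or unknown:
        raise ValueError(
            f"invalid body: missing={sorted(missing)}, unknown={sorted(unknown)}"
        )
    return cls(**data)


body = parse_body(CreateUserBody, {"name": "Alice", "email": "alice@example.com"})
```

A malformed payload (e.g., a missing `email` key) raises a ValueError carrying the field names, analogous to the framework's structured error details.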

Installation

Requires Python 3.11+.

  • Base:
pip install operetta
  • With AIOHTTP and OpenAPI:
pip install 'operetta[aiohttp]'
  • With PostgreSQL via asyncpg:
pip install 'operetta[asyncpg]'
  • With PostgreSQL HA via hasql:
pip install 'operetta[hasql]'
  • With Sentry:
pip install 'operetta[sentry]'
  • With Prometheus:
pip install 'operetta[prometheus]'

Quickstart (HTTP API)

A minimal AIOHTTP app with DI and autogenerated OpenAPI. You are free to organize your project structure and files as you prefer.

from dataclasses import dataclass, asdict
from aiohttp import web
from operetta.app import Application
from operetta.integrations.aiohttp.annotations import (
    FromBody,
    FromPath,
    FromQuery,
)
from operetta.integrations.aiohttp.response import success_response
from operetta.integrations.aiohttp.service import AIOHTTPService


@dataclass
class CreateUserBody:
    name: str
    email: str


@dataclass
class UserDto:
    id: int
    name: str
    email: str


async def create_user(
    _: web.Request, body: FromBody[CreateUserBody]
) -> web.StreamResponse:
    # ... create a user ...
    user = UserDto(id=1, name=body.name, email=body.email)
    return success_response(asdict(user))


async def get_user(
    _: web.Request,
    user_id: FromPath[int],
    detailed: FromQuery[bool] = False,
) -> UserDto:
    # ... load a user ...
    user = UserDto(id=user_id, name="Alice", email="alice@example.com")
    return user


routes = [
    web.post("/users", create_user),
    web.get("/users/{user_id}", get_user),
]

app = Application(
    AIOHTTPService(
        address="127.0.0.1",
        port=8080,
        routes=routes,
        docs_title="Demo API",
        docs_servers=("http://127.0.0.1:8080",),
        docs_default_type="swagger",  # or "redoc"
    ),
    di_providers=[],  # your dishka providers if needed
    warmup_dependencies=True,
)

if __name__ == "__main__":
    app.run()

Short example: raising DDD errors in handlers

from operetta.ddd import NotFoundError, AuthorizationError

async def get_user(_: web.Request, user_id: FromPath[int]) -> User:
    # Example auth check
    if not has_access_to_user(user_id):
        raise AuthorizationError(details=[{"permission": "users:read"}])

    user = await repo.get_user(user_id)
    if user is None:
        raise NotFoundError(details=[{"id": user_id}])

    return user

Open the docs at:

  • OpenAPI spec: /static/openapi/openapi.yaml (static files path is configurable).
  • Swagger UI: /docs/swagger (and redirect from /docs).
  • Redoc: /docs/redoc.

How it works under the hood

  • AIOHTTPService at app creation time:
    • Wraps your routes by inspecting handler signatures and FromBody/FromQuery/FromPath annotations.
    • Injects parsed values into the handler call.
    • If the return type is not a StreamResponse, serializes the result into SuccessResponse[T] and returns it as JSON (see the response envelope reference below).
    • Builds the OpenAPI spec via openapify and serves it as static.
    • Attaches system middleware: DDD error mapping to HTTP and a global unhandled error catcher.
  • DI is configured via dishka integration with AIOHTTP; the container is created by DIService and wired into the app.
    • Each request gets a new DI scope (REQUEST) for per-request dependencies.
    • Handler parameters may be any DI-resolvable type (e.g., services, database adapters) in addition to FromBody/FromQuery/FromPath via FromDishka.

Quickstart (non-HTTP app)

Operetta is not tied to HTTP. You can write background services/workers on aiomisc and use DI:

import asyncio
import contextlib
from operetta.app import Application
from operetta.service.base import Service

class Worker(Service):
    async def start(self):
        # example: a periodic task
        self._task = asyncio.create_task(self._job())

    async def stop(self, exception: Exception | None = None):
        self._task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._task

    async def _job(self):
        while True:
            # get dependencies if needed:
            # db = await self.get_dependency(PostgresDatabaseAdapter)
            await asyncio.sleep(1)

app = Application(Worker(), warmup_dependencies=True)
app.run()
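The start/stop lifecycle of the Worker above can be exercised with plain asyncio, no operetta required. PeriodicJob is a hypothetical stand-in that sketches the same pattern (spawn a task on start, cancel and await it on stop):

```python
import asyncio
import contextlib


class PeriodicJob:
    """Plain-asyncio sketch of the Worker lifecycle above."""

    def __init__(self, interval: float):
        self.interval = interval
        self.ticks = 0
        self._task: asyncio.Task | None = None

    async def start(self):
        self._task = asyncio.create_task(self._job())

    async def stop(self):
        # Cancel the background task and swallow only the cancellation.
        self._task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._task

    async def _job(self):
        while True:
            self.ticks += 1
            await asyncio.sleep(self.interval)


async def main() -> int:
    job = PeriodicJob(0.01)
    await job.start()
    await asyncio.sleep(0.05)  # let the job run a few iterations
    await job.stop()
    return job.ticks


ticks = asyncio.run(main())
```

In operetta the same start/stop hooks are driven by aiomisc's entrypoint rather than called by hand.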

Services and DI

  • Base service class: operetta.service.base.Service (inherits aiomisc.Service).
  • DI container: created inside DIService (see operetta/service/di.py).
    • Providers are collected from:
      • the Application itself (argument di_providers),
      • application services implementing get_di_providers().
    • Supports dependency warmup (warmup=True) for APP/REQUEST factories.
  • Retrieve a dependency from a service via await service.get_dependency(Type).

To load config from YAML, use YAMLConfigurationService:

from operetta import Application
from operetta.service.configuration import YAMLConfigurationService

config_service = YAMLConfigurationService()  # reads --config path from CLI
app = Application(config_service)

Two values are provided to DI: ApplicationDictConfig (raw dict) and a config object (if you provide config_cls/config_factory).

Custom config class (mashumaro DataClassDictMixin):

from dataclasses import dataclass
from mashumaro import DataClassDictMixin
from operetta import Application
from operetta.service.configuration import YAMLConfigurationService

# Define your typed config mapped to YAML structure
@dataclass
class AppConfig(DataClassDictMixin):
    # You can use nested dataclasses as well; here kept minimal
    creds: dict[str, str] | None = None

# Build service that parses YAML into AppConfig using mashumaro
config_service = YAMLConfigurationService(
    config_cls=AppConfig,
    config_factory=AppConfig.from_dict,
)

# Both ApplicationDictConfig (raw dict) and AppConfig are available in DI
app = Application(config_service)

AIOHTTP

A first-class integration for building HTTP APIs with declarative handler parameters, DI, and autogenerated OpenAPI/Swagger/Redoc.

Highlights:

  • Handler parameter annotations: FromBody[T], FromQuery[T], FromPath[T] (plus DI via FromDishka).
  • Unified JSON responses out of the box.
  • Automatic OpenAPI spec generation and static docs at /docs (Swagger or Redoc).

Provided components: AIOHTTPService and AIOHTTPServiceConfigProvider.

Install extra:

pip install 'operetta[aiohttp]'

How to wire it up:

from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.aiohttp import (
    AIOHTTPService,
    AIOHTTPServiceConfigProvider,
)

app = Application(
    YAMLConfigurationService(),  # loads --config path and exposes dict to DI
    AIOHTTPService(
        routes=[],
        # You may still override settings here (constructor wins over YAML):
        # port=9090,
        # docs_default_type="redoc",
    ),
    di_providers=[AIOHTTPServiceConfigProvider()],
)

Configuration

You can configure AIOHTTPService in three complementary ways:

  • Constructor (AIOHTTPService.__init__) β€” highest priority.
  • DI (AIOHTTPServiceConfig resolved from provider) β€” overrides defaults.
  • Internal defaults β€” used if neither of the above specify a value.

[!TIP]
Constructor arguments win over DI/YAML values, which win over internal defaults.

YAML keys (all optional) live under the api: section:

api:
  address: 0.0.0.0         # bind address
  port: 8081               # listen port
  static_endpoint_prefix: /static/
  static_files_root: ./var/static  # where to serve static files and openapi spec
  docs_default_path: /docs
  docs_swagger_path: /docs/swagger
  docs_redoc_path: /docs/redoc
  docs_title: Demo API
  docs_servers:
    - http://127.0.0.1:8081
  docs_default_type: swagger  # swagger | redoc | null (no redirect from /docs)
  docs_remove_path_prefix: /v1/
  # Optional OpenAPI cosmetics
  docs_tag_descriptions:
    users: Operations with users
  docs_tag_groups:
    Management:
      - users
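The precedence between internal defaults, DI/YAML values, and constructor arguments can be pictured as a last-writer-wins dict merge. The values below are hypothetical, purely to show the resolution order:

```python
# Illustrative only: how effective AIOHTTPService settings resolve
# (constructor > DI/YAML > internal defaults).
defaults = {"address": "127.0.0.1", "port": 8080, "docs_default_type": "swagger"}
from_yaml = {"address": "0.0.0.0", "port": 8081}  # values from the api: section
from_constructor = {"port": 9090}                 # AIOHTTPService(port=9090)

# Later entries override earlier ones with the same key.
effective = {**defaults, **from_yaml, **from_constructor}
```

Here the effective address comes from YAML, the port from the constructor, and the docs type from the defaults.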

Custom config provider example (env-vars):

import os
from dishka import Provider, Scope, provide
from operetta import Application
from operetta.integrations.aiohttp.config import AIOHTTPServiceConfig
from operetta.integrations.aiohttp import AIOHTTPService

class EnvAiohttpConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_config(self) -> AIOHTTPServiceConfig:
        return AIOHTTPServiceConfig(
            address=os.getenv("HTTP_ADDRESS", "0.0.0.0"),
            port=int(os.getenv("HTTP_PORT", "8080")),
        )

app = Application(
    AIOHTTPService(routes=[]),
    di_providers=[EnvAiohttpConfigProvider()],
)

Error handling and response format

  • Successful responses are automatically wrapped into { "success": true, "data": ..., "error": null }.
  • Errors use { "success": false, "data": null, "error": { message, code, details } }.
  • Standard AIOHTTP errors and domain/application/infrastructure errors (see operetta.ddd.errors) are mapped by middleware from integrations/aiohttp/middlewares.py.
  • Parsing errors for body/params use types from integrations/aiohttp/errors.py (e.g., InvalidJSONBodyError, InvalidQueryParamsError, InvalidPathParamsError, ...).
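The two envelope shapes described above can be sketched as plain functions. These are illustrative helpers matching the documented JSON layout, not operetta's actual response utilities:

```python
def success_envelope(data):
    # Shape of successful responses, as documented above.
    return {"success": True, "data": data, "error": None}


def error_envelope(message, code, details=None):
    # Shape of error responses, as documented above.
    return {
        "success": False,
        "data": None,
        "error": {"message": message, "code": code, "details": details or []},
    }


ok = success_envelope({"id": 1, "name": "Alice"})
err = error_envelope("Resource not found", "RESOURCE_NOT_FOUND", [{"id": 123}])
```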

Recommended way to raise errors in your app

  • Import DDD exceptions from a single place:

    from operetta.ddd import (
        NotFoundError,
        AlreadyExistsError,
        ConflictError,
        ValidationError,
        AuthenticationError,
        AuthorizationError,
        RelatedResourceNotFoundError,
        DependencyUnavailableError,
    )
    
  • Raise with optional structured details (a sequence of JSON-serializable objects):

    raise NotFoundError(
        details=[{"resource": "User", "id": user_id}]
    )
    

HTTP mapping of DDD exceptions (handled by middleware)

DDD exception β†’ HTTP status, HTTP error, code:

  • AuthenticationError β†’ 401 UnauthorizedError (UNAUTHORIZED)
  • AuthorizationError, PermissionDeniedError β†’ 403 ForbiddenError (FORBIDDEN)
  • NotFoundError β†’ 404 ResourceNotFoundError (RESOURCE_NOT_FOUND)
  • AlreadyExistsError β†’ 409 DuplicateRequestError (DUPLICATE_RESOURCE)
  • ConflictError, InvalidOperationError β†’ 409 ConflictError (CONFLICT)
  • ValidationError, RelatedResourceNotFoundError β†’ 422 UnprocessableEntityError (UNPROCESSABLE_ENTITY)
  • DeadlineExceededError β†’ 504 GatewayTimeoutError (GATEWAY_TIMEOUT)
  • DependencyThrottledError, DependencyUnavailableError, SubsystemUnavailableError, SystemResourceLimitExceededError β†’ 503 ServiceUnavailableError (SERVICE_UNAVAILABLE)
  • DependencyFailureError β†’ 502 BadGatewayError (BAD_GATEWAY)
  • StorageIntegrityError, TransportIntegrityError, InfrastructureError (fallback) β†’ 500 ServerError (INTERNAL_SERVER_ERROR)
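The mapping above can be captured in a plain lookup table. This dict simply mirrors the documented mapping for reference; the framework's actual logic lives in integrations/aiohttp/middlewares.py:

```python
# Exception class name -> (HTTP status, error code), mirroring the mapping above.
DDD_TO_HTTP = {
    "AuthenticationError": (401, "UNAUTHORIZED"),
    "AuthorizationError": (403, "FORBIDDEN"),
    "NotFoundError": (404, "RESOURCE_NOT_FOUND"),
    "AlreadyExistsError": (409, "DUPLICATE_RESOURCE"),
    "ConflictError": (409, "CONFLICT"),
    "ValidationError": (422, "UNPROCESSABLE_ENTITY"),
    "DeadlineExceededError": (504, "GATEWAY_TIMEOUT"),
    "DependencyUnavailableError": (503, "SERVICE_UNAVAILABLE"),
    "DependencyFailureError": (502, "BAD_GATEWAY"),
    "InfrastructureError": (500, "INTERNAL_SERVER_ERROR"),  # fallback
}
```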

Response envelope reference

  • Success:

    { "success": true, "data": { "id": 1, "name": "Alice" }, "error": null }
    
  • Error:

    {
      "success": false,
      "data": null,
      "error": {
        "message": "Resource not found",
        "code": "RESOURCE_NOT_FOUND",
        "details": [ { "resource": "User", "id": 123 } ]
      }
    }
    

Advanced

  • You can throw HTTP-specific errors directly if you need full control over the client response: see operetta.integrations.aiohttp.errors (e.g., ForbiddenError, UnauthorizedError, UnprocessableEntityError).
  • Two middlewares are installed by default:
    • DDD error mapping, which translates operetta.ddd exceptions into the HTTP responses listed in the mapping above.
    • A global unhandled-error catcher, which turns unexpected exceptions into a 500 ServerError response.

PostgreSQL

Operetta provides a thin, uniform abstraction over PostgreSQL so your application code does not depend on a particular driver or pool manager. You write repositories and units of work against two interfaces:

  • PostgresDatabaseAdapter β€” a general-purpose adapter for any operations (fetch, fetch_one, execute, ...) without explicit transaction control.
  • PostgresTransactionDatabaseAdapter β€” the same API for all operations plus transaction control methods (start/commit/rollback) when you need to run multiple steps in a single transaction.

There are two interchangeable backends:

  • asyncpg β€” a straightforward single-pool setup.
  • hasql (asyncpg HA) β€” a high-availability pool manager with balancing/failover.

Both backends expose the same interfaces via DI, so switching between them is configuration-only, and DI scopes are chosen to match typical usage. Configuration is provided via DI through the config providers shown below.

Typical pattern:

  • Use PostgresDatabaseAdapter when you don't need explicit transaction management: it's suitable for any reads and writes.
  • When you need transactional boundaries, get PostgresTransactionDatabaseAdapter, call start_transaction()/commit_transaction() (or rollback_transaction() on error), and run your operations within that transaction.

Configuration can be loaded from YAML via YAMLConfigurationService under the postgres: key. Optional connection initialization (e.g., custom codecs or search_path) can be provided through AsyncpgPoolFactoryKwargs in DI; this works for both asyncpg and hasql variants.

Single-node PostgreSQL (asyncpg)

Provides: AsyncpgPostgresDatabaseService and AsyncpgPostgresDatabaseConfigProvider.

YAML config example:

postgres:
  user: app
  password: secret
  database: appdb
  host: 127.0.0.1:5432
  # optional pool params:
  min_size: 5
  max_size: 20
  max_queries: 50000
  max_inactive_connection_lifetime: 300

Plug into the app:

from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg.service import (
    AsyncpgPostgresDatabaseConfigProvider,
    AsyncpgPostgresDatabaseService,
)

app = Application(
    YAMLConfigurationService(),
    AsyncpgPostgresDatabaseService(),
    di_providers=[AsyncpgPostgresDatabaseConfigProvider()],
)

Use in a repository:

from dataclasses import dataclass
from operetta.ddd.infrastructure.db.postgres.adapters.interface import (
    PostgresDatabaseAdapter,
    PostgresTransactionDatabaseAdapter,
)

@dataclass
class User:
    id: int
    name: str

class UserRepository:
    def __init__(self, db: PostgresDatabaseAdapter):
        self._db = db

    async def get_by_id(self, user_id: int) -> User | None:
        row = await self._db.fetch_one(
            "SELECT id, name FROM users WHERE id=$1", user_id
        )
        return User(id=row["id"], name=row["name"]) if row else None

class UnitOfWork:
    def __init__(self, tx: PostgresTransactionDatabaseAdapter):
        self._tx = tx

    async def __aenter__(self):
        await self._tx.start_transaction()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if exc:
            await self._tx.rollback_transaction()
        else:
            await self._tx.commit_transaction()
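The commit-on-success / rollback-on-error behavior of the UnitOfWork above can be exercised against a stub adapter with no database at all. StubTxAdapter is a hypothetical stand-in for PostgresTransactionDatabaseAdapter that just records which transaction methods were called:

```python
import asyncio


class StubTxAdapter:
    # Stand-in for PostgresTransactionDatabaseAdapter, recording calls.
    def __init__(self):
        self.calls = []

    async def start_transaction(self):
        self.calls.append("start")

    async def commit_transaction(self):
        self.calls.append("commit")

    async def rollback_transaction(self):
        self.calls.append("rollback")


class UnitOfWork:
    # Same shape as the UnitOfWork above.
    def __init__(self, tx):
        self._tx = tx

    async def __aenter__(self):
        await self._tx.start_transaction()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if exc:
            await self._tx.rollback_transaction()
        else:
            await self._tx.commit_transaction()


async def main():
    ok = StubTxAdapter()
    async with UnitOfWork(ok):
        pass  # happy path: transaction is committed

    bad = StubTxAdapter()
    try:
        async with UnitOfWork(bad):
            raise RuntimeError("boom")  # failure path: transaction rolls back
    except RuntimeError:
        pass
    return ok.calls, bad.calls


ok_calls, bad_calls = asyncio.run(main())
```

With a real adapter from DI the usage is the same: `async with UnitOfWork(tx): ...` around the operations that must share a transaction.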

High-availability PostgreSQL cluster (hasql)

If you run an HA cluster (multiple nodes), use the hasql integration.

Provides: AsyncpgHAPostgresDatabaseService and AsyncpgHAPostgresDatabaseConfigProvider.

YAML config example:

postgres:
  user: app
  password: secret
  database: appdb
  hosts:
    - 10.0.0.1:5432
    - 10.0.0.2:5432
  min_masters: 1
  min_replicas: 1
  # optional:
  acquire_timeout: 5
  refresh_delay: 5
  refresh_timeout: 5
  fallback_master: false
  master_as_replica_weight: 1.0
  balancer_policy: greedy  # or round_robin / random_weighted
  stopwatch_window_size: 100

Plug into the app:

from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg_ha.service import (
    AsyncpgHAPostgresDatabaseConfigProvider,
    AsyncpgHAPostgresDatabaseService,
)

app = Application(
    YAMLConfigurationService(),
    AsyncpgHAPostgresDatabaseService(),
    di_providers=[AsyncpgHAPostgresDatabaseConfigProvider()],
)

[!TIP] DI exposes the same adapter interfaces, so repository and unit of work code stays unchanged.

Advanced setup

You can pass an init callable for connections (e.g., register codecs, set search_path) via DI. Below is an example provider from a real project that registers a custom JSONB codec for asyncpg HA (hasql):

import json
from dishka import Provider, Scope, provide
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg.config import AsyncpgPoolFactoryKwargs
from operetta.integrations.asyncpg_ha.service import (
    AsyncpgHAPostgresDatabaseConfigProvider,
    AsyncpgHAPostgresDatabaseService,
)

class AsyncpgJSONCodecProvider(Provider):
    scope = Scope.APP

    @provide(override=True)
    def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
        async def set_custom_codecs(conn):
            await conn.set_type_codec(
                "jsonb",
                encoder=json.dumps,
                decoder=json.loads,
                schema="pg_catalog",
            )
        return AsyncpgPoolFactoryKwargs(init=set_custom_codecs)

app = Application(
    YAMLConfigurationService(),
    AsyncpgHAPostgresDatabaseService(),
    di_providers=[
        AsyncpgHAPostgresDatabaseConfigProvider(),
        AsyncpgJSONCodecProvider(),
    ],
)

[!IMPORTANT]
The built-in AsyncpgPostgresDatabaseConfigProvider and AsyncpgHAPostgresDatabaseConfigProvider already register a default provider for AsyncpgPoolFactoryKwargs. To customize pool options when using them, declare your own provider with @provide(override=True) so it replaces the built-in one; otherwise container validation will fail because two providers supply the same type.

Define your own config providers (e.g., from environment variables) if you don't want to use YAML-based ones:

import os
from dishka import Provider, Scope, provide
from operetta.app import Application
from operetta.integrations.asyncpg.config import (
    AsyncpgPostgresDatabaseConfig,
    AsyncpgPoolFactoryKwargs,
)
from operetta.integrations.asyncpg.service import AsyncpgPostgresDatabaseService

class EnvAsyncpgConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_db_config(self) -> AsyncpgPostgresDatabaseConfig:
        return AsyncpgPostgresDatabaseConfig(
            user=os.getenv("PGUSER", "app"),
            database=os.getenv("PGDATABASE", "appdb"),
            host=os.getenv("PGHOST", "127.0.0.1:5432"),
            password=os.getenv("PGPASSWORD"),
        )

    @provide
    def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
        return {}

app = Application(
    AsyncpgPostgresDatabaseService(),
    di_providers=[EnvAsyncpgConfigProvider()],
)

Example of an environment-based HA config provider:

import os
from dishka import Provider, Scope, provide
from operetta.app import Application
from operetta.integrations.asyncpg.config import AsyncpgPoolFactoryKwargs
from operetta.integrations.asyncpg_ha.config import AsyncpgHAPostgresDatabaseConfig
from operetta.integrations.asyncpg_ha.service import (
    AsyncpgHAPostgresDatabaseConfigurationService,
    AsyncpgHAPostgresDatabaseService,
)


class EnvHasqlConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_db_config(self) -> AsyncpgHAPostgresDatabaseConfig:
        hosts = os.getenv("PGHOSTS", "10.0.0.1:5432,10.0.0.2:5432").split(",")
        return AsyncpgHAPostgresDatabaseConfig(
            user=os.getenv("PGUSER", "app"),
            database=os.getenv("PGDATABASE", "appdb"),
            hosts=[h.strip() for h in hosts if h.strip()],
            password=os.getenv("PGPASSWORD"),
            min_masters=int(os.getenv("PG_MIN_MASTERS", "1")),
            min_replicas=int(os.getenv("PG_MIN_REPLICAS", "1")),
        )

    @provide
    def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
        return {}


app = Application(
    AsyncpgHAPostgresDatabaseService(),
    di_providers=[EnvHasqlConfigProvider()],
)

Sentry

A built-in integration that initializes sentry-sdk with a logging integration for breadcrumbs and error events. It’s optional and configured via DI and/or constructor parameters.

Provided components: SentryService and SentryServiceConfigProvider.

Install extra:

pip install 'operetta[sentry]'

How to wire it up:

from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.sentry import (
    SentryService,
    SentryServiceConfigProvider,
)

app = Application(
    YAMLConfigurationService(),          # optional: load YAML into DI
    SentryService(
        # You can override any config here; constructor wins over DI
        # send_default_pii=False,
        # debug=False,
    ),
    di_providers=[SentryServiceConfigProvider()]
)

Configuration

You can configure SentryService in three complementary ways:

  • Constructor (SentryService.__init__) β€” highest priority.
  • DI (SentryServiceConfig resolved from provider) β€” overrides defaults.
  • Internal defaults β€” used if neither of the above specify a value.

YAML keys (all optional) live under the sentry: section:

sentry:
  dsn: https://public@o0.ingest.sentry.io/0
  enabled: true

  # Logging integration
  capture_log_level: ERROR      # event level (string or int)
  context_log_level: INFO       # breadcrumbs level (string or int)
  ignore_loggers:               # loggers to exclude from breadcrumbs/events
    - aiohttp.access

  # Core SDK options
  environment: production
  release: 1.2.3
  server_name: api-01

  include_local_variables: true
  max_breadcrumbs: 100
  shutdown_timeout: 2.0

  # Sampling
  sample_rate: 1.0              # error event sampling
  traces_sample_rate: 0.2       # performance tracing sampling

  # Error filtering and in-app
  ignore_errors:
    - TimeoutError
  in_app_include:
    - myapp
  in_app_exclude:
    - aiohttp

  # Privacy / debug
  send_default_pii: false
  debug: false

  # HTTP body and propagation
  max_request_body_size: medium
  trace_propagation_targets:
    - .*

  # Transport tweaks
  keep_alive: false

  # Anything else passed to sentry_sdk.init (overrides same-named keys above)
  extra_options:
    profiles_sample_rate: 0.1

Custom config provider example (env-vars):

import os
from dishka import Provider, Scope, provide
from operetta.app import Application
from operetta.integrations.sentry.config import SentryServiceConfig
from operetta.integrations.sentry import SentryService

class EnvSentryConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_config(self) -> SentryServiceConfig:
        return SentryServiceConfig(
            dsn=os.getenv("SENTRY_DSN"),
            enabled=os.getenv("SENTRY_ENABLED", "true").lower() == "true",
        )

app = Application(
    SentryService(),
    di_providers=[EnvSentryConfigProvider()],
)

Behavior and notes

If enabled: false or no dsn is provided, Sentry initialization is skipped (a message is logged).

All other parameters rely on the defaults defined by sentry-sdk itself. Operetta does not override those internal defaults: if you do not set a field in SentryServiceConfig and do not provide it via extra_options, the behavior is identical to calling sentry_sdk.init without that argument. See the official documentation for the full list of options and their default values: https://docs.sentry.io/platforms/python/configuration/options/

The extra_options parameter lets you supply any additional keys for sentry_sdk.init that do not have a dedicated field in SentryServiceConfig. These keys are merged last (overriding same-named ones) into the final options dict passed to the SDK.
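The "merged last" semantics of extra_options amount to a dict merge where extra_options wins on key collisions. The values below are hypothetical, chosen only to show the override:

```python
# Hypothetical values: extra_options keys are merged last, overriding
# same-named fields that came from SentryServiceConfig.
base_options = {
    "dsn": "https://public@o0.ingest.sentry.io/0",
    "traces_sample_rate": 0.2,
}
extra_options = {"traces_sample_rate": 0.5, "profiles_sample_rate": 0.1}

final_options = {**base_options, **extra_options}
```

Here extra_options both overrides traces_sample_rate and adds profiles_sample_rate, a key with no dedicated config field.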

Prometheus

Expose Prometheus metrics over HTTP with zero dependencies on third-party web frameworks. The service uses Python's standard library (asyncio.start_server) to serve metrics efficiently in asyncio environments.

Provided components: PrometheusService and PrometheusServiceConfigProvider.

Install extra:

pip install 'operetta[prometheus]'

How to wire it up:

from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.prometheus import (
    PrometheusService,
    PrometheusServiceConfigProvider,
)

app = Application(
    YAMLConfigurationService(),
    PrometheusService(),  # constructor args override YAML
    di_providers=[PrometheusServiceConfigProvider()],
)

Configuration

You can configure PrometheusService in three complementary ways:

  • Constructor (PrometheusService.__init__) β€” highest priority.
  • DI (PrometheusServiceConfig resolved from provider) β€” overrides defaults.
  • Internal defaults β€” used if neither of the above specify a value.

YAML keys (all optional) live under the prometheus: section:

prometheus:
  address: 0.0.0.0
  port: 9000
  endpoint: /metrics
  enabled: true

Custom config provider example (env-vars):

import os
from dishka import Provider, Scope, provide
from operetta import Application
from operetta.integrations.prometheus.config import PrometheusServiceConfig
from operetta.integrations.prometheus import PrometheusService

class EnvPromConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_config(self) -> PrometheusServiceConfig:
        return PrometheusServiceConfig(
            address=os.getenv("PROM_ADDRESS", "0.0.0.0"),
            port=int(os.getenv("PROM_PORT", "9000")),
            endpoint=os.getenv("PROM_ENDPOINT", "/metrics"),
            enabled=os.getenv("PROM_ENABLED", "true").lower() == "true",
        )

app = Application(
    PrometheusService(),
    di_providers=[EnvPromConfigProvider()],
)

Usage

  • By default, the global Prometheus registry is used. If you want a custom registry, provide one via DI:
from operetta import Application
from operetta.integrations.prometheus import PrometheusService
from dishka import Provider, Scope, provide
from prometheus_client import CollectorRegistry

class PromRegistryProvider(Provider):
    scope = Scope.APP

    @provide
    def get_registry(self) -> CollectorRegistry:
        return CollectorRegistry()

app = Application(
    PrometheusService(),
    di_providers=[PromRegistryProvider()],
)
  • Create and register metrics as usual using prometheus-client; they will be collected from the registry the service uses (global by default, or your DI-provided one):
from prometheus_client import Counter, REGISTRY

# Using default (global) registry
REQUESTS = Counter('http_requests_total', 'Count of HTTP requests')
REQUESTS.inc()

# If you provided a custom registry, pass it explicitly when creating metrics:
from prometheus_client import Counter, CollectorRegistry
registry = CollectorRegistry()
CUSTOM_COUNTER = Counter('my_counter', 'Help', registry=registry)
CUSTOM_COUNTER.inc()
  • HTTP details:
    • Methods: GET and HEAD.
    • Path match ignores query string (e.g., /metrics?format=...).
    • Content-Type is set to Prometheus TEXT format.
    • The server is a tiny asyncio server based on the standard library.
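The query-string-insensitive path match can be reproduced with the standard library. matches_endpoint is a hypothetical helper sketching that behavior, not the service's actual code:

```python
from urllib.parse import urlsplit


def matches_endpoint(raw_target: str, endpoint: str = "/metrics") -> bool:
    # Compare only the path component, ignoring any query string.
    return urlsplit(raw_target).path == endpoint


matches_endpoint("/metrics?format=openmetrics")  # True: query string ignored
matches_endpoint("/other")                       # False: different path
```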
