
A lightweight framework for building Python applications that is not tied to a specific transport protocol. It is built on top of aiomisc (service lifecycle, entrypoint) and dishka (dependency injection). On top of that, the following integrations are available:
- DI scopes `APP` and `REQUEST`.
- Declarative handler parameters: `FromBody[T]`, `FromQuery[T]`, `FromPath[T]`.
- `PostgresDatabaseAdapter` plus transactional `PostgresTransactionDatabaseAdapter` for repositories and units of work.

Requires Python 3.11+.
```shell
pip install operetta
pip install 'operetta[aiohttp]'
pip install 'operetta[asyncpg]'
pip install 'operetta[hasql]'
pip install 'operetta[sentry]'
pip install 'operetta[prometheus]'
```
A minimal AIOHTTP app with DI and autogenerated OpenAPI. You are free to organize your project structure and files as you prefer.
```python
from dataclasses import dataclass, asdict

from aiohttp import web

from operetta.app import Application
from operetta.integrations.aiohttp.annotations import (
    FromBody,
    FromPath,
    FromQuery,
)
from operetta.integrations.aiohttp.response import success_response
from operetta.integrations.aiohttp.service import AIOHTTPService


@dataclass
class CreateUserBody:
    name: str
    email: str


@dataclass
class UserDto:
    id: int
    name: str
    email: str


async def create_user(
    _: web.Request, body: FromBody[CreateUserBody]
) -> web.StreamResponse:
    # ... create a user ...
    user = UserDto(id=1, name=body.name, email=body.email)
    return success_response(asdict(user))


async def get_user(
    _: web.Request,
    user_id: FromPath[int],
    detailed: FromQuery[bool] = False,
) -> UserDto:
    # ... load a user ...
    user = UserDto(id=user_id, name="Alice", email="alice@example.com")
    return user


routes = [
    web.post("/users", create_user),
    web.get("/users/{user_id}", get_user),
]

app = Application(
    AIOHTTPService(
        address="127.0.0.1",
        port=8080,
        routes=routes,
        docs_title="Demo API",
        docs_servers=("http://127.0.0.1:8080",),
        docs_default_type="swagger",  # or "redoc"
    ),
    di_providers=[],  # your dishka providers if needed
    warmup_dependencies=True,
)

if __name__ == "__main__":
    app.run()
```
Short example: raising DDD errors in handlers
```python
from operetta.ddd import NotFoundError, AuthorizationError


async def get_user(_: web.Request, user_id: FromPath[int]) -> User:
    # Example auth check
    if not has_access_to_user(user_id):
        raise AuthorizationError(details=[{"permission": "users:read"}])
    user = await repo.get_user(user_id)
    if user is None:
        raise NotFoundError(details=[{"id": user_id}])
    return user
```
Open the docs at:

- `/docs/swagger` (and redirect from `/docs`).
- `/docs/redoc`.
- `/static/openapi/openapi.yaml` (static files path is configurable).

What `AIOHTTPService` does at app creation time:

- Builds the OpenAPI spec from route handlers and their `FromBody`/`FromQuery`/`FromPath` annotations.
- If a handler returns something other than `StreamResponse`, serializes the result into `SuccessResponse[T]` and returns JSON (format details).
- Sets up DI via `DIService` and wires it into the app.
- Opens a request-scoped DI container (scope `REQUEST`) for per-request dependencies.
- Lets you inject dependencies alongside `FromBody`/`FromQuery`/`FromPath` via `FromDishka`.

Operetta is not tied to HTTP. You can write background services/workers on aiomisc and use DI:
```python
import asyncio
import contextlib

from operetta.app import Application
from operetta.service.base import Service


class Worker(Service):
    async def start(self):
        # example: a periodic task
        self._task = asyncio.create_task(self._job())

    async def stop(self, exception: Exception | None = None):
        self._task.cancel()
        # awaiting a cancelled task raises CancelledError (a BaseException),
        # so suppress that specifically rather than Exception
        with contextlib.suppress(asyncio.CancelledError):
            await self._task

    async def _job(self):
        while True:
            # get dependencies if needed:
            # db = await self.get_dependency(PostgresDatabaseAdapter)
            await asyncio.sleep(1)


app = Application(Worker(), warmup_dependencies=True)
app.run()
```
- `operetta.service.base.Service` (inherits `aiomisc.Service`) is the base class for services.
- Services that need dependencies inherit `DIService` (see `operetta/service/di.py`).
- DI providers can come from the `Application` itself (argument `di_providers`) or from a service's `get_di_providers()`.
- Dependency warm-up (`warmup=True`) eagerly initializes `APP`/`REQUEST` factories.
- Inside a service, resolve a dependency with `await service.get_dependency(Type)`.

To load config from YAML, use `YAMLConfigurationService`:
```python
from operetta import Application
from operetta.service.configuration import YAMLConfigurationService

config_service = YAMLConfigurationService()  # reads --config path from CLI

app = Application(config_service)
```
Two values are provided to DI: `ApplicationDictConfig` (raw dict) and a config object (if you provide `config_cls`/`config_factory`).
Custom config class (mashumaro DataClassDictMixin):
```python
from dataclasses import dataclass

from mashumaro import DataClassDictMixin

from operetta import Application
from operetta.service.configuration import YAMLConfigurationService


# Define your typed config mapped to YAML structure
@dataclass
class AppConfig(DataClassDictMixin):
    # You can use nested dataclasses as well; here kept minimal
    creds: dict[str, str] | None = None


# Build service that parses YAML into AppConfig using mashumaro
config_service = YAMLConfigurationService(
    config_cls=AppConfig,
    config_factory=AppConfig.from_dict,
)

# Both ApplicationDictConfig (raw dict) and AppConfig are available in DI
app = Application(config_service)
```
A first-class integration for building HTTP APIs with declarative handler parameters, DI, and autogenerated OpenAPI/Swagger/Redoc.
Highlights:
- Declarative handler parameters: `FromBody[T]`, `FromQuery[T]`, `FromPath[T]` (plus DI via `FromDishka`).
- Autogenerated OpenAPI served at `/docs` (Swagger or Redoc).

Provided components:

- `AIOHTTPService`: the main service that wraps routes, handles requests, and serves OpenAPI/docs.
- `AIOHTTPConfigurationService`: registers a config provider into DI.
- `AIOHTTPServiceConfigProvider`: reads `ApplicationDictConfig['api']` and decodes it into `AIOHTTPServiceConfig`.

Install extra:
```shell
pip install 'operetta[aiohttp]'
```
How to wire it up:
```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.aiohttp import (
    AIOHTTPService,
    AIOHTTPServiceConfigProvider,
)

app = Application(
    YAMLConfigurationService(),  # loads --config path and exposes dict to DI
    AIOHTTPService(
        routes=[],
        # You may still override settings here (constructor wins over YAML):
        # port=9090,
        # docs_default_type="redoc",
    ),
    di_providers=[AIOHTTPServiceConfigProvider()],
)
```
You can configure `AIOHTTPService` in three complementary ways:

- Constructor (`__init__`) arguments: explicit values have the highest priority.
- YAML config (`YAMLConfigurationService` + `AIOHTTPConfigurationService`/`AIOHTTPServiceConfigProvider`): good for ops-driven setups; overrides defaults but not explicit `__init__` values.
- Any custom DI provider that supplies `AIOHTTPServiceConfig` (see the tip below).

Precedence rule: `__init__` → DI (`AIOHTTPServiceConfigProvider`) → internal defaults.

> [!TIP]
> `AIOHTTPConfigurationService` is a helper that installs `AIOHTTPServiceConfigProvider` into DI.
> This provider reads `ApplicationDictConfig['api']` and decodes it into `AIOHTTPServiceConfig`.
> YAML is not required. You can provide `AIOHTTPServiceConfig` via any DI provider.
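The precedence rule boils down to "first set value wins". A stdlib-only sketch of that resolution order (illustrative; `resolve` and the sentinel handling are invented for this doc and are not operetta API):

```python
# Illustrative sketch of "constructor > DI config > internal defaults".
_UNSET = object()  # sentinel: "this layer did not set the value"


def resolve(explicit=_UNSET, di_value=_UNSET, default=8080):
    # Explicit constructor arguments win over DI-provided config,
    # and DI-provided config wins over internal defaults.
    if explicit is not _UNSET:
        return explicit
    if di_value is not _UNSET:
        return di_value
    return default


assert resolve(explicit=9090, di_value=8081) == 9090  # constructor wins
assert resolve(di_value=8081) == 8081                 # DI config wins
assert resolve() == 8080                              # internal default
```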
YAML keys (all optional) live under the `api:` section:

```yaml
api:
  address: 0.0.0.0  # bind address
  port: 8081  # listen port
  static_endpoint_prefix: /static/
  static_files_root: ./var/static  # where to serve static files and openapi spec
  docs_default_path: /docs
  docs_swagger_path: /docs/swagger
  docs_redoc_path: /docs/redoc
  docs_title: Demo API
  docs_servers:
    - http://127.0.0.1:8081
  docs_default_type: swagger  # swagger | redoc | null (no redirect from /docs)
  docs_remove_path_prefix: /v1/
  # Optional OpenAPI cosmetics
  docs_tag_descriptions:
    users: Operations with users
  docs_tag_groups:
    Management:
      - users
```
Custom config provider example (env-vars):
```python
import os

from dishka import Provider, Scope, provide

from operetta import Application
from operetta.integrations.aiohttp import AIOHTTPService
from operetta.integrations.aiohttp.config import AIOHTTPServiceConfig


class EnvAiohttpConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_config(self) -> AIOHTTPServiceConfig:
        return AIOHTTPServiceConfig(
            address=os.getenv("HTTP_ADDRESS", "0.0.0.0"),
            port=int(os.getenv("HTTP_PORT", "8080")),
        )


app = Application(
    AIOHTTPService(routes=[]),
    di_providers=[EnvAiohttpConfigProvider()],
)
```
- Success responses: `{ "success": true, "data": ..., "error": null }`.
- Error responses: `{ "success": false, "data": null, "error": { message, code, details } }`.
- DDD exceptions (`operetta.ddd.errors`) are mapped by middleware from `integrations/aiohttp/middlewares.py`.
- Transport-level errors are defined in `integrations/aiohttp/errors.py` (e.g., `InvalidJSONBodyError`, `InvalidQueryParamsError`, `InvalidPathParamsError`, ...).

Recommended way to raise errors in your app
Import DDD exceptions from a single place:
```python
from operetta.ddd import (
    NotFoundError,
    AlreadyExistsError,
    ConflictError,
    ValidationError,
    AuthenticationError,
    AuthorizationError,
    RelatedResourceNotFoundError,
    DependencyUnavailableError,
)
```
Raise with optional structured details (a sequence of JSON-serializable objects):
```python
raise NotFoundError(
    details=[{"resource": "User", "id": user_id}]
)
```
HTTP mapping of DDD exceptions (handled by middleware)
| DDD exception | HTTP status | HTTP error | code |
|---|---|---|---|
| AuthenticationError | 401 | UnauthorizedError | UNAUTHORIZED |
| AuthorizationError, PermissionDeniedError | 403 | ForbiddenError | FORBIDDEN |
| NotFoundError | 404 | ResourceNotFoundError | RESOURCE_NOT_FOUND |
| AlreadyExistsError | 409 | DuplicateRequestError | DUPLICATE_RESOURCE |
| ConflictError, InvalidOperationError | 409 | ConflictError | CONFLICT |
| ValidationError, RelatedResourceNotFoundError | 422 | UnprocessableEntityError | UNPROCESSABLE_ENTITY |
| DeadlineExceededError | 504 | GatewayTimeoutError | GATEWAY_TIMEOUT |
| DependencyThrottledError, DependencyUnavailableError, SubsystemUnavailableError, SystemResourceLimitExceededError | 503 | ServiceUnavailableError | SERVICE_UNAVAILABLE |
| DependencyFailureError | 502 | BadGatewayError | BAD_GATEWAY |
| StorageIntegrityError, TransportIntegrityError, InfrastructureError (fallback) | 500 | ServerError | INTERNAL_SERVER_ERROR |
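For reference, the table above can be expressed as a plain lookup (an illustrative sketch only; the real mapping lives in operetta's middleware, and `map_ddd_error` is a name invented for this doc):

```python
# Illustrative sketch of the DDD-exception -> HTTP mapping table above.
DDD_TO_HTTP = {
    "AuthenticationError": (401, "UNAUTHORIZED"),
    "AuthorizationError": (403, "FORBIDDEN"),
    "PermissionDeniedError": (403, "FORBIDDEN"),
    "NotFoundError": (404, "RESOURCE_NOT_FOUND"),
    "AlreadyExistsError": (409, "DUPLICATE_RESOURCE"),
    "ConflictError": (409, "CONFLICT"),
    "InvalidOperationError": (409, "CONFLICT"),
    "ValidationError": (422, "UNPROCESSABLE_ENTITY"),
    "RelatedResourceNotFoundError": (422, "UNPROCESSABLE_ENTITY"),
    "DeadlineExceededError": (504, "GATEWAY_TIMEOUT"),
    "DependencyThrottledError": (503, "SERVICE_UNAVAILABLE"),
    "DependencyUnavailableError": (503, "SERVICE_UNAVAILABLE"),
    "SubsystemUnavailableError": (503, "SERVICE_UNAVAILABLE"),
    "SystemResourceLimitExceededError": (503, "SERVICE_UNAVAILABLE"),
    "DependencyFailureError": (502, "BAD_GATEWAY"),
    "StorageIntegrityError": (500, "INTERNAL_SERVER_ERROR"),
    "TransportIntegrityError": (500, "INTERNAL_SERVER_ERROR"),
    "InfrastructureError": (500, "INTERNAL_SERVER_ERROR"),
}


def map_ddd_error(exc_name: str) -> tuple[int, str]:
    # InfrastructureError acts as the generic 500 fallback in the table above
    return DDD_TO_HTTP.get(exc_name, (500, "INTERNAL_SERVER_ERROR"))
```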
Response envelope reference
Success:
```json
{ "success": true, "data": { "id": 1, "name": "Alice" }, "error": null }
```
Error:
```json
{
  "success": false,
  "data": null,
  "error": {
    "message": "Resource not found",
    "code": "RESOURCE_NOT_FOUND",
    "details": [ { "resource": "User", "id": 123 } ]
  }
}
```
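Both shapes can be produced with a couple of plain helpers (a stdlib-only sketch of the envelope structure; operetta builds these envelopes for you, and the helper names here are invented for this doc):

```python
import json


def success_envelope(data):
    # Success: { "success": true, "data": ..., "error": null }
    return {"success": True, "data": data, "error": None}


def error_envelope(message, code, details=None):
    # Error: { "success": false, "data": null, "error": {...} }
    return {
        "success": False,
        "data": None,
        "error": {"message": message, "code": code, "details": details or []},
    }


body = json.dumps(success_envelope({"id": 1, "name": "Alice"}))
```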
Advanced
- HTTP error classes live in `operetta.integrations.aiohttp.errors` (e.g., `ForbiddenError`, `UnauthorizedError`, `UnprocessableEntityError`).
- `ddd_errors_middleware` maps DDD exceptions to the HTTP errors above.
- `unhandled_error_middleware` catches all other exceptions and returns a generic 500 with a safe message.

Operetta provides a thin, uniform abstraction over PostgreSQL so your application code does not depend on a particular driver or pool manager. You write repositories and units of work against two interfaces:

- `PostgresDatabaseAdapter`: a general-purpose adapter for any operations (`fetch`, `fetch_one`, `execute`, ...) without explicit transaction control.
- `PostgresTransactionDatabaseAdapter`: the same API for all operations plus transaction control methods (start/commit/rollback) when you need to run multiple steps in a single transaction.

There are two interchangeable backends: plain asyncpg (single node) and asyncpg over hasql (HA cluster).

Both backends expose the same interfaces via DI, so switching is configuration-only. DI scopes are chosen to match typical usage:

- `PostgresDatabaseAdapter` is provided with `scope=APP` (shared pool).
- `PostgresTransactionDatabaseAdapter` is provided with `scope=REQUEST` (per-request/operation handle for transactional work).

Configuration is provided via DI:

- `AsyncpgPostgresDatabaseConfig` (for asyncpg) and `AsyncpgHAPostgresDatabaseConfig` (for asyncpg HA).
- `AsyncpgPoolFactoryKwargs` (to pass `init` or other pool options to the driver/manager).
- `AsyncpgPostgresDatabaseConfigProvider` and `AsyncpgHAPostgresDatabaseConfigProvider`: read settings from `ApplicationDictConfig['postgres']`, which is loaded by `YAMLConfigurationService` from your YAML file.
- A default `AsyncpgPoolFactoryKwargs` is registered out of the box; you can override it to customize connection initialization (see Advanced setup).

Typical pattern:

- Use `PostgresDatabaseAdapter` when you don't need explicit transaction management: it's suitable for any reads and writes.
- For multi-step transactional work, take `PostgresTransactionDatabaseAdapter`, call `start_transaction()`/`commit_transaction()` (or `rollback_transaction()` on error), and run your operations within that transaction.

Configuration can be loaded from YAML via `YAMLConfigurationService` under the `postgres:` key. Optional connection initialization (e.g., custom codecs or `search_path`) can be provided through `AsyncpgPoolFactoryKwargs` in DI; this works for both asyncpg and hasql variants.
Provides:

- DI providers: `AsyncpgPostgresDatabaseProvider`, `AsyncpgPostgresDatabaseConfigProvider`.
- Services for the `Application`:
  - `AsyncpgPostgresDatabaseService`: pool and adapters,
  - `AsyncpgPostgresDatabaseConfigurationService`: loads config from `ApplicationDictConfig`.
- `PostgresDatabaseAdapter` with `scope=APP`: general-purpose adapter for any operations (`fetch`/`fetch_one`/`execute`, ...).
- `PostgresTransactionDatabaseAdapter` with `scope=REQUEST` (handy for HTTP requests): same API plus transaction control methods (start/commit/rollback).

YAML config example:
```yaml
postgres:
  user: app
  password: secret
  database: appdb
  host: 127.0.0.1:5432
  # optional pool params:
  min_size: 5
  max_size: 20
  max_queries: 50000
  max_inactive_connection_lifetime: 300
```
Plug into the app:
```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg.service import (
    AsyncpgPostgresDatabaseConfigProvider,
    AsyncpgPostgresDatabaseService,
)

app = Application(
    YAMLConfigurationService(),
    AsyncpgPostgresDatabaseService(),
    di_providers=[AsyncpgPostgresDatabaseConfigProvider()],
)
```
Use in a repository:
```python
from dataclasses import dataclass

from operetta.ddd.infrastructure.db.postgres.adapters.interface import (
    PostgresDatabaseAdapter,
    PostgresTransactionDatabaseAdapter,
)


@dataclass
class User:
    id: int
    name: str


class UserRepository:
    def __init__(self, db: PostgresDatabaseAdapter):
        self._db = db

    async def get_by_id(self, user_id: int) -> User | None:
        row = await self._db.fetch_one(
            "SELECT id, name FROM users WHERE id=$1", user_id
        )
        return User(id=row["id"], name=row["name"]) if row else None


class UnitOfWork:
    def __init__(self, tx: PostgresTransactionDatabaseAdapter):
        self._tx = tx

    async def __aenter__(self):
        await self._tx.start_transaction()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if exc:
            await self._tx.rollback_transaction()
        else:
            await self._tx.commit_transaction()
```
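Used as an async context manager, a unit of work like the one above commits on success and rolls back when the body raises. A stdlib-only sketch with a fake transaction adapter (`FakeTx` is invented for this doc; it only records the calls) shows the flow:

```python
import asyncio


class FakeTx:
    # Minimal stand-in for PostgresTransactionDatabaseAdapter (doc-only)
    def __init__(self):
        self.calls = []

    async def start_transaction(self):
        self.calls.append("start")

    async def commit_transaction(self):
        self.calls.append("commit")

    async def rollback_transaction(self):
        self.calls.append("rollback")


class UnitOfWork:
    def __init__(self, tx):
        self._tx = tx

    async def __aenter__(self):
        await self._tx.start_transaction()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # commit on clean exit, roll back if the body raised
        if exc:
            await self._tx.rollback_transaction()
        else:
            await self._tx.commit_transaction()


async def main():
    ok = FakeTx()
    async with UnitOfWork(ok):
        pass  # ... run statements on the adapter here ...
    assert ok.calls == ["start", "commit"]

    bad = FakeTx()
    try:
        async with UnitOfWork(bad):
            raise RuntimeError("boom")
    except RuntimeError:
        pass
    assert bad.calls == ["start", "rollback"]


asyncio.run(main())
```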
If you run an HA cluster (multiple nodes), use the hasql integration.
Provides:

- DI providers: `AsyncpgHAPostgresDatabaseProvider`, `AsyncpgHAPostgresDatabaseConfigProvider`.
- Services for the `Application`:
  - `AsyncpgHAPostgresDatabaseService`: pool and adapters,
  - `AsyncpgHAPostgresDatabaseConfigurationService`: loads config from `ApplicationDictConfig`.
- `PostgresDatabaseAdapter` with `scope=APP`: general-purpose adapter for any operations (`fetch`/`fetch_one`/`execute`, ...).
- `PostgresTransactionDatabaseAdapter` with `scope=REQUEST` (handy for HTTP requests): same API plus transaction control methods (start/commit/rollback).

YAML config example:
```yaml
postgres:
  user: app
  password: secret
  database: appdb
  hosts:
    - 10.0.0.1:5432
    - 10.0.0.2:5432
  min_masters: 1
  min_replicas: 1
  # optional:
  acquire_timeout: 5
  refresh_delay: 5
  refresh_timeout: 5
  fallback_master: false
  master_as_replica_weight: 1.0
  balancer_policy: greedy  # or round_robin / random_weighted
  stopwatch_window_size: 100
```
Plug into the app:
```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg_ha.service import (
    AsyncpgHAPostgresDatabaseConfigProvider,
    AsyncpgHAPostgresDatabaseService,
)

app = Application(
    YAMLConfigurationService(),
    AsyncpgHAPostgresDatabaseService(),
    di_providers=[AsyncpgHAPostgresDatabaseConfigProvider()],
)
```
[!TIP] DI exposes the same adapter interfaces, so repository and unit of work code stays unchanged.
You can pass an init callable for connections (e.g., register codecs, set search_path) via DI. Below is an example provider from a real project that registers a custom JSONB codec for asyncpg HA (hasql):
```python
import json

from dishka import Provider, Scope, provide

from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg.config import AsyncpgPoolFactoryKwargs
from operetta.integrations.asyncpg_ha.service import (
    AsyncpgHAPostgresDatabaseConfigProvider,
    AsyncpgHAPostgresDatabaseService,
)


class AsyncpgJSONCodecProvider(Provider):
    scope = Scope.APP

    @provide(override=True)
    def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
        async def set_custom_codecs(conn):
            await conn.set_type_codec(
                "jsonb",
                encoder=json.dumps,
                decoder=json.loads,
                schema="pg_catalog",
            )

        return AsyncpgPoolFactoryKwargs(init=set_custom_codecs)


app = Application(
    YAMLConfigurationService(),
    AsyncpgHAPostgresDatabaseService(),
    di_providers=[
        AsyncpgHAPostgresDatabaseConfigProvider(),
        AsyncpgJSONCodecProvider(),
    ],
)
```
> [!IMPORTANT]
> If you use the built-in `AsyncpgPostgresDatabaseConfigProvider` or `AsyncpgHAPostgresDatabaseConfigProvider`, they already register a default provider for `AsyncpgPoolFactoryKwargs`. To customize pool options, declare your provider with `@provide(override=True)` so it overrides the built-in one; otherwise container validation will fail.
> When you provide your own `AsyncpgPoolFactoryKwargs` and there is an existing default provider from those services, `override=True` is mandatory.
Define your own config providers (e.g., from environment variables) if you don't want to use YAML-based ones:
```python
import os

from dishka import Provider, Scope, provide

from operetta.app import Application
from operetta.integrations.asyncpg.config import (
    AsyncpgPostgresDatabaseConfig,
    AsyncpgPoolFactoryKwargs,
)
from operetta.integrations.asyncpg.service import AsyncpgPostgresDatabaseService


class EnvAsyncpgConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_db_config(self) -> AsyncpgPostgresDatabaseConfig:
        return AsyncpgPostgresDatabaseConfig(
            user=os.getenv("PGUSER", "app"),
            database=os.getenv("PGDATABASE", "appdb"),
            host=os.getenv("PGHOST", "127.0.0.1:5432"),
            password=os.getenv("PGPASSWORD"),
        )

    @provide
    def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
        return {}


app = Application(
    AsyncpgPostgresDatabaseService(),
    di_providers=[EnvAsyncpgConfigProvider()],
)
```
Example of an environment-based HA config provider:
```python
import os

from dishka import Provider, Scope, provide

from operetta.app import Application
from operetta.integrations.asyncpg.config import AsyncpgPoolFactoryKwargs
from operetta.integrations.asyncpg_ha.config import AsyncpgHAPostgresDatabaseConfig
from operetta.integrations.asyncpg_ha.service import AsyncpgHAPostgresDatabaseService


class EnvHasqlConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_db_config(self) -> AsyncpgHAPostgresDatabaseConfig:
        hosts = os.getenv("PGHOSTS", "10.0.0.1:5432,10.0.0.2:5432").split(",")
        return AsyncpgHAPostgresDatabaseConfig(
            user=os.getenv("PGUSER", "app"),
            database=os.getenv("PGDATABASE", "appdb"),
            hosts=[h.strip() for h in hosts if h.strip()],
            password=os.getenv("PGPASSWORD"),
            min_masters=int(os.getenv("PG_MIN_MASTERS", "1")),
            min_replicas=int(os.getenv("PG_MIN_REPLICAS", "1")),
        )

    @provide
    def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
        return {}


app = Application(
    AsyncpgHAPostgresDatabaseService(),
    di_providers=[EnvHasqlConfigProvider()],
)
```
A built-in integration that initializes sentry-sdk with a logging integration for breadcrumbs and error events. It's optional and configured via DI and/or constructor parameters.
Provided components:
- `SentryService`: initializes the SDK on start and closes the client on stop.
- `SentryConfigurationService`: registers a config provider into DI.
- `SentryServiceConfigProvider`: reads `ApplicationDictConfig['sentry']` and decodes it into `SentryServiceConfig`.

Install extra:
```shell
pip install 'operetta[sentry]'
```
How to wire it up:
```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.sentry import (
    SentryService,
    SentryServiceConfigProvider,
)

app = Application(
    YAMLConfigurationService(),  # optional: load YAML into DI
    SentryService(
        # You can override any config here; constructor wins over DI
        # send_default_pii=False,
        # debug=False,
    ),
    di_providers=[SentryServiceConfigProvider()],
)
```
You can configure `SentryService` in three complementary ways:

- Constructor arguments (`SentryService.__init__`): highest priority.
- DI config (`SentryServiceConfig` resolved from a provider): overrides defaults.
- Internal defaults otherwise.

YAML keys (all optional) live under the `sentry:` section:
```yaml
sentry:
  dsn: https://public@o0.ingest.sentry.io/0
  enabled: true
  # Logging integration
  capture_log_level: ERROR  # event level (string or int)
  context_log_level: INFO  # breadcrumbs level (string or int)
  ignore_loggers:  # loggers to exclude from breadcrumbs/events
    - aiohttp.access
  # Core SDK options
  environment: production
  release: 1.2.3
  server_name: api-01
  include_local_variables: true
  max_breadcrumbs: 100
  shutdown_timeout: 2.0
  # Sampling
  sample_rate: 1.0  # error event sampling
  traces_sample_rate: 0.2  # performance tracing sampling
  # Error filtering and in-app
  ignore_errors:
    - TimeoutError
  in_app_include:
    - myapp
  in_app_exclude:
    - aiohttp
  # Privacy / debug
  send_default_pii: false
  debug: false
  # HTTP body and propagation
  max_request_body_size: medium
  trace_propagation_targets:
    - .*
  # Transport tweaks
  keep_alive: false
  # Anything else passed to sentry_sdk.init (overrides same-named keys above)
  extra_options:
    profiles_sample_rate: 0.1
```
Custom config provider example (env-vars):
```python
import os

from dishka import Provider, Scope, provide

from operetta.app import Application
from operetta.integrations.sentry import SentryService
from operetta.integrations.sentry.config import SentryServiceConfig


class EnvSentryConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_config(self) -> SentryServiceConfig:
        return SentryServiceConfig(
            dsn=os.getenv("SENTRY_DSN"),
            enabled=os.getenv("SENTRY_ENABLED", "true").lower() == "true",
        )


app = Application(
    SentryService(),
    di_providers=[EnvSentryConfigProvider()],
)
```
If `enabled: false` is set or no `dsn` is provided, Sentry initialization is skipped (a message is logged).
All other parameters rely on the defaults defined by sentry-sdk itself. Operetta does not override those internal defaults: if you do not set a field in SentryServiceConfig and do not provide it via extra_options, the behavior is identical to calling sentry_sdk.init without that argument. See the official documentation for the full list of options and their default values:
https://docs.sentry.io/platforms/python/configuration/options/
The extra_options parameter lets you supply any additional keys for sentry_sdk.init that do not have a dedicated field in SentryServiceConfig. These keys are merged last (overriding same-named ones) into the final options dict passed to the SDK.
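The merge semantics can be illustrated with plain dicts (a sketch of the described behavior only, not operetta's actual code; the option values are made up):

```python
# Fields set via dedicated SentryServiceConfig fields (illustrative values)
config_options = {"sample_rate": 1.0, "debug": False}

# extra_options are merged last, so same-named keys win
extra_options = {"debug": True, "profiles_sample_rate": 0.1}

final_options = {**config_options, **extra_options}
assert final_options == {
    "sample_rate": 1.0,
    "debug": True,  # overridden by extra_options
    "profiles_sample_rate": 0.1,
}
```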
Expose Prometheus metrics over HTTP with zero dependencies on third-party web frameworks. The service uses Python's standard library (asyncio.start_server) to serve metrics efficiently in asyncio environments.
Provided components:
- `PrometheusService`: a tiny asyncio HTTP server exposing metrics on a configured endpoint.
- `PrometheusConfigurationService`: registers a config provider into DI.
- `PrometheusServiceConfigProvider`: reads `ApplicationDictConfig['prometheus']` and decodes it into `PrometheusServiceConfig`.

Install extra:
```shell
pip install 'operetta[prometheus]'
```
How to wire it up:
```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.prometheus import (
    PrometheusService,
    PrometheusServiceConfigProvider,
)

app = Application(
    YAMLConfigurationService(),
    PrometheusService(),  # constructor args override YAML
    di_providers=[PrometheusServiceConfigProvider()],
)
```
You can configure `PrometheusService` in three complementary ways:

- Constructor arguments (`PrometheusService.__init__`): highest priority.
- DI config (`PrometheusServiceConfig` resolved from a provider): overrides defaults.
- Internal defaults otherwise.

YAML keys (all optional) live under the `prometheus:` section:
```yaml
prometheus:
  address: 0.0.0.0
  port: 9000
  endpoint: /metrics
  enabled: true
```
Custom config provider example (env-vars):
```python
import os

from dishka import Provider, Scope, provide

from operetta import Application
from operetta.integrations.prometheus import PrometheusService
from operetta.integrations.prometheus.config import PrometheusServiceConfig


class EnvPromConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_config(self) -> PrometheusServiceConfig:
        return PrometheusServiceConfig(
            address=os.getenv("PROM_ADDRESS", "0.0.0.0"),
            port=int(os.getenv("PROM_PORT", "9000")),
            endpoint=os.getenv("PROM_ENDPOINT", "/metrics"),
            enabled=os.getenv("PROM_ENABLED", "true").lower() == "true",
        )


app = Application(
    PrometheusService(),
    di_providers=[EnvPromConfigProvider()],
)
```
To use a custom `prometheus_client` registry, provide a `CollectorRegistry` via DI:

```python
from dishka import Provider, Scope, provide
from prometheus_client import CollectorRegistry

from operetta import Application
from operetta.integrations.prometheus import PrometheusService


class PromRegistryProvider(Provider):
    scope = Scope.APP

    @provide
    def get_registry(self) -> CollectorRegistry:
        return CollectorRegistry()


app = Application(
    PrometheusService(),
    di_providers=[PromRegistryProvider()],
)
```
```python
from prometheus_client import Counter, REGISTRY

# Using default (global) registry
REQUESTS = Counter('http_requests_total', 'Count of HTTP requests')
REQUESTS.inc()

# If you provided a custom registry, pass it explicitly when creating metrics:
from prometheus_client import Counter, CollectorRegistry

registry = CollectorRegistry()
CUSTOM_COUNTER = Counter('my_counter', 'Help', registry=registry)
CUSTOM_COUNTER.inc()
```