Security News
Research
Data Theft Repackaged: A Case Study in Malicious Wrapper Packages on npm
The Socket Research Team breaks down a malicious wrapper package that uses obfuscation to harvest credentials and exfiltrate sensitive data.
An event dispatching/handling library for FastAPI and Starlette.
Features:
... (async def) are treated as first-class citizens
... fastapi_events provides

If you use or appreciate this project, please consider giving it a star to help it reach more developers. Thanks =)
pip install fastapi-events
To use it with AWS handlers, install:
pip install fastapi-events[aws]
To use it with GCP handlers, install:
pip install fastapi-events[google]
To enable OpenTelemetry (OTEL) support, install:
pip install fastapi-events[otel]
fastapi-events supports both FastAPI and Starlette. To use it, simply configure it as middleware.
Configuring fastapi-events for FastAPI:
from fastapi import FastAPI
from fastapi.requests import Request
from fastapi.responses import JSONResponse
from fastapi_events.dispatcher import dispatch
from fastapi_events.middleware import EventHandlerASGIMiddleware
from fastapi_events.handlers.local import local_handler

app = FastAPI()
app.add_middleware(EventHandlerASGIMiddleware,
                   handlers=[local_handler])  # registering handler(s)


@app.get("/")
def index(request: Request) -> JSONResponse:
    dispatch("my-fancy-event", payload={"id": 1})  # Emit events anywhere in your code
    return JSONResponse({})
Configuring fastapi-events for Starlette:
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.requests import Request
from starlette.responses import JSONResponse
from fastapi_events.dispatcher import dispatch
from fastapi_events.handlers.local import local_handler
from fastapi_events.middleware import EventHandlerASGIMiddleware

app = Starlette(middleware=[
    Middleware(EventHandlerASGIMiddleware,
               handlers=[local_handler])  # registering handlers
])


@app.route("/")
async def root(request: Request) -> JSONResponse:
    dispatch("new event", payload={"id": 1})  # Emit events anywhere in your code
    return JSONResponse({})
Configuring fastapi-events for Starlite:
from starlite.app import Starlite
from starlite.enums import MediaType
from starlite.handlers import get
from starlite.middleware import DefineMiddleware
from fastapi_events.dispatcher import dispatch
from fastapi_events.handlers.local import local_handler
from fastapi_events.middleware import EventHandlerASGIMiddleware


@get(path="/", media_type=MediaType.TEXT)
async def root() -> str:
    dispatch("new event", payload={"id": 1})  # Emit events anywhere in your code
    return "OK"


app = Starlite(
    middleware=[
        DefineMiddleware(EventHandlerASGIMiddleware,
                         handlers=[local_handler])  # registering handlers
    ],
    route_handlers=[root],
)
Events can be dispatched anywhere in the code, provided that they are dispatched before a response is generated.
# anywhere in code
from fastapi_events.dispatcher import dispatch
dispatch(
    "cat-requested-a-fish",  # Event name, accepts any valid string
    payload={"cat_id": "fd375d23-b0c9-4271-a9e0-e028c4cd7230"}  # Event payload, accepts any arbitrary data
)
dispatch("a_cat_is_spotted") # This works too!
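The "dispatch now, handle after the response" behaviour can be pictured with a small standard-library sketch. This is not the fastapi-events implementation: `_pending`, `dispatch`, and `handle_request` below are hypothetical stand-ins. It only illustrates why an event has to be queued before the response is produced if it is to be picked up afterwards.

```python
import asyncio
from collections import deque
from contextvars import ContextVar
from typing import Any, Deque, List, Optional, Tuple

Event = Tuple[str, Any]

# Hypothetical per-request backlog; not the library's real internals.
_pending: ContextVar[Deque[Event]] = ContextVar("pending_events")
timeline: List[str] = []
handled: List[Event] = []


def dispatch(event_name: str, payload: Optional[Any] = None) -> None:
    """Queue an event on the backlog of the current request."""
    _pending.get().append((event_name, payload))


async def handle_request() -> str:
    """Stand-in for a middleware wrapping a single request."""
    token = _pending.set(deque())
    try:
        dispatch("cat-requested-a-fish", {"cat_id": 1})  # inside the endpoint
        response = "OK"
        timeline.append("response ready")
        # Only now is the collected backlog handed to the handlers.
        for event in _pending.get():
            handled.append(event)
            timeline.append(f"handled {event[0]}")
        return response
    finally:
        _pending.reset(token)


result = asyncio.run(handle_request())
```

In this sketch the response is recorded in `timeline` before any event is handled, which mirrors the ordering described above.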
New feature since version 0.10.0
It is now possible to dispatch Pydantic models as events. A special thanks to @WilliamStam for introducing this remarkable idea.
# anywhere in code
import pydantic
from fastapi_events.dispatcher import dispatch
class CatRequestedAFishEvent(pydantic.BaseModel):
    __event_name__ = "cat-requested-a-fish"
    cat_id: pydantic.UUID4
# Option 2 - dispatching event with pydantic model
dispatch(CatRequestedAFishEvent(cat_id="fd375d23-b0c9-4271-a9e0-e028c4cd7230"))
# which is equivalent to:
dispatch("cat-requested-a-fish", payload={"cat_id": "fd375d23-b0c9-4271-a9e0-e028c4cd7230"})
Since version 0.3.0, event payload validation is possible. To enable this feature, register a Pydantic model with the corresponding event name.
>=0.10.0: The event name can now be defined as part of the payload schema as __event_name__
import uuid
from enum import Enum
from datetime import datetime
from pydantic import BaseModel
from fastapi_events.registry.payload_schema import registry as payload_schema
class UserEvents(Enum):
    SIGNED_UP = "USER_SIGNED_UP"
    ACTIVATED = "USER_ACTIVATED"


# Registering your event payload schema
@payload_schema.register(event_name=UserEvents.SIGNED_UP)
class SignUpPayload(BaseModel):
    user_id: uuid.UUID
    created_at: datetime


# which is also equivalent to
@payload_schema.register
class SignUpPayload(BaseModel):
    __event_name__ = "USER_SIGNED_UP"
    user_id: uuid.UUID
    created_at: datetime
Wildcards in event names are currently not supported for payload schemas.
The payload will be validated automatically without any changes required when invoking the dispatcher.
# Events with payload schema registered
dispatch(UserEvents.SIGNED_UP)  # raises ValidationError, missing payload
dispatch(UserEvents.SIGNED_UP,
         {"user_id": "9e79cdbb-b216-40f7-9a05-20d223dee89a"})  # raises ValidationError, missing `created_at`
dispatch(UserEvents.SIGNED_UP,
         {"user_id": "9e79cdbb-b216-40f7-9a05-20d223dee89a", "created_at": datetime.utcnow()})  # OK!

# Events without payload schema -> No validation will be performed
dispatch(UserEvents.ACTIVATED,
         {"user_id": "9e79cdbb-b216-40f7-9a05-20d223dee89a"})  # OK! no validation will be performed

# Events dispatched with Pydantic model (>=0.10.0) -> Validation will be skipped since it would have been already validated
# If you choose to do this, you must ensure __event_name__ is defined in SignUpPayload
dispatch(SignUpPayload(user_id="9e79cdbb-b216-40f7-9a05-20d223dee89a", created_at=datetime.utcnow()))
Payload validation is optional. Payloads of events without a registered schema will not be validated.
The flexibility of fastapi-events enables customisation of how events should be handled. To begin, you may want to handle your events locally.
# ex: in handlers.py
from fastapi_events.handlers.local import local_handler
from fastapi_events.typing import Event


@local_handler.register(event_name="cat*")
def handle_all_cat_events(event: Event):
    """
    this handler will match any event prefixed with `cat`.
    ex: "cat_eats_a_fish", "cat_is_cute", etc
    """
    # the `event` argument is nothing more than a tuple of event name and payload
    event_name, payload = event

    # TODO do anything you'd like with the event


@local_handler.register(event_name="cat*")  # Tip: You can register several handlers with the same event name
def handle_all_cat_events_another_way(event: Event):
    pass


@local_handler.register(event_name="*")
async def handle_all_events(event: Event):
    # event handlers can be coroutine function too (`async def`)
    pass
New feature in fastapi-events>=0.9.0
Dependencies can now be utilized with local handlers, and sub-dependencies are also supported.
As of now, dependencies utilizing a generator (with the yield keyword) are not yet supported.
# ex: in handlers.py
from fastapi import Depends
from fastapi_events.handlers.local import local_handler
from fastapi_events.typing import Event
async def get_db_conn():
    pass  # return a DB conn


async def get_db_session(
    db_conn=Depends(get_db_conn)
):
    pass  # return a DB session created from `db_conn`


@local_handler.register(event_name="*")
async def handle_all_events(
    event: Event,
    db_session=Depends(get_db_session)
):
    # use the `db_session` here
    pass
In larger projects, it's common to have dedicated services for handling events separately.
For example, fastapi-events includes an AWS SQS forwarder, allowing you to forward events to a remote queue.
Register SQSForwardHandler as a handler:
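from fastapi import FastAPI
from fastapi_events.handlers.aws import SQSForwardHandler
from fastapi_events.middleware import EventHandlerASGIMiddleware

app = FastAPI()
app.add_middleware(EventHandlerASGIMiddleware,
                   handlers=[SQSForwardHandler(queue_url="test-queue",
                                               region_name="eu-central-1")])  # registering handler(s)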
Start dispatching events! By default, events will be serialised into JSON format:
["event name", {"payload": "here is the payload"}]
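For a consumer on the other side of the queue, the format above is a two-element JSON array. A minimal sketch using only the standard library's json module follows; the `encode_event` and `decode_event` helper names are illustrative and not part of fastapi-events.

```python
import json
from typing import Any, Tuple


def encode_event(event_name: str, payload: Any) -> str:
    """Serialise an event as a [name, payload] JSON array."""
    return json.dumps([event_name, payload])


def decode_event(message_body: str) -> Tuple[str, Any]:
    """Split a serialised event back into its name and payload."""
    event_name, payload = json.loads(message_body)
    return event_name, payload


body = encode_event("event name", {"payload": "here is the payload"})
name, payload = decode_event(body)
```

Unpacking the decoded array into two names fails loudly if a message does not have exactly two elements, which is a cheap sanity check for a queue consumer.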
Tip: to pipe events to multiple queues, provide multiple handlers while adding EventHandlerASGIMiddleware.
Here is a list of built-in event handlers:
LocalHandler / local_handler: fastapi_events.handlers.local (pattern matching via fnmatch)
SQSForwardHandler: fastapi_events.handlers.aws
EchoHandler: fastapi_events.handlers.echo (uses pprint; great for debugging purposes)
GoogleCloudSimplePubSubHandler: fastapi_events.handlers.gcp
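Since the local handler's event-name patterns (such as "cat*") are tied to Python's fnmatch module, you can check which names a pattern would match with the standard library directly. The sketch below uses fnmatch.fnmatchcase for case-sensitive matching; whether the library itself calls fnmatch or fnmatchcase is an assumption here, and the event names are only examples.

```python
from fnmatch import fnmatchcase

patterns = ["cat*", "*", "USER_*"]
event_names = ["cat_eats_a_fish", "a_cat_is_spotted", "USER_SIGNED_UP"]

# Map each event name to the patterns that would match it.
matches = {
    name: [pattern for pattern in patterns if fnmatchcase(name, pattern)]
    for name in event_names
}
```

Note that "cat*" is a prefix match: "a_cat_is_spotted" is only caught by the catch-all "*" pattern.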
Creating your own handler is as simple as inheriting from the BaseEventHandler class in fastapi_events.handlers.base.
To handle events, fastapi_events calls one of these methods, following this priority order:
handle_many(events): The coroutine function should expect the backlog of the events collected.
handle(event): If handle_many() is not defined in your custom handler, handle() will be called by iterating through the events in the backlog.
from typing import Iterable
from fastapi_events.typing import Event
from fastapi_events.handlers.base import BaseEventHandler
class MyOwnEventHandler(BaseEventHandler):
    async def handle(self, event: Event) -> None:
        """
        Handle events one by one
        """
        pass

    async def handle_many(self, events: Iterable[Event]) -> None:
        """
        Handle events by batch
        """
        pass
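The priority order described above can be sketched without the library. `SketchBaseHandler` below is a hypothetical stand-in for BaseEventHandler, not its actual source: its default handle_many() falls back to calling handle() once per event, while a subclass that overrides handle_many() receives the whole backlog at once.

```python
import asyncio
from typing import Any, Iterable, List, Tuple

Event = Tuple[str, Any]


class SketchBaseHandler:
    """Hypothetical stand-in for BaseEventHandler (not the real class)."""

    async def handle(self, event: Event) -> None:
        raise NotImplementedError

    async def handle_many(self, events: Iterable[Event]) -> None:
        # Default behaviour: fall back to handling events one by one.
        for event in events:
            await self.handle(event)


class OneByOneHandler(SketchBaseHandler):
    def __init__(self) -> None:
        self.seen: List[str] = []

    async def handle(self, event: Event) -> None:
        self.seen.append(event[0])


class BatchHandler(SketchBaseHandler):
    def __init__(self) -> None:
        self.batch_sizes: List[int] = []

    async def handle_many(self, events: Iterable[Event]) -> None:
        # Receives the full backlog in a single call.
        self.batch_sizes.append(len(list(events)))


backlog: List[Event] = [("cat_is_cute", None), ("cat_eats_a_fish", {"id": 1})]
one_by_one, batch = OneByOneHandler(), BatchHandler()
asyncio.run(one_by_one.handle_many(backlog))
asyncio.run(batch.handle_many(backlog))
```

Overriding handle_many() is the natural choice when the downstream system accepts batches (for example a bulk insert or a batched queue send); overriding only handle() keeps the handler simpler.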
Since version 0.7.0, OpenTelemetry support has been added as an optional feature.
To enable it, make sure you install the following optional modules:
pip install fastapi-events[otel]
Note that no instrumentation library is needed, as fastapi_events supports OTEL natively.
Spans will be created when:
fastapi_events.dispatcher.dispatch is invoked,
fastapi_events.handlers.local.LocalHandler is handling an event
Support for other handlers will be added in the future.
Suppressing Events / Disabling dispatch() Globally
If you wish to globally suppress events, especially during testing, you can achieve this without having to mock or patch the dispatch() function. Simply set the environment variable FASTAPI_EVENTS_DISABLE_DISPATCH to 1, True, or any truthy value.
This feature requires Pydantic, which is included with FastAPI. If you're using Starlette, ensure that Pydantic is installed separately.
See Event Payload Validation With Pydantic
It is now possible to dispatch events from within other event handlers. You'll need version 0.4 or above.
Events dispatched within the request-response cycle and events dispatched within event handlers compare as follows:
| | dispatched within request-response cycle | dispatched within event handlers |
|---|---|---|
| processing of events | will be handled after the response has been made | will be scheduled to the running event loop immediately |
| order of processing | always after the response is made | not guaranteed |
| supports payload schema validation with Pydantic | Yes | Yes |
| can be disabled globally with FASTAPI_EVENTS_DISABLE_DISPATCH | Yes | Yes |
One of the goals of fastapi-events is to dispatch events without the need to manage a specific instance of EventHandlerASGIMiddleware.
By default, this is handled using ContextVars.
However, there are scenarios where users may want to dispatch events outside the standard request sequence.
This can be achieved by generating a custom identifier for the middleware.
By default, the middleware identifier is generated from the object ID of the EventHandlerASGIMiddleware instance and is managed internally without user intervention.
If a user needs to dispatch events outside of a request-response lifecycle, they can generate a custom middleware_id value and pass it to EventHandlerASGIMiddleware during its creation.
This value can then be used with dispatch() to ensure the correct EventHandlerASGIMiddleware instance is selected.
It's important to note that dispatching events during a request does not require the middleware_id. The dispatcher will automatically discover the appropriate event handler.
In the following example, the ID is generated using the object ID of the FastAPI instance.
The middleware identifier must be a unique int, but there are no other restrictions.
import asyncio
from fastapi import FastAPI
from fastapi.requests import Request
from fastapi.responses import JSONResponse
from fastapi_events.dispatcher import dispatch
from fastapi_events.middleware import EventHandlerASGIMiddleware
from fastapi_events.handlers.local import local_handler
app = FastAPI()
event_handler_id: int = id(app)
app.add_middleware(EventHandlerASGIMiddleware,
                   handlers=[local_handler],  # registering handler(s)
                   middleware_id=event_handler_id)  # register custom middleware id


async def dispatch_task() -> None:
    """ background task to dispatch autonomous events """
    for i in range(100):
        # without the middleware_id, this call would raise a LookupError
        dispatch("date", payload={"idx": i}, middleware_id=event_handler_id)
        await asyncio.sleep(1)


@app.on_event("startup")
async def startup_event() -> None:
    asyncio.create_task(dispatch_task())


@app.get("/")
def index(request: Request) -> JSONResponse:
    dispatch("hello", payload={"id": 1})  # Emit events anywhere in your code
    return JSONResponse({"detail": {"msg": "hello world"}})
I'm getting LookupError when dispatch() is used:
def dispatch(event_name: str, payload: Optional[Any] = None) -> None:
> q: Deque[Event] = event_store.get()
E LookupError: <ContextVar name='fastapi_context' at 0x400a1f12b0>
Answer:
The proper functioning of dispatch() relies on ContextVars.
Various factors can lead to a LookupError, with a common cause being the invocation of dispatch() outside the request-response lifecycle of FastAPI/Starlette, such as calling dispatch() after a response has been returned.
If you encounter this issue, a workaround is available by using a user-defined middleware_id. Refer to Dispatching Events Outside of a Request for details.
If you're encountering this during testing, consider disabling dispatch() for testing purposes.
Refer to Suppressing Events / Disabling dispatch() Globally for details.
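The LookupError itself comes from Python's contextvars module rather than anything FastAPI-specific: ContextVar.get() raises it when the variable has no value in the current context and no default was given. A minimal standard-library reproduction follows; the variable name mirrors the traceback above, while `dispatch_sketch` and `inside_request` are hypothetical stand-ins, not library functions.

```python
from collections import deque
from contextvars import ContextVar, copy_context
from typing import Any, Deque, Optional, Tuple

Event = Tuple[str, Any]
event_store: ContextVar[Deque[Event]] = ContextVar("fastapi_context")


def dispatch_sketch(event_name: str) -> int:
    """Append to the current context's queue and return its length."""
    queue = event_store.get()  # raises LookupError if nothing was set
    queue.append((event_name, None))
    return len(queue)


def inside_request() -> int:
    event_store.set(deque())  # roughly what a middleware would do per request
    return dispatch_sketch("hello")


# Inside a context where the queue was set, dispatching works.
queued = copy_context().run(inside_request)

# Outside of it the variable was never set, so get() fails.
outside_error: Optional[LookupError] = None
try:
    dispatch_sketch("hello")
except LookupError as exc:
    outside_error = exc
```

The value set inside the copied context does not leak into the surrounding one, which is why a call made outside the request (a background task, a test without the middleware) sees an unset variable.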
My event handlers are not registered / Local handlers are not being executed:
Answer:
To ensure that the module where your local event handlers are defined is loaded during runtime, make sure to import the module in your __init__.py. Handlers are only registered once the module that defines them has been imported.
Any form of feedback and questions are welcome! Please create an issue here.
FAQs
Event dispatching library for FastAPI
We found that fastapi-events demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.