
FastAPI plugin for CloudEvents Integration
Easily consume and produce CloudEvents over a REST API.
Automatically parses CloudEvents in both the binary and structured formats and
provides an interface very similar to the regular FastAPI interface. No more
wrestling with to_structured and from_http function calls!
@app.post("/")
async def on_event(event: CloudEvent) -> CloudEvent:
    pass
See more examples below
pip install fastapi-cloudevents
import uvicorn
from fastapi import FastAPI

from fastapi_cloudevents import CloudEvent, install_fastapi_cloudevents

app = FastAPI()
app = install_fastapi_cloudevents(app)


@app.post("/")
async def on_event(event: CloudEvent) -> CloudEvent:
    return CloudEvent(
        type="my.response-type.v1",
        data=event.data,
        datacontenttype=event.datacontenttype,
    )


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
The route accepts both binary CloudEvents
curl http://localhost:8000 -i -X POST -d "Hello World!" \
-H "Content-Type: text/plain" \
-H "ce-specversion: 1.0" \
-H "ce-type: my.request-type.v1" \
-H "ce-id: 123" \
-H "ce-source: my-source"
And structured CloudEvents
curl http://localhost:8000 -i -X POST -H "Content-Type: application/json" \
-d '{"data":"Hello World", "source":"my-source", "id":"123", "type":"my.request-type.v1","specversion":"1.0"}'
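The two requests above carry the same event attributes in different HTTP content modes: binary mode puts the attributes in `ce-` prefixed headers and the data in the body, while structured mode puts everything in one JSON document. The following is a minimal standard-library sketch of that difference, not the plugin's actual parser. The function name `normalize_event` and the rule of detecting binary mode by the presence of a `ce-specversion` header are assumptions made for illustration.

```python
import json
from typing import Any, Dict


def normalize_event(headers: Dict[str, str], body: bytes) -> Dict[str, Any]:
    """Return one flat dict of attributes plus data for either content mode.

    Hypothetical helper for illustration only; fastapi-cloudevents does this
    parsing for you.
    """
    lowered = {name.lower(): value for name, value in headers.items()}
    if "ce-specversion" in lowered:
        # Binary mode: each ce-* header is one attribute, the body is the data.
        event: Dict[str, Any] = {
            name[len("ce-"):]: value
            for name, value in lowered.items()
            if name.startswith("ce-")
        }
        event["datacontenttype"] = lowered.get("content-type", "")
        event["data"] = body.decode("utf-8")
        return event
    # Structured mode (assumed here whenever no ce-specversion header exists):
    # the whole event, attributes and data, is a single JSON object.
    return json.loads(body)


# The binary request from the first curl command.
binary_event = normalize_event(
    {
        "Content-Type": "text/plain",
        "ce-specversion": "1.0",
        "ce-type": "my.request-type.v1",
        "ce-id": "123",
        "ce-source": "my-source",
    },
    b"Hello World!",
)

# The structured request from the second curl command.
structured_event = normalize_event(
    {"Content-Type": "application/json"},
    b'{"data":"Hello World", "source":"my-source", "id":"123", '
    b'"type":"my.request-type.v1","specversion":"1.0"}',
)
```

Both calls produce dicts with the same `specversion`, `id`, `source`, and `type` values, which is why a single handler signature can serve either mode.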
Both of the requests will yield a response in the same format:
HTTP/1.1 200 OK
date: Fri, 05 Aug 2022 23:50:52 GMT
server: uvicorn
content-length: 13
content-type: application/json
ce-specversion: 1.0
ce-id: 25cd28f0-0605-4a76-b1d8-cffbe3375413
ce-source: http://localhost:8000/
ce-type: my.response-type.v1
ce-time: 2022-08-05T23:50:52.809697+00:00
"Hello World"
from typing import Literal, Union

import uvicorn
from fastapi import FastAPI, Body
from pydantic import Field
from typing_extensions import Annotated

from fastapi_cloudevents import (
    CloudEvent,
    CloudEventSettings,
    ContentMode,
    install_fastapi_cloudevents,
)

app = FastAPI()
app = install_fastapi_cloudevents(
    app, settings=CloudEventSettings(default_response_mode=ContentMode.structured)
)


class MyEvent(CloudEvent):
    type: Literal["my.type.v1"]


class YourEvent(CloudEvent):
    type: Literal["your.type.v1"]


OurEvent = Annotated[Union[MyEvent, YourEvent], Body(discriminator="type")]

_source = "dummy:source"


@app.post("/")
async def on_event(event: OurEvent) -> CloudEvent:
    if isinstance(event, MyEvent):
        return CloudEvent(
            type="my.response-type.v1",
            data=f"got {event.data} from my event!",
            datacontenttype="text/plain",
        )
    else:
        return CloudEvent(
            type="your.response-type.v1",
            data=f"got {event.data} from your event!",
            datacontenttype="text/plain",
        )


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8002)
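To exercise both branches of the handler above, one could send structured events whose `type` matches either literal. The sketch below uses only the standard library and assumes the example server is running on `localhost:8002`. The helper names `build_request` and `send`, and the `id` and `source` values, are made up for illustration.

```python
import json
import urllib.request


def build_request(
    event_type: str, data: str, url: str = "http://localhost:8002/"
) -> urllib.request.Request:
    """Build a structured-mode CloudEvent POST request (hypothetical helper)."""
    payload = {
        "specversion": "1.0",
        "id": "123",  # illustrative value
        "source": "my-source",  # illustrative value
        "type": event_type,  # the discriminator field
        "data": data,
    }
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request) -> str:
    """Send the request and return the response body as text."""
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")


# One request per event class in the discriminated union.
my_request = build_request("my.type.v1", "Hello")
your_request = build_request("your.type.v1", "Hello")
```

With the server running, `print(send(my_request))` and `print(send(your_request))` should show which branch handled each event.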
To send the response in the HTTP CloudEvent structured format, you MAY use the
StructuredCloudEventResponse class
import uvicorn
from fastapi import FastAPI

from fastapi_cloudevents import (CloudEvent, StructuredCloudEventResponse,
                                 install_fastapi_cloudevents)

app = FastAPI()
app = install_fastapi_cloudevents(app)


@app.post("/", response_class=StructuredCloudEventResponse)
async def on_event(event: CloudEvent) -> CloudEvent:
    return CloudEvent(
        type="com.my-corp.response.v1",
        data=event.data,
        datacontenttype=event.datacontenttype,
    )


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8001)
curl http://localhost:8001 -i -X POST -d "Hello World!" \
-H "Content-Type: text/plain" \
-H "ce-specversion: 1.0" \
-H "ce-type: my.request-type.v1" \
-H "ce-id: 123" \
-H "ce-source: my-source"
HTTP/1.1 200 OK
date: Fri, 05 Aug 2022 23:51:26 GMT
server: uvicorn
content-length: 247
content-type: application/json
{"data":"Hello World!","source":"http://localhost:8001/","id":"3412321f-85b3-4f7f-a551-f4c23a05de3a","type":"com.my-corp.response.v1","specversion":"1.0","time":"2022-08-05T23:51:26.878723+00:00","datacontenttype":"text/plain"}
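On the client side, a structured response like the one above can be read with nothing more than a JSON parser, since the attributes and the data live in the same object. The following is a minimal sketch using the response body shown above. The variable names and the required-attribute check are illustrative and not part of the plugin.

```python
import json
from datetime import datetime

# Response body copied from the structured example above.
body = (
    '{"data":"Hello World!","source":"http://localhost:8001/",'
    '"id":"3412321f-85b3-4f7f-a551-f4c23a05de3a",'
    '"type":"com.my-corp.response.v1","specversion":"1.0",'
    '"time":"2022-08-05T23:51:26.878723+00:00",'
    '"datacontenttype":"text/plain"}'
)

event = json.loads(body)

# CloudEvents 1.0 requires these four context attributes on every event.
REQUIRED = ("specversion", "id", "source", "type")
missing = [name for name in REQUIRED if name not in event]

# Separate the payload from the context attributes.
data = event["data"]
attributes = {name: value for name, value in event.items() if name != "data"}

# The time attribute is an RFC 3339 timestamp; this one parses directly.
timestamp = datetime.fromisoformat(event["time"])
```

In binary mode the same attributes would instead be read from the `ce-` response headers, as in the first response shown earlier.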