You're Invited:Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26.RSVP
Socket
Book a DemoSign in
Socket

@vercel/python

Package Overview
Dependencies
Maintainers
2
Versions
227
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@vercel/python - npm Package Compare versions

Comparing version
6.6.0
to
6.7.0
+2
-2
package.json
{
"name": "@vercel/python",
"version": "6.6.0",
"version": "6.7.0",
"main": "./dist/index.js",

@@ -34,3 +34,3 @@ "license": "Apache-2.0",

"which": "3.0.0",
"@vercel/build-utils": "13.3.0",
"@vercel/build-utils": "13.3.2",
"@vercel/error-utils": "2.0.3"

@@ -37,0 +37,0 @@ },

from __future__ import annotations
import sys
import os
import site
import importlib
import base64
import json
import inspect
import asyncio
import http
import time
import traceback
from importlib import util
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
import socket
import functools
import logging
import builtins
from typing import Callable, Literal, TextIO
import contextvars
import contextlib
import atexit
_here = os.path.dirname(__file__)
_vendor_rel = '__VC_HANDLER_VENDOR_DIR'
_vendor = os.path.normpath(os.path.join(_here, _vendor_rel))
if os.path.isdir(_vendor):
# Process .pth files like a real site-packages dir
site.addsitedir(_vendor)
# Move _vendor to the front (after script dir if present)
try:
while _vendor in sys.path:
sys.path.remove(_vendor)
except ValueError:
pass
# Put vendored deps ahead of site-packages but after the script dir
idx = 1 if (sys.path and sys.path[0] in ('', _here)) else 0
sys.path.insert(idx, _vendor)
importlib.invalidate_caches()
def setup_logging(send_message: Callable[[dict], None], storage: contextvars.ContextVar[dict | None]):
# Override logging.Handler to send logs to the platform when a request context is available.
class VCLogHandler(logging.Handler):
def emit(self, record: logging.LogRecord):
try:
message = record.getMessage()
except Exception:
message = repr(getattr(record, "msg", ""))
with contextlib.suppress(Exception):
if record.exc_info:
# logging allows exc_info=True or a (type, value, tb) tuple
exc_info = record.exc_info
if exc_info is True:
exc_info = sys.exc_info()
if isinstance(exc_info, tuple):
tb = ''.join(traceback.format_exception(*exc_info))
if tb:
if message:
message = f"{message}\n{tb}"
else:
message = tb
if record.levelno >= logging.CRITICAL:
level = "fatal"
elif record.levelno >= logging.ERROR:
level = "error"
elif record.levelno >= logging.WARNING:
level = "warn"
elif record.levelno >= logging.INFO:
level = "info"
else:
level = "debug"
context = storage.get()
if context is not None:
send_message({
"type": "log",
"payload": {
"context": {
"invocationId": context['invocationId'],
"requestId": context['requestId'],
},
"message": base64.b64encode(message.encode()).decode(),
"level": level,
}
})
else:
# If IPC is not ready, enqueue the message to be sent later.
enqueue_or_send_message({
"type": "log",
"payload": {
"context": {"invocationId": "0", "requestId": 0},
"message": base64.b64encode(message.encode()).decode(),
"level": level,
}
})
# Override sys.stdout and sys.stderr to map logs to the correct request
class StreamWrapper:
def __init__(self, stream: TextIO, stream_name: Literal["stdout", "stderr"]):
self.stream = stream
self.stream_name = stream_name
def write(self, message: str):
context = storage.get()
if context is not None:
send_message({
"type": "log",
"payload": {
"context": {
"invocationId": context['invocationId'],
"requestId": context['requestId'],
},
"message": base64.b64encode(message.encode()).decode(),
"stream": self.stream_name,
}
})
else:
enqueue_or_send_message({
"type": "log",
"payload": {
"context": {"invocationId": "0", "requestId": 0},
"message": base64.b64encode(message.encode()).decode(),
"stream": self.stream_name,
}
})
def __getattr__(self, name):
return getattr(self.stream, name)
sys.stdout = StreamWrapper(sys.stdout, "stdout")
sys.stderr = StreamWrapper(sys.stderr, "stderr")
logging.basicConfig(level=logging.INFO, handlers=[VCLogHandler()], force=True)
# Ensure built-in print funnels through stdout wrapper so prints are
# attributed to the current request context.
def print_wrapper(func: Callable[..., None]) -> Callable[..., None]:
@functools.wraps(func)
def wrapper(*args, sep=' ', end='\n', file=None, flush=False):
if file is None:
file = sys.stdout
if file in (sys.stdout, sys.stderr):
file.write(sep.join(map(str, args)) + end)
if flush:
file.flush()
else:
# User specified a different file, use original print behavior
func(*args, sep=sep, end=end, file=file, flush=flush)
return wrapper
builtins.print = print_wrapper(builtins.print)
def _stderr(message: str):
with contextlib.suppress(Exception):
_original_stderr.write(message + "\n")
_original_stderr.flush()
# If running in the platform (IPC present), logging must be setup before importing user code so that
# logs happening outside the request context are emitted correctly.
ipc_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
storage: contextvars.ContextVar[dict | None] = contextvars.ContextVar('storage', default=None)
send_message = lambda m: None
_original_stderr = sys.stderr
# Buffer for pre-handshake logs (to avoid blocking IPC on startup)
_ipc_ready = False
_init_log_buf: list[dict] = []
_INIT_LOG_BUF_MAX_BYTES = 1_000_000
_init_log_buf_bytes = 0
def enqueue_or_send_message(msg: dict):
global _init_log_buf_bytes
if _ipc_ready:
send_message(msg)
return
enc_len = len(json.dumps(msg))
if _init_log_buf_bytes + enc_len <= _INIT_LOG_BUF_MAX_BYTES:
_init_log_buf.append(msg)
_init_log_buf_bytes += enc_len
else:
# Fallback so message is not lost if buffer is full
with contextlib.suppress(Exception):
payload = msg.get("payload", {})
decoded = base64.b64decode(payload.get("message", "")).decode(errors="ignore")
_original_stderr.write(decoded + "\n")
def flush_init_log_buf_to_stderr():
global _init_log_buf, _init_log_buf_bytes
try:
combined: list[str] = []
for m in _init_log_buf:
payload = m.get("payload", {})
msg = payload.get("message")
if not msg:
continue
with contextlib.suppress(Exception):
decoded = base64.b64decode(msg).decode(errors="ignore")
combined.append(decoded)
if combined:
_stderr("".join(combined))
except Exception:
pass
finally:
_init_log_buf.clear()
_init_log_buf_bytes = 0
atexit.register(flush_init_log_buf_to_stderr)
if 'VERCEL_IPC_PATH' in os.environ:
with contextlib.suppress(Exception):
ipc_sock.connect(os.getenv("VERCEL_IPC_PATH", ""))
def send_message(message: dict):
with contextlib.suppress(Exception):
ipc_sock.sendall((json.dumps(message) + '\0').encode())
setup_logging(send_message, storage)
# Import relative path https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly
try:
user_mod_path = os.path.join(_here, "__VC_HANDLER_ENTRYPOINT") # absolute
__vc_spec = util.spec_from_file_location("__VC_HANDLER_MODULE_NAME", user_mod_path)
__vc_module = util.module_from_spec(__vc_spec)
sys.modules["__VC_HANDLER_MODULE_NAME"] = __vc_module
__vc_spec.loader.exec_module(__vc_module)
__vc_variables = dir(__vc_module)
except Exception:
_stderr(f'Error importing __VC_HANDLER_ENTRYPOINT:')
_stderr(traceback.format_exc())
exit(1)
_use_legacy_asyncio = sys.version_info < (3, 10)
def format_headers(headers, decode=False):
keyToList = {}
for key, value in headers.items():
if decode and 'decode' in dir(key) and 'decode' in dir(value):
key = key.decode()
value = value.decode()
if key not in keyToList:
keyToList[key] = []
keyToList[key].append(value)
return keyToList
class ASGIMiddleware:
"""
ASGI middleware that preserves Vercel IPC semantics for request lifecycle:
- Handles /_vercel/ping
- Extracts x-vercel-internal-* headers and removes them from downstream app
- Sets request context into `storage` for logging/metrics
- Emits handler-started and end IPC messages
"""
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope.get('type') != 'http':
# Non-HTTP traffic is forwarded verbatim
await self.app(scope, receive, send)
return
if scope.get('path') == '/_vercel/ping':
await send({
'type': 'http.response.start',
'status': 200,
'headers': [],
})
await send({
'type': 'http.response.body',
'body': b'',
'more_body': False,
})
return
# Extract internal headers and set per-request context
headers_list = scope.get('headers', []) or []
new_headers = []
invocation_id = "0"
request_id = 0
def _b2s(b: bytes) -> str:
try:
return b.decode()
except Exception:
return ''
for k, v in headers_list:
key = _b2s(k).lower()
val = _b2s(v)
if key == 'x-vercel-internal-invocation-id':
invocation_id = val
continue
if key == 'x-vercel-internal-request-id':
request_id = int(val) if val.isdigit() else 0
continue
if key in ('x-vercel-internal-span-id', 'x-vercel-internal-trace-id'):
continue
new_headers.append((k, v))
new_scope = dict(scope)
new_scope['headers'] = new_headers
# Announce handler start and set context for logging/metrics
send_message({
"type": "handler-started",
"payload": {
"handlerStartedAt": int(time.time() * 1000),
"context": {
"invocationId": invocation_id,
"requestId": request_id,
}
}
})
token = storage.set({
"invocationId": invocation_id,
"requestId": request_id,
})
try:
await self.app(new_scope, receive, send)
finally:
storage.reset(token)
send_message({
"type": "end",
"payload": {
"context": {
"invocationId": invocation_id,
"requestId": request_id,
}
}
})
if 'VERCEL_IPC_PATH' in os.environ:
start_time = time.time()
# Override urlopen from urllib3 (& requests) to send Request Metrics
try:
import urllib3
from urllib.parse import urlparse
def timed_request(func):
fetchId = 0
@functools.wraps(func)
def wrapper(self, method, url, *args, **kwargs):
nonlocal fetchId
fetchId += 1
start_time = int(time.time() * 1000)
result = func(self, method, url, *args, **kwargs)
elapsed_time = int(time.time() * 1000) - start_time
parsed_url = urlparse(url)
context = storage.get()
if context is not None:
send_message({
"type": "metric",
"payload": {
"context": {
"invocationId": context['invocationId'],
"requestId": context['requestId'],
},
"type": "fetch-metric",
"payload": {
"pathname": parsed_url.path,
"search": parsed_url.query,
"start": start_time,
"duration": elapsed_time,
"host": parsed_url.hostname or self.host,
"statusCode": result.status,
"method": method,
"id": fetchId
}
}
})
return result
return wrapper
urllib3.connectionpool.HTTPConnectionPool.urlopen = timed_request(urllib3.connectionpool.HTTPConnectionPool.urlopen)
except:
pass
class BaseHandler(BaseHTTPRequestHandler):
# Re-implementation of BaseHTTPRequestHandler's log_message method to
# log to stdout instead of stderr.
def log_message(self, format, *args):
message = format % args
sys.stdout.write("%s - - [%s] %s\n" %
(self.address_string(),
self.log_date_time_string(),
message.translate(self._control_char_table)))
# Re-implementation of BaseHTTPRequestHandler's handle_one_request method
# to send the end message after the response is fully sent.
def handle_one_request(self):
self.raw_requestline = self.rfile.readline(65537)
if not self.raw_requestline:
self.close_connection = True
return
if not self.parse_request():
return
if self.path == '/_vercel/ping':
self.send_response(200)
self.end_headers()
return
invocationId = self.headers.get('x-vercel-internal-invocation-id')
requestId = int(self.headers.get('x-vercel-internal-request-id'))
del self.headers['x-vercel-internal-invocation-id']
del self.headers['x-vercel-internal-request-id']
del self.headers['x-vercel-internal-span-id']
del self.headers['x-vercel-internal-trace-id']
send_message({
"type": "handler-started",
"payload": {
"handlerStartedAt": int(time.time() * 1000),
"context": {
"invocationId": invocationId,
"requestId": requestId,
}
}
})
token = storage.set({
"invocationId": invocationId,
"requestId": requestId,
})
try:
self.handle_request()
finally:
storage.reset(token)
send_message({
"type": "end",
"payload": {
"context": {
"invocationId": invocationId,
"requestId": requestId,
}
}
})
if 'handler' in __vc_variables or 'Handler' in __vc_variables:
base = __vc_module.handler if ('handler' in __vc_variables) else __vc_module.Handler
if not issubclass(base, BaseHTTPRequestHandler):
_stderr('Handler must inherit from BaseHTTPRequestHandler')
_stderr('See the docs: https://vercel.com/docs/functions/serverless-functions/runtimes/python')
exit(1)
class Handler(BaseHandler, base):
def handle_request(self):
mname = 'do_' + self.command
if not hasattr(self, mname):
self.send_error(
http.HTTPStatus.NOT_IMPLEMENTED,
"Unsupported method (%r)" % self.command)
return
method = getattr(self, mname)
method()
self.wfile.flush()
elif 'app' in __vc_variables:
if (
not inspect.iscoroutinefunction(__vc_module.app) and
not inspect.iscoroutinefunction(__vc_module.app.__call__)
):
from io import BytesIO
string_types = (str,)
app = __vc_module.app
def wsgi_encoding_dance(s, charset="utf-8", errors="replace"):
if isinstance(s, str):
s = s.encode(charset)
return s.decode("latin1", errors)
class Handler(BaseHandler):
def handle_request(self):
# Prepare WSGI environment
if '?' in self.path:
path, query = self.path.split('?', 1)
else:
path, query = self.path, ''
content_length = int(self.headers.get('Content-Length', 0))
env = {
'CONTENT_LENGTH': str(content_length),
'CONTENT_TYPE': self.headers.get('content-type', ''),
'PATH_INFO': path,
'QUERY_STRING': query,
'REMOTE_ADDR': self.headers.get(
'x-forwarded-for', self.headers.get(
'x-real-ip')),
'REQUEST_METHOD': self.command,
'SERVER_NAME': self.headers.get('host', 'lambda'),
'SERVER_PORT': self.headers.get('x-forwarded-port', '80'),
'SERVER_PROTOCOL': 'HTTP/1.1',
'wsgi.errors': sys.stderr,
'wsgi.input': BytesIO(self.rfile.read(content_length)),
'wsgi.multiprocess': False,
'wsgi.multithread': False,
'wsgi.run_once': False,
'wsgi.url_scheme': self.headers.get('x-forwarded-proto', 'http'),
'wsgi.version': (1, 0),
}
for key, value in env.items():
if isinstance(value, string_types):
env[key] = wsgi_encoding_dance(value)
for k, v in self.headers.items():
env['HTTP_' + k.replace('-', '_').upper()] = v
def start_response(status, headers, exc_info=None):
self.send_response(int(status.split(' ')[0]))
for name, value in headers:
self.send_header(name, value)
self.end_headers()
return self.wfile.write
# Call the application
response = app(env, start_response)
try:
for data in response:
if data:
self.wfile.write(data)
self.wfile.flush()
finally:
if hasattr(response, 'close'):
response.close()
else:
# ASGI: Run with Uvicorn so we get proper lifespan and protocol handling
try:
import uvicorn
except Exception:
_stderr('Uvicorn is required to run ASGI apps. Please ensure it is installed.')
exit(1)
# Prefer a callable app.asgi when available; some frameworks expose a boolean here
user_app_candidate = getattr(__vc_module.app, 'asgi', None)
user_app = user_app_candidate if callable(user_app_candidate) else __vc_module.app
asgi_app = ASGIMiddleware(user_app)
# Pre-bind a socket to obtain an ephemeral port for IPC announcement
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 0))
sock.listen(2048)
http_port = sock.getsockname()[1]
config = uvicorn.Config(
app=asgi_app,
fd=sock.fileno(),
lifespan='auto',
access_log=False,
log_config=None,
log_level='warning',
)
server = uvicorn.Server(config)
send_message({
"type": "server-started",
"payload": {
"initDuration": int((time.time() - start_time) * 1000),
"httpPort": http_port,
}
})
# Mark IPC as ready and flush any buffered init logs
_ipc_ready = True
for m in _init_log_buf:
send_message(m)
_init_log_buf.clear()
# Run the server (blocking)
server.run()
# If the server ever returns, exit
sys.exit(0)
if 'Handler' in locals():
server = ThreadingHTTPServer(('127.0.0.1', 0), Handler)
send_message({
"type": "server-started",
"payload": {
"initDuration": int((time.time() - start_time) * 1000),
"httpPort": server.server_address[1],
}
})
# Mark IPC as ready and flush any buffered init logs
_ipc_ready = True
for m in _init_log_buf:
send_message(m)
_init_log_buf.clear()
server.serve_forever()
_stderr('Missing variable `handler` or `app` in file "__VC_HANDLER_ENTRYPOINT".')
_stderr('See the docs: https://vercel.com/docs/functions/serverless-functions/runtimes/python')
exit(1)
if 'handler' in __vc_variables or 'Handler' in __vc_variables:
base = __vc_module.handler if ('handler' in __vc_variables) else __vc_module.Handler
if not issubclass(base, BaseHTTPRequestHandler):
print('Handler must inherit from BaseHTTPRequestHandler')
print('See the docs: https://vercel.com/docs/functions/serverless-functions/runtimes/python')
exit(1)
print('using HTTP Handler')
from http.server import HTTPServer
import http
import _thread
server = HTTPServer(('127.0.0.1', 0), base)
port = server.server_address[1]
def vc_handler(event, context):
_thread.start_new_thread(server.handle_request, ())
payload = json.loads(event['body'])
path = payload['path']
headers = payload['headers']
method = payload['method']
encoding = payload.get('encoding')
body = payload.get('body')
if (
(body is not None and len(body) > 0) and
(encoding is not None and encoding == 'base64')
):
body = base64.b64decode(body)
request_body = body.encode('utf-8') if isinstance(body, str) else body
conn = http.client.HTTPConnection('127.0.0.1', port)
try:
conn.request(method, path, headers=headers, body=request_body)
except (http.client.HTTPException, socket.error) as ex:
print ("Request Error: %s" % ex)
res = conn.getresponse()
return_dict = {
'statusCode': res.status,
'headers': format_headers(res.headers),
}
data = res.read()
try:
return_dict['body'] = data.decode('utf-8')
except UnicodeDecodeError:
return_dict['body'] = base64.b64encode(data).decode('utf-8')
return_dict['encoding'] = 'base64'
return return_dict
elif 'app' in __vc_variables:
if (
not inspect.iscoroutinefunction(__vc_module.app) and
not inspect.iscoroutinefunction(__vc_module.app.__call__)
):
print('using Web Server Gateway Interface (WSGI)')
from io import BytesIO
from urllib.parse import urlparse
from werkzeug.datastructures import Headers
from werkzeug.wrappers import Response
string_types = (str,)
def to_bytes(x, charset=sys.getdefaultencoding(), errors="strict"):
if x is None:
return None
if isinstance(x, (bytes, bytearray, memoryview)):
return bytes(x)
if isinstance(x, str):
return x.encode(charset, errors)
raise TypeError("Expected bytes")
def wsgi_encoding_dance(s, charset="utf-8", errors="replace"):
if isinstance(s, str):
s = s.encode(charset)
return s.decode("latin1", errors)
def vc_handler(event, context):
payload = json.loads(event['body'])
headers = Headers(payload.get('headers', {}))
body = payload.get('body', '')
if body != '':
if payload.get('encoding') == 'base64':
body = base64.b64decode(body)
if isinstance(body, string_types):
body = to_bytes(body, charset='utf-8')
url = urlparse(payload['path'])
query = url.query
path = url.path
environ = {
'CONTENT_LENGTH': str(len(body)),
'CONTENT_TYPE': headers.get('content-type', ''),
'PATH_INFO': path,
'QUERY_STRING': query,
'REMOTE_ADDR': headers.get(
'x-forwarded-for', headers.get(
'x-real-ip', payload.get(
'true-client-ip', ''))),
'REQUEST_METHOD': payload['method'],
'SERVER_NAME': headers.get('host', 'lambda'),
'SERVER_PORT': headers.get('x-forwarded-port', '80'),
'SERVER_PROTOCOL': 'HTTP/1.1',
'event': event,
'context': context,
'wsgi.errors': sys.stderr,
'wsgi.input': BytesIO(body),
'wsgi.multiprocess': False,
'wsgi.multithread': False,
'wsgi.run_once': False,
'wsgi.url_scheme': headers.get('x-forwarded-proto', 'http'),
'wsgi.version': (1, 0),
}
for key, value in environ.items():
if isinstance(value, string_types):
environ[key] = wsgi_encoding_dance(value)
for key, value in headers.items():
key = 'HTTP_' + key.upper().replace('-', '_')
if key not in ('HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH'):
environ[key] = value
response = Response.from_app(__vc_module.app, environ)
return_dict = {
'statusCode': response.status_code,
'headers': format_headers(response.headers)
}
if response.data:
return_dict['body'] = base64.b64encode(response.data).decode('utf-8')
return_dict['encoding'] = 'base64'
return return_dict
else:
print('using Asynchronous Server Gateway Interface (ASGI)')
# Originally authored by Jordan Eremieff and included under MIT license:
# https://github.com/erm/mangum/blob/b4d21c8f5e304a3e17b88bc9fa345106acc50ad7/mangum/__init__.py
# https://github.com/erm/mangum/blob/b4d21c8f5e304a3e17b88bc9fa345106acc50ad7/LICENSE
import asyncio
import enum
from urllib.parse import urlparse
from werkzeug.datastructures import Headers
class ASGICycleState(enum.Enum):
REQUEST = enum.auto()
RESPONSE = enum.auto()
class ASGICycle:
def __init__(self, scope):
self.scope = scope
self.body = b''
self.state = ASGICycleState.REQUEST
self.app_queue = None
self.response = {}
def __call__(self, app, body):
"""
Receives the application and any body included in the request, then builds the
ASGI instance using the connection scope.
Runs until the response is completely read from the application.
"""
if _use_legacy_asyncio:
loop = asyncio.new_event_loop()
self.app_queue = asyncio.Queue(loop=loop)
else:
self.app_queue = asyncio.Queue()
self.put_message({'type': 'http.request', 'body': body, 'more_body': False})
asgi_instance = app(self.scope, self.receive, self.send)
if _use_legacy_asyncio:
asgi_task = loop.create_task(asgi_instance)
loop.run_until_complete(asgi_task)
else:
asyncio.run(self.run_asgi_instance(asgi_instance))
return self.response
async def run_asgi_instance(self, asgi_instance):
await asgi_instance
def put_message(self, message):
self.app_queue.put_nowait(message)
async def receive(self):
"""
Awaited by the application to receive messages in the queue.
"""
message = await self.app_queue.get()
return message
async def send(self, message):
"""
Awaited by the application to send messages to the current cycle instance.
"""
message_type = message['type']
if self.state is ASGICycleState.REQUEST:
if message_type != 'http.response.start':
raise RuntimeError(
f"Expected 'http.response.start', received: {message_type}"
)
status_code = message['status']
raw_headers = message.get('headers', [])
# Headers from werkzeug transform bytes header value
# from b'value' to "b'value'" so we need to process
# ASGI headers manually
decoded_headers = []
for key, value in raw_headers:
decoded_key = key.decode() if isinstance(key, bytes) else key
decoded_value = value.decode() if isinstance(value, bytes) else value
decoded_headers.append((decoded_key, decoded_value))
headers = Headers(decoded_headers)
self.on_request(headers, status_code)
self.state = ASGICycleState.RESPONSE
elif self.state is ASGICycleState.RESPONSE:
if message_type != 'http.response.body':
raise RuntimeError(
f"Expected 'http.response.body', received: {message_type}"
)
body = message.get('body', b'')
more_body = message.get('more_body', False)
# The body must be completely read before returning the response.
self.body += body
if not more_body:
self.on_response()
self.put_message({'type': 'http.disconnect'})
def on_request(self, headers, status_code):
self.response['statusCode'] = status_code
self.response['headers'] = format_headers(headers, decode=True)
def on_response(self):
if self.body:
self.response['body'] = base64.b64encode(self.body).decode('utf-8')
self.response['encoding'] = 'base64'
def vc_handler(event, context):
payload = json.loads(event['body'])
headers = payload.get('headers', {})
body = payload.get('body', b'')
if payload.get('encoding') == 'base64':
body = base64.b64decode(body)
elif not isinstance(body, bytes):
body = body.encode()
url = urlparse(payload['path'])
query = url.query.encode()
path = url.path
headers_encoded = []
for k, v in headers.items():
# Cope with repeated headers in the encoding.
if isinstance(v, list):
headers_encoded.append([k.lower().encode(), [i.encode() for i in v]])
else:
headers_encoded.append([k.lower().encode(), v.encode()])
scope = {
'server': (headers.get('host', 'lambda'), headers.get('x-forwarded-port', 80)),
'client': (headers.get(
'x-forwarded-for', headers.get(
'x-real-ip', payload.get(
'true-client-ip', ''))), 0),
'scheme': headers.get('x-forwarded-proto', 'http'),
'root_path': '',
'query_string': query,
'headers': headers_encoded,
'type': 'http',
'http_version': '1.1',
'method': payload['method'],
'path': path,
'raw_path': path.encode(),
}
asgi_cycle = ASGICycle(scope)
response = asgi_cycle(__vc_module.app, body)
return response
else:
print('Missing variable `handler` or `app` in file "__VC_HANDLER_ENTRYPOINT".')
print('See the docs: https://vercel.com/docs/functions/serverless-functions/runtimes/python')
exit(1)

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display