supabase-py-async


The async part of supabase-py, the Python client for Supabase.
Installation
We recommend activating your virtual environment first. For example, we like poetry and conda!
PyPi installation
Install the package (requires Python > 3.9):
pip install supabase-py-async
poetry add supabase-py-async
Local installation
You can also install locally after cloning this repo. Install in development mode with pip install -e . so that edits to the source code are reflected in your Python module.
Differences
What's different from supabase-py?
- An optional access_token: str | None = None keyword argument in the create_client function, which is used to set the Authorization header in the request.
- More tests on CRUD operations and auth operations.
more tutorials in
Usage
Set your Supabase environment variables in a dotenv file, or using the shell:
export SUPABASE_URL="my-url-to-my-awesome-supabase-instance"
export SUPABASE_KEY="my-supa-dupa-secret-supabase-api-key"
Initialize the client in a FastAPI lifespan event:
import os
from contextlib import asynccontextmanager
from dotenv import load_dotenv
from fastapi import FastAPI
from supabase_py_async import create_client
from supabase_py_async.lib.client_options import ClientOptions
client = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared Supabase client for the lifetime of the app.

    Loads environment variables from a dotenv file, reads ``SUPABASE_URL``
    and ``SUPABASE_KEY``, and stores the client returned by
    ``create_client`` in the module-level ``client`` so that code outside
    this function can use it.

    Args:
        app: The FastAPI application this lifespan is attached to.
    """
    # Without this declaration the assignment below would bind a local name
    # and the module-level ``client`` would stay ``None``.
    global client
    load_dotenv()
    url = os.getenv("SUPABASE_URL")
    key = os.getenv("SUPABASE_KEY")
    client = await create_client(
        url,
        key,
        options=ClientOptions(
            postgrest_client_timeout=10,
            storage_client_timeout=10,
        ),
    )
    yield
Use the supabase client to interface with your database.
Authenticate
async def authenticate(email: str, password: str):
    """Authenticate with the given email and password and return the result.

    Args:
        email: Email address passed to ``client.auth.sign_in``.
        password: Password passed to ``client.auth.sign_in``.
    """
    # NOTE(review): ``sign_in`` is called with keyword arguments here, while
    # the sign-in example uses ``sign_in_with_password`` with a dict --
    # confirm that ``sign_in`` exists in the installed auth client.
    return await client.auth.sign_in(email=email, password=password)
Sign-in
async def sign_in(email: str, password: str):
    """Sign in with the given email and password and return the result.

    Args:
        email: Email address of the user to sign in.
        password: Password of the user to sign in.
    """
    # Pass the caller's credentials through rather than fixed sample values.
    credentials = {"email": email, "password": password}
    return await client.auth.sign_in_with_password(credentials)
Insert Data
async def insert_data():
    """Insert a ``{"name": "Germany"}`` record into the ``countries`` table.

    Asserts that the ``data`` attribute of the response is non-empty.
    """
    insert_query = client.table("countries").insert({"name": "Germany"})
    response = await insert_query.execute()
    assert len(response.data) > 0
Select Data
async def select_data():
    """Select all columns (``"*"``) from the ``countries`` table.

    Asserts that the ``data`` attribute of the response is non-empty.
    """
    select_query = client.table("countries").select("*")
    response = await select_query.execute()
    assert len(response.data) > 0
Update Data
async def update_data():
    """Update the ``countries`` entry whose ``id`` equals 1.

    Sets ``country`` to ``"Indonesia"`` and ``capital_city`` to
    ``"Jakarta"``, then asserts that the ``data`` attribute of the response
    is non-empty.
    """
    new_values = {"country": "Indonesia", "capital_city": "Jakarta"}
    update_query = client.table("countries").update(new_values).eq("id", 1)
    response = await update_query.execute()
    assert len(response.data) > 0
Update data with duplicate keys
async def update_data_with_duplicate_keys():
    """Update the ``countries`` entry whose ``id`` equals 1.

    Sets ``country`` to ``"Indonesia"`` and ``capital_city`` to ``"Jakarta"``.
    """
    # NOTE(review): this body issues the same request as ``update_data``;
    # an upsert may have been intended for the duplicate-key case -- confirm.
    new_values = {"country": "Indonesia", "capital_city": "Jakarta"}
    update_query = client.table("countries").update(new_values).eq("id", 1)
    response = await update_query.execute()
Delete Data
async def delete_data():
    """Delete the ``countries`` entry whose ``id`` equals 1."""
    delete_query = client.table("countries").delete().eq("id", 1)
    response = await delete_query.execute()
Call Edge Functions
async def test_func():
    """Invoke the ``hello-world`` edge function with an empty body.

    Returns the response on success. If a ``FunctionsRelayError`` or
    ``FunctionsHttpError`` is raised, prints the ``message`` entry of the
    error's dict form and returns ``None``.
    """
    # NOTE(review): the two exception classes are not imported in this
    # snippet; they must be brought into scope before this can run.
    options = {'body': {}}
    try:
        response = await client.functions.invoke(
            "hello-world", invoke_options=options
        )
    except (FunctionsRelayError, FunctionsHttpError) as error:
        details = error.to_dict()
        print(details.get("message"))
    else:
        return response
Download a file from Storage
async def download_file():
    """Download ``photo1.png`` from the ``photos`` storage bucket."""
    bucket = client.storage.from_("photos")
    response = await bucket.download("photo1.png")
Upload a file
async def upload_file():
    """Upload a file to ``/user1/profile.png`` in the ``photos`` bucket."""
    # NOTE(review): ``getUserFile`` is not defined in this snippet; it stands
    # in for however the caller obtains the file to upload.
    payload = getUserFile()
    bucket = client.storage.from_("photos")
    response = await bucket.upload("/user1/profile.png", payload)
Remove a file
async def remove_file():
    """Remove ``old_photo.png`` and ``image5.jpg`` from the ``photos`` bucket."""
    stale_paths = ["old_photo.png", "image5.jpg"]
    bucket = client.storage.from_("photos")
    response = await bucket.remove(stale_paths)
List all files
async def list_files():
    """List the contents of the ``photos`` storage bucket."""
    bucket = client.storage.from_("photos")
    response = await bucket.list()
Move and rename files
async def move_files():
    """Move ``generic/graph1.png`` to ``important/revenue.png``.

    Both paths are inside the ``charts`` storage bucket.
    """
    source_path = "generic/graph1.png"
    target_path = "important/revenue.png"
    bucket = client.storage.from_("charts")
    response = await bucket.move(source_path, target_path)
Roadmap
Overall Tasks:
Contributing
Contributing to the Python libraries is a great way to get involved with the Supabase community. Reach out to us on Discord if you want to get involved.