You're Invited: Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26. RSVP
Socket
Book a DemoSign in
Socket

apscale

Package Overview
Dependencies
Maintainers
1
Versions
69
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

apscale - pypi Package Compare versions

Comparing version
4.1.6
to
4.2.0
+1
-1
apscale.egg-info/PKG-INFO
Metadata-Version: 2.4
Name: apscale
Version: 4.1.6
Version: 4.2.0
Summary: Advanced Pipeline for Simple yet Comprehensive AnaLysEs of DNA metabarcoding data

@@ -5,0 +5,0 @@ Home-page: https://github.com/DominikBuchner/apscale

@@ -72,8 +72,12 @@ import streamlit as st

# try to fetch the new name, else return a NULL value
try:
species_name = api_response["species"]
except KeyError:
species_name = pd.NA
# keep repeating until all data is fetched
while True:
try:
species_name, key = api_response["species"], api_response["usageKey"]
break
except KeyError:
species_name, key = pd.NA, pd.NA
break
return species_name
return species_name, key

@@ -105,3 +109,3 @@

# fetch a chunk
chunk = read_data_to_modify.fetch_df_chunk(100)
chunk = read_data_to_modify.fetch_df_chunk()
if chunk.empty:

@@ -116,9 +120,10 @@ break

species_names = chunk[species_column].to_list()
corrected_names = Parallel(n_jobs=-2)(
api_data = Parallel(n_jobs=-2)(
delayed(api_request)(name) for name in species_names
)
corrected_names, taxon_keys = zip(*api_data)
# create an output dataframe
chunk_df = pd.DataFrame(
data=zip(species_names, corrected_names),
columns=[species_column, "gbif_taxonomy"],
data=zip(species_names, corrected_names, taxon_keys),
columns=[species_column, "gbif_taxonomy", "gbif_taxon_key"],
)

@@ -141,17 +146,19 @@ # save to intermediate parquet until all chunks are finished

# drop a potential existing column
try:
read_data_to_modify.execute(
"ALTER TABLE sequence_metadata DROP COLUMN gbif_taxonomy"
)
# add a fresh column
read_data_to_modify.execute(
"ALTER TABLE sequence_metadata ADD COLUMN gbif_taxonomy TEXT"
)
except duckdb.BinderException:
# add a fresh column
read_data_to_modify.execute(
"ALTER TABLE sequence_metadata ADD COLUMN gbif_taxonomy TEXT"
)
# drop potential existing column
for col in ["gbif_taxonomy", "gbif_usage_key"]:
try:
read_data_to_modify.execute(
f"ALTER TABLE sequence_metadata DROP COLUMN {col}"
)
except duckdb.BinderException:
pass
# add the new columns
read_data_to_modify.execute(
"ALTER TABLE sequence_metadata ADD COLUMN gbif_taxonomy TEXT"
)
read_data_to_modify.execute(
"ALTER TABLE sequence_metadata ADD COLUMN gbif_usage_key BIGINT"
)
# add to the sequence metadata

@@ -161,3 +168,5 @@ read_data_to_modify.execute(

UPDATE sequence_metadata AS smd
SET gbif_taxonomy = gtp.gbif_taxonomy
SET
gbif_taxonomy = gtp.gbif_taxonomy,
gbif_usage_key = gtp.gbif_taxon_key
FROM gbif_tax_parquet AS gtp

@@ -169,17 +178,17 @@ WHERE smd."{species_column}" = gtp."{species_column}"

# also add to the group metadata
# drop a potential existing column
try:
read_data_to_modify.execute(
"ALTER TABLE group_metadata DROP COLUMN gbif_taxonomy"
)
# add a fresh column
read_data_to_modify.execute(
"ALTER TABLE group_metadata ADD COLUMN gbif_taxonomy TEXT"
)
except duckdb.BinderException:
# add a fresh column
read_data_to_modify.execute(
"ALTER TABLE group_metadata ADD COLUMN gbif_taxonomy TEXT"
)
# drop potential existing column
for col in ["gbif_taxonomy", "gbif_usage_key"]:
try:
read_data_to_modify.execute(f"ALTER TABLE group_metadata DROP COLUMN {col}")
except duckdb.BinderException:
pass
# add the new columns
read_data_to_modify.execute(
"ALTER TABLE group_metadata ADD COLUMN gbif_taxonomy TEXT"
)
read_data_to_modify.execute(
"ALTER TABLE group_metadata ADD COLUMN gbif_usage_key BIGINT"
)
# add to the sequence metadata

@@ -189,3 +198,5 @@ read_data_to_modify.execute(

UPDATE group_metadata AS gmd
SET gbif_taxonomy = gtp.gbif_taxonomy
SET
gbif_taxonomy = gtp.gbif_taxonomy,
gbif_usage_key = gtp.gbif_taxon_key
FROM gbif_tax_parquet AS gtp

@@ -192,0 +203,0 @@ WHERE gmd."{species_column}" = gtp."{species_column}"

@@ -1,5 +0,5 @@

import duckdb, pyproj, time, requests, random
import duckdb, pyproj, time, requests, random, zipfile, glob, shutil
import pandas as pd
import pathlib
from joblib import Parallel, delayed
from joblib import Parallel, delayed, load, dump
import streamlit as st

@@ -13,2 +13,4 @@ from pathlib import Path

from pygbif.gbifutils import NoResultException
from shapely.ops import unary_union
from streamlit_autorefresh import st_autorefresh

@@ -130,2 +132,3 @@

smd.gbif_taxonomy,
smd.gbif_usage_key,
samd."{lat_col}" AS lat,

@@ -165,2 +168,3 @@ samd."{lon_col}" AS lon,

)
# create a dataframe with idx wkt

@@ -228,3 +232,4 @@ wkt_df = pd.DataFrame(

dd.gbif_taxonomy,
wkt.radius
dd.gbif_usage_key,
wkt.radius,
FROM wkt_data AS wkt

@@ -284,82 +289,83 @@ LEFT JOIN distribution_data AS dd

def validation_api_request(species_name, wkt_string) -> str:
# define a backoff factor for each individual request
backoff = 1.0
# perform the api call
while True:
try:
validation_result = occ.search(
scientificName=species_name, geometry=wkt_string, limit=1, timeout=60
)
return (
"plausible" if validation_result.get("count", 0) > 0 else "implausible"
)
except (
requests.exceptions.HTTPError,
requests.exceptions.ConnectionError,
) as e:
if e.response.status_code == 429:
wait = backoff + random.uniform(0, 1)
time.sleep(wait)
backoff *= 2
def initialize_download(temp_db, temp_folder, username, password, email):
# establish the connection to the duckdb database
temp_db_con = duckdb.connect(temp_db, read_only=True)
temp_db_con.execute("INSTALL spatial; LOAD spatial;")
except NoResultException:
wait = backoff + random.uniform(0, 1)
time.sleep(wait)
backoff *= 2
minx, miny, maxx, maxy = temp_db_con.execute(
"""
SELECT
MIN(ST_XMin(ST_GeomFromText(wkt_string))) AS minx,
MIN(ST_YMin(ST_GeomFromText(wkt_string))) AS miny,
MAX(ST_XMax(ST_GeomFromText(wkt_string))) AS maxx,
MAX(ST_YMax(ST_GeomFromText(wkt_string))) AS maxy
FROM wkt_data
"""
).fetchone()
# bbox = f"POLYGON(({minx} {miny}, {minx} {maxy}, {maxx} {maxy}, {maxx} {miny}, {minx} {miny}))"
def api_validation(temp_db, temp_folder):
# establish the connection
temp_db_con = duckdb.connect(temp_db)
# collect the required data
api_data = temp_db_con.execute(
f"""
SELECT
DISTINCT(wd.sequence_idx),
dd.gbif_taxonomy,
wd.wkt_string
FROM wkt_data AS wd
LEFT JOIN distribution_data AS dd
ON wd.sequence_idx = dd.sequence_idx
# load all distinct usage keys
usage_keys = (
temp_db_con.execute(
f"""
SELECT
DISTINCT(dd.gbif_usage_key)
FROM distribution_data AS dd
"""
)
.df()["gbif_usage_key"]
.to_list()
)
chunk_count = 1
temp_db_con.close()
while True:
api_data_chunk = api_data.fetch_df_chunk()
if api_data_chunk.empty:
break
else:
# create a filename for the chunk
output_name = temp_folder.joinpath(
f"gbif_validation_{chunk_count}.parquet.snappy"
)
chunk_args = zip(
api_data_chunk["gbif_taxonomy"], api_data_chunk["wkt_string"]
)
validation_status = Parallel(n_jobs=1)(
delayed(validation_api_request)(species_name, wkt_string)
for species_name, wkt_string in chunk_args
)
# some user output
st.toast(f"Chunk {chunk_count} processed!")
download_predicate = {
"type": "and",
"predicates": [
{
"type": "in",
"key": "TAXON_KEY",
"values": [str(k) for k in usage_keys],
"matchCase": False,
},
# Latitude min
{
"type": "greaterThanOrEquals",
"key": "DECIMAL_LATITUDE",
"value": miny,
},
# Latitude max
{
"type": "lessThanOrEquals",
"key": "DECIMAL_LATITUDE",
"value": maxy,
},
# Longitude min
{
"type": "greaterThanOrEquals",
"key": "DECIMAL_LONGITUDE",
"value": minx,
},
# Longitude max
{
"type": "lessThanOrEquals",
"key": "DECIMAL_LONGITUDE",
"value": maxx,
},
],
}
# save to parquet
chunk_output = pd.DataFrame(
zip(api_data_chunk["sequence_idx"], validation_status),
columns=["sequence_idx", "gbif_validation"],
)
# submit the download
download_info = occ.download(
queries=download_predicate,
format="SIMPLE_PARQUET",
user=username,
pwd=password,
email=email,
)
# write to parquet to later ingest into the main database
chunk_output.to_parquet(output_name)
return download_info
# increase the chunk count
chunk_count += 1
temp_db_con.close()
def add_validation_to_metadata(read_data_to_modify, temp_folder):

@@ -369,3 +375,3 @@ # connect to the main database

# collect the parquet data
parquet_files = temp_folder.joinpath("gbif_validation_*.parquet.snappy")
parquet_files = temp_folder.joinpath("gbif_validation.parquet.snappy")

@@ -401,3 +407,3 @@ # create a view over the parquet files

# remove the parquet files
for file in temp_folder.glob("gbif_validation_*.parquet.snappy"):
for file in temp_folder.glob("gbif_validation.parquet.snappy"):
if file.is_file():

@@ -511,2 +517,143 @@ file.unlink()

def download_initialized(pickle_path):
    """Return True when a GBIF download has already been submitted.

    The pickle file at ``pickle_path`` is written right after a download
    request is sent to GBIF, so its mere existence marks an active download.

    Args:
        pickle_path: pathlib.Path to the download-state pickle file.

    Returns:
        bool: True if the pickle file exists, False otherwise.
    """
    # Path.is_file() already yields the exact boolean we need; the
    # original if/else returning True/False literals was redundant.
    return pickle_path.is_file()
def download_info(pickle_path):
    """Show the state of the pending GBIF download and return it.

    Looks up the download key stored in the pickle at ``pickle_path``,
    queries GBIF for its metadata and renders an informational panel.

    Returns:
        (status, download_key) on an active download, otherwise
        (False, False) when no download has been initialized yet.
    """
    # Guard clause: nothing pickled means nothing to report.
    if not download_initialized(pickle_path):
        st.info("There is no active download at the moment.")
        return False, False

    # The pickle holds the download descriptor; element 0 is the key.
    pickled = load(pickle_path)
    download_key = pickled[0]
    # Ask GBIF for the current state of that download.
    meta = occ.download_meta(download_key)
    created, status = meta["created"], meta["status"]
    st.info(
        f"""
        The download key is: {download_key}\n
        The download has been created at: {created}\n
        Current status: {status}\n
        This page will refresh every 20 seconds until the data is ready.
        """
    )
    return status, download_key
def download_gbif_data(download_key, temp_folder):
    """Fetch a finished GBIF download, unpack it and stage it in DuckDB.

    Downloads the zipped occurrence export for ``download_key`` into
    ``temp_folder``, extracts it, aggregates the occurrence points per
    taxon key into a DuckDB table and removes the intermediate files.

    Returns:
        The Streamlit status widget, so the caller can keep updating it.
    """
    # Progress panel the caller will continue to write into.
    status = st.status("Downloading and processing data.", expanded=True)

    # Pull the archive from GBIF into the temp folder.
    occ.download_get(download_key, path=temp_folder)
    status.write("Download completed.")

    # Extract the archive next to itself, then discard the zip.
    status.write("Unzipping downloaded results.")
    archive = temp_folder.joinpath(f"{download_key}.zip")
    with zipfile.ZipFile(archive, "r") as in_stream:
        in_stream.extractall(temp_folder)
    archive.unlink()

    status.write("Loading data into DuckDB.")
    # GBIF exports may contain zero-byte parquet parts; read_parquet
    # chokes on those, so drop them before ingesting.
    for part in temp_folder.joinpath("occurrence.parquet").iterdir():
        if part.stat().st_size == 0:
            part.unlink()

    # Aggregate all occurrence points per taxon into one multipoint
    # geometry so the later intersection test is a single row lookup.
    gbif_database = temp_folder.joinpath("gbif_db.duckdb")
    gbif_con = duckdb.connect(gbif_database)
    gbif_con.execute("INSTALL spatial; LOAD spatial;")
    gbif_con.execute(
        f"""
        CREATE OR REPLACE TABLE
        occ_data
        AS SELECT
        CAST(taxonkey AS BIGINT) AS taxonkey,
        ST_Collect(ARRAY_AGG(ST_Point(decimallongitude, decimallatitude))) AS multipoints
        FROM read_parquet('{temp_folder.joinpath("occurrence.parquet", ("*"))}')
        GROUP BY taxonkey
        """
    )
    gbif_con.close()
    status.write("Data successfully loaded.")

    # The raw parquet directory is no longer needed once staged.
    shutil.rmtree(temp_folder.joinpath("occurrence.parquet"))
    return status
def compute_validation(status, temp_db, temp_folder, download_pickle):
    """Intersect sequence polygons with GBIF occurrences and store the verdict.

    Joins each sequence's WKT polygon with the taxon's multipoint occurrence
    geometry (staged by ``download_gbif_data``) and writes a parquet file
    labelling every sequence 'plausible' or 'implausible'. Cleans up the
    download pickle and the staging database afterwards.

    Args:
        status: Streamlit status widget to report progress into.
        temp_db: path of the temp DuckDB database holding wkt/distribution data.
        temp_folder: pathlib.Path with the staged GBIF DuckDB database.
        download_pickle: pathlib.Path of the download-state pickle to remove.
    """
    status.write("Computing validation.")

    # Attach the staged GBIF data so both databases are queryable at once.
    temp_db_con = duckdb.connect(temp_db)
    gbif_database = temp_folder.joinpath("gbif_db.duckdb")
    temp_db_con.execute(f"ATTACH '{gbif_database}' as gbif_data")
    temp_db_con.execute("INSTALL spatial; LOAD spatial;")

    # One row per sequence: its polygon plus the taxon's occurrence points.
    temp_db_con.execute(
        f"""
        CREATE OR REPLACE TABLE main.validation_data AS
        SELECT
        DISTINCT(wd.sequence_idx),
        dd.gbif_usage_key,
        wd.wkt_string,
        god.multipoints
        FROM wkt_data AS wd
        LEFT JOIN distribution_data AS dd
        ON wd.sequence_idx = dd.sequence_idx
        LEFT JOIN gbif_data.occ_data AS god
        ON dd.gbif_usage_key = god.taxonkey
        """
    )

    # Emit the verdicts as parquet for the next ingestion step.
    output_parquet = temp_folder.joinpath("gbif_validation.parquet.snappy")
    temp_db_con.execute(
        f"""
        COPY (
        SELECT
        sequence_idx,
        CASE
        WHEN ST_Intersects(
        multipoints,
        ST_GeomFromText(wkt_string)
        ) THEN 'plausible'
        ELSE 'implausible'
        END AS gbif_validation
        FROM main.validation_data
        ) TO '{output_parquet}' (FORMAT PARQUET)
        """
    )
    temp_db_con.close()

    # Drop the bookkeeping files that are no longer needed.
    if download_pickle.is_file():
        download_pickle.unlink()
    gbif_database.unlink()

    status.update(label="Finished", state="complete", expanded=False)
def main():

@@ -527,2 +674,3 @@ # prevent page from scroling up on click

temp_db = Path(temp_folder).joinpath("temp.duckdb")
download_pickle = temp_folder.joinpath("download.pkl")

@@ -626,16 +774,65 @@ # header

# option to reset the species distribution data
compute = st.button(label="Validate species via GBIF API", type="primary")
reset = st.button(label="Reset species distributions", type="secondary")
st.info(
"""To submit a download to GBIF credentials are required. They will only be used to
initialize the download and never be stored. After sending the download the fields will
be emptied automatically. You will receive an email once the download is finished.""",
icon="ℹ️",
)
# reset wkt data
if reset:
temp_db.unlink()
st.rerun()
# give the input fields for username, password and email
col1, col2, col3 = st.columns(3)
# perform the GBIF validation algorithm
if compute:
with st.spinner("Querying GBIF API. Hold on.", show_time=True):
api_validation(temp_db, temp_folder)
with col1:
user = st.text_input(label="GBIF username")
with col2:
pwd = st.text_input(label="GBIF password", type="password")
with col3:
email = st.text_input(label="Your mail adress")
# option to initialize the download
if user and pwd and email and not download_initialized(download_pickle):
validate = st.button(
label="Initialize download of validation data",
type="primary",
disabled=False,
)
else:
validate = st.button(
label="Initialize download of validation data",
type="primary",
disabled=True,
)
if validate:
with st.spinner("Initializing download. Hold on.", show_time=True):
pickle_data = initialize_download(
temp_db, temp_folder, user, pwd, email
)
# pickle the download data after the download has been requested
dump(pickle_data, download_pickle)
# rerun to clear all fields and disable the button
st.rerun()
# display download info
status, download_key = download_info(download_pickle)
if status == "SUCCEEDED":
download_process = st.button(
label="Download and process data", disabled=False, type="primary"
)
else:
download_process = st.button(
label="Download and process data", disabled=True
)
st_autorefresh(interval=20_000)
# download and process the data, remove download files afterwards
if download_process:
# download the data and pass to duckdb
status = download_gbif_data(download_key, temp_folder)
# compute the actual validation
compute_validation(status, temp_db, temp_folder, download_pickle)
# ingest the data into the sequence metadata

@@ -650,2 +847,12 @@ add_validation_to_metadata(

st.divider()
reset = st.button(label="Reset species distributions", type="secondary")
# reset wkt data
if reset:
temp_db.unlink()
download_pickle.unlink()
st.rerun()
# if validated data exists

@@ -652,0 +859,0 @@ if gbif_validated:

Metadata-Version: 2.4
Name: apscale
Version: 4.1.6
Version: 4.2.0
Summary: Advanced Pipeline for Simple yet Comprehensive AnaLysEs of DNA metabarcoding data

@@ -5,0 +5,0 @@ Home-page: https://github.com/DominikBuchner/apscale

@@ -8,3 +8,3 @@ import setuptools

name="apscale",
version="4.1.6",
version="4.2.0",
author="Dominik Buchner",

@@ -11,0 +11,0 @@ author_email="dominik.buchner524@googlemail.com",