apscale
Advanced tools
| Metadata-Version: 2.4 | ||
| Name: apscale | ||
| Version: 4.1.6 | ||
| Version: 4.2.0 | ||
| Summary: Advanced Pipeline for Simple yet Comprehensive AnaLysEs of DNA metabarcoding data | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/DominikBuchner/apscale |
@@ -72,8 +72,12 @@ import streamlit as st | ||
| # try to fetch the new name, else return a NULL value | ||
| try: | ||
| species_name = api_response["species"] | ||
| except KeyError: | ||
| species_name = pd.NA | ||
| # keep repeating until all data is fetched | ||
| while True: | ||
| try: | ||
| species_name, key = api_response["species"], api_response["usageKey"] | ||
| break | ||
| except KeyError: | ||
| species_name, key = pd.NA, pd.NA | ||
| break | ||
| return species_name | ||
| return species_name, key | ||
@@ -105,3 +109,3 @@ | ||
| # fetch a chunk | ||
| chunk = read_data_to_modify.fetch_df_chunk(100) | ||
| chunk = read_data_to_modify.fetch_df_chunk() | ||
| if chunk.empty: | ||
@@ -116,9 +120,10 @@ break | ||
| species_names = chunk[species_column].to_list() | ||
| corrected_names = Parallel(n_jobs=-2)( | ||
| api_data = Parallel(n_jobs=-2)( | ||
| delayed(api_request)(name) for name in species_names | ||
| ) | ||
| corrected_names, taxon_keys = zip(*api_data) | ||
| # create an output dataframe | ||
| chunk_df = pd.DataFrame( | ||
| data=zip(species_names, corrected_names), | ||
| columns=[species_column, "gbif_taxonomy"], | ||
| data=zip(species_names, corrected_names, taxon_keys), | ||
| columns=[species_column, "gbif_taxonomy", "gbif_taxon_key"], | ||
| ) | ||
@@ -141,17 +146,19 @@ # save to intermediate parquet untill all chunks are finished | ||
| # drop a potential existing column | ||
| try: | ||
| read_data_to_modify.execute( | ||
| "ALTER TABLE sequence_metadata DROP COLUMN gbif_taxonomy" | ||
| ) | ||
| # add a fresh column | ||
| read_data_to_modify.execute( | ||
| "ALTER TABLE sequence_metadata ADD COLUMN gbif_taxonomy TEXT" | ||
| ) | ||
| except duckdb.BinderException: | ||
| # add a fresh column | ||
| read_data_to_modify.execute( | ||
| "ALTER TABLE sequence_metadata ADD COLUMN gbif_taxonomy TEXT" | ||
| ) | ||
| # drop potential existing column | ||
| for col in ["gbif_taxonomy", "gbif_usage_key"]: | ||
| try: | ||
| read_data_to_modify.execute( | ||
| f"ALTER TABLE sequence_metadata DROP COLUMN {col}" | ||
| ) | ||
| except duckdb.BinderException: | ||
| pass | ||
| # add the new columns | ||
| read_data_to_modify.execute( | ||
| "ALTER TABLE sequence_metadata ADD COLUMN gbif_taxonomy TEXT" | ||
| ) | ||
| read_data_to_modify.execute( | ||
| "ALTER TABLE sequence_metadata ADD COLUMN gbif_usage_key BIGINT" | ||
| ) | ||
| # add to the sequence metadata | ||
@@ -161,3 +168,5 @@ read_data_to_modify.execute( | ||
| UPDATE sequence_metadata AS smd | ||
| SET gbif_taxonomy = gtp.gbif_taxonomy | ||
| SET | ||
| gbif_taxonomy = gtp.gbif_taxonomy, | ||
| gbif_usage_key = gtp.gbif_taxon_key | ||
| FROM gbif_tax_parquet AS gtp | ||
@@ -169,17 +178,17 @@ WHERE smd."{species_column}" = gtp."{species_column}" | ||
| # also add to the group metadata | ||
| # drop a potential existing column | ||
| try: | ||
| read_data_to_modify.execute( | ||
| "ALTER TABLE group_metadata DROP COLUMN gbif_taxonomy" | ||
| ) | ||
| # add a fresh column | ||
| read_data_to_modify.execute( | ||
| "ALTER TABLE group_metadata ADD COLUMN gbif_taxonomy TEXT" | ||
| ) | ||
| except duckdb.BinderException: | ||
| # add a fresh column | ||
| read_data_to_modify.execute( | ||
| "ALTER TABLE group_metadata ADD COLUMN gbif_taxonomy TEXT" | ||
| ) | ||
| # drop potential existing column | ||
| for col in ["gbif_taxonomy", "gbif_usage_key"]: | ||
| try: | ||
| read_data_to_modify.execute(f"ALTER TABLE group_metadata DROP COLUMN {col}") | ||
| except duckdb.BinderException: | ||
| pass | ||
| # add the new columns | ||
| read_data_to_modify.execute( | ||
| "ALTER TABLE group_metadata ADD COLUMN gbif_taxonomy TEXT" | ||
| ) | ||
| read_data_to_modify.execute( | ||
| "ALTER TABLE group_metadata ADD COLUMN gbif_usage_key BIGINT" | ||
| ) | ||
| # add to the sequence metadata | ||
@@ -189,3 +198,5 @@ read_data_to_modify.execute( | ||
| UPDATE group_metadata AS gmd | ||
| SET gbif_taxonomy = gtp.gbif_taxonomy | ||
| SET | ||
| gbif_taxonomy = gtp.gbif_taxonomy, | ||
| gbif_usage_key = gtp.gbif_taxon_key | ||
| FROM gbif_tax_parquet AS gtp | ||
@@ -192,0 +203,0 @@ WHERE gmd."{species_column}" = gtp."{species_column}" |
@@ -1,5 +0,5 @@ | ||
| import duckdb, pyproj, time, requests, random | ||
| import duckdb, pyproj, time, requests, random, zipfile, glob, shutil | ||
| import pandas as pd | ||
| import pathlib | ||
| from joblib import Parallel, delayed | ||
| from joblib import Parallel, delayed, load, dump | ||
| import streamlit as st | ||
@@ -13,2 +13,4 @@ from pathlib import Path | ||
| from pygbif.gbifutils import NoResultException | ||
| from shapely.ops import unary_union | ||
| from streamlit_autorefresh import st_autorefresh | ||
@@ -130,2 +132,3 @@ | ||
| smd.gbif_taxonomy, | ||
| smd.gbif_usage_key, | ||
| samd."{lat_col}" AS lat, | ||
@@ -165,2 +168,3 @@ samd."{lon_col}" AS lon, | ||
| ) | ||
| # create a dataframe with idx wkt | ||
@@ -228,3 +232,4 @@ wkt_df = pd.DataFrame( | ||
| dd.gbif_taxonomy, | ||
| wkt.radius | ||
| dd.gbif_usage_key, | ||
| wkt.radius, | ||
| FROM wkt_data AS wkt | ||
@@ -284,82 +289,83 @@ LEFT JOIN distribution_data AS dd | ||
| def validation_api_request(species_name, wkt_string) -> str: | ||
| # define a backoff factor for each individual request | ||
| backoff = 1.0 | ||
| # perform the api call | ||
| while True: | ||
| try: | ||
| validation_result = occ.search( | ||
| scientificName=species_name, geometry=wkt_string, limit=1, timeout=60 | ||
| ) | ||
| return ( | ||
| "plausible" if validation_result.get("count", 0) > 0 else "implausible" | ||
| ) | ||
| except ( | ||
| requests.exceptions.HTTPError, | ||
| requests.exceptions.ConnectionError, | ||
| ) as e: | ||
| if e.response.status_code == 429: | ||
| wait = backoff + random.uniform(0, 1) | ||
| time.sleep(wait) | ||
| backoff *= 2 | ||
| def initialize_download(temp_db, temp_folder, username, password, email): | ||
| # establish the connection to the duckdb database | ||
| temp_db_con = duckdb.connect(temp_db, read_only=True) | ||
| temp_db_con.execute("INSTALL spatial; LOAD spatial;") | ||
| except NoResultException: | ||
| wait = backoff + random.uniform(0, 1) | ||
| time.sleep(wait) | ||
| backoff *= 2 | ||
| minx, miny, maxx, maxy = temp_db_con.execute( | ||
| """ | ||
| SELECT | ||
| MIN(ST_XMin(ST_GeomFromText(wkt_string))) AS minx, | ||
| MIN(ST_YMin(ST_GeomFromText(wkt_string))) AS miny, | ||
| MAX(ST_XMax(ST_GeomFromText(wkt_string))) AS maxx, | ||
| MAX(ST_YMax(ST_GeomFromText(wkt_string))) AS maxy | ||
| FROM wkt_data | ||
| """ | ||
| ).fetchone() | ||
| # bbox = f"POLYGON(({minx} {miny}, {minx} {maxy}, {maxx} {maxy}, {maxx} {miny}, {minx} {miny}))" | ||
| def api_validation(temp_db, temp_folder): | ||
| # establish the connection | ||
| temp_db_con = duckdb.connect(temp_db) | ||
| # collect the required data | ||
| api_data = temp_db_con.execute( | ||
| f""" | ||
| SELECT | ||
| DISTINCT(wd.sequence_idx), | ||
| dd.gbif_taxonomy, | ||
| wd.wkt_string | ||
| FROM wkt_data AS wd | ||
| LEFT JOIN distribution_data AS dd | ||
| ON wd.sequence_idx = dd.sequence_idx | ||
| # load all distinct usage keys | ||
| usage_keys = ( | ||
| temp_db_con.execute( | ||
| f""" | ||
| SELECT | ||
| DISTINCT(dd.gbif_usage_key) | ||
| FROM distribution_data AS dd | ||
| """ | ||
| ) | ||
| .df()["gbif_usage_key"] | ||
| .to_list() | ||
| ) | ||
| chunk_count = 1 | ||
| temp_db_con.close() | ||
| while True: | ||
| api_data_chunk = api_data.fetch_df_chunk() | ||
| if api_data_chunk.empty: | ||
| break | ||
| else: | ||
| # create a filename for the chunk | ||
| output_name = temp_folder.joinpath( | ||
| f"gbif_validation_{chunk_count}.parquet.snappy" | ||
| ) | ||
| chunk_args = zip( | ||
| api_data_chunk["gbif_taxonomy"], api_data_chunk["wkt_string"] | ||
| ) | ||
| validation_status = Parallel(n_jobs=1)( | ||
| delayed(validation_api_request)(species_name, wkt_string) | ||
| for species_name, wkt_string in chunk_args | ||
| ) | ||
| # some user output | ||
| st.toast(f"Chunk {chunk_count} processed!") | ||
| download_predicate = { | ||
| "type": "and", | ||
| "predicates": [ | ||
| { | ||
| "type": "in", | ||
| "key": "TAXON_KEY", | ||
| "values": [str(k) for k in usage_keys], | ||
| "matchCase": False, | ||
| }, | ||
| # Latitude min | ||
| { | ||
| "type": "greaterThanOrEquals", | ||
| "key": "DECIMAL_LATITUDE", | ||
| "value": miny, | ||
| }, | ||
| # Latitude max | ||
| { | ||
| "type": "lessThanOrEquals", | ||
| "key": "DECIMAL_LATITUDE", | ||
| "value": maxy, | ||
| }, | ||
| # Longitude min | ||
| { | ||
| "type": "greaterThanOrEquals", | ||
| "key": "DECIMAL_LONGITUDE", | ||
| "value": minx, | ||
| }, | ||
| # Longitude max | ||
| { | ||
| "type": "lessThanOrEquals", | ||
| "key": "DECIMAL_LONGITUDE", | ||
| "value": maxx, | ||
| }, | ||
| ], | ||
| } | ||
| # save to parquet | ||
| chunk_output = pd.DataFrame( | ||
| zip(api_data_chunk["sequence_idx"], validation_status), | ||
| columns=["sequence_idx", "gbif_validation"], | ||
| ) | ||
| # submit the download | ||
| download_info = occ.download( | ||
| queries=download_predicate, | ||
| format="SIMPLE_PARQUET", | ||
| user=username, | ||
| pwd=password, | ||
| email=email, | ||
| ) | ||
| # write to parquet to later ingest into the main database | ||
| chunk_output.to_parquet(output_name) | ||
| return download_info | ||
| # increase the chunk count | ||
| chunk_count += 1 | ||
| temp_db_con.close() | ||
| def add_validation_to_metadata(read_data_to_modify, temp_folder): | ||
@@ -369,3 +375,3 @@ # connect to the main database | ||
| # collect the parquet data | ||
| parquet_files = temp_folder.joinpath("gbif_validation_*.parquet.snappy") | ||
| parquet_files = temp_folder.joinpath("gbif_validation.parquet.snappy") | ||
@@ -401,3 +407,3 @@ # create a view over the parquet files | ||
| # remove the parquet files | ||
| for file in temp_folder.glob("gbif_validation_*.parquet.snappy"): | ||
| for file in temp_folder.glob("gbif_validation.parquet.snappy"): | ||
| if file.is_file(): | ||
@@ -511,2 +517,143 @@ file.unlink() | ||
def download_initialized(pickle_path):
    """Report whether a GBIF download has already been requested.

    Args:
        pickle_path: Path-like object exposing ``is_file()`` that points to
            the pickle file holding the data of a requested download.

    Returns:
        bool: True if the pickle file exists, False otherwise.
    """
    # is_file() is already the answer, no if/else wrapper needed
    return bool(pickle_path.is_file())
def download_info(pickle_path):
    """Show the state of the current GBIF download in the Streamlit page.

    Args:
        pickle_path: Path to the pickle file holding the download data.

    Returns:
        tuple: ``(status, download_key)`` as reported by
        ``occ.download_meta`` if a download pickle exists, otherwise
        ``(False, False)``.
    """
    if download_initialized(pickle_path):
        # the first element of the pickled download data is the download key
        download_data = load(pickle_path)
        download_key = download_data[0]
        # ask GBIF for the metadata of this download
        meta = occ.download_meta(download_key)
        created, status = meta["created"], meta["status"]
        st.info(
            f"""
            The download key is: {download_key}\n
            The download has been created at: {created}\n
            Current status: {status}\n
            This page will refresh every 20 seconds until the data is ready.
            """
        )
        return status, download_key

    # nothing has been requested yet
    st.info("There is no active download at the moment.")
    return False, False
def download_gbif_data(download_key, temp_folder):
    """Fetch a finished GBIF download and load it into a DuckDB database.

    The zip archive is downloaded into ``temp_folder``, extracted and then
    deleted. The extracted ``occurrence.parquet`` folder is aggregated into
    the table ``occ_data`` (one row per taxon key holding all occurrence
    points) of ``gbif_db.duckdb`` inside ``temp_folder``. The extracted
    parquet folder is removed afterwards.

    Args:
        download_key: Key of the GBIF download to fetch.
        temp_folder: ``pathlib.Path`` of the folder used for all
            intermediate files.

    Returns:
        The Streamlit status widget created here, so the caller can keep
        writing progress updates to it.
    """
    # show some status updates
    status = st.status("Downloading and processing data.", expanded=True)

    # download the data to the temp folder
    occ.download_get(download_key, path=temp_folder)
    status.write("Download completed.")

    # unzip the downloaded archive
    status.write("Unzipping downloaded results.")
    download_file = temp_folder.joinpath(f"{download_key}.zip")
    with zipfile.ZipFile(download_file, "r") as in_stream:
        in_stream.extractall(temp_folder)

    # remove the zip file
    download_file.unlink()

    # load data into DuckDB
    status.write("Loading data into DuckDB.")
    occurrence_folder = temp_folder.joinpath("occurrence.parquet")

    # zero-byte files are removed before the folder is read as parquet
    for file in occurrence_folder.iterdir():
        if file.stat().st_size == 0:
            file.unlink()

    # double single quotes so the path stays a valid SQL string literal
    parquet_glob = str(occurrence_folder.joinpath("*")).replace("'", "''")

    gbif_database = temp_folder.joinpath("gbif_db.duckdb")
    gbif_con = duckdb.connect(gbif_database)
    try:
        gbif_con.execute("INSTALL spatial; LOAD spatial;")
        gbif_con.execute(
            f"""
            CREATE OR REPLACE TABLE
                occ_data
            AS SELECT
                CAST(taxonkey AS BIGINT) AS taxonkey,
                ST_Collect(ARRAY_AGG(ST_Point(decimallongitude, decimallatitude))) AS multipoints
            FROM read_parquet('{parquet_glob}')
            GROUP BY taxonkey
            """
        )
    finally:
        # always release the database file, even if loading fails
        gbif_con.close()

    status.write("Data successfully loaded.")

    # remove the downloaded parquet
    shutil.rmtree(occurrence_folder)

    # return status widget to use further
    return status
def compute_validation(status, temp_db, temp_folder, download_pickle):
    """Validate sequences against the downloaded GBIF occurrences.

    Joins the polygons in ``wkt_data`` with the usage keys in
    ``distribution_data`` and the occurrence points in ``occ_data`` of the
    GBIF database. It then writes ``gbif_validation.parquet.snappy`` into
    ``temp_folder`` with one ``gbif_validation`` value per ``sequence_idx``:
    'plausible' if the polygon intersects the occurrence points, otherwise
    'implausible'. Finally the download pickle and the GBIF database file
    are deleted.

    Args:
        status: Streamlit status widget used for progress output.
        temp_db: Path of the temporary DuckDB database holding ``wkt_data``
            and ``distribution_data``.
        temp_folder: ``pathlib.Path`` of the folder holding
            ``gbif_db.duckdb``; the output parquet is written here.
        download_pickle: ``pathlib.Path`` of the pickle file with the
            download data, removed once validation is done.
    """
    # update the status
    status.write("Computing validation.")

    gbif_database = temp_folder.joinpath("gbif_db.duckdb")
    # output file as parquet to ingest in the next step
    output_parquet = temp_folder.joinpath("gbif_validation.parquet.snappy")

    # double single quotes so the paths stay valid SQL string literals
    gbif_database_sql = str(gbif_database).replace("'", "''")
    output_parquet_sql = str(output_parquet).replace("'", "''")

    temp_db_con = duckdb.connect(temp_db)
    try:
        temp_db_con.execute(f"ATTACH '{gbif_database_sql}' as gbif_data")
        temp_db_con.execute("INSTALL spatial; LOAD spatial;")

        # collect the polygon data together with the occurrence points
        # NOTE(review): the LEFT JOINs presumably leave multipoints NULL for
        # taxa without any downloaded occurrence - confirm these should end
        # up as 'implausible' below
        temp_db_con.execute(
            """
            CREATE OR REPLACE TABLE main.validation_data AS
            SELECT
                DISTINCT(wd.sequence_idx),
                dd.gbif_usage_key,
                wd.wkt_string,
                god.multipoints
            FROM wkt_data AS wd
            LEFT JOIN distribution_data AS dd
                ON wd.sequence_idx = dd.sequence_idx
            LEFT JOIN gbif_data.occ_data AS god
                ON dd.gbif_usage_key = god.taxonkey
            """
        )

        # create the validation parquet
        temp_db_con.execute(
            f"""
            COPY (
                SELECT
                    sequence_idx,
                    CASE
                        WHEN ST_Intersects(
                            multipoints,
                            ST_GeomFromText(wkt_string)
                        ) THEN 'plausible'
                        ELSE 'implausible'
                    END AS gbif_validation
                FROM main.validation_data
            ) TO '{output_parquet_sql}' (FORMAT PARQUET)
            """
        )
    finally:
        # always release both database files, even if a query fails
        temp_db_con.close()

    # remove the download file in the end and all other files no longer needed
    if download_pickle.is_file():
        download_pickle.unlink()
    gbif_database.unlink()

    # finalize the status window
    status.update(label="Finished", state="complete", expanded=False)
| def main(): | ||
@@ -527,2 +674,3 @@ # prevent page from scroling up on click | ||
| temp_db = Path(temp_folder).joinpath("temp.duckdb") | ||
| download_pickle = temp_folder.joinpath("download.pkl") | ||
@@ -626,16 +774,65 @@ # header | ||
| # option to reset the species distribution data | ||
| compute = st.button(label="Validate species via GBIF API", type="primary") | ||
| reset = st.button(label="Reset species distributions", type="secondary") | ||
| st.info( | ||
| """To submit a download to GBIF credentials are required. They will only be used to | ||
| initialize the download and never be stored. After sending the download the fields will | ||
| be emptied automatically. You will receive an email once the download is finished.""", | ||
| icon="ℹ️", | ||
| ) | ||
| # reset wkt data | ||
| if reset: | ||
| temp_db.unlink() | ||
| st.rerun() | ||
| # give the input fields for username, password and email | ||
| col1, col2, col3 = st.columns(3) | ||
| # perform the GBIF validation algorithm | ||
| if compute: | ||
| with st.spinner("Querying GBIF API. Hold on.", show_time=True): | ||
| api_validation(temp_db, temp_folder) | ||
| with col1: | ||
| user = st.text_input(label="GBIF username") | ||
| with col2: | ||
| pwd = st.text_input(label="GBIF password", type="password") | ||
| with col3: | ||
| email = st.text_input(label="Your mail adress") | ||
| # option to initialize the download | ||
| if user and pwd and email and not download_initialized(download_pickle): | ||
| validate = st.button( | ||
| label="Initialize download of validation data", | ||
| type="primary", | ||
| disabled=False, | ||
| ) | ||
| else: | ||
| validate = st.button( | ||
| label="Initialize download of validation data", | ||
| type="primary", | ||
| disabled=True, | ||
| ) | ||
| if validate: | ||
| with st.spinner("Initializing download. Hold on.", show_time=True): | ||
| pickle_data = initialize_download( | ||
| temp_db, temp_folder, user, pwd, email | ||
| ) | ||
| # pickle the download data after the download has been requested | ||
| dump(pickle_data, download_pickle) | ||
| # rerun to clear all fields and disable the button | ||
| st.rerun() | ||
| # display download info | ||
| status, download_key = download_info(download_pickle) | ||
| if status == "SUCCEEDED": | ||
| download_process = st.button( | ||
| label="Download and process data", disabled=False, type="primary" | ||
| ) | ||
| else: | ||
| download_process = st.button( | ||
| label="Download and process data", disabled=True | ||
| ) | ||
| st_autorefresh(interval=20_000) | ||
| # download and process the data, remove download files afterwards | ||
| if download_process: | ||
| # download the data and pass to duckdb | ||
| status = download_gbif_data(download_key, temp_folder) | ||
| # compute the actual validaton | ||
| compute_validation(status, temp_db, temp_folder, download_pickle) | ||
| # ingest the data into the sequence metadata | ||
@@ -650,2 +847,12 @@ add_validation_to_metadata( | ||
| st.divider() | ||
| reset = st.button(label="Reset species distributions", type="secondary") | ||
| # reset wkt data | ||
| if reset: | ||
| temp_db.unlink() | ||
| download_pickle.unlink() | ||
| st.rerun() | ||
| # if validated data exists | ||
@@ -652,0 +859,0 @@ if gbif_validated: |
+1
-1
| Metadata-Version: 2.4 | ||
| Name: apscale | ||
| Version: 4.1.6 | ||
| Version: 4.2.0 | ||
| Summary: Advanced Pipeline for Simple yet Comprehensive AnaLysEs of DNA metabarcoding data | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/DominikBuchner/apscale |
+1
-1
@@ -8,3 +8,3 @@ import setuptools | ||
| name="apscale", | ||
| version="4.1.6", | ||
| version="4.2.0", | ||
| author="Dominik Buchner", | ||
@@ -11,0 +11,0 @@ author_email="dominik.buchner524@googlemail.com", |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
267127
2.77%5788
3.26%