cuallee

Python library for data validation on DataFrame APIs including Snowflake/Snowpark, Apache/PySpark and Pandas/DataFrame.

Version: 0.15.2 (PyPI)
Maintainers: 2


Meaning good in Aztec (Nahuatl), pronounced: QUAL-E

This library provides an intuitive API to describe data quality checks. It was initially built for PySpark dataframes (v3.3.0) and has since been extended to pandas, snowpark, duckdb, daft and more. It is a pure-Python replacement for the pydeequ framework.

I gave up on deequ after extensive use: its API is not user-friendly, the Python callback servers add costs on our compute clusters, and it lacks support for the newest versions of PySpark.

As a result, cuallee was born.

This implementation goes hand in hand with the latest PySpark API and uses the Observation API to collect metrics at a lower computational cost. When benchmarked against pydeequ, cuallee uses fewer than about 3k Java classes underneath and remarkably less memory.

Support

cuallee is a truly dataframe-agnostic data quality framework.

| Provider | API | Versions |
|---|---|---|
| snowflake | snowpark | 1.11.1, 1.4.0 |
| databricks | pyspark & spark-connect | 3.5.x, 3.4.0, 3.3.x, 3.2.x |
| bigquery | bigquery | 3.4.1 |
| pandas | pandas | 2.0.2, 1.5.x, 1.4.x |
| duckdb | duckdb | 1.0.0, 0.10.2, 0.9.2, 0.8.0 |
| polars | polars | 1.0.0, 0.19.6 |
| daft | daft | 0.2.24, 0.2.19 |

Logos are trademarks of their own brands.

Install

pip install cuallee

Checks

The most common checks for data integrity validation are completeness and uniqueness. An example of these dimensions is shown below:

from cuallee import Check, CheckLevel # WARN:0, ERR: 1

# Nulls on column Id
check = Check(CheckLevel.WARNING, "Completeness")
(
    check
    .is_complete("id")
    .is_unique("id")
    .validate(df)
).show() # Returns a pyspark.sql.DataFrame
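
To make the two dimensions concrete, here is a minimal pure-Python sketch of what completeness and uniqueness measure on a single column. It does not use cuallee: the helper names `completeness_violations` and `uniqueness_violations` are hypothetical, and how cuallee counts violations internally is an assumption not confirmed by this README.

```python
def completeness_violations(values):
    """Count null (None) entries in a column."""
    return sum(1 for v in values if v is None)


def uniqueness_violations(values):
    """Count non-null rows that repeat an already seen value."""
    non_null = [v for v in values if v is not None]
    return len(non_null) - len(set(non_null))


ids = [1, 2, 2, None, 5]
print(completeness_violations(ids))  # 1
print(uniqueness_violations(ids))    # 1
```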

[!IMPORTANT] A new version of the validate output is currently under construction.

Dates

Perhaps one of the most useful features of cuallee is its extensive set of checks for Date and Timestamp values, including validation of ranges, set operations like inclusion, and even a verification of date continuity using the is_daily check function.

# Dates between 2022-01-01 and 2022-01-10
check = Check(CheckLevel.WARNING, "CheckIsBetweenDates")
df = spark.sql(
    """
    SELECT
        explode(
            sequence(
                to_date('2022-01-01'),
                to_date('2022-01-10'),
                interval 1 day)) as date
    """)
assert (
    check.is_between("date", ("2022-01-01", "2022-01-10"))
    .validate(df)
    .first()
    .status == "PASS"
)
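
The idea of date continuity behind is_daily can be sketched in plain Python. This is an illustration of the concept only, not cuallee's implementation: `missing_days` is a hypothetical helper, and the default schedule follows PySpark's dayofweek numbering (1 = Sunday, 7 = Saturday), so (2, 3, 4, 5, 6) means Monday to Friday.

```python
from datetime import date, timedelta


def missing_days(dates, schedule=(2, 3, 4, 5, 6)):
    """Return scheduled days between min(dates) and max(dates) that are absent."""
    present = set(dates)
    day, last = min(present), max(present)
    missing = []
    while day <= last:
        # Convert ISO numbering (Mon=1..Sun=7) to PySpark numbering (Sun=1..Sat=7)
        spark_dow = day.isoweekday() % 7 + 1
        if spark_dow in schedule and day not in present:
            missing.append(day)
        day += timedelta(days=1)
    return missing


# Monday 2022-01-03 to Monday 2022-01-10, with Wednesday 2022-01-05 left out
observed = [date(2022, 1, 3), date(2022, 1, 4), date(2022, 1, 6),
            date(2022, 1, 7), date(2022, 1, 10)]
print(missing_days(observed))  # [datetime.date(2022, 1, 5)]
```

The weekend days in the range are not reported because they are outside the schedule.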

Membership

Another common test is validating a column against a list of allowed values, one of the many integrity checks required for better-quality data.

df = spark.createDataFrame([[1, 10], [2, 15], [3, 17]], ["ID", "value"])
check = Check(CheckLevel.WARNING, "is_contained_in_number_test")
check.is_contained_in("value", (10, 15, 20, 25)).validate(df)

Regular Expressions

When it comes to flexible matching, regular expressions always come to the rescue. cuallee uses regular expressions to validate that fields of type String conform to specific patterns.

df = spark.createDataFrame([[1, "is_blue"], [2, "has_hat"], [3, "is_smart"]], ["ID", "desc"])
check = Check(CheckLevel.WARNING, "has_pattern_test")
check.has_pattern("desc", r"^is.*t$") # matches only is_smart, i.e. 33% of rows
check.validate(df).first().status == "FAIL"
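
The 33% figure in the comment above can be reproduced with the standard `re` module. This is a sketch of the arithmetic only: `pattern_pass_rate` is a hypothetical helper, and the regex engine used by each cuallee backend may differ from Python's.

```python
import re


def pattern_pass_rate(values, pattern):
    """Fraction of values in which the pattern is found."""
    regex = re.compile(pattern)
    matches = sum(1 for v in values if regex.search(v))
    return matches / len(values)


descriptions = ["is_blue", "has_hat", "is_smart"]
rate = pattern_pass_rate(descriptions, r"^is.*t$")
print(round(rate, 2))  # 0.33
```

Only one of three rows matches, which falls short of a requirement that every row matches, so the FAIL status is expected.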

Anomalies

Statistical tests are a great aid for detecting anomalies in data. Here is an example that will PASS only when at least 40% of the data is inside the interquartile range:

df = spark.range(10)
check = Check(CheckLevel.WARNING, "IQR_Test")
check.is_inside_interquartile_range("id", pct=0.4)
check.validate(df).first().status == "PASS"

+---+-------------------+-----+-------+------+-----------------------------+-----+----+----------+---------+--------------+------+
|id |timestamp          |check|level  |column|rule                         |value|rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+-----+-------+------+-----------------------------+-----+----+----------+---------+--------------+------+
|1  |2022-10-19 00:09:39|IQR  |WARNING|id    |is_inside_interquartile_range|10000|10  |4         |0.6      |0.4           |PASS  |
+---+-------------------+-----+-------+------+-----------------------------+-----+----+----------+---------+--------------+------+
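
The pass_rate of 0.6 in the table can be reproduced with a small standard-library sketch. It assumes the quartiles are computed with the default (exclusive) method of `statistics.quantiles`. cuallee and Spark may compute quartiles differently, so treat this as an illustration of the idea, not of the library's internals. `iqr_pass_rate` is a hypothetical helper.

```python
from statistics import quantiles


def iqr_pass_rate(values):
    """Fraction of values that satisfy Q1 <= value <= Q3."""
    q1, _, q3 = quantiles(values, n=4)  # default 'exclusive' method
    inside = sum(1 for v in values if q1 <= v <= q3)
    return inside / len(values)


rate = iqr_pass_rate(list(range(10)))  # same values as spark.range(10)
print(rate)         # 0.6
print(rate >= 0.4)  # True
```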

Workflows (Process Mining)

Besides the common, everyday checks, cuallee offers out-of-the-box checks for real-life scenarios. For example, suppose that you are working in a SalesForce or SAP environment. Very likely your business processes will be driven by a lifecycle:

  • Order-To-Cash
  • Request-To-Pay
  • Inventory-Logistics-Delivery
  • Others

In this scenario, cuallee can verify that the events registered over time follow an expected sequence, as in the example below:

import pandas as pd
import pyspark.sql.functions as F
from cuallee import Check, CheckLevel

data = pd.DataFrame({
   "name":["herminio", "herminio", "virginie", "virginie"],
   "event":["new","active", "new", "active"],
   "date": ["2022-01-01", "2022-01-02", "2022-01-03", "2022-02-04"]}
   )
df = spark.createDataFrame(data).withColumn("date", F.to_date("date"))

# Cuallee Process Mining
# Test the edges (transitions) of the workflow
check = Check(CheckLevel.WARNING, "WorkflowViolations")

# Validate that 50% of data goes from new => active
check.has_workflow("name", "event", "date", [("new", "active")], pct=0.5)
check.validate(df).show(truncate=False)

+---+-------------------+------------------+-------+-------------------------+------------+--------------------+----+----------+---------+--------------+------+
|id |timestamp          |check             |level  |column                   |rule        |value               |rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+------------------+-------+-------------------------+------------+--------------------+----+----------+---------+--------------+------+
|1  |2022-11-07 23:08:50|WorkflowViolations|WARNING|('name', 'event', 'date')|has_workflow|(('new', 'active'),)|4   |2.0       |0.5      |0.5           |PASS  |
+---+-------------------+------------------+-------+-------------------------+------------+--------------------+----+----------+---------+--------------+------+
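
One way to read the result above: each row is paired with the next event of the same group, and a row passes when that pair is an allowed edge. The sketch below follows that reading in plain Python. It is an assumption inferred from the 4 rows and 2 violations shown in the table, not a description of cuallee's actual implementation, and `workflow_pass_rate` is a hypothetical helper.

```python
from collections import defaultdict


def workflow_pass_rate(rows, allowed_edges):
    """rows are (group, event, order) tuples; returns the share of rows whose
    (event, next_event) pair within the group is an allowed edge."""
    by_group = defaultdict(list)
    for group, event, order in rows:
        by_group[group].append((order, event))
    allowed = set(allowed_edges)
    passed = 0
    for events in by_group.values():
        events.sort()  # ISO date strings sort chronologically
        names = [event for _, event in events]
        # Pair each event with its successor; the last event pairs with None
        for current, following in zip(names, names[1:] + [None]):
            if (current, following) in allowed:
                passed += 1
    return passed / len(rows)


rows = [
    ("herminio", "new", "2022-01-01"),
    ("herminio", "active", "2022-01-02"),
    ("virginie", "new", "2022-01-03"),
    ("virginie", "active", "2022-02-04"),
]
print(workflow_pass_rate(rows, [("new", "active")]))  # 0.5
```

Under this reading, the two "active" rows have no successor and count as violations, which gives the pass_rate of 0.5.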

Assertions

[2024-09-28] New feature! Return a simple true|false as a unified result for your check.

import pandas as pd
from cuallee import Check
df = pd.DataFrame({"X":[1,2,3]})
# .ok(dataframe) method of a check will call validate and then verify that all rules are PASS
assert Check().is_complete("X").ok(df)
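
Conceptually, the unified result is the conjunction of every rule's status. The sketch below shows that reduction over plain dictionaries. The `>=` comparison between pass_rate and pass_threshold is inferred from the result tables in this README (0.6 vs 0.4 and 0.5 vs 0.5 both PASS), and the helper names `status` and `ok` are illustrative only, not cuallee's code.

```python
def status(pass_rate, pass_threshold):
    """A rule passes when its pass rate reaches the threshold."""
    return "PASS" if pass_rate >= pass_threshold else "FAIL"


def ok(results):
    """True only when every rule in the results is PASS."""
    return all(
        status(r["pass_rate"], r["pass_threshold"]) == "PASS" for r in results
    )


results = [
    {"rule": "is_inside_interquartile_range", "pass_rate": 0.6, "pass_threshold": 0.4},
    {"rule": "has_workflow", "pass_rate": 0.5, "pass_threshold": 0.5},
]
print(ok(results))  # True
```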

Controls

Simplify the entire validation of a dataframe in a particular dimension.

import pandas as pd
from cuallee import Control
df = pd.DataFrame({"X":[1,2,3], "Y": [10,20,30]})
# Checks all columns in the dataframe using the is_complete check
Control.completeness(df)

cuallee VS pydeequ

The test folder contains docker containers with the requirements to run the tests. A perftest.py script is also available at the root folder for those interested.

# 1000 rules / # of seconds

cuallee: ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇ 162.00
pydeequ: ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇ 322.00

Catalogue

| Check | Description | DataType |
|---|---|---|
| is_complete | Zero nulls | agnostic |
| is_unique | Zero duplicates | agnostic |
| is_primary_key | Zero duplicates | agnostic |
| are_complete | Zero nulls on group of columns | agnostic |
| are_unique | Composite primary key check | agnostic |
| is_composite_key | Zero duplicates on multiple columns | agnostic |
| is_greater_than | col > x | numeric |
| is_positive | col > 0 | numeric |
| is_negative | col < 0 | numeric |
| is_greater_or_equal_than | col >= x | numeric |
| is_less_than | col < x | numeric |
| is_less_or_equal_than | col <= x | numeric |
| is_equal_than | col == x | numeric |
| is_contained_in | col in [a, b, c, ...] | agnostic |
| is_in | Alias of is_contained_in | agnostic |
| not_contained_in | col not in [a, b, c, ...] | agnostic |
| not_in | Alias of not_contained_in | agnostic |
| is_between | a <= col <= b | numeric, date |
| has_pattern | Matching a pattern defined as a regex | string |
| is_legit | String not null & not empty ^\S$ | string |
| has_min | min(col) == x | numeric |
| has_max | max(col) == x | numeric |
| has_std | σ(col) == x | numeric |
| has_mean | μ(col) == x | numeric |
| has_sum | Σ(col) == x | numeric |
| has_percentile | %(col) == x | numeric |
| has_cardinality | count(distinct(col)) == x | agnostic |
| has_infogain | count(distinct(col)) > 1 | agnostic |
| has_max_by | A utility predicate for max(col_a) == x for max(col_b) | agnostic |
| has_min_by | A utility predicate for min(col_a) == x for min(col_b) | agnostic |
| has_correlation | Finds correlation between 0..1 on corr(col_a, col_b) | numeric |
| has_entropy | Calculates the entropy of a column entropy(col) == x for classification problems | numeric |
| is_inside_interquartile_range | Verifies column values reside inside limits of interquartile range Q1 <= col <= Q3 used on anomalies. | numeric |
| is_in_millions | col >= 1e6 | numeric |
| is_in_billions | col >= 1e9 | numeric |
| is_t_minus_1 | For date fields confirms 1 day ago t-1 | date |
| is_t_minus_2 | For date fields confirms 2 days ago t-2 | date |
| is_t_minus_3 | For date fields confirms 3 days ago t-3 | date |
| is_t_minus_n | For date fields confirms n days ago t-n | date |
| is_today | For date fields confirms day is current date t-0 | date |
| is_yesterday | For date fields confirms 1 day ago t-1 | date |
| is_on_weekday | For date fields confirms day is between Mon-Fri | date |
| is_on_weekend | For date fields confirms day is between Sat-Sun | date |
| is_on_monday | For date fields confirms day is Mon | date |
| is_on_tuesday | For date fields confirms day is Tue | date |
| is_on_wednesday | For date fields confirms day is Wed | date |
| is_on_thursday | For date fields confirms day is Thu | date |
| is_on_friday | For date fields confirms day is Fri | date |
| is_on_saturday | For date fields confirms day is Sat | date |
| is_on_sunday | For date fields confirms day is Sun | date |
| is_on_schedule | For date fields confirms time windows i.e. 9:00 - 17:00 | timestamp |
| is_daily | Verifies daily continuity on date fields. The default schedule is [2,3,4,5,6], which represents Mon-Fri in PySpark; custom schedules can be used for other date continuity | date |
| has_workflow | Adjacency matrix validation on 3-column graph, based on group, event, order columns. | agnostic |
| is_custom | User-defined custom function applied to dataframe for row-based validation. | agnostic |
| satisfies | An open SQL expression builder to construct custom checks | agnostic |
| validate | The ultimate transformation of a check with a dataframe input for validation | agnostic |
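
As an illustration of the statistic behind the has_entropy entry, here is a minimal sketch of Shannon entropy over the values of a column. The base-2 logarithm is an assumption, since this README does not state which base cuallee uses, and `shannon_entropy` is a hypothetical helper.

```python
from collections import Counter
from math import log2


def shannon_entropy(values):
    """Shannon entropy, in bits, of the value distribution of a column."""
    counts = Counter(values)
    total = len(values)
    return -sum((c / total) * log2(c / total) for c in counts.values())


# A perfectly balanced binary label carries one bit of entropy
print(shannon_entropy([0, 1, 0, 1]))  # 1.0
```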

Controls pyspark

| Check | Description | DataType |
|---|---|---|
| completeness | Zero nulls | agnostic |
| information | Zero nulls and cardinality > 1 | agnostic |
| intelligence | Zero nulls, zero empty strings and cardinality > 1 | agnostic |
| percentage_fill | % rows not empty | agnostic |
| percentage_empty | % rows empty | agnostic |

ISO Standard

A new module was incorporated in cuallee==0.4.0 which allows the verification of columns holding ISO (International Organization for Standardization) codes in data frames. Simply access the check.iso interface to add the set of checks, as shown below.

| Check | Description | DataType |
|---|---|---|
| iso_4217 | currency compliant ccy | string |
| iso_3166 | country compliant country | string |

df = spark.createDataFrame([[1, "USD"], [2, "MXN"], [3, "CAD"], [4, "EUR"], [5, "CHF"]], ["id", "ccy"])
check = Check(CheckLevel.WARNING, "ISO Compliant")
check.iso.iso_4217("ccy")
check.validate(df).show()

+---+-------------------+-------------+-------+------+---------------+--------------------+----+----------+---------+--------------+------+
| id|          timestamp|        check|  level|column|           rule|               value|rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+-------------+-------+------+---------------+--------------------+----+----------+---------+--------------+------+
|  1|2023-05-14 18:28:02|ISO Compliant|WARNING|   ccy|is_contained_in|{'BHD', 'CRC', 'M...|   5|       0.0|      1.0|           1.0|  PASS|
+---+-------------------+-------------+-------+------+---------------+--------------------+----+----------+---------+--------------+------+
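
The result row reports the rule as is_contained_in, which suggests the ISO check is a membership test against a set of valid codes. A plain-Python sketch of that idea follows. The set below is a small illustrative subset, not the full ISO 4217 list, and `non_compliant` is a hypothetical helper.

```python
# Illustrative subset only; the full ISO 4217 list is much longer
KNOWN_CURRENCIES = {"USD", "MXN", "CAD", "EUR", "CHF", "BHD", "CRC"}


def non_compliant(values, allowed=KNOWN_CURRENCIES):
    """Return the values that are not in the allowed set of codes."""
    return [v for v in values if v not in allowed]


print(non_compliant(["USD", "MXN", "CAD", "EUR", "CHF"]))  # []
print(non_compliant(["USD", "XYZ"]))                        # ['XYZ']
```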

Snowflake Connection

To establish a connection to your Snowflake account, cuallee relies on the following environment variables being available in your environment:

  • SF_ACCOUNT
  • SF_USER
  • SF_PASSWORD
  • SF_ROLE
  • SF_WAREHOUSE
  • SF_DATABASE
  • SF_SCHEMA
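
Before running checks against Snowflake, it can help to confirm that all of the variables listed above are set. The snippet below is a small pre-flight sketch using only the standard library. `missing_snowflake_vars` is a hypothetical helper and not part of cuallee.

```python
import os

REQUIRED_VARS = ("SF_ACCOUNT", "SF_USER", "SF_PASSWORD", "SF_ROLE",
                 "SF_WAREHOUSE", "SF_DATABASE", "SF_SCHEMA")


def missing_snowflake_vars(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


missing = missing_snowflake_vars()
if missing:
    print("Missing environment variables:", ", ".join(missing))
```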

Spark Connect

Just set the environment variable SPARK_REMOTE to point to your remote session; cuallee will then connect using

spark_connect = SparkSession.builder.remote(os.getenv("SPARK_REMOTE")).getOrCreate()

and convert all checks to select statements, as opposed to Observation API compute instructions.

Databricks Connection

By default cuallee will search for a SparkSession available in the globals, so there is literally no need to call SparkSession.builder. When working in a local environment it will automatically search for an available session, or start one.

DuckDB

To test on duckdb, simply pass your table name to your check, et voilà:

import duckdb
from cuallee import Check, CheckLevel
conn = duckdb.connect(":memory:")
check = Check(CheckLevel.WARNING, "DuckDB", table_name="temp/taxi/*.parquet")
check.is_complete("VendorID")
check.is_complete("tpep_pickup_datetime")
check.validate(conn)

   id            timestamp check    level                column         rule value      rows  violations  pass_rate  pass_threshold status
0   1  2022-10-31 23:15:06  test  WARNING              VendorID  is_complete   N/A  19817583         0.0        1.0             1.0   PASS
1   2  2022-10-31 23:15:06  test  WARNING  tpep_pickup_datetime  is_complete   N/A  19817583         0.0        1.0             1.0   PASS

Roadmap

100% data frame agnostic implementation of data quality checks. Define once, run everywhere.

  • [x] PySpark 3.5.0
  • [x] PySpark 3.4.0
  • [x] PySpark 3.3.0
  • [x] PySpark 3.2.x
  • [x] Snowpark DataFrame
  • [x] Pandas DataFrame
  • [x] DuckDB Tables
  • [x] BigQuery Client
  • [x] Polars DataFrame
  • [*] Dagster Integration
  • [x] Spark Connect
  • [x] Daft
  • [-] PDF Report
  • Metadata check
  • Help us in a discussion?

Expanding the functionality may feel like overkill, because you can most likely connect Spark via its drivers to the DBMS of your choice. Still, in the desire to make it even more user-friendly, we are aiming to make cuallee portable to all the providers above.

Paper

cuallee has been published in the Journal of Open Source Software:

Vazquez et al., (2024). cuallee: A Python package for data quality checks across multiple DataFrame APIs. Journal of Open Source Software, 9(98), 6684, https://doi.org/10.21105/joss.06684

If you use cuallee, please consider citing this work.

License

Apache License 2.0. Free for commercial use, modification, distribution, patent use, and private use. Just preserve the copyright and license.

Made with ❤️ in Utrecht 🇳🇱
Maintained over ⌛ from Ljubljana 🇸🇮
Extended 🚀 by contributions all over the 🌎
