
data-linter
A Python package that allows automatic validation of data as part of a data engineering pipeline. It is designed to read in and validate tabular data against a given schema for the data. The schemas provided adhere to our metadata schemas standards for data. This package can also be used to manage movement of data from a landing area (S3 or local) to a new location based on the result of the validation.
This package uses its own lightweight pandas dataframe operations to run simple validation tests on the columns, based on the datatype and additional tags in the metadata. It uses mojap-arrow-pd-parser for reading data.
pip install data_linter
This package takes a YAML-based config file written by the user (see example below) and validates data in the specified S3 folder path against the specified metadata. If the data conforms to the metadata, it is moved to the specified S3 folder path for the next step in the pipeline (local paths can also be provided). Data that fails any check is moved to a separate location for testing. The package also generates logs so that you can explore issues in more detail.
To run the validation, at its simplest you can use the following:
In Python:
from data_linter.validation import run_validation
config_path = "config.yaml"
run_validation(config_path)
Via command line:
data_linter --config_path config.yaml
land-base-path: s3://testing-bucket/land/ # Where to get the data from
fail-base-path: s3://testing-bucket/fail/ # Where to write the data if failed
pass-base-path: s3://testing-bucket/pass/ # Where to write the data if passed
log-base-path: s3://testing-bucket/logs/ # Where to write logs
compress-data: true # Compress data when moving elsewhere (only applicable from CSV/JSON)
remove-tables-on-pass: true # Delete the tables in land if validation passes
all-must-pass: true # Only move data if all tables have passed
fail-unknown-files:
  exceptions:
    - additional_file.txt
    - another_additional_file.txt
validator-engine-params:
  log_verbosity: 2 # how many samples of incorrect data to include in logs, 0 means all. Pandas validator only.
# Tables to validate
tables:
  table1:
    required: true # Does the table have to exist
    pattern: null # Assumes file is called table1 (same as key)
    metadata: meta_data/table1.json # local path to metadata
    log_verbosity: 5 # overrides the validator-engine-params log verbosity for this table only
    allow-missing-cols: False # allows columns to be in the metadata but absent from the data
  table2:
    required: true
    pattern: ^table2
    metadata: meta_data/table2.json
    row-limit: 10000 # for big tables - only take the first x rows
    allow-unexpected-data: True # allows columns to be present in the data but not in the metadata
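How the pattern and fail-unknown-files settings above might interact can be illustrated with a small standalone sketch. This is not data_linter's implementation: the function name match_files_to_tables is hypothetical, and the rule used for a null pattern (file name without its extension equals the table key) is an assumption based only on the comment in the config.

```python
import re
from pathlib import PurePosixPath


def match_files_to_tables(filenames, tables, exceptions=()):
    """Group file names by the table they match and collect unmatched files.

    Assumption: a null pattern means the file name without its extension
    equals the table key. data_linter's real matching rule may differ.
    """
    matched = {name: [] for name in tables}
    unknown = []
    for filename in filenames:
        hit = False
        for table_name, params in tables.items():
            pattern = params.get("pattern")
            if pattern is None:
                ok = PurePosixPath(filename).stem == table_name
            else:
                ok = re.search(pattern, filename) is not None
            if ok:
                matched[table_name].append(filename)
                hit = True
        # Files listed as exceptions are tolerated even if nothing matches them
        if not hit and filename not in exceptions:
            unknown.append(filename)
    return matched, unknown


tables = {
    "table1": {"pattern": None},
    "table2": {"pattern": "^table2"},
}
files = ["table1.csv", "table2_part0.csv", "additional_file.txt", "stray.csv"]
matched, unknown = match_files_to_tables(
    files, tables, exceptions=["additional_file.txt"]
)
print(matched)
print(unknown)
```

In this sketch only stray.csv ends up in the unknown list, because additional_file.txt is named as an exception.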
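Unexpected data and missing columns
To allow flexibility in what is validated in the data, the parameters allow-unexpected-data and allow-missing-cols have been added. allow-missing-cols permits columns that are in the metadata but absent from the data, and allow-unexpected-data permits columns that are in the data but absent from the metadata.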
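The effect of the two parameters can be sketched as a set comparison between the column names in the metadata and in the data. The function check_columns and its return shape are hypothetical and only illustrate the semantics described in the config comments; they are not part of data_linter.

```python
def check_columns(meta_cols, data_cols,
                  allow_missing_cols=False, allow_unexpected_data=False):
    """Compare metadata columns with data columns (illustrative only)."""
    meta, data = set(meta_cols), set(data_cols)
    missing = sorted(meta - data)      # in the metadata, absent from the data
    unexpected = sorted(data - meta)   # in the data, absent from the metadata
    errors = []
    if missing and not allow_missing_cols:
        errors.append(f"missing columns: {missing}")
    if unexpected and not allow_unexpected_data:
        errors.append(f"unexpected columns: {unexpected}")
    return {
        "valid": not errors,
        "to_validate": sorted(meta & data),  # columns present on both sides
        "errors": errors,
    }


meta_cols = ["id", "name", "dob"]
data_cols = ["id", "name", "extra"]

strict = check_columns(meta_cols, data_cols)
lenient = check_columns(meta_cols, data_cols,
                        allow_missing_cols=True, allow_unexpected_data=True)
print(strict)
print(lenient)
```

With both flags off, the mismatch on dob and extra makes the table invalid; with both on, only the shared columns are considered.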

You can also run the validator as part of a python script, where you might want to dynamically generate your config:
from data_linter.validation import run_validation
base_config = {
    "land-base-path": "s3://my-bucket/land/",
    "fail-base-path": "s3://my-bucket/fail/",
    "pass-base-path": "s3://my-bucket/pass/",
    "log-base-path": "s3://my-bucket/log/",
    "compress-data": False,
    "remove-tables-on-pass": False,
    "all-must-pass": False,
    "tables": {}
}

def get_table_config(table_name):
    # Build the per-table section of the config
    d = {
        "required": False,
        "expect-header": True,
        "metadata": f"metadata/{table_name}.json",
        "pattern": r"^{}\.jsonl$".format(table_name),
        "headers-ignore-case": True,
    }
    return d

for table_name in ["table1", "table2"]:
    base_config["tables"][table_name] = get_table_config(table_name)
run_validation(base_config) # Then watch that log go...
Without all the bells and whistles
If you do not need data_linter to match files to a specified config, log the process, and then move data around based on the outcome of the validation, you can just use the validators themselves:
# Example using the simple pandas validator (without added data_linter features)
import json
from data_linter.validators import PandasValidator
filepath = "tests/data/end_to_end1/land/table1.csv"
table_params = {
"expect-header": True
}
with open("tests/data/end_to_end1/meta_data/table1.json") as f:
    metadata = json.load(f)
pv = PandasValidator(filepath, table_params, metadata)
pv.read_data_and_validate()
pv.valid # True (data in table1.csv is valid against metadata)
pv.response.get_result() # Returns dict of all tests ran against data
# The response of the PandasValidator is an object in itself and has its own functions
pv.response.get_names_of_column_failures() # [], i.e. no cols failed
Data Linter can also work in parallel to trigger multiple validations at once (currently only S3 is supported). An example is below:
In this scenario we use the parallelisation process to initialise the run, split the job across 4 validators, and then run the closedown.
# Simple example running DL with multiple validators (in this case 4)
# [init] -> [validator]x4 -> [closedown]
import yaml
from data_linter import validation
from dataengineeringutils3.s3 import get_filepaths_from_s3_folder
simple_yaml_config = """
land-base-path: s3://land/
fail-base-path: s3://fail/
pass-base-path: s3://pass/
log-base-path: s3://log/
compress-data: false
remove-tables-on-pass: true
all-must-pass: true
# Tables to validate
tables:
  table1:
    required: true
    metadata: tests/data/end_to_end1/meta_data/table1.json
    expect-header: true
  table2:
    required: true
    pattern: ^table2
    metadata: tests/data/end_to_end1/meta_data/table2.json
"""
test_folder = "tests/data/end_to_end1/land/"
config = yaml.safe_load(simple_yaml_config)
# Init stage
validation.para_run_init(4, config)
# Validation stage (run sequentially here, but this can be run in parallel)
for i in range(4):
    validation.para_run_validation(i, config)
# Closedown stage
validation.para_collect_all_status(config)
validation.para_collect_all_logs(config)
More parallelisation examples can be found in the test_simple_examples.py test module.
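The validation stage in the example above is a loop over worker indices, so it can in principle be dispatched concurrently. The following is a minimal sketch of that pattern using only the standard library. The helper run_stage_in_parallel and the stand-in fake_validation are hypothetical; whether para_run_validation can safely be called from several threads in one process is an assumption that is not confirmed here, and in practice each worker may well be a separate job.

```python
from concurrent.futures import ThreadPoolExecutor


def run_stage_in_parallel(run_one, n_workers, config):
    """Call run_one(i, config) for i in 0..n_workers-1 on a thread pool."""
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        futures = [pool.submit(run_one, i, config) for i in range(n_workers)]
        # result() re-raises any exception from a worker, so failures are not hidden
        return [future.result() for future in futures]


def fake_validation(worker_index, config):
    # Stand-in for a real per-worker validation call
    return f"worker {worker_index} validated against {config['land-base-path']}"


results = run_stage_in_parallel(
    fake_validation, 4, {"land-base-path": "s3://land/"}
)
for line in results:
    print(line)
```

With data_linter, run_one would be the per-worker validation call, bracketed by the init stage before it and the closedown stage after it, exactly as in the sequential example.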
The Pandas Validator is the default validator used by data_linter as of the version 5 release.
Timestamps are always a pain to deal with, especially when using different file types. The Pandas Validator tries to stay true to the file type in the tests it runs.
If the file type stores date/timestamp information as a string (i.e. CSV and JSONL), then the Pandas Validator will read in the timestamp/date columns as strings. It will then apply validation tests against those columns, checking whether the string representation of the dates in the data is a valid date. For timestamp and date types these tests assume the ISO standard string representations %Y-%m-%d %H:%M:%S and %Y-%m-%d. If your timestamp/date types are coming in as strings that do not conform to the ISO standard format, then you can give your column a datetime_format property in the metadata that specifies the expected format, e.g.
...
"columns": [
{
"name": "date_in_uk",
"type": "date64",
"datetime_format": "%d/%m/%Y"
},
...
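The string check described above can be approximated with the standard library's strptime, which uses the same format codes as datetime_format. This is a sketch of the idea, not the Pandas Validator's actual code; the function name invalid_values and the constants are hypothetical, and all values are assumed to be strings.

```python
from datetime import datetime

# ISO defaults named in the text above
DEFAULT_DATE_FORMAT = "%Y-%m-%d"
DEFAULT_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def invalid_values(values, datetime_format=DEFAULT_DATE_FORMAT):
    """Return the strings that cannot be parsed with datetime_format."""
    bad = []
    for value in values:
        try:
            datetime.strptime(value, datetime_format)
        except ValueError:
            bad.append(value)
    return bad


iso_failures = invalid_values(["2021-03-31", "31/03/2021"])
uk_failures = invalid_values(
    ["31/03/2021", "2021-03-31", "31/02/2021"], "%d/%m/%Y"
)
print(iso_failures)
print(uk_failures)
```

Note that a string in the right shape can still fail if it is not a real calendar date, as with 31/02/2021 here.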
Often you might receive data exported from a system that holds a value as a date but writes it out in a timestamp format. In this scenario you would expect your dates (in a string timestamp format) to always have a time component of 00:00:00. You can also use data_linter to validate this by specifying the datetime_format of your column as the expected timestamp format while still specifying that the data type is a date, e.g.
...
"columns": [
{
"name": "date_in_uk",
"type": "date64",
"datetime_format": "%d/%m/%Y 00:00:00"
},
...
In the above, data_linter will first attempt to parse the column with the specified datetime_format and then, because the column type is date, check that it is truly a date (i.e. that it does not have a time component).
If the file_format is parquet, then timestamps are encoded in the file type and they are just read in as is. Currently data_linter doesn't support minimum and maximum tests for timestamps/dates, and it does not have tests for time types.
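The two-step check (parse with the given format, then confirm there is no time component) can be sketched with the standard library as follows. The function is_pure_date is hypothetical and only illustrates the logic; it is not data_linter's implementation.

```python
from datetime import datetime, time


def is_pure_date(value, datetime_format):
    """Parse value with datetime_format, then require a midnight time component."""
    try:
        parsed = datetime.strptime(value, datetime_format)
    except ValueError:
        return False  # step 1 failed: the string does not match the format
    return parsed.time() == time(0, 0)  # step 2: no time component


literal_fmt = "%d/%m/%Y 00:00:00"   # time written as literal zeros
general_fmt = "%d/%m/%Y %H:%M:%S"   # time parsed, then checked

checks = [
    is_pure_date("31/03/2021 00:00:00", literal_fmt),
    is_pure_date("31/03/2021 09:30:00", literal_fmt),
    is_pure_date("31/03/2021 00:00:00", general_fmt),
    is_pure_date("31/03/2021 09:30:00", general_fmt),
]
print(checks)
```

With the literal-zero format, a non-midnight value already fails at the parsing step; with the general format it parses and is then rejected by the midnight check.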
How logic works

We have tests that run on the current state of the poetry.lock file (i.e. the current dependencies). We also run tests based on the most up-to-date dependencies allowed in pyproject.toml. This allows us to see if there will be any issues when updating dependencies. These can be run locally in the tests folder.
When updating this package, make sure to change the version number in pyproject.toml and describe the change in CHANGELOG.md.
If you have changed any dependencies in pyproject.toml, run poetry update to update poetry.lock.
Once you have created a release in GitHub, to publish the latest version to PyPI, run:
poetry build
poetry publish -u <username>
Here, you should substitute <username> with your PyPI username. In order to publish to PyPI, you must be an owner of the project.