Python library providing a unified cloud storage API for reading and writing Parquet and Deltalake files from/to the main cloud providers' object storage using the Arrow format
April 2024
Alfredo Lorie Bernardo, Ignacio Rodriguez Sanchez
version 0.8.1
Python library to read and write Parquet and Deltalake files from the main cloud providers' object stores and the local filesystem
Cloud Arrow
is a Python library that provides read and write capabilities for Parquet and Deltalake files
(without relying on Spark) from/to the main cloud providers' object storage services (Azure ADLS Gen2, Google GCSFS, AWS S3 -> coming soon)
and the local filesystem.
The main goal is to provide a single, unified API for reading and writing files from Python programs.
The library is available on PyPI and distributed under the GNU license.
Under the hood, Cloud Arrow relies on Apache Arrow and fsspec-compatible filesystem implementations to connect to and interact with each cloud provider's object storage.
GitHub: https://github.com/a24lorie/Cloud-Arrow
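To give a sense of what this pattern looks like in practice, the snippet below is an illustrative sketch of the general pyarrow.dataset + fsspec combination the library builds on; it is not Cloud Arrow's actual internals, and the adlfs credential values are placeholders.

# Illustrative sketch only: the general pyarrow.dataset + fsspec pattern that
# libraries like Cloud Arrow build on. All credential values are placeholders.
import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem  # fsspec-compatible ADLS Gen2 filesystem

fs = AzureBlobFileSystem(
    account_name="storage-account",     # placeholder
    tenant_id="AzureTenantId",          # placeholder
    client_id="AzureClientId",          # placeholder
    client_secret="AzureClientSecret",  # placeholder
)

# Open a parquet dataset stored in a container and materialize it as an Arrow table
dataset = ds.dataset("container/path_to_parquet", format="parquet", filesystem=fs)
table = dataset.to_table()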
The Cloud Arrow library provides a unified API to read and write Parquet files and Delta Lake tables from the main cloud providers' object storage and the local filesystem. The current filesystem implementations are:
AbstractStorage
├─── ADLSStorage
├─── GCSFSStorage
├─── LocalFileSystemStorage
├─── DBFSStorage (Future *)
└─── S3Storage (Future *)
When using the Cloud Arrow library, the first step is to create an instance of one of the implementations above. Let's look at some examples.
To read and write from Azure ADLS Gen2, create an instance of the ADLSStorage class, providing the following arguments:
*Currently, only the client-secret authentication mechanism is supported; we plan to add support for other authentication mechanisms in the future.
from cloud_arrow.adls import ADLSStorage
object_storage = ADLSStorage(
    tenant_id="AzureTenantId",
client_id="AzureClientId",
client_secret="AzureClientSecret",
account_name="storage-account",
container="container"
)
To read and write from Google GCSFS, create an instance of the GCSFSStorage class, providing the following arguments:
from cloud_arrow.gcsfs import GCSFSStorage
object_storage = GCSFSStorage(
project="GCSFSProjectId",
access="read_only",
bucket="GCSFSBucket",
token="/path/google-secret.json",
default_location=""
)
In the previous example, the path to a google-secret.json credentials file was used to authenticate. You can create the google-secret.json file here.
You don't need to fill in the JSON by hand; the example below shows what the end result should look like.
The service account should be able to read, write, and delete objects from at least one bucket.
{
"type": "service_account",
"project_id": "$YOUR_GOOGLE_PROJECT_ID",
"private_key_id": "...",
"private_key": "...",
"client_email": "...",
"client_id": "...",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://accounts.google.com/o/oauth2/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": ""
}
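As a quick, optional sanity check outside of Cloud Arrow, the same credentials file can be exercised directly with gcsfs (the fsspec implementation for Google Cloud Storage); the project, token path, and bucket name below are the placeholders from the example above.

# Optional sanity check using gcsfs directly; values are placeholders.
import gcsfs

fs = gcsfs.GCSFileSystem(project="GCSFSProjectId", token="/path/google-secret.json")
print(fs.ls("GCSFSBucket"))  # lists objects if the service account has access to the bucket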
To read and write from the local filesystem, create an instance of the LocalFileSystemStorage class:
from cloud_arrow.local import LocalFileSystemStorage
object_storage = LocalFileSystemStorage()
To read files from ADLS Gen2, GCSFS, or the local filesystem, use the configured object_storage instance. The Cloud Arrow library provides a unified and consistent experience across all the filesystem implementations for reading Parquet files or Delta Lake tables. The following methods are available for reading; let's take a look at some examples.
read_batches(file_format: str, path: str, partitioning: str, filters=None, batch_size: int) -> pa.RecordBatch
batches_parquet = object_storage.read_batches(file_format="parquet", path="path_to_parquet", batch_size=2000)
batches_delta = object_storage.read_batches(file_format="deltalake", path="path_to_deltalake", batch_size=2000)

# Count rows across the returned record batches
count_parquet = 0
count_delta = 0
for batch_parquet in batches_parquet:
    count_parquet += batch_parquet.num_rows
for batch_delta in batches_delta:
    count_delta += batch_delta.num_rows

print(count_parquet)
print(count_delta)
read_to_arrow_table(file_format: str, path: str, partitioning: str, filters=None) -> pa.Table
result_parquet = object_storage.read_to_arrow_table(file_format="parquet", path="path_to_parquet")
result_delta = object_storage.read_to_arrow_table(file_format="deltalake", path="path_to_deltalake")
print(result_parquet.num_rows)
print(result_delta.num_rows)
read_to_pandas(file_format: str, path: str, partitioning: str, filters=None) -> DataFrame
result_parquet = object_storage.read_to_pandas(file_format="parquet", path="path_to_parquet")
result_delta = object_storage.read_to_pandas(file_format="deltalake", path="path_to_deltalake")
print(result_parquet.info())
print(result_delta.info())
By specifying the filters argument to any of the methods described previously, the source data can be filtered during reading. Filters can be applied to any column of the source data. If the source data is partitioned and a filter expression contains a partition column, the files within the partitions that do not match the filtering condition are skipped, resulting in better performance; this is known as partition pruning.
filters: (Optional) - pyarrow.compute.Expression or List[Tuple] or List[List[Tuple]]. See the pyarrow filter documentation for more details.
import pyarrow.dataset as ds

batches_parquet_batch = object_storage.read_batches(
    file_format="parquet",
    path="path_to_parquet",
    filters=(ds.field("FieldName") == 0),
    batch_size=2000)
batches_delta_batch = object_storage.read_batches(
    file_format="deltalake",
    path="path_to_deltalake",
    filters=(ds.field("FieldName") == 0),
    batch_size=2000)

# Count rows across the filtered record batches
count_parquet = 0
count_delta = 0
for batch_parquet in batches_parquet_batch:
    count_parquet += batch_parquet.num_rows
for batch_delta in batches_delta_batch:
    count_delta += batch_delta.num_rows

print(count_parquet)
print(count_delta)
# Read parquet with a filtering expression to an Arrow Table
result_parquet_arr_table = object_storage.read_to_arrow_table(
    file_format="parquet",
    path="path_to_parquet",
    filters=(ds.field("FieldName") == 0))

# Read delta lake with a filtering expression to an Arrow Table
result_delta_arr_table = object_storage.read_to_arrow_table(
    file_format="deltalake",
    path="path_to_deltalake",
    filters=(ds.field("FieldName") == 0))

print(result_parquet_arr_table.num_rows)
print(result_delta_arr_table.num_rows)

# Read parquet with a filtering expression to a Pandas DataFrame
result_parquet_df = object_storage.read_to_pandas(
    file_format="parquet",
    path="path_to_parquet",
    filters=(ds.field("FieldName") == 0))

# Read delta lake with a filtering expression to a Pandas DataFrame
result_delta_df = object_storage.read_to_pandas(
    file_format="deltalake",
    path="path_to_deltalake",
    filters=(ds.field("FieldName") == 0))

print(result_parquet_df.info())
print(result_delta_df.info())
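The list-of-tuples filter form mentioned above can be used instead of a pyarrow expression. Below is a minimal sketch, assuming these filters are passed through to pyarrow unchanged; the column and path names are the same placeholders as above.

# Same filter as before, written in the List[Tuple] form; placeholders reused.
result_filtered_df = object_storage.read_to_pandas(
    file_format="parquet",
    path="path_to_parquet",
    filters=[("FieldName", "==", 0)])
print(result_filtered_df.info())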
To write files to ADLS Gen2, GCSFS, or the local filesystem, use the configured object_storage instance. The Cloud Arrow library provides a unified and consistent experience across all the filesystem implementations for writing Parquet files or Delta Lake tables. The following method is available for writing:
WriteOptions
|   * partitions: List of the names of the columns used to split the dataset.
|   * compression_codec: Allows specifying the compression codec (None, snappy, lz4, brotli, gzip, zstd)
├─── ParquetWriteOptions
|       * existing_data_behavior: Can be one of ['error', 'overwrite_or_ignore', 'delete_matching']
└─── DeltaLakeWriteOptions
        * existing_data_behavior: Can be one of ['error', 'append', 'overwrite', 'ignore']
Let's take a look at some examples
# Example writing parquet
object_storage.write(
data=table,
file_format="parquet",
path=path,
write_options=ParquetWriteOptions(
compression_codec="None",
existing_data_behavior="overwrite_or_ignore") # 'error', 'overwrite_or_ignore', 'delete_matching'
)
# Example writing deltalake
object_storage.write(
data=table,
file_format="deltalake",
path=path,
write_options=DeltaLakeWriteOptions(
compression_codec="None",
existing_data_behavior="overwrite") # 'error', 'append', 'overwrite', 'ignore'
)
# Example writing parquet
object_storage.write(
data=table,
file_format="parquet",
path=path,
write_options=ParquetWriteOptions(
partitions=["col1", "col2"],
compression_codec="snappy",
existing_data_behavior="overwrite_or_ignore") # 'error', 'overwrite_or_ignore', 'delete_matching'
)
# Example writing deltalake
object_storage.write(
data=table,
file_format="deltalake",
path=path,
write_options=DeltaLakeWriteOptions(
partitions=["col1", "col2"],
compression_codec="snappy",
existing_data_behavior="overwrite") # 'error', 'append', 'overwrite', 'ignore'
)
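Once a dataset has been written partitioned by a column, reading it back with a filter on that column benefits from the partition pruning described earlier. The following is a sketch under the assumption that the partitioned parquet layout can be read with hive-style partitioning and that "A" is one of the values of col1.

# Read back the partitioned dataset, filtering on a partition column so that
# non-matching partition directories are skipped (partition pruning).
# The "hive" partitioning value and the "A" filter value are assumptions.
import pyarrow.dataset as ds

pruned = object_storage.read_to_arrow_table(
    file_format="parquet",
    path=path,
    partitioning="hive",
    filters=(ds.field("col1") == "A"))
print(pruned.num_rows)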
import numpy
import pyarrow as pa

schema = pa.schema([
("Pregnancies", pa.int64()),
("Glucose", pa.int64()),
("BloodPressure", pa.int64()),
("SkinThickness", pa.int64()),
("Insulin", pa.int64()),
("BMI", pa.float64()),
("DiabetesPedigreeFunction", pa.float64()),
("Age", pa.int64()),
("Outcome", pa.int64())]
)
pregnancies = pa.array(numpy.random.randint(low=0, high=17, size=5))
glucose = pa.array(numpy.random.randint(low=0, high=199, size=5))
blood_pressure = pa.array(numpy.random.randint(low=0, high=122, size=5))
skin_thickness = pa.array(numpy.random.randint(low=0, high=99, size=5))
insulin = pa.array(numpy.random.randint(low=0, high=846, size=5))
bmi = pa.array(numpy.random.uniform(0.0, 67.1, size=5))
diabetes_pedigree_function = pa.array(numpy.random.uniform(0.08, 2.42, size=5))
age = pa.array(numpy.random.randint(low=21, high=81, size=5))
outcome = pa.array(numpy.random.randint(low=0, high=1, size=5))
def iter_record_batches():
for i in range(5):
yield pa.RecordBatch.from_arrays([
pregnancies, glucose, blood_pressure, skin_thickness,
insulin, bmi, diabetes_pedigree_function, age, outcome
], schema=schema)
batch_reader = pa.RecordBatchReader.from_batches(schema, iter_record_batches())
# Example writing parquet in batches
object_storage.write(
data=batch_reader,
file_format="parquet",
path=path,
write_options=ParquetWriteOptions(
partitions=[],
compression_codec="None",
existing_data_behavior="overwrite_or_ignore") # 'error', 'overwrite_or_ignore', 'delete_matching'
)
# Example writing deltalake in batches
# Note: a RecordBatchReader can only be consumed once, so recreate it before this second write
batch_reader = pa.RecordBatchReader.from_batches(schema, iter_record_batches())
object_storage.write(
    data=batch_reader,
    file_format="deltalake",
    path=path,
    write_options=DeltaLakeWriteOptions(
        partitions=[],
        compression_codec="None",
        existing_data_behavior="overwrite")  # 'error', 'append', 'overwrite', 'ignore'
)
The same schema, random arrays, and iter_record_batches generator from the previous example are reused below. Recreate the batch reader (a RecordBatchReader can only be consumed once) and write the batches with snappy compression:
batch_reader = pa.RecordBatchReader.from_batches(schema, iter_record_batches())
# Example writing parquet in batches
object_storage.write(
data=batch_reader,
file_format="parquet",
path=path,
write_options=ParquetWriteOptions(
partitions=[],
compression_codec="snappy",
existing_data_behavior="overwrite_or_ignore") # 'error', 'overwrite_or_ignore', 'delete_matching'
)
# Example writing deltalake in batches
# Note: recreate the batch reader again before this second write, since it was consumed above
batch_reader = pa.RecordBatchReader.from_batches(schema, iter_record_batches())
object_storage.write(
    data=batch_reader,
    file_format="deltalake",
    path=path,
    write_options=DeltaLakeWriteOptions(
        partitions=[],
        compression_codec="snappy",
        existing_data_behavior="overwrite")  # 'error', 'append', 'overwrite', 'ignore'
)
The Cloud Arrow library welcomes contributions from all developers, regardless of experience or programming background. If you find a bug, send a pull request and we'll discuss it. If you are not familiar with the term "pull request", we recommend reading an introductory article on the topic for a better understanding. We value kind communication and building a productive, friendly environment for maximum collaboration and fun.