RepartiPy

RepartiPy helps you handle PySpark DataFrame partition sizes precisely.

Possible Use Cases

  • Repartition your DataFrame precisely without knowing its total size in advance (i.e. Dynamic Repartition)
  • Estimate your DataFrame size more accurately

Why RepartiPy

Although Spark's built-in SizeEstimator can be used to estimate a DataFrame size, it is sometimes inaccurate. RepartiPy works around this by using Spark's execution plan statistics instead. It offers two approaches:

  • repartipy.SizeEstimator
  • repartipy.SamplingSizeEstimator

repartipy.SizeEstimator

Recommended when your executor memory is large enough to cache the whole DataFrame. SizeEstimator simply caches the whole DataFrame in memory and extracts the execution plan statistics.
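To see what "execution plan statistics" refers to, here is a rough sketch of the underlying idea (not RepartiPy's actual implementation), using only PySpark's public explain API on Spark 3.0+:

df.cache().count()        # materialize the cache so the plan statistics reflect the in-memory size
df.explain(mode="cost")   # prints the optimized logical plan, including Statistics(sizeInBytes=...)
df.unpersist()            # release the cache when you are done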

repartipy.SamplingSizeEstimator

Recommended when your executor memory is NOT large enough to cache the whole DataFrame. SamplingSizeEstimator uses a 'disk write and re-read (HDFS)' approach behind the scenes for two reasons:

  1. To avoid reading the source (e.g. S3) twice, which can be inefficient -> better performance
  2. To reduce partition skew by deliberately re-reading the data (leveraging spark.sql.files.maxPartitionBytes) -> better sampling results

Therefore, your cluster must have HDFS configured and enough disk space.

Due to sampling, the result may be less accurate than SizeEstimator's. If you need more accurate results, tune the sample_count option accordingly. This approach is also slower than SizeEstimator, as it requires disk I/O and additional logic.
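If you are unsure which sample_count to use, one option is to compare estimates at a few values before settling on one. The sketch below uses only the SamplingSizeEstimator API shown in the How To Use section; larger values generally mean more I/O but better accuracy:

import repartipy

# Compare size estimates for a few sample_count values (hypothetical tuning loop).
for sample_count in (5, 10, 20):
    with repartipy.SamplingSizeEstimator(spark=spark, df=df, sample_count=sample_count) as se:
        print(sample_count, se.estimate())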

How To Use

Setup

pip install repartipy

Prerequisite

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
input_data = [
        (1, "Seoul"),
        (2, "Busan"),
    ]
df = spark.createDataFrame(data=input_data, schema=["id", "location"])

get_desired_partition_count()

Calculate ideal number of partitions for a DataFrame

The estimator suggests desired_partition_count so that each partition holds roughly desired_partition_size_in_bytes (default: 1 GiB) after repartitioning. reproduce() returns exactly the same df, but rebuilt internally by the estimator for better performance: SizeEstimator reproduces df from memory (cache), while SamplingSizeEstimator reproduces it from disk (HDFS).

with SizeEstimator
import repartipy

one_gib_in_bytes = 1073741824

with repartipy.SizeEstimator(spark=spark, df=df) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)
    
    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
    # or 
    se.reproduce().coalesce(desired_partition_count).write.save("your/write/path")

with SamplingSizeEstimator
import repartipy
    
one_gib_in_bytes = 1073741824

with repartipy.SamplingSizeEstimator(spark=spark, df=df, sample_count=10) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)
    
    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
    # or 
    se.reproduce().coalesce(desired_partition_count).write.save("your/write/path")
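
For intuition, the suggested count boils down to dividing the estimated DataFrame size by the target partition size and rounding up. The numbers below are made up for illustration; this is a sketch of the idea, not RepartiPy's exact code:

import math

df_size_in_bytes = 5 * 1073741824              # assume the estimator reports ~5 GiB
desired_partition_size_in_bytes = 1073741824   # 1 GiB target (the default)

# ceil(5 GiB / 1 GiB) -> 5 partitions of roughly 1 GiB each
desired_partition_count = math.ceil(df_size_in_bytes / desired_partition_size_in_bytes)

Whether you then call repartition() or coalesce() is the usual Spark trade-off: repartition() performs a full shuffle and can increase or decrease the partition count, while coalesce() avoids a full shuffle but can only decrease it.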

estimate()

Estimate size of a DataFrame

with SizeEstimator
import repartipy

with repartipy.SizeEstimator(spark=spark, df=df) as se:
    df_size_in_bytes = se.estimate()

with SamplingSizeEstimator
import repartipy

with repartipy.SamplingSizeEstimator(spark=spark, df=df, sample_count=10) as se:
    df_size_in_bytes = se.estimate()
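
estimate() returns the size in bytes. A small hypothetical follow-up, converting it to a human-readable unit for logging:

one_mib_in_bytes = 1024 * 1024
print(f"estimated DataFrame size: {df_size_in_bytes / one_mib_in_bytes:.1f} MiB")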

Benchmark

Overall, there appears to be a slight performance cost when using RepartiPy. This benchmark compares the running time of Spark jobs in the following two cases to give a rough estimate:

  • Static Repartition (repartition without RepartiPy)
# e.g.
df.repartition(123).write.save("your/write/path")

  • Dynamic Repartition (repartition with RepartiPy)
# e.g.
with repartipy.SizeEstimator(spark=spark, df=df) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)
    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")

All other conditions remain the same except for the use of RepartiPy.

Note

The benchmark results provided are for rough reference only, not absolute. Actual performance can vary depending on your circumstances (e.g. your data, your Spark code, your cluster resources, ...).

SizeEstimator

  • DataFrame Size ~= 256 MiB (decompressed size)
                Static     Dynamic
Running Time    8.5 min    8.6 min

SamplingSizeEstimator

  • DataFrame Size ~= 241 GiB (decompressed size)
                Static     Dynamic
Running Time    14 min     16 min
