
Python Spark SQL DataFrame schema management for sensible humans, with no dependencies aside from pyspark.
Don't sweat it... sparkql it ✨
sparkql takes the pain out of working with DataFrame schemas in PySpark. It makes schema definition more Pythonic, and it's particularly useful when you're dealing with structured data.
In plain old PySpark, you might find that you write schemas like this:
from pyspark.sql.types import StructType, StructField, StringType, FloatType

CITY_SCHEMA = StructType()
CITY_NAME_FIELD = "name"
CITY_SCHEMA.add(StructField(CITY_NAME_FIELD, StringType(), False))
CITY_LAT_FIELD = "latitude"
CITY_SCHEMA.add(StructField(CITY_LAT_FIELD, FloatType()))
CITY_LONG_FIELD = "longitude"
CITY_SCHEMA.add(StructField(CITY_LONG_FIELD, FloatType()))

CONFERENCE_SCHEMA = StructType()
CONF_NAME_FIELD = "name"
CONFERENCE_SCHEMA.add(StructField(CONF_NAME_FIELD, StringType(), False))
CONF_CITY_FIELD = "city"
CONFERENCE_SCHEMA.add(StructField(CONF_CITY_FIELD, CITY_SCHEMA))
And then plain old PySpark makes you deal with nested fields like this:
dframe.withColumn("city_name", dframe[CONF_CITY_FIELD][CITY_NAME_FIELD])
Instead, with sparkql, schemas become a lot more literate:
from sparkql import Struct, String, Float

class City(Struct):
    name = String(nullable=False)
    latitude = Float()
    longitude = Float()

class Conference(Struct):
    name = String(nullable=False)
    city = City()
As does dealing with nested fields:
dframe.withColumn("city_name", Conference.city.name.COL)
Here's a summary of sparkql's features:

- A field's attribute name within its Struct is (by default) used as its field name. This name can be optionally overridden.
- Reference nested fields with the PATH and COL special properties. Avoid hand-constructing strings (or Columns) to reference your nested fields.
- Validate that a DataFrame conforms to a sparkql schema.
- Reuse fields across structs with inheritance, includes, and implements.
- Pretty print schemas with pretty_schema.

Read on for documentation on these features.
Each Spark atomic type has a counterpart sparkql field:
PySpark type | sparkql field |
---|---|
ByteType | Byte |
IntegerType | Integer |
LongType | Long |
ShortType | Short |
DecimalType | Decimal |
DoubleType | Double |
FloatType | Float |
StringType | String |
BinaryType | Binary |
BooleanType | Boolean |
DateType | Date |
TimestampType | Timestamp |
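For illustration, here is a minimal sketch (the Sensor struct and its field names are invented for this example) combining a few of these atomic fields and converting the result to a PySpark StructType:

import sparkql
from sparkql import Struct, String, Integer, Date, Boolean, schema

# An illustrative struct mixing several atomic field types.
class Sensor(Struct):
    sensor_id = String(nullable=False)
    reading = Integer()
    recorded_on = Date()
    active = Boolean()

# Convert to a pyspark.sql.types.StructType and print it readably.
print(sparkql.pretty_schema(schema(Sensor)))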
Array (counterpart to ArrayType in PySpark) allows the definition of arrays of objects. By creating a subclass of Struct, we can define a custom class that will be converted to a StructType.

For example, given the sparkql schema definition:
from sparkql import Struct, String, Array

class Article(Struct):
    title = String(nullable=False)
    tags = Array(String(), nullable=False)
    comments = Array(String(nullable=False))
Then we can build the equivalent PySpark schema (a StructType) with:
from sparkql import schema
pyspark_struct = schema(Article)
Pretty printing the schema with the expression sparkql.pretty_schema(pyspark_struct) will give the following:
StructType([
    StructField('title', StringType(), False),
    StructField('tags',
                ArrayType(StringType(), True),
                False),
    StructField('comments',
                ArrayType(StringType(), False),
                True)])
Many examples of how to use sparkql can be found in examples.
By default, field names are inferred from the attribute name in the struct in which they are declared.
For example, given the struct
class Geolocation(Struct):
    latitude = Float()
    longitude = Float()
the concrete name of the Geolocation.latitude field is latitude.
Names can also be overridden by explicitly specifying the field name as an argument to the field:
class Geolocation(Struct):
    latitude = Float(name="lat")
    longitude = Float(name="lon")
which would mean the concrete name of the Geolocation.latitude field is lat.
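As a quick sanity check, here is a hedged sketch (assuming schema and pretty_schema are importable from sparkql, as they are used elsewhere in this README) showing that the generated PySpark schema uses the overridden names:

from sparkql import Struct, Float, schema, pretty_schema

class Geolocation(Struct):
    latitude = Float(name="lat")
    longitude = Float(name="lon")

# The resulting StructType's fields are named "lat" and "lon",
# not "latitude" and "longitude".
print(pretty_schema(schema(Geolocation)))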
Referencing fields in nested data can be a chore. sparkql simplifies this with path referencing.
For example, if we have a schema with nested objects:
class Address(Struct):
    post_code = String()
    city = String()

class User(Struct):
    username = String(nullable=False)
    address = Address()

class Comment(Struct):
    message = String()
    author = User(nullable=False)

class Article(Struct):
    title = String(nullable=False)
    author = User(nullable=False)
    comments = Array(Comment())
We can use the special PATH property to turn a path into a Spark-understandable string:

author_city_str = Article.author.address.city.PATH
# "author.address.city"
COL is a counterpart to PATH that returns a Spark Column object for the path, allowing it to be used in all places where Spark requires a column.
Function equivalents path_str, path_col, and name are also available. This table demonstrates the equivalence of the property styles and the function styles:
Property style | Function style | Result (both styles are equivalent) |
---|---|---|
Article.author.address.city.PATH | sparkql.path_str(Article.author.address.city) | "author.address.city" |
Article.author.address.city.COL | sparkql.path_col(Article.author.address.city) | Column pointing to author.address.city |
Article.author.address.city.NAME | sparkql.name(Article.author.address.city) | "city" |
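For instance, here is a brief usage sketch (dframe is assumed to be a DataFrame whose schema matches Article), mirroring the withColumn example earlier in this README:

import sparkql

# Select the nested city value without hand-writing the column path.
cities = dframe.select(Article.author.address.city.COL)

# Equivalent, using the function style:
cities = dframe.select(sparkql.path_col(Article.author.address.city))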
For paths that include an array, two approaches are provided:

comment_usernames_str = Article.comments.e.author.username.PATH
# "comments.author.username"

comment_usernames_str = Article.comments.author.username.PATH
# "comments.author.username"
Both give the same result. However, the former (e) is more type-oriented. The e attribute corresponds to the array's element field. Although this looks strange at first, it has the advantage of being inspectable by IDEs and other tools, allowing goodness such as IDE auto-completion, automated refactoring, and identifying errors before runtime.
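As a hedged usage sketch (again assuming dframe matches the Article schema): Spark resolves a field path that passes through an array as an array column, so selecting this path yields each article's list of comment author usernames:

# Returns an array column, because the path passes through the "comments" array.
comment_authors = dframe.select(Article.comments.e.author.username.COL)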
Field metadata can be specified with the metadata argument to a field, which accepts a dictionary of key-value pairs.
class Article(Struct):
    title = String(nullable=False,
                   metadata={"description": "The title of the article", "max_length": 100})
The metadata can be accessed with the METADATA property of the field:

Article.title.METADATA
# {"description": "The title of the article", "max_length": 100}
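Presumably the metadata is also carried through to the generated PySpark StructField; here is a minimal sketch of checking this, assuming schema is imported from sparkql as above and that the metadata survives conversion:

from sparkql import schema

# StructType supports lookup by field name; StructField exposes .metadata.
pyspark_struct = schema(Article)
print(pyspark_struct["title"].metadata)  # expected: the metadata dict supplied above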
The Struct method validate_data_frame will verify whether a given DataFrame's schema matches the Struct.

For example, if we have our Article struct and a DataFrame we want to ensure adheres to the Article schema:
dframe = spark_session.createDataFrame([{"title": "abc"}])

class Article(Struct):
    title = String()
    body = String()
Then we can validate with:
validation_result = Article.validate_data_frame(dframe)
validation_result.is_valid indicates whether the DataFrame is valid (False in this case), and validation_result.report is a human-readable string describing the differences:
Struct schema...
StructType([
    StructField('title', StringType(), True),
    StructField('body', StringType(), True)])
DataFrame schema...
StructType([
    StructField('title', StringType(), True)])
Diff of struct -> data frame...
StructType([
-   StructField('title', StringType(), True)])
+   StructField('title', StringType(), True),
+   StructField('body', StringType(), True)])
For convenience, Article.validate_data_frame(dframe).raise_on_invalid() will raise an InvalidDataFrameError (see sparkql.exceptions) if the DataFrame is not valid.
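For example, here is a minimal guard sketch using the Article struct from above (the load_articles helper and the parquet path are hypothetical):

def load_articles(spark_session, path):
    # Hypothetical helper: read a dataset and fail fast if its schema
    # does not match the Article struct.
    dframe = spark_session.read.parquet(path)
    Article.validate_data_frame(dframe).raise_on_invalid()
    return dframe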
sparkql simplifies the process of creating an instance of a struct. You might need to do this, for example, when creating test data, or when creating an object (a dict or a row) to return from a UDF.

Use Struct.make_dict(...) to instantiate a struct as a dictionary. This has the advantage that the input values will be correctly validated, and it will convert schema property names into their underlying field names.
For example, given some simple Structs:
class User(Struct):
    id = Integer(name="user_id", nullable=False)
    username = String()

class Article(Struct):
    id = Integer(name="article_id", nullable=False)
    title = String()
    author = User()
    text = String(name="body")
Here are a few examples of creating dicts from Article:
Article.make_dict(
    id=1001,
    title="The article title",
    author=User.make_dict(
        id=440,
        username="user"
    ),
    text="Lorem ipsum article text lorem ipsum."
)

# generates...
{
    "article_id": 1001,
    "author": {
        "user_id": 440,
        "username": "user"},
    "body": "Lorem ipsum article text lorem ipsum.",
    "title": "The article title"
}

Article.make_dict(
    id=1002
)

# generates...
{
    "article_id": 1002,
    "author": None,
    "body": None,
    "title": None
}
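One place this comes in handy is returning structured values from a UDF. Here is a hedged sketch reusing the User struct above (the default_user helper is invented for illustration, and it assumes a dict is an acceptable return value for a struct-typed UDF):

from pyspark.sql.functions import udf
from sparkql import schema

@udf(returnType=schema(User))
def default_user(username):
    # Build a dict keyed by the underlying field names ("user_id", "username").
    return User.make_dict(id=0, username=username)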
See this example for an extended example of using make_dict.
It is sometimes useful to be able to re-use the fields of one struct in another struct. sparkql provides a few features to enable this: struct inheritance, the includes declaration, and the implements declaration. See the following examples for a better explanation.

With struct inheritance, a struct inherits the fields of its parent. For example, the following:
class BaseEvent(Struct):
    correlation_id = String(nullable=False)
    event_time = Timestamp(nullable=False)

class RegistrationEvent(BaseEvent):
    user_id = String(nullable=False)
will produce the following RegistrationEvent schema:
StructType([
    StructField('correlation_id', StringType(), False),
    StructField('event_time', TimestampType(), False),
    StructField('user_id', StringType(), False)])
The includes declaration pulls the fields of the listed structs into the struct. For example, the following:
class EventMetadata(Struct):
    correlation_id = String(nullable=False)
    event_time = Timestamp(nullable=False)

class RegistrationEvent(Struct):
    class Meta:
        includes = [EventMetadata]
    user_id = String(nullable=False)
will produce the RegistrationEvent schema:
StructType([
    StructField('user_id', StringType(), False),
    StructField('correlation_id', StringType(), False),
    StructField('event_time', TimestampType(), False)])
The implements declaration is similar to includes, but does not automatically incorporate the fields of the specified structs. Instead, it is up to the implementor to ensure that the required fields are declared in the struct.
Failing to implement a field from an implements struct will result in a StructImplementationError.
class LogEntryMetadata(Struct):
    logged_at = Timestamp(nullable=False)

class PageViewLogEntry(Struct):
    class Meta:
        implements = [LogEntryMetadata]
    page_id = String(nullable=False)

# the above class declaration will fail with the following StructImplementationError error:
#   Struct 'PageViewLogEntry' does not implement field 'logged_at' required by struct 'LogEntryMetadata'
Spark's stringified schema representation isn't very user-friendly, particularly for large schemas:
StructType([StructField('name', StringType(), False), StructField('city', StructType([StructField('name', StringType(), False), StructField('latitude', FloatType(), True), StructField('longitude', FloatType(), True)]), True)])
The function pretty_schema will return something more useful:
StructType([
    StructField('name', StringType(), False),
    StructField('city',
                StructType([
                    StructField('name', StringType(), False),
                    StructField('latitude', FloatType(), True),
                    StructField('longitude', FloatType(), True)]),
                True)])
It can be useful to build a composite schema from two StructTypes. sparkql provides a merge_schemas function to do this.
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from sparkql import merge_schemas

schema_a = StructType([
    StructField("message", StringType()),
    StructField("author", ArrayType(
        StructType([
            StructField("name", StringType())
        ])
    ))
])

schema_b = StructType([
    StructField("author", ArrayType(
        StructType([
            StructField("address", StringType())
        ])
    ))
])
merged_schema = merge_schemas(schema_a, schema_b)
results in a merged_schema that looks like:
StructType([
    StructField('message', StringType(), True),
    StructField('author',
                ArrayType(StructType([
                    StructField('name', StringType(), True),
                    StructField('address', StringType(), True)]), True),
                True)])
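A merged schema can then be used anywhere Spark accepts a StructType; for example, as a hedged sketch (the JSON path is made up, and spark_session is assumed as earlier in this README), when reading data with an explicit schema:

# Apply the merged schema when reading a JSON dataset.
dframe = spark_session.read.schema(merged_schema).json("path/to/messages.json")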
Contributions are very welcome. Developers who'd like to contribute to this project should refer to CONTRIBUTING.md.