td-pyspark: Treasure Data extension for PySpark
You can install td-pyspark from PyPI using pip as follows:
$ pip install td-pyspark
If you want to install PySpark via PyPI as well, you can install it as follows:
$ pip install td-pyspark[spark]
First, contact support@treasure-data.com to enable the td-spark feature, which is disabled by default.
td-pyspark is a library that enables Python to access tables in Treasure Data. The features of td_pyspark include:
- Reading tables in Treasure Data as Spark DataFrames
- Submitting Presto queries and reading the query results as DataFrames
- Writing DataFrames to Treasure Data tables
- Creating, dropping, and swapping databases and tables, including UDP (user-defined partitioning) tables
For more details, see also the td-spark FAQs.
You can try td_pyspark using Docker without installing Spark or Python.
First, create a td-spark.conf file and set your TD API key and site (us, jp, eu01, ap02) configurations:
td-spark.conf
spark.td.apikey (Your TD API KEY)
spark.td.site (Your site: us, jp, eu01, ap02)
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.pyspark.enabled true
Launch the pyspark Docker image. This image already has the td_pyspark library pre-installed:
$ docker run -it -e TD_SPARK_CONF=td-spark.conf -v $(pwd):/opt/spark/work devtd/td-spark-pyspark:latest_spark3.1.1
Python 3.9.2 (default, Feb 19 2021, 17:33:48)
[GCC 10.2.1 20201203] on linux
Type "help", "copyright", "credits" or "license" for more information.
21/05/10 09:04:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.1.1
/_/
Using Python version 3.9.2 (default, Feb 19 2021 17:33:48)
SparkSession available as 'spark'.
2021-05-10 09:04:53.268Z debug [spark] Loading com.treasuredata.spark package - (package.scala:23)
...
>>>
Try reading a sample table by specifying a time range:
>>> df = td.table("sample_datasets.www_access").within("+2d/2014-10-04").df()
>>> df.show()
2021-05-10 09:07:40.233Z info [PartitionScanner] Fetching the partition list of sample_datasets.www_access within time range:[2014-10-04 00:00:00Z,2014-10-06 00:00:00Z) - (PartitionScanner.scala:29)
2021-05-10 09:07:42.262Z info [PartitionScanner] Retrieved 2 partition entries - (PartitionScanner.scala:36)
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user| host| path| referer|code| agent|size|method| time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null|192.225.229.196| /category/software| -| 200|Mozilla/5.0 (Maci...| 117| GET|1412382292|
|null|120.168.215.131| /category/software| -| 200|Mozilla/5.0 (comp...| 53| GET|1412382284|
|null|180.198.173.136|/category/electro...| /category/computers| 200|Mozilla/5.0 (Wind...| 106| GET|1412382275|
|null| 140.168.145.49| /item/garden/2832| /item/toys/230| 200|Mozilla/5.0 (Maci...| 122| GET|1412382267|
|null| 52.168.78.222|/category/electro...| /item/games/2532| 200|Mozilla/5.0 (comp...| 73| GET|1412382259|
|null| 32.42.160.165| /category/cameras|/category/cameras...| 200|Mozilla/5.0 (Wind...| 117| GET|1412382251|
|null| 48.204.59.23| /category/software|/search/?c=Electr...| 200|Mozilla/5.0 (Maci...| 52| GET|1412382243|
|null|136.207.150.227|/category/electro...| -| 200|Mozilla/5.0 (iPad...| 120| GET|1412382234|
|null| 204.21.174.187| /category/jewelry| /item/office/3462| 200|Mozilla/5.0 (Wind...| 59| GET|1412382226|
|null| 224.198.88.93| /category/office| /category/music| 200|Mozilla/4.0 (comp...| 46| GET|1412382218|
|null| 96.54.24.116| /category/games| -| 200|Mozilla/5.0 (Wind...| 40| GET|1412382210|
|null| 184.42.224.210| /category/computers| -| 200|Mozilla/5.0 (Wind...| 95| GET|1412382201|
|null| 144.72.47.212|/item/giftcards/4684| /item/books/1031| 200|Mozilla/5.0 (Wind...| 65| GET|1412382193|
|null| 40.213.111.170| /item/toys/1085| /category/cameras| 200|Mozilla/5.0 (Wind...| 65| GET|1412382185|
|null| 132.54.226.209|/item/electronics...| /category/software| 200|Mozilla/5.0 (comp...| 121| GET|1412382177|
|null| 108.219.68.64|/category/cameras...| -| 200|Mozilla/5.0 (Maci...| 54| GET|1412382168|
|null| 168.66.149.218| /item/software/4343| /category/software| 200|Mozilla/4.0 (comp...| 139| GET|1412382160|
|null| 80.66.118.103| /category/software| -| 200|Mozilla/4.0 (comp...| 92| GET|1412382152|
|null|140.171.147.207| /category/music| /category/jewelry| 200|Mozilla/5.0 (Wind...| 119| GET|1412382144|
|null| 84.132.164.204| /item/software/4783|/category/electro...| 200|Mozilla/5.0 (Wind...| 137| GET|1412382135|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 20 rows
>>>
TDSparkContext is the entry point for accessing td_pyspark's functionality. To create a TDSparkContext, pass your SparkSession (spark) to TDSparkContext:
import td_pyspark
td = td_pyspark.TDSparkContext(spark)
To read a table, use td.table(table name):
df = td.table("sample_datasets.www_access").df()
df.show()
To change the context database, use td.use(database_name):
td.use("sample_datasets")
# Accesses sample_datasets.www_access
df = td.table("www_access").df()
By calling .df(), your table data will be read as a Spark DataFrame.
The usage of the DataFrame is the same as in PySpark. See also the PySpark DataFrame documentation.
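For example, once a table is loaded as a DataFrame, the standard PySpark operations apply directly. A minimal sketch using the code and method columns of the www_access sample table shown above:
# Filter GET requests and count them per HTTP status code
df = td.table("sample_datasets.www_access").df()
df.filter(df.method == "GET").groupBy("code").count().show()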
Treasure Data is a time series database, so reading recent data by specifying a time range is important to reduce the amount of data to be processed.
The .within(...) function can be used to specify a target time range in a concise syntax. The within function accepts the same syntax used in the TD_INTERVAL function in Presto.
For example, to read the last 1 hour of data, use within("-1h"):
td.table("tbl").within("-1h").df()
You can also read the last day's data:
td.table("tbl").within("-1d").df()
You can also specify an offset for the relative time range. This example reads one day's worth of data beginning from 7 days ago:
td.table("tbl").within("-1d/-7d").df()
If you know an exact time range, within("(start time)/(end time)") is useful:
>>> df = td.table("sample_datasets.www_access").within("2014-10-04/2014-10-05").df()
>>> df.show()
2021-05-10 09:10:02.366Z info [PartitionScanner] Fetching the partition list of sample_datasets.www_access within time range:[2014-10-04 00:00:00Z,2014-10-05 00:00:00Z) - (PartitionScanner.scala:29)
...
See the td-spark documentation for more examples of interval strings.
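For reference, the interval strings used throughout this document can be summarized as follows (tbl is a placeholder table name):
td.table("tbl").within("-1h").df()                    # last 1 hour
td.table("tbl").within("-1d").df()                    # last 1 day
td.table("tbl").within("-1d/-7d").df()                # 1 day, starting 7 days ago
td.table("tbl").within("+2d/2014-10-04").df()         # 2 days from 2014-10-04
td.table("tbl").within("2014-10-04/2014-10-05").df()  # explicit start/end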
If your Spark cluster is small, reading all of the data as an in-memory DataFrame might be difficult. In this case, you can utilize Presto, a distributed SQL query engine, to reduce the amount of data processed with PySpark:
>>> q = td.presto("select code, count(*) cnt from sample_datasets.www_access group by 1")
>>> q.show()
2019-06-13 20:09:13.245Z info [TDPrestoJDBCRDD] - (TDPrestoRelation.scala:106)
Submit Presto query:
select code, count(*) cnt from sample_datasets.www_access group by 1
+----+----+
|code| cnt|
+----+----+
| 200|4981|
| 500| 2|
| 404| 17|
+----+----+
The query result is represented as a DataFrame.
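Because the result is an ordinary DataFrame, you can keep processing it with PySpark. A small sketch using the code and cnt columns returned by the query above:
# Keep only successful responses, then sort by request count
q.where(q.code == 200).show()
q.orderBy(q.cnt.desc()).show()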
To run non-query statements (e.g., INSERT INTO, CREATE TABLE), use execute_presto(sql):
td.execute_presto("CREATE TABLE IF NOT EXISTS A(time bigint, id varchar)")
To use tables in Treasure Data inside Spark SQL, create a view with df.createOrReplaceTempView(...):
# Read TD table as a DataFrame
df = td.table("mydb.test1").df()
# Register the DataFrame as a view
df.createOrReplaceTempView("test1")
spark.sql("SELECT * FROM test1").show()
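Once registered, any Spark SQL statement can refer to the view. A brief sketch (the column name status below is hypothetical; substitute a column from your own table):
# Aggregate over the registered view with Spark SQL
# NOTE: "status" is a hypothetical column used only for illustration
spark.sql("SELECT status, COUNT(*) AS cnt FROM test1 GROUP BY status").show()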
Create a new table or database:
td.create_database_if_not_exists("mydb")
td.create_table_if_not_exists("mydb.test1")
Delete unnecessary tables:
td.drop_table_if_exists("mydb.test1")
td.drop_database_if_exists("mydb")
You can also check the presence of a table:
td.table("mydb.test1").exists() # True if the table exists
User-defined partitioning (UDP) is useful if you know a column in the table that has unique identifiers (e.g., IDs, category values).
You can create a UDP table partitioned by id (string type column) as follows:
td.create_udp_s("mydb.user_list", "id")
To create a UDP table partitioned by a long (bigint) type column, use td.create_udp_l:
td.create_udp_l("mydb.departments", "dept_id")
You can swap the contents of two tables. The input tables must be in the same database:
# Swap the contents of two tables
td.swap_tables("mydb.tbl1", "mydb.tbl2")
# Another way to swap tables
td.table("mydb.tbl1").swap_table_with("tbl2")
To save your local DataFrames as a table, td.insert_into(df, table) and td.create_or_replace(df, table) can be used:
# Insert the records in the input DataFrame to the target table:
td.insert_into(df, "mydb.tbl1")
# Create or replace the target table with the content of the input DataFrame:
td.create_or_replace(df, "mydb.tbl2")
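For instance, you can build a small DataFrame locally and upload it to Treasure Data. A minimal sketch, assuming the database mydb already exists (the two-column schema below is made up for illustration):
# Build a local DataFrame with a hypothetical (id, name) schema
sample_df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["id", "name"])
# Append the rows to an existing table
td.insert_into(sample_df, "mydb.tbl1")
# Create or replace a table with the DataFrame contents
td.create_or_replace(sample_df, "mydb.tbl2")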
To specify a new API key aside from the key configured in td-spark.conf, use td.with_apikey(apikey):
# Returns a new TDSparkContext with the specified key
td2 = td.with_apikey("key2")
For reading tables or uploading DataFrames with the new key, use td2:
# Read a table with key2
df = td2.table("sample_datasets.www_access").df()
...
# Insert the records with key2
td2.insert_into(df, "mydb.tbl1")
To submit your PySpark script to a Spark cluster, you will need the following files:
- td-spark.conf: the configuration file containing your spark.td.apikey and spark.td.site settings (see above).
- td_pyspark.py: check its location with pip show -f td-pyspark, and copy td_pyspark.py to your favorite location (e.g., under $SPARK_HOME).
- td-spark-assembly.jar: the pre-built td-spark jar, passed to spark-submit via --driver-class-path below (the package bundles one; see TDSparkContextBuilder.default_jar_path() below).
Here is an example PySpark application code: my_app.py
import td_pyspark
from pyspark.sql import SparkSession
# Create a new SparkSession
spark = SparkSession\
.builder\
.appName("myapp")\
.getOrCreate()
# Create TDSparkContext
td = td_pyspark.TDSparkContext(spark)
# Read the table data within -1d (yesterday) range as DataFrame
df = td.table("sample_datasets.www_access").within("-1d").df()
df.show()
To run my_app.py, use spark-submit by specifying the necessary files mentioned above:
# Launching PySpark with the local mode
$ ${SPARK_HOME}/bin/spark-submit --master "local[4]"\
--driver-class-path td-spark-assembly.jar\
--properties-file=td-spark.conf\
--py-files td_pyspark.py\
my_app.py
local[4] means running a Spark cluster locally using 4 threads.
To use a remote Spark cluster, specify the master address, e.g., --master=spark://(master node IP address):7077.
The package contains a pre-built binary of td-spark so that you can add it to the classpath by default. TDSparkContextBuilder.default_jar_path() returns the path to the default td-spark-assembly.jar file. Passing this path to the jars method of TDSparkContextBuilder will automatically build a SparkSession that includes the default jar.
import td_pyspark
from pyspark.sql import SparkSession
builder = SparkSession\
.builder\
.appName("td-pyspark-app")
td = td_pyspark.TDSparkContextBuilder(builder)\
.apikey("XXXXXXXXXXXXXX")\
.jars(td_pyspark.TDSparkContextBuilder.default_jar_path())\
.build()
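The resulting td context can then be used in the same way as in the earlier examples, for example:
# Read a table with the TDSparkContext built above
df = td.table("sample_datasets.www_access").within("-1d").df()
df.show()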