dbt enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications. dbt is the T in ELT. Organize, cleanse, denormalize, filter, rename, and pre-aggregate the raw data in your warehouse so that it's ready for analysis.
The dbt-glue package implements the dbt adapter protocol for AWS Glue's Spark engine.
It supports running dbt against Spark, through the new Glue Interactive Sessions API.
To learn how to deploy a data pipeline in your modern data platform using the dbt-glue adapter, please read the following blog post: Build your data pipeline in your AWS modern data platform using AWS Lake Formation, AWS Glue, and dbt Core
The package can be installed from PyPI with:
$ pip3 install dbt-glue
For further (and more likely up-to-date) info, see the README
There are two IAM principals used with interactive sessions.
Read this documentation to configure these principals.
Below is a least-privilege policy that enables all features of the dbt-glue adapter.
Update the variables between <> as described here:
Args | Description |
---|---|
region | The region where your Glue database is stored |
AWS Account | The AWS account where you run your pipeline |
dbt output database | The database updated by dbt (this is the schema configured in the profile.yml of your dbt environment) |
dbt source database | All databases used as source |
dbt output bucket | The name of the bucket where dbt writes the generated data (the location configured in the profile.yml of your dbt environment) |
dbt source bucket | The bucket name of source databases (if they are not managed by Lake Formation) |
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Read_and_write_databases",
"Action": [
"glue:SearchTables",
"glue:BatchCreatePartition",
"glue:CreatePartitionIndex",
"glue:DeleteDatabase",
"glue:GetTableVersions",
"glue:GetPartitions",
"glue:DeleteTableVersion",
"glue:UpdateTable",
"glue:DeleteTable",
"glue:DeletePartitionIndex",
"glue:GetTableVersion",
"glue:UpdateColumnStatisticsForTable",
"glue:CreatePartition",
"glue:UpdateDatabase",
"glue:CreateTable",
"glue:GetTables",
"glue:GetDatabases",
"glue:GetTable",
"glue:GetDatabase",
"glue:GetPartition",
"glue:UpdateColumnStatisticsForPartition",
"glue:CreateDatabase",
"glue:BatchDeleteTableVersion",
"glue:BatchDeleteTable",
"glue:DeletePartition",
"glue:GetUserDefinedFunctions",
"lakeformation:ListResources",
"lakeformation:BatchGrantPermissions",
"lakeformation:ListPermissions",
"lakeformation:GetDataAccess",
"lakeformation:GrantPermissions",
"lakeformation:RevokePermissions",
"lakeformation:BatchRevokePermissions",
"lakeformation:AddLFTagsToResource",
"lakeformation:RemoveLFTagsFromResource",
"lakeformation:GetResourceLFTags",
"lakeformation:ListLFTags",
"lakeformation:GetLFTag"
],
"Resource": [
"arn:aws:glue:<region>:<AWS Account>:catalog",
"arn:aws:glue:<region>:<AWS Account>:table/<dbt output database>/*",
"arn:aws:glue:<region>:<AWS Account>:database/<dbt output database>"
],
"Effect": "Allow"
},
{
"Sid": "Read_only_databases",
"Action": [
"glue:SearchTables",
"glue:GetTableVersions",
"glue:GetPartitions",
"glue:GetTableVersion",
"glue:GetTables",
"glue:GetDatabases",
"glue:GetTable",
"glue:GetDatabase",
"glue:GetPartition",
"lakeformation:ListResources",
"lakeformation:ListPermissions"
],
"Resource": [
"arn:aws:glue:<region>:<AWS Account>:table/<dbt source database>/*",
"arn:aws:glue:<region>:<AWS Account>:database/<dbt source database>",
"arn:aws:glue:<region>:<AWS Account>:database/default",
"arn:aws:glue:<region>:<AWS Account>:database/global_temp"
],
"Effect": "Allow"
},
{
"Sid": "Storage_all_buckets",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<dbt output bucket>",
"arn:aws:s3:::<dbt source bucket>"
],
"Effect": "Allow"
},
{
"Sid": "Read_and_write_buckets",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<dbt output bucket>/*"
],
"Effect": "Allow"
},
{
"Sid": "Read_only_buckets",
"Action": [
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::<dbt source bucket>/*"
],
"Effect": "Allow"
}
]
}
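Filling in the `<>` placeholders by hand is error-prone, so it can help to render the policy programmatically and confirm that the result still parses as JSON. The following is a minimal sketch, not part of dbt-glue: `render_policy` and the shortened `TEMPLATE` are hypothetical names introduced here, and the bucket names are made-up values.

```python
import json
import re


def render_policy(template: str, values: dict) -> dict:
    """Replace every <placeholder> in the template, then parse it as JSON."""

    def substitute(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"no value supplied for placeholder <{name}>")
        return values[name]

    rendered = re.sub(r"<([^<>]+)>", substitute, template)
    # json.loads raises ValueError on malformed JSON (for example a trailing comma).
    return json.loads(rendered)


# Shortened template for illustration; paste the full policy in practice.
TEMPLATE = """
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Storage_all_buckets",
      "Action": ["s3:GetBucketLocation", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::<dbt output bucket>",
        "arn:aws:s3:::<dbt source bucket>"
      ],
      "Effect": "Allow"
    }
  ]
}
"""

policy = render_policy(
    TEMPLATE,
    {
        "dbt output bucket": "my-output-bucket",  # hypothetical bucket name
        "dbt source bucket": "my-source-bucket",  # hypothetical bucket name
    },
)
print(json.dumps(policy, indent=2))
```

A missing value raises `KeyError` instead of silently leaving a `<placeholder>` in the policy.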
Because dbt and the dbt-glue adapter are compatible with Python versions 3.7, 3.8, and 3.9, check the version of Python:
$ python3 --version
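The same check can be scripted, for example at the top of a setup script. This is a small sketch; `SUPPORTED_MINORS` and `is_supported` are hypothetical names, and the version set simply mirrors the versions listed above.

```python
import sys

# Versions named in this README; adjust if the adapter's support changes.
SUPPORTED_MINORS = {(3, 7), (3, 8), (3, 9)}


def is_supported(version_info=sys.version_info) -> bool:
    """Return True when the (major, minor) pair is one of the listed versions."""
    return (version_info[0], version_info[1]) in SUPPORTED_MINORS


print("supported" if is_supported() else "not in the list of supported versions")
```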
Configure a Python virtual environment to isolate package version and code dependencies:
$ python3 -m venv dbt_venv
$ source dbt_venv/bin/activate
$ python3 -m pip install --upgrade pip
Install the latest version of the AWS CLI:
$ curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
$ unzip awscliv2.zip
$ sudo ./aws/install
Install the boto3 package:
$ sudo yum install gcc krb5-devel.x86_64 python3-devel.x86_64 -y
$ pip3 install --upgrade boto3
Install the package:
$ pip3 install dbt-glue
type: glue
query-comment: This is a glue dbt example
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: us-east-1
workers: 2
worker_type: G.1X
idle_timeout: 10
schema: "dbt_demo"
session_provisioning_timeout_in_seconds: 120
location: "s3://dbt_demo_bucket/dbt_demo_data"
The table below describes all the options.
Option | Description | Mandatory |
---|---|---|
project_name | The dbt project name. This must be the same as the one configured in the dbt project. | yes |
type | The driver to use. | yes |
query-comment | A string to inject as a comment in each query that dbt runs. | no |
role_arn | The ARN of the glue interactive session IAM role. | yes |
region | The AWS Region where you run the data pipeline. | yes |
workers | The number of workers of a defined workerType that are allocated when a job runs. | yes |
worker_type | The type of predefined worker that is allocated when a job runs. Accepts a value of Standard, G.1X, or G.2X. | yes |
schema | The schema used to organize data stored in Amazon S3. It is also the database in AWS Lake Formation that stores metadata tables in the Data Catalog. | yes |
session_provisioning_timeout_in_seconds | The timeout in seconds for AWS Glue interactive session provisioning. | yes |
location | The Amazon S3 location of your target data. | yes |
query_timeout_in_minutes | The timeout in minutes for a single query. Default is 300. | no |
idle_timeout | The AWS Glue session idle timeout in minutes. (The session stops after being idle for the specified amount of time) | no |
glue_version | The version of AWS Glue for this session to use. Currently, the only valid options are 2.0, 3.0 and 4.0. The default value is 4.0. | no |
security_configuration | The security configuration to use with this session. | no |
connections | A comma-separated list of connections to use in the session. | no |
conf | Specific configuration used at the startup of the Glue Interactive Session (arg --conf) | no |
extra_py_files | Extra python Libs that can be used by the interactive session. | no |
delta_athena_prefix | A prefix used to create Athena compatible tables for Delta tables (if not specified, then no Athena compatible table will be created) | no |
tags | The map of key value pairs (tags) belonging to the session. Ex: KeyName1=Value1,KeyName2=Value2 | no |
seed_format | The format used for seeds. Defaults to parquet; can be any Spark-compatible format such as csv or json. | no |
seed_mode | Defaults to overwrite, which overwrites the seed data; set it to append if you only want to add new data to your dataset. | no |
default_arguments | The map of key value pairs parameters belonging to the session. More information on Job parameters used by AWS Glue. Ex: --enable-continuous-cloudwatch-log=true,--enable-continuous-log-filter=true | no |
glue_session_id | Re-use a Glue session to run multiple dbt run commands. A new Glue session is created with this glue_session_id if it does not exist yet. | no |
glue_session_reuse | Re-use the Glue session to run multiple dbt run commands. If set to true, the Glue session is not closed, so it can be re-used; it still stops once idle_timeout expires. If set to false, the session is closed. | no |
datalake_formats | The ACID data lake format to use if you are doing merge; can be hudi, iceberg or delta | no |
use_arrow | (experimental) use an arrow file instead of stdout to have better scalability. | no |
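Before starting a session, it can be useful to check that a target defines every option marked mandatory in the table above. The sketch below is not the adapter's own validation; `MANDATORY_OPTIONS` and `missing_options` are hypothetical names. `project_name` is left out because it is the top-level key of the profile, not an entry inside the target.

```python
# Options marked "yes" in the Mandatory column, excluding the top-level project name.
MANDATORY_OPTIONS = (
    "type",
    "role_arn",
    "region",
    "workers",
    "worker_type",
    "schema",
    "session_provisioning_timeout_in_seconds",
    "location",
)


def missing_options(target: dict) -> list:
    """Return the mandatory option names that the target does not define."""
    return [name for name in MANDATORY_OPTIONS if name not in target]


# The example target shown earlier in this README, as a Python dict.
example_target = {
    "type": "glue",
    "query-comment": "This is a glue dbt example",
    "role_arn": "arn:aws:iam::1234567890:role/GlueInteractiveSessionRole",
    "region": "us-east-1",
    "workers": 2,
    "worker_type": "G.1X",
    "idle_timeout": 10,
    "schema": "dbt_demo",
    "session_provisioning_timeout_in_seconds": 120,
    "location": "s3://dbt_demo_bucket/dbt_demo_data",
}

print(missing_options(example_target))  # → []
```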
When materializing a model as table, you may include several optional configs that are specific to the dbt-spark plugin, in addition to the standard model configs.
Option | Description | Required? | Example |
---|---|---|---|
file_format | The file format to use when creating tables (parquet, csv, json, text, jdbc or orc). | Optional | parquet |
partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | date_day |
clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | country_code |
buckets | The number of buckets to create while clustering | Required if clustered_by is specified | 8 |
custom_location | By default, the adapter stores your data in the following path: location path/schema/table. If you don't want to follow that default behaviour, you can use this parameter to set your own custom location on S3 | Optional | s3://mycustombucket/mycustompath |
hudi_options | When using file_format hudi , gives the ability to overwrite any of the default configuration options. | Optional | {'hoodie.schema.on.read.enable': 'true'} |
dbt seeks to offer useful and intuitive modeling abstractions by means of its built-in configurations and materializations.
For that reason, the dbt-glue plugin leans heavily on the incremental_strategy config. This config tells the incremental materialization how to build models in runs beyond their first. It can be set to one of three values:
- append: Insert new records without updating or overwriting any existing data.
- insert_overwrite: If partition_by is specified, overwrite partitions in the table with new data. If no partition_by is specified, overwrite the entire table with new data.
- merge (Apache Hudi and Apache Iceberg only): Match records based on a unique_key; update old records, insert new ones. (If no unique_key is specified, all new data is inserted, similar to append.)
Each of these strategies has its pros and cons, which we'll discuss below. As with any model config, incremental_strategy may be specified in dbt_project.yml or within a model file's config() block.
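To make the difference between the three values concrete, here is a toy model of their semantics on in-memory rows. It is only an illustration under simplifying assumptions (one partition column, one key column, plain Python lists instead of Spark tables); the function names are hypothetical and this is not how the adapter executes the strategies.

```python
def append(existing, new):
    """Insert new rows without touching existing ones (duplicates are possible)."""
    return existing + new


def insert_overwrite(existing, new, partition_by=None):
    """Replace every partition present in the new data, or the whole table."""
    if partition_by is None:
        return list(new)
    touched = {row[partition_by] for row in new}
    kept = [row for row in existing if row[partition_by] not in touched]
    return kept + new


def merge(existing, new, unique_key=None):
    """Update rows that match on unique_key and insert the others."""
    if unique_key is None:
        return existing + new  # behaves like append
    merged = {row[unique_key]: row for row in existing}
    for row in new:
        merged[row[unique_key]] = row
    return list(merged.values())


existing = [
    {"user_id": 1, "date_day": "2024-01-01"},
    {"user_id": 2, "date_day": "2024-01-02"},
]
new = [
    {"user_id": 2, "date_day": "2024-01-02"},
    {"user_id": 3, "date_day": "2024-01-03"},
]

print(len(append(existing, new)))                        # → 4
print(len(insert_overwrite(existing, new, "date_day")))  # → 3
print(len(insert_overwrite(existing, new)))              # → 2
print(len(merge(existing, new, "user_id")))              # → 3
```

The `append` result contains user 2 twice, which is the duplicate-record risk of that strategy.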
Notes:
The default strategy is insert_overwrite.
The append strategy
Following the append strategy, dbt will perform an insert into statement with all new data. The appeal of this strategy is that it is straightforward and functional across all platforms, file types, connection methods, and Apache Spark versions. However, this strategy cannot update, overwrite, or delete existing data, so it is likely to insert duplicate records for many data sources.
{{ config(
materialized='incremental',
incremental_strategy='append',
) }}
-- All rows returned by this query will be appended to the existing table
select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
create temporary view spark_incremental__dbt_tmp as
select * from analytics.events
where event_ts > (select max(event_ts) from analytics.spark_incremental)
;
insert into table analytics.spark_incremental
select `date_day`, `users` from spark_incremental__dbt_tmp
The insert_overwrite strategy
This strategy is most effective when specified alongside a partition_by clause in your model config. dbt will run an atomic insert overwrite statement that dynamically replaces all partitions included in your query. Be sure to re-select all of the relevant data for a partition when using this incremental strategy.
If no partition_by is specified, then the insert_overwrite strategy will atomically replace all contents of the table, overriding all existing data with only the new records. The column schema of the table remains the same, however. This can be desirable in some limited circumstances, since it minimizes downtime while the table contents are overwritten. The operation is comparable to running truncate + insert on other databases. For atomic replacement of Delta-formatted tables, use the table materialization (which runs create or replace) instead.
{{ config(
materialized='incremental',
partition_by=['date_day'],
file_format='parquet'
) }}
/*
Every partition returned by this query will be overwritten
when this model runs
*/
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
date_day,
count(*) as users
from new_events
group by 1
create temporary view spark_incremental__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
date_day,
count(*) as users
from new_events
group by 1
;
insert overwrite table analytics.spark_incremental
partition (date_day)
select `date_day`, `users` from spark_incremental__dbt_tmp
Specifying insert_overwrite as the incremental strategy is optional, since it's the default strategy used when none is specified.
The merge strategy
Compatibility:
NB:
For Glue 3: you have to set up a Glue connector.
For Glue 4: use the datalake_formats option in your profile.yml.
When using a connector, be sure that your IAM role has these policies:
{
"Sid": "access_to_connections",
"Action": [
"glue:GetConnection",
"glue:GetConnections"
],
"Resource": [
"arn:aws:glue:<region>:<AWS Account>:catalog",
"arn:aws:glue:<region>:<AWS Account>:connection/*"
],
"Effect": "Allow"
}
and that the managed policy AmazonEC2ContainerRegistryReadOnly is attached.
Be sure that you follow the getting started instructions here.
This blog post also explains how to set up and work with Glue connectors.
Usage notes: The merge with Hudi incremental strategy requires:
- file_format: hudi in your table configuration
- datalake_formats: hudi in your profile
- connections: name_of_your_hudi_connector in your profile, when using a Glue connector
- conf: spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
dbt will run an atomic merge statement which looks nearly identical to the default merge behavior on Snowflake and BigQuery. If a unique_key is specified (recommended), dbt will update old records with values from new records that match on the key column. If a unique_key is not specified, dbt will forgo match criteria and simply insert all new records (similar to the append strategy).
test_project:
  target: dev
  outputs:
    dev:
      type: glue
      query-comment: my comment
      role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
      region: eu-west-1
      glue_version: "4.0"
      workers: 2
      worker_type: G.1X
      schema: "dbt_test_project"
      session_provisioning_timeout_in_seconds: 120
      location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
      conf: spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
      datalake_formats: hudi
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key='user_id',
file_format='hudi',
hudi_options={
'hoodie.datasource.write.precombine.field': 'eventtime',
}
) }}
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
user_id,
max(date_day) as last_seen
from new_events
group by 1
You can also use Delta Lake to enable the merge feature on tables.
Usage notes: The merge with Delta incremental strategy requires:
- file_format: delta in your table configuration
- datalake_formats: delta in your profile
- connections: name_of_your_delta_connector in your profile, when using a Glue connector
- conf: "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Athena: Athena is not compatible by default with Delta tables, but you can configure the adapter to create Athena tables on top of your Delta table. To do so, you need to configure the following two options in your profile:
delta_athena_prefix: "the_prefix_of_your_choice"
Run MSCK REPAIR TABLE your_delta_table after each new partition is added.
test_project:
  target: dev
  outputs:
    dev:
      type: glue
      query-comment: my comment
      role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
      region: eu-west-1
      glue_version: "4.0"
      workers: 2
      worker_type: G.1X
      schema: "dbt_test_project"
      session_provisioning_timeout_in_seconds: 120
      location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
      datalake_formats: delta
      conf: "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
      delta_athena_prefix: "delta"
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key='user_id',
partition_by=['dt'],
file_format='delta'
) }}
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
user_id,
max(date_day) as last_seen,
current_date() as dt
from new_events
group by 1
Usage notes: The merge with Iceberg incremental strategy requires:
- file_format: iceberg in your table configuration
- datalake_formats: iceberg in your profile
- connections: name_of_your_iceberg_connector in your profile, when using a Glue connector
--conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.glue_catalog.warehouse=s3://<PATH_TO_YOUR_WAREHOUSE>
--conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
--conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.sql.warehouse=s3://<your-bucket-name>
--conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
--conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
--conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoDbLockManager
--conf spark.sql.catalog.glue_catalog.lock.table=myGlueLockTable
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.dynamodb.DynamoDbLockManager
--conf spark.sql.catalog.glue_catalog.lock.table=<DYNAMODB_TABLE_NAME>
You'll also need to grant the dbt-glue execution role the appropriate permissions on DynamoDB:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "CommitLockTable",
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"dynamodb:BatchGetItem",
"dynamodb:BatchWriteItem",
"dynamodb:ConditionCheckItem",
"dynamodb:PutItem",
"dynamodb:DescribeTable",
"dynamodb:DeleteItem",
"dynamodb:GetItem",
"dynamodb:Scan",
"dynamodb:Query",
"dynamodb:UpdateItem"
],
"Resource": "arn:aws:dynamodb:<AWS_REGION>:<AWS_ACCOUNT_ID>:table/<DYNAMODB_TABLE_NAME>"
}
]
}
org.apache.iceberg.aws.glue.DynamoLockManager
instead : --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoDbLockManager
--conf spark.sql.catalog.glue_catalog.lock.table=myGlueLockTable
dbt will run an atomic merge statement which looks nearly identical to the default merge behavior on Snowflake and BigQuery. You need to provide a unique_key to perform the merge operation, otherwise it will fail. This key must be provided as a Python list and can contain multiple column names to create a composite unique_key.
- Iceberg also supports the insert_overwrite and append strategies.
- The warehouse conf must be provided, but it is overwritten by the adapter location in your profile or custom_location in model configuration.
- iceberg_expire_snapshots is set to 'True' by default; if you need to have historical auditable changes, set: iceberg_expire_snapshots='False'.
- The custom_iceberg_catalog_namespace parameter configures the namespace for Apache Iceberg catalog integration. This parameter enables the use of Iceberg tables within your Spark application by setting up the necessary catalog configurations. Default Value: glue_catalog
When specifying a non-null and non-empty value for custom_iceberg_catalog_namespace, the following Spark configurations must be provided:
--conf spark.sql.catalog.{catalog_namespace}=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.{catalog_namespace}.warehouse={warehouse_path}
--conf spark.sql.catalog.{catalog_namespace}.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
--conf spark.sql.catalog.{catalog_namespace}.io-impl=org.apache.iceberg.aws.s3.S3FileIO
When using the default value, the following spark configuration should be added to enable iceberg.
--conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.glue_catalog.warehouse=s3://your-warehouse-path
--conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
--conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
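Since the four catalog settings only differ by namespace and warehouse path, the `conf` fragment can be generated instead of copied. The helper below is a sketch; `iceberg_catalog_conf` is a hypothetical name and is not part of dbt-glue. It only assembles the string shown in the lists above.

```python
def iceberg_catalog_conf(catalog_namespace: str, warehouse_path: str) -> str:
    """Build the four --conf entries for an Iceberg catalog namespace."""
    prefix = f"spark.sql.catalog.{catalog_namespace}"
    settings = [
        (prefix, "org.apache.iceberg.spark.SparkCatalog"),
        (f"{prefix}.warehouse", warehouse_path),
        (f"{prefix}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog"),
        (f"{prefix}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"),
    ]
    return " ".join(f"--conf {key}={value}" for key, value in settings)


# Default namespace with a placeholder warehouse path.
print(iceberg_catalog_conf("glue_catalog", "s3://your-warehouse-path"))
```

The returned string can be appended to the other entries of the `conf` option in your profile.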
The available table_properties can be found here.
test_project:
  target: dev
  outputs:
    dev:
      type: glue
      query-comment: my comment
      role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
      region: eu-west-1
      glue_version: "4.0"
      workers: 2
      worker_type: G.1X
      schema: "dbt_test_project"
      session_provisioning_timeout_in_seconds: 120
      location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
      datalake_formats: iceberg
      conf: --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.warehouse=s3://aws-dbt-glue-datalake-1234567890-eu-west-1/dbt_test_project --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.dynamodb.DynamoDbLockManager --conf spark.sql.catalog.glue_catalog.lock.table=myGlueLockTable --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key=['user_id'],
file_format='iceberg',
iceberg_expire_snapshots='False',
partition_by=['status'],
table_properties={'write.target-file-size-bytes': '268435456'}
) }}
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
user_id,
max(date_day) as last_seen
from new_events
group by 1
{% snapshot demosnapshot %}
{{
config(
strategy='timestamp',
target_schema='jaffle_db',
updated_at='dt',
file_format='iceberg'
) }}
select * from {{ ref('customers') }}
{% endsnapshot %}
Monitoring is an important part of maintaining the reliability, availability, and performance of AWS Glue and your other AWS solutions. AWS provides monitoring tools that you can use to watch AWS Glue, identify the number of workers required for your Glue Interactive Session, report when something is wrong, and take action automatically when appropriate. AWS Glue provides Spark UI, and CloudWatch logs and metrics for monitoring your AWS Glue jobs. More information on: Monitoring AWS Glue Spark jobs
Usage notes: Monitoring requires:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "CloudwatchMetrics",
"Effect": "Allow",
"Action": "cloudwatch:PutMetricData",
"Resource": "*",
"Condition": {
"StringEquals": {
"cloudwatch:namespace": "Glue"
}
}
},
{
"Sid": "CloudwatchLogs",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"logs:CreateLogStream",
"logs:CreateLogGroup",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:*:*:/aws-glue/*",
"arn:aws:s3:::bucket-to-write-sparkui-logs/*"
]
}
]
}
test_project:
  target: dev
  outputs:
    dev:
      type: glue
      query-comment: my comment
      role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
      region: eu-west-1
      glue_version: "4.0"
      workers: 2
      worker_type: G.1X
      schema: "dbt_test_project"
      session_provisioning_timeout_in_seconds: 120
      location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
      default_arguments: "--enable-metrics=true, --enable-continuous-cloudwatch-log=true, --enable-continuous-log-filter=true, --enable-spark-ui=true, --spark-event-logs-path=s3://bucket-to-write-sparkui-logs/dbt/"
If you want to use the Spark UI, you can launch the Spark history server using an AWS CloudFormation template that hosts the server on an EC2 instance, or launch it locally using Docker. More information on Launching the Spark history server
Auto Scaling is available in AWS Glue version 3.0 and later. More information in the following AWS blog post: "Introducing AWS Glue Auto Scaling: Automatically resize serverless computing resources for lower cost with optimized Apache Spark"
With Auto Scaling enabled, you will get the following benefits:
AWS Glue automatically adds and removes workers from the cluster depending on the parallelism at each stage or microbatch of the job run.
It removes the need for you to experiment and decide on the number of workers to assign for your AWS Glue Interactive sessions.
Once you choose the maximum number of workers, AWS Glue will choose the right size resources for the workload.
You can see how the size of the cluster changes during the Glue Interactive sessions run by looking at CloudWatch metrics. More information on Monitoring your Glue Interactive Session.
Usage notes: AWS Glue Auto Scaling requires:
- The workers parameter, which sets the maximum number of workers
- The --enable-auto-scaling=true parameter on your Glue Interactive Session Config (in your profile). More information on Job parameters used by AWS Glue
test_project:
  target: dev
  outputs:
    dev:
      type: glue
      query-comment: my comment
      role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
      region: eu-west-1
      glue_version: "4.0"
      workers: 2
      worker_type: G.1X
      schema: "dbt_test_project"
      session_provisioning_timeout_in_seconds: 120
      location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
      default_arguments: "--enable-auto-scaling=true"
In many cases, you may need to run your dbt jobs to read from another AWS account.
Review the following link https://repost.aws/knowledge-center/glue-tables-cross-accounts to set up access policies in the source and target accounts.
Add "spark.hadoop.hive.metastore.glue.catalogid=" followed by the target account ID to your conf in the dbt profile. This way, you can have multiple outputs, one for each of the accounts that you have access to.
Note: Cross-account access needs to be within the same AWS Region.
test_project:
  target: dev
  outputs:
    dev:
      type: glue
      query-comment: my comment
      role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
      region: eu-west-1
      glue_version: "4.0"
      workers: 2
      worker_type: G.1X
      schema: "dbt_test_project"
      session_provisioning_timeout_in_seconds: 120
      location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
      conf: "--conf hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
             --conf spark.hadoop.hive.metastore.glue.catalogid=<TARGET-AWS-ACCOUNT-ID-B>"
Relation-level docs persistence is supported since dbt v0.17.0. For more information on configuring docs persistence, see the docs.
When the persist_docs option is configured appropriately, you'll be able to see model descriptions in the Comment field of describe [table] extended or show table extended in [database] like '*'.
Always schema, never database
Apache Spark uses the terms "schema" and "database" interchangeably. dbt understands database to exist at a higher level than schema. As such, you should never use or set database as a node config or in the target profile when running dbt-glue.
If you want to control the schema/database in which dbt will materialize models, use the schema config and generate_schema_name macro only.
For more information, check the dbt documentation about custom schemas.
The adapter supports AWS Lake Formation tag management, enabling you to associate existing tags defined outside of dbt-glue with database objects built by dbt-glue (database, table, view, snapshot, incremental models, seeds).
The adapter also supports AWS Lake Formation data cell filtering.
The configuration below lets the specified principal (the lf-data-scientist IAM user) access rows that have a customer_lifetime_value > 15 and all the columns specified ('customer_id', 'first_order', 'most_recent_order', 'number_of_orders'):
lf_grants={
'data_cell_filters': {
'enabled': True,
'drop_existing' : True,
'filters': {
'the_name_of_my_filter': {
'row_filter': 'customer_lifetime_value>15',
'principals': ['arn:aws:iam::123456789:user/lf-data-scientist'],
'column_names': ['customer_id', 'first_order', 'most_recent_order', 'number_of_orders']
}
},
}
}
The configuration below lets the specified principal (the lf-data-scientist IAM user) access rows that have a customer_lifetime_value > 15 and all the columns except the one specified ('first_name'):
lf_grants={
'data_cell_filters': {
'enabled': True,
'drop_existing' : True,
'filters': {
'the_name_of_my_filter': {
'row_filter': 'customer_lifetime_value>15',
'principals': ['arn:aws:iam::123456789:user/lf-data-scientist'],
'excluded_column_names': ['first_name']
}
},
}
}
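The effect of the two filter shapes above can be pictured with a small local emulation: rows are kept when the row filter holds, and columns are either picked (`column_names`) or dropped (`excluded_column_names`). This is a sketch of the resulting visibility only. It does not call Lake Formation, `apply_cell_filter` is a hypothetical name, and the sample rows are made up.

```python
def apply_cell_filter(rows, predicate, column_names=None, excluded_column_names=None):
    """Keep rows matching the predicate, then include or exclude columns."""
    visible = []
    for row in rows:
        if not predicate(row):
            continue
        if column_names is not None:
            visible.append({name: row[name] for name in column_names})
        else:
            excluded = set(excluded_column_names or [])
            visible.append({k: v for k, v in row.items() if k not in excluded})
    return visible


# Made-up sample data for illustration.
rows = [
    {"customer_id": 1, "first_name": "Ana", "customer_lifetime_value": 10},
    {"customer_id": 2, "first_name": "Bo", "customer_lifetime_value": 20},
]


def row_filter(row):
    # Python equivalent of the row_filter 'customer_lifetime_value>15'.
    return row["customer_lifetime_value"] > 15


print(apply_cell_filter(rows, row_filter, column_names=["customer_id"]))
# → [{'customer_id': 2}]
print(apply_cell_filter(rows, row_filter, excluded_column_names=["first_name"]))
# → [{'customer_id': 2, 'customer_lifetime_value': 20}]
```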
Below are some examples of how you can integrate LF Tags management and data cell filtering into your configurations:
This way of defining your Lake Formation rules is appropriate if you want to handle the tagging and filtering policy at object level. Remember that it overrides any configuration defined at dbt-project level.
{{ config(
materialized='incremental',
unique_key="customer_id",
incremental_strategy='append',
lf_tags_config={
'enabled': true,
'drop_existing' : False,
'tags_database':
{
'name_of_my_db_tag': 'value_of_my_db_tag'
},
'tags_table':
{
'name_of_my_table_tag': 'value_of_my_table_tag'
},
'tags_columns': {
'name_of_my_lf_tag': {
'value_of_my_tag': ['customer_id', 'customer_lifetime_value', 'dt']
}}},
lf_grants={
'data_cell_filters': {
'enabled': True,
'drop_existing' : True,
'filters': {
'the_name_of_my_filter': {
'row_filter': 'customer_lifetime_value>15',
'principals': ['arn:aws:iam::123456789:user/lf-data-scientist'],
'excluded_column_names': ['first_name']
}
},
}
}
) }}
select
customers.customer_id,
customers.first_name,
customers.last_name,
customer_orders.first_order,
customer_orders.most_recent_order,
customer_orders.number_of_orders,
customer_payments.total_amount as customer_lifetime_value,
current_date() as dt
from customers
left join customer_orders using (customer_id)
left join customer_payments using (customer_id)
This way you can specify tags and data filtering policy for a particular path in your dbt project (e.g. models, seeds, models/model_group1, etc.). This is especially useful for seeds, for which you can't define configuration in the file directly.
seeds:
  +lf_tags_config:
    enabled: true
    tags_table:
      name_of_my_table_tag: 'value_of_my_table_tag'
    tags_database:
      name_of_my_database_tag: 'value_of_my_database_tag'
models:
  +lf_tags_config:
    enabled: true
    drop_existing: True
    tags_database:
      name_of_my_database_tag: 'value_of_my_database_tag'
    tags_table:
      name_of_my_table_tag: 'value_of_my_table_tag'
To perform a functional test:
$ pip3 install -r dev-requirements.txt
$ python3 setup.py build && python3 setup.py install_lib
$ export DBT_AWS_ACCOUNT=123456789101
$ export DBT_GLUE_REGION=us-east-1
$ export DBT_S3_LOCATION=s3://mybucket/myprefix
$ export DBT_GLUE_ROLE_ARN=arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
Caution: Do not set DBT_S3_LOCATION to an S3 path that contains important files.
dbt-glue's test suite automatically deletes all the existing files under the S3 path specified in DBT_S3_LOCATION.
$ python3 -m pytest tests/functional
or
$ python3 -m pytest -s
For more information, check the dbt documentation about testing a new adapter.
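Because the test suite deletes everything under DBT_S3_LOCATION, a quick pre-flight check of the environment can prevent accidents. The sketch below is not part of the dbt-glue test suite; `check_test_env` is a hypothetical helper, and the bucket-root warning is an extra safety assumption, not a documented rule.

```python
import os

# Environment variables exported in the steps above.
REQUIRED_VARS = (
    "DBT_AWS_ACCOUNT",
    "DBT_GLUE_REGION",
    "DBT_S3_LOCATION",
    "DBT_GLUE_ROLE_ARN",
)


def check_test_env(env=None) -> list:
    """Return a list of problems found in the test environment."""
    env = os.environ if env is None else env
    problems = [f"{name} is not set" for name in REQUIRED_VARS if not env.get(name)]
    location = env.get("DBT_S3_LOCATION", "")
    if location and not location.startswith("s3://"):
        problems.append("DBT_S3_LOCATION should start with s3://")
    elif location and location.rstrip("/").count("/") < 3:
        # No prefix after the bucket name: the whole bucket would be wiped.
        problems.append("DBT_S3_LOCATION points at a bucket root; use a dedicated prefix")
    return problems


# Values from the export commands above.
example_env = {
    "DBT_AWS_ACCOUNT": "123456789101",
    "DBT_GLUE_REGION": "us-east-1",
    "DBT_S3_LOCATION": "s3://mybucket/myprefix",
    "DBT_GLUE_ROLE_ARN": "arn:aws:iam::1234567890:role/GlueInteractiveSessionRole",
}

print(check_test_env(example_env))  # → []
```

Calling `check_test_env()` without arguments inspects the real process environment.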
Most dbt Core functionality is supported, but some features are only available with Apache Hudi.
Apache Hudi-only features:
- Incremental model updates by unique_key instead of partition_by (see merge strategy)
Some dbt features, available on the core adapters, are not yet supported on Glue:
For more information on dbt:
See CONTRIBUTING for more information.
This project is licensed under the Apache-2.0 License.