This adapter supports dbt version 1.7.*. For incremental models:
- On Iceberg tables, unique_key is supported only with the merge strategy; the append strategy is also available.
- On Hive tables, the insert_overwrite and append strategies are supported; unique_key is not supported.
pip install dbt-athena-community
# or install directly from the git repository
pip install git+https://github.com/dbt-athena/dbt-athena.git
To start, you will need an S3 bucket (for instance my-bucket) and an Athena database:
CREATE DATABASE IF NOT EXISTS analytics_dev
COMMENT 'Analytics models generated by dbt (development)'
LOCATION 's3://my-bucket/'
WITH DBPROPERTIES ('creator'='Foo Bar', 'email'='foo@bar.com');
Notes:
- Take note of your AWS region code (e.g. us-west-2 or eu-west-2, etc.).

Credentials can be passed directly to the adapter, or they can be determined automatically based on aws cli/boto3 conventions. You can either:
- configure aws_access_key_id and aws_secret_access_key, or
- configure aws_profile_name to match a profile defined in your AWS credentials file.

Check out the dbt profile configuration below for details.

A dbt profile can be configured to run against AWS Athena using the following configuration:
Option | Description | Required? | Example |
---|---|---|---|
s3_staging_dir | S3 location to store Athena query results and metadata | Required | s3://bucket/dbt/ |
s3_data_dir | Prefix for storing tables, if different from the connection's s3_staging_dir | Optional | s3://bucket2/dbt/ |
s3_data_naming | How to generate table paths in s3_data_dir | Optional | schema_table_unique |
s3_tmp_table_dir | Prefix for storing temporary tables, if different from the connection's s3_data_dir | Optional | s3://bucket3/dbt/ |
region_name | AWS region of your Athena instance | Required | eu-west-1 |
schema | Specify the schema (Athena database) to build models into (lowercase only) | Required | dbt |
database | Specify the database (Data catalog) to build models into (lowercase only) | Required | awsdatacatalog |
poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | 5 |
debug_query_state | Whether to log a debug message with the Athena query state | Optional | false |
aws_access_key_id | Access key ID of the user performing requests | Optional | AKIAIOSFODNN7EXAMPLE |
aws_secret_access_key | Secret access key of the user performing requests | Optional | wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY |
aws_profile_name | Profile to use from your AWS shared credentials file | Optional | my-profile |
work_group | Identifier of Athena workgroup | Optional | my-custom-workgroup |
skip_workgroup_check | Indicates if the WorkGroup check (additional AWS call) can be skipped | Optional | true |
num_retries | Number of times to retry a failing query | Optional | 3 |
num_boto3_retries | Number of times to retry boto3 requests (e.g. deleting S3 files for materialized tables) | Optional | 5 |
num_iceberg_retries | Number of times to retry iceberg commit queries to fix ICEBERG_COMMIT_ERROR | Optional | 3 |
spark_work_group | Identifier of Athena Spark workgroup for running Python models | Optional | my-spark-workgroup |
seed_s3_upload_args | Dictionary containing boto3 ExtraArgs when uploading to S3 | Optional | {"ACL": "bucket-owner-full-control"} |
lf_tags_database | Default LF tags for new database if it's created by dbt | Optional | tag_key: tag_value |
Example profiles.yml entry:
athena:
  target: dev
  outputs:
    dev:
      type: athena
      s3_staging_dir: s3://athena-query-results/dbt/
      s3_data_dir: s3://your_s3_bucket/dbt/
      s3_data_naming: schema_table
      s3_tmp_table_dir: s3://your_s3_bucket/temp/
      region_name: eu-west-1
      schema: dbt
      database: awsdatacatalog
      threads: 4
      aws_profile_name: my-profile
      work_group: my-workgroup
      spark_work_group: my-spark-workgroup
      seed_s3_upload_args:
        ACL: bucket-owner-full-control
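If you prefer to pass credentials directly instead of relying on an AWS profile, a minimal sketch of the same profile using access keys could look like the following (the key values are AWS's documented example placeholders, not real credentials):

athena:
  target: dev
  outputs:
    dev:
      type: athena
      s3_staging_dir: s3://athena-query-results/dbt/
      region_name: eu-west-1
      schema: dbt
      database: awsdatacatalog
      threads: 4
      aws_access_key_id: AKIAIOSFODNN7EXAMPLE
      aws_secret_access_key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY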
Additional information:
- threads is supported
- database and catalog can be used interchangeably

The following table configuration options are available for models (a combined example is sketched after the notes below):

- external_location (default=none)
  - The full S3 path to which the table will be saved; it does not work with Hive tables that have ha set to true
- partitioned_by (default=none)
  - An array of columns by which the table will be partitioned
- bucketed_by (default=none)
  - An array of columns to bucket data by; ignored if using Iceberg
- bucket_count (default=none)
  - The number of buckets for bucketing your data; ignored if using Iceberg
- table_type (default='hive')
  - The type of table; supports hive or iceberg
- ha (default=false)
  - Whether the table should be built using the high-availability method; only available for Hive tables, since it is the default for Iceberg tables (see the high-availability section below)
- format (default='parquet')
  - The data format of the table; supports ORC, PARQUET, AVRO, JSON, TEXTFILE
- write_compression (default=none)
  - The compression type to use for any storage format that allows compression to be specified
- field_delimiter (default=none)
  - Custom field delimiter, used when format is set to TEXTFILE
- table_properties: table properties to add to the table, valid for Iceberg only
- native_drop: relation drop operations will be performed with SQL, not direct Glue API calls. No S3 calls will be made to manage data in S3; data in S3 will only be cleared up for Iceberg tables (see the AWS docs). Note that Iceberg DROP TABLE operations may time out if they take longer than 60 seconds.
- seed_by_insert (default=false)
  - Creates seeds using a SQL insert statement instead of uploading the seed data to S3; large seed files cannot use seed_by_insert, as the SQL insert statement would exceed the Athena limit of 262144 bytes
- force_batch (default=false)
  - Skips creating the table as CTAS and runs the operation directly in batch insert mode, which is useful when the standard creation fails due to partition limitations
- unique_tmp_table_suffix (default=false)
  - Replaces the default temporary table name suffix with a unique UUID, useful when running multiple dbt builds that insert into the same table in parallel
- temp_schema (default=none)
  - Defines a schema to hold the temporary tables used in incremental model runs
- lf_tags_config (default=none)
  - AWS Lake Formation tags to associate with the table and columns; the model config accepts:
    - enabled (default=False): whether LF tags management is enabled for a model
    - tags: dictionary with tags and their values to assign to the model
    - tags_columns: dictionary with a tag key, value and the list of columns they must be assigned to
    - lf_inherited_tags (default=none): list of Lake Formation tag keys that are inherited from the database level and therefore should not be removed when associating the tags defined in lf_tags_config; the default behaviour of lf_tags_config is to be exhaustive and first remove any pre-existing tags from tables and columns before associating the ones currently defined for a given model

An example of lf_tags_config in a model config block:

{{
  config(
    materialized='incremental',
    incremental_strategy='append',
    on_schema_change='append_new_columns',
    table_type='iceberg',
    schema='test_schema',
    lf_tags_config={
      'enabled': true,
      'tags': {
        'tag1': 'value1',
        'tag2': 'value2'
      },
      'tags_columns': {
        'tag1': {
          'value1': ['column1', 'column2'],
          'value2': ['column3', 'column4']
        }
      },
      'inherited_tags': ['tag1', 'tag2']
    }
  )
}}
lf_tags_config can also be set in dbt_project.yml:

  +lf_tags_config:
    enabled: true
    tags:
      tag1: value1
      tag2: value2
    tags_columns:
      tag1:
        value1: [ column1, column2 ]
    inherited_tags: [ tag1, tag2 ]
lf_grants (default=none): Lake Formation grants config for data_cell filters. The expected structure, and a usage sketch, are shown below:
lf_grants={
  'data_cell_filters': {
    'enabled': True | False,
    'filters': {
      'filter_name': {
        'row_filter': '<filter_condition>',
        'principals': ['principal_arn1', 'principal_arn2']
      }
    }
  }
}
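For illustration, here is a minimal sketch of how such a grants config might sit inside a model config block; the filter name, filter condition and principal ARN are placeholders, not values from this project:

{{ config(
    materialized='table',
    lf_grants={
        'data_cell_filters': {
            'enabled': True,
            'filters': {
                'eu_rows_only': {
                    'row_filter': "region = 'eu'",
                    'principals': ['arn:aws:iam::111122223333:role/analyst']
                }
            }
        }
    }
) }}

select 'eu' as region,
       1 as value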
Notes:
- The lf_tags and lf_tags_columns configs only support attaching LF tags to the corresponding resources. We recommend managing LF tag permissions outside dbt, for example with Terraform or AWS CDK.
- data_cell_filters management can't be automated outside dbt, because a filter can't be attached to a table that doesn't exist yet. Once you enable this config, dbt will set all filters and their permissions during every dbt run. This approach keeps the row-level security configuration up to date after every dbt run and applies changes as they occur: dropping, creating and updating filters and their permissions.
- Any tags listed in lf_inherited_tags should be strictly inherited from the database level and never overridden at the table and column level.
- Currently, dbt-athena does not differentiate between an inherited tag association and an override it made previously. For example, if an inherited tag is overridden by an lf_tags_config value in one dbt run, and that override is removed prior to a subsequent run, the prior override will linger and no longer be encoded anywhere (neither in e.g. Terraform, where the inherited value is configured, nor in the dbt project, where the override previously existed but is now gone).
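To tie several of the table configuration options above together, here is a minimal sketch of a Hive table model; the column names and the compression choice are illustrative, not taken from this project:

{{ config(
    materialized='table',
    table_type='hive',
    format='parquet',
    write_compression='snappy',
    partitioned_by=['status'],
    bucketed_by=['user_id'],
    bucket_count=10
) }}

select 'a' as user_id,
       'active' as status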
The location in which a table is saved is determined by:
1. If external_location is defined, that value is used.
2. If s3_data_dir is defined, the path is determined by that and s3_data_naming.
3. If s3_data_dir is not defined, data is stored under s3_staging_dir/tables/.

Here are all the options available for s3_data_naming:
- unique: {s3_data_dir}/{uuid4()}/
- table: {s3_data_dir}/{table}/
- table_unique: {s3_data_dir}/{table}/{uuid4()}/
- schema_table: {s3_data_dir}/{schema}/{table}/
- schema_table_unique: {s3_data_dir}/{schema}/{table}/{uuid4()}/
It's possible to set s3_data_naming globally in the target profile, to override the value in the table config, or to set the value for groups of models in dbt_project.yml, as shown in the sketch below.
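A minimal sketch of a dbt_project.yml override for a group of models; the project and folder names are hypothetical:

models:
  my_project:
    staging:
      +s3_data_naming: schema_table_unique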
Note: when using a workgroup with a default output location configured,
s3_data_naming
and any configured buckets are ignored and the location configured in the workgroup is used.
The adapter supports incremental models. These strategies are supported:

- insert_overwrite (default): deletes the overlapping partitions from the destination table, and then inserts the new records from the source. This strategy depends on the partitioned_by keyword! If no partitions are defined, dbt will fall back to the append strategy (see the sketch after this section).
- append: inserts new records without updating, deleting or overwriting any existing data. There might be duplicate data (e.g. great for log or historical data).
- merge: conditionally updates, deletes, or inserts rows into an Iceberg table. Used in combination with unique_key. Only available when using Iceberg.

on_schema_change is an option to reflect schema changes in incremental models. The following options are supported:

- ignore (default)
- fail
- append_new_columns
- sync_all_columns

For details, please refer to the dbt docs.
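As an illustration of the default insert_overwrite strategy combined with on_schema_change, here is a minimal sketch; the source model stg_events and its columns (user_id, status, created_at) are hypothetical:

{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partitioned_by=['event_date'],
    on_schema_change='append_new_columns',
    format='parquet'
) }}

select user_id,
       status,
       cast(created_at as date) as event_date
from {{ ref('stg_events') }}
{% if is_incremental() %}
where cast(created_at as date) >= (select max(event_date) from {{ this }})
{% endif %}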
The adapter supports table materialization for Iceberg.
To get started just add this as your model:
{{ config(
materialized='table',
table_type='iceberg',
format='parquet',
partitioned_by=['bucket(user_id, 5)'],
table_properties={
'optimize_rewrite_delete_file_threshold': '2'
}
) }}
select 'A' as user_id,
'pi' as name,
'active' as status,
17.89 as cost,
1 as quantity,
100000000 as quantity_big,
current_date as my_date
Iceberg supports bucketing as hidden partitions; use the partitioned_by config to add specific bucketing conditions.

Iceberg supports several table formats for data: PARQUET, AVRO and ORC.
It is possible to use Iceberg in an incremental fashion. Specifically, two strategies are supported:

- append: new records are appended to the table; this can lead to duplicates.
- merge: performs an upsert (and optional delete), where new records are added and existing records are updated. Only available with Athena engine version 3. It accepts the following options:
  - unique_key (required): columns that define a unique record in the source and target tables.
  - incremental_predicates (optional): SQL conditions that enable custom join clauses in the merge statement. This can be useful for improving performance via predicate pushdown on the target table.
  - delete_condition (optional): SQL condition used to identify records that should be deleted.
  - update_condition (optional): SQL condition used to identify records that should be updated.
  - insert_condition (optional): SQL condition used to identify records that should be inserted.

incremental_predicates, delete_condition, update_condition and insert_condition can include any column of the incremental table (src) or the final table (target). Column names must be prefixed by either src or target to prevent a "Column is ambiguous" error.

delete_condition example:
{{ config(
materialized='incremental',
table_type='iceberg',
incremental_strategy='merge',
unique_key='user_id',
incremental_predicates=["src.quantity > 1", "target.my_date >= now() - interval '4' year"],
delete_condition="src.status != 'active' and target.my_date < now() - interval '2' year",
format='parquet'
) }}
select 'A' as user_id,
'pi' as name,
'active' as status,
17.89 as cost,
1 as quantity,
100000000 as quantity_big,
current_date as my_date
update_condition example:
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key=['id'],
update_condition='target.id > 1',
schema='sandbox'
)
}}
{% if is_incremental() %}
select * from (
values
(1, 'v1-updated')
, (2, 'v2-updated')
) as t (id, value)
{% else %}
select * from (
values
(-1, 'v-1')
, (0, 'v0')
, (1, 'v1')
, (2, 'v2')
) as t (id, value)
{% endif %}
insert_condition example:
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key=['id'],
insert_condition='target.status != 0',
schema='sandbox'
)
}}
select * from (
values
(1, 0)
, (2, 1)
) as t (id, status)
The current implementation of the table materialization can lead to downtime, as the target table is dropped and re-created. For a less destructive behavior, it's possible to use the ha config on your table materialized models. It leverages the table versions feature of the Glue catalog, creating a temporary table and swapping the target table to the location of the temporary table. This materialization is only available for table_type=hive and requires using unique locations. For Iceberg, high availability is the default.
{{ config(
materialized='table',
ha=true,
format='parquet',
table_type='hive',
partitioned_by=['status'],
s3_data_naming='table_unique'
) }}
select 'a' as user_id,
'pi' as user_name,
'active' as status
union all
select 'b' as user_id,
'sh' as user_name,
'disabled' as status
By default, the materialization keeps the last 4 table versions; you can change this by setting versions_to_keep. It's recommended to keep versions_to_keep >= 4, as this avoids having the older location removed.

Optionally persist resource descriptions as column and relation comments in the Glue data catalog, and meta as Glue table properties and column parameters. By default, documentation persistence is disabled, but it can be enabled for specific resources or groups of resources as needed.
For example:
models:
  - name: test_deduplicate
    description: another value
    config:
      persist_docs:
        relation: true
        columns: true
      meta:
        test: value
    columns:
      - name: id
        meta:
          primary_key: true
See persist docs for more details.
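Persist docs can also be switched on for a whole group of models in dbt_project.yml; a minimal sketch, where the project name is hypothetical:

models:
  my_project:
    +persist_docs:
      relation: true
      columns: true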
The adapter supports snapshot materialization, with both the timestamp and check strategies. To create a snapshot, create a snapshot file in the snapshots directory (create the directory if it does not exist).
To use the timestamp strategy, refer to the dbt docs.
To use the check strategy, refer to the dbt docs.
The materialization also supports invalidating hard deletes; check the dbt docs to understand usage.
seed file - employment_indicators_november_2022_csv_tables.csv
Series_reference,Period,Data_value,Suppressed
MEIM.S1WA,1999.04,80267,
MEIM.S1WA,1999.05,70803,
MEIM.S1WA,1999.06,65792,
MEIM.S1WA,1999.07,66194,
MEIM.S1WA,1999.08,67259,
MEIM.S1WA,1999.09,69691,
MEIM.S1WA,1999.1,72475,
MEIM.S1WA,1999.11,79263,
MEIM.S1WA,1999.12,86540,
MEIM.S1WA,2000.01,82552,
MEIM.S1WA,2000.02,81709,
MEIM.S1WA,2000.03,84126,
MEIM.S1WA,2000.04,77089,
MEIM.S1WA,2000.05,73811,
MEIM.S1WA,2000.06,70070,
MEIM.S1WA,2000.07,69873,
MEIM.S1WA,2000.08,71468,
MEIM.S1WA,2000.09,72462,
MEIM.S1WA,2000.1,74897,
model.sql
{{ config(
materialized='table'
) }}
select row_number() over() as id
, *
, cast(from_unixtime(to_unixtime(now())) as timestamp(6)) as refresh_timestamp
from {{ ref('employment_indicators_november_2022_csv_tables') }}
timestamp strategy - model_snapshot_1
{% snapshot model_snapshot_1 %}
{{
config(
strategy='timestamp',
updated_at='refresh_timestamp',
unique_key='id'
)
}}
select *
from {{ ref('model') }} {% endsnapshot %}
invalidate hard deletes - model_snapshot_2
{% snapshot model_snapshot_2 %}
{{
config
(
unique_key='id',
strategy='timestamp',
updated_at='refresh_timestamp',
invalidate_hard_deletes=True,
)
}}
select *
from {{ ref('model') }} {% endsnapshot %}
check strategy - model_snapshot_3
{% snapshot model_snapshot_3 %}
{{
config
(
unique_key='id',
strategy='check',
check_cols=['series_reference','data_value']
)
}}
select *
from {{ ref('model') }} {% endsnapshot %}
Known issues:
- Incremental Iceberg models: syncing all columns on schema change can't remove columns used for partitioning; the only way, from a dbt perspective, is to do a full-refresh of the incremental model.
- Table, schema and database names should only be lowercase.
- In order to avoid potential conflicts, make sure dbt-athena-adapter is not installed in the target environment. See https://github.com/dbt-athena/dbt-athena/issues/103 for more details.
- Snapshots do not support dropping columns from the source table. If you drop a column, make sure to drop the column from the snapshot as well. Another workaround is to NULL the column in the snapshot definition to preserve history.
The adapter implements AWS Lake Formation tags management in the following way:
- You can enable or disable LF tags management via config (disabled by default).
- Once you enable the feature, LF tags will be updated on every dbt run.
- First, all LF tags for columns are removed to avoid inheritance issues.
- Then, all redundant LF tags are removed from the tables and the tags from the table configs are applied.
- Finally, LF tags for columns are applied.

It's important to understand that dbt does not manage LF tags for databases and does not manage Lake Formation permissions. You should handle these yourself, manually or with an automation tool such as Terraform or AWS CDK.
The adapter supports Python models using spark.

Setup:
- The Spark workgroup is added to the ~/.dbt/profiles.yml file, and the profile to be used is referenced in dbt_project.yml.

Spark-specific table configuration:
- timeout (default=43200): timeout in seconds for each Python model execution; defaults to 12 hours (43200 seconds).
- spark_encryption (default=false): if set to true, encrypts data in transit between Spark nodes as well as data at rest stored locally by Spark.
- spark_cross_account_catalog (default=false): if set to true, tables in a cross-account catalog can be referenced as external_catalog_id/database.table to access the external table on the external catalog (e.g. 999999999999/mydatabase.cloudfront_logs, where 999999999999 is the external catalog ID).
- spark_requester_pays (default=false): if set to true, requests against S3 buckets with requester pays enabled are allowed.

Spark notes:
- The number of Python models running at a time depends on threads. The number of sessions created for the entire run depends on the number of unique engine configurations and the availability of sessions to maintain thread concurrency.
- For Iceberg tables, it is recommended to use the table_properties configuration to set the format_version to 2. This is to maintain compatibility between Iceberg tables created by Trino and those created by Spark.

Simple pandas model:

import pandas as pd
def model(dbt, session):
    dbt.config(materialized="table")

    model_df = pd.DataFrame({"A": [1, 2, 3, 4]})

    return model_df
Simple Spark model:

def model(dbt, spark_session):
    dbt.config(materialized="table")

    data = [(1,), (2,), (3,), (4,)]

    df = spark_session.createDataFrame(data, ["A"])

    return df
Spark incremental model:

def model(dbt, spark_session):
    dbt.config(materialized="incremental")
    df = dbt.ref("model")

    if dbt.is_incremental:
        max_from_this = (
            f"select max(run_date) from {dbt.this.schema}.{dbt.this.identifier}"
        )
        df = df.filter(df.run_date >= spark_session.sql(max_from_this).collect()[0][0])

    return df
Config Spark model:

def model(dbt, spark_session):
    dbt.config(
        materialized="table",
        engine_config={
            "CoordinatorDpuSize": 1,
            "MaxConcurrentDpus": 3,
            "DefaultExecutorDpuSize": 1,
        },
        spark_encryption=True,
        spark_cross_account_catalog=True,
        spark_requester_pays=True,
        polling_interval=15,
        timeout=120,
    )

    data = [(1,), (2,), (3,), (4,)]

    df = spark_session.createDataFrame(data, ["A"])

    return df
Spark UDF model using imported external Python files:

def model(dbt, spark_session):
    dbt.config(
        materialized="incremental",
        incremental_strategy="merge",
        unique_key="num",
    )
    sc = spark_session.sparkContext
    sc.addPyFile("s3://athena-dbt/test/file1.py")
    sc.addPyFile("s3://athena-dbt/test/file2.py")

    def func(iterator):
        from file2 import transform

        return [transform(i) for i in iterator]

    from pyspark.sql.functions import col, udf

    udf_with_import = udf(func)

    data = [(1, "a"), (2, "b"), (3, "c")]
    cols = ["num", "alpha"]
    df = spark_session.createDataFrame(data, cols)

    return df.withColumn("udf_test_col", udf_with_import(col("alpha")))
Note that Python models can only reference or write to tables whose names match the regular expression ^[0-9a-zA-Z_]+$. Dashes and special characters are not supported by Spark, even though Athena supports them.

The adapter partly supports contract definitions:
- data_type is supported, but needs to be adjusted for complex types. Types must be specified entirely (for instance array<int>) even though they won't be checked. Indeed, as dbt recommends, only the broader type (array, map, int, varchar) is compared. The complete definition is used in order to check that the data types defined in Athena are correct (pre-flight check).

See CONTRIBUTING for more information on how to contribute to this project.
Thanks goes to these wonderful people (emoji key):
Contributions of any kind welcome!