Syngenta Digital DTA (Database Adapter)

A DRY multi-database normalizer.

Features

  • Use the same package with multiple database engines
  • Validate your data against a predefined schema in code (useful for schemaless databases)
  • Create an easy pub-sub architecture based on model changes
  • Support local development

Philosophy

The dta philosophy is to use one pattern with multiple databases.

The dta encourages a pub-sub architecture by allowing automatic publishing of data changes over SNS, as sketched below.
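
As a taste of that flow, here is a minimal sketch with a hypothetical SNS topic ARN (the DynamoDB options used here are documented in the sections below):

import os
import syngenta_digital_dta

adapter = syngenta_digital_dta.adapter(
    engine='dynamodb',
    table=os.getenv('DYNAMODB_TABLE'),
    model_schema='v1-table-model',
    model_schema_file='application/openapi.yml',
    model_identifier='test_id',
    model_version_key='modified',
    sns_arn='arn:aws:sns:us-east-1:123456789012:model-changes'  # hypothetical topic
)

# with sns_arn set, the change is automatically published to the topic
adapter.create(data={'test_id': 'abc-123', 'modified': '2021-01-01T00:00:00Z'})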

Installation

This is a Python module available through the PyPI registry.

Before installing, download and install Python; Python 3 or higher is required.

Installation is done using the pip install command:

$ pip install syngenta_digital_dta

or

$ pipenv install syngenta_digital_dta

Common Usage: DynamoDB

import os
import syngenta_digital_dta

adapter = syngenta_digital_dta.adapter(
    engine='dynamodb',
    table=os.getenv('DYNAMODB_TABLE'),
    endpoint='http://localhost:4000',
    model_schema='v1-table-model',
    model_schema_file='application/openapi.yml',
    model_identifier='test_id',
    model_version_key='modified'
)

Initialize Options

| Option Name | Required | Type | Description |
|-------------|----------|------|-------------|
| engine | true | string | name of supported db engine (dynamodb) |
| table | true | string | name of dynamodb table |
| endpoint | false | string | url of the dynamodb table (useful for local development) |
| model_schema | true | string | key of openapi schema this is being set against |
| model_schema_file | true | string | path where your schema file can be found (accepts JSON as well) |
| model_identifier | true | string | unique identifier key on the model |
| model_version_key | true | string | key that can be used as a version key (modified timestamps often suffice) |
| author_identifier | false | string | unique identifier of the author who made the change |
| sns_arn | false | string | sns topic arn you want to broadcast the changes to |
| sns_attributes | false | dict | custom attributes in dict format; values should only be strings or numbers |
| sns_default_attributes | false | boolean | determines if default sns attributes are included in the sns message (model_identifier, model_version_key, model_schema, author_identifier) |
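
If you wire up sns_arn, a subscriber can consume the broadcasts. A minimal sketch of a Lambda consumer, using the standard SNS event shape (the message body being the changed record as JSON is an assumption, not documented behavior):

import json

def handler(event, context):
    # standard Lambda SNS event shape
    for record in event['Records']:
        sns = record['Sns']
        attributes = sns.get('MessageAttributes', {})  # default attributes listed above, when enabled
        changed_model = json.loads(sns['Message'])  # assumed: the changed record as JSON
        print(attributes.get('model_schema'), changed_model)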

Examples

DynamoDB Create

result = adapter.create(
    operation='insert',  # or overwrite (optional); defaults to insert
    data=some_dict_to_insert_into_the_table,
)

result = adapter.insert(data=some_dict_to_insert_into_the_table) # alias

DynamoDB Read

result = adapter.read(
    operation='get',  # or query or scan (optional); defaults to get
    query={
        'Key': {
            'example_id': '3'
        }
    }
)

result = adapter.get(
    query={
        'Key': {
            'example_id': '3'
        }
    }
)

results = adapter.read(
    operation='query',
    query={
        'IndexName': 'test_query_id',
        'Limit': 1,
        'KeyConditionExpression': 'test_query_id = :test_query_id',
        'ExpressionAttributeValues': {
            ':test_query_id': 'def345'
        }
    }
)

DynamoDB Update

result = adapter.update(
    data=some_dict_to_update_the_model,
    operation='get',
    query={
        'Key': {
            'example_id': '3'
        }
    }
)

DynamoDB Delete

result = adapter.delete(
    query={
        'Key': {
            'example_id': '3'
        }
    }
)

Common Usage: Postgres & Redshift

import os
import syngenta_digital_dta

adapter = syngenta_digital_dta.adapter(
    engine='postgres', # or redshift
    table='users',
    endpoint='localhost',
    database='dta',
    port=5432, # use 5439 for redshift
    user=os.getenv('POSTGRES_USER'),
    password=os.getenv('POSTGRES_PASSWORD'),
    model_schema='test-postgres-user-model',
    model_schema_file='tests/openapi.yml',
    model_identifier='user_id',
    model_version_key='modified',
    relationships={
        'addresses': 'user_id'
    }
)

Initialize Options

| Option Name | Required | Type | Description |
|-------------|----------|------|-------------|
| engine | true | string | name of supported db engine (postgres or redshift) |
| table | true | string | name of postgres table to work as primary query point |
| endpoint | true | string | url of the postgres cluster |
| database | true | string | name of the database to connect to |
| port | true | int | port of the database (typically 5432 for postgres, 5439 for redshift) |
| user | true | string | username for database access |
| password | true | string | password for database access |
| model_schema | true | string | key of openapi schema this is being set against |
| model_schema_file | true | string | path where your schema file can be found (accepts JSON as well) |
| model_identifier | true | string | unique identifier key on the model |
| model_version_key | true | string | key that can be used as a version key (modified timestamps often suffice) |
| autocommit | false | boolean | will commit transactions automatically without a direct call |
| relationships | false | dict | key is the table with the relationship and value is the foreign key on that table (assumes your primary key name equals that table's foreign key) |
| author_identifier | false | string | unique identifier of the author who made the change |
| sns_arn | false | string | sns topic arn you want to broadcast the changes to |
| sns_attributes | false | dict | custom attributes in dict format; values should only be strings or numbers |
| sns_default_attributes | false | boolean | determines if default sns attributes are included in the sns message (model_identifier, model_version_key, model_schema, author_identifier) [default: true] |
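
For example, a sketch of the autocommit option with otherwise hypothetical connection values; with autocommit enabled, the explicit commit=True flags shown in the examples below should be unnecessary:

import os
import syngenta_digital_dta

adapter = syngenta_digital_dta.adapter(
    engine='postgres',
    table='users',
    endpoint='localhost',
    database='dta',
    port=5432,
    user=os.getenv('POSTGRES_USER'),
    password=os.getenv('POSTGRES_PASSWORD'),
    model_schema='test-postgres-user-model',
    model_schema_file='tests/openapi.yml',
    model_identifier='user_id',
    model_version_key='modified',
    autocommit=True  # commits transactions automatically without a direct call
)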

Examples

Postgres/Redshift Connect

# will always pool and share connections
self.user_adapter.connect()

Postgres/Redshift Create

data = {
    'user_id': str(uuid.uuid4()),
    'email': 'somen.user@some-email.com',
    'first': 'Some',
    'last': 'User'
}
result = self.user_adapter.create(data=data, commit=True)
result = self.user_adapter.insert(data=data, commit=True) # alias

Postgres/Redshift Update

data = {
    'user_id': 'some-update-guid',
    'email': 'somen.user@some-email.com',
    'first': 'Some',
    'last': 'User'
}
result = self.user_adapter.update(data=data, commit=True)

Postgres/Redshift Upsert

data = {
    'user_id': 'some-update-guid',
    'email': 'somen.user@some-email.com',
    'first': 'Some',
    'last': 'User'
}
result = self.user_adapter.upsert(data=data, commit=True)

Postgres/Redshift Delete

self.user_adapter.delete('some-delete-guid', commit=True)

Postgres/Redshift Read

# will only return 1 row or None
result = self.user_adapter.read('some-read-guid')
result = self.user_adapter.get('some-read-guid') # alias

# all fields optional (defaults to SELECT * FROM {table} ORDER BY {model_identifier} ASC LIMIT 1000)
results = self.user_adapter.read_all(
    where={
        'first': 'first',
        'last': 'last',
    },
    limit=2,
    offset=1,
    orderby_column='first',
    orderby_sort='DESC'
)

# limited to one relationship at a time
results = self.user_adapter.get_relationship('addresses', where={'user_id': 'some-user-relationship-guid'})

# only allows read-only operations
# query and params are required; params can be an empty dict
results = self.user_adapter.query(
    query='SELECT * FROM users WHERE user_id = %(identifier_value)s',
    params={
        'identifier_value':'some-query-relationship-guid'
    }
)

Common Usage: Elasticsearch

import os
import syngenta_digital_dta

# localhost connection
adapter = syngenta_digital_dta.adapter(
    engine='elasticsearch',
    index='users',
    endpoint='localhost',
    model_schema='test-elasticsearch-user-model',
    model_schema_file='tests/openapi.yml',
    model_identifier='user_id',
    model_version_key='modified'
)

# lambda connection (assumes lambda role has access)
adapter = syngenta_digital_dta.adapter(
    engine='elasticsearch',
    index='users',
    endpoint='localhost',
    model_schema='test-elasticsearch-user-model',
    model_schema_file='tests/openapi.yml',
    model_identifier='user_id',
    model_version_key='modified',
    authentication='lambda'
)

# traditional user password connection
adapter = syngenta_digital_dta.adapter(
    engine='elasticsearch',
    index='users',
    endpoint='localhost',
    model_schema='test-elasticsearch-user-model',
    model_schema_file='tests/openapi.yml',
    model_identifier='user_id',
    model_version_key='modified',
    authentication='user-password',
    user='root',
    password='root'
)

Initialize Options

| Option Name | Required | Type | Description |
|-------------|----------|------|-------------|
| engine | true | string | name of supported db engine (elasticsearch) |
| index | true | string | name of the elasticsearch index to work as primary query point |
| endpoint | true | string | url of the elasticsearch cluster |
| model_schema | true | string | key of openapi schema this is being set against |
| model_schema_file | true | string | path where your schema file can be found (accepts JSON as well) |
| model_identifier | true | string | unique identifier key on the model |
| model_version_key | true | string | key that can be used as a version key (modified timestamps often suffice) |
| port | false | int | port of database (defaults to 9200 if localhost, 443 otherwise) |
| author_identifier | false | string | unique identifier of the author who made the change |
| authentication | false | string | either 'lambda' or 'user-password' |
| user | false | string | only needed if authentication is user-password |
| password | false | string | only needed if authentication is user-password |
| sns_arn | false | string | sns topic arn you want to broadcast the changes to |
| sns_attributes | false | dict | custom attributes in dict format; values should only be strings or numbers |
| sns_default_attributes | false | boolean | determines if default sns attributes are included in the sns message (model_identifier, model_version_key, model_schema, author_identifier) [default: true] |

Elasticsearch Connection

# elasticsearch is auto-connected to a shared connection; use this to test that connection
self.adapter.connection.ping()

Elasticsearch Set-up

# converts the openapi schema, defined in init, to a mapping
self.adapter.create_template(
    name='users',
    index_patterns='users-*',
    special={'phone': 'keyword'}  # (optional) mapping of special types; otherwise defaults based on schema type
)

# uses sensible defaults, or pass in custom settings via the settings kwarg
self.adapter.create_index(settings=some_optional_settings)

OpenAPI Default Conversion

| OpenAPI Type | Elasticsearch Mapping |
|--------------|-----------------------|
| array | none (not needed to be included) |
| array of objects | nested |
| boolean | boolean |
| integer | integer |
| number | long |
| object (with properties) | object |
| object (without properties) | flattened |
| string (no format) | text |
| string (format: date) | date (with iso date format acceptance) |
| string (format: date-time) | date (with iso date format acceptance) |
| string (format: email) | text (with url email analyzer) |
| string (format: ip) | ip |
| string (format: hostname) | text (with url email analyzer) |
| string (format: iri) | text (with url email analyzer) |
| string (format: url) | text (with url email analyzer) |
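
For illustration, applying these defaults to a small user schema would yield a mapping along these lines (a hand-written sketch, not generated output; the phone override assumes the special kwarg from the set-up example above):

example_mapping = {
    'properties': {
        'user_id': {'type': 'text'},      # string (no format)
        'email': {'type': 'text'},        # string (format email); gets the url email analyzer
        'modified': {'type': 'date'},     # string (format date-time)
        'addresses': {'type': 'nested'},  # array of objects
        'phone': {'type': 'keyword'}      # overridden via special={'phone': 'keyword'}
    }
}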

Elasticsearch Create

data = {
    'user_id': uuid.uuid4().hex,
    'email': 'somen.user@some-email.com',
    'first': 'Some',
    'last': 'User',
    'phone': 1112224444
}
self.adapter.create(data=data, refresh=True) # (optional) refresh defaults to True

Elasticsearch Update

updated_data = {
    'user_id': user_id,
    'email': 'peter.cruse@some-email.com',
    'first': 'Peter'
}
self.adapter.update(data=updated_data, refresh=True) # (optional) refresh defaults to True

Elasticsearch Upsert

data = {
    'user_id': upsert_id,
    'email': 'somen.user-upsert@some-email.com',
    'first': 'Some',
    'last': 'User',
    'phone': 1112224444
}
self.adapter.upsert(data=data, refresh=True) # (optional) refresh defaults to True

Elasticsearch Delete

self.adapter.delete(delete_id, refresh=True) # (optional) refresh defaults to True

Elasticsearch Read

response = self.adapter.get(get_id)

# returns a single dictionary mapped to the openapi model defined in init (or an empty dict)
dict_response = self.adapter.get(get_id, normalize=True) # (optional) normalize defaults to False

# returns a list of dictionaries mapped to the openapi model defined in init (or an empty list)
list_response = self.adapter.query(
    normalize=True, # (optional) normalize defaults to False
    query={
        'match': {
            'first': 'Normal'
        }
    }
)

Common Usage: S3

adapter = syngenta_digital_dta.adapter(
    engine='s3',
    endpoint=self.endpoint,
    bucket=self.bucket
)

Initialize Options

| Option Name | Required | Type | Description |
|-------------|----------|------|-------------|
| engine | true | string | name of supported db engine (s3) |
| bucket | true | string | name of bucket you are interfacing with |
| endpoint | true | string | url of the s3 endpoint (useful for local development) |
| sns_arn | false | string | sns topic arn you want to broadcast the changes to |
| sns_attributes | false | dict | custom attributes in dict format; values should only be strings or numbers |

NOTE: If you use the SNS functionality, SNS messages contain pre-signed URLs for the S3 objects, not the actual data itself, given SNS message size limitations. Below is an example payload:

{
    "presigned_url": "https://some-s3-url"
}

S3 Create (Single)

# the json flag automatically converts dicts to json
adapter.create(
    s3_path='test/test-create.json',
    data={'test': True},
    json=True
)

S3 Create (Multipart)

chunks = []
with open('./tests/mock/example.json') as file:
    for piece in iter(lambda: file.read(6000000), ''):  # read in ~6MB pieces until EOF
        chunks.append(piece)
adapter.multipart_upload(chunks=chunks, s3_path='test/test-create.json')

S3 Create (Stream)

url = 'https://github.com/syngenta-digital/package-python-dta/archive/refs/heads/master.zip'
response = requests.get(url, stream=True)
adapter.upload_stream(data=response.content, s3_path='test/code-clone.zip')

S3 Create (Pre-Signed UPLOAD URLs)

presigned_upload_url = adapter.create_presigned_post_url(s3_path='test/test-create.json', expiration=3600)

S3 Read (In Memory)

# the json flag automatically converts json to a dict
result = adapter.read(
    s3_path='test/test-create.json',
    json=True
)

S3 Read (Download to Disk)

# automatically creates directory and child directories
file_path = adapter.download(s3_path='test/test-create.json', download_path='/tmp/unit-test-download/test.json')

S3 Read (Pre-Signed URLs)

presigned_url = adapter.create_presigned_read_url(s3_path='test/test-create.json', expiration=3600)
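
The returned value is a plain HTTPS link, so any HTTP client can fetch the object before the expiration (3600 seconds above); for example:

import requests

response = requests.get(presigned_url)
file_bytes = response.content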

S3 Delete

adapter.delete(s3_path='test/test-create.json')

Common Usage: Mongo

import os
import syngenta_digital_dta

adapter = syngenta_digital_dta.adapter(
    engine='mongo',
    endpoint=os.getenv('MONGO_URI'),
    database=os.getenv('MONGO_DB'),
    collection=os.getenv('MONGO_COLLECTION'),
    user=os.getenv('MONGO_USER'),
    password=os.getenv('MONGO_PASSWORD'),
    model_schema='v1-collection-model',
    model_schema_file='application/openapi.yml',
    model_identifier='test_id',
    model_version_key='modified'
)

Initialize Options

| Option Name | Required | Type | Description |
|-------------|----------|------|-------------|
| engine | true | string | name of supported db engine (mongo) |
| endpoint | true | string | uri of the mongo database (ex: mongodb://localhost:27017/) |
| database | true | string | name of mongo database |
| collection | true | string | name of mongo collection |
| user | true | string | mongo authentication user |
| password | true | string | mongo authentication password |
| model_schema | true | string | key of openapi schema this is being set against |
| model_schema_file | true | string | path where your schema file can be found (accepts JSON as well) |
| model_identifier | true | string | unique identifier key on the model |
| model_version_key | true | string | key that can be used as a version key (modified timestamps often suffice) |
| sns_arn | false | string | sns topic arn you want to broadcast the changes to |
| sns_attributes | false | dict | custom attributes in dict format; values should only be strings or numbers |
| sns_default_attributes | false | boolean | determines if default sns attributes are included in the sns message (model_identifier, model_version_key, model_schema, author_identifier) |

Examples

Mongo Create

result = adapter.create(data=data) # data must contain a unique model_identifier
print(result) # will contain a new key (_id)

Mongo Read

result = adapter.read(
    operation='get',
    query={'test_id': data['test_id']}
)
print(result) # will contain a new key (_id)

Mongo Read Many

results = adapter.read(
    operation='query',
    query={'test_id': data['test_id']}
)
print(results) # will be an array of dicts, each containing a new key (_id)

Mongo Query

pipeline = [
    {'$unwind': '$tags'},
    {'$group': {'_id': '$tags', 'count': {'$sum': 1}}}
]
results = adapter.read(
    operation='aggregate', # able to dynamically call a variety of read-only operations; see list below
    query=pipeline
)
# available operations:
# - find
# - find_one
# - aggregate
# - aggregate_raw_batches
# - find_raw_batches
# - count_documents
# - estimated_document_count
# - distinct
# - list_indexes
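
The same read() pattern dispatches to any of the listed operations; a sketch using count_documents (the filter-style query shape is an assumption based on the read examples above):

count = adapter.read(
    operation='count_documents',
    query={'test_id': data['test_id']}
)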

Mongo Update

# will throw an error if the document doesn't exist
result = self.adapter.update(query={'test_id': data['test_id']}, data=data, update_list_operation='replace')

# available update_list_operation values:
# - add (adds items to the list) [default]
# - remove (removes an item from the list, if duplicated)
# - replace (replaces the entire list)

Mongo Delete

self.adapter.delete(query={'test_id': data['test_id']})

Contributing

If you would like to contribute, please follow the established patterns and unit test your code:

Local Unit Testing

Testing requires two terminal tabs (uses docker containers for local db versions):

  • In one tab, run pipenv run local
  • In a second tab, run pipenv run test
