django-cqrs is a Django application that implements CQRS data synchronisation between several Django microservices.
In Connect we have a rather complex Domain Model. There are many microservices, decomposed by subdomain, that follow the database-per-service pattern. These microservices have rich and consistent APIs. They are deployed in a cloud k8s cluster and scale automatically under load. Many of these services aggregate data from other ones, and usually API Composition is enough. But some services work too slowly with API joins, so another pattern needs to be applied.
The pattern that solves this issue is called CQRS - Command Query Responsibility Segregation. The core idea behind this pattern is that view databases (replicas) are defined for efficient querying and DB joins. Applications keep their replicas up to date by subscribing to Domain events published by the service that owns the data. Data is eventually consistent, and that's okay for non-critical business transactions.
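The flow above can be sketched without any framework: a master service publishes a domain event on every change, and a replica applies consumed events to its local view. A minimal in-memory illustration (the broker, event shape, and store names are invented for illustration; in django-cqrs this is handled by the transport layer):

```python
from collections import deque

# A toy in-process "broker": in production this would be a real message bus.
broker = deque()

# Master side: owns the data and publishes a domain event on every change.
master_db = {}

def master_save(pk, data):
    master_db[pk] = data
    broker.append({'type': 'account.saved', 'pk': pk, 'data': data})

# Replica side: keeps a local view, updated by consuming events in order.
replica_db = {}

def consume_all():
    while broker:
        event = broker.popleft()
        replica_db[event['pk']] = event['data']

master_save(1, {'name': 'ACME'})
master_save(1, {'name': 'ACME Corp'})

# Until events are consumed, the replica lags behind (eventual consistency).
consume_all()
```

After `consume_all()` the replica converges to the master's latest state; between publishes and consumption the two sides may briefly disagree, which is exactly the eventual-consistency trade-off described above.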
Full documentation is available at https://django-cqrs.readthedocs.org.
You can find an example project here.
# models.py
from django.db import models
from dj_cqrs.mixins import MasterMixin, RawMasterMixin


class Account(MasterMixin, models.Model):
    CQRS_ID = 'account'
    CQRS_PRODUCE = True  # set this to False to prevent sending instances to Transport


class Author(MasterMixin, models.Model):
    CQRS_ID = 'author'
    CQRS_SERIALIZER = 'app.api.AuthorSerializer'


# For cases of diamond multiple inheritance, or for proxy Django models,
# the following approach can be used:
from mptt.models import MPTTModel
from dj_cqrs.metas import MasterMeta


class ComplexInheritanceModel(MPTTModel, RawMasterMixin):
    CQRS_ID = 'diamond'


class BaseModel(RawMasterMixin):
    CQRS_ID = 'base'


class ProxyModel(BaseModel):
    class Meta:
        proxy = True


MasterMeta.register(ComplexInheritanceModel)
MasterMeta.register(BaseModel)
# settings.py
CQRS = {
    'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
    'host': RABBITMQ_HOST,
    'port': RABBITMQ_PORT,
    'user': RABBITMQ_USERNAME,
    'password': RABBITMQ_PASSWORD,
}
from django.db import models
from dj_cqrs.mixins import ReplicaMixin


class AccountRef(ReplicaMixin, models.Model):
    CQRS_ID = 'account'

    id = models.IntegerField(primary_key=True)


class AuthorRef(ReplicaMixin, models.Model):
    CQRS_ID = 'author'
    CQRS_CUSTOM_SERIALIZATION = True

    @classmethod
    def cqrs_create(cls, sync, mapped_data, previous_data=None, meta=None):
        # Override here
        pass

    def cqrs_update(self, sync, mapped_data, previous_data=None, meta=None):
        # Override here
        pass
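With CQRS_CUSTOM_SERIALIZATION enabled, the incoming payload is handed to your own hooks. A framework-free sketch of what such hooks typically do, assuming mapped_data mirrors the master's payload (the field names and the in-memory store are invented for illustration):

```python
# Local view store standing in for the replica's table; shape is illustrative.
store = {}

def cqrs_create(mapped_data):
    # Flatten a nested author payload into the local view's own shape.
    store[mapped_data['id']] = {
        'name': mapped_data['name'],
        'book_count': len(mapped_data.get('books', [])),
    }

def cqrs_update(mapped_data):
    # Re-derive the denormalized fields from the fresh payload.
    row = store[mapped_data['id']]
    row['name'] = mapped_data['name']
    row['book_count'] = len(mapped_data.get('books', []))

cqrs_create({'id': 7, 'name': 'Ursula', 'books': [{'t': 'A'}, {'t': 'B'}]})
cqrs_update({'id': 7, 'name': 'Ursula K. Le Guin', 'books': [{'t': 'A'}]})
```

The point of custom serialization is exactly this kind of reshaping: the replica stores a view optimized for its own queries rather than a mirror of the master's schema.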
# settings.py
CQRS = {
    'transport': 'dj_cqrs.transport.RabbitMQTransport',
    'queue': 'account_replica',
    'host': RABBITMQ_HOST,
    'port': RABBITMQ_PORT,
    'user': RABBITMQ_USERNAME,
    'password': RABBITMQ_PASSWORD,
}
Run the consumer (this example starts 2 workers):
python manage.py cqrs_consume -w 2
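Conceptually, `-w 2` means several workers draining the same event stream in parallel. A simplified sketch with threads (the event shape and store are invented for illustration; the real consumer reads from the configured transport):

```python
import queue
import threading

# Toy transport queue and replica store; names are illustrative.
events = queue.Queue()
replica = {}
lock = threading.Lock()

# Enqueue a batch of pending events.
for pk in range(10):
    events.put({'pk': pk, 'data': {'n': pk}})

def worker():
    # Each worker pulls events until the queue is drained.
    while True:
        try:
            event = events.get_nowait()
        except queue.Empty:
            return
        with lock:
            replica[event['pk']] = event['data']

# "-w 2": two workers consuming concurrently.
threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```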
By default, saving with update_fields doesn't trigger CQRS logic, but this can be overridden for the whole application in settings:

CQRS = {
    ...
    'master': {
        'CQRS_AUTO_UPDATE_FIELDS': True,
    },
    ...
}
or a special flag can be used in each place where it's required to trigger the CQRS flow:

instance.save(update_fields=['name'], update_cqrs_fields=True)
Use is_sync_instance to set a filtering rule. It's important to understand that CQRS counting works even without syncing, and the rule is applied every time the model is updated. Example:

class FilteredSimplestModel(MasterMixin, models.Model):
    CQRS_ID = 'filter'

    name = models.CharField(max_length=200)

    def is_sync_instance(self):
        return len(str(self.name)) > 2
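The effect of such a predicate can be illustrated without Django: it decides, per instance, whether a change is published to the transport at all (the save function and instance shape below are invented for illustration):

```python
# Events that made it to the transport.
published = []

def is_sync_instance(instance):
    # Mirrors the model above: only names longer than 2 characters sync.
    return len(str(instance['name'])) > 2

def save(instance):
    # The predicate is evaluated on every save, not once at registration.
    if is_sync_instance(instance):
        published.append(instance)

save({'name': 'ab'})    # filtered out: name too short
save({'name': 'abcd'})  # passes the rule and is published
```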
Add an action to synchronize master items from the Django Admin page:

from django.db import models
from django.contrib import admin

from dj_cqrs.admin_mixins import CQRSAdminMasterSyncMixin


class AccountAdmin(CQRSAdminMasterSyncMixin, admin.ModelAdmin):
    ...


admin.site.register(models.Account, AccountAdmin)
Override _cqrs_sync_queryset from CQRSAdminMasterSyncMixin to adjust the QuerySet used for synchronization.

Bulk synchronizer without transport (usage example: it may be used for initial configuration). May be used at planned downtime.
python manage.py cqrs_bulk_dump --cqrs-id=author  # -> author.dump
python manage.py cqrs_bulk_load -i=author.dump
Filter synchronizer over transport (usage example: sync some specific records to a given replica). Can be used dynamically.

python manage.py cqrs_sync --cqrs-id=author -f='{"id__in": [1, 2]}'
python manage.py cqrs_sync --cqrs-id=author -f='{}' -q=replica
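The -f argument is a JSON dict of queryset-style filter kwargs. A pure-Python sketch of its selection semantics, simplified to the two cases shown above (the record shapes and helper are invented for illustration; the real command builds a Django QuerySet):

```python
import json

# Toy record set standing in for the Author queryset.
records = [{'id': 1}, {'id': 2}, {'id': 3}]

def apply_filter(records, filter_json):
    """Simulate -f: an empty JSON dict selects everything;
    an id__in lookup narrows the selection by primary key."""
    filters = json.loads(filter_json)
    if 'id__in' in filters:
        wanted = set(filters['id__in'])
        return [r for r in records if r['id'] in wanted]
    return list(records)

subset = apply_filter(records, '{"id__in": [1, 2]}')  # two records
everything = apply_filter(records, '{}')              # all records
```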
Set of diff synchronization tools:
kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_master --cqrs-id=author |
kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_diff_replica |
kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_sync
kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_deleted_diff_replica --cqrs-id=author |
kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_deleted_diff_master |
kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_deleted_sync_replica
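The diff tools above work by comparing what the master owns against what the replica holds. A simplified sketch of that comparison, reducing each side to pk -> revision pairs (django-cqrs models carry a cqrs_revision counter; the dicts below are invented for illustration):

```python
# pk -> revision on each side.
master = {1: 3, 2: 1, 3: 5}
replica = {1: 3, 2: 0, 4: 1}

# Missing on the replica, or present with an older revision:
# these records need to be re-synced (cf. cqrs_diff_sync).
stale = sorted(pk for pk, rev in master.items() if replica.get(pk, -1) < rev)

# Present on the replica but gone from the master:
# these records need deletion (cf. cqrs_deleted_sync_replica).
deleted = sorted(pk for pk in replica if pk not in master)
```

Streaming these pk/revision sets between containers, as the kubectl pipelines above do, lets the diff run without a shared database.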
Dev requirements are listed in requirements/dev.txt. We use the isort library to order and format our imports, and black to format the code. We check both with the flake8-isort and flake8-black libraries (automatically on a flake8 run). Run isort . && black . to format the code.

Test requirements are listed in requirements/test.txt.
export PYTHONPATH=/your/path/to/django-cqrs/
Run tests with various RDBMS:
cd integration_tests
DB=postgres docker compose -f docker-compose.yml -f rdbms.yml run app_test
DB=mysql docker compose -f docker-compose.yml -f rdbms.yml run app_test
Check code style: flake8
Run tests: pytest

Test reports are generated in tests/reports:
out.xml - JUnit test results
coverage.xml - coverage XML results

To generate HTML coverage reports, use: --cov-report html:tests/reports/cov_html
Run integration tests:

cd integration_tests
docker compose run master
FAQs
Django CQRS data synchronisation
We found that django-cqrs demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.