Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

bomf

Package Overview
Dependencies
Maintainers
2
Versions
80
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bomf - npm Package Compare versions

Comparing version
0.11.1
to
0.11.2rc3
+2
domain-specific-terms.txt
# contains 1 lower case word per line which are ignored in the spell_check
adresse
version = "0.11.2rc3"
+1
-1

@@ -25,5 +25,5 @@ name: "Black"

python -m pip install --upgrade pip
pip install -r ./dev_requirements/requirements-formatting.txt
pip install .[formatting]
- name: Black Code Formatter
run: |
black . --check

@@ -38,3 +38,7 @@ # This GitHub workflow is only needed for python package releases which are supposed to be published on pypi.

name: Build and publish Python 🐍 distributions 📦 to PyPI and TestPyPI
runs-on: ubuntu-latest
runs-on: ${{ matrix.os }}
strategy:
matrix:
python-version: [ "3.12" ]
os: [ ubuntu-latest ]
# Specifying a GitHub environment, # Specifying a GitHub environment, which is strongly recommended by PyPI: https://docs.pypi.org/trusted-publishers/adding-a-publisher/

@@ -49,3 +53,3 @@ # you have to create an environment in your repository settings and add the environment name here

- uses: actions/checkout@v4
- name: Set up Python
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5

@@ -57,3 +61,3 @@ with:

python -m pip install --upgrade pip
pip install -r dev_requirements/requirements-test_packaging.txt
pip install .[test_packaging]
- name: Build wheel and source distributions

@@ -60,0 +64,0 @@ run: |

+42
-21

@@ -1,31 +0,52 @@

Metadata-Version: 2.1
Metadata-Version: 2.3
Name: bomf
Version: 0.11.1
Version: 0.11.2rc3
Summary: BO4E Migration Framework
Home-page: https://github.com/Hochfrequenz/bo4e_migration_framework
Author: Hochfrequenz Unternehmensberatung GmbH
Author-email: info@hochfrequenz.de
License: mit
Project-URL: Documentation, https://github.com/Hochfrequenz/bo4e_migration_framework
Project-URL: Code, https://github.com/Hochfrequenz/bo4e_migration_framework
Project-URL: Bug tracker, https://github.com/Hochfrequenz/bo4e_migration_framework/issues
Platform: any
Project-URL: Changelog, https://github.com/Hochfrequenz/bo4e_migration_framework/releases
Project-URL: Homepage, https://github.com/Hochfrequenz/bo4e_migration_framework
Author-email: Hochfrequenz Unternehmensberatung GmbH <info@hochfrequenz.de>
License: MIT
License-File: LICENSE
Keywords: BO4E,Data,Migration
Classifier: Development Status :: 4 - Beta
Classifier: Environment :: Console
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Operating System :: OS Independent
Requires-Python: >=3.10
Description-Content-Type: text/markdown; charset=UTF-8
License-File: LICENSE
Requires-Dist: pydantic>=2.0.0
Requires-Dist: typeguard>=4.0.1
Requires-Python: >=3.11
Requires-Dist: bidict
Requires-Dist: frozendict
Requires-Dist: bidict
Requires-Dist: injector
Requires-Dist: networkx
Requires-Dist: injector
Requires-Dist: pvframework
Requires-Dist: pydantic>=2.0.0
Requires-Dist: python-generics
Requires-Dist: pvframework
Requires-Dist: typeguard
Provides-Extra: coverage
Requires-Dist: coverage==7.6.1; extra == 'coverage'
Provides-Extra: dev
Requires-Dist: pip-tools; extra == 'dev'
Provides-Extra: formatting
Requires-Dist: black==24.8.0; extra == 'formatting'
Requires-Dist: isort==5.13.2; extra == 'formatting'
Provides-Extra: linting
Requires-Dist: pylint==3.2.7; extra == 'linting'
Provides-Extra: spellcheck
Requires-Dist: codespell==2.3.0; extra == 'spellcheck'
Provides-Extra: test-packaging
Requires-Dist: build==1.2.2; extra == 'test-packaging'
Requires-Dist: twine==5.1.1; extra == 'test-packaging'
Provides-Extra: tests
Requires-Dist: bo4e==202401.2.1; extra == 'tests'
Requires-Dist: pytest-asyncio==0.24.0; extra == 'tests'
Requires-Dist: pytest==8.3.3; extra == 'tests'
Provides-Extra: type-check
Requires-Dist: mypy==1.11.2; extra == 'type-check'
Requires-Dist: networkx-stubs==0.0.1; extra == 'type-check'
Requires-Dist: types-frozendict==2.0.9; extra == 'type-check'
Description-Content-Type: text/markdown

@@ -55,3 +76,3 @@ # BO4E Migration Framework (bomf)

## Architeture / Overview
## Architecture / Overview
The overall setup for a migration from 1-n source systems (A, B, C...) to 1-m target systems (1,2, 3...) might look like this:

@@ -58,0 +79,0 @@

@@ -0,1 +1,68 @@

[project]
name = "bomf"
description = "BO4E Migration Framework"
license = { text = "MIT" }
requires-python = ">=3.11"
authors = [{ name = "Hochfrequenz Unternehmensberatung GmbH", email = "info@hochfrequenz.de" }]
keywords = ["BO4E", "Migration", "Data"]
classifiers = [
"Development Status :: 4 - Beta",
"Environment :: Console",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
dependencies = [
"pydantic>=2.0.0",
"typeguard",
"frozendict",
"bidict",
"networkx",
"injector",
"python-generics",
"pvframework"
] # add all the dependencies here
dynamic = ["readme", "version"]
[project.optional-dependencies]
tests = [
"pytest==8.3.3",
"pytest-asyncio==0.24.0",
"bo4e==202401.2.1"
]
linting = [
"pylint==3.2.7"
]
type_check = [
"mypy==1.11.2",
"networkx-stubs==0.0.1",
"types-frozendict==2.0.9"
]
spellcheck = [
"codespell==2.3.0"
]
coverage = [
"coverage==7.6.1"
]
formatting = [
"black==24.8.0",
"isort==5.13.2"
]
test_packaging = [
"build==1.2.2",
"twine==5.1.1"
]
dev = [
"pip-tools"
]
[project.urls]
Changelog = "https://github.com/Hochfrequenz/bo4e_migration_framework/releases"
Homepage = "https://github.com/Hochfrequenz/bo4e_migration_framework"
[tool.black]

@@ -12,2 +79,8 @@ line-length = 120

[mypy]
truethy-bool = true
[tool.mypy]
disable_error_code = []
[tool.pytest.ini_options]

@@ -22,5 +95,24 @@ # When the mode is auto, all discovered async tests are considered asyncio-driven

[build-system]
requires = ["setuptools>=41.0", "wheel", "setuptools_scm[toml]>=3.4"]
build-backend = "setuptools.build_meta"
requires = ["hatchling>=1.8.0", "hatch-vcs", "hatch-fancy-pypi-readme"]
build-backend = "hatchling.build"
[tool.setuptools_scm]
[tool.hatch.metadata.hooks.fancy-pypi-readme]
content-type = "text/markdown"
fragments = [{ path = "README.md" }]
[tool.hatch.version]
source = "vcs"
[tool.hatch.build.hooks.vcs]
version-file = "src/_bomf_version.py"
template = '''
version = "{version}"
'''
[tool.hatch.build.targets.sdist]
exclude = ["/unittests"]
[tool.hatch.build.targets.wheel]
only-include = ["src"]
sources = ["src"]

@@ -24,3 +24,3 @@ # BO4E Migration Framework (bomf)

## Architeture / Overview
## Architecture / Overview
The overall setup for a migration from 1-n source systems (A, B, C...) to 1-m target systems (1,2, 3...) might look like this:

@@ -27,0 +27,0 @@

#
# This file is autogenerated by pip-compile with Python 3.11
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:

@@ -39,2 +39,1 @@ #

# pydantic-core
# typeguard

@@ -111,3 +111,3 @@ """

"""
a harcoded filter filters on a hardcoded list of allowed/blocked values (formerly known as white- and blacklist)
a hardcoded filter filters on a hardcoded list of allowed/blocked values (formerly known as white- and blacklist)
"""

@@ -114,0 +114,0 @@

+17
-8

@@ -7,2 +7,3 @@ [tox]

type_check
spellcheck
skip_missing_interpreters = True

@@ -18,3 +19,3 @@ skipsdist = True

-r requirements.txt
-r dev_requirements/requirements-tests.txt
.[tests]
setenv = PYTHONPATH = {toxinidir}/src

@@ -27,3 +28,3 @@ commands = python -m pytest --basetemp={envtmpdir} {posargs}

-r requirements.txt
-r dev_requirements/requirements-linting.txt
.[linting]
# add your fixtures like e.g. pytest_datafiles here

@@ -41,3 +42,3 @@ setenv = PYTHONPATH = {toxinidir}/src

{[testenv:tests]deps}
-r dev_requirements/requirements-type_check.txt
.[type_check]
commands =

@@ -48,2 +49,12 @@ mypy --show-error-codes --check-untyped-defs src/bomf

[testenv:spellcheck]
# the spellcheck environment checks the code for typos
setenv = PYTHONPATH = {toxinidir}/src
deps =
{[testenv:tests]deps}
.[spellcheck]
commands =
codespell --ignore-words=domain-specific-terms.txt
# add single files (ending with .py) or packages here
[testenv:coverage]

@@ -54,3 +65,3 @@ # the coverage environment is called by the Github Action that runs the coverage measurement

-r requirements.txt
-r dev_requirements/requirements-coverage.txt
.[coverage]
setenv = PYTHONPATH = {toxinidir}/src

@@ -66,3 +77,3 @@ commands =

-r requirements.txt
-r dev_requirements/requirements-test_packaging.txt
.[test_packaging]
commands =

@@ -79,9 +90,7 @@ python -m build

{[testenv:coverage]deps}
-r dev_requirements/requirements-formatting.txt
pip-tools
.[formatting]
pre-commit
commands =
python -m pip install --upgrade pip
pip-compile requirements.in
pip install -r requirements.txt
pre-commit install
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile requirements-coverage.in
#
coverage==7.6.1
# via -r dev_requirements/requirements-coverage.in
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile '.\dev_requirements\requirements-formatting.in'
#
black==24.8.0
# via -r dev_requirements/requirements-formatting.in
click==8.1.7
# via black
isort==5.13.2
# via -r dev_requirements/requirements-formatting.in
mypy-extensions==1.0.0
# via black
packaging==23.2
# via black
pathspec==0.12.1
# via black
platformdirs==4.2.0
# via black
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile requirements-linting.in
#
astroid==3.2.4
# via pylint
dill==0.3.7
# via pylint
isort==5.13.2
# via pylint
mccabe==0.7.0
# via pylint
platformdirs==4.1.0
# via pylint
pylint==3.2.7
# via -r dev_requirements/requirements-linting.in
tomlkit==0.12.3
# via pylint
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile dev_requirements/requirements-test_packaging.in
#
build==1.2.2
# via -r dev_requirements/requirements-test_packaging.in
certifi==2024.2.2
# via requests
cffi==1.16.0
# via cryptography
charset-normalizer==3.3.2
# via requests
cryptography==42.0.5
# via secretstorage
docutils==0.20.1
# via readme-renderer
idna==3.6
# via requests
importlib-metadata==7.1.0
# via
# keyring
# twine
jaraco-classes==3.3.1
# via keyring
jaraco-context==4.3.0
# via keyring
jaraco-functools==4.0.0
# via keyring
jeepney==0.8.0
# via
# keyring
# secretstorage
keyring==25.0.0
# via twine
markdown-it-py==3.0.0
# via rich
mdurl==0.1.2
# via markdown-it-py
more-itertools==10.2.0
# via
# jaraco-classes
# jaraco-functools
nh3==0.2.15
# via readme-renderer
packaging==24.0
# via build
pkginfo==1.10.0
# via twine
pycparser==2.21
# via cffi
pygments==2.17.2
# via
# readme-renderer
# rich
pyproject-hooks==1.0.0
# via build
readme-renderer==43.0
# via twine
requests==2.31.0
# via
# requests-toolbelt
# twine
requests-toolbelt==1.0.0
# via twine
rfc3986==2.0.0
# via twine
rich==13.7.1
# via twine
secretstorage==3.3.3
# via keyring
twine==5.1.1
# via -r dev_requirements/requirements-test_packaging.in
urllib3==2.2.1
# via
# requests
# twine
zipp==3.18.1
# via importlib-metadata
pytest
pytest-asyncio
bo4e==202401.2.1
# we're using this pinned version of bo4e because there's no version which contains this fix:
# https://github.com/bo4e/BO4E-python/commit/3fd45539edd7d5bfdca990775244bf31007a5ca7
#
# This file is autogenerated by pip-compile with Python 3.11
# by the following command:
#
# pip-compile requirements-tests.in
#
annotated-types==0.6.0
# via pydantic
bo4e==202401.2.1
# via -r dev_requirements/requirements-tests.in
iniconfig==2.0.0
# via pytest
iso3166==2.1.1
# via bo4e
packaging==23.2
# via pytest
pluggy==1.5.0
# via pytest
pydantic==2.9.2
# via bo4e
pydantic-core==2.23.4
# via pydantic
pyhumps==3.8.0
# via bo4e
pytest==8.3.3
# via
# -r dev_requirements/requirements-tests.in
# pytest-asyncio
pytest-asyncio==0.24.0
# via -r dev_requirements/requirements-tests.in
typing-extensions==4.9.0
# via
# pydantic
# pydantic-core
mypy
types-frozendict
networkx-stubs
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile requirements-type_check.in
#
mypy==1.11.2
# via -r dev_requirements/requirements-type_check.in
mypy-extensions==1.0.0
# via mypy
networkx==3.3
# via networkx-stubs
networkx-stubs==0.0.1
# via -r dev_requirements/requirements-type_check.in
types-frozendict==2.0.9
# via -r dev_requirements/requirements-type_check.in
typing-extensions==4.9.0
# via mypy
[metadata]
name = bomf
author = Hochfrequenz Unternehmensberatung GmbH
author_email = info@hochfrequenz.de
description = BO4E Migration Framework
long_description = file: README.md
long_description_content_type = text/markdown; charset=UTF-8
url = https://github.com/Hochfrequenz/bo4e_migration_framework
project_urls =
Documentation = https://github.com/Hochfrequenz/bo4e_migration_framework
Code = https://github.com/Hochfrequenz/bo4e_migration_framework
Bug tracker = https://github.com/Hochfrequenz/bo4e_migration_framework/issues
classifiers =
Development Status :: 4 - Beta
Intended Audience :: Developers
Programming Language :: Python
Programming Language :: Python :: 3.10
Programming Language :: Python :: 3.11
Programming Language :: Python :: 3.12
Operating System :: OS Independent
license = mit
platforms = any
[options]
package_dir =
= src
packages = find:
zip_safe = False
include_package_data = True
python_requires = >=3.10
install_requires =
pydantic>=2.0.0
typeguard>=4.0.1
frozendict
bidict
networkx
injector
python-generics
pvframework
[options.packages.find]
where = src
exclude =
unittests
[options.package_data]
* = py.typed
[egg_info]
tag_build =
tag_date = 0

Sorry, the diff of this file is not supported yet

Metadata-Version: 2.1
Name: bomf
Version: 0.11.1
Summary: BO4E Migration Framework
Home-page: https://github.com/Hochfrequenz/bo4e_migration_framework
Author: Hochfrequenz Unternehmensberatung GmbH
Author-email: info@hochfrequenz.de
License: mit
Project-URL: Documentation, https://github.com/Hochfrequenz/bo4e_migration_framework
Project-URL: Code, https://github.com/Hochfrequenz/bo4e_migration_framework
Project-URL: Bug tracker, https://github.com/Hochfrequenz/bo4e_migration_framework/issues
Platform: any
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Operating System :: OS Independent
Requires-Python: >=3.10
Description-Content-Type: text/markdown; charset=UTF-8
License-File: LICENSE
Requires-Dist: pydantic>=2.0.0
Requires-Dist: typeguard>=4.0.1
Requires-Dist: frozendict
Requires-Dist: bidict
Requires-Dist: networkx
Requires-Dist: injector
Requires-Dist: python-generics
Requires-Dist: pvframework
# BO4E Migration Framework (bomf)
BOMF is the BO4E Migration Framework.
This repository contains the code of the Python package [bomf](https://pypi.org/project/bomf).
![Unittests status badge](https://github.com/Hochfrequenz/bo4e_migration_framework/workflows/Unittests/badge.svg)
![Coverage status badge](https://github.com/Hochfrequenz/bo4e_migration_framework/workflows/Coverage/badge.svg)
![Linting status badge](https://github.com/Hochfrequenz/bo4e_migration_framework/workflows/Linting/badge.svg)
![Black status badge](https://github.com/Hochfrequenz/bo4e_migration_framework/workflows/Black/badge.svg)
![PyPi Status Badge](https://img.shields.io/pypi/v/bomf)
## Rationale
bomf is a framework, that allows its users to migrate data
- from source systems (starting with the raw data extracts)
- into an intermediate, common BO4E based data layer.
- From there map data to individual target system data models
- and finally create records in target systems (aka "loading").
The framework
- encourages users to program consistent data processing pipelines from any source to any target system
- pushes users to adopt structured and consistent patterns
- and by doing so will lead to higher chances for maintainable and reusable code.
## Architecture / Overview
The overall setup for a migration from 1-n source systems (A, B, C...) to 1-m target systems (1,2, 3...) might look like this:
```mermaid
graph TD
A[Source System A] -->|System A DB Dump| A2[Source A Data Model: A JSON Extract]
B[Source System B] -->|System B CSV Export| B2[Source B Data Model: B CSV Files]
A2 -->|SourceAToBo4eDataSetMapper| C{Intermediate BO4E Layer aka DataSets}
B2 -->|SourceBToBo4eDataSetMapper| C
C -->|validations| C
C -->|Bo4eDataSetToTarget1Mapper| D1[Target 1 Data Model]
C -->|Bo4eDataSetToTarget2Mapper| D2[Target 2 Data Model]
C -->|Bo4eDataSetToTarget3Mapper| D3[Target 3 Data Model]
D1 -->L1[Target 1 Loader]
D2 -->L2[Target 2 Loader]
D3 -->L3[Target 3 Loader]
L1 -->M1[Target System 1]
L2 -->M2[Target System 2]
L3 -->M3[Target System 3]
```
The Intermediate BO4E Layer (that consists of different so called DataSets) is kind of a contract between the code that maps *from the source data model* and the code that maps *to the target data model*.
### Data Migration Flow
The migration of specific data from source to target is always the same:
```mermaid
graph TD
A1{Source Data 1} -->|Export| B1(All source data 1 extracts)
B1 -->C1[Filter on source data 1 model aka Pre-Select 1]
A2{Source Data 2} -->|Export| B2(All source data 2 extracts)
B2 -->C2[Filter on source data 2 model aka Pre-Select 2]
C1 -->|do not match filter predicate| Z{discarded data}
C1 -->|match filter criteria| M(Custom Logic: SourceDataSetToBo4EDataSetMapper)
C2 -->|do not match filter predicate| Z
C2 -->|match filter criteria| M
M -->|mapping| E(BO4E Data Sets)
E -->F[Validation]
F -->|obeys a validation rule|E
F -->|violate any validation rule|Z
F -->|passes all validations| G[BO4E to Target Mapper]
G -->|mapping| H(target data model)
H -->I[Target Loader]
I -->|load target model|L1[Loader: 1. load to target]
L1 -->|first: load to|T{Target System}
L1 -->|then|L2[Loader: 2 optionally poll until target has processed data]
L2 -->|second: poll until|T
L2 -->|then|L3[Loader: 3 optionally verify the data have been processed correctly]
L3 -->|finally: verify|T
L3 -->|verification failed|Z
L1 -->|loading failed|Z
L3 -->|verification successful|Y[The End.]
Z-->Z1[Monitoring and Logging]
Z1-->Z2[Human Analyst]
Z2 -.->|manually checks| T
Z2 -.->|feedback: heuristically define new rules for|F
Z2 -.->|feedback: heuristically define new filters for|C
```
## How to use this Repository on Your Machine (Development)
Please follow the [instructions in our Python Template Repository](https://github.com/Hochfrequenz/python_template_repository).
tl;dr: `tox`.
## Contribute
You are very welcome to contribute to this template repository by opening a pull request against the main branch.
pydantic>=2.0.0
typeguard>=4.0.1
frozendict
bidict
networkx
injector
python-generics
pvframework
.gitignore
.pre-commit-config.yaml
LICENSE
README.md
pyproject.toml
requirements.in
requirements.txt
setup.cfg
setup.py
tox.ini
.github/dependabot.yml
.github/workflows/black.yml
.github/workflows/codeql-analysis.yml
.github/workflows/coverage.yml
.github/workflows/dependabot_automerge.yml
.github/workflows/no_byte_order_mark.yml
.github/workflows/packaging_test.yml
.github/workflows/python-publish.yml
.github/workflows/pythonlint.yml
.github/workflows/unittests.yml
.vscode/settings.json
dev_requirements/requirements-coverage.in
dev_requirements/requirements-coverage.txt
dev_requirements/requirements-formatting.in
dev_requirements/requirements-formatting.txt
dev_requirements/requirements-linting.in
dev_requirements/requirements-linting.txt
dev_requirements/requirements-test_packaging.in
dev_requirements/requirements-test_packaging.txt
dev_requirements/requirements-tests.in
dev_requirements/requirements-tests.txt
dev_requirements/requirements-type_check.in
dev_requirements/requirements-type_check.txt
src/bomf/__init__.py
src/bomf/config.py
src/bomf/py.typed
src/bomf.egg-info/PKG-INFO
src/bomf.egg-info/SOURCES.txt
src/bomf.egg-info/dependency_links.txt
src/bomf.egg-info/not-zip-safe
src/bomf.egg-info/requires.txt
src/bomf.egg-info/top_level.txt
src/bomf/filter/__init__.py
src/bomf/filter/sourcedataproviderfilter.py
src/bomf/loader/__init__.py
src/bomf/loader/entityloader.py
src/bomf/logging/__init__.py
src/bomf/mapper/__init__.py
src/bomf/model/__init__.py
src/bomf/provider/__init__.py
unittests/__init__.py
unittests/conftest.py
unittests/example_source_data.json
unittests/models.py
unittests/test_bo4e_data_set.py
unittests/test_entity_loader.py
unittests/test_filter.py
unittests/test_list_conversion.py
unittests/test_mapper.py
unittests/test_migration.py
unittests/test_source_data_provider.py
"""
This file is here, because this allows for best de-coupling of tests and application/library logic.
Further reading: https://docs.pytest.org/en/6.2.x/goodpractices.html#tests-outside-application-code
"""
import logging
import pytest
from bomf.logging import initialize_logger
@pytest.fixture(scope="session", autouse=True)
def setup_log_context_var_fixture():
    """
    Session-wide fixture that wires up the bomf logger once for the whole test run.

    Because of ``autouse=True`` pytest applies it automatically; no test has to request it.
    """
    test_logger = logging.getLogger("bomf-tests")
    initialize_logger(test_logger)
    print("Initialized logger", flush=True)
{
"foo": "bar",
"data": [{ "myKey":"hello", "asd": "fgh" }, { "myKey": "world", "qwe": "rtz" }]
}
"""
models used for testing
"""
import enum
from typing import TypeVar, Union
from bo4e.bo.geschaeftsobjekt import Geschaeftsobjekt
from bo4e.com.com import COM
from pydantic import BaseModel
_SpecificBusinessObject = TypeVar("_SpecificBusinessObject", bound=Geschaeftsobjekt)
"""
an arbitrary but fixed business object type
"""
_SpecificCom = TypeVar("_SpecificCom", bound=COM)
"""
an arbitrary but fixed COM type
"""
Bo4eTyp = Union[_SpecificBusinessObject, _SpecificCom] # pylint: disable=invalid-name
# pylint:disable=too-few-public-methods
class BusinessObjectRelation(BaseModel):
    """
    A business object relation describes the relation between two business objects.
    E.g. a relation could have the type "has_melo" where relation_part_a is a bo4e.bo.Vertrag
    and relation_part_b is a bo4e.bo.Messlokation. Some relations are already defined in BO4E itself (e.g. MaLo/MeLo)
    or MeLo/Address.
    The idea is to not enforce too much of a structure on the downstream code but still push coders to think about
    necessary relation information.
    """

    relation_type: enum.Enum
    """
    The relation type describes how two business objects relate to each other.
    This is not (only) about cardinality. It's about being able to model different relations between objects.
    Think about e.g. a business partner and an address: The relation could be:
    - the address is the residential address of the business partner
    - the address is the invoice address of the business partner
    - the address is the place where the business partner was born
    All these relation types are 1:1 relations between business partners and addresses, yet they all carry different
    meaning which we'd like to distinguish in our data.
    """

    relation_part_a: Bo4eTyp
    """
    one Business Object or COM
    """

    relation_part_b: Bo4eTyp
    """
    another Business Object or COM
    """
import enum
from typing import Iterable, Optional, Type
import pytest # type:ignore[import]
from bo4e.bo.geschaeftspartner import Geschaeftspartner
from bo4e.com.adresse import Adresse
from bomf.model import Bo4eDataSet
from .models import Bo4eTyp, BusinessObjectRelation
class _GeschaeftspartnerAdresseRelation(enum.Enum):
HAS_LIEFERANSCHRIFT = 1
HAS_RECHNUNGSANSCHRIFT = 2
HAS_GEBURTSORT = 3
class _ExampleDataSet(Bo4eDataSet):
    """
    Example data set consisting of one business partner and one address; used to exercise the Bo4eDataSet base class.
    """

    business_partner: Geschaeftspartner = Geschaeftspartner.construct(name1="Müller", name2="Klaus")
    address: Adresse = Adresse.construct(strasse="Rechnungsstrasse", hausnummer="5")

    def get_relations(self) -> Iterable[BusinessObjectRelation]:
        """returns exactly one relation: the business partner HAS_LIEFERANSCHRIFT the address"""
        return [
            BusinessObjectRelation(
                relation_type=_GeschaeftspartnerAdresseRelation.HAS_LIEFERANSCHRIFT,
                relation_part_a=self.business_partner,
                relation_part_b=self.address,
            )
        ]

    def get_business_object(self, bo_type: Type[Bo4eTyp], specification: Optional[str] = None) -> Bo4eTyp:
        """returns the stored instance matching bo_type; raises NotImplementedError for any other type"""
        # pylint:disable=fixme
        # todo: find out how to allow the static type checker to not complain about the "dynamic" type
        if bo_type == Geschaeftspartner:
            return self.business_partner  # type:ignore[return-value]
        if bo_type == Adresse:
            return self.address  # type:ignore[return-value]
        raise NotImplementedError(f"The bo type {bo_type} is not implemented")
class TestBo4eDataSet:
    """Tests for the Bo4eDataSet example implementation above."""

    async def test_example_data_set(self):
        data_set = _ExampleDataSet()
        relations = list(data_set.get_relations())
        assert len(relations) == 1
        assert isinstance(data_set.get_business_object(Geschaeftspartner), Geschaeftspartner)
        assert isinstance(data_set.get_business_object(Adresse), Adresse)
        assert data_set.get_id() is not None
import asyncio
import json
import tempfile
from pathlib import Path
from typing import Optional, Type
import pytest
from pydantic import BaseModel, ConfigDict, Field, RootModel, TypeAdapter
from typing_extensions import deprecated
from bomf.loader.entityloader import (
EntityLoader,
EntityLoadingResult,
JsonFileEntityLoader,
PydanticJsonFileEntityLoader,
)
class _ExampleEntity:
    """minimal stand-in entity; the loader tests below only need an object, not any state"""

    pass
class TestEntityLoader:
    """Tests for the EntityLoader base class and the hooks it exposes to subclasses."""

    class _ExampleEntityLoader(EntityLoader):
        """loader that records which of its overridable hooks were invoked"""

        def __init__(self):
            # flags flipped by the respective hook so the tests can assert it ran
            self.sanitize_called: bool = False
            self.verification_called: bool = False
            self.loading_called: bool = False
            self.polling_called: bool = False

        def sanitize(self, entity: _ExampleEntity) -> None:
            assert entity is not None
            self.sanitize_called = True

        async def verify(self, entity: _ExampleEntity, id_in_target_system: Optional[str] = None) -> bool:
            self.verification_called = True
            return True

        async def load_entity(self, entity: _ExampleEntity) -> Optional[EntityLoadingResult]:
            self.loading_called = True
            # hand back a polling task so the base class also exercises the polling step
            return EntityLoadingResult(id_in_target_system="foo", polling_task=self.polling_callback("foo"))

        async def polling_callback(self, entity_id: str):
            assert entity_id == "foo"
            self.polling_called = True

    async def test_all_overrides_are_called(self):
        """load() on a single entity must invoke sanitize, load_entity, polling and verify"""
        example_loader = TestEntityLoader._ExampleEntityLoader()
        result = await example_loader.load(_ExampleEntity())
        assert example_loader.sanitize_called is True
        assert example_loader.loading_called is True
        assert example_loader.polling_called is True
        assert example_loader.verification_called is True
        assert result.was_loaded_successfully is True
        assert result.loaded_at is not None
        assert result.verified_at is not None
        # verification happens after (or at the same instant as) loading
        assert result.verified_at >= result.loaded_at
        assert result.loading_error is None

    async def test_all_overrides_are_called_batch(self):
        """load_entities() must behave like load() for each entity in the batch"""
        example_loader = TestEntityLoader._ExampleEntityLoader()
        result = await example_loader.load_entities([_ExampleEntity()])
        assert example_loader.sanitize_called is True
        assert example_loader.loading_called is True
        assert example_loader.polling_called is True
        assert example_loader.verification_called is True
        assert result[0].was_loaded_successfully is True
        assert result[0].loaded_at is not None
        assert result[0].verified_at is not None
        assert result[0].verified_at >= result[0].loaded_at
        assert result[0].loading_error is None

    async def test_there_is_a_default_sanitize_step(self):
        """a loader that does not override sanitize() must still load without crashing"""

        class _ExampleEntityLoaderWithOutSanitize(EntityLoader):
            # no def sanitize()
            async def verify(self, entity: _ExampleEntity, id_in_target_system: Optional[str] = None) -> bool:
                return True

            async def load_entity(self, entity: _ExampleEntity) -> Optional[EntityLoadingResult]:
                return None

        example_loader = _ExampleEntityLoaderWithOutSanitize()
        result = await example_loader.load(_ExampleEntity())  # must not crash
        assert result.was_loaded_successfully is True
        assert result.loaded_at is not None
        assert result.verified_at is not None
        assert result.verified_at >= result.loaded_at
        assert result.loading_error is None

    async def test_all_overrides_are_called_on_error(self):
        """errors raised in load_entity must be captured in the result instead of propagating"""

        class _ExampleEntityLoaderThatCrashesOnLoad(EntityLoader):
            def __init__(self):
                self.sanitize_called: bool = False
                self.loading_called: bool = False

            def sanitize(self, entity: _ExampleEntity) -> None:
                assert entity is not None
                self.sanitize_called = True

            async def verify(self, entity: _ExampleEntity, id_in_target_system: Optional[str] = None) -> bool:
                raise NotImplementedError()

            async def load_entity(self, entity: _ExampleEntity) -> Optional[EntityLoadingResult]:
                self.loading_called = True
                raise ValueError("Something is wrong")

        example_loader = _ExampleEntityLoaderThatCrashesOnLoad()
        result = await example_loader.load(_ExampleEntity())
        assert example_loader.sanitize_called is True
        assert example_loader.loading_called is True
        assert result.was_loaded_successfully is False
        assert result.loaded_at is None
        assert result.verified_at is None
        assert isinstance(result.loading_error, ValueError) is True
class MyPydanticClass(BaseModel):
    """small pydantic model used to test the json file entity loaders"""

    model_config = ConfigDict(
        populate_by_name=True,  # allows instantiation both by field name and by alias
    )
    foo: str
    bar: int = Field(alias="bar")  # alias identical to the field name
    test: str = Field(alias="random_foo_bar")  # serialized under a different key than the attribute name
class MyPydanticOnlyLoader(PydanticJsonFileEntityLoader[MyPydanticClass]):
    """
    entity loader for my pydantic class; does not use any json.load/dump functions
    (everything is inherited from PydanticJsonFileEntityLoader)
    """
@deprecated("use PydanticJsonFileEntityLoader instead; this is just here to keep the coverage of JsonFileEntityLoader")
class LegacyPydanticJsonFileEntityLoader(JsonFileEntityLoader[MyPydanticClass]):
    """
    A json file entity loader specifically for pydantic models (legacy code)
    """

    def __init__(self, file_path: Path):
        """provide a file path"""
        adapter = TypeAdapter(list[MyPydanticClass])

        def _encode_by_alias(entities):
            # dump the models with their aliases so the legacy loader matches the pydantic-only one
            return adapter.dump_python(entities, by_alias=True)

        super().__init__(
            file_path=file_path,
            list_encoder=_encode_by_alias,
        )
class TestPydanticJsonFileEntityLoader:
    """Tests for PydanticJsonFileEntityLoader and the legacy JsonFileEntityLoader-based wrapper."""

    @pytest.mark.parametrize("number_of_models", [2, 20, 2000])
    @pytest.mark.parametrize(
        "loader_class", [pytest.param(MyPydanticOnlyLoader), pytest.param(LegacyPydanticJsonFileEntityLoader)]
    )
    async def test_dumping_to_file_via_load_entities(
        self, number_of_models: int, loader_class: Type[EntityLoader[MyPydanticClass]], tmp_path
    ):
        """loading a batch of entities must dump all of them (by alias) into the target json file"""
        my_entities = [
            MyPydanticClass(foo="asd", bar=x, test="test") for x in range(number_of_models)  # type:ignore[call-arg]
        ]
        file_path = Path(tmp_path) / Path("foo.json")
        my_loader = loader_class(file_path)  # type:ignore[call-arg]
        await my_loader.load_entities(my_entities)
        del my_loader  # drop the loader before reading the file back
        with open(file_path, "r", encoding="utf-8") as infile:
            json_body = json.load(infile)
        assert len(json_body) == number_of_models
        # "random_foo_bar" instead of "test" shows that the dump uses the field aliases
        assert json_body == [{"foo": "asd", "bar": x, "random_foo_bar": "test"} for x in range(number_of_models)]

    @pytest.mark.parametrize("number_of_models", [2, 20, 2000])
    @pytest.mark.parametrize(
        "loader_class", [pytest.param(MyPydanticOnlyLoader), pytest.param(LegacyPydanticJsonFileEntityLoader)]
    )
    async def test_dumping_to_file_via_load_entity(
        self, number_of_models: int, loader_class: Type[EntityLoader[MyPydanticClass]], tmp_path
    ):
        """concurrent single-entity loads must all end up in the same json file"""
        my_entities = [
            MyPydanticClass(foo="asd", bar=x, test="test") for x in range(number_of_models)  # type:ignore[call-arg]
        ]
        file_path = Path(tmp_path) / Path("foo.json")
        my_loader = loader_class(file_path)  # type:ignore[call-arg]
        loading_tasks = [my_loader.load_entity(x) for x in my_entities]
        await asyncio.gather(*loading_tasks)
        del my_loader  # drop the loader before reading the file back
        with open(file_path, "r", encoding="utf-8") as infile:
            json_body = json.load(infile)
        assert len(json_body) == number_of_models
        # we cannot guarantee the order of the entities

    @pytest.mark.parametrize("load_multiple", [True, False])
    @pytest.mark.parametrize(
        "loader_class", [pytest.param(MyPydanticOnlyLoader), pytest.param(LegacyPydanticJsonFileEntityLoader)]
    )
    async def test_loader_doesnt_crash_for_empty_file(
        self, loader_class: Type[EntityLoader[MyPydanticClass]], load_multiple: bool
    ):
        """an existing but empty json file must not crash the loader"""
        json_file_path: Path
        try:
            with tempfile.NamedTemporaryFile(mode="w+", suffix=".json", delete=False) as tmp_file:
                json_file_path = Path(tmp_file.name)
            assert json_file_path.exists()
            json_file_loader = loader_class(json_file_path)  # type:ignore[call-arg]
            if load_multiple:
                _ = await json_file_loader.load_entities([])
            else:
                _ = await json_file_loader.load_entity(
                    MyPydanticClass(foo="asd", bar=123, test="test")  # type:ignore[call-arg]
                )
        finally:
            json_file_path.unlink()  # delete=False above, so we have to clean up ourselves
import dataclasses
import logging
from itertools import groupby
import pytest # type:ignore[import]
from bomf.filter import AggregateFilter, AllowlistFilter, BlocklistFilter, Filter
from bomf.filter.sourcedataproviderfilter import SourceDataProviderFilter
from bomf.provider import ListBasedSourceDataProvider, SourceDataProvider
class _FooFilter(Filter[dict]):
    """A filter that only lets candidates pass whose "foo" entry equals "bar"."""

    async def predicate(self, candidate: dict) -> bool:
        # dict.get returns None for a missing key, which never equals "bar"
        return candidate.get("foo") == "bar"
class TestFilter:
    """Tests the plain Filter base class via a simple dict-based filter."""

    @pytest.mark.parametrize(
        "filter_under_test,candidates,survivors",
        [
            pytest.param(
                _FooFilter(),
                [{"foo": "baz"}, {"foo": "bar"}],
                [{"foo": "bar"}],
            ),
        ],
    )
    async def test_filter(self, filter_under_test: Filter, candidates: list[dict], survivors: list[dict], caplog):
        caplog.set_level(logging.DEBUG, logger="")
        filtered = await filter_under_test.apply(candidates)
        assert filtered == survivors
        # the filter logs how many candidates it dropped
        assert "1 out of 2 candidates have been removed by the filter" in caplog.messages
@dataclasses.dataclass
class _MyCandidate:
    """A simple test candidate with a numeric and a string attribute."""

    number: int  # the value compared within a group by _BarFilter
    string: str  # used as the grouping key by _BarFilter
@dataclasses.dataclass
class _MyAggregate:
    """A candidate enriched with information about the group it belongs to."""

    group_key: str  # the string attribute shared by all candidates in the group
    max_number_for_key: int  # the highest candidate number within the group
    candidate: _MyCandidate  # the original, un-aggregated candidate
class _BarFilter(AggregateFilter):
    """
    An Aggregate Filter that groups _MyCandidates by their string attribute and keeps only those entries that have the
    highest number (attribute) in their respective group.
    It's basically a show-case test that allows to understand how the aggregate filters are supposed to be used.
    """

    def __init__(self):
        class _KeepGroupMaximum(Filter[_MyAggregate]):
            # an aggregate survives iff its candidate holds the maximum number of its group
            async def predicate(self, candidate: _MyAggregate) -> bool:
                return candidate.candidate.number == candidate.max_number_for_key

        super().__init__(_KeepGroupMaximum())

    async def aggregate(self, candidates: list[_MyCandidate]) -> list[_MyAggregate]:
        """Group candidates by their string attribute and annotate each with the group's maximum number."""
        aggregates: list[_MyAggregate] = []
        # groupby requires the input to be sorted by the same key
        candidates_by_key = sorted(candidates, key=lambda c: c.string)
        for key, members_iter in groupby(candidates_by_key, lambda c: c.string):
            members = list(members_iter)
            group_maximum = max(member.number for member in members)
            aggregates.extend(
                _MyAggregate(group_key=key, max_number_for_key=group_maximum, candidate=member)
                for member in members
            )
        return aggregates

    def disaggregate(self, aggregate: _MyAggregate) -> _MyCandidate:
        """Return the original candidate wrapped inside the aggregate."""
        return aggregate.candidate
class TestAggregateFilter:
    """Tests the aggregate-filter show case implemented by _BarFilter."""

    @pytest.mark.parametrize(
        "filter_under_test,candidates,survivors",
        [
            pytest.param(
                _BarFilter(),
                [
                    _MyCandidate(number=1, string="foo"),
                    _MyCandidate(number=19, string="bar"),
                    _MyCandidate(number=2, string="foo"),
                    _MyCandidate(number=17, string="bar"),
                ],
                [_MyCandidate(number=19, string="bar"), _MyCandidate(number=2, string="foo")],
            ),
        ],
    )
    async def test_aggregate_filter(
        self, filter_under_test: AggregateFilter, candidates: list[dict], survivors: list[dict], caplog
    ):
        caplog.set_level(logging.DEBUG, logger="")
        filtered = await filter_under_test.apply(candidates)
        assert filtered == survivors
        # the filter logs both the aggregation and the filtering step
        assert "There are 4 candidates and 4 aggregates" in caplog.messages
        assert "There are 2 filtered aggregates left" in caplog.messages
class TestBlockAndAllowlistFilter:
    """Tests allowlist and blocklist filtering on a "foo" key selector."""

    async def test_allowlist_filter(self):
        permitted_keys = {"A", "B", "C"}
        items: list[dict[str, str]] = [{"foo": "A"}, {"foo": "B"}, {"foo": "Z"}]
        filter_under_test: AllowlistFilter[dict[str, str], str] = AllowlistFilter(lambda c: c["foo"], permitted_keys)
        result = await filter_under_test.apply(items)
        # only entries whose key is on the allowlist survive
        assert result == [{"foo": "A"}, {"foo": "B"}]

    async def test_blocklist_filter(self):
        forbidden_keys = {"A", "B", "C"}
        items: list[dict[str, str]] = [{"foo": "A"}, {"foo": "B"}, {"foo": "Z"}]
        filter_under_test: BlocklistFilter[dict[str, str], str] = BlocklistFilter(lambda c: c["foo"], forbidden_keys)
        result = await filter_under_test.apply(items)
        # entries whose key is on the blocklist are removed
        assert result == [{"foo": "Z"}]
class TestSourceDataProviderFilter:
    """Tests applying a candidate filter to an entire SourceDataProvider."""

    @pytest.mark.parametrize(
        "candidate_filter,candidates,survivors",
        [
            pytest.param(
                _BarFilter(),
                [
                    _MyCandidate(number=1, string="foo"),
                    _MyCandidate(number=19, string="bar"),
                    _MyCandidate(number=2, string="foo"),
                    _MyCandidate(number=17, string="bar"),
                ],
                [_MyCandidate(number=19, string="bar"), _MyCandidate(number=2, string="foo")],
            ),
        ],
    )
    async def test_source_data_provider_filter(
        self,
        candidate_filter: Filter[_MyCandidate],
        candidates: list[_MyCandidate],
        survivors: list[_MyCandidate],
        caplog,
    ):
        unfiltered_provider: ListBasedSourceDataProvider[_MyCandidate, int] = ListBasedSourceDataProvider(
            candidates, key_selector=lambda mc: mc.number
        )
        provider_filter: SourceDataProviderFilter[_MyCandidate, int] = SourceDataProviderFilter(candidate_filter)
        caplog.set_level(logging.DEBUG, logger="")
        filtered_provider = await provider_filter.apply(unfiltered_provider)
        # apply returns a new provider that only serves the surviving candidates
        assert isinstance(filtered_provider, SourceDataProvider)
        filtered_data = await filtered_provider.get_data()
        assert filtered_data == survivors
        assert "There are 4 candidates and 4 aggregates" in caplog.messages
        assert "There are 2 filtered aggregates left" in caplog.messages

    async def test_source_data_provider_filter_error(self):
        """apply must propagate an AttributeError when the provider is missing its key_selector."""
        broken_provider: ListBasedSourceDataProvider[dict, str] = ListBasedSourceDataProvider(
            [{"foo": "bar"}, {"foo": "notbar"}], key_selector=lambda d: d["foo"]
        )
        # deliberately break the provider
        del broken_provider.key_selector
        provider_filter: SourceDataProviderFilter[dict, str] = SourceDataProviderFilter(_FooFilter())
        with pytest.raises(AttributeError):
            await provider_filter.apply(broken_provider)
import logging
from bomf.mapper import (
convert_single_mapping_into_list_mapping_with_single_pokemon_catchers,
convert_single_mapping_task_into_list_mapping_task_with_single_pokemon_catchers,
)
class TestListMappingConversion:
    """Tests converting single-item mappers into list mappers that log and skip failing items."""

    async def test_conversion_async(self, caplog):
        async def mapping_func(x: int) -> str:
            # NOTE: the function name is part of the asserted log message; do not rename it
            if x == 3:
                raise Exception("Fatal crash")
            return str(x)

        caplog.set_level(logging.ERROR, "foo")
        logger = logging.getLogger("foo")
        list_mapper = convert_single_mapping_task_into_list_mapping_task_with_single_pokemon_catchers(
            mapping_func, logger
        )
        mapped = await list_mapper([1, 2, 3, 4, 5])
        # the failing item (3) is dropped, the rest is mapped
        assert mapped == ["1", "2", "4", "5"]
        assert caplog.messages[0] == "Error while calling mapping_func on 3: Fatal crash"

    def test_conversion_sync(self, caplog):
        def mapping_func(x: int) -> str:
            # NOTE: the function name is part of the asserted log message; do not rename it
            if x == 3:
                raise Exception("Fatal crash")
            return str(x)

        caplog.set_level(logging.ERROR, "foo")
        logger = logging.getLogger("foo")
        list_mapper = convert_single_mapping_into_list_mapping_with_single_pokemon_catchers(mapping_func, logger)
        mapped = list_mapper([1, 2, 3, 4, 5])
        assert mapped == ["1", "2", "4", "5"]
        assert caplog.messages[0] == "Error while calling mapping_func on 3: Fatal crash"
from typing import Optional, Type
import pytest # type:ignore[import]
from bo4e.bo.marktlokation import Marktlokation
from bo4e.bo.messlokation import Messlokation
from pydantic import BaseModel
from bomf.mapper import Bo4eDataSetToTargetMapper, PaginationNotSupportedException, SourceToBo4eDataSetMapper
from .models import Bo4eTyp
class _NotImplementedBo4eDataSetMixin:
    """
    A mixin to inherit from if you'd like to have correct types but don't care about the logic.
    """
class _MaLoAndMeLo(BaseModel, _NotImplementedBo4eDataSetMixin):
    """A data set that bundles one Marktlokation with one Messlokation."""

    malo: Marktlokation  # the market location
    melo: Messlokation  # the metering location

    def get_business_object(self, bo_type: Type[Bo4eTyp], specification: Optional[str] = None) -> Bo4eTyp:
        """Return the contained business object of the requested type; raise NotImplementedError for other types."""
        # pylint:disable=fixme
        # todo: find out how to allow the static type checker to not complain about the "dynamic" type
        if bo_type == Marktlokation:
            return self.malo  # type:ignore[return-value]
        if bo_type == Messlokation:
            return self.melo  # type:ignore[return-value]
        raise NotImplementedError(f"The bo type {bo_type} is not implemented")
# in these tests we assume that:
# - the source data model is a dictionary
# - the intermediate data models are BO4E MaLo and MeLo
# - the target data model is a list of strings
# This is just to demonstrate the mapping structures.
class _DictToMaLoMeLoMapper(SourceToBo4eDataSetMapper):
    """Maps raw source dictionaries to _MaLoAndMeLo data sets; pagination is not supported."""

    async def create_data_sets(self, offset: Optional[int] = None, limit: Optional[int] = None) -> list[_MaLoAndMeLo]:
        if offset is not None or limit is not None:
            # this demo mapper always maps the full (hard-coded) source data set
            raise PaginationNotSupportedException()
        raw_sources = [{"maloId": "54321012345", "meloId": "DE000111222333"}]
        data_sets: list[_MaLoAndMeLo] = []
        for raw in raw_sources:
            data_sets.append(
                _MaLoAndMeLo(
                    malo=Marktlokation.construct(marktlokations_id=raw["maloId"]),
                    melo=Messlokation.construct(messlokations_id=raw["meloId"]),
                )
            )
        return data_sets
class _MaLoMeLoToListMapper(Bo4eDataSetToTargetMapper):
    """Maps a _MaLoAndMeLo data set to the target model: a list [malo_id, melo_id]."""

    async def create_target_model(self, dataset: _MaLoAndMeLo) -> list[str]:
        malo = dataset.get_business_object(Marktlokation)
        melo = dataset.get_business_object(Messlokation)
        return [malo.marktlokations_id, melo.messlokations_id]
class TestMapper:
    """Tests both mapping directions: source→intermediate and intermediate→target."""

    async def test_source_to_intermediate_mapper_batch(self):
        source_mapper = _DictToMaLoMeLoMapper()
        data_sets = await source_mapper.create_data_sets()
        assert data_sets == [
            _MaLoAndMeLo(
                melo=Messlokation.construct(messlokations_id="DE000111222333"),
                malo=Marktlokation.construct(marktlokations_id="54321012345"),
            )
        ]

    async def test_intermediate_to_target_mapper(self):
        """
        tests the single data set mapping
        """
        target_mapper = _MaLoMeLoToListMapper()
        target_model = await target_mapper.create_target_model(
            _MaLoAndMeLo(
                melo=Messlokation.construct(messlokations_id="DE000111222333"),
                malo=Marktlokation.construct(marktlokations_id="54321012345"),
            )
        )
        assert target_model == ["54321012345", "DE000111222333"]

    async def test_intermediate_to_target_mapper_batch(self):
        """
        test the batch mapping
        """
        target_mapper = _MaLoMeLoToListMapper()
        target_models = await target_mapper.create_target_models(
            [
                _MaLoAndMeLo(
                    melo=Messlokation.construct(messlokations_id="DE000111222333"),
                    malo=Marktlokation.construct(marktlokations_id="54321012345"),
                )
            ]
        )
        assert target_models == [["54321012345", "DE000111222333"]]
"""
Tests the overall data flow using bomf.
"""
from datetime import UTC, datetime
from typing import Optional
from unittest.mock import Mock
from injector import Binder, Injector
from pvframework import PathMappedValidator, Validator
from pvframework.types import SyncValidatorFunction
from bomf import (
Bo4eDataSetToTargetMapper,
EntityLoader,
Filter,
MigrationStrategy,
SourceDataProvider,
SourceToBo4eDataSetMapper,
ValidationManager,
)
from bomf.config import MigrationConfig
from bomf.loader.entityloader import EntityLoadingResult
from bomf.model import Bo4eDataSet
from bomf.provider import KeyTyp
# the source data model used in this integration test: plain string-to-string dictionaries
_MySourceDataModel = dict[str, str]
# keys in the source data are plain strings
_MyKeyTyp = str
# the target data model: a list of strings
_MyTargetDataModel = list[str]
class _MyIntermediateDataModel(Bo4eDataSet):
    """The intermediate (BO4E) data model for this integration test: wraps one source dict."""

    data: dict[str, str]  # the raw source payload

    def get_id(self) -> str:
        # a constant id is good enough for this test
        return "12345"
class _MySourceDataProvider(SourceDataProvider[_MySourceDataModel, _MyKeyTyp]):
    """Serves a fixed list of source dictionaries; one of them is meant to be removed by _MyFilter."""

    async def get_entry(self, key: KeyTyp) -> _MySourceDataModel:
        # single-entry access is not exercised by these tests
        raise NotImplementedError("Not relevant for the test")

    async def get_data(self) -> list[_MySourceDataModel]:
        return [
            {"foo": "bar"},
            {"FOO": "BAR"},
            {"Foo": "Bar"},
            {"remove by filter": "should not pass the filter"},
            # {"invalid": "doesn't matter"},
        ]
class _MyFilter(Filter[_MySourceDataModel]):
    """Drops every source model that contains the key "remove by filter"."""

    async def predicate(self, candidate: _MySourceDataModel) -> bool:
        is_marked_for_removal = "remove by filter" in candidate
        return not is_marked_for_removal
class _MyToBo4eMapper(SourceToBo4eDataSetMapper[_MyIntermediateDataModel]):
    """Wraps each source model into a _MyIntermediateDataModel; supports optional pagination."""

    def __init__(self, what_ever_you_like: list[_MySourceDataModel]):
        # what_ever_you_like is a placeholder for all the relation magic that may happen
        self._source_models = what_ever_you_like

    async def create_data_sets(
        self, offset: Optional[int] = None, limit: Optional[int] = None
    ) -> list[_MyIntermediateDataModel]:
        sources = self._source_models
        if offset is not None and limit is not None:
            # pagination is applied only when both parameters are provided
            sources = sources[offset : offset + limit]
        return [_MyIntermediateDataModel(data=entry) for entry in sources]
def _my_rule(data: dict[str, str]):
if "invalid" in data:
raise ValueError("'invalid' in data")
# map the _my_rule validator onto the "data" attribute of _MyIntermediateDataModel
_my_mapped_validator: PathMappedValidator[_MyIntermediateDataModel, SyncValidatorFunction] = PathMappedValidator(
    Validator(_my_rule), {"data": "data"}
)
# the validation manager used by the migration-strategy tests below
_my_validation = ValidationManager[_MyIntermediateDataModel]()
_my_validation.register(_my_mapped_validator)
class _MyToTargetMapper(Bo4eDataSetToTargetMapper[_MyTargetDataModel, _MyIntermediateDataModel]):
    """Maps a data set to its first key/value pair as a two-element list (with a dummy fallback)."""

    async def create_target_model(self, dataset: _MyIntermediateDataModel) -> _MyTargetDataModel:
        payload = dataset.data
        if not payload:
            return ["doesnt", "matter"]
        first_key = next(iter(payload))
        return [first_key, payload[first_key]]
class _MyTargetLoader(EntityLoader):
    """A dummy loader that pretends every entity is loaded and verified successfully."""

    async def load_entity(self, entity: _MyTargetDataModel) -> Optional[EntityLoadingResult]:
        async def polling():
            # pretend the target system has processed the entity
            return True

        return EntityLoadingResult(id_in_target_system="Fooooo", polling_task=polling())

    async def verify(self, entity: _MyTargetDataModel, id_in_target_system: Optional[str] = None) -> bool:
        # every entity is considered successfully loaded
        return True
class MyMigrationStrategy(MigrationStrategy[_MyIntermediateDataModel, _MyTargetDataModel]):
    """A concrete strategy for the tests; inherits all behavior from MigrationStrategy."""

    pass
# the migration config shared by all migration-strategy tests below
my_migration_config = MigrationConfig(
    migration_key_date=datetime(2021, 1, 1, tzinfo=UTC),
)
class TestMigrationStrategy:
    """
    This is more of an integration than a unit test. All the single components come together here.
    """

    async def test_happy_path(self):
        # here's some pre-processing, you can read some data, you can create relations, whatever
        raw_data = await _MySourceDataProvider().get_data()
        survivors = await _MyFilter().apply(raw_data)
        to_bo4e_mapper = _MyToBo4eMapper(what_ever_you_like=survivors)
        # wire all components together explicitly (the injector variant below does the same via DI)
        strategy = MyMigrationStrategy(
            source_data_to_bo4e_mapper=to_bo4e_mapper,
            validation_manager=_my_validation,
            bo4e_to_target_mapper=_MyToTargetMapper(),
            target_loader=_MyTargetLoader(),
            config=my_migration_config,
        )
        result = await strategy.migrate()
        assert result is not None
        # 4 source models, 1 removed by the filter
        assert len(result) == 3

    async def test_happy_path_paginated(self):
        # here's some pre-processing, you can read some data, you can create relations, whatever
        raw_data = await _MySourceDataProvider().get_data()
        survivors = await _MyFilter().apply(raw_data)
        to_bo4e_mapper = _MyToBo4eMapper(what_ever_you_like=survivors)
        strategy = MyMigrationStrategy(
            source_data_to_bo4e_mapper=to_bo4e_mapper,
            validation_manager=_my_validation,
            bo4e_to_target_mapper=_MyToTargetMapper(),
            target_loader=_MyTargetLoader(),
            config=my_migration_config,
        )
        result = await strategy.migrate_paginated(1)  # the chunk_size arg here is the only difference to the other test
        assert result is not None
        # NOTE(review): the "-1(validation)" part only applies when the '{"invalid": ...}' entry
        # in _MySourceDataProvider is active (it is currently commented out); as-is, only the
        # filter removes an entry (4-1=3) — confirm which state is intended
        assert len(result) == 3  # = source models -1(filter) -1(validation)

    async def test_migration_strategy_injector(self):
        # here's some pre-processing, you can read some data, you can create relations, whatever
        raw_data = await _MySourceDataProvider().get_data()
        survivors = await _MyFilter().apply(raw_data)

        # bindings with the real components from this module
        def _inject_for_migration_strategy(binder: Binder):
            to_bo4e_mapper = _MyToBo4eMapper(what_ever_you_like=survivors)
            binder.bind(SourceToBo4eDataSetMapper, to=to_bo4e_mapper)
            binder.bind(ValidationManager, to=_my_validation)
            binder.bind(Bo4eDataSetToTargetMapper, to=_MyToTargetMapper())  # type: ignore[type-abstract]
            binder.bind(EntityLoader, to=_MyTargetLoader())  # type: ignore[type-abstract]
            binder.bind(MigrationConfig, to=my_migration_config)

        # bindings that replace every component with a Mock
        def _inject_for_migration_strategy_dummy(binder: Binder):
            binder.bind(SourceToBo4eDataSetMapper, to=Mock(SourceToBo4eDataSetMapper))
            binder.bind(ValidationManager, to=Mock(ValidationManager))
            binder.bind(Bo4eDataSetToTargetMapper, to=Mock(Bo4eDataSetToTargetMapper))  # type: ignore[type-abstract]
            binder.bind(EntityLoader, to=Mock(EntityLoader))  # type: ignore[type-abstract]
            binder.bind(MigrationConfig, to=Mock(MigrationConfig))

        injector = Injector(_inject_for_migration_strategy)
        injector_dummy = Injector(_inject_for_migration_strategy_dummy)
        strategy = injector.get(MyMigrationStrategy)
        strategy_dummy = injector_dummy.get(MyMigrationStrategy)
        # the injector must construct the strategy with the bound components
        assert isinstance(strategy, MyMigrationStrategy)
        assert isinstance(strategy.source_data_to_bo4e_mapper, _MyToBo4eMapper)
        assert isinstance(strategy_dummy, MyMigrationStrategy)
        assert isinstance(strategy_dummy.source_data_to_bo4e_mapper, Mock)
        result = await strategy.migrate()
        assert result is not None
        assert len(result) == 3  # = source models -1(filter) -1(validation)
import logging
from pathlib import Path
import pytest # type:ignore[import]
from bomf.provider import JsonFileSourceDataProvider, KeyTyp, ListBasedSourceDataProvider, SourceDataProvider
class LegacyDataSystemDataProvider(SourceDataProvider):
    """
    a dummy for access to a legacy system from which we want to migrate data
    """

    async def get_entry(self, key: KeyTyp) -> str:
        # single-entry access is not exercised by the tests below
        raise NotImplementedError("Not relevant for this test")

    async def get_data(self) -> list[str]:
        """Returns a fixed list of dummy records."""
        return ["foo", "bar", "baz"]
class TestSourceDataProvider:
    """Tests for the dummy provider and the JSON-file-based source data provider."""

    async def test_provider(self):
        # this is a pretty dumb test
        legacy_provider = LegacyDataSystemDataProvider()
        assert isinstance(await legacy_provider.get_data(), list)

    async def test_json_file_provider(self):
        json_path = Path(__file__).parent / Path("example_source_data.json")
        json_provider = JsonFileSourceDataProvider(
            json_path,
            data_selector=lambda d: d["data"],  # type:ignore[call-overload]
            key_selector=lambda d: d["myKey"],  # type:ignore[index]
        )
        hello_entry = {"myKey": "hello", "asd": "fgh"}
        world_entry = {"myKey": "world", "qwe": "rtz"}
        assert await json_provider.get_data() == [hello_entry, world_entry]
        # pagination: limit 0 yields nothing; out-of-range offsets yield empty lists
        assert await json_provider.get_paginated_data(offset=0, limit=0) == []
        assert await json_provider.get_paginated_data(offset=1, limit=1) == [world_entry]
        assert await json_provider.get_paginated_data(offset=1, limit=10) == [world_entry]
        assert await json_provider.get_paginated_data(offset=2, limit=10) == []
        # key-based access
        assert await json_provider.get_entry("world") == world_entry
        with pytest.raises(KeyError):
            _ = await json_provider.get_entry("something unknown")
class TestListBasedSourceDataProvider:
    """Tests for ListBasedSourceDataProvider."""

    async def test_list_based_provider(self, caplog):
        caplog.set_level(logging.DEBUG, logger="")
        my_provider = ListBasedSourceDataProvider(["foo", "bar", "baz"], key_selector=lambda x: x)
        assert len(await my_provider.get_data()) == 3
        # pagination boundaries: limit 0 is empty, limits beyond the list are clamped
        assert len(await my_provider.get_paginated_data(offset=0, limit=0)) == 0
        assert len(await my_provider.get_paginated_data(offset=0, limit=3)) == 3
        assert len(await my_provider.get_paginated_data(offset=0, limit=30)) == 3
        assert len(await my_provider.get_paginated_data(offset=1, limit=30)) == 2
        assert len(await my_provider.get_paginated_data(offset=3, limit=30)) == 0
        assert await my_provider.get_entry("bar") == "bar"
        assert "Read 3 records from ['foo', 'bar', 'baz']" in caplog.messages

    async def test_list_based_provider_key_warning(self, caplog):
        """The provider warns when the key selector maps more than one entry to the same key."""
        caplog.set_level(logging.WARNING, logger=ListBasedSourceDataProvider.__module__)
        # "fooy" and "foozbar" both map to the key "foo". The original code relied on an
        # accidental-looking implicit string concatenation ('"fooz" "bar"'); it is spelled out
        # explicitly here without changing behavior (still 3 entries, 2 sharing the key "foo").
        my_provider = ListBasedSourceDataProvider(["fooy", "foozbar", "baz"], key_selector=lambda x: x[0:3])
        assert len(await my_provider.get_data()) == 3
        assert (
            "There are 2>1 entries for the key 'foo'. You might miss entries because the key is not unique."
            in caplog.messages
        )