acstore
Advanced tools
| # -*- coding: utf-8 -*- | ||
| """The error objects.""" | ||
| class Error(Exception): | ||
| """The error interface.""" | ||
| class ParseError(Error): | ||
| """Raised when a parse error occurred.""" |
| # -*- coding: utf-8 -*- |
| # -*- coding: utf-8 -*- | ||
| """Schema helper.""" | ||
| class SchemaHelper(object): | ||
| """Schema helper.""" | ||
| # Data types and corresponding attribute serializers per method. | ||
| _data_types = { | ||
| 'AttributeContainerIdentifier': None, | ||
| 'bool': None, | ||
| 'int': None, | ||
| 'str': None, | ||
| 'timestamp': None} | ||
| @classmethod | ||
| def DeregisterDataType(cls, data_type): | ||
| """Deregisters an data type. | ||
| Args: | ||
| data_type (str): data type. | ||
| Raises: | ||
| KeyError: if the data type is not set. | ||
| """ | ||
| if data_type not in cls._data_types: | ||
| raise KeyError(f'Data type: {data_type:s} not set.') | ||
| del cls._data_types[data_type] | ||
| @classmethod | ||
| def GetAttributeSerializer(cls, data_type, serialization_method): | ||
| """Retrieves a specific attribute serializer. | ||
| Args: | ||
| data_type (str): data type. | ||
| serialization_method (str): serialization method. | ||
| Returns: | ||
| AttributeSerializer: attribute serializer or None if not available. | ||
| """ | ||
| serializers = cls._data_types.get(data_type, None) or {} | ||
| return serializers.get(serialization_method, None) | ||
| @classmethod | ||
| def HasDataType(cls, data_type): | ||
| """Determines is a specific data type is supported by the schema. | ||
| Args: | ||
| data_type (str): data type. | ||
| Returns: | ||
| bool: True if the data type is supported, or False otherwise. | ||
| """ | ||
| return data_type in cls._data_types | ||
| @classmethod | ||
| def RegisterDataType(cls, data_type, serializers): | ||
| """Registers a data type. | ||
| Args: | ||
| data_type (str): data type. | ||
| serializers (dict[str, AttributeSerializer]): attribute serializers per | ||
| method. | ||
| Raises: | ||
| KeyError: if the data type is already set. | ||
| """ | ||
| if data_type in cls._data_types: | ||
| raise KeyError(f'Data type: {data_type:s} already set.') | ||
| cls._data_types[data_type] = serializers | ||
| @classmethod | ||
| def RegisterDataTypes(cls, data_types): | ||
| """Registers data types. | ||
| Args: | ||
| data_types (dict[str: dict[str, AttributeSerializer]]): attribute | ||
| serializers with method per data types. | ||
| Raises: | ||
| KeyError: if the data type is already set. | ||
| """ | ||
| for data_type, serializers in data_types.items(): | ||
| cls.RegisterDataType(data_type, serializers) |
| # -*- coding: utf-8 -*- | ||
| """YAML-based attribute container definitions file.""" | ||
| import yaml | ||
| from acstore import errors | ||
| from acstore.containers import interface | ||
| from acstore.helpers import schema | ||
| # TODO: merge this into interface.AttributeContainer once Plaso has been | ||
| # changed to no longer support attributes containers without a schema. | ||
| class AttributeContainerWithSchema(interface.AttributeContainer): | ||
| """Attribute container with schema.""" | ||
| SCHEMA = {} | ||
| class YAMLAttributeContainerDefinitionsFile(object): | ||
| """YAML-based attribute container definitions file. | ||
| A YAML-based attribute container definitions file contains one or more | ||
| attribute container definitions. An attribute container definition consists | ||
| of: | ||
| name: 'windows_eventlog_message_file' | ||
| attributes: | ||
| - name: path | ||
| type: str | ||
| - name: windows_path | ||
| type: str | ||
| Where: | ||
| * name, unique identifier of the attribute container; | ||
| * attributes, defines the attributes of the container. | ||
| """ | ||
| _SUPPORTED_DATA_TYPES = frozenset([ | ||
| 'AttributeContainerIdentifier', | ||
| 'bool', | ||
| 'int', | ||
| 'str', | ||
| 'timestamp']) | ||
| _SUPPORTED_KEYS = frozenset([ | ||
| 'name', | ||
| 'attributes']) | ||
| def _ReadDefinition(self, definition_values): | ||
| """Reads a definition from a dictionary. | ||
| Args: | ||
| definition_values (dict[str, object]): attribute container definition | ||
| values. | ||
| Returns: | ||
| AttributeContainer: an attribute container. | ||
| Raises: | ||
| ParseError: if the definition is not set or incorrect. | ||
| """ | ||
| if not definition_values: | ||
| raise errors.ParseError('Missing attribute container definition values.') | ||
| different_keys = set(definition_values) - self._SUPPORTED_KEYS | ||
| if different_keys: | ||
| different_keys = ', '.join(different_keys) | ||
| raise errors.ParseError(f'Undefined keys: {different_keys:s}') | ||
| container_name = definition_values.get('name', None) | ||
| if not container_name: | ||
| raise errors.ParseError( | ||
| 'Invalid attribute container definition missing name.') | ||
| attributes = definition_values.get('attributes', None) | ||
| if not attributes: | ||
| raise errors.ParseError(( | ||
| f'Invalid attribute container definition: {container_name:s} ' | ||
| f'missing attributes.')) | ||
| class_name = ''.join([ | ||
| element.title() for element in container_name.split('_')]) | ||
| class_attributes = {'CONTAINER_TYPE': container_name} | ||
| container_schema = {} | ||
| for attribute_index, attribute_values in enumerate(attributes): | ||
| attribute_name = attribute_values.get('name', None) | ||
| if not attribute_name: | ||
| raise errors.ParseError(( | ||
| f'Invalid attribute container definition: {container_name:s} name ' | ||
| f'missing of attribute: {attribute_index:d}.')) | ||
| if attribute_name in class_attributes: | ||
| raise errors.ParseError(( | ||
| f'Invalid attribute container definition: {container_name:s} ' | ||
| f'attribute: {attribute_name:s} already set.')) | ||
| attribute_data_type = attribute_values.get('type', None) | ||
| if not attribute_data_type: | ||
| raise errors.ParseError(( | ||
| f'Invalid attribute container definition: {container_name:s} type ' | ||
| f'missing of attribute: {attribute_name:s}.')) | ||
| if not schema.SchemaHelper.HasDataType(attribute_data_type): | ||
| raise errors.ParseError(( | ||
| f'Invalid attribute container definition: {container_name:s} type ' | ||
| f'attribute: {attribute_name:s} unsupported data type: ' | ||
| f'{attribute_data_type:s}.')) | ||
| class_attributes[attribute_name] = None | ||
| container_schema[attribute_name] = attribute_data_type | ||
| class_attributes['SCHEMA'] = container_schema | ||
| # TODO: add support for _SERIALIZABLE_PROTECTED_ATTRIBUTES. | ||
| return type(class_name, (AttributeContainerWithSchema, ), class_attributes) | ||
| def _ReadFromFileObject(self, file_object): | ||
| """Reads the definitions from a file-like object. | ||
| Args: | ||
| file_object (file): definitions file-like object. | ||
| Yields: | ||
| AttributeContainer: an attribute container. | ||
| """ | ||
| yaml_generator = yaml.safe_load_all(file_object) | ||
| for yaml_definition in yaml_generator: | ||
| yield self._ReadDefinition(yaml_definition) | ||
| def ReadFromFile(self, path): | ||
| """Reads the definitions from a YAML file. | ||
| Args: | ||
| path (str): path to a definitions file. | ||
| Yields: | ||
| AttributeContainer: an attribute container. | ||
| """ | ||
| with open(path, 'r', encoding='utf-8') as file_object: | ||
| for yaml_definition in self._ReadFromFileObject(file_object): | ||
| yield yaml_definition |
| acstore.helpers package | ||
| ======================= | ||
| Submodules | ||
| ---------- | ||
| acstore.helpers.schema module | ||
| ----------------------------- | ||
| .. automodule:: acstore.helpers.schema | ||
| :members: | ||
| :undoc-members: | ||
| :show-inheritance: | ||
| acstore.helpers.yaml\_definitions\_file module | ||
| ---------------------------------------------- | ||
| .. automodule:: acstore.helpers.yaml_definitions_file | ||
| :members: | ||
| :undoc-members: | ||
| :show-inheritance: | ||
| Module contents | ||
| --------------- | ||
| .. automodule:: acstore.helpers | ||
| :members: | ||
| :undoc-members: | ||
| :show-inheritance: |
| # YAML-based attribute container definitions file. | ||
| --- | ||
| name: 'windows_eventlog_message_file' | ||
| attributes: | ||
| - name: 'path' | ||
| type: 'str' | ||
| - name: 'windows_path' | ||
| type: 'str' |
| # -*- coding: utf-8 -*- |
| #!/usr/bin/env python3 | ||
| # -*- coding: utf-8 -*- | ||
| """Tests for the schema helper.""" | ||
| import unittest | ||
| from acstore.helpers import schema | ||
| from tests import test_lib as shared_test_lib | ||
| class SchemaHelperTest(shared_test_lib.BaseTestCase): | ||
| """Tests for the schema helper.""" | ||
| # pylint: disable=protected-access | ||
| def testHasDataType(self): | ||
| """Tests the HasDataType function.""" | ||
| result = schema.SchemaHelper.HasDataType('str') | ||
| self.assertTrue(result) | ||
| result = schema.SchemaHelper.HasDataType('test') | ||
| self.assertFalse(result) | ||
| def testRegisterDataType(self): | ||
| """Tests the RegisterDataType function.""" | ||
| number_of_data_types = len(schema.SchemaHelper._data_types) | ||
| schema.SchemaHelper.RegisterDataType('test', {'json': None}) | ||
| try: | ||
| self.assertEqual( | ||
| len(schema.SchemaHelper._data_types), number_of_data_types + 1) | ||
| with self.assertRaises(KeyError): | ||
| schema.SchemaHelper.RegisterDataType('test', {'json': None}) | ||
| finally: | ||
| schema.SchemaHelper.DeregisterDataType('test') | ||
| self.assertEqual( | ||
| len(schema.SchemaHelper._data_types), number_of_data_types) | ||
| def testRegisterDataTypes(self): | ||
| """Tests the RegisterDataTypes function.""" | ||
| number_of_data_types = len(schema.SchemaHelper._data_types) | ||
| schema.SchemaHelper.RegisterDataTypes({'test': {'json': None}}) | ||
| try: | ||
| self.assertEqual( | ||
| len(schema.SchemaHelper._data_types), number_of_data_types + 1) | ||
| with self.assertRaises(KeyError): | ||
| schema.SchemaHelper.RegisterDataTypes({'test': {'json': None}}) | ||
| finally: | ||
| schema.SchemaHelper.DeregisterDataType('test') | ||
| self.assertEqual( | ||
| len(schema.SchemaHelper._data_types), number_of_data_types) | ||
| if __name__ == '__main__': | ||
| unittest.main() |
| #!/usr/bin/env python3 | ||
| # -*- coding: utf-8 -*- | ||
| """Tests for the YAML-based attribute container definitions file.""" | ||
| import io | ||
| import unittest | ||
| from acstore import errors | ||
| from acstore.helpers import yaml_definitions_file | ||
| from tests import test_lib as shared_test_lib | ||
| class YAMLAttributeContainerDefinitionsFileTest(shared_test_lib.BaseTestCase): | ||
| """Tests for the YAML-based attribute container definitions file.""" | ||
| # pylint: disable=protected-access | ||
| _FORMATTERS_YAML = { | ||
| 'name': 'windows_eventlog_message_file', | ||
| 'attributes': [ | ||
| {'name': 'path', 'type': 'str'}, | ||
| {'name': 'windows_path', 'type': 'str'}]} | ||
| def testReadDefinition(self): | ||
| """Tests the _ReadDefinition function.""" | ||
| test_definitions_file = ( | ||
| yaml_definitions_file.YAMLAttributeContainerDefinitionsFile()) | ||
| container_class = test_definitions_file._ReadDefinition( | ||
| self._FORMATTERS_YAML) | ||
| self.assertIsNotNone(container_class) | ||
| self.assertEqual( | ||
| container_class.CONTAINER_TYPE, 'windows_eventlog_message_file') | ||
| self.assertEqual( | ||
| container_class.SCHEMA, {'path': 'str', 'windows_path': 'str'}) | ||
| with self.assertRaises(errors.ParseError): | ||
| test_definitions_file._ReadDefinition({}) | ||
| with self.assertRaises(errors.ParseError): | ||
| test_definitions_file._ReadDefinition({ | ||
| 'name': 'windows_eventlog_message_file', | ||
| 'attributes': []}) | ||
| with self.assertRaises(errors.ParseError): | ||
| test_definitions_file._ReadDefinition({ | ||
| 'name': 'windows_eventlog_message_file', | ||
| 'attributes': [{'type': 'str'}]}) | ||
| with self.assertRaises(errors.ParseError): | ||
| test_definitions_file._ReadDefinition({ | ||
| 'name': 'windows_eventlog_message_file', | ||
| 'attributes': [{'name': 'path'}]}) | ||
| with self.assertRaises(errors.ParseError): | ||
| test_definitions_file._ReadDefinition({ | ||
| 'name': 'windows_eventlog_message_file', | ||
| 'attributes': [{'name': 'path', 'type': 'bogus'}]}) | ||
| def testReadFromFileObject(self): | ||
| """Tests the _ReadFromFileObject function.""" | ||
| test_file_path = self._GetTestFilePath(['definitions.yaml']) | ||
| self._SkipIfPathNotExists(test_file_path) | ||
| test_definitions_file = ( | ||
| yaml_definitions_file.YAMLAttributeContainerDefinitionsFile()) | ||
| with io.open(test_file_path, 'r', encoding='utf-8') as file_object: | ||
| definitions = list(test_definitions_file._ReadFromFileObject(file_object)) | ||
| self.assertEqual(len(definitions), 1) | ||
| def testReadFromFile(self): | ||
| """Tests the ReadFromFile function.""" | ||
| test_file_path = self._GetTestFilePath(['definitions.yaml']) | ||
| self._SkipIfPathNotExists(test_file_path) | ||
| test_definitions_file = ( | ||
| yaml_definitions_file.YAMLAttributeContainerDefinitionsFile()) | ||
| definitions = list(test_definitions_file.ReadFromFile(test_file_path)) | ||
| self.assertEqual(len(definitions), 1) | ||
| self.assertEqual( | ||
| definitions[0].CONTAINER_TYPE, 'windows_eventlog_message_file') | ||
| if __name__ == '__main__': | ||
| unittest.main() |
@@ -21,3 +21,3 @@ # Run tests on Fedora and Ubuntu Docker images using GIFT CORP and GIFT PPA on commit | ||
| dnf copr -y enable @gift/dev | ||
| dnf install -y @development-tools python3 python3-devel python3-setuptools | ||
| dnf install -y @development-tools python3 python3-devel python3-pyyaml python3-setuptools | ||
| - name: Run tests | ||
@@ -62,3 +62,3 @@ env: | ||
| apt-get update -q | ||
| apt-get install -y build-essential python3 python3-dev python3-distutils python3-setuptools | ||
| apt-get install -y build-essential python3 python3-dev python3-distutils python3-setuptools python3-yaml | ||
| - name: Run tests | ||
@@ -65,0 +65,0 @@ env: |
@@ -39,3 +39,3 @@ # Run docs tox tests on Ubuntu Docker images using GIFT PPA | ||
| apt-get update -q | ||
| apt-get install -y build-essential git libffi-dev python${{ matrix.python-version }} python${{ matrix.python-version }}-dev python${{ matrix.python-version }}-venv python3-distutils python3-pip python3-setuptools | ||
| apt-get install -y build-essential git libffi-dev python${{ matrix.python-version }} python${{ matrix.python-version }}-dev python${{ matrix.python-version }}-venv python3-distutils python3-pip python3-setuptools python3-yaml | ||
| - name: Install tox | ||
@@ -42,0 +42,0 @@ run: | |
@@ -49,3 +49,3 @@ # Run tox tests on Ubuntu Docker images using GIFT PPA | ||
| apt-get update -q | ||
| apt-get install -y build-essential git libffi-dev python${{ matrix.python-version }} python${{ matrix.python-version }}-dev python${{ matrix.python-version }}-venv python3-distutils python3-pip python3-setuptools | ||
| apt-get install -y build-essential git libffi-dev python${{ matrix.python-version }} python${{ matrix.python-version }}-dev python${{ matrix.python-version }}-venv python3-distutils python3-pip python3-setuptools python3-yaml | ||
| - name: Install tox | ||
@@ -52,0 +52,0 @@ run: | |
| Metadata-Version: 2.1 | ||
| Name: acstore | ||
| Version: 20230226 | ||
| Version: 20230325 | ||
| Summary: Attribute Container Storage (ACStore). | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/log2timeline/acstore |
| pip>=7.0.0 | ||
| PyYAML>=3.10 |
@@ -22,2 +22,3 @@ .pylintrc | ||
| acstore/__init__.py | ||
| acstore/errors.py | ||
| acstore/fake_store.py | ||
@@ -35,2 +36,5 @@ acstore/interface.py | ||
| acstore/containers/manager.py | ||
| acstore/helpers/__init__.py | ||
| acstore/helpers/schema.py | ||
| acstore/helpers/yaml_definitions_file.py | ||
| config/appveyor/install.ps1 | ||
@@ -50,2 +54,3 @@ config/appveyor/install.sh | ||
| docs/sources/api/acstore.containers.rst | ||
| docs/sources/api/acstore.helpers.rst | ||
| docs/sources/api/acstore.rst | ||
@@ -55,2 +60,3 @@ docs/sources/api/modules.rst | ||
| docs/sources/user/index.rst | ||
| test_data/definitions.yaml | ||
| tests/__init__.py | ||
@@ -65,2 +71,5 @@ tests/fake_store.py | ||
| tests/containers/manager.py | ||
| tests/helpers/__init__.py | ||
| tests/helpers/schema.py | ||
| tests/helpers/yaml_definitions_file.py | ||
| utils/__init__.py | ||
@@ -67,0 +76,0 @@ utils/check_dependencies.py |
@@ -8,2 +8,2 @@ # -*- coding: utf-8 -*- | ||
| __version__ = '20230226' | ||
| __version__ = '20230325' |
@@ -44,3 +44,3 @@ # -*- coding: utf-8 -*- | ||
| if self.name is not None and self.sequence_number is not None: | ||
| return '{0:s}.{1:d}'.format(self.name, self.sequence_number) | ||
| return f'{self.name:s}.{self.sequence_number:d}' | ||
@@ -152,5 +152,3 @@ return None | ||
| attribute_string = '{0:s}: {1!s}'.format( | ||
| attribute_name, attribute_value) | ||
| attributes.append(attribute_string) | ||
| attributes.append(f'{attribute_name:s}: {attribute_value!s}') | ||
@@ -157,0 +155,0 @@ return ', '.join(attributes) |
+26
-0
@@ -10,2 +10,28 @@ # -*- coding: utf-8 -*- | ||
| class AttributeSerializer(object): | ||
| """Attribute serializer.""" | ||
| @abc.abstractmethod | ||
| def DeserializeValue(self, value): | ||
| """Deserializes a value. | ||
| Args: | ||
| value (object): serialized value. | ||
| Returns: | ||
| object: runtime value. | ||
| """ | ||
| @abc.abstractmethod | ||
| def SerializeValue(self, value): | ||
| """Serializes a value. | ||
| Args: | ||
| value (object): runtime value. | ||
| Returns: | ||
| object: serialized value. | ||
| """ | ||
| class AttributeContainerStore(object): | ||
@@ -12,0 +38,0 @@ """Interface of an attribute container store. |
@@ -101,11 +101,9 @@ # -*- coding: utf-8 -*- | ||
| sample = '{0:f}\t{1:s}\t{2:s}\t{3:s}\t{4:f}\t{5:d}\t{6:d}\n'.format( | ||
| sample_time, profile_name, operation, description, | ||
| processing_time, data_size, compressed_data_size) | ||
| self._WritesString(sample) | ||
| self._WritesString(( | ||
| f'{sample_time:f}\t{profile_name:s}\t{operation:s}\t{description:s}\t' | ||
| f'{processing_time:f}\t{data_size:d}\t{compressed_data_size:d}\n')) | ||
| def Start(self): | ||
| """Starts the profiler.""" | ||
| filename = '{0:s}-{1:s}.csv.gz'.format( | ||
| self._FILENAME_PREFIX, self._identifier) | ||
| filename = f'{self._FILENAME_PREFIX:s}-{self._identifier:s}.csv.gz' | ||
| if self._path: | ||
@@ -112,0 +110,0 @@ filename = os.path.join(self._path, filename) |
+141
-56
@@ -11,4 +11,5 @@ # -*- coding: utf-8 -*- | ||
| from acstore import interface | ||
| from acstore.containers import interface as containers_interface | ||
| from acstore import interface | ||
| from acstore.helpers import schema as schema_helper | ||
@@ -76,2 +77,94 @@ | ||
| class SQLiteSchemaHelper(object): | ||
| """SQLite schema helper.""" | ||
| _MAPPINGS = { | ||
| 'bool': 'INTEGER', | ||
| 'int': 'INTEGER', | ||
| 'str': 'TEXT', | ||
| 'timestamp': 'BIGINT'} | ||
| def GetStorageDataType(self, data_type): | ||
| """Retrieves the storage data type. | ||
| Args: | ||
| data_type (str): schema data type. | ||
| Returns: | ||
| str: corresponding SQLite data type. | ||
| """ | ||
| return self._MAPPINGS.get(data_type, 'TEXT') | ||
| def DeserializeValue(self, data_type, value): | ||
| """Deserializes a value. | ||
| Args: | ||
| data_type (str): schema data type. | ||
| value (object): serialized value. | ||
| Returns: | ||
| object: runtime value. | ||
| Raises: | ||
| IOError: if the schema data type is not supported. | ||
| OSError: if the schema data type is not supported. | ||
| """ | ||
| if not schema_helper.SchemaHelper.HasDataType(data_type): | ||
| raise IOError(f'Unsupported data type: {data_type:s}') | ||
| if value is not None: | ||
| if data_type == 'AttributeContainerIdentifier': | ||
| identifier = containers_interface.AttributeContainerIdentifier() | ||
| identifier.CopyFromString(value) | ||
| value = identifier | ||
| elif data_type == 'bool': | ||
| value = bool(value) | ||
| elif data_type not in self._MAPPINGS: | ||
| serializer = schema_helper.SchemaHelper.GetAttributeSerializer( | ||
| data_type, 'json') | ||
| value = serializer.DeserializeValue(value) | ||
| return value | ||
| def SerializeValue(self, data_type, value): | ||
| """Serializes a value. | ||
| Args: | ||
| data_type (str): schema data type. | ||
| value (object): runtime value. | ||
| Returns: | ||
| object: serialized value. | ||
| Raises: | ||
| IOError: if the schema data type is not supported. | ||
| OSError: if the schema data type is not supported. | ||
| """ | ||
| if not schema_helper.SchemaHelper.HasDataType(data_type): | ||
| raise IOError(f'Unsupported data type: {data_type:s}') | ||
| if value is not None: | ||
| if data_type == 'AttributeContainerIdentifier' and isinstance( | ||
| value, containers_interface.AttributeContainerIdentifier): | ||
| value = value.CopyToString() | ||
| elif data_type == 'bool': | ||
| value = int(value) | ||
| elif data_type not in self._MAPPINGS: | ||
| serializer = schema_helper.SchemaHelper.GetAttributeSerializer( | ||
| data_type, 'json') | ||
| # JSON will not serialize certain runtime types like set, therefore | ||
| # these are cast to list first. | ||
| if isinstance(value, set): | ||
| value = list(value) | ||
| return serializer.SerializeValue(value) | ||
| return value | ||
| class SQLiteAttributeContainerStore(interface.AttributeContainerStore): | ||
@@ -82,5 +175,6 @@ """SQLite-based attribute container store. | ||
| format_version (int): storage format version. | ||
| serialization_format (str): serialization format. | ||
| """ | ||
| _FORMAT_VERSION = 20221023 | ||
| _FORMAT_VERSION = 20230312 | ||
@@ -99,2 +193,3 @@ # The earliest format version, stored in-file, that this class | ||
| # TODO: kept for backwards compatibility. | ||
| _CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS = { | ||
@@ -130,5 +225,7 @@ 'AttributeContainerIdentifier': 'TEXT', | ||
| self._read_only = True | ||
| self._schema_helper = SQLiteSchemaHelper() | ||
| self._write_cache = {} | ||
| self.format_version = self._FORMAT_VERSION | ||
| self.serialization_format = 'json' | ||
@@ -177,4 +274,4 @@ def _CacheAttributeContainerByIndex(self, attribute_container, index): | ||
| Raises: | ||
| IOError: if the format version is not supported. | ||
| OSError: if the format version is not supported. | ||
| IOError: if the storage metadata is not supported. | ||
| OSError: if the storage metadata is not supported. | ||
| """ | ||
@@ -210,2 +307,8 @@ format_version = metadata_values.get('format_version', None) | ||
| serialization_format = metadata_values.get('serialization_format', None) | ||
| if serialization_format != 'json': | ||
| raise IOError( | ||
| f'Unsupported serialization format: {serialization_format!s}') | ||
| # Ensure format_version is an integer. | ||
| metadata_values['format_version'] = format_version | ||
@@ -242,6 +345,4 @@ | ||
| schema_to_sqlite_type_mappings = ( | ||
| self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS) | ||
| for name, data_type in sorted(schema.items()): | ||
| data_type = schema_to_sqlite_type_mappings.get(data_type, 'TEXT') | ||
| data_type = self._schema_helper.GetStorageDataType(data_type) | ||
| column_definitions.append(f'{name:s} {data_type:s}') | ||
@@ -286,22 +387,15 @@ | ||
| for column_index, name in enumerate(column_names): | ||
| attribute_value = row[first_column_index + column_index] | ||
| if attribute_value is None: | ||
| continue | ||
| row_value = row[first_column_index + column_index] | ||
| if row_value is not None: | ||
| data_type = schema[name] | ||
| try: | ||
| attribute_value = self._schema_helper.DeserializeValue( | ||
| data_type, row_value) | ||
| except IOError: | ||
| raise IOError(( | ||
| f'Unsupported attribute container type: {container_type:s} ' | ||
| f'attribute: {name:s} data type: {data_type:s}')) | ||
| data_type = schema[name] | ||
| if data_type == 'AttributeContainerIdentifier': | ||
| identifier = containers_interface.AttributeContainerIdentifier() | ||
| identifier.CopyFromString(attribute_value) | ||
| attribute_value = identifier | ||
| setattr(container, name, attribute_value) | ||
| elif data_type == 'bool': | ||
| attribute_value = bool(attribute_value) | ||
| elif data_type not in self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS: | ||
| raise IOError(( | ||
| f'Unsupported attribute container type: {container_type:s} ' | ||
| f'attribute: {name:s} data type: {data_type:s}')) | ||
| setattr(container, name, attribute_value) | ||
| return container | ||
@@ -518,2 +612,3 @@ | ||
| self.format_version = metadata_values['format_version'] | ||
| self.serialization_format = metadata_values['serialization_format'] | ||
@@ -586,18 +681,13 @@ def _ReadMetadata(self): | ||
| attribute_value = getattr(container, name, None) | ||
| if attribute_value is not None: | ||
| if data_type == 'AttributeContainerIdentifier' and isinstance( | ||
| attribute_value, containers_interface.AttributeContainerIdentifier): | ||
| attribute_value = attribute_value.CopyToString() | ||
| try: | ||
| row_value = self._schema_helper.SerializeValue( | ||
| data_type, attribute_value) | ||
| except IOError: | ||
| raise IOError(( | ||
| f'Unsupported attribute container type: ' | ||
| f'{container.CONTAINER_TYPE:s} attribute: {name:s} data type: ' | ||
| f'{data_type:s}')) | ||
| elif data_type == 'bool': | ||
| attribute_value = int(attribute_value) | ||
| elif data_type not in self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS: | ||
| raise IOError(( | ||
| f'Unsupported attribute container type: ' | ||
| f'{container.CONTAINER_TYPE:s} attribute: {name:s} data type: ' | ||
| f'{data_type:s}')) | ||
| column_names.append(f'{name:s} = ?') | ||
| values.append(attribute_value) | ||
| values.append(row_value) | ||
@@ -639,2 +729,3 @@ column_names_string = ', '.join(column_names) | ||
| self._WriteMetadataValue('format_version', f'{self._FORMAT_VERSION:d}') | ||
| self._WriteMetadataValue('serialization_format', self.serialization_format) | ||
@@ -690,25 +781,19 @@ def _WriteMetadataValue(self, key, value): | ||
| column_names = [] | ||
| values = [] | ||
| row_values = [] | ||
| for name, data_type in sorted(schema.items()): | ||
| attribute_value = getattr(container, name, None) | ||
| if attribute_value is not None: | ||
| if data_type == 'AttributeContainerIdentifier' and isinstance( | ||
| attribute_value, | ||
| containers_interface.AttributeContainerIdentifier): | ||
| attribute_value = attribute_value.CopyToString() | ||
| try: | ||
| row_value = self._schema_helper.SerializeValue( | ||
| data_type, attribute_value) | ||
| except IOError: | ||
| raise IOError(( | ||
| f'Unsupported attribute container type: ' | ||
| f'{container.CONTAINER_TYPE:s} attribute: {name:s} data type: ' | ||
| f'{data_type:s}')) | ||
| elif data_type == 'bool': | ||
| attribute_value = int(attribute_value) | ||
| elif data_type not in self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS: | ||
| raise IOError(( | ||
| f'Unsupported attribute container type: ' | ||
| f'{container.CONTAINER_TYPE:s} attribute: {name:s} data type: ' | ||
| f'{data_type:s}')) | ||
| column_names.append(name) | ||
| values.append(attribute_value) | ||
| row_values.append(row_value) | ||
| self._CacheAttributeContainerForWrite( | ||
| container.CONTAINER_TYPE, column_names, values) | ||
| container.CONTAINER_TYPE, column_names, row_values) | ||
@@ -715,0 +800,0 @@ self._CacheAttributeContainerByIndex(container, next_sequence_number - 1) |
| # Script to set up tests on AppVeyor Windows. | ||
| $Dependencies = "" | ||
| $Dependencies = "PyYAML" | ||
| $Dependencies = ${Dependencies} -split " " | ||
@@ -5,0 +5,0 @@ |
@@ -11,2 +11,3 @@ acstore package | ||
| acstore.containers | ||
| acstore.helpers | ||
@@ -16,2 +17,10 @@ Submodules | ||
| acstore.errors module | ||
| --------------------- | ||
| .. automodule:: acstore.errors | ||
| :members: | ||
| :undoc-members: | ||
| :show-inheritance: | ||
| acstore.fake\_store module | ||
@@ -18,0 +27,0 @@ -------------------------- |
+1
-1
| Metadata-Version: 2.1 | ||
| Name: acstore | ||
| Version: 20230226 | ||
| Version: 20230325 | ||
| Summary: Attribute Container Storage (ACStore). | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/log2timeline/acstore |
+1
-0
| pip >= 7.0.0 | ||
| PyYAML >= 3.10 |
+1
-0
@@ -12,2 +12,3 @@ [metadata] | ||
| build_requires = python3-setuptools | ||
| requires = python3-pyyaml >= 3.10 | ||
@@ -14,0 +15,0 @@ [bdist_wheel] |
@@ -37,2 +37,69 @@ #!/usr/bin/env python3 | ||
| class SQLiteSchemaHelperTest(test_lib.BaseTestCase): | ||
| """Tests for the SQLite schema helper.""" | ||
| # pylint: disable=protected-access | ||
| def testGetStorageDataType(self): | ||
| """Tests the GetStorageDataType function.""" | ||
| schema_helper = sqlite_store.SQLiteSchemaHelper() | ||
| data_type = schema_helper.GetStorageDataType('bool') | ||
| self.assertEqual(data_type, 'INTEGER') | ||
| data_type = schema_helper.GetStorageDataType('int') | ||
| self.assertEqual(data_type, 'INTEGER') | ||
| data_type = schema_helper.GetStorageDataType('str') | ||
| self.assertEqual(data_type, 'TEXT') | ||
| data_type = schema_helper.GetStorageDataType('timestamp') | ||
| self.assertEqual(data_type, 'BIGINT') | ||
| data_type = schema_helper.GetStorageDataType('AttributeContainerIdentifier') | ||
| self.assertEqual(data_type, 'TEXT') | ||
| def testDeserializeValue(self): | ||
| """Tests the DeserializeValue function.""" | ||
| schema_helper = sqlite_store.SQLiteSchemaHelper() | ||
| value = schema_helper.DeserializeValue('bool', 0) | ||
| self.assertFalse(value) | ||
| value = schema_helper.DeserializeValue('bool', 1) | ||
| self.assertTrue(value) | ||
| value = schema_helper.DeserializeValue('int', 1) | ||
| self.assertEqual(value, 1) | ||
| value = schema_helper.DeserializeValue('str', 'one') | ||
| self.assertEqual(value, 'one') | ||
| value = schema_helper.DeserializeValue('timestamp', 1) | ||
| self.assertEqual(value, 1) | ||
| # TODO: add test for AttributeContainerIdentifier | ||
| def testSerializeValue(self): | ||
| """Tests the SerializeValue function.""" | ||
| schema_helper = sqlite_store.SQLiteSchemaHelper() | ||
| value = schema_helper.SerializeValue('bool', False) | ||
| self.assertEqual(value, 0) | ||
| value = schema_helper.SerializeValue('bool', True) | ||
| self.assertEqual(value, 1) | ||
| value = schema_helper.SerializeValue('int', 1) | ||
| self.assertEqual(value, 1) | ||
| value = schema_helper.SerializeValue('str', 'one') | ||
| self.assertEqual(value, 'one') | ||
| value = schema_helper.SerializeValue('timestamp', 1) | ||
| self.assertEqual(value, 1) | ||
| # TODO: add test for AttributeContainerIdentifier | ||
| class SQLiteAttributeContainerStoreTest(test_lib.BaseTestCase): | ||
@@ -71,3 +138,4 @@ """Tests for the SQLite-based storage file object.""" | ||
| metadata_values = { | ||
| 'format_version': '{0:d}'.format(test_store._FORMAT_VERSION)} | ||
| 'format_version': f'{test_store._FORMAT_VERSION:d}', | ||
| 'serialization_format': 'json'} | ||
| test_store._CheckStorageMetadata(metadata_values) | ||
@@ -83,5 +151,9 @@ | ||
| metadata_values['format_version'] = '{0:d}'.format( | ||
| test_store._FORMAT_VERSION) | ||
| metadata_values['format_version'] = f'{test_store._FORMAT_VERSION:d}' | ||
| metadata_values['serialization_format'] = 'bogus' | ||
| with self.assertRaises(IOError): | ||
| test_store._CheckStorageMetadata(metadata_values) | ||
| metadata_values['serialization_format'] = 'json' | ||
| def testCreateAttributeContainerTable(self): | ||
@@ -404,3 +476,3 @@ """Tests the _CreateAttributeContainerTable function.""" | ||
| # present in the storage file. | ||
| query = 'DROP TABLE {0:s}'.format(attribute_container.CONTAINER_TYPE) | ||
| query = f'DROP TABLE {attribute_container.CONTAINER_TYPE:s}' | ||
| test_store._cursor.execute(query) | ||
@@ -407,0 +479,0 @@ number_of_containers = test_store.GetNumberOfAttributeContainers( |
+36
-0
| # -*- coding: utf-8 -*- | ||
| """Functions and classes for testing.""" | ||
| import os | ||
| import shutil | ||
@@ -11,2 +12,11 @@ import tempfile | ||
| # The path to top of the dfWinReg source tree. | ||
| PROJECT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) | ||
| # The paths below are all derived from the project path directory. | ||
| # They are enumerated explicitly here so that they can be overwritten for | ||
| # compatibility with different build systems. | ||
| TEST_DATA_PATH = os.path.join(PROJECT_PATH, 'test_data') | ||
| class TestAttributeContainer(containers_interface.AttributeContainer): | ||
@@ -35,3 +45,29 @@ """Attribute container for testing purposes. | ||
| def _GetTestFilePath(self, path_segments): | ||
| """Retrieves the path of a test file relative to the test data directory. | ||
| Args: | ||
| path_segments (list[str]): path segments inside the test data directory. | ||
| Returns: | ||
| str: path of the test file. | ||
| """ | ||
| # Note that we need to pass the individual path segments to os.path.join | ||
| # and not a list. | ||
| return os.path.join(TEST_DATA_PATH, *path_segments) | ||
| def _SkipIfPathNotExists(self, path): | ||
| """Skips the test if the path does not exist. | ||
| Args: | ||
| path (str): path of a test file. | ||
| Raises: | ||
| SkipTest: if the path does not exist and the test should be skipped. | ||
| """ | ||
| if not os.path.exists(path): | ||
| filename = os.path.basename(path) | ||
| raise unittest.SkipTest(f'missing test file: {filename:s}') | ||
| class TempDirectory(object): | ||
@@ -38,0 +74,0 @@ """Class that implements a temporary directory.""" |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
199833
10.7%74
13.85%3357
15.4%