acstore
Advanced tools
| # Run tests on Fedora and Ubuntu Docker images using GIFT CORP and GIFT PPA on commit | ||
| name: test_docker | ||
| on: [push] | ||
| permissions: read-all | ||
| jobs: | ||
| test_fedora: | ||
| runs-on: ubuntu-latest | ||
| strategy: | ||
| matrix: | ||
| version: ['37'] | ||
| container: | ||
| image: registry.fedoraproject.org/fedora:${{ matrix.version }} | ||
| steps: | ||
| - uses: actions/checkout@v2 | ||
| - name: Set up container | ||
| run: | | ||
| dnf install -y dnf-plugins-core langpacks-en | ||
| - name: Install dependencies | ||
| run: | | ||
| dnf copr -y enable @gift/dev | ||
| dnf install -y @development-tools python3 python3-devel python3-setuptools | ||
| - name: Run tests | ||
| env: | ||
| LANG: C.utf8 | ||
| run: | | ||
| python3 ./run_tests.py | ||
| - name: Run end-to-end tests | ||
| run: | | ||
| if test -f tests/end-to-end.py; then PYTHONPATH=. python3 ./tests/end-to-end.py --debug -c config/end-to-end.ini; fi | ||
| - name: Build source distribution | ||
| run: | | ||
| python3 ./setup.py sdist | ||
| - name: Build binary distribution | ||
| run: | | ||
| python3 ./setup.py bdist | ||
| - name: Run build and install test | ||
| run: | | ||
| python3 ./setup.py build | ||
| python3 ./setup.py install | ||
| test_ubuntu: | ||
| runs-on: ubuntu-latest | ||
| strategy: | ||
| matrix: | ||
| version: ['22.04'] | ||
| container: | ||
| image: ubuntu:${{ matrix.version }} | ||
| steps: | ||
| - uses: actions/checkout@v2 | ||
| - name: Set up container | ||
| env: | ||
| DEBIAN_FRONTEND: noninteractive | ||
| run: | | ||
| apt-get update -q | ||
| apt-get install -y libterm-readline-gnu-perl locales software-properties-common | ||
| locale-gen en_US.UTF-8 | ||
| ln -f -s /usr/share/zoneinfo/UTC /etc/localtime | ||
| - name: Install dependencies | ||
| run: | | ||
| add-apt-repository -y ppa:gift/dev | ||
| apt-get update -q | ||
| apt-get install -y build-essential python3 python3-dev python3-distutils python3-setuptools | ||
| - name: Run tests | ||
| env: | ||
| LANG: en_US.UTF-8 | ||
| run: | | ||
| python3 ./run_tests.py | ||
| - name: Run end-to-end tests | ||
| env: | ||
| LANG: en_US.UTF-8 | ||
| run: | | ||
| if test -f tests/end-to-end.py; then PYTHONPATH=. python3 ./tests/end-to-end.py --debug -c config/end-to-end.ini; fi | ||
| - name: Build source distribution | ||
| run: | | ||
| python3 ./setup.py sdist | ||
| - name: Build binary distribution | ||
| run: | | ||
| python3 ./setup.py bdist | ||
| - name: Run build and install test | ||
| run: | | ||
| python3 ./setup.py build | ||
| python3 ./setup.py install |
| # Run docs tox tests on Ubuntu Docker images using GIFT PPA | ||
| name: test_docs | ||
| on: | ||
| pull_request: | ||
| branches: | ||
| - main | ||
| push: | ||
| branches: | ||
| - main | ||
| permissions: read-all | ||
| jobs: | ||
| build: | ||
| runs-on: ubuntu-latest | ||
| strategy: | ||
| matrix: | ||
| include: | ||
| - python-version: '3.8' | ||
| toxenv: 'docs' | ||
| container: | ||
| image: ubuntu:22.04 | ||
| steps: | ||
| - uses: actions/checkout@v2 | ||
| - name: Set up container | ||
| env: | ||
| DEBIAN_FRONTEND: noninteractive | ||
| run: | | ||
| apt-get update -q | ||
| apt-get install -y libterm-readline-gnu-perl locales software-properties-common | ||
| locale-gen en_US.UTF-8 | ||
| ln -f -s /usr/share/zoneinfo/UTC /etc/localtime | ||
| - name: Install dependencies | ||
| env: | ||
| DEBIAN_FRONTEND: noninteractive | ||
| run: | | ||
| add-apt-repository -y universe | ||
| add-apt-repository -y ppa:deadsnakes/ppa | ||
| add-apt-repository -y ppa:gift/dev | ||
| apt-get update -q | ||
| apt-get install -y build-essential git libffi-dev python${{ matrix.python-version }} python${{ matrix.python-version }}-dev python${{ matrix.python-version }}-venv python3-distutils python3-pip python3-setuptools | ||
| - name: Install tox | ||
| run: | | ||
| python3 -m pip install tox | ||
| - name: Run tests | ||
| env: | ||
| LANG: en_US.UTF-8 | ||
| run: | | ||
| tox -e${{ matrix.toxenv }} |
| # Run tox tests on Ubuntu Docker images using GIFT PPA | ||
| name: test_tox | ||
| on: | ||
| pull_request: | ||
| branches: | ||
| - main | ||
| push: | ||
| branches: | ||
| - main | ||
| permissions: read-all | ||
| jobs: | ||
| build: | ||
| runs-on: ubuntu-latest | ||
| strategy: | ||
| matrix: | ||
| include: | ||
| - python-version: '3.7' | ||
| toxenv: 'py37' | ||
| - python-version: '3.8' | ||
| toxenv: 'py38,coverage,codecov' | ||
| - python-version: '3.9' | ||
| toxenv: 'py39' | ||
| - python-version: '3.10' | ||
| toxenv: 'py310' | ||
| - python-version: '3.11' | ||
| toxenv: 'py311' | ||
| - python-version: '3.11' | ||
| toxenv: 'lint' | ||
| container: | ||
| image: ubuntu:22.04 | ||
| steps: | ||
| - uses: actions/checkout@v2 | ||
| - name: Set up container | ||
| env: | ||
| DEBIAN_FRONTEND: noninteractive | ||
| run: | | ||
| apt-get update -q | ||
| apt-get install -y libterm-readline-gnu-perl locales software-properties-common | ||
| locale-gen en_US.UTF-8 | ||
| ln -f -s /usr/share/zoneinfo/UTC /etc/localtime | ||
| - name: Install dependencies | ||
| env: | ||
| DEBIAN_FRONTEND: noninteractive | ||
| run: | | ||
| add-apt-repository -y universe | ||
| add-apt-repository -y ppa:deadsnakes/ppa | ||
| add-apt-repository -y ppa:gift/dev | ||
| apt-get update -q | ||
| apt-get install -y build-essential git libffi-dev python${{ matrix.python-version }} python${{ matrix.python-version }}-dev python${{ matrix.python-version }}-venv python3-distutils python3-pip python3-setuptools | ||
| - name: Install tox | ||
| run: | | ||
| python3 -m pip install tox | ||
| - name: Run tests | ||
| env: | ||
| LANG: en_US.UTF-8 | ||
| run: | | ||
| tox -e${{ matrix.toxenv }} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
| pip>=7.0.0 |
| [project] | ||
| name: acstore | ||
| name_description: ACStore | ||
| maintainer: Log2Timeline maintainers <log2timeline-maintainers@googlegroups.com> | ||
| homepage_url: https://github.com/log2timeline/acstore | ||
| description_short: Attribute Container Storage (ACStore). | ||
| description_long: ACStore, or Attribute Container Storage, provides a stand-alone | ||
| implementation to read and write attribute container storage files. |
| # -*- coding: utf-8 -*- |
| # -*- coding: utf-8 -*- | ||
| """The attribute container interface.""" | ||
| class AttributeContainerIdentifier(object): | ||
| """The attribute container identifier. | ||
| The identifier is used to uniquely identify attribute containers. | ||
| The value should be unique relative to an attribute container store. | ||
| Attributes: | ||
| name (str): name of the table (attribute container). | ||
| sequence_number (int): sequence number of the attribute container. | ||
| """ | ||
| def __init__(self, name=None, sequence_number=None): | ||
| """Initializes an attribute container identifier. | ||
| Args: | ||
| name (Optional[str]): name of the table (attribute container). | ||
| sequence_number (Optional[int]): sequence number of the attribute | ||
| container. | ||
| """ | ||
| super(AttributeContainerIdentifier, self).__init__() | ||
| self.name = name | ||
| self.sequence_number = sequence_number | ||
| def CopyFromString(self, identifier_string): | ||
| """Copies the identifier from a string representation. | ||
| Args: | ||
| identifier_string (str): string representation. | ||
| """ | ||
| self.name, sequence_number = identifier_string.split('.') | ||
| self.sequence_number = int(sequence_number, 10) | ||
| def CopyToString(self): | ||
| """Copies the identifier to a string representation. | ||
| Returns: | ||
| str: unique identifier or None. | ||
| """ | ||
| if self.name is not None and self.sequence_number is not None: | ||
| return '{0:s}.{1:d}'.format(self.name, self.sequence_number) | ||
| return None | ||
| class AttributeContainer(object): | ||
| """The attribute container interface. | ||
| This is the the base class for those object that exists primarily as | ||
| a container of attributes with basic accessors and mutators. | ||
| The CONTAINER_TYPE class attribute contains a string that identifies | ||
| the container type, for example the container type "event" identifiers | ||
| an event object. | ||
| Attributes are public class members of an serializable type. Protected and | ||
| private class members are not to be serialized, with the exception of those | ||
| defined in _SERIALIZABLE_PROTECTED_ATTRIBUTES. | ||
| """ | ||
| CONTAINER_TYPE = None | ||
| # Names of protected attributes, those with a leading underscore, that | ||
| # should be serialized. | ||
| _SERIALIZABLE_PROTECTED_ATTRIBUTES = [] | ||
| def __init__(self): | ||
| """Initializes an attribute container.""" | ||
| super(AttributeContainer, self).__init__() | ||
| self._identifier = AttributeContainerIdentifier( | ||
| name=self.CONTAINER_TYPE, sequence_number=id(self)) | ||
| def CopyFromDict(self, attributes): | ||
| """Copies the attribute container from a dictionary. | ||
| Args: | ||
| attributes (dict[str, object]): attribute values per name. | ||
| """ | ||
| for attribute_name, attribute_value in attributes.items(): | ||
| # Not using startswith to improve performance. | ||
| if (attribute_name[0] != '_' or | ||
| attribute_name in self._SERIALIZABLE_PROTECTED_ATTRIBUTES): | ||
| self.__dict__[attribute_name] = attribute_value | ||
| def CopyToDict(self): | ||
| """Copies the attribute container to a dictionary. | ||
| Returns: | ||
| dict[str, object]: attribute values per name. | ||
| """ | ||
| return dict(self.GetAttributes()) | ||
| def GetAttributeNames(self): | ||
| """Retrieves the names of all attributes. | ||
| Returns: | ||
| list[str]: attribute names. | ||
| """ | ||
| attribute_names = list(self._SERIALIZABLE_PROTECTED_ATTRIBUTES) | ||
| for attribute_name in self.__dict__: | ||
| # Not using startswith to improve performance. | ||
| if attribute_name[0] != '_': | ||
| attribute_names.append(attribute_name) | ||
| return attribute_names | ||
| def GetAttributes(self): | ||
| """Retrieves the attribute names and values. | ||
| Attributes that are set to None are ignored. | ||
| Yields: | ||
| tuple[str, object]: attribute name and value. | ||
| """ | ||
| for attribute_name, attribute_value in self.__dict__.items(): | ||
| # Not using startswith to improve performance. | ||
| if attribute_value is not None and ( | ||
| attribute_name[0] != '_' or | ||
| attribute_name in self._SERIALIZABLE_PROTECTED_ATTRIBUTES): | ||
| yield attribute_name, attribute_value | ||
| def GetAttributeValuesHash(self): | ||
| """Retrieves a comparable string of the attribute values. | ||
| Returns: | ||
| int: hash of comparable string of the attribute values. | ||
| """ | ||
| return hash(self.GetAttributeValuesString()) | ||
| def GetAttributeValuesString(self): | ||
| """Retrieves a comparable string of the attribute values. | ||
| Returns: | ||
| str: comparable string of the attribute values. | ||
| """ | ||
| attributes = [] | ||
| for attribute_name, attribute_value in sorted(self.__dict__.items()): | ||
| # Not using startswith to improve performance. | ||
| if attribute_value is not None and ( | ||
| attribute_name[0] != '_' or | ||
| attribute_name in self._SERIALIZABLE_PROTECTED_ATTRIBUTES): | ||
| if isinstance(attribute_value, dict): | ||
| attribute_value = sorted(attribute_value.items()) | ||
| elif isinstance(attribute_value, bytes): | ||
| attribute_value = repr(attribute_value) | ||
| attributes.append(f'{attribute_name:s}: {attribute_value!s}') | ||
| return ', '.join(attributes) | ||
| def GetIdentifier(self): | ||
| """Retrieves the identifier. | ||
| The identifier is a storage specific value that should not be serialized. | ||
| Returns: | ||
| AttributeContainerIdentifier: an unique identifier for the container. | ||
| """ | ||
| return self._identifier | ||
| def MatchesExpression(self, expression): | ||
| """Determines if an attribute container matches the expression. | ||
| Args: | ||
| expression (code|str): expression. | ||
| Returns: | ||
| bool: True if the attribute container matches the expression, False | ||
| otherwise. | ||
| """ | ||
| result = not expression | ||
| if expression: | ||
| namespace = dict(self.GetAttributes()) | ||
| # Make sure __builtins__ contains an empty dictionary. | ||
| namespace['__builtins__'] = {} | ||
| try: | ||
| result = eval(expression, namespace) # pylint: disable=eval-used | ||
| except Exception: # pylint: disable=broad-except | ||
| pass | ||
| return result | ||
| def SetIdentifier(self, identifier): | ||
| """Sets the identifier. | ||
| The identifier is a storage specific value that should not be serialized. | ||
| Args: | ||
| identifier (AttributeContainerIdentifier): identifier. | ||
| """ | ||
| self._identifier = identifier |
| # -*- coding: utf-8 -*- | ||
| """This file contains the attribute container manager class.""" | ||
| class AttributeContainersManager(object): | ||
| """Class that implements the attribute container manager.""" | ||
| _attribute_container_classes = {} | ||
| @classmethod | ||
| def CreateAttributeContainer(cls, container_type): | ||
| """Creates an instance of a specific attribute container type. | ||
| Args: | ||
| container_type (str): container type. | ||
| Returns: | ||
| AttributeContainer: an instance of attribute container. | ||
| Raises: | ||
| ValueError: if the container type is not supported. | ||
| """ | ||
| container_class = cls._attribute_container_classes.get( | ||
| container_type, None) | ||
| if not container_class: | ||
| raise ValueError(f'Unsupported container type: {container_type:s}') | ||
| return container_class() | ||
| @classmethod | ||
| def DeregisterAttributeContainer(cls, attribute_container_class): | ||
| """Deregisters an attribute container class. | ||
| The attribute container classes are identified based on their lower case | ||
| container type. | ||
| Args: | ||
| attribute_container_class (type): attribute container class. | ||
| Raises: | ||
| KeyError: if attribute container class is not set for the corresponding | ||
| container type. | ||
| """ | ||
| container_type = attribute_container_class.CONTAINER_TYPE.lower() | ||
| if container_type not in cls._attribute_container_classes: | ||
| raise KeyError(( | ||
| f'Attribute container class not set for container type: ' | ||
| f'{attribute_container_class.CONTAINER_TYPE:s}.')) | ||
| del cls._attribute_container_classes[container_type] | ||
| @classmethod | ||
| def GetContainerTypes(cls): | ||
| """Retrieves the container types of the registered attribute containers. | ||
| Returns: | ||
| list[str]: container types. | ||
| """ | ||
| return list(cls._attribute_container_classes.keys()) | ||
| @classmethod | ||
| def GetSchema(cls, container_type): | ||
| """Retrieves the schema of a registered attribute container. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| Returns: | ||
| dict[str, str]: attribute container schema or an empty dictionary if | ||
| no schema available. | ||
| Raises: | ||
| ValueError: if the container type is not supported. | ||
| """ | ||
| container_class = cls._attribute_container_classes.get( | ||
| container_type, None) | ||
| if not container_class: | ||
| raise ValueError(f'Unsupported container type: {container_type:s}') | ||
| return getattr(container_class, 'SCHEMA', {}) | ||
| @classmethod | ||
| def RegisterAttributeContainer(cls, attribute_container_class): | ||
| """Registers a attribute container class. | ||
| The attribute container classes are identified based on their lower case | ||
| container type. | ||
| Args: | ||
| attribute_container_class (type): attribute container class. | ||
| Raises: | ||
| KeyError: if attribute container class is already set for the | ||
| corresponding container type. | ||
| """ | ||
| container_type = attribute_container_class.CONTAINER_TYPE.lower() | ||
| if container_type in cls._attribute_container_classes: | ||
| raise KeyError(( | ||
| f'Attribute container class already set for container type: ' | ||
| f'{attribute_container_class.CONTAINER_TYPE:s}.')) | ||
| cls._attribute_container_classes[container_type] = attribute_container_class | ||
| @classmethod | ||
| def RegisterAttributeContainers(cls, attribute_container_classes): | ||
| """Registers attribute container classes. | ||
| The attribute container classes are identified based on their lower case | ||
| container type. | ||
| Args: | ||
| attribute_container_classes (list[type]): attribute container classes. | ||
| Raises: | ||
| KeyError: if attribute container class is already set for the | ||
| corresponding container type. | ||
| """ | ||
| for attribute_container_class in attribute_container_classes: | ||
| cls.RegisterAttributeContainer(attribute_container_class) |
| # -*- coding: utf-8 -*- | ||
| """Fake (in-memory only) attribute container store for testing.""" | ||
| import ast | ||
| import collections | ||
| import copy | ||
| import itertools | ||
| from acstore import interface | ||
| from acstore.containers import interface as containers_interface | ||
| class FakeAttributeContainerStore(interface.AttributeContainerStore): | ||
| """Fake (in-memory only) attribute container store.""" | ||
| def __init__(self): | ||
| """Initializes a fake (in-memory only) store.""" | ||
| super(FakeAttributeContainerStore, self).__init__() | ||
| self._attribute_containers = {} | ||
| self._is_open = False | ||
| def _RaiseIfNotReadable(self): | ||
| """Raises if the store is not readable. | ||
| Raises: | ||
| OSError: if the store cannot be read from. | ||
| IOError: if the store cannot be read from. | ||
| """ | ||
| if not self._is_open: | ||
| raise IOError('Unable to read from closed storage writer.') | ||
| def _RaiseIfNotWritable(self): | ||
| """Raises if the storage file is not writable. | ||
| Raises: | ||
| IOError: when the storage writer is closed. | ||
| OSError: when the storage writer is closed. | ||
| """ | ||
| if not self._is_open: | ||
| raise IOError('Unable to write to closed storage writer.') | ||
| def _WriteExistingAttributeContainer(self, container): | ||
| """Writes an existing attribute container to the store. | ||
| Args: | ||
| container (AttributeContainer): attribute container. | ||
| Raises: | ||
| IOError: if an unsupported identifier is provided or if the attribute | ||
| container does not exist. | ||
| OSError: if an unsupported identifier is provided or if the attribute | ||
| container does not exist. | ||
| """ | ||
| identifier = container.GetIdentifier() | ||
| lookup_key = identifier.CopyToString() | ||
| containers = self._attribute_containers.get(container.CONTAINER_TYPE, None) | ||
| if containers is None or lookup_key not in containers: | ||
| raise IOError(( | ||
| f'Missing attribute container: {container.CONTAINER_TYPE:s} with ' | ||
| f'identifier: {lookup_key:s}')) | ||
| containers[lookup_key] = container | ||
| def _WriteNewAttributeContainer(self, container): | ||
| """Writes a new attribute container to the store. | ||
| Args: | ||
| container (AttributeContainer): attribute container. | ||
| """ | ||
| containers = self._attribute_containers.get(container.CONTAINER_TYPE, None) | ||
| if containers is None: | ||
| containers = collections.OrderedDict() | ||
| self._attribute_containers[container.CONTAINER_TYPE] = containers | ||
| next_sequence_number = self._GetAttributeContainerNextSequenceNumber( | ||
| container.CONTAINER_TYPE) | ||
| identifier = containers_interface.AttributeContainerIdentifier( | ||
| name=container.CONTAINER_TYPE, sequence_number=next_sequence_number) | ||
| container.SetIdentifier(identifier) | ||
| lookup_key = identifier.CopyToString() | ||
| # Make sure the fake storage preserves the state of the attribute container. | ||
| container = copy.deepcopy(container) | ||
| containers[lookup_key] = container | ||
| def Close(self): | ||
| """Closes the store. | ||
| Raises: | ||
| IOError: if the store is already closed. | ||
| OSError: if the store is already closed. | ||
| """ | ||
| if not self._is_open: | ||
| raise IOError('Store already closed.') | ||
| self._is_open = False | ||
| def GetAttributeContainerByIdentifier(self, container_type, identifier): | ||
| """Retrieves a specific type of container with a specific identifier. | ||
| Args: | ||
| container_type (str): container type. | ||
| identifier (AttributeContainerIdentifier): attribute container identifier. | ||
| Returns: | ||
| AttributeContainer: attribute container or None if not available. | ||
| """ | ||
| containers = self._attribute_containers.get(container_type, {}) | ||
| lookup_key = identifier.CopyToString() | ||
| return containers.get(lookup_key, None) | ||
| def GetAttributeContainerByIndex(self, container_type, index): | ||
| """Retrieves a specific attribute container. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| index (int): attribute container index. | ||
| Returns: | ||
| AttributeContainer: attribute container or None if not available. | ||
| """ | ||
| containers = self._attribute_containers.get(container_type, {}) | ||
| number_of_containers = len(containers) | ||
| if index < 0 or index >= number_of_containers: | ||
| return None | ||
| return next(itertools.islice( | ||
| containers.values(), index, number_of_containers)) | ||
| def GetAttributeContainers(self, container_type, filter_expression=None): | ||
| """Retrieves a specific type of attribute containers. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| filter_expression (Optional[str]): expression to filter the resulting | ||
| attribute containers by. | ||
| Yield: | ||
| AttributeContainer: attribute container. | ||
| """ | ||
| if filter_expression: | ||
| expression_ast = ast.parse(filter_expression, mode='eval') | ||
| filter_expression = compile(expression_ast, '<string>', mode='eval') | ||
| for attribute_container in self._attribute_containers.get( | ||
| container_type, {}).values(): | ||
| if attribute_container.MatchesExpression(filter_expression): | ||
| yield attribute_container | ||
| def GetNumberOfAttributeContainers(self, container_type): | ||
| """Retrieves the number of a specific type of attribute containers. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| Returns: | ||
| int: the number of containers of a specified type. | ||
| """ | ||
| containers = self._attribute_containers.get(container_type, {}) | ||
| return len(containers) | ||
| def HasAttributeContainers(self, container_type): | ||
| """Determines if a store contains a specific type of attribute container. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| Returns: | ||
| bool: True if the store contains the specified type of attribute | ||
| containers. | ||
| """ | ||
| containers = self._attribute_containers.get(container_type, {}) | ||
| return bool(containers) | ||
| def Open(self, **kwargs): | ||
| """Opens the store. | ||
| Raises: | ||
| IOError: if the store is already opened. | ||
| OSError: if the store is already opened. | ||
| """ | ||
| if self._is_open: | ||
| raise IOError('Store already opened.') | ||
| self._is_open = True |
| # -*- coding: utf-8 -*- | ||
| """The attribute container store interface.""" | ||
| import abc | ||
| import collections | ||
| from acstore.containers import manager as containers_manager | ||
| class AttributeContainerStore(object): | ||
| """Interface of an attribute container store. | ||
| Attributes: | ||
| format_version (int): storage format version. | ||
| serialization_format (str): serialization format. | ||
| """ | ||
| def __init__(self): | ||
| """Initializes an attribute container store.""" | ||
| super(AttributeContainerStore, self).__init__() | ||
| self._attribute_container_sequence_numbers = collections.Counter() | ||
| self._containers_manager = containers_manager.AttributeContainersManager | ||
| self._storage_profiler = None | ||
| self.format_version = None | ||
| self.serialization_format = None | ||
| def _GetAttributeContainerNextSequenceNumber(self, container_type): | ||
| """Retrieves the next sequence number of an attribute container. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| Returns: | ||
| int: next sequence number. | ||
| """ | ||
| self._attribute_container_sequence_numbers[container_type] += 1 | ||
| return self._attribute_container_sequence_numbers[container_type] | ||
| def _GetAttributeContainerSchema(self, container_type): | ||
| """Retrieves the schema of an attribute container. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| Returns: | ||
| dict[str, str]: attribute container schema or an empty dictionary if | ||
| no schema available. | ||
| """ | ||
| try: | ||
| schema = self._containers_manager.GetSchema(container_type) | ||
| except ValueError: | ||
| schema = {} | ||
| return schema | ||
| @abc.abstractmethod | ||
| def _RaiseIfNotReadable(self): | ||
| """Raises if the store is not readable. | ||
| Raises: | ||
| OSError: if the store cannot be read from. | ||
| IOError: if the store cannot be read from. | ||
| """ | ||
| @abc.abstractmethod | ||
| def _RaiseIfNotWritable(self): | ||
| """Raises if the store is not writable. | ||
| Raises: | ||
| OSError: if the store cannot be written to. | ||
| IOError: if the store cannot be written to. | ||
| """ | ||
| def _SetAttributeContainerNextSequenceNumber( | ||
| self, container_type, next_sequence_number): | ||
| """Sets the next sequence number of an attribute container. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| next_sequence_number (int): next sequence number. | ||
| """ | ||
| self._attribute_container_sequence_numbers[ | ||
| container_type] = next_sequence_number | ||
| @abc.abstractmethod | ||
| def _WriteNewAttributeContainer(self, container): | ||
| """Writes a new attribute container to the store. | ||
| Args: | ||
| container (AttributeContainer): attribute container. | ||
| """ | ||
| @abc.abstractmethod | ||
| def _WriteExistingAttributeContainer(self, container): | ||
| """Writes an existing attribute container to the store. | ||
| Args: | ||
| container (AttributeContainer): attribute container. | ||
| """ | ||
| def AddAttributeContainer(self, container): | ||
| """Adds a new attribute container. | ||
| Args: | ||
| container (AttributeContainer): attribute container. | ||
| Raises: | ||
| OSError: if the store cannot be written to. | ||
| IOError: if the store cannot be written to. | ||
| """ | ||
| self._RaiseIfNotWritable() | ||
| self._WriteNewAttributeContainer(container) | ||
| @abc.abstractmethod | ||
| def Close(self): | ||
| """Closes the store.""" | ||
| @abc.abstractmethod | ||
| def GetAttributeContainerByIdentifier(self, container_type, identifier): | ||
| """Retrieves a specific type of container with a specific identifier. | ||
| Args: | ||
| container_type (str): container type. | ||
| identifier (AttributeContainerIdentifier): attribute container identifier. | ||
| Returns: | ||
| AttributeContainer: attribute container or None if not available. | ||
| Raises: | ||
| IOError: when the store is closed or if an unsupported identifier is | ||
| provided. | ||
| OSError: when the store is closed or if an unsupported identifier is | ||
| provided. | ||
| """ | ||
| @abc.abstractmethod | ||
| def GetAttributeContainerByIndex(self, container_type, index): | ||
| """Retrieves a specific attribute container. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| index (int): attribute container index. | ||
| Returns: | ||
| AttributeContainer: attribute container or None if not available. | ||
| Raises: | ||
| IOError: when the store is closed. | ||
| OSError: when the store is closed. | ||
| """ | ||
| @abc.abstractmethod | ||
| def GetAttributeContainers(self, container_type, filter_expression=None): | ||
| """Retrieves a specific type of attribute containers. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| filter_expression (Optional[str]): expression to filter the resulting | ||
| attribute containers by. | ||
| Returns: | ||
| generator(AttributeContainer): attribute container generator. | ||
| Raises: | ||
| IOError: when the store is closed. | ||
| OSError: when the store is closed. | ||
| """ | ||
| @abc.abstractmethod | ||
| def GetNumberOfAttributeContainers(self, container_type): | ||
| """Retrieves the number of a specific type of attribute containers. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| Returns: | ||
| int: the number of containers of a specified type. | ||
| """ | ||
| @abc.abstractmethod | ||
| def HasAttributeContainers(self, container_type): | ||
| """Determines if a store contains a specific type of attribute container. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| Returns: | ||
| bool: True if the store contains the specified type of attribute | ||
| containers. | ||
| """ | ||
| @abc.abstractmethod | ||
| def Open(self, **kwargs): | ||
| """Opens the store.""" | ||
| def SetStorageProfiler(self, storage_profiler): | ||
| """Sets the storage profiler. | ||
| Args: | ||
| storage_profiler (StorageProfiler): storage profiler. | ||
| """ | ||
| self._storage_profiler = storage_profiler | ||
| def UpdateAttributeContainer(self, container): | ||
| """Updates an existing attribute container. | ||
| Args: | ||
| container (AttributeContainer): attribute container. | ||
| Raises: | ||
| OSError: if the store cannot be written to. | ||
| IOError: if the store cannot be written to. | ||
| """ | ||
| self._RaiseIfNotWritable() | ||
| self._WriteExistingAttributeContainer(container) |
| # -*- coding: utf-8 -*- | ||
| """SQLite-based attribute container store.""" | ||
| import ast | ||
| import collections | ||
| import itertools | ||
| import os | ||
| import pathlib | ||
| import sqlite3 | ||
| from acstore.containers import interface as containers_interface | ||
| from acstore import interface | ||
| def PythonAST2SQL(ast_node): | ||
| """Converts a Python AST to SQL. | ||
| Args: | ||
| ast_node (ast.Node): node of the Python AST. | ||
| Returns: | ||
| str: SQL statement that represents the node. | ||
| Raises: | ||
| TypeError: if the type of node is not supported. | ||
| """ | ||
| if isinstance(ast_node, ast.BoolOp): | ||
| if isinstance(ast_node.op, ast.And): | ||
| operand = ' AND ' | ||
| elif isinstance(ast_node.op, ast.Or): | ||
| operand = ' OR ' | ||
| else: | ||
| raise TypeError(ast_node) | ||
| return operand.join([ | ||
| PythonAST2SQL(ast_node_value) for ast_node_value in ast_node.values]) | ||
| if isinstance(ast_node, ast.Compare): | ||
| if len(ast_node.ops) != 1: | ||
| raise TypeError(ast_node) | ||
| if isinstance(ast_node.ops[0], ast.Eq): | ||
| operator = ' = ' | ||
| elif isinstance(ast_node.ops[0], ast.NotEq): | ||
| operator = ' <> ' | ||
| else: | ||
| raise TypeError(ast_node) | ||
| if len(ast_node.comparators) != 1: | ||
| raise TypeError(ast_node) | ||
| sql_left = PythonAST2SQL(ast_node.left) | ||
| sql_right = PythonAST2SQL(ast_node.comparators[0]) | ||
| return operator.join([sql_left, sql_right]) | ||
| if isinstance(ast_node, ast.Constant): | ||
| if isinstance(ast_node.value, str): | ||
| return f'"{ast_node.value:s}"' | ||
| return str(ast_node.value) | ||
| if isinstance(ast_node, ast.Name): | ||
| return ast_node.id | ||
| if isinstance(ast_node, ast.Num): | ||
| return str(ast_node.n) | ||
| if isinstance(ast_node, ast.Str): | ||
| return f'"{ast_node.s:s}"' | ||
| raise TypeError(ast_node) | ||
| class SQLiteAttributeContainerStore(interface.AttributeContainerStore): | ||
| """SQLite-based attribute container store. | ||
| Attributes: | ||
| format_version (int): storage format version. | ||
| """ | ||
| _FORMAT_VERSION = 20221023 | ||
| # The earliest format version, stored in-file, that this class | ||
| # is able to append (write). | ||
| _APPEND_COMPATIBLE_FORMAT_VERSION = 20221023 | ||
| # The earliest format version, stored in-file, that this class | ||
| # is able to upgrade (write new format features). | ||
| _UPGRADE_COMPATIBLE_FORMAT_VERSION = 20221023 | ||
| # The earliest format version, stored in-file, that this class | ||
| # is able to read. | ||
| _READ_COMPATIBLE_FORMAT_VERSION = 20221023 | ||
| _CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS = { | ||
| 'AttributeContainerIdentifier': 'TEXT', | ||
| 'bool': 'INTEGER', | ||
| 'int': 'INTEGER', | ||
| 'str': 'TEXT', | ||
| 'timestamp': 'BIGINT'} | ||
| _CREATE_METADATA_TABLE_QUERY = ( | ||
| 'CREATE TABLE metadata (key TEXT, value TEXT);') | ||
| _HAS_TABLE_QUERY = ( | ||
| 'SELECT name FROM sqlite_master ' | ||
| 'WHERE type = "table" AND name = "{0:s}"') | ||
| _INSERT_METADATA_VALUE_QUERY = ( | ||
| 'INSERT INTO metadata (key, value) VALUES (?, ?)') | ||
| # The maximum number of cached attribute containers | ||
| _MAXIMUM_CACHED_CONTAINERS = 32 * 1024 | ||
| _MAXIMUM_WRITE_CACHE_SIZE = 50 | ||
| def __init__(self): | ||
| """Initializes a SQLite attribute container store.""" | ||
| super(SQLiteAttributeContainerStore, self).__init__() | ||
| self._attribute_container_cache = collections.OrderedDict() | ||
| self._connection = None | ||
| self._cursor = None | ||
| self._is_open = False | ||
| self._read_only = True | ||
| self._write_cache = {} | ||
| self.format_version = self._FORMAT_VERSION | ||
| def _CacheAttributeContainerByIndex(self, attribute_container, index): | ||
| """Caches a specific attribute container. | ||
| Args: | ||
| attribute_container (AttributeContainer): attribute container. | ||
| index (int): attribute container index. | ||
| """ | ||
| if len(self._attribute_container_cache) >= self._MAXIMUM_CACHED_CONTAINERS: | ||
| self._attribute_container_cache.popitem(last=True) | ||
| lookup_key = f'{attribute_container.CONTAINER_TYPE:s}.{index:d}' | ||
| self._attribute_container_cache[lookup_key] = attribute_container | ||
| self._attribute_container_cache.move_to_end(lookup_key, last=False) | ||
| def _CheckStorageMetadata(self, metadata_values, check_readable_only=False): | ||
| """Checks the storage metadata. | ||
| Args: | ||
| metadata_values (dict[str, str]): metadata values per key. | ||
| check_readable_only (Optional[bool]): whether the store should only be | ||
| checked to see if it can be read. If False, the store will be checked | ||
| to see if it can be read and written to. | ||
| Raises: | ||
| IOError: if the format version is not supported. | ||
| OSError: if the format version is not supported. | ||
| """ | ||
| format_version = metadata_values.get('format_version', None) | ||
| if not format_version: | ||
| raise IOError('Missing format version.') | ||
| try: | ||
| format_version = int(format_version, 10) | ||
| except (TypeError, ValueError): | ||
| raise IOError(f'Invalid format version: {format_version!s}.') | ||
| if (not check_readable_only and | ||
| format_version < self._APPEND_COMPATIBLE_FORMAT_VERSION): | ||
| raise IOError(( | ||
| f'Format version: {format_version:d} is too old and can no longer ' | ||
| f'be written.')) | ||
| if format_version < self._READ_COMPATIBLE_FORMAT_VERSION: | ||
| raise IOError(( | ||
| f'Format version: {format_version:d} is too old and can no longer ' | ||
| f'be read.')) | ||
| if format_version > self._FORMAT_VERSION: | ||
| raise IOError(( | ||
| f'Format version: {format_version:d} is too new and not yet ' | ||
| f'supported.')) | ||
| metadata_values['format_version'] = format_version | ||
| def _CreateAttributeContainerTable(self, container_type): | ||
| """Creates a table for a specific attribute container type. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store | ||
| or if an unsupported attribute container is provided. | ||
| OSError: when there is an error querying the attribute container store | ||
| or if an unsupported attribute container is provided. | ||
| """ | ||
| schema = self._GetAttributeContainerSchema(container_type) | ||
| if not schema: | ||
| raise IOError(f'Unsupported attribute container type: {container_type:s}') | ||
| column_definitions = ['_identifier INTEGER PRIMARY KEY AUTOINCREMENT'] | ||
| schema_to_sqlite_type_mappings = ( | ||
| self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS) | ||
| for name, data_type in sorted(schema.items()): | ||
| data_type = schema_to_sqlite_type_mappings.get(data_type, 'TEXT') | ||
| column_definitions.append(f'{name:s} {data_type:s}') | ||
| column_definitions = ', '.join(column_definitions) | ||
| query = f'CREATE TABLE {container_type:s} ({column_definitions:s});' | ||
| try: | ||
| self._cursor.execute(query) | ||
| except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception: | ||
| raise IOError(( | ||
| f'Unable to query attribute container store with error: ' | ||
| f'{exception!s}')) | ||
| def _CreatetAttributeContainerFromRow( | ||
| self, container_type, column_names, row, first_column_index): | ||
| """Creates an attribute container of a row in the database. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| column_names (list[str]): names of the columns selected. | ||
| row (sqlite.Row): row as a result from a SELECT query. | ||
| first_column_index (int): index of the first column in row. | ||
| Returns: | ||
| AttributeContainer: attribute container. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store | ||
| or if an unsupported attribute container is provided. | ||
| OSError: when there is an error querying the attribute container store | ||
| or if an unsupported attribute container is provided. | ||
| """ | ||
| schema = self._GetAttributeContainerSchema(container_type) | ||
| if not schema: | ||
| raise IOError(f'Unsupported attribute container type: {container_type:s}') | ||
| container = self._containers_manager.CreateAttributeContainer( | ||
| container_type) | ||
| for column_index, name in enumerate(column_names): | ||
| attribute_value = row[first_column_index + column_index] | ||
| if attribute_value is None: | ||
| continue | ||
| data_type = schema[name] | ||
| if data_type == 'AttributeContainerIdentifier': | ||
| identifier = containers_interface.AttributeContainerIdentifier() | ||
| identifier.CopyFromString(attribute_value) | ||
| attribute_value = identifier | ||
| elif data_type == 'bool': | ||
| attribute_value = bool(attribute_value) | ||
| elif data_type not in self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS: | ||
| raise IOError(( | ||
| f'Unsupported attribute container type: {container_type:s} ' | ||
| f'attribute: {name:s} data type: {data_type:s}')) | ||
| setattr(container, name, attribute_value) | ||
| return container | ||
| def _Flush(self): | ||
| """Ensures cached data is written to file. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store. | ||
| OSError: when there is an error querying the attribute container store. | ||
| """ | ||
| for container_type, write_cache in self._write_cache.items(): | ||
| if len(write_cache) > 1: | ||
| self._FlushWriteCache(container_type, write_cache) | ||
| self._write_cache = {} | ||
| # We need to run commit or not all data is stored in the database. | ||
| self._connection.commit() | ||
| def _FlushWriteCache(self, container_type, write_cache): | ||
| """Flushes attribute container values cached for writing. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| write_cache (list[tuple[str]]): cached attribute container values. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store. | ||
| OSError: when there is an error querying the attribute container store. | ||
| """ | ||
| column_names = write_cache.pop(0) | ||
| value_statement = ','.join(['?'] * len(column_names)) | ||
| value_statement = f'({value_statement:s})' | ||
| values_statement = ', '.join([value_statement] * len(write_cache)) | ||
| column_names_string = ', '.join(column_names) | ||
| query = (f'INSERT INTO {container_type:s} ({column_names_string:s}) ' | ||
| f'VALUES {values_statement:s}') | ||
| if self._storage_profiler: | ||
| self._storage_profiler.StartTiming('write_new') | ||
| try: | ||
| values = list(itertools.chain(*write_cache)) | ||
| self._cursor.execute(query, values) | ||
| except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception: | ||
| raise IOError(( | ||
| f'Unable to query attribute container store with error: ' | ||
| f'{exception!s}')) | ||
| finally: | ||
| if self._storage_profiler: | ||
| self._storage_profiler.StopTiming('write_new') | ||
| def _GetAttributeContainersWithFilter( | ||
| self, container_type, column_names=None, filter_expression=None, | ||
| order_by=None): | ||
| """Retrieves a specific type of stored attribute containers. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| column_names (Optional[list[str]]): names of the columns to retrieve. | ||
| filter_expression (Optional[str]): SQL expression to filter results by. | ||
| order_by (Optional[str]): name of a column to order the results by. | ||
| Yields: | ||
| AttributeContainer: attribute container. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store. | ||
| OSError: when there is an error querying the attribute container store. | ||
| """ | ||
| write_cache = self._write_cache.get(container_type, []) | ||
| if len(write_cache) > 1: | ||
| self._FlushWriteCache(container_type, write_cache) | ||
| del self._write_cache[container_type] | ||
| column_names_string = ', '.join(column_names) | ||
| query = (f'SELECT _identifier, {column_names_string:s} ' | ||
| f'FROM {container_type:s}') | ||
| if filter_expression: | ||
| query = ' WHERE '.join([query, filter_expression]) | ||
| if order_by: | ||
| query = ' ORDER BY '.join([query, order_by]) | ||
| # Use a local cursor to prevent another query interrupting the generator. | ||
| cursor = self._connection.cursor() | ||
| try: | ||
| cursor.execute(query) | ||
| except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception: | ||
| raise IOError(( | ||
| f'Unable to query attribute container store for container: ' | ||
| f'{container_type:s} with error: {exception!s}')) | ||
| if self._storage_profiler: | ||
| self._storage_profiler.StartTiming('get_containers') | ||
| try: | ||
| row = cursor.fetchone() | ||
| finally: | ||
| if self._storage_profiler: | ||
| self._storage_profiler.StopTiming('get_containers') | ||
| while row: | ||
| container = self._CreatetAttributeContainerFromRow( | ||
| container_type, column_names, row, 1) | ||
| identifier = containers_interface.AttributeContainerIdentifier( | ||
| name=container_type, sequence_number=row[0]) | ||
| container.SetIdentifier(identifier) | ||
| yield container | ||
| if self._storage_profiler: | ||
| self._storage_profiler.StartTiming('get_containers') | ||
| try: | ||
| row = cursor.fetchone() | ||
| finally: | ||
| if self._storage_profiler: | ||
| self._storage_profiler.StopTiming('get_containers') | ||
| def _GetCachedAttributeContainer(self, container_type, index): | ||
| """Retrieves a specific cached attribute container. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| index (int): attribute container index. | ||
| Returns: | ||
| AttributeContainer: attribute container or None if not available. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store. | ||
| OSError: when there is an error querying the attribute container store. | ||
| """ | ||
| lookup_key = f'{container_type:s}.{index:d}' | ||
| attribute_container = self._attribute_container_cache.get(lookup_key, None) | ||
| if attribute_container: | ||
| self._attribute_container_cache.move_to_end(lookup_key, last=False) | ||
| return attribute_container | ||
| def _HasTable(self, table_name): | ||
| """Determines if a specific table exists. | ||
| Args: | ||
| table_name (str): name of the table. | ||
| Returns: | ||
| bool: True if the table exists, false otherwise. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store. | ||
| OSError: when there is an error querying the attribute container store. | ||
| """ | ||
| query = self._HAS_TABLE_QUERY.format(table_name) | ||
| try: | ||
| self._cursor.execute(query) | ||
| except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception: | ||
| raise IOError(( | ||
| f'Unable to query attribute container store with error: ' | ||
| f'{exception!s}')) | ||
| return bool(self._cursor.fetchone()) | ||
| def _RaiseIfNotReadable(self): | ||
| """Raises if the attribute container store is not readable. | ||
| Raises: | ||
| IOError: when the attribute container store is closed. | ||
| OSError: when the attribute container store is closed. | ||
| """ | ||
| if not self._is_open: | ||
| raise IOError('Unable to read from closed attribute container store.') | ||
| def _RaiseIfNotWritable(self): | ||
| """Raises if the attribute container store is not writable. | ||
| Raises: | ||
| IOError: when the attribute container store is closed or read-only. | ||
| OSError: when the attribute container store is closed or read-only. | ||
| """ | ||
| if not self._is_open: | ||
| raise IOError('Unable to write to closed attribute container store.') | ||
| if self._read_only: | ||
| raise IOError('Unable to write to read-only attribute container store.') | ||
| def _ReadAndCheckStorageMetadata(self, check_readable_only=False): | ||
| """Reads storage metadata and checks that the values are valid. | ||
| Args: | ||
| check_readable_only (Optional[bool]): whether the store should only be | ||
| checked to see if it can be read. If False, the store will be checked | ||
| to see if it can be read and written to. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store. | ||
| OSError: when there is an error querying the attribute container store. | ||
| """ | ||
| metadata_values = self._ReadMetadata() | ||
| self._CheckStorageMetadata( | ||
| metadata_values, check_readable_only=check_readable_only) | ||
| self.format_version = metadata_values['format_version'] | ||
| def _ReadMetadata(self): | ||
| """Reads metadata. | ||
| Returns: | ||
| dict[str, str]: metadata values. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store. | ||
| OSError: when there is an error querying the attribute container store. | ||
| """ | ||
| query = 'SELECT key, value FROM metadata' | ||
| try: | ||
| self._cursor.execute(query) | ||
| except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception: | ||
| raise IOError(( | ||
| f'Unable to query attribute container store with error: ' | ||
| f'{exception!s}')) | ||
| return {row[0]: row[1] for row in self._cursor.fetchall()} | ||
| def _UpdateStorageMetadataFormatVersion(self): | ||
| """Updates the storage metadata format version. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store. | ||
| OSError: when there is an error querying the attribute container store. | ||
| """ | ||
| if self.format_version >= self._UPGRADE_COMPATIBLE_FORMAT_VERSION: | ||
| query = (f'UPDATE metadata SET value = {self._FORMAT_VERSION:d} ' | ||
| f'WHERE key = "format_version"') | ||
| try: | ||
| self._cursor.execute(query) | ||
| except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception: | ||
| raise IOError(( | ||
| f'Unable to query attribute container store with error: ' | ||
| f'{exception!s}')) | ||
| def _WriteExistingAttributeContainer(self, container): | ||
| """Writes an existing attribute container to the store. | ||
| Args: | ||
| container (AttributeContainer): attribute container. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store | ||
| or if an unsupported attribute container is provided. | ||
| OSError: when there is an error querying the attribute container store | ||
| or if an unsupported attribute container is provided. | ||
| """ | ||
| identifier = container.GetIdentifier() | ||
| schema = self._GetAttributeContainerSchema(container.CONTAINER_TYPE) | ||
| if not schema: | ||
| raise IOError( | ||
| f'Unsupported attribute container type: {container.CONTAINER_TYPE:s}') | ||
| write_cache = self._write_cache.get(container.CONTAINER_TYPE, []) | ||
| if len(write_cache) > 1: | ||
| self._FlushWriteCache(container.CONTAINER_TYPE, write_cache) | ||
| del self._write_cache[container.CONTAINER_TYPE] | ||
| column_names = [] | ||
| values = [] | ||
| for name, data_type in sorted(schema.items()): | ||
| attribute_value = getattr(container, name, None) | ||
| if attribute_value is not None: | ||
| if data_type == 'AttributeContainerIdentifier' and isinstance( | ||
| attribute_value, containers_interface.AttributeContainerIdentifier): | ||
| attribute_value = attribute_value.CopyToString() | ||
| elif data_type == 'bool': | ||
| attribute_value = int(attribute_value) | ||
| elif data_type not in self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS: | ||
| raise IOError(( | ||
| f'Unsupported attribute container type: ' | ||
| f'{container.CONTAINER_TYPE:s} attribute: {name:s} data type: ' | ||
| f'{data_type:s}')) | ||
| column_names.append(f'{name:s} = ?') | ||
| values.append(attribute_value) | ||
| column_names_string = ', '.join(column_names) | ||
| query = (f'UPDATE {container.CONTAINER_TYPE:s} SET {column_names_string:s} ' | ||
| f'WHERE _identifier = {identifier.sequence_number:d}') | ||
| if self._storage_profiler: | ||
| self._storage_profiler.StartTiming('write_existing') | ||
| try: | ||
| self._cursor.execute(query, values) | ||
| except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception: | ||
| raise IOError(( | ||
| f'Unable to query attribute container store with error: ' | ||
| f'{exception!s}')) | ||
| finally: | ||
| if self._storage_profiler: | ||
| self._storage_profiler.StopTiming('write_existing') | ||
| def _WriteMetadata(self): | ||
| """Writes metadata. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store. | ||
| OSError: when there is an error querying the attribute container store. | ||
| """ | ||
| try: | ||
| self._cursor.execute(self._CREATE_METADATA_TABLE_QUERY) | ||
| except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception: | ||
| raise IOError(( | ||
| f'Unable to query attribute container store with error: ' | ||
| f'{exception!s}')) | ||
| self._WriteMetadataValue('format_version', f'{self._FORMAT_VERSION:d}') | ||
| def _WriteMetadataValue(self, key, value): | ||
| """Writes a metadata value. | ||
| Args: | ||
| key (str): key of the storage metadata. | ||
| value (str): value of the storage metadata. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store. | ||
| OSError: when there is an error querying the attribute container store. | ||
| """ | ||
| try: | ||
| self._cursor.execute(self._INSERT_METADATA_VALUE_QUERY, (key, value)) | ||
| except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception: | ||
| raise IOError(( | ||
| f'Unable to query attribute container store with error: ' | ||
| f'{exception!s}')) | ||
| def _WriteNewAttributeContainer(self, container): | ||
| """Writes a new attribute container to the store. | ||
| The table for the container type must exist. | ||
| Args: | ||
| container (AttributeContainer): attribute container. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store | ||
| or if an unsupported attribute container is provided. | ||
| OSError: when there is an error querying the attribute container store | ||
| or if an unsupported attribute container is provided. | ||
| """ | ||
| next_sequence_number = self._GetAttributeContainerNextSequenceNumber( | ||
| container.CONTAINER_TYPE) | ||
| identifier = containers_interface.AttributeContainerIdentifier( | ||
| name=container.CONTAINER_TYPE, sequence_number=next_sequence_number) | ||
| container.SetIdentifier(identifier) | ||
| schema = self._GetAttributeContainerSchema(container.CONTAINER_TYPE) | ||
| if not schema: | ||
| raise IOError( | ||
| f'Unsupported attribute container type: {container.CONTAINER_TYPE:s}') | ||
| column_names = [] | ||
| values = [] | ||
| for name, data_type in sorted(schema.items()): | ||
| attribute_value = getattr(container, name, None) | ||
| if attribute_value is not None: | ||
| if data_type == 'AttributeContainerIdentifier' and isinstance( | ||
| attribute_value, | ||
| containers_interface.AttributeContainerIdentifier): | ||
| attribute_value = attribute_value.CopyToString() | ||
| elif data_type == 'bool': | ||
| attribute_value = int(attribute_value) | ||
| elif data_type not in self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS: | ||
| raise IOError(( | ||
| f'Unsupported attribute container type: ' | ||
| f'{container.CONTAINER_TYPE:s} attribute: {name:s} data type: ' | ||
| f'{data_type:s}')) | ||
| column_names.append(name) | ||
| values.append(attribute_value) | ||
| write_cache = self._write_cache.get( | ||
| container.CONTAINER_TYPE, [column_names]) | ||
| write_cache.append(values) | ||
| if len(write_cache) >= self._MAXIMUM_WRITE_CACHE_SIZE: | ||
| self._FlushWriteCache(container.CONTAINER_TYPE, write_cache) | ||
| write_cache = [column_names] | ||
| self._write_cache[container.CONTAINER_TYPE] = write_cache | ||
| self._CacheAttributeContainerByIndex(container, next_sequence_number - 1) | ||
| @classmethod | ||
| def CheckSupportedFormat(cls, path): | ||
| """Checks if the attribute container store format is supported. | ||
| Args: | ||
| path (str): path to the attribute container store. | ||
| Returns: | ||
| bool: True if the format is supported. | ||
| """ | ||
| # Check if the path is an existing file, to prevent sqlite3 creating | ||
| # an emtpy database file. | ||
| if not os.path.isfile(path): | ||
| return False | ||
| try: | ||
| connection = sqlite3.connect( | ||
| path, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) | ||
| cursor = connection.cursor() | ||
| query = 'SELECT * FROM metadata' | ||
| cursor.execute(query) | ||
| metadata_values = {row[0]: row[1] for row in cursor.fetchall()} | ||
| format_version = metadata_values.get('format_version', None) | ||
| if format_version: | ||
| try: | ||
| format_version = int(format_version, 10) | ||
| result = True | ||
| except (TypeError, ValueError): | ||
| pass | ||
| connection.close() | ||
| except (IOError, TypeError, ValueError, sqlite3.DatabaseError): | ||
| result = False | ||
| return result | ||
| def Close(self): | ||
| """Closes the file. | ||
| Raises: | ||
| IOError: if the attribute container store is already closed. | ||
| OSError: if the attribute container store is already closed. | ||
| """ | ||
| if not self._is_open: | ||
| raise IOError('Storage file already closed.') | ||
| if self._connection: | ||
| self._Flush() | ||
| self._connection.close() | ||
| self._connection = None | ||
| self._cursor = None | ||
| self._is_open = False | ||
| def GetAttributeContainerByIdentifier(self, container_type, identifier): | ||
| """Retrieves a specific type of container with a specific identifier. | ||
| Args: | ||
| container_type (str): container type. | ||
| identifier (AttributeContainerIdentifier): attribute container identifier. | ||
| Returns: | ||
| AttributeContainer: attribute container or None if not available. | ||
| Raises: | ||
| IOError: when the store is closed or if an unsupported attribute | ||
| container is provided. | ||
| OSError: when the store is closed or if an unsupported attribute | ||
| container is provided. | ||
| """ | ||
| return self.GetAttributeContainerByIndex( | ||
| container_type, identifier.sequence_number - 1) | ||
| def GetAttributeContainerByIndex(self, container_type, index): | ||
| """Retrieves a specific attribute container. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| index (int): attribute container index. | ||
| Returns: | ||
| AttributeContainer: attribute container or None if not available. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store | ||
| or if an unsupported attribute container is provided. | ||
| OSError: when there is an error querying the attribute container store | ||
| or if an unsupported attribute container is provided. | ||
| """ | ||
| container = self._GetCachedAttributeContainer(container_type, index) | ||
| if container: | ||
| return container | ||
| write_cache = self._write_cache.get(container_type, []) | ||
| if len(write_cache) > 1: | ||
| self._FlushWriteCache(container_type, write_cache) | ||
| del self._write_cache[container_type] | ||
| schema = self._GetAttributeContainerSchema(container_type) | ||
| if not schema: | ||
| raise IOError(f'Unsupported attribute container type: {container_type:s}') | ||
| column_names = sorted(schema.keys()) | ||
| column_names_string = ', '.join(column_names) | ||
| row_number = index + 1 | ||
| query = (f'SELECT {column_names_string:s} FROM {container_type:s} WHERE ' | ||
| f'rowid = {row_number:d}') | ||
| try: | ||
| self._cursor.execute(query) | ||
| except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception: | ||
| raise IOError(( | ||
| f'Unable to query attribute container store with error: ' | ||
| f'{exception!s}')) | ||
| if self._storage_profiler: | ||
| self._storage_profiler.StartTiming('get_container_by_index') | ||
| try: | ||
| row = self._cursor.fetchone() | ||
| finally: | ||
| if self._storage_profiler: | ||
| self._storage_profiler.StopTiming('get_container_by_index') | ||
| if not row: | ||
| return None | ||
| container = self._CreatetAttributeContainerFromRow( | ||
| container_type, column_names, row, 0) | ||
| identifier = containers_interface.AttributeContainerIdentifier( | ||
| name=container_type, sequence_number=row_number) | ||
| container.SetIdentifier(identifier) | ||
| self._CacheAttributeContainerByIndex(container, index) | ||
| return container | ||
| def GetAttributeContainers(self, container_type, filter_expression=None): | ||
| """Retrieves a specific type of stored attribute containers. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| filter_expression (Optional[str]): expression to filter the resulting | ||
| attribute containers by. | ||
| Returns: | ||
| generator(AttributeContainer): attribute container generator. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store | ||
| or if an unsupported attribute container is provided. | ||
| OSError: when there is an error querying the attribute container store | ||
| or if an unsupported attribute container is provided. | ||
| """ | ||
| schema = self._GetAttributeContainerSchema(container_type) | ||
| if not schema: | ||
| raise IOError(f'Unsupported attribute container type: {container_type:s}') | ||
| column_names = sorted(schema.keys()) | ||
| sql_filter_expression = None | ||
| if filter_expression: | ||
| expression_ast = ast.parse(filter_expression, mode='eval') | ||
| sql_filter_expression = PythonAST2SQL(expression_ast.body) | ||
| return self._GetAttributeContainersWithFilter( | ||
| container_type, column_names=column_names, | ||
| filter_expression=sql_filter_expression) | ||
| def GetNumberOfAttributeContainers(self, container_type): | ||
| """Retrieves the number of a specific type of attribute containers. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| Returns: | ||
| int: the number of containers of a specified type. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store. | ||
| OSError: when there is an error querying the attribute container store. | ||
| """ | ||
| if not self._HasTable(container_type): | ||
| return 0 | ||
| write_cache = self._write_cache.get(container_type, []) | ||
| if len(write_cache) > 1: | ||
| self._FlushWriteCache(container_type, write_cache) | ||
| del self._write_cache[container_type] | ||
| # Note that this is SQLite specific, and will give inaccurate results if | ||
| # there are DELETE commands run on the table. acstore does not run any | ||
| # DELETE commands. | ||
| query = f'SELECT MAX(_ROWID_) FROM {container_type:s} LIMIT 1' | ||
| try: | ||
| self._cursor.execute(query) | ||
| except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception: | ||
| raise IOError(( | ||
| f'Unable to query attribute container store with error: ' | ||
| f'{exception!s}')) | ||
| row = self._cursor.fetchone() | ||
| if not row: | ||
| return 0 | ||
| return row[0] or 0 | ||
| def HasAttributeContainers(self, container_type): | ||
| """Determines if store contains a specific type of attribute containers. | ||
| Args: | ||
| container_type (str): attribute container type. | ||
| Returns: | ||
| bool: True if the store contains the specified type of attribute | ||
| containers. | ||
| Raises: | ||
| IOError: when there is an error querying the attribute container store. | ||
| OSError: when there is an error querying the attribute container store. | ||
| """ | ||
| count = self.GetNumberOfAttributeContainers(container_type) | ||
| return count > 0 | ||
| def Open(self, path=None, read_only=True, **unused_kwargs): # pylint: disable=arguments-differ | ||
| """Opens the store. | ||
| Args: | ||
| path (Optional[str]): path to the attribute container store. | ||
| read_only (Optional[bool]): True if the file should be opened in | ||
| read-only mode. | ||
| Raises: | ||
| IOError: if the attribute container store is already opened or if | ||
| the database cannot be connected. | ||
| OSError: if the attribute container store is already opened or if | ||
| the database cannot be connected. | ||
| ValueError: if path is missing. | ||
| """ | ||
| if self._is_open: | ||
| raise IOError('Storage file already opened.') | ||
| if not path: | ||
| raise ValueError('Missing path.') | ||
| path = os.path.abspath(path) | ||
| try: | ||
| path_uri = pathlib.Path(path).as_uri() | ||
| if read_only: | ||
| path_uri = f'{path_uri:s}?mode=ro' | ||
| except ValueError: | ||
| path_uri = None | ||
| detect_types = sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES | ||
| if path_uri: | ||
| connection = sqlite3.connect( | ||
| path_uri, detect_types=detect_types, isolation_level='DEFERRED', | ||
| uri=True) | ||
| else: | ||
| connection = sqlite3.connect( | ||
| path, detect_types=detect_types, isolation_level='DEFERRED') | ||
| try: | ||
| # Use in-memory journaling mode to reduce IO. | ||
| connection.execute('PRAGMA journal_mode=MEMORY') | ||
| # Turn off insert transaction integrity since we want to do bulk insert. | ||
| connection.execute('PRAGMA synchronous=OFF') | ||
| except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception: | ||
| raise IOError(( | ||
| f'Unable to query attribute container store with error: ' | ||
| f'{exception!s}')) | ||
| cursor = connection.cursor() | ||
| if not cursor: | ||
| return | ||
| self._connection = connection | ||
| self._cursor = cursor | ||
| self._is_open = True | ||
| self._read_only = read_only | ||
| if read_only: | ||
| self._ReadAndCheckStorageMetadata(check_readable_only=True) | ||
| else: | ||
| if not self._HasTable('metadata'): | ||
| self._WriteMetadata() | ||
| else: | ||
| self._ReadAndCheckStorageMetadata() | ||
| # Update the storage metadata format version in case we are adding | ||
| # new format features that are not backwards compatible. | ||
| self._UpdateStorageMetadataFormatVersion() | ||
| # TODO: create table on demand. | ||
| for container_type in self._containers_manager.GetContainerTypes(): | ||
| if not self._HasTable(container_type): | ||
| self._CreateAttributeContainerTable(container_type) | ||
| self._connection.commit() |
+37
| environment: | ||
| matrix: | ||
| - DESCRIPTION: "Windows with 32-bit Python 3.10" | ||
| MACHINE_TYPE: "x86" | ||
| APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019 | ||
| PYTHON: "C:\\Python310" | ||
| PYTHON_VERSION: "3.10" | ||
| L2TBINARIES_TRACK: "dev" | ||
| - DESCRIPTION: "Windows with 64-bit Python 3.10" | ||
| MACHINE_TYPE: "amd64" | ||
| APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019 | ||
| PYTHON: "C:\\Python310-x64" | ||
| PYTHON_VERSION: "3.10" | ||
| L2TBINARIES_TRACK: "dev" | ||
| - DESCRIPTION: "Mac OS with Python 3.11" | ||
| APPVEYOR_BUILD_WORKER_IMAGE: macos-monterey | ||
| HOMEBREW_NO_INSTALL_CLEANUP: 1 | ||
| install: | ||
| - cmd: "%PYTHON%\\python.exe -m pip install -U pip setuptools twine wheel" | ||
| - cmd: "%PYTHON%\\python.exe -m pip install pywin32 WMI" | ||
| - cmd: "%PYTHON%\\python.exe %PYTHON%\\Scripts\\pywin32_postinstall.py -install" | ||
| - ps: If ($isWindows) { .\config\appveyor\install.ps1 } | ||
| - sh: config/appveyor/install.sh | ||
| build_script: | ||
| - cmd: "%PYTHON%\\python.exe setup.py bdist_wheel" | ||
| test_script: | ||
| - cmd: "%PYTHON%\\python.exe run_tests.py" | ||
| - cmd: IF EXIST "tests\\end-to-end.py" ( | ||
| set PYTHONPATH=. && | ||
| "%PYTHON%\\python.exe" "tests\\end-to-end.py" --debug -c "config\\end-to-end.ini" ) | ||
| - sh: config/appveyor/runtests.sh | ||
| artifacts: | ||
| - path: dist\*.whl |
| # Script to set up tests on AppVeyor Windows. | ||
| $Dependencies = "" | ||
| $Dependencies = ${Dependencies} -split " " | ||
| $Output = Invoke-Expression -Command "git clone https://github.com/log2timeline/l2tdevtools.git ..\l2tdevtools 2>&1" | ||
| Write-Host (${Output} | Out-String) | ||
| If ($env:APPVEYOR_REPO_BRANCH -eq "main") | ||
| { | ||
| $Track = "stable" | ||
| } | ||
| Else | ||
| { | ||
| $Track = $env:APPVEYOR_REPO_BRANCH | ||
| } | ||
| New-Item -ItemType "directory" -Name "dependencies" | ||
| $env:PYTHONPATH = "..\l2tdevtools" | ||
| $Output = Invoke-Expression -Command "& '${env:PYTHON}\python.exe' ..\l2tdevtools\tools\update.py --download-directory dependencies --machine-type ${env:MACHINE_TYPE} --msi-targetdir ${env:PYTHON} --track ${env:L2TBINARIES_TRACK} ${Dependencies} 2>&1" | ||
| Write-Host (${Output} | Out-String) | ||
| # Script to set up tests on AppVeyor MacOS. | ||
| set -e | ||
| brew update -q | ||
| brew install -q gettext gnu-sed python@3.11 tox || true | ||
| #!/bin/sh | ||
| # Script to run tests | ||
| # Set the following environment variables to build libyal with gettext. | ||
| export CPPFLAGS="-I/usr/local/include -I/usr/local/opt/gettext/include ${CPPFLAGS}"; | ||
| export LDFLAGS="-L/usr/local/lib -L/usr/local/opt/gettext/lib ${LDFLAGS}"; | ||
| # Set the following environment variables to build pycrypto and yara-python. | ||
| export CPPFLAGS="-I/usr/local/opt/openssl@1.1/include ${CPPFLAGS}"; | ||
| export LDFLAGS="-L/usr/local/opt/openssl@1.1/lib ${LDFLAGS}"; | ||
| # Set the following environment variables to ensure tox can find Python 3.11. | ||
| export PATH="/usr/local/opt/python@3.11/bin:${PATH}"; | ||
| tox -e py311 |
+170
| # -*- coding: utf-8 -*- | ||
| """Sphinx build configuration file.""" | ||
| import os | ||
| import sys | ||
| from sphinx.ext import apidoc | ||
| from docutils import nodes | ||
| from docutils import transforms | ||
| # Change PYTHONPATH to include acstore module and dependencies. | ||
| sys.path.insert(0, os.path.abspath('..')) | ||
| import acstore # pylint: disable=wrong-import-position | ||
| import utils.dependencies # pylint: disable=wrong-import-position | ||
| # -- General configuration ------------------------------------------------ | ||
| # If your documentation needs a minimal Sphinx version, state it here. | ||
| needs_sphinx = '2.0.1' | ||
| # Add any Sphinx extension module names here, as strings. They can be | ||
| # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom | ||
| # ones. | ||
| extensions = [ | ||
| 'recommonmark', | ||
| 'sphinx.ext.autodoc', | ||
| 'sphinx.ext.coverage', | ||
| 'sphinx.ext.doctest', | ||
| 'sphinx.ext.napoleon', | ||
| 'sphinx.ext.viewcode', | ||
| 'sphinx_markdown_tables', | ||
| 'sphinx_rtd_theme', | ||
| ] | ||
| # We cannot install architecture dependent Python modules on readthedocs, | ||
| # therefore we mock most imports. | ||
| pip_installed_modules = set() | ||
| dependency_helper = utils.dependencies.DependencyHelper( | ||
| dependencies_file=os.path.join('..', 'dependencies.ini'), | ||
| test_dependencies_file=os.path.join('..', 'test_dependencies.ini')) | ||
| modules_to_mock = set(dependency_helper.dependencies.keys()) | ||
| modules_to_mock = modules_to_mock.difference(pip_installed_modules) | ||
| autodoc_mock_imports = sorted(modules_to_mock) | ||
| # Options for the Sphinx Napoleon extension, which reads Google-style | ||
| # docstrings. | ||
| napoleon_google_docstring = True | ||
| napoleon_numpy_docstring = False | ||
| napoleon_include_private_with_doc = False | ||
| napoleon_include_special_with_doc = True | ||
| # General information about the project. | ||
| # pylint: disable=redefined-builtin | ||
| project = 'ACStore' | ||
| copyright = 'The ACStore authors' | ||
| version = acstore.__version__ | ||
| release = acstore.__version__ | ||
| # Add any paths that contain templates here, relative to this directory. | ||
| templates_path = ['_templates'] | ||
| # List of patterns, relative to source directory, that match files and | ||
| # directories to ignore when looking for source files. | ||
| exclude_patterns = ['_build'] | ||
| # The master toctree document. | ||
| master_doc = 'index' | ||
| # The name of the Pygments (syntax highlighting) style to use. | ||
| pygments_style = 'sphinx' | ||
| # -- Options for HTML output ---------------------------------------------- | ||
| # The theme to use for HTML and HTML Help pages. See the documentation for | ||
| # a list of builtin themes. | ||
| html_theme = 'sphinx_rtd_theme' | ||
| # Output file base name for HTML help builder. | ||
| htmlhelp_basename = 'acstoredoc' | ||
| # -- Options linkcheck ---------------------------------------------------- | ||
| linkcheck_ignore = [ | ||
| ] | ||
| # -- Code to rewrite links for readthedocs -------------------------------- | ||
| # This function is a Sphinx core event callback, the format of which is detailed | ||
| # here: https://www.sphinx-doc.org/en/master/extdev/appapi.html#events | ||
| # pylint: disable=unused-argument | ||
| def RunSphinxAPIDoc(app): | ||
| """Runs sphinx-apidoc to auto-generate documentation. | ||
| Args: | ||
| app (sphinx.application.Sphinx): Sphinx application. Required by the | ||
| the Sphinx event callback API. | ||
| """ | ||
| current_directory = os.path.abspath(os.path.dirname(__file__)) | ||
| module_path = os.path.join(current_directory, '..', 'acstore') | ||
| api_directory = os.path.join(current_directory, 'sources', 'api') | ||
| apidoc.main(['-o', api_directory, module_path, '--force']) | ||
| class MarkdownLinkFixer(transforms.Transform): | ||
| """Transform definition to parse .md references to internal pages.""" | ||
| default_priority = 1000 | ||
| _URI_PREFIXES = [] | ||
| def _FixLinks(self, node): | ||
| """Corrects links to .md files not part of the documentation. | ||
| Args: | ||
| node (docutils.nodes.Node): docutils node. | ||
| Returns: | ||
| docutils.nodes.Node: docutils node, with correct URIs outside | ||
| of Markdown pages outside the documentation. | ||
| """ | ||
| if isinstance(node, nodes.reference) and 'refuri' in node: | ||
| reference_uri = node['refuri'] | ||
| for uri_prefix in self._URI_PREFIXES: | ||
| if (reference_uri.startswith(uri_prefix) and not ( | ||
| reference_uri.endswith('.asciidoc') or | ||
| reference_uri.endswith('.md'))): | ||
| node['refuri'] = reference_uri + '.md' | ||
| break | ||
| return node | ||
| def _Traverse(self, node): | ||
| """Traverses the document tree rooted at node. | ||
| Args: | ||
| node (docutils.nodes.Node): docutils node. | ||
| """ | ||
| self._FixLinks(node) | ||
| for child_node in node.children: | ||
| self._Traverse(child_node) | ||
| # pylint: disable=arguments-differ | ||
| def apply(self): | ||
| """Applies this transform on document tree.""" | ||
| self._Traverse(self.document) | ||
| # pylint: invalid-name | ||
| def setup(app): | ||
| """Called at Sphinx initialization. | ||
| Args: | ||
| app (sphinx.application.Sphinx): Sphinx application. | ||
| """ | ||
| # Triggers sphinx-apidoc to generate API documentation. | ||
| app.connect('builder-inited', RunSphinxAPIDoc) | ||
| app.add_config_value( | ||
| 'recommonmark_config', {'enable_auto_toc_tree': True}, True) | ||
| app.add_transform(MarkdownLinkFixer) |
| Welcome to the ACStore documentation | ||
| ==================================== | ||
| ACStore, or Attribute Container Storage, provides a stand-alone implementation | ||
| to read and write attribute container storage files. | ||
| The source code is available from the `project page <https://github.com/log2timeline/acstore>`__. | ||
| .. toctree:: | ||
| :maxdepth: 2 | ||
| sources/user/index | ||
| .. toctree:: | ||
| :maxdepth: 2 | ||
| API documentation <sources/api/acstore> | ||
| Indices and tables | ||
| ================== | ||
| * :ref:`genindex` | ||
| * :ref:`modindex` | ||
| docutils | ||
| Markdown | ||
| recommonmark | ||
| sphinx >= 4.1.0, < 5.2.0 | ||
| sphinx-markdown-tables | ||
| sphinx-rtd-theme >= 0.5.1 |
| acstore.containers package | ||
| ========================== | ||
| Submodules | ||
| ---------- | ||
| acstore.containers.interface module | ||
| ----------------------------------- | ||
| .. automodule:: acstore.containers.interface | ||
| :members: | ||
| :undoc-members: | ||
| :show-inheritance: | ||
| acstore.containers.manager module | ||
| --------------------------------- | ||
| .. automodule:: acstore.containers.manager | ||
| :members: | ||
| :undoc-members: | ||
| :show-inheritance: | ||
| Module contents | ||
| --------------- | ||
| .. automodule:: acstore.containers | ||
| :members: | ||
| :undoc-members: | ||
| :show-inheritance: |
| acstore package | ||
| =============== | ||
| Subpackages | ||
| ----------- | ||
| .. toctree:: | ||
| :maxdepth: 4 | ||
| acstore.containers | ||
| Submodules | ||
| ---------- | ||
| acstore.fake\_store module | ||
| -------------------------- | ||
| .. automodule:: acstore.fake_store | ||
| :members: | ||
| :undoc-members: | ||
| :show-inheritance: | ||
| acstore.interface module | ||
| ------------------------ | ||
| .. automodule:: acstore.interface | ||
| :members: | ||
| :undoc-members: | ||
| :show-inheritance: | ||
| acstore.sqlite\_store module | ||
| ---------------------------- | ||
| .. automodule:: acstore.sqlite_store | ||
| :members: | ||
| :undoc-members: | ||
| :show-inheritance: | ||
| Module contents | ||
| --------------- | ||
| .. automodule:: acstore | ||
| :members: | ||
| :undoc-members: | ||
| :show-inheritance: |
| acstore | ||
| ======= | ||
| .. toctree:: | ||
| :maxdepth: 4 | ||
| acstore |
| ############### | ||
| Getting started | ||
| ############### | ||
| To be able to use ACStore you first need to install it. There are multiple | ||
| ways to install ACStore, check the following instructions for more detail. | ||
| .. toctree:: | ||
| :maxdepth: 2 | ||
| Installation instructions <Installation-instructions> |
| # Installation instructions | ||
| ## pip | ||
| **Note that using pip outside virtualenv is not recommended since it ignores | ||
| your systems package manager. If you aren't comfortable debugging package | ||
| installation issues, this is not the option for you.** | ||
| Create and activate a virtualenv: | ||
| ```bash | ||
| virtualenv acstoreenv | ||
| cd acstoreenv | ||
| source ./bin/activate | ||
| ``` | ||
| Upgrade pip and install ACStore dependencies: | ||
| ```bash | ||
| pip install --upgrade pip | ||
| pip install acstore | ||
| ``` | ||
| To deactivate the virtualenv run: | ||
| ```bash | ||
| deactivate | ||
| ``` | ||
| ## Ubuntu 18.04 and 20.04 LTS | ||
| To install ACStore from the [GIFT Personal Package Archive (PPA)](https://launchpad.net/~gift): | ||
| ```bash | ||
| sudo add-apt-repository ppa:gift/stable | ||
| ``` | ||
| Update and install ACStore: | ||
| ```bash | ||
| sudo apt-get update | ||
| sudo apt-get install python3-acstore | ||
| ``` | ||
| ## Windows | ||
| The [l2tbinaries](https://github.com/log2timeline/l2tbinaries) contains the | ||
| necessary packages for running ACStore. l2tbinaries provides the following | ||
| branches: | ||
| * main; branch intended for the "packaged release" of ACStore and dependencies; | ||
| * dev; branch intended for the "development release" of ACStore; | ||
| * testing; branch intended for testing newly created packages. | ||
| The l2tdevtools project provides [an update script](https://github.com/log2timeline/l2tdevtools/wiki/Update-script) | ||
| to ease the process of keeping the dependencies up to date. | ||
| The script requires [pywin32](https://github.com/mhammond/pywin32/releases) and | ||
| [Python WMI](https://pypi.org/project/WMI). | ||
| To install the release versions of the dependencies run: | ||
| ``` | ||
| set PYTHONPATH=. | ||
| C:\Python3\python.exe tools\update.py --preset acstore | ||
| ``` |
| pip >= 7.0.0 |
| # -*- coding: utf-8 -*- |
| # -*- coding: utf-8 -*- |
| #!/usr/bin/env python3 | ||
| # -*- coding: utf-8 -*- | ||
| """Tests for the attribute container interface.""" | ||
| import unittest | ||
| from acstore.containers import interface | ||
| from tests import test_lib | ||
| class AttributeContainerIdentifierTest(test_lib.BaseTestCase): | ||
| """Tests for the attribute container identifier.""" | ||
| def testCopyToString(self): | ||
| """Tests the CopyToString function.""" | ||
| sequence_number = id(self) | ||
| identifier = interface.AttributeContainerIdentifier( | ||
| name='test_container', sequence_number=sequence_number) | ||
| identifier_string = identifier.CopyToString() | ||
| self.assertEqual( | ||
| identifier_string, f'test_container.{sequence_number:d}') | ||
| class AttributeContainerTest(test_lib.BaseTestCase): | ||
| """Tests for the attribute container interface.""" | ||
| # pylint: disable=protected-access | ||
| def testCopyToDict(self): | ||
| """Tests the CopyToDict function.""" | ||
| attribute_container = interface.AttributeContainer() | ||
| attribute_container.attribute_name = 'attribute_name' | ||
| attribute_container.attribute_value = 'attribute_value' | ||
| expected_dict = { | ||
| 'attribute_name': 'attribute_name', | ||
| 'attribute_value': 'attribute_value'} | ||
| test_dict = attribute_container.CopyToDict() | ||
| self.assertEqual(test_dict, expected_dict) | ||
| def testGetAttributeNames(self): | ||
| """Tests the GetAttributeNames function.""" | ||
| attribute_container = interface.AttributeContainer() | ||
| attribute_container._protected_attribute = 'protected' | ||
| attribute_container.attribute_name = 'attribute_name' | ||
| attribute_container.attribute_value = 'attribute_value' | ||
| expected_attribute_names = ['attribute_name', 'attribute_value'] | ||
| attribute_names = sorted(attribute_container.GetAttributeNames()) | ||
| self.assertEqual(attribute_names, expected_attribute_names) | ||
| attribute_container._SERIALIZABLE_PROTECTED_ATTRIBUTES = [ | ||
| '_protected_attribute'] | ||
| expected_attribute_names = [ | ||
| '_protected_attribute', 'attribute_name', 'attribute_value'] | ||
| attribute_names = sorted(attribute_container.GetAttributeNames()) | ||
| self.assertEqual(attribute_names, expected_attribute_names) | ||
| def testGetAttributes(self): | ||
| """Tests the GetAttributes function.""" | ||
| attribute_container = interface.AttributeContainer() | ||
| attribute_container._protected_attribute = 'protected' | ||
| attribute_container.attribute_name = 'attribute_name' | ||
| attribute_container.attribute_value = 'attribute_value' | ||
| expected_attributes = [ | ||
| ('attribute_name', 'attribute_name'), | ||
| ('attribute_value', 'attribute_value')] | ||
| attributes = sorted(attribute_container.GetAttributes()) | ||
| self.assertEqual(attributes, expected_attributes) | ||
| attribute_container._SERIALIZABLE_PROTECTED_ATTRIBUTES = [ | ||
| '_protected_attribute'] | ||
| expected_attributes = [ | ||
| ('_protected_attribute', 'protected'), | ||
| ('attribute_name', 'attribute_name'), | ||
| ('attribute_value', 'attribute_value')] | ||
| attributes = sorted(attribute_container.GetAttributes()) | ||
| self.assertEqual(attributes, expected_attributes) | ||
| def testGetAttributeValueHash(self): | ||
| """Tests the GetAttributeValuesHash function.""" | ||
| attribute_container = interface.AttributeContainer() | ||
| attribute_container._protected_attribute = 'protected' | ||
| attribute_container.attribute_name = 'attribute_name' | ||
| attribute_container.attribute_value = 'attribute_value' | ||
| attribute_values_hash1 = attribute_container.GetAttributeValuesHash() | ||
| attribute_container.attribute_value = 'changes' | ||
| attribute_values_hash2 = attribute_container.GetAttributeValuesHash() | ||
| self.assertNotEqual(attribute_values_hash1, attribute_values_hash2) | ||
| attribute_container.attribute_value = 'attribute_value' | ||
| attribute_container._SERIALIZABLE_PROTECTED_ATTRIBUTES = [ | ||
| '_protected_attribute'] | ||
| attribute_values_hash2 = attribute_container.GetAttributeValuesHash() | ||
| self.assertNotEqual(attribute_values_hash1, attribute_values_hash2) | ||
| def testGetAttributeValuesString(self): | ||
| """Tests the GetAttributeValuesString function.""" | ||
| attribute_container = interface.AttributeContainer() | ||
| attribute_container._protected_attribute = 'protected' | ||
| attribute_container.attribute_name = 'attribute_name' | ||
| attribute_container.attribute_value = 'attribute_value' | ||
| attribute_values_string1 = attribute_container.GetAttributeValuesString() | ||
| attribute_container.attribute_value = 'changes' | ||
| attribute_values_string2 = attribute_container.GetAttributeValuesString() | ||
| self.assertNotEqual(attribute_values_string1, attribute_values_string2) | ||
| attribute_container.attribute_value = 'attribute_value' | ||
| attribute_container._SERIALIZABLE_PROTECTED_ATTRIBUTES = [ | ||
| '_protected_attribute'] | ||
| attribute_values_string2 = attribute_container.GetAttributeValuesString() | ||
| self.assertNotEqual(attribute_values_string1, attribute_values_string2) | ||
| def testGetIdentifier(self): | ||
| """Tests the GetIdentifier function.""" | ||
| attribute_container = interface.AttributeContainer() | ||
| identifier = attribute_container.GetIdentifier() | ||
| self.assertIsNotNone(identifier) | ||
| def testMatchesExpression(self): | ||
| """Tests the MatchesExpression function.""" | ||
| attribute_container = interface.AttributeContainer() | ||
| attribute_container.name = 'value' | ||
| result = attribute_container.MatchesExpression('name == "value"') | ||
| self.assertTrue(result) | ||
| result = attribute_container.MatchesExpression('name == "bogus"') | ||
| self.assertFalse(result) | ||
| result = attribute_container.MatchesExpression('bogus') | ||
| self.assertFalse(result) | ||
| def testSetIdentifier(self): | ||
| """Tests the SetIdentifier function.""" | ||
| attribute_container = interface.AttributeContainer() | ||
| attribute_container.SetIdentifier(None) | ||
| if __name__ == '__main__': | ||
| unittest.main() |
| #!/usr/bin/env python3 | ||
| # -*- coding: utf-8 -*- | ||
| """Tests for the attribute container manager.""" | ||
| import unittest | ||
| from acstore.containers import manager | ||
| from tests import test_lib as shared_test_lib | ||
| class AttributeContainersManagerTest(shared_test_lib.BaseTestCase): | ||
| """Tests for the attribute container manager.""" | ||
| # pylint: disable=protected-access | ||
| def testCreateAttributeContainer(self): | ||
| """Tests the CreateAttributeContainer function.""" | ||
| manager.AttributeContainersManager.RegisterAttributeContainer( | ||
| shared_test_lib.TestAttributeContainer) | ||
| try: | ||
| attribute_container = ( | ||
| manager.AttributeContainersManager.CreateAttributeContainer( | ||
| 'test_container')) | ||
| self.assertIsNotNone(attribute_container) | ||
| with self.assertRaises(ValueError): | ||
| manager.AttributeContainersManager.CreateAttributeContainer('bogus') | ||
| finally: | ||
| manager.AttributeContainersManager.DeregisterAttributeContainer( | ||
| shared_test_lib.TestAttributeContainer) | ||
| def testGetContainerTypes(self): | ||
| """Tests the GetContainerTypes function.""" | ||
| manager.AttributeContainersManager.RegisterAttributeContainer( | ||
| shared_test_lib.TestAttributeContainer) | ||
| try: | ||
| container_types = manager.AttributeContainersManager.GetContainerTypes() | ||
| self.assertIn('test_container', container_types) | ||
| finally: | ||
| manager.AttributeContainersManager.DeregisterAttributeContainer( | ||
| shared_test_lib.TestAttributeContainer) | ||
| def testGetSchema(self): | ||
| """Tests the GetSchema function.""" | ||
| manager.AttributeContainersManager.RegisterAttributeContainer( | ||
| shared_test_lib.TestAttributeContainer) | ||
| try: | ||
| schema = manager.AttributeContainersManager.GetSchema('test_container') | ||
| self.assertIsNotNone(schema) | ||
| self.assertEqual(schema, shared_test_lib.TestAttributeContainer.SCHEMA) | ||
| with self.assertRaises(ValueError): | ||
| manager.AttributeContainersManager.GetSchema('bogus') | ||
| finally: | ||
| manager.AttributeContainersManager.DeregisterAttributeContainer( | ||
| shared_test_lib.TestAttributeContainer) | ||
| def testAttributeContainerRegistration(self): | ||
| """Tests the Register and DeregisterAttributeContainer functions.""" | ||
| number_of_classes = len( | ||
| manager.AttributeContainersManager._attribute_container_classes) | ||
| manager.AttributeContainersManager.RegisterAttributeContainer( | ||
| shared_test_lib.TestAttributeContainer) | ||
| try: | ||
| self.assertEqual( | ||
| len(manager.AttributeContainersManager._attribute_container_classes), | ||
| number_of_classes + 1) | ||
| with self.assertRaises(KeyError): | ||
| manager.AttributeContainersManager.RegisterAttributeContainer( | ||
| shared_test_lib.TestAttributeContainer) | ||
| finally: | ||
| manager.AttributeContainersManager.DeregisterAttributeContainer( | ||
| shared_test_lib.TestAttributeContainer) | ||
| self.assertEqual( | ||
| len(manager.AttributeContainersManager._attribute_container_classes), | ||
| number_of_classes) | ||
| if __name__ == '__main__': | ||
| unittest.main() |
| #!/usr/bin/env python3 | ||
| # -*- coding: utf-8 -*- | ||
| """Tests for the fake (in-memory only) store.""" | ||
| import unittest | ||
| from acstore import fake_store | ||
| from tests import test_lib | ||
| class FakeAttributeContainerStoreTest(test_lib.BaseTestCase): | ||
| """Tests for the fake (in-memory only) store.""" | ||
| # pylint: disable=protected-access | ||
| def testRaiseIfNotReadable(self): | ||
| """Tests the _RaiseIfNotReadable function.""" | ||
| test_store = fake_store.FakeAttributeContainerStore() | ||
| with self.assertRaises(IOError): | ||
| test_store._RaiseIfNotReadable() | ||
| def testRaiseIfNotWritable(self): | ||
| """Tests the _RaiseIfNotWritable function.""" | ||
| test_store = fake_store.FakeAttributeContainerStore() | ||
| with self.assertRaises(IOError): | ||
| test_store._RaiseIfNotWritable() | ||
| def testWriteExistingAttributeContainer(self): | ||
| """Tests the _WriteExistingAttributeContainer function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| test_store = fake_store.FakeAttributeContainerStore() | ||
| test_store.Open() | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 0) | ||
| with self.assertRaises(IOError): | ||
| test_store._WriteExistingAttributeContainer(attribute_container) | ||
| test_store._WriteNewAttributeContainer(attribute_container) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 1) | ||
| test_store._WriteExistingAttributeContainer(attribute_container) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 1) | ||
| test_store.Close() | ||
| def testWriteNewAttributeContainer(self): | ||
| """Tests the _WriteNewAttributeContainer function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| test_store = fake_store.FakeAttributeContainerStore() | ||
| test_store.Open() | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 0) | ||
| test_store._WriteNewAttributeContainer(attribute_container) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 1) | ||
| test_store.Close() | ||
| def testAddAttributeContainer(self): | ||
| """Tests the AddAttributeContainer function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| test_store = fake_store.FakeAttributeContainerStore() | ||
| test_store.Open() | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 0) | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 1) | ||
| test_store.Close() | ||
| with self.assertRaises(IOError): | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| def testGetAttributeContainerByIdentifier(self): | ||
| """Tests the GetAttributeContainerByIdentifier function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| test_store = fake_store.FakeAttributeContainerStore() | ||
| test_store.Open() | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| identifier = attribute_container.GetIdentifier() | ||
| container = test_store.GetAttributeContainerByIdentifier( | ||
| attribute_container.CONTAINER_TYPE, identifier) | ||
| self.assertIsNotNone(container) | ||
| identifier.sequence_number = 99 | ||
| container = test_store.GetAttributeContainerByIdentifier( | ||
| attribute_container.CONTAINER_TYPE, identifier) | ||
| self.assertIsNone(container) | ||
| test_store.Close() | ||
| def testGetAttributeContainerByIndex(self): | ||
| """Tests the GetAttributeContainerByIndex function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| test_store = fake_store.FakeAttributeContainerStore() | ||
| test_store.Open() | ||
| container = test_store.GetAttributeContainerByIndex( | ||
| attribute_container.CONTAINER_TYPE, 0) | ||
| self.assertIsNone(container) | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| container = test_store.GetAttributeContainerByIndex( | ||
| attribute_container.CONTAINER_TYPE, 0) | ||
| self.assertIsNotNone(container) | ||
| test_store.Close() | ||
| def testGetAttributeContainers(self): | ||
| """Tests the GetAttributeContainers function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| attribute_container.attribute = '8f0bf95a7959baad9666b21a7feed79d' | ||
| test_store = fake_store.FakeAttributeContainerStore() | ||
| test_store.Open() | ||
| containers = list(test_store.GetAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE)) | ||
| self.assertEqual(len(containers), 0) | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| containers = list(test_store.GetAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE)) | ||
| self.assertEqual(len(containers), 1) | ||
| filter_expression = 'attribute == "8f0bf95a7959baad9666b21a7feed79d"' | ||
| containers = list(test_store.GetAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE, | ||
| filter_expression=filter_expression)) | ||
| self.assertEqual(len(containers), 1) | ||
| filter_expression = 'attribute != "8f0bf95a7959baad9666b21a7feed79d"' | ||
| containers = list(test_store.GetAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE, | ||
| filter_expression=filter_expression)) | ||
| self.assertEqual(len(containers), 0) | ||
| test_store.Close() | ||
| def testGetNumberOfAttributeContainers(self): | ||
| """Tests the GetNumberOfAttributeContainers function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| test_store = fake_store.FakeAttributeContainerStore() | ||
| test_store.Open() | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 0) | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 1) | ||
| test_store.Close() | ||
| def testHasAttributeContainers(self): | ||
| """Tests the HasAttributeContainers function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| test_store = fake_store.FakeAttributeContainerStore() | ||
| test_store.Open() | ||
| result = test_store.HasAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertFalse(result) | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| result = test_store.HasAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertTrue(result) | ||
| test_store.Close() | ||
| def testOpenClose(self): | ||
| """Tests the Open and Close functions.""" | ||
| test_store = fake_store.FakeAttributeContainerStore() | ||
| test_store.Open() | ||
| test_store.Close() | ||
| test_store.Open() | ||
| test_store.Close() | ||
| test_store = fake_store.FakeAttributeContainerStore() | ||
| test_store.Open() | ||
| test_store.Close() | ||
| test_store.Open() | ||
| with self.assertRaises(IOError): | ||
| test_store.Open() | ||
| test_store.Close() | ||
| with self.assertRaises(IOError): | ||
| test_store.Close() | ||
| def testUpdateAttributeContainer(self): | ||
| """Tests the UpdateAttributeContainer function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| test_store = fake_store.FakeAttributeContainerStore() | ||
| test_store.Open() | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 0) | ||
| with self.assertRaises(IOError): | ||
| test_store.UpdateAttributeContainer(attribute_container) | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 1) | ||
| test_store.UpdateAttributeContainer(attribute_container) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 1) | ||
| test_store.Close() | ||
| if __name__ == '__main__': | ||
| unittest.main() |
| #!/usr/bin/env python3 | ||
| # -*- coding: utf-8 -*- | ||
| """Tests for the attribute container store interface.""" | ||
| import unittest | ||
| from acstore import interface | ||
| from acstore.containers import manager | ||
| from tests import test_lib | ||
| class AttributeContainerStoreTest(test_lib.BaseTestCase): | ||
| """Tests for the attribute container store interface.""" | ||
| # pylint: disable=protected-access | ||
| def testGetAttributeContainerNextSequenceNumber(self): | ||
| """Tests the _GetAttributeContainerNextSequenceNumber function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| test_store = interface.AttributeContainerStore() | ||
| sequence_number = test_store._GetAttributeContainerNextSequenceNumber( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(sequence_number, 1) | ||
| sequence_number = test_store._GetAttributeContainerNextSequenceNumber( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(sequence_number, 2) | ||
| def testGetAttributeContainerSchema(self): | ||
| """Tests the _GetAttributeContainerSchema function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| test_store = interface.AttributeContainerStore() | ||
| schema = test_store._GetAttributeContainerSchema( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(schema, {}) | ||
| manager.AttributeContainersManager.RegisterAttributeContainer( | ||
| test_lib.TestAttributeContainer) | ||
| try: | ||
| schema = test_store._GetAttributeContainerSchema( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(schema, test_lib.TestAttributeContainer.SCHEMA) | ||
| finally: | ||
| manager.AttributeContainersManager.DeregisterAttributeContainer( | ||
| test_lib.TestAttributeContainer) | ||
| # TODO: add tests for _SetAttributeContainerNextSequenceNumber | ||
| def testSetStorageProfiler(self): | ||
| """Tests the SetStorageProfiler function.""" | ||
| test_store = interface.AttributeContainerStore() | ||
| test_store.SetStorageProfiler(None) | ||
| if __name__ == '__main__': | ||
| unittest.main() |
| #!/usr/bin/env python3 | ||
| # -*- coding: utf-8 -*- | ||
| """Tests for the SQLite-based attribute container store.""" | ||
| import os | ||
| import unittest | ||
| from acstore import sqlite_store | ||
| from acstore.containers import manager as containers_manager | ||
| from tests import test_lib | ||
| class _TestSQLiteAttributeContainerStoreV20220716( | ||
| sqlite_store.SQLiteAttributeContainerStore): | ||
| """Test class for testing format compatibility checks.""" | ||
| _FORMAT_VERSION = 20220716 | ||
| _APPEND_COMPATIBLE_FORMAT_VERSION = 20211121 | ||
| _UPGRADE_COMPATIBLE_FORMAT_VERSION = 20211121 | ||
| _READ_COMPATIBLE_FORMAT_VERSION = 20211121 | ||
| class _TestSQLiteAttributeContainerStoreV20221023( | ||
| sqlite_store.SQLiteAttributeContainerStore): | ||
| """Test class for testing format compatibility checks.""" | ||
| _FORMAT_VERSION = 20221023 | ||
| _APPEND_COMPATIBLE_FORMAT_VERSION = 20221023 | ||
| _UPGRADE_COMPATIBLE_FORMAT_VERSION = 20221023 | ||
| _READ_COMPATIBLE_FORMAT_VERSION = 20211121 | ||
| # TODO add tests for PythonAST2SQL. | ||
| class SQLiteAttributeContainerStoreTest(test_lib.BaseTestCase): | ||
| """Tests for the SQLite-based storage file object.""" | ||
| # pylint: disable=protected-access | ||
| def setUp(self): | ||
| """Sets up the needed objects used throughout the test.""" | ||
| containers_manager.AttributeContainersManager.RegisterAttributeContainer( | ||
| test_lib.TestAttributeContainer) | ||
| def tearDown(self): | ||
| """Cleans up the needed objects used throughout the test.""" | ||
| containers_manager.AttributeContainersManager.DeregisterAttributeContainer( | ||
| test_lib.TestAttributeContainer) | ||
| def testCacheAttributeContainerByIndex(self): | ||
| """Tests the _CacheAttributeContainerByIndex function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| with test_lib.TempDirectory(): | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| self.assertEqual(len(test_store._attribute_container_cache), 0) | ||
| test_store._CacheAttributeContainerByIndex(attribute_container, 0) | ||
| self.assertEqual(len(test_store._attribute_container_cache), 1) | ||
| def testCheckStorageMetadata(self): | ||
| """Tests the _CheckStorageMetadata function.""" | ||
| with test_lib.TempDirectory(): | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| metadata_values = { | ||
| 'format_version': '{0:d}'.format(test_store._FORMAT_VERSION)} | ||
| test_store._CheckStorageMetadata(metadata_values) | ||
| metadata_values['format_version'] = 'bogus' | ||
| with self.assertRaises(IOError): | ||
| test_store._CheckStorageMetadata(metadata_values) | ||
| metadata_values['format_version'] = '1' | ||
| with self.assertRaises(IOError): | ||
| test_store._CheckStorageMetadata(metadata_values) | ||
| metadata_values['format_version'] = '{0:d}'.format( | ||
| test_store._FORMAT_VERSION) | ||
| def testCreateAttributeContainerTable(self): | ||
| """Tests the _CreateAttributeContainerTable function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| with test_lib.TempDirectory() as temp_directory: | ||
| test_path = os.path.join(temp_directory, 'acstore.sqlite') | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| test_store.Open(path=test_path, read_only=False) | ||
| try: | ||
| with self.assertRaises(IOError): | ||
| test_store._CreateAttributeContainerTable( | ||
| attribute_container.CONTAINER_TYPE) | ||
| finally: | ||
| test_store.Close() | ||
| # TODO: add tests for _CreatetAttributeContainerFromRow | ||
| # TODO: add tests for _Flush | ||
| # TODO: add tests for _FlushWriteCache | ||
| def testGetAttributeContainersWithFilter(self): | ||
| """Tests the _GetAttributeContainersWithFilter function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| attribute_container.attribute = '8f0bf95a7959baad9666b21a7feed79d' | ||
| column_names = ['attribute'] | ||
| with test_lib.TempDirectory() as temp_directory: | ||
| test_path = os.path.join(temp_directory, 'acstore.sqlite') | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| test_store.Open(path=test_path, read_only=False) | ||
| try: | ||
| containers = list(test_store._GetAttributeContainersWithFilter( | ||
| attribute_container.CONTAINER_TYPE, column_names=column_names)) | ||
| self.assertEqual(len(containers), 0) | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| containers = list(test_store._GetAttributeContainersWithFilter( | ||
| attribute_container.CONTAINER_TYPE, column_names=column_names)) | ||
| self.assertEqual(len(containers), 1) | ||
| filter_expression = 'attribute == "8f0bf95a7959baad9666b21a7feed79d"' | ||
| containers = list(test_store._GetAttributeContainersWithFilter( | ||
| attribute_container.CONTAINER_TYPE, column_names=column_names, | ||
| filter_expression=filter_expression)) | ||
| self.assertEqual(len(containers), 1) | ||
| filter_expression = 'attribute != "8f0bf95a7959baad9666b21a7feed79d"' | ||
| containers = list(test_store._GetAttributeContainersWithFilter( | ||
| attribute_container.CONTAINER_TYPE, column_names=column_names, | ||
| filter_expression=filter_expression)) | ||
| self.assertEqual(len(containers), 0) | ||
| with self.assertRaises(IOError): | ||
| list(test_store._GetAttributeContainersWithFilter( | ||
| 'bogus', column_names=column_names)) | ||
| finally: | ||
| test_store.Close() | ||
| def testGetCachedAttributeContainer(self): | ||
| """Tests the _GetCachedAttributeContainer function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| with test_lib.TempDirectory(): | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| cached_container = test_store._GetCachedAttributeContainer( | ||
| attribute_container.CONTAINER_TYPE, 1) | ||
| self.assertIsNone(cached_container) | ||
| test_store._CacheAttributeContainerByIndex(attribute_container, 1) | ||
| cached_container = test_store._GetCachedAttributeContainer( | ||
| attribute_container.CONTAINER_TYPE, 1) | ||
| self.assertIsNotNone(cached_container) | ||
| def testHasTable(self): | ||
| """Tests the _HasTable function.""" | ||
| with test_lib.TempDirectory() as temp_directory: | ||
| test_path = os.path.join(temp_directory, 'acstore.sqlite') | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| test_store.Open(path=test_path, read_only=False) | ||
| try: | ||
| result = test_store._HasTable('test_container') | ||
| self.assertTrue(result) | ||
| result = test_store._HasTable('bogus') | ||
| self.assertFalse(result) | ||
| finally: | ||
| test_store.Close() | ||
| def testRaiseIfNotReadable(self): | ||
| """Tests the _RaiseIfNotReadable function.""" | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| with self.assertRaises(IOError): | ||
| test_store._RaiseIfNotReadable() | ||
| def testRaiseIfNotWritable(self): | ||
| """Tests the _RaiseIfNotWritable function.""" | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| with self.assertRaises(IOError): | ||
| test_store._RaiseIfNotWritable() | ||
| # TODO: add tests for _ReadAndCheckStorageMetadata | ||
| # TODO: add tests for _ReadMetadata | ||
| # TODO: add tests for _UpdateStorageMetadataFormatVersion | ||
| def testWriteExistingAttributeContainer(self): | ||
| """Tests the _WriteExistingAttributeContainer function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| with test_lib.TempDirectory() as temp_directory: | ||
| test_path = os.path.join(temp_directory, 'acstore.sqlite') | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| test_store.Open(path=test_path, read_only=False) | ||
| try: | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 0) | ||
| test_store._WriteNewAttributeContainer(attribute_container) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 1) | ||
| test_store._WriteExistingAttributeContainer(attribute_container) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 1) | ||
| finally: | ||
| test_store.Close() | ||
| # TODO: add tests for _WriteMetadata | ||
| # TODO: add tests for _WriteMetadataValue | ||
| def testWriteNewAttributeContainer(self): | ||
| """Tests the _WriteNewAttributeContainer function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| with test_lib.TempDirectory() as temp_directory: | ||
| test_path = os.path.join(temp_directory, 'acstore.sqlite') | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| test_store.Open(path=test_path, read_only=False) | ||
| try: | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 0) | ||
| test_store._WriteNewAttributeContainer(attribute_container) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 1) | ||
| finally: | ||
| test_store.Close() | ||
| def testAddAttributeContainer(self): | ||
| """Tests the AddAttributeContainer function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| with test_lib.TempDirectory() as temp_directory: | ||
| test_path = os.path.join(temp_directory, 'acstore.sqlite') | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| test_store.Open(path=test_path, read_only=False) | ||
| try: | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 0) | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 1) | ||
| finally: | ||
| test_store.Close() | ||
| with self.assertRaises(IOError): | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| # TODO: add tests for CheckSupportedFormat | ||
| def testGetAttributeContainerByIdentifier(self): | ||
| """Tests the GetAttributeContainerByIdentifier function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| with test_lib.TempDirectory() as temp_directory: | ||
| test_path = os.path.join(temp_directory, 'acstore.sqlite') | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| test_store.Open(path=test_path, read_only=False) | ||
| try: | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| identifier = attribute_container.GetIdentifier() | ||
| container = test_store.GetAttributeContainerByIdentifier( | ||
| attribute_container.CONTAINER_TYPE, identifier) | ||
| self.assertIsNotNone(container) | ||
| identifier.sequence_number = 99 | ||
| container = test_store.GetAttributeContainerByIdentifier( | ||
| attribute_container.CONTAINER_TYPE, identifier) | ||
| self.assertIsNone(container) | ||
| finally: | ||
| test_store.Close() | ||
| def testGetAttributeContainerByIndex(self): | ||
| """Tests the GetAttributeContainerByIndex function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| with test_lib.TempDirectory() as temp_directory: | ||
| test_path = os.path.join(temp_directory, 'acstore.sqlite') | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| test_store.Open(path=test_path, read_only=False) | ||
| try: | ||
| container = test_store.GetAttributeContainerByIndex( | ||
| attribute_container.CONTAINER_TYPE, 0) | ||
| self.assertIsNone(container) | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| container = test_store.GetAttributeContainerByIndex( | ||
| attribute_container.CONTAINER_TYPE, 0) | ||
| self.assertIsNotNone(container) | ||
| with self.assertRaises(IOError): | ||
| test_store.GetAttributeContainerByIndex('bogus', 0) | ||
| finally: | ||
| test_store.Close() | ||
| def testGetAttributeContainers(self): | ||
| """Tests the GetAttributeContainers function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| attribute_container.attribute = '8f0bf95a7959baad9666b21a7feed79d' | ||
| with test_lib.TempDirectory() as temp_directory: | ||
| test_path = os.path.join(temp_directory, 'acstore.sqlite') | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| test_store.Open(path=test_path, read_only=False) | ||
| try: | ||
| containers = list(test_store.GetAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE)) | ||
| self.assertEqual(len(containers), 0) | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| containers = list(test_store.GetAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE)) | ||
| self.assertEqual(len(containers), 1) | ||
| filter_expression = 'attribute == "8f0bf95a7959baad9666b21a7feed79d"' | ||
| containers = list(test_store.GetAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE, | ||
| filter_expression=filter_expression)) | ||
| self.assertEqual(len(containers), 1) | ||
| filter_expression = 'attribute != "8f0bf95a7959baad9666b21a7feed79d"' | ||
| containers = list(test_store.GetAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE, | ||
| filter_expression=filter_expression)) | ||
| self.assertEqual(len(containers), 0) | ||
| with self.assertRaises(IOError): | ||
| list(test_store.GetAttributeContainers('bogus')) | ||
| finally: | ||
| test_store.Close() | ||
| def testGetNumberOfAttributeContainers(self): | ||
| """Tests the GetNumberOfAttributeContainers function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| with test_lib.TempDirectory() as temp_directory: | ||
| test_path = os.path.join(temp_directory, 'acstore.sqlite') | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| test_store.Open(path=test_path, read_only=False) | ||
| try: | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 0) | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 1) | ||
| # Test for a supported container type that does not have a table | ||
| # present in the storage file. | ||
| query = 'DROP TABLE {0:s}'.format(attribute_container.CONTAINER_TYPE) | ||
| test_store._cursor.execute(query) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 0) | ||
| finally: | ||
| test_store.Close() | ||
| def testHasAttributeContainers(self): | ||
| """Tests the HasAttributeContainers function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| with test_lib.TempDirectory() as temp_directory: | ||
| test_path = os.path.join(temp_directory, 'acstore.sqlite') | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| test_store.Open(path=test_path, read_only=False) | ||
| try: | ||
| result = test_store.HasAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertFalse(result) | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| result = test_store.HasAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertTrue(result) | ||
| result = test_store.HasAttributeContainers('bogus') | ||
| self.assertFalse(result) | ||
| finally: | ||
| test_store.Close() | ||
| # TODO: add tests for Open and Close | ||
| def testUpdateAttributeContainer(self): | ||
| """Tests the UpdateAttributeContainer function.""" | ||
| attribute_container = test_lib.TestAttributeContainer() | ||
| with test_lib.TempDirectory() as temp_directory: | ||
| test_path = os.path.join(temp_directory, 'acstore.sqlite') | ||
| test_store = sqlite_store.SQLiteAttributeContainerStore() | ||
| test_store.Open(path=test_path, read_only=False) | ||
| try: | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 0) | ||
| test_store.AddAttributeContainer(attribute_container) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 1) | ||
| test_store.UpdateAttributeContainer(attribute_container) | ||
| number_of_containers = test_store.GetNumberOfAttributeContainers( | ||
| attribute_container.CONTAINER_TYPE) | ||
| self.assertEqual(number_of_containers, 1) | ||
| finally: | ||
| test_store.Close() | ||
| def testVersionCompatibility(self): | ||
| """Tests the version compatibility methods.""" | ||
| with test_lib.TempDirectory() as temp_directory: | ||
| v1_storage_path = os.path.join(temp_directory, 'v20220716.sqlite') | ||
| v1_test_store = _TestSQLiteAttributeContainerStoreV20220716() | ||
| v1_test_store.Open(path=v1_storage_path, read_only=False) | ||
| v1_test_store.Close() | ||
| v2_test_store_rw = _TestSQLiteAttributeContainerStoreV20221023() | ||
| with self.assertRaises((IOError, OSError)): | ||
| v2_test_store_rw.Open(path=v1_storage_path, read_only=False) | ||
| v2_test_store_ro = _TestSQLiteAttributeContainerStoreV20221023() | ||
| v2_test_store_ro.Open(path=v1_storage_path, read_only=True) | ||
| v2_test_store_ro.Close() | ||
| if __name__ == '__main__': | ||
| unittest.main() |
| # -*- coding: utf-8 -*- | ||
| """Functions and classes for testing.""" | ||
| import shutil | ||
| import tempfile | ||
| import unittest | ||
| from acstore.containers import interface as containers_interface | ||
| class TestAttributeContainer(containers_interface.AttributeContainer): | ||
| """Attribute container for testing purposes. | ||
| Attributes: | ||
| attribute (str): attribute for testing purposes. | ||
| """ | ||
| CONTAINER_TYPE = 'test_container' | ||
| SCHEMA = {'attribute': 'str'} | ||
| def __init__(self): | ||
| """Initializes an attribute container.""" | ||
| super(TestAttributeContainer, self).__init__() | ||
| self.attribute = None | ||
| class BaseTestCase(unittest.TestCase): | ||
| """The base test case.""" | ||
| # Show full diff results. | ||
| maxDiff = None | ||
| class TempDirectory(object): | ||
| """Class that implements a temporary directory.""" | ||
| def __init__(self): | ||
| """Initializes a temporary directory.""" | ||
| super(TempDirectory, self).__init__() | ||
| self.name = '' | ||
| def __enter__(self): | ||
| """Make this work with the 'with' statement.""" | ||
| self.name = tempfile.mkdtemp() | ||
| return self.name | ||
| def __exit__(self, exception_type, value, traceback): | ||
| """Make this work with the 'with' statement.""" | ||
| shutil.rmtree(self.name, True) |
+65
| [tox] | ||
| envlist = py3{7,8,9,10,11},coverage,docs,lint | ||
| [testenv] | ||
| allowlist_externals = ./run_tests.py | ||
| pip_pre = True | ||
| passenv = | ||
| CFLAGS | ||
| CPPFLAGS | ||
| LDFLAGS | ||
| setenv = | ||
| PYTHONPATH = {toxinidir} | ||
| deps = | ||
| -rrequirements.txt | ||
| -rtest_requirements.txt | ||
| coverage: coverage | ||
| commands = | ||
| py3{7,8,9,10,11}: ./run_tests.py | ||
| coverage: coverage erase | ||
| coverage: coverage run --source=acstore --omit="*_test*,*__init__*,*test_lib*" run_tests.py | ||
| [testenv:codecov] | ||
| skip_install = True | ||
| passenv = | ||
| CFLAGS | ||
| CPPFLAGS | ||
| GITHUB_ACTION | ||
| GITHUB_HEAD_REF | ||
| GITHUB_REF | ||
| GITHUB_REPOSITORY | ||
| GITHUB_RUN_ID | ||
| GITHUB_SHA | ||
| LDFLAGS | ||
| deps = | ||
| codecov < 2.1.10 | ||
| commands = | ||
| codecov | ||
| [testenv:docs] | ||
| usedevelop = True | ||
| deps = | ||
| -rdocs/requirements.txt | ||
| commands = | ||
| sphinx-build -b html -d build/doctrees docs dist/docs | ||
| sphinx-build -b linkcheck docs dist/docs | ||
| [testenv:lint] | ||
| skipsdist = True | ||
| pip_pre = True | ||
| passenv = | ||
| CFLAGS | ||
| CPPFLAGS | ||
| LDFLAGS | ||
| setenv = | ||
| PYTHONPATH = {toxinidir} | ||
| deps = | ||
| -rrequirements.txt | ||
| -rtest_requirements.txt | ||
| pylint >= 2.14.0, < 2.15.0 | ||
| commands = | ||
| pylint --version | ||
| # Ignore setup.py for now due to: | ||
| # setup.py:15:0: E0001: Cannot import 'distutils.command.bdist_msi' due to | ||
| # syntax error 'expected an indented block (<unknown>, line 347)' (syntax-error) | ||
| pylint --rcfile=.pylintrc acstore tests |
| #!/bin/bash | ||
| # | ||
| # Script that makes changes in preparation of a new release, such as updating | ||
| # the version and documentation. | ||
| EXIT_FAILURE=1; | ||
| EXIT_SUCCESS=0; | ||
| VERSION=`date -u +"%Y%m%d"` | ||
| DPKG_DATE=`date -R` | ||
| # Update the Python module version. | ||
| sed "s/__version__ = '[0-9]*'/__version__ = '${VERSION}'/" -i acstore/__init__.py | ||
| # Update the version in the dpkg configuration files. | ||
| cat > config/dpkg/changelog << EOT | ||
| acstore (${VERSION}-1) unstable; urgency=low | ||
| * Auto-generated | ||
| -- Log2Timeline maintainers <log2timeline-maintainers@googlegroups.com> ${DPKG_DATE} | ||
| EOT | ||
| # Regenerate the API documentation. | ||
| tox -edocs | ||
| exit ${EXIT_SUCCESS}; | ||
@@ -1,14 +0,16 @@ | ||
| Metadata-Version: 1.1 | ||
| Metadata-Version: 2.1 | ||
| Name: acstore | ||
| Version: 20171013 | ||
| Summary: Attribute Container Storage (ACStore) | ||
| Version: 20221230 | ||
| Summary: Attribute Container Storage (ACStore). | ||
| Home-page: https://github.com/log2timeline/acstore | ||
| Author: dfDateTime development team | ||
| Author-email: log2timeline-dev@googlegroups.com | ||
| Maintainer: Log2Timeline maintainers | ||
| Maintainer-email: log2timeline-maintainers@googlegroups.com | ||
| License: Apache License, Version 2.0 | ||
| Description: ACStore, or Attribute Container Storage, provides a stand-alone implementation to read and write plaso storage files. | ||
| Platform: UNKNOWN | ||
| Classifier: Development Status :: 3 - Alpha | ||
| Classifier: | ||
| Classifier: Environment :: Console | ||
| Classifier: Operating System :: OS Independent | ||
| Classifier: Programming Language :: Python | ||
| Description-Content-Type: text/plain | ||
| License-File: LICENSE | ||
| ACStore, or Attribute Container Storage, provides a stand-alone implementation to read and write attribute container storage files. |
@@ -0,1 +1,3 @@ | ||
| .pylintrc | ||
| .style.yapf | ||
| ACKNOWLEDGEMENTS | ||
@@ -6,11 +8,30 @@ AUTHORS | ||
| README | ||
| acstore.ini | ||
| appveyor.yml | ||
| dependencies.ini | ||
| requirements.txt | ||
| run_tests.py | ||
| setup.cfg | ||
| setup.py | ||
| test_dependencies.ini | ||
| test_requirements.txt | ||
| tox.ini | ||
| .github/workflows/test_docker.yml | ||
| .github/workflows/test_docs.yml | ||
| .github/workflows/test_tox.yml | ||
| acstore/__init__.py | ||
| acstore/fake_store.py | ||
| acstore/interface.py | ||
| acstore/sqlite_store.py | ||
| acstore.egg-info/PKG-INFO | ||
| acstore.egg-info/SOURCES.txt | ||
| acstore.egg-info/dependency_links.txt | ||
| acstore.egg-info/requires.txt | ||
| acstore.egg-info/top_level.txt | ||
| acstore/containers/__init__.py | ||
| acstore/containers/interface.py | ||
| acstore/containers/manager.py | ||
| config/appveyor/install.ps1 | ||
| config/appveyor/install.sh | ||
| config/appveyor/runtests.sh | ||
| config/dpkg/changelog | ||
@@ -25,5 +46,21 @@ config/dpkg/clean | ||
| config/dpkg/source/format | ||
| config/travis/install.sh | ||
| docs/conf.py | ||
| docs/index.rst | ||
| docs/requirements.txt | ||
| docs/sources/api/acstore.containers.rst | ||
| docs/sources/api/acstore.rst | ||
| docs/sources/api/modules.rst | ||
| docs/sources/user/Installation-instructions.md | ||
| docs/sources/user/index.rst | ||
| tests/__init__.py | ||
| tests/fake_store.py | ||
| tests/interface.py | ||
| tests/sqlite_store.py | ||
| tests/test_lib.py | ||
| tests/containers/__init__.py | ||
| tests/containers/interface.py | ||
| tests/containers/manager.py | ||
| utils/__init__.py | ||
| utils/check_dependencies.py | ||
| utils/dependencies.py | ||
| utils/dependencies.py | ||
| utils/update_release.sh |
@@ -8,2 +8,2 @@ # -*- coding: utf-8 -*- | ||
| __version__ = '20171013' | ||
| __version__ = '20221230' |
+1
-0
| include ACKNOWLEDGEMENTS AUTHORS LICENSE README | ||
| include dependencies.ini run_tests.py utils/__init__.py utils/dependencies.py | ||
| include utils/check_dependencies.py | ||
| include requirements.txt test_requirements.txt | ||
| exclude .gitignore | ||
@@ -5,0 +6,0 @@ exclude *.pyc |
+10
-8
@@ -1,14 +0,16 @@ | ||
| Metadata-Version: 1.1 | ||
| Metadata-Version: 2.1 | ||
| Name: acstore | ||
| Version: 20171013 | ||
| Summary: Attribute Container Storage (ACStore) | ||
| Version: 20221230 | ||
| Summary: Attribute Container Storage (ACStore). | ||
| Home-page: https://github.com/log2timeline/acstore | ||
| Author: dfDateTime development team | ||
| Author-email: log2timeline-dev@googlegroups.com | ||
| Maintainer: Log2Timeline maintainers | ||
| Maintainer-email: log2timeline-maintainers@googlegroups.com | ||
| License: Apache License, Version 2.0 | ||
| Description: ACStore, or Attribute Container Storage, provides a stand-alone implementation to read and write plaso storage files. | ||
| Platform: UNKNOWN | ||
| Classifier: Development Status :: 3 - Alpha | ||
| Classifier: | ||
| Classifier: Environment :: Console | ||
| Classifier: Operating System :: OS Independent | ||
| Classifier: Programming Language :: Python | ||
| Description-Content-Type: text/plain | ||
| License-File: LICENSE | ||
| ACStore, or Attribute Container Storage, provides a stand-alone implementation to read and write attribute container storage files. |
+2
-3
| ACStore, or Attribute Container Storage, provides a stand-alone implementation | ||
| to read and write plaso storage files. | ||
| to read and write Attribute Container stores, such as Plaso storage files. | ||
| For more information see: | ||
| * Project documentation: https://github.com/log2timeline/acstore/wiki/Home | ||
| * How to build from source: https://github.com/log2timeline/acstore/wiki/Building | ||
| * Project documentation: https://acstore.readthedocs.io/en/latest | ||
+2
-2
@@ -1,2 +0,2 @@ | ||
| #!/usr/bin/python | ||
| #!/usr/bin/env python | ||
| # -*- coding: utf-8 -*- | ||
@@ -9,3 +9,3 @@ """Script to run the tests.""" | ||
| # Change PYTHONPATH to include dependencies. | ||
| sys.path.insert(0, u'.') | ||
| sys.path.insert(0, '.') | ||
@@ -12,0 +12,0 @@ import utils.dependencies # pylint: disable=wrong-import-position |
+7
-1
@@ -0,1 +1,4 @@ | ||
| [metadata] | ||
| license_files = LICENSE | ||
| [bdist_rpm] | ||
@@ -8,4 +11,7 @@ release = 1 | ||
| README | ||
| build_requires = python-setuptools | ||
| build_requires = python3-setuptools | ||
| [bdist_wheel] | ||
| universal = 1 | ||
| [egg_info] | ||
@@ -12,0 +18,0 @@ tag_build = |
+93
-37
@@ -1,6 +0,7 @@ | ||
| #!/usr/bin/python | ||
| #!/usr/bin/env python | ||
| # -*- coding: utf-8 -*- | ||
| """Installation and deployment script.""" | ||
| from __future__ import print_function | ||
| import os | ||
| import pkg_resources | ||
| import sys | ||
@@ -23,5 +24,6 @@ | ||
| if sys.version < '2.7': | ||
| print('Unsupported Python version: {0:s}.'.format(sys.version)) | ||
| print('Supported Python versions are 2.7 or a later 2.x version.') | ||
| version_tuple = (sys.version_info[0], sys.version_info[1]) | ||
| if version_tuple < (3, 7): | ||
| print(f'Unsupported Python version: {sys.version:s}, version 3.7 or higher ' | ||
| f'required.') | ||
| sys.exit(1) | ||
@@ -41,2 +43,3 @@ | ||
| # pylint: disable=invalid-name | ||
| def run(self): | ||
@@ -57,2 +60,3 @@ """Builds an MSI.""" | ||
| # pylint: disable=invalid-name | ||
| def _make_spec_file(self): | ||
@@ -70,8 +74,6 @@ """Generates the text of an RPM spec file. | ||
| if sys.version_info[0] < 3: | ||
| python_package = 'python' | ||
| else: | ||
| python_package = 'python3' | ||
| python_package = 'python3' | ||
| description = [] | ||
| requires = '' | ||
| summary = '' | ||
@@ -83,10 +85,11 @@ in_description = False | ||
| if line.startswith('Summary: '): | ||
| summary = line | ||
| summary = line[9:] | ||
| elif line.startswith('BuildRequires: '): | ||
| line = 'BuildRequires: {0:s}-setuptools'.format(python_package) | ||
| line = (f'BuildRequires: {python_package:s}-setuptools, ' | ||
| f'{python_package:s}-devel') | ||
| elif line.startswith('Requires: '): | ||
| if python_package == 'python3': | ||
| line = line.replace('python', 'python3') | ||
| requires = line[10:] | ||
| continue | ||
@@ -96,15 +99,30 @@ elif line.startswith('%description'): | ||
| elif line.startswith('python setup.py build'): | ||
| if python_package == 'python3': | ||
| line = '%py3_build' | ||
| else: | ||
| line = '%py2_build' | ||
| elif line.startswith('python setup.py install'): | ||
| if python_package == 'python3': | ||
| line = '%py3_install' | ||
| else: | ||
| line = '%py2_install' | ||
| elif line.startswith('%files'): | ||
| # Cannot use %{_libdir} here since it can expand to "lib64". | ||
| lines = [ | ||
| '%files', | ||
| f'%files -n {python_package:s}-%{{name}}', | ||
| '%defattr(644,root,root,755)', | ||
| '%doc ACKNOWLEDGEMENTS AUTHORS LICENSE README', | ||
| '%{_prefix}/lib/python*/site-packages/acstore/*.py', | ||
| '%{_prefix}/lib/python*/site-packages/acstore*.egg-info/*', | ||
| '%exclude %{_prefix}/lib/python*/site-packages/acstore/*.pyc', | ||
| '%exclude %{_prefix}/lib/python*/site-packages/acstore/*.pyo', | ||
| ('%exclude %{_prefix}/lib/python*/site-packages/acstore/' | ||
| '__pycache__/*')] | ||
| '%license LICENSE', | ||
| '%doc ACKNOWLEDGEMENTS AUTHORS README'] | ||
| lines.extend([ | ||
| '%{python3_sitelib}/acstore/*.py', | ||
| '%{python3_sitelib}/acstore/*/*.py', | ||
| '%{python3_sitelib}/acstore*.egg-info/*', | ||
| '', | ||
| '%exclude %{_prefix}/share/doc/*', | ||
| '%exclude %{python3_sitelib}/acstore/__pycache__/*', | ||
| '%exclude %{python3_sitelib}/acstore/*/__pycache__/*']) | ||
| python_spec_file.extend(lines) | ||
@@ -116,8 +134,13 @@ break | ||
| python_spec_file.append( | ||
| '%package -n {0:s}-%{{name}}'.format(python_package)) | ||
| python_spec_file.append('{0:s}'.format(summary)) | ||
| python_spec_file.append('') | ||
| python_spec_file.append( | ||
| '%description -n {0:s}-%{{name}}'.format(python_package)) | ||
| python_spec_file.append(f'%package -n {python_package:s}-%{{name}}') | ||
| python_summary = f'Python 3 module of {summary:s}' | ||
| if requires: | ||
| python_spec_file.append(f'Requires: {requires:s}') | ||
| python_spec_file.extend([ | ||
| f'Summary: {python_summary:s}', | ||
| '', | ||
| f'%description -n {python_package:s}-%{{name}}']) | ||
| python_spec_file.extend(description) | ||
@@ -137,9 +160,41 @@ | ||
| def parse_requirements_from_file(path): | ||
| """Parses requirements from a requirements file. | ||
| Args: | ||
| path (str): path to the requirements file. | ||
| Returns: | ||
| list[str]: name and optional version information of the required packages. | ||
| """ | ||
| requirements = [] | ||
| if os.path.isfile(path): | ||
| with open(path, 'r') as file_object: | ||
| file_contents = file_object.read() | ||
| for requirement in pkg_resources.parse_requirements(file_contents): | ||
| try: | ||
| name = str(requirement.req) | ||
| except AttributeError: | ||
| name = str(requirement) | ||
| if not name.startswith('pip '): | ||
| requirements.append(name) | ||
| return requirements | ||
| acstore_description = ( | ||
| 'Attribute Container Storage (ACStore)') | ||
| 'Attribute Container Storage (ACStore).') | ||
| acstore_long_description = ( | ||
| 'ACStore, or Attribute Container Storage, provides a stand-alone ' | ||
| 'implementation to read and write plaso storage files.') | ||
| 'implementation to read and write attribute container storage files.') | ||
| command_classes = {} | ||
| if BdistMSICommand: | ||
| command_classes['bdist_msi'] = BdistMSICommand | ||
| if BdistRPMCommand: | ||
| command_classes['bdist_rpm'] = BdistRPMCommand | ||
| setup( | ||
@@ -150,11 +205,10 @@ name='acstore', | ||
| long_description=acstore_long_description, | ||
| long_description_content_type='text/plain', | ||
| license='Apache License, Version 2.0', | ||
| url='https://github.com/log2timeline/acstore', | ||
| maintainer='dfDateTime development team', | ||
| maintainer_email='log2timeline-dev@googlegroups.com', | ||
| cmdclass={ | ||
| 'bdist_msi': BdistMSICommand, | ||
| 'bdist_rpm': BdistRPMCommand}, | ||
| maintainer='Log2Timeline maintainers', | ||
| maintainer_email='log2timeline-maintainers@googlegroups.com', | ||
| cmdclass=command_classes, | ||
| classifiers=[ | ||
| 'Development Status :: 3 - Alpha', | ||
| '', | ||
| 'Environment :: Console', | ||
@@ -165,3 +219,3 @@ 'Operating System :: OS Independent', | ||
| packages=find_packages('.', exclude=[ | ||
| 'examples', 'tests', 'tests.*', 'utils']), | ||
| 'docs', 'tests', 'tests.*', 'utils']), | ||
| package_dir={ | ||
@@ -174,2 +228,4 @@ 'acstore': 'acstore' | ||
| ], | ||
| install_requires=parse_requirements_from_file('requirements.txt'), | ||
| tests_require=parse_requirements_from_file('test_requirements.txt'), | ||
| ) |
@@ -1,2 +0,2 @@ | ||
| #!/usr/bin/python | ||
| #!/usr/bin/env python | ||
| # -*- coding: utf-8 -*- | ||
@@ -8,3 +8,3 @@ """Script to check for the availability and version of dependencies.""" | ||
| # Change PYTHONPATH to include dependencies. | ||
| sys.path.insert(0, u'.') | ||
| sys.path.insert(0, '.') | ||
@@ -14,5 +14,6 @@ import utils.dependencies # pylint: disable=wrong-import-position | ||
| if __name__ == u'__main__': | ||
| if __name__ == '__main__': | ||
| dependency_helper = utils.dependencies.DependencyHelper() | ||
| dependency_helper.CheckDependencies() | ||
| if not dependency_helper.CheckDependencies(): | ||
| sys.exit(1) |
+102
-88
| # -*- coding: utf-8 -*- | ||
| """Helper to check for availability and version of dependencies.""" | ||
| from __future__ import print_function | ||
| from __future__ import unicode_literals | ||
| import configparser | ||
| import os | ||
| import re | ||
| try: | ||
| import ConfigParser as configparser | ||
| except ImportError: | ||
| import configparser # pylint: disable=import-error | ||
| class DependencyDefinition(object): | ||
@@ -23,8 +17,15 @@ """Dependency definition. | ||
| the dependency. | ||
| maximum_version (str): maximum supported version. | ||
| minimum_version (str): minimum supported version. | ||
| maximum_version (str): maximum supported version, a greater or equal | ||
| version is not supported. | ||
| minimum_version (str): minimum supported version, a lesser version is | ||
| not supported. | ||
| name (str): name of (the Python module that provides) the dependency. | ||
| pypi_name (str): name of the PyPI package that provides the dependency. | ||
| python2_only (bool): True if the dependency is only supported by Python 2. | ||
| python3_only (bool): True if the dependency is only supported by Python 3. | ||
| rpm_name (str): name of the rpm package that provides the dependency. | ||
| skip_check (bool): True if the dependency should be skipped by the | ||
| CheckDependencies or CheckTestDependencies methods of DependencyHelper. | ||
| skip_requires (bool): True if the dependency should be excluded from | ||
| requirements.txt or setup.py install_requires. | ||
| version_property (str): name of the version attribute or function. | ||
@@ -34,3 +35,3 @@ """ | ||
| def __init__(self, name): | ||
| """Initializes a dependency configuation. | ||
| """Initializes a dependency configuration. | ||
@@ -49,3 +50,6 @@ Args: | ||
| self.python2_only = False | ||
| self.python3_only = False | ||
| self.rpm_name = None | ||
| self.skip_check = None | ||
| self.skip_requires = None | ||
| self.version_property = None | ||
@@ -65,3 +69,6 @@ | ||
| 'python2_only', | ||
| 'python3_only', | ||
| 'rpm_name', | ||
| 'skip_check', | ||
| 'skip_requires', | ||
| 'version_property']) | ||
@@ -83,3 +90,3 @@ | ||
| except configparser.NoOptionError: | ||
| return | ||
| return None | ||
@@ -95,4 +102,4 @@ def Read(self, file_object): | ||
| """ | ||
| config_parser = configparser.RawConfigParser() | ||
| config_parser.readfp(file_object) # pylint: disable=deprecated-method | ||
| config_parser = configparser.ConfigParser(interpolation=None) | ||
| config_parser.read_file(file_object) | ||
@@ -109,22 +116,36 @@ for section_name in config_parser.sections(): | ||
| class DependencyHelper(object): | ||
| """Dependency helper.""" | ||
| """Dependency helper. | ||
| Attributes: | ||
| dependencies (dict[str, DependencyDefinition]): dependencies. | ||
| """ | ||
| _VERSION_NUMBERS_REGEX = re.compile(r'[0-9.]+') | ||
| _VERSION_SPLIT_REGEX = re.compile(r'\.|\-') | ||
| def __init__(self): | ||
| """Initializes a dependency helper.""" | ||
| def __init__( | ||
| self, dependencies_file='dependencies.ini', | ||
| test_dependencies_file='test_dependencies.ini'): | ||
| """Initializes a dependency helper. | ||
| Args: | ||
| dependencies_file (Optional[str]): path to the dependencies configuration | ||
| file. | ||
| test_dependencies_file (Optional[str]): path to the test dependencies | ||
| configuration file. | ||
| """ | ||
| super(DependencyHelper, self).__init__() | ||
| self._dependencies = {} | ||
| self._test_dependencies = {} | ||
| self.dependencies = {} | ||
| dependency_reader = DependencyDefinitionReader() | ||
| with open('dependencies.ini', 'r') as file_object: | ||
| with open(dependencies_file, 'r', encoding='utf-8') as file_object: | ||
| for dependency in dependency_reader.Read(file_object): | ||
| self._dependencies[dependency.name] = dependency | ||
| self.dependencies[dependency.name] = dependency | ||
| dependency = DependencyDefinition('mock') | ||
| dependency.minimum_version = '0.7.1' | ||
| dependency.version_property = '__version__' | ||
| self._test_dependencies['mock'] = dependency | ||
| if os.path.exists(test_dependencies_file): | ||
| with open(test_dependencies_file, 'r', encoding='utf-8') as file_object: | ||
| for dependency in dependency_reader.Read(file_object): | ||
| self._test_dependencies[dependency.name] = dependency | ||
@@ -138,3 +159,3 @@ def _CheckPythonModule(self, dependency): | ||
| Returns: | ||
| tuple: consists: | ||
| tuple: containing: | ||
@@ -147,6 +168,5 @@ bool: True if the Python module is available and conforms to | ||
| if not module_object: | ||
| status_message = 'missing: {0:s}'.format(dependency.name) | ||
| return dependency.is_optional, status_message | ||
| return False, f'missing: {dependency.name:s}' | ||
| if not dependency.version_property or not dependency.minimum_version: | ||
| if not dependency.version_property: | ||
| return True, dependency.name | ||
@@ -171,3 +191,3 @@ | ||
| Returns: | ||
| tuple: consists: | ||
| tuple: containing: | ||
@@ -188,65 +208,54 @@ bool: True if the Python module is available and conforms to | ||
| if not module_version: | ||
| status_message = ( | ||
| 'unable to determine version information for: {0:s}').format( | ||
| module_name) | ||
| return False, status_message | ||
| return False, ( | ||
| f'unable to determine version information for: {module_name:s}') | ||
| # Make sure the module version is a string. | ||
| module_version = '{0!s}'.format(module_version) | ||
| module_version = f'{module_version!s}' | ||
| # Split the version string and convert every digit into an integer. | ||
| # A string compare of both version strings will yield an incorrect result. | ||
| module_version_map = list( | ||
| map(int, self._VERSION_SPLIT_REGEX.split(module_version))) | ||
| minimum_version_map = list( | ||
| map(int, self._VERSION_SPLIT_REGEX.split(minimum_version))) | ||
| if module_version_map < minimum_version_map: | ||
| status_message = ( | ||
| '{0:s} version: {1!s} is too old, {2!s} or later required').format( | ||
| module_name, module_version, minimum_version) | ||
| return False, status_message | ||
| # Strip any semantic suffixes such as a1, b1, pre, post, rc, dev. | ||
| module_version = self._VERSION_NUMBERS_REGEX.findall(module_version)[0] | ||
| if maximum_version: | ||
| maximum_version_map = list( | ||
| map(int, self._VERSION_SPLIT_REGEX.split(maximum_version))) | ||
| if module_version_map > maximum_version_map: | ||
| status_message = ( | ||
| '{0:s} version: {1!s} is too recent, {2!s} or earlier ' | ||
| 'required').format(module_name, module_version, maximum_version) | ||
| return False, status_message | ||
| if module_version[-1] == '.': | ||
| module_version = module_version[:-1] | ||
| status_message = '{0:s} version: {1!s}'.format(module_name, module_version) | ||
| return True, status_message | ||
| try: | ||
| module_version_map = list( | ||
| map(int, self._VERSION_SPLIT_REGEX.split(module_version))) | ||
| except ValueError: | ||
| return False, ( | ||
| f'unable to parse module version: {module_name:s} {module_version:s}') | ||
| def _CheckSQLite3(self): | ||
| """Checks the availability of sqlite3. | ||
| if minimum_version: | ||
| try: | ||
| minimum_version_map = list( | ||
| map(int, self._VERSION_SPLIT_REGEX.split(minimum_version))) | ||
| except ValueError: | ||
| return False, ( | ||
| f'unable to parse minimum version: {module_name:s} ' | ||
| f'{minimum_version:s}') | ||
| Returns: | ||
| tuple: consists: | ||
| if module_version_map < minimum_version_map: | ||
| return False, ( | ||
| f'{module_name:s} version: {module_version!s} is too old, ' | ||
| f'{minimum_version!s} or later required') | ||
| bool: True if the Python module is available and conforms to | ||
| the minimum required version, False otherwise. | ||
| str: status message. | ||
| """ | ||
| # On Windows sqlite3 can be provided by both pysqlite2.dbapi2 and | ||
| # sqlite3. sqlite3 is provided with the Python installation and | ||
| # pysqlite2.dbapi2 by the pysqlite2 Python module. Typically | ||
| # pysqlite2.dbapi2 would contain a newer version of sqlite3, hence | ||
| # we check for its presence first. | ||
| module_name = 'pysqlite2.dbapi2' | ||
| minimum_version = '3.7.8' | ||
| if maximum_version: | ||
| try: | ||
| maximum_version_map = list( | ||
| map(int, self._VERSION_SPLIT_REGEX.split(maximum_version))) | ||
| except ValueError: | ||
| return False, ( | ||
| f'unable to parse maximum version: {module_name:s} ' | ||
| f'{maximum_version:s}') | ||
| module_object = self._ImportPythonModule(module_name) | ||
| if not module_object: | ||
| module_name = 'sqlite3' | ||
| if module_version_map > maximum_version_map: | ||
| return False, ( | ||
| f'{module_name:s} version: {module_version!s} is too recent, ' | ||
| f'{maximum_version!s} or earlier required') | ||
| module_object = self._ImportPythonModule(module_name) | ||
| if not module_object: | ||
| status_message = 'missing: {0:s}.'.format(module_name) | ||
| return False, status_message | ||
| return True, f'{module_name:s} version: {module_version!s}' | ||
| return self._CheckPythonModuleVersion( | ||
| module_name, module_object, 'sqlite_version', minimum_version, None) | ||
| def _ImportPythonModule(self, module_name): | ||
@@ -264,3 +273,3 @@ """Imports a Python module. | ||
| except ImportError: | ||
| return | ||
| return None | ||
@@ -283,2 +292,3 @@ # If the module name contains dots get the upper most module object. | ||
| status_message (str): status message. | ||
| verbose_output (Optional[bool]): True if output should be verbose. | ||
| """ | ||
@@ -291,6 +301,6 @@ if not result or dependency.is_optional: | ||
| print('{0:s}\t{1:s}.'.format(status_indicator, status_message)) | ||
| print(f'{status_indicator:s}\t{status_message:s}') | ||
| elif verbose_output: | ||
| print('[OK]\t\t{0:s}'.format(status_message)) | ||
| print(f'[OK]\t\t{status_message:s}') | ||
@@ -309,9 +319,9 @@ def CheckDependencies(self, verbose_output=True): | ||
| for module_name, dependency in sorted(self._dependencies.items()): | ||
| if module_name == 'sqlite3': | ||
| result, status_message = self._CheckSQLite3() | ||
| else: | ||
| result, status_message = self._CheckPythonModule(dependency) | ||
| for _, dependency in sorted(self.dependencies.items()): | ||
| if dependency.skip_check: | ||
| continue | ||
| if not result: | ||
| result, status_message = self._CheckPythonModule(dependency) | ||
| if not result and not dependency.is_optional: | ||
| check_result = False | ||
@@ -346,4 +356,8 @@ | ||
| key=lambda dependency: dependency.name): | ||
| if dependency.skip_check: | ||
| continue | ||
| result, status_message = self._CheckPythonModule(dependency) | ||
| if not result: | ||
| if not result and not dependency.is_optional: | ||
| check_result = False | ||
@@ -350,0 +364,0 @@ |
| #!/bin/bash | ||
| # | ||
| # Script to set up Travis-CI test VM. | ||
| COVERALL_DEPENDENCIES="python-coverage python-coveralls python-docopt"; | ||
| L2TBINARIES_DEPENDENCIES=""; | ||
| L2TBINARIES_TEST_DEPENDENCIES="funcsigs mock pbr six"; | ||
| PYTHON2_DEPENDENCIES=""; | ||
| PYTHON2_TEST_DEPENDENCIES="python-mock python-tox"; | ||
| # Exit on error. | ||
| set -e; | ||
| if test ${TRAVIS_OS_NAME} = "osx"; | ||
| then | ||
| git clone https://github.com/log2timeline/l2tdevtools.git; | ||
| mv l2tdevtools ../; | ||
| mkdir dependencies; | ||
| PYTHONPATH=../l2tdevtools ../l2tdevtools/tools/update.py --download-directory=dependencies ${L2TBINARIES_DEPENDENCIES} ${L2TBINARIES_TEST_DEPENDENCIES}; | ||
| elif test ${TRAVIS_OS_NAME} = "linux"; | ||
| then | ||
| sudo rm -f /etc/apt/sources.list.d/travis_ci_zeromq3-source.list; | ||
| sudo add-apt-repository ppa:gift/dev -y; | ||
| sudo apt-get update -q; | ||
| # Only install the Python 2 dependencies. | ||
| # Also see: https://docs.travis-ci.com/user/languages/python/#Travis-CI-Uses-Isolated-virtualenvs | ||
| sudo apt-get install -y ${COVERALL_DEPENDENCIES} ${PYTHON2_DEPENDENCIES} ${PYTHON2_TEST_DEPENDENCIES}; | ||
| fi |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
172885
373.11%65
132.14%2717
518.91%