Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

acstore

Package Overview
Dependencies
Maintainers
2
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

acstore - npm Package Compare versions

Comparing version
20171013
to
20221230
+81
.github/workflows/test_docker.yml
# Run tests on Fedora and Ubuntu Docker images using GIFT CORP and GIFT PPA on commit
name: test_docker
on: [push]
permissions: read-all
jobs:
test_fedora:
runs-on: ubuntu-latest
strategy:
matrix:
version: ['37']
container:
image: registry.fedoraproject.org/fedora:${{ matrix.version }}
steps:
- uses: actions/checkout@v2
- name: Set up container
run: |
dnf install -y dnf-plugins-core langpacks-en
- name: Install dependencies
run: |
dnf copr -y enable @gift/dev
dnf install -y @development-tools python3 python3-devel python3-setuptools
- name: Run tests
env:
LANG: C.utf8
run: |
python3 ./run_tests.py
- name: Run end-to-end tests
run: |
if test -f tests/end-to-end.py; then PYTHONPATH=. python3 ./tests/end-to-end.py --debug -c config/end-to-end.ini; fi
- name: Build source distribution
run: |
python3 ./setup.py sdist
- name: Build binary distribution
run: |
python3 ./setup.py bdist
- name: Run build and install test
run: |
python3 ./setup.py build
python3 ./setup.py install
test_ubuntu:
runs-on: ubuntu-latest
strategy:
matrix:
version: ['22.04']
container:
image: ubuntu:${{ matrix.version }}
steps:
- uses: actions/checkout@v2
- name: Set up container
env:
DEBIAN_FRONTEND: noninteractive
run: |
apt-get update -q
apt-get install -y libterm-readline-gnu-perl locales software-properties-common
locale-gen en_US.UTF-8
ln -f -s /usr/share/zoneinfo/UTC /etc/localtime
- name: Install dependencies
run: |
add-apt-repository -y ppa:gift/dev
apt-get update -q
apt-get install -y build-essential python3 python3-dev python3-distutils python3-setuptools
- name: Run tests
env:
LANG: en_US.UTF-8
run: |
python3 ./run_tests.py
- name: Run end-to-end tests
env:
LANG: en_US.UTF-8
run: |
if test -f tests/end-to-end.py; then PYTHONPATH=. python3 ./tests/end-to-end.py --debug -c config/end-to-end.ini; fi
- name: Build source distribution
run: |
python3 ./setup.py sdist
- name: Build binary distribution
run: |
python3 ./setup.py bdist
- name: Run build and install test
run: |
python3 ./setup.py build
python3 ./setup.py install
# Run docs tox tests on Ubuntu Docker images using GIFT PPA
name: test_docs
on:
pull_request:
branches:
- main
push:
branches:
- main
permissions: read-all
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- python-version: '3.8'
toxenv: 'docs'
container:
image: ubuntu:22.04
steps:
- uses: actions/checkout@v2
- name: Set up container
env:
DEBIAN_FRONTEND: noninteractive
run: |
apt-get update -q
apt-get install -y libterm-readline-gnu-perl locales software-properties-common
locale-gen en_US.UTF-8
ln -f -s /usr/share/zoneinfo/UTC /etc/localtime
- name: Install dependencies
env:
DEBIAN_FRONTEND: noninteractive
run: |
add-apt-repository -y universe
add-apt-repository -y ppa:deadsnakes/ppa
add-apt-repository -y ppa:gift/dev
apt-get update -q
apt-get install -y build-essential git libffi-dev python${{ matrix.python-version }} python${{ matrix.python-version }}-dev python${{ matrix.python-version }}-venv python3-distutils python3-pip python3-setuptools
- name: Install tox
run: |
python3 -m pip install tox
- name: Run tests
env:
LANG: en_US.UTF-8
run: |
tox -e${{ matrix.toxenv }}
# Run tox tests on Ubuntu Docker images using GIFT PPA
name: test_tox
on:
pull_request:
branches:
- main
push:
branches:
- main
permissions: read-all
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- python-version: '3.7'
toxenv: 'py37'
- python-version: '3.8'
toxenv: 'py38,coverage,codecov'
- python-version: '3.9'
toxenv: 'py39'
- python-version: '3.10'
toxenv: 'py310'
- python-version: '3.11'
toxenv: 'py311'
- python-version: '3.11'
toxenv: 'lint'
container:
image: ubuntu:22.04
steps:
- uses: actions/checkout@v2
- name: Set up container
env:
DEBIAN_FRONTEND: noninteractive
run: |
apt-get update -q
apt-get install -y libterm-readline-gnu-perl locales software-properties-common
locale-gen en_US.UTF-8
ln -f -s /usr/share/zoneinfo/UTC /etc/localtime
- name: Install dependencies
env:
DEBIAN_FRONTEND: noninteractive
run: |
add-apt-repository -y universe
add-apt-repository -y ppa:deadsnakes/ppa
add-apt-repository -y ppa:gift/dev
apt-get update -q
apt-get install -y build-essential git libffi-dev python${{ matrix.python-version }} python${{ matrix.python-version }}-dev python${{ matrix.python-version }}-venv python3-distutils python3-pip python3-setuptools
- name: Install tox
run: |
python3 -m pip install tox
- name: Run tests
env:
LANG: en_US.UTF-8
run: |
tox -e${{ matrix.toxenv }}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

[project]
name: acstore
name_description: ACStore
maintainer: Log2Timeline maintainers <log2timeline-maintainers@googlegroups.com>
homepage_url: https://github.com/log2timeline/acstore
description_short: Attribute Container Storage (ACStore).
description_long: ACStore, or Attribute Container Storage, provides a stand-alone
implementation to read and write attribute container storage files.
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
"""The attribute container interface."""
class AttributeContainerIdentifier(object):
"""The attribute container identifier.
The identifier is used to uniquely identify attribute containers.
The value should be unique relative to an attribute container store.
Attributes:
name (str): name of the table (attribute container).
sequence_number (int): sequence number of the attribute container.
"""
def __init__(self, name=None, sequence_number=None):
"""Initializes an attribute container identifier.
Args:
name (Optional[str]): name of the table (attribute container).
sequence_number (Optional[int]): sequence number of the attribute
container.
"""
super(AttributeContainerIdentifier, self).__init__()
self.name = name
self.sequence_number = sequence_number
def CopyFromString(self, identifier_string):
"""Copies the identifier from a string representation.
Args:
identifier_string (str): string representation.
"""
self.name, sequence_number = identifier_string.split('.')
self.sequence_number = int(sequence_number, 10)
def CopyToString(self):
"""Copies the identifier to a string representation.
Returns:
str: unique identifier or None.
"""
if self.name is not None and self.sequence_number is not None:
return '{0:s}.{1:d}'.format(self.name, self.sequence_number)
return None
class AttributeContainer(object):
"""The attribute container interface.
This is the the base class for those object that exists primarily as
a container of attributes with basic accessors and mutators.
The CONTAINER_TYPE class attribute contains a string that identifies
the container type, for example the container type "event" identifiers
an event object.
Attributes are public class members of an serializable type. Protected and
private class members are not to be serialized, with the exception of those
defined in _SERIALIZABLE_PROTECTED_ATTRIBUTES.
"""
CONTAINER_TYPE = None
# Names of protected attributes, those with a leading underscore, that
# should be serialized.
_SERIALIZABLE_PROTECTED_ATTRIBUTES = []
def __init__(self):
"""Initializes an attribute container."""
super(AttributeContainer, self).__init__()
self._identifier = AttributeContainerIdentifier(
name=self.CONTAINER_TYPE, sequence_number=id(self))
def CopyFromDict(self, attributes):
"""Copies the attribute container from a dictionary.
Args:
attributes (dict[str, object]): attribute values per name.
"""
for attribute_name, attribute_value in attributes.items():
# Not using startswith to improve performance.
if (attribute_name[0] != '_' or
attribute_name in self._SERIALIZABLE_PROTECTED_ATTRIBUTES):
self.__dict__[attribute_name] = attribute_value
def CopyToDict(self):
"""Copies the attribute container to a dictionary.
Returns:
dict[str, object]: attribute values per name.
"""
return dict(self.GetAttributes())
def GetAttributeNames(self):
"""Retrieves the names of all attributes.
Returns:
list[str]: attribute names.
"""
attribute_names = list(self._SERIALIZABLE_PROTECTED_ATTRIBUTES)
for attribute_name in self.__dict__:
# Not using startswith to improve performance.
if attribute_name[0] != '_':
attribute_names.append(attribute_name)
return attribute_names
def GetAttributes(self):
"""Retrieves the attribute names and values.
Attributes that are set to None are ignored.
Yields:
tuple[str, object]: attribute name and value.
"""
for attribute_name, attribute_value in self.__dict__.items():
# Not using startswith to improve performance.
if attribute_value is not None and (
attribute_name[0] != '_' or
attribute_name in self._SERIALIZABLE_PROTECTED_ATTRIBUTES):
yield attribute_name, attribute_value
def GetAttributeValuesHash(self):
"""Retrieves a comparable string of the attribute values.
Returns:
int: hash of comparable string of the attribute values.
"""
return hash(self.GetAttributeValuesString())
def GetAttributeValuesString(self):
"""Retrieves a comparable string of the attribute values.
Returns:
str: comparable string of the attribute values.
"""
attributes = []
for attribute_name, attribute_value in sorted(self.__dict__.items()):
# Not using startswith to improve performance.
if attribute_value is not None and (
attribute_name[0] != '_' or
attribute_name in self._SERIALIZABLE_PROTECTED_ATTRIBUTES):
if isinstance(attribute_value, dict):
attribute_value = sorted(attribute_value.items())
elif isinstance(attribute_value, bytes):
attribute_value = repr(attribute_value)
attributes.append(f'{attribute_name:s}: {attribute_value!s}')
return ', '.join(attributes)
def GetIdentifier(self):
"""Retrieves the identifier.
The identifier is a storage specific value that should not be serialized.
Returns:
AttributeContainerIdentifier: an unique identifier for the container.
"""
return self._identifier
def MatchesExpression(self, expression):
"""Determines if an attribute container matches the expression.
Args:
expression (code|str): expression.
Returns:
bool: True if the attribute container matches the expression, False
otherwise.
"""
result = not expression
if expression:
namespace = dict(self.GetAttributes())
# Make sure __builtins__ contains an empty dictionary.
namespace['__builtins__'] = {}
try:
result = eval(expression, namespace) # pylint: disable=eval-used
except Exception: # pylint: disable=broad-except
pass
return result
def SetIdentifier(self, identifier):
"""Sets the identifier.
The identifier is a storage specific value that should not be serialized.
Args:
identifier (AttributeContainerIdentifier): identifier.
"""
self._identifier = identifier
# -*- coding: utf-8 -*-
"""This file contains the attribute container manager class."""
class AttributeContainersManager(object):
"""Class that implements the attribute container manager."""
_attribute_container_classes = {}
@classmethod
def CreateAttributeContainer(cls, container_type):
"""Creates an instance of a specific attribute container type.
Args:
container_type (str): container type.
Returns:
AttributeContainer: an instance of attribute container.
Raises:
ValueError: if the container type is not supported.
"""
container_class = cls._attribute_container_classes.get(
container_type, None)
if not container_class:
raise ValueError(f'Unsupported container type: {container_type:s}')
return container_class()
@classmethod
def DeregisterAttributeContainer(cls, attribute_container_class):
"""Deregisters an attribute container class.
The attribute container classes are identified based on their lower case
container type.
Args:
attribute_container_class (type): attribute container class.
Raises:
KeyError: if attribute container class is not set for the corresponding
container type.
"""
container_type = attribute_container_class.CONTAINER_TYPE.lower()
if container_type not in cls._attribute_container_classes:
raise KeyError((
f'Attribute container class not set for container type: '
f'{attribute_container_class.CONTAINER_TYPE:s}.'))
del cls._attribute_container_classes[container_type]
@classmethod
def GetContainerTypes(cls):
"""Retrieves the container types of the registered attribute containers.
Returns:
list[str]: container types.
"""
return list(cls._attribute_container_classes.keys())
@classmethod
def GetSchema(cls, container_type):
"""Retrieves the schema of a registered attribute container.
Args:
container_type (str): attribute container type.
Returns:
dict[str, str]: attribute container schema or an empty dictionary if
no schema available.
Raises:
ValueError: if the container type is not supported.
"""
container_class = cls._attribute_container_classes.get(
container_type, None)
if not container_class:
raise ValueError(f'Unsupported container type: {container_type:s}')
return getattr(container_class, 'SCHEMA', {})
@classmethod
def RegisterAttributeContainer(cls, attribute_container_class):
"""Registers a attribute container class.
The attribute container classes are identified based on their lower case
container type.
Args:
attribute_container_class (type): attribute container class.
Raises:
KeyError: if attribute container class is already set for the
corresponding container type.
"""
container_type = attribute_container_class.CONTAINER_TYPE.lower()
if container_type in cls._attribute_container_classes:
raise KeyError((
f'Attribute container class already set for container type: '
f'{attribute_container_class.CONTAINER_TYPE:s}.'))
cls._attribute_container_classes[container_type] = attribute_container_class
@classmethod
def RegisterAttributeContainers(cls, attribute_container_classes):
"""Registers attribute container classes.
The attribute container classes are identified based on their lower case
container type.
Args:
attribute_container_classes (list[type]): attribute container classes.
Raises:
KeyError: if attribute container class is already set for the
corresponding container type.
"""
for attribute_container_class in attribute_container_classes:
cls.RegisterAttributeContainer(attribute_container_class)
# -*- coding: utf-8 -*-
"""Fake (in-memory only) attribute container store for testing."""
import ast
import collections
import copy
import itertools
from acstore import interface
from acstore.containers import interface as containers_interface
class FakeAttributeContainerStore(interface.AttributeContainerStore):
"""Fake (in-memory only) attribute container store."""
def __init__(self):
"""Initializes a fake (in-memory only) store."""
super(FakeAttributeContainerStore, self).__init__()
self._attribute_containers = {}
self._is_open = False
def _RaiseIfNotReadable(self):
"""Raises if the store is not readable.
Raises:
OSError: if the store cannot be read from.
IOError: if the store cannot be read from.
"""
if not self._is_open:
raise IOError('Unable to read from closed storage writer.')
def _RaiseIfNotWritable(self):
"""Raises if the storage file is not writable.
Raises:
IOError: when the storage writer is closed.
OSError: when the storage writer is closed.
"""
if not self._is_open:
raise IOError('Unable to write to closed storage writer.')
def _WriteExistingAttributeContainer(self, container):
"""Writes an existing attribute container to the store.
Args:
container (AttributeContainer): attribute container.
Raises:
IOError: if an unsupported identifier is provided or if the attribute
container does not exist.
OSError: if an unsupported identifier is provided or if the attribute
container does not exist.
"""
identifier = container.GetIdentifier()
lookup_key = identifier.CopyToString()
containers = self._attribute_containers.get(container.CONTAINER_TYPE, None)
if containers is None or lookup_key not in containers:
raise IOError((
f'Missing attribute container: {container.CONTAINER_TYPE:s} with '
f'identifier: {lookup_key:s}'))
containers[lookup_key] = container
def _WriteNewAttributeContainer(self, container):
"""Writes a new attribute container to the store.
Args:
container (AttributeContainer): attribute container.
"""
containers = self._attribute_containers.get(container.CONTAINER_TYPE, None)
if containers is None:
containers = collections.OrderedDict()
self._attribute_containers[container.CONTAINER_TYPE] = containers
next_sequence_number = self._GetAttributeContainerNextSequenceNumber(
container.CONTAINER_TYPE)
identifier = containers_interface.AttributeContainerIdentifier(
name=container.CONTAINER_TYPE, sequence_number=next_sequence_number)
container.SetIdentifier(identifier)
lookup_key = identifier.CopyToString()
# Make sure the fake storage preserves the state of the attribute container.
container = copy.deepcopy(container)
containers[lookup_key] = container
def Close(self):
"""Closes the store.
Raises:
IOError: if the store is already closed.
OSError: if the store is already closed.
"""
if not self._is_open:
raise IOError('Store already closed.')
self._is_open = False
def GetAttributeContainerByIdentifier(self, container_type, identifier):
"""Retrieves a specific type of container with a specific identifier.
Args:
container_type (str): container type.
identifier (AttributeContainerIdentifier): attribute container identifier.
Returns:
AttributeContainer: attribute container or None if not available.
"""
containers = self._attribute_containers.get(container_type, {})
lookup_key = identifier.CopyToString()
return containers.get(lookup_key, None)
def GetAttributeContainerByIndex(self, container_type, index):
"""Retrieves a specific attribute container.
Args:
container_type (str): attribute container type.
index (int): attribute container index.
Returns:
AttributeContainer: attribute container or None if not available.
"""
containers = self._attribute_containers.get(container_type, {})
number_of_containers = len(containers)
if index < 0 or index >= number_of_containers:
return None
return next(itertools.islice(
containers.values(), index, number_of_containers))
def GetAttributeContainers(self, container_type, filter_expression=None):
"""Retrieves a specific type of attribute containers.
Args:
container_type (str): attribute container type.
filter_expression (Optional[str]): expression to filter the resulting
attribute containers by.
Yield:
AttributeContainer: attribute container.
"""
if filter_expression:
expression_ast = ast.parse(filter_expression, mode='eval')
filter_expression = compile(expression_ast, '<string>', mode='eval')
for attribute_container in self._attribute_containers.get(
container_type, {}).values():
if attribute_container.MatchesExpression(filter_expression):
yield attribute_container
def GetNumberOfAttributeContainers(self, container_type):
"""Retrieves the number of a specific type of attribute containers.
Args:
container_type (str): attribute container type.
Returns:
int: the number of containers of a specified type.
"""
containers = self._attribute_containers.get(container_type, {})
return len(containers)
def HasAttributeContainers(self, container_type):
"""Determines if a store contains a specific type of attribute container.
Args:
container_type (str): attribute container type.
Returns:
bool: True if the store contains the specified type of attribute
containers.
"""
containers = self._attribute_containers.get(container_type, {})
return bool(containers)
def Open(self, **kwargs):
"""Opens the store.
Raises:
IOError: if the store is already opened.
OSError: if the store is already opened.
"""
if self._is_open:
raise IOError('Store already opened.')
self._is_open = True
# -*- coding: utf-8 -*-
"""The attribute container store interface."""
import abc
import collections
from acstore.containers import manager as containers_manager
class AttributeContainerStore(object):
"""Interface of an attribute container store.
Attributes:
format_version (int): storage format version.
serialization_format (str): serialization format.
"""
def __init__(self):
"""Initializes an attribute container store."""
super(AttributeContainerStore, self).__init__()
self._attribute_container_sequence_numbers = collections.Counter()
self._containers_manager = containers_manager.AttributeContainersManager
self._storage_profiler = None
self.format_version = None
self.serialization_format = None
def _GetAttributeContainerNextSequenceNumber(self, container_type):
"""Retrieves the next sequence number of an attribute container.
Args:
container_type (str): attribute container type.
Returns:
int: next sequence number.
"""
self._attribute_container_sequence_numbers[container_type] += 1
return self._attribute_container_sequence_numbers[container_type]
def _GetAttributeContainerSchema(self, container_type):
"""Retrieves the schema of an attribute container.
Args:
container_type (str): attribute container type.
Returns:
dict[str, str]: attribute container schema or an empty dictionary if
no schema available.
"""
try:
schema = self._containers_manager.GetSchema(container_type)
except ValueError:
schema = {}
return schema
@abc.abstractmethod
def _RaiseIfNotReadable(self):
"""Raises if the store is not readable.
Raises:
OSError: if the store cannot be read from.
IOError: if the store cannot be read from.
"""
@abc.abstractmethod
def _RaiseIfNotWritable(self):
"""Raises if the store is not writable.
Raises:
OSError: if the store cannot be written to.
IOError: if the store cannot be written to.
"""
def _SetAttributeContainerNextSequenceNumber(
self, container_type, next_sequence_number):
"""Sets the next sequence number of an attribute container.
Args:
container_type (str): attribute container type.
next_sequence_number (int): next sequence number.
"""
self._attribute_container_sequence_numbers[
container_type] = next_sequence_number
@abc.abstractmethod
def _WriteNewAttributeContainer(self, container):
"""Writes a new attribute container to the store.
Args:
container (AttributeContainer): attribute container.
"""
@abc.abstractmethod
def _WriteExistingAttributeContainer(self, container):
"""Writes an existing attribute container to the store.
Args:
container (AttributeContainer): attribute container.
"""
def AddAttributeContainer(self, container):
"""Adds a new attribute container.
Args:
container (AttributeContainer): attribute container.
Raises:
OSError: if the store cannot be written to.
IOError: if the store cannot be written to.
"""
self._RaiseIfNotWritable()
self._WriteNewAttributeContainer(container)
@abc.abstractmethod
def Close(self):
"""Closes the store."""
@abc.abstractmethod
def GetAttributeContainerByIdentifier(self, container_type, identifier):
"""Retrieves a specific type of container with a specific identifier.
Args:
container_type (str): container type.
identifier (AttributeContainerIdentifier): attribute container identifier.
Returns:
AttributeContainer: attribute container or None if not available.
Raises:
IOError: when the store is closed or if an unsupported identifier is
provided.
OSError: when the store is closed or if an unsupported identifier is
provided.
"""
@abc.abstractmethod
def GetAttributeContainerByIndex(self, container_type, index):
"""Retrieves a specific attribute container.
Args:
container_type (str): attribute container type.
index (int): attribute container index.
Returns:
AttributeContainer: attribute container or None if not available.
Raises:
IOError: when the store is closed.
OSError: when the store is closed.
"""
@abc.abstractmethod
def GetAttributeContainers(self, container_type, filter_expression=None):
"""Retrieves a specific type of attribute containers.
Args:
container_type (str): attribute container type.
filter_expression (Optional[str]): expression to filter the resulting
attribute containers by.
Returns:
generator(AttributeContainer): attribute container generator.
Raises:
IOError: when the store is closed.
OSError: when the store is closed.
"""
@abc.abstractmethod
def GetNumberOfAttributeContainers(self, container_type):
"""Retrieves the number of a specific type of attribute containers.
Args:
container_type (str): attribute container type.
Returns:
int: the number of containers of a specified type.
"""
@abc.abstractmethod
def HasAttributeContainers(self, container_type):
"""Determines if a store contains a specific type of attribute container.
Args:
container_type (str): attribute container type.
Returns:
bool: True if the store contains the specified type of attribute
containers.
"""
@abc.abstractmethod
def Open(self, **kwargs):
"""Opens the store."""
def SetStorageProfiler(self, storage_profiler):
"""Sets the storage profiler.
Args:
storage_profiler (StorageProfiler): storage profiler.
"""
self._storage_profiler = storage_profiler
def UpdateAttributeContainer(self, container):
"""Updates an existing attribute container.
Args:
container (AttributeContainer): attribute container.
Raises:
OSError: if the store cannot be written to.
IOError: if the store cannot be written to.
"""
self._RaiseIfNotWritable()
self._WriteExistingAttributeContainer(container)
# -*- coding: utf-8 -*-
"""SQLite-based attribute container store."""
import ast
import collections
import itertools
import os
import pathlib
import sqlite3
from acstore.containers import interface as containers_interface
from acstore import interface
def PythonAST2SQL(ast_node):
"""Converts a Python AST to SQL.
Args:
ast_node (ast.Node): node of the Python AST.
Returns:
str: SQL statement that represents the node.
Raises:
TypeError: if the type of node is not supported.
"""
if isinstance(ast_node, ast.BoolOp):
if isinstance(ast_node.op, ast.And):
operand = ' AND '
elif isinstance(ast_node.op, ast.Or):
operand = ' OR '
else:
raise TypeError(ast_node)
return operand.join([
PythonAST2SQL(ast_node_value) for ast_node_value in ast_node.values])
if isinstance(ast_node, ast.Compare):
if len(ast_node.ops) != 1:
raise TypeError(ast_node)
if isinstance(ast_node.ops[0], ast.Eq):
operator = ' = '
elif isinstance(ast_node.ops[0], ast.NotEq):
operator = ' <> '
else:
raise TypeError(ast_node)
if len(ast_node.comparators) != 1:
raise TypeError(ast_node)
sql_left = PythonAST2SQL(ast_node.left)
sql_right = PythonAST2SQL(ast_node.comparators[0])
return operator.join([sql_left, sql_right])
if isinstance(ast_node, ast.Constant):
if isinstance(ast_node.value, str):
return f'"{ast_node.value:s}"'
return str(ast_node.value)
if isinstance(ast_node, ast.Name):
return ast_node.id
if isinstance(ast_node, ast.Num):
return str(ast_node.n)
if isinstance(ast_node, ast.Str):
return f'"{ast_node.s:s}"'
raise TypeError(ast_node)
class SQLiteAttributeContainerStore(interface.AttributeContainerStore):
"""SQLite-based attribute container store.
Attributes:
format_version (int): storage format version.
"""
_FORMAT_VERSION = 20221023
# The earliest format version, stored in-file, that this class
# is able to append (write).
_APPEND_COMPATIBLE_FORMAT_VERSION = 20221023
# The earliest format version, stored in-file, that this class
# is able to upgrade (write new format features).
_UPGRADE_COMPATIBLE_FORMAT_VERSION = 20221023
# The earliest format version, stored in-file, that this class
# is able to read.
_READ_COMPATIBLE_FORMAT_VERSION = 20221023
_CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS = {
'AttributeContainerIdentifier': 'TEXT',
'bool': 'INTEGER',
'int': 'INTEGER',
'str': 'TEXT',
'timestamp': 'BIGINT'}
_CREATE_METADATA_TABLE_QUERY = (
'CREATE TABLE metadata (key TEXT, value TEXT);')
_HAS_TABLE_QUERY = (
'SELECT name FROM sqlite_master '
'WHERE type = "table" AND name = "{0:s}"')
_INSERT_METADATA_VALUE_QUERY = (
'INSERT INTO metadata (key, value) VALUES (?, ?)')
# The maximum number of cached attribute containers
_MAXIMUM_CACHED_CONTAINERS = 32 * 1024
_MAXIMUM_WRITE_CACHE_SIZE = 50
def __init__(self):
"""Initializes a SQLite attribute container store."""
super(SQLiteAttributeContainerStore, self).__init__()
self._attribute_container_cache = collections.OrderedDict()
self._connection = None
self._cursor = None
self._is_open = False
self._read_only = True
self._write_cache = {}
self.format_version = self._FORMAT_VERSION
def _CacheAttributeContainerByIndex(self, attribute_container, index):
"""Caches a specific attribute container.
Args:
attribute_container (AttributeContainer): attribute container.
index (int): attribute container index.
"""
if len(self._attribute_container_cache) >= self._MAXIMUM_CACHED_CONTAINERS:
self._attribute_container_cache.popitem(last=True)
lookup_key = f'{attribute_container.CONTAINER_TYPE:s}.{index:d}'
self._attribute_container_cache[lookup_key] = attribute_container
self._attribute_container_cache.move_to_end(lookup_key, last=False)
def _CheckStorageMetadata(self, metadata_values, check_readable_only=False):
"""Checks the storage metadata.
Args:
metadata_values (dict[str, str]): metadata values per key.
check_readable_only (Optional[bool]): whether the store should only be
checked to see if it can be read. If False, the store will be checked
to see if it can be read and written to.
Raises:
IOError: if the format version is not supported.
OSError: if the format version is not supported.
"""
format_version = metadata_values.get('format_version', None)
if not format_version:
raise IOError('Missing format version.')
try:
format_version = int(format_version, 10)
except (TypeError, ValueError):
raise IOError(f'Invalid format version: {format_version!s}.')
if (not check_readable_only and
format_version < self._APPEND_COMPATIBLE_FORMAT_VERSION):
raise IOError((
f'Format version: {format_version:d} is too old and can no longer '
f'be written.'))
if format_version < self._READ_COMPATIBLE_FORMAT_VERSION:
raise IOError((
f'Format version: {format_version:d} is too old and can no longer '
f'be read.'))
if format_version > self._FORMAT_VERSION:
raise IOError((
f'Format version: {format_version:d} is too new and not yet '
f'supported.'))
metadata_values['format_version'] = format_version
def _CreateAttributeContainerTable(self, container_type):
"""Creates a table for a specific attribute container type.
Args:
container_type (str): attribute container type.
Raises:
IOError: when there is an error querying the attribute container store
or if an unsupported attribute container is provided.
OSError: when there is an error querying the attribute container store
or if an unsupported attribute container is provided.
"""
schema = self._GetAttributeContainerSchema(container_type)
if not schema:
raise IOError(f'Unsupported attribute container type: {container_type:s}')
column_definitions = ['_identifier INTEGER PRIMARY KEY AUTOINCREMENT']
schema_to_sqlite_type_mappings = (
self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS)
for name, data_type in sorted(schema.items()):
data_type = schema_to_sqlite_type_mappings.get(data_type, 'TEXT')
column_definitions.append(f'{name:s} {data_type:s}')
column_definitions = ', '.join(column_definitions)
query = f'CREATE TABLE {container_type:s} ({column_definitions:s});'
try:
self._cursor.execute(query)
except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception:
raise IOError((
f'Unable to query attribute container store with error: '
f'{exception!s}'))
def _CreatetAttributeContainerFromRow(
self, container_type, column_names, row, first_column_index):
"""Creates an attribute container of a row in the database.
Args:
container_type (str): attribute container type.
column_names (list[str]): names of the columns selected.
row (sqlite.Row): row as a result from a SELECT query.
first_column_index (int): index of the first column in row.
Returns:
AttributeContainer: attribute container.
Raises:
IOError: when there is an error querying the attribute container store
or if an unsupported attribute container is provided.
OSError: when there is an error querying the attribute container store
or if an unsupported attribute container is provided.
"""
schema = self._GetAttributeContainerSchema(container_type)
if not schema:
raise IOError(f'Unsupported attribute container type: {container_type:s}')
container = self._containers_manager.CreateAttributeContainer(
container_type)
for column_index, name in enumerate(column_names):
attribute_value = row[first_column_index + column_index]
if attribute_value is None:
continue
data_type = schema[name]
if data_type == 'AttributeContainerIdentifier':
identifier = containers_interface.AttributeContainerIdentifier()
identifier.CopyFromString(attribute_value)
attribute_value = identifier
elif data_type == 'bool':
attribute_value = bool(attribute_value)
elif data_type not in self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS:
raise IOError((
f'Unsupported attribute container type: {container_type:s} '
f'attribute: {name:s} data type: {data_type:s}'))
setattr(container, name, attribute_value)
return container
def _Flush(self):
"""Ensures cached data is written to file.
Raises:
IOError: when there is an error querying the attribute container store.
OSError: when there is an error querying the attribute container store.
"""
for container_type, write_cache in self._write_cache.items():
if len(write_cache) > 1:
self._FlushWriteCache(container_type, write_cache)
self._write_cache = {}
# We need to run commit or not all data is stored in the database.
self._connection.commit()
def _FlushWriteCache(self, container_type, write_cache):
"""Flushes attribute container values cached for writing.
Args:
container_type (str): attribute container type.
write_cache (list[tuple[str]]): cached attribute container values.
Raises:
IOError: when there is an error querying the attribute container store.
OSError: when there is an error querying the attribute container store.
"""
column_names = write_cache.pop(0)
value_statement = ','.join(['?'] * len(column_names))
value_statement = f'({value_statement:s})'
values_statement = ', '.join([value_statement] * len(write_cache))
column_names_string = ', '.join(column_names)
query = (f'INSERT INTO {container_type:s} ({column_names_string:s}) '
f'VALUES {values_statement:s}')
if self._storage_profiler:
self._storage_profiler.StartTiming('write_new')
try:
values = list(itertools.chain(*write_cache))
self._cursor.execute(query, values)
except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception:
raise IOError((
f'Unable to query attribute container store with error: '
f'{exception!s}'))
finally:
if self._storage_profiler:
self._storage_profiler.StopTiming('write_new')
def _GetAttributeContainersWithFilter(
self, container_type, column_names=None, filter_expression=None,
order_by=None):
"""Retrieves a specific type of stored attribute containers.
Args:
container_type (str): attribute container type.
column_names (Optional[list[str]]): names of the columns to retrieve.
filter_expression (Optional[str]): SQL expression to filter results by.
order_by (Optional[str]): name of a column to order the results by.
Yields:
AttributeContainer: attribute container.
Raises:
IOError: when there is an error querying the attribute container store.
OSError: when there is an error querying the attribute container store.
"""
write_cache = self._write_cache.get(container_type, [])
if len(write_cache) > 1:
self._FlushWriteCache(container_type, write_cache)
del self._write_cache[container_type]
column_names_string = ', '.join(column_names)
query = (f'SELECT _identifier, {column_names_string:s} '
f'FROM {container_type:s}')
if filter_expression:
query = ' WHERE '.join([query, filter_expression])
if order_by:
query = ' ORDER BY '.join([query, order_by])
# Use a local cursor to prevent another query interrupting the generator.
cursor = self._connection.cursor()
try:
cursor.execute(query)
except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception:
raise IOError((
f'Unable to query attribute container store for container: '
f'{container_type:s} with error: {exception!s}'))
if self._storage_profiler:
self._storage_profiler.StartTiming('get_containers')
try:
row = cursor.fetchone()
finally:
if self._storage_profiler:
self._storage_profiler.StopTiming('get_containers')
while row:
container = self._CreatetAttributeContainerFromRow(
container_type, column_names, row, 1)
identifier = containers_interface.AttributeContainerIdentifier(
name=container_type, sequence_number=row[0])
container.SetIdentifier(identifier)
yield container
if self._storage_profiler:
self._storage_profiler.StartTiming('get_containers')
try:
row = cursor.fetchone()
finally:
if self._storage_profiler:
self._storage_profiler.StopTiming('get_containers')
def _GetCachedAttributeContainer(self, container_type, index):
"""Retrieves a specific cached attribute container.
Args:
container_type (str): attribute container type.
index (int): attribute container index.
Returns:
AttributeContainer: attribute container or None if not available.
Raises:
IOError: when there is an error querying the attribute container store.
OSError: when there is an error querying the attribute container store.
"""
lookup_key = f'{container_type:s}.{index:d}'
attribute_container = self._attribute_container_cache.get(lookup_key, None)
if attribute_container:
self._attribute_container_cache.move_to_end(lookup_key, last=False)
return attribute_container
def _HasTable(self, table_name):
"""Determines if a specific table exists.
Args:
table_name (str): name of the table.
Returns:
bool: True if the table exists, false otherwise.
Raises:
IOError: when there is an error querying the attribute container store.
OSError: when there is an error querying the attribute container store.
"""
query = self._HAS_TABLE_QUERY.format(table_name)
try:
self._cursor.execute(query)
except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception:
raise IOError((
f'Unable to query attribute container store with error: '
f'{exception!s}'))
return bool(self._cursor.fetchone())
def _RaiseIfNotReadable(self):
"""Raises if the attribute container store is not readable.
Raises:
IOError: when the attribute container store is closed.
OSError: when the attribute container store is closed.
"""
if not self._is_open:
raise IOError('Unable to read from closed attribute container store.')
def _RaiseIfNotWritable(self):
"""Raises if the attribute container store is not writable.
Raises:
IOError: when the attribute container store is closed or read-only.
OSError: when the attribute container store is closed or read-only.
"""
if not self._is_open:
raise IOError('Unable to write to closed attribute container store.')
if self._read_only:
raise IOError('Unable to write to read-only attribute container store.')
def _ReadAndCheckStorageMetadata(self, check_readable_only=False):
"""Reads storage metadata and checks that the values are valid.
Args:
check_readable_only (Optional[bool]): whether the store should only be
checked to see if it can be read. If False, the store will be checked
to see if it can be read and written to.
Raises:
IOError: when there is an error querying the attribute container store.
OSError: when there is an error querying the attribute container store.
"""
metadata_values = self._ReadMetadata()
self._CheckStorageMetadata(
metadata_values, check_readable_only=check_readable_only)
self.format_version = metadata_values['format_version']
def _ReadMetadata(self):
"""Reads metadata.
Returns:
dict[str, str]: metadata values.
Raises:
IOError: when there is an error querying the attribute container store.
OSError: when there is an error querying the attribute container store.
"""
query = 'SELECT key, value FROM metadata'
try:
self._cursor.execute(query)
except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception:
raise IOError((
f'Unable to query attribute container store with error: '
f'{exception!s}'))
return {row[0]: row[1] for row in self._cursor.fetchall()}
def _UpdateStorageMetadataFormatVersion(self):
"""Updates the storage metadata format version.
Raises:
IOError: when there is an error querying the attribute container store.
OSError: when there is an error querying the attribute container store.
"""
if self.format_version >= self._UPGRADE_COMPATIBLE_FORMAT_VERSION:
query = (f'UPDATE metadata SET value = {self._FORMAT_VERSION:d} '
f'WHERE key = "format_version"')
try:
self._cursor.execute(query)
except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception:
raise IOError((
f'Unable to query attribute container store with error: '
f'{exception!s}'))
def _WriteExistingAttributeContainer(self, container):
"""Writes an existing attribute container to the store.
Args:
container (AttributeContainer): attribute container.
Raises:
IOError: when there is an error querying the attribute container store
or if an unsupported attribute container is provided.
OSError: when there is an error querying the attribute container store
or if an unsupported attribute container is provided.
"""
identifier = container.GetIdentifier()
schema = self._GetAttributeContainerSchema(container.CONTAINER_TYPE)
if not schema:
raise IOError(
f'Unsupported attribute container type: {container.CONTAINER_TYPE:s}')
write_cache = self._write_cache.get(container.CONTAINER_TYPE, [])
if len(write_cache) > 1:
self._FlushWriteCache(container.CONTAINER_TYPE, write_cache)
del self._write_cache[container.CONTAINER_TYPE]
column_names = []
values = []
for name, data_type in sorted(schema.items()):
attribute_value = getattr(container, name, None)
if attribute_value is not None:
if data_type == 'AttributeContainerIdentifier' and isinstance(
attribute_value, containers_interface.AttributeContainerIdentifier):
attribute_value = attribute_value.CopyToString()
elif data_type == 'bool':
attribute_value = int(attribute_value)
elif data_type not in self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS:
raise IOError((
f'Unsupported attribute container type: '
f'{container.CONTAINER_TYPE:s} attribute: {name:s} data type: '
f'{data_type:s}'))
column_names.append(f'{name:s} = ?')
values.append(attribute_value)
column_names_string = ', '.join(column_names)
query = (f'UPDATE {container.CONTAINER_TYPE:s} SET {column_names_string:s} '
f'WHERE _identifier = {identifier.sequence_number:d}')
if self._storage_profiler:
self._storage_profiler.StartTiming('write_existing')
try:
self._cursor.execute(query, values)
except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception:
raise IOError((
f'Unable to query attribute container store with error: '
f'{exception!s}'))
finally:
if self._storage_profiler:
self._storage_profiler.StopTiming('write_existing')
def _WriteMetadata(self):
"""Writes metadata.
Raises:
IOError: when there is an error querying the attribute container store.
OSError: when there is an error querying the attribute container store.
"""
try:
self._cursor.execute(self._CREATE_METADATA_TABLE_QUERY)
except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception:
raise IOError((
f'Unable to query attribute container store with error: '
f'{exception!s}'))
self._WriteMetadataValue('format_version', f'{self._FORMAT_VERSION:d}')
def _WriteMetadataValue(self, key, value):
"""Writes a metadata value.
Args:
key (str): key of the storage metadata.
value (str): value of the storage metadata.
Raises:
IOError: when there is an error querying the attribute container store.
OSError: when there is an error querying the attribute container store.
"""
try:
self._cursor.execute(self._INSERT_METADATA_VALUE_QUERY, (key, value))
except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception:
raise IOError((
f'Unable to query attribute container store with error: '
f'{exception!s}'))
def _WriteNewAttributeContainer(self, container):
"""Writes a new attribute container to the store.
The table for the container type must exist.
Args:
container (AttributeContainer): attribute container.
Raises:
IOError: when there is an error querying the attribute container store
or if an unsupported attribute container is provided.
OSError: when there is an error querying the attribute container store
or if an unsupported attribute container is provided.
"""
next_sequence_number = self._GetAttributeContainerNextSequenceNumber(
container.CONTAINER_TYPE)
identifier = containers_interface.AttributeContainerIdentifier(
name=container.CONTAINER_TYPE, sequence_number=next_sequence_number)
container.SetIdentifier(identifier)
schema = self._GetAttributeContainerSchema(container.CONTAINER_TYPE)
if not schema:
raise IOError(
f'Unsupported attribute container type: {container.CONTAINER_TYPE:s}')
column_names = []
values = []
for name, data_type in sorted(schema.items()):
attribute_value = getattr(container, name, None)
if attribute_value is not None:
if data_type == 'AttributeContainerIdentifier' and isinstance(
attribute_value,
containers_interface.AttributeContainerIdentifier):
attribute_value = attribute_value.CopyToString()
elif data_type == 'bool':
attribute_value = int(attribute_value)
elif data_type not in self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS:
raise IOError((
f'Unsupported attribute container type: '
f'{container.CONTAINER_TYPE:s} attribute: {name:s} data type: '
f'{data_type:s}'))
column_names.append(name)
values.append(attribute_value)
write_cache = self._write_cache.get(
container.CONTAINER_TYPE, [column_names])
write_cache.append(values)
if len(write_cache) >= self._MAXIMUM_WRITE_CACHE_SIZE:
self._FlushWriteCache(container.CONTAINER_TYPE, write_cache)
write_cache = [column_names]
self._write_cache[container.CONTAINER_TYPE] = write_cache
self._CacheAttributeContainerByIndex(container, next_sequence_number - 1)
@classmethod
def CheckSupportedFormat(cls, path):
"""Checks if the attribute container store format is supported.
Args:
path (str): path to the attribute container store.
Returns:
bool: True if the format is supported.
"""
# Check if the path is an existing file, to prevent sqlite3 creating
# an emtpy database file.
if not os.path.isfile(path):
return False
try:
connection = sqlite3.connect(
path, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
cursor = connection.cursor()
query = 'SELECT * FROM metadata'
cursor.execute(query)
metadata_values = {row[0]: row[1] for row in cursor.fetchall()}
format_version = metadata_values.get('format_version', None)
if format_version:
try:
format_version = int(format_version, 10)
result = True
except (TypeError, ValueError):
pass
connection.close()
except (IOError, TypeError, ValueError, sqlite3.DatabaseError):
result = False
return result
def Close(self):
"""Closes the file.
Raises:
IOError: if the attribute container store is already closed.
OSError: if the attribute container store is already closed.
"""
if not self._is_open:
raise IOError('Storage file already closed.')
if self._connection:
self._Flush()
self._connection.close()
self._connection = None
self._cursor = None
self._is_open = False
def GetAttributeContainerByIdentifier(self, container_type, identifier):
"""Retrieves a specific type of container with a specific identifier.
Args:
container_type (str): container type.
identifier (AttributeContainerIdentifier): attribute container identifier.
Returns:
AttributeContainer: attribute container or None if not available.
Raises:
IOError: when the store is closed or if an unsupported attribute
container is provided.
OSError: when the store is closed or if an unsupported attribute
container is provided.
"""
return self.GetAttributeContainerByIndex(
container_type, identifier.sequence_number - 1)
def GetAttributeContainerByIndex(self, container_type, index):
"""Retrieves a specific attribute container.
Args:
container_type (str): attribute container type.
index (int): attribute container index.
Returns:
AttributeContainer: attribute container or None if not available.
Raises:
IOError: when there is an error querying the attribute container store
or if an unsupported attribute container is provided.
OSError: when there is an error querying the attribute container store
or if an unsupported attribute container is provided.
"""
container = self._GetCachedAttributeContainer(container_type, index)
if container:
return container
write_cache = self._write_cache.get(container_type, [])
if len(write_cache) > 1:
self._FlushWriteCache(container_type, write_cache)
del self._write_cache[container_type]
schema = self._GetAttributeContainerSchema(container_type)
if not schema:
raise IOError(f'Unsupported attribute container type: {container_type:s}')
column_names = sorted(schema.keys())
column_names_string = ', '.join(column_names)
row_number = index + 1
query = (f'SELECT {column_names_string:s} FROM {container_type:s} WHERE '
f'rowid = {row_number:d}')
try:
self._cursor.execute(query)
except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception:
raise IOError((
f'Unable to query attribute container store with error: '
f'{exception!s}'))
if self._storage_profiler:
self._storage_profiler.StartTiming('get_container_by_index')
try:
row = self._cursor.fetchone()
finally:
if self._storage_profiler:
self._storage_profiler.StopTiming('get_container_by_index')
if not row:
return None
container = self._CreatetAttributeContainerFromRow(
container_type, column_names, row, 0)
identifier = containers_interface.AttributeContainerIdentifier(
name=container_type, sequence_number=row_number)
container.SetIdentifier(identifier)
self._CacheAttributeContainerByIndex(container, index)
return container
def GetAttributeContainers(self, container_type, filter_expression=None):
"""Retrieves a specific type of stored attribute containers.
Args:
container_type (str): attribute container type.
filter_expression (Optional[str]): expression to filter the resulting
attribute containers by.
Returns:
generator(AttributeContainer): attribute container generator.
Raises:
IOError: when there is an error querying the attribute container store
or if an unsupported attribute container is provided.
OSError: when there is an error querying the attribute container store
or if an unsupported attribute container is provided.
"""
schema = self._GetAttributeContainerSchema(container_type)
if not schema:
raise IOError(f'Unsupported attribute container type: {container_type:s}')
column_names = sorted(schema.keys())
sql_filter_expression = None
if filter_expression:
expression_ast = ast.parse(filter_expression, mode='eval')
sql_filter_expression = PythonAST2SQL(expression_ast.body)
return self._GetAttributeContainersWithFilter(
container_type, column_names=column_names,
filter_expression=sql_filter_expression)
def GetNumberOfAttributeContainers(self, container_type):
"""Retrieves the number of a specific type of attribute containers.
Args:
container_type (str): attribute container type.
Returns:
int: the number of containers of a specified type.
Raises:
IOError: when there is an error querying the attribute container store.
OSError: when there is an error querying the attribute container store.
"""
if not self._HasTable(container_type):
return 0
write_cache = self._write_cache.get(container_type, [])
if len(write_cache) > 1:
self._FlushWriteCache(container_type, write_cache)
del self._write_cache[container_type]
# Note that this is SQLite specific, and will give inaccurate results if
# there are DELETE commands run on the table. acstore does not run any
# DELETE commands.
query = f'SELECT MAX(_ROWID_) FROM {container_type:s} LIMIT 1'
try:
self._cursor.execute(query)
except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception:
raise IOError((
f'Unable to query attribute container store with error: '
f'{exception!s}'))
row = self._cursor.fetchone()
if not row:
return 0
return row[0] or 0
def HasAttributeContainers(self, container_type):
"""Determines if store contains a specific type of attribute containers.
Args:
container_type (str): attribute container type.
Returns:
bool: True if the store contains the specified type of attribute
containers.
Raises:
IOError: when there is an error querying the attribute container store.
OSError: when there is an error querying the attribute container store.
"""
count = self.GetNumberOfAttributeContainers(container_type)
return count > 0
def Open(self, path=None, read_only=True, **unused_kwargs): # pylint: disable=arguments-differ
"""Opens the store.
Args:
path (Optional[str]): path to the attribute container store.
read_only (Optional[bool]): True if the file should be opened in
read-only mode.
Raises:
IOError: if the attribute container store is already opened or if
the database cannot be connected.
OSError: if the attribute container store is already opened or if
the database cannot be connected.
ValueError: if path is missing.
"""
if self._is_open:
raise IOError('Storage file already opened.')
if not path:
raise ValueError('Missing path.')
path = os.path.abspath(path)
try:
path_uri = pathlib.Path(path).as_uri()
if read_only:
path_uri = f'{path_uri:s}?mode=ro'
except ValueError:
path_uri = None
detect_types = sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES
if path_uri:
connection = sqlite3.connect(
path_uri, detect_types=detect_types, isolation_level='DEFERRED',
uri=True)
else:
connection = sqlite3.connect(
path, detect_types=detect_types, isolation_level='DEFERRED')
try:
# Use in-memory journaling mode to reduce IO.
connection.execute('PRAGMA journal_mode=MEMORY')
# Turn off insert transaction integrity since we want to do bulk insert.
connection.execute('PRAGMA synchronous=OFF')
except (sqlite3.InterfaceError, sqlite3.OperationalError) as exception:
raise IOError((
f'Unable to query attribute container store with error: '
f'{exception!s}'))
cursor = connection.cursor()
if not cursor:
return
self._connection = connection
self._cursor = cursor
self._is_open = True
self._read_only = read_only
if read_only:
self._ReadAndCheckStorageMetadata(check_readable_only=True)
else:
if not self._HasTable('metadata'):
self._WriteMetadata()
else:
self._ReadAndCheckStorageMetadata()
# Update the storage metadata format version in case we are adding
# new format features that are not backwards compatible.
self._UpdateStorageMetadataFormatVersion()
# TODO: create table on demand.
for container_type in self._containers_manager.GetContainerTypes():
if not self._HasTable(container_type):
self._CreateAttributeContainerTable(container_type)
self._connection.commit()
environment:
matrix:
- DESCRIPTION: "Windows with 32-bit Python 3.10"
MACHINE_TYPE: "x86"
APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019
PYTHON: "C:\\Python310"
PYTHON_VERSION: "3.10"
L2TBINARIES_TRACK: "dev"
- DESCRIPTION: "Windows with 64-bit Python 3.10"
MACHINE_TYPE: "amd64"
APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019
PYTHON: "C:\\Python310-x64"
PYTHON_VERSION: "3.10"
L2TBINARIES_TRACK: "dev"
- DESCRIPTION: "Mac OS with Python 3.11"
APPVEYOR_BUILD_WORKER_IMAGE: macos-monterey
HOMEBREW_NO_INSTALL_CLEANUP: 1
install:
- cmd: "%PYTHON%\\python.exe -m pip install -U pip setuptools twine wheel"
- cmd: "%PYTHON%\\python.exe -m pip install pywin32 WMI"
- cmd: "%PYTHON%\\python.exe %PYTHON%\\Scripts\\pywin32_postinstall.py -install"
- ps: If ($isWindows) { .\config\appveyor\install.ps1 }
- sh: config/appveyor/install.sh
build_script:
- cmd: "%PYTHON%\\python.exe setup.py bdist_wheel"
test_script:
- cmd: "%PYTHON%\\python.exe run_tests.py"
- cmd: IF EXIST "tests\\end-to-end.py" (
set PYTHONPATH=. &&
"%PYTHON%\\python.exe" "tests\\end-to-end.py" --debug -c "config\\end-to-end.ini" )
- sh: config/appveyor/runtests.sh
artifacts:
- path: dist\*.whl
# Script to set up tests on AppVeyor Windows.
$Dependencies = ""
$Dependencies = ${Dependencies} -split " "
$Output = Invoke-Expression -Command "git clone https://github.com/log2timeline/l2tdevtools.git ..\l2tdevtools 2>&1"
Write-Host (${Output} | Out-String)
If ($env:APPVEYOR_REPO_BRANCH -eq "main")
{
$Track = "stable"
}
Else
{
$Track = $env:APPVEYOR_REPO_BRANCH
}
New-Item -ItemType "directory" -Name "dependencies"
$env:PYTHONPATH = "..\l2tdevtools"
$Output = Invoke-Expression -Command "& '${env:PYTHON}\python.exe' ..\l2tdevtools\tools\update.py --download-directory dependencies --machine-type ${env:MACHINE_TYPE} --msi-targetdir ${env:PYTHON} --track ${env:L2TBINARIES_TRACK} ${Dependencies} 2>&1"
Write-Host (${Output} | Out-String)
# Script to set up tests on AppVeyor MacOS.
set -e
brew update -q
brew install -q gettext gnu-sed python@3.11 tox || true
#!/bin/sh
# Script to run tests
# Set the following environment variables to build libyal with gettext.
export CPPFLAGS="-I/usr/local/include -I/usr/local/opt/gettext/include ${CPPFLAGS}";
export LDFLAGS="-L/usr/local/lib -L/usr/local/opt/gettext/lib ${LDFLAGS}";
# Set the following environment variables to build pycrypto and yara-python.
export CPPFLAGS="-I/usr/local/opt/openssl@1.1/include ${CPPFLAGS}";
export LDFLAGS="-L/usr/local/opt/openssl@1.1/lib ${LDFLAGS}";
# Set the following environment variables to ensure tox can find Python 3.11.
export PATH="/usr/local/opt/python@3.11/bin:${PATH}";
tox -e py311
# -*- coding: utf-8 -*-
"""Sphinx build configuration file."""
import os
import sys
from sphinx.ext import apidoc
from docutils import nodes
from docutils import transforms
# Change PYTHONPATH to include acstore module and dependencies.
sys.path.insert(0, os.path.abspath('..'))
import acstore # pylint: disable=wrong-import-position
import utils.dependencies # pylint: disable=wrong-import-position
# -- General configuration ------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
needs_sphinx = '2.0.1'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'recommonmark',
'sphinx.ext.autodoc',
'sphinx.ext.coverage',
'sphinx.ext.doctest',
'sphinx.ext.napoleon',
'sphinx.ext.viewcode',
'sphinx_markdown_tables',
'sphinx_rtd_theme',
]
# We cannot install architecture dependent Python modules on readthedocs,
# therefore we mock most imports.
pip_installed_modules = set()
dependency_helper = utils.dependencies.DependencyHelper(
dependencies_file=os.path.join('..', 'dependencies.ini'),
test_dependencies_file=os.path.join('..', 'test_dependencies.ini'))
modules_to_mock = set(dependency_helper.dependencies.keys())
modules_to_mock = modules_to_mock.difference(pip_installed_modules)
autodoc_mock_imports = sorted(modules_to_mock)
# Options for the Sphinx Napoleon extension, which reads Google-style
# docstrings.
napoleon_google_docstring = True
napoleon_numpy_docstring = False
napoleon_include_private_with_doc = False
napoleon_include_special_with_doc = True
# General information about the project.
# pylint: disable=redefined-builtin
project = 'ACStore'
copyright = 'The ACStore authors'
version = acstore.__version__
release = acstore.__version__
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
exclude_patterns = ['_build']
# The master toctree document.
master_doc = 'index'
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# -- Options for HTML output ----------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
html_theme = 'sphinx_rtd_theme'
# Output file base name for HTML help builder.
htmlhelp_basename = 'acstoredoc'
# -- Options linkcheck ----------------------------------------------------
linkcheck_ignore = [
]
# -- Code to rewrite links for readthedocs --------------------------------
# This function is a Sphinx core event callback, the format of which is detailed
# here: https://www.sphinx-doc.org/en/master/extdev/appapi.html#events
# pylint: disable=unused-argument
def RunSphinxAPIDoc(app):
"""Runs sphinx-apidoc to auto-generate documentation.
Args:
app (sphinx.application.Sphinx): Sphinx application. Required by the
the Sphinx event callback API.
"""
current_directory = os.path.abspath(os.path.dirname(__file__))
module_path = os.path.join(current_directory, '..', 'acstore')
api_directory = os.path.join(current_directory, 'sources', 'api')
apidoc.main(['-o', api_directory, module_path, '--force'])
class MarkdownLinkFixer(transforms.Transform):
"""Transform definition to parse .md references to internal pages."""
default_priority = 1000
_URI_PREFIXES = []
def _FixLinks(self, node):
"""Corrects links to .md files not part of the documentation.
Args:
node (docutils.nodes.Node): docutils node.
Returns:
docutils.nodes.Node: docutils node, with correct URIs outside
of Markdown pages outside the documentation.
"""
if isinstance(node, nodes.reference) and 'refuri' in node:
reference_uri = node['refuri']
for uri_prefix in self._URI_PREFIXES:
if (reference_uri.startswith(uri_prefix) and not (
reference_uri.endswith('.asciidoc') or
reference_uri.endswith('.md'))):
node['refuri'] = reference_uri + '.md'
break
return node
def _Traverse(self, node):
"""Traverses the document tree rooted at node.
Args:
node (docutils.nodes.Node): docutils node.
"""
self._FixLinks(node)
for child_node in node.children:
self._Traverse(child_node)
# pylint: disable=arguments-differ
def apply(self):
"""Applies this transform on document tree."""
self._Traverse(self.document)
# pylint: invalid-name
def setup(app):
"""Called at Sphinx initialization.
Args:
app (sphinx.application.Sphinx): Sphinx application.
"""
# Triggers sphinx-apidoc to generate API documentation.
app.connect('builder-inited', RunSphinxAPIDoc)
app.add_config_value(
'recommonmark_config', {'enable_auto_toc_tree': True}, True)
app.add_transform(MarkdownLinkFixer)
Welcome to the ACStore documentation
====================================
ACStore, or Attribute Container Storage, provides a stand-alone implementation
to read and write attribute container storage files.
The source code is available from the `project page <https://github.com/log2timeline/acstore>`__.
.. toctree::
:maxdepth: 2
sources/user/index
.. toctree::
:maxdepth: 2
API documentation <sources/api/acstore>
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
docutils
Markdown
recommonmark
sphinx >= 4.1.0, < 5.2.0
sphinx-markdown-tables
sphinx-rtd-theme >= 0.5.1
acstore.containers package
==========================
Submodules
----------
acstore.containers.interface module
-----------------------------------
.. automodule:: acstore.containers.interface
:members:
:undoc-members:
:show-inheritance:
acstore.containers.manager module
---------------------------------
.. automodule:: acstore.containers.manager
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: acstore.containers
:members:
:undoc-members:
:show-inheritance:
acstore package
===============
Subpackages
-----------
.. toctree::
:maxdepth: 4
acstore.containers
Submodules
----------
acstore.fake\_store module
--------------------------
.. automodule:: acstore.fake_store
:members:
:undoc-members:
:show-inheritance:
acstore.interface module
------------------------
.. automodule:: acstore.interface
:members:
:undoc-members:
:show-inheritance:
acstore.sqlite\_store module
----------------------------
.. automodule:: acstore.sqlite_store
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: acstore
:members:
:undoc-members:
:show-inheritance:
acstore
=======
.. toctree::
:maxdepth: 4
acstore
###############
Getting started
###############
To be able to use ACStore you first need to install it. There are multiple
ways to install ACStore, check the following instructions for more detail.
.. toctree::
:maxdepth: 2
Installation instructions <Installation-instructions>
# Installation instructions
## pip
**Note that using pip outside virtualenv is not recommended since it ignores
your systems package manager. If you aren't comfortable debugging package
installation issues, this is not the option for you.**
Create and activate a virtualenv:
```bash
virtualenv acstoreenv
cd acstoreenv
source ./bin/activate
```
Upgrade pip and install ACStore dependencies:
```bash
pip install --upgrade pip
pip install acstore
```
To deactivate the virtualenv run:
```bash
deactivate
```
## Ubuntu 18.04 and 20.04 LTS
To install ACStore from the [GIFT Personal Package Archive (PPA)](https://launchpad.net/~gift):
```bash
sudo add-apt-repository ppa:gift/stable
```
Update and install ACStore:
```bash
sudo apt-get update
sudo apt-get install python3-acstore
```
## Windows
The [l2tbinaries](https://github.com/log2timeline/l2tbinaries) contains the
necessary packages for running ACStore. l2tbinaries provides the following
branches:
* main; branch intended for the "packaged release" of ACStore and dependencies;
* dev; branch intended for the "development release" of ACStore;
* testing; branch intended for testing newly created packages.
The l2tdevtools project provides [an update script](https://github.com/log2timeline/l2tdevtools/wiki/Update-script)
to ease the process of keeping the dependencies up to date.
The script requires [pywin32](https://github.com/mhammond/pywin32/releases) and
[Python WMI](https://pypi.org/project/WMI).
To install the release versions of the dependencies run:
```
set PYTHONPATH=.
C:\Python3\python.exe tools\update.py --preset acstore
```
pip >= 7.0.0
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the attribute container interface."""
import unittest
from acstore.containers import interface
from tests import test_lib
class AttributeContainerIdentifierTest(test_lib.BaseTestCase):
"""Tests for the attribute container identifier."""
def testCopyToString(self):
"""Tests the CopyToString function."""
sequence_number = id(self)
identifier = interface.AttributeContainerIdentifier(
name='test_container', sequence_number=sequence_number)
identifier_string = identifier.CopyToString()
self.assertEqual(
identifier_string, f'test_container.{sequence_number:d}')
class AttributeContainerTest(test_lib.BaseTestCase):
"""Tests for the attribute container interface."""
# pylint: disable=protected-access
def testCopyToDict(self):
"""Tests the CopyToDict function."""
attribute_container = interface.AttributeContainer()
attribute_container.attribute_name = 'attribute_name'
attribute_container.attribute_value = 'attribute_value'
expected_dict = {
'attribute_name': 'attribute_name',
'attribute_value': 'attribute_value'}
test_dict = attribute_container.CopyToDict()
self.assertEqual(test_dict, expected_dict)
def testGetAttributeNames(self):
"""Tests the GetAttributeNames function."""
attribute_container = interface.AttributeContainer()
attribute_container._protected_attribute = 'protected'
attribute_container.attribute_name = 'attribute_name'
attribute_container.attribute_value = 'attribute_value'
expected_attribute_names = ['attribute_name', 'attribute_value']
attribute_names = sorted(attribute_container.GetAttributeNames())
self.assertEqual(attribute_names, expected_attribute_names)
attribute_container._SERIALIZABLE_PROTECTED_ATTRIBUTES = [
'_protected_attribute']
expected_attribute_names = [
'_protected_attribute', 'attribute_name', 'attribute_value']
attribute_names = sorted(attribute_container.GetAttributeNames())
self.assertEqual(attribute_names, expected_attribute_names)
def testGetAttributes(self):
"""Tests the GetAttributes function."""
attribute_container = interface.AttributeContainer()
attribute_container._protected_attribute = 'protected'
attribute_container.attribute_name = 'attribute_name'
attribute_container.attribute_value = 'attribute_value'
expected_attributes = [
('attribute_name', 'attribute_name'),
('attribute_value', 'attribute_value')]
attributes = sorted(attribute_container.GetAttributes())
self.assertEqual(attributes, expected_attributes)
attribute_container._SERIALIZABLE_PROTECTED_ATTRIBUTES = [
'_protected_attribute']
expected_attributes = [
('_protected_attribute', 'protected'),
('attribute_name', 'attribute_name'),
('attribute_value', 'attribute_value')]
attributes = sorted(attribute_container.GetAttributes())
self.assertEqual(attributes, expected_attributes)
def testGetAttributeValueHash(self):
"""Tests the GetAttributeValuesHash function."""
attribute_container = interface.AttributeContainer()
attribute_container._protected_attribute = 'protected'
attribute_container.attribute_name = 'attribute_name'
attribute_container.attribute_value = 'attribute_value'
attribute_values_hash1 = attribute_container.GetAttributeValuesHash()
attribute_container.attribute_value = 'changes'
attribute_values_hash2 = attribute_container.GetAttributeValuesHash()
self.assertNotEqual(attribute_values_hash1, attribute_values_hash2)
attribute_container.attribute_value = 'attribute_value'
attribute_container._SERIALIZABLE_PROTECTED_ATTRIBUTES = [
'_protected_attribute']
attribute_values_hash2 = attribute_container.GetAttributeValuesHash()
self.assertNotEqual(attribute_values_hash1, attribute_values_hash2)
def testGetAttributeValuesString(self):
"""Tests the GetAttributeValuesString function."""
attribute_container = interface.AttributeContainer()
attribute_container._protected_attribute = 'protected'
attribute_container.attribute_name = 'attribute_name'
attribute_container.attribute_value = 'attribute_value'
attribute_values_string1 = attribute_container.GetAttributeValuesString()
attribute_container.attribute_value = 'changes'
attribute_values_string2 = attribute_container.GetAttributeValuesString()
self.assertNotEqual(attribute_values_string1, attribute_values_string2)
attribute_container.attribute_value = 'attribute_value'
attribute_container._SERIALIZABLE_PROTECTED_ATTRIBUTES = [
'_protected_attribute']
attribute_values_string2 = attribute_container.GetAttributeValuesString()
self.assertNotEqual(attribute_values_string1, attribute_values_string2)
def testGetIdentifier(self):
"""Tests the GetIdentifier function."""
attribute_container = interface.AttributeContainer()
identifier = attribute_container.GetIdentifier()
self.assertIsNotNone(identifier)
def testMatchesExpression(self):
"""Tests the MatchesExpression function."""
attribute_container = interface.AttributeContainer()
attribute_container.name = 'value'
result = attribute_container.MatchesExpression('name == "value"')
self.assertTrue(result)
result = attribute_container.MatchesExpression('name == "bogus"')
self.assertFalse(result)
result = attribute_container.MatchesExpression('bogus')
self.assertFalse(result)
def testSetIdentifier(self):
"""Tests the SetIdentifier function."""
attribute_container = interface.AttributeContainer()
attribute_container.SetIdentifier(None)
if __name__ == '__main__':
unittest.main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the attribute container manager."""
import unittest
from acstore.containers import manager
from tests import test_lib as shared_test_lib
class AttributeContainersManagerTest(shared_test_lib.BaseTestCase):
"""Tests for the attribute container manager."""
# pylint: disable=protected-access
def testCreateAttributeContainer(self):
"""Tests the CreateAttributeContainer function."""
manager.AttributeContainersManager.RegisterAttributeContainer(
shared_test_lib.TestAttributeContainer)
try:
attribute_container = (
manager.AttributeContainersManager.CreateAttributeContainer(
'test_container'))
self.assertIsNotNone(attribute_container)
with self.assertRaises(ValueError):
manager.AttributeContainersManager.CreateAttributeContainer('bogus')
finally:
manager.AttributeContainersManager.DeregisterAttributeContainer(
shared_test_lib.TestAttributeContainer)
def testGetContainerTypes(self):
"""Tests the GetContainerTypes function."""
manager.AttributeContainersManager.RegisterAttributeContainer(
shared_test_lib.TestAttributeContainer)
try:
container_types = manager.AttributeContainersManager.GetContainerTypes()
self.assertIn('test_container', container_types)
finally:
manager.AttributeContainersManager.DeregisterAttributeContainer(
shared_test_lib.TestAttributeContainer)
def testGetSchema(self):
"""Tests the GetSchema function."""
manager.AttributeContainersManager.RegisterAttributeContainer(
shared_test_lib.TestAttributeContainer)
try:
schema = manager.AttributeContainersManager.GetSchema('test_container')
self.assertIsNotNone(schema)
self.assertEqual(schema, shared_test_lib.TestAttributeContainer.SCHEMA)
with self.assertRaises(ValueError):
manager.AttributeContainersManager.GetSchema('bogus')
finally:
manager.AttributeContainersManager.DeregisterAttributeContainer(
shared_test_lib.TestAttributeContainer)
def testAttributeContainerRegistration(self):
"""Tests the Register and DeregisterAttributeContainer functions."""
number_of_classes = len(
manager.AttributeContainersManager._attribute_container_classes)
manager.AttributeContainersManager.RegisterAttributeContainer(
shared_test_lib.TestAttributeContainer)
try:
self.assertEqual(
len(manager.AttributeContainersManager._attribute_container_classes),
number_of_classes + 1)
with self.assertRaises(KeyError):
manager.AttributeContainersManager.RegisterAttributeContainer(
shared_test_lib.TestAttributeContainer)
finally:
manager.AttributeContainersManager.DeregisterAttributeContainer(
shared_test_lib.TestAttributeContainer)
self.assertEqual(
len(manager.AttributeContainersManager._attribute_container_classes),
number_of_classes)
if __name__ == '__main__':
unittest.main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the fake (in-memory only) store."""
import unittest
from acstore import fake_store
from tests import test_lib
class FakeAttributeContainerStoreTest(test_lib.BaseTestCase):
"""Tests for the fake (in-memory only) store."""
# pylint: disable=protected-access
def testRaiseIfNotReadable(self):
"""Tests the _RaiseIfNotReadable function."""
test_store = fake_store.FakeAttributeContainerStore()
with self.assertRaises(IOError):
test_store._RaiseIfNotReadable()
def testRaiseIfNotWritable(self):
"""Tests the _RaiseIfNotWritable function."""
test_store = fake_store.FakeAttributeContainerStore()
with self.assertRaises(IOError):
test_store._RaiseIfNotWritable()
def testWriteExistingAttributeContainer(self):
"""Tests the _WriteExistingAttributeContainer function."""
attribute_container = test_lib.TestAttributeContainer()
test_store = fake_store.FakeAttributeContainerStore()
test_store.Open()
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 0)
with self.assertRaises(IOError):
test_store._WriteExistingAttributeContainer(attribute_container)
test_store._WriteNewAttributeContainer(attribute_container)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 1)
test_store._WriteExistingAttributeContainer(attribute_container)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 1)
test_store.Close()
def testWriteNewAttributeContainer(self):
"""Tests the _WriteNewAttributeContainer function."""
attribute_container = test_lib.TestAttributeContainer()
test_store = fake_store.FakeAttributeContainerStore()
test_store.Open()
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 0)
test_store._WriteNewAttributeContainer(attribute_container)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 1)
test_store.Close()
def testAddAttributeContainer(self):
"""Tests the AddAttributeContainer function."""
attribute_container = test_lib.TestAttributeContainer()
test_store = fake_store.FakeAttributeContainerStore()
test_store.Open()
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 0)
test_store.AddAttributeContainer(attribute_container)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 1)
test_store.Close()
with self.assertRaises(IOError):
test_store.AddAttributeContainer(attribute_container)
def testGetAttributeContainerByIdentifier(self):
"""Tests the GetAttributeContainerByIdentifier function."""
attribute_container = test_lib.TestAttributeContainer()
test_store = fake_store.FakeAttributeContainerStore()
test_store.Open()
test_store.AddAttributeContainer(attribute_container)
identifier = attribute_container.GetIdentifier()
container = test_store.GetAttributeContainerByIdentifier(
attribute_container.CONTAINER_TYPE, identifier)
self.assertIsNotNone(container)
identifier.sequence_number = 99
container = test_store.GetAttributeContainerByIdentifier(
attribute_container.CONTAINER_TYPE, identifier)
self.assertIsNone(container)
test_store.Close()
def testGetAttributeContainerByIndex(self):
"""Tests the GetAttributeContainerByIndex function."""
attribute_container = test_lib.TestAttributeContainer()
test_store = fake_store.FakeAttributeContainerStore()
test_store.Open()
container = test_store.GetAttributeContainerByIndex(
attribute_container.CONTAINER_TYPE, 0)
self.assertIsNone(container)
test_store.AddAttributeContainer(attribute_container)
container = test_store.GetAttributeContainerByIndex(
attribute_container.CONTAINER_TYPE, 0)
self.assertIsNotNone(container)
test_store.Close()
def testGetAttributeContainers(self):
"""Tests the GetAttributeContainers function."""
attribute_container = test_lib.TestAttributeContainer()
attribute_container.attribute = '8f0bf95a7959baad9666b21a7feed79d'
test_store = fake_store.FakeAttributeContainerStore()
test_store.Open()
containers = list(test_store.GetAttributeContainers(
attribute_container.CONTAINER_TYPE))
self.assertEqual(len(containers), 0)
test_store.AddAttributeContainer(attribute_container)
containers = list(test_store.GetAttributeContainers(
attribute_container.CONTAINER_TYPE))
self.assertEqual(len(containers), 1)
filter_expression = 'attribute == "8f0bf95a7959baad9666b21a7feed79d"'
containers = list(test_store.GetAttributeContainers(
attribute_container.CONTAINER_TYPE,
filter_expression=filter_expression))
self.assertEqual(len(containers), 1)
filter_expression = 'attribute != "8f0bf95a7959baad9666b21a7feed79d"'
containers = list(test_store.GetAttributeContainers(
attribute_container.CONTAINER_TYPE,
filter_expression=filter_expression))
self.assertEqual(len(containers), 0)
test_store.Close()
def testGetNumberOfAttributeContainers(self):
"""Tests the GetNumberOfAttributeContainers function."""
attribute_container = test_lib.TestAttributeContainer()
test_store = fake_store.FakeAttributeContainerStore()
test_store.Open()
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 0)
test_store.AddAttributeContainer(attribute_container)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 1)
test_store.Close()
def testHasAttributeContainers(self):
"""Tests the HasAttributeContainers function."""
attribute_container = test_lib.TestAttributeContainer()
test_store = fake_store.FakeAttributeContainerStore()
test_store.Open()
result = test_store.HasAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertFalse(result)
test_store.AddAttributeContainer(attribute_container)
result = test_store.HasAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertTrue(result)
test_store.Close()
def testOpenClose(self):
"""Tests the Open and Close functions."""
test_store = fake_store.FakeAttributeContainerStore()
test_store.Open()
test_store.Close()
test_store.Open()
test_store.Close()
test_store = fake_store.FakeAttributeContainerStore()
test_store.Open()
test_store.Close()
test_store.Open()
with self.assertRaises(IOError):
test_store.Open()
test_store.Close()
with self.assertRaises(IOError):
test_store.Close()
def testUpdateAttributeContainer(self):
"""Tests the UpdateAttributeContainer function."""
attribute_container = test_lib.TestAttributeContainer()
test_store = fake_store.FakeAttributeContainerStore()
test_store.Open()
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 0)
with self.assertRaises(IOError):
test_store.UpdateAttributeContainer(attribute_container)
test_store.AddAttributeContainer(attribute_container)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 1)
test_store.UpdateAttributeContainer(attribute_container)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 1)
test_store.Close()
if __name__ == '__main__':
unittest.main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the attribute container store interface."""
import unittest
from acstore import interface
from acstore.containers import manager
from tests import test_lib
class AttributeContainerStoreTest(test_lib.BaseTestCase):
"""Tests for the attribute container store interface."""
# pylint: disable=protected-access
def testGetAttributeContainerNextSequenceNumber(self):
"""Tests the _GetAttributeContainerNextSequenceNumber function."""
attribute_container = test_lib.TestAttributeContainer()
test_store = interface.AttributeContainerStore()
sequence_number = test_store._GetAttributeContainerNextSequenceNumber(
attribute_container.CONTAINER_TYPE)
self.assertEqual(sequence_number, 1)
sequence_number = test_store._GetAttributeContainerNextSequenceNumber(
attribute_container.CONTAINER_TYPE)
self.assertEqual(sequence_number, 2)
def testGetAttributeContainerSchema(self):
"""Tests the _GetAttributeContainerSchema function."""
attribute_container = test_lib.TestAttributeContainer()
test_store = interface.AttributeContainerStore()
schema = test_store._GetAttributeContainerSchema(
attribute_container.CONTAINER_TYPE)
self.assertEqual(schema, {})
manager.AttributeContainersManager.RegisterAttributeContainer(
test_lib.TestAttributeContainer)
try:
schema = test_store._GetAttributeContainerSchema(
attribute_container.CONTAINER_TYPE)
self.assertEqual(schema, test_lib.TestAttributeContainer.SCHEMA)
finally:
manager.AttributeContainersManager.DeregisterAttributeContainer(
test_lib.TestAttributeContainer)
# TODO: add tests for _SetAttributeContainerNextSequenceNumber
def testSetStorageProfiler(self):
"""Tests the SetStorageProfiler function."""
test_store = interface.AttributeContainerStore()
test_store.SetStorageProfiler(None)
if __name__ == '__main__':
unittest.main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the SQLite-based attribute container store."""
import os
import unittest
from acstore import sqlite_store
from acstore.containers import manager as containers_manager
from tests import test_lib
class _TestSQLiteAttributeContainerStoreV20220716(
sqlite_store.SQLiteAttributeContainerStore):
"""Test class for testing format compatibility checks."""
_FORMAT_VERSION = 20220716
_APPEND_COMPATIBLE_FORMAT_VERSION = 20211121
_UPGRADE_COMPATIBLE_FORMAT_VERSION = 20211121
_READ_COMPATIBLE_FORMAT_VERSION = 20211121
class _TestSQLiteAttributeContainerStoreV20221023(
sqlite_store.SQLiteAttributeContainerStore):
"""Test class for testing format compatibility checks."""
_FORMAT_VERSION = 20221023
_APPEND_COMPATIBLE_FORMAT_VERSION = 20221023
_UPGRADE_COMPATIBLE_FORMAT_VERSION = 20221023
_READ_COMPATIBLE_FORMAT_VERSION = 20211121
# TODO add tests for PythonAST2SQL.
class SQLiteAttributeContainerStoreTest(test_lib.BaseTestCase):
"""Tests for the SQLite-based storage file object."""
# pylint: disable=protected-access
def setUp(self):
"""Sets up the needed objects used throughout the test."""
containers_manager.AttributeContainersManager.RegisterAttributeContainer(
test_lib.TestAttributeContainer)
def tearDown(self):
"""Cleans up the needed objects used throughout the test."""
containers_manager.AttributeContainersManager.DeregisterAttributeContainer(
test_lib.TestAttributeContainer)
def testCacheAttributeContainerByIndex(self):
"""Tests the _CacheAttributeContainerByIndex function."""
attribute_container = test_lib.TestAttributeContainer()
with test_lib.TempDirectory():
test_store = sqlite_store.SQLiteAttributeContainerStore()
self.assertEqual(len(test_store._attribute_container_cache), 0)
test_store._CacheAttributeContainerByIndex(attribute_container, 0)
self.assertEqual(len(test_store._attribute_container_cache), 1)
def testCheckStorageMetadata(self):
"""Tests the _CheckStorageMetadata function."""
with test_lib.TempDirectory():
test_store = sqlite_store.SQLiteAttributeContainerStore()
metadata_values = {
'format_version': '{0:d}'.format(test_store._FORMAT_VERSION)}
test_store._CheckStorageMetadata(metadata_values)
metadata_values['format_version'] = 'bogus'
with self.assertRaises(IOError):
test_store._CheckStorageMetadata(metadata_values)
metadata_values['format_version'] = '1'
with self.assertRaises(IOError):
test_store._CheckStorageMetadata(metadata_values)
metadata_values['format_version'] = '{0:d}'.format(
test_store._FORMAT_VERSION)
def testCreateAttributeContainerTable(self):
"""Tests the _CreateAttributeContainerTable function."""
attribute_container = test_lib.TestAttributeContainer()
with test_lib.TempDirectory() as temp_directory:
test_path = os.path.join(temp_directory, 'acstore.sqlite')
test_store = sqlite_store.SQLiteAttributeContainerStore()
test_store.Open(path=test_path, read_only=False)
try:
with self.assertRaises(IOError):
test_store._CreateAttributeContainerTable(
attribute_container.CONTAINER_TYPE)
finally:
test_store.Close()
# TODO: add tests for _CreatetAttributeContainerFromRow
# TODO: add tests for _Flush
# TODO: add tests for _FlushWriteCache
def testGetAttributeContainersWithFilter(self):
"""Tests the _GetAttributeContainersWithFilter function."""
attribute_container = test_lib.TestAttributeContainer()
attribute_container.attribute = '8f0bf95a7959baad9666b21a7feed79d'
column_names = ['attribute']
with test_lib.TempDirectory() as temp_directory:
test_path = os.path.join(temp_directory, 'acstore.sqlite')
test_store = sqlite_store.SQLiteAttributeContainerStore()
test_store.Open(path=test_path, read_only=False)
try:
containers = list(test_store._GetAttributeContainersWithFilter(
attribute_container.CONTAINER_TYPE, column_names=column_names))
self.assertEqual(len(containers), 0)
test_store.AddAttributeContainer(attribute_container)
containers = list(test_store._GetAttributeContainersWithFilter(
attribute_container.CONTAINER_TYPE, column_names=column_names))
self.assertEqual(len(containers), 1)
filter_expression = 'attribute == "8f0bf95a7959baad9666b21a7feed79d"'
containers = list(test_store._GetAttributeContainersWithFilter(
attribute_container.CONTAINER_TYPE, column_names=column_names,
filter_expression=filter_expression))
self.assertEqual(len(containers), 1)
filter_expression = 'attribute != "8f0bf95a7959baad9666b21a7feed79d"'
containers = list(test_store._GetAttributeContainersWithFilter(
attribute_container.CONTAINER_TYPE, column_names=column_names,
filter_expression=filter_expression))
self.assertEqual(len(containers), 0)
with self.assertRaises(IOError):
list(test_store._GetAttributeContainersWithFilter(
'bogus', column_names=column_names))
finally:
test_store.Close()
def testGetCachedAttributeContainer(self):
"""Tests the _GetCachedAttributeContainer function."""
attribute_container = test_lib.TestAttributeContainer()
with test_lib.TempDirectory():
test_store = sqlite_store.SQLiteAttributeContainerStore()
cached_container = test_store._GetCachedAttributeContainer(
attribute_container.CONTAINER_TYPE, 1)
self.assertIsNone(cached_container)
test_store._CacheAttributeContainerByIndex(attribute_container, 1)
cached_container = test_store._GetCachedAttributeContainer(
attribute_container.CONTAINER_TYPE, 1)
self.assertIsNotNone(cached_container)
def testHasTable(self):
"""Tests the _HasTable function."""
with test_lib.TempDirectory() as temp_directory:
test_path = os.path.join(temp_directory, 'acstore.sqlite')
test_store = sqlite_store.SQLiteAttributeContainerStore()
test_store.Open(path=test_path, read_only=False)
try:
result = test_store._HasTable('test_container')
self.assertTrue(result)
result = test_store._HasTable('bogus')
self.assertFalse(result)
finally:
test_store.Close()
def testRaiseIfNotReadable(self):
"""Tests the _RaiseIfNotReadable function."""
test_store = sqlite_store.SQLiteAttributeContainerStore()
with self.assertRaises(IOError):
test_store._RaiseIfNotReadable()
def testRaiseIfNotWritable(self):
"""Tests the _RaiseIfNotWritable function."""
test_store = sqlite_store.SQLiteAttributeContainerStore()
with self.assertRaises(IOError):
test_store._RaiseIfNotWritable()
# TODO: add tests for _ReadAndCheckStorageMetadata
# TODO: add tests for _ReadMetadata
# TODO: add tests for _UpdateStorageMetadataFormatVersion
def testWriteExistingAttributeContainer(self):
"""Tests the _WriteExistingAttributeContainer function."""
attribute_container = test_lib.TestAttributeContainer()
with test_lib.TempDirectory() as temp_directory:
test_path = os.path.join(temp_directory, 'acstore.sqlite')
test_store = sqlite_store.SQLiteAttributeContainerStore()
test_store.Open(path=test_path, read_only=False)
try:
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 0)
test_store._WriteNewAttributeContainer(attribute_container)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 1)
test_store._WriteExistingAttributeContainer(attribute_container)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 1)
finally:
test_store.Close()
# TODO: add tests for _WriteMetadata
# TODO: add tests for _WriteMetadataValue
def testWriteNewAttributeContainer(self):
"""Tests the _WriteNewAttributeContainer function."""
attribute_container = test_lib.TestAttributeContainer()
with test_lib.TempDirectory() as temp_directory:
test_path = os.path.join(temp_directory, 'acstore.sqlite')
test_store = sqlite_store.SQLiteAttributeContainerStore()
test_store.Open(path=test_path, read_only=False)
try:
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 0)
test_store._WriteNewAttributeContainer(attribute_container)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 1)
finally:
test_store.Close()
def testAddAttributeContainer(self):
"""Tests the AddAttributeContainer function."""
attribute_container = test_lib.TestAttributeContainer()
with test_lib.TempDirectory() as temp_directory:
test_path = os.path.join(temp_directory, 'acstore.sqlite')
test_store = sqlite_store.SQLiteAttributeContainerStore()
test_store.Open(path=test_path, read_only=False)
try:
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 0)
test_store.AddAttributeContainer(attribute_container)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 1)
finally:
test_store.Close()
with self.assertRaises(IOError):
test_store.AddAttributeContainer(attribute_container)
# TODO: add tests for CheckSupportedFormat
def testGetAttributeContainerByIdentifier(self):
"""Tests the GetAttributeContainerByIdentifier function."""
attribute_container = test_lib.TestAttributeContainer()
with test_lib.TempDirectory() as temp_directory:
test_path = os.path.join(temp_directory, 'acstore.sqlite')
test_store = sqlite_store.SQLiteAttributeContainerStore()
test_store.Open(path=test_path, read_only=False)
try:
test_store.AddAttributeContainer(attribute_container)
identifier = attribute_container.GetIdentifier()
container = test_store.GetAttributeContainerByIdentifier(
attribute_container.CONTAINER_TYPE, identifier)
self.assertIsNotNone(container)
identifier.sequence_number = 99
container = test_store.GetAttributeContainerByIdentifier(
attribute_container.CONTAINER_TYPE, identifier)
self.assertIsNone(container)
finally:
test_store.Close()
def testGetAttributeContainerByIndex(self):
"""Tests the GetAttributeContainerByIndex function."""
attribute_container = test_lib.TestAttributeContainer()
with test_lib.TempDirectory() as temp_directory:
test_path = os.path.join(temp_directory, 'acstore.sqlite')
test_store = sqlite_store.SQLiteAttributeContainerStore()
test_store.Open(path=test_path, read_only=False)
try:
container = test_store.GetAttributeContainerByIndex(
attribute_container.CONTAINER_TYPE, 0)
self.assertIsNone(container)
test_store.AddAttributeContainer(attribute_container)
container = test_store.GetAttributeContainerByIndex(
attribute_container.CONTAINER_TYPE, 0)
self.assertIsNotNone(container)
with self.assertRaises(IOError):
test_store.GetAttributeContainerByIndex('bogus', 0)
finally:
test_store.Close()
def testGetAttributeContainers(self):
"""Tests the GetAttributeContainers function."""
attribute_container = test_lib.TestAttributeContainer()
attribute_container.attribute = '8f0bf95a7959baad9666b21a7feed79d'
with test_lib.TempDirectory() as temp_directory:
test_path = os.path.join(temp_directory, 'acstore.sqlite')
test_store = sqlite_store.SQLiteAttributeContainerStore()
test_store.Open(path=test_path, read_only=False)
try:
containers = list(test_store.GetAttributeContainers(
attribute_container.CONTAINER_TYPE))
self.assertEqual(len(containers), 0)
test_store.AddAttributeContainer(attribute_container)
containers = list(test_store.GetAttributeContainers(
attribute_container.CONTAINER_TYPE))
self.assertEqual(len(containers), 1)
filter_expression = 'attribute == "8f0bf95a7959baad9666b21a7feed79d"'
containers = list(test_store.GetAttributeContainers(
attribute_container.CONTAINER_TYPE,
filter_expression=filter_expression))
self.assertEqual(len(containers), 1)
filter_expression = 'attribute != "8f0bf95a7959baad9666b21a7feed79d"'
containers = list(test_store.GetAttributeContainers(
attribute_container.CONTAINER_TYPE,
filter_expression=filter_expression))
self.assertEqual(len(containers), 0)
with self.assertRaises(IOError):
list(test_store.GetAttributeContainers('bogus'))
finally:
test_store.Close()
def testGetNumberOfAttributeContainers(self):
"""Tests the GetNumberOfAttributeContainers function."""
attribute_container = test_lib.TestAttributeContainer()
with test_lib.TempDirectory() as temp_directory:
test_path = os.path.join(temp_directory, 'acstore.sqlite')
test_store = sqlite_store.SQLiteAttributeContainerStore()
test_store.Open(path=test_path, read_only=False)
try:
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 0)
test_store.AddAttributeContainer(attribute_container)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 1)
# Test for a supported container type that does not have a table
# present in the storage file.
query = 'DROP TABLE {0:s}'.format(attribute_container.CONTAINER_TYPE)
test_store._cursor.execute(query)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 0)
finally:
test_store.Close()
def testHasAttributeContainers(self):
"""Tests the HasAttributeContainers function."""
attribute_container = test_lib.TestAttributeContainer()
with test_lib.TempDirectory() as temp_directory:
test_path = os.path.join(temp_directory, 'acstore.sqlite')
test_store = sqlite_store.SQLiteAttributeContainerStore()
test_store.Open(path=test_path, read_only=False)
try:
result = test_store.HasAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertFalse(result)
test_store.AddAttributeContainer(attribute_container)
result = test_store.HasAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertTrue(result)
result = test_store.HasAttributeContainers('bogus')
self.assertFalse(result)
finally:
test_store.Close()
# TODO: add tests for Open and Close
def testUpdateAttributeContainer(self):
"""Tests the UpdateAttributeContainer function."""
attribute_container = test_lib.TestAttributeContainer()
with test_lib.TempDirectory() as temp_directory:
test_path = os.path.join(temp_directory, 'acstore.sqlite')
test_store = sqlite_store.SQLiteAttributeContainerStore()
test_store.Open(path=test_path, read_only=False)
try:
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 0)
test_store.AddAttributeContainer(attribute_container)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 1)
test_store.UpdateAttributeContainer(attribute_container)
number_of_containers = test_store.GetNumberOfAttributeContainers(
attribute_container.CONTAINER_TYPE)
self.assertEqual(number_of_containers, 1)
finally:
test_store.Close()
def testVersionCompatibility(self):
"""Tests the version compatibility methods."""
with test_lib.TempDirectory() as temp_directory:
v1_storage_path = os.path.join(temp_directory, 'v20220716.sqlite')
v1_test_store = _TestSQLiteAttributeContainerStoreV20220716()
v1_test_store.Open(path=v1_storage_path, read_only=False)
v1_test_store.Close()
v2_test_store_rw = _TestSQLiteAttributeContainerStoreV20221023()
with self.assertRaises((IOError, OSError)):
v2_test_store_rw.Open(path=v1_storage_path, read_only=False)
v2_test_store_ro = _TestSQLiteAttributeContainerStoreV20221023()
v2_test_store_ro.Open(path=v1_storage_path, read_only=True)
v2_test_store_ro.Close()
if __name__ == '__main__':
unittest.main()
# -*- coding: utf-8 -*-
"""Functions and classes for testing."""
import shutil
import tempfile
import unittest
from acstore.containers import interface as containers_interface
class TestAttributeContainer(containers_interface.AttributeContainer):
"""Attribute container for testing purposes.
Attributes:
attribute (str): attribute for testing purposes.
"""
CONTAINER_TYPE = 'test_container'
SCHEMA = {'attribute': 'str'}
def __init__(self):
"""Initializes an attribute container."""
super(TestAttributeContainer, self).__init__()
self.attribute = None
class BaseTestCase(unittest.TestCase):
"""The base test case."""
# Show full diff results.
maxDiff = None
class TempDirectory(object):
"""Class that implements a temporary directory."""
def __init__(self):
"""Initializes a temporary directory."""
super(TempDirectory, self).__init__()
self.name = ''
def __enter__(self):
"""Make this work with the 'with' statement."""
self.name = tempfile.mkdtemp()
return self.name
def __exit__(self, exception_type, value, traceback):
"""Make this work with the 'with' statement."""
shutil.rmtree(self.name, True)
[tox]
envlist = py3{7,8,9,10,11},coverage,docs,lint
[testenv]
allowlist_externals = ./run_tests.py
pip_pre = True
passenv =
CFLAGS
CPPFLAGS
LDFLAGS
setenv =
PYTHONPATH = {toxinidir}
deps =
-rrequirements.txt
-rtest_requirements.txt
coverage: coverage
commands =
py3{7,8,9,10,11}: ./run_tests.py
coverage: coverage erase
coverage: coverage run --source=acstore --omit="*_test*,*__init__*,*test_lib*" run_tests.py
[testenv:codecov]
skip_install = True
passenv =
CFLAGS
CPPFLAGS
GITHUB_ACTION
GITHUB_HEAD_REF
GITHUB_REF
GITHUB_REPOSITORY
GITHUB_RUN_ID
GITHUB_SHA
LDFLAGS
deps =
codecov < 2.1.10
commands =
codecov
[testenv:docs]
usedevelop = True
deps =
-rdocs/requirements.txt
commands =
sphinx-build -b html -d build/doctrees docs dist/docs
sphinx-build -b linkcheck docs dist/docs
[testenv:lint]
skipsdist = True
pip_pre = True
passenv =
CFLAGS
CPPFLAGS
LDFLAGS
setenv =
PYTHONPATH = {toxinidir}
deps =
-rrequirements.txt
-rtest_requirements.txt
pylint >= 2.14.0, < 2.15.0
commands =
pylint --version
# Ignore setup.py for now due to:
# setup.py:15:0: E0001: Cannot import 'distutils.command.bdist_msi' due to
# syntax error 'expected an indented block (<unknown>, line 347)' (syntax-error)
pylint --rcfile=.pylintrc acstore tests
#!/bin/bash
#
# Script that makes changes in preparation of a new release, such as updating
# the version and documentation.
EXIT_FAILURE=1;
EXIT_SUCCESS=0;
VERSION=`date -u +"%Y%m%d"`
DPKG_DATE=`date -R`
# Update the Python module version.
sed "s/__version__ = '[0-9]*'/__version__ = '${VERSION}'/" -i acstore/__init__.py
# Update the version in the dpkg configuration files.
cat > config/dpkg/changelog << EOT
acstore (${VERSION}-1) unstable; urgency=low
* Auto-generated
-- Log2Timeline maintainers <log2timeline-maintainers@googlegroups.com> ${DPKG_DATE}
EOT
# Regenerate the API documentation.
tox -edocs
exit ${EXIT_SUCCESS};
+10
-8

@@ -1,14 +0,16 @@

Metadata-Version: 1.1
Metadata-Version: 2.1
Name: acstore
Version: 20171013
Summary: Attribute Container Storage (ACStore)
Version: 20221230
Summary: Attribute Container Storage (ACStore).
Home-page: https://github.com/log2timeline/acstore
Author: dfDateTime development team
Author-email: log2timeline-dev@googlegroups.com
Maintainer: Log2Timeline maintainers
Maintainer-email: log2timeline-maintainers@googlegroups.com
License: Apache License, Version 2.0
Description: ACStore, or Attribute Container Storage, provides a stand-alone implementation to read and write plaso storage files.
Platform: UNKNOWN
Classifier: Development Status :: 3 - Alpha
Classifier:
Classifier: Environment :: Console
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Description-Content-Type: text/plain
License-File: LICENSE
ACStore, or Attribute Container Storage, provides a stand-alone implementation to read and write attribute container storage files.

@@ -0,1 +1,3 @@

.pylintrc
.style.yapf
ACKNOWLEDGEMENTS

@@ -6,11 +8,30 @@ AUTHORS

README
acstore.ini
appveyor.yml
dependencies.ini
requirements.txt
run_tests.py
setup.cfg
setup.py
test_dependencies.ini
test_requirements.txt
tox.ini
.github/workflows/test_docker.yml
.github/workflows/test_docs.yml
.github/workflows/test_tox.yml
acstore/__init__.py
acstore/fake_store.py
acstore/interface.py
acstore/sqlite_store.py
acstore.egg-info/PKG-INFO
acstore.egg-info/SOURCES.txt
acstore.egg-info/dependency_links.txt
acstore.egg-info/requires.txt
acstore.egg-info/top_level.txt
acstore/containers/__init__.py
acstore/containers/interface.py
acstore/containers/manager.py
config/appveyor/install.ps1
config/appveyor/install.sh
config/appveyor/runtests.sh
config/dpkg/changelog

@@ -25,5 +46,21 @@ config/dpkg/clean

config/dpkg/source/format
config/travis/install.sh
docs/conf.py
docs/index.rst
docs/requirements.txt
docs/sources/api/acstore.containers.rst
docs/sources/api/acstore.rst
docs/sources/api/modules.rst
docs/sources/user/Installation-instructions.md
docs/sources/user/index.rst
tests/__init__.py
tests/fake_store.py
tests/interface.py
tests/sqlite_store.py
tests/test_lib.py
tests/containers/__init__.py
tests/containers/interface.py
tests/containers/manager.py
utils/__init__.py
utils/check_dependencies.py
utils/dependencies.py
utils/dependencies.py
utils/update_release.sh

@@ -8,2 +8,2 @@ # -*- coding: utf-8 -*-

__version__ = '20171013'
__version__ = '20221230'
include ACKNOWLEDGEMENTS AUTHORS LICENSE README
include dependencies.ini run_tests.py utils/__init__.py utils/dependencies.py
include utils/check_dependencies.py
include requirements.txt test_requirements.txt
exclude .gitignore

@@ -5,0 +6,0 @@ exclude *.pyc

@@ -1,14 +0,16 @@

Metadata-Version: 1.1
Metadata-Version: 2.1
Name: acstore
Version: 20171013
Summary: Attribute Container Storage (ACStore)
Version: 20221230
Summary: Attribute Container Storage (ACStore).
Home-page: https://github.com/log2timeline/acstore
Author: dfDateTime development team
Author-email: log2timeline-dev@googlegroups.com
Maintainer: Log2Timeline maintainers
Maintainer-email: log2timeline-maintainers@googlegroups.com
License: Apache License, Version 2.0
Description: ACStore, or Attribute Container Storage, provides a stand-alone implementation to read and write plaso storage files.
Platform: UNKNOWN
Classifier: Development Status :: 3 - Alpha
Classifier:
Classifier: Environment :: Console
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Description-Content-Type: text/plain
License-File: LICENSE
ACStore, or Attribute Container Storage, provides a stand-alone implementation to read and write attribute container storage files.
+2
-3
ACStore, or Attribute Container Storage, provides a stand-alone implementation
to read and write plaso storage files.
to read and write Attribute Container stores, such as Plaso storage files.
For more information see:
* Project documentation: https://github.com/log2timeline/acstore/wiki/Home
* How to build from source: https://github.com/log2timeline/acstore/wiki/Building
* Project documentation: https://acstore.readthedocs.io/en/latest

@@ -1,2 +0,2 @@

#!/usr/bin/python
#!/usr/bin/env python
# -*- coding: utf-8 -*-

@@ -9,3 +9,3 @@ """Script to run the tests."""

# Change PYTHONPATH to include dependencies.
sys.path.insert(0, u'.')
sys.path.insert(0, '.')

@@ -12,0 +12,0 @@ import utils.dependencies # pylint: disable=wrong-import-position

@@ -0,1 +1,4 @@

[metadata]
license_files = LICENSE
[bdist_rpm]

@@ -8,4 +11,7 @@ release = 1

README
build_requires = python-setuptools
build_requires = python3-setuptools
[bdist_wheel]
universal = 1
[egg_info]

@@ -12,0 +18,0 @@ tag_build =

+93
-37

@@ -1,6 +0,7 @@

#!/usr/bin/python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Installation and deployment script."""
from __future__ import print_function
import os
import pkg_resources
import sys

@@ -23,5 +24,6 @@

if sys.version < '2.7':
print('Unsupported Python version: {0:s}.'.format(sys.version))
print('Supported Python versions are 2.7 or a later 2.x version.')
version_tuple = (sys.version_info[0], sys.version_info[1])
if version_tuple < (3, 7):
print(f'Unsupported Python version: {sys.version:s}, version 3.7 or higher '
f'required.')
sys.exit(1)

@@ -41,2 +43,3 @@

# pylint: disable=invalid-name
def run(self):

@@ -57,2 +60,3 @@ """Builds an MSI."""

# pylint: disable=invalid-name
def _make_spec_file(self):

@@ -70,8 +74,6 @@ """Generates the text of an RPM spec file.

if sys.version_info[0] < 3:
python_package = 'python'
else:
python_package = 'python3'
python_package = 'python3'
description = []
requires = ''
summary = ''

@@ -83,10 +85,11 @@ in_description = False

if line.startswith('Summary: '):
summary = line
summary = line[9:]
elif line.startswith('BuildRequires: '):
line = 'BuildRequires: {0:s}-setuptools'.format(python_package)
line = (f'BuildRequires: {python_package:s}-setuptools, '
f'{python_package:s}-devel')
elif line.startswith('Requires: '):
if python_package == 'python3':
line = line.replace('python', 'python3')
requires = line[10:]
continue

@@ -96,15 +99,30 @@ elif line.startswith('%description'):

elif line.startswith('python setup.py build'):
if python_package == 'python3':
line = '%py3_build'
else:
line = '%py2_build'
elif line.startswith('python setup.py install'):
if python_package == 'python3':
line = '%py3_install'
else:
line = '%py2_install'
elif line.startswith('%files'):
# Cannot use %{_libdir} here since it can expand to "lib64".
lines = [
'%files',
f'%files -n {python_package:s}-%{{name}}',
'%defattr(644,root,root,755)',
'%doc ACKNOWLEDGEMENTS AUTHORS LICENSE README',
'%{_prefix}/lib/python*/site-packages/acstore/*.py',
'%{_prefix}/lib/python*/site-packages/acstore*.egg-info/*',
'%exclude %{_prefix}/lib/python*/site-packages/acstore/*.pyc',
'%exclude %{_prefix}/lib/python*/site-packages/acstore/*.pyo',
('%exclude %{_prefix}/lib/python*/site-packages/acstore/'
'__pycache__/*')]
'%license LICENSE',
'%doc ACKNOWLEDGEMENTS AUTHORS README']
lines.extend([
'%{python3_sitelib}/acstore/*.py',
'%{python3_sitelib}/acstore/*/*.py',
'%{python3_sitelib}/acstore*.egg-info/*',
'',
'%exclude %{_prefix}/share/doc/*',
'%exclude %{python3_sitelib}/acstore/__pycache__/*',
'%exclude %{python3_sitelib}/acstore/*/__pycache__/*'])
python_spec_file.extend(lines)

@@ -116,8 +134,13 @@ break

python_spec_file.append(
'%package -n {0:s}-%{{name}}'.format(python_package))
python_spec_file.append('{0:s}'.format(summary))
python_spec_file.append('')
python_spec_file.append(
'%description -n {0:s}-%{{name}}'.format(python_package))
python_spec_file.append(f'%package -n {python_package:s}-%{{name}}')
python_summary = f'Python 3 module of {summary:s}'
if requires:
python_spec_file.append(f'Requires: {requires:s}')
python_spec_file.extend([
f'Summary: {python_summary:s}',
'',
f'%description -n {python_package:s}-%{{name}}'])
python_spec_file.extend(description)

@@ -137,9 +160,41 @@

def parse_requirements_from_file(path):
"""Parses requirements from a requirements file.
Args:
path (str): path to the requirements file.
Returns:
list[str]: name and optional version information of the required packages.
"""
requirements = []
if os.path.isfile(path):
with open(path, 'r') as file_object:
file_contents = file_object.read()
for requirement in pkg_resources.parse_requirements(file_contents):
try:
name = str(requirement.req)
except AttributeError:
name = str(requirement)
if not name.startswith('pip '):
requirements.append(name)
return requirements
acstore_description = (
'Attribute Container Storage (ACStore)')
'Attribute Container Storage (ACStore).')
acstore_long_description = (
'ACStore, or Attribute Container Storage, provides a stand-alone '
'implementation to read and write plaso storage files.')
'implementation to read and write attribute container storage files.')
command_classes = {}
if BdistMSICommand:
command_classes['bdist_msi'] = BdistMSICommand
if BdistRPMCommand:
command_classes['bdist_rpm'] = BdistRPMCommand
setup(

@@ -150,11 +205,10 @@ name='acstore',

long_description=acstore_long_description,
long_description_content_type='text/plain',
license='Apache License, Version 2.0',
url='https://github.com/log2timeline/acstore',
maintainer='dfDateTime development team',
maintainer_email='log2timeline-dev@googlegroups.com',
cmdclass={
'bdist_msi': BdistMSICommand,
'bdist_rpm': BdistRPMCommand},
maintainer='Log2Timeline maintainers',
maintainer_email='log2timeline-maintainers@googlegroups.com',
cmdclass=command_classes,
classifiers=[
'Development Status :: 3 - Alpha',
'',
'Environment :: Console',

@@ -165,3 +219,3 @@ 'Operating System :: OS Independent',

packages=find_packages('.', exclude=[
'examples', 'tests', 'tests.*', 'utils']),
'docs', 'tests', 'tests.*', 'utils']),
package_dir={

@@ -174,2 +228,4 @@ 'acstore': 'acstore'

],
install_requires=parse_requirements_from_file('requirements.txt'),
tests_require=parse_requirements_from_file('test_requirements.txt'),
)

@@ -1,2 +0,2 @@

#!/usr/bin/python
#!/usr/bin/env python
# -*- coding: utf-8 -*-

@@ -8,3 +8,3 @@ """Script to check for the availability and version of dependencies."""

# Change PYTHONPATH to include dependencies.
sys.path.insert(0, u'.')
sys.path.insert(0, '.')

@@ -14,5 +14,6 @@ import utils.dependencies # pylint: disable=wrong-import-position

if __name__ == u'__main__':
if __name__ == '__main__':
dependency_helper = utils.dependencies.DependencyHelper()
dependency_helper.CheckDependencies()
if not dependency_helper.CheckDependencies():
sys.exit(1)
# -*- coding: utf-8 -*-
"""Helper to check for availability and version of dependencies."""
from __future__ import print_function
from __future__ import unicode_literals
import configparser
import os
import re
try:
import ConfigParser as configparser
except ImportError:
import configparser # pylint: disable=import-error
class DependencyDefinition(object):

@@ -23,8 +17,15 @@ """Dependency definition.

the dependency.
maximum_version (str): maximum supported version.
minimum_version (str): minimum supported version.
maximum_version (str): maximum supported version, a greater or equal
version is not supported.
minimum_version (str): minimum supported version, a lesser version is
not supported.
name (str): name of (the Python module that provides) the dependency.
pypi_name (str): name of the PyPI package that provides the dependency.
python2_only (bool): True if the dependency is only supported by Python 2.
python3_only (bool): True if the dependency is only supported by Python 3.
rpm_name (str): name of the rpm package that provides the dependency.
skip_check (bool): True if the dependency should be skipped by the
CheckDependencies or CheckTestDependencies methods of DependencyHelper.
skip_requires (bool): True if the dependency should be excluded from
requirements.txt or setup.py install_requires.
version_property (str): name of the version attribute or function.

@@ -34,3 +35,3 @@ """

def __init__(self, name):
"""Initializes a dependency configuation.
"""Initializes a dependency configuration.

@@ -49,3 +50,6 @@ Args:

self.python2_only = False
self.python3_only = False
self.rpm_name = None
self.skip_check = None
self.skip_requires = None
self.version_property = None

@@ -65,3 +69,6 @@

'python2_only',
'python3_only',
'rpm_name',
'skip_check',
'skip_requires',
'version_property'])

@@ -83,3 +90,3 @@

except configparser.NoOptionError:
return
return None

@@ -95,4 +102,4 @@ def Read(self, file_object):

"""
config_parser = configparser.RawConfigParser()
config_parser.readfp(file_object) # pylint: disable=deprecated-method
config_parser = configparser.ConfigParser(interpolation=None)
config_parser.read_file(file_object)

@@ -109,22 +116,36 @@ for section_name in config_parser.sections():

class DependencyHelper(object):
"""Dependency helper."""
"""Dependency helper.
Attributes:
dependencies (dict[str, DependencyDefinition]): dependencies.
"""
_VERSION_NUMBERS_REGEX = re.compile(r'[0-9.]+')
_VERSION_SPLIT_REGEX = re.compile(r'\.|\-')
def __init__(self):
"""Initializes a dependency helper."""
def __init__(
self, dependencies_file='dependencies.ini',
test_dependencies_file='test_dependencies.ini'):
"""Initializes a dependency helper.
Args:
dependencies_file (Optional[str]): path to the dependencies configuration
file.
test_dependencies_file (Optional[str]): path to the test dependencies
configuration file.
"""
super(DependencyHelper, self).__init__()
self._dependencies = {}
self._test_dependencies = {}
self.dependencies = {}
dependency_reader = DependencyDefinitionReader()
with open('dependencies.ini', 'r') as file_object:
with open(dependencies_file, 'r', encoding='utf-8') as file_object:
for dependency in dependency_reader.Read(file_object):
self._dependencies[dependency.name] = dependency
self.dependencies[dependency.name] = dependency
dependency = DependencyDefinition('mock')
dependency.minimum_version = '0.7.1'
dependency.version_property = '__version__'
self._test_dependencies['mock'] = dependency
if os.path.exists(test_dependencies_file):
with open(test_dependencies_file, 'r', encoding='utf-8') as file_object:
for dependency in dependency_reader.Read(file_object):
self._test_dependencies[dependency.name] = dependency

@@ -138,3 +159,3 @@ def _CheckPythonModule(self, dependency):

Returns:
tuple: consists:
tuple: containing:

@@ -147,6 +168,5 @@ bool: True if the Python module is available and conforms to

if not module_object:
status_message = 'missing: {0:s}'.format(dependency.name)
return dependency.is_optional, status_message
return False, f'missing: {dependency.name:s}'
if not dependency.version_property or not dependency.minimum_version:
if not dependency.version_property:
return True, dependency.name

@@ -171,3 +191,3 @@

Returns:
tuple: consists:
tuple: containing:

@@ -188,65 +208,54 @@ bool: True if the Python module is available and conforms to

if not module_version:
status_message = (
'unable to determine version information for: {0:s}').format(
module_name)
return False, status_message
return False, (
f'unable to determine version information for: {module_name:s}')
# Make sure the module version is a string.
module_version = '{0!s}'.format(module_version)
module_version = f'{module_version!s}'
# Split the version string and convert every digit into an integer.
# A string compare of both version strings will yield an incorrect result.
module_version_map = list(
map(int, self._VERSION_SPLIT_REGEX.split(module_version)))
minimum_version_map = list(
map(int, self._VERSION_SPLIT_REGEX.split(minimum_version)))
if module_version_map < minimum_version_map:
status_message = (
'{0:s} version: {1!s} is too old, {2!s} or later required').format(
module_name, module_version, minimum_version)
return False, status_message
# Strip any semantic suffixes such as a1, b1, pre, post, rc, dev.
module_version = self._VERSION_NUMBERS_REGEX.findall(module_version)[0]
if maximum_version:
maximum_version_map = list(
map(int, self._VERSION_SPLIT_REGEX.split(maximum_version)))
if module_version_map > maximum_version_map:
status_message = (
'{0:s} version: {1!s} is too recent, {2!s} or earlier '
'required').format(module_name, module_version, maximum_version)
return False, status_message
if module_version[-1] == '.':
module_version = module_version[:-1]
status_message = '{0:s} version: {1!s}'.format(module_name, module_version)
return True, status_message
try:
module_version_map = list(
map(int, self._VERSION_SPLIT_REGEX.split(module_version)))
except ValueError:
return False, (
f'unable to parse module version: {module_name:s} {module_version:s}')
def _CheckSQLite3(self):
"""Checks the availability of sqlite3.
if minimum_version:
try:
minimum_version_map = list(
map(int, self._VERSION_SPLIT_REGEX.split(minimum_version)))
except ValueError:
return False, (
f'unable to parse minimum version: {module_name:s} '
f'{minimum_version:s}')
Returns:
tuple: consists:
if module_version_map < minimum_version_map:
return False, (
f'{module_name:s} version: {module_version!s} is too old, '
f'{minimum_version!s} or later required')
bool: True if the Python module is available and conforms to
the minimum required version, False otherwise.
str: status message.
"""
# On Windows sqlite3 can be provided by both pysqlite2.dbapi2 and
# sqlite3. sqlite3 is provided with the Python installation and
# pysqlite2.dbapi2 by the pysqlite2 Python module. Typically
# pysqlite2.dbapi2 would contain a newer version of sqlite3, hence
# we check for its presence first.
module_name = 'pysqlite2.dbapi2'
minimum_version = '3.7.8'
if maximum_version:
try:
maximum_version_map = list(
map(int, self._VERSION_SPLIT_REGEX.split(maximum_version)))
except ValueError:
return False, (
f'unable to parse maximum version: {module_name:s} '
f'{maximum_version:s}')
module_object = self._ImportPythonModule(module_name)
if not module_object:
module_name = 'sqlite3'
if module_version_map > maximum_version_map:
return False, (
f'{module_name:s} version: {module_version!s} is too recent, '
f'{maximum_version!s} or earlier required')
module_object = self._ImportPythonModule(module_name)
if not module_object:
status_message = 'missing: {0:s}.'.format(module_name)
return False, status_message
return True, f'{module_name:s} version: {module_version!s}'
return self._CheckPythonModuleVersion(
module_name, module_object, 'sqlite_version', minimum_version, None)
def _ImportPythonModule(self, module_name):

@@ -264,3 +273,3 @@ """Imports a Python module.

except ImportError:
return
return None

@@ -283,2 +292,3 @@ # If the module name contains dots get the upper most module object.

status_message (str): status message.
verbose_output (Optional[bool]): True if output should be verbose.
"""

@@ -291,6 +301,6 @@ if not result or dependency.is_optional:

print('{0:s}\t{1:s}.'.format(status_indicator, status_message))
print(f'{status_indicator:s}\t{status_message:s}')
elif verbose_output:
print('[OK]\t\t{0:s}'.format(status_message))
print(f'[OK]\t\t{status_message:s}')

@@ -309,9 +319,9 @@ def CheckDependencies(self, verbose_output=True):

for module_name, dependency in sorted(self._dependencies.items()):
if module_name == 'sqlite3':
result, status_message = self._CheckSQLite3()
else:
result, status_message = self._CheckPythonModule(dependency)
for _, dependency in sorted(self.dependencies.items()):
if dependency.skip_check:
continue
if not result:
result, status_message = self._CheckPythonModule(dependency)
if not result and not dependency.is_optional:
check_result = False

@@ -346,4 +356,8 @@

key=lambda dependency: dependency.name):
if dependency.skip_check:
continue
result, status_message = self._CheckPythonModule(dependency)
if not result:
if not result and not dependency.is_optional:
check_result = False

@@ -350,0 +364,0 @@

#!/bin/bash
#
# Script to set up Travis-CI test VM.
COVERALL_DEPENDENCIES="python-coverage python-coveralls python-docopt";
L2TBINARIES_DEPENDENCIES="";
L2TBINARIES_TEST_DEPENDENCIES="funcsigs mock pbr six";
PYTHON2_DEPENDENCIES="";
PYTHON2_TEST_DEPENDENCIES="python-mock python-tox";
# Exit on error.
set -e;
if test ${TRAVIS_OS_NAME} = "osx";
then
git clone https://github.com/log2timeline/l2tdevtools.git;
mv l2tdevtools ../;
mkdir dependencies;
PYTHONPATH=../l2tdevtools ../l2tdevtools/tools/update.py --download-directory=dependencies ${L2TBINARIES_DEPENDENCIES} ${L2TBINARIES_TEST_DEPENDENCIES};
elif test ${TRAVIS_OS_NAME} = "linux";
then
sudo rm -f /etc/apt/sources.list.d/travis_ci_zeromq3-source.list;
sudo add-apt-repository ppa:gift/dev -y;
sudo apt-get update -q;
# Only install the Python 2 dependencies.
# Also see: https://docs.travis-ci.com/user/languages/python/#Travis-CI-Uses-Isolated-virtualenvs
sudo apt-get install -y ${COVERALL_DEPENDENCIES} ${PYTHON2_DEPENDENCIES} ${PYTHON2_TEST_DEPENDENCIES};
fi

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet