Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

acstore

Package Overview
Dependencies
Maintainers
2
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

acstore - npm Package Compare versions

Comparing version
20230226
to
20230325
+10
acstore/errors.py
# -*- coding: utf-8 -*-
"""The error objects."""
class Error(Exception):
  """Base class for errors raised by acstore."""


class ParseError(Error):
  """Error indicating that parsing of a definition failed."""
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
"""Schema helper."""
class SchemaHelper(object):
  """Schema helper.

  The schema helper tracks which schema data types are supported and which
  attribute serializers are available for each serialization method.
  """

  # Maps a data type name to a dict of attribute serializers keyed by
  # serialization method, or None when no serializers were registered.
  _data_types = {
      'AttributeContainerIdentifier': None,
      'bool': None,
      'int': None,
      'str': None,
      'timestamp': None}

  @classmethod
  def DeregisterDataType(cls, data_type):
    """Deregisters a data type.

    Args:
      data_type (str): data type.

    Raises:
      KeyError: if the data type is not set.
    """
    if data_type not in cls._data_types:
      raise KeyError(f'Data type: {data_type:s} not set.')

    del cls._data_types[data_type]

  @classmethod
  def GetAttributeSerializer(cls, data_type, serialization_method):
    """Retrieves a specific attribute serializer.

    Args:
      data_type (str): data type.
      serialization_method (str): serialization method.

    Returns:
      AttributeSerializer: attribute serializer or None if not available.
    """
    serializers = cls._data_types.get(data_type) or {}
    return serializers.get(serialization_method)

  @classmethod
  def HasDataType(cls, data_type):
    """Determines if a specific data type is supported by the schema.

    Args:
      data_type (str): data type.

    Returns:
      bool: True if the data type is supported, or False otherwise.
    """
    return data_type in cls._data_types

  @classmethod
  def RegisterDataType(cls, data_type, serializers):
    """Registers a data type.

    Args:
      data_type (str): data type.
      serializers (dict[str, AttributeSerializer]): attribute serializers per
          serialization method.

    Raises:
      KeyError: if the data type is already set.
    """
    if data_type in cls._data_types:
      raise KeyError(f'Data type: {data_type:s} already set.')

    cls._data_types[data_type] = serializers

  @classmethod
  def RegisterDataTypes(cls, data_types):
    """Registers data types.

    Args:
      data_types (dict[str: dict[str, AttributeSerializer]]): attribute
          serializers with method per data types.

    Raises:
      KeyError: if any of the data types is already set.
    """
    for type_name, type_serializers in data_types.items():
      cls.RegisterDataType(type_name, type_serializers)
# -*- coding: utf-8 -*-
"""YAML-based attribute container definitions file."""
import yaml
from acstore import errors
from acstore.containers import interface
from acstore.helpers import schema
# TODO: merge this into interface.AttributeContainer once Plaso has been
# changed to no longer support attributes containers without a schema.
class AttributeContainerWithSchema(interface.AttributeContainer):
  """Attribute container that exposes a schema.

  Subclasses override SCHEMA with a mapping of attribute names to schema
  data type names.
  """

  # Maps attribute names to schema data type names.
  SCHEMA = {}
class YAMLAttributeContainerDefinitionsFile(object):
  """YAML-based attribute container definitions file.

  A YAML-based attribute container definitions file contains one or more
  attribute container definitions. An attribute container definition consists
  of:

  name: 'windows_eventlog_message_file'
  attributes:
  - name: path
    type: str
  - name: windows_path
    type: str

  Where:
  * name, unique identifier of the attribute container;
  * attributes, defines the attributes of the container.
  """

  # Schema data types that an attribute definition may declare.
  _SUPPORTED_DATA_TYPES = frozenset([
      'AttributeContainerIdentifier',
      'bool',
      'int',
      'str',
      'timestamp'])

  # Keys allowed at the top level of an attribute container definition.
  _SUPPORTED_KEYS = frozenset([
      'name',
      'attributes'])

  def _ReadDefinition(self, definition_values):
    """Reads a definition from a dictionary.

    Args:
      definition_values (dict[str, object]): attribute container definition
          values.

    Returns:
      type: a dynamically created attribute container class, a subclass of
          AttributeContainerWithSchema with CONTAINER_TYPE and SCHEMA set
          from the definition.

    Raises:
      ParseError: if the definition is not set or incorrect.
    """
    if not definition_values:
      raise errors.ParseError('Missing attribute container definition values.')

    # Reject definitions that contain keys other than the supported ones.
    different_keys = set(definition_values) - self._SUPPORTED_KEYS
    if different_keys:
      different_keys = ', '.join(different_keys)
      raise errors.ParseError(f'Undefined keys: {different_keys:s}')

    container_name = definition_values.get('name', None)
    if not container_name:
      raise errors.ParseError(
          'Invalid attribute container definition missing name.')

    attributes = definition_values.get('attributes', None)
    if not attributes:
      raise errors.ParseError((
          f'Invalid attribute container definition: {container_name:s} '
          f'missing attributes.'))

    # Derive a CamelCase class name from the snake_case container name,
    # for example windows_eventlog_message_file => WindowsEventlogMessageFile.
    class_name = ''.join([
        element.title() for element in container_name.split('_')])

    class_attributes = {'CONTAINER_TYPE': container_name}
    container_schema = {}
    for attribute_index, attribute_values in enumerate(attributes):
      attribute_name = attribute_values.get('name', None)
      if not attribute_name:
        raise errors.ParseError((
            f'Invalid attribute container definition: {container_name:s} name '
            f'missing of attribute: {attribute_index:d}.'))

      # Rejects duplicate attribute names and an attribute that would clash
      # with the CONTAINER_TYPE class attribute.
      if attribute_name in class_attributes:
        raise errors.ParseError((
            f'Invalid attribute container definition: {container_name:s} '
            f'attribute: {attribute_name:s} already set.'))

      attribute_data_type = attribute_values.get('type', None)
      if not attribute_data_type:
        raise errors.ParseError((
            f'Invalid attribute container definition: {container_name:s} type '
            f'missing of attribute: {attribute_name:s}.'))

      # The data type must be registered with the schema helper.
      if not schema.SchemaHelper.HasDataType(attribute_data_type):
        raise errors.ParseError((
            f'Invalid attribute container definition: {container_name:s} type '
            f'attribute: {attribute_name:s} unsupported data type: '
            f'{attribute_data_type:s}.'))

      class_attributes[attribute_name] = None
      container_schema[attribute_name] = attribute_data_type

    class_attributes['SCHEMA'] = container_schema

    # TODO: add support for _SERIALIZABLE_PROTECTED_ATTRIBUTES.
    return type(class_name, (AttributeContainerWithSchema, ), class_attributes)

  def _ReadFromFileObject(self, file_object):
    """Reads the definitions from a file-like object.

    Args:
      file_object (file): definitions file-like object.

    Yields:
      type: an attribute container class created from each YAML document in
          the file.
    """
    # safe_load_all supports multiple YAML documents separated by '---'.
    yaml_generator = yaml.safe_load_all(file_object)

    for yaml_definition in yaml_generator:
      yield self._ReadDefinition(yaml_definition)

  def ReadFromFile(self, path):
    """Reads the definitions from a YAML file.

    Args:
      path (str): path to a definitions file.

    Yields:
      type: an attribute container class created from each YAML document in
          the file.
    """
    with open(path, 'r', encoding='utf-8') as file_object:
      for yaml_definition in self._ReadFromFileObject(file_object):
        yield yaml_definition
acstore.helpers package
=======================
Submodules
----------
acstore.helpers.schema module
-----------------------------
.. automodule:: acstore.helpers.schema
:members:
:undoc-members:
:show-inheritance:
acstore.helpers.yaml\_definitions\_file module
----------------------------------------------
.. automodule:: acstore.helpers.yaml_definitions_file
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: acstore.helpers
:members:
:undoc-members:
:show-inheritance:
# YAML-based attribute container definitions file.
---
name: 'windows_eventlog_message_file'
attributes:
- name: 'path'
type: 'str'
- name: 'windows_path'
type: 'str'
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the schema helper."""
import unittest
from acstore.helpers import schema
from tests import test_lib as shared_test_lib
class SchemaHelperTest(shared_test_lib.BaseTestCase):
  """Tests for the schema helper."""

  # pylint: disable=protected-access

  def testHasDataType(self):
    """Tests the HasDataType function."""
    self.assertTrue(schema.SchemaHelper.HasDataType('str'))
    self.assertFalse(schema.SchemaHelper.HasDataType('test'))

  def testRegisterDataType(self):
    """Tests the RegisterDataType function."""
    initial_number_of_data_types = len(schema.SchemaHelper._data_types)

    schema.SchemaHelper.RegisterDataType('test', {'json': None})

    try:
      self.assertEqual(
          len(schema.SchemaHelper._data_types),
          initial_number_of_data_types + 1)

      # Registering the same data type twice raises KeyError.
      with self.assertRaises(KeyError):
        schema.SchemaHelper.RegisterDataType('test', {'json': None})

    finally:
      schema.SchemaHelper.DeregisterDataType('test')

    self.assertEqual(
        len(schema.SchemaHelper._data_types), initial_number_of_data_types)

  def testRegisterDataTypes(self):
    """Tests the RegisterDataTypes function."""
    initial_number_of_data_types = len(schema.SchemaHelper._data_types)

    schema.SchemaHelper.RegisterDataTypes({'test': {'json': None}})

    try:
      self.assertEqual(
          len(schema.SchemaHelper._data_types),
          initial_number_of_data_types + 1)

      # Registering the same data type twice raises KeyError.
      with self.assertRaises(KeyError):
        schema.SchemaHelper.RegisterDataTypes({'test': {'json': None}})

    finally:
      schema.SchemaHelper.DeregisterDataType('test')

    self.assertEqual(
        len(schema.SchemaHelper._data_types), initial_number_of_data_types)


if __name__ == '__main__':
  unittest.main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the YAML-based attribute container definitions file."""
import io
import unittest
from acstore import errors
from acstore.helpers import yaml_definitions_file
from tests import test_lib as shared_test_lib
class YAMLAttributeContainerDefinitionsFileTest(shared_test_lib.BaseTestCase):
  """Tests for the YAML-based attribute container definitions file."""

  # pylint: disable=protected-access

  _FORMATTERS_YAML = {
      'name': 'windows_eventlog_message_file',
      'attributes': [
          {'name': 'path', 'type': 'str'},
          {'name': 'windows_path', 'type': 'str'}]}

  def testReadDefinition(self):
    """Tests the _ReadDefinition function."""
    definitions_file = (
        yaml_definitions_file.YAMLAttributeContainerDefinitionsFile())

    generated_class = definitions_file._ReadDefinition(self._FORMATTERS_YAML)

    self.assertIsNotNone(generated_class)
    self.assertEqual(
        generated_class.CONTAINER_TYPE, 'windows_eventlog_message_file')
    self.assertEqual(
        generated_class.SCHEMA, {'path': 'str', 'windows_path': 'str'})

    # An empty definition is not valid.
    with self.assertRaises(errors.ParseError):
      definitions_file._ReadDefinition({})

    # A definition without attributes is not valid.
    with self.assertRaises(errors.ParseError):
      definitions_file._ReadDefinition({
          'name': 'windows_eventlog_message_file',
          'attributes': []})

    # An attribute definition without a name is not valid.
    with self.assertRaises(errors.ParseError):
      definitions_file._ReadDefinition({
          'name': 'windows_eventlog_message_file',
          'attributes': [{'type': 'str'}]})

    # An attribute definition without a type is not valid.
    with self.assertRaises(errors.ParseError):
      definitions_file._ReadDefinition({
          'name': 'windows_eventlog_message_file',
          'attributes': [{'name': 'path'}]})

    # An attribute definition with an unsupported type is not valid.
    with self.assertRaises(errors.ParseError):
      definitions_file._ReadDefinition({
          'name': 'windows_eventlog_message_file',
          'attributes': [{'name': 'path', 'type': 'bogus'}]})

  def testReadFromFileObject(self):
    """Tests the _ReadFromFileObject function."""
    test_file_path = self._GetTestFilePath(['definitions.yaml'])
    self._SkipIfPathNotExists(test_file_path)

    definitions_file = (
        yaml_definitions_file.YAMLAttributeContainerDefinitionsFile())

    with io.open(test_file_path, 'r', encoding='utf-8') as file_object:
      definitions = list(definitions_file._ReadFromFileObject(file_object))

    self.assertEqual(len(definitions), 1)

  def testReadFromFile(self):
    """Tests the ReadFromFile function."""
    test_file_path = self._GetTestFilePath(['definitions.yaml'])
    self._SkipIfPathNotExists(test_file_path)

    definitions_file = (
        yaml_definitions_file.YAMLAttributeContainerDefinitionsFile())

    definitions = list(definitions_file.ReadFromFile(test_file_path))

    self.assertEqual(len(definitions), 1)
    self.assertEqual(
        definitions[0].CONTAINER_TYPE, 'windows_eventlog_message_file')


if __name__ == '__main__':
  unittest.main()
+2
-2

@@ -21,3 +21,3 @@ # Run tests on Fedora and Ubuntu Docker images using GIFT CORP and GIFT PPA on commit

dnf copr -y enable @gift/dev
dnf install -y @development-tools python3 python3-devel python3-setuptools
dnf install -y @development-tools python3 python3-devel python3-pyyaml python3-setuptools
- name: Run tests

@@ -62,3 +62,3 @@ env:

apt-get update -q
apt-get install -y build-essential python3 python3-dev python3-distutils python3-setuptools
apt-get install -y build-essential python3 python3-dev python3-distutils python3-setuptools python3-yaml
- name: Run tests

@@ -65,0 +65,0 @@ env:

@@ -39,3 +39,3 @@ # Run docs tox tests on Ubuntu Docker images using GIFT PPA

apt-get update -q
apt-get install -y build-essential git libffi-dev python${{ matrix.python-version }} python${{ matrix.python-version }}-dev python${{ matrix.python-version }}-venv python3-distutils python3-pip python3-setuptools
apt-get install -y build-essential git libffi-dev python${{ matrix.python-version }} python${{ matrix.python-version }}-dev python${{ matrix.python-version }}-venv python3-distutils python3-pip python3-setuptools python3-yaml
- name: Install tox

@@ -42,0 +42,0 @@ run: |

@@ -49,3 +49,3 @@ # Run tox tests on Ubuntu Docker images using GIFT PPA

apt-get update -q
apt-get install -y build-essential git libffi-dev python${{ matrix.python-version }} python${{ matrix.python-version }}-dev python${{ matrix.python-version }}-venv python3-distutils python3-pip python3-setuptools
apt-get install -y build-essential git libffi-dev python${{ matrix.python-version }} python${{ matrix.python-version }}-dev python${{ matrix.python-version }}-venv python3-distutils python3-pip python3-setuptools python3-yaml
- name: Install tox

@@ -52,0 +52,0 @@ run: |

Metadata-Version: 2.1
Name: acstore
Version: 20230226
Version: 20230325
Summary: Attribute Container Storage (ACStore).

@@ -5,0 +5,0 @@ Home-page: https://github.com/log2timeline/acstore

pip>=7.0.0
PyYAML>=3.10

@@ -22,2 +22,3 @@ .pylintrc

acstore/__init__.py
acstore/errors.py
acstore/fake_store.py

@@ -35,2 +36,5 @@ acstore/interface.py

acstore/containers/manager.py
acstore/helpers/__init__.py
acstore/helpers/schema.py
acstore/helpers/yaml_definitions_file.py
config/appveyor/install.ps1

@@ -50,2 +54,3 @@ config/appveyor/install.sh

docs/sources/api/acstore.containers.rst
docs/sources/api/acstore.helpers.rst
docs/sources/api/acstore.rst

@@ -55,2 +60,3 @@ docs/sources/api/modules.rst

docs/sources/user/index.rst
test_data/definitions.yaml
tests/__init__.py

@@ -65,2 +71,5 @@ tests/fake_store.py

tests/containers/manager.py
tests/helpers/__init__.py
tests/helpers/schema.py
tests/helpers/yaml_definitions_file.py
utils/__init__.py

@@ -67,0 +76,0 @@ utils/check_dependencies.py

@@ -8,2 +8,2 @@ # -*- coding: utf-8 -*-

__version__ = '20230226'
__version__ = '20230325'

@@ -44,3 +44,3 @@ # -*- coding: utf-8 -*-

if self.name is not None and self.sequence_number is not None:
return '{0:s}.{1:d}'.format(self.name, self.sequence_number)
return f'{self.name:s}.{self.sequence_number:d}'

@@ -152,5 +152,3 @@ return None

attribute_string = '{0:s}: {1!s}'.format(
attribute_name, attribute_value)
attributes.append(attribute_string)
attributes.append(f'{attribute_name:s}: {attribute_value!s}')

@@ -157,0 +155,0 @@ return ', '.join(attributes)

@@ -10,2 +10,28 @@ # -*- coding: utf-8 -*-

class AttributeSerializer(object):
  """Attribute serializer interface.

  An attribute serializer converts an attribute value between its runtime
  representation and its serialized (storage) representation.
  """

  @abc.abstractmethod
  def DeserializeValue(self, value):
    """Deserializes a value.

    Args:
      value (object): serialized value.

    Returns:
      object: runtime value.
    """

  @abc.abstractmethod
  def SerializeValue(self, value):
    """Serializes a value.

    Args:
      value (object): runtime value.

    Returns:
      object: serialized value.
    """
class AttributeContainerStore(object):

@@ -12,0 +38,0 @@ """Interface of an attribute container store.

@@ -101,11 +101,9 @@ # -*- coding: utf-8 -*-

sample = '{0:f}\t{1:s}\t{2:s}\t{3:s}\t{4:f}\t{5:d}\t{6:d}\n'.format(
sample_time, profile_name, operation, description,
processing_time, data_size, compressed_data_size)
self._WritesString(sample)
self._WritesString((
f'{sample_time:f}\t{profile_name:s}\t{operation:s}\t{description:s}\t'
f'{processing_time:f}\t{data_size:d}\t{compressed_data_size:d}\n'))
def Start(self):
"""Starts the profiler."""
filename = '{0:s}-{1:s}.csv.gz'.format(
self._FILENAME_PREFIX, self._identifier)
filename = f'{self._FILENAME_PREFIX:s}-{self._identifier:s}.csv.gz'
if self._path:

@@ -112,0 +110,0 @@ filename = os.path.join(self._path, filename)

@@ -11,4 +11,5 @@ # -*- coding: utf-8 -*-

from acstore import interface
from acstore.containers import interface as containers_interface
from acstore import interface
from acstore.helpers import schema as schema_helper

@@ -76,2 +77,94 @@

class SQLiteSchemaHelper(object):
  """SQLite schema helper."""

  # Maps schema data types to SQLite column types; data types not listed
  # here are stored as TEXT.
  _MAPPINGS = {
      'bool': 'INTEGER',
      'int': 'INTEGER',
      'str': 'TEXT',
      'timestamp': 'BIGINT'}

  def GetStorageDataType(self, data_type):
    """Retrieves the storage data type.

    Args:
      data_type (str): schema data type.

    Returns:
      str: corresponding SQLite data type.
    """
    return self._MAPPINGS.get(data_type, 'TEXT')

  def DeserializeValue(self, data_type, value):
    """Deserializes a value.

    Args:
      data_type (str): schema data type.
      value (object): serialized value.

    Returns:
      object: runtime value.

    Raises:
      IOError: if the schema data type is not supported.
      OSError: if the schema data type is not supported.
    """
    if not schema_helper.SchemaHelper.HasDataType(data_type):
      raise IOError(f'Unsupported data type: {data_type:s}')

    if value is None:
      return None

    if data_type == 'AttributeContainerIdentifier':
      # Stored as the string representation of the identifier.
      identifier = containers_interface.AttributeContainerIdentifier()
      identifier.CopyFromString(value)
      return identifier

    if data_type == 'bool':
      # Stored as INTEGER 0 or 1.
      return bool(value)

    if data_type not in self._MAPPINGS:
      # Fall back to the JSON attribute serializer registered for the type.
      serializer = schema_helper.SchemaHelper.GetAttributeSerializer(
          data_type, 'json')
      return serializer.DeserializeValue(value)

    return value

  def SerializeValue(self, data_type, value):
    """Serializes a value.

    Args:
      data_type (str): schema data type.
      value (object): runtime value.

    Returns:
      object: serialized value.

    Raises:
      IOError: if the schema data type is not supported.
      OSError: if the schema data type is not supported.
    """
    if not schema_helper.SchemaHelper.HasDataType(data_type):
      raise IOError(f'Unsupported data type: {data_type:s}')

    if value is None:
      return None

    if data_type == 'AttributeContainerIdentifier' and isinstance(
        value, containers_interface.AttributeContainerIdentifier):
      # Stored as the string representation of the identifier.
      return value.CopyToString()

    if data_type == 'bool':
      # Stored as INTEGER 0 or 1.
      return int(value)

    if data_type not in self._MAPPINGS:
      # Fall back to the JSON attribute serializer registered for the type.
      serializer = schema_helper.SchemaHelper.GetAttributeSerializer(
          data_type, 'json')

      # JSON will not serialize certain runtime types like set, therefore
      # these are cast to list first.
      if isinstance(value, set):
        value = list(value)

      return serializer.SerializeValue(value)

    return value
class SQLiteAttributeContainerStore(interface.AttributeContainerStore):

@@ -82,5 +175,6 @@ """SQLite-based attribute container store.

format_version (int): storage format version.
serialization_format (str): serialization format.
"""
_FORMAT_VERSION = 20221023
_FORMAT_VERSION = 20230312

@@ -99,2 +193,3 @@ # The earliest format version, stored in-file, that this class

# TODO: kept for backwards compatibility.
_CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS = {

@@ -130,5 +225,7 @@ 'AttributeContainerIdentifier': 'TEXT',

self._read_only = True
self._schema_helper = SQLiteSchemaHelper()
self._write_cache = {}
self.format_version = self._FORMAT_VERSION
self.serialization_format = 'json'

@@ -177,4 +274,4 @@ def _CacheAttributeContainerByIndex(self, attribute_container, index):

Raises:
IOError: if the format version is not supported.
OSError: if the format version is not supported.
IOError: if the storage metadata is not supported.
OSError: if the storage metadata is not supported.
"""

@@ -210,2 +307,8 @@ format_version = metadata_values.get('format_version', None)

serialization_format = metadata_values.get('serialization_format', None)
if serialization_format != 'json':
raise IOError(
f'Unsupported serialization format: {serialization_format!s}')
# Ensure format_version is an integer.
metadata_values['format_version'] = format_version

@@ -242,6 +345,4 @@

schema_to_sqlite_type_mappings = (
self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS)
for name, data_type in sorted(schema.items()):
data_type = schema_to_sqlite_type_mappings.get(data_type, 'TEXT')
data_type = self._schema_helper.GetStorageDataType(data_type)
column_definitions.append(f'{name:s} {data_type:s}')

@@ -286,22 +387,15 @@

for column_index, name in enumerate(column_names):
attribute_value = row[first_column_index + column_index]
if attribute_value is None:
continue
row_value = row[first_column_index + column_index]
if row_value is not None:
data_type = schema[name]
try:
attribute_value = self._schema_helper.DeserializeValue(
data_type, row_value)
except IOError:
raise IOError((
f'Unsupported attribute container type: {container_type:s} '
f'attribute: {name:s} data type: {data_type:s}'))
data_type = schema[name]
if data_type == 'AttributeContainerIdentifier':
identifier = containers_interface.AttributeContainerIdentifier()
identifier.CopyFromString(attribute_value)
attribute_value = identifier
setattr(container, name, attribute_value)
elif data_type == 'bool':
attribute_value = bool(attribute_value)
elif data_type not in self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS:
raise IOError((
f'Unsupported attribute container type: {container_type:s} '
f'attribute: {name:s} data type: {data_type:s}'))
setattr(container, name, attribute_value)
return container

@@ -518,2 +612,3 @@

self.format_version = metadata_values['format_version']
self.serialization_format = metadata_values['serialization_format']

@@ -586,18 +681,13 @@ def _ReadMetadata(self):

attribute_value = getattr(container, name, None)
if attribute_value is not None:
if data_type == 'AttributeContainerIdentifier' and isinstance(
attribute_value, containers_interface.AttributeContainerIdentifier):
attribute_value = attribute_value.CopyToString()
try:
row_value = self._schema_helper.SerializeValue(
data_type, attribute_value)
except IOError:
raise IOError((
f'Unsupported attribute container type: '
f'{container.CONTAINER_TYPE:s} attribute: {name:s} data type: '
f'{data_type:s}'))
elif data_type == 'bool':
attribute_value = int(attribute_value)
elif data_type not in self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS:
raise IOError((
f'Unsupported attribute container type: '
f'{container.CONTAINER_TYPE:s} attribute: {name:s} data type: '
f'{data_type:s}'))
column_names.append(f'{name:s} = ?')
values.append(attribute_value)
values.append(row_value)

@@ -639,2 +729,3 @@ column_names_string = ', '.join(column_names)

self._WriteMetadataValue('format_version', f'{self._FORMAT_VERSION:d}')
self._WriteMetadataValue('serialization_format', self.serialization_format)

@@ -690,25 +781,19 @@ def _WriteMetadataValue(self, key, value):

column_names = []
values = []
row_values = []
for name, data_type in sorted(schema.items()):
attribute_value = getattr(container, name, None)
if attribute_value is not None:
if data_type == 'AttributeContainerIdentifier' and isinstance(
attribute_value,
containers_interface.AttributeContainerIdentifier):
attribute_value = attribute_value.CopyToString()
try:
row_value = self._schema_helper.SerializeValue(
data_type, attribute_value)
except IOError:
raise IOError((
f'Unsupported attribute container type: '
f'{container.CONTAINER_TYPE:s} attribute: {name:s} data type: '
f'{data_type:s}'))
elif data_type == 'bool':
attribute_value = int(attribute_value)
elif data_type not in self._CONTAINER_SCHEMA_TO_SQLITE_TYPE_MAPPINGS:
raise IOError((
f'Unsupported attribute container type: '
f'{container.CONTAINER_TYPE:s} attribute: {name:s} data type: '
f'{data_type:s}'))
column_names.append(name)
values.append(attribute_value)
row_values.append(row_value)
self._CacheAttributeContainerForWrite(
container.CONTAINER_TYPE, column_names, values)
container.CONTAINER_TYPE, column_names, row_values)

@@ -715,0 +800,0 @@ self._CacheAttributeContainerByIndex(container, next_sequence_number - 1)

# Script to set up tests on AppVeyor Windows.
$Dependencies = ""
$Dependencies = "PyYAML"
$Dependencies = ${Dependencies} -split " "

@@ -5,0 +5,0 @@

@@ -11,2 +11,3 @@ acstore package

acstore.containers
acstore.helpers

@@ -16,2 +17,10 @@ Submodules

acstore.errors module
---------------------
.. automodule:: acstore.errors
:members:
:undoc-members:
:show-inheritance:
acstore.fake\_store module

@@ -18,0 +27,0 @@ --------------------------

Metadata-Version: 2.1
Name: acstore
Version: 20230226
Version: 20230325
Summary: Attribute Container Storage (ACStore).

@@ -5,0 +5,0 @@ Home-page: https://github.com/log2timeline/acstore

pip >= 7.0.0
PyYAML >= 3.10

@@ -12,2 +12,3 @@ [metadata]

build_requires = python3-setuptools
requires = python3-pyyaml >= 3.10

@@ -14,0 +15,0 @@ [bdist_wheel]

@@ -37,2 +37,69 @@ #!/usr/bin/env python3

class SQLiteSchemaHelperTest(test_lib.BaseTestCase):
  """Tests for the SQLite schema helper."""

  # pylint: disable=protected-access

  def testGetStorageDataType(self):
    """Tests the GetStorageDataType function."""
    test_helper = sqlite_store.SQLiteSchemaHelper()

    for data_type, expected_storage_type in (
        ('bool', 'INTEGER'),
        ('int', 'INTEGER'),
        ('str', 'TEXT'),
        ('timestamp', 'BIGINT'),
        ('AttributeContainerIdentifier', 'TEXT')):
      self.assertEqual(
          test_helper.GetStorageDataType(data_type), expected_storage_type)

  def testDeserializeValue(self):
    """Tests the DeserializeValue function."""
    test_helper = sqlite_store.SQLiteSchemaHelper()

    self.assertFalse(test_helper.DeserializeValue('bool', 0))
    self.assertTrue(test_helper.DeserializeValue('bool', 1))

    self.assertEqual(test_helper.DeserializeValue('int', 1), 1)
    self.assertEqual(test_helper.DeserializeValue('str', 'one'), 'one')
    self.assertEqual(test_helper.DeserializeValue('timestamp', 1), 1)

    # TODO: add test for AttributeContainerIdentifier

  def testSerializeValue(self):
    """Tests the SerializeValue function."""
    test_helper = sqlite_store.SQLiteSchemaHelper()

    self.assertEqual(test_helper.SerializeValue('bool', False), 0)
    self.assertEqual(test_helper.SerializeValue('bool', True), 1)
    self.assertEqual(test_helper.SerializeValue('int', 1), 1)
    self.assertEqual(test_helper.SerializeValue('str', 'one'), 'one')
    self.assertEqual(test_helper.SerializeValue('timestamp', 1), 1)

    # TODO: add test for AttributeContainerIdentifier
class SQLiteAttributeContainerStoreTest(test_lib.BaseTestCase):

@@ -71,3 +138,4 @@ """Tests for the SQLite-based storage file object."""

metadata_values = {
'format_version': '{0:d}'.format(test_store._FORMAT_VERSION)}
'format_version': f'{test_store._FORMAT_VERSION:d}',
'serialization_format': 'json'}
test_store._CheckStorageMetadata(metadata_values)

@@ -83,5 +151,9 @@

metadata_values['format_version'] = '{0:d}'.format(
test_store._FORMAT_VERSION)
metadata_values['format_version'] = f'{test_store._FORMAT_VERSION:d}'
metadata_values['serialization_format'] = 'bogus'
with self.assertRaises(IOError):
test_store._CheckStorageMetadata(metadata_values)
metadata_values['serialization_format'] = 'json'
def testCreateAttributeContainerTable(self):

@@ -404,3 +476,3 @@ """Tests the _CreateAttributeContainerTable function."""

# present in the storage file.
query = 'DROP TABLE {0:s}'.format(attribute_container.CONTAINER_TYPE)
query = f'DROP TABLE {attribute_container.CONTAINER_TYPE:s}'
test_store._cursor.execute(query)

@@ -407,0 +479,0 @@ number_of_containers = test_store.GetNumberOfAttributeContainers(

# -*- coding: utf-8 -*-
"""Functions and classes for testing."""
import os
import shutil

@@ -11,2 +12,11 @@ import tempfile

# The path to the top of the acstore source tree.
PROJECT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
# The paths below are all derived from the project path directory.
# They are enumerated explicitly here so that they can be overwritten for
# compatibility with different build systems.
TEST_DATA_PATH = os.path.join(PROJECT_PATH, 'test_data')
class TestAttributeContainer(containers_interface.AttributeContainer):

@@ -35,3 +45,29 @@ """Attribute container for testing purposes.

def _GetTestFilePath(self, path_segments):
"""Retrieves the path of a test file relative to the test data directory.
Args:
path_segments (list[str]): path segments inside the test data directory.
Returns:
str: path of the test file.
"""
# Note that we need to pass the individual path segments to os.path.join
# and not a list.
return os.path.join(TEST_DATA_PATH, *path_segments)
def _SkipIfPathNotExists(self, path):
"""Skips the test if the path does not exist.
Args:
path (str): path of a test file.
Raises:
SkipTest: if the path does not exist and the test should be skipped.
"""
if not os.path.exists(path):
filename = os.path.basename(path)
raise unittest.SkipTest(f'missing test file: {filename:s}')
class TempDirectory(object):

@@ -38,0 +74,0 @@ """Class that implements a temporary directory."""

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet