You're Invited:Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26.RSVP
Socket
Book a DemoSign in
Socket

ltcodecs

Package Overview
Dependencies
Maintainers
2
Versions
44
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

ltcodecs - pypi Package Compare versions

Comparing version
1.1.0
to
2.0.0
+4
-3
PKG-INFO
Metadata-Version: 2.1
Name: ltcodecs
Version: 1.1.0
Version: 2.0.0
Summary: LT Codecs
Home-page: https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs
Home-page: https://git.whoi.edu/acomms/ltcodec/
Author: whoi
Author-email: egallimore@whoi.edu
Project-URL: Bug Tracker, https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs
Project-URL: Bug Tracker, https://git.whoi.edu/acomms/ltcodec/
Project-URL: Documentation, https://acomms.pages.whoi.edu/ltcodec/
Classifier: Programming Language :: Python :: 3

@@ -10,0 +11,0 @@ Classifier: License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)

[metadata]
name = ltcodecs
version = 1.1.0
version = 2.0.0
author = whoi

@@ -9,5 +9,6 @@ author_email = egallimore@whoi.edu

long_description_content_type = text/markdown
url = https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs
url = https://git.whoi.edu/acomms/ltcodec/
project_urls =
Bug Tracker = https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs
Bug Tracker = https://git.whoi.edu/acomms/ltcodec/
Documentation = https://acomms.pages.whoi.edu/ltcodec/
classifiers =

@@ -14,0 +15,0 @@ Programming Language :: Python :: 3

Metadata-Version: 2.1
Name: ltcodecs
Version: 1.1.0
Version: 2.0.0
Summary: LT Codecs
Home-page: https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs
Home-page: https://git.whoi.edu/acomms/ltcodec/
Author: whoi
Author-email: egallimore@whoi.edu
Project-URL: Bug Tracker, https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs
Project-URL: Bug Tracker, https://git.whoi.edu/acomms/ltcodec/
Project-URL: Documentation, https://acomms.pages.whoi.edu/ltcodec/
Classifier: Programming Language :: Python :: 3

@@ -10,0 +11,0 @@ Classifier: License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)

@@ -5,3 +5,3 @@ """

This module contains the NullableCodec class, which uses a boolean field to control whether a target field is optionally
This module contains the OptionalCodec class, which uses a boolean field to control whether a target field is optionally
encoded.

@@ -25,24 +25,36 @@ """

:param target_field: Target field name
:param target_type: ROS type of the target field (as a string)
:param target_params: dictionary with target field codec parameters
Args:
target_field: Target field name (used to target a single field). If this is set, target_fields must not be set.
target_fields: Dict containing fields and associated parameters. Cannot be set if target_field is set.
target_type: ROS type of the target field (as a string)
target_params: dictionary with target field codec parameters
"""
def __init__(
self, target_field: str, target_type: str, target_params=None, **kwargs
self, target_fields: dict = None, **kwargs: object
) -> None:
self.target_field = target_field
if target_params:
self.target_field_codec = ltcodecs.field_codec_classes[target_type](
**target_params
)
else:
self.target_field_codec = ltcodecs.field_codec_classes[target_type]()
self.target_fields = target_fields
self.field_codecs = {}
if target_fields:
for field_name, field_params in self.target_fields.items():
# print(field_name, field_params['codec'])
# print(field_codecs.field_codec_classes[field_params['codec']])
try:
self.field_codecs[field_name] = ltcodecs.field_codec_classes[field_params["codec"]](**field_params)
except KeyError as err:
raise KeyError(
f"Error parsing codec config for {field_name}. Got params:\n{field_params}\nError: {err}"
) from err
def encode_multiple(self, value: bool, message_dict: dict) -> tuple[Bits, bool, dict]:
"""
encode a pair of fields: the boolean used to control the "nullable" target, and the target if requested
encode a set of fields: the boolean used to control the "nullable" target field(s), and the target if requested
:param value: the boolean field value that indicates whether the target should be encoded
:param value: the boolean field value that indicates whether the target fields should be encoded
:param message_dict: the full message from which the target field is read
:return: A tuple containing the encoded bits, the boolean value that indicates if the optional fields are encoded
and a dictionary of the encoded fields (after compression, if the target field codec does that).
"""

@@ -56,7 +68,22 @@ value = bool(value)

# Otherwise, we want to encode a bit (to indicate that the target is present), and then the target
target_bits, target_value = self.target_field_codec.encode(message_dict[self.target_field])
value_bits.append(target_bits)
# Otherwise, we want to encode a bit (to indicate that the target is present), and then the target fields
encoded_dict = {}
for field_name, field_params in self.target_fields.items():
try:
field_codec = self.field_codecs[field_name]
# Note that metadata encoding is done at the ros_msg_codec level, not here
if not field_codec or isinstance(field_codec, str):
continue
if isinstance(field_codec, MultipleFieldCodec):
field_bits, encoded_dict[field_name], encoded_fields_dict = field_codec.encode_multiple(value, message_dict)
encoded_dict = {**encoded_dict, **encoded_fields_dict}
else:
field_bits, encoded_dict[field_name] = field_codec.encode(message_dict[field_name])
return value_bits, value, {self.target_field: target_value}
value_bits.append(field_bits)
except Exception as err:
raise EncodingFailed(
f'Error encoding field "{field_name}" with codec {field_codec} (max len bits {field_codec.max_length_bits})'
) from err
return value_bits, value, encoded_dict

@@ -80,5 +107,12 @@ def decode_multiple(self, bits_to_decode: ConstBitStream) -> tuple[bool, dict]:

target_value = self.target_field_codec.decode(bits_to_decode)
return value, {self.target_field: target_value}
decoded_message = {}
for field_name, field_params in self.target_fields.items():
field_codec = self.field_codecs[field_name]
if hasattr(field_codec, "decode_as_dict"):
decoded_message[field_name] = field_codec.decode_as_dict(bits_to_decode)
else:
decoded_message[field_name] = field_codec.decode(bits_to_decode)
return value, decoded_message
@property

@@ -90,2 +124,2 @@ def min_length_bits(self) -> int:

def max_length_bits(self) -> int:
return 1 + self.target_field_codec.max_length_bits
return 1 + sum([c.max_length_bits for c in self.field_codecs.values()])

@@ -20,9 +20,21 @@ """

codec for variable-length array
Args:
element_type: codec to use to encode each element of the array
max_length: maximum number of elements to encode
element_params: Codec parameters dictionary to use for the element codec
nullable: If this is True, use a bit to indicate if the array is empty. This allows encoding an empty array
with a single bit.
"""
def __init__(
self, element_type: str, max_length: int, element_params=None, **kwargs
self, element_type: str, max_length: int, element_params=None, nullable=False, **kwargs
) -> None:
self.max_length = max_length
self.length_codec = VarintCodec(min_value=0, max_value=self.max_length)
self.nullable = nullable
if not self.nullable:
self.length_codec = VarintCodec(min_value=0, max_value=self.max_length)
else:
# If we are nullable, we don't need to encode length 0, since it is handled by the nullable flag
self.length_codec = VarintCodec(min_value=1, max_value=self.max_length)
print("element_type", element_type)

@@ -40,15 +52,20 @@ if element_params:

"""
value = value[0 : self.max_length]
length_bits, _ = self.length_codec.encode(len(value))
value_bits = BitArray(length_bits)
value_bits = BitArray()
encoded_value_list = []
for element in value:
(
element_bits,
element_value,
) = self.element_field_codec.encode(element)
value_bits.append(element_bits)
encoded_value_list.append(element_value)
if self.nullable:
value_bits.append(BitArray(bool=(len(value) > 0)))
if len(value) > 0 or not self.nullable:
value = value[0 : self.max_length]
length_bits, _ = self.length_codec.encode(len(value))
value_bits.append(length_bits)
for element in value:
(
element_bits,
element_value,
) = self.element_field_codec.encode(element)
value_bits.append(element_bits)
encoded_value_list.append(element_value)
return value_bits, encoded_value_list

@@ -60,7 +77,12 @@

"""
num_elements = self.length_codec.decode(bits_to_decode)
not_null = True
if self.nullable:
not_null = bits_to_decode.read('bool')
decoded_list = []
for i in range(num_elements):
element = self.element_field_codec.decode(bits_to_decode)
decoded_list.append(element)
if not_null:
num_elements = self.length_codec.decode(bits_to_decode)
for i in range(num_elements):
element = self.element_field_codec.decode(bits_to_decode)
decoded_list.append(element)
return decoded_list

@@ -70,8 +92,14 @@

def min_length_bits(self) -> int:
return self.length_codec.max_length_bits
if self.nullable:
return 1
else:
return self.length_codec.max_length_bits
@property
def max_length_bits(self) -> int:
return self.length_codec.max_length_bits + (
length_bits = self.length_codec.max_length_bits + (
self.max_length * self.element_field_codec.max_length_bits
)
if self.nullable:
length_bits += 1
return length_bits

@@ -295,10 +295,25 @@ #!/usr/bin/env python3

)
encoded_int = codec.encode(input_array)
encoded_bits = codec.encode(input_array)
bit_stream = ConstBitStream(encoded_bits[0])
decoded_array = codec.decode(bit_stream)
bit_stream = ConstBitStream(encoded_int[0])
assert decoded_array == input_array, "decoded array does not match input array"
decoded_int = codec.decode(bit_stream)
# Now, test making it nullable
codec = ltcodecs.VariableLenArrayCodec(
"integer", 4, nullable=True, element_params={"min_value": 0, "max_value": 100}
)
encoded_bits = codec.encode(input_array)
bit_stream = ConstBitStream(encoded_bits[0])
decoded_array = codec.decode(bit_stream)
assert decoded_int == input_array, "decoded array does not match input array"
assert decoded_array == input_array, "decoded array does not match input array (with nullable codec)"
# ... and try actually making it null
encoded_bits = codec.encode([])
bit_stream = ConstBitStream(encoded_bits[0])
decoded_array = codec.decode(bit_stream)
assert len(bit_stream) == 1, "Encoded message (when nulled) has wrong number of bits"
def test_ros_message_codec(self) -> None:

@@ -319,7 +334,7 @@ """test functionality of the ros message codec"""

def test_nullable_codec(self) -> None:
"""test functionality of the ros message codec"""
def test_optional_codec(self) -> None:
"""test functionality of the OptionalFieldCodec codec"""
# use a ColorRGBA message because it's part of std_msgs and has a field that we can cast as a boolean
msg = ColorRGBA(r=1, g=1234.5678, b=3, a=4)
msg = ColorRGBA(r=1, g=1234.5678, b=3.14, a=4)
message_dict = {}

@@ -329,3 +344,3 @@ for field in msg.__slots__:

codec = ltcodecs.OptionalFieldCodec(target_field='g', target_type='float32')
codec = ltcodecs.OptionalFieldCodec(target_fields={'g': {'codec': 'float32'}, 'b': {'codec': 'float32'}})

@@ -339,3 +354,4 @@ encoded_msg = codec.encode_multiple(msg.r, message_dict)

assert decoded_dict['g'] == pytest.approx(msg.g), "decoded target value doesn't match input"
assert len(bit_stream) == 33, "Encoded message has wrong number of bits"
assert decoded_dict['b'] == pytest.approx(msg.b), "decoded target value doesn't match input"
assert len(bit_stream) == 1+32+32, "Encoded message has wrong number of bits"

@@ -342,0 +358,0 @@ # Now try with nulled