You're Invited:Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26.RSVP
Socket
Book a DemoSign in
Socket

ltcodecs

Package Overview
Dependencies
Maintainers
2
Versions
44
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

ltcodecs - pypi Package Compare versions

Comparing version
1.0.1
to
1.1.0
+51
src/ltcodecs/multiple_field_codec.py
"""
abstract base class for codecs that encode multiple fields
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from bitstring import ConstBitStream, Bits
from typing import Any
class MultipleFieldCodec(ABC):
"""
abstract base class for field codecs
"""
def __init__(self, **kwargs) -> None:
pass
@abstractmethod
def encode_multiple(self, value: Any, message_dict: dict) -> tuple[Bits, Any]:
"""
encodes multiple fields
:param value: the "primary" value to encode
:param message_dict: the full message from which the additional fields to encode may be read
"""
pass
@abstractmethod
def decode_multiple(self, bits_to_decode: ConstBitStream) -> tuple[Any, dict]:
"""
decodes multiple values
Args:
bits_to_decode: ConstBitStream to decode
Returns:
value: the value of the "primary" decoded field
values_dict: Dictionary with other decoded field name: value pairs
"""
pass
@property
@abstractmethod
def max_length_bits(self) -> int:
pass
@property
@abstractmethod
def min_length_bits(self) -> int:
pass
"""
ltcodecs.nullable_codec
------------------------------
This module contains the NullableCodec class, which uses a boolean field to control whether a target field is optionally
encoded.
"""
from __future__ import annotations
from bitstring import BitArray, ConstBitStream, Bits
from .multiple_field_codec import MultipleFieldCodec
import ltcodecs as ltcodecs
from typing import Any
from .exceptions import EncodingFailed
class OptionalFieldCodec(MultipleFieldCodec):
"""
codec for optional fields, where a boolean field controls whether a "target" field should be encoded.
If the controlling field is true, the target field will be encoded using target_params. If false, the
target_field won't be encoded.
:param target_field: Target field name
:param target_type: ROS type of the target field (as a string)
:param target_params: dictionary with target field codec parameters
"""
def __init__(
self, target_field: str, target_type: str, target_params=None, **kwargs
) -> None:
self.target_field = target_field
if target_params:
self.target_field_codec = ltcodecs.field_codec_classes[target_type](
**target_params
)
else:
self.target_field_codec = ltcodecs.field_codec_classes[target_type]()
def encode_multiple(self, value: bool, message_dict: dict) -> tuple[Bits, bool, dict]:
"""
encode a pair of fields: the boolean used to control the "nullable" target, and the target if requested
:param value: the boolean field value that indicates whether the target should be encoded
:param message_dict: the full message from which the target field is read
"""
value = bool(value)
value_bits = BitArray(bool=value)
# If the value is false, we treat this like a boolean codec
if not value:
return value_bits, value, {}
# Otherwise, we want to encode a bit (to indicate that the target is present), and then the target
target_bits, target_value = self.target_field_codec.encode(message_dict[self.target_field])
value_bits.append(target_bits)
return value_bits, value, {self.target_field: target_value}
def decode_multiple(self, bits_to_decode: ConstBitStream) -> tuple[bool, dict]:
"""
decode a nullable (optional) field: the boolean used to control the "nullable" target, and the target if present
Args:
bits_to_decode: the bits to decode
Returns:
value: the value of the controlling boolean field
values_dict: Dictionary with other decoded field name: value pairs
"""
value = bits_to_decode.read("bool")
if not value:
return value, {}
target_value = self.target_field_codec.decode(bits_to_decode)
return value, {self.target_field: target_value}
@property
def min_length_bits(self) -> int:
return 1
@property
def max_length_bits(self) -> int:
return 1 + self.target_field_codec.max_length_bits
+6
-2
Metadata-Version: 2.1
Name: ltcodecs
Version: 1.0.1
Version: 1.1.0
Summary: LT Codecs

@@ -18,2 +18,6 @@ Home-page: https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs

The package was original part of ros_acomms under the field_codecs directory.
Flexible, lightweight codec system for transferring messages over acoustic and other low-throughput links.
See the documentation for more information: https://acomms.pages.whoi.edu/ltcodec/
Install from Pypi via `pip install ltcodecs`
# LT Codecs
The package was original part of ros_acomms under the field_codecs directory.
Flexible, lightweight codec system for transferring messages over acoustic and other low-throughput links.
See the documentation for more information: https://acomms.pages.whoi.edu/ltcodec/
Install from Pypi via `pip install ltcodecs`
[metadata]
name = ltcodecs
version = 1.0.1
version = 1.1.0
author = whoi

@@ -5,0 +5,0 @@ author_email = egallimore@whoi.edu

Metadata-Version: 2.1
Name: ltcodecs
Version: 1.0.1
Version: 1.1.0
Summary: LT Codecs

@@ -18,2 +18,6 @@ Home-page: https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs

The package was original part of ros_acomms under the field_codecs directory.
Flexible, lightweight codec system for transferring messages over acoustic and other low-throughput links.
See the documentation for more information: https://acomms.pages.whoi.edu/ltcodec/
Install from Pypi via `pip install ltcodecs`

@@ -18,2 +18,4 @@ LICENSE

src/ltcodecs/lzma_codec.py
src/ltcodecs/multiple_field_codec.py
src/ltcodecs/optional_field_codec.py
src/ltcodecs/padding_codec.py

@@ -20,0 +22,0 @@ src/ltcodecs/ros_message_codec.py

@@ -21,2 +21,3 @@ from .varint_codec import VarintCodec

from .lzma_codec import LzmaCodec
from .optional_field_codec import OptionalFieldCodec
from .exceptions import EncodingFailed

@@ -57,2 +58,3 @@

"lzma": LzmaCodec,
"optional": OptionalFieldCodec,
}

@@ -59,0 +61,0 @@

@@ -20,2 +20,3 @@ """

from .field_codec import FieldCodec
from .multiple_field_codec import MultipleFieldCodec
from .exceptions import EncodingFailed

@@ -127,6 +128,7 @@

continue
(
field_bits,
encoded_dict[field_name],
) = field_codec.encode(message_dict[field_name])
if isinstance(field_codec, MultipleFieldCodec):
field_bits, encoded_dict[field_name], encoded_fields_dict = field_codec.encode_multiple(message_dict[field_name], message_dict)
encoded_dict = {**encoded_dict, **encoded_fields_dict}
else:
field_bits, encoded_dict[field_name] = field_codec.encode(message_dict[field_name])

@@ -175,3 +177,7 @@ encoded_bits.append(field_bits)

else:
decoded_message[field_name] = field_codec.decode(bits_to_decode)
if isinstance(field_codec, MultipleFieldCodec):
decoded_message[field_name], decoded_dict = field_codec.decode_multiple(bits_to_decode)
decoded_message = {**decoded_message, **decoded_dict}
else:
decoded_message[field_name] = field_codec.decode(bits_to_decode)
return self.ros_msg_class(**decoded_message)

@@ -178,0 +184,0 @@

@@ -12,3 +12,3 @@ #!/usr/bin/env python3

import rospy
from std_msgs.msg import String
from std_msgs.msg import String, ColorRGBA

@@ -319,2 +319,33 @@ import ltcodecs

def test_nullable_codec(self) -> None:
"""test functionality of the ros message codec"""
# use a ColorRGBA message because it's part of std_msgs and has a field that we can cast as a boolean
msg = ColorRGBA(r=1, g=1234.5678, b=3, a=4)
message_dict = {}
for field in msg.__slots__:
message_dict[field] = getattr(msg, field)
codec = ltcodecs.OptionalFieldCodec(target_field='g', target_type='float32')
encoded_msg = codec.encode_multiple(msg.r, message_dict)
bit_stream = ConstBitStream(encoded_msg[0])
decoded_r, decoded_dict = codec.decode_multiple(bit_stream)
decoded_dict = {'r': decoded_r, **decoded_dict}
assert decoded_dict['g'] == pytest.approx(msg.g), "decoded target value doesn't match input"
assert len(bit_stream) == 33, "Encoded message has wrong number of bits"
# Now try with nulled
msg.r = 0
encoded_msg = codec.encode_multiple(msg.r, message_dict)
bit_stream = ConstBitStream(encoded_msg[0])
decoded_r, decoded_dict = codec.decode_multiple(bit_stream)
decoded_dict = {'r': decoded_r, **decoded_dict}
assert len(bit_stream) == 1, "Encoded message (when nulled) has wrong number of bits"
def test_ros_message_field_codec(self) -> None:

@@ -321,0 +352,0 @@ """test functionality of the ros message field codec"""