ltcodecs
Advanced tools
| """ | ||
| abstract base class for codecs that encode multiple fields | ||
| """ | ||
| from __future__ import annotations | ||
| from abc import ABC, abstractmethod | ||
| from bitstring import ConstBitStream, Bits | ||
| from typing import Any | ||
| class MultipleFieldCodec(ABC): | ||
| """ | ||
| abstract base class for field codecs | ||
| """ | ||
| def __init__(self, **kwargs) -> None: | ||
| pass | ||
| @abstractmethod | ||
| def encode_multiple(self, value: Any, message_dict: dict) -> tuple[Bits, Any]: | ||
| """ | ||
| encodes multiple fields | ||
| :param value: the "primary" value to encode | ||
| :param message_dict: the full message from which the additional fields to encode may be read | ||
| """ | ||
| pass | ||
| @abstractmethod | ||
| def decode_multiple(self, bits_to_decode: ConstBitStream) -> tuple[Any, dict]: | ||
| """ | ||
| decodes multiple values | ||
| Args: | ||
| bits_to_decode: ConstBitStream to decode | ||
| Returns: | ||
| value: the value of the "primary" decoded field | ||
| values_dict: Dictionary with other decoded field name: value pairs | ||
| """ | ||
| pass | ||
| @property | ||
| @abstractmethod | ||
| def max_length_bits(self) -> int: | ||
| pass | ||
| @property | ||
| @abstractmethod | ||
| def min_length_bits(self) -> int: | ||
| pass |
| """ | ||
| ltcodecs.nullable_codec | ||
| ------------------------------ | ||
| This module contains the NullableCodec class, which uses a boolean field to control whether a target field is optionally | ||
| encoded. | ||
| """ | ||
| from __future__ import annotations | ||
| from bitstring import BitArray, ConstBitStream, Bits | ||
| from .multiple_field_codec import MultipleFieldCodec | ||
| import ltcodecs as ltcodecs | ||
| from typing import Any | ||
| from .exceptions import EncodingFailed | ||
| class OptionalFieldCodec(MultipleFieldCodec): | ||
| """ | ||
| codec for optional fields, where a boolean field controls whether a "target" field should be encoded. | ||
| If the controlling field is true, the target field will be encoded using target_params. If false, the | ||
| target_field won't be encoded. | ||
| :param target_field: Target field name | ||
| :param target_type: ROS type of the target field (as a string) | ||
| :param target_params: dictionary with target field codec parameters | ||
| """ | ||
| def __init__( | ||
| self, target_field: str, target_type: str, target_params=None, **kwargs | ||
| ) -> None: | ||
| self.target_field = target_field | ||
| if target_params: | ||
| self.target_field_codec = ltcodecs.field_codec_classes[target_type]( | ||
| **target_params | ||
| ) | ||
| else: | ||
| self.target_field_codec = ltcodecs.field_codec_classes[target_type]() | ||
| def encode_multiple(self, value: bool, message_dict: dict) -> tuple[Bits, bool, dict]: | ||
| """ | ||
| encode a pair of fields: the boolean used to control the "nullable" target, and the target if requested | ||
| :param value: the boolean field value that indicates whether the target should be encoded | ||
| :param message_dict: the full message from which the target field is read | ||
| """ | ||
| value = bool(value) | ||
| value_bits = BitArray(bool=value) | ||
| # If the value is false, we treat this like a boolean codec | ||
| if not value: | ||
| return value_bits, value, {} | ||
| # Otherwise, we want to encode a bit (to indicate that the target is present), and then the target | ||
| target_bits, target_value = self.target_field_codec.encode(message_dict[self.target_field]) | ||
| value_bits.append(target_bits) | ||
| return value_bits, value, {self.target_field: target_value} | ||
| def decode_multiple(self, bits_to_decode: ConstBitStream) -> tuple[bool, dict]: | ||
| """ | ||
| decode a nullable (optional) field: the boolean used to control the "nullable" target, and the target if present | ||
| Args: | ||
| bits_to_decode: the bits to decode | ||
| Returns: | ||
| value: the value of the controlling boolean field | ||
| values_dict: Dictionary with other decoded field name: value pairs | ||
| """ | ||
| value = bits_to_decode.read("bool") | ||
| if not value: | ||
| return value, {} | ||
| target_value = self.target_field_codec.decode(bits_to_decode) | ||
| return value, {self.target_field: target_value} | ||
| @property | ||
| def min_length_bits(self) -> int: | ||
| return 1 | ||
| @property | ||
| def max_length_bits(self) -> int: | ||
| return 1 + self.target_field_codec.max_length_bits |
+6
-2
| Metadata-Version: 2.1 | ||
| Name: ltcodecs | ||
| Version: 1.0.1 | ||
| Version: 1.1.0 | ||
| Summary: LT Codecs | ||
@@ -18,2 +18,6 @@ Home-page: https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs | ||
| The package was original part of ros_acomms under the field_codecs directory. | ||
| Flexible, lightweight codec system for transferring messages over acoustic and other low-throughput links. | ||
| See the documentation for more information: https://acomms.pages.whoi.edu/ltcodec/ | ||
| Install from Pypi via `pip install ltcodecs` |
+5
-1
| # LT Codecs | ||
| The package was original part of ros_acomms under the field_codecs directory. | ||
| Flexible, lightweight codec system for transferring messages over acoustic and other low-throughput links. | ||
| See the documentation for more information: https://acomms.pages.whoi.edu/ltcodec/ | ||
| Install from Pypi via `pip install ltcodecs` |
+1
-1
| [metadata] | ||
| name = ltcodecs | ||
| version = 1.0.1 | ||
| version = 1.1.0 | ||
| author = whoi | ||
@@ -5,0 +5,0 @@ author_email = egallimore@whoi.edu |
| Metadata-Version: 2.1 | ||
| Name: ltcodecs | ||
| Version: 1.0.1 | ||
| Version: 1.1.0 | ||
| Summary: LT Codecs | ||
@@ -18,2 +18,6 @@ Home-page: https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs | ||
| The package was original part of ros_acomms under the field_codecs directory. | ||
| Flexible, lightweight codec system for transferring messages over acoustic and other low-throughput links. | ||
| See the documentation for more information: https://acomms.pages.whoi.edu/ltcodec/ | ||
| Install from Pypi via `pip install ltcodecs` |
@@ -18,2 +18,4 @@ LICENSE | ||
| src/ltcodecs/lzma_codec.py | ||
| src/ltcodecs/multiple_field_codec.py | ||
| src/ltcodecs/optional_field_codec.py | ||
| src/ltcodecs/padding_codec.py | ||
@@ -20,0 +22,0 @@ src/ltcodecs/ros_message_codec.py |
@@ -21,2 +21,3 @@ from .varint_codec import VarintCodec | ||
| from .lzma_codec import LzmaCodec | ||
| from .optional_field_codec import OptionalFieldCodec | ||
| from .exceptions import EncodingFailed | ||
@@ -57,2 +58,3 @@ | ||
| "lzma": LzmaCodec, | ||
| "optional": OptionalFieldCodec, | ||
| } | ||
@@ -59,0 +61,0 @@ |
@@ -20,2 +20,3 @@ """ | ||
| from .field_codec import FieldCodec | ||
| from .multiple_field_codec import MultipleFieldCodec | ||
| from .exceptions import EncodingFailed | ||
@@ -127,6 +128,7 @@ | ||
| continue | ||
| ( | ||
| field_bits, | ||
| encoded_dict[field_name], | ||
| ) = field_codec.encode(message_dict[field_name]) | ||
| if isinstance(field_codec, MultipleFieldCodec): | ||
| field_bits, encoded_dict[field_name], encoded_fields_dict = field_codec.encode_multiple(message_dict[field_name], message_dict) | ||
| encoded_dict = {**encoded_dict, **encoded_fields_dict} | ||
| else: | ||
| field_bits, encoded_dict[field_name] = field_codec.encode(message_dict[field_name]) | ||
@@ -175,3 +177,7 @@ encoded_bits.append(field_bits) | ||
| else: | ||
| decoded_message[field_name] = field_codec.decode(bits_to_decode) | ||
| if isinstance(field_codec, MultipleFieldCodec): | ||
| decoded_message[field_name], decoded_dict = field_codec.decode_multiple(bits_to_decode) | ||
| decoded_message = {**decoded_message, **decoded_dict} | ||
| else: | ||
| decoded_message[field_name] = field_codec.decode(bits_to_decode) | ||
| return self.ros_msg_class(**decoded_message) | ||
@@ -178,0 +184,0 @@ |
+32
-1
@@ -12,3 +12,3 @@ #!/usr/bin/env python3 | ||
| import rospy | ||
| from std_msgs.msg import String | ||
| from std_msgs.msg import String, ColorRGBA | ||
@@ -319,2 +319,33 @@ import ltcodecs | ||
| def test_nullable_codec(self) -> None: | ||
| """test functionality of the ros message codec""" | ||
| # use a ColorRGBA message because it's part of std_msgs and has a field that we can cast as a boolean | ||
| msg = ColorRGBA(r=1, g=1234.5678, b=3, a=4) | ||
| message_dict = {} | ||
| for field in msg.__slots__: | ||
| message_dict[field] = getattr(msg, field) | ||
| codec = ltcodecs.OptionalFieldCodec(target_field='g', target_type='float32') | ||
| encoded_msg = codec.encode_multiple(msg.r, message_dict) | ||
| bit_stream = ConstBitStream(encoded_msg[0]) | ||
| decoded_r, decoded_dict = codec.decode_multiple(bit_stream) | ||
| decoded_dict = {'r': decoded_r, **decoded_dict} | ||
| assert decoded_dict['g'] == pytest.approx(msg.g), "decoded target value doesn't match input" | ||
| assert len(bit_stream) == 33, "Encoded message has wrong number of bits" | ||
| # Now try with nulled | ||
| msg.r = 0 | ||
| encoded_msg = codec.encode_multiple(msg.r, message_dict) | ||
| bit_stream = ConstBitStream(encoded_msg[0]) | ||
| decoded_r, decoded_dict = codec.decode_multiple(bit_stream) | ||
| decoded_dict = {'r': decoded_r, **decoded_dict} | ||
| assert len(bit_stream) == 1, "Encoded message (when nulled) has wrong number of bits" | ||
| def test_ros_message_field_codec(self) -> None: | ||
@@ -321,0 +352,0 @@ """test functionality of the ros message field codec""" |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
78388
9.6%33
6.45%1698
8.64%