ltcodecs
Advanced tools
+4
-3
| Metadata-Version: 2.1 | ||
| Name: ltcodecs | ||
| Version: 1.1.0 | ||
| Version: 2.0.0 | ||
| Summary: LT Codecs | ||
| Home-page: https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs | ||
| Home-page: https://git.whoi.edu/acomms/ltcodec/ | ||
| Author: whoi | ||
| Author-email: egallimore@whoi.edu | ||
| Project-URL: Bug Tracker, https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs | ||
| Project-URL: Bug Tracker, https://git.whoi.edu/acomms/ltcodec/ | ||
| Project-URL: Documentation, https://acomms.pages.whoi.edu/ltcodec/ | ||
| Classifier: Programming Language :: Python :: 3 | ||
@@ -10,0 +11,0 @@ Classifier: License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+) |
+4
-3
| [metadata] | ||
| name = ltcodecs | ||
| version = 1.1.0 | ||
| version = 2.0.0 | ||
| author = whoi | ||
@@ -9,5 +9,6 @@ author_email = egallimore@whoi.edu | ||
| long_description_content_type = text/markdown | ||
| url = https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs | ||
| url = https://git.whoi.edu/acomms/ltcodec/ | ||
| project_urls = | ||
| Bug Tracker = https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs | ||
| Bug Tracker = https://git.whoi.edu/acomms/ltcodec/ | ||
| Documentation = https://acomms.pages.whoi.edu/ltcodec/ | ||
| classifiers = | ||
@@ -14,0 +15,0 @@ Programming Language :: Python :: 3 |
| Metadata-Version: 2.1 | ||
| Name: ltcodecs | ||
| Version: 1.1.0 | ||
| Version: 2.0.0 | ||
| Summary: LT Codecs | ||
| Home-page: https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs | ||
| Home-page: https://git.whoi.edu/acomms/ltcodec/ | ||
| Author: whoi | ||
| Author-email: egallimore@whoi.edu | ||
| Project-URL: Bug Tracker, https://git.whoi.edu/acomms/ros_acomms/tree/master/src/acomms_codecs | ||
| Project-URL: Bug Tracker, https://git.whoi.edu/acomms/ltcodec/ | ||
| Project-URL: Documentation, https://acomms.pages.whoi.edu/ltcodec/ | ||
| Classifier: Programming Language :: Python :: 3 | ||
@@ -10,0 +11,0 @@ Classifier: License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+) |
@@ -5,3 +5,3 @@ """ | ||
| This module contains the NullableCodec class, which uses a boolean field to control whether a target field is optionally | ||
| This module contains the OptionalCodec class, which uses a boolean field to control whether a target field is optionally | ||
| encoded. | ||
@@ -25,24 +25,36 @@ """ | ||
| :param target_field: Target field name | ||
| :param target_type: ROS type of the target field (as a string) | ||
| :param target_params: dictionary with target field codec parameters | ||
| Args: | ||
| target_field: Target field name (used to target a single field). If this is set, target_fields must not be set. | ||
| target_fields: Dict containing fields and associated parameters. Cannot be set if target_field is set. | ||
| target_type: ROS type of the target field (as a string) | ||
| target_params: dictionary with target field codec parameters | ||
| """ | ||
| def __init__( | ||
| self, target_field: str, target_type: str, target_params=None, **kwargs | ||
| self, target_fields: dict = None, **kwargs: object | ||
| ) -> None: | ||
| self.target_field = target_field | ||
| if target_params: | ||
| self.target_field_codec = ltcodecs.field_codec_classes[target_type]( | ||
| **target_params | ||
| ) | ||
| else: | ||
| self.target_field_codec = ltcodecs.field_codec_classes[target_type]() | ||
| self.target_fields = target_fields | ||
| self.field_codecs = {} | ||
| if target_fields: | ||
| for field_name, field_params in self.target_fields.items(): | ||
| # print(field_name, field_params['codec']) | ||
| # print(field_codecs.field_codec_classes[field_params['codec']]) | ||
| try: | ||
| self.field_codecs[field_name] = ltcodecs.field_codec_classes[field_params["codec"]](**field_params) | ||
| except KeyError as err: | ||
| raise KeyError( | ||
| f"Error parsing codec config for {field_name}. Got params:\n{field_params}\nError: {err}" | ||
| ) from err | ||
| def encode_multiple(self, value: bool, message_dict: dict) -> tuple[Bits, bool, dict]: | ||
| """ | ||
| encode a pair of fields: the boolean used to control the "nullable" target, and the target if requested | ||
| encode a set of fields: the boolean used to control the "nullable" target field(s), and the target if requested | ||
| :param value: the boolean field value that indicates whether the target should be encoded | ||
| :param value: the boolean field value that indicates whether the target fields should be encoded | ||
| :param message_dict: the full message from which the target field is read | ||
| :return: A tuple containing the encoded bits, the boolean value that indicates if the optional fields are encoded | ||
| and a dictionary of the encoded fields (after compression, if the target field codec does that). | ||
| """ | ||
@@ -56,7 +68,22 @@ value = bool(value) | ||
| # Otherwise, we want to encode a bit (to indicate that the target is present), and then the target | ||
| target_bits, target_value = self.target_field_codec.encode(message_dict[self.target_field]) | ||
| value_bits.append(target_bits) | ||
| # Otherwise, we want to encode a bit (to indicate that the target is present), and then the target fields | ||
| encoded_dict = {} | ||
| for field_name, field_params in self.target_fields.items(): | ||
| try: | ||
| field_codec = self.field_codecs[field_name] | ||
| # Note that metadata encoding is done at the ros_msg_codec level, not here | ||
| if not field_codec or isinstance(field_codec, str): | ||
| continue | ||
| if isinstance(field_codec, MultipleFieldCodec): | ||
| field_bits, encoded_dict[field_name], encoded_fields_dict = field_codec.encode_multiple(value, message_dict) | ||
| encoded_dict = {**encoded_dict, **encoded_fields_dict} | ||
| else: | ||
| field_bits, encoded_dict[field_name] = field_codec.encode(message_dict[field_name]) | ||
| return value_bits, value, {self.target_field: target_value} | ||
| value_bits.append(field_bits) | ||
| except Exception as err: | ||
| raise EncodingFailed( | ||
| f'Error encoding field "{field_name}" with codec {field_codec} (max len bits {field_codec.max_length_bits})' | ||
| ) from err | ||
| return value_bits, value, encoded_dict | ||
@@ -80,5 +107,12 @@ def decode_multiple(self, bits_to_decode: ConstBitStream) -> tuple[bool, dict]: | ||
| target_value = self.target_field_codec.decode(bits_to_decode) | ||
| return value, {self.target_field: target_value} | ||
| decoded_message = {} | ||
| for field_name, field_params in self.target_fields.items(): | ||
| field_codec = self.field_codecs[field_name] | ||
| if hasattr(field_codec, "decode_as_dict"): | ||
| decoded_message[field_name] = field_codec.decode_as_dict(bits_to_decode) | ||
| else: | ||
| decoded_message[field_name] = field_codec.decode(bits_to_decode) | ||
| return value, decoded_message | ||
| @property | ||
@@ -90,2 +124,2 @@ def min_length_bits(self) -> int: | ||
| def max_length_bits(self) -> int: | ||
| return 1 + self.target_field_codec.max_length_bits | ||
| return 1 + sum([c.max_length_bits for c in self.field_codecs.values()]) |
@@ -20,9 +20,21 @@ """ | ||
| codec for variable-length array | ||
| Args: | ||
| element_type: codec to use to encode each element of the array | ||
| max_length: maximum number of elements to encode | ||
| element_params: Codec parameters dictionary to use for the element codec | ||
| nullable: If this is True, use a bit to indicate if the array is empty. This allows encoding an empty array | ||
| with a single bit. | ||
| """ | ||
| def __init__( | ||
| self, element_type: str, max_length: int, element_params=None, **kwargs | ||
| self, element_type: str, max_length: int, element_params=None, nullable=False, **kwargs | ||
| ) -> None: | ||
| self.max_length = max_length | ||
| self.length_codec = VarintCodec(min_value=0, max_value=self.max_length) | ||
| self.nullable = nullable | ||
| if not self.nullable: | ||
| self.length_codec = VarintCodec(min_value=0, max_value=self.max_length) | ||
| else: | ||
| # If we are nullable, we don't need to encode length 0, since it is handled by the nullable flag | ||
| self.length_codec = VarintCodec(min_value=1, max_value=self.max_length) | ||
| print("element_type", element_type) | ||
@@ -40,15 +52,20 @@ if element_params: | ||
| """ | ||
| value = value[0 : self.max_length] | ||
| length_bits, _ = self.length_codec.encode(len(value)) | ||
| value_bits = BitArray(length_bits) | ||
| value_bits = BitArray() | ||
| encoded_value_list = [] | ||
| for element in value: | ||
| ( | ||
| element_bits, | ||
| element_value, | ||
| ) = self.element_field_codec.encode(element) | ||
| value_bits.append(element_bits) | ||
| encoded_value_list.append(element_value) | ||
| if self.nullable: | ||
| value_bits.append(BitArray(bool=(len(value) > 0))) | ||
| if len(value) > 0 or not self.nullable: | ||
| value = value[0 : self.max_length] | ||
| length_bits, _ = self.length_codec.encode(len(value)) | ||
| value_bits.append(length_bits) | ||
| for element in value: | ||
| ( | ||
| element_bits, | ||
| element_value, | ||
| ) = self.element_field_codec.encode(element) | ||
| value_bits.append(element_bits) | ||
| encoded_value_list.append(element_value) | ||
| return value_bits, encoded_value_list | ||
@@ -60,7 +77,12 @@ | ||
| """ | ||
| num_elements = self.length_codec.decode(bits_to_decode) | ||
| not_null = True | ||
| if self.nullable: | ||
| not_null = bits_to_decode.read('bool') | ||
| decoded_list = [] | ||
| for i in range(num_elements): | ||
| element = self.element_field_codec.decode(bits_to_decode) | ||
| decoded_list.append(element) | ||
| if not_null: | ||
| num_elements = self.length_codec.decode(bits_to_decode) | ||
| for i in range(num_elements): | ||
| element = self.element_field_codec.decode(bits_to_decode) | ||
| decoded_list.append(element) | ||
| return decoded_list | ||
@@ -70,8 +92,14 @@ | ||
| def min_length_bits(self) -> int: | ||
| return self.length_codec.max_length_bits | ||
| if self.nullable: | ||
| return 1 | ||
| else: | ||
| return self.length_codec.max_length_bits | ||
| @property | ||
| def max_length_bits(self) -> int: | ||
| return self.length_codec.max_length_bits + ( | ||
| length_bits = self.length_codec.max_length_bits + ( | ||
| self.max_length * self.element_field_codec.max_length_bits | ||
| ) | ||
| if self.nullable: | ||
| length_bits += 1 | ||
| return length_bits |
+25
-9
@@ -295,10 +295,25 @@ #!/usr/bin/env python3 | ||
| ) | ||
| encoded_int = codec.encode(input_array) | ||
| encoded_bits = codec.encode(input_array) | ||
| bit_stream = ConstBitStream(encoded_bits[0]) | ||
| decoded_array = codec.decode(bit_stream) | ||
| bit_stream = ConstBitStream(encoded_int[0]) | ||
| assert decoded_array == input_array, "decoded array does not match input array" | ||
| decoded_int = codec.decode(bit_stream) | ||
| # Now, test making it nullable | ||
| codec = ltcodecs.VariableLenArrayCodec( | ||
| "integer", 4, nullable=True, element_params={"min_value": 0, "max_value": 100} | ||
| ) | ||
| encoded_bits = codec.encode(input_array) | ||
| bit_stream = ConstBitStream(encoded_bits[0]) | ||
| decoded_array = codec.decode(bit_stream) | ||
| assert decoded_int == input_array, "decoded array does not match input array" | ||
| assert decoded_array == input_array, "decoded array does not match input array (with nullable codec)" | ||
| # ... and try actually making it null | ||
| encoded_bits = codec.encode([]) | ||
| bit_stream = ConstBitStream(encoded_bits[0]) | ||
| decoded_array = codec.decode(bit_stream) | ||
| assert len(bit_stream) == 1, "Encoded message (when nulled) has wrong number of bits" | ||
| def test_ros_message_codec(self) -> None: | ||
@@ -319,7 +334,7 @@ """test functionality of the ros message codec""" | ||
| def test_nullable_codec(self) -> None: | ||
| """test functionality of the ros message codec""" | ||
| def test_optional_codec(self) -> None: | ||
| """test functionality of the OptionalFieldCodec codec""" | ||
| # use a ColorRGBA message because it's part of std_msgs and has a field that we can cast as a boolean | ||
| msg = ColorRGBA(r=1, g=1234.5678, b=3, a=4) | ||
| msg = ColorRGBA(r=1, g=1234.5678, b=3.14, a=4) | ||
| message_dict = {} | ||
@@ -329,3 +344,3 @@ for field in msg.__slots__: | ||
| codec = ltcodecs.OptionalFieldCodec(target_field='g', target_type='float32') | ||
| codec = ltcodecs.OptionalFieldCodec(target_fields={'g': {'codec': 'float32'}, 'b': {'codec': 'float32'}}) | ||
@@ -339,3 +354,4 @@ encoded_msg = codec.encode_multiple(msg.r, message_dict) | ||
| assert decoded_dict['g'] == pytest.approx(msg.g), "decoded target value doesn't match input" | ||
| assert len(bit_stream) == 33, "Encoded message has wrong number of bits" | ||
| assert decoded_dict['b'] == pytest.approx(msg.b), "decoded target value doesn't match input" | ||
| assert len(bit_stream) == 1+32+32, "Encoded message has wrong number of bits" | ||
@@ -342,0 +358,0 @@ # Now try with nulled |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
82513
5.26%1767
4.06%