diff --git a/canopen/pdo/base.py b/canopen/pdo/base.py index d467cdb3..c5154ec8 100644 --- a/canopen/pdo/base.py +++ b/canopen/pdo/base.py @@ -1,6 +1,6 @@ from __future__ import annotations + import threading -import math from typing import Callable, Dict, Iterator, List, Optional, Union, TYPE_CHECKING from collections.abc import Mapping import logging @@ -260,7 +260,7 @@ def _fill_map(self, needed): self.map.append(var) def _update_data_size(self): - self.data = bytearray(int(math.ceil(self.length / 8.0))) + self.data = bytearray(-(-self.length // 8)) # rounds up @property def name(self) -> str: @@ -564,6 +564,7 @@ def get_data(self) -> bytes: """ byte_offset, bit_offset = divmod(self.offset, 8) + byte_last = -(-(self.offset + self.length) // 8) # rounds up if bit_offset or self.length % 8: # Need information of the current variable type (unsigned vs signed) data_type = self.od.data_type @@ -571,16 +572,21 @@ def get_data(self) -> bytes: # A boolean type needs to be treated as an U08 data_type = objectdictionary.UNSIGNED8 od_struct = self.od.STRUCT_TYPES[data_type] - data = od_struct.unpack_from(self.pdo_parent.data, byte_offset)[0] + # Mask of relevant bits for this mapping, starting at bit 0 + mask = (1 << self.length) - 1 + # Extract all needed bytes and convert to a number for bit operations + cur_msg_data = self.pdo_parent.data[byte_offset:byte_last] + cur_msg_bits = int.from_bytes(cur_msg_data, byteorder="little") # Shift and mask to get the correct values - data = (data >> bit_offset) & ((1 << self.length) - 1) + data_bits = (cur_msg_bits >> bit_offset) & mask + # Check if the variable is signed and if the data is negative prepend signedness - if od_struct.format.islower() and (1 << (self.length - 1)) < data: + if od_struct.format.islower() and (1 << (self.length - 1)) < data_bits: # fill up the rest of the bits to get the correct signedness - data = data | (~((1 << self.length) - 1)) - data = od_struct.pack(data) + data_bits |= ~mask + data = od_struct.pack(data_bits) else: - data = self.pdo_parent.data[byte_offset:byte_offset + len(self.od) // 8] + data = self.pdo_parent.data[byte_offset:byte_last] return data @@ -593,27 +599,24 @@ def set_data(self, data: bytes): logger.debug("Updating %s to %s in %s", self.name, binascii.hexlify(data), self.pdo_parent.name) + byte_last = -(-(self.offset + self.length) // 8) # rounds up if bit_offset or self.length % 8: - cur_msg_data = self.pdo_parent.data[byte_offset:byte_offset + len(self.od) // 8] - # Need information of the current variable type (unsigned vs signed) - data_type = self.od.data_type - if data_type == objectdictionary.BOOLEAN: - # A boolean type needs to be treated as an U08 - data_type = objectdictionary.UNSIGNED8 - od_struct = self.od.STRUCT_TYPES[data_type] - cur_msg_data = od_struct.unpack(cur_msg_data)[0] - # data has to have the same size as old_data - data = od_struct.unpack(data)[0] + # Mask of relevant bits for this mapping, starting at bit 0 + mask = (1 << self.length) - 1 + # Extract all needed bytes and convert to a number for bit operations + cur_msg_data = self.pdo_parent.data[byte_offset:byte_last] + cur_msg_bits = int.from_bytes(cur_msg_data, byteorder="little") # Mask out the old data value - # At the end we need to mask for correct variable length (bitwise operation failure) - shifted = (((1 << self.length) - 1) << bit_offset) & ((1 << len(self.od)) - 1) - bitwise_not = (~shifted) & ((1 << len(self.od)) - 1) - cur_msg_data = cur_msg_data & bitwise_not + cur_msg_bits &= ~(mask << bit_offset) + # Convert new value to a number for generic bit operations + data_bits = int.from_bytes(data, byteorder="little") & mask # Set the new data on the correct position - data = (data << bit_offset) | cur_msg_data - od_struct.pack_into(self.pdo_parent.data, byte_offset, data) + data_bits = (data_bits << bit_offset) | cur_msg_bits + + merged_data = data_bits.to_bytes(byte_last - byte_offset, byteorder="little") + self.pdo_parent.data[byte_offset:byte_last] = merged_data else: - self.pdo_parent.data[byte_offset:byte_offset + len(data)] = data + self.pdo_parent.data[byte_offset:byte_last] = data[0:byte_last] self.pdo_parent.update() diff --git a/test/sample.eds b/test/sample.eds index 16d0c31a..da341483 100644 --- a/test/sample.eds +++ b/test/sample.eds @@ -838,6 +838,20 @@ DataType=0x0001 AccessType=rw PDOMapping=1 +[2007] +ParameterName=UNSIGNED16 value +ObjectType=0x7 +DataType=0x0006 +AccessType=rw +PDOMapping=1 + +[2008] +ParameterName=UNSIGNED32 value +ObjectType=0x7 +DataType=0x0007 +AccessType=rw +PDOMapping=1 + [2020] ParameterName=Complex data type ObjectType=0x7 diff --git a/test/test_pdo.py b/test/test_pdo.py index 32963cf3..a69dc001 100644 --- a/test/test_pdo.py +++ b/test/test_pdo.py @@ -55,6 +55,35 @@ def test_bit_mapping(self): self.assertEqual(node.tpdo[0x2002].raw, 0xf) self.assertEqual(node.pdo[0x1600][0x2002].raw, 0xf) + def test_bit_offsets(self): + node = canopen.Node(1, EDS_PATH) + pdo = node.pdo.tx[1] + pdo.add_variable('UNSIGNED8 value', length=4) # byte-aligned, partial byte length + pdo.add_variable('INTEGER8 value') # non-byte-aligned, one whole byte length + pdo.add_variable('UNSIGNED32 value', length=24) # non-aligned, partial last byte + pdo.add_variable('UNSIGNED16 value', length=12) # non-aligned, whole last byte + pdo.add_variable('INTEGER16 value', length=3) # byte-aligned, partial byte length + pdo.add_variable('INTEGER32 value', length=13) # non-aligned, whole last byte + + # Write some values + pdo['UNSIGNED8 value'].raw = 3 + pdo['INTEGER8 value'].raw = -2 + pdo['UNSIGNED32 value'].raw = 0x987654 + pdo['UNSIGNED16 value'].raw = 0x321 + pdo['INTEGER16 value'].raw = -1 + pdo['INTEGER32 value'].raw = -1071 + + # Check expected data + self.assertEqual(pdo.data, b'\xe3\x4f\x65\x87\x19\x32\x8f\xde') + + # Read values from data + self.assertEqual(pdo['UNSIGNED8 value'].raw, 3) + self.assertEqual(pdo['INTEGER8 value'].raw, -2) + self.assertEqual(pdo['UNSIGNED32 value'].raw, 0x987654) + self.assertEqual(pdo['UNSIGNED16 value'].raw, 0x321) + self.assertEqual(pdo['INTEGER16 value'].raw, -1) + self.assertEqual(pdo['INTEGER32 value'].raw, -1071) + def test_save_pdo(self): node = canopen.Node(1, EDS_PATH) node.tpdo.save()