Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 49 additions & 29 deletions canopen/objectdictionary/eds.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,11 @@ def import_from_node(node_id: int, network: canopen.network.Network):


def _calc_bit_length(data_type: int) -> int:
if data_type == datatypes.INTEGER8:
return 8
elif data_type == datatypes.INTEGER16:
return 16
elif data_type == datatypes.INTEGER32:
return 32
elif data_type == datatypes.INTEGER64:
return 64
if data_type in datatypes.INTEGER_TYPES:
st = ODVariable.STRUCT_TYPES[data_type]
if isinstance(st, datatypes.IntegerN):
return st.width
return st.size * 8
else:
raise ValueError(
f"Invalid data_type 0x{data_type:04X}, expecting an integer data_type."
Expand All @@ -221,13 +218,27 @@ def _calc_bit_length(data_type: int) -> int:

def _signed_int_from_hex(hex_str, bit_length):
number = int(hex_str, 0)
max_value = (1 << (bit_length - 1)) - 1
min_signed = -(1 << (bit_length - 1))
max_signed = (1 << (bit_length - 1)) - 1
max_unsigned = (1 << bit_length) - 1

if number > max_value:
return number - (1 << bit_length)
else:
if number < min_signed:
raise ValueError(
f"Value {hex_str!r} is out of range for a {bit_length}-bit signed integer"
)
if number < 0:
# Negative literal (e.g. LowLimit=-32768 or -0x8000)
return number

if number > max_unsigned:
raise ValueError(
f"Value {hex_str!r} is out of range for a {bit_length}-bit signed integer"
)
if number > max_signed:
# Unsigned hex literal, two's-complement (e.g. LowLimit=0xFFFF → -1 for INTEGER16)
return number - (1 << bit_length)
return number


def _convert_variable(node_id: int, var_type: int, value: Any) -> Any:
if var_type in (datatypes.OCTET_STRING, datatypes.DOMAIN):
Expand All @@ -248,14 +259,14 @@ def _convert_variable(node_id: int, var_type: int, value: Any) -> Any:
def _revert_variable(var_type: int, value: Any) -> Any:
if value is None:
return None
if var_type in (datatypes.OCTET_STRING, datatypes.DOMAIN):
if var_type in (datatypes.OCTET_STRING, datatypes.DOMAIN) and isinstance(value, bytes):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was really confused why this is necessary to avoid a test failure now. Here's my analysis:

  1. The sample.eds has object 2020 with DataType=0x40 and LowLimit=0x3.
  2. build_variable() sees it's an unknown data type without a corresponding DefaultValue in a section 40sub1.
  3. The data_type is forced to DOMAIN.
  4. var.min is correctly parsed from the LowLimit to an int(3).
  5. During export, export_variable now tries to pass the int to _revert_variable().
  6. For a DOMAIN variable, the value is expected to be of type bytes, so the string conversion via bytes.hex() fails with the integer argument.

Why exactly do we need to call _revert_variable() at all on the limit values? We didn't do that before.

This function is terribly named by the way, I cannot even imagine whether it should be applied to the limits (which are always numeric). But as it mirrors _convert_variable(), one should assume that it requires bytes for a DOMAIN variable unconditionally. Simply skipping to a str() conversion seems wrong.

return bytes.hex(value)
elif var_type in (datatypes.VISIBLE_STRING, datatypes.UNICODE_STRING):
return value
elif var_type in datatypes.FLOAT_TYPES:
return value
else:
return f"0x{value:02X}"
return str(value)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return str(value)
return f"0x{value:02X}"



def build_variable(
Expand Down Expand Up @@ -309,7 +320,10 @@ def build_variable(
else:
var.min = int(min_string, 0)
except ValueError:
pass
logger.warning(
"Invalid LowLimit %r for %s (0x%X), ignoring",
eds.get(section, "LowLimit"), var.name, var.index,
Comment on lines +324 to +325
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Invalid LowLimit %r for %s (0x%X), ignoring",
eds.get(section, "LowLimit"), var.name, var.index,
"Invalid LowLimit %r for %s (0x%X), ignoring", min_string, var.name, var.index

)
if eds.has_option(section, "HighLimit"):
try:
max_string = eds.get(section, "HighLimit")
Expand All @@ -318,38 +332,44 @@ def build_variable(
else:
var.max = int(max_string, 0)
except ValueError:
pass
logger.warning(
"Invalid HighLimit %r for %s (0x%X), ignoring",
eds.get(section, "HighLimit"), var.name, var.index,
Comment on lines +336 to +337
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Invalid HighLimit %r for %s (0x%X), ignoring",
eds.get(section, "HighLimit"), var.name, var.index,
"Invalid HighLimit %r for %s (0x%X), ignoring", max_string, var.name, var.index

)
if eds.has_option(section, "DefaultValue"):
try:
var.default_raw = eds.get(section, "DefaultValue")
if '$NODEID' in var.default_raw:
var.relative = True
var.default = _convert_variable(node_id, var.data_type, var.default_raw)
except ValueError:
pass
logger.warning(
"Invalid DefaultValue %r for %s (0x%X), ignoring",
eds.get(section, "DefaultValue"), var.name, var.index,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
eds.get(section, "DefaultValue"), var.name, var.index,
var.default_raw, var.name, var.index,

)
if eds.has_option(section, "ParameterValue"):
try:
var.value_raw = eds.get(section, "ParameterValue")
var.value = _convert_variable(node_id, var.data_type, var.value_raw)
except ValueError:
pass
logger.warning(
"Invalid ParameterValue %r for %s (0x%X), ignoring",
eds.get(section, "ParameterValue"), var.name, var.index,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
eds.get(section, "ParameterValue"), var.name, var.index,
var.value_raw, var.name, var.index,

)
# Factor, Description and Unit are not standard according to the CANopen specifications, but
# they are implemented in the python canopen package, so we can at least try to use them
if eds.has_option(section, "Factor"):
try:
var.factor = float(eds.get(section, "Factor"))
except ValueError:
pass
logger.warning(
"Invalid Factor %r for %s (0x%X), ignoring",
eds.get(section, "Factor"), var.name, var.index,
)
if eds.has_option(section, "Description"):
try:
var.description = eds.get(section, "Description")
except ValueError:
pass
var.description = eds.get(section, "Description")
if eds.has_option(section, "Unit"):
try:
var.unit = eds.get(section, "Unit")
except ValueError:
pass
var.unit = eds.get(section, "Unit")
return var


Expand Down Expand Up @@ -414,9 +434,9 @@ def export_variable(var, eds):
eds.set(section, "PDOMapping", hex(var.pdo_mappable))

if getattr(var, 'min', None) is not None:
eds.set(section, "LowLimit", var.min)
eds.set(section, "LowLimit", _revert_variable(var.data_type, var.min))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
eds.set(section, "LowLimit", _revert_variable(var.data_type, var.min))
eds.set(section, "LowLimit", var.min)

if getattr(var, 'max', None) is not None:
eds.set(section, "HighLimit", var.max)
eds.set(section, "HighLimit", _revert_variable(var.data_type, var.max))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
eds.set(section, "HighLimit", _revert_variable(var.data_type, var.max))
eds.set(section, "HighLimit", var.max)


if getattr(var, 'description', '') != '':
eds.set(section, "Description", var.description)
Expand Down
54 changes: 54 additions & 0 deletions test/sample.eds
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,24 @@ HighLimit=0x0A
LowLimit=0x02
PDOMapping=0

[3022]
ParameterName=UNSIGNED16 decimal limits
ObjectType=0x7
DataType=0x06
AccessType=rw
LowLimit=100
HighLimit=1000
PDOMapping=0

[3023]
ParameterName=INTEGER16 decimal limits
ObjectType=0x7
DataType=0x03
AccessType=rw
LowLimit=-100
HighLimit=100
PDOMapping=0

[3030]
ParameterName=INTEGER32 only negative values
ObjectType=0x7
Expand All @@ -976,6 +994,42 @@ HighLimit=0xFFFFFFFF
LowLimit=0x80000000
PDOMapping=0

[3031]
ParameterName=INTEGER24 value range -1 to 0
ObjectType=0x7
DataType=0x10
AccessType=rw
HighLimit=0x000000
LowLimit=0xFFFFFF
PDOMapping=0

[3032]
ParameterName=INTEGER40 value range -1 to 0
ObjectType=0x7
DataType=0x12
AccessType=rw
HighLimit=0x0000000000
LowLimit=0xFFFFFFFFFF
PDOMapping=0

[3033]
ParameterName=INTEGER48 value range -1 to 0
ObjectType=0x7
DataType=0x13
AccessType=rw
HighLimit=0x000000000000
LowLimit=0xFFFFFFFFFFFF
PDOMapping=0

[3034]
ParameterName=INTEGER56 value range -1 to 0
ObjectType=0x7
DataType=0x14
AccessType=rw
HighLimit=0x00000000000000
LowLimit=0xFFFFFFFFFFFFFF
PDOMapping=0

[3040]
ParameterName=INTEGER64 value range -10 to +10
ObjectType=0x7
Expand Down
84 changes: 62 additions & 22 deletions test/test_eds.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import io
import os
import unittest
from configparser import RawConfigParser

import canopen
from canopen.objectdictionary.eds import _signed_int_from_hex
from canopen.objectdictionary.eds import _signed_int_from_hex, build_variable
from canopen.utils import pretty_index

from .util import DATATYPES_EDS, SAMPLE_EDS, tmp_file
Expand Down Expand Up @@ -70,8 +72,6 @@ def test_load_implicit_nodeid(self):
self.assertEqual(od.node_id, 16)

def test_load_implicit_nodeid_fallback(self):
import io

# First, remove the NodeID option from DeviceComissioning.
with open(SAMPLE_EDS) as f:
lines = [L for L in f.readlines() if not L.startswith("NodeID=")]
Expand All @@ -93,8 +93,6 @@ def test_load_baudrate(self):
self.assertEqual(od.bitrate, 500_000)

def test_load_baudrate_fallback(self):
import io

# Remove the Baudrate option.
with open(SAMPLE_EDS) as f:
lines = [L for L in f.readlines() if not L.startswith("Baudrate=")]
Expand Down Expand Up @@ -136,18 +134,23 @@ def test_record(self):
self.assertFalse(var.is_domain)

def test_record_with_limits(self):
int8 = self.od[0x3020]
self.assertEqual(int8.min, 0)
self.assertEqual(int8.max, 127)
uint8 = self.od[0x3021]
self.assertEqual(uint8.min, 2)
self.assertEqual(uint8.max, 10)
int32 = self.od[0x3030]
self.assertEqual(int32.min, -2147483648)
self.assertEqual(int32.max, -1)
int64 = self.od[0x3040]
self.assertEqual(int64.min, -10)
self.assertEqual(int64.max, +10)
cases = [
(0x3020, 0, 127), # _ INTEGER8 hex limits
(0x3021, 2, 10), # _ UNSIGNED8 hex limits
(0x3022, 100, 1000), # _ UNSIGNED16 decimal limits
(0x3023, -100, 100), # _ INTEGER16 decimal limits
(0x3030, -2147483648, -1), # _ INTEGER32 hex limits
(0x3031, -1, 0), # _ INTEGER24 hex limits
(0x3032, -1, 0), # _ INTEGER40 hex limits
(0x3033, -1, 0), # _ INTEGER48 hex limits
(0x3034, -1, 0), # _ INTEGER56 hex limits
(0x3040, -10, +10), # _ INTEGER64 hex limits
]
for index, expected_min, expected_max in cases:
with self.subTest(index=f"0x{index:04X}"):
var = self.od[index]
self.assertEqual(var.min, expected_min)
self.assertEqual(var.max, expected_max)

def test_signed_int_from_hex(self):
for data_type, test_cases in self.test_data.items():
Expand All @@ -156,6 +159,37 @@ def test_signed_int_from_hex(self):
result = _signed_int_from_hex('0x' + test_case["hex_str"], test_case["bit_length"])
self.assertEqual(result, test_case["expected"])

def test_signed_int_from_hex_accepts_decimal(self):
# Negative decimal values are valid EDS literals (CiA 306 allows both formats).
self.assertEqual(_signed_int_from_hex("-1", 8), -1)
self.assertEqual(_signed_int_from_hex("-128", 8), -128)
self.assertEqual(_signed_int_from_hex("-2147483648", 32), -2147483648)

def test_signed_int_from_hex_rejects_out_of_range(self):
with self.assertRaises(ValueError):
_signed_int_from_hex("0xFFFF", 8) # 16-bit value into 8-bit field
with self.assertRaises(ValueError):
_signed_int_from_hex("-129", 8) # below minimum for 8-bit signed

def test_build_variable_range_warnings(self):
eds = RawConfigParser()
cases = [
("2003", "LowLimit", str(-0xFFFF)), # INTEGER16 < signed min
("2003", "HighLimit", "0x10000"), # INTEGER16 > unsigned max
("2001", "DefaultValue", "SOMETHING"), # BOOLEAN non-numeric
("2003", "DefaultValue", "SOMETHING"), # INTEGER16 non-numeric
("2006", "ParameterValue", ""), # UNSIGNED16 empty
]
for index, option, value in cases:
with self.subTest(index=index, option=option, value=value):
# Fresh version for mutating temporarily
eds.clear()
eds.read(DATATYPES_EDS)
eds[index][option] = value
with self.assertLogs(level="WARN") as cm:
build_variable(eds, index, node_id=42, object_type=7, index=int(index, 16))
self.assertRegex(cm.output[0], option)

def test_array_compact_subobj(self):
array = self.od[0x1003]
self.assertIsInstance(array, canopen.objectdictionary.ODArray)
Expand Down Expand Up @@ -240,7 +274,6 @@ def test_read_domain_subobject(self):

def test_roundtrip_domain_objects(self):
# ObjectType==DOMAIN survive an EDS export/import round-trip
import io
with io.StringIO() as dest:
canopen.export_od(self.od, dest, 'eds')
dest.name = 'mock.eds'
Expand All @@ -251,6 +284,17 @@ def test_roundtrip_domain_objects(self):
self.assertTrue(od2[0x3063].is_domain)
self.assertTrue(od2[0x3064][1].is_domain)

def test_export_without_raw_default_values(self):
od = canopen.import_od(DATATYPES_EDS)
# Make sure the values are not cached in raw form
for var in od.values():
try:
delattr(var, 'default_raw')
except AttributeError:
pass
with io.StringIO() as dest:
canopen.export_od(od, dest, 'eds')


def test_comments(self):
self.assertEqual(self.od.comments,
Expand All @@ -271,7 +315,6 @@ def test_export_eds_to_file(self):
self.verify_od(dest, doctype)

def test_export_eds_to_file_unknown_extension(self):
import io
for suffix in ".txt", "":
with tmp_file(suffix=suffix) as tmp:
dest = tmp.name
Expand All @@ -294,7 +337,6 @@ def test_export_eds_to_file_unknown_extension(self):
self.verify_od(buf, "eds")

def test_export_eds_unknown_doctype(self):
import io
filelike_object = io.StringIO()
self.addCleanup(filelike_object.close)
for dest in "filename", None, filelike_object:
Expand All @@ -307,7 +349,6 @@ def test_export_eds_unknown_doctype(self):
os.stat(dest)

def test_export_eds_to_filelike_object(self):
import io
for doctype in "eds", "dcf":
with io.StringIO() as dest:
with self.subTest(dest=dest, doctype=doctype):
Expand All @@ -321,7 +362,6 @@ def test_export_eds_to_filelike_object(self):

def test_export_eds_to_stdout(self):
import contextlib
import io
with contextlib.redirect_stdout(io.StringIO()) as f:
ret = canopen.export_od(self.od, None, "eds")
self.assertIsNone(ret)
Expand Down
Loading