diff --git a/server/enip/defined_tags.py b/server/enip/defined_tags.py new file mode 100644 index 0000000..0a043de --- /dev/null +++ b/server/enip/defined_tags.py @@ -0,0 +1,647 @@ +from __future__ import absolute_import, print_function, division + +import io +import json +import struct + +from ...automata import dfa, state +from ...dotdict import dotdict +from . import device +from . import parser + + +TYPE_IS_STRUCT = 0x8000 +TYPE_IS_ARRAY = 0x2000 + +GET_INSTANCE_ATTRIBUTE_LIST_REQ = 0x55 +GET_INSTANCE_ATTRIBUTE_LIST_RPY = GET_INSTANCE_ATTRIBUTE_LIST_REQ | 0x80 +GET_INSTANCE_ATTRIBUTE_LIST_CTX = "get_instance_attribute_list" +READ_TEMPLATE_REQ = 0x4C +READ_TEMPLATE_RPY = READ_TEMPLATE_REQ | 0x80 + +try: + string_types = (basestring,) +except NameError: + string_types = (str,) + +_registry = None + + +class ControllerTag(object): + def __init__(self, instance_id, name, template_id, size, dimensions=None): + self.instance_id = instance_id + self.name = name + self.template_id = template_id + self.size = size + self.dimensions = (dimensions or [0, 0, 0])[:3] + while len(self.dimensions) < 3: + self.dimensions.append(0) + self.type_code = TYPE_IS_STRUCT | template_id + + +class UdtDefinition(object): + def __init__(self, template_id, name, size, fields): + self.template_id = template_id + self.name = name + self.size = size + self.fields = fields + + +class UdtField(object): + def __init__(self, name, type_code, offset, metadata=0, template_id=None, atomic_type=None, bit=None): + self.name = name + self.type_code = type_code + self.offset = offset + self.metadata = metadata + self.template_id = template_id + self.atomic_type = atomic_type + self.bit = bit + + @property + def is_struct(self): + return (self.type_code & TYPE_IS_STRUCT) != 0 and self.template_id is not None + + @property + def is_array(self): + return (self.type_code & TYPE_IS_ARRAY) != 0 + + +class RawStructParser(parser.STRUCT): + def __init__(self, structure_tag, size): + self.structure_tag = structure_tag + self.struct_calcsize = size + super(RawStructParser, self).__init__(structure_tag=structure_tag) + + +class BackingBuffer(object): + def __init__(self, size): + self.data = bytearray(size) + + def read(self, offset, size): + return bytes(self.data[offset:offset + size]) + + def write(self, offset, payload): + end = offset + len(payload) + assert end <= len(self.data) + self.data[offset:end] = bytearray(payload) + + def read_values(self, offset, count, type_cls): + type_parser = type_cls() + size = type_parser.struct_calcsize + fmt = type_parser.struct_format + return [ + struct.unpack_from(fmt, self.data, offset + (index * size))[0] + for index in range(count) + ] + + def write_values(self, offset, values, type_cls): + type_parser = type_cls() + size = type_parser.struct_calcsize + fmt = type_parser.struct_format + for index, value in enumerate(values): + struct.pack_into(fmt, self.data, offset + (index * size), value) + + def read_bit(self, offset, bit): + return bool(self.data[offset] & (1 << bit)) + + def write_bit(self, offset, bit, value): + if value: + self.data[offset] |= 1 << bit + else: + self.data[offset] &= ~(1 << bit) + + +class StructSliceAttribute(object): + error = 0x00 + + def __init__(self, name, backing, offset, size, structure_tag): + self.name = name + self.backing = backing + self.offset = offset + self.size = size + self.parser = RawStructParser(structure_tag=structure_tag, size=size) + + def __len__(self): + return 1 + + def __getitem__(self, key): + self._validate_key(key) + record = dotdict() + record.data = dotdict() + record.data.input = self.backing.read(self.offset, self.size) + return [record] if isinstance(key, slice) else record + + def __setitem__(self, key, value): + self._validate_key(key) + payload = self._coerce_payload(value) + self.backing.write(self.offset, payload[:self.size]) + + def _validate_key(self, key): + if isinstance(key, slice): + start, stop, stride = key.indices(1) + if start == 0 and stop == 1 and stride == 1: + return + elif key == 0: + return + raise KeyError("Unsupported STRUCT slice for {0}: {1!r}".format(self.name, key)) + + def _coerce_payload(self, value): + if isinstance(value, list): + value = value[0] + if hasattr(value, "input"): + return bytes(bytearray(value.input)) + if hasattr(value, "data") and hasattr(value.data, "input"): + return bytes(bytearray(value.data.input)) + return bytes(bytearray(value)) + + +class AtomicArrayAttribute(object): + error = 0x00 + + def __init__(self, name, backing, offset, count, type_cls): + self.name = name + self.backing = backing + self.offset = offset + self.count = count + self.type_cls = type_cls + self.parser = type_cls() + + def __len__(self): + return self.count + + def __getitem__(self, key): + values = self.backing.read_values(self.offset, self.count, self.type_cls) + return values[key] + + def __setitem__(self, key, value): + values = self.backing.read_values(self.offset, self.count, self.type_cls) + if isinstance(key, slice): + values[key] = list(value) + else: + values[key] = value + self.backing.write_values(self.offset, values, self.type_cls) + + +class BoolAttribute(object): + error = 0x00 + + def __init__(self, name, backing, offset, bit): + self.name = name + self.backing = backing + self.offset = offset + self.bit = bit + self.parser = parser.BOOL() + + def __len__(self): + return 1 + + def __getitem__(self, key): + self._validate_key(key) + value = self.backing.read_bit(self.offset, self.bit) + return [value] if isinstance(key, slice) else value + + def __setitem__(self, key, value): + self._validate_key(key) + if isinstance(key, slice): + value = next(iter(value)) + self.backing.write_bit(self.offset, self.bit, bool(value)) + + def _validate_key(self, key): + if isinstance(key, slice): + start, stop, stride = key.indices(1) + if start == 0 and stop == 1 and stride == 1: + return + elif key == 0: + return + raise KeyError("Unsupported BOOL slice for {0}: {1!r}".format(self.name, key)) + + +class BytesAttribute(object): + error = 0x00 + + def __init__(self, name, payload): + self.name = name + self.payload = list(bytearray(payload)) + self.parser = parser.USINT() + + def __len__(self): + return len(self.payload) + + def __getitem__(self, key): + return self.payload[key] + + def __setitem__(self, key, value): + self.payload[key] = value + + +class DefinedTagsRegistry(object): + def __init__(self, controller_tags, templates): + self.controller_tags = controller_tags + self.templates = templates + + @classmethod + def from_file(cls, path): + with io.open(path, "r", encoding="utf-8") as schema_file: + return cls.from_dict(json.load(schema_file)) + + @classmethod + def from_json(cls, text): + return cls.from_dict(json.loads(text)) + + @classmethod + def from_source(cls, source): + if _looks_like_json_text(source): + return cls.from_json(source) + return cls.from_file(source) + + @classmethod + def from_dict(cls, schema): + templates = _parse_templates(schema["templates"]) + controller_tags = [ + ControllerTag( + _as_int(item.get("id", index + 1)), + item["name"], + _as_int(item.get("templateId", item.get("typeTemplateId"))), + _as_int(item["size"]), + [_as_int(value) for value in item.get("dimensions", [0, 0, 0])], + ) + for index, item in enumerate(schema["controllerTags"]) + ] + return cls(controller_tags, templates) + + def build_tags(self): + tags = dotdict() + for controller_tag in self.controller_tags: + backing = BackingBuffer(controller_tag.size) + root_template = self.templates[controller_tag.template_id] + self._add_struct_tag( + tags, + controller_tag.name, + controller_tag.name, + backing, + 0, + controller_tag.size, + controller_tag.template_id, + ) + self._add_template_members(tags, controller_tag.name, backing, 0, root_template, {}) + + _add_tag(tags, "@tags", BytesAttribute("@tags", self.build_tag_listing_payload())) + for template_id in self.templates: + _add_tag(tags, "@udt/{0}".format(template_id), BytesAttribute( + "@udt/{0}".format(template_id), + self.build_udt_payload(template_id), + )) + return tags + + def build_tag_listing_payload(self): + payload = bytearray() + for controller_tag in self.controller_tags: + payload += _tag_info_record( + controller_tag.instance_id, + controller_tag.type_code, + controller_tag.size, + controller_tag.dimensions, + controller_tag.name, + ) + return bytes(payload) + + def build_udt_payload(self, template_id): + definition = self.templates[int(template_id)] + payload = bytearray() + payload += struct.pack( + "