diff --git a/lewis_emulators/eurotherm/device.py b/lewis_emulators/eurotherm/device.py index d8b2bb6f..a3ff59eb 100644 --- a/lewis_emulators/eurotherm/device.py +++ b/lewis_emulators/eurotherm/device.py @@ -793,6 +793,148 @@ def set_connected(self, addr: str, connected: bool) -> None: euro = self.sensors[addr] euro.connected = connected + def automan(self, addr: str) -> int: + """ + Gets the Auto/Manual status + + Returns: current Automan mode (Auto/Manual) + """ + + euro = self.sensors[addr] + return euro.automan + + def set_automan(self, addr: str, value: int) -> None: + """ + Sets Automan mode + """ + self.log.info(f"Addr: {addr}, Automan: {value}") + self.log.info(f"{addr.__class__.__name__}") + automan = int(value) + if addr is None: + for euro in self.sensors.values(): + euro.automan = automan + else: + euro = self.sensors[addr] + euro.automan = automan + + def lowrange(self, addr: str) -> float: + """ + Get low range limit of the device. + + Returns: the low range limit. + """ + self._delay() + + euro = self.sensors[addr] + if not euro.connected: + raise ValueError + return euro.lowrange + + def hirange(self, addr: str) -> float: + """ + Get high range limit of the device. + + Returns: the high range limit. + """ + self._delay() + + euro = self.sensors[addr] + if not euro.connected: + raise ValueError + return euro.hirange + + def workoutp(self, addr: str) -> float: + """ + Get working output value of the device. + + Returns: the working output value in K. + """ + self._delay() + + euro = self.sensors[addr] + if not euro.connected: + raise ValueError + return euro.workoutp + + def outlowlm(self, addr: str) -> float: + """ + Get low power limit value of the device. + + Returns: the low power limit value. + """ + self._delay() + + euro = self.sensors[addr] + if not euro.connected: + raise ValueError + return euro.outlowlm + + def outpcomp(self, addr: str) -> float: + """ + Get proportional component of output of the device. + + Returns: the proportional component of output. + """ + self._delay() + + euro = self.sensors[addr] + if not euro.connected: + raise ValueError + return euro.outpcomp + + def outicomp(self, addr: str) -> float: + """ + Get integral component of output of the device. + + Returns: the Integral component of output. + """ + self._delay() + + euro = self.sensors[addr] + if not euro.connected: + raise ValueError + return euro.outicomp + + def outdcomp(self, addr: str) -> float: + """ + Get the derivative component of output of the device. + + Returns: the derivative component of output. + """ + self._delay() + + euro = self.sensors[addr] + if not euro.connected: + raise ValueError + return euro.outdcomp + + def snbrkpst(self, addr: str) -> int: + """ + Get the sensor break status of the device. + + Returns: the sensor break status. + """ + self._delay() + + euro = self.sensors[addr] + if not euro.connected: + raise ValueError + return euro.snbrkpst + + def set_snbrkpst(self, addr: str, value: int) -> None: + """ + Sets snbrkpst status + """ + self.log.info(f"Addr: {addr}, snbrkpst: {value}") + self.log.info(f"{addr.__class__.__name__}") + snbrkpst = int(value) + if addr is None: + for euro in self.sensors.values(): + euro.snbrkpst = snbrkpst + else: + euro = self.sensors[addr] + euro.snbrkpst = snbrkpst + class EurothermSensor: """ Eurotherm temperature sensor method @@ -827,3 +969,12 @@ def __init__(self) -> None: self.needlevalve_flow_sp_mode = 0 self.needlevalve_direction = 0 self.needlevalve_stop = 0 + self.automan = 0 + self.lowrange = 0.0 + self.hirange = 0.0 + self.workoutp = 0.0 + self.outlowlm = 0.0 + self.outpcomp = 0.0 + self.outicomp = 0.0 + self.outdcomp = 0.0 + self.snbrkpst = 0 diff --git a/lewis_emulators/eurotherm/interfaces/modbus_interface.py b/lewis_emulators/eurotherm/interfaces/modbus_interface.py index 27198627..f345700c 100644 --- a/lewis_emulators/eurotherm/interfaces/modbus_interface.py +++ b/lewis_emulators/eurotherm/interfaces/modbus_interface.py @@ -95,6 +95,15 @@ def __init__(self) -> None: 4963: self.get_nv_min_auto_flow_bl_temp, 4965: self.get_nv_auto_flow_scale, 1292: self.get_nv_stop, + 273: self.get_automan, + 11: self.get_lowrange, + 12: self.get_hirange, + 4: self.get_workoutp, + 31: self.get_outlowlm, + 214: self.get_outpcomp, + 55: self.get_outicomp, + 116: self.get_outdcomp, + 258: self.get_snbrkpst, } self.write_commands = { @@ -112,6 +121,7 @@ def __init__(self) -> None: 4963: self.set_nv_min_auto_flow_bl_temp, 4965: self.set_nv_auto_flow_scale, 1292: self.set_nv_stop, + 273: self.set_automan, } in_terminator = "" @@ -275,3 +285,33 @@ def set_nv_stop(self, value: int) -> None: def get_nv_stop(self) -> int: return int(self.device.needlevalve_stop(sensor)) + + def get_automan(self) -> bool: + return bool(self.device.automan(sensor)) + + def set_automan(self, value: bool) -> None: + self.device.set_automan(sensor, value) + + def get_lowrange(self) -> int: + return int(self.device.lowrange(sensor)) + + def get_hirange(self) -> int: + return int(self.device.hirange(sensor)) + + def get_workoutp(self) -> int: + return int(self.device.workoutp(sensor)) + + def get_outlowlm(self) -> int: + return int(self.device.outlowlm(sensor)) + + def get_outpcomp(self) -> int: + return int(self.device.outpcomp(sensor)) + + def get_outicomp(self) -> int: + return int(self.device.outicomp(sensor)) + + def get_outdcomp(self) -> int: + return int(self.device.outdcomp(sensor)) + + def get_snbrkpst(self) -> bool: + return bool(self.device.snbrkpst(sensor)) diff --git a/lewis_emulators/eurotherm/interfaces/stream_interface.py b/lewis_emulators/eurotherm/interfaces/stream_interface.py index 201c1027..7ebc613e 100644 --- a/lewis_emulators/eurotherm/interfaces/stream_interface.py +++ b/lewis_emulators/eurotherm/interfaces/stream_interface.py @@ -75,6 +75,24 @@ def __init__(self) -> None: .etx() .any() .build(), + CmdBuilder("get_automan").eot().arg("[0-9]{4}").escape("mA").enq().build(), + CmdBuilder("set_automan") + .eot() + .arg("[0-9]{4}") + .stx() + .escape("mA") + .int() + .etx() + .any() + .build(), + CmdBuilder("get_lowrange").eot().arg("[0-9]{4}").escape("QC").enq().build(), + CmdBuilder("get_hirange").eot().arg("[0-9]{4}").escape("QB").enq().build(), + CmdBuilder("get_workoutp").eot().arg("[0-9]{4}").escape("WO").enq().build(), + CmdBuilder("get_outlowlm").eot().arg("[0-9]{4}").escape("LO").enq().build(), + CmdBuilder("get_outpcomp").eot().arg("[0-9]{4}").escape("Xp").enq().build(), + CmdBuilder("get_outicomp").eot().arg("[0-9]{4}").escape("xI").enq().build(), + CmdBuilder("get_outdcomp").eot().arg("[0-9]{4}").escape("xD").enq().build(), + CmdBuilder("get_snbrkpst").eot().arg("[0-9]{4}").escape("sb").enq().build(), } # Add terminating characters manually for each command, @@ -323,3 +341,128 @@ def get_error(self, addr: str) -> str: """ reply = "\x02EE>0x{}\x03".format(self.device.error(addr)) return f"{reply}{self.make_checksum(reply[1:])}" + + @if_connected + @translate_adddress + def get_automan(self, addr: str) -> str | None: + """Get the automan mode of the Eurotherm sensor. + + Returns: the automan mode formatted like the Eurotherm protocol. + """ + try: + return self.make_read_reply("mA", self.device.automan(addr)) + except Exception: + return None + + @if_connected + @translate_adddress + def set_automan(self, addr: str, automan: int, _: str) -> str | None: + """Set the automan flag. + + Args: + automan: automan lfag to be set as int. + _: argument captured by the command. + addr: address of the Eurotherm sensor. + + """ + try: + self.device.set_automan(addr, automan) + return "\x06" + except Exception: + return None + + @if_connected + @translate_adddress + def get_lowrange(self, addr: str) -> str | None: + """Get the low range limit value of the Eurotherm sensor. + + Returns: the low range limit formatted like the Eurotherm protocol. + """ + try: + return self.make_read_reply("QC", self.device.lowrange(addr)) + except Exception: + return None + + @if_connected + @translate_adddress + def get_hirange(self, addr: str) -> str | None: + """Get the high range limit value of the Eurotherm sensor. + + Returns: the high range limit value formatted like the Eurotherm protocol. + """ + try: + return self.make_read_reply("QB", self.device.hirange(addr)) + except Exception: + return None + + @if_connected + @translate_adddress + def get_workoutp(self, addr: str) -> str | None: + """Get the working output value of the Eurotherm sensor. + + Returns: the working output value formatted like the Eurotherm protocol. + """ + try: + return self.make_read_reply("WO", self.device.workoutp(addr)) + except Exception: + return None + + @if_connected + @translate_adddress + def get_outlowlm(self, addr: str) -> str | None: + """Get the low power limit value of the Eurotherm sensor. + + Returns: the low power limit value formatted like the Eurotherm protocol. + """ + try: + return self.make_read_reply("LO", self.device.outlowlm(addr)) + except Exception: + return None + + @if_connected + @translate_adddress + def get_outpcomp(self, addr: str) -> str | None: + """Get the proportional component of output of the Eurotherm sensor. + + Returns: the proportional component of output formatted like the Eurotherm protocol. + """ + try: + return self.make_read_reply("Xp", self.device.outpcomp(addr)) + except Exception: + return None + + @if_connected + @translate_adddress + def get_outicomp(self, addr: str) -> str | None: + """Get the integral component of output of the Eurotherm sensor. + + Returns: the integral component of output formatted like the Eurotherm protocol. + """ + try: + return self.make_read_reply("xI", self.device.outicomp(addr)) + except Exception: + return None + + @if_connected + @translate_adddress + def get_outdcomp(self, addr: str) -> str | None: + """Get the derivative component of output of the Eurotherm sensor. + + Returns: the derivative component of output formatted like the Eurotherm protocol. + """ + try: + return self.make_read_reply("xD", self.device.outdcomp(addr)) + except Exception: + return None + + @if_connected + @translate_adddress + def get_snbrkpst(self, addr: str) -> str | None: + """Get the sensor break status of the Eurotherm sensor. + + Returns: the sensor break status formatted like the Eurotherm protocol. + """ + try: + return self.make_read_reply("sb", self.device.snbrkpst(addr)) + except Exception: + return None