Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions lewis_emulators/eurotherm/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,148 @@ def set_connected(self, addr: str, connected: bool) -> None:
euro = self.sensors[addr]
euro.connected = connected

def automan(self, addr: str) -> int:
"""
Gets the Auto/Manual status

Returns: current Automan mode (Auto/Manual)
"""

euro = self.sensors[addr]
return euro.automan

def set_automan(self, addr: str, value: int) -> None:
"""
Sets Automan mode
"""
self.log.info(f"Addr: {addr}, Automan: {value}")
self.log.info(f"{addr.__class__.__name__}")
automan = int(value)
if addr is None:
for euro in self.sensors.values():
euro.automan = automan
else:
euro = self.sensors[addr]
euro.automan = automan

def lowrange(self, addr: str) -> float:
"""
Get low range limit of the device.

Returns: the low range limit.
"""
self._delay()

euro = self.sensors[addr]
if not euro.connected:
raise ValueError
return euro.lowrange

def hirange(self, addr: str) -> float:
"""
Get high range limit of the device.

Returns: the high range limit.
"""
self._delay()

euro = self.sensors[addr]
if not euro.connected:
raise ValueError
return euro.hirange

def workoutp(self, addr: str) -> float:
"""
Get working output value of the device.

Returns: the working output value in K.
"""
self._delay()

euro = self.sensors[addr]
if not euro.connected:
raise ValueError
return euro.workoutp

def outlowlm(self, addr: str) -> float:
"""
Get low power limit value of the device.

Returns: the low power limit value.
"""
self._delay()

euro = self.sensors[addr]
if not euro.connected:
raise ValueError
return euro.outlowlm

def outpcomp(self, addr: str) -> float:
"""
Get proportional component of output of the device.

Returns: the proportional component of output.
"""
self._delay()

euro = self.sensors[addr]
if not euro.connected:
raise ValueError
return euro.outpcomp

def outicomp(self, addr: str) -> float:
"""
Get integral component of output of the device.

Returns: the Integral component of output.
"""
self._delay()

euro = self.sensors[addr]
if not euro.connected:
raise ValueError
return euro.outicomp

def outdcomp(self, addr: str) -> float:
"""
Get the derivative component of output of the device.

Returns: the derivative component of output.
"""
self._delay()

euro = self.sensors[addr]
if not euro.connected:
raise ValueError
return euro.outdcomp

def snbrkpst(self, addr: str) -> int:
"""
Get the sensor break status of the device.

Returns: the sensor break status.
"""
self._delay()

euro = self.sensors[addr]
if not euro.connected:
raise ValueError
return euro.snbrkpst

def set_snbrkpst(self, addr: str, value: int) -> None:
"""
Sets snbrkpst status
"""
self.log.info(f"Addr: {addr}, snbrkpst: {value}")
self.log.info(f"{addr.__class__.__name__}")
snbrkpst = int(value)
if addr is None:
for euro in self.sensors.values():
euro.snbrkpst = snbrkpst
else:
euro = self.sensors[addr]
euro.snbrkpst = snbrkpst

class EurothermSensor:
"""
Eurotherm temperature sensor method
Expand Down Expand Up @@ -827,3 +969,12 @@ def __init__(self) -> None:
self.needlevalve_flow_sp_mode = 0
self.needlevalve_direction = 0
self.needlevalve_stop = 0
self.automan = 0
self.lowrange = 0.0
self.hirange = 0.0
self.workoutp = 0.0
self.outlowlm = 0.0
self.outpcomp = 0.0
self.outicomp = 0.0
self.outdcomp = 0.0
self.snbrkpst = 0
40 changes: 40 additions & 0 deletions lewis_emulators/eurotherm/interfaces/modbus_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ def __init__(self) -> None:
4963: self.get_nv_min_auto_flow_bl_temp,
4965: self.get_nv_auto_flow_scale,
1292: self.get_nv_stop,
273: self.get_automan,
11: self.get_lowrange,
12: self.get_hirange,
4: self.get_workoutp,
31: self.get_outlowlm,
214: self.get_outpcomp,
55: self.get_outicomp,
116: self.get_outdcomp,
258: self.get_snbrkpst,
}

self.write_commands = {
Expand All @@ -112,6 +121,7 @@ def __init__(self) -> None:
4963: self.set_nv_min_auto_flow_bl_temp,
4965: self.set_nv_auto_flow_scale,
1292: self.set_nv_stop,
273: self.set_automan,
}

in_terminator = ""
Expand Down Expand Up @@ -275,3 +285,33 @@ def set_nv_stop(self, value: int) -> None:

def get_nv_stop(self) -> int:
return int(self.device.needlevalve_stop(sensor))

def get_automan(self) -> bool:
return bool(self.device.automan(sensor))

def set_automan(self, value: bool) -> None:
self.device.set_automan(sensor, value)

def get_lowrange(self) -> int:
return int(self.device.lowrange(sensor))

def get_hirange(self) -> int:
return int(self.device.hirange(sensor))

def get_workoutp(self) -> int:
return int(self.device.workoutp(sensor))

def get_outlowlm(self) -> int:
return int(self.device.outlowlm(sensor))

def get_outpcomp(self) -> int:
return int(self.device.outpcomp(sensor))

def get_outicomp(self) -> int:
return int(self.device.outicomp(sensor))

def get_outdcomp(self) -> int:
return int(self.device.outdcomp(sensor))

def get_snbrkpst(self) -> bool:
return bool(self.device.snbrkpst(sensor))
143 changes: 143 additions & 0 deletions lewis_emulators/eurotherm/interfaces/stream_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ def __init__(self) -> None:
.etx()
.any()
.build(),
CmdBuilder("get_automan").eot().arg("[0-9]{4}").escape("mA").enq().build(),
CmdBuilder("set_automan")
.eot()
.arg("[0-9]{4}")
.stx()
.escape("mA")
.int()
.etx()
.any()
.build(),
CmdBuilder("get_lowrange").eot().arg("[0-9]{4}").escape("QC").enq().build(),
CmdBuilder("get_hirange").eot().arg("[0-9]{4}").escape("QB").enq().build(),
CmdBuilder("get_workoutp").eot().arg("[0-9]{4}").escape("WO").enq().build(),
CmdBuilder("get_outlowlm").eot().arg("[0-9]{4}").escape("LO").enq().build(),
CmdBuilder("get_outpcomp").eot().arg("[0-9]{4}").escape("Xp").enq().build(),
CmdBuilder("get_outicomp").eot().arg("[0-9]{4}").escape("xI").enq().build(),
CmdBuilder("get_outdcomp").eot().arg("[0-9]{4}").escape("xD").enq().build(),
CmdBuilder("get_snbrkpst").eot().arg("[0-9]{4}").escape("sb").enq().build(),
}

# Add terminating characters manually for each command,
Expand Down Expand Up @@ -323,3 +341,128 @@ def get_error(self, addr: str) -> str:
"""
reply = "\x02EE>0x{}\x03".format(self.device.error(addr))
return f"{reply}{self.make_checksum(reply[1:])}"

@if_connected
@translate_adddress
def get_automan(self, addr: str) -> str | None:
"""Get the automan mode of the Eurotherm sensor.

Returns: the automan mode formatted like the Eurotherm protocol.
"""
try:
return self.make_read_reply("mA", self.device.automan(addr))
except Exception:
return None

@if_connected
@translate_adddress
def set_automan(self, addr: str, automan: int, _: str) -> str | None:
"""Set the automan flag.

Args:
automan: automan lfag to be set as int.
_: argument captured by the command.
addr: address of the Eurotherm sensor.

"""
try:
self.device.set_automan(addr, automan)
return "\x06"
except Exception:
return None

@if_connected
@translate_adddress
def get_lowrange(self, addr: str) -> str | None:
"""Get the low range limit value of the Eurotherm sensor.

Returns: the low range limit formatted like the Eurotherm protocol.
"""
try:
return self.make_read_reply("QC", self.device.lowrange(addr))
except Exception:
return None

@if_connected
@translate_adddress
def get_hirange(self, addr: str) -> str | None:
"""Get the high range limit value of the Eurotherm sensor.

Returns: the high range limit value formatted like the Eurotherm protocol.
"""
try:
return self.make_read_reply("QB", self.device.hirange(addr))
except Exception:
return None

@if_connected
@translate_adddress
def get_workoutp(self, addr: str) -> str | None:
"""Get the working output value of the Eurotherm sensor.

Returns: the working output value formatted like the Eurotherm protocol.
"""
try:
return self.make_read_reply("WO", self.device.workoutp(addr))
except Exception:
return None

@if_connected
@translate_adddress
def get_outlowlm(self, addr: str) -> str | None:
"""Get the low power limit value of the Eurotherm sensor.

Returns: the low power limit value formatted like the Eurotherm protocol.
"""
try:
return self.make_read_reply("LO", self.device.outlowlm(addr))
except Exception:
return None

@if_connected
@translate_adddress
def get_outpcomp(self, addr: str) -> str | None:
"""Get the proportional component of output of the Eurotherm sensor.

Returns: the proportional component of output formatted like the Eurotherm protocol.
"""
try:
return self.make_read_reply("Xp", self.device.outpcomp(addr))
except Exception:
return None

@if_connected
@translate_adddress
def get_outicomp(self, addr: str) -> str | None:
"""Get the integral component of output of the Eurotherm sensor.

Returns: the integral component of output formatted like the Eurotherm protocol.
"""
try:
return self.make_read_reply("xI", self.device.outicomp(addr))
except Exception:
return None

@if_connected
@translate_adddress
def get_outdcomp(self, addr: str) -> str | None:
"""Get the derivative component of output of the Eurotherm sensor.

Returns: the derivative component of output formatted like the Eurotherm protocol.
"""
try:
return self.make_read_reply("xD", self.device.outdcomp(addr))
except Exception:
return None

@if_connected
@translate_adddress
def get_snbrkpst(self, addr: str) -> str | None:
"""Get the sensor break status of the Eurotherm sensor.

Returns: the sensor break status formatted like the Eurotherm protocol.
"""
try:
return self.make_read_reply("sb", self.device.snbrkpst(addr))
except Exception:
return None
Loading