Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added writeSysMemory to GE_SRTP.py #3

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion lib/GE_SRTP.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,38 @@ def closeConnection(self):
except Exception as err:
print("Exception:" + str(err))

###########################################################
# Generates command string to send to PLC for writing
# registers. Expects format of R#### etc.
# Returns: Bytearray for sending via socket.
###########################################################
def writeSysMemory(self, reg, val):
if not re.search('%*(R|AI|AQ|I|Q)\d+',reg):
raise Exception("Invalid Register Address! (" + reg + ").")

try:
# Make copy of generic base message
tmp = GE_SRTP_Messages.BASE_MSG.copy()
tmp[42] = GE_SRTP_Messages.SERVICE_REQUEST_CODE["WRITE_SYS_MEMORY"]
# Update for type of register
tmp[43] = GE_SRTP_Messages.MEMORY_TYPE_CODE[re.search('(R|AI|AQ|I|Q)',reg)[0]]
# 0 based register to read
address = int(re.search('\d+',reg)[0]) - 1
tmp[44] = int(address & 255).to_bytes(1,byteorder='big') # Get LSB of Word
tmp[45] = int(address >> 8 ).to_bytes(1,byteorder='big') # Get MSB of Word
# Update for width
tmp[46] = b'\x01' # DANGER - TODO make dynamic for different data types.
tmp[47] = b'\x00'
# Value
tmp[48] = int(int(val) & 255).to_bytes(1,byteorder='big') # Get LSB of Word
tmp[49] = int(int(val) >> 8 ).to_bytes(1,byteorder='big') # Get MSB of Word
bytes_to_send = b''.join(tmp)
# Send to PLC, Read Response back as memory value (string type)
response = self.sendSocketCommand(bytes_to_send)
return(GeSrtpResponse(response))
except Exception as err:
print(err)
raise Exception("Failure reading PLC or Register from PLC. Abort")

###########################################################
# Generates command string to send to PLC for reading
Expand Down Expand Up @@ -236,4 +267,4 @@ def printLimitedBin(self, pre_msg, msg, start=7, end=7):
if iter_len - idx <= end:
hex_resp_end += " {:02x}".format(x)

print("{}{}...{} (Only Displaying {} of {} bytes)".format(pre_msg, hex_resp_start, hex_resp_end, start + end, len(msg)))
print("{}{}...{} (Only Displaying {} of {} bytes)".format(pre_msg, hex_resp_start, hex_resp_end, start + end, len(msg)))