Skip to content

Commit

Permalink
Fix decoder for new yaml format.
Browse files Browse the repository at this point in the history
  • Loading branch information
yozik04 committed Nov 23, 2023
1 parent df1e3d3 commit e663744
Showing 1 changed file with 60 additions and 27 deletions.
87 changes: 60 additions & 27 deletions paradox/console_scripts/ip150_connection_decrypt.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
#!/usr/bin/env python3
import argparse
import binascii
import traceback
from collections import OrderedDict
import re
import traceback

import yaml

from paradox.connections.ip.parsers import (IPMessageCommand, IPMessageRequest,
IPMessageResponse, IPMessageType,
IPPayloadConnectResponse)
from paradox.connections.ip.parsers import (
IPMessageCommand,
IPMessageRequest,
IPMessageResponse,
IPMessageType,
IPPayloadConnectResponse,
)
from paradox.hardware import create_panel
from paradox.hardware.parsers import InitiateCommunicationResponse


class Colors: # You may need to change color settings
RED = '\033[31m'
ENDC = '\033[m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[95m'
CYAN = '\033[96m'
ON_WHITE = '\033[47m'
class Colors: # You may need to change color settings
RED = "\033[31m"
ENDC = "\033[m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[95m"
CYAN = "\033[96m"
ON_WHITE = "\033[47m"


def ordered_load(stream, Loader=yaml.loader.SafeLoader, object_pairs_hook=OrderedDict):
Expand Down Expand Up @@ -57,11 +62,18 @@ def parse(self, parsed):
traceback.print_exc()

def _parse_ip_response(self, parsed):
if parsed.header.command == IPMessageCommand.connect and parsed.header.sub_command == 0:
print(f"{Colors.ON_WHITE}{IPPayloadConnectResponse.parse(parsed.payload)}{Colors.ENDC}")
if (
parsed.header.command == IPMessageCommand.connect
and parsed.header.sub_command == 0
):
print(
f"{Colors.ON_WHITE}{IPPayloadConnectResponse.parse(parsed.payload)}{Colors.ENDC}"
)
if len(parsed.payload) > 25:
p = parsed.payload[25:62]
print(f"{Colors.ON_WHITE}InitiateCommunicationResponse:\n{InitiateCommunicationResponse.parse(p)}{Colors.ENDC}")
print(
f"{Colors.ON_WHITE}InitiateCommunicationResponse:\n{InitiateCommunicationResponse.parse(p)}{Colors.ENDC}"
)
elif parsed.header.command == IPMessageCommand.multicommand:
self._parse_multicommand(parsed, "frompanel")
else:
Expand All @@ -74,7 +86,7 @@ def _parse_multicommand(self, parsed, direction):
self._parse_generic_multicommand(parsed, direction)
elif parsed.header.sub_command == 0x1: # MGSP multiread
self._parse_generic_mgsp_multiread(parsed, direction)
elif parsed.header.sub_command == 0xdd: # Unified multiread
elif parsed.header.sub_command == 0xDD: # Unified multiread
pass

def _parse_serial_passthrough_response(self, parsed):
Expand All @@ -95,7 +107,9 @@ def _parse_serial_passthrough_eeprom_read(self, parsed_payload):
ram_address = parsed_payload.fields.value.address
ram_parser = self.panel.get_message("RAMDataParserMap").get(ram_address)
if ram_parser is not None:
print(f"{Colors.ON_WHITE}{ram_parser.parse(parsed_payload.fields.value.data)}{Colors.ENDC}")
print(
f"{Colors.ON_WHITE}{ram_parser.parse(parsed_payload.fields.value.data)}{Colors.ENDC}"
)
else:
print(
f"{Colors.RED}No parser for {ram_address} ram address, data: {binascii.hexlify(parsed_payload.fields.value.data)}{Colors.ENDC}"
Expand All @@ -105,14 +119,16 @@ def _parse_ip_request(self, parsed):
if parsed.header.command == IPMessageCommand.multicommand:
self._parse_multicommand(parsed, "topanel")
else:
print(f"{Colors.RED}No parser for ip_request payload: {binascii.hexlify(parsed.payload)}{Colors.ENDC}")
print(
f"{Colors.RED}No parser for ip_request payload: {binascii.hexlify(parsed.payload)}{Colors.ENDC}"
)

def _parse_generic_multicommand(self, parsed, direction: str):
i = 0
while i < len(parsed.payload):
cmd_len = parsed.payload[i]
i += 1
cmd = parsed.payload[i:i + cmd_len]
cmd = parsed.payload[i : i + cmd_len]
assert len(cmd) == cmd_len
i += cmd_len
print(f"{Colors.ON_WHITE}Multicommand: {cmd}{Colors.ENDC}")
Expand All @@ -130,19 +146,37 @@ def _parse_message(self, message, direction):
return parsed_payload

def _parse_generic_mgsp_multiread(self, parsed, direction):
print(
f"{Colors.RED}No parser for {direction} mgsp_multiread{Colors.ENDC}"
)
print(f"{Colors.RED}No parser for {direction} mgsp_multiread{Colors.ENDC}")


def old_yaml_format_traverser(data):
re_key = re.compile(r"peer(\d+)_(\d+)")
for key, value in data.items():
# example peer0_0, where 0 is peer number and 0 is index number
matches = re_key.match(key)

yield {
"peer": int(matches.group(1)),
"data": value,
"index": int(matches.group(2)),
}


def decrypt_file(file, password, max_packets: int = None):
try:
data = ordered_load(file, yaml.loader.SafeLoader)
parser = PayloadParser()

if "peers" in data and "packets" in data:
iterator = data["packets"]
else:
iterator = old_yaml_format_traverser(data)

n = 0
for key, value in data.items():
if not value[0] == 0xaa:
for packet in iterator:
value = packet["data"]
key = packet["index"]
if not value[0] == 0xAA:
print(f"{Colors.RED}Not an IP packet: {value}{Colors.ENDC}")
continue
header = value[0:16]
Expand All @@ -167,7 +201,6 @@ def decrypt_file(file, password, max_packets: int = None):
if parsed.header.sub_command == 0:
assert password == parsed.payload, "Wrong decryption password"


if (
parsed.header.command == IPMessageCommand.connect
and parsed.header.message_type == IPMessageType.ip_response
Expand All @@ -187,7 +220,7 @@ def decrypt_file(file, password, max_packets: int = None):

print(
f"\tpayload: {binascii.hexlify(parsed.payload)}\n",
f"\tpayload_raw: {parsed.payload}"
f"\tpayload_raw: {parsed.payload}",
)

print(parsed)
Expand Down

0 comments on commit e663744

Please sign in to comment.