From f575d8a0d858233e220d1cf8dac11b584cf25036 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 22 Apr 2016 15:31:57 +0200 Subject: Improve UECP parsing --- uecpparse/uecp_parse.py | 112 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 74 insertions(+), 38 deletions(-) (limited to 'uecpparse') diff --git a/uecpparse/uecp_parse.py b/uecpparse/uecp_parse.py index cdfccd5..bbd3be7 100755 --- a/uecpparse/uecp_parse.py +++ b/uecpparse/uecp_parse.py @@ -33,6 +33,8 @@ import sys import struct import crc +verbose=True + # Message Element Codes uecp_mec_names = { 0x01: "PI" , # Program Identification @@ -68,7 +70,8 @@ class UECP_Message_Decoder(): Message Elements from a message""" self.message = message_bytes self.mec = message_bytes[0] - assert(0x01 <= self.mec <= 0xFD) + if not(0x01 <= self.mec <= 0xFD): + raise ValueError("MEC 0x{:02x} out of range".format(self.mec)) if self.mec in uecp_mec_names: name = uecp_mec_names[self.mec] @@ -83,45 +86,50 @@ class UECP_Message_Decoder(): psn = message_bytes[2] mel = message_bytes[3] med = message_bytes[4:-1] - print(" RT DSN={:02x} PSN={:02x} MEL={:02} ".format(dsn, psn, mel) + " ".join("{:02x}".format(d) for d in med)) + print(" RT DSN={:02x} PSN={:02x} MEL={:02}, ".format(dsn, psn, mel) + " ".join("{:02x}".format(d) for d in med)) else: print(" MEC={}".format(self.mec)) class UECP_Frame_Decoder(): - def __init__(self, uecp_bytes): + def __init__(self): """Create a new decoder from bytes containing a complete UECP frame, from STA to STO. See 2.2.1 General Frame Format""" - assert(uecp_bytes[0] == 0xfe) - assert(uecp_bytes[-1] == 0xff) - self.trapped_data = uecp_bytes - self.untrap() - self.decode_frame() - - def untrap(self): - """Unescapes characters as defined in 2.2.9""" - self.data = [0xfe] - - next_untrap = False - for b in self.trapped_data[1:-1]: - if next_untrap: - next_untrap = False - if b == 0x00: - self.data.append(0xFD) - elif b == 0x01: - self.data.append(0xFE) - elif b == 0x02: - self.data.append(0xFF) - else: - raise ValueError("untrap {} invalid".format(b)) - elif b == 0xFD: - next_untrap = True - elif b == 0xFE or b == 0xFF: - raise ValueError("untrapped data {} invalid".format(b)) + self.next_untrap = False + self.message_begin_seen = False + self.data = [] + + def add_byte(self, b): + """Unescapes characters as defined in 2.2.9 + + Returns True if more data is needed""" + if self.next_untrap: + self.next_untrap = False + if b == 0x00: + self.data.append(0xFD) + elif b == 0x01: + self.data.append(0xFE) + elif b == 0x02: + self.data.append(0xFF) else: - self.data.append(b) - self.data.append(0xff) + raise ValueError("untrap 0x{:02x} invalid".format(b)) + elif b == 0xFD and self.message_begin_seen: + self.next_untrap = True + elif b == 0xFE and not self.message_begin_seen: + self.message_begin_seen = True + self.data.append(b) + elif b == 0xFF and self.message_begin_seen: + # end of message + self.data.append(b) + self.decode_frame() + return False + elif self.message_begin_seen: + self.data.append(b) + else: + #print("Dropping 0x{:02x}".format(b)) + pass + return True def check_crc(self): if 0: @@ -137,32 +145,60 @@ class UECP_Frame_Decoder(): def decode_frame(self): """Decodes an untrapped frame""" + + if verbose: + print("Decoding " + " ".join("{:02x}".format(b) for b in self.data)) + self.addr = self.data[1] * 256 + self.data[2] self.sqc = self.data[3] self.mfl = self.data[4] self.msg = self.data[5:5+self.mfl] - assert(self.mfl + 8 == len(self.data)) + if self.mfl + 8 != len(self.data): + raise ValueError("MFL {}, expected {}".format(self.mfl, len(self.data) - 8)) self.crc = self.data[5+self.mfl] * 256 + self.data[5+self.mfl+1] self.crc_ok = self.check_crc() UECP_Message_Decoder(self.msg) in_fd = open(sys.argv[1], "r") -for line in in_fd: + +uecp = UECP_Frame_Decoder() + +def parse_anc_bytes(anc_bytes): + global uecp + for b in anc_bytes: + + need_more_data = uecp.add_byte(b) + + if not need_more_data: + uecp.decode_frame() + print("Addr={} sqc={} mfl={} msg={} crc=0x{:04x} crc_calc=0x{:04x} ok={}".format( + uecp.addr, uecp.sqc, uecp.mfl, + uecp.msg, uecp.crc, uecp.crc_calc, uecp.crc_ok)) + uecp = UECP_Frame_Decoder() + +for line_nr, line in enumerate(in_fd): line_bytes = [int(i, 16) for i in line.split()] line_bytes.reverse() anc_header = line_bytes[0] + anc_bytes = None + if anc_header == 0xfd: anc_len = line_bytes[1] if anc_len > 0: anc_bytes = line_bytes[2:2+anc_len] + elif anc_header != 0x00: + anc_len = line_bytes[0] + if anc_len > 0: + anc_bytes = line_bytes[1:1+anc_len] - if anc_bytes[-1] == 0xff: - uecp = UECP_Frame_Decoder(anc_bytes) + if anc_bytes: + try: + parse_anc_bytes(anc_bytes) + except ValueError as e: + print("***** Error on line {}: {}".format(line_nr+1, e)) + uecp = UECP_Frame_Decoder() - print("Addr={} sqc={} mfl={} msg={} crc=0x{:04x} crc_calc=0x{:04x} ok={}".format( - uecp.addr, uecp.sqc, uecp.mfl, - uecp.msg, uecp.crc, uecp.crc_calc, uecp.crc_ok)) -- cgit v1.2.3