aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2016-04-22 15:31:57 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2016-04-22 15:31:57 +0200
commitf575d8a0d858233e220d1cf8dac11b584cf25036 (patch)
tree8ed1151d4024097c215216e316465e39ad861a32
parent9983e7e56ecc1ab388b071a9aa3638483770ce21 (diff)
downloadmmbtools-aux-f575d8a0d858233e220d1cf8dac11b584cf25036.tar.gz
mmbtools-aux-f575d8a0d858233e220d1cf8dac11b584cf25036.tar.bz2
mmbtools-aux-f575d8a0d858233e220d1cf8dac11b584cf25036.zip
Improve UECP parsing
-rwxr-xr-xuecpparse/uecp_parse.py112
1 files changed, 74 insertions, 38 deletions
diff --git a/uecpparse/uecp_parse.py b/uecpparse/uecp_parse.py
index cdfccd5..bbd3be7 100755
--- a/uecpparse/uecp_parse.py
+++ b/uecpparse/uecp_parse.py
@@ -33,6 +33,8 @@ import sys
import struct
import crc
+verbose=True
+
# Message Element Codes
uecp_mec_names = {
0x01: "PI" , # Program Identification
@@ -68,7 +70,8 @@ class UECP_Message_Decoder():
Message Elements from a message"""
self.message = message_bytes
self.mec = message_bytes[0]
- assert(0x01 <= self.mec <= 0xFD)
+ if not(0x01 <= self.mec <= 0xFD):
+ raise ValueError("MEC 0x{:02x} out of range".format(self.mec))
if self.mec in uecp_mec_names:
name = uecp_mec_names[self.mec]
@@ -83,45 +86,50 @@ class UECP_Message_Decoder():
psn = message_bytes[2]
mel = message_bytes[3]
med = message_bytes[4:-1]
- print(" RT DSN={:02x} PSN={:02x} MEL={:02} ".format(dsn, psn, mel) + " ".join("{:02x}".format(d) for d in med))
+ print(" RT DSN={:02x} PSN={:02x} MEL={:02}, ".format(dsn, psn, mel) + " ".join("{:02x}".format(d) for d in med))
else:
print(" MEC={}".format(self.mec))
class UECP_Frame_Decoder():
- def __init__(self, uecp_bytes):
+ def __init__(self):
"""Create a new decoder from bytes containing a
complete UECP frame, from STA to STO.
See 2.2.1 General Frame Format"""
- assert(uecp_bytes[0] == 0xfe)
- assert(uecp_bytes[-1] == 0xff)
- self.trapped_data = uecp_bytes
- self.untrap()
- self.decode_frame()
-
- def untrap(self):
- """Unescapes characters as defined in 2.2.9"""
- self.data = [0xfe]
-
- next_untrap = False
- for b in self.trapped_data[1:-1]:
- if next_untrap:
- next_untrap = False
- if b == 0x00:
- self.data.append(0xFD)
- elif b == 0x01:
- self.data.append(0xFE)
- elif b == 0x02:
- self.data.append(0xFF)
- else:
- raise ValueError("untrap {} invalid".format(b))
- elif b == 0xFD:
- next_untrap = True
- elif b == 0xFE or b == 0xFF:
- raise ValueError("untrapped data {} invalid".format(b))
+ self.next_untrap = False
+ self.message_begin_seen = False
+ self.data = []
+
+ def add_byte(self, b):
+ """Unescapes characters as defined in 2.2.9
+
+ Returns True if more data is needed"""
+ if self.next_untrap:
+ self.next_untrap = False
+ if b == 0x00:
+ self.data.append(0xFD)
+ elif b == 0x01:
+ self.data.append(0xFE)
+ elif b == 0x02:
+ self.data.append(0xFF)
else:
- self.data.append(b)
- self.data.append(0xff)
+ raise ValueError("untrap 0x{:02x} invalid".format(b))
+ elif b == 0xFD and self.message_begin_seen:
+ self.next_untrap = True
+ elif b == 0xFE and not self.message_begin_seen:
+ self.message_begin_seen = True
+ self.data.append(b)
+ elif b == 0xFF and self.message_begin_seen:
+ # end of message
+ self.data.append(b)
+ self.decode_frame()
+ return False
+ elif self.message_begin_seen:
+ self.data.append(b)
+ else:
+ #print("Dropping 0x{:02x}".format(b))
+ pass
+ return True
def check_crc(self):
if 0:
@@ -137,32 +145,60 @@ class UECP_Frame_Decoder():
def decode_frame(self):
"""Decodes an untrapped frame"""
+
+ if verbose:
+ print("Decoding " + " ".join("{:02x}".format(b) for b in self.data))
+
self.addr = self.data[1] * 256 + self.data[2]
self.sqc = self.data[3]
self.mfl = self.data[4]
self.msg = self.data[5:5+self.mfl]
- assert(self.mfl + 8 == len(self.data))
+ if self.mfl + 8 != len(self.data):
+ raise ValueError("MFL {}, expected {}".format(self.mfl, len(self.data) - 8))
self.crc = self.data[5+self.mfl] * 256 + self.data[5+self.mfl+1]
self.crc_ok = self.check_crc()
UECP_Message_Decoder(self.msg)
in_fd = open(sys.argv[1], "r")
-for line in in_fd:
+
+uecp = UECP_Frame_Decoder()
+
+def parse_anc_bytes(anc_bytes):
+ global uecp
+ for b in anc_bytes:
+
+ need_more_data = uecp.add_byte(b)
+
+ if not need_more_data:
+ uecp.decode_frame()
+ print("Addr={} sqc={} mfl={} msg={} crc=0x{:04x} crc_calc=0x{:04x} ok={}".format(
+ uecp.addr, uecp.sqc, uecp.mfl,
+ uecp.msg, uecp.crc, uecp.crc_calc, uecp.crc_ok))
+ uecp = UECP_Frame_Decoder()
+
+for line_nr, line in enumerate(in_fd):
line_bytes = [int(i, 16) for i in line.split()]
line_bytes.reverse()
anc_header = line_bytes[0]
+ anc_bytes = None
+
if anc_header == 0xfd:
anc_len = line_bytes[1]
if anc_len > 0:
anc_bytes = line_bytes[2:2+anc_len]
+ elif anc_header != 0x00:
+ anc_len = line_bytes[0]
+ if anc_len > 0:
+ anc_bytes = line_bytes[1:1+anc_len]
- if anc_bytes[-1] == 0xff:
- uecp = UECP_Frame_Decoder(anc_bytes)
+ if anc_bytes:
+ try:
+ parse_anc_bytes(anc_bytes)
+ except ValueError as e:
+ print("***** Error on line {}: {}".format(line_nr+1, e))
+ uecp = UECP_Frame_Decoder()
- print("Addr={} sqc={} mfl={} msg={} crc=0x{:04x} crc_calc=0x{:04x} ok={}".format(
- uecp.addr, uecp.sqc, uecp.mfl,
- uecp.msg, uecp.crc, uecp.crc_calc, uecp.crc_ok))