From a13a871ade41bbebf85760acf598eecabe7dc0c5 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 6 Jun 2014 18:06:49 +0200 Subject: Add EDI dump analyser script --- edi/edidebug.py | 279 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 279 insertions(+) create mode 100755 edi/edidebug.py (limited to 'edi/edidebug.py') diff --git a/edi/edidebug.py b/edi/edidebug.py new file mode 100755 index 0000000..0b54622 --- /dev/null +++ b/edi/edidebug.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python2 +# +# Decode EDI for debugging + +import sys +import struct + +from crc import crc16 + +class Printer: + def __init__(self): + self.indent = 0 + def pr(self, s): + print(" " * self.indent + s) + def inc(self): + self.indent += 1 + def dec(self): + self.indent -= 1 + +p = Printer() + +def decode(stream): + p.pr("start") + success = False + + pos = stream.tell() + sync = stream.read(2) + stream.seek(pos) + + if len(sync) < 2: + p.pr("EOF") + return False + + if sync == "PF": + if decode_pft(stream): + p.pr("PFT decode success") + success = True + else: + p.pr("PFT decode fail") + elif sync == "AF": + if decode_af(stream): + p.pr("AF decode success") + success = True + else: + p.pr("AF decode fail") + return success + + +pft_head_struct = "!2sH3B3BH" +pft_rs_head_struct = "!2B" +pft_addr_head_struct = "!2H" +def decode_pft(stream): + p.inc() + p.pr("start decoding PF") + + headerdata = stream.read(12) + header = struct.unpack(pft_head_struct, headerdata) + + psync, pseq, findex1, findex2, findex3, fcount1, fcount2, fcount3, fec_ad_plen = header + + findex = (findex1 << 16) | (findex2 << 8) | findex3 + fcount = (fcount1 << 16) | (fcount2 << 8) | fcount3 + + fec = (fec_ad_plen & 0x80) != 0x00 + addr = (fec_ad_plen & 0x40) != 0x00 + plen = fec_ad_plen & 0x3F + + # try to sync according to TS 102 821 Clause 7.4.1 + if psync != "PF": + p.pr("No PF Sync") + p.dec() + return False + + rs_k = 0 + rs_z = 0 + if fec: + rs_head = stream.read(2) + rs_k, rs_z = struct.unpack(pft_rs_head_struct, rs_head) + headerdata += rs_head + + addr_source = 0 + addr_dest = 0 + if addr: + addr_head = stream.read(4) + addr_source, addr_dest = struct.unpack(pft_addr_head_struct, addr_head) + headerdata += addr_head + + # read CRC + headerdata += stream.read(2) + + crc_ok = crc16(headerdata) == 0x1d0f + + if crc_ok: + p.pr("CRC ok") + else: + p.pr("CRC not ok!") + + p.pr("pseq {}".format(pseq)) + p.pr("findex {}".format(findex)) + p.pr("fcount {}".format(fcount)) + if fec: + p.pr("with fec:") + p.pr(" RSk={}".format(rs_k)) + p.pr(" RSz={}".format(rs_z)) + if addr: + p.pr("with transport header:") + p.pr(" source={}".format(addr_source)) + p.pr(" dest={}".format(addr_dest)) + p.pr("payload length={}".format(plen)) + + success = False + if crc_ok and fec: + p.pr("RS PACKET TODO") + elif crc_ok: + success = decode_af(stream) + + p.dec() + return success + +af_head_struct = "!2sLHBc" +def decode_af(stream): + p.pr("AF Packet") + p.inc() + + headerdata = stream.read(10) + + sync, plen, seq, ar, pt = struct.unpack(af_head_struct, headerdata) + + if sync != "AF": + p.pr("No AF Sync") + p.dec() + return False + + crc_flag = (ar & 0x80) != 0x00 + revision = ar & 0x7F + + payload = stream.read(plen) + + crc = struct.unpack("!H", stream.read(2))[0] + + crc_calc = crc16(headerdata) + crc_calc = crc16(payload, crc_calc) + crc_calc ^= 0xFFFF + # manual endianness conversion + crc_calc = ((crc_calc >> 8) | (crc_calc << 8)) & 0xFFFF + + crc_ok = crc_calc == crc + + if crc_flag and crc_ok: + p.pr("CRC ok") + elif crc_flag: + p.pr("CRC not ok!") + p.pr(" CRC: is 0x{0:04x}, calculated 0x{1:04x}".format(crc, crc_calc)) + else: + p.pr("No CRC") + + p.pr("plen {}".format(plen)) + p.pr("seq {}".format(seq)) + p.pr("revision {}".format(revision)) + p.pr("protocol type {}".format(pt)) + + success = False + if pt == "T": + success = decode_tag(payload) + + p.dec() + return success + +tag_item_head_struct = "!4sL" +def tagitems(tagpacket): + i = 0 + while i+8 < len(tagpacket): + name, length = struct.unpack(tag_item_head_struct, tagpacket[i:i+8]) + + # length is in bits, because it's more annoying this way + assert(length % 8 == 0) + length /= 8 + + tag_value = tagpacket[i+8:i+8+length] + yield {'name': name, 'length': length, 'value': tag_value} + + i += 8 + length + +def decode_tag(tagpacket): + p.pr("Tag packet len={}".format(len(tagpacket))) + p.inc() + for item in tagitems(tagpacket): + if item['name'] == "deti": + decode_deti(item) + elif item['name'].startswith("est"): + decode_estn(item) + else: + p.pr("Tag item '{}' ({})".format(item['name'], item['length'])) + + p.dec() + return True + +item_deti_header_struct = "!BBBBH" +def decode_deti(item): + p.pr("TAG item {} ({})".format(item['name'], item['length'])) + p.inc() + tag_value = item['value'] + + unpacked = struct.unpack(item_deti_header_struct, tag_value[:6]) + flag_fcth, fctl, stat, mid_fp, mnsc = unpacked + + + atstf = flag_fcth & 0x80 != 0 + if atstf: + utco, seconds, tsta1, tsta2, tsta3 = struct.unpack("!BL3B", tag_value[6:6+8]) + tsta = (tsta1 << 16) | (tsta2 << 8) | tsta3 + + ficf = flag_fcth & 0x40 != 0 + rfudf = flag_fcth & 0x20 != 0 + fcth = flag_fcth & 0x1F + fct = fcth << 5 | fctl + + mid = (mid_fp >> 6) & 0x03 + fp = (mid_fp >> 3) & 0x07 + + + p.pr("FICF = {}".format(ficf)) + p.pr("ATST = {}".format(atstf)) + p.pr("RFUDF = {}".format(rfudf)) + p.pr("FCT = {} (0x{:02x} 0x{:02x})".format(fct, fcth, fctl)) + p.pr("STAT = 0x{:02x}".format(stat)) + p.pr("Mode id = {}".format(mid)) + p.pr("Frame phase = {}".format(fp)) + p.pr("MNSC = 0x{:02x}".format(mnsc)) + if atstf: + p.pr("UTCOffset = {}".format(utco)) + p.pr("Seconds = {}".format(seconds)) + p.pr("TSTA = {}".format(tsta)) + + + len_fic = len(tag_value) - 2 - 4 + + if atstf: + len_fic -= 8 + + if rfudf: + len_fic -= 3 + + p.pr("FIC data len {}".format(len_fic)) + + p.dec() + +item_estn_head_struct = "!HB" +def decode_estn(item): + estN = chr(ord("0") + ord(item['name'][3])) + p.pr("TAG item EST{} ({})".format(estN, item['length'])) + p.inc() + tag_value = item['value'] + + scid_sad, tpl_rfa = struct.unpack(item_estn_head_struct, tag_value[:3]) + scid = scid_sad >> 10 + sad = scid_sad & 0x3F + tpl = tpl_rfa >> 2 + + p.pr("SCID = {}".format(scid)) + p.pr("SAD = {}".format(sad)) + p.pr("TPL = {}".format(tpl)) + + assert(item['length'] == len(tag_value)) + + p.pr("MST len = {}".format(len(tag_value) - 3)) + + p.dec() + +if len(sys.argv) < 2: + print("Filename expected") + sys.exit(1) + +filename = sys.argv[1] + +edi_fd = open(filename, "r") +while decode(edi_fd): + pass + -- cgit v1.2.3