From a13a871ade41bbebf85760acf598eecabe7dc0c5 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 6 Jun 2014 18:06:49 +0200 Subject: Add EDI dump analyser script --- edi/crc.py | 46 ++++++++++ edi/edidebug.py | 279 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 325 insertions(+) create mode 100644 edi/crc.py create mode 100755 edi/edidebug.py (limited to 'edi') diff --git a/edi/crc.py b/edi/crc.py new file mode 100644 index 0000000..ad520fc --- /dev/null +++ b/edi/crc.py @@ -0,0 +1,46 @@ +#library for crc16 calculation + + +crc16tab = [ + 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, + 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, + 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6, + 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de, + 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485, + 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d, + 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4, + 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc, + 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823, + 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b, + 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12, + 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a, + 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41, + 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49, + 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70, + 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78, + 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f, + 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067, + 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e, + 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256, + 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d, + 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405, + 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c, + 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634, + 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab, + 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3, + 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a, + 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92, + 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9, + 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1, + 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, + 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0 ] + +def crc16(data, l_crc=0xffff): + for d in data: + #print("crc 0x{:x} 0x{:x}".format(ord(d), l_crc)) + l_crc = (l_crc << 8) ^ crc16tab[(l_crc >> 8) ^ ord(d)] + l_crc = l_crc & 0xFFFF + + return l_crc + + diff --git a/edi/edidebug.py b/edi/edidebug.py new file mode 100755 index 0000000..0b54622 --- /dev/null +++ b/edi/edidebug.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python2 +# +# Decode EDI for debugging + +import sys +import struct + +from crc import crc16 + +class Printer: + def __init__(self): + self.indent = 0 + def pr(self, s): + print(" " * self.indent + s) + def inc(self): + self.indent += 1 + def dec(self): + self.indent -= 1 + +p = Printer() + +def decode(stream): + p.pr("start") + success = False + + pos = stream.tell() + sync = stream.read(2) + stream.seek(pos) + + if len(sync) < 2: + p.pr("EOF") + return False + + if sync == "PF": + if decode_pft(stream): + p.pr("PFT decode success") + success = True + else: + p.pr("PFT decode fail") + elif sync == "AF": + if decode_af(stream): + p.pr("AF decode success") + success = True + else: + p.pr("AF decode fail") + return success + + +pft_head_struct = "!2sH3B3BH" +pft_rs_head_struct = "!2B" +pft_addr_head_struct = "!2H" +def decode_pft(stream): + p.inc() + p.pr("start decoding PF") + + headerdata = stream.read(12) + header = struct.unpack(pft_head_struct, headerdata) + + psync, pseq, findex1, findex2, findex3, fcount1, fcount2, fcount3, fec_ad_plen = header + + findex = (findex1 << 16) | (findex2 << 8) | findex3 + fcount = (fcount1 << 16) | (fcount2 << 8) | fcount3 + + fec = (fec_ad_plen & 0x80) != 0x00 + addr = (fec_ad_plen & 0x40) != 0x00 + plen = fec_ad_plen & 0x3F + + # try to sync according to TS 102 821 Clause 7.4.1 + if psync != "PF": + p.pr("No PF Sync") + p.dec() + return False + + rs_k = 0 + rs_z = 0 + if fec: + rs_head = stream.read(2) + rs_k, rs_z = struct.unpack(pft_rs_head_struct, rs_head) + headerdata += rs_head + + addr_source = 0 + addr_dest = 0 + if addr: + addr_head = stream.read(4) + addr_source, addr_dest = struct.unpack(pft_addr_head_struct, addr_head) + headerdata += addr_head + + # read CRC + headerdata += stream.read(2) + + crc_ok = crc16(headerdata) == 0x1d0f + + if crc_ok: + p.pr("CRC ok") + else: + p.pr("CRC not ok!") + + p.pr("pseq {}".format(pseq)) + p.pr("findex {}".format(findex)) + p.pr("fcount {}".format(fcount)) + if fec: + p.pr("with fec:") + p.pr(" RSk={}".format(rs_k)) + p.pr(" RSz={}".format(rs_z)) + if addr: + p.pr("with transport header:") + p.pr(" source={}".format(addr_source)) + p.pr(" dest={}".format(addr_dest)) + p.pr("payload length={}".format(plen)) + + success = False + if crc_ok and fec: + p.pr("RS PACKET TODO") + elif crc_ok: + success = decode_af(stream) + + p.dec() + return success + +af_head_struct = "!2sLHBc" +def decode_af(stream): + p.pr("AF Packet") + p.inc() + + headerdata = stream.read(10) + + sync, plen, seq, ar, pt = struct.unpack(af_head_struct, headerdata) + + if sync != "AF": + p.pr("No AF Sync") + p.dec() + return False + + crc_flag = (ar & 0x80) != 0x00 + revision = ar & 0x7F + + payload = stream.read(plen) + + crc = struct.unpack("!H", stream.read(2))[0] + + crc_calc = crc16(headerdata) + crc_calc = crc16(payload, crc_calc) + crc_calc ^= 0xFFFF + # manual endianness conversion + crc_calc = ((crc_calc >> 8) | (crc_calc << 8)) & 0xFFFF + + crc_ok = crc_calc == crc + + if crc_flag and crc_ok: + p.pr("CRC ok") + elif crc_flag: + p.pr("CRC not ok!") + p.pr(" CRC: is 0x{0:04x}, calculated 0x{1:04x}".format(crc, crc_calc)) + else: + p.pr("No CRC") + + p.pr("plen {}".format(plen)) + p.pr("seq {}".format(seq)) + p.pr("revision {}".format(revision)) + p.pr("protocol type {}".format(pt)) + + success = False + if pt == "T": + success = decode_tag(payload) + + p.dec() + return success + +tag_item_head_struct = "!4sL" +def tagitems(tagpacket): + i = 0 + while i+8 < len(tagpacket): + name, length = struct.unpack(tag_item_head_struct, tagpacket[i:i+8]) + + # length is in bits, because it's more annoying this way + assert(length % 8 == 0) + length /= 8 + + tag_value = tagpacket[i+8:i+8+length] + yield {'name': name, 'length': length, 'value': tag_value} + + i += 8 + length + +def decode_tag(tagpacket): + p.pr("Tag packet len={}".format(len(tagpacket))) + p.inc() + for item in tagitems(tagpacket): + if item['name'] == "deti": + decode_deti(item) + elif item['name'].startswith("est"): + decode_estn(item) + else: + p.pr("Tag item '{}' ({})".format(item['name'], item['length'])) + + p.dec() + return True + +item_deti_header_struct = "!BBBBH" +def decode_deti(item): + p.pr("TAG item {} ({})".format(item['name'], item['length'])) + p.inc() + tag_value = item['value'] + + unpacked = struct.unpack(item_deti_header_struct, tag_value[:6]) + flag_fcth, fctl, stat, mid_fp, mnsc = unpacked + + + atstf = flag_fcth & 0x80 != 0 + if atstf: + utco, seconds, tsta1, tsta2, tsta3 = struct.unpack("!BL3B", tag_value[6:6+8]) + tsta = (tsta1 << 16) | (tsta2 << 8) | tsta3 + + ficf = flag_fcth & 0x40 != 0 + rfudf = flag_fcth & 0x20 != 0 + fcth = flag_fcth & 0x1F + fct = fcth << 5 | fctl + + mid = (mid_fp >> 6) & 0x03 + fp = (mid_fp >> 3) & 0x07 + + + p.pr("FICF = {}".format(ficf)) + p.pr("ATST = {}".format(atstf)) + p.pr("RFUDF = {}".format(rfudf)) + p.pr("FCT = {} (0x{:02x} 0x{:02x})".format(fct, fcth, fctl)) + p.pr("STAT = 0x{:02x}".format(stat)) + p.pr("Mode id = {}".format(mid)) + p.pr("Frame phase = {}".format(fp)) + p.pr("MNSC = 0x{:02x}".format(mnsc)) + if atstf: + p.pr("UTCOffset = {}".format(utco)) + p.pr("Seconds = {}".format(seconds)) + p.pr("TSTA = {}".format(tsta)) + + + len_fic = len(tag_value) - 2 - 4 + + if atstf: + len_fic -= 8 + + if rfudf: + len_fic -= 3 + + p.pr("FIC data len {}".format(len_fic)) + + p.dec() + +item_estn_head_struct = "!HB" +def decode_estn(item): + estN = chr(ord("0") + ord(item['name'][3])) + p.pr("TAG item EST{} ({})".format(estN, item['length'])) + p.inc() + tag_value = item['value'] + + scid_sad, tpl_rfa = struct.unpack(item_estn_head_struct, tag_value[:3]) + scid = scid_sad >> 10 + sad = scid_sad & 0x3F + tpl = tpl_rfa >> 2 + + p.pr("SCID = {}".format(scid)) + p.pr("SAD = {}".format(sad)) + p.pr("TPL = {}".format(tpl)) + + assert(item['length'] == len(tag_value)) + + p.pr("MST len = {}".format(len(tag_value) - 3)) + + p.dec() + +if len(sys.argv) < 2: + print("Filename expected") + sys.exit(1) + +filename = sys.argv[1] + +edi_fd = open(filename, "r") +while decode(edi_fd): + pass + -- cgit v1.2.3