diff options
-rwxr-xr-x | edi/edidebug.py | 154 |
1 files changed, 149 insertions, 5 deletions
diff --git a/edi/edidebug.py b/edi/edidebug.py index e70cb2b..a5528af 100755 --- a/edi/edidebug.py +++ b/edi/edidebug.py @@ -26,7 +26,8 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. - +from pprint import pprint +import io import sys import struct @@ -84,9 +85,116 @@ class BufferedFile: self.buf.extend(dat) return dat +def tobyte(xs): + return bytes(bytearray([xs])) + +class EtiData: + def __init__(self): + self.clear() + + def new_subchannel(self): + new_stc = {} + self.stc.append(new_stc) + return new_stc + + def clear(self): + self.fc = {} + self.stc = [] + self.mnsc = 0 + self.complete = False + self.fic = [] + + def generate_eti(self): + # generate ETI(NI) + # SYNC + buf = io.BytesIO() + buf.write("\xff") # ERR + # FSYNC + if self.fc['FCT'] % 2 == 1: + buf.write("\xf8\xc5\x49") + else: + buf.write("\x07\x3a\xb6") + + # LIDATA + # FC + buf.write(tobyte(self.fc['FCT'] & 0xff)) + + NST = len(self.stc) + buf.write(tobyte((self.fc['FICF'] << 7) | NST)) + + if self.fc['FICF'] == 0: + FICL = 0 + elif self.fc['MID'] == 3: + FICL = 32 + else: + FICL = 24 + + # EN 300 799 5.3.6 + FL = NST + 1 + FICL + sum(subch['STL'] * 2 for subch in self.stc) + + print("********** NST {}, FICL {}, stl {}, sum, {}".format( + NST, FICL, [subch['STL'] for subch in self.stc], + sum(subch['STL'] * 2 for subch in self.stc))) + + buf.write(tobyte( (self.fc['FP'] << 5) | + (self.fc['MID'] << 3) | + ((FL & 0x700) >> 8))) + + buf.write(tobyte(FL & 0xff)) + + # STC + for subch in self.stc: + buf.write(tobyte( (subch['SCID'] << 2) | ((subch['SAD'] & 0x300) >> 8) )) + buf.write(tobyte( subch['SAD'] & 0xff )) + buf.write(tobyte( (subch['TPL'] << 2) | ((subch['STL'] & 0x300) >> 8))) + buf.write(tobyte( subch['STL'] & 0xff )) + + # EOH + # MNSC + buf.write(tobyte( (self.mnsc & 0xff00) >> 8 )) + buf.write(tobyte( self.mnsc & 0xff )) + # CRC + buf.seek(4) + headerdata = buf.read() + crc_calc = crc16(headerdata) + crc_calc ^= 0xFFFF + buf.write(tobyte( (crc_calc & 0xff00) >> 8)) + buf.write(tobyte( crc_calc & 0xff )) + + mst_start = buf.tell() + # MST + # FIC data + buf.write(bytes(bytearray(self.fic))) + + # Data stream + for subch in self.stc: + buf.write(bytes(bytearray(subch['data']))) + + # EOF + # CRC + buf.seek(mst_start) + mst_data = buf.read() + crc_calc = crc16(mst_data) + crc_calc ^= 0xFFFF + buf.write(tobyte( (crc_calc & 0xff00) >> 8)) + buf.write(tobyte( crc_calc & 0xff )) + buf.write("\xff\xff") # RFU + # TIST + buf.write("\xff\xff\xff\xff") # TODO TIST in EDI is awful + length = buf.tell() + + padding = 6144 - length + + buf.write(bytes(bytearray("\x55" * padding))) + + buf.seek(0) + return buf.read() + + +eti_data = EtiData() p = Printer() # keys=findex @@ -379,6 +487,7 @@ def tagitems(tagpacket): i += 8 + length p.pr("Completed decoding all TAG items after {} bytes".format(i)) + eti_data.complete = True def decode_tag(tagpacket): p.pr("Tag packet len={}".format(len(tagpacket))) @@ -424,20 +533,29 @@ def decode_deti(item): unpacked = struct.unpack(item_deti_header_struct, tag_value[:6]) flag_fcth, fctl, stat, mid_fp, mnsc = unpacked - + eti_data.mnsc = mnsc atstf = flag_fcth & 0x80 != 0 + eti_data.fc['ATSTF'] = int(atstf) if atstf: utco, seconds, tsta1, tsta2, tsta3 = struct.unpack("!BL3B", tag_value[6:6+8]) tsta = (tsta1 << 16) | (tsta2 << 8) | tsta3 + eti_data.fc['TSTA'] = tsta ficf = flag_fcth & 0x40 != 0 + eti_data.fc['FICF'] = int(ficf) + rfudf = flag_fcth & 0x20 != 0 + fcth = flag_fcth & 0x1F fct = (fcth * 250) + fctl + eti_data.fc['FCT'] = fct mid = (mid_fp >> 6) & 0x03 + eti_data.fc['MID'] = mid + fp = (mid_fp >> 3) & 0x07 + eti_data.fc['FP'] = fp p.pr("FICF = {}".format(ficf)) @@ -462,6 +580,10 @@ def decode_deti(item): if rfudf: len_fic -= 3 + fic_offset = len(tag_value) - len_fic + + eti_data.fic = tag_value[fic_offset:] + p.pr("FIC data len {}".format(len_fic)) p.dec() @@ -478,24 +600,46 @@ def decode_estn(item): sad = scid_sad & 0x3F tpl = tpl_rfa >> 2 + stc = eti_data.new_subchannel() + stc['SCID'] = scid + stc['SAD'] = sad + stc['TPL'] = tpl + stl = len(tag_value) - 3 + stc['STL'] = stl / 8 + p.pr("SCID = {}".format(scid)) p.pr("SAD = {}".format(sad)) p.pr("TPL = {}".format(tpl)) assert(item['length'] == len(tag_value)) - p.pr("MST len = {}".format(len(tag_value) - 3)) + p.pr("MST len = {}".format(stl)) p.hexpr("MST {} data".format(scid), tag_value[3:]) + stc['data'] = tag_value[3:] p.dec() -if len(sys.argv) > 1: +if len(sys.argv) > 2: + filename = sys.argv[1] + edi_fd = BufferedFile(filename) + + eti_fd = open(sys.argv[2], "wb") + + +elif len(sys.argv) == 2: filename = sys.argv[1] edi_fd = BufferedFile(filename) + eti_fd = None else: edi_fd = BufferedFile("-") + eti_fd = None +c = 0 while decode(edi_fd): - pass + if eti_data.complete and eti_fd: + eti_fd.write(eti_data.generate_eti()) + c += 1 + if c > 2: + break |