aboutsummaryrefslogtreecommitdiffstats
path: root/edi
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2015-08-09 18:50:29 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2015-08-09 18:50:29 +0200
commit75a92f67431b372dc4f371eb5f69527d3bf161cf (patch)
tree504640125054fcb943a60fbb792966745ccdd901 /edi
parent1badbe52f57ac08509234af5c6a3dc96090c8800 (diff)
downloadmmbtools-aux-75a92f67431b372dc4f371eb5f69527d3bf161cf.tar.gz
mmbtools-aux-75a92f67431b372dc4f371eb5f69527d3bf161cf.tar.bz2
mmbtools-aux-75a92f67431b372dc4f371eb5f69527d3bf161cf.zip
Add EDI to ETI converter in edidebug
Diffstat (limited to 'edi')
-rwxr-xr-xedi/edidebug.py154
1 files changed, 149 insertions, 5 deletions
diff --git a/edi/edidebug.py b/edi/edidebug.py
index e70cb2b..a5528af 100755
--- a/edi/edidebug.py
+++ b/edi/edidebug.py
@@ -26,7 +26,8 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
-
+from pprint import pprint
+import io
import sys
import struct
@@ -84,9 +85,116 @@ class BufferedFile:
self.buf.extend(dat)
return dat
+def tobyte(xs):
+ return bytes(bytearray([xs]))
+
+class EtiData:
+ def __init__(self):
+ self.clear()
+
+ def new_subchannel(self):
+ new_stc = {}
+ self.stc.append(new_stc)
+ return new_stc
+
+ def clear(self):
+ self.fc = {}
+ self.stc = []
+ self.mnsc = 0
+ self.complete = False
+ self.fic = []
+
+ def generate_eti(self):
+ # generate ETI(NI)
+ # SYNC
+ buf = io.BytesIO()
+ buf.write("\xff") # ERR
+ # FSYNC
+ if self.fc['FCT'] % 2 == 1:
+ buf.write("\xf8\xc5\x49")
+ else:
+ buf.write("\x07\x3a\xb6")
+
+ # LIDATA
+ # FC
+ buf.write(tobyte(self.fc['FCT'] & 0xff))
+
+ NST = len(self.stc)
+ buf.write(tobyte((self.fc['FICF'] << 7) | NST))
+
+ if self.fc['FICF'] == 0:
+ FICL = 0
+ elif self.fc['MID'] == 3:
+ FICL = 32
+ else:
+ FICL = 24
+
+ # EN 300 799 5.3.6
+ FL = NST + 1 + FICL + sum(subch['STL'] * 2 for subch in self.stc)
+
+ print("********** NST {}, FICL {}, stl {}, sum, {}".format(
+ NST, FICL, [subch['STL'] for subch in self.stc],
+ sum(subch['STL'] * 2 for subch in self.stc)))
+
+ buf.write(tobyte( (self.fc['FP'] << 5) |
+ (self.fc['MID'] << 3) |
+ ((FL & 0x700) >> 8)))
+
+ buf.write(tobyte(FL & 0xff))
+
+ # STC
+ for subch in self.stc:
+ buf.write(tobyte( (subch['SCID'] << 2) | ((subch['SAD'] & 0x300) >> 8) ))
+ buf.write(tobyte( subch['SAD'] & 0xff ))
+ buf.write(tobyte( (subch['TPL'] << 2) | ((subch['STL'] & 0x300) >> 8)))
+ buf.write(tobyte( subch['STL'] & 0xff ))
+
+ # EOH
+ # MNSC
+ buf.write(tobyte( (self.mnsc & 0xff00) >> 8 ))
+ buf.write(tobyte( self.mnsc & 0xff ))
+ # CRC
+ buf.seek(4)
+ headerdata = buf.read()
+ crc_calc = crc16(headerdata)
+ crc_calc ^= 0xFFFF
+ buf.write(tobyte( (crc_calc & 0xff00) >> 8))
+ buf.write(tobyte( crc_calc & 0xff ))
+
+ mst_start = buf.tell()
+ # MST
+ # FIC data
+ buf.write(bytes(bytearray(self.fic)))
+
+ # Data stream
+ for subch in self.stc:
+ buf.write(bytes(bytearray(subch['data'])))
+
+ # EOF
+ # CRC
+ buf.seek(mst_start)
+ mst_data = buf.read()
+ crc_calc = crc16(mst_data)
+ crc_calc ^= 0xFFFF
+ buf.write(tobyte( (crc_calc & 0xff00) >> 8))
+ buf.write(tobyte( crc_calc & 0xff ))
+ buf.write("\xff\xff") # RFU
+ # TIST
+ buf.write("\xff\xff\xff\xff") # TODO TIST in EDI is awful
+ length = buf.tell()
+
+ padding = 6144 - length
+
+ buf.write(bytes(bytearray("\x55" * padding)))
+
+ buf.seek(0)
+ return buf.read()
+
+
+eti_data = EtiData()
p = Printer()
# keys=findex
@@ -379,6 +487,7 @@ def tagitems(tagpacket):
i += 8 + length
p.pr("Completed decoding all TAG items after {} bytes".format(i))
+ eti_data.complete = True
def decode_tag(tagpacket):
p.pr("Tag packet len={}".format(len(tagpacket)))
@@ -424,20 +533,29 @@ def decode_deti(item):
unpacked = struct.unpack(item_deti_header_struct, tag_value[:6])
flag_fcth, fctl, stat, mid_fp, mnsc = unpacked
-
+ eti_data.mnsc = mnsc
atstf = flag_fcth & 0x80 != 0
+ eti_data.fc['ATSTF'] = int(atstf)
if atstf:
utco, seconds, tsta1, tsta2, tsta3 = struct.unpack("!BL3B", tag_value[6:6+8])
tsta = (tsta1 << 16) | (tsta2 << 8) | tsta3
+ eti_data.fc['TSTA'] = tsta
ficf = flag_fcth & 0x40 != 0
+ eti_data.fc['FICF'] = int(ficf)
+
rfudf = flag_fcth & 0x20 != 0
+
fcth = flag_fcth & 0x1F
fct = (fcth * 250) + fctl
+ eti_data.fc['FCT'] = fct
mid = (mid_fp >> 6) & 0x03
+ eti_data.fc['MID'] = mid
+
fp = (mid_fp >> 3) & 0x07
+ eti_data.fc['FP'] = fp
p.pr("FICF = {}".format(ficf))
@@ -462,6 +580,10 @@ def decode_deti(item):
if rfudf:
len_fic -= 3
+ fic_offset = len(tag_value) - len_fic
+
+ eti_data.fic = tag_value[fic_offset:]
+
p.pr("FIC data len {}".format(len_fic))
p.dec()
@@ -478,24 +600,46 @@ def decode_estn(item):
sad = scid_sad & 0x3F
tpl = tpl_rfa >> 2
+ stc = eti_data.new_subchannel()
+ stc['SCID'] = scid
+ stc['SAD'] = sad
+ stc['TPL'] = tpl
+ stl = len(tag_value) - 3
+ stc['STL'] = stl / 8
+
p.pr("SCID = {}".format(scid))
p.pr("SAD = {}".format(sad))
p.pr("TPL = {}".format(tpl))
assert(item['length'] == len(tag_value))
- p.pr("MST len = {}".format(len(tag_value) - 3))
+ p.pr("MST len = {}".format(stl))
p.hexpr("MST {} data".format(scid), tag_value[3:])
+ stc['data'] = tag_value[3:]
p.dec()
-if len(sys.argv) > 1:
+if len(sys.argv) > 2:
+ filename = sys.argv[1]
+ edi_fd = BufferedFile(filename)
+
+ eti_fd = open(sys.argv[2], "wb")
+
+
+elif len(sys.argv) == 2:
filename = sys.argv[1]
edi_fd = BufferedFile(filename)
+ eti_fd = None
else:
edi_fd = BufferedFile("-")
+ eti_fd = None
+c = 0
while decode(edi_fd):
- pass
+ if eti_data.complete and eti_fd:
+ eti_fd.write(eti_data.generate_eti())
+ c += 1
+ if c > 2:
+ break