aboutsummaryrefslogtreecommitdiffstats
path: root/uecpparse/uecp_parse.py
diff options
context:
space:
mode:
Diffstat (limited to 'uecpparse/uecp_parse.py')
-rwxr-xr-xuecpparse/uecp_parse.py168
1 files changed, 168 insertions, 0 deletions
diff --git a/uecpparse/uecp_parse.py b/uecpparse/uecp_parse.py
new file mode 100755
index 0000000..cdfccd5
--- /dev/null
+++ b/uecpparse/uecp_parse.py
@@ -0,0 +1,168 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# The MIT License (MIT)
+#
+# Copyright (c) 2016 Matthias P. Braendli
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+#
+# A UECP decoder, reading from a file generated by
+# my patched VLC, from ancillary MP2 bytes in a MPEG-TS
+#
+# Please refer to RDS UECP specification
+
+import sys
+import struct
+import crc
+
+# Message Element Codes
+uecp_mec_names = {
+ 0x01: "PI" , # Program Identification
+ 0x02: "PS" , # Program Service name
+ 0x06: "PIN" , # Program Item Number
+ 0x04: "DI_PTYI" , # Decoder Information / Dynamic PTY Indicator
+ 0x03: "TA_TP" , # Traffic Anouncement / Traffic Programme
+ 0x05: "MS" , # Music / Speech switch
+ 0x07: "PTY" , # Programme Type
+ 0x3E: "PTYN" , # Programme Type Name
+ 0x0A: "RT" , # RadioText
+ 0x0D: "RTC" , # Real Time Clock
+ 0x19: "CT" , # Enable RTC (group 4 version A) transmits
+ 0x1C: "DSN_SELECT" , # Set active Data Set
+ 0x0B: "PSN_ENABLE" , # Enable / disable a specific PSN
+ 0x1E: "RDSON" , # Enable / disable the RDS output signal
+ 0x23: "SET_SITE_ADDR" , # Add a site address to the encoder
+ 0x27: "SET_ENC_ADDR" , # Add an encoder address to the encoder
+ 0x17: "MSG_REQUEST" , # Request data from the encoder
+ 0x18: "MSG_ACK" , # ACK of a received message
+ 0x2C: "SET_COMM_MODE" } # Set communication mode for the encoder
+
+def usage():
+ print("Usage:\n{} <filename>".format(sys.argv[0]))
+
+if len(sys.argv) < 2:
+ usage()
+ sys.exit(1)
+
+class UECP_Message_Decoder():
+ def __init__(self, message_bytes):
+ """Create a message decoder that creates
+ Message Elements from a message"""
+ self.message = message_bytes
+ self.mec = message_bytes[0]
+ assert(0x01 <= self.mec <= 0xFD)
+
+ if self.mec in uecp_mec_names:
+ name = uecp_mec_names[self.mec]
+ print(" MEC={}".format(name))
+ if name == "PS":
+ dsn = message_bytes[1]
+ psn = message_bytes[2]
+ ps = message_bytes[3:-1]
+ print(" PS DSN={:02x} PSN={:02x} ".format(dsn, psn) + "".join(chr(d) for d in ps))
+ elif name == "RT":
+ dsn = message_bytes[1]
+ psn = message_bytes[2]
+ mel = message_bytes[3]
+ med = message_bytes[4:-1]
+ print(" RT DSN={:02x} PSN={:02x} MEL={:02} ".format(dsn, psn, mel) + " ".join("{:02x}".format(d) for d in med))
+
+ else:
+ print(" MEC={}".format(self.mec))
+
+class UECP_Frame_Decoder():
+ def __init__(self, uecp_bytes):
+ """Create a new decoder from bytes containing a
+ complete UECP frame, from STA to STO.
+ See 2.2.1 General Frame Format"""
+ assert(uecp_bytes[0] == 0xfe)
+ assert(uecp_bytes[-1] == 0xff)
+ self.trapped_data = uecp_bytes
+ self.untrap()
+ self.decode_frame()
+
+ def untrap(self):
+ """Unescapes characters as defined in 2.2.9"""
+ self.data = [0xfe]
+
+ next_untrap = False
+ for b in self.trapped_data[1:-1]:
+ if next_untrap:
+ next_untrap = False
+ if b == 0x00:
+ self.data.append(0xFD)
+ elif b == 0x01:
+ self.data.append(0xFE)
+ elif b == 0x02:
+ self.data.append(0xFF)
+ else:
+ raise ValueError("untrap {} invalid".format(b))
+ elif b == 0xFD:
+ next_untrap = True
+ elif b == 0xFE or b == 0xFF:
+ raise ValueError("untrapped data {} invalid".format(b))
+ else:
+ self.data.append(b)
+ self.data.append(0xff)
+
+ def check_crc(self):
+ if 0:
+ print("B " + " ".join("{:02x}".format(b) for b in self.data))
+ for i in range(1, len(self.data)):
+ print(" i={:02} crc=0x{:04x} calc1=0x{:04x} calc2=0x{:04x}".format(i,
+ self.crc,
+ 0xffff ^ crc.crc16(self.data[1:i]),
+ crc.crc_ccitt(self.data[1:i])))
+
+ self.crc_calc = crc.crc_ccitt(self.data[1:4+self.mfl])
+ return self.crc_calc == self.crc
+
+ def decode_frame(self):
+ """Decodes an untrapped frame"""
+ self.addr = self.data[1] * 256 + self.data[2]
+ self.sqc = self.data[3]
+ self.mfl = self.data[4]
+ self.msg = self.data[5:5+self.mfl]
+ assert(self.mfl + 8 == len(self.data))
+ self.crc = self.data[5+self.mfl] * 256 + self.data[5+self.mfl+1]
+ self.crc_ok = self.check_crc()
+
+ UECP_Message_Decoder(self.msg)
+
+in_fd = open(sys.argv[1], "r")
+for line in in_fd:
+ line_bytes = [int(i, 16) for i in line.split()]
+
+ line_bytes.reverse()
+
+ anc_header = line_bytes[0]
+ if anc_header == 0xfd:
+ anc_len = line_bytes[1]
+ if anc_len > 0:
+ anc_bytes = line_bytes[2:2+anc_len]
+
+ if anc_bytes[-1] == 0xff:
+ uecp = UECP_Frame_Decoder(anc_bytes)
+
+ print("Addr={} sqc={} mfl={} msg={} crc=0x{:04x} crc_calc=0x{:04x} ok={}".format(
+ uecp.addr, uecp.sqc, uecp.mfl,
+ uecp.msg, uecp.crc, uecp.crc_calc, uecp.crc_ok))
+