#!/usr/bin/env python # -*- coding: utf-8 -*- # # The MIT License (MIT) # # Copyright (c) 2016 Matthias P. Braendli # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. # # # A UECP decoder, reading from a file generated by # my patched VLC, from ancillary MP2 bytes in a MPEG-TS # # Please refer to RDS UECP specification import sys import struct import crc # Message Element Codes uecp_mec_names = { 0x01: "PI" , # Program Identification 0x02: "PS" , # Program Service name 0x06: "PIN" , # Program Item Number 0x04: "DI_PTYI" , # Decoder Information / Dynamic PTY Indicator 0x03: "TA_TP" , # Traffic Anouncement / Traffic Programme 0x05: "MS" , # Music / Speech switch 0x07: "PTY" , # Programme Type 0x3E: "PTYN" , # Programme Type Name 0x0A: "RT" , # RadioText 0x0D: "RTC" , # Real Time Clock 0x19: "CT" , # Enable RTC (group 4 version A) transmits 0x1C: "DSN_SELECT" , # Set active Data Set 0x0B: "PSN_ENABLE" , # Enable / disable a specific PSN 0x1E: "RDSON" , # Enable / disable the RDS output signal 0x23: "SET_SITE_ADDR" , # Add a site address to the encoder 0x27: "SET_ENC_ADDR" , # Add an encoder address to the encoder 0x17: "MSG_REQUEST" , # Request data from the encoder 0x18: "MSG_ACK" , # ACK of a received message 0x2C: "SET_COMM_MODE" } # Set communication mode for the encoder def usage(): print("Usage:\n{} ".format(sys.argv[0])) if len(sys.argv) < 2: usage() sys.exit(1) class UECP_Message_Decoder(): def __init__(self, message_bytes): """Create a message decoder that creates Message Elements from a message""" self.message = message_bytes self.mec = message_bytes[0] assert(0x01 <= self.mec <= 0xFD) if self.mec in uecp_mec_names: name = uecp_mec_names[self.mec] print(" MEC={}".format(name)) if name == "PS": dsn = message_bytes[1] psn = message_bytes[2] ps = message_bytes[3:-1] print(" PS DSN={:02x} PSN={:02x} ".format(dsn, psn) + "".join(chr(d) for d in ps)) elif name == "RT": dsn = message_bytes[1] psn = message_bytes[2] mel = message_bytes[3] med = message_bytes[4:-1] print(" RT DSN={:02x} PSN={:02x} MEL={:02} ".format(dsn, psn, mel) + " ".join("{:02x}".format(d) for d in med)) else: print(" MEC={}".format(self.mec)) class UECP_Frame_Decoder(): def __init__(self, uecp_bytes): """Create a new decoder from bytes containing a complete UECP frame, from STA to STO. See 2.2.1 General Frame Format""" assert(uecp_bytes[0] == 0xfe) assert(uecp_bytes[-1] == 0xff) self.trapped_data = uecp_bytes self.untrap() self.decode_frame() def untrap(self): """Unescapes characters as defined in 2.2.9""" self.data = [0xfe] next_untrap = False for b in self.trapped_data[1:-1]: if next_untrap: next_untrap = False if b == 0x00: self.data.append(0xFD) elif b == 0x01: self.data.append(0xFE) elif b == 0x02: self.data.append(0xFF) else: raise ValueError("untrap {} invalid".format(b)) elif b == 0xFD: next_untrap = True elif b == 0xFE or b == 0xFF: raise ValueError("untrapped data {} invalid".format(b)) else: self.data.append(b) self.data.append(0xff) def check_crc(self): if 0: print("B " + " ".join("{:02x}".format(b) for b in self.data)) for i in range(1, len(self.data)): print(" i={:02} crc=0x{:04x} calc1=0x{:04x} calc2=0x{:04x}".format(i, self.crc, 0xffff ^ crc.crc16(self.data[1:i]), crc.crc_ccitt(self.data[1:i]))) self.crc_calc = crc.crc_ccitt(self.data[1:4+self.mfl]) return self.crc_calc == self.crc def decode_frame(self): """Decodes an untrapped frame""" self.addr = self.data[1] * 256 + self.data[2] self.sqc = self.data[3] self.mfl = self.data[4] self.msg = self.data[5:5+self.mfl] assert(self.mfl + 8 == len(self.data)) self.crc = self.data[5+self.mfl] * 256 + self.data[5+self.mfl+1] self.crc_ok = self.check_crc() UECP_Message_Decoder(self.msg) in_fd = open(sys.argv[1], "r") for line in in_fd: line_bytes = [int(i, 16) for i in line.split()] line_bytes.reverse() anc_header = line_bytes[0] if anc_header == 0xfd: anc_len = line_bytes[1] if anc_len > 0: anc_bytes = line_bytes[2:2+anc_len] if anc_bytes[-1] == 0xff: uecp = UECP_Frame_Decoder(anc_bytes) print("Addr={} sqc={} mfl={} msg={} crc=0x{:04x} crc_calc=0x{:04x} ok={}".format( uecp.addr, uecp.sqc, uecp.mfl, uecp.msg, uecp.crc, uecp.crc_calc, uecp.crc_ok))