#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (c) 2016 Matthias P. Braendli
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
#
# A UECP decoder, reading from a UDP socket, receiving
# data generated by my patched VLC, from ancillary MP2 bytes in a MPEG-TS
#
# Please refer to RDS UECP specification

import sys
import socket
import struct
import crc

verbose = False


def log(msg):
    """Print ``msg``, but only when the module-level ``verbose`` flag is set."""
    if verbose:
        print(msg)


# Message Element Codes
uecp_mec_names = {
    0x01: "PI",             # Program Identification
    0x02: "PS",             # Program Service name
    0x06: "PIN",            # Program Item Number
    0x04: "DI_PTYI",        # Decoder Information / Dynamic PTY Indicator
    0x03: "TA_TP",          # Traffic Announcement / Traffic Programme
    0x05: "MS",             # Music / Speech switch
    0x07: "PTY",            # Programme Type
    0x3E: "PTYN",           # Programme Type Name
    0x0A: "RT",             # RadioText
    0x0D: "RTC",            # Real Time Clock
    0x19: "CT",             # Enable RTC (group 4 version A) transmits
    0x1C: "DSN_SELECT",     # Set active Data Set
    0x0B: "PSN_ENABLE",     # Enable / disable a specific PSN
    0x1E: "RDSON",          # Enable / disable the RDS output signal
    0x23: "SET_SITE_ADDR",  # Add a site address to the encoder
    0x27: "SET_ENC_ADDR",   # Add an encoder address to the encoder
    0x17: "MSG_REQUEST",    # Request data from the encoder
    0x18: "MSG_ACK",        # ACK of a received message
    0x2C: "SET_COMM_MODE",  # Set communication mode for the encoder
}


def usage():
    """Print the command-line synopsis (the script takes the UDP port to bind)."""
    print("Usage:\n{} <port>".format(sys.argv[0]))


class UECP_Message_Decoder():
    """Decodes the message field of one UECP frame.

    Decoding happens in the constructor. Only the PS and RT message elements
    are interpreted; every other known element is merely logged by name, and
    unknown element codes are logged by number.
    """

    def __init__(self, message_bytes):
        """Decode ``message_bytes``, a sequence of ints starting with the MEC.

        Decoded RadioText is printed to stdout; everything else goes through
        ``log`` and is therefore only visible in verbose mode.

        Raises:
            ValueError: if the message is empty, the MEC is outside
                0x01..0xFD, or a PS/RT element is too short to contain its
                fixed fields.
        """
        if not message_bytes:
            # Would otherwise surface as an IndexError the receive loop
            # does not catch.
            raise ValueError("Empty message")

        self.message = message_bytes
        self.mec = message_bytes[0]

        if not (0x01 <= self.mec <= 0xFD):
            raise ValueError("MEC 0x{:02x} out of range".format(self.mec))

        name = uecp_mec_names.get(self.mec)
        if name is None:
            log(" MEC={}".format(self.mec))
            return

        log(" MEC={}".format(name))
        if name == "PS":
            self._decode_ps(message_bytes)
        elif name == "RT":
            self._decode_rt(message_bytes)

    def _decode_ps(self, message_bytes):
        """Log the Programme Service name element (MEC, DSN, PSN, text)."""
        if len(message_bytes) < 3:
            raise ValueError(
                "PS message too short: {} bytes".format(len(message_bytes)))
        dsn = message_bytes[1]
        psn = message_bytes[2]
        # NOTE(review): the slice drops the final byte of the message field;
        # presumably tuned against the data source - confirm against the spec.
        ps = message_bytes[3:-1]
        log(" PS DSN={:02x} PSN={:02x} ".format(dsn, psn)
            + "".join(chr(d) for d in ps))

    def _decode_rt(self, message_bytes):
        """Print the RadioText element (MEC, DSN, PSN, MEL, control byte, text)."""
        if len(message_bytes) < 5:
            raise ValueError(
                "RT message too short: {} bytes".format(len(message_bytes)))
        dsn = message_bytes[1]
        psn = message_bytes[2]
        mel = message_bytes[3]
        med = message_bytes[4:]

        # The first data byte is a bit field; the remaining bytes are the text.
        control = med[0]
        self.buffer_config = (control & 0b01100000) >> 5
        self.number_of_tx = (control & 0b00011110) >> 1
        self.toggle_a_b_flag = control & 0b00000001

        radiotext = "".join(chr(d) for d in med[1:])
        log(" RT DSN={:02x} PSN={:02x} MEL={:02}, ".format(dsn, psn, mel)
            + radiotext)
        print(radiotext)


class UECP_Frame_Decoder():
    """Incrementally assembles and decodes a single UECP frame.

    Bytes are fed one at a time through ``add_byte``. Once the stop byte has
    been seen the frame is decoded and its fields are available as the
    attributes ``addr``, ``sqc``, ``mfl``, ``msg``, ``crc``, ``crc_calc`` and
    ``crc_ok``. One instance handles one frame; create a new one afterwards.
    """

    # Set to True to log CRCs over every prefix of the frame (verbose mode
    # only); useful when investigating which bytes the checksum covers.
    debug_crc = False

    def __init__(self):
        """Create an empty decoder waiting for a frame start byte (0xFE).

        See 2.2.1 General Frame Format"""
        self.next_untrap = False
        self.message_begin_seen = False
        self.data = []

    def add_byte(self, b):
        """Feed one received byte, unescaping characters as defined in 2.2.9.

        Bytes seen before the start byte 0xFE are dropped. Inside a frame,
        0xFD followed by 0x00/0x01/0x02 is replaced by 0xFD/0xFE/0xFF. When
        the stop byte 0xFF arrives the frame is decoded via ``decode_frame``.

        Returns:
            True if more data is needed, False once a complete frame has been
            received and decoded.

        Raises:
            ValueError: on an invalid escape sequence, or anything raised by
                ``decode_frame``.
        """
        if self.next_untrap:
            self.next_untrap = False
            if b == 0x00:
                self.data.append(0xFD)
            elif b == 0x01:
                self.data.append(0xFE)
            elif b == 0x02:
                self.data.append(0xFF)
            else:
                raise ValueError("untrap 0x{:02x} invalid".format(b))
        elif b == 0xFD and self.message_begin_seen:
            self.next_untrap = True
        elif b == 0xFE and not self.message_begin_seen:
            self.message_begin_seen = True
            self.data.append(b)
        elif b == 0xFF and self.message_begin_seen:
            # end of message
            self.data.append(b)
            self.decode_frame()
            return False
        elif self.message_begin_seen:
            self.data.append(b)
        # else: not inside a frame yet, the byte is dropped

        return True

    def check_crc(self):
        """Compare the received CRC with one computed over the frame.

        Reads ``self.mfl`` and ``self.crc`` (set by ``decode_frame``) and
        stores the computed value in ``self.crc_calc``.

        Returns:
            True if the computed and the received checksum are equal.
        """
        if self.debug_crc:
            log("B " + " ".join("{:02x}".format(b) for b in self.data))
            for i in range(1, len(self.data)):
                log(" i={:02} crc=0x{:04x} calc1=0x{:04x} calc2=0x{:04x}".format(
                    i, self.crc,
                    0xffff ^ crc.crc16(self.data[1:i]),
                    crc.crc_ccitt(self.data[1:i])))

        # NOTE(review): this range stops one byte before the end of the
        # message field as sliced in decode_frame (data[5:5+mfl]) - confirm
        # against the UECP specification and the data source.
        self.crc_calc = crc.crc_ccitt(self.data[1:4+self.mfl])
        return self.crc_calc == self.crc

    def decode_frame(self):
        """Decodes an untrapped frame held in ``self.data``.

        Sets ``addr``, ``sqc``, ``mfl``, ``msg``, ``crc`` and ``crc_ok``, then
        hands the message field to ``UECP_Message_Decoder``. The message is
        decoded regardless of the CRC result.

        Raises:
            ValueError: if the frame is shorter than the 8 bytes of framing
                overhead, if the MFL field disagrees with the frame length,
                or if the message decoder rejects the message.
        """
        if verbose:
            log("Decoding " + " ".join("{:02x}".format(b) for b in self.data))

        # Start, 2 address, sequence counter, MFL, 2 CRC and stop bytes are
        # always present; anything shorter cannot be indexed safely.
        if len(self.data) < 8:
            raise ValueError(
                "Frame too short: {} bytes".format(len(self.data)))

        self.addr = self.data[1] * 256 + self.data[2]
        self.sqc = self.data[3]
        self.mfl = self.data[4]
        self.msg = self.data[5:5+self.mfl]

        if self.mfl + 8 != len(self.data):
            raise ValueError("MFL {}, expected {}".format(
                self.mfl, len(self.data) - 8))

        self.crc = self.data[5+self.mfl] * 256 + self.data[5+self.mfl+1]
        self.crc_ok = self.check_crc()

        UECP_Message_Decoder(self.msg)


uecp = UECP_Frame_Decoder()


def parse_anc_bytes(anc_bytes):
    """Feed ancillary bytes to the module-level frame decoder ``uecp``.

    Each time a frame completes, its fields are logged and ``uecp`` is
    replaced by a fresh decoder so that the remaining bytes start a new frame.

    Raises:
        ValueError: propagated from the frame decoder; the caller is expected
            to reset ``uecp`` in that case.
    """
    global uecp
    for b in anc_bytes:
        need_more_data = uecp.add_byte(b)
        if not need_more_data:
            # add_byte has already decoded the frame; decoding it again here
            # would emit every message twice.
            log("Addr={} sqc={} mfl={} msg={} crc=0x{:04x} crc_calc=0x{:04x} ok={}".format(
                uecp.addr, uecp.sqc, uecp.mfl, uecp.msg,
                uecp.crc, uecp.crc_calc, uecp.crc_ok))
            uecp = UECP_Frame_Decoder()


UDP_IP = "127.0.0.1"


def _extract_anc_bytes(datagram):
    """Return the ancillary payload of one datagram as a list of ints.

    The datagram is read back to front. If the last byte is 0xFD, the byte
    before it is the payload length; otherwise a non-zero last byte is itself
    the payload length. An empty list is returned when there is no payload.
    """
    line_bytes = list(reversed(datagram))
    if not line_bytes:
        return []

    anc_header = line_bytes[0]
    if anc_header == 0xfd:
        if len(line_bytes) < 2:
            # Header without a length byte: nothing to extract.
            return []
        anc_len = line_bytes[1]
        return line_bytes[2:2+anc_len]
    if anc_header != 0x00:
        return line_bytes[1:1+anc_header]
    return []


def main():
    """Bind the UDP port given on the command line and decode until an
    empty datagram is received."""
    global uecp

    if len(sys.argv) < 2:
        usage()
        sys.exit(1)
    try:
        port = int(sys.argv[1])
    except ValueError:
        usage()
        sys.exit(1)

    with socket.socket(socket.AF_INET,      # Internet
                       socket.SOCK_DGRAM) as sock:  # UDP
        sock.bind((UDP_IP, port))

        line_nr = 0
        while True:
            data, _ = sock.recvfrom(1024)
            if not data:
                break
            line_nr += 1

            anc_bytes = _extract_anc_bytes(data)
            if anc_bytes:
                try:
                    parse_anc_bytes(anc_bytes)
                except ValueError as e:
                    log("***** Error on line {}: {}".format(line_nr, e))
                    # Drop the partial frame and resynchronise on the next
                    # start byte.
                    uecp = UECP_Frame_Decoder()


if __name__ == "__main__":
    main()