1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (c) 2016 Matthias P. Braendli
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
#
# A UECP decoder that reads from a UDP socket and receives the data
# generated by my patched VLC from the ancillary MP2 bytes in an MPEG-TS.
#
# Please refer to the RDS UECP specification.
import sys
import socket
import struct
import crc
# Set to True to enable the diagnostic output emitted through log().
verbose = False


def log(msg):
    """Print *msg* to stdout, but only while the module-level
    ``verbose`` flag is true; otherwise do nothing."""
    if not verbose:
        return
    print(msg)
# Message Element Codes (MEC): maps the numeric code found in the first
# byte of a message to a short symbolic name.
uecp_mec_names = {
    0x01: "PI",             # Program Identification
    0x02: "PS",             # Program Service name
    0x06: "PIN",            # Program Item Number
    0x04: "DI_PTYI",        # Decoder Information / Dynamic PTY Indicator
    0x03: "TA_TP",          # Traffic Announcement / Traffic Programme
    0x05: "MS",             # Music / Speech switch
    0x07: "PTY",            # Programme Type
    0x3E: "PTYN",           # Programme Type Name
    0x0A: "RT",             # RadioText
    0x0D: "RTC",            # Real Time Clock
    0x19: "CT",             # Enable RTC (group 4 version A) transmits
    0x1C: "DSN_SELECT",     # Set active Data Set
    0x0B: "PSN_ENABLE",     # Enable / disable a specific PSN
    0x1E: "RDSON",          # Enable / disable the RDS output signal
    0x23: "SET_SITE_ADDR",  # Add a site address to the encoder
    0x27: "SET_ENC_ADDR",   # Add an encoder address to the encoder
    0x17: "MSG_REQUEST",    # Request data from the encoder
    0x18: "MSG_ACK",        # ACK of a received message
    0x2C: "SET_COMM_MODE",  # Set communication mode for the encoder
}
def usage():
    """Print the command-line synopsis (program name followed by the
    expected ``<port>`` argument) to stdout."""
    program = sys.argv[0]
    print("Usage:\n{} <port>".format(program))
# The port number is a mandatory argument: without anything after the
# program name, show the usage text and exit with status 1.
if not sys.argv[1:]:
    usage()
    sys.exit(1)
class UECP_Message_Decoder():
    """Decoder for the message field of a UECP frame.

    The first byte of the message is the Message Element Code (MEC).
    Only the PS and RT elements are decoded further; every other element
    is merely reported through log().
    """

    def __init__(self, message_bytes):
        """Decode *message_bytes*, a sequence of integer byte values.

        The MEC is stored in ``self.mec`` and the raw message in
        ``self.message``. A decoded RadioText is printed to stdout; all
        other output goes through log().

        Raises:
            ValueError: if the message is empty, the MEC lies outside
                0x01..0xFD, or a PS/RT message is too short to contain
                its fixed header fields.
        """
        # An empty message would otherwise fail with IndexError, which
        # the callers that recover from malformed input do not expect.
        if len(message_bytes) == 0:
            raise ValueError("Empty message")

        self.message = message_bytes
        self.mec = message_bytes[0]
        if not (0x01 <= self.mec <= 0xFD):
            raise ValueError("MEC 0x{:02x} out of range".format(self.mec))

        if self.mec in uecp_mec_names:
            name = uecp_mec_names[self.mec]
            log(" MEC={}".format(name))

            if name == "PS":
                # Layout: MEC, DSN, PSN, then the name characters.
                if len(message_bytes) < 3:
                    raise ValueError(
                        "PS message too short: {} bytes".format(
                            len(message_bytes)))
                dsn = message_bytes[1]
                psn = message_bytes[2]
                # NOTE(review): the final message byte is excluded from
                # the name here - confirm this is intended.
                ps = message_bytes[3:-1]
                log(" PS DSN={:02x} PSN={:02x} ".format(dsn, psn) + "".join(chr(d) for d in ps))
            elif name == "RT":
                # Layout: MEC, DSN, PSN, MEL, then the element data whose
                # first byte is a control byte preceding the text.
                if len(message_bytes) < 5:
                    raise ValueError(
                        "RT message too short: {} bytes".format(
                            len(message_bytes)))
                dsn, psn, mel = message_bytes[1:4]
                med = message_bytes[4:]
                # med[0] carries, from the masks used by this decoder:
                #   bits 6-5  buffer configuration
                #   bits 4-1  number of transmissions
                #   bit  0    A/B toggle flag
                # None of these are used; only the text is extracted.
                radiotext = "".join(chr(d) for d in med[1:])
                log(" RT DSN={:02x} PSN={:02x} MEL={:02}, ".format(dsn, psn, mel) + radiotext)
                print(radiotext)
        else:
            # Unknown element: report the numeric code only.
            log(" MEC={}".format(self.mec))
class UECP_Frame_Decoder():
    """Incremental decoder for a single UECP frame, from STA to STO.

    Bytes are pushed in one at a time with add_byte(). When the stop
    byte arrives the frame is decoded and its fields become available as
    the attributes ``addr``, ``sqc``, ``mfl``, ``msg``, ``crc``,
    ``crc_calc`` and ``crc_ok``.
    See 2.2.1 General Frame Format.
    """

    # Byte stuffing (2.2.9): the byte following 0xFD selects the value
    # that is actually stored.
    _UNTRAP = {0x00: 0xFD, 0x01: 0xFE, 0x02: 0xFF}

    # Bytes of a frame that are not part of the message, as indexed by
    # decode_frame(): start byte, two address bytes, sequence counter,
    # message field length, two CRC bytes and the stop byte.
    _FRAME_OVERHEAD = 8

    def __init__(self):
        """Create an empty decoder that is waiting for a start byte."""
        self.next_untrap = False
        self.message_begin_seen = False
        self.data = []

    def add_byte(self, b):
        """Consume one byte, undoing the escaping defined in 2.2.9.

        Returns True if more data is needed, or False once the stop byte
        has been received and the frame has been decoded.

        Raises:
            ValueError: on an invalid escape sequence, or when the
                completed frame is malformed (see decode_frame()).
        """
        if self.next_untrap:
            # Second half of an escape sequence.
            self.next_untrap = False
            if b not in self._UNTRAP:
                raise ValueError("untrap 0x{:02x} invalid".format(b))
            self.data.append(self._UNTRAP[b])
        elif not self.message_begin_seen:
            if b == 0xFE:
                # Start of frame.
                self.message_begin_seen = True
                self.data.append(b)
            # Anything else seen before the start byte is dropped.
        elif b == 0xFD:
            # Escape marker: the next byte selects the real value.
            self.next_untrap = True
        elif b == 0xFF:
            # End of frame.
            self.data.append(b)
            self.decode_frame()
            return False
        else:
            self.data.append(b)
        return True

    def check_crc(self):
        """Compute the CRC of the frame into ``self.crc_calc`` and return
        True if it equals the received ``self.crc``.

        Relies on ``self.mfl`` and ``self.crc`` having been set by
        decode_frame().
        """
        # NOTE(review): this range ends one byte before the end of the
        # message field (which ends at index 5 + mfl) - confirm against
        # the CRC definition in the specification.
        self.crc_calc = crc.crc_ccitt(self.data[1:4+self.mfl])
        return self.crc_calc == self.crc

    def decode_frame(self):
        """Decode the untrapped frame held in ``self.data``.

        Sets ``addr``, ``sqc``, ``mfl``, ``msg``, ``crc``, ``crc_calc``
        and ``crc_ok``, then hands the message to UECP_Message_Decoder.
        The message is decoded even when the CRC does not match; the
        result of the check is only recorded in ``crc_ok``.

        Raises:
            ValueError: if the frame is shorter than the fixed frame
                overhead, if the message field length does not agree
                with the frame length, or if the message is invalid.
        """
        # Validate before indexing so that a truncated frame is reported
        # as ValueError instead of failing with IndexError.
        if len(self.data) < self._FRAME_OVERHEAD:
            raise ValueError(
                "Frame too short: {} bytes".format(len(self.data)))

        if verbose:
            log("Decoding " + " ".join("{:02x}".format(b) for b in self.data))

        self.addr = self.data[1] * 256 + self.data[2]
        self.sqc = self.data[3]
        self.mfl = self.data[4]
        self.msg = self.data[5:5+self.mfl]

        if self.mfl + 8 != len(self.data):
            raise ValueError("MFL {}, expected {}".format(self.mfl, len(self.data) - 8))

        # The two CRC bytes follow the message, most significant first.
        self.crc = self.data[5+self.mfl] * 256 + self.data[5+self.mfl+1]
        self.crc_ok = self.check_crc()

        UECP_Message_Decoder(self.msg)
# Frame decoder shared across calls to parse_anc_bytes(), so that a frame
# may span several chunks of ancillary data.
uecp = UECP_Frame_Decoder()


def parse_anc_bytes(anc_bytes):
    """Feed *anc_bytes* into the shared UECP frame decoder.

    Each byte is passed to the module-level ``uecp`` decoder. When
    add_byte() reports a completed frame it has already decoded that
    frame, so here the result is only logged (decoding it a second time
    would process every message twice) and a fresh decoder is installed
    for the next frame.

    Exceptions raised by the decoder propagate to the caller, which is
    responsible for replacing the decoder after an error.
    """
    global uecp
    for b in anc_bytes:
        if uecp.add_byte(b):
            # Frame not complete yet.
            continue

        log("Addr={} sqc={} mfl={} msg={} crc=0x{:04x} crc_calc=0x{:04x} ok={}".format(
            uecp.addr, uecp.sqc, uecp.mfl,
            uecp.msg, uecp.crc, uecp.crc_calc, uecp.crc_ok))
        uecp = UECP_Frame_Decoder()
# Receive datagrams on the loopback interface, extract the ancillary
# bytes from each one and feed them to the UECP decoder.
UDP_IP = "127.0.0.1"
port = int(sys.argv[1])

sock = socket.socket(socket.AF_INET,     # Internet
                     socket.SOCK_DGRAM)  # UDP
sock.bind((UDP_IP, port))

# Zero-based index of the datagram being processed; only used to make
# error reports traceable.
line_nr = 0

try:
    while True:
        data, addr = sock.recvfrom(1024)
        if not data:
            break

        # The datagram is processed back to front: after the reversal,
        # index 0 holds the last byte that was received.
        line_bytes = list(data)
        line_bytes.reverse()

        anc_header = line_bytes[0]
        anc_bytes = None

        if anc_header == 0xfd:
            # Marker byte followed by the length; a datagram holding only
            # the marker carries no ancillary data.
            anc_len = line_bytes[1] if len(line_bytes) > 1 else 0
            if anc_len > 0:
                anc_bytes = line_bytes[2:2+anc_len]
        elif anc_header != 0x00:
            # No marker: the first byte is itself the length.
            anc_len = line_bytes[0]
            if anc_len > 0:
                anc_bytes = line_bytes[1:1+anc_len]

        if anc_bytes:
            try:
                parse_anc_bytes(anc_bytes)
            except (ValueError, IndexError) as e:
                # Malformed or truncated frame: report it and start over
                # with a fresh decoder instead of aborting the program.
                log("***** Error on line {}: {}".format(line_nr+1, e))
                uecp = UECP_Frame_Decoder()

        line_nr += 1
finally:
    sock.close()
|