aboutsummaryrefslogtreecommitdiffstats
path: root/uecpparse/uecp_parse.py
blob: fad580ea9caa8ebf552f3d08929efd8752f16838 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (c) 2016 Matthias P. Braendli
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
#    The above copyright notice and this permission notice shall be included in
#    all copies or substantial portions of the Software.
#
#    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
#    THE SOFTWARE.
#
#
# A UECP decoder that reads from a UDP socket. The datagrams are generated
# by my patched VLC from the ancillary MP2 bytes in an MPEG-TS.
#
# Please refer to the RDS UECP specification.

import sys
import socket
import struct
import crc

# Module-wide switch: diagnostic output from log() is suppressed while False.
verbose = False


def log(msg):
    """Print msg to stdout, but only when the module-level verbose flag is set."""
    if not verbose:
        return
    print(msg)

# Message Element Codes (MEC) mapped to the short mnemonic used in log output
uecp_mec_names = {
    0x01: "PI",             # Program Identification
    0x02: "PS",             # Program Service name
    0x06: "PIN",            # Program Item Number
    0x04: "DI_PTYI",        # Decoder Information / Dynamic PTY Indicator
    0x03: "TA_TP",          # Traffic Announcement / Traffic Programme
    0x05: "MS",             # Music / Speech switch
    0x07: "PTY",            # Programme Type
    0x3E: "PTYN",           # Programme Type Name
    0x0A: "RT",             # RadioText
    0x0D: "RTC",            # Real Time Clock
    0x19: "CT",             # Enable RTC (group 4 version A) transmits
    0x1C: "DSN_SELECT",     # Set active Data Set
    0x0B: "PSN_ENABLE",     # Enable / disable a specific PSN
    0x1E: "RDSON",          # Enable / disable the RDS output signal
    0x23: "SET_SITE_ADDR",  # Add a site address to the encoder
    0x27: "SET_ENC_ADDR",   # Add an encoder address to the encoder
    0x17: "MSG_REQUEST",    # Request data from the encoder
    0x18: "MSG_ACK",        # ACK of a received message
    0x2C: "SET_COMM_MODE",  # Set communication mode for the encoder
}

def usage():
    """Print the command-line synopsis of this script to stdout."""
    program = sys.argv[0]
    print("Usage:\n{} <port>".format(program))

# The port argument is mandatory: without it, show the synopsis and exit
# with status 1.
if not sys.argv[1:]:
    usage()
    sys.exit(1)

class UECP_Message_Decoder():
    """Decode the Message Element at the start of a UECP message field.

    Only the PS and RT elements are interpreted; every other MEC is merely
    logged. Decoding happens entirely in the constructor, the instance
    afterwards only exposes the raw message and its MEC.
    """

    def __init__(self, message_bytes):
        """Create a message decoder that creates
        Message Elements from a message

        message_bytes is a sequence of integer byte values whose first
        entry is the Message Element Code (MEC).

        Raises ValueError when the message is empty, when the MEC lies
        outside 0x01..0xFD, or when a PS/RT element is too short to hold
        its fixed fields. Using ValueError for truncated input (instead of
        letting an IndexError escape) lets callers handle every malformed
        message with a single except clause.
        """
        if not message_bytes:
            raise ValueError("Empty message")

        self.message = message_bytes
        self.mec = message_bytes[0]
        if not(0x01 <= self.mec <= 0xFD):
            raise ValueError("MEC 0x{:02x} out of range".format(self.mec))

        name = uecp_mec_names.get(self.mec)
        if name is None:
            # Unknown element: report the numeric code only
            log("  MEC={}".format(self.mec))
            return

        log("  MEC={}".format(name))
        if name == "PS":
            self._decode_ps(message_bytes)
        elif name == "RT":
            self._decode_rt(message_bytes)

    def _decode_ps(self, message_bytes):
        """Log a Program Service name element: MEC, DSN, PSN, then text."""
        if len(message_bytes) < 3:
            raise ValueError("PS message too short: {} bytes".format(
                len(message_bytes)))
        dsn = message_bytes[1]
        psn = message_bytes[2]
        # NOTE(review): the slice deliberately kept from the original drops
        # the last byte of the message -- confirm against the data source
        # whether that byte is really not part of the PS text.
        ps = message_bytes[3:-1]
        log("    PS DSN={:02x} PSN={:02x} ".format(dsn, psn) + "".join(chr(d) for d in ps))

    def _decode_rt(self, message_bytes):
        """Log and print a RadioText element: MEC, DSN, PSN, MEL, then data."""
        # MEC + DSN + PSN + MEL + the control byte are all mandatory
        if len(message_bytes) < 5:
            raise ValueError("RT message too short: {} bytes".format(
                len(message_bytes)))
        dsn = message_bytes[1]
        psn = message_bytes[2]
        mel = message_bytes[3]
        med = message_bytes[4:]
        # med[0] is a control byte and not part of the text:
        #   bits 6-5: buffer configuration
        #   bits 4-1: number of transmissions
        #   bit 0:    A/B toggle flag
        radiotext = "".join(chr(d) for d in med[1:])
        log("    RT DSN={:02x} PSN={:02x} MEL={:02}, ".format(dsn, psn, mel) + radiotext)
        # The radiotext is the one thing printed regardless of verbosity
        print(radiotext)

class UECP_Frame_Decoder():
    """Incrementally reassemble one UECP frame and decode it.

    Bytes are fed one at a time through add_byte(). Everything before the
    start byte (0xFE) is discarded, escaped bytes are restored, and when
    the stop byte (0xFF) arrives the collected frame is decoded by
    decode_frame(). One instance handles exactly one frame.
    See 2.2.1 General Frame Format
    """

    # Byte-stuffing table (2.2.9): the byte following a 0xFD escape selects
    # which reserved value was originally transmitted.
    _UNTRAP = {0x00: 0xFD, 0x01: 0xFE, 0x02: 0xFF}

    def __init__(self):
        """Create an empty decoder that waits for the start of a frame."""
        self.next_untrap = False          # previous byte was the 0xFD escape
        self.message_begin_seen = False   # start byte (0xFE) received
        self.data = []                    # unescaped frame, start..stop byte

    def add_byte(self, b):
        """Unescapes characters as defined in 2.2.9

        Feeds one integer byte value into the decoder. When the stop byte
        is received the frame is decoded immediately via decode_frame().

        Returns True if more data is needed, False once a complete frame
        has been received and decoded.

        Raises ValueError on an invalid escape sequence, and whatever
        decode_frame() raises for a malformed frame.
        """
        if self.next_untrap:
            self.next_untrap = False
            if b not in self._UNTRAP:
                raise ValueError("untrap 0x{:02x} invalid".format(b))
            self.data.append(self._UNTRAP[b])
        elif b == 0xFD and self.message_begin_seen:
            # escape marker: the next byte tells which value to restore
            self.next_untrap = True
        elif b == 0xFE and not self.message_begin_seen:
            self.message_begin_seen = True
            self.data.append(b)
        elif b == 0xFF and self.message_begin_seen:
            # end of message
            self.data.append(b)
            self.decode_frame()
            return False
        elif self.message_begin_seen:
            self.data.append(b)
        else:
            # not inside a frame yet: silently drop the byte
            pass
        return True

    def check_crc(self):
        """Compare the CRC received in the frame with a locally computed one.

        Requires self.data, self.mfl and self.crc to be set (decode_frame()
        does that). Stores the computed value in self.crc_calc and returns
        True when both values match.
        """
        # NOTE(review): the slice stops at 4+mfl, one byte before the end
        # of the message field (which ends at 5+mfl) -- confirm this is the
        # intended CRC coverage for the data source in use.
        self.crc_calc = crc.crc_ccitt(self.data[1:4+self.mfl])
        return self.crc_calc == self.crc

    def decode_frame(self):
        """Decodes an untrapped frame

        Splits self.data into address, sequence counter, message field
        length, message and CRC, records the CRC check result in
        self.crc_ok and hands the message to UECP_Message_Decoder.
        A failed CRC check is only recorded, the message is decoded anyway.

        Raises ValueError if the frame is too short to contain all header
        fields or if its length disagrees with the MFL field.
        """
        # Start byte, 2 address bytes, SQC, MFL, 2 CRC bytes and stop byte
        # are always present. Check before indexing so that a truncated
        # frame is reported as ValueError like every other framing error.
        if len(self.data) < 8:
            raise ValueError("Frame too short: " +
                    " ".join("{:02x}".format(b) for b in self.data))

        if verbose:
            log("Decoding " + " ".join("{:02x}".format(b) for b in self.data))

        self.addr = self.data[1] * 256 + self.data[2]
        self.sqc  = self.data[3]
        self.mfl  = self.data[4]
        if self.mfl + 8 != len(self.data):
            raise ValueError("MFL {}, expected {}".format(self.mfl, len(self.data) - 8))
        self.msg  = self.data[5:5+self.mfl]
        # The 16-bit CRC follows the message, most significant byte first
        self.crc  = self.data[5+self.mfl] * 256 + self.data[5+self.mfl+1]
        self.crc_ok = self.check_crc()

        UECP_Message_Decoder(self.msg)

# Decoder for the frame currently being assembled. It lives at module level
# so that a frame whose bytes arrive in several parse_anc_bytes() calls keeps
# its state between those calls.
uecp = UECP_Frame_Decoder()

def parse_anc_bytes(anc_bytes):
    """Feed ancillary data bytes into the module-level frame decoder.

    anc_bytes is an iterable of integer byte values. Each byte is passed to
    the global decoder `uecp`; whenever a frame completes, a summary of it
    is logged and `uecp` is replaced by a fresh decoder for the next frame.
    An unfinished frame stays in `uecp` until the next call.

    Exceptions raised by the decoder for malformed data propagate to the
    caller, which is expected to reset `uecp`.
    """
    global uecp
    for b in anc_bytes:
        if uecp.add_byte(b):
            # frame not complete yet
            continue

        # add_byte() has already run decode_frame() when it saw the stop
        # byte. Decoding a second time here would process (and print)
        # every message twice, so only report the result.
        log("Addr={} sqc={} mfl={} msg={} crc=0x{:04x} crc_calc=0x{:04x} ok={}".format(
            uecp.addr, uecp.sqc, uecp.mfl,
            uecp.msg, uecp.crc, uecp.crc_calc, uecp.crc_ok))
        uecp = UECP_Frame_Decoder()

# Receive loop: every UDP datagram carries one chunk of ancillary data.
# Only packets sent to the loopback interface are accepted.
UDP_IP = "127.0.0.1"
port = int(sys.argv[1])

sock = socket.socket(socket.AF_INET, # Internet
                     socket.SOCK_DGRAM) # UDP
sock.bind((UDP_IP, port))

# Zero-based index of the datagram being processed, used in error reports
line_nr = -1

try:
    while True:
        data, addr = sock.recvfrom(1024)

        # An empty datagram terminates the decoder
        if not data:
            break
        line_nr += 1

        # The ancillary bytes are processed in reverse order of arrival.
        # bytearray() yields integer byte values for both bytes and str
        # input.
        line_bytes = list(bytearray(data))
        line_bytes.reverse()

        anc_header = line_bytes[0]
        anc_bytes = None

        if anc_header == 0xfd:
            # 0xfd marker, followed by a length byte and the payload.
            # A datagram holding only the marker has no payload.
            if len(line_bytes) > 1:
                anc_len = line_bytes[1]
                anc_bytes = line_bytes[2:2+anc_len]
        elif anc_header != 0x00:
            # No marker: the first byte itself is the payload length
            anc_len = anc_header
            anc_bytes = line_bytes[1:1+anc_len]

        if anc_bytes:
            try:
                parse_anc_bytes(anc_bytes)
            except (ValueError, IndexError) as e:
                # Malformed or truncated frame: report it and start over
                # with a clean decoder instead of terminating.
                log("***** Error on line {}: {}".format(line_nr+1, e))
                uecp = UECP_Frame_Decoder()
finally:
    sock.close()