aboutsummaryrefslogtreecommitdiffstats
path: root/uecpparse/uecp_parse.py
blob: cdfccd5722770e3c562539f82254d099360bce24 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (c) 2016 Matthias P. Braendli
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
#    The above copyright notice and this permission notice shall be included in
#    all copies or substantial portions of the Software.
#
#    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
#    THE SOFTWARE.
#
#
# A UECP decoder, reading from a file generated by
# my patched VLC, from ancillary MP2 bytes in a MPEG-TS
#
# Please refer to RDS UECP specification

import sys
import struct
import crc

# Message Element Codes
uecp_mec_names = {
        0x01: "PI"             , # Program Identification
        0x02: "PS"             , # Program Service name
        0x06: "PIN"            , # Program Item Number
        0x04: "DI_PTYI"        , # Decoder Information / Dynamic PTY Indicator
        0x03: "TA_TP"          , # Traffic Anouncement / Traffic Programme
        0x05: "MS"             , # Music / Speech switch
        0x07: "PTY"            , # Programme Type
        0x3E: "PTYN"           , # Programme Type Name
        0x0A: "RT"             , # RadioText
        0x0D: "RTC"            , # Real Time Clock
        0x19: "CT"             , # Enable RTC (group 4 version A) transmits
        0x1C: "DSN_SELECT"     , # Set active Data Set
        0x0B: "PSN_ENABLE"     , # Enable / disable a specific PSN
        0x1E: "RDSON"          , # Enable / disable the RDS output signal
        0x23: "SET_SITE_ADDR"  , # Add a site address to the encoder
        0x27: "SET_ENC_ADDR"   , # Add an encoder address to the encoder
        0x17: "MSG_REQUEST"    , # Request data from the encoder
        0x18: "MSG_ACK"        , # ACK of a received message
        0x2C: "SET_COMM_MODE"  } # Set communication mode for the encoder

def usage():
    print("Usage:\n{} <filename>".format(sys.argv[0]))

if len(sys.argv) < 2:
    usage()
    sys.exit(1)

class UECP_Message_Decoder():
    def __init__(self, message_bytes):
        """Create a message decoder that creates
        Message Elements from a message"""
        self.message = message_bytes
        self.mec = message_bytes[0]
        assert(0x01 <= self.mec <= 0xFD)

        if self.mec in uecp_mec_names:
            name = uecp_mec_names[self.mec]
            print("  MEC={}".format(name))
            if name == "PS":
                dsn = message_bytes[1]
                psn = message_bytes[2]
                ps  = message_bytes[3:-1]
                print("    PS DSN={:02x} PSN={:02x} ".format(dsn, psn) + "".join(chr(d) for d in ps))
            elif name == "RT":
                dsn = message_bytes[1]
                psn = message_bytes[2]
                mel = message_bytes[3]
                med = message_bytes[4:-1]
                print("    RT DSN={:02x} PSN={:02x} MEL={:02} ".format(dsn, psn, mel) + " ".join("{:02x}".format(d) for d in med))

        else:
            print("  MEC={}".format(self.mec))

class UECP_Frame_Decoder():
    def __init__(self, uecp_bytes):
        """Create a new decoder from bytes containing a
        complete UECP frame, from STA to STO.
        See 2.2.1 General Frame Format"""
        assert(uecp_bytes[0] == 0xfe)
        assert(uecp_bytes[-1] == 0xff)
        self.trapped_data = uecp_bytes
        self.untrap()
        self.decode_frame()

    def untrap(self):
        """Unescapes characters as defined in 2.2.9"""
        self.data = [0xfe]

        next_untrap = False
        for b in self.trapped_data[1:-1]:
            if next_untrap:
                next_untrap = False
                if b == 0x00:
                    self.data.append(0xFD)
                elif b == 0x01:
                    self.data.append(0xFE)
                elif b == 0x02:
                    self.data.append(0xFF)
                else:
                    raise ValueError("untrap {} invalid".format(b))
            elif b == 0xFD:
                next_untrap = True
            elif b == 0xFE or b == 0xFF:
                raise ValueError("untrapped data {} invalid".format(b))
            else:
                self.data.append(b)
        self.data.append(0xff)

    def check_crc(self):
        if 0:
            print("B " + " ".join("{:02x}".format(b) for b in self.data))
            for i in range(1, len(self.data)):
                print(" i={:02} crc=0x{:04x} calc1=0x{:04x} calc2=0x{:04x}".format(i,
                    self.crc,
                    0xffff ^ crc.crc16(self.data[1:i]),
                    crc.crc_ccitt(self.data[1:i])))

        self.crc_calc = crc.crc_ccitt(self.data[1:4+self.mfl])
        return self.crc_calc == self.crc

    def decode_frame(self):
        """Decodes an untrapped frame"""
        self.addr = self.data[1] * 256 + self.data[2]
        self.sqc  = self.data[3]
        self.mfl  = self.data[4]
        self.msg  = self.data[5:5+self.mfl]
        assert(self.mfl + 8 == len(self.data))
        self.crc  = self.data[5+self.mfl] * 256 + self.data[5+self.mfl+1]
        self.crc_ok = self.check_crc()

        UECP_Message_Decoder(self.msg)

in_fd = open(sys.argv[1], "r")
for line in in_fd:
    line_bytes = [int(i, 16) for i in line.split()]

    line_bytes.reverse()

    anc_header = line_bytes[0]
    if anc_header == 0xfd:
        anc_len = line_bytes[1]
        if anc_len > 0:
            anc_bytes = line_bytes[2:2+anc_len]

            if anc_bytes[-1] == 0xff:
                uecp = UECP_Frame_Decoder(anc_bytes)

                print("Addr={} sqc={} mfl={} msg={} crc=0x{:04x} crc_calc=0x{:04x} ok={}".format(
                    uecp.addr, uecp.sqc, uecp.mfl,
                    uecp.msg, uecp.crc, uecp.crc_calc, uecp.crc_ok))