#!/usr/bin/env python
#
# This is a somewhat ugly script whose main
# purpose is to show TII information that the
# DAB752 display doesn't show.
#
# Usage: pass the serial port as the first argument
# (default: /dev/ttyUSB0).
#
# Copyright 2015 Matthias P. Braendli
# http://opendigitalradio.org
#
# MIT Licence
# Please see LICENCE for full text

import sys

try:
    import serial
except ImportError:
    # pyserial is only needed to open a real port. Keeping the import
    # optional lets the CRC and framing helpers below be imported (and
    # tested) on machines without it; LLCP() still raises ImportError.
    serial = None

# Protocol parameters. PACKETRETRY, CRCINIT and the two timeouts are not
# referenced by the code below; they are kept as reference values.
PACKETRETRY = 3
MAXDATLEN = 128          # largest data field accepted in one packet
CRCPOLY = 0x810
CRCINIT = 0xFFFF
TX_ACKTIMEOUT = 3 #sec
RX_BYTETIMEOUT = 1 #sec

# Packet type markers (first byte of every packet).
STX_header = 0x02        # complete message in a single packet
SOH_header = 0x01        # first packet of a multi-packet message
ETB_header = 0x17        # intermediate packet of a multi-packet message
ETX_header = 0x03        # last packet of a multi-packet message
ACK_header = 0x06        # positive acknowledgement (no data field)
NAK_header = 0x15        # negative acknowledgement (no data field)

# Packet types that carry a length byte and a data field.
DATA_HEADERS = (SOH_header, STX_header, ETB_header, ETX_header)
# Every packet type receive_packet() knows how to parse.
KNOWN_HEADERS = DATA_HEADERS + (ACK_header, NAK_header)

# Message identifiers (first data byte of a message).
LnkRegister = 129
GetComponentList = 39
GetCurrentFrequency = 41
GetNrOfTII = 50
GetTII = 61
SetSubscription = 148
TIIListChanged = 204
TIIVectorsChanged = 205

DEFAULT_PORT = "/dev/ttyUSB0"


def crc_initial(c):
    """Return the CRC lookup-table entry for the byte value *c*.

    Runs the bitwise CRC step (shift left, XOR with CRCPOLY whenever the
    top bit of ``crc ^ c`` is set) over the eight bits of *c* and returns
    the result truncated to 16 bits.
    """
    crc = 0
    c = c << 8
    for j in range(8):
        if (crc ^ c) & 0x8000:
            crc = (crc << 1) ^ CRCPOLY
        else:
            crc = crc << 1
        c = c << 1
    crc = crc & 0xFFFF
    return crc


crc_table = [crc_initial(i) for i in range(256)]


def get_crc(data):
    """Return the CRC of *data* as a two-element list ``[high, low]``.

    *data* is any sequence of integers in 0..255 (list, bytes, bytearray).
    Two 0xFF bytes are appended to the input before the table-driven CRC
    is computed.

    Raises:
        ValueError: if an element of *data* is outside 0..255.
    """
    crc = 0
    # list() so that bytes/bytearray input works as well as a list.
    for d in list(data) + [0xff, 0xff]:
        if d < 0 or d > 255:
            raise ValueError("data value out of bounds")
        crc_h = crc >> 8
        crc = ((crc << 8) | d) & 0xFFFF
        crc = crc ^ crc_table[crc_h]
    return [crc >> 8, crc & 0xff]


def check_crc(data):
    """Return True if the last two items of *data* are the CRC of the rest."""
    return get_crc(data[:-2]) == list(data[-2:])


def packet_payload(packet):
    """Return the data field of a packet returned by receive_packet().

    Returns ``[]`` for ACK/NAK packets, unknown headers, and anything too
    short to hold header, id, length and the two CRC bytes.
    """
    if len(packet) < 5 or packet[0] not in DATA_HEADERS:
        return []
    return packet[3:-2]


def to_signed8(value):
    """Interpret the byte *value* (0..255) as a signed 8-bit integer."""
    return value - 256 if value > 127 else value


def format_tii_packet(packet):
    """Return the lines to print for one packet received while subscribed.

    TIIVectorsChanged and TIIListChanged messages are decoded; every
    other packet is shown raw. A packet that announces more entries than
    its data field holds is reported as invalid instead of being read
    past its end.
    """
    payload = packet_payload(packet)
    if not payload or payload[0] not in (TIIVectorsChanged, TIIListChanged):
        return ["RX %r" % packet]

    if payload[0] == TIIVectorsChanged:
        name, title, entry_size = "TII Vectors", "TII Vectors", 4
    else:
        name, title, entry_size = "TII List", "TII List:", 2

    invalid = ["%s: invalid packet" % name]
    if len(payload) < 2:
        return invalid
    status = payload[1]
    if status == 3:
        return ["%s: no valid data" % name]
    if status != 0 or len(payload) < 3:
        return invalid

    count = payload[2]
    end = 3 + count * entry_size
    if len(payload) < end:
        return invalid

    lines = [title]
    for ix in range(3, end, entry_size):
        if entry_size == 4:
            lines.append(" pattern=%d, comb=%d, %d + %d j" %
                         (payload[ix], payload[ix+1],
                          to_signed8(payload[ix+2]),
                          to_signed8(payload[ix+3])))
        else:
            lines.append(" pattern=%d, comb=%d" %
                         (payload[ix], payload[ix+1]))
    return lines


class LLCP:
    """Packet framing over a serial port.

    Packets are lists of byte values:
    ``[header, packet_id, length, data..., crc_high, crc_low]`` for data
    packets and ``[header, packet_id, crc_high, crc_low]`` for ACK/NAK.
    """

    def __init__(self, port):
        """Open *port* at 19200 baud with a 0.2 s read timeout.

        Raises:
            ImportError: if pyserial is not installed.
        """
        if serial is None:
            raise ImportError("pyserial is required to open a serial port")
        self.packet_id = 0
        self.serial = serial.Serial(port, 19200, timeout=0.2)

    def close(self):
        """Close the serial port."""
        self.serial.close()

    def _write(self, data):
        """Write the byte values in *data* to the port."""
        self.serial.write(bytes(bytearray(data)))

    def _tx_data(self, data):
        """Send *data*, then read (and discard) one packet in response."""
        self._write(data)
        self.receive_packet()

    def _send_reply(self, header, packet_id):
        """Send an ACK or NAK for *packet_id*.

        The reply is written without waiting for a response: reading
        again here would swallow the next packet from the device, which
        would then never reach the caller of receive_packet().
        """
        reply = [header, packet_id]
        reply.extend(get_crc(reply))
        self._write(reply)

    def receive_packet(self):
        """Read one packet from the port and acknowledge it.

        Data packets are answered with ACK when their CRC matches and
        with NAK otherwise; ACK/NAK packets are never answered.

        Returns:
            The packet as a list of byte values (even when its CRC is
            invalid), ``[header]`` for an unknown header byte, or ``[]``
            when a read timed out or the length byte exceeds MAXDATLEN.
        """
        header = self.serial.read()
        if not header:
            return []
        header = ord(header)
        packet = [header]
        if header not in KNOWN_HEADERS:
            return packet
        has_data = header in DATA_HEADERS

        packet_id = self.serial.read()
        if not packet_id:
            print("packet_id timeout")
            return []
        packet_id = ord(packet_id)
        packet.append(packet_id)

        length = 0
        if has_data:
            length = self.serial.read()
            if not length:
                print("length timeout")
                return []
            length = ord(length)
            if length > MAXDATLEN:
                print("length invalid")
                return []
            packet.append(length)
            data = self.serial.read(length)
            if len(data) < length:
                print("data length too short")
                return []
            packet.extend(bytearray(data))

        # Test the number of bytes read, not their values: a CRC byte of
        # zero is valid data, and a short read must not be unpacked.
        crc = self.serial.read(2)
        if len(crc) < 2:
            print("crc timeout")
            return []
        packet.extend(bytearray(crc))

        if header == SOH_header:
            print("SOH")
            # packet[3] is only a data byte when the packet carries data.
            if length and packet[3] == 0:
                print("Warning: is a syntax error!")
        if header == ETB_header:
            print("ETB")
        if header == ETX_header:
            print("ETX")
        if header == NAK_header:
            print("NAK")

        if check_crc(packet):
            if has_data:
                self._send_reply(ACK_header, packet_id)
        else:
            print("RX CRC INVALID")
            if has_data:
                print("Send NAK")
                self._send_reply(NAK_header, packet_id)
        return packet

    def transmit(self, message):
        """Send *message* (a sequence of byte values) to the device.

        A message of up to MAXDATLEN bytes goes out as one STX packet.
        A longer one is split into MAXDATLEN-sized packets marked SOH
        (first), ETB (intermediate) and ETX (last), all carrying the same
        packet id. After every packet one response packet is read and
        discarded.
        """
        # The packet id is a single byte on the wire, so wrap it.
        # NOTE(review): assumes the device accepts ids wrapping to 0.
        self.packet_id = (self.packet_id + 1) & 0xFF
        message = list(message)

        # "or [[]]": an empty message is still sent as one empty packet.
        parts = [message[i:i + MAXDATLEN]
                 for i in range(0, len(message), MAXDATLEN)] or [[]]
        if len(parts) == 1:
            # NOTE(review): assumes a full MAXDATLEN STX is accepted.
            headers = [STX_header]
        else:
            headers = ([SOH_header] +
                       [ETB_header] * (len(parts) - 2) +
                       [ETX_header])

        for header, part in zip(headers, parts):
            # Build each packet from scratch so that a fragment never
            # carries the bytes of the fragments sent before it.
            data = [header, self.packet_id, len(part)]
            data.extend(part)
            data.extend(get_crc(data))
            self._tx_data(data)


def main(argv=None):
    """Register with the receiver and print TII updates until Ctrl-C.

    Returns the process exit status.
    """
    argv = sys.argv if argv is None else argv
    port = argv[1] if len(argv) > 1 else DEFAULT_PORT
    llcp = LLCP(port)
    try:
        # Send a Link Register On message
        print("Send LnkRegister")
        rx = []
        for i in reversed(range(5)):
            llcp.transmit([LnkRegister, 1])
            rx = llcp.receive_packet()
            print(i)
            if rx:
                break
        if not rx:
            print("No LnkRegister acknowledgement")
            return 1

        print("Send GetCurrentFrequency")
        llcp.transmit([GetCurrentFrequency])
        payload = packet_payload(llcp.receive_packet())
        if len(payload) >= 4:
            print("Frequency %d kHz" %
                  (payload[1] << 16 | payload[2] << 8 | payload[3]))
        else:
            print("No valid GetCurrentFrequency response")
        print()

        print("Send a GetNrOfTII")
        llcp.transmit([GetNrOfTII])
        payload = packet_payload(llcp.receive_packet())
        if len(payload) >= 2:
            print("Nr of TII: %d" % payload[1])
        else:
            print("No valid GetNrOfTII response")
        print()

        print("Subscribe to TII info")
        llcp.transmit([SetSubscription, TIIListChanged, 1])
        llcp.transmit([SetSubscription, TIIVectorsChanged, 1])
        print()

        while True:
            try:
                packet = llcp.receive_packet()
                if packet:
                    for line in format_tii_packet(packet):
                        print(line)
            except KeyboardInterrupt:
                break

        print("Send a LnkRegister Off")
        llcp.transmit([LnkRegister, 0])
        llcp.receive_packet()
        print()
        return 0
    finally:
        llcp.close()


if __name__ == "__main__":
    sys.exit(main())