#!/usr/bin/env python
"""Exercise a device over a serial link using a small framed packet protocol.

Frames have the form ``[header, packet_id, length, payload..., crc_hi, crc_lo]``;
ACK/NAK frames omit the length and payload.  Running this file opens
``/dev/ttyUSB0`` at 19200 baud and sends a handful of test messages.
"""
import sys
import serial

PACKETRETRY = 3
MAXDATLEN = 128
CRCPOLY = 0x810
CRCINIT = 0xFFFF
TX_ACKTIMEOUT = 3  # sec
RX_BYTETIMEOUT = 1  # sec

STX_header = 0x02
SOH_header = 0x01
ETB_header = 0x17
ETX_header = 0x03
ACK_header = 0x06
NAK_header = 0x15

# Recognised header bytes and the label printed when one is received.
_HEADER_NAMES = {
    SOH_header: "SOH",
    STX_header: "STX",
    ETB_header: "ETB",
    ETX_header: "ETX",
    ACK_header: "ACK",
    NAK_header: "NAK",
}


def crc_initial(c):
    """Return the 16-bit CRC table entry for the byte value *c*."""
    crc = 0
    c = c << 8
    for _ in range(8):
        if (crc ^ c) & 0x8000:
            crc = (crc << 1) ^ CRCPOLY
        else:
            crc = crc << 1
        c = c << 1
        crc = crc & 0xFFFF
    return crc


crc_table = [crc_initial(i) for i in range(256)]


def get_crc(data):
    """Return the CRC of *data* as a two-element list ``[high, low]``.

    *data* is any sequence of integers in ``0..255`` (list, bytes,
    bytearray).  Two 0xFF bytes are fed through the register after the data.

    Raises:
        ValueError: if any element lies outside ``0..255``.
    """
    crc = 0
    for d in list(data) + [0xFF, 0xFF]:
        if d < 0 or d > 255:
            raise ValueError("data value out of bounds")
        crc_h = crc >> 8
        crc = ((crc << 8) | d) & 0xFFFF
        crc = crc ^ crc_table[crc_h]
    return [crc >> 8, crc & 0xFF]


def check_crc(data):
    """Return True if the last two items of *data* are the CRC of the rest."""
    return get_crc(data[:-2]) == list(data[-2:])


class LLCP:
    """Packet-level sender/receiver on top of a pyserial port."""

    def __init__(self, port):
        """Open *port* at 19200 baud with a 0.5 s read timeout."""
        self.packet_id = 0
        self.serial = serial.Serial(port, 19200, timeout=0.5)

    def _tx_data(self, data):
        """Write one frame (a list of byte values), then read one packet back.

        The packet read after the write is processed by ``receive_packet``
        (printed, ACKed/NAKed) but its value is not returned to the caller.
        """
        print("TX %r" % data)
        # Hand pyserial a real byte buffer; not every pyserial version
        # converts a list of ints on its own.
        self.serial.write(bytearray(data))
        self.receive_packet()

    def _build_packet(self, header, payload):
        """Return ``[header, packet_id, len(payload), *payload, crc_hi, crc_lo]``."""
        packet = [header, self.packet_id, len(payload)]
        packet.extend(payload)
        packet.extend(get_crc(packet))
        return packet

    def receive_packet(self):
        """Read one packet from the port and acknowledge it if required.

        Returns:
            The packet as a list of byte values, CRC included.  An empty list
            is returned on any timeout, short read, or a length field larger
            than ``MAXDATLEN``.  A header byte that is not one of the known
            headers is returned as a one-element list without further reads.

        Data-bearing packets (anything but ACK/NAK) are answered with an ACK
        when the CRC matches and a NAK otherwise.
        """
        header_byte = self.serial.read()
        if not header_byte:
            print("RX timeout")
            return []
        header = ord(header_byte)
        packet = [header]

        if header in _HEADER_NAMES:
            id_byte = self.serial.read()
            if not id_byte:
                print("packet_id timeout")
                return []
            packet_id = ord(id_byte)
            packet.append(packet_id)

            is_reply = header in (ACK_header, NAK_header)
            if not is_reply:
                length_byte = self.serial.read()
                if not length_byte:
                    print("length timeout")
                    return []
                length = ord(length_byte)
                if length > MAXDATLEN:
                    print("length invalid")
                    return []
                packet.append(length)
                payload = self.serial.read(length)
                if len(payload) < length:
                    print("data length too short")
                    return []
                # bytearray() yields ints regardless of how read() types
                # its result.
                packet.extend(bytearray(payload))

            # Check the number of bytes read, not their values: a CRC byte of
            # 0x00 is valid, and a short read must not blow up on unpacking.
            crc_bytes = self.serial.read(2)
            if len(crc_bytes) < 2:
                print("crc timeout")
                return []
            packet.extend(bytearray(crc_bytes))

            print(_HEADER_NAMES[header])

            if check_crc(packet):
                print("RX CRC Valid")
                reply_header, reply_label = ACK_header, "Send ACK"
            else:
                print("RX CRC INVALID")
                reply_header, reply_label = NAK_header, "Send NAK"

            if not is_reply:
                print(reply_label)
                reply = [reply_header, packet_id]
                reply.extend(get_crc(reply))
                self._tx_data(reply)

        print("RX %r" % packet)
        return packet

    def transmit(self, message):
        """Send *message* (a list of byte values), fragmenting if necessary.

        Messages shorter than ``MAXDATLEN`` go out as a single STX packet.
        Longer ones are split into ``MAXDATLEN``-sized fragments: the first
        uses SOH, later ones ETB, and ETX once fewer than ``MAXDATLEN`` bytes
        remain.  All fragments of one message share the same packet id.
        """
        # The id travels as a single byte, so keep it in 0..255.
        # NOTE(review): assumes the id may wrap to 0 - confirm with the spec.
        self.packet_id = (self.packet_id + 1) & 0xFF

        if len(message) < MAXDATLEN:
            self._tx_data(self._build_packet(STX_header, message))
            return

        # NOTE(review): thresholds kept as in the original; a message (or a
        # remainder) of exactly MAXDATLEN bytes is never sent with ETX.
        # Confirm the intended boundary against the protocol spec.
        header = SOH_header
        sent_bytes = 0
        while sent_bytes < len(message):
            message_part = message[sent_bytes:sent_bytes + MAXDATLEN]
            # Each fragment is a fresh frame; bytes of earlier fragments must
            # not be carried over into it.
            self._tx_data(self._build_packet(header, message_part))
            sent_bytes += len(message_part)
            header = ETB_header
            if len(message) - sent_bytes < MAXDATLEN:
                header = ETX_header


llcp = LLCP("/dev/ttyUSB0")

LnkRegister = 129
GetComponentList = 39
GetCurrentFrequency = 41

# Send a Link Register On message, retrying until something is received
print("Send a LnkRegister")
for i in range(4):
    llcp.transmit([LnkRegister, 1])
    rx = llcp.receive_packet()
    print()
    if rx:
        break

print("Send invalid opcode")
llcp.transmit([78])
llcp.receive_packet()
print()

print("Send GetComponentList")
llcp.transmit([GetComponentList])
llcp.receive_packet()
print()

print("Send GetCurrentFrequency")
llcp.transmit([GetCurrentFrequency])
llcp.receive_packet()
print()

print("Send a LnkRegister Off")
llcp.transmit([LnkRegister, 0])
llcp.receive_packet()
print()