aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2023-11-11 15:03:16 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2023-11-11 15:03:16 +0100
commitdc576f70a68d87b99b35be8619d26316e5874f14 (patch)
treeb2311994c14777c4099c36376b59701cd108c777 /python
parenta3799852467cfe67f84767c958ac6c49c80ddd25 (diff)
downloadlora-aprs-hb9egm-main.tar.gz
lora-aprs-hb9egm-main.tar.bz2
lora-aprs-hb9egm-main.zip
igate.py: add console UIHEADmain
Diffstat (limited to 'python')
-rwxr-xr-xpython/igate.py216
1 files changed, 162 insertions, 54 deletions
diff --git a/python/igate.py b/python/igate.py
index 0637e42..ccadf82 100755
--- a/python/igate.py
+++ b/python/igate.py
@@ -10,9 +10,11 @@
# on http://aprs-is.net/SendOnlyPorts.aspx
#
import datetime
+import json
import socket
import struct
import time
+import traceback
import requests
import busio
@@ -21,6 +23,15 @@ import board
# Import the RFM9x radio module.
import adafruit_rfm9x
+CLEAR_DISPLAY = "\x1b[2J"
+RED = "\x1b[31m"
+RED_BRIGHT = "\x1b[1;31m"
+GREEN = "\x1b[32m"
+GREEN_BRIGHT = "\x1b[1;32m"
+CLEAR = "\x1b[0m"
+
+HEADER = f"{CLEAR_DISPLAY}{GREEN}HB9EGM LoRa APRS i-Gate{CLEAR}"
+
# For APRS-IS
APRS_IS_SERVER = "france.aprs2.net"
APRS_IS_PORT = 8080
@@ -63,6 +74,40 @@ def send_frame_to_aprs_is(frame):
#print(f"{r.status_code} {r.content.decode()}")
r.raise_for_status()
+# Contains dicts with `timestamp` and `msg`
+packet_history = []
+errors = []
+
+# Display has limited number of lines
+def update_display():
+ NUM_LINES = 16
+ disp_lines = [HEADER]
+
+ while len(errors) > 3:
+ errors.pop(0)
+
+ errors.sort(key=lambda x: x['timestamp'], reverse=True)
+
+ while len(errors) + len(packet_history) > NUM_LINES-2:
+ packet_history.pop(0)
+
+ packet_history.sort(key=lambda x: x['timestamp'], reverse=True)
+
+ for p in packet_history:
+ ts = p['timestamp'].isoformat()
+ disp_lines.append(f"{GREEN}{ts}{CLEAR} {p['rssi']} {GREEN_BRIGHT}{p['callsign']}{CLEAR} {p['msg']}")
+
+ if errors:
+ disp_lines.append(f"{RED}Errors:{CLEAR}")
+ for p in errors:
+ ts = p['timestamp'].isoformat()
+ disp_lines.append(f"{GREEN}{ts}{CLEAR} {RED_BRIGHT}{p['msg']}{CLEAR}")
+
+ while len(disp_lines) < NUM_LINES:
+ disp_lines.append("")
+
+ print("\n".join(disp_lines))
+
led_yellow = DigitalInOut(board.D23)
led_yellow.direction = Direction.OUTPUT
@@ -79,6 +124,7 @@ def update_leds():
led_yellow.value = (led_counter & 0x2) != 0
update_leds()
+update_display()
cs = DigitalInOut(board.CE1)
reset = DigitalInOut(board.D25)
@@ -89,64 +135,126 @@ baudrate = 5000000
last_beacon_time = time.time()
# see https://docs.circuitpython.org/projects/rfm9x/en/latest/
-try:
- rfm9x = adafruit_rfm9x.RFM9x(spi, cs, reset, 433.775, baudrate=baudrate, preamble_length=8)
- rfm9x.signal_bandwidth = 125000
- rfm9x.coding_rate = 5
- #rfm9x.tx_power = 23 # default 13
-
- rfm9x.spreading_factor = 12
-
- # Section 4.1.1.6 - Set low datarate automatically based on BW and SF
- symbolDuration = 1000 / ( rfm9x.signal_bandwidth / (1 << rfm9x.spreading_factor) )
- if symbolDuration > 16:
- rfm9x.low_datarate_optimize = 1
- else:
- rfm9x.low_datarate_optimize = 0
-
- print("Waiting for packets...")
- while True:
- packet = rfm9x.receive(timeout=4, with_header=True)
- if packet is not None:
+rfm9x = adafruit_rfm9x.RFM9x(spi, cs, reset, 433.775, baudrate=baudrate, preamble_length=8)
+rfm9x.signal_bandwidth = 125000
+rfm9x.coding_rate = 5
+#rfm9x.tx_power = 23 # default 13
+
+rfm9x.spreading_factor = 12
+
+# Section 4.1.1.6 - Set low datarate automatically based on BW and SF
+symbolDuration = 1000 / ( rfm9x.signal_bandwidth / (1 << rfm9x.spreading_factor) )
+if symbolDuration > 16:
+ rfm9x.low_datarate_optimize = 1
+else:
+ rfm9x.low_datarate_optimize = 0
+
+while True:
+ packet = rfm9x.receive(timeout=4, with_header=True)
+ if packet is not None:
+ now = datetime.datetime.now()
+
+ try:
packet = bytes(packet) # otherwise it's a bytearray
- print("{}: {}".format(datetime.datetime.now(), packet))
+ #print("{}: {}".format(now, packet))
#print(" {}".format(" ".join(hex(x) for x in packet)))
- if b":!" in packet:
- try:
- print(decode_encoded_position_report(packet))
- except Exception as e:
- print(f"Failed to decode packet: {e}")
-
# :! are compressed position reports
# :> are messages
# := are uncompressed position reports
- if any(pattern in packet for pattern in [b":!", b":>", b":="]):
- try:
- # Prefix igate path
- path, _, message = packet[3:].partition(b":")
- new_frame = path + b",qAO," + CALLSIGN.encode() + b":" + message + b"\n"
- send_frame_to_aprs_is(new_frame)
- except Exception as ex:
- print("Error sending position report to APRS-IS")
- print(ex)
-
- print(" RSSI: {}".format(rfm9x.last_rssi))
- print()
-
- update_leds()
-
- now = time.time()
-
- if last_beacon_time + 1800 < now:
- last_beacon_time = now
- packet = f"{CALLSIGN}>{APRS_DEVICEID},qAC:={LATITUDE}L{LONGITUDE}&{COMMENT}"
- try:
- send_frame_to_aprs_is(packet.encode())
- except Exception as ex:
- print("Error sending beacon to APRS-IS")
- print(ex)
-
-except RuntimeError as error:
- print(f'RFM9x Error: {error}')
+
+ callsign = None
+ path = None
+
+ if packet.startswith(b'<\xff\x01'):
+ ix = packet.index(b">")
+ callsign = packet[3:ix].decode()
+ ix2 = packet.index(b":")
+ path = packet[ix+1:ix2].decode()
+
+ def add_to_log(msg):
+ # avoid having the display full of identical messages
+ found = False
+ for p in packet_history:
+ if p['msg'] == msg and p['callsign'] == callsign:
+ found = True
+ p['timestamp'] = now
+ p['rssi'] = rfm9x.last_rssi,
+ break
+
+ if not found:
+ packet_history.append({
+ 'timestamp': now,
+ 'rssi': rfm9x.last_rssi,
+ 'callsign': callsign,
+ 'path': path,
+ 'msg': msg})
+
+
+ if b":!" in packet:
+ try:
+ packet_history.append({
+ 'timestamp': now,
+ 'rssi': rfm9x.last_rssi,
+ 'callsign': callsign,
+ 'path': path,
+ 'msg': decode_encoded_position_report(packet)})
+ except Exception as e:
+ errors.append({'timestamp': now, 'msg': f"Failed to decode packet: {e}"})
+
+ elif b":=" in packet:
+ ix = packet.index(b":=")
+ pos_info = packet[ix+3:]
+ add_to_log(pos_info)
+
+ elif b":>" in packet:
+ ix = packet.index(b":>")
+ msg = packet[ix+3:]
+ add_to_log(msg)
+
+
+ if any(pattern in packet for pattern in [b":!", b":>", b":="]):
+ try:
+ # Prefix igate path
+ orig_path, _, message = packet[3:].partition(b":")
+ new_frame = orig_path + b",qAO," + CALLSIGN.encode() + b":" + message + b"\n"
+ send_frame_to_aprs_is(new_frame)
+ except Exception as e:
+ errors.append({'timestamp': now, 'msg': f"Error sending position report to APRS-IS: {e}"})
+
+ with open("log.jsonl", "a") as fd:
+ json.dump({
+ 'timestamp': now.isoformat(),
+ 'rssi': rfm9x.last_rssi,
+ 'callsign': callsign,
+ 'path': path,
+ 'packet': str(packet)},
+ fd)
+ fd.write("\n")
+
+ except Exception as e:
+ with open("log.jsonl", "a") as fd:
+ json.dump({
+ 'timestamp': now.isoformat(),
+ 'packet': str(packet),
+ 'exception': traceback.format_exc() },
+ fd)
+ fd.write("\n")
+
+ errors.append({'timestamp': now, 'msg': f"Error parsing packet"})
+
+ update_leds()
+ update_display()
+
+ now = time.time()
+
+ if last_beacon_time + 1800 < now:
+ last_beacon_time = now
+ packet = f"{CALLSIGN}>{APRS_DEVICEID},qAC:={LATITUDE}L{LONGITUDE}&{COMMENT}"
+ try:
+ send_frame_to_aprs_is(packet.encode())
+ except Exception as e:
+ errors.append({'timestamp': now, 'msg': f"Error sending beacon to APRS-IS: {e}"})
+ update_display()
+