aboutsummaryrefslogtreecommitdiffstats
path: root/python/igate.py
blob: 7c2e131e7b834845576b937c24374ad1499104fd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!venv/bin/python
#
# This script is a very simple APRS-IS LoRa igate that
# runs on a raspberry pi and expects a RFM9x on the SPI bus.
#
# Also, there are two status LEDs on D23 and D24 that count
# the incoming messages.
#
# It is a send-only i-gate that uses the HTTP POST mechanism described
# on http://aprs-is.net/SendOnlyPorts.aspx
#
import datetime
import socket
import struct
import time

import requests
import busio
from digitalio import DigitalInOut, Direction, Pull
import board
# Import the RFM9x radio module.
import adafruit_rfm9x

# For APRS-IS
APRS_IS_SERVER = "france.aprs2.net"
APRS_IS_PORT = 8080
CALLSIGN = "HB9EGM-10"

# See https://github.com/aprsorg/aprs-deviceid/blob/main/ALLOCATING.md
APRS_DEVICEID = "APZEGM"

# 46°23.07' N 6°13.28'
LATITUDE = "4623.07N"
LONGITUDE = "00613.28E"
COMMENT = "RPi RFM98W Python igate, https://mpb.li/git/lora-aprs-hb9egm/tree/python/igate.py"

PASSCODE = "17832"
REPORTING_DISTANCE = "10"

def decode_encoded_position_report(packet):
    """Returns a string with decoded latitude and longitude, or raises ValueError"""
    ix = packet.index(b":!/")
    gps_packet = packet[ix+3:]
    encoded_latitude = gps_packet[:4]
    encoded_longitude = gps_packet[4:8]

    (y1, y2, y3, y4) = struct.unpack("<bbbb", encoded_latitude)

    decodedLatitude = 90.0 - ((((y1-33) * pow(91,3)) + ((y2-33) * pow(91,2)) + ((y3-33) * 91) + y4-33) / 380926.0);

    (x1, x2, x3, x4) = struct.unpack("<bbbb", encoded_longitude)

    decodedLongitude = -180.0 + ((((x1-33) * pow(91,3)) + ((x2-33) * pow(91,2)) + ((x3-33) * 91) + x4-33) / 190463.0);
    return f"{decodedLatitude}N / {decodedLongitude}E"

def send_frame_to_aprs_is(frame):
    headers = { "Accept": "text/plain", "Content-Type": "application/octet-stream" }

    aprsauth = f"user {CALLSIGN} pass {PASSCODE} vers HB9EGM_lorapy 1.0 filter t/m/{CALLSIGN}/{REPORTING_DISTANCE}\n";
    data = aprsauth.encode() + frame

    print(data)

    r = requests.post(f"http://{APRS_IS_SERVER}:{APRS_IS_PORT}/", data=data, headers=headers)
    print(f"{r.status_code} {r.content.decode()}")
    r.raise_for_status()


led_yellow = DigitalInOut(board.D23)
led_yellow.direction = Direction.OUTPUT
led_green = DigitalInOut(board.D24)
led_green.direction = Direction.OUTPUT

led_counter = 0

def update_leds():
    global led_counter
    led_counter = (led_counter + 1) % 4

    led_green.value = (led_counter & 0x1) != 0
    led_yellow.value = (led_counter & 0x2) != 0

update_leds()

cs = DigitalInOut(board.CE1)
reset = DigitalInOut(board.D25)
spi = busio.SPI(board.SCK, MOSI=board.MOSI, MISO=board.MISO)

baudrate = 5000000

last_beacon_time = time.time()

# see https://docs.circuitpython.org/projects/rfm9x/en/latest/
try:
    rfm9x = adafruit_rfm9x.RFM9x(spi, cs, reset, 433.775, baudrate=baudrate, preamble_length=8)
    rfm9x.signal_bandwidth = 125000
    rfm9x.coding_rate = 5
    #rfm9x.tx_power = 23 # default 13

    rfm9x.spreading_factor = 12

    # Section 4.1.1.6 - Set low datarate automatically based on BW and SF
    symbolDuration = 1000 / ( rfm9x.signal_bandwidth / (1 << rfm9x.spreading_factor) )
    if symbolDuration > 16:
        rfm9x.low_datarate_optimize = 1
    else:
        rfm9x.low_datarate_optimize = 0

    print("Waiting for packets...")
    while True:
        packet = rfm9x.receive(timeout=4, with_header=True)
        if packet is not None:
            packet = bytes(packet) # otherwise it's a bytearray

            print("{}: {}".format(datetime.datetime.now(), packet))
            #print(" {}".format(" ".join(hex(x) for x in packet)))

            if b":!/" in packet:
                print(decode_encoded_position_report(packet))
                try:
                    # Prefix igate path
                    path, _, message = packet[3:].partition(b":")
                    new_frame = path + b",qAO," + CALLSIGN.encode() + b":" + message + b"\n"
                    send_frame_to_aprs_is(new_frame)
                except Exception as ex:
                    print("Error sending position report to APRS-IS")
                    print(ex)

            print(" RSSI: {}".format(rfm9x.last_rssi))
            print()

            update_leds()

        now = time.time()

        if last_beacon_time + 1800 < now:
            last_beacon_time = now
            packet = f"{CALLSIGN}>{APRS_DEVICEID},qAC:={LATITUDE}L{LONGITUDE}&{COMMENT}"
            try:
                send_frame_to_aprs_is(packet.encode())
            except Exception as ex:
                print("Error sending beacon to APRS-IS")
                print(ex)

except RuntimeError as error:
    print(f'RFM9x Error: {error}')