From ff8b50e1b3abf73d0aa691748d8d045e5ce83b36 Mon Sep 17 00:00:00 2001 From: Martin Braun Date: Wed, 14 Apr 2021 09:10:31 +0200 Subject: mpm: gpsd_iface: Refactor class - There's a lengthy conversion of TPV/SKY dicts into GPGGA which is removed from the class to enhance readability - The file had some Pylint issues, including a Python2-ism --- mpm/python/usrp_mpm/gpsd_iface.py | 197 ++++++++++++++++++++------------------ 1 file changed, 103 insertions(+), 94 deletions(-) (limited to 'mpm') diff --git a/mpm/python/usrp_mpm/gpsd_iface.py b/mpm/python/usrp_mpm/gpsd_iface.py index 72cde22b1..68747dff5 100644 --- a/mpm/python/usrp_mpm/gpsd_iface.py +++ b/mpm/python/usrp_mpm/gpsd_iface.py @@ -7,7 +7,6 @@ GPS service daemon (GPSd) interface class """ -from __future__ import print_function import socket import json import time @@ -17,8 +16,95 @@ import math import re from usrp_mpm.mpmlog import get_logger +def _deg_to_dm(angle): + """ + Convert a latitude or longitude from NMEA degrees to degrees minutes + format (DDDmm.mm) + """ + fraction_int_tuple = math.modf(angle) + return fraction_int_tuple[1] * 100 + fraction_int_tuple[0] * 60 + +def _nmea_checksum(nmea_sentence): + """Calculate the checksum for a NMEA data sentence""" + checksum = 0 + if not nmea_sentence.startswith('$'): + return checksum + + for character in nmea_sentence[1:]: + checksum ^= ord(character) + + return checksum + +def gpgga_from_tpv_sky(tpv_sensor_data, sky_sensor_data): + """ + Turn a TPV and SKY sensor value dictionary into a GPGGA string + """ + gpgga = "$GPGGA," + + if 'time' in tpv_sensor_data: + time_formatted = re.subn(r'\d{4}-\d{2}-\d{2}T(\d{2}):(\d{2}):(\d{2}\.?\d*)Z', + r'\1\2\3,', tpv_sensor_data.get('time')) + if time_formatted[1] == 1: + gpgga += time_formatted[0] + else: + gpgga += "," + else: + gpgga += "," + + if 'lat' in tpv_sensor_data: + latitude = tpv_sensor_data.get('lat') + latitude_direction = 'N' if latitude > 0 else 'S' + latitude = _deg_to_dm(abs(latitude)) + gpgga += "{:09.4f},{},".format(latitude, latitude_direction) + else: + gpgga += "0.0,S," + + if 'lon' in tpv_sensor_data: + longitude = tpv_sensor_data['lon'] + longitude_direction = 'E' if longitude > 0 else 'W' + longitude = _deg_to_dm(abs(longitude)) + gpgga += "{:010.4f},{},".format(longitude, longitude_direction) + else: + gpgga += "0.0,W," + + quality = 0 + if tpv_sensor_data['mode'] > 1: + if tpv_sensor_data.get('status') == 2: + quality = 2 + else: + quality = 1 + gpgga += "{:d},".format(quality) + + if 'satellites' in sky_sensor_data: + satellites_used = 0 + for satellite in sky_sensor_data['satellites']: + if 'used' in satellite and satellite['used']: + satellites_used += 1 + gpgga += "{:02d},".format(satellites_used) + else: + gpgga += "," + + if 'hdop' in sky_sensor_data: + gpgga += "{:.2f},".format(sky_sensor_data['hdop']) + else: + gpgga += "," + + if 'alt' in tpv_sensor_data: + gpgga += "{:2f},{},".format(tpv_sensor_data['alt'], 'M') + else: + gpgga += ",," + + # separation data is not present in tpv or sky sensor data + gpgga += ",," + + # differential data is not present + gpgga += ",," -class GPSDIface(object): + gpgga += "*{:02X}".format(_nmea_checksum(gpgga)) + + return gpgga + +class GPSDIface: """ Interface to the GPS service daemon (GPSd). @@ -171,7 +257,7 @@ class GPSDIface(object): return result.get(resp_class, [{}])[0] -class GPSDIfaceExtension(object): +class GPSDIfaceExtension: """ Wrapper class that facilitates the 'extension' of a `context` object. The intention here is for a object to instantiate a GPSDIfaceExtension object, @@ -205,7 +291,9 @@ class GPSDIfaceExtension(object): self._gpsd_iface.close() def extend(self, context): - """Register the GSPDIfaceExtension object's public function with `context`""" + """ + Register the GSPDIfaceExtension object's public function with `context` + """ new_methods = [method_name for method_name in dir(self) if not method_name.startswith('_') \ and callable(getattr(self, method_name)) \ @@ -245,14 +333,13 @@ class GPSDIfaceExtension(object): if gps_time_prev == 0: gps_time_prev = gps_time continue - else: - if int(gps_time) - int(gps_time_prev) >= 1: - return { - 'name': 'gps_time', - 'type': 'INTEGER', - 'unit': 'seconds', - 'value': str(int(gps_time)), - } + if int(gps_time) - int(gps_time_prev) >= 1: + return { + 'name': 'gps_time', + 'type': 'INTEGER', + 'unit': 'seconds', + 'value': str(int(gps_time)), + } def get_gps_tpv_sensor(self): """Get a TPV response from GPSd as a sensor dict""" @@ -288,22 +375,6 @@ class GPSDIfaceExtension(object): def get_gps_gpgga_sensor(self): """Get GPGGA sensor data by parsing TPV and SKY sensor data""" - def _deg_to_dm(angle): - """Convert a latitude or longitude from degrees to degrees minutes format""" - fraction_int_tuple = math.modf(angle) - return fraction_int_tuple[1] * 100 + fraction_int_tuple[0] * 60 - - def _nmea_checksum(nmea_sentence): - """Calculate the checksum for a NMEA data sentence""" - checksum = 0 - if not nmea_sentence.startswith('$'): - return checksum - - for character in nmea_sentence[1:]: - checksum ^= ord(character) - - return checksum - self._log.trace("Polling GPS TPV and SKY results from GPSD") # Read responses from GPSD until we get both a SKY response and TPV # response in non-trivial mode @@ -312,77 +383,15 @@ class GPSDIfaceExtension(object): self._log.trace("GPS info: {}".format(gps_info)) tpv_sensor_data = gps_info.get('tpv', [{}])[0] sky_sensor_data = gps_info.get('sky', [{}])[0] - if tpv_sensor_data and sky_sensor_data and tpv_sensor_data.get("mode", 0) > 0: + if tpv_sensor_data and \ + sky_sensor_data and \ + tpv_sensor_data.get("mode", 0) > 0: break - - gpgga = "$GPGGA," - - if 'time' in tpv_sensor_data: - time_formatted = re.subn(r'\d{4}-\d{2}-\d{2}T(\d{2}):(\d{2}):(\d{2}\.?\d*)Z', - r'\1\2\3,', tpv_sensor_data.get('time')) - if time_formatted[1] == 1: - gpgga += time_formatted[0] - else: - gpgga += "," - else: - gpgga += "," - - if 'lat' in tpv_sensor_data: - latitude = tpv_sensor_data.get('lat') - latitude_direction = 'N' if latitude > 0 else 'S' - latitude = _deg_to_dm(abs(latitude)) - gpgga += "{:09.4f},{},".format(latitude, latitude_direction) - else: - gpgga += "0.0,S," - - if 'lon' in tpv_sensor_data: - longitude = tpv_sensor_data['lon'] - longitude_direction = 'E' if longitude > 0 else 'W' - longitude = _deg_to_dm(abs(longitude)) - gpgga += "{:010.4f},{},".format(longitude, longitude_direction) - else: - gpgga += "0.0,W," - - quality = 0 - if tpv_sensor_data['mode'] > 1: - if tpv_sensor_data.get('status') == 2: - quality = 2 - else: - quality = 1 - gpgga += "{:d},".format(quality) - - if 'satellites' in sky_sensor_data: - satellites_used = 0 - for satellite in sky_sensor_data['satellites']: - if 'used' in satellite and satellite['used']: - satellites_used += 1 - gpgga += "{:02d},".format(satellites_used) - else: - gpgga += "," - - if 'hdop' in sky_sensor_data: - gpgga += "{:.2f},".format(sky_sensor_data['hdop']) - else: - gpgga += "," - - if 'alt' in tpv_sensor_data: - gpgga += "{:2f},{},".format(tpv_sensor_data['alt'], 'M') - else: - gpgga += ",," - - # separation data is not present in tpv or sky sensor data - gpgga += ",," - - # differential data is not present - gpgga += ",," - - gpgga += "*{:02X}".format(_nmea_checksum(gpgga)) - return { 'name': 'gpgga', 'type': 'STRING', 'unit': '', - 'value': gpgga, + 'value': gpgga_from_tpv_sky(tpv_sensor_data, sky_sensor_data), } def get_gps_lock(self): -- cgit v1.2.3