aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2021-04-14 09:10:31 +0200
committerAaron Rossetto <aaron.rossetto@ni.com>2021-05-24 13:59:40 -0500
commitff8b50e1b3abf73d0aa691748d8d045e5ce83b36 (patch)
tree0fc8fdfa2451c82933bfa792dd598d3c3229bf3f
parentd2fc12dd8d4eccb79207fc1fcf1c997964dcbe98 (diff)
downloaduhd-ff8b50e1b3abf73d0aa691748d8d045e5ce83b36.tar.gz
uhd-ff8b50e1b3abf73d0aa691748d8d045e5ce83b36.tar.bz2
uhd-ff8b50e1b3abf73d0aa691748d8d045e5ce83b36.zip
mpm: gpsd_iface: Refactor class
- There's a lengthy conversion of TPV/SKY dicts into GPGGA which is removed from the class to enhance readability - The file had some Pylint issues, including a Python2-ism
-rw-r--r--mpm/python/usrp_mpm/gpsd_iface.py197
1 files changed, 103 insertions, 94 deletions
diff --git a/mpm/python/usrp_mpm/gpsd_iface.py b/mpm/python/usrp_mpm/gpsd_iface.py
index 72cde22b1..68747dff5 100644
--- a/mpm/python/usrp_mpm/gpsd_iface.py
+++ b/mpm/python/usrp_mpm/gpsd_iface.py
@@ -7,7 +7,6 @@
GPS service daemon (GPSd) interface class
"""
-from __future__ import print_function
import socket
import json
import time
@@ -17,8 +16,95 @@ import math
import re
from usrp_mpm.mpmlog import get_logger
+def _deg_to_dm(angle):
+ """
+ Convert a latitude or longitude from NMEA degrees to degrees minutes
+ format (DDDmm.mm)
+ """
+ fraction_int_tuple = math.modf(angle)
+ return fraction_int_tuple[1] * 100 + fraction_int_tuple[0] * 60
+
+def _nmea_checksum(nmea_sentence):
+ """Calculate the checksum for a NMEA data sentence"""
+ checksum = 0
+ if not nmea_sentence.startswith('$'):
+ return checksum
+
+ for character in nmea_sentence[1:]:
+ checksum ^= ord(character)
+
+ return checksum
+
+def gpgga_from_tpv_sky(tpv_sensor_data, sky_sensor_data):
+ """
+ Turn a TPV and SKY sensor value dictionary into a GPGGA string
+ """
+ gpgga = "$GPGGA,"
+
+ if 'time' in tpv_sensor_data:
+ time_formatted = re.subn(r'\d{4}-\d{2}-\d{2}T(\d{2}):(\d{2}):(\d{2}\.?\d*)Z',
+ r'\1\2\3,', tpv_sensor_data.get('time'))
+ if time_formatted[1] == 1:
+ gpgga += time_formatted[0]
+ else:
+ gpgga += ","
+ else:
+ gpgga += ","
+
+ if 'lat' in tpv_sensor_data:
+ latitude = tpv_sensor_data.get('lat')
+ latitude_direction = 'N' if latitude > 0 else 'S'
+ latitude = _deg_to_dm(abs(latitude))
+ gpgga += "{:09.4f},{},".format(latitude, latitude_direction)
+ else:
+ gpgga += "0.0,S,"
+
+ if 'lon' in tpv_sensor_data:
+ longitude = tpv_sensor_data['lon']
+ longitude_direction = 'E' if longitude > 0 else 'W'
+ longitude = _deg_to_dm(abs(longitude))
+ gpgga += "{:010.4f},{},".format(longitude, longitude_direction)
+ else:
+ gpgga += "0.0,W,"
+
+ quality = 0
+ if tpv_sensor_data['mode'] > 1:
+ if tpv_sensor_data.get('status') == 2:
+ quality = 2
+ else:
+ quality = 1
+ gpgga += "{:d},".format(quality)
+
+ if 'satellites' in sky_sensor_data:
+ satellites_used = 0
+ for satellite in sky_sensor_data['satellites']:
+ if 'used' in satellite and satellite['used']:
+ satellites_used += 1
+ gpgga += "{:02d},".format(satellites_used)
+ else:
+ gpgga += ","
+
+ if 'hdop' in sky_sensor_data:
+ gpgga += "{:.2f},".format(sky_sensor_data['hdop'])
+ else:
+ gpgga += ","
+
+ if 'alt' in tpv_sensor_data:
+ gpgga += "{:2f},{},".format(tpv_sensor_data['alt'], 'M')
+ else:
+ gpgga += ",,"
+
+ # separation data is not present in tpv or sky sensor data
+ gpgga += ",,"
+
+ # differential data is not present
+ gpgga += ",,"
-class GPSDIface(object):
+ gpgga += "*{:02X}".format(_nmea_checksum(gpgga))
+
+ return gpgga
+
+class GPSDIface:
"""
Interface to the GPS service daemon (GPSd).
@@ -171,7 +257,7 @@ class GPSDIface(object):
return result.get(resp_class, [{}])[0]
-class GPSDIfaceExtension(object):
+class GPSDIfaceExtension:
"""
Wrapper class that facilitates the 'extension' of a `context` object. The
intention here is for a object to instantiate a GPSDIfaceExtension object,
@@ -205,7 +291,9 @@ class GPSDIfaceExtension(object):
self._gpsd_iface.close()
def extend(self, context):
- """Register the GSPDIfaceExtension object's public function with `context`"""
+ """
+ Register the GSPDIfaceExtension object's public function with `context`
+ """
new_methods = [method_name for method_name in dir(self)
if not method_name.startswith('_') \
and callable(getattr(self, method_name)) \
@@ -245,14 +333,13 @@ class GPSDIfaceExtension(object):
if gps_time_prev == 0:
gps_time_prev = gps_time
continue
- else:
- if int(gps_time) - int(gps_time_prev) >= 1:
- return {
- 'name': 'gps_time',
- 'type': 'INTEGER',
- 'unit': 'seconds',
- 'value': str(int(gps_time)),
- }
+ if int(gps_time) - int(gps_time_prev) >= 1:
+ return {
+ 'name': 'gps_time',
+ 'type': 'INTEGER',
+ 'unit': 'seconds',
+ 'value': str(int(gps_time)),
+ }
def get_gps_tpv_sensor(self):
"""Get a TPV response from GPSd as a sensor dict"""
@@ -288,22 +375,6 @@ class GPSDIfaceExtension(object):
def get_gps_gpgga_sensor(self):
"""Get GPGGA sensor data by parsing TPV and SKY sensor data"""
- def _deg_to_dm(angle):
- """Convert a latitude or longitude from degrees to degrees minutes format"""
- fraction_int_tuple = math.modf(angle)
- return fraction_int_tuple[1] * 100 + fraction_int_tuple[0] * 60
-
- def _nmea_checksum(nmea_sentence):
- """Calculate the checksum for a NMEA data sentence"""
- checksum = 0
- if not nmea_sentence.startswith('$'):
- return checksum
-
- for character in nmea_sentence[1:]:
- checksum ^= ord(character)
-
- return checksum
-
self._log.trace("Polling GPS TPV and SKY results from GPSD")
# Read responses from GPSD until we get both a SKY response and TPV
# response in non-trivial mode
@@ -312,77 +383,15 @@ class GPSDIfaceExtension(object):
self._log.trace("GPS info: {}".format(gps_info))
tpv_sensor_data = gps_info.get('tpv', [{}])[0]
sky_sensor_data = gps_info.get('sky', [{}])[0]
- if tpv_sensor_data and sky_sensor_data and tpv_sensor_data.get("mode", 0) > 0:
+ if tpv_sensor_data and \
+ sky_sensor_data and \
+ tpv_sensor_data.get("mode", 0) > 0:
break
-
- gpgga = "$GPGGA,"
-
- if 'time' in tpv_sensor_data:
- time_formatted = re.subn(r'\d{4}-\d{2}-\d{2}T(\d{2}):(\d{2}):(\d{2}\.?\d*)Z',
- r'\1\2\3,', tpv_sensor_data.get('time'))
- if time_formatted[1] == 1:
- gpgga += time_formatted[0]
- else:
- gpgga += ","
- else:
- gpgga += ","
-
- if 'lat' in tpv_sensor_data:
- latitude = tpv_sensor_data.get('lat')
- latitude_direction = 'N' if latitude > 0 else 'S'
- latitude = _deg_to_dm(abs(latitude))
- gpgga += "{:09.4f},{},".format(latitude, latitude_direction)
- else:
- gpgga += "0.0,S,"
-
- if 'lon' in tpv_sensor_data:
- longitude = tpv_sensor_data['lon']
- longitude_direction = 'E' if longitude > 0 else 'W'
- longitude = _deg_to_dm(abs(longitude))
- gpgga += "{:010.4f},{},".format(longitude, longitude_direction)
- else:
- gpgga += "0.0,W,"
-
- quality = 0
- if tpv_sensor_data['mode'] > 1:
- if tpv_sensor_data.get('status') == 2:
- quality = 2
- else:
- quality = 1
- gpgga += "{:d},".format(quality)
-
- if 'satellites' in sky_sensor_data:
- satellites_used = 0
- for satellite in sky_sensor_data['satellites']:
- if 'used' in satellite and satellite['used']:
- satellites_used += 1
- gpgga += "{:02d},".format(satellites_used)
- else:
- gpgga += ","
-
- if 'hdop' in sky_sensor_data:
- gpgga += "{:.2f},".format(sky_sensor_data['hdop'])
- else:
- gpgga += ","
-
- if 'alt' in tpv_sensor_data:
- gpgga += "{:2f},{},".format(tpv_sensor_data['alt'], 'M')
- else:
- gpgga += ",,"
-
- # separation data is not present in tpv or sky sensor data
- gpgga += ",,"
-
- # differential data is not present
- gpgga += ",,"
-
- gpgga += "*{:02X}".format(_nmea_checksum(gpgga))
-
return {
'name': 'gpgga',
'type': 'STRING',
'unit': '',
- 'value': gpgga,
+ 'value': gpgga_from_tpv_sky(tpv_sensor_data, sky_sensor_data),
}
def get_gps_lock(self):