diff options
Diffstat (limited to 'mpm/python/usrp_mpm/gpsd_iface.py')
-rw-r--r-- | mpm/python/usrp_mpm/gpsd_iface.py | 108 |
1 files changed, 107 insertions, 1 deletions
diff --git a/mpm/python/usrp_mpm/gpsd_iface.py b/mpm/python/usrp_mpm/gpsd_iface.py index 66abfd4bd..d2da27975 100644 --- a/mpm/python/usrp_mpm/gpsd_iface.py +++ b/mpm/python/usrp_mpm/gpsd_iface.py @@ -1,15 +1,18 @@ # -# Copyright 2017 Ettus Research, a National Instruments Company +# Copyright 2017-2018 Ettus Research, a National Instruments Company # # SPDX-License-Identifier: GPL-3.0-or-later # """ GPS service daemon (GPSd) interface class """ + +from __future__ import print_function import socket import json import time import select +import datetime from usrp_mpm.mpmlog import get_logger @@ -125,6 +128,108 @@ class GPSDIface(object): return result if (resp_class == "") else result.get(resp_class, [{}])[0] +class GPSDIfaceExtension(object): + """ + Wrapper class that facilitates the 'extension' of a `context` object. The + intention here is for a object to instantiate a GPSDIfaceExtension object, + then call the `extend` method with `context` object as an argument. This + will then add the GPSDIfaceExtension methods to the `context` object's + methods. The `context` object can then call the convenience functions to + retrieve GPS information from GPSd. + For example: + + class foo: + def __init__(self): + self.gps_ext = GPSDIfaceExtension() + self.gps_ext.extend(self) + # The GPSDIfaceExtension methods are now registered with foo, so + # we can call `get_gps_time` + print(self.get_gps_time()) + """ + def __init__(self): + self._gpsd_iface = GPSDIface() + self._gpsd_iface.open() + self._log = self._gpsd_iface.log + + def __del__(self): + self._gpsd_iface.close() + + def extend(self, context): + """Register the GSPDIfaceExtension object's public function with `context`""" + new_methods = [method_name for method_name in dir(self) + if not method_name.startswith('_') \ + and callable(getattr(self, method_name)) \ + and method_name != "extend"] + for method_name in new_methods: + new_method = getattr(self, method_name) + self._log.trace("%s: Adding %s method", context, method_name) + setattr(context, method_name, new_method) + return new_methods + + def get_gps_time_sensor(self): + """ + Retrieve the GPS time using a TPV response from GPSd, and returns as a sensor dict + This time is not high accuracy. + """ + self._log.trace("Polling GPS time results from GPSD") + # Read responses from GPSD until we get a non-trivial mode + self._gpsd_iface.watch_query() + while True: + gps_info = self._gpsd_iface.get_gps_info(resp_class='tpv', timeout=15) + self._log.trace("GPS info: {}".format(gps_info)) + if gps_info.get("mode", 0) > 0: + break + self._gpsd_iface.stop_query() + time_str = gps_info.get("time", "") + self._log.trace("GPS time string: {}".format(time_str)) + time_dt = datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ") + self._log.trace("GPS datetime: {}".format(time_dt)) + epoch_dt = datetime.datetime(1970, 1, 1) + gps_time = int((time_dt - epoch_dt).total_seconds()) + return { + 'name': 'gps_time', + 'type': 'INTEGER', + 'unit': 'seconds', + 'value': str(gps_time), + } + + def get_gps_tpv_sensor(self): + """Get a TPV response from GPSd as a sensor dict""" + self._log.trace("Polling GPS TPV results from GPSD") + # Read responses from GPSD until we get a non-trivial mode + self._gpsd_iface.watch_query() + while True: + gps_info = self._gpsd_iface.get_gps_info(resp_class='tpv', timeout=15) + self._log.trace("GPS info: {}".format(gps_info)) + if gps_info.get("mode", 0) > 0: + break + self._gpsd_iface.stop_query() + # Return the JSON'd results + gps_tpv = json.dumps(gps_info) + return { + 'name': 'gps_tpv', + 'type': 'STRING', + 'unit': '', + 'value': gps_tpv, + } + + def get_gps_sky_sensor(self): + """Get a SKY response from GPSd as a sensor dict""" + self._log.trace("Polling GPS SKY results from GPSD") + # Just get the first SKY result + self._gpsd_iface.watch_query() + gps_info = self._gpsd_iface.get_gps_info(resp_class='sky', timeout=15) + self._gpsd_iface.stop_query() + # Return the JSON'd results + gps_sky = json.dumps(gps_info) + return { + 'name': 'gps_sky', + 'type': 'STRING', + 'unit': '', + 'value': gps_sky, + } + + def main(): """Test functionality of the GPSDIface class""" # Do some setup @@ -145,6 +250,7 @@ def main(): print("Sample result: {}".format(result)) print("TPV: {}".format(tpv_result)) print("SKY: {}".format(sky_result)) + #TODO Add GPSDIfaceExtension code if __name__ == "__main__": |