aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/gpsd_iface.py
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/python/usrp_mpm/gpsd_iface.py')
-rw-r--r--mpm/python/usrp_mpm/gpsd_iface.py152
1 files changed, 152 insertions, 0 deletions
diff --git a/mpm/python/usrp_mpm/gpsd_iface.py b/mpm/python/usrp_mpm/gpsd_iface.py
new file mode 100644
index 000000000..49a13cc75
--- /dev/null
+++ b/mpm/python/usrp_mpm/gpsd_iface.py
@@ -0,0 +1,152 @@
+#!/usr/bin/python3
+#
+# Copyright 2017 Ettus Research, a National Instruments Company
+#
+# SPDX-License-Identifier: GPL-3.0
+#
+"""
+GPS service daemon (GPSd) interface class
+"""
+import socket
+import json
+import time
+import select
+from usrp_mpm.mpmlog import get_logger
+
+
+class GPSDIface(object):
+ """
+ Interface to the GPS service daemon (GPSd).
+
+ The GPSDIface implementation can be used as a context manager, and GPSD results should be
+ gotten with the get_gps_info() function. This will filter by the GPSD response class
+ (resp_class) and return that class message. If no filter is provided, this function will return
+ the first response (not counting the VERSION message).
+
+ The MPM SKY sensors returns the first available response- there shouldn't be anything tricky
+ about this. The MPM TPV sensor, however, returns an interesting value in the shortest time
+ possible. If the GPSD has received a TPV report since we connected (and sent the start
+ command), this function should return immediately. However, if no report is ready, the function
+ waits until an interesting result is ready and returns that. This is achieved by discarding
+ `mode=0` responses.
+ """
+ def __init__(self):
+ # Make a logger
+ try:
+ self.log = get_logger('GPSDIface')
+ except AssertionError:
+ from usrp_mpm.mpmlog import get_main_logger
+ self.log = get_main_logger('GPSDIface')
+ # Make a socket to connect to GPSD
+ self.gpsd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+ def __enter__(self):
+ self.open()
+ self.watch_query()
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.stop_query()
+ self.close()
+ return exc_type is None
+
+ def open(self):
+ """Open the socket to GPSD"""
+ self.gpsd_socket.connect(('localhost', 2947))
+ self.log.trace("Connected to GPSD.")
+
+ def close(self):
+ """Close the socket"""
+ self.gpsd_socket.close()
+ self.log.trace("Closing the connection to GPSD.")
+
+ def watch_query(self):
+ """Send a WATCH command, which starts operation"""
+ query_cmd = b'?WATCH={"enable":true}'
+ self.gpsd_socket.sendall(query_cmd)
+ self.log.trace("Sent query: {}".format(query_cmd))
+
+ def poll_query(self):
+ """Send a POLL command"""
+ query_cmd = b'?POLL;'
+ self.gpsd_socket.sendall(query_cmd)
+ self.log.trace("Sent query: {}".format(query_cmd))
+
+ def stop_query(self):
+ """Send the command to stop operation"""
+ query_cmd = b'?WATCH={"enable":false}'
+ self.gpsd_socket.sendall(query_cmd)
+ self.log.trace("Sent query: {}".format(query_cmd))
+
+ def socket_read_line(self, timeout=60, interval=0.1):
+ """
+ Read from a socket until newline. If there was no newline until the timeout
+ occurs, raise an error. Otherwise, return the line.
+ """
+ line = b''
+ end_time = time.time() + timeout
+ while time.time() < end_time:
+ socket_ready = select.select([self.gpsd_socket], [], [], 0)[0]
+ if socket_ready:
+ next_char = self.gpsd_socket.recv(1)
+ if next_char == b'\n':
+ return line.decode('ascii')
+ line += next_char
+ else:
+ time.sleep(interval)
+ raise RuntimeError("socket_read_line() exceeded read timeout!")
+
+ def get_gps_info(self, resp_class='', timeout=60):
+ """Convenience function for getting a response which contains a response class"""
+ # Read results until we see one which contains the requested response class, ie TPV or SKY
+ result = {}
+ end_time = time.time() + timeout
+ while resp_class not in result:
+ try:
+ self.poll_query()
+ json_result = self.socket_read_line(timeout=timeout)
+ self.log.trace(
+ "Received JSON response: {}".format(json_result)
+ )
+ result = json.loads(json_result)
+ self.log.trace(
+ "Keys in response: {}".format(result.keys())
+ )
+ if (resp_class == "") or (time.time() > end_time):
+ # If we don't have a response class filter, just return the first response
+ # or if we timeout
+ break
+ except json.JSONDecodeError:
+ # If we get an incomplete packet, this will trigger
+ # In this case, just retry
+ continue
+ # Filter the result by resp_class or return the entire thing
+ # In the filtered case, the result contains a list of 'resp_class' responses,
+ # so we need to get one valid one.
+ return result if (resp_class == "") else result.get(resp_class, [{}])[0]
+
+
+def main():
+ """Test functionality of the GPSDIface class"""
+ # Do some setup
+ import argparse
+
+ def parse_args():
+ """Parse the command-line arguments"""
+ parser = argparse.ArgumentParser(description="Read messages from GPSD")
+ parser.add_argument("--timeout", help="Timeout for the GPSD read", default=20)
+ return parser.parse_args()
+
+ args = parse_args()
+
+ with GPSDIface() as gps_iface:
+ result = gps_iface.get_gps_info(resp_class='', timeout=args.timeout)
+ tpv_result = gps_iface.get_gps_info(resp_class='tpv', timeout=args.timeout)
+ sky_result = gps_iface.get_gps_info(resp_class='sky', timeout=args.timeout)
+ print("Sample result: {}".format(result))
+ print("TPV: {}".format(tpv_result))
+ print("SKY: {}".format(sky_result))
+
+
+if __name__ == "__main__":
+ main()