aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/python/usrp_mpm')
-rw-r--r--mpm/python/usrp_mpm/gpsd_iface.py137
1 files changed, 87 insertions, 50 deletions
diff --git a/mpm/python/usrp_mpm/gpsd_iface.py b/mpm/python/usrp_mpm/gpsd_iface.py
index d2da27975..08e9fdd16 100644
--- a/mpm/python/usrp_mpm/gpsd_iface.py
+++ b/mpm/python/usrp_mpm/gpsd_iface.py
@@ -44,43 +44,62 @@ class GPSDIface(object):
def __enter__(self):
self.open()
- self.watch_query()
return self
def __exit__(self, exc_type, exc_value, traceback):
- self.stop_query()
+ self.disable_watch()
self.close()
return exc_type is None
def open(self):
"""Open the socket to GPSD"""
self.gpsd_socket.connect(('localhost', 2947))
- self.log.trace("Connected to GPSD.")
+ version_str = self.read_class("VERSION")
+ self.enable_watch()
+ self.log.trace("GPSD version: %s", version_str)
def close(self):
"""Close the socket"""
self.gpsd_socket.close()
self.log.trace("Closing the connection to GPSD.")
- def watch_query(self):
+ def enable_watch(self):
"""Send a WATCH command, which starts operation"""
- query_cmd = b'?WATCH={"enable":true}'
- self.gpsd_socket.sendall(query_cmd)
- self.log.trace("Sent query: {}".format(query_cmd))
+ self.gpsd_socket.sendall(b'?WATCH={"enable":true};')
+ self.log.trace(self.read_class("DEVICES"))
+ self.log.trace(self.read_class("WATCH"))
+
+ def poll_request(self, socket_timeout=60, num_retry=10):
+ """Send a POLL command
- def poll_query(self):
- """Send a POLL command"""
+ Raises
+ ------
+ json.JSONDecodeError
+ If the data returned from GPSd cannot be decoded with JSON.
+ RuntimeError
+ If unsuccessfully connecting to GPSD within num_retry.
+ """
query_cmd = b'?POLL;'
- self.gpsd_socket.sendall(query_cmd)
- self.log.trace("Sent query: {}".format(query_cmd))
+ for _ in range(num_retry):
+ try:
+ self.gpsd_socket.sendall(query_cmd)
+ return self.read_class("POLL", socket_timeout)
+ except socket.error:
+ self.log.warning("Reconnecting to GPSD.")
+ try:
+ self.gpsd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.open()
+ except socket.error:
+ self.log.warning("Error during GPSD reconnect.")
+ continue
+ raise RuntimeError("Unsuccessfully connecting to GPSD within {} tries".format(num_retry))
- def stop_query(self):
+ def disable_watch(self):
"""Send the command to stop operation"""
- query_cmd = b'?WATCH={"enable":false}'
+ query_cmd = b'?WATCH={"enable":false};'
self.gpsd_socket.sendall(query_cmd)
- self.log.trace("Sent query: {}".format(query_cmd))
- def socket_read_line(self, timeout=60, interval=0.1):
+ def socket_read_line(self, timeout=60, interval=0):
"""
Read from a socket until newline. If there was no newline until the timeout
occurs, raise an error. Otherwise, return the line.
@@ -96,7 +115,21 @@ class GPSDIface(object):
line += next_char
else:
time.sleep(interval)
- raise RuntimeError("socket_read_line() exceeded read timeout!")
+ raise socket.timeout
+
+ def read_class(self, class_name, socket_timeout=60):
+ """return json data for spcecfic key of 'class'
+ This function will read until socket timeout (or no data on socket)
+
+ Raises
+ ------
+ json.JSONDecodeError
+ If the data returned from GPSd cannot be decoded with JSON.
+ """
+ while True:
+ json_result = json.loads(self.socket_read_line(socket_timeout))
+ if json_result.get('class', '') == class_name:
+ return json_result
def get_gps_info(self, resp_class='', timeout=60):
"""Convenience function for getting a response which contains a response class"""
@@ -105,15 +138,9 @@ class GPSDIface(object):
end_time = time.time() + timeout
while not result.get(resp_class, {}):
try:
- self.poll_query()
- json_result = self.socket_read_line(timeout=timeout)
- self.log.trace(
- "Received JSON response: {}".format(json_result)
- )
- result = json.loads(json_result)
- self.log.trace(
- "Keys in response: {}".format(result.keys())
- )
+ # Do poll request with socket timeout of 5s here.
+ # It should not be that long, since GPSD should send POLL object promptly.
+ result = self.poll_request(5)
if (resp_class == "") or (time.time() > end_time):
# If we don't have a response class filter, just return the first response
# or if we timeout
@@ -121,6 +148,7 @@ class GPSDIface(object):
except json.JSONDecodeError:
# If we get an incomplete packet, this will trigger
# In this case, just retry
+ self.log.warning("JSON decode error: %s", result)
continue
# Filter the result by resp_class or return the entire thing
# In the filtered case, the result contains a list of 'resp_class' responses,
@@ -168,42 +196,50 @@ class GPSDIfaceExtension(object):
def get_gps_time_sensor(self):
"""
- Retrieve the GPS time using a TPV response from GPSd, and returns as a sensor dict
- This time is not high accuracy.
+ Retrieve the GPS time using a TPV response from GPSd, and returns as a sensor dict.
+ This returns a sensor dictionary on the second edge containing the latest second.
+ For example, if we call this function at the gps time of 1.001s it will wait until
+ just after 2.000s to return 2 second. This effect is similar to get gps time on
+ the next edge of pps.
"""
- self._log.trace("Polling GPS time results from GPSD")
- # Read responses from GPSD until we get a non-trivial mode
- self._gpsd_iface.watch_query()
+ def parse_time(time_str):
+ """parse a string of time in format of %Y-%m-%dT%H:%M:%S.%fZ
+ return in unit second
+ """
+ time_dt = datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")
+ epoch_dt = datetime.datetime(1970, 1, 1)
+ return (time_dt - epoch_dt).total_seconds()
+ # Read responses from GPSD until we get a non-trivial mode and until next second.
+ gps_time_prev = 0
while True:
gps_info = self._gpsd_iface.get_gps_info(resp_class='tpv', timeout=15)
- self._log.trace("GPS info: {}".format(gps_info))
- if gps_info.get("mode", 0) > 0:
- break
- self._gpsd_iface.stop_query()
- time_str = gps_info.get("time", "")
- self._log.trace("GPS time string: {}".format(time_str))
- time_dt = datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")
- self._log.trace("GPS datetime: {}".format(time_dt))
- epoch_dt = datetime.datetime(1970, 1, 1)
- gps_time = int((time_dt - epoch_dt).total_seconds())
- return {
- 'name': 'gps_time',
- 'type': 'INTEGER',
- 'unit': 'seconds',
- 'value': str(gps_time),
- }
+ gps_mode = gps_info.get("mode", 0)
+ gps_time = parse_time(gps_info.get("time", ""))
+ if gps_mode == 0:
+ self._log.warning("GPSD reported invalid mode."
+ "Return from GPSD is %s", gps_info)
+ continue
+ if gps_time_prev == 0:
+ gps_time_prev = gps_time
+ continue
+ else:
+ if int(gps_time) - int(gps_time_prev) >= 1:
+ return {
+ 'name': 'gps_time',
+ 'type': 'INTEGER',
+ 'unit': 'seconds',
+ 'value': str(int(gps_time)),
+ }
def get_gps_tpv_sensor(self):
"""Get a TPV response from GPSd as a sensor dict"""
self._log.trace("Polling GPS TPV results from GPSD")
# Read responses from GPSD until we get a non-trivial mode
- self._gpsd_iface.watch_query()
while True:
gps_info = self._gpsd_iface.get_gps_info(resp_class='tpv', timeout=15)
self._log.trace("GPS info: {}".format(gps_info))
if gps_info.get("mode", 0) > 0:
break
- self._gpsd_iface.stop_query()
# Return the JSON'd results
gps_tpv = json.dumps(gps_info)
return {
@@ -217,9 +253,7 @@ class GPSDIfaceExtension(object):
"""Get a SKY response from GPSd as a sensor dict"""
self._log.trace("Polling GPS SKY results from GPSD")
# Just get the first SKY result
- self._gpsd_iface.watch_query()
gps_info = self._gpsd_iface.get_gps_info(resp_class='sky', timeout=15)
- self._gpsd_iface.stop_query()
# Return the JSON'd results
gps_sky = json.dumps(gps_info)
return {
@@ -250,6 +284,9 @@ def main():
print("Sample result: {}".format(result))
print("TPV: {}".format(tpv_result))
print("SKY: {}".format(sky_result))
+ gps_ext = GPSDIfaceExtension()
+ for _ in range(10):
+ print(gps_ext.get_gps_time_sensor().get('value'))
#TODO Add GPSDIfaceExtension code