From 15051a37036458441f8a76ad5c6b2de13502bf72 Mon Sep 17 00:00:00 2001 From: Trung Tran Date: Sun, 11 Nov 2018 17:50:59 -0800 Subject: mpm:gpsd_iface: handle errors from gpsd gpsd connection is not reliable. Adding more error handling to re-connect during polling. Add control flows to get_gps_time in order to give an effect of getting the value on pps edge. --- mpm/python/usrp_mpm/gpsd_iface.py | 137 ++++++++++++++++++++++++-------------- 1 file changed, 87 insertions(+), 50 deletions(-) (limited to 'mpm/python/usrp_mpm') diff --git a/mpm/python/usrp_mpm/gpsd_iface.py b/mpm/python/usrp_mpm/gpsd_iface.py index d2da27975..08e9fdd16 100644 --- a/mpm/python/usrp_mpm/gpsd_iface.py +++ b/mpm/python/usrp_mpm/gpsd_iface.py @@ -44,43 +44,62 @@ class GPSDIface(object): def __enter__(self): self.open() - self.watch_query() return self def __exit__(self, exc_type, exc_value, traceback): - self.stop_query() + self.disable_watch() self.close() return exc_type is None def open(self): """Open the socket to GPSD""" self.gpsd_socket.connect(('localhost', 2947)) - self.log.trace("Connected to GPSD.") + version_str = self.read_class("VERSION") + self.enable_watch() + self.log.trace("GPSD version: %s", version_str) def close(self): """Close the socket""" self.gpsd_socket.close() self.log.trace("Closing the connection to GPSD.") - def watch_query(self): + def enable_watch(self): """Send a WATCH command, which starts operation""" - query_cmd = b'?WATCH={"enable":true}' - self.gpsd_socket.sendall(query_cmd) - self.log.trace("Sent query: {}".format(query_cmd)) + self.gpsd_socket.sendall(b'?WATCH={"enable":true};') + self.log.trace(self.read_class("DEVICES")) + self.log.trace(self.read_class("WATCH")) + + def poll_request(self, socket_timeout=60, num_retry=10): + """Send a POLL command - def poll_query(self): - """Send a POLL command""" + Raises + ------ + json.JSONDecodeError + If the data returned from GPSd cannot be decoded with JSON. + RuntimeError + If unsuccessfully connecting to GPSD within num_retry. + """ query_cmd = b'?POLL;' - self.gpsd_socket.sendall(query_cmd) - self.log.trace("Sent query: {}".format(query_cmd)) + for _ in range(num_retry): + try: + self.gpsd_socket.sendall(query_cmd) + return self.read_class("POLL", socket_timeout) + except socket.error: + self.log.warning("Reconnecting to GPSD.") + try: + self.gpsd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.open() + except socket.error: + self.log.warning("Error during GPSD reconnect.") + continue + raise RuntimeError("Unsuccessfully connecting to GPSD within {} tries".format(num_retry)) - def stop_query(self): + def disable_watch(self): """Send the command to stop operation""" - query_cmd = b'?WATCH={"enable":false}' + query_cmd = b'?WATCH={"enable":false};' self.gpsd_socket.sendall(query_cmd) - self.log.trace("Sent query: {}".format(query_cmd)) - def socket_read_line(self, timeout=60, interval=0.1): + def socket_read_line(self, timeout=60, interval=0): """ Read from a socket until newline. If there was no newline until the timeout occurs, raise an error. Otherwise, return the line. @@ -96,7 +115,21 @@ class GPSDIface(object): line += next_char else: time.sleep(interval) - raise RuntimeError("socket_read_line() exceeded read timeout!") + raise socket.timeout + + def read_class(self, class_name, socket_timeout=60): + """return json data for spcecfic key of 'class' + This function will read until socket timeout (or no data on socket) + + Raises + ------ + json.JSONDecodeError + If the data returned from GPSd cannot be decoded with JSON. + """ + while True: + json_result = json.loads(self.socket_read_line(socket_timeout)) + if json_result.get('class', '') == class_name: + return json_result def get_gps_info(self, resp_class='', timeout=60): """Convenience function for getting a response which contains a response class""" @@ -105,15 +138,9 @@ class GPSDIface(object): end_time = time.time() + timeout while not result.get(resp_class, {}): try: - self.poll_query() - json_result = self.socket_read_line(timeout=timeout) - self.log.trace( - "Received JSON response: {}".format(json_result) - ) - result = json.loads(json_result) - self.log.trace( - "Keys in response: {}".format(result.keys()) - ) + # Do poll request with socket timeout of 5s here. + # It should not be that long, since GPSD should send POLL object promptly. + result = self.poll_request(5) if (resp_class == "") or (time.time() > end_time): # If we don't have a response class filter, just return the first response # or if we timeout @@ -121,6 +148,7 @@ class GPSDIface(object): except json.JSONDecodeError: # If we get an incomplete packet, this will trigger # In this case, just retry + self.log.warning("JSON decode error: %s", result) continue # Filter the result by resp_class or return the entire thing # In the filtered case, the result contains a list of 'resp_class' responses, @@ -168,42 +196,50 @@ class GPSDIfaceExtension(object): def get_gps_time_sensor(self): """ - Retrieve the GPS time using a TPV response from GPSd, and returns as a sensor dict - This time is not high accuracy. + Retrieve the GPS time using a TPV response from GPSd, and returns as a sensor dict. + This returns a sensor dictionary on the second edge containing the latest second. + For example, if we call this function at the gps time of 1.001s it will wait until + just after 2.000s to return 2 second. This effect is similar to get gps time on + the next edge of pps. """ - self._log.trace("Polling GPS time results from GPSD") - # Read responses from GPSD until we get a non-trivial mode - self._gpsd_iface.watch_query() + def parse_time(time_str): + """parse a string of time in format of %Y-%m-%dT%H:%M:%S.%fZ + return in unit second + """ + time_dt = datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ") + epoch_dt = datetime.datetime(1970, 1, 1) + return (time_dt - epoch_dt).total_seconds() + # Read responses from GPSD until we get a non-trivial mode and until next second. + gps_time_prev = 0 while True: gps_info = self._gpsd_iface.get_gps_info(resp_class='tpv', timeout=15) - self._log.trace("GPS info: {}".format(gps_info)) - if gps_info.get("mode", 0) > 0: - break - self._gpsd_iface.stop_query() - time_str = gps_info.get("time", "") - self._log.trace("GPS time string: {}".format(time_str)) - time_dt = datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ") - self._log.trace("GPS datetime: {}".format(time_dt)) - epoch_dt = datetime.datetime(1970, 1, 1) - gps_time = int((time_dt - epoch_dt).total_seconds()) - return { - 'name': 'gps_time', - 'type': 'INTEGER', - 'unit': 'seconds', - 'value': str(gps_time), - } + gps_mode = gps_info.get("mode", 0) + gps_time = parse_time(gps_info.get("time", "")) + if gps_mode == 0: + self._log.warning("GPSD reported invalid mode." + "Return from GPSD is %s", gps_info) + continue + if gps_time_prev == 0: + gps_time_prev = gps_time + continue + else: + if int(gps_time) - int(gps_time_prev) >= 1: + return { + 'name': 'gps_time', + 'type': 'INTEGER', + 'unit': 'seconds', + 'value': str(int(gps_time)), + } def get_gps_tpv_sensor(self): """Get a TPV response from GPSd as a sensor dict""" self._log.trace("Polling GPS TPV results from GPSD") # Read responses from GPSD until we get a non-trivial mode - self._gpsd_iface.watch_query() while True: gps_info = self._gpsd_iface.get_gps_info(resp_class='tpv', timeout=15) self._log.trace("GPS info: {}".format(gps_info)) if gps_info.get("mode", 0) > 0: break - self._gpsd_iface.stop_query() # Return the JSON'd results gps_tpv = json.dumps(gps_info) return { @@ -217,9 +253,7 @@ class GPSDIfaceExtension(object): """Get a SKY response from GPSd as a sensor dict""" self._log.trace("Polling GPS SKY results from GPSD") # Just get the first SKY result - self._gpsd_iface.watch_query() gps_info = self._gpsd_iface.get_gps_info(resp_class='sky', timeout=15) - self._gpsd_iface.stop_query() # Return the JSON'd results gps_sky = json.dumps(gps_info) return { @@ -250,6 +284,9 @@ def main(): print("Sample result: {}".format(result)) print("TPV: {}".format(tpv_result)) print("SKY: {}".format(sky_result)) + gps_ext = GPSDIfaceExtension() + for _ in range(10): + print(gps_ext.get_gps_time_sensor().get('value')) #TODO Add GPSDIfaceExtension code -- cgit v1.2.3