1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
#
# Copyright 2021 Ettus Research, a National Instruments Company
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
X4XX GPS Manager
Handles GPS-related tasks
"""
import re
from usrp_mpm.gpsd_iface import GPSDIfaceExtension
class X4xxGPSMgr:
"""
Manager class for GPS-related actions for the X4XX.
This also "disables" the sensors when the GPS is not enabled.
"""
def __init__(self, clk_aux_board, log):
assert clk_aux_board and clk_aux_board.is_gps_supported()
self._clocking_auxbrd = clk_aux_board
self.log = log.getChild('GPS')
self.log.trace("Initializing GPSd interface")
self._gpsd = GPSDIfaceExtension()
# To disable sensors, we simply return an empty value if GPS is disabled.
# For TPV, SKY, and GPGGA, we can do this in the same fashion (they are
# very similar). gps_time is different (it returns an int) so for sake
# of simplicity it's defined separately below.
for sensor_name in ('gps_tpv', 'gps_sky', 'gps_gpgga'):
sensor_api = f'get_{sensor_name}_sensor'
setattr(
self, sensor_api,
lambda sensor_name=sensor_name: {
'name': sensor_name, 'type': 'STRING',
'unit': '', 'value': 'n/a'} \
if not self.is_gps_enabled() \
else getattr(self._gpsd, f'get_{sensor_name}_sensor')()
)
def extend(self, context):
"""
Extend 'context' with the sensor methods of this class (get_gps_*_sensor).
If 'context' already has such a method, it is skipped.
Returns a dictionary compatible to mboard_sensor_callback_map.
"""
new_methods = {
re.search(r"get_(.*)_sensor", method_name).group(1): method_name
for method_name in dir(self)
if not method_name.startswith('_') \
and callable(getattr(self, method_name)) \
and method_name.endswith("sensor")}
for method_name in new_methods.values():
if hasattr(context, method_name):
continue
new_method = getattr(self, method_name)
self.log.trace("%s: Adding %s method", context, method_name)
setattr(context, method_name, new_method)
return new_methods
def is_gps_enabled(self):
"""
Return True if the GPS is enabled/active.
"""
return self._clocking_auxbrd.is_gps_enabled()
def get_gps_enabled_sensor(self):
"""
Get enabled status of GPS as a sensor dict
"""
gps_enabled = self.is_gps_enabled()
return {
'name': 'gps_enabled',
'type': 'BOOLEAN',
'unit': 'enabled' if gps_enabled else 'disabled',
'value': str(gps_enabled).lower(),
}
def get_gps_locked_sensor(self):
"""
Get lock status of GPS as a sensor dict
"""
gps_locked = self.is_gps_enabled() and \
bool(self._clocking_auxbrd.get_gps_lock())
return {
'name': 'gps_lock',
'type': 'BOOLEAN',
'unit': 'locked' if gps_locked else 'unlocked',
'value': str(gps_locked).lower(),
}
def get_gps_alarm_sensor(self):
"""
Get alarm status of GPS as a sensor dict
"""
gps_alarm = self.is_gps_enabled() and \
bool(self._clocking_auxbrd.get_gps_alarm())
return {
'name': 'gps_alarm',
'type': 'BOOLEAN',
'unit': 'active' if gps_alarm else 'not active',
'value': str(gps_alarm).lower(),
}
def get_gps_warmup_sensor(self):
"""
Get warmup status of GPS as a sensor dict
"""
gps_warmup = self.is_gps_enabled() and \
bool(self._clocking_auxbrd.get_gps_warmup())
return {
'name': 'gps_warmup',
'type': 'BOOLEAN',
'unit': 'warming up' if gps_warmup else 'warmup done',
'value': str(gps_warmup).lower(),
}
def get_gps_survey_sensor(self):
"""
Get survey status of GPS as a sensor dict
"""
gps_survey = self.is_gps_enabled() and \
bool(self._clocking_auxbrd.get_gps_survey())
return {
'name': 'gps_survey',
'type': 'BOOLEAN',
'unit': 'survey active' if gps_survey else 'survey not active',
'value': str(gps_survey).lower(),
}
def get_gps_phase_lock_sensor(self):
"""
Get phase_lock status of GPS as a sensor dict
"""
gps_phase_lock = self.is_gps_enabled() and \
bool(self._clocking_auxbrd.get_gps_phase_lock())
return {
'name': 'gps_phase_lock',
'type': 'BOOLEAN',
'unit': 'phase locked' if gps_phase_lock else 'no phase lock',
'value': str(gps_phase_lock).lower(),
}
def get_gps_time_sensor(self):
"""
"""
if not self.is_gps_enabled():
return {
'name': 'gps_time',
'type': 'INTEGER',
'unit': 'seconds',
'value': str(-1),
}
return self._gpsd.get_gps_time_sensor()
|