1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
|
#
# Copyright 2019 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
""" Utility classes to facilitate unit testing """
import queue
import platform
import os
def on_linux():
    """
    Report whether the current interpreter is running on Linux.

    Returns True when the lower-cased platform.system() string contains
    'linux', False otherwise.
    """
    system_name = platform.system()
    return 'linux' in system_name.lower()
def on_usrp():
    """
    Returns True if this is being executed on an USRP

    The check reads the device tree 'compatible' property and looks for
    the substring 'ettus' in it. Returns False when the property file does
    not exist (e.g. on hosts without a device tree).
    """
    # Check device tree standard property for manufacturer info
    path = '/sys/firmware/devicetree/base/compatible'
    if not os.path.exists(path):
        return False
    with open(path, 'r') as f:
        s = f.read()
    # String returned is actually a list of null-terminated strings,
    # replace the null-terminations with a separator. str.replace returns
    # a new string, so the result has to be assigned back.
    s = s.replace('\x00', ';')
    return 'ettus' in s
def _mock_gpiod_pkg():
    """
    Install MockGpiod under the name "gpiod" in sys.modules when the real
    gpiod package cannot be imported.

    The gpiod package is not available on every OS version the tests are
    run on. If the import fails while running on a USRP, the original
    exception is re-raised instead of installing the mock.
    """
    try:
        import gpiod
    except Exception as import_err:
        if not on_usrp():
            # Not on a device: substitute the mock so later imports succeed
            import sys
            sys.modules["gpiod"] = MockGpiod
            return
        # The gpiod package should be available if testing on a USRP
        raise import_err
class MockGpiod(object):
    """
    Stand-in for part of the gpiod python package that never touches
    GPIO hardware. Lines are created on demand and only remember the
    last value written to them.
    """
    # Line direction request constants
    LINE_REQ_DIR_IN = 0
    LINE_REQ_DIR_OUT = 1
    # Value reported by a line that has not been written yet
    _DEFAULT_LINE_VAL = 0

    class MockLine(object):
        """ In-memory GPIO line holding a single value """
        def __init__(self, val):
            self.val = val

        def request(self):
            """ No-op; there is no hardware to request """

        def release(self):
            """ No-op; there is no hardware to release """

        def get_value(self):
            """ Returns the value currently stored for this line """
            return self.val

        def set_value(self, val):
            """ Stores val as the current value of this line """
            self.val = val

    def __init__(self):
        # Maps line name -> MockLine
        self.lines = dict()

    def find_line(self, name):
        """
        Returns the MockLine registered under name, creating it with the
        default value on first access.
        """
        try:
            return self.lines[name]
        except KeyError:
            new_line = self.MockLine(self._DEFAULT_LINE_VAL)
            self.lines[name] = new_line
            return new_line
class MockRegsIface(object):
    """
    Register interface test double backed by a register map object.

    Reads and writes are forwarded to the map's get_reg/set_reg methods.
    In addition, every written value is recorded per address, and reads
    can be pre-loaded with a sequence of values per address.
    """
    def __init__(self, register_map):
        self.map = register_map
        # addr -> list of values written through poke32, oldest first
        self.recent_vals = {}
        # addr -> queue.Queue of values to hand out from peek32
        self.next_vals = {}

    def peek32(self, addr):
        """
        Returns the next value queued for addr if there is one, otherwise
        the value the register map reports for addr.
        """
        pending = self.next_vals.get(addr)
        if pending is None or pending.empty():
            return self.map.get_reg(addr)
        return pending.get_nowait()

    def poke32(self, addr, value):
        """
        Writes value to the register map and appends it to the write
        history kept for addr.
        """
        self.map.set_reg(addr, value)
        # Create the history list on first write to this address
        self.recent_vals.setdefault(addr, []).append(value)

    def get_recent_vals(self, addr):
        """
        Returns the list of values written to addr so far (empty list if
        none). Useful for validating HW interaction.
        """
        if addr in self.recent_vals:
            return self.recent_vals[addr]
        return []

    def set_next_vals(self, addr, vals):
        """
        Queues vals, in order, as the values the following peek32 calls on
        addr will return. Values are appended to anything already queued.
        Useful for mocking HW interaction.
        """
        pending = self.next_vals.get(addr)
        if pending is None:
            pending = queue.Queue()
            self.next_vals[addr] = pending
        for val in vals:
            pending.put_nowait(val)
class MockLog(object):
    """
    Logger test double that stores messages in one queue per log level
    (error, warning, info, trace, debug) instead of emitting them.
    """
    # The MockLog class is not currently implemented to be thread safe
    def __init__(self):
        self.error_log = queue.Queue()
        self.warning_log = queue.Queue()
        self.info_log = queue.Queue()
        self.trace_log = queue.Queue()
        self.debug_log = queue.Queue()

    def error(self, msg):
        """ Queues msg on the error log """
        self.error_log.put_nowait(msg)

    def warning(self, msg):
        """ Queues msg on the warning log """
        self.warning_log.put_nowait(msg)

    def info(self, msg):
        """ Queues msg on the info log """
        self.info_log.put_nowait(msg)

    def trace(self, msg):
        """ Queues msg on the trace log """
        self.trace_log.put_nowait(msg)

    def debug(self, msg):
        """ Queues msg on the debug log """
        self.debug_log.put_nowait(msg)

    def clear_all(self):
        """ Empties every log queue """
        all_logs = (
            self.error_log,
            self.warning_log,
            self.info_log,
            self.trace_log,
            self.debug_log,
        )
        for log in all_logs:
            # Clear the underlying deque directly
            log.queue.clear()

    def get_last_msg(self, log_level):
        """
        Removes and returns the next message from the queue for log_level.
        The queues are FIFO, so this is the oldest message not yet
        retrieved. Returns an empty string if the queue is empty and
        raises RuntimeError if no queue exists for log_level.
        """
        log_messages = getattr(self, log_level + '_log', None)
        if log_messages is None:
            raise RuntimeError(
                "Log level {} does not exist in "
                "mock log".format(log_level))
        return '' if log_messages.empty() else log_messages.get_nowait()
# Importing this utilities module has a deliberate side effect: it replaces
# the gpiod package with a mock when gpiod cannot be imported, so that
# usrp_mpm can be imported on systems whose OS has no gpiod support.
_mock_gpiod_pkg()
|