1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
#
# Copyright 2020 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
This module contains the interface for providing data to a simulator
stream and receiving data from a simulator stream.
"""
sources = {}
sinks = {}
def cli_source(cls):
"""This decorator adds a class to the global list of SampleSources"""
sources[cls.__name__] = cls
return cls
def cli_sink(cls):
"""This decorator adds a class to the global list of SampleSinks"""
sinks[cls.__name__] = cls
return cls
class SampleSource:
"""This class defines the interface of a SampleSource. It
provides samples to the simulator which are then sent over the
network to a UHD client.
"""
def fill_packet(self, packet, payload_size):
"""This method should fill the packet with enough samples to
make its payload payload_size bytes long.
Returning None signals that this source is exhausted.
"""
raise NotImplementedError()
def close(self):
"""Use this to clean up any resources held by the object"""
raise NotImplementedError()
class SampleSink:
"""This class provides the interface of a SampleSink. It serves
as a destination for smaples received over the network from a
UHD client.
"""
def accept_packet(self, packet):
"""Called whenever a new packet is received"""
raise NotImplementedError()
def close(self):
"""Use this to clean up any resources held by the object"""
raise NotImplementedError()
@cli_source
@cli_sink
class NullSamples(SampleSource, SampleSink):
"""This combination source/sink simply provides an infinite
number of samples with a value of zero. You may optionally provide
a log object which will enable debug output.
"""
def __init__(self, log=None):
self.log = log
def fill_packet(self, packet, payload_size):
if self.log is not None:
self.log.debug("Null Source called, providing {} bytes of zeroes".format(payload_size))
payload = bytes(payload_size)
packet.set_payload_bytes(payload)
return packet
def accept_packet(self, packet):
if self.log is not None:
self.log.debug("Null Source called, accepting {} bytes of payload"
.format(len(packet.get_payload_bytes())))
def close(self):
pass
class IOSource(SampleSource):
"""This adaptor class creates a sample source using a read object
that provides a read(# of bytes) function.
(e.g. the result of an open("<filename>", "rb") call)
"""
def __init__(self, read):
self.read_obj = read
def fill_packet(self, packet, payload_size):
payload = self.read_obj.read(payload_size)
if len(payload) == 0:
return None
packet.set_payload_bytes(payload)
return packet
def close(self):
self.read_obj.close()
class IOSink(SampleSink):
"""This adaptor class creates a sample sink using a write object
that provides a write(bytes) function.
(e.g. the result of an open("<filename>", "wb") call)
"""
def __init__(self, write):
self.write_obj = write
def accept_packet(self, packet):
payload = packet.get_payload_bytes()
written = self.write_obj.write(bytes(payload))
assert written == len(payload)
def close(self):
self.write_obj.close()
@cli_source
class FileSource(IOSource):
"""This class creates a SampleSource using a file path"""
def __init__(self, read_file, repeat=False):
self.open = lambda: open(read_file, "rb")
if isinstance(repeat, bool):
self.repeat = repeat
else:
self.repeat = repeat == "True"
read = self.open()
super().__init__(read)
def fill_packet(self, packet, payload_size):
payload = self.read_obj.read(payload_size)
if len(payload) == 0:
if self.repeat:
self.read_obj.close()
self.read_obj = self.open()
payload = self.read_obj.read(payload_size)
else:
return None
packet.set_payload_bytes(payload)
return packet
@cli_sink
class FileSink(IOSink):
"""This class creates a SampleSink using a file path"""
def __init__(self, write_file):
write = open(write_file, "wb")
super().__init__(write)
|