aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/simulator/sample_source.py
blob: 36278efd82f44fb6fccbc8457b0fa9fbd085091a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#
# Copyright 2020 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
This module contains the interface for providing data to a simulator
stream and receiving data from a simulator stream.
"""

# Registry of sample source classes, keyed by class name. Entries are
# added by the cli_source decorator.
sources = {}
# Registry of sample sink classes, keyed by class name. Entries are
# added by the cli_sink decorator.
sinks = {}

def cli_source(cls):
    """Class decorator: register cls in the global sources dict.

    The class is stored under its own __name__ and returned unchanged,
    so the decorated name still refers to the original class.
    """
    name = cls.__name__
    sources[name] = cls
    return cls

def cli_sink(cls):
    """Class decorator: register cls in the global sinks dict.

    The class is stored under its own __name__ and returned unchanged,
    so the decorated name still refers to the original class.
    """
    name = cls.__name__
    sinks[name] = cls
    return cls

class SampleSource:
    """Interface of an object that supplies samples to the simulator.

    The simulator sends the supplied samples over the network to a UHD
    client. Every method here raises NotImplementedError and has to be
    overridden by a concrete source.
    """
    def fill_packet(self, packet, payload_size):
        """Fill packet with samples so that its payload becomes
        payload_size bytes long.

        An implementation returns None to signal that the source is
        exhausted.
        """
        raise NotImplementedError

    def close(self):
        """Release any resources held by the object."""
        raise NotImplementedError

class SampleSink:
    """Interface of an object that consumes samples from the simulator.

    It is the destination for samples received over the network from a
    UHD client. Every method here raises NotImplementedError and has to
    be overridden by a concrete sink.
    """
    def accept_packet(self, packet):
        """Handle a newly received packet."""
        raise NotImplementedError

    def close(self):
        """Release any resources held by the object."""
        raise NotImplementedError

@cli_source
@cli_sink
class NullSamples(SampleSource, SampleSink):
    """Combined source/sink that never runs out of samples.

    As a source it provides payloads consisting only of zero bytes; as
    a sink it ignores whatever it receives. Passing a log object
    enables debug output for both directions.
    """
    def __init__(self, log=None):
        self.log = log

    def fill_packet(self, packet, payload_size):
        """Give packet a payload of payload_size zero bytes and return it."""
        logger = self.log
        if logger is not None:
            message = "Null Source called, providing {} bytes of zeroes"
            logger.debug(message.format(payload_size))
        # bytes(n) builds a bytes object of n zero bytes
        packet.set_payload_bytes(bytes(payload_size))
        return packet

    def accept_packet(self, packet):
        """Drop the packet; its payload size is logged when a log is set."""
        if self.log is None:
            return
        debug = self.log.debug
        size = len(packet.get_payload_bytes())
        debug("Null Source called, accepting {} bytes of payload".format(size))

    def close(self):
        """Nothing to clean up."""

class IOSource(SampleSource):
    """Adaptor that turns a readable object into a sample source.

    The wrapped object has to provide read(# of bytes) and close()
    (e.g. the result of an open("<filename>", "rb") call).
    """
    def __init__(self, read):
        self.read_obj = read

    def fill_packet(self, packet, payload_size):
        """Request payload_size bytes from the read object and store
        them as the payload of packet.

        Returns the packet, or None when the read yields no data.
        """
        chunk = self.read_obj.read(payload_size)
        if len(chunk) > 0:
            packet.set_payload_bytes(chunk)
            return packet
        return None

    def close(self):
        """Close the wrapped read object."""
        self.read_obj.close()

class IOSink(SampleSink):
    """Adaptor that turns a writable object into a sample sink.

    The wrapped object has to provide write(bytes), returning the
    number of bytes written, and close()
    (e.g. the result of an open("<filename>", "wb") call).
    """
    def __init__(self, write):
        self.write_obj = write

    def accept_packet(self, packet):
        """Write the payload of packet to the wrapped write object.

        Raises AssertionError if the write object reports a byte count
        different from the number of payload bytes handed to it.
        """
        data = bytes(packet.get_payload_bytes())
        written = self.write_obj.write(data)
        # Explicit check rather than an assert statement: asserts are
        # removed when Python runs with -O, which would let a short
        # write pass silently. AssertionError is kept so callers see
        # the same exception type as before. The count is compared
        # against the bytes actually passed to write().
        if written != len(data):
            raise AssertionError(
                "short write: wrote {} of {} payload bytes"
                .format(written, len(data)))

    def close(self):
        """Close the wrapped write object."""
        self.write_obj.close()

@cli_source
class FileSource(IOSource):
    """SampleSource that reads its samples from a file path.

    read_file -- path of the file to read; it is opened in binary mode
    repeat -- when true, reading starts over from the beginning of the
              file once its end is reached. Besides a bool, the string
              "True" enables repetition (presumably because arguments
              may arrive as strings from a command line); any other
              non-bool value disables it.
    """
    def __init__(self, read_file, repeat=False):
        # Kept as a callable so the file can be reopened when repeating
        self.open = lambda: open(read_file, "rb")
        if isinstance(repeat, bool):
            self.repeat = repeat
        else:
            self.repeat = repeat == "True"
        super().__init__(self.open())

    def fill_packet(self, packet, payload_size):
        """Fill packet with the next payload_size bytes of the file.

        Returns the packet, or None when the source is exhausted. With
        repeat enabled the file is reopened at its end and reading
        continues from the start. If the reopened file yields no data
        either (e.g. the file is empty), None is returned instead of
        an endless stream of empty packets.
        """
        filled = super().fill_packet(packet, payload_size)
        if filled is None and self.repeat:
            # End of file: start over with a fresh file object
            self.read_obj.close()
            self.read_obj = self.open()
            filled = super().fill_packet(packet, payload_size)
        return filled

@cli_sink
class FileSink(IOSink):
    """SampleSink that stores received payloads in the file at a path.

    write_file -- path of the output file; it is opened in binary
                  write mode ("wb") when the sink is constructed
    """
    def __init__(self, write_file):
        super().__init__(open(write_file, "wb"))