aboutsummaryrefslogtreecommitdiffstats
path: root/host/examples/python/replay_capture.py
blob: c540c3034934ef46d9cc21f3975dca308f0abdee (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
#!/usr/bin/env python3
"""
Captures samples into DRAM, then streams those to the host.

Note: The --freq, --gain, and --antenna options can take a single value, which
is applied to all channels, or a list of values which is applied to individual
channels. Example:

    replay_capture.py -f 1e9 -g 20 30 -c 0/Radio#0:0 0/Radio#0:1

This will use two channels from Radio 0 with a common frequency of 1 GHz.
The first channel will use a gain of 20 dB, and the second a gain of 30 dB.

A note on hardware capabilities: When downloading data from DRAM onto the host,
it is possible for the network interfaces to not be able to keep up with the
data rates generated by the USRP, and packets will be dropped.

If this happens, the following options may help:
- Make sure the system configuration is set up for highest performance, e.g.
  by adapting network driver settings (see also
  https://kb.ettus.com/USRP_Host_Performance_Tuning_Tips_and_Tricks#Increasing_Ring_Buffers)
- Fall back to a slower data rate, e.g., use 1 GbE instead of 10 or 100 GbE
"""

import os
import sys
import time
import shutil
import tempfile
import argparse
import numpy as np
import uhd
try:
    import tqdm
    HAVE_TQDM = True
except ImportError:
    HAVE_TQDM = False


# pylint: disable=too-many-arguments
def parse_args():
    """
    Return parsed command line args
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--args", "-a", type=str, default="",
                        help="Device args to use when connecting to the USRP.")
    parser.add_argument("-o", "--output-file", required=True,
                        help="Where to store the samples.")
    parser.add_argument("-r", "--rate", type=float,
                        help="Sampling Rate (defaults to maximum rate)")
    parser.add_argument("-f", "--freq", type=float, required=True, nargs="+",
                        help="Center frequency")
    parser.add_argument("-g", "--gain", type=int, default=[10], nargs="+", help="Gain (dB)")
    parser.add_argument("--antenna", help="Antenna", nargs="+")
    parser.add_argument("-d", "--duration", type=float,
                        help="Duration in seconds to capture. "
                             "Default behavior is to fill the DRAM.")
    parser.add_argument("--delay", "-l", type=float, default=0.5,
                        help="Capture delay in seconds")
    parser.add_argument("-c", "--radio-channels", default=["0/Radio#0:0"], nargs="+",
                        help="List radios plus their channels "
                             "(defaults to \"0/Radio#0:0\")")
    parser.add_argument("--block", "-b", type=str, default="0/Replay#0",
                        help="Replay block to test. Defaults to \"0/Replay#0\".")
    parser.add_argument("--pkt-size", "-k", type=int, default=None,
                        help="Playback packet size in bytes. "
                             "Defaults to maximum packet size for this transport")
    parser.add_argument("-n", "--numpy", default=False, action="store_true",
                        help="Save output file in NumPy format (default: No)")
    parser.add_argument("--cpu-format", default="sc16", choices=["sc16", "fc32"],
                        help="Data format for storing data")
    return parser.parse_args()

def enumerate_radios(graph, radio_chans):
    """
    Return a list of radio/chan pairs to use for this test.
    """
    radio_id_chan_pairs = [
        (r.split(':', 2)[0], int(r.split(':', 2)[1]))
        if ':' in r else (r, 0)
        for r in radio_chans
    ]
    # Sanity checks
    available_radios = graph.find_blocks("Radio")
    radio_chan_pairs = []
    for rcp in radio_id_chan_pairs:
        if rcp[0] not in available_radios:
            raise RuntimeError(f"'{rcp[0]}' is not a valid radio block ID!")
        radio_chan_pairs.append(
            (uhd.rfnoc.RadioControl(graph.get_block(rcp[0])), rcp[1]))
    return radio_chan_pairs

def connect_radios(graph, replay, radio_chan_pairs, freqs, gains, antennas, rate):
    """
    Set up the replay/radio part of the graph, and configure radios
    """
    if rate is None:
        rate = radio_chan_pairs[0][0].get_rate()
    print(f"Requested rate: {rate/1e6:.2f} Msps")
    actual_rate = None
    for replay_port_idx, rcp in enumerate(radio_chan_pairs):
        radio, chan = rcp
        print(
            f"Connecting {rcp[0].get_unique_id()}:{rcp[1]} to "
            f"{replay.get_unique_id()}:{replay_port_idx}")
        radio.set_rx_frequency(freqs[replay_port_idx % len(freqs)], rcp[1])
        radio.set_rx_gain(gains[replay_port_idx % len(gains)], rcp[1])
        if antennas is not None:
            radio.set_rx_antenna(antennas[replay_port_idx % len(antennas)], rcp[1])
        print(f"--> Radio settings: fc={radio.get_rx_frequency(chan)/1e6:.2f} MHz, "
              f" gain={radio.get_rx_gain(chan)} dB, "
              f"antenna={radio.get_rx_antenna(chan)}")
        radio_to_replay_graph = uhd.rfnoc.connect_through_blocks(
            graph,
            rcp[0].get_unique_id(), rcp[1],
            replay.get_unique_id(), replay_port_idx)
        ddc_block = next((
            (x.dst_blockid, x.dst_port)
            for x in radio_to_replay_graph
            if uhd.rfnoc.BlockID(x.dst_blockid).get_block_name() == 'DDC'
        ), None)
        if ddc_block is not None:
            print(f"Found DDC block on channel {chan}.")
            this_rate = uhd.rfnoc.DdcBlockControl(
                graph.get_block(ddc_block[0])).set_output_rate(rate, rcp[1])
        else:
            this_rate = rcp[0].set_rate(rate)
        if actual_rate is None:
            actual_rate = this_rate
            continue
        if actual_rate != this_rate:
            raise RuntimeError("Unexpected rate mismatch.")
    return actual_rate


def _sanitize_args(replay, num_ports, num_bytes, pkt_size_bytes=None):
    """
    Sanitize requested args based on the capabilities of the replay block
    """
    assert num_ports <= replay.get_num_input_ports()
    ## Figure out how many bytes to send
    mem_size = replay.get_mem_size()
    mem_stride = mem_size // num_ports
    num_bytes = int(num_bytes) if num_bytes is not None else mem_stride
    print(f"Total memory size: {mem_size // 1024 // 1024} MiB")
    # Set the number of bytes to test
    print(f"Requested Record size per port: {num_bytes // 1024 // 1024} MiB")
    if num_bytes > mem_size // num_ports:
        num_bytes = mem_size // num_ports
        print(f"WARNING: Exceeds allocated space per port! "
              f"Reducing to {num_bytes // 1024 // 1024} MiB")
    for port in range(num_ports):
        print(f"Port {port} address space: "
              f"0x{mem_stride*port:08X} - 0x{mem_stride*port+num_bytes:08X}")
        replay.set_play_type("sc16", 0)
        replay.set_record_type("sc16", 0)
        if pkt_size_bytes is not None:
            replay.set_max_items_per_packet(pkt_size_bytes // 4, port)
    return mem_stride, num_bytes


def run_capture(graph, replay, radio_chan_pairs, num_samps, rate,
                cap_delay, pkt_size_bytes=None):
    """
    Record from radio into DRAM
    """
    mem_stride, num_bytes = _sanitize_args(
        replay,
        len(radio_chan_pairs),
        num_samps * 4 if num_samps is not None else None,
        pkt_size_bytes,
    )
    num_samps = num_bytes // 4
    num_ports = len(radio_chan_pairs)
    ## Arm replay block for recording
    for idx in range(len(radio_chan_pairs)):
        replay.record(idx * mem_stride, num_bytes, idx)
    ## Send stream command to all radios
    # This 'rate ratio' would be better handled by RFNoC. If the replay block
    # were to submit the stream command to the radio, this would not be necessary.
    rate_ratio = int(radio_chan_pairs[0][0].get_rate() / rate)
    stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.num_done)
    stream_cmd.num_samps = num_samps * rate_ratio
    stream_cmd.stream_now = False
    stream_cmd.time_spec = \
            graph.get_mb_controller().get_timekeeper(0).get_time_now() + \
            uhd.types.TimeSpec(cap_delay)
    print(f"Requesting {num_samps} samples from {num_ports} radio(s)...")
    print(f"Capture will take approx. {num_samps/rate:.1f} seconds...")
    for rcp in radio_chan_pairs:
        rcp[0].issue_stream_cmd(stream_cmd, rcp[1])
    ## Wait for record buffers to fill up
    timeout = time.monotonic() + num_samps / rate + cap_delay + 2.0
    if HAVE_TQDM:
        with tqdm.tqdm(total=num_bytes*num_ports,
                       unit_scale=True, unit="byte") as pbar:
            total_bytes_recorded = 0
            while total_bytes_recorded < num_ports * num_bytes:
                bytes_recorded = sum([
                    replay.get_record_fullness(port)
                    for port in range(num_ports)])
                pbar.update(bytes_recorded - total_bytes_recorded)
                total_bytes_recorded = bytes_recorded
                time.sleep(0.100)
                if time.monotonic() > timeout:
                    raise RuntimeError("Timeout while loading replay buffer!")
    else:
        while any((replay.get_record_fullness(port) < num_bytes
                   for port in range(num_ports))):
            time.sleep(0.200)
            if time.monotonic() > timeout:
                raise RuntimeError("Timeout while loading replay buffer!")
    return num_bytes // 4

def rx_data_to_host(rx_streamer, output_data, num_words, pkt_size_words):
    """
    Download data from a previously configured replay block via a streamer
    object.
    """
    print("Downloading data to host...")
    rx_md = uhd.types.RXMetadata()
    num_ports = rx_streamer.get_num_channels()
    num_bytes = num_words * 4
    stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.num_done)
    stream_cmd.num_samps = num_words
    # This is not strictly necessary, but the streamer will not allow a
    # multi-chan operation without a time spec.
    stream_cmd.stream_now = False
    stream_cmd.time_spec = uhd.types.TimeSpec(0.0)
    rx_streamer.issue_stream_cmd(stream_cmd)
    if HAVE_TQDM:
        num_rx = 0
        output_buf = np.zeros((num_ports, pkt_size_words), dtype=output_data.dtype)
        with tqdm.tqdm(total=num_bytes*num_ports,
                       unit_scale=True, unit="byte") as pbar:
            while num_rx < num_words:
                num_rx_i = rx_streamer.recv(output_buf, rx_md, 1.0)
                if rx_md.error_code == uhd.types.RXMetadataErrorCode.timeout:
                    print("recv() timed out. Exiting...")
                    break
                if rx_md.error_code == uhd.types.RXMetadataErrorCode.overflow and \
                        rx_md.out_of_sequence:
                    print("Detected sequence error!")
                elif rx_md.error_code == uhd.types.RXMetadataErrorCode.overflow:
                    print("ERROR: Overflow detected!")
                elif rx_md.error_code != uhd.types.RXMetadataErrorCode.none:
                    print("ERROR: recv() gave unexpected error code: " + rx_md.strerror())
                pbar.update(num_rx_i * 4 * num_ports)
                output_data[:, num_rx:num_rx+num_rx_i] = output_buf[:, 0:num_rx_i]
                num_rx += num_rx_i
    else:
        num_rx = rx_streamer.recv(output_data, rx_md, 5.0)
        if rx_md.error_code != uhd.types.RXMetadataErrorCode.none:
            print("Error during download: " + rx_md.strerror())
    if num_rx != num_words:
        print("ERROR: Fewer samples received than expected!")
    print("Download complete.")
    return num_rx


def main():
    """
    Run capture
    """
    args = parse_args()
    graph = uhd.rfnoc.RfnocGraph(args.args)
    replay = uhd.rfnoc.ReplayBlockControl(graph.get_block(args.block))
    radio_chan_pairs = enumerate_radios(graph, args.radio_channels)
    rate = connect_radios(graph, replay, radio_chan_pairs,
                          args.freq, args.gain, args.antenna, args.rate)
    print(f"Using rate: {rate/1e6:.3f} Msps")
    # Set up streamer
    stream_args = uhd.usrp.StreamArgs(args.cpu_format, "sc16")
    rx_streamer = graph.create_rx_streamer(len(radio_chan_pairs), stream_args)
    num_ports = rx_streamer.get_num_channels()
    for chan in range(len(radio_chan_pairs)):
        # This won't work if we can't directly attach the streamer to the
        # replay block.
        graph.connect(replay.get_unique_id(), chan, rx_streamer, chan)
    graph.commit()
    num_samps = run_capture(
        graph, replay, radio_chan_pairs,
        args.duration * rate if args.duration is not None else None, rate,
        args.delay, pkt_size_bytes=args.pkt_size)

    if args.numpy:
        tmp_dir = tempfile.mkdtemp()
        mmap_filename = os.path.join(tmp_dir, 'replay_capture.dat')
    else:
        mmap_filename = args.output_file
    cap_dtype = np.complex64 if args.cpu_format == 'fc32' else np.uint32

    output_data = np.memmap(
        mmap_filename, shape=(num_ports, num_samps), mode='w+', dtype=cap_dtype)
    output_data.flush()

    rx_data_to_host(rx_streamer, output_data, num_samps, replay.get_max_packet_size(0))
    if args.numpy:
        print(f"Saving data as Numpy array to {args.output_file}...")
        with open(args.output_file, 'wb') as out_file:
            np.save(out_file, output_data)
        output_data = None
        shutil.rmtree(tmp_dir)
    return True

if __name__ == "__main__":
    sys.exit(not main())