diff options
author | Martin Braun <martin.braun@ettus.com> | 2022-03-01 15:10:17 +0100 |
---|---|---|
committer | Aaron Rossetto <aaron.rossetto@ni.com> | 2022-03-31 08:10:45 -0700 |
commit | f43511e913a9317c0dd886f6459aa32474f407f2 (patch) | |
tree | 8d245e5c407ff501103c995d5994c7292d1f1841 /host/examples/python/replay_capture.py | |
parent | fe2e24e79e3e5d714e2bf30da89ee5cd71dcb399 (diff) | |
download | uhd-f43511e913a9317c0dd886f6459aa32474f407f2.tar.gz uhd-f43511e913a9317c0dd886f6459aa32474f407f2.tar.bz2 uhd-f43511e913a9317c0dd886f6459aa32474f407f2.zip |
examples: Add replay_capture.py
This is an example that allows capturing RF data into DRAM, and then
stream it back to host, using the Python API.
Diffstat (limited to 'host/examples/python/replay_capture.py')
-rwxr-xr-x | host/examples/python/replay_capture.py | 310 |
1 files changed, 310 insertions, 0 deletions
diff --git a/host/examples/python/replay_capture.py b/host/examples/python/replay_capture.py new file mode 100755 index 000000000..c540c3034 --- /dev/null +++ b/host/examples/python/replay_capture.py @@ -0,0 +1,310 @@ +#!/usr/bin/env python3 +""" +Captures samples into DRAM, then streams those to the host. + +Note: The --freq, --gain, and --antenna options can take a single value, which +is applied to all channels, or a list of values which is applied to individual +channels. Example: + + replay_capture.py -f 1e9 -g 20 30 -c 0/Radio#0:0 0/Radio#0:1 + +This will use two channels from Radio 0 with a common frequency of 1 GHz. +The first channel will use a gain of 20 dB, and the second a gain of 30 dB. + +A note on hardware capabilities: When downloading data from DRAM onto the host, +it is possible for the network interfaces to not be able to keep up with the +data rates generated by the USRP, and packets will be dropped. + +If this happens, the following options may help: +- Make sure the system configuration is set up for highest performance, e.g. + by adapting network driver settings (see also + https://kb.ettus.com/USRP_Host_Performance_Tuning_Tips_and_Tricks#Increasing_Ring_Buffers) +- Fall back to a slower data rate, e.g., use 1 GbE instead of 10 or 100 GbE +""" + +import os +import sys +import time +import shutil +import tempfile +import argparse +import numpy as np +import uhd +try: + import tqdm + HAVE_TQDM = True +except ImportError: + HAVE_TQDM = False + + +# pylint: disable=too-many-arguments +def parse_args(): + """ + Return parsed command line args + """ + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--args", "-a", type=str, default="", + help="Device args to use when connecting to the USRP.") + parser.add_argument("-o", "--output-file", required=True, + help="Where to store the samples.") + parser.add_argument("-r", "--rate", type=float, + help="Sampling Rate (defaults to maximum rate)") + parser.add_argument("-f", "--freq", type=float, required=True, nargs="+", + help="Center frequency") + parser.add_argument("-g", "--gain", type=int, default=[10], nargs="+", help="Gain (dB)") + parser.add_argument("--antenna", help="Antenna", nargs="+") + parser.add_argument("-d", "--duration", type=float, + help="Duration in seconds to capture. " + "Default behavior is to fill the DRAM.") + parser.add_argument("--delay", "-l", type=float, default=0.5, + help="Capture delay in seconds") + parser.add_argument("-c", "--radio-channels", default=["0/Radio#0:0"], nargs="+", + help="List radios plus their channels " + "(defaults to \"0/Radio#0:0\")") + parser.add_argument("--block", "-b", type=str, default="0/Replay#0", + help="Replay block to test. Defaults to \"0/Replay#0\".") + parser.add_argument("--pkt-size", "-k", type=int, default=None, + help="Playback packet size in bytes. " + "Defaults to maximum packet size for this transport") + parser.add_argument("-n", "--numpy", default=False, action="store_true", + help="Save output file in NumPy format (default: No)") + parser.add_argument("--cpu-format", default="sc16", choices=["sc16", "fc32"], + help="Data format for storing data") + return parser.parse_args() + +def enumerate_radios(graph, radio_chans): + """ + Return a list of radio/chan pairs to use for this test. + """ + radio_id_chan_pairs = [ + (r.split(':', 2)[0], int(r.split(':', 2)[1])) + if ':' in r else (r, 0) + for r in radio_chans + ] + # Sanity checks + available_radios = graph.find_blocks("Radio") + radio_chan_pairs = [] + for rcp in radio_id_chan_pairs: + if rcp[0] not in available_radios: + raise RuntimeError(f"'{rcp[0]}' is not a valid radio block ID!") + radio_chan_pairs.append( + (uhd.rfnoc.RadioControl(graph.get_block(rcp[0])), rcp[1])) + return radio_chan_pairs + +def connect_radios(graph, replay, radio_chan_pairs, freqs, gains, antennas, rate): + """ + Set up the replay/radio part of the graph, and configure radios + """ + if rate is None: + rate = radio_chan_pairs[0][0].get_rate() + print(f"Requested rate: {rate/1e6:.2f} Msps") + actual_rate = None + for replay_port_idx, rcp in enumerate(radio_chan_pairs): + radio, chan = rcp + print( + f"Connecting {rcp[0].get_unique_id()}:{rcp[1]} to " + f"{replay.get_unique_id()}:{replay_port_idx}") + radio.set_rx_frequency(freqs[replay_port_idx % len(freqs)], rcp[1]) + radio.set_rx_gain(gains[replay_port_idx % len(gains)], rcp[1]) + if antennas is not None: + radio.set_rx_antenna(antennas[replay_port_idx % len(antennas)], rcp[1]) + print(f"--> Radio settings: fc={radio.get_rx_frequency(chan)/1e6:.2f} MHz, " + f" gain={radio.get_rx_gain(chan)} dB, " + f"antenna={radio.get_rx_antenna(chan)}") + radio_to_replay_graph = uhd.rfnoc.connect_through_blocks( + graph, + rcp[0].get_unique_id(), rcp[1], + replay.get_unique_id(), replay_port_idx) + ddc_block = next(( + (x.dst_blockid, x.dst_port) + for x in radio_to_replay_graph + if uhd.rfnoc.BlockID(x.dst_blockid).get_block_name() == 'DDC' + ), None) + if ddc_block is not None: + print(f"Found DDC block on channel {chan}.") + this_rate = uhd.rfnoc.DdcBlockControl( + graph.get_block(ddc_block[0])).set_output_rate(rate, rcp[1]) + else: + this_rate = rcp[0].set_rate(rate) + if actual_rate is None: + actual_rate = this_rate + continue + if actual_rate != this_rate: + raise RuntimeError("Unexpected rate mismatch.") + return actual_rate + + +def _sanitize_args(replay, num_ports, num_bytes, pkt_size_bytes=None): + """ + Sanitize requested args based on the capabilities of the replay block + """ + assert num_ports <= replay.get_num_input_ports() + ## Figure out how many bytes to send + mem_size = replay.get_mem_size() + mem_stride = mem_size // num_ports + num_bytes = int(num_bytes) if num_bytes is not None else mem_stride + print(f"Total memory size: {mem_size // 1024 // 1024} MiB") + # Set the number of bytes to test + print(f"Requested Record size per port: {num_bytes // 1024 // 1024} MiB") + if num_bytes > mem_size // num_ports: + num_bytes = mem_size // num_ports + print(f"WARNING: Exceeds allocated space per port! " + f"Reducing to {num_bytes // 1024 // 1024} MiB") + for port in range(num_ports): + print(f"Port {port} address space: " + f"0x{mem_stride*port:08X} - 0x{mem_stride*port+num_bytes:08X}") + replay.set_play_type("sc16", 0) + replay.set_record_type("sc16", 0) + if pkt_size_bytes is not None: + replay.set_max_items_per_packet(pkt_size_bytes // 4, port) + return mem_stride, num_bytes + + +def run_capture(graph, replay, radio_chan_pairs, num_samps, rate, + cap_delay, pkt_size_bytes=None): + """ + Record from radio into DRAM + """ + mem_stride, num_bytes = _sanitize_args( + replay, + len(radio_chan_pairs), + num_samps * 4 if num_samps is not None else None, + pkt_size_bytes, + ) + num_samps = num_bytes // 4 + num_ports = len(radio_chan_pairs) + ## Arm replay block for recording + for idx in range(len(radio_chan_pairs)): + replay.record(idx * mem_stride, num_bytes, idx) + ## Send stream command to all radios + # This 'rate ratio' would be better handled by RFNoC. If the replay block + # were to submit the stream command to the radio, this would not be necessary. + rate_ratio = int(radio_chan_pairs[0][0].get_rate() / rate) + stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.num_done) + stream_cmd.num_samps = num_samps * rate_ratio + stream_cmd.stream_now = False + stream_cmd.time_spec = \ + graph.get_mb_controller().get_timekeeper(0).get_time_now() + \ + uhd.types.TimeSpec(cap_delay) + print(f"Requesting {num_samps} samples from {num_ports} radio(s)...") + print(f"Capture will take approx. {num_samps/rate:.1f} seconds...") + for rcp in radio_chan_pairs: + rcp[0].issue_stream_cmd(stream_cmd, rcp[1]) + ## Wait for record buffers to fill up + timeout = time.monotonic() + num_samps / rate + cap_delay + 2.0 + if HAVE_TQDM: + with tqdm.tqdm(total=num_bytes*num_ports, + unit_scale=True, unit="byte") as pbar: + total_bytes_recorded = 0 + while total_bytes_recorded < num_ports * num_bytes: + bytes_recorded = sum([ + replay.get_record_fullness(port) + for port in range(num_ports)]) + pbar.update(bytes_recorded - total_bytes_recorded) + total_bytes_recorded = bytes_recorded + time.sleep(0.100) + if time.monotonic() > timeout: + raise RuntimeError("Timeout while loading replay buffer!") + else: + while any((replay.get_record_fullness(port) < num_bytes + for port in range(num_ports))): + time.sleep(0.200) + if time.monotonic() > timeout: + raise RuntimeError("Timeout while loading replay buffer!") + return num_bytes // 4 + +def rx_data_to_host(rx_streamer, output_data, num_words, pkt_size_words): + """ + Download data from a previously configured replay block via a streamer + object. + """ + print("Downloading data to host...") + rx_md = uhd.types.RXMetadata() + num_ports = rx_streamer.get_num_channels() + num_bytes = num_words * 4 + stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.num_done) + stream_cmd.num_samps = num_words + # This is not strictly necessary, but the streamer will not allow a + # multi-chan operation without a time spec. + stream_cmd.stream_now = False + stream_cmd.time_spec = uhd.types.TimeSpec(0.0) + rx_streamer.issue_stream_cmd(stream_cmd) + if HAVE_TQDM: + num_rx = 0 + output_buf = np.zeros((num_ports, pkt_size_words), dtype=output_data.dtype) + with tqdm.tqdm(total=num_bytes*num_ports, + unit_scale=True, unit="byte") as pbar: + while num_rx < num_words: + num_rx_i = rx_streamer.recv(output_buf, rx_md, 1.0) + if rx_md.error_code == uhd.types.RXMetadataErrorCode.timeout: + print("recv() timed out. Exiting...") + break + if rx_md.error_code == uhd.types.RXMetadataErrorCode.overflow and \ + rx_md.out_of_sequence: + print("Detected sequence error!") + elif rx_md.error_code == uhd.types.RXMetadataErrorCode.overflow: + print("ERROR: Overflow detected!") + elif rx_md.error_code != uhd.types.RXMetadataErrorCode.none: + print("ERROR: recv() gave unexpected error code: " + rx_md.strerror()) + pbar.update(num_rx_i * 4 * num_ports) + output_data[:, num_rx:num_rx+num_rx_i] = output_buf[:, 0:num_rx_i] + num_rx += num_rx_i + else: + num_rx = rx_streamer.recv(output_data, rx_md, 5.0) + if rx_md.error_code != uhd.types.RXMetadataErrorCode.none: + print("Error during download: " + rx_md.strerror()) + if num_rx != num_words: + print("ERROR: Fewer samples received than expected!") + print("Download complete.") + return num_rx + + +def main(): + """ + Run capture + """ + args = parse_args() + graph = uhd.rfnoc.RfnocGraph(args.args) + replay = uhd.rfnoc.ReplayBlockControl(graph.get_block(args.block)) + radio_chan_pairs = enumerate_radios(graph, args.radio_channels) + rate = connect_radios(graph, replay, radio_chan_pairs, + args.freq, args.gain, args.antenna, args.rate) + print(f"Using rate: {rate/1e6:.3f} Msps") + # Set up streamer + stream_args = uhd.usrp.StreamArgs(args.cpu_format, "sc16") + rx_streamer = graph.create_rx_streamer(len(radio_chan_pairs), stream_args) + num_ports = rx_streamer.get_num_channels() + for chan in range(len(radio_chan_pairs)): + # This won't work if we can't directly attach the streamer to the + # replay block. + graph.connect(replay.get_unique_id(), chan, rx_streamer, chan) + graph.commit() + num_samps = run_capture( + graph, replay, radio_chan_pairs, + args.duration * rate if args.duration is not None else None, rate, + args.delay, pkt_size_bytes=args.pkt_size) + + if args.numpy: + tmp_dir = tempfile.mkdtemp() + mmap_filename = os.path.join(tmp_dir, 'replay_capture.dat') + else: + mmap_filename = args.output_file + cap_dtype = np.complex64 if args.cpu_format == 'fc32' else np.uint32 + + output_data = np.memmap( + mmap_filename, shape=(num_ports, num_samps), mode='w+', dtype=cap_dtype) + output_data.flush() + + rx_data_to_host(rx_streamer, output_data, num_samps, replay.get_max_packet_size(0)) + if args.numpy: + print(f"Saving data as Numpy array to {args.output_file}...") + with open(args.output_file, 'wb') as out_file: + np.save(out_file, output_data) + output_data = None + shutil.rmtree(tmp_dir) + return True + +if __name__ == "__main__": + sys.exit(not main()) |