aboutsummaryrefslogtreecommitdiffstats
path: root/host/examples/python/replay_capture.py
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2022-03-01 15:10:17 +0100
committerAaron Rossetto <aaron.rossetto@ni.com>2022-03-31 08:10:45 -0700
commitf43511e913a9317c0dd886f6459aa32474f407f2 (patch)
tree8d245e5c407ff501103c995d5994c7292d1f1841 /host/examples/python/replay_capture.py
parentfe2e24e79e3e5d714e2bf30da89ee5cd71dcb399 (diff)
downloaduhd-f43511e913a9317c0dd886f6459aa32474f407f2.tar.gz
uhd-f43511e913a9317c0dd886f6459aa32474f407f2.tar.bz2
uhd-f43511e913a9317c0dd886f6459aa32474f407f2.zip
examples: Add replay_capture.py
This is an example that allows capturing RF data into DRAM, and then stream it back to host, using the Python API.
Diffstat (limited to 'host/examples/python/replay_capture.py')
-rwxr-xr-xhost/examples/python/replay_capture.py310
1 files changed, 310 insertions, 0 deletions
diff --git a/host/examples/python/replay_capture.py b/host/examples/python/replay_capture.py
new file mode 100755
index 000000000..c540c3034
--- /dev/null
+++ b/host/examples/python/replay_capture.py
@@ -0,0 +1,310 @@
+#!/usr/bin/env python3
+"""
+Captures samples into DRAM, then streams those to the host.
+
+Note: The --freq, --gain, and --antenna options can take a single value, which
+is applied to all channels, or a list of values which is applied to individual
+channels. Example:
+
+ replay_capture.py -f 1e9 -g 20 30 -c 0/Radio#0:0 0/Radio#0:1
+
+This will use two channels from Radio 0 with a common frequency of 1 GHz.
+The first channel will use a gain of 20 dB, and the second a gain of 30 dB.
+
+A note on hardware capabilities: When downloading data from DRAM onto the host,
+it is possible for the network interfaces to not be able to keep up with the
+data rates generated by the USRP, and packets will be dropped.
+
+If this happens, the following options may help:
+- Make sure the system configuration is set up for highest performance, e.g.
+ by adapting network driver settings (see also
+ https://kb.ettus.com/USRP_Host_Performance_Tuning_Tips_and_Tricks#Increasing_Ring_Buffers)
+- Fall back to a slower data rate, e.g., use 1 GbE instead of 10 or 100 GbE
+"""
+
+import os
+import sys
+import time
+import shutil
+import tempfile
+import argparse
+import numpy as np
+import uhd
+try:
+ import tqdm
+ HAVE_TQDM = True
+except ImportError:
+ HAVE_TQDM = False
+
+
+# pylint: disable=too-many-arguments
+def parse_args():
+ """
+ Return parsed command line args
+ """
+ parser = argparse.ArgumentParser(
+ description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument("--args", "-a", type=str, default="",
+ help="Device args to use when connecting to the USRP.")
+ parser.add_argument("-o", "--output-file", required=True,
+ help="Where to store the samples.")
+ parser.add_argument("-r", "--rate", type=float,
+ help="Sampling Rate (defaults to maximum rate)")
+ parser.add_argument("-f", "--freq", type=float, required=True, nargs="+",
+ help="Center frequency")
+ parser.add_argument("-g", "--gain", type=int, default=[10], nargs="+", help="Gain (dB)")
+ parser.add_argument("--antenna", help="Antenna", nargs="+")
+ parser.add_argument("-d", "--duration", type=float,
+ help="Duration in seconds to capture. "
+ "Default behavior is to fill the DRAM.")
+ parser.add_argument("--delay", "-l", type=float, default=0.5,
+ help="Capture delay in seconds")
+ parser.add_argument("-c", "--radio-channels", default=["0/Radio#0:0"], nargs="+",
+ help="List radios plus their channels "
+ "(defaults to \"0/Radio#0:0\")")
+ parser.add_argument("--block", "-b", type=str, default="0/Replay#0",
+ help="Replay block to test. Defaults to \"0/Replay#0\".")
+ parser.add_argument("--pkt-size", "-k", type=int, default=None,
+ help="Playback packet size in bytes. "
+ "Defaults to maximum packet size for this transport")
+ parser.add_argument("-n", "--numpy", default=False, action="store_true",
+ help="Save output file in NumPy format (default: No)")
+ parser.add_argument("--cpu-format", default="sc16", choices=["sc16", "fc32"],
+ help="Data format for storing data")
+ return parser.parse_args()
+
+def enumerate_radios(graph, radio_chans):
+ """
+ Return a list of radio/chan pairs to use for this test.
+ """
+ radio_id_chan_pairs = [
+ (r.split(':', 2)[0], int(r.split(':', 2)[1]))
+ if ':' in r else (r, 0)
+ for r in radio_chans
+ ]
+ # Sanity checks
+ available_radios = graph.find_blocks("Radio")
+ radio_chan_pairs = []
+ for rcp in radio_id_chan_pairs:
+ if rcp[0] not in available_radios:
+ raise RuntimeError(f"'{rcp[0]}' is not a valid radio block ID!")
+ radio_chan_pairs.append(
+ (uhd.rfnoc.RadioControl(graph.get_block(rcp[0])), rcp[1]))
+ return radio_chan_pairs
+
+def connect_radios(graph, replay, radio_chan_pairs, freqs, gains, antennas, rate):
+ """
+ Set up the replay/radio part of the graph, and configure radios
+ """
+ if rate is None:
+ rate = radio_chan_pairs[0][0].get_rate()
+ print(f"Requested rate: {rate/1e6:.2f} Msps")
+ actual_rate = None
+ for replay_port_idx, rcp in enumerate(radio_chan_pairs):
+ radio, chan = rcp
+ print(
+ f"Connecting {rcp[0].get_unique_id()}:{rcp[1]} to "
+ f"{replay.get_unique_id()}:{replay_port_idx}")
+ radio.set_rx_frequency(freqs[replay_port_idx % len(freqs)], rcp[1])
+ radio.set_rx_gain(gains[replay_port_idx % len(gains)], rcp[1])
+ if antennas is not None:
+ radio.set_rx_antenna(antennas[replay_port_idx % len(antennas)], rcp[1])
+ print(f"--> Radio settings: fc={radio.get_rx_frequency(chan)/1e6:.2f} MHz, "
+ f" gain={radio.get_rx_gain(chan)} dB, "
+ f"antenna={radio.get_rx_antenna(chan)}")
+ radio_to_replay_graph = uhd.rfnoc.connect_through_blocks(
+ graph,
+ rcp[0].get_unique_id(), rcp[1],
+ replay.get_unique_id(), replay_port_idx)
+ ddc_block = next((
+ (x.dst_blockid, x.dst_port)
+ for x in radio_to_replay_graph
+ if uhd.rfnoc.BlockID(x.dst_blockid).get_block_name() == 'DDC'
+ ), None)
+ if ddc_block is not None:
+ print(f"Found DDC block on channel {chan}.")
+ this_rate = uhd.rfnoc.DdcBlockControl(
+ graph.get_block(ddc_block[0])).set_output_rate(rate, rcp[1])
+ else:
+ this_rate = rcp[0].set_rate(rate)
+ if actual_rate is None:
+ actual_rate = this_rate
+ continue
+ if actual_rate != this_rate:
+ raise RuntimeError("Unexpected rate mismatch.")
+ return actual_rate
+
+
+def _sanitize_args(replay, num_ports, num_bytes, pkt_size_bytes=None):
+ """
+ Sanitize requested args based on the capabilities of the replay block
+ """
+ assert num_ports <= replay.get_num_input_ports()
+ ## Figure out how many bytes to send
+ mem_size = replay.get_mem_size()
+ mem_stride = mem_size // num_ports
+ num_bytes = int(num_bytes) if num_bytes is not None else mem_stride
+ print(f"Total memory size: {mem_size // 1024 // 1024} MiB")
+ # Set the number of bytes to test
+ print(f"Requested Record size per port: {num_bytes // 1024 // 1024} MiB")
+ if num_bytes > mem_size // num_ports:
+ num_bytes = mem_size // num_ports
+ print(f"WARNING: Exceeds allocated space per port! "
+ f"Reducing to {num_bytes // 1024 // 1024} MiB")
+ for port in range(num_ports):
+ print(f"Port {port} address space: "
+ f"0x{mem_stride*port:08X} - 0x{mem_stride*port+num_bytes:08X}")
+ replay.set_play_type("sc16", 0)
+ replay.set_record_type("sc16", 0)
+ if pkt_size_bytes is not None:
+ replay.set_max_items_per_packet(pkt_size_bytes // 4, port)
+ return mem_stride, num_bytes
+
+
+def run_capture(graph, replay, radio_chan_pairs, num_samps, rate,
+ cap_delay, pkt_size_bytes=None):
+ """
+ Record from radio into DRAM
+ """
+ mem_stride, num_bytes = _sanitize_args(
+ replay,
+ len(radio_chan_pairs),
+ num_samps * 4 if num_samps is not None else None,
+ pkt_size_bytes,
+ )
+ num_samps = num_bytes // 4
+ num_ports = len(radio_chan_pairs)
+ ## Arm replay block for recording
+ for idx in range(len(radio_chan_pairs)):
+ replay.record(idx * mem_stride, num_bytes, idx)
+ ## Send stream command to all radios
+ # This 'rate ratio' would be better handled by RFNoC. If the replay block
+ # were to submit the stream command to the radio, this would not be necessary.
+ rate_ratio = int(radio_chan_pairs[0][0].get_rate() / rate)
+ stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.num_done)
+ stream_cmd.num_samps = num_samps * rate_ratio
+ stream_cmd.stream_now = False
+ stream_cmd.time_spec = \
+ graph.get_mb_controller().get_timekeeper(0).get_time_now() + \
+ uhd.types.TimeSpec(cap_delay)
+ print(f"Requesting {num_samps} samples from {num_ports} radio(s)...")
+ print(f"Capture will take approx. {num_samps/rate:.1f} seconds...")
+ for rcp in radio_chan_pairs:
+ rcp[0].issue_stream_cmd(stream_cmd, rcp[1])
+ ## Wait for record buffers to fill up
+ timeout = time.monotonic() + num_samps / rate + cap_delay + 2.0
+ if HAVE_TQDM:
+ with tqdm.tqdm(total=num_bytes*num_ports,
+ unit_scale=True, unit="byte") as pbar:
+ total_bytes_recorded = 0
+ while total_bytes_recorded < num_ports * num_bytes:
+ bytes_recorded = sum([
+ replay.get_record_fullness(port)
+ for port in range(num_ports)])
+ pbar.update(bytes_recorded - total_bytes_recorded)
+ total_bytes_recorded = bytes_recorded
+ time.sleep(0.100)
+ if time.monotonic() > timeout:
+ raise RuntimeError("Timeout while loading replay buffer!")
+ else:
+ while any((replay.get_record_fullness(port) < num_bytes
+ for port in range(num_ports))):
+ time.sleep(0.200)
+ if time.monotonic() > timeout:
+ raise RuntimeError("Timeout while loading replay buffer!")
+ return num_bytes // 4
+
+def rx_data_to_host(rx_streamer, output_data, num_words, pkt_size_words):
+ """
+ Download data from a previously configured replay block via a streamer
+ object.
+ """
+ print("Downloading data to host...")
+ rx_md = uhd.types.RXMetadata()
+ num_ports = rx_streamer.get_num_channels()
+ num_bytes = num_words * 4
+ stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.num_done)
+ stream_cmd.num_samps = num_words
+ # This is not strictly necessary, but the streamer will not allow a
+ # multi-chan operation without a time spec.
+ stream_cmd.stream_now = False
+ stream_cmd.time_spec = uhd.types.TimeSpec(0.0)
+ rx_streamer.issue_stream_cmd(stream_cmd)
+ if HAVE_TQDM:
+ num_rx = 0
+ output_buf = np.zeros((num_ports, pkt_size_words), dtype=output_data.dtype)
+ with tqdm.tqdm(total=num_bytes*num_ports,
+ unit_scale=True, unit="byte") as pbar:
+ while num_rx < num_words:
+ num_rx_i = rx_streamer.recv(output_buf, rx_md, 1.0)
+ if rx_md.error_code == uhd.types.RXMetadataErrorCode.timeout:
+ print("recv() timed out. Exiting...")
+ break
+ if rx_md.error_code == uhd.types.RXMetadataErrorCode.overflow and \
+ rx_md.out_of_sequence:
+ print("Detected sequence error!")
+ elif rx_md.error_code == uhd.types.RXMetadataErrorCode.overflow:
+ print("ERROR: Overflow detected!")
+ elif rx_md.error_code != uhd.types.RXMetadataErrorCode.none:
+ print("ERROR: recv() gave unexpected error code: " + rx_md.strerror())
+ pbar.update(num_rx_i * 4 * num_ports)
+ output_data[:, num_rx:num_rx+num_rx_i] = output_buf[:, 0:num_rx_i]
+ num_rx += num_rx_i
+ else:
+ num_rx = rx_streamer.recv(output_data, rx_md, 5.0)
+ if rx_md.error_code != uhd.types.RXMetadataErrorCode.none:
+ print("Error during download: " + rx_md.strerror())
+ if num_rx != num_words:
+ print("ERROR: Fewer samples received than expected!")
+ print("Download complete.")
+ return num_rx
+
+
+def main():
+ """
+ Run capture
+ """
+ args = parse_args()
+ graph = uhd.rfnoc.RfnocGraph(args.args)
+ replay = uhd.rfnoc.ReplayBlockControl(graph.get_block(args.block))
+ radio_chan_pairs = enumerate_radios(graph, args.radio_channels)
+ rate = connect_radios(graph, replay, radio_chan_pairs,
+ args.freq, args.gain, args.antenna, args.rate)
+ print(f"Using rate: {rate/1e6:.3f} Msps")
+ # Set up streamer
+ stream_args = uhd.usrp.StreamArgs(args.cpu_format, "sc16")
+ rx_streamer = graph.create_rx_streamer(len(radio_chan_pairs), stream_args)
+ num_ports = rx_streamer.get_num_channels()
+ for chan in range(len(radio_chan_pairs)):
+ # This won't work if we can't directly attach the streamer to the
+ # replay block.
+ graph.connect(replay.get_unique_id(), chan, rx_streamer, chan)
+ graph.commit()
+ num_samps = run_capture(
+ graph, replay, radio_chan_pairs,
+ args.duration * rate if args.duration is not None else None, rate,
+ args.delay, pkt_size_bytes=args.pkt_size)
+
+ if args.numpy:
+ tmp_dir = tempfile.mkdtemp()
+ mmap_filename = os.path.join(tmp_dir, 'replay_capture.dat')
+ else:
+ mmap_filename = args.output_file
+ cap_dtype = np.complex64 if args.cpu_format == 'fc32' else np.uint32
+
+ output_data = np.memmap(
+ mmap_filename, shape=(num_ports, num_samps), mode='w+', dtype=cap_dtype)
+ output_data.flush()
+
+ rx_data_to_host(rx_streamer, output_data, num_samps, replay.get_max_packet_size(0))
+ if args.numpy:
+ print(f"Saving data as Numpy array to {args.output_file}...")
+ with open(args.output_file, 'wb') as out_file:
+ np.save(out_file, output_data)
+ output_data = None
+ shutil.rmtree(tmp_dir)
+ return True
+
+if __name__ == "__main__":
+ sys.exit(not main())