aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/include/uhdlib
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2021-12-17 11:20:10 +0100
committerAaron Rossetto <aaron.rossetto@ni.com>2022-01-10 14:55:25 -0600
commit4725e97c6d6baa82d414ea89623ca32732d9bea1 (patch)
tree75a426d8f4cb4f91ba5fc84977e9e013394947d1 /host/lib/include/uhdlib
parentd33deb3cae1f231000745dd077d339dcb004e97b (diff)
downloaduhd-4725e97c6d6baa82d414ea89623ca32732d9bea1.tar.gz
uhd-4725e97c6d6baa82d414ea89623ca32732d9bea1.tar.bz2
uhd-4725e97c6d6baa82d414ea89623ca32732d9bea1.zip
rfnoc: transport: Check if streamers are connected in send() and recv()
This adds a check in send() and recv() whether or not the streamer is actually connected. If not, an exception is thrown with the message: [rx_stream] Attempting to call recv() before all channels are connected! or [tx_stream] Attempting to call send() before all channels are connected! The check is a single boolean flag check, but it does add a branch in our hot code. Since this event is unlikely, and only happens in badly configured apps, we will get some help from the CPUs branch prediction to reduce the additional cost of this check.
Diffstat (limited to 'host/lib/include/uhdlib')
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_impl.hpp22
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_impl.hpp20
2 files changed, 40 insertions, 2 deletions
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
index f776d9373..1b25b308b 100644
--- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
@@ -13,6 +13,7 @@
#include <uhd/types/endianness.hpp>
#include <uhd/utils/log.hpp>
#include <uhdlib/transport/rx_streamer_zero_copy.hpp>
+#include <algorithm>
#include <limits>
#include <vector>
@@ -74,7 +75,9 @@ class rx_streamer_impl : public rx_streamer
public:
//! Constructor
rx_streamer_impl(const size_t num_ports, const uhd::stream_args_t stream_args)
- : _zero_copy_streamer(num_ports), _in_buffs(num_ports)
+ : _zero_copy_streamer(num_ports)
+ , _in_buffs(num_ports)
+ , _chans_connected(num_ports, false)
{
if (stream_args.cpu_format.empty()) {
throw uhd::value_error("[rx_stream] Must provide a cpu_format!");
@@ -99,6 +102,11 @@ public:
const size_t mtu = xport->get_mtu();
_hdr_len = std::max(_hdr_len, xport->get_chdr_hdr_len());
_zero_copy_streamer.connect_channel(channel, std::move(xport));
+ // Note: The previous call also checks if the channel index was valid.
+ _chans_connected[channel] = true;
+ _all_chans_connected = std::all_of(_chans_connected.cbegin(),
+ _chans_connected.cend(),
+ [](const bool connected) { return connected; });
if (mtu < _mtu) {
set_mtu(mtu);
@@ -132,6 +140,11 @@ public:
const double timeout,
const bool one_packet) override
{
+ if (!_all_chans_connected) {
+ throw uhd::runtime_error("[rx_stream] Attempting to call recv() before all "
+ "channels are connected!");
+ }
+
if (_error_metadata_cache.check(metadata)) {
return 0;
}
@@ -422,6 +435,13 @@ private:
// Fragment (partially read packet) information
size_t _fragment_offset_in_samps = 0;
rx_metadata_t _last_fragment_metadata;
+
+ // Store a list of channels that are already connected
+ std::vector<bool> _chans_connected;
+
+ // Flag to store if all channels are connected. This is to speed up the lookup
+ // of all channels' connected-status.
+ bool _all_chans_connected = false;
};
}} // namespace uhd::transport
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
index 9dc3b0c35..6b34c1c10 100644
--- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
@@ -10,9 +10,10 @@
#include <uhd/convert.hpp>
#include <uhd/stream.hpp>
#include <uhd/types/metadata.hpp>
-#include <uhd/utils/tasks.hpp>
#include <uhd/utils/log.hpp>
+#include <uhd/utils/tasks.hpp>
#include <uhdlib/transport/tx_streamer_zero_copy.hpp>
+#include <algorithm>
#include <limits>
#include <vector>
@@ -106,6 +107,7 @@ public:
: _zero_copy_streamer(num_chans)
, _zero_buffs(num_chans, &_zero)
, _out_buffs(num_chans)
+ , _chans_connected(num_chans, false)
{
_setup_converters(num_chans, stream_args);
_zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item);
@@ -120,6 +122,11 @@ public:
const size_t mtu = xport->get_mtu();
_hdr_len = std::max(_hdr_len, xport->get_chdr_hdr_len());
_zero_copy_streamer.connect_channel(channel, std::move(xport));
+ // Note: The previous call also checks if the channel index was valid.
+ _chans_connected[channel] = true;
+ _all_chans_connected = std::all_of(_chans_connected.cbegin(),
+ _chans_connected.cend(),
+ [](const bool connected) { return connected; });
if (mtu < _mtu) {
set_mtu(mtu);
@@ -149,6 +156,10 @@ public:
const uhd::tx_metadata_t& metadata_,
const double timeout) override
{
+ if (!_all_chans_connected) {
+ throw uhd::runtime_error("[tx_stream] Attempting to call send() before all "
+ "channels are connected!");
+ }
uhd::tx_metadata_t metadata(metadata_);
if (nsamps_per_buff == 0 && metadata.start_of_burst) {
@@ -459,6 +470,13 @@ private:
// Metadata cache for send calls with no data
detail::tx_metadata_cache _metadata_cache;
+
+ // Store a list of channels that are already connected
+ std::vector<bool> _chans_connected;
+
+ // Flag to store if all channels are connected. This is to speed up the lookup
+ // of all channels' connected-status.
+ bool _all_chans_connected = false;
};
}} // namespace uhd::transport