aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp10
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_impl.hpp31
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_impl.hpp15
-rw-r--r--host/lib/rfnoc/rfnoc_rx_streamer.cpp40
-rw-r--r--host/lib/rfnoc/rfnoc_tx_streamer.cpp12
5 files changed, 90 insertions, 18 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
index d39d88f43..1afe5db80 100644
--- a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
@@ -76,6 +76,15 @@ public:
bool check_topology(const std::vector<size_t>& connected_inputs,
const std::vector<size_t>& connected_outputs);
+ /*! Connects a channel to the streamer port
+ *
+ * Overrides method in rx_streamer_impl.
+ *
+ * \param channel The streamer channel to which to connect
+ * \param xport The transport for the specified channel
+ */
+ void connect_channel(const size_t channel, chdr_rx_data_xport::uptr xport);
+
private:
void _register_props(const size_t chan, const std::string& otw_format);
@@ -91,6 +100,7 @@ private:
std::vector<property_t<double>> _samp_rate_in;
std::vector<property_t<double>> _tick_rate_in;
std::vector<property_t<std::string>> _type_in;
+ std::vector<property_t<size_t>> _mtu_in;
// Streamer unique ID
const std::string _unique_id;
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
index cc989e8f2..b52358e55 100644
--- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
@@ -87,18 +87,23 @@ public:
_setup_converters(num_ports, stream_args);
_zero_copy_streamer.set_samp_rate(_samp_rate);
_zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item);
+
+ if (stream_args.args.has_key("spp")) {
+ _spp = stream_args.args.cast<size_t>("spp", _spp);
+ _mtu = _spp * _convert_info.bytes_per_otw_item;
+ }
}
//! Connect a new channel to the streamer
// FIXME: Needs some way to handle virtual channels, since xport could be shared among them
- void connect_channel(const size_t channel, typename transport_t::uptr xport)
+ virtual void connect_channel(const size_t channel, typename transport_t::uptr xport)
{
- const size_t max_pyld_size = xport->get_max_payload_size();
+ const size_t mtu = xport->get_max_payload_size();
_zero_copy_streamer.connect_channel(channel, std::move(xport));
- // Set spp based on the transport frame size
- const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item;
- _spp = std::min(_spp, xport_spp);
+ if (mtu < _mtu) {
+ set_mtu(mtu);
+ }
}
//! Implementation of rx_streamer API method
@@ -183,6 +188,19 @@ protected:
_converters[chan]->set_scalar(scale_factor);
}
+ //! Returns the maximum payload size
+ size_t get_mtu() const
+ {
+ return _mtu;
+ }
+
+ //! Sets the MTU and calculates spp
+ void set_mtu(const size_t mtu)
+ {
+ _mtu = mtu;
+ _spp = _mtu / _convert_info.bytes_per_otw_item;
+ }
+
//! Configures sample rate for conversion of timestamp
void set_samp_rate(const double rate)
{
@@ -336,6 +354,9 @@ private:
// Sample rate used to calculate metadata time_spec_t
double _samp_rate = 1.0;
+ // MTU, determined when xport is connected and modifiable by subclass
+ size_t _mtu = std::numeric_limits<std::size_t>::max();
+
// Maximum number of samples per packet
size_t _spp = std::numeric_limits<std::size_t>::max();
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
index 35a724fa9..fa84026fe 100644
--- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
@@ -76,7 +76,11 @@ public:
{
_setup_converters(num_chans, stream_args);
_zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item);
- _spp = stream_args.args.cast<size_t>("spp", _spp);
+
+ if (stream_args.args.has_key("spp")) {
+ _spp = stream_args.args.cast<size_t>("spp", _spp);
+ _mtu = _spp * _convert_info.bytes_per_otw_item;
+ }
}
virtual void connect_channel(const size_t channel, typename transport_t::uptr xport)
@@ -184,7 +188,7 @@ protected:
return _zero_copy_streamer.get_tick_rate();
}
- //! Returns the size in bytes of a sample in a packet
+ //! Returns the maximum payload size
size_t get_mtu() const
{
return _mtu;
@@ -194,10 +198,7 @@ protected:
void set_mtu(const size_t mtu)
{
_mtu = mtu;
-
- // Check if spp needs to be lowered. SPP may already be lower than the
- // value allowed by mtu if the user specified it using stream_args.
- _spp = std::min(_spp, _mtu / _convert_info.bytes_per_otw_item);
+ _spp = _mtu / _convert_info.bytes_per_otw_item;
}
//! Configures scaling factor for conversion
@@ -308,7 +309,7 @@ private:
// Sample rate used to calculate metadata time_spec_t
double _samp_rate = 1.0;
- // Maximum payload size
+ // MTU, determined when xport is connected and modifiable by subclass
size_t _mtu = std::numeric_limits<std::size_t>::max();
// Maximum number of samples per packet
diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
index b50e2fe15..9383e3487 100644
--- a/host/lib/rfnoc/rfnoc_rx_streamer.cpp
+++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
@@ -56,10 +56,34 @@ rfnoc_rx_streamer::rfnoc_rx_streamer(
_samp_rate_in.reserve(num_chans);
_tick_rate_in.reserve(num_chans);
_type_in.reserve(num_chans);
+ _mtu_in.reserve(num_chans);
for (size_t i = 0; i < num_chans; i++) {
_register_props(i, stream_args.otw_format);
}
+
+ for (size_t i = 0; i < num_chans; i++) {
+ prop_ptrs_t mtu_resolver_out;
+ for (auto& mtu_prop : _mtu_in) {
+ mtu_resolver_out.insert(&mtu_prop);
+ }
+
+ add_property_resolver({&_mtu_in[i]}, std::move(mtu_resolver_out),
+ [&mtu_in = _mtu_in[i], i, this]() {
+ RFNOC_LOG_TRACE("Calling resolver for `mtu_in'@" << i);
+ if (mtu_in.is_valid()) {
+ const size_t mtu = std::min(mtu_in.get(), rx_streamer_impl::get_mtu());
+ // Set the same MTU value for all chans
+ for (auto& prop : this->_mtu_in) {
+ prop.set(mtu);
+ }
+ if (mtu < rx_streamer_impl::get_mtu()) {
+ rx_streamer_impl::set_mtu(mtu);
+ }
+ }
+ });
+ }
+
node_accessor_t node_accessor{};
node_accessor.init_props(this);
}
@@ -124,6 +148,18 @@ void rfnoc_rx_streamer::_handle_overrun()
}
}
+void rfnoc_rx_streamer::connect_channel(
+ const size_t channel, chdr_rx_data_xport::uptr xport)
+{
+ UHD_ASSERT_THROW(channel < _mtu_in.size());
+
+ // Update MTU property based on xport limits
+ const size_t mtu = xport->get_max_payload_size();
+ set_property<size_t>(PROP_KEY_MTU, mtu, {res_source_info::INPUT_EDGE, channel});
+
+ rx_streamer_impl<chdr_rx_data_xport>::connect_channel(channel, std::move(xport));
+}
+
void rfnoc_rx_streamer::_register_props(const size_t chan,
const std::string& otw_format)
{
@@ -136,18 +172,22 @@ void rfnoc_rx_streamer::_register_props(const size_t chan,
PROP_KEY_TICK_RATE, {res_source_info::INPUT_EDGE, chan}));
_type_in.emplace_back(property_t<std::string>(
PROP_KEY_TYPE, otw_format, {res_source_info::INPUT_EDGE, chan}));
+ _mtu_in.emplace_back(property_t<size_t>(
+ PROP_KEY_MTU, get_mtu(), {res_source_info::INPUT_EDGE, chan}));
// Give us some shorthands for the rest of this function
property_t<double>* scaling_in = &_scaling_in.back();
property_t<double>* samp_rate_in = &_samp_rate_in.back();
property_t<double>* tick_rate_in = &_tick_rate_in.back();
property_t<std::string>* type_in = &_type_in.back();
+ property_t<size_t>* mtu_in = &_mtu_in.back();
// Register them
register_property(scaling_in);
register_property(samp_rate_in);
register_property(tick_rate_in);
register_property(type_in);
+ register_property(mtu_in);
// Add resolvers
add_property_resolver({scaling_in}, {},
diff --git a/host/lib/rfnoc/rfnoc_tx_streamer.cpp b/host/lib/rfnoc/rfnoc_tx_streamer.cpp
index 4fc1a3ff8..d4aa267af 100644
--- a/host/lib/rfnoc/rfnoc_tx_streamer.cpp
+++ b/host/lib/rfnoc/rfnoc_tx_streamer.cpp
@@ -60,12 +60,12 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,
[&mtu_out = _mtu_out[i], i, this]() {
RFNOC_LOG_TRACE("Calling resolver for `mtu_out'@" << i);
if (mtu_out.is_valid()) {
- const size_t mtu = mtu_out.get();
- // If the current MTU changes, set the same value for all chans
+ const size_t mtu = std::min(mtu_out.get(), tx_streamer_impl::get_mtu());
+ // Set the same MTU value for all chans
+ for (auto& prop : this->_mtu_out) {
+ prop.set(mtu);
+ }
if (mtu < tx_streamer_impl::get_mtu()) {
- for (auto& prop : this->_mtu_out) {
- prop.set(mtu);
- }
tx_streamer_impl::set_mtu(mtu);
}
}
@@ -155,7 +155,7 @@ void rfnoc_tx_streamer::_register_props(const size_t chan,
_type_out.emplace_back(property_t<std::string>(
PROP_KEY_TYPE, otw_format, {res_source_info::OUTPUT_EDGE, chan}));
_mtu_out.push_back(property_t<size_t>(
- PROP_KEY_MTU, {res_source_info::OUTPUT_EDGE, chan}));
+ PROP_KEY_MTU, get_mtu(), {res_source_info::OUTPUT_EDGE, chan}));
// Give us some shorthands for the rest of this function
property_t<double>* scaling_out = &_scaling_out.back();