aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib')
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp41
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp17
-rw-r--r--host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp49
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_impl.hpp11
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp6
-rw-r--r--host/lib/rfnoc/CMakeLists.txt1
-rw-r--r--host/lib/rfnoc/actions.cpp20
-rw-r--r--host/lib/rfnoc/radio_control_impl.cpp16
-rw-r--r--host/lib/rfnoc/rfnoc_tx_streamer.cpp69
-rw-r--r--host/lib/rfnoc/tx_async_msg_queue.cpp52
10 files changed, 262 insertions, 20 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
index 0fb0ab5d1..63c2b24cb 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
@@ -7,6 +7,7 @@
#ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP
#define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP
+#include <uhd/types/metadata.hpp>
#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/chdr_types.hpp>
#include <uhdlib/rfnoc/mgmt_portal.hpp>
@@ -103,8 +104,9 @@ private:
class chdr_tx_data_xport
{
public:
- using uptr = std::unique_ptr<chdr_tx_data_xport>;
- using buff_t = transport::frame_buff;
+ using uptr = std::unique_ptr<chdr_tx_data_xport>;
+ using buff_t = transport::frame_buff;
+ using enqueue_async_msg_fn_t = std::function<void(async_metadata_t::event_code_t, bool, uint64_t)>;
//! Information about data packet
struct packet_info_t
@@ -215,6 +217,16 @@ public:
}
/*!
+ * Configure a function to call to enqueue async msgs
+ *
+ * \param fn Function to enqueue async messages
+ */
+ void set_enqueue_async_msg_fn(enqueue_async_msg_fn_t fn)
+ {
+ _enqueue_async_msg = fn;
+ }
+
+ /*!
* Sends a TX data packet
*
* \param buff the frame buffer containing the packet to send
@@ -286,7 +298,27 @@ private:
_fc_state.update_dest_recv_count(
{strs.xfer_count_bytes, static_cast<uint32_t>(strs.xfer_count_pkts)});
- // TODO: check strs status here and push into async msg queue
+ if (strs.status != chdr::STRS_OKAY) {
+ switch (strs.status) {
+ case chdr::STRS_SEQERR:
+ UHD_LOG_FASTPATH("S");
+ if (_enqueue_async_msg) {
+ _enqueue_async_msg(async_metadata_t::EVENT_CODE_SEQ_ERROR, false, 0);
+ }
+ break;
+ case chdr::STRS_DATAERR:
+ UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received data error in tx stream!");
+ break;
+ case chdr::STRS_RTERR:
+ UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received routing error in tx stream!");
+ break;
+ case chdr::STRS_CMDERR:
+ UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received command error in tx stream!");
+ break;
+ default:
+ break;
+ }
+ }
// Packet belongs to this transport, release buff and return true
recv_link->release_recv_buff(std::move(buff));
@@ -522,6 +554,9 @@ private:
// Handles sending of strc flow control ack packets
detail::tx_flow_ctrl_sender _fc_sender;
+ // Function to enqueue an async msg
+ enqueue_async_msg_fn_t _enqueue_async_msg;
+
// Local / Source EPID
sep_id_t _epid;
};
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp
index 3bfc9d05a..3e006f7f9 100644
--- a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp
@@ -9,6 +9,7 @@
#include <uhd/rfnoc/node.hpp>
#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
+#include <uhdlib/rfnoc/tx_async_msg_queue.hpp>
#include <uhdlib/transport/tx_streamer_impl.hpp>
#include <string>
@@ -78,9 +79,25 @@ public:
*/
void connect_channel(const size_t channel, chdr_tx_data_xport::uptr xport);
+ /*! Receive an asynchronous message from this tx stream
+ *
+ * Implementation of tx_streamer API method.
+ *
+ * \param async_metadata the metadata to be filled in
+ * \param timeout the timeout in seconds to wait for a message
+ * \return true when the async_metadata is valid, false for timeout
+ */
+ bool recv_async_msg(uhd::async_metadata_t& async_metadata, double timeout);
+
private:
void _register_props(const size_t chan, const std::string& otw_format);
+ void _handle_tx_event_action(
+ const res_source_info& src, tx_event_action_info::sptr tx_event_action);
+
+ // Queue for async messages
+ tx_async_msg_queue::sptr _async_msg_queue;
+
// Properties
std::vector<property_t<double>> _scaling_out;
std::vector<property_t<double>> _samp_rate_out;
diff --git a/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp b/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp
new file mode 100644
index 000000000..181a31754
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp
@@ -0,0 +1,49 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP
+#define INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP
+
+#include <uhd/types/metadata.hpp>
+#include <boost/lockfree/queue.hpp>
+
+namespace uhd { namespace rfnoc {
+
+/*!
+ * Implements queue of async messages originating from the tx data transport
+ * and from the rfnoc graph.
+ */
+class tx_async_msg_queue
+{
+public:
+ using sptr = std::shared_ptr<tx_async_msg_queue>;
+
+ //! Constructor
+ tx_async_msg_queue(size_t capacity);
+
+ /*!
+ * Retrieve async message from queue
+ *
+ * \param async_metadata the metadata to be filled in
+ * \param timeout_ms the timeout in milliseconds to wait for a message
+ * \return true when the async_metadata is valid, false for timeout
+ */
+ bool recv_async_msg(async_metadata_t& async_metadata, int32_t timeout_ms);
+
+ /*!
+ * Push an async message onto the queue
+ *
+ * \param async_metadata the metadata to be pushed
+ */
+ void enqueue(const async_metadata_t& async_metadata);
+
+private:
+ boost::lockfree::queue<async_metadata_t> _queue;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP */
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
index 819ed5558..35a724fa9 100644
--- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
@@ -99,7 +99,6 @@ public:
return _spp;
}
-
/*! Get width of each over-the-wire item component. For complex items,
* returns the width of one component only (real or imaginary).
*/
@@ -178,15 +177,13 @@ public:
}
}
- //! Implementation of rx_streamer API method
- bool recv_async_msg(
- uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/)
+protected:
+ //! Returns the tick rate for conversion of timestamp
+ double get_tick_rate() const
{
- // TODO: implement me
- return false;
+ return _zero_copy_streamer.get_tick_rate();
}
-protected:
//! Returns the size in bytes of a sample in a packet
size_t get_mtu() const
{
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
index 1b6f55238..5ac7a1e8c 100644
--- a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
+++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
@@ -51,6 +51,12 @@ public:
return _xports.size();
}
+ //! Returns the tick rate for conversion of timestamp
+ double get_tick_rate() const
+ {
+ return _tick_rate;
+ }
+
//! Configures tick rate for conversion of timestamp
void set_tick_rate(const double rate)
{
diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt
index a88507dcd..73de394e3 100644
--- a/host/lib/rfnoc/CMakeLists.txt
+++ b/host/lib/rfnoc/CMakeLists.txt
@@ -55,6 +55,7 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/wb_iface_adapter.cpp
${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_rx_streamer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_tx_streamer.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/tx_async_msg_queue.cpp
# Default block control classes:
${CMAKE_CURRENT_SOURCE_DIR}/block_control.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ddc_block_control.cpp
diff --git a/host/lib/rfnoc/actions.cpp b/host/lib/rfnoc/actions.cpp
index d4ed4559f..4de05b304 100644
--- a/host/lib/rfnoc/actions.cpp
+++ b/host/lib/rfnoc/actions.cpp
@@ -63,3 +63,23 @@ rx_event_action_info::sptr rx_event_action_info::make()
};
return std::make_shared<rx_event_action_info_make_shared>();
}
+
+/*** TX Metadata Action Info *************************************************/
+tx_event_action_info::tx_event_action_info(
+ uhd::async_metadata_t::event_code_t event_code_)
+ : action_info(ACTION_KEY_TX_EVENT), event_code(event_code_)
+{
+}
+
+tx_event_action_info::sptr tx_event_action_info::make(
+ uhd::async_metadata_t::event_code_t event_code)
+{
+ struct tx_event_action_info_make_shared : public tx_event_action_info
+ {
+ tx_event_action_info_make_shared(uhd::async_metadata_t::event_code_t event_code)
+ : tx_event_action_info(event_code)
+ {
+ }
+ };
+ return std::make_shared<tx_event_action_info_make_shared>(event_code);
+}
diff --git a/host/lib/rfnoc/radio_control_impl.cpp b/host/lib/rfnoc/radio_control_impl.cpp
index 2094f4096..9d7257108 100644
--- a/host/lib/rfnoc/radio_control_impl.cpp
+++ b/host/lib/rfnoc/radio_control_impl.cpp
@@ -928,12 +928,24 @@ void radio_control_impl::async_message_handler(
return;
}
switch (code) {
- case err_codes::ERR_TX_UNDERRUN:
+ case err_codes::ERR_TX_UNDERRUN: {
+ auto tx_event_action = tx_event_action_info::make(
+ uhd::async_metadata_t::EVENT_CODE_UNDERFLOW);
+ post_action(res_source_info{res_source_info::INPUT_EDGE, chan},
+ tx_event_action);
UHD_LOG_FASTPATH("U");
+ RFNOC_LOG_TRACE("Posting underrun event action message.");
break;
- case err_codes::ERR_TX_LATE_DATA:
+ }
+ case err_codes::ERR_TX_LATE_DATA: {
+ auto tx_event_action = tx_event_action_info::make(
+ uhd::async_metadata_t::EVENT_CODE_TIME_ERROR);
+ post_action(res_source_info{res_source_info::INPUT_EDGE, chan},
+ tx_event_action);
UHD_LOG_FASTPATH("L");
+ RFNOC_LOG_TRACE("Posting late data event action message.");
break;
+ }
}
break;
}
diff --git a/host/lib/rfnoc/rfnoc_tx_streamer.cpp b/host/lib/rfnoc/rfnoc_tx_streamer.cpp
index 61d714a85..4fc1a3ff8 100644
--- a/host/lib/rfnoc/rfnoc_tx_streamer.cpp
+++ b/host/lib/rfnoc/rfnoc_tx_streamer.cpp
@@ -14,6 +14,7 @@ using namespace uhd::rfnoc;
const std::string STREAMER_ID = "TxStreamer";
static std::atomic<uint64_t> streamer_inst_ctr;
+static constexpr size_t ASYNC_MSG_QUEUE_SIZE = 1000;
rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,
const uhd::stream_args_t stream_args)
@@ -21,10 +22,23 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,
, _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++))
, _stream_args(stream_args)
{
+ _async_msg_queue = std::make_shared<tx_async_msg_queue>(ASYNC_MSG_QUEUE_SIZE);
+
// No block to which to forward properties or actions
set_prop_forwarding_policy(forwarding_policy_t::DROP);
set_action_forwarding_policy(forwarding_policy_t::DROP);
+ register_action_handler(ACTION_KEY_TX_EVENT,
+ [this](const res_source_info& src, action_info::sptr action) {
+ tx_event_action_info::sptr tx_event_action =
+ std::dynamic_pointer_cast<tx_event_action_info>(action);
+ if (!tx_event_action) {
+ RFNOC_LOG_WARNING("Received invalid TX event action!");
+ return;
+ }
+ _handle_tx_event_action(src, tx_event_action);
+ });
+
// Initialize properties
_scaling_out.reserve(num_chans);
_samp_rate_out.reserve(num_chans);
@@ -41,7 +55,6 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,
for (auto& mtu_prop : _mtu_out) {
mtu_resolver_out.insert(&mtu_prop);
}
- //property_t<size_t>* mtu_out = &_mtu_out.back();
add_property_resolver({&_mtu_out[i]}, std::move(mtu_resolver_out),
[&mtu_out = _mtu_out[i], i, this]() {
@@ -105,10 +118,32 @@ void rfnoc_tx_streamer::connect_channel(
const size_t mtu = xport->get_max_payload_size();
set_property<size_t>(PROP_KEY_MTU, mtu, {res_source_info::OUTPUT_EDGE, channel});
+ xport->set_enqueue_async_msg_fn(
+ [this, channel](async_metadata_t::event_code_t event_code, bool has_tsf, uint64_t tsf) {
+ async_metadata_t md;
+ md.channel = channel;
+ md.event_code = event_code;
+ md.has_time_spec = has_tsf;
+
+ if (has_tsf) {
+ md.time_spec = time_spec_t::from_ticks(tsf, get_tick_rate());
+ }
+
+ this->_async_msg_queue->enqueue(md);
+ });
+
tx_streamer_impl<chdr_tx_data_xport>::connect_channel(channel, std::move(xport));
}
-void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& otw_format)
+bool rfnoc_tx_streamer::recv_async_msg(uhd::async_metadata_t& async_metadata,
+ double timeout)
+{
+ const auto timeout_ms = static_cast<uint64_t>(timeout * 1000);
+ return _async_msg_queue->recv_async_msg(async_metadata, timeout_ms);
+}
+
+void rfnoc_tx_streamer::_register_props(const size_t chan,
+ const std::string& otw_format)
{
// Create actual properties and store them
_scaling_out.push_back(property_t<double>(
@@ -137,27 +172,45 @@ void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& ot
register_property(mtu_out);
// Add resolvers
- add_property_resolver({scaling_out}, {},
- [&scaling_out = *scaling_out, chan, this]() {
+ add_property_resolver(
+ {scaling_out}, {}, [& scaling_out = *scaling_out, chan, this]() {
RFNOC_LOG_TRACE("Calling resolver for `scaling_out'@" << chan);
if (scaling_out.is_valid()) {
this->set_scale_factor(chan, 32767.0 / scaling_out.get());
}
});
- add_property_resolver({samp_rate_out}, {},
- [&samp_rate_out = *samp_rate_out, chan, this]() {
+ add_property_resolver(
+ {samp_rate_out}, {}, [& samp_rate_out = *samp_rate_out, chan, this]() {
RFNOC_LOG_TRACE("Calling resolver for `samp_rate_out'@" << chan);
if (samp_rate_out.is_valid()) {
this->set_samp_rate(samp_rate_out.get());
}
});
- add_property_resolver({tick_rate_out}, {},
- [&tick_rate_out = *tick_rate_out, chan, this]() {
+ add_property_resolver(
+ {tick_rate_out}, {}, [& tick_rate_out = *tick_rate_out, chan, this]() {
RFNOC_LOG_TRACE("Calling resolver for `tick_rate_out'@" << chan);
if (tick_rate_out.is_valid()) {
this->set_tick_rate(tick_rate_out.get());
}
});
}
+
+void rfnoc_tx_streamer::_handle_tx_event_action(
+ const res_source_info& src, tx_event_action_info::sptr tx_event_action)
+{
+ UHD_ASSERT_THROW(src.type == res_source_info::OUTPUT_EDGE);
+
+ uhd::async_metadata_t md;
+ md.event_code = tx_event_action->event_code;
+ md.channel = src.instance;
+ md.has_time_spec = tx_event_action->has_tsf;
+
+ if (md.has_time_spec) {
+ md.time_spec = time_spec_t::from_ticks(tx_event_action->tsf, get_tick_rate());
+ }
+
+ RFNOC_LOG_TRACE("Pushing metadata onto tx async msg queue, channel " << md.channel);
+ _async_msg_queue->enqueue(md);
+}
diff --git a/host/lib/rfnoc/tx_async_msg_queue.cpp b/host/lib/rfnoc/tx_async_msg_queue.cpp
new file mode 100644
index 000000000..71a05074f
--- /dev/null
+++ b/host/lib/rfnoc/tx_async_msg_queue.cpp
@@ -0,0 +1,52 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhdlib/rfnoc/tx_async_msg_queue.hpp>
+#include <chrono>
+#include <thread>
+
+using namespace uhd;
+using namespace uhd::rfnoc;
+
+tx_async_msg_queue::tx_async_msg_queue(size_t capacity)
+ : _queue(capacity)
+{
+}
+
+bool tx_async_msg_queue::recv_async_msg(uhd::async_metadata_t& async_metadata,
+ int32_t timeout_ms)
+{
+ using namespace std::chrono;
+
+ if (timeout_ms == 0.0) {
+ return _queue.pop(async_metadata);
+ }
+
+ const auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+
+ bool last_check = false;
+
+ while (true) {
+ if (_queue.pop(async_metadata)) {
+ return true;
+ }
+
+ if (steady_clock::now() > end_time) {
+ if (last_check) {
+ return false;
+ } else {
+ last_check = true;
+ }
+ }
+
+ std::this_thread::sleep_for(std::chrono::microseconds(100));
+ }
+}
+
+void tx_async_msg_queue::enqueue(const async_metadata_t& async_metadata)
+{
+ _queue.push(async_metadata);
+}