aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/include
diff options
context:
space:
mode:
authorCiro Nishiguchi <ciro.nishiguchi@ni.com>2019-08-08 10:25:20 -0500
committerMartin Braun <martin.braun@ettus.com>2019-11-26 11:49:36 -0800
commitbffef674fbbcd892967017e81515bb76e0b850b5 (patch)
tree8f56eb8548d0fb56094b555ae11d16eb61e6c381 /host/lib/include
parent91e01c484475600fcd659bb433ab86efa5146426 (diff)
downloaduhd-bffef674fbbcd892967017e81515bb76e0b850b5.tar.gz
uhd-bffef674fbbcd892967017e81515bb76e0b850b5.tar.bz2
uhd-bffef674fbbcd892967017e81515bb76e0b850b5.zip
rfnoc: tx_streamer: add support for async messages
Add an async message queue that aggregates errors from multiple sources. Errors can come from the strs packets originating from the stream endpoint or from the radio block through control packets to the host.
Diffstat (limited to 'host/lib/include')
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp41
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp17
-rw-r--r--host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp49
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_impl.hpp11
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp6
5 files changed, 114 insertions, 10 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
index 0fb0ab5d1..63c2b24cb 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
@@ -7,6 +7,7 @@
#ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP
#define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP
+#include <uhd/types/metadata.hpp>
#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/chdr_types.hpp>
#include <uhdlib/rfnoc/mgmt_portal.hpp>
@@ -103,8 +104,9 @@ private:
class chdr_tx_data_xport
{
public:
- using uptr = std::unique_ptr<chdr_tx_data_xport>;
- using buff_t = transport::frame_buff;
+ using uptr = std::unique_ptr<chdr_tx_data_xport>;
+ using buff_t = transport::frame_buff;
+ using enqueue_async_msg_fn_t = std::function<void(async_metadata_t::event_code_t, bool, uint64_t)>;
//! Information about data packet
struct packet_info_t
@@ -215,6 +217,16 @@ public:
}
/*!
+ * Configure a function to call to enqueue async msgs
+ *
+ * \param fn Function to enqueue async messages
+ */
+ void set_enqueue_async_msg_fn(enqueue_async_msg_fn_t fn)
+ {
+ _enqueue_async_msg = fn;
+ }
+
+ /*!
* Sends a TX data packet
*
* \param buff the frame buffer containing the packet to send
@@ -286,7 +298,27 @@ private:
_fc_state.update_dest_recv_count(
{strs.xfer_count_bytes, static_cast<uint32_t>(strs.xfer_count_pkts)});
- // TODO: check strs status here and push into async msg queue
+ if (strs.status != chdr::STRS_OKAY) {
+ switch (strs.status) {
+ case chdr::STRS_SEQERR:
+ UHD_LOG_FASTPATH("S");
+ if (_enqueue_async_msg) {
+ _enqueue_async_msg(async_metadata_t::EVENT_CODE_SEQ_ERROR, false, 0);
+ }
+ break;
+ case chdr::STRS_DATAERR:
+ UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received data error in tx stream!");
+ break;
+ case chdr::STRS_RTERR:
+ UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received routing error in tx stream!");
+ break;
+ case chdr::STRS_CMDERR:
+ UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received command error in tx stream!");
+ break;
+ default:
+ break;
+ }
+ }
// Packet belongs to this transport, release buff and return true
recv_link->release_recv_buff(std::move(buff));
@@ -522,6 +554,9 @@ private:
// Handles sending of strc flow control ack packets
detail::tx_flow_ctrl_sender _fc_sender;
+ // Function to enqueue an async msg
+ enqueue_async_msg_fn_t _enqueue_async_msg;
+
// Local / Source EPID
sep_id_t _epid;
};
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp
index 3bfc9d05a..3e006f7f9 100644
--- a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp
@@ -9,6 +9,7 @@
#include <uhd/rfnoc/node.hpp>
#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
+#include <uhdlib/rfnoc/tx_async_msg_queue.hpp>
#include <uhdlib/transport/tx_streamer_impl.hpp>
#include <string>
@@ -78,9 +79,25 @@ public:
*/
void connect_channel(const size_t channel, chdr_tx_data_xport::uptr xport);
+ /*! Receive an asynchronous message from this tx stream
+ *
+ * Implementation of tx_streamer API method.
+ *
+ * \param async_metadata the metadata to be filled in
+ * \param timeout the timeout in seconds to wait for a message
+ * \return true when the async_metadata is valid, false for timeout
+ */
+ bool recv_async_msg(uhd::async_metadata_t& async_metadata, double timeout);
+
private:
void _register_props(const size_t chan, const std::string& otw_format);
+ void _handle_tx_event_action(
+ const res_source_info& src, tx_event_action_info::sptr tx_event_action);
+
+ // Queue for async messages
+ tx_async_msg_queue::sptr _async_msg_queue;
+
// Properties
std::vector<property_t<double>> _scaling_out;
std::vector<property_t<double>> _samp_rate_out;
diff --git a/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp b/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp
new file mode 100644
index 000000000..181a31754
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp
@@ -0,0 +1,49 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP
+#define INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP
+
+#include <uhd/types/metadata.hpp>
+#include <boost/lockfree/queue.hpp>
+
+namespace uhd { namespace rfnoc {
+
+/*!
+ * Implements queue of async messages originating from the tx data transport
+ * and from the rfnoc graph.
+ */
+class tx_async_msg_queue
+{
+public:
+ using sptr = std::shared_ptr<tx_async_msg_queue>;
+
+ //! Constructor
+ tx_async_msg_queue(size_t capacity);
+
+ /*!
+ * Retrieve async message from queue
+ *
+ * \param async_metadata the metadata to be filled in
+ * \param timeout_ms the timeout in milliseconds to wait for a message
+ * \return true when the async_metadata is valid, false for timeout
+ */
+ bool recv_async_msg(async_metadata_t& async_metadata, int32_t timeout_ms);
+
+ /*!
+ * Push an async message onto the queue
+ *
+ * \param async_metadata the metadata to be pushed
+ */
+ void enqueue(const async_metadata_t& async_metadata);
+
+private:
+ boost::lockfree::queue<async_metadata_t> _queue;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP */
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
index 819ed5558..35a724fa9 100644
--- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
@@ -99,7 +99,6 @@ public:
return _spp;
}
-
/*! Get width of each over-the-wire item component. For complex items,
* returns the width of one component only (real or imaginary).
*/
@@ -178,15 +177,13 @@ public:
}
}
- //! Implementation of rx_streamer API method
- bool recv_async_msg(
- uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/)
+protected:
+ //! Returns the tick rate for conversion of timestamp
+ double get_tick_rate() const
{
- // TODO: implement me
- return false;
+ return _zero_copy_streamer.get_tick_rate();
}
-protected:
//! Returns the size in bytes of a sample in a packet
size_t get_mtu() const
{
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
index 1b6f55238..5ac7a1e8c 100644
--- a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
+++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
@@ -51,6 +51,12 @@ public:
return _xports.size();
}
+ //! Returns the tick rate for conversion of timestamp
+ double get_tick_rate() const
+ {
+ return _tick_rate;
+ }
+
//! Configures tick rate for conversion of timestamp
void set_tick_rate(const double rate)
{