aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/CMakeLists.txt1
-rw-r--r--host/lib/transport/chdr.cpp7
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp143
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp41
-rw-r--r--host/lib/transport/zero_copy_flow_ctrl.cpp5
5 files changed, 92 insertions, 105 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index ae903d956..15771697a 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -112,6 +112,7 @@ LIBUHD_PYTHON_GEN_SOURCE(
)
LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp
${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_recv_offload.cpp
${CMAKE_CURRENT_SOURCE_DIR}/tcp_zero_copy.cpp
${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp
diff --git a/host/lib/transport/chdr.cpp b/host/lib/transport/chdr.cpp
index 48263f57e..36f380d62 100644
--- a/host/lib/transport/chdr.cpp
+++ b/host/lib/transport/chdr.cpp
@@ -23,6 +23,7 @@ using namespace uhd::transport::vrt;
static const uint32_t HDR_FLAG_TSF = (1 << 29);
static const uint32_t HDR_FLAG_EOB = (1 << 28);
static const uint32_t HDR_FLAG_ERROR = (1 << 28);
+static const uint32_t HDR_FLAG_FCACK = (1 << 28);
/***************************************************************************/
/* Packing */
@@ -45,8 +46,8 @@ UHD_INLINE uint32_t _hdr_pack_chdr(
| (if_packet_info.packet_type << 30)
// 1 Bit: Has time
| (if_packet_info.has_tsf ? HDR_FLAG_TSF : 0)
- // 1 Bit: EOB or Error
- | ((if_packet_info.eob or if_packet_info.error) ? HDR_FLAG_EOB : 0)
+ // 1 Bit: EOB or Error or FC ACK
+ | ((if_packet_info.eob or if_packet_info.error or if_packet_info.fc_ack) ? HDR_FLAG_EOB : 0)
// 12 Bits: Sequence number
| ((if_packet_info.packet_count & 0xFFF) << 16)
// 16 Bits: Total packet length
@@ -111,6 +112,8 @@ UHD_INLINE void _hdr_unpack_chdr(
&& ((chdr & HDR_FLAG_EOB) > 0);
if_packet_info.error = (if_packet_info.packet_type == if_packet_info_t::PACKET_TYPE_RESP)
&& ((chdr & HDR_FLAG_ERROR) > 0);
+ if_packet_info.fc_ack = (if_packet_info.packet_type == if_packet_info_t::PACKET_TYPE_FC)
+ && ((chdr & HDR_FLAG_FCACK) > 0);
if_packet_info.packet_count = (chdr >> 16) & 0xFFF;
// Set packet length variables
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 921bfdfa0..c962d40e6 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -12,9 +12,9 @@
#include <uhd/exception.hpp>
#include <uhd/convert.hpp>
#include <uhd/stream.hpp>
-#include <uhd/utils/log.hpp>
#include <uhd/utils/tasks.hpp>
#include <uhd/utils/byteswap.hpp>
+#include <uhd/utils/log.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/transport/zero_copy.hpp>
@@ -22,7 +22,6 @@
#include <boost/dynamic_bitset.hpp>
#include <boost/function.hpp>
#include <boost/format.hpp>
-#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include <iostream>
#include <vector>
@@ -59,6 +58,7 @@ class recv_packet_handler{
public:
typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type;
typedef boost::function<void(const size_t)> handle_flowctrl_type;
+ typedef std::function<void(const uint32_t *)> handle_flowctrl_ack_type;
typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type;
typedef void(*vrt_unpacker_type)(const uint32_t *, vrt::if_packet_info_t &);
//typedef boost::function<void(const uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type;
@@ -102,33 +102,6 @@ public:
_header_offset_words32 = header_offset_words32;
}
- ////////////////// RFNOC ///////////////////////////
- //! Set the stream ID for a specific channel (or no SID)
- void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const uint32_t sid = 0){
- _props.at(xport_chan).has_sid = has_sid;
- _props.at(xport_chan).sid = sid;
- }
-
- //! Get the stream ID for a specific channel (or zero if no SID)
- uint32_t get_xport_chan_sid(const size_t xport_chan) const {
- if (_props.at(xport_chan).has_sid) {
- return _props.at(xport_chan).sid;
- } else {
- return 0;
- }
- }
-
- void set_terminator(uhd::rfnoc::rx_stream_terminator::sptr terminator)
- {
- _terminator = terminator;
- }
-
- uhd::rfnoc::rx_stream_terminator::sptr get_terminator()
- {
- return _terminator;
- }
- ////////////////// RFNOC ///////////////////////////
-
/*!
* Set the threshold for alignment failure.
* How many packets throw out before giving up?
@@ -183,6 +156,13 @@ public:
if (do_init) handle_flowctrl(0);
}
+ void set_xport_handle_flowctrl_ack(
+ const size_t xport_chan,
+ const handle_flowctrl_ack_type &handle_flowctrl_ack
+ ) {
+ _props.at(xport_chan).handle_flowctrl_ack = handle_flowctrl_ack;
+ }
+
//! Set the conversion routine for all channels
void set_converter(const uhd::convert::id_type &id){
_num_outputs = id.num_outputs;
@@ -211,12 +191,11 @@ public:
//! Overload call to issue stream commands
void issue_stream_cmd(const stream_cmd_t &stream_cmd)
{
- // RFNoC: This needs to be checked by the radio block, once it's done. TODO remove this.
- //if (stream_cmd.stream_now
- //and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
- //and _props.size() > 1) {
- //throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting.");
- //}
+ if (size() > 1 and stream_cmd.stream_now and
+ stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS)
+ {
+ throw uhd::runtime_error("Invalid recv stream command - stream now on multiple channels in a single streamer will fail to time align.");
+ }
for (size_t i = 0; i < _props.size(); i++)
{
@@ -307,11 +286,8 @@ private:
size_t packet_count;
handle_overflow_type handle_overflow;
handle_flowctrl_type handle_flowctrl;
+ handle_flowctrl_ack_type handle_flowctrl_ack;
size_t fc_update_window;
- /////// RFNOC ///////////
- bool has_sid;
- uint32_t sid;
- /////// RFNOC ///////////
};
std::vector<xport_chan_props_type> _props;
size_t _num_outputs;
@@ -340,6 +316,7 @@ private:
buffers_info_type(const size_t size):
std::vector<per_buffer_info_type>(size),
indexes_todo(size, true),
+ alignment_time(0),
alignment_time_valid(false),
data_bytes_to_copy(0),
fragment_offset_in_samps(0)
@@ -384,8 +361,6 @@ private:
int recvd_packets;
#endif
- uhd::rfnoc::rx_stream_terminator::sptr _terminator;
-
/*******************************************************************
* Get and process a single packet from the transport:
* Receive a single packet at the given index.
@@ -398,42 +373,58 @@ private:
per_buffer_info_type &curr_buffer_info,
double timeout
){
- //get a single packet from the transport layer
managed_recv_buffer::sptr &buff = curr_buffer_info.buff;
- buff = _props[index].get_buff(timeout);
- if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR;
-
- #ifdef ERROR_INJECT_DROPPED_PACKETS
- if (++recvd_packets > 1000)
+ per_buffer_info_type &info = curr_buffer_info;
+ while (1)
{
- recvd_packets = 0;
- buff.reset();
+ //get a single packet from the transport layer
buff = _props[index].get_buff(timeout);
if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR;
- }
- #endif
- //bounds check before extract
- size_t num_packet_words32 = buff->size()/sizeof(uint32_t);
- if (num_packet_words32 <= _header_offset_words32){
- throw std::runtime_error("recv buffer smaller than vrt packet offset");
- }
+ #ifdef ERROR_INJECT_DROPPED_PACKETS
+ if (++recvd_packets > 1000)
+ {
+ recvd_packets = 0;
+ buff.reset();
+ buff = _props[index].get_buff(timeout);
+ if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR;
+ }
+ #endif
- //extract packet info
- per_buffer_info_type &info = curr_buffer_info;
- info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;
- info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32;
- _vrt_unpacker(info.vrt_hdr, info.ifpi);
- info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true
- info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);
-
- //handle flow control
- if (_props[index].handle_flowctrl)
- {
- if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0)
+ //bounds check before extract
+ size_t num_packet_words32 = buff->size()/sizeof(uint32_t);
+ if (num_packet_words32 <= _header_offset_words32){
+ throw std::runtime_error("recv buffer smaller than vrt packet offset");
+ }
+
+ //extract packet info
+ info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;
+ info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32;
+ _vrt_unpacker(info.vrt_hdr, info.ifpi);
+ info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true
+ info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);
+
+ //handle flow control
+ if (_props[index].handle_flowctrl)
{
- _props[index].handle_flowctrl(info.ifpi.packet_count);
+ if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0)
+ {
+ _props[index].handle_flowctrl(info.ifpi.packet_count);
+ }
}
+
+ //handle flow control ack
+ if (info.ifpi.fc_ack){
+ if (_props[index].handle_flowctrl_ack) {
+ _props[index].handle_flowctrl_ack(reinterpret_cast<const uint32_t *>(info.copy_buff));
+ }
+ // Process the next packet
+ buff.reset();
+ info.copy_buff = nullptr;
+ continue;
+ }
+
+ break;
}
//--------------------------------------------------------------
@@ -605,7 +596,7 @@ private:
rx_metadata_t metadata = curr_info.metadata;
_props[index].handle_overflow();
curr_info.metadata = metadata;
- UHD_LOG_FASTPATH("O")
+ UHD_LOG_FASTPATH("O");
}
curr_info[index].buff.reset();
curr_info[index].copy_buff = nullptr;
@@ -627,7 +618,7 @@ private:
prev_info[index].ifpi.num_payload_words32*sizeof(uint32_t)/_bytes_per_otw_item, _samp_rate);
curr_info.metadata.out_of_sequence = true;
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
- UHD_LOG_FASTPATH("D")
+ UHD_LOG_FASTPATH("D");
return;
}
@@ -635,10 +626,10 @@ private:
//too many iterations: detect alignment failure
if (iterations++ > _alignment_failure_threshold){
UHD_LOGGER_ERROR("STREAMER") << boost::format(
- "The receive packet handler failed to time-align packets. "
- "%u received packets were processed by the handler. "
- "However, a timestamp match could not be determined."
- ) % iterations;
+ "The receive packet handler failed to time-align packets.\n"
+ "%u received packets were processed by the handler.\n"
+ "However, a timestamp match could not be determined.\n"
+ ) % iterations << std::endl;
std::swap(curr_info, next_info); //save progress from curr -> next
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
_props[index].handle_overflow();
@@ -659,7 +650,7 @@ private:
}
/*******************************************************************
- * Receive a single packet:
+ * Receive a single packet on all channels
* Handles fragmentation, messages, errors, and copy-conversion.
* When no fragments are available, call the get aligned buffers.
* Then copy-convert available data into the user's IO buffers.
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index e824ef4e9..5cba570a7 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -12,7 +12,6 @@
#include <uhd/exception.hpp>
#include <uhd/convert.hpp>
#include <uhd/stream.hpp>
-#include <uhd/utils/log.hpp>
#include <uhd/utils/tasks.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/utils/thread.hpp>
@@ -49,6 +48,7 @@ namespace sph {
class send_packet_handler{
public:
typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type;
+ typedef boost::function<void(void)> post_send_cb_type;
typedef boost::function<bool(uhd::async_metadata_t &, const double)> async_receiver_type;
typedef void(*vrt_packer_type)(uint32_t *, vrt::if_packet_info_t &);
//typedef boost::function<void(uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type;
@@ -93,27 +93,6 @@ public:
_props.at(xport_chan).sid = sid;
}
- ///////// RFNOC ///////////////////
- //! Get the stream ID for a specific channel (or zero if no SID)
- uint32_t get_xport_chan_sid(const size_t xport_chan) const {
- if (_props.at(xport_chan).has_sid) {
- return _props.at(xport_chan).sid;
- } else {
- return 0;
- }
- }
-
- void set_terminator(uhd::rfnoc::tx_stream_terminator::sptr terminator)
- {
- _terminator = terminator;
- }
-
- uhd::rfnoc::tx_stream_terminator::sptr get_terminator()
- {
- return _terminator;
- }
- ///////// RFNOC ///////////////////
-
void set_enable_trailer(const bool enable)
{
_has_tlr = enable;
@@ -138,6 +117,15 @@ public:
_props.at(xport_chan).get_buff = get_buff;
}
+ /*!
+ * Set the callback function for post-send.
+ * \param xport_chan which transport channel
+ * \param cb post-send callback
+ */
+ void set_xport_chan_post_send_cb(const size_t xport_chan, const post_send_cb_type &cb){
+ _props.at(xport_chan).go_postal = cb;
+ }
+
//! Set the conversion routine for all channels
void set_converter(const uhd::convert::id_type &id){
_num_inputs = id.num_inputs;
@@ -198,6 +186,7 @@ public:
if_packet_info.tsf = metadata.time_spec.to_ticks(_tick_rate);
if_packet_info.sob = metadata.start_of_burst;
if_packet_info.eob = metadata.end_of_burst;
+ if_packet_info.fc_ack = false; //This is a data packet
/*
* Metadata is cached when we get a send requesting a start of burst with no samples.
@@ -291,6 +280,7 @@ private:
struct xport_chan_props_type{
xport_chan_props_type(void):has_sid(false),sid(0){}
get_buff_type get_buff;
+ post_send_cb_type go_postal;
bool has_sid;
uint32_t sid;
managed_send_buffer::sptr buff;
@@ -308,8 +298,6 @@ private:
bool _cached_metadata;
uhd::tx_metadata_t _metadata_cache;
- uhd::rfnoc::tx_stream_terminator::sptr _terminator;
-
#ifdef UHD_TXRX_DEBUG_PRINTS
struct dbg_send_stat_t {
dbg_send_stat_t(long wc, size_t nspb, size_t nss, uhd::tx_metadata_t md, double to, double rate):
@@ -428,6 +416,11 @@ private:
const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32;
buff->commit(num_vita_words32*sizeof(uint32_t));
buff.reset(); //effectively a release
+
+ if (_props[index].go_postal)
+ {
+ _props[index].go_postal();
+ }
}
//! Shared variables for the worker threads
diff --git a/host/lib/transport/zero_copy_flow_ctrl.cpp b/host/lib/transport/zero_copy_flow_ctrl.cpp
index 709b9e981..25be35569 100644
--- a/host/lib/transport/zero_copy_flow_ctrl.cpp
+++ b/host/lib/transport/zero_copy_flow_ctrl.cpp
@@ -65,7 +65,7 @@ public:
zero_copy_flow_ctrl_mrb(
flow_ctrl_func flow_ctrl
) :
- _mb(nullptr),
+ _mb(NULL),
_flow_ctrl(flow_ctrl)
{
/* NOP */
@@ -80,8 +80,6 @@ public:
{
if (_mb)
{
- _mb->commit(size());
- while (_flow_ctrl and not _flow_ctrl(_mb)) {}
_mb.reset();
}
}
@@ -89,6 +87,7 @@ public:
UHD_INLINE sptr get(sptr &mb)
{
_mb = mb;
+ while (_flow_ctrl and not _flow_ctrl(_mb)) {}
return make(this, _mb->cast<void *>(), _mb->size());
}