aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/super_recv_packet_handler.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/super_recv_packet_handler.hpp')
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp143
1 files changed, 67 insertions, 76 deletions
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 921bfdfa0..c962d40e6 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -12,9 +12,9 @@
#include <uhd/exception.hpp>
#include <uhd/convert.hpp>
#include <uhd/stream.hpp>
-#include <uhd/utils/log.hpp>
#include <uhd/utils/tasks.hpp>
#include <uhd/utils/byteswap.hpp>
+#include <uhd/utils/log.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/transport/zero_copy.hpp>
@@ -22,7 +22,6 @@
#include <boost/dynamic_bitset.hpp>
#include <boost/function.hpp>
#include <boost/format.hpp>
-#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include <iostream>
#include <vector>
@@ -59,6 +58,7 @@ class recv_packet_handler{
public:
typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type;
typedef boost::function<void(const size_t)> handle_flowctrl_type;
+ typedef std::function<void(const uint32_t *)> handle_flowctrl_ack_type;
typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type;
typedef void(*vrt_unpacker_type)(const uint32_t *, vrt::if_packet_info_t &);
//typedef boost::function<void(const uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type;
@@ -102,33 +102,6 @@ public:
_header_offset_words32 = header_offset_words32;
}
- ////////////////// RFNOC ///////////////////////////
- //! Set the stream ID for a specific channel (or no SID)
- void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const uint32_t sid = 0){
- _props.at(xport_chan).has_sid = has_sid;
- _props.at(xport_chan).sid = sid;
- }
-
- //! Get the stream ID for a specific channel (or zero if no SID)
- uint32_t get_xport_chan_sid(const size_t xport_chan) const {
- if (_props.at(xport_chan).has_sid) {
- return _props.at(xport_chan).sid;
- } else {
- return 0;
- }
- }
-
- void set_terminator(uhd::rfnoc::rx_stream_terminator::sptr terminator)
- {
- _terminator = terminator;
- }
-
- uhd::rfnoc::rx_stream_terminator::sptr get_terminator()
- {
- return _terminator;
- }
- ////////////////// RFNOC ///////////////////////////
-
/*!
* Set the threshold for alignment failure.
* How many packets throw out before giving up?
@@ -183,6 +156,13 @@ public:
if (do_init) handle_flowctrl(0);
}
+ void set_xport_handle_flowctrl_ack(
+ const size_t xport_chan,
+ const handle_flowctrl_ack_type &handle_flowctrl_ack
+ ) {
+ _props.at(xport_chan).handle_flowctrl_ack = handle_flowctrl_ack;
+ }
+
//! Set the conversion routine for all channels
void set_converter(const uhd::convert::id_type &id){
_num_outputs = id.num_outputs;
@@ -211,12 +191,11 @@ public:
//! Overload call to issue stream commands
void issue_stream_cmd(const stream_cmd_t &stream_cmd)
{
- // RFNoC: This needs to be checked by the radio block, once it's done. TODO remove this.
- //if (stream_cmd.stream_now
- //and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
- //and _props.size() > 1) {
- //throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting.");
- //}
+ if (size() > 1 and stream_cmd.stream_now and
+ stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS)
+ {
+ throw uhd::runtime_error("Invalid recv stream command - stream now on multiple channels in a single streamer will fail to time align.");
+ }
for (size_t i = 0; i < _props.size(); i++)
{
@@ -307,11 +286,8 @@ private:
size_t packet_count;
handle_overflow_type handle_overflow;
handle_flowctrl_type handle_flowctrl;
+ handle_flowctrl_ack_type handle_flowctrl_ack;
size_t fc_update_window;
- /////// RFNOC ///////////
- bool has_sid;
- uint32_t sid;
- /////// RFNOC ///////////
};
std::vector<xport_chan_props_type> _props;
size_t _num_outputs;
@@ -340,6 +316,7 @@ private:
buffers_info_type(const size_t size):
std::vector<per_buffer_info_type>(size),
indexes_todo(size, true),
+ alignment_time(0),
alignment_time_valid(false),
data_bytes_to_copy(0),
fragment_offset_in_samps(0)
@@ -384,8 +361,6 @@ private:
int recvd_packets;
#endif
- uhd::rfnoc::rx_stream_terminator::sptr _terminator;
-
/*******************************************************************
* Get and process a single packet from the transport:
* Receive a single packet at the given index.
@@ -398,42 +373,58 @@ private:
per_buffer_info_type &curr_buffer_info,
double timeout
){
- //get a single packet from the transport layer
managed_recv_buffer::sptr &buff = curr_buffer_info.buff;
- buff = _props[index].get_buff(timeout);
- if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR;
-
- #ifdef ERROR_INJECT_DROPPED_PACKETS
- if (++recvd_packets > 1000)
+ per_buffer_info_type &info = curr_buffer_info;
+ while (1)
{
- recvd_packets = 0;
- buff.reset();
+ //get a single packet from the transport layer
buff = _props[index].get_buff(timeout);
if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR;
- }
- #endif
- //bounds check before extract
- size_t num_packet_words32 = buff->size()/sizeof(uint32_t);
- if (num_packet_words32 <= _header_offset_words32){
- throw std::runtime_error("recv buffer smaller than vrt packet offset");
- }
+ #ifdef ERROR_INJECT_DROPPED_PACKETS
+ if (++recvd_packets > 1000)
+ {
+ recvd_packets = 0;
+ buff.reset();
+ buff = _props[index].get_buff(timeout);
+ if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR;
+ }
+ #endif
- //extract packet info
- per_buffer_info_type &info = curr_buffer_info;
- info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;
- info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32;
- _vrt_unpacker(info.vrt_hdr, info.ifpi);
- info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true
- info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);
-
- //handle flow control
- if (_props[index].handle_flowctrl)
- {
- if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0)
+ //bounds check before extract
+ size_t num_packet_words32 = buff->size()/sizeof(uint32_t);
+ if (num_packet_words32 <= _header_offset_words32){
+ throw std::runtime_error("recv buffer smaller than vrt packet offset");
+ }
+
+ //extract packet info
+ info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;
+ info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32;
+ _vrt_unpacker(info.vrt_hdr, info.ifpi);
+ info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true
+ info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);
+
+ //handle flow control
+ if (_props[index].handle_flowctrl)
{
- _props[index].handle_flowctrl(info.ifpi.packet_count);
+ if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0)
+ {
+ _props[index].handle_flowctrl(info.ifpi.packet_count);
+ }
}
+
+ //handle flow control ack
+ if (info.ifpi.fc_ack){
+ if (_props[index].handle_flowctrl_ack) {
+ _props[index].handle_flowctrl_ack(reinterpret_cast<const uint32_t *>(info.copy_buff));
+ }
+ // Process the next packet
+ buff.reset();
+ info.copy_buff = nullptr;
+ continue;
+ }
+
+ break;
}
//--------------------------------------------------------------
@@ -605,7 +596,7 @@ private:
rx_metadata_t metadata = curr_info.metadata;
_props[index].handle_overflow();
curr_info.metadata = metadata;
- UHD_LOG_FASTPATH("O")
+ UHD_LOG_FASTPATH("O");
}
curr_info[index].buff.reset();
curr_info[index].copy_buff = nullptr;
@@ -627,7 +618,7 @@ private:
prev_info[index].ifpi.num_payload_words32*sizeof(uint32_t)/_bytes_per_otw_item, _samp_rate);
curr_info.metadata.out_of_sequence = true;
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
- UHD_LOG_FASTPATH("D")
+ UHD_LOG_FASTPATH("D");
return;
}
@@ -635,10 +626,10 @@ private:
//too many iterations: detect alignment failure
if (iterations++ > _alignment_failure_threshold){
UHD_LOGGER_ERROR("STREAMER") << boost::format(
- "The receive packet handler failed to time-align packets. "
- "%u received packets were processed by the handler. "
- "However, a timestamp match could not be determined."
- ) % iterations;
+ "The receive packet handler failed to time-align packets.\n"
+ "%u received packets were processed by the handler.\n"
+ "However, a timestamp match could not be determined.\n"
+ ) % iterations << std::endl;
std::swap(curr_info, next_info); //save progress from curr -> next
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
_props[index].handle_overflow();
@@ -659,7 +650,7 @@ private:
}
/*******************************************************************
- * Receive a single packet:
+ * Receive a single packet on all channels
* Handles fragmentation, messages, errors, and copy-conversion.
* When no fragments are available, call the get aligned buffers.
* Then copy-convert available data into the user's IO buffers.