aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/super_recv_packet_handler.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/super_recv_packet_handler.hpp')
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp73
1 files changed, 60 insertions, 13 deletions
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 541d9f3bc..5ca1da687 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -28,6 +28,9 @@
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/transport/zero_copy.hpp>
+#ifdef DEVICE3_STREAMER
+# include "../rfnoc/rx_stream_terminator.hpp"
+#endif
#include <boost/dynamic_bitset.hpp>
#include <boost/foreach.hpp>
#include <boost/function.hpp>
@@ -112,6 +115,35 @@ public:
_header_offset_words32 = header_offset_words32;
}
+ ////////////////// RFNOC ///////////////////////////
+ //! Set the stream ID for a specific channel (or no SID)
+ void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const boost::uint32_t sid = 0){
+ _props.at(xport_chan).has_sid = has_sid;
+ _props.at(xport_chan).sid = sid;
+ }
+
+ //! Get the stream ID for a specific channel (or zero if no SID)
+ boost::uint32_t get_xport_chan_sid(const size_t xport_chan) const {
+ if (_props.at(xport_chan).has_sid) {
+ return _props.at(xport_chan).sid;
+ } else {
+ return 0;
+ }
+ }
+
+ #ifdef DEVICE3_STREAMER
+ void set_terminator(uhd::rfnoc::rx_stream_terminator::sptr terminator)
+ {
+ _terminator = terminator;
+ }
+
+ uhd::rfnoc::rx_stream_terminator::sptr get_terminator()
+ {
+ return _terminator;
+ }
+ #endif
+ ////////////////// RFNOC ///////////////////////////
+
/*!
* Set the threshold for alignment failure.
* How many packets throw out before giving up?
@@ -194,11 +226,12 @@ public:
//! Overload call to issue stream commands
void issue_stream_cmd(const stream_cmd_t &stream_cmd)
{
- if (stream_cmd.stream_now
- and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
- and _props.size() > 1) {
- throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting.");
- }
+ // RFNoC: This needs to be checked by the radio block, once it's done. TODO remove this.
+ //if (stream_cmd.stream_now
+ //and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
+ //and _props.size() > 1) {
+ //throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting.");
+ //}
for (size_t i = 0; i < _props.size(); i++)
{
@@ -288,6 +321,10 @@ private:
handle_overflow_type handle_overflow;
handle_flowctrl_type handle_flowctrl;
size_t fc_update_window;
+ /////// RFNOC ///////////
+ bool has_sid;
+ boost::uint32_t sid;
+ /////// RFNOC ///////////
};
std::vector<xport_chan_props_type> _props;
size_t _num_outputs;
@@ -360,6 +397,10 @@ private:
int recvd_packets;
#endif
+ #ifdef DEVICE3_STREAMER
+ uhd::rfnoc::rx_stream_terminator::sptr _terminator;
+ #endif
+
/*******************************************************************
* Get and process a single packet from the transport:
* Receive a single packet at the given index.
@@ -426,6 +467,7 @@ private:
const size_t expected_packet_count = _props[index].packet_count;
_props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask;
if (expected_packet_count != info.ifpi.packet_count){
+ //UHD_MSG(status) << "expected: " << expected_packet_count << " got: " << info.ifpi.packet_count << std::endl;
if (_props[index].handle_flowctrl) {
// Always update flow control in this case, because we don't
// know which packet was dropped and what state the upstream
@@ -447,6 +489,10 @@ private:
void _flush_all(double timeout)
{
+ get_prev_buffer_info().reset();
+ get_curr_buffer_info().reset();
+ get_next_buffer_info().reset();
+
for (size_t i = 0; i < _props.size(); i++)
{
per_buffer_info_type prev_buffer_info, curr_buffer_info;
@@ -467,9 +513,6 @@ private:
curr_buffer_info.reset();
}
}
- get_prev_buffer_info().reset();
- get_curr_buffer_info().reset();
- get_next_buffer_info().reset();
}
/*******************************************************************
@@ -567,16 +610,20 @@ private:
curr_info.metadata.time_spec = next_info[index].time;
curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi));
if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){
+ // Not sending flow control would cause timeouts due to source flow control locking up.
+ // Send first as the overrun handler may flush the receive buffers which could contain
+ // packets with sequence numbers after this packet's sequence number!
+ if(_props[index].handle_flowctrl) {
+ _props[index].handle_flowctrl(next_info[index].ifpi.packet_count);
+ }
+
rx_metadata_t metadata = curr_info.metadata;
_props[index].handle_overflow();
curr_info.metadata = metadata;
UHD_MSG(fastpath) << "O";
-
- // Not sending flow control would cause timeouts due to source flow control locking up
- if(_props[index].handle_flowctrl) {
- _props[index].handle_flowctrl(next_info[index].ifpi.packet_count);
- }
}
+ curr_info[index].buff.reset();
+ curr_info[index].copy_buff = NULL;
return;
case PACKET_TIMEOUT_ERROR: