aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/super_recv_packet_handler.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/super_recv_packet_handler.hpp')
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp50
1 files changed, 49 insertions, 1 deletions
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 5a75d5f0d..75d1f3068 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -63,6 +63,8 @@ static inline void handle_overflow_nop(void){}
class recv_packet_handler{
public:
typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type;
+ typedef boost::function<void(const size_t)> handle_flowctrl_type;
+ typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type;
typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &);
//typedef boost::function<void(const boost::uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type;
@@ -139,6 +141,19 @@ public:
_props.at(xport_chan).get_buff = get_buff;
}
+ /*!
+ * Set the function to handle flow control
+ * \param xport_chan which transport channel
+ * \param handle_flowctrl the callback function
+ */
+ void set_xport_handle_flowctrl(const size_t xport_chan, const handle_flowctrl_type &handle_flowctrl, const size_t update_window, const bool do_init = false)
+ {
+ _props.at(xport_chan).handle_flowctrl = handle_flowctrl;
+ //we need the window size to be within the 0xfff (max 12 bit seq)
+ _props.at(xport_chan).fc_update_window = std::min<size_t>(update_window, 0xfff);
+ if (do_init) handle_flowctrl(0);
+ }
+
//! Set the conversion routine for all channels
void set_converter(const uhd::convert::id_type &id){
_num_outputs = id.num_outputs;
@@ -158,6 +173,21 @@ public:
_converter->set_scalar(scale_factor);
}
+ //! Set the callback to issue stream commands
+ void set_issue_stream_cmd(const size_t xport_chan, const issue_stream_cmd_type &issue_stream_cmd)
+ {
+ _props.at(xport_chan).issue_stream_cmd = issue_stream_cmd;
+ }
+
+ //! Overload call to issue stream commands
+ void issue_stream_cmd(const stream_cmd_t &stream_cmd)
+ {
+ for (size_t i = 0; i < _props.size(); i++)
+ {
+ if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd);
+ }
+ }
+
/*******************************************************************
* Receive:
* The entry point for the fast-path receive calls.
@@ -219,8 +249,11 @@ private:
handle_overflow(&handle_overflow_nop)
{}
get_buff_type get_buff;
+ issue_stream_cmd_type issue_stream_cmd;
size_t packet_count;
handle_overflow_type handle_overflow;
+ handle_flowctrl_type handle_flowctrl;
+ size_t fc_update_window;
};
std::vector<xport_chan_props_type> _props;
size_t _num_outputs;
@@ -302,6 +335,15 @@ private:
info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true
info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);
+ //handle flow control
+ if (_props[index].handle_flowctrl)
+ {
+ if ((info.ifpi.packet_count % _props[index].fc_update_window/2) == 0)
+ {
+ _props[index].handle_flowctrl(info.ifpi.packet_count);
+ }
+ }
+
//--------------------------------------------------------------
//-- Determine return conditions:
//-- The order of these checks is HOLY.
@@ -314,8 +356,9 @@ private:
//2) check for sequence errors
#ifndef SRPH_DONT_CHECK_SEQUENCE
+ const size_t seq_mask = (info.ifpi.link_type == vrt::if_packet_info_t::LINK_TYPE_NONE)? 0xf : 0xfff;
const size_t expected_packet_count = _props[index].packet_count;
- _props[index].packet_count = (info.ifpi.packet_count + 1)%16;
+ _props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask;
if (expected_packet_count != info.ifpi.packet_count){
return PACKET_SEQUENCE_ERROR;
}
@@ -622,6 +665,11 @@ public:
return recv_packet_handler::recv(buffs, nsamps_per_buff, metadata, timeout, one_packet);
}
+ void issue_stream_cmd(const stream_cmd_t &stream_cmd)
+ {
+ return recv_packet_handler::issue_stream_cmd(stream_cmd);
+ }
+
private:
size_t _max_num_samps;
};