aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/super_recv_packet_handler.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/super_recv_packet_handler.hpp')
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp806
1 files changed, 437 insertions, 369 deletions
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 894287d6b..342d273a6 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -9,43 +9,43 @@
#define INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP
#include <uhd/config.hpp>
-#include <uhd/exception.hpp>
#include <uhd/convert.hpp>
+#include <uhd/exception.hpp>
#include <uhd/stream.hpp>
-#include <uhd/utils/tasks.hpp>
-#include <uhd/utils/byteswap.hpp>
-#include <uhd/utils/log.hpp>
-#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/transport/zero_copy.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/utils/byteswap.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhd/utils/tasks.hpp>
#include <uhdlib/rfnoc/rx_stream_terminator.hpp>
#include <boost/dynamic_bitset.hpp>
-#include <boost/function.hpp>
#include <boost/format.hpp>
+#include <boost/function.hpp>
#include <boost/make_shared.hpp>
#include <iostream>
#include <vector>
// Included for debugging
#ifdef UHD_TXRX_DEBUG_PRINTS
-#include <boost/format.hpp>
-#include <boost/thread/thread.hpp>
-#include "boost/date_time/posix_time/posix_time.hpp"
+# include "boost/date_time/posix_time/posix_time.hpp"
+# include <boost/format.hpp>
+# include <boost/thread/thread.hpp>
#endif
-namespace uhd{ namespace transport{ namespace sph{
+namespace uhd { namespace transport { namespace sph {
UHD_INLINE uint32_t get_context_code(
- const uint32_t *vrt_hdr, const vrt::if_packet_info_t &if_packet_info
-){
- //extract the context word (we dont know the endianness so mirror the bytes)
- uint32_t word0 = vrt_hdr[if_packet_info.num_header_words32] |
- uhd::byteswap(vrt_hdr[if_packet_info.num_header_words32]);
+ const uint32_t* vrt_hdr, const vrt::if_packet_info_t& if_packet_info)
+{
+ // extract the context word (we dont know the endianness so mirror the bytes)
+ uint32_t word0 = vrt_hdr[if_packet_info.num_header_words32]
+ | uhd::byteswap(vrt_hdr[if_packet_info.num_header_words32]);
return word0 & 0xff;
}
typedef boost::function<void(void)> handle_overflow_type;
-static inline void handle_overflow_nop(void){}
+static inline void handle_overflow_nop(void) {}
/***********************************************************************
* Super receive packet handler
@@ -54,51 +54,58 @@ static inline void handle_overflow_nop(void){}
* The channel group shares a common sample rate.
* All channels are received in unison in recv().
**********************************************************************/
-class recv_packet_handler{
+class recv_packet_handler
+{
public:
typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type;
typedef boost::function<void(const size_t)> handle_flowctrl_type;
- typedef std::function<void(const uint32_t *)> handle_flowctrl_ack_type;
+ typedef std::function<void(const uint32_t*)> handle_flowctrl_ack_type;
typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type;
- typedef void(*vrt_unpacker_type)(const uint32_t *, vrt::if_packet_info_t &);
- //typedef boost::function<void(const uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type;
+ typedef void (*vrt_unpacker_type)(const uint32_t*, vrt::if_packet_info_t&);
+ // typedef boost::function<void(const uint32_t *, vrt::if_packet_info_t &)>
+ // vrt_unpacker_type;
/*!
* Make a new packet handler for receive
* \param size the number of transport channels
*/
- recv_packet_handler(const size_t size = 1):
- _queue_error_for_next_call(false),
- _buffers_infos_index(0)
+ recv_packet_handler(const size_t size = 1)
+ : _queue_error_for_next_call(false), _buffers_infos_index(0)
{
- #ifdef ERROR_INJECT_DROPPED_PACKETS
+#ifdef ERROR_INJECT_DROPPED_PACKETS
recvd_packets = 0;
- #endif
+#endif
this->resize(size);
set_alignment_failure_threshold(1000);
}
- ~recv_packet_handler(void){
+ ~recv_packet_handler(void)
+ {
/* NOP */
}
//! Resize the number of transport channels
- void resize(const size_t size){
- if (this->size() == size) return;
+ void resize(const size_t size)
+ {
+ if (this->size() == size)
+ return;
_props.resize(size);
- //re-initialize all buffers infos by re-creating the vector
+ // re-initialize all buffers infos by re-creating the vector
_buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size));
}
//! Get the channel width of this handler
- size_t size(void) const{
+ size_t size(void) const
+ {
return _props.size();
}
//! Setup the vrt unpacker function and offset
- void set_vrt_unpacker(const vrt_unpacker_type &vrt_unpacker, const size_t header_offset_words32 = 0){
- _vrt_unpacker = vrt_unpacker;
+ void set_vrt_unpacker(
+ const vrt_unpacker_type& vrt_unpacker, const size_t header_offset_words32 = 0)
+ {
+ _vrt_unpacker = vrt_unpacker;
_header_offset_words32 = header_offset_words32;
}
@@ -107,17 +114,20 @@ public:
* How many packets throw out before giving up?
* \param threshold number of packets per channel
*/
- void set_alignment_failure_threshold(const size_t threshold){
- _alignment_failure_threshold = threshold*this->size();
+ void set_alignment_failure_threshold(const size_t threshold)
+ {
+ _alignment_failure_threshold = threshold * this->size();
}
//! Set the rate of ticks per second
- void set_tick_rate(const double rate){
+ void set_tick_rate(const double rate)
+ {
_tick_rate = rate;
}
//! Set the rate of samples per second
- void set_samp_rate(const double rate){
+ void set_samp_rate(const double rate)
+ {
_samp_rate = rate;
}
@@ -126,9 +136,12 @@ public:
* \param xport_chan which transport channel
* \param get_buff the getter function
*/
- void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff, const bool flush = false){
- if (flush){
- while (get_buff(0.0)) {};
+ void set_xport_chan_get_buff(
+ const size_t xport_chan, const get_buff_type& get_buff, const bool flush = false)
+ {
+ if (flush) {
+ while (get_buff(0.0)) {
+ };
}
_props.at(xport_chan).get_buff = get_buff;
}
@@ -148,58 +161,67 @@ public:
* \param xport_chan which transport channel
* \param handle_flowctrl the callback function
*/
- void set_xport_handle_flowctrl(const size_t xport_chan, const handle_flowctrl_type &handle_flowctrl, const size_t update_window, const bool do_init = false)
+ void set_xport_handle_flowctrl(const size_t xport_chan,
+ const handle_flowctrl_type& handle_flowctrl,
+ const size_t update_window,
+ const bool do_init = false)
{
_props.at(xport_chan).handle_flowctrl = handle_flowctrl;
- //we need the window size to be within the 0xfff (max 12 bit seq)
+ // we need the window size to be within the 0xfff (max 12 bit seq)
_props.at(xport_chan).fc_update_window = std::min<size_t>(update_window, 0xfff);
- if (do_init) handle_flowctrl(0);
+ if (do_init)
+ handle_flowctrl(0);
}
void set_xport_handle_flowctrl_ack(
- const size_t xport_chan,
- const handle_flowctrl_ack_type &handle_flowctrl_ack
- ) {
+ const size_t xport_chan, const handle_flowctrl_ack_type& handle_flowctrl_ack)
+ {
_props.at(xport_chan).handle_flowctrl_ack = handle_flowctrl_ack;
}
//! Set the conversion routine for all channels
- void set_converter(const uhd::convert::id_type &id){
+ void set_converter(const uhd::convert::id_type& id)
+ {
_num_outputs = id.num_outputs;
- _converter = uhd::convert::get_converter(id)();
- this->set_scale_factor(1/32767.); //update after setting converter
+ _converter = uhd::convert::get_converter(id)();
+ this->set_scale_factor(1 / 32767.); // update after setting converter
_bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.input_format);
_bytes_per_cpu_item = uhd::convert::get_bytes_per_item(id.output_format);
}
//! Set the transport channel's overflow handler
- void set_overflow_handler(const size_t xport_chan, const handle_overflow_type &handle_overflow){
+ void set_overflow_handler(
+ const size_t xport_chan, const handle_overflow_type& handle_overflow)
+ {
_props.at(xport_chan).handle_overflow = handle_overflow;
}
//! Set the scale factor used in float conversion
- void set_scale_factor(const double scale_factor){
+ void set_scale_factor(const double scale_factor)
+ {
_converter->set_scalar(scale_factor);
}
//! Set the callback to issue stream commands
- void set_issue_stream_cmd(const size_t xport_chan, const issue_stream_cmd_type &issue_stream_cmd)
+ void set_issue_stream_cmd(
+ const size_t xport_chan, const issue_stream_cmd_type& issue_stream_cmd)
{
_props.at(xport_chan).issue_stream_cmd = issue_stream_cmd;
}
//! Overload call to issue stream commands
- void issue_stream_cmd(const stream_cmd_t &stream_cmd)
+ void issue_stream_cmd(const stream_cmd_t& stream_cmd)
{
- if (size() > 1 and stream_cmd.stream_now and
- stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS)
- {
- throw uhd::runtime_error("Invalid recv stream command - stream now on multiple channels in a single streamer will fail to time align.");
+ if (size() > 1 and stream_cmd.stream_now
+ and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) {
+ throw uhd::runtime_error(
+ "Invalid recv stream command - stream now on multiple channels in a "
+ "single streamer will fail to time align.");
}
- for (size_t i = 0; i < _props.size(); i++)
- {
- if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd);
+ for (size_t i = 0; i < _props.size(); i++) {
+ if (_props[i].issue_stream_cmd)
+ _props[i].issue_stream_cmd(stream_cmd);
}
}
@@ -208,56 +230,57 @@ public:
* The entry point for the fast-path receive calls.
* Dispatch into combinations of single packet receive calls.
******************************************************************/
- UHD_INLINE size_t recv(
- const uhd::rx_streamer::buffs_type &buffs,
+ UHD_INLINE size_t recv(const uhd::rx_streamer::buffs_type& buffs,
const size_t nsamps_per_buff,
- uhd::rx_metadata_t &metadata,
+ uhd::rx_metadata_t& metadata,
const double timeout,
- const bool one_packet
- ){
- //handle metadata queued from a previous receive
- if (_queue_error_for_next_call){
+ const bool one_packet)
+ {
+ // handle metadata queued from a previous receive
+ if (_queue_error_for_next_call) {
_queue_error_for_next_call = false;
- metadata = _queue_metadata;
- //We want to allow a full buffer recv to be cut short by a timeout,
- //but do not want to generate an inline timeout message packet.
- if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) return 0;
+ metadata = _queue_metadata;
+ // We want to allow a full buffer recv to be cut short by a timeout,
+ // but do not want to generate an inline timeout message packet.
+ if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT)
+ return 0;
}
- size_t accum_num_samps = recv_one_packet(
- buffs, nsamps_per_buff, metadata, timeout
- );
+ size_t accum_num_samps =
+ recv_one_packet(buffs, nsamps_per_buff, metadata, timeout);
- if (one_packet or metadata.end_of_burst){
+ if (one_packet or metadata.end_of_burst) {
#ifdef UHD_TXRX_DEBUG_PRINTS
- dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet);
+ dbg_gather_data(
+ nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet);
#endif
return accum_num_samps;
}
- //first recv had an error code set, return immediately
+ // first recv had an error code set, return immediately
if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) {
return accum_num_samps;
}
- //loop until buffer is filled or error code
- while(accum_num_samps < nsamps_per_buff){
- size_t num_samps = recv_one_packet(
- buffs, nsamps_per_buff - accum_num_samps, _queue_metadata,
- timeout, accum_num_samps*_bytes_per_cpu_item
- );
+ // loop until buffer is filled or error code
+ while (accum_num_samps < nsamps_per_buff) {
+ size_t num_samps = recv_one_packet(buffs,
+ nsamps_per_buff - accum_num_samps,
+ _queue_metadata,
+ timeout,
+ accum_num_samps * _bytes_per_cpu_item);
metadata.end_of_burst = _queue_metadata.end_of_burst;
- //metadata had an error code set, store for next call and return
- if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE){
+ // metadata had an error code set, store for next call and return
+ if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) {
_queue_error_for_next_call = true;
break;
}
accum_num_samps += num_samps;
- //return immediately if end of burst
+ // return immediately if end of burst
if (_queue_metadata.end_of_burst) {
break;
}
@@ -275,12 +298,12 @@ private:
bool _queue_error_for_next_call;
size_t _alignment_failure_threshold;
rx_metadata_t _queue_metadata;
- struct xport_chan_props_type{
- xport_chan_props_type(void):
- packet_count(0),
- handle_overflow(&handle_overflow_nop),
- fc_update_window(0)
- {}
+ struct xport_chan_props_type
+ {
+ xport_chan_props_type(void)
+ : packet_count(0), handle_overflow(&handle_overflow_nop), fc_update_window(0)
+ {
+ }
get_buff_type get_buff;
issue_stream_cmd_type issue_stream_cmd;
size_t packet_count;
@@ -291,65 +314,80 @@ private:
};
std::vector<xport_chan_props_type> _props;
size_t _num_outputs;
- size_t _bytes_per_otw_item; //used in conversion
- size_t _bytes_per_cpu_item; //used in conversion
- uhd::convert::converter::sptr _converter; //used in conversion
+ size_t _bytes_per_otw_item; // used in conversion
+ size_t _bytes_per_cpu_item; // used in conversion
+ uhd::convert::converter::sptr _converter; // used in conversion
//! information stored for a received buffer
- struct per_buffer_info_type{
+ struct per_buffer_info_type
+ {
void reset()
{
buff.reset();
- vrt_hdr = nullptr;
- time = 0;
+ vrt_hdr = nullptr;
+ time = 0;
copy_buff = nullptr;
}
managed_recv_buffer::sptr buff;
- const uint32_t *vrt_hdr;
+ const uint32_t* vrt_hdr;
vrt::if_packet_info_t ifpi;
uint64_t time;
- const char *copy_buff;
+ const char* copy_buff;
};
- //!information stored for a set of aligned buffers
- struct buffers_info_type : std::vector<per_buffer_info_type> {
- buffers_info_type(const size_t size):
- std::vector<per_buffer_info_type>(size),
- indexes_todo(size, true),
- alignment_time(0),
- alignment_time_valid(false),
- data_bytes_to_copy(0),
- fragment_offset_in_samps(0)
- {/* NOP */}
+ //! information stored for a set of aligned buffers
+ struct buffers_info_type : std::vector<per_buffer_info_type>
+ {
+ buffers_info_type(const size_t size)
+ : std::vector<per_buffer_info_type>(size)
+ , indexes_todo(size, true)
+ , alignment_time(0)
+ , alignment_time_valid(false)
+ , data_bytes_to_copy(0)
+ , fragment_offset_in_samps(0)
+ { /* NOP */
+ }
void reset()
{
indexes_todo.set();
- alignment_time = 0;
- alignment_time_valid = false;
- data_bytes_to_copy = 0;
+ alignment_time = 0;
+ alignment_time_valid = false;
+ data_bytes_to_copy = 0;
fragment_offset_in_samps = 0;
metadata.reset();
for (size_t i = 0; i < size(); i++)
at(i).reset();
}
- boost::dynamic_bitset<> indexes_todo; //used in alignment logic
- uint64_t alignment_time; //used in alignment logic
- bool alignment_time_valid; //used in alignment logic
- size_t data_bytes_to_copy; //keeps track of state
- size_t fragment_offset_in_samps; //keeps track of state
- rx_metadata_t metadata; //packet description
+ boost::dynamic_bitset<> indexes_todo; // used in alignment logic
+ uint64_t alignment_time; // used in alignment logic
+ bool alignment_time_valid; // used in alignment logic
+ size_t data_bytes_to_copy; // keeps track of state
+ size_t fragment_offset_in_samps; // keeps track of state
+ rx_metadata_t metadata; // packet description
};
//! a circular queue of buffer infos
std::vector<buffers_info_type> _buffers_infos;
size_t _buffers_infos_index;
- buffers_info_type &get_curr_buffer_info(void){return _buffers_infos[_buffers_infos_index];}
- buffers_info_type &get_prev_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 3)%4];}
- buffers_info_type &get_next_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 1)%4];}
- void increment_buffer_info(void){_buffers_infos_index = (_buffers_infos_index + 1)%4;}
+ buffers_info_type& get_curr_buffer_info(void)
+ {
+ return _buffers_infos[_buffers_infos_index];
+ }
+ buffers_info_type& get_prev_buffer_info(void)
+ {
+ return _buffers_infos[(_buffers_infos_index + 3) % 4];
+ }
+ buffers_info_type& get_next_buffer_info(void)
+ {
+ return _buffers_infos[(_buffers_infos_index + 1) % 4];
+ }
+ void increment_buffer_info(void)
+ {
+ _buffers_infos_index = (_buffers_infos_index + 1) % 4;
+ }
//! possible return options for the packet receiver
- enum packet_type{
+ enum packet_type {
PACKET_IF_DATA,
PACKET_TIMESTAMP_ERROR,
PACKET_INLINE_MESSAGE,
@@ -357,9 +395,9 @@ private:
PACKET_SEQUENCE_ERROR
};
- #ifdef ERROR_INJECT_DROPPED_PACKETS
+#ifdef ERROR_INJECT_DROPPED_PACKETS
int recvd_packets;
- #endif
+#endif
/*******************************************************************
* Get and process a single packet from the transport:
@@ -367,57 +405,56 @@ private:
* Extract all the relevant info and store.
* Check the info to determine the return code.
******************************************************************/
- UHD_INLINE packet_type get_and_process_single_packet(
- const size_t index,
- per_buffer_info_type &prev_buffer_info,
- per_buffer_info_type &curr_buffer_info,
- double timeout
- ){
- managed_recv_buffer::sptr &buff = curr_buffer_info.buff;
- per_buffer_info_type &info = curr_buffer_info;
- while (1)
- {
- //get a single packet from the transport layer
+ UHD_INLINE packet_type get_and_process_single_packet(const size_t index,
+ per_buffer_info_type& prev_buffer_info,
+ per_buffer_info_type& curr_buffer_info,
+ double timeout)
+ {
+ managed_recv_buffer::sptr& buff = curr_buffer_info.buff;
+ per_buffer_info_type& info = curr_buffer_info;
+ while (1) {
+ // get a single packet from the transport layer
buff = _props[index].get_buff(timeout);
- if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR;
+ if (buff.get() == nullptr)
+ return PACKET_TIMEOUT_ERROR;
- #ifdef ERROR_INJECT_DROPPED_PACKETS
- if (++recvd_packets > 1000)
- {
+#ifdef ERROR_INJECT_DROPPED_PACKETS
+ if (++recvd_packets > 1000) {
recvd_packets = 0;
buff.reset();
buff = _props[index].get_buff(timeout);
- if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR;
+ if (buff.get() == nullptr)
+ return PACKET_TIMEOUT_ERROR;
}
- #endif
+#endif
- //bounds check before extract
- const size_t num_packet_words32 = buff->size()/sizeof(uint32_t);
- if (num_packet_words32 <= _header_offset_words32){
+ // bounds check before extract
+ const size_t num_packet_words32 = buff->size() / sizeof(uint32_t);
+ if (num_packet_words32 <= _header_offset_words32) {
throw std::runtime_error("recv buffer smaller than vrt packet offset");
}
- //extract packet info
- memset(&info.ifpi, 0, sizeof (vrt::if_packet_info_t));
+ // extract packet info
+ memset(&info.ifpi, 0, sizeof(vrt::if_packet_info_t));
info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;
- info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32;
+ info.vrt_hdr = buff->cast<const uint32_t*>() + _header_offset_words32;
_vrt_unpacker(info.vrt_hdr, info.ifpi);
- info.time = info.ifpi.tsf; //assumes has_tsf is true
- info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);
-
- //handle flow control
- if (_props[index].handle_flowctrl)
- {
- if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0)
- {
+ info.time = info.ifpi.tsf; // assumes has_tsf is true
+ info.copy_buff = reinterpret_cast<const char*>(
+ info.vrt_hdr + info.ifpi.num_header_words32);
+
+ // handle flow control
+ if (_props[index].handle_flowctrl) {
+ if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0) {
_props[index].handle_flowctrl(info.ifpi.packet_count);
}
}
- //handle flow control ack
- if (info.ifpi.fc_ack){
+ // handle flow control ack
+ if (info.ifpi.fc_ack) {
if (_props[index].handle_flowctrl_ack) {
- _props[index].handle_flowctrl_ack(reinterpret_cast<const uint32_t *>(info.copy_buff));
+ _props[index].handle_flowctrl_ack(
+ reinterpret_cast<const uint32_t*>(info.copy_buff));
}
// Process the next packet
buff.reset();
@@ -433,18 +470,20 @@ private:
//-- The order of these checks is HOLY.
//--------------------------------------------------------------
- //1) check for inline IF message packets
- if (info.ifpi.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){
+ // 1) check for inline IF message packets
+ if (info.ifpi.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA) {
return PACKET_INLINE_MESSAGE;
}
- //2) check for sequence errors
- #ifndef SRPH_DONT_CHECK_SEQUENCE
- const size_t seq_mask = (info.ifpi.link_type == vrt::if_packet_info_t::LINK_TYPE_NONE)? 0xf : 0xfff;
+// 2) check for sequence errors
+#ifndef SRPH_DONT_CHECK_SEQUENCE
+ const size_t seq_mask =
+ (info.ifpi.link_type == vrt::if_packet_info_t::LINK_TYPE_NONE) ? 0xf : 0xfff;
const size_t expected_packet_count = _props[index].packet_count;
- _props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask;
- if (expected_packet_count != info.ifpi.packet_count){
- //UHD_LOGGER_INFO("STREAMER") << "expected: " << expected_packet_count << " got: " << info.ifpi.packet_count;
+ _props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask;
+ if (expected_packet_count != info.ifpi.packet_count) {
+ // UHD_LOGGER_INFO("STREAMER") << "expected: " << expected_packet_count << "
+ // got: " << info.ifpi.packet_count;
if (_props[index].handle_flowctrl) {
// Always update flow control in this case, because we don't
// know which packet was dropped and what state the upstream
@@ -453,14 +492,14 @@ private:
}
return PACKET_SEQUENCE_ERROR;
}
- #endif
+#endif
- //3) check for out of order timestamps
- if (info.ifpi.has_tsf and prev_buffer_info.time > info.time){
+ // 3) check for out of order timestamps
+ if (info.ifpi.has_tsf and prev_buffer_info.time > info.time) {
return PACKET_TIMESTAMP_ERROR;
}
- //4) otherwise the packet is normal!
+ // 4) otherwise the packet is normal!
return PACKET_IF_DATA;
}
@@ -470,25 +509,22 @@ private:
get_curr_buffer_info().reset();
get_next_buffer_info().reset();
- for (size_t i = 0; i < _props.size(); i++)
- {
+ for (size_t i = 0; i < _props.size(); i++) {
per_buffer_info_type prev_buffer_info, curr_buffer_info;
prev_buffer_info.reset();
curr_buffer_info.reset();
- while (true)
- {
- //receive a single packet from the transport
- try
- {
+ while (true) {
+ // receive a single packet from the transport
+ try {
// call into get_and_process_single_packet()
// to make sure flow control is handled
if (get_and_process_single_packet(
- i,
- prev_buffer_info,
- curr_buffer_info,
- timeout) == PACKET_TIMEOUT_ERROR) break;
- } catch(...){}
- curr_buffer_info.buff.reset(); // Let my buffer go!
+ i, prev_buffer_info, curr_buffer_info, timeout)
+ == PACKET_TIMEOUT_ERROR)
+ break;
+ } catch (...) {
+ }
+ curr_buffer_info.buff.reset(); // Let my buffer go!
prev_buffer_info = curr_buffer_info;
curr_buffer_info.reset();
}
@@ -499,34 +535,31 @@ private:
* Alignment check:
* Check the received packet for alignment and mark accordingly.
******************************************************************/
- UHD_INLINE void alignment_check(
- const size_t index, buffers_info_type &info
- ){
- //if alignment time was not valid or if the sequence id is newer:
+ UHD_INLINE void alignment_check(const size_t index, buffers_info_type& info)
+ {
+ // if alignment time was not valid or if the sequence id is newer:
// use this index's time as the alignment time
// reset the indexes list and remove this index
- if (not info.alignment_time_valid or info[index].time > info.alignment_time){
+ if (not info.alignment_time_valid or info[index].time > info.alignment_time) {
info.alignment_time_valid = true;
- info.alignment_time = info[index].time;
+ info.alignment_time = info[index].time;
info.indexes_todo.set();
info.indexes_todo.reset(index);
// release the other buffers
- for (size_t i = 0; i < info.size(); i++)
- {
- if (i != index)
- {
+ for (size_t i = 0; i < info.size(); i++) {
+ if (i != index) {
info[i].reset();
}
}
info.data_bytes_to_copy = info[index].ifpi.num_payload_bytes;
// reset start_of_burst and end_of_burst states
info.metadata.start_of_burst = info[index].ifpi.sob;
- info.metadata.end_of_burst = info[index].ifpi.eob;
+ info.metadata.end_of_burst = info[index].ifpi.eob;
}
- //if the sequence id matches:
+ // if the sequence id matches:
// remove this index from the list and continue
- else if (info[index].time == info.alignment_time){
+ else if (info[index].time == info.alignment_time) {
info.indexes_todo.reset(index);
// All channels should have sob set at the same time, so only
// set start_of burst if all channels have sob set.
@@ -539,9 +572,9 @@ private:
info[index].reset();
}
- //if the sequence id is older:
+ // if the sequence id is older:
// continue with the same index to try again
- //else if (info[index].time < info.alignment_time)...
+ // else if (info[index].time < info.alignment_time)...
}
/*******************************************************************
@@ -550,127 +583,139 @@ private:
* Handle all of the edge cases like inline messages and errors.
* The logic will throw out older packets until it finds a match.
******************************************************************/
- UHD_INLINE void get_aligned_buffs(double timeout){
-
- get_prev_buffer_info().reset(); // no longer need the previous info - reset it for future use
+ UHD_INLINE void get_aligned_buffs(double timeout)
+ {
+ get_prev_buffer_info()
+ .reset(); // no longer need the previous info - reset it for future use
- increment_buffer_info(); //increment to next buffer
+ increment_buffer_info(); // increment to next buffer
- buffers_info_type &prev_info = get_prev_buffer_info();
- buffers_info_type &curr_info = get_curr_buffer_info();
- buffers_info_type &next_info = get_next_buffer_info();
+ buffers_info_type& prev_info = get_prev_buffer_info();
+ buffers_info_type& curr_info = get_curr_buffer_info();
+ buffers_info_type& next_info = get_next_buffer_info();
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_NONE;
- //Loop until we get a message of an aligned set of buffers:
+ // Loop until we get a message of an aligned set of buffers:
// - Receive a single packet and extract its info.
// - Handle the packet type yielded by the receive.
// - Check the timestamps for alignment conditions.
size_t iterations = 0;
- while (curr_info.indexes_todo.any()){
-
- //get the index to process for this iteration
+ while (curr_info.indexes_todo.any()) {
+ // get the index to process for this iteration
const size_t index = curr_info.indexes_todo.find_first();
packet_type packet;
- //receive a single packet from the transport
- try{
+ // receive a single packet from the transport
+ try {
packet = get_and_process_single_packet(
- index, prev_info[index], curr_info[index], timeout
- );
+ index, prev_info[index], curr_info[index], timeout);
}
- //handle the case where a bad header exists
- catch(const uhd::value_error &e){
- UHD_LOGGER_ERROR("STREAMER") << boost::format(
- "The receive packet handler caught a value exception.\n%s"
- ) % e.what();
- std::swap(curr_info, next_info); //save progress from curr -> next
+ // handle the case where a bad header exists
+ catch (const uhd::value_error& e) {
+ UHD_LOGGER_ERROR("STREAMER")
+ << boost::format(
+ "The receive packet handler caught a value exception.\n%s")
+ % e.what();
+ std::swap(curr_info, next_info); // save progress from curr -> next
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
return;
}
- switch(packet){
- case PACKET_IF_DATA:
- alignment_check(index, curr_info);
- break;
-
- case PACKET_TIMESTAMP_ERROR:
- //If the user changes the device time while streaming or without flushing,
- //we can receive a packet that comes before the previous packet in time.
- //This could cause the alignment logic to discard future received packets.
- //Therefore, when this occurs, we reset the info to restart from scratch.
- if (curr_info.alignment_time_valid and curr_info.alignment_time != curr_info[index].time){
- curr_info.alignment_time_valid = false;
- }
- alignment_check(index, curr_info);
- break;
+ switch (packet) {
+ case PACKET_IF_DATA:
+ alignment_check(index, curr_info);
+ break;
+
+ case PACKET_TIMESTAMP_ERROR:
+ // If the user changes the device time while streaming or without
+ // flushing, we can receive a packet that comes before the previous
+ // packet in time. This could cause the alignment logic to discard
+ // future received packets. Therefore, when this occurs, we reset the
+ // info to restart from scratch.
+ if (curr_info.alignment_time_valid
+ and curr_info.alignment_time != curr_info[index].time) {
+ curr_info.alignment_time_valid = false;
+ }
+ alignment_check(index, curr_info);
+ break;
+
+ case PACKET_INLINE_MESSAGE:
+ curr_info[index].buff.reset(); // No data, so release the buffer
+ curr_info[index].copy_buff = nullptr;
+ std::swap(curr_info, next_info); // save progress from curr -> next
+ curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsf;
+ curr_info.metadata.time_spec =
+ time_spec_t::from_ticks(next_info[index].time, _tick_rate);
+ curr_info.metadata.error_code =
+ rx_metadata_t::error_code_t(get_context_code(
+ next_info[index].vrt_hdr, next_info[index].ifpi));
+ if (curr_info.metadata.error_code
+ == rx_metadata_t::ERROR_CODE_OVERFLOW) {
+ // Not sending flow control would cause timeouts due to source
+ // flow control locking up. Send first as the overrun handler may
+ // flush the receive buffers which could contain packets with
+ // sequence numbers after this packet's sequence number!
+ if (_props[index].handle_flowctrl) {
+ _props[index].handle_flowctrl(
+ next_info[index].ifpi.packet_count);
+ }
+
+ rx_metadata_t metadata = curr_info.metadata;
+ _props[index].handle_overflow();
+ curr_info.metadata = metadata;
+ UHD_LOG_FASTPATH("O");
+ }
+ return;
- case PACKET_INLINE_MESSAGE:
- curr_info[index].buff.reset(); // No data, so release the buffer
- curr_info[index].copy_buff = nullptr;
- std::swap(curr_info, next_info); //save progress from curr -> next
- curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsf;
- curr_info.metadata.time_spec = time_spec_t::from_ticks(next_info[index].time, _tick_rate);
- curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi));
- if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){
- // Not sending flow control would cause timeouts due to source flow control locking up.
- // Send first as the overrun handler may flush the receive buffers which could contain
- // packets with sequence numbers after this packet's sequence number!
- if(_props[index].handle_flowctrl) {
+ case PACKET_TIMEOUT_ERROR:
+ std::swap(curr_info, next_info); // save progress from curr -> next
+ if (_props[index].handle_flowctrl) {
_props[index].handle_flowctrl(next_info[index].ifpi.packet_count);
}
-
- rx_metadata_t metadata = curr_info.metadata;
- _props[index].handle_overflow();
- curr_info.metadata = metadata;
- UHD_LOG_FASTPATH("O");
- }
- return;
-
- case PACKET_TIMEOUT_ERROR:
- std::swap(curr_info, next_info); //save progress from curr -> next
- if(_props[index].handle_flowctrl) {
- _props[index].handle_flowctrl(next_info[index].ifpi.packet_count);
- }
- curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
- return;
-
- case PACKET_SEQUENCE_ERROR:
- alignment_check(index, curr_info);
- std::swap(curr_info, next_info); //save progress from curr -> next
- curr_info.metadata.has_time_spec = prev_info.metadata.has_time_spec;
- curr_info.metadata.time_spec = prev_info.metadata.time_spec + time_spec_t::from_ticks(
- prev_info[index].ifpi.num_payload_words32*sizeof(uint32_t)/_bytes_per_otw_item, _samp_rate);
- curr_info.metadata.out_of_sequence = true;
- curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
- UHD_LOG_FASTPATH("D");
- return;
-
+ curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
+ return;
+
+ case PACKET_SEQUENCE_ERROR:
+ alignment_check(index, curr_info);
+ std::swap(curr_info, next_info); // save progress from curr -> next
+ curr_info.metadata.has_time_spec = prev_info.metadata.has_time_spec;
+ curr_info.metadata.time_spec =
+ prev_info.metadata.time_spec
+ + time_spec_t::from_ticks(
+ prev_info[index].ifpi.num_payload_words32 * sizeof(uint32_t)
+ / _bytes_per_otw_item,
+ _samp_rate);
+ curr_info.metadata.out_of_sequence = true;
+ curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ UHD_LOG_FASTPATH("D");
+ return;
}
- //too many iterations: detect alignment failure
- if (iterations++ > _alignment_failure_threshold){
- UHD_LOGGER_ERROR("STREAMER") << boost::format(
- "The receive packet handler failed to time-align packets.\n"
- "%u received packets were processed by the handler.\n"
- "However, a timestamp match could not be determined.\n"
- ) % iterations << std::endl;
- std::swap(curr_info, next_info); //save progress from curr -> next
+ // too many iterations: detect alignment failure
+ if (iterations++ > _alignment_failure_threshold) {
+ UHD_LOGGER_ERROR("STREAMER")
+ << boost::format(
+ "The receive packet handler failed to time-align packets.\n"
+ "%u received packets were processed by the handler.\n"
+ "However, a timestamp match could not be determined.\n")
+ % iterations
+ << std::endl;
+ std::swap(curr_info, next_info); // save progress from curr -> next
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
_props[index].handle_overflow();
return;
}
-
}
- //set the metadata from the buffer information at index zero
+ // set the metadata from the buffer information at index zero
curr_info.metadata.has_time_spec = curr_info[0].ifpi.has_tsf;
- curr_info.metadata.time_spec = time_spec_t::from_ticks(curr_info[0].time, _tick_rate);
- curr_info.metadata.more_fragments = false;
+ curr_info.metadata.time_spec =
+ time_spec_t::from_ticks(curr_info[0].time, _tick_rate);
+ curr_info.metadata.more_fragments = false;
curr_info.metadata.fragment_offset = 0;
- curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_NONE;
-
+ curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_NONE;
}
/*******************************************************************
@@ -679,50 +724,50 @@ private:
* When no fragments are available, call the get aligned buffers.
* Then copy-convert available data into the user's IO buffers.
******************************************************************/
- UHD_INLINE size_t recv_one_packet(
- const uhd::rx_streamer::buffs_type &buffs,
+ UHD_INLINE size_t recv_one_packet(const uhd::rx_streamer::buffs_type& buffs,
const size_t nsamps_per_buff,
- uhd::rx_metadata_t &metadata,
+ uhd::rx_metadata_t& metadata,
const double timeout,
- const size_t buffer_offset_bytes = 0
- ){
- //get the next buffer if the current one has expired
- if (get_curr_buffer_info().data_bytes_to_copy == 0)
- {
- //perform receive with alignment logic
+ const size_t buffer_offset_bytes = 0)
+ {
+ // get the next buffer if the current one has expired
+ if (get_curr_buffer_info().data_bytes_to_copy == 0) {
+ // perform receive with alignment logic
get_aligned_buffs(timeout);
}
- buffers_info_type &info = get_curr_buffer_info();
- metadata = info.metadata;
+ buffers_info_type& info = get_curr_buffer_info();
+ metadata = info.metadata;
- //interpolate the time spec (useful when this is a fragment)
- metadata.time_spec += time_spec_t::from_ticks(info.fragment_offset_in_samps, _samp_rate);
+ // interpolate the time spec (useful when this is a fragment)
+ metadata.time_spec +=
+ time_spec_t::from_ticks(info.fragment_offset_in_samps, _samp_rate);
- //extract the number of samples available to copy
- const size_t nsamps_available = info.data_bytes_to_copy/_bytes_per_otw_item;
- const size_t nsamps_to_copy = std::min(nsamps_per_buff*_num_outputs, nsamps_available);
- const size_t bytes_to_copy = nsamps_to_copy*_bytes_per_otw_item;
- const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_num_outputs;
+ // extract the number of samples available to copy
+ const size_t nsamps_available = info.data_bytes_to_copy / _bytes_per_otw_item;
+ const size_t nsamps_to_copy =
+ std::min(nsamps_per_buff * _num_outputs, nsamps_available);
+ const size_t bytes_to_copy = nsamps_to_copy * _bytes_per_otw_item;
+ const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy / _num_outputs;
- //setup the data to share with converter threads
- _convert_nsamps = nsamps_to_copy_per_io_buff;
- _convert_buffs = &buffs;
+ // setup the data to share with converter threads
+ _convert_nsamps = nsamps_to_copy_per_io_buff;
+ _convert_buffs = &buffs;
_convert_buffer_offset_bytes = buffer_offset_bytes;
- _convert_bytes_to_copy = bytes_to_copy;
+ _convert_bytes_to_copy = bytes_to_copy;
- //perform N channels of conversion
+ // perform N channels of conversion
for (size_t i = 0; i < this->size(); i++) {
convert_to_out_buff(i);
}
- //update the copy buffer's availability
+ // update the copy buffer's availability
info.data_bytes_to_copy -= bytes_to_copy;
- //setup the fragment flags and offset
- metadata.more_fragments = info.data_bytes_to_copy != 0;
+ // setup the fragment flags and offset
+ metadata.more_fragments = info.data_bytes_to_copy != 0;
metadata.fragment_offset = info.fragment_offset_in_samps;
- info.fragment_offset_in_samps += nsamps_to_copy; //set for next call
+ info.fragment_offset_in_samps += nsamps_to_copy; // set for next call
return nsamps_to_copy_per_io_buff;
}
@@ -736,34 +781,34 @@ private:
*/
inline void convert_to_out_buff(const size_t index)
{
- //shortcut references to local data structures
- buffers_info_type &buff_info = get_curr_buffer_info();
- per_buffer_info_type &info = buff_info[index];
- const rx_streamer::buffs_type &buffs = *_convert_buffs;
-
- //fill IO buffs with pointers into the output buffer
- void *io_buffs[4/*max interleave*/];
- for (size_t i = 0; i < _num_outputs; i++){
- char *b = reinterpret_cast<char *>(buffs[index*_num_outputs + i]);
+ // shortcut references to local data structures
+ buffers_info_type& buff_info = get_curr_buffer_info();
+ per_buffer_info_type& info = buff_info[index];
+ const rx_streamer::buffs_type& buffs = *_convert_buffs;
+
+ // fill IO buffs with pointers into the output buffer
+ void* io_buffs[4 /*max interleave*/];
+ for (size_t i = 0; i < _num_outputs; i++) {
+ char* b = reinterpret_cast<char*>(buffs[index * _num_outputs + i]);
io_buffs[i] = b + _convert_buffer_offset_bytes;
}
- const ref_vector<void *> out_buffs(io_buffs, _num_outputs);
+ const ref_vector<void*> out_buffs(io_buffs, _num_outputs);
- //perform the conversion operation
+ // perform the conversion operation
_converter->conv(info.copy_buff, out_buffs, _convert_nsamps);
- //advance the pointer for the source buffer
+ // advance the pointer for the source buffer
info.copy_buff += _convert_bytes_to_copy;
- //release the buffer if fully consumed
- if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy){
- info.buff.reset(); //effectively a release
+ // release the buffer if fully consumed
+ if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy) {
+ info.buff.reset(); // effectively a release
}
}
//! Shared variables for the worker threads
size_t _convert_nsamps;
- const rx_streamer::buffs_type *_convert_buffs;
+ const rx_streamer::buffs_type* _convert_buffs;
size_t _convert_buffer_offset_bytes;
size_t _convert_bytes_to_copy;
@@ -773,10 +818,24 @@ private:
* Gathered data can be used to post process it with external tools.
*/
#ifdef UHD_TXRX_DEBUG_PRINTS
- struct dbg_recv_stat_t {
- dbg_recv_stat_t(long wc, size_t nspb, size_t nsr, uhd::rx_metadata_t md, double to, bool op, double rate):
- wallclock(wc), nsamps_per_buff(nspb), nsamps_recv(nsr), metadata(md), timeout(to), one_packet(op), samp_rate(rate)
- {}
+ struct dbg_recv_stat_t
+ {
+ dbg_recv_stat_t(long wc,
+ size_t nspb,
+ size_t nsr,
+ uhd::rx_metadata_t md,
+ double to,
+ bool op,
+ double rate)
+ : wallclock(wc)
+ , nsamps_per_buff(nspb)
+ , nsamps_recv(nsr)
+ , metadata(md)
+ , timeout(to)
+ , one_packet(op)
+ , samp_rate(rate)
+ {
+ }
long wallclock;
size_t nsamps_per_buff;
size_t nsamps_recv;
@@ -785,42 +844,47 @@ private:
bool one_packet;
double samp_rate;
// Create a formatted print line for all the info gathered in this struct.
- std::string print_line() {
+ std::string print_line()
+ {
boost::format fmt("recv,%ld,%f,%i,%i,%s,%i,%s,%s,%s,%i,%s,%ld");
fmt % wallclock;
- fmt % timeout % (int)nsamps_per_buff % (int) nsamps_recv;
- fmt % (one_packet ? "true":"false");
+ fmt % timeout % (int)nsamps_per_buff % (int)nsamps_recv;
+ fmt % (one_packet ? "true" : "false");
fmt % metadata.error_code;
- fmt % (metadata.start_of_burst ? "true":"false") % (metadata.end_of_burst ? "true":"false");
- fmt % (metadata.more_fragments ? "true":"false") % (int)metadata.fragment_offset;
- fmt % (metadata.has_time_spec ? "true":"false") % metadata.time_spec.to_ticks(samp_rate);
+ fmt % (metadata.start_of_burst ? "true" : "false")
+ % (metadata.end_of_burst ? "true" : "false");
+ fmt % (metadata.more_fragments ? "true" : "false")
+ % (int)metadata.fragment_offset;
+ fmt % (metadata.has_time_spec ? "true" : "false")
+ % metadata.time_spec.to_ticks(samp_rate);
return fmt.str();
}
};
- void dbg_gather_data(const size_t nsamps_per_buff, const size_t nsamps_recv,
- uhd::rx_metadata_t &metadata, const double timeout,
- const bool one_packet,
- bool dbg_print_directly = true
- )
+ void dbg_gather_data(const size_t nsamps_per_buff,
+ const size_t nsamps_recv,
+ uhd::rx_metadata_t& metadata,
+ const double timeout,
+ const bool one_packet,
+ bool dbg_print_directly = true)
{
- // Initialize a struct with all available data. It can return a formatted string with all infos if wanted.
+ // Initialize a struct with all available data. It can return a formatted string
+ // with all infos if wanted.
dbg_recv_stat_t data(boost::get_system_time().time_of_day().total_microseconds(),
- nsamps_per_buff,
- nsamps_recv,
- metadata,
- timeout,
- one_packet,
- _samp_rate
- );
- if(dbg_print_directly) {
+ nsamps_per_buff,
+ nsamps_recv,
+ metadata,
+ timeout,
+ one_packet,
+ _samp_rate);
+ if (dbg_print_directly) {
dbg_print_err(data.print_line());
}
}
-
- void dbg_print_err(std::string msg) {
+ void dbg_print_err(std::string msg)
+ {
std::string dbg_prefix("super_recv_packet_handler,");
msg = dbg_prefix + msg;
fprintf(stderr, "%s\n", msg.c_str());
@@ -828,31 +892,35 @@ private:
#endif
};
-class recv_packet_streamer : public recv_packet_handler, public rx_streamer{
+class recv_packet_streamer : public recv_packet_handler, public rx_streamer
+{
public:
- recv_packet_streamer(const size_t max_num_samps){
+ recv_packet_streamer(const size_t max_num_samps)
+ {
_max_num_samps = max_num_samps;
}
- size_t get_num_channels(void) const{
+ size_t get_num_channels(void) const
+ {
return this->size();
}
- size_t get_max_num_samps(void) const{
+ size_t get_max_num_samps(void) const
+ {
return _max_num_samps;
}
- size_t recv(
- const rx_streamer::buffs_type &buffs,
+ size_t recv(const rx_streamer::buffs_type& buffs,
const size_t nsamps_per_buff,
- uhd::rx_metadata_t &metadata,
+ uhd::rx_metadata_t& metadata,
const double timeout,
- const bool one_packet
- ){
- return recv_packet_handler::recv(buffs, nsamps_per_buff, metadata, timeout, one_packet);
+ const bool one_packet)
+ {
+ return recv_packet_handler::recv(
+ buffs, nsamps_per_buff, metadata, timeout, one_packet);
}
- void issue_stream_cmd(const stream_cmd_t &stream_cmd)
+ void issue_stream_cmd(const stream_cmd_t& stream_cmd)
{
return recv_packet_handler::issue_stream_cmd(stream_cmd);
}
@@ -861,6 +929,6 @@ private:
size_t _max_num_samps;
};
-}}} //namespace
+}}} // namespace uhd::transport::sph
#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP */