summaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp63
1 files changed, 38 insertions, 25 deletions
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 5fdf2594d..5c84327a4 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -154,28 +154,12 @@ public:
/*!
* Flush all transports in the streamer:
- * This calls into get_and_process_single_packet(),
- * so the sequence and flow control are handled.
- * However, the packet payload is discarded.
+ * The packet payload is discarded.
*/
void flush_all(const double timeout = 0.0)
{
- increment_buffer_info(); //increment to next buffer
-
- for (size_t i = 0; i < _props.size(); i++)
- {
- while (true) //while (_props.at(i).get_buff(timeout));
- {
- //receive a single packet from the transport
- try
- {
- if (get_and_process_single_packet(i,
- get_prev_buffer_info(),
- get_curr_buffer_info(),
- timeout) == PACKET_TIMEOUT_ERROR) break;
- }catch(...){}
- }
- }
+ _flush_all(timeout);
+ return;
}
/*!
@@ -379,12 +363,12 @@ private:
******************************************************************/
UHD_INLINE packet_type get_and_process_single_packet(
const size_t index,
- buffers_info_type &prev_buffer_info,
- buffers_info_type &curr_buffer_info,
+ per_buffer_info_type &prev_buffer_info,
+ per_buffer_info_type &curr_buffer_info,
double timeout
){
//get a single packet from the transport layer
- managed_recv_buffer::sptr &buff = curr_buffer_info[index].buff;
+ managed_recv_buffer::sptr &buff = curr_buffer_info.buff;
buff = _props[index].get_buff(timeout);
if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR;
@@ -405,7 +389,7 @@ private:
}
//extract packet info
- per_buffer_info_type &info = curr_buffer_info[index];
+ per_buffer_info_type &info = curr_buffer_info;
info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;
info.vrt_hdr = buff->cast<const boost::uint32_t *>() + _header_offset_words32;
_vrt_unpacker(info.vrt_hdr, info.ifpi);
@@ -442,7 +426,7 @@ private:
#endif
//3) check for out of order timestamps
- if (info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){
+ if (info.ifpi.has_tsf and prev_buffer_info.time > info.time){
return PACKET_TIMESTAMP_ERROR;
}
@@ -450,6 +434,33 @@ private:
return PACKET_IF_DATA;
}
+ void _flush_all(double timeout)
+ {
+ for (size_t i = 0; i < _props.size(); i++)
+ {
+ per_buffer_info_type prev_buffer_info, curr_buffer_info;
+ while (true)
+ {
+ //receive a single packet from the transport
+ try
+ {
+ // call into get_and_process_single_packet()
+ // to make sure flow control is handled
+ if (get_and_process_single_packet(
+ i,
+ prev_buffer_info,
+ curr_buffer_info,
+ timeout) == PACKET_TIMEOUT_ERROR) break;
+ } catch(...){}
+ prev_buffer_info = curr_buffer_info;
+ curr_buffer_info.reset();
+ }
+ }
+ get_prev_buffer_info().reset();
+ get_curr_buffer_info().reset();
+ get_next_buffer_info().reset();
+ }
+
/*******************************************************************
* Alignment check:
* Check the received packet for alignment and mark accordingly.
@@ -509,7 +520,7 @@ private:
//receive a single packet from the transport
try{
packet = get_and_process_single_packet(
- index, prev_info, curr_info, timeout
+ index, prev_info[index], curr_info[index], timeout
);
}
@@ -545,7 +556,9 @@ private:
curr_info.metadata.time_spec = next_info[index].time;
curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi));
if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){
+ rx_metadata_t metadata = curr_info.metadata;
_props[index].handle_overflow();
+ curr_info.metadata = metadata;
UHD_MSG(fastpath) << "O";
}
return;