diff options
| -rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 63 | 
1 files changed, 38 insertions, 25 deletions
| diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 5fdf2594d..5c84327a4 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -154,28 +154,12 @@ public:      /*!       * Flush all transports in the streamer: -     * This calls into get_and_process_single_packet(), -     * so the sequence and flow control are handled. -     * However, the packet payload is discarded. +     * The packet payload is discarded.       */      void flush_all(const double timeout = 0.0)      { -        increment_buffer_info(); //increment to next buffer - -        for (size_t i = 0; i < _props.size(); i++) -        { -            while (true) //while (_props.at(i).get_buff(timeout)); -            { -                //receive a single packet from the transport -                try -                { -                    if (get_and_process_single_packet(i, -                        get_prev_buffer_info(), -                        get_curr_buffer_info(), -                    timeout) == PACKET_TIMEOUT_ERROR) break; -                }catch(...){} -            } -        } +        _flush_all(timeout); +        return;      }      /*! @@ -379,12 +363,12 @@ private:       ******************************************************************/      UHD_INLINE packet_type get_and_process_single_packet(          const size_t index, -        buffers_info_type &prev_buffer_info, -        buffers_info_type &curr_buffer_info, +        per_buffer_info_type &prev_buffer_info, +        per_buffer_info_type &curr_buffer_info,          double timeout      ){          //get a single packet from the transport layer -        managed_recv_buffer::sptr &buff = curr_buffer_info[index].buff; +        managed_recv_buffer::sptr &buff = curr_buffer_info.buff;          buff = _props[index].get_buff(timeout);          if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR; @@ -405,7 +389,7 @@ private:          }          //extract packet info -        per_buffer_info_type &info = curr_buffer_info[index]; +        per_buffer_info_type &info = curr_buffer_info;          info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;          info.vrt_hdr = buff->cast<const boost::uint32_t *>() + _header_offset_words32;          _vrt_unpacker(info.vrt_hdr, info.ifpi); @@ -442,7 +426,7 @@ private:          #endif          //3) check for out of order timestamps -        if (info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){ +        if (info.ifpi.has_tsf and prev_buffer_info.time > info.time){              return PACKET_TIMESTAMP_ERROR;          } @@ -450,6 +434,33 @@ private:          return PACKET_IF_DATA;      } +    void _flush_all(double timeout) +    { +        for (size_t i = 0; i < _props.size(); i++) +        { +            per_buffer_info_type prev_buffer_info, curr_buffer_info; +            while (true) +            { +                //receive a single packet from the transport +                try +                { +                    // call into get_and_process_single_packet() +                    // to make sure flow control is handled +                    if (get_and_process_single_packet( +                            i, +                            prev_buffer_info, +                            curr_buffer_info, +                            timeout) == PACKET_TIMEOUT_ERROR) break; +                } catch(...){} +                prev_buffer_info = curr_buffer_info; +                curr_buffer_info.reset(); +            } +        } +        get_prev_buffer_info().reset(); +        get_curr_buffer_info().reset(); +        get_next_buffer_info().reset(); +    } +      /*******************************************************************       * Alignment check:       * Check the received packet for alignment and mark accordingly. @@ -509,7 +520,7 @@ private:              //receive a single packet from the transport              try{                  packet = get_and_process_single_packet( -                    index, prev_info, curr_info, timeout +                    index, prev_info[index], curr_info[index], timeout                  );              } @@ -545,7 +556,9 @@ private:                  curr_info.metadata.time_spec = next_info[index].time;                  curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi));                  if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){ +                    rx_metadata_t metadata = curr_info.metadata;                      _props[index].handle_overflow(); +                    curr_info.metadata = metadata;                      UHD_MSG(fastpath) << "O";                  }                  return; | 
