diff options
Diffstat (limited to 'host')
| -rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 35 | 
1 files changed, 26 insertions, 9 deletions
| diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index c962d40e6..fcb333a04 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -301,13 +301,13 @@ private:          {              buff.reset();              vrt_hdr = nullptr; -            time = time_spec_t(0.0); +            time = 0;              copy_buff = nullptr;          }          managed_recv_buffer::sptr buff;          const uint32_t *vrt_hdr;          vrt::if_packet_info_t ifpi; -        time_spec_t time; +        uint64_t time;          const char *copy_buff;      }; @@ -324,7 +324,7 @@ private:          void reset()          {              indexes_todo.set(); -            alignment_time = time_spec_t(0.0); +            alignment_time = 0;              alignment_time_valid = false;              data_bytes_to_copy = 0;              fragment_offset_in_samps = 0; @@ -333,7 +333,7 @@ private:                  at(i).reset();          }          boost::dynamic_bitset<> indexes_todo; //used in alignment logic -        time_spec_t alignment_time; //used in alignment logic +        uint64_t alignment_time; //used in alignment logic          bool alignment_time_valid; //used in alignment logic          size_t data_bytes_to_copy; //keeps track of state          size_t fragment_offset_in_samps; //keeps track of state @@ -398,10 +398,11 @@ private:              }              //extract packet info +            memset(&info.ifpi, 0, sizeof (vrt::if_packet_info_t));              info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;              info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32;              _vrt_unpacker(info.vrt_hdr, info.ifpi); -            info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true +            info.time = info.ifpi.tsf; //assumes has_tsf is true              info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);              //handle flow control @@ -472,6 +473,8 @@ private:          for (size_t i = 0; i < _props.size(); i++)          {              per_buffer_info_type prev_buffer_info, curr_buffer_info; +            prev_buffer_info.reset(); +            curr_buffer_info.reset();              while (true)              {                  //receive a single packet from the transport @@ -485,6 +488,7 @@ private:                              curr_buffer_info,                              timeout) == PACKET_TIMEOUT_ERROR) break;                  } catch(...){} +                curr_buffer_info.buff.reset();  // Let my buffer go!                  prev_buffer_info = curr_buffer_info;                  curr_buffer_info.reset();              } @@ -506,6 +510,14 @@ private:              info.alignment_time = info[index].time;              info.indexes_todo.set();              info.indexes_todo.reset(index); +            // release the other buffers +            for (size_t i = 0; i < info.size(); i++) +            { +                if (i != index) +                { +                    info[i].reset(); +                } +            }              info.data_bytes_to_copy = info[index].ifpi.num_payload_bytes;          } @@ -513,6 +525,9 @@ private:          //  remove this index from the list and continue          else if (info[index].time == info.alignment_time){              info.indexes_todo.reset(index); +        } else { +            // Not going to use this buffer, so release it +            info[index].reset();          }          //if the sequence id is older: @@ -536,6 +551,8 @@ private:          buffers_info_type &curr_info = get_curr_buffer_info();          buffers_info_type &next_info = get_next_buffer_info(); +        curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_NONE; +          //Loop until we get a message of an aligned set of buffers:          // - Receive a single packet and extract its info.          // - Handle the packet type yielded by the receive. @@ -581,9 +598,11 @@ private:                  break;              case PACKET_INLINE_MESSAGE: +                curr_info[index].buff.reset();  // No data, so release the buffer +                curr_info[index].copy_buff = nullptr;                  std::swap(curr_info, next_info); //save progress from curr -> next                  curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsf; -                curr_info.metadata.time_spec = next_info[index].time; +                curr_info.metadata.time_spec = time_spec_t::from_ticks(next_info[index].time, _tick_rate);                  curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi));                  if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){                      // Not sending flow control would cause timeouts due to source flow control locking up. @@ -598,8 +617,6 @@ private:                      curr_info.metadata = metadata;                      UHD_LOG_FASTPATH("O");                  } -                curr_info[index].buff.reset(); -                curr_info[index].copy_buff = nullptr;                  return;              case PACKET_TIMEOUT_ERROR: @@ -640,7 +657,7 @@ private:          //set the metadata from the buffer information at index zero          curr_info.metadata.has_time_spec = curr_info[0].ifpi.has_tsf; -        curr_info.metadata.time_spec = curr_info[0].time; +        curr_info.metadata.time_spec = time_spec_t::from_ticks(curr_info[0].time, _tick_rate);          curr_info.metadata.more_fragments = false;          curr_info.metadata.fragment_offset = 0;          curr_info.metadata.start_of_burst = curr_info[0].ifpi.sob; | 
