From 3ab9a0ddeb1d053f512114ea0e5d4b4b9b358ee0 Mon Sep 17 00:00:00 2001 From: Michael West Date: Thu, 16 Nov 2017 11:49:03 -0800 Subject: rx_streamer: Release buffers no longer needed This is to allow for num_recv_frames=1 and reduce conversions from ticks to time_spec_t to improve critical path performance. --- host/lib/transport/super_recv_packet_handler.hpp | 35 ++++++++++++++++++------ 1 file changed, 26 insertions(+), 9 deletions(-) (limited to 'host/lib/transport/super_recv_packet_handler.hpp') diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index c962d40e6..fcb333a04 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -301,13 +301,13 @@ private: { buff.reset(); vrt_hdr = nullptr; - time = time_spec_t(0.0); + time = 0; copy_buff = nullptr; } managed_recv_buffer::sptr buff; const uint32_t *vrt_hdr; vrt::if_packet_info_t ifpi; - time_spec_t time; + uint64_t time; const char *copy_buff; }; @@ -324,7 +324,7 @@ private: void reset() { indexes_todo.set(); - alignment_time = time_spec_t(0.0); + alignment_time = 0; alignment_time_valid = false; data_bytes_to_copy = 0; fragment_offset_in_samps = 0; @@ -333,7 +333,7 @@ private: at(i).reset(); } boost::dynamic_bitset<> indexes_todo; //used in alignment logic - time_spec_t alignment_time; //used in alignment logic + uint64_t alignment_time; //used in alignment logic bool alignment_time_valid; //used in alignment logic size_t data_bytes_to_copy; //keeps track of state size_t fragment_offset_in_samps; //keeps track of state @@ -398,10 +398,11 @@ private: } //extract packet info + memset(&info.ifpi, 0, sizeof (vrt::if_packet_info_t)); info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; info.vrt_hdr = buff->cast() + _header_offset_words32; _vrt_unpacker(info.vrt_hdr, info.ifpi); - info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true + info.time = info.ifpi.tsf; //assumes has_tsf is true info.copy_buff = reinterpret_cast(info.vrt_hdr + info.ifpi.num_header_words32); //handle flow control @@ -472,6 +473,8 @@ private: for (size_t i = 0; i < _props.size(); i++) { per_buffer_info_type prev_buffer_info, curr_buffer_info; + prev_buffer_info.reset(); + curr_buffer_info.reset(); while (true) { //receive a single packet from the transport @@ -485,6 +488,7 @@ private: curr_buffer_info, timeout) == PACKET_TIMEOUT_ERROR) break; } catch(...){} + curr_buffer_info.buff.reset(); // Let my buffer go! prev_buffer_info = curr_buffer_info; curr_buffer_info.reset(); } @@ -506,6 +510,14 @@ private: info.alignment_time = info[index].time; info.indexes_todo.set(); info.indexes_todo.reset(index); + // release the other buffers + for (size_t i = 0; i < info.size(); i++) + { + if (i != index) + { + info[i].reset(); + } + } info.data_bytes_to_copy = info[index].ifpi.num_payload_bytes; } @@ -513,6 +525,9 @@ private: // remove this index from the list and continue else if (info[index].time == info.alignment_time){ info.indexes_todo.reset(index); + } else { + // Not going to use this buffer, so release it + info[index].reset(); } //if the sequence id is older: @@ -536,6 +551,8 @@ private: buffers_info_type &curr_info = get_curr_buffer_info(); buffers_info_type &next_info = get_next_buffer_info(); + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_NONE; + //Loop until we get a message of an aligned set of buffers: // - Receive a single packet and extract its info. // - Handle the packet type yielded by the receive. @@ -581,9 +598,11 @@ private: break; case PACKET_INLINE_MESSAGE: + curr_info[index].buff.reset(); // No data, so release the buffer + curr_info[index].copy_buff = nullptr; std::swap(curr_info, next_info); //save progress from curr -> next curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsf; - curr_info.metadata.time_spec = next_info[index].time; + curr_info.metadata.time_spec = time_spec_t::from_ticks(next_info[index].time, _tick_rate); curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi)); if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){ // Not sending flow control would cause timeouts due to source flow control locking up. @@ -598,8 +617,6 @@ private: curr_info.metadata = metadata; UHD_LOG_FASTPATH("O"); } - curr_info[index].buff.reset(); - curr_info[index].copy_buff = nullptr; return; case PACKET_TIMEOUT_ERROR: @@ -640,7 +657,7 @@ private: //set the metadata from the buffer information at index zero curr_info.metadata.has_time_spec = curr_info[0].ifpi.has_tsf; - curr_info.metadata.time_spec = curr_info[0].time; + curr_info.metadata.time_spec = time_spec_t::from_ticks(curr_info[0].time, _tick_rate); curr_info.metadata.more_fragments = false; curr_info.metadata.fragment_offset = 0; curr_info.metadata.start_of_burst = curr_info[0].ifpi.sob; -- cgit v1.2.3