diff options
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/vrt_packet_handler.hpp | 280 |
1 files changed, 206 insertions, 74 deletions
diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp index a6161ef55..cd0c89b05 100644 --- a/host/lib/transport/vrt_packet_handler.hpp +++ b/host/lib/transport/vrt_packet_handler.hpp @@ -57,6 +57,49 @@ namespace vrt_packet_handler{ /* NOP */ } + /******************************************************************* + * Unpack a received vrt header and set the copy buffer. + * - helper function for vrt_packet_handler::recv + ******************************************************************/ + static inline void _recv_helper( + recv_state &state, + uhd::rx_metadata_t &metadata, + double tick_rate, + size_t vrt_header_offset_words32 + ){ + size_t num_packet_words32 = state.managed_buff->size()/sizeof(boost::uint32_t); + if (num_packet_words32 <= vrt_header_offset_words32){ + state.copy_buff = boost::asio::buffer("", 0); + return; //must exit here after setting the buffer + } + const boost::uint32_t *vrt_hdr = state.managed_buff->cast<const boost::uint32_t *>() + vrt_header_offset_words32; + size_t num_header_words32_out, num_payload_words32_out, packet_count_out; + uhd::transport::vrt::unpack( + metadata, //output + vrt_hdr, //input + num_header_words32_out, //output + num_payload_words32_out, //output + num_packet_words32, //input + packet_count_out, //output + tick_rate + ); + + //handle the packet count / sequence number + if (packet_count_out != state.next_packet_seq){ + std::cerr << "S" << (packet_count_out - state.next_packet_seq)%16; + } + state.next_packet_seq = (packet_count_out+1)%16; + + //setup the buffer to point to the data + state.copy_buff = boost::asio::buffer( + vrt_hdr + num_header_words32_out, + num_payload_words32_out*sizeof(boost::uint32_t) + ); + } + + /******************************************************************* + * Recv vrt packets and copy convert the samples into the buffer. + ******************************************************************/ static inline size_t recv( recv_state &state, const boost::asio::mutable_buffer &buff, @@ -65,95 +108,184 @@ namespace vrt_packet_handler{ const uhd::otw_type_t &otw_type, double tick_rate, uhd::transport::zero_copy_if::sptr zc_iface, + //use these two params to handle a layer above vrt size_t vrt_header_offset_words32 = 0, const recv_cb_t& recv_cb = &recv_cb_nop ){ - //////////////////////////////////////////////////////////////// - // Perform the recv - //////////////////////////////////////////////////////////////// - { - //perform a receive if no rx data is waiting to be copied - if (boost::asio::buffer_size(state.copy_buff) == 0){ - state.fragment_offset_in_samps = 0; - state.managed_buff = zc_iface->get_recv_buff(); - recv_cb(state.managed_buff); - } - //otherwise flag the metadata to show that is is a fragment - else{ - metadata = uhd::rx_metadata_t(); - goto vrt_recv_copy_convert; - } - } + metadata = uhd::rx_metadata_t(); //init the metadata - //////////////////////////////////////////////////////////////// - // Unpack the vrt header - //////////////////////////////////////////////////////////////// - { - size_t num_packet_words32 = state.managed_buff->size()/sizeof(boost::uint32_t); - if (num_packet_words32 <= vrt_header_offset_words32){ - state.copy_buff = boost::asio::buffer("", 0); - goto vrt_recv_copy_convert; //must exit here after setting the buffer - } - const boost::uint32_t *vrt_hdr = state.managed_buff->cast<const boost::uint32_t *>() + vrt_header_offset_words32; - size_t num_header_words32_out, num_payload_words32_out, packet_count_out; - uhd::transport::vrt::unpack( - metadata, //output - vrt_hdr, //input - num_header_words32_out, //output - num_payload_words32_out, //output - num_packet_words32, //input - packet_count_out, //output - tick_rate + //perform a receive if no rx data is waiting to be copied + if (boost::asio::buffer_size(state.copy_buff) == 0){ + state.fragment_offset_in_samps = 0; + state.managed_buff = zc_iface->get_recv_buff(); + recv_cb(state.managed_buff); //callback before vrt unpack + _recv_helper( + state, metadata, tick_rate, vrt_header_offset_words32 ); + } - //handle the packet count / sequence number - if (packet_count_out != state.next_packet_seq){ - std::cerr << "S" << (packet_count_out - state.next_packet_seq)%16; - } - state.next_packet_seq = (packet_count_out+1)%16; + //extract the number of samples available to copy + size_t bytes_per_item = otw_type.get_sample_size(); + size_t bytes_available = boost::asio::buffer_size(state.copy_buff); + size_t num_samps = std::min( + boost::asio::buffer_size(buff)/io_type.size, + bytes_available/bytes_per_item + ); - //setup the buffer to point to the data - state.copy_buff = boost::asio::buffer( - vrt_hdr + num_header_words32_out, - num_payload_words32_out*sizeof(boost::uint32_t) - ); + //setup the fragment flags and offset + metadata.more_fragments = boost::asio::buffer_size(buff)/io_type.size < num_samps; + metadata.fragment_offset = state.fragment_offset_in_samps; + state.fragment_offset_in_samps += num_samps; //set for next call + + //copy-convert the samples from the recv buffer + uhd::transport::convert_otw_type_to_io_type( + boost::asio::buffer_cast<const void*>(state.copy_buff), otw_type, + boost::asio::buffer_cast<void*>(buff), io_type, + num_samps + ); + + //update the rx copy buffer to reflect the bytes copied + size_t bytes_copied = num_samps*bytes_per_item; + state.copy_buff = boost::asio::buffer( + boost::asio::buffer_cast<const boost::uint8_t*>(state.copy_buff) + bytes_copied, + bytes_available - bytes_copied + ); + + return num_samps; + } + +/*********************************************************************** + * vrt packet handler for send + **********************************************************************/ + struct send_state{ + //init the expected seq number + size_t next_packet_seq; + + send_state(void){ + next_packet_seq = 0; } + }; - //////////////////////////////////////////////////////////////// - // Perform copy-convert into buffer - //////////////////////////////////////////////////////////////// - vrt_recv_copy_convert:{ - size_t bytes_per_item = (otw_type.width * 2) / 8; - - //extract the number of samples available to copy - //and a pointer into the usrp2 received items memory - size_t bytes_available = boost::asio::buffer_size(state.copy_buff); - size_t num_samps = std::min( - boost::asio::buffer_size(buff)/io_type.size, - bytes_available/bytes_per_item - ); + typedef boost::function<void(uhd::transport::managed_send_buffer::sptr)> send_cb_t; - //setup the fragment flags and offset - metadata.more_fragments = boost::asio::buffer_size(buff)/io_type.size < num_samps; - metadata.fragment_offset = state.fragment_offset_in_samps; - state.fragment_offset_in_samps += num_samps; //set for next time + static inline void send_cb_nop(uhd::transport::managed_send_buffer::sptr){ + /* NOP */ + } - //copy-convert the samples from the recv buffer - uhd::transport::convert_otw_type_to_io_type( - boost::asio::buffer_cast<const void*>(state.copy_buff), otw_type, - boost::asio::buffer_cast<void*>(buff), io_type, - num_samps - ); + /******************************************************************* + * Pack a vrt header, copy-convert the data, and send it. + * - helper function for vrt_packet_handler::send + ******************************************************************/ + static inline void _send_helper( + send_state &state, + const void *send_mem, + size_t num_samps, + const uhd::tx_metadata_t &metadata, + const uhd::io_type_t &io_type, + const uhd::otw_type_t &otw_type, + double tick_rate, + uhd::transport::zero_copy_if::sptr zc_iface, + size_t vrt_header_offset_words32, + const send_cb_t& send_cb + ){ + //get a new managed send buffer + uhd::transport::managed_send_buffer::sptr send_buff = zc_iface->get_send_buff(); + boost::uint32_t *tx_mem = send_buff->cast<boost::uint32_t *>() + vrt_header_offset_words32; + + size_t num_header_words32, num_packet_words32; + size_t packet_count = state.next_packet_seq++; - //update the rx copy buffer to reflect the bytes copied - size_t bytes_copied = num_samps*bytes_per_item; - state.copy_buff = boost::asio::buffer( - boost::asio::buffer_cast<const boost::uint8_t*>(state.copy_buff) + bytes_copied, - bytes_available - bytes_copied + //pack metadata into a vrt header + uhd::transport::vrt::pack( + metadata, //input + tx_mem, //output + num_header_words32, //output + num_samps, //input + num_packet_words32, //output + packet_count, //input + tick_rate + ); + + //copy-convert the samples into the send buffer + uhd::transport::convert_io_type_to_otw_type( + send_mem, io_type, + tx_mem + num_header_words32, otw_type, + num_samps + ); + + send_cb(send_buff); //callback after memory filled + + //commit the samples to the zero-copy interface + send_buff->done(num_packet_words32*sizeof(boost::uint32_t)); + } + + /******************************************************************* + * Send vrt packets and copy convert the samples into the buffer. + ******************************************************************/ + static inline size_t send( + send_state &state, + const boost::asio::const_buffer &buff, + const uhd::tx_metadata_t &metadata, + const uhd::io_type_t &io_type, + const uhd::otw_type_t &otw_type, + double tick_rate, + uhd::transport::zero_copy_if::sptr zc_iface, + size_t max_samples_per_packet, + //use these two params to handle a layer above vrt + size_t vrt_header_offset_words32 = 0, + const send_cb_t& send_cb = &send_cb_nop + ){ + + size_t total_num_samps = boost::asio::buffer_size(buff)/io_type.size; + + //special case: no fragmentation needed + if (total_num_samps <= max_samples_per_packet){ + _send_helper( + state, + boost::asio::buffer_cast<const void *>(buff), + total_num_samps, + metadata, + io_type, otw_type, + tick_rate, + zc_iface, + vrt_header_offset_words32, + send_cb ); + return total_num_samps; + } - return num_samps; + //calculate constants for fragmentation + const size_t final_packet_samps = total_num_samps%max_samples_per_packet; + const size_t num_fragments = (total_num_samps+max_samples_per_packet-1)/max_samples_per_packet; + static const size_t first_fragment_index = 0; + const size_t final_fragment_index = num_fragments-1; + + //make a rw copy of the metadata to re-flag below + uhd::tx_metadata_t md(metadata); + + //loop through the following fragment indexes + for (size_t n = first_fragment_index; n <= final_fragment_index; n++){ + + //calculate new flags for the fragments + md.has_time_spec = md.has_time_spec and (n == first_fragment_index); + md.start_of_burst = md.start_of_burst and (n == first_fragment_index); + md.end_of_burst = md.end_of_burst and (n == final_fragment_index); + + //send the fragment with the helper function + _send_helper( + state, + boost::asio::buffer_cast<const boost::uint8_t *>(buff) + (n*max_samples_per_packet*io_type.size), + (n == final_fragment_index)?final_packet_samps:max_samples_per_packet, + md, + io_type, otw_type, + tick_rate, + zc_iface, + vrt_header_offset_words32, + send_cb + ); } + + return total_num_samps; } } //namespace vrt_packet_handler |