// // Copyright 2010-2011 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see . // #ifndef INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP #define INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace vrt_packet_handler{ //this may change in the future but its a constant for now static const size_t OTW_BYTES_PER_SAMP = sizeof(boost::uint32_t); template UHD_INLINE T get_context_code( const boost::uint32_t *vrt_hdr, const uhd::transport::vrt::if_packet_info_t &if_packet_info ){ //extract the context word (we dont know the endianness so mirror the bytes) boost::uint32_t word0 = vrt_hdr[if_packet_info.num_header_words32] | uhd::byteswap(vrt_hdr[if_packet_info.num_header_words32]); return T(word0 & 0xff); } /*********************************************************************** * vrt packet handler for recv **********************************************************************/ typedef std::vector managed_recv_buffs_t; typedef boost::function get_recv_buffs_t; typedef boost::function handle_overflow_t; typedef boost::function vrt_unpacker_t; static inline void handle_overflow_nop(size_t){} struct recv_state{ //width of the receiver in channels size_t width; //state variables to handle fragments managed_recv_buffs_t managed_buffs; std::vector copy_buffs; size_t size_of_copy_buffs; size_t fragment_offset_in_samps; std::vector io_buffs; std::vector otw_buffs; recv_state(size_t width = 1): width(width), managed_buffs(width), copy_buffs(width, NULL), size_of_copy_buffs(0), fragment_offset_in_samps(0), io_buffs(0) //resized later { /* NOP */ } }; /******************************************************************* * Unpack a received vrt header and set the copy buffer. * - helper function for vrt_packet_handler::_recv1 ******************************************************************/ static UHD_INLINE void _recv1_helper( recv_state &state, uhd::rx_metadata_t &metadata, double tick_rate, const vrt_unpacker_t &vrt_unpacker, const handle_overflow_t &handle_overflow, size_t vrt_header_offset_words32 ){ //vrt unpack each managed buffer uhd::transport::vrt::if_packet_info_t if_packet_info; for (size_t i = 0; i < state.width; i++){ if (state.managed_buffs[i].get() == NULL) continue; //better have a message packet coming up... //extract packet words and check thats its enough to move on size_t num_packet_words32 = state.managed_buffs[i]->size()/sizeof(boost::uint32_t); if (num_packet_words32 <= vrt_header_offset_words32){ throw std::runtime_error("recv buffer smaller than vrt packet offset"); } //unpack the vrt header into the info struct const boost::uint32_t *vrt_hdr = state.managed_buffs[i]->cast() + vrt_header_offset_words32; if_packet_info.num_packet_words32 = num_packet_words32 - vrt_header_offset_words32; vrt_unpacker(vrt_hdr, if_packet_info); //handle the non-data packet case and parse its contents if (if_packet_info.packet_type != uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA){ metadata.error_code = get_context_code(vrt_hdr, if_packet_info); if (metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW) handle_overflow(i); //break to exit loop and store metadata below state.size_of_copy_buffs = 0; break; } //setup the buffer to point to the data state.copy_buffs[i] = reinterpret_cast(vrt_hdr + if_packet_info.num_header_words32); //store the minimum payload length into the copy buffer length size_t num_payload_bytes = if_packet_info.num_payload_words32*sizeof(boost::uint32_t); if (i == 0 or state.size_of_copy_buffs > num_payload_bytes){ state.size_of_copy_buffs = num_payload_bytes; } } //store the last vrt info into the metadata metadata.has_time_spec = if_packet_info.has_tsi and if_packet_info.has_tsf; metadata.time_spec = uhd::time_spec_t( time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), tick_rate ); static const int tlr_sob_flags = (1 << 21) | (1 << 9); //enable and indicator bits metadata.start_of_burst = if_packet_info.has_tlr and (int(if_packet_info.tlr & tlr_sob_flags) == tlr_sob_flags); static const int tlr_eob_flags = (1 << 20) | (1 << 8); //enable and indicator bits metadata.end_of_burst = if_packet_info.has_tlr and (int(if_packet_info.tlr & tlr_eob_flags) == tlr_eob_flags); } /******************************************************************* * Recv data, unpack a vrt header, and copy-convert the data. * - helper function for vrt_packet_handler::recv ******************************************************************/ static UHD_INLINE size_t _recv1( recv_state &state, const uhd::device::recv_buffs_type &buffs, size_t offset_bytes, size_t total_samps, uhd::rx_metadata_t &metadata, uhd::convert::function_type &converter, double tick_rate, const vrt_unpacker_t &vrt_unpacker, const get_recv_buffs_t &get_recv_buffs, const handle_overflow_t &handle_overflow, size_t vrt_header_offset_words32, size_t chans_per_otw_buff ){ metadata.error_code = uhd::rx_metadata_t::ERROR_CODE_NONE; //perform a receive if no rx data is waiting to be copied if (state.size_of_copy_buffs == 0){ state.fragment_offset_in_samps = 0; if (not get_recv_buffs(state.managed_buffs)){ metadata.error_code = uhd::rx_metadata_t::ERROR_CODE_TIMEOUT; return 0; } try{ _recv1_helper( state, metadata, tick_rate, vrt_unpacker, handle_overflow, vrt_header_offset_words32 ); }catch(const std::exception &e){ state.size_of_copy_buffs = 0; //reset copy buffs size std::cerr << "Error (recv): " << e.what() << std::endl; metadata.error_code = uhd::rx_metadata_t::ERROR_CODE_BAD_PACKET; return 0; } } //defaults for the metadata when this is a fragment else{ metadata.has_time_spec = false; metadata.start_of_burst = false; metadata.end_of_burst = false; } //extract the number of samples available to copy size_t bytes_per_item = OTW_BYTES_PER_SAMP; size_t nsamps_available = state.size_of_copy_buffs/bytes_per_item; size_t nsamps_to_copy = std::min(total_samps*chans_per_otw_buff, nsamps_available); size_t bytes_to_copy = nsamps_to_copy*bytes_per_item; size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/chans_per_otw_buff; for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){ //fill a vector with pointers to the io buffers for (size_t j = 0; j < chans_per_otw_buff; j++){ state.io_buffs[j] = reinterpret_cast(buffs[i+j]) + offset_bytes; } //copy-convert the samples from the recv buffer converter(state.copy_buffs[i], state.io_buffs, nsamps_to_copy_per_io_buff); //update the rx copy buffer to reflect the bytes copied state.copy_buffs[i] += bytes_to_copy; } //update the copy buffer's availability state.size_of_copy_buffs -= bytes_to_copy; //setup the fragment flags and offset metadata.more_fragments = state.size_of_copy_buffs != 0; metadata.fragment_offset = state.fragment_offset_in_samps; state.fragment_offset_in_samps += nsamps_to_copy; //set for next call return nsamps_to_copy_per_io_buff; } /******************************************************************* * Recv vrt packets and copy convert the samples into the buffer. ******************************************************************/ static UHD_INLINE size_t recv( recv_state &state, const uhd::device::recv_buffs_type &buffs, const size_t total_num_samps, uhd::rx_metadata_t &metadata, uhd::device::recv_mode_t recv_mode, const uhd::io_type_t &io_type, const uhd::otw_type_t &otw_type, double tick_rate, const vrt_unpacker_t &vrt_unpacker, const get_recv_buffs_t &get_recv_buffs, const handle_overflow_t &handle_overflow = &handle_overflow_nop, size_t vrt_header_offset_words32 = 0, size_t chans_per_otw_buff = 1 ){ state.io_buffs.resize(chans_per_otw_buff); uhd::convert::function_type converter( uhd::convert::get_converter_otw_to_cpu( io_type, otw_type, 1, chans_per_otw_buff )); switch(recv_mode){ //////////////////////////////////////////////////////////////// case uhd::device::RECV_MODE_ONE_PACKET:{ //////////////////////////////////////////////////////////////// return _recv1( state, buffs, 0, total_num_samps, metadata, converter, tick_rate, vrt_unpacker, get_recv_buffs, handle_overflow, vrt_header_offset_words32, chans_per_otw_buff ); } //////////////////////////////////////////////////////////////// case uhd::device::RECV_MODE_FULL_BUFF:{ //////////////////////////////////////////////////////////////// size_t accum_num_samps = 0; uhd::rx_metadata_t tmp_md; while(accum_num_samps < total_num_samps){ size_t num_samps = _recv1( state, buffs, accum_num_samps*io_type.size, total_num_samps - accum_num_samps, (accum_num_samps == 0)? metadata : tmp_md, //only the first metadata gets kept converter, tick_rate, vrt_unpacker, get_recv_buffs, handle_overflow, vrt_header_offset_words32, chans_per_otw_buff ); if (num_samps == 0) break; //had a recv timeout or error, break loop accum_num_samps += num_samps; } return accum_num_samps; } default: throw std::runtime_error("unknown recv mode"); }//switch(recv_mode) } /*********************************************************************** * vrt packet handler for send **********************************************************************/ typedef std::vector managed_send_buffs_t; typedef boost::function get_send_buffs_t; typedef boost::function vrt_packer_t; static const boost::uint64_t zeros = 0; struct send_state{ //init the expected seq number size_t next_packet_seq; managed_send_buffs_t managed_buffs; std::vector zero_buffs; std::vector io_buffs; send_state(size_t width = 1): next_packet_seq(0), managed_buffs(width), zero_buffs(width, &zeros), io_buffs(0) //resized later { /* NOP */ } }; /******************************************************************* * Pack a vrt header, copy-convert the data, and send it. * - helper function for vrt_packet_handler::send ******************************************************************/ static UHD_INLINE size_t _send1( send_state &state, const uhd::device::send_buffs_type &buffs, const size_t offset_bytes, const size_t num_samps, uhd::transport::vrt::if_packet_info_t &if_packet_info, uhd::convert::function_type &converter, const vrt_packer_t &vrt_packer, const get_send_buffs_t &get_send_buffs, const size_t vrt_header_offset_words32, const size_t chans_per_otw_buff ){ //load the rest of the if_packet_info in here if_packet_info.num_payload_words32 = (num_samps*chans_per_otw_buff*OTW_BYTES_PER_SAMP)/sizeof(boost::uint32_t); if_packet_info.packet_count = state.next_packet_seq; //get send buffers for each otw channel if (not get_send_buffs(state.managed_buffs)) return 0; for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){ //calculate pointers with offsets to io and otw memory for (size_t j = 0; j < chans_per_otw_buff; j++){ state.io_buffs[j] = reinterpret_cast(buffs[i+j]) + offset_bytes; } boost::uint32_t *otw_mem = state.managed_buffs[i]->cast() + vrt_header_offset_words32; //pack metadata into a vrt header vrt_packer(otw_mem, if_packet_info); otw_mem += if_packet_info.num_header_words32; //copy-convert the samples into the send buffer converter(state.io_buffs, otw_mem, num_samps); //commit the samples to the zero-copy interface size_t num_bytes_total = (vrt_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); state.managed_buffs[i]->commit(num_bytes_total); } state.next_packet_seq++; //increment sequence after commits return num_samps; } /******************************************************************* * Send vrt packets and copy convert the samples into the buffer. ******************************************************************/ static UHD_INLINE size_t send( send_state &state, const uhd::device::send_buffs_type &buffs, const size_t total_num_samps, const uhd::tx_metadata_t &metadata, uhd::device::send_mode_t send_mode, const uhd::io_type_t &io_type, const uhd::otw_type_t &otw_type, double tick_rate, const vrt_packer_t &vrt_packer, const get_send_buffs_t &get_send_buffs, size_t max_samples_per_packet, size_t vrt_header_offset_words32 = 0, size_t chans_per_otw_buff = 1 ){ state.io_buffs.resize(chans_per_otw_buff); uhd::convert::function_type converter( uhd::convert::get_converter_cpu_to_otw( io_type, otw_type, chans_per_otw_buff, 1 )); //translate the metadata to vrt if packet info uhd::transport::vrt::if_packet_info_t if_packet_info; if_packet_info.has_sid = false; if_packet_info.has_cid = false; if_packet_info.has_tlr = false; if_packet_info.tsi = boost::uint32_t(metadata.time_spec.get_full_secs()); if_packet_info.tsf = boost::uint64_t(metadata.time_spec.get_tick_count(tick_rate)); if (total_num_samps <= max_samples_per_packet) send_mode = uhd::device::SEND_MODE_ONE_PACKET; switch(send_mode){ //////////////////////////////////////////////////////////////// case uhd::device::SEND_MODE_ONE_PACKET:{ //////////////////////////////////////////////////////////////// //fill in parts of the packet info overwrote in full buff mode if_packet_info.has_tsi = metadata.has_time_spec; if_packet_info.has_tsf = metadata.has_time_spec; if_packet_info.sob = metadata.start_of_burst; if_packet_info.eob = metadata.end_of_burst; return _send1( state, //TODO remove this code when sample counts of zero are supported by hardware (total_num_samps)?buffs : state.zero_buffs, 0, std::max(1, std::min(total_num_samps, max_samples_per_packet)), if_packet_info, converter, vrt_packer, get_send_buffs, vrt_header_offset_words32, chans_per_otw_buff ); } //////////////////////////////////////////////////////////////// case uhd::device::SEND_MODE_FULL_BUFF:{ //////////////////////////////////////////////////////////////// size_t total_num_samps_sent = 0; //loop through the following fragment indexes while(total_num_samps_sent < total_num_samps){ //calculate per-loop-iteration variables const size_t total_num_samps_unsent = total_num_samps - total_num_samps_sent; const bool first_fragment = (total_num_samps_sent == 0); const bool final_fragment = (total_num_samps_unsent <= max_samples_per_packet); //calculate new flags for the fragments if_packet_info.has_tsi = metadata.has_time_spec and first_fragment; if_packet_info.has_tsf = if_packet_info.has_tsi; if_packet_info.sob = metadata.start_of_burst and first_fragment; if_packet_info.eob = metadata.end_of_burst and final_fragment; //send the fragment with the helper function const size_t num_samps_sent = _send1( state, buffs, total_num_samps_sent*io_type.size, std::min(total_num_samps_unsent, max_samples_per_packet), if_packet_info, converter, vrt_packer, get_send_buffs, vrt_header_offset_words32, chans_per_otw_buff ); total_num_samps_sent += num_samps_sent; if (num_samps_sent == 0) return total_num_samps_sent; } return total_num_samps_sent; } default: throw std::runtime_error("unknown send mode"); }//switch(send_mode) } } //namespace vrt_packet_handler #endif /* INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP */