//
// Copyright 2010 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
//
#ifndef INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP
#define INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace vrt_packet_handler{
/***********************************************************************
* vrt packet handler for recv
**********************************************************************/
struct recv_state{
//init the expected seq number
size_t next_packet_seq;
//state variables to handle fragments
uhd::transport::managed_recv_buffer::sptr managed_buff;
boost::asio::const_buffer copy_buff;
size_t fragment_offset_in_samps;
recv_state(void){
//first expected seq is zero
next_packet_seq = 0;
//initially empty copy buffer
copy_buff = boost::asio::buffer("", 0);
}
};
typedef boost::function get_recv_buff_t;
typedef boost::function recv_cb_t;
static UHD_INLINE void recv_cb_nop(uhd::transport::managed_recv_buffer::sptr){
/* NOP */
}
/*******************************************************************
* Unpack a received vrt header and set the copy buffer.
* - helper function for vrt_packet_handler::_recv1
******************************************************************/
template
static UHD_INLINE void _recv1_helper(
recv_state &state,
uhd::rx_metadata_t &metadata,
double tick_rate,
vrt_unpacker_type vrt_unpacker,
size_t vrt_header_offset_words32
){
size_t num_packet_words32 = state.managed_buff->size()/sizeof(boost::uint32_t);
if (num_packet_words32 <= vrt_header_offset_words32){
state.copy_buff = boost::asio::buffer("", 0);
return; //must exit here after setting the buffer
}
const boost::uint32_t *vrt_hdr = state.managed_buff->cast() + vrt_header_offset_words32;
size_t num_header_words32_out, num_payload_words32_out, packet_count_out;
vrt_unpacker(
metadata, //output
vrt_hdr, //input
num_header_words32_out, //output
num_payload_words32_out, //output
num_packet_words32, //input
packet_count_out, //output
tick_rate
);
//handle the packet count / sequence number
if (packet_count_out != state.next_packet_seq){
std::cerr << "S" << (packet_count_out - state.next_packet_seq)%16;
}
state.next_packet_seq = (packet_count_out+1)%16;
//setup the buffer to point to the data
state.copy_buff = boost::asio::buffer(
vrt_hdr + num_header_words32_out,
num_payload_words32_out*sizeof(boost::uint32_t)
);
}
/*******************************************************************
* Recv data, unpack a vrt header, and copy-convert the data.
* - helper function for vrt_packet_handler::recv
******************************************************************/
template
static UHD_INLINE size_t _recv1(
recv_state &state,
void *recv_mem,
size_t total_samps,
uhd::rx_metadata_t &metadata,
const uhd::io_type_t &io_type,
const uhd::otw_type_t &otw_type,
double tick_rate,
vrt_unpacker_type vrt_unpacker,
const get_recv_buff_t &get_recv_buff,
//use these two params to handle a layer above vrt
size_t vrt_header_offset_words32,
const recv_cb_t &recv_cb
){
//perform a receive if no rx data is waiting to be copied
if (boost::asio::buffer_size(state.copy_buff) == 0){
state.fragment_offset_in_samps = 0;
state.managed_buff = get_recv_buff();
if (state.managed_buff.get() == NULL) return 0;
recv_cb(state.managed_buff); //callback before vrt unpack
try{
_recv1_helper(
state, metadata, tick_rate, vrt_unpacker, vrt_header_offset_words32
);
}catch(const std::exception &e){
std::cerr << "Error (recv): " << e.what() << std::endl;
return 0;
}
}
//extract the number of samples available to copy
size_t bytes_per_item = otw_type.get_sample_size();
size_t bytes_available = boost::asio::buffer_size(state.copy_buff);
size_t num_samps = std::min(total_samps, bytes_available/bytes_per_item);
//setup the fragment flags and offset
metadata.more_fragments = total_samps < num_samps;
metadata.fragment_offset = state.fragment_offset_in_samps;
state.fragment_offset_in_samps += num_samps; //set for next call
//copy-convert the samples from the recv buffer
uhd::transport::convert_otw_type_to_io_type(
boost::asio::buffer_cast(state.copy_buff), otw_type,
recv_mem, io_type, num_samps
);
//update the rx copy buffer to reflect the bytes copied
size_t bytes_copied = num_samps*bytes_per_item;
state.copy_buff = boost::asio::buffer(
boost::asio::buffer_cast(state.copy_buff) + bytes_copied,
bytes_available - bytes_copied
);
return num_samps;
}
/*******************************************************************
* Recv vrt packets and copy convert the samples into the buffer.
******************************************************************/
template
static UHD_INLINE size_t recv(
recv_state &state,
const boost::asio::mutable_buffer &buff,
uhd::rx_metadata_t &metadata,
uhd::device::recv_mode_t recv_mode,
const uhd::io_type_t &io_type,
const uhd::otw_type_t &otw_type,
double tick_rate,
vrt_unpacker_type vrt_unpacker,
const get_recv_buff_t &get_recv_buff,
//use these two params to handle a layer above vrt
size_t vrt_header_offset_words32 = 0,
const recv_cb_t& recv_cb = &recv_cb_nop
){
metadata = uhd::rx_metadata_t(); //init the metadata
const size_t total_num_samps = boost::asio::buffer_size(buff)/io_type.size;
switch(recv_mode){
////////////////////////////////////////////////////////////////
case uhd::device::RECV_MODE_ONE_PACKET:{
////////////////////////////////////////////////////////////////
return _recv1(
state,
boost::asio::buffer_cast(buff),
total_num_samps,
metadata,
io_type, otw_type,
tick_rate,
vrt_unpacker,
get_recv_buff,
vrt_header_offset_words32,
recv_cb
);
}
////////////////////////////////////////////////////////////////
case uhd::device::RECV_MODE_FULL_BUFF:{
////////////////////////////////////////////////////////////////
size_t accum_num_samps = 0;
uhd::rx_metadata_t tmp_md;
while(accum_num_samps < total_num_samps){
size_t num_samps = _recv1(
state,
boost::asio::buffer_cast(buff) + (accum_num_samps*io_type.size),
total_num_samps - accum_num_samps,
(accum_num_samps == 0)? metadata : tmp_md, //only the first metadata gets kept
io_type, otw_type,
tick_rate,
vrt_unpacker,
get_recv_buff,
vrt_header_offset_words32,
recv_cb
);
if (num_samps == 0) break; //had a recv timeout or error, break loop
accum_num_samps += num_samps;
}
return accum_num_samps;
}
default: throw std::runtime_error("unknown recv mode");
}//switch(recv_mode)
}
/***********************************************************************
* vrt packet handler for send
**********************************************************************/
struct send_state{
//init the expected seq number
size_t next_packet_seq;
send_state(void){
next_packet_seq = 0;
}
};
typedef boost::function get_send_buff_t;
typedef boost::function send_cb_t;
static UHD_INLINE void send_cb_nop(uhd::transport::managed_send_buffer::sptr){
/* NOP */
}
/*******************************************************************
* Pack a vrt header, copy-convert the data, and send it.
* - helper function for vrt_packet_handler::send
******************************************************************/
template
static UHD_INLINE void _send1(
send_state &state,
const void *send_mem,
size_t num_samps,
const uhd::tx_metadata_t &metadata,
const uhd::io_type_t &io_type,
const uhd::otw_type_t &otw_type,
double tick_rate,
vrt_packer_type vrt_packer,
const get_send_buff_t &get_send_buff,
size_t vrt_header_offset_words32,
const send_cb_t& send_cb
){
//get a new managed send buffer
uhd::transport::managed_send_buffer::sptr send_buff = get_send_buff();
boost::uint32_t *tx_mem = send_buff->cast() + vrt_header_offset_words32;
size_t num_header_words32, num_packet_words32;
size_t packet_count = state.next_packet_seq++;
//pack metadata into a vrt header
vrt_packer(
metadata, //input
tx_mem, //output
num_header_words32, //output
num_samps, //input
num_packet_words32, //output
packet_count, //input
tick_rate
);
//copy-convert the samples into the send buffer
uhd::transport::convert_io_type_to_otw_type(
send_mem, io_type,
tx_mem + num_header_words32, otw_type,
num_samps
);
send_cb(send_buff); //callback after memory filled
//commit the samples to the zero-copy interface
send_buff->commit(num_packet_words32*sizeof(boost::uint32_t));
}
/*******************************************************************
* Send vrt packets and copy convert the samples into the buffer.
******************************************************************/
template
static UHD_INLINE size_t send(
send_state &state,
const boost::asio::const_buffer &buff,
const uhd::tx_metadata_t &metadata,
uhd::device::send_mode_t send_mode,
const uhd::io_type_t &io_type,
const uhd::otw_type_t &otw_type,
double tick_rate,
vrt_packer_type vrt_packer,
const get_send_buff_t &get_send_buff,
size_t max_samples_per_packet,
//use these two params to handle a layer above vrt
size_t vrt_header_offset_words32 = 0,
const send_cb_t& send_cb = &send_cb_nop
){
const size_t total_num_samps = boost::asio::buffer_size(buff)/io_type.size;
if (total_num_samps <= max_samples_per_packet) send_mode = uhd::device::SEND_MODE_ONE_PACKET;
switch(send_mode){
////////////////////////////////////////////////////////////////
case uhd::device::SEND_MODE_ONE_PACKET:{
////////////////////////////////////////////////////////////////
size_t num_samps = std::min(total_num_samps, max_samples_per_packet);
_send1(
state,
boost::asio::buffer_cast(buff),
num_samps,
metadata,
io_type, otw_type,
tick_rate,
vrt_packer,
get_send_buff,
vrt_header_offset_words32,
send_cb
);
return num_samps;
}
////////////////////////////////////////////////////////////////
case uhd::device::SEND_MODE_FULL_BUFF:{
////////////////////////////////////////////////////////////////
//calculate constants for fragmentation
const size_t num_fragments = (total_num_samps+max_samples_per_packet-1)/max_samples_per_packet;
static const size_t first_fragment_index = 0;
const size_t final_fragment_index = num_fragments-1;
//make a rw copy of the metadata to re-flag below
uhd::tx_metadata_t md(metadata);
//loop through the following fragment indexes
for (size_t n = first_fragment_index; n <= final_fragment_index; n++){
//calculate new flags for the fragments
md.has_time_spec = metadata.has_time_spec and (n == first_fragment_index);
md.start_of_burst = metadata.start_of_burst and (n == first_fragment_index);
md.end_of_burst = metadata.end_of_burst and (n == final_fragment_index);
//send the fragment with the helper function
_send1(
state,
boost::asio::buffer_cast(buff) + (n*max_samples_per_packet*io_type.size),
(n == final_fragment_index)?(total_num_samps%max_samples_per_packet):max_samples_per_packet,
md,
io_type, otw_type,
tick_rate,
vrt_packer,
get_send_buff,
vrt_header_offset_words32,
send_cb
);
}
return total_num_samps;
}
default: throw std::runtime_error("unknown send mode");
}//switch(send_mode)
}
} //namespace vrt_packet_handler
#endif /* INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP */