aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/usrp/device3/device3_io_impl.cpp
diff options
context:
space:
mode:
authorMichael West <michael.west@ettus.com>2017-11-16 11:59:45 -0800
committerMartin Braun <martin.braun@ettus.com>2018-07-25 15:34:03 -0700
commitcb9c97d643ac51279e61439c4e7caae9b1212c7d (patch)
treec6f7ff98dae9b979a75a924aab3c843963d7fd0c /host/lib/usrp/device3/device3_io_impl.cpp
parent7ed7b207735fee5f7bd055472e591935b5f96cf5 (diff)
downloaduhd-cb9c97d643ac51279e61439c4e7caae9b1212c7d.tar.gz
uhd-cb9c97d643ac51279e61439c4e7caae9b1212c7d.tar.bz2
uhd-cb9c97d643ac51279e61439c4e7caae9b1212c7d.zip
X300: Change Ethernet buffering
Ethernet buffering is now done so that most of the buffering is done in the socket buffers and multiple frames are only used to support the receive side offload of the socket I/O. Eliminates dropped packets at high full duplex rates.
Diffstat (limited to 'host/lib/usrp/device3/device3_io_impl.cpp')
-rw-r--r--host/lib/usrp/device3/device3_io_impl.cpp163
1 files changed, 83 insertions, 80 deletions
diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp
index e4c38aca6..490f10c3c 100644
--- a/host/lib/usrp/device3/device3_io_impl.cpp
+++ b/host/lib/usrp/device3/device3_io_impl.cpp
@@ -158,7 +158,10 @@ struct rx_fc_cache_t
uint64_t seq_num;
sid_t sid;
zero_copy_if::sptr xport;
- endianness_t endianness;
+ std::function<uint32_t(uint32_t)> to_host;
+ std::function<uint32_t(uint32_t)> from_host;
+ std::function<void(const uint32_t *packet_buff, vrt::if_packet_info_t &)> unpack;
+ std::function<void(uint32_t *packet_buff, vrt::if_packet_info_t &)> pack;
};
/*! Determine the size of the flow control window in number of packets.
@@ -218,8 +221,8 @@ static size_t get_rx_flow_control_window(
* skip the counter update.
*/
static bool rx_flow_ctrl(
- boost::shared_ptr<rx_fc_cache_t> fc_cache,
- managed_buffer::sptr buff
+ boost::shared_ptr<rx_fc_cache_t> fc_cache,
+ managed_buffer::sptr buff
) {
// If the caller supplied a buffer
if (buff)
@@ -229,17 +232,12 @@ static bool rx_flow_ctrl(
packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t);
const uint32_t *pkt = buff->cast<const uint32_t *>();
try {
- if (fc_cache->endianness == ENDIANNESS_BIG)
- {
- vrt::chdr::if_hdr_unpack_be(pkt, packet_info);
- } else {
- vrt::chdr::if_hdr_unpack_le(pkt, packet_info);
- }
+ fc_cache->unpack(pkt, packet_info);
}
catch(const std::exception &ex)
{
// Log and ignore
- UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl;
+ UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking packet: " << ex.what() << std::endl;
return true;
}
@@ -282,23 +280,13 @@ static bool rx_flow_ctrl(
packet_info.has_tsf = false;
packet_info.has_tlr = false;
- if (fc_cache->endianness == ENDIANNESS_BIG) {
- // Load Header:
- vrt::chdr::if_hdr_pack_be(pkt, packet_info);
- // Load Payload: Packet count, and byte count
- pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] =
- uhd::htonx<uint32_t>(fc_cache->total_packets_consumed);
- pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] =
- uhd::htonx<uint32_t>(fc_cache->total_bytes_consumed);
- } else {
- // Load Header:
- vrt::chdr::if_hdr_pack_le(pkt, packet_info);
- // Load Payload: Packet count, and byte count
- pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] =
- uhd::htowx<uint32_t>(fc_cache->total_packets_consumed);
- pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] =
- uhd::htowx<uint32_t>(fc_cache->total_bytes_consumed);
- }
+ // Load Header:
+ fc_cache->pack(pkt, packet_info);
+ // Load Payload: Packet count, and byte count
+ pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] =
+ fc_cache->from_host(fc_cache->total_packets_consumed);
+ pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] =
+ fc_cache->from_host(fc_cache->total_bytes_consumed);
//send the buffer over the interface
fc_buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32));
@@ -313,15 +301,11 @@ static bool rx_flow_ctrl(
*
*/
static void handle_rx_flowctrl_ack(
- boost::shared_ptr<rx_fc_cache_t> fc_cache,
- const uint32_t *payload
+ boost::shared_ptr<rx_fc_cache_t> fc_cache,
+ const uint32_t *payload
) {
- const uint32_t pkt_count = (fc_cache->endianness == ENDIANNESS_BIG) ?
- uhd::ntohx<uint32_t>(payload[0]) :
- uhd::wtohx<uint32_t>(payload[0]);
- const uint32_t byte_count = (fc_cache->endianness == ENDIANNESS_BIG) ?
- uhd::ntohx<uint32_t>(payload[1]) :
- uhd::wtohx<uint32_t>(payload[1]);
+ const uint32_t pkt_count = fc_cache->to_host(payload[0]);
+ const uint32_t byte_count = fc_cache->to_host(payload[1]);
if (fc_cache->total_bytes_consumed != byte_count)
{
UHD_LOGGER_DEBUG("device3")
@@ -362,13 +346,15 @@ struct tx_fc_cache_t
uint32_t window_size;
uint32_t fc_ack_seqnum;
bool fc_received;
+ std::function<uint32_t(uint32_t)> to_host;
+ std::function<uint32_t(uint32_t)> from_host;
+ std::function<void(const uint32_t *packet_buff, vrt::if_packet_info_t &)> unpack;
+ std::function<void(uint32_t *packet_buff, vrt::if_packet_info_t &)> pack;
};
static bool tx_flow_ctrl(
boost::shared_ptr<tx_fc_cache_t> fc_cache,
- zero_copy_if::sptr xport,
- uint32_t (*to_host)(uint32_t),
- void (*unpack)(const uint32_t *packet_buff, vrt::if_packet_info_t &),
+ zero_copy_if::sptr xport,
managed_buffer::sptr buff
) {
while (true)
@@ -395,7 +381,7 @@ static bool tx_flow_ctrl(
if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t);
const uint32_t *packet_buff = buff->cast<const uint32_t *>();
try {
- unpack(packet_buff, if_packet_info);
+ fc_cache->unpack(packet_buff, if_packet_info);
}
catch(const std::exception &ex)
{
@@ -410,8 +396,8 @@ static bool tx_flow_ctrl(
}
const uint32_t *payload = &packet_buff[if_packet_info.num_header_words32];
- const uint32_t pkt_count = to_host(payload[0]);
- const uint32_t byte_count = to_host(payload[1]);
+ const uint32_t pkt_count = fc_cache->to_host(payload[0]);
+ const uint32_t byte_count = fc_cache->to_host(payload[1]);
// update the amount of space
fc_cache->last_byte_ack = byte_count;
@@ -426,9 +412,7 @@ static bool tx_flow_ctrl(
static void tx_flow_ctrl_ack(
boost::shared_ptr<tx_fc_cache_t> fc_cache,
zero_copy_if::sptr send_xport,
- sid_t send_sid,
- uint32_t (*from_host)(uint32_t),
- void (*pack)(uint32_t *packet_buff, vrt::if_packet_info_t &)
+ sid_t send_sid
) {
if (not fc_cache->fc_received)
{
@@ -462,7 +446,7 @@ static void tx_flow_ctrl_ack(
packet_info.has_tlr = false;
// Load Header:
- pack(pkt, packet_info);
+ fc_cache->pack(pkt, packet_info);
// Update counters to include this packet
size_t fc_ack_pkt_size = sizeof(uint32_t)*(packet_info.num_packet_words32);
@@ -476,9 +460,9 @@ static void tx_flow_ctrl_ack(
// Load Payload: Packet count, and byte count
pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] =
- from_host(fc_cache->pkt_count);
+ fc_cache->from_host(fc_cache->pkt_count);
pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] =
- from_host(fc_cache->byte_count);
+ fc_cache->from_host(fc_cache->byte_count);
// Send the buffer over the interface
fc_buff->commit(fc_ack_pkt_size);
@@ -506,7 +490,8 @@ struct async_tx_info_t
static void handle_tx_async_msgs(
boost::shared_ptr<async_tx_info_t> async_info,
zero_copy_if::sptr xport,
- endianness_t endianness,
+ uint32_t (*to_host)(uint32_t),
+ void (*unpack)(const uint32_t *packet_buff, vrt::if_packet_info_t &),
boost::function<double(void)> get_tick_rate
) {
managed_recv_buffer::sptr buff = xport->get_recv_buff();
@@ -521,19 +506,9 @@ static void handle_tx_async_msgs(
const uint32_t *packet_buff = buff->cast<const uint32_t *>();
//unpacking can fail
- uint32_t (*endian_conv)(uint32_t) = uhd::ntohx;
try
{
- if (endianness == ENDIANNESS_BIG)
- {
- vrt::chdr::if_hdr_unpack_be(packet_buff, if_packet_info);
- endian_conv = uhd::ntohx;
- }
- else
- {
- vrt::chdr::if_hdr_unpack_le(packet_buff, if_packet_info);
- endian_conv = uhd::wtohx;
- }
+ unpack(packet_buff, if_packet_info);
}
catch(const std::exception &ex)
{
@@ -549,7 +524,7 @@ static void handle_tx_async_msgs(
//fill in the async metadata
async_metadata_t metadata;
load_metadata_from_buff(
- endian_conv,
+ to_host,
metadata,
if_packet_info,
packet_buff,
@@ -681,14 +656,29 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)
boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t());
fc_cache->sid = xport.send_sid;
fc_cache->xport = xport.send;
- fc_cache->endianness = xport.endianness;
fc_cache->interval = fc_handle_window;
+ if (xport.endianness == ENDIANNESS_BIG)
+ {
+ fc_cache->to_host = uhd::ntohx<uint32_t>;
+ fc_cache->from_host = uhd::htonx<uint32_t>;
+ fc_cache->pack = vrt::chdr::if_hdr_pack_be;
+ fc_cache->unpack = vrt::chdr::if_hdr_unpack_be;
+ }
+ else
+ {
+ fc_cache->to_host = uhd::wtohx<uint32_t>;
+ fc_cache->from_host = uhd::htowx<uint32_t>;
+ fc_cache->pack = vrt::chdr::if_hdr_pack_le;
+ fc_cache->unpack = vrt::chdr::if_hdr_unpack_le;
+ }
xport.recv = zero_copy_flow_ctrl::make
(
xport.recv,
NULL,
- [=](managed_buffer::sptr buff) {
- return rx_flow_ctrl(fc_cache, buff);
+ [fc_cache](managed_buffer::sptr buff) {
+ return rx_flow_ctrl(
+ fc_cache,
+ buff);
}
);
@@ -753,7 +743,7 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)
// Give the streamer a functor to handle flow control ACK messages
my_streamer->set_xport_handle_flowctrl_ack(
stream_i,
- [=](const uint32_t *payload) {
+ [fc_cache](const uint32_t *payload) {
handle_rx_flowctrl_ack(
fc_cache,
payload
@@ -764,7 +754,9 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)
//Give the streamer a functor to get the recv_buffer
my_streamer->set_xport_chan_get_buff(
stream_i,
- [=](double timeout) {return xport.recv->get_recv_buff(timeout);},
+ [xport](double timeout) {
+ return xport.recv->get_recv_buff(timeout);
+ },
true /*flush*/
);
@@ -774,7 +766,7 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)
boost::weak_ptr<uhd::rx_streamer> weak_ptr(my_streamer);
my_streamer->set_overflow_handler(
stream_i,
- [=]() {
+ [recv_terminator, weak_ptr, stream_i]() {
recv_terminator->handle_overrun(
weak_ptr,
stream_i);
@@ -784,7 +776,9 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)
//Give the streamer a functor issue stream cmd
my_streamer->set_issue_stream_cmd(
stream_i,
- [=](const stream_cmd_t& stream_cmd) {blk_ctrl->issue_stream_cmd(stream_cmd, block_port);}
+ [blk_ctrl, block_port](const stream_cmd_t& stream_cmd) {
+ blk_ctrl->issue_stream_cmd(stream_cmd, block_port);
+ }
);
}
@@ -913,14 +907,24 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
);
// Add flow control transport
boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window));
+ if (xport.endianness == ENDIANNESS_BIG)
+ {
+ fc_cache->to_host = uhd::ntohx<uint32_t>;
+ fc_cache->from_host = uhd::htonx<uint32_t>;
+ fc_cache->pack = vrt::chdr::if_hdr_pack_be;
+ fc_cache->unpack = vrt::chdr::if_hdr_unpack_be;
+ } else {
+ fc_cache->to_host = uhd::wtohx<uint32_t>;
+ fc_cache->from_host = uhd::htowx<uint32_t>;
+ fc_cache->pack = vrt::chdr::if_hdr_pack_le;
+ fc_cache->unpack = vrt::chdr::if_hdr_unpack_le;
+ }
xport.send = zero_copy_flow_ctrl::make(
xport.send,
- [=](managed_buffer::sptr buff) {
+ [fc_cache, xport](managed_buffer::sptr buff) {
return tx_flow_ctrl(
fc_cache,
xport.recv,
- (xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>),
- (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le),
buff);
},
NULL
@@ -1004,12 +1008,13 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
async_tx_info->old_async_queue = _async_md;
task::sptr async_task = task::make(
- [=]() {
+ [async_tx_info, async_xport, xport, send_terminator]() {
handle_tx_async_msgs(
async_tx_info,
async_xport.recv,
- xport.endianness,
- [=]() {return send_terminator->get_tick_rate();}
+ xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>,
+ xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le,
+ [send_terminator]() {return send_terminator->get_tick_rate();}
);
}
);
@@ -1018,13 +1023,13 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
//Give the streamer a functor to get the send buffer
my_streamer->set_xport_chan_get_buff(
stream_i,
- [=](const double timeout) {
+ [xport](const double timeout) {
return xport.send->get_send_buff(timeout);
}
);
//Give the streamer a functor handled received async messages
my_streamer->set_async_receiver(
- [=](uhd::async_metadata_t& md, const double timeout) {
+ [async_md](uhd::async_metadata_t& md, const double timeout) {
return async_md->pop_with_timed_wait(md, timeout);
}
);
@@ -1034,15 +1039,13 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
my_streamer->set_xport_chan_post_send_cb(
stream_i,
- [=]() {
+ [fc_cache, xport]() {
tx_flow_ctrl_ack(
fc_cache,
xport.send,
- xport.send_sid,
- (xport.endianness == ENDIANNESS_BIG ? uhd::htonx<uint32_t> : uhd::htowx<uint32_t>),
- (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_pack_be : vrt::chdr::if_hdr_pack_le)
+ xport.send_sid
);
- }
+ }
);
}