aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/tcp_zero_copy.cpp
diff options
context:
space:
mode:
authorBrent Stapleton <brent.stapleton@ettus.com>2019-01-18 09:45:34 -0800
committerBrent Stapleton <brent.stapleton@ettus.com>2019-01-18 17:51:35 -0800
commita53130679944ddd179593259eb953b89ab1a7a38 (patch)
tree5d8274750bed0b21aa133bc93d97d75bbce0ecd9 /host/lib/transport/tcp_zero_copy.cpp
parent2a44d6836ca08b6b67b83b63487b838e138ac379 (diff)
downloaduhd-a53130679944ddd179593259eb953b89ab1a7a38.tar.gz
uhd-a53130679944ddd179593259eb953b89ab1a7a38.tar.bz2
uhd-a53130679944ddd179593259eb953b89ab1a7a38.zip
lib: transport: apply clang-format
This is a continuation of 967be2a4. $ find host/lib/transport -iname *.hpp -o -iname *.cpp |\ xargs clang-format -i -style=file Skipping host/lib/transport/nirio/ because of build errors. $ git checkout host/lib/transport/nirio
Diffstat (limited to 'host/lib/transport/tcp_zero_copy.cpp')
-rw-r--r--host/lib/transport/tcp_zero_copy.cpp195
1 files changed, 112 insertions, 83 deletions
diff --git a/host/lib/transport/tcp_zero_copy.cpp b/host/lib/transport/tcp_zero_copy.cpp
index a52d089a8..5cb713427 100644
--- a/host/lib/transport/tcp_zero_copy.cpp
+++ b/host/lib/transport/tcp_zero_copy.cpp
@@ -6,15 +6,15 @@
//
#include "udp_common.hpp"
-#include <uhd/transport/tcp_zero_copy.hpp>
#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/transport/tcp_zero_copy.hpp>
#include <uhd/utils/log.hpp>
#include <uhdlib/utils/atomic.hpp>
#include <boost/format.hpp>
#include <boost/make_shared.hpp>
-#include <vector>
#include <chrono>
#include <thread>
+#include <vector>
using namespace uhd;
using namespace uhd::transport;
@@ -27,38 +27,44 @@ static const size_t DEFAULT_FRAME_SIZE = 2048;
* Reusable managed receiver buffer:
* - get_new performs the recv operation
**********************************************************************/
-class tcp_zero_copy_asio_mrb : public managed_recv_buffer{
+class tcp_zero_copy_asio_mrb : public managed_recv_buffer
+{
public:
- tcp_zero_copy_asio_mrb(void *mem, int sock_fd, const size_t frame_size):
- _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ }
+ tcp_zero_copy_asio_mrb(void* mem, int sock_fd, const size_t frame_size)
+ : _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size)
+ { /*NOP*/
+ }
- void release(void){
+ void release(void)
+ {
_claimer.release();
}
- UHD_INLINE sptr get_new(const double timeout, size_t &index){
- if (not _claimer.claim_with_wait(timeout)) return sptr();
+ UHD_INLINE sptr get_new(const double timeout, size_t& index)
+ {
+ if (not _claimer.claim_with_wait(timeout))
+ return sptr();
- #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported
- _len = ::recv(_sock_fd, (char *)_mem, _frame_size, MSG_DONTWAIT);
- if (_len > 0){
- index++; //advances the caller's buffer
+#ifdef MSG_DONTWAIT // try a non-blocking recv() if supported
+ _len = ::recv(_sock_fd, (char*)_mem, _frame_size, MSG_DONTWAIT);
+ if (_len > 0) {
+ index++; // advances the caller's buffer
return make(this, _mem, size_t(_len));
}
- #endif
+#endif
- if (wait_for_recv_ready(_sock_fd, timeout)){
- _len = ::recv(_sock_fd, (char *)_mem, _frame_size, 0);
- index++; //advances the caller's buffer
+ if (wait_for_recv_ready(_sock_fd, timeout)) {
+ _len = ::recv(_sock_fd, (char*)_mem, _frame_size, 0);
+ index++; // advances the caller's buffer
return make(this, _mem, size_t(_len));
}
- _claimer.release(); //undo claim
- return sptr(); //null for timeout
+ _claimer.release(); // undo claim
+ return sptr(); // null for timeout
}
private:
- void *_mem;
+ void* _mem;
int _sock_fd;
size_t _frame_size;
ssize_t _len;
@@ -69,44 +75,50 @@ private:
* Reusable managed send buffer:
* - commit performs the send operation
**********************************************************************/
-class tcp_zero_copy_asio_msb : public managed_send_buffer{
+class tcp_zero_copy_asio_msb : public managed_send_buffer
+{
public:
- tcp_zero_copy_asio_msb(void *mem, int sock_fd, const size_t frame_size):
- _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ }
-
- void release(void){
- //Retry logic because send may fail with ENOBUFS.
- //This is known to occur at least on some OSX systems.
- //But it should be safe to always check for the error.
- while (true)
- {
- this->commit(_frame_size); //always full size frames to avoid pkt coalescing
- const ssize_t ret = ::send(_sock_fd, (const char *)_mem, size(), 0);
- if (ret == ssize_t(size())) break;
- if (ret == -1 and errno == ENOBUFS)
- {
+ tcp_zero_copy_asio_msb(void* mem, int sock_fd, const size_t frame_size)
+ : _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size)
+ { /*NOP*/
+ }
+
+ void release(void)
+ {
+ // Retry logic because send may fail with ENOBUFS.
+ // This is known to occur at least on some OSX systems.
+ // But it should be safe to always check for the error.
+ while (true) {
+ this->commit(_frame_size); // always full size frames to avoid pkt coalescing
+ const ssize_t ret = ::send(_sock_fd, (const char*)_mem, size(), 0);
+ if (ret == ssize_t(size()))
+ break;
+ if (ret == -1 and errno == ENOBUFS) {
std::this_thread::sleep_for(std::chrono::microseconds(1));
- continue; //try to send again
+ continue; // try to send again
}
UHD_ASSERT_THROW(ret == ssize_t(size()));
}
_claimer.release();
}
- UHD_INLINE sptr get_new(const double timeout, size_t &index){
- if (not _claimer.claim_with_wait(timeout)) return sptr();
- index++; //advances the caller's buffer
+ UHD_INLINE sptr get_new(const double timeout, size_t& index)
+ {
+ if (not _claimer.claim_with_wait(timeout))
+ return sptr();
+ index++; // advances the caller's buffer
return make(this, _mem, _frame_size);
}
private:
- void *_mem;
+ void* _mem;
int _sock_fd;
size_t _frame_size;
simple_claimer _claimer;
};
-tcp_zero_copy::~tcp_zero_copy(void){
+tcp_zero_copy::~tcp_zero_copy(void)
+{
/* NOP */
}
@@ -117,51 +129,53 @@ tcp_zero_copy::~tcp_zero_copy(void){
* However, it is not a true zero copy implementation as each
* send and recv requires a copy operation to/from userspace.
**********************************************************************/
-class tcp_zero_copy_asio_impl : public tcp_zero_copy{
+class tcp_zero_copy_asio_impl : public tcp_zero_copy
+{
public:
typedef boost::shared_ptr<tcp_zero_copy_asio_impl> sptr;
tcp_zero_copy_asio_impl(
- const std::string &addr,
- const std::string &port,
- const device_addr_t &hints
- ):
- _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_FRAME_SIZE))),
- _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))),
- _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_FRAME_SIZE))),
- _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))),
- _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
- _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
- _next_recv_buff_index(0), _next_send_buff_index(0)
+ const std::string& addr, const std::string& port, const device_addr_t& hints)
+ : _recv_frame_size(
+ size_t(hints.cast<double>("recv_frame_size", DEFAULT_FRAME_SIZE)))
+ , _num_recv_frames(
+ size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES)))
+ , _send_frame_size(
+ size_t(hints.cast<double>("send_frame_size", DEFAULT_FRAME_SIZE)))
+ , _num_send_frames(
+ size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES)))
+ , _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size))
+ , _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size))
+ , _next_recv_buff_index(0)
+ , _next_send_buff_index(0)
{
- UHD_LOGGER_TRACE("TCP") << boost::format("Creating tcp transport for %s %s") % addr % port ;
+ UHD_LOGGER_TRACE("TCP")
+ << boost::format("Creating tcp transport for %s %s") % addr % port;
- //resolve the address
+ // resolve the address
asio::ip::tcp::resolver resolver(_io_service);
asio::ip::tcp::resolver::query query(asio::ip::tcp::v4(), addr, port);
asio::ip::tcp::endpoint receiver_endpoint = *resolver.resolve(query);
- //create, open, and connect the socket
+ // create, open, and connect the socket
_socket.reset(new asio::ip::tcp::socket(_io_service));
_socket->connect(receiver_endpoint);
_sock_fd = _socket->native_handle();
- //packets go out ASAP
+ // packets go out ASAP
asio::ip::tcp::no_delay option(true);
_socket->set_option(option);
- //allocate re-usable managed receive buffers
- for (size_t i = 0; i < get_num_recv_frames(); i++){
+ // allocate re-usable managed receive buffers
+ for (size_t i = 0; i < get_num_recv_frames(); i++) {
_mrb_pool.push_back(boost::make_shared<tcp_zero_copy_asio_mrb>(
- _recv_buffer_pool->at(i), _sock_fd, get_recv_frame_size()
- ));
+ _recv_buffer_pool->at(i), _sock_fd, get_recv_frame_size()));
}
- //allocate re-usable managed send buffers
- for (size_t i = 0; i < get_num_send_frames(); i++){
+ // allocate re-usable managed send buffers
+ for (size_t i = 0; i < get_num_send_frames(); i++) {
_msb_pool.push_back(boost::make_shared<tcp_zero_copy_asio_msb>(
- _send_buffer_pool->at(i), _sock_fd, get_send_frame_size()
- ));
+ _send_buffer_pool->at(i), _sock_fd, get_send_frame_size()));
}
}
@@ -169,51 +183,66 @@ public:
* Receive implementation:
* Block on the managed buffer's get call and advance the index.
******************************************************************/
- managed_recv_buffer::sptr get_recv_buff(double timeout){
- if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0;
+ managed_recv_buffer::sptr get_recv_buff(double timeout)
+ {
+ if (_next_recv_buff_index == _num_recv_frames)
+ _next_recv_buff_index = 0;
return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index);
}
- size_t get_num_recv_frames(void) const {return _num_recv_frames;}
- size_t get_recv_frame_size(void) const {return _recv_frame_size;}
+ size_t get_num_recv_frames(void) const
+ {
+ return _num_recv_frames;
+ }
+ size_t get_recv_frame_size(void) const
+ {
+ return _recv_frame_size;
+ }
/*******************************************************************
* Send implementation:
* Block on the managed buffer's get call and advance the index.
******************************************************************/
- managed_send_buffer::sptr get_send_buff(double timeout){
- if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0;
+ managed_send_buffer::sptr get_send_buff(double timeout)
+ {
+ if (_next_send_buff_index == _num_send_frames)
+ _next_send_buff_index = 0;
return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index);
}
- size_t get_num_send_frames(void) const {return _num_send_frames;}
- size_t get_send_frame_size(void) const {return _send_frame_size;}
+ size_t get_num_send_frames(void) const
+ {
+ return _num_send_frames;
+ }
+ size_t get_send_frame_size(void) const
+ {
+ return _send_frame_size;
+ }
private:
- //memory management -> buffers and fifos
+ // memory management -> buffers and fifos
const size_t _recv_frame_size, _num_recv_frames;
const size_t _send_frame_size, _num_send_frames;
buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
- std::vector<boost::shared_ptr<tcp_zero_copy_asio_msb> > _msb_pool;
- std::vector<boost::shared_ptr<tcp_zero_copy_asio_mrb> > _mrb_pool;
+ std::vector<boost::shared_ptr<tcp_zero_copy_asio_msb>> _msb_pool;
+ std::vector<boost::shared_ptr<tcp_zero_copy_asio_mrb>> _mrb_pool;
size_t _next_recv_buff_index, _next_send_buff_index;
- //asio guts -> socket and service
- asio::io_service _io_service;
+ // asio guts -> socket and service
+ asio::io_service _io_service;
boost::shared_ptr<asio::ip::tcp::socket> _socket;
- int _sock_fd;
+ int _sock_fd;
};
/***********************************************************************
* TCP zero copy make function
**********************************************************************/
zero_copy_if::sptr tcp_zero_copy::make(
- const std::string &addr,
- const std::string &port,
- const device_addr_t &hints
-){
+ const std::string& addr, const std::string& port, const device_addr_t& hints)
+{
zero_copy_if::sptr xport;
xport.reset(new tcp_zero_copy_asio_impl(addr, port, hints));
- while (xport->get_recv_buff(0.0)){} //flush
+ while (xport->get_recv_buff(0.0)) {
+ } // flush
return xport;
}