diff options
author | Brent Stapleton <brent.stapleton@ettus.com> | 2019-01-18 09:45:34 -0800 |
---|---|---|
committer | Brent Stapleton <brent.stapleton@ettus.com> | 2019-01-18 17:51:35 -0800 |
commit | a53130679944ddd179593259eb953b89ab1a7a38 (patch) | |
tree | 5d8274750bed0b21aa133bc93d97d75bbce0ecd9 /host/lib/transport/tcp_zero_copy.cpp | |
parent | 2a44d6836ca08b6b67b83b63487b838e138ac379 (diff) | |
download | uhd-a53130679944ddd179593259eb953b89ab1a7a38.tar.gz uhd-a53130679944ddd179593259eb953b89ab1a7a38.tar.bz2 uhd-a53130679944ddd179593259eb953b89ab1a7a38.zip |
lib: transport: apply clang-format
This is a continuation of 967be2a4.
$ find host/lib/transport -iname *.hpp -o -iname *.cpp |\
xargs clang-format -i -style=file
Skipping host/lib/transport/nirio/ because of build errors.
$ git checkout host/lib/transport/nirio
Diffstat (limited to 'host/lib/transport/tcp_zero_copy.cpp')
-rw-r--r-- | host/lib/transport/tcp_zero_copy.cpp | 195 |
1 files changed, 112 insertions, 83 deletions
diff --git a/host/lib/transport/tcp_zero_copy.cpp b/host/lib/transport/tcp_zero_copy.cpp index a52d089a8..5cb713427 100644 --- a/host/lib/transport/tcp_zero_copy.cpp +++ b/host/lib/transport/tcp_zero_copy.cpp @@ -6,15 +6,15 @@ // #include "udp_common.hpp" -#include <uhd/transport/tcp_zero_copy.hpp> #include <uhd/transport/buffer_pool.hpp> +#include <uhd/transport/tcp_zero_copy.hpp> #include <uhd/utils/log.hpp> #include <uhdlib/utils/atomic.hpp> #include <boost/format.hpp> #include <boost/make_shared.hpp> -#include <vector> #include <chrono> #include <thread> +#include <vector> using namespace uhd; using namespace uhd::transport; @@ -27,38 +27,44 @@ static const size_t DEFAULT_FRAME_SIZE = 2048; * Reusable managed receiver buffer: * - get_new performs the recv operation **********************************************************************/ -class tcp_zero_copy_asio_mrb : public managed_recv_buffer{ +class tcp_zero_copy_asio_mrb : public managed_recv_buffer +{ public: - tcp_zero_copy_asio_mrb(void *mem, int sock_fd, const size_t frame_size): - _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ } + tcp_zero_copy_asio_mrb(void* mem, int sock_fd, const size_t frame_size) + : _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) + { /*NOP*/ + } - void release(void){ + void release(void) + { _claimer.release(); } - UHD_INLINE sptr get_new(const double timeout, size_t &index){ - if (not _claimer.claim_with_wait(timeout)) return sptr(); + UHD_INLINE sptr get_new(const double timeout, size_t& index) + { + if (not _claimer.claim_with_wait(timeout)) + return sptr(); - #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported - _len = ::recv(_sock_fd, (char *)_mem, _frame_size, MSG_DONTWAIT); - if (_len > 0){ - index++; //advances the caller's buffer +#ifdef MSG_DONTWAIT // try a non-blocking recv() if supported + _len = ::recv(_sock_fd, (char*)_mem, _frame_size, MSG_DONTWAIT); + if (_len > 0) { + index++; // advances the caller's buffer return make(this, _mem, size_t(_len)); } - #endif +#endif - if (wait_for_recv_ready(_sock_fd, timeout)){ - _len = ::recv(_sock_fd, (char *)_mem, _frame_size, 0); - index++; //advances the caller's buffer + if (wait_for_recv_ready(_sock_fd, timeout)) { + _len = ::recv(_sock_fd, (char*)_mem, _frame_size, 0); + index++; // advances the caller's buffer return make(this, _mem, size_t(_len)); } - _claimer.release(); //undo claim - return sptr(); //null for timeout + _claimer.release(); // undo claim + return sptr(); // null for timeout } private: - void *_mem; + void* _mem; int _sock_fd; size_t _frame_size; ssize_t _len; @@ -69,44 +75,50 @@ private: * Reusable managed send buffer: * - commit performs the send operation **********************************************************************/ -class tcp_zero_copy_asio_msb : public managed_send_buffer{ +class tcp_zero_copy_asio_msb : public managed_send_buffer +{ public: - tcp_zero_copy_asio_msb(void *mem, int sock_fd, const size_t frame_size): - _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ } - - void release(void){ - //Retry logic because send may fail with ENOBUFS. - //This is known to occur at least on some OSX systems. - //But it should be safe to always check for the error. - while (true) - { - this->commit(_frame_size); //always full size frames to avoid pkt coalescing - const ssize_t ret = ::send(_sock_fd, (const char *)_mem, size(), 0); - if (ret == ssize_t(size())) break; - if (ret == -1 and errno == ENOBUFS) - { + tcp_zero_copy_asio_msb(void* mem, int sock_fd, const size_t frame_size) + : _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) + { /*NOP*/ + } + + void release(void) + { + // Retry logic because send may fail with ENOBUFS. + // This is known to occur at least on some OSX systems. + // But it should be safe to always check for the error. + while (true) { + this->commit(_frame_size); // always full size frames to avoid pkt coalescing + const ssize_t ret = ::send(_sock_fd, (const char*)_mem, size(), 0); + if (ret == ssize_t(size())) + break; + if (ret == -1 and errno == ENOBUFS) { std::this_thread::sleep_for(std::chrono::microseconds(1)); - continue; //try to send again + continue; // try to send again } UHD_ASSERT_THROW(ret == ssize_t(size())); } _claimer.release(); } - UHD_INLINE sptr get_new(const double timeout, size_t &index){ - if (not _claimer.claim_with_wait(timeout)) return sptr(); - index++; //advances the caller's buffer + UHD_INLINE sptr get_new(const double timeout, size_t& index) + { + if (not _claimer.claim_with_wait(timeout)) + return sptr(); + index++; // advances the caller's buffer return make(this, _mem, _frame_size); } private: - void *_mem; + void* _mem; int _sock_fd; size_t _frame_size; simple_claimer _claimer; }; -tcp_zero_copy::~tcp_zero_copy(void){ +tcp_zero_copy::~tcp_zero_copy(void) +{ /* NOP */ } @@ -117,51 +129,53 @@ tcp_zero_copy::~tcp_zero_copy(void){ * However, it is not a true zero copy implementation as each * send and recv requires a copy operation to/from userspace. **********************************************************************/ -class tcp_zero_copy_asio_impl : public tcp_zero_copy{ +class tcp_zero_copy_asio_impl : public tcp_zero_copy +{ public: typedef boost::shared_ptr<tcp_zero_copy_asio_impl> sptr; tcp_zero_copy_asio_impl( - const std::string &addr, - const std::string &port, - const device_addr_t &hints - ): - _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_FRAME_SIZE))), - _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))), - _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_FRAME_SIZE))), - _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))), - _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), - _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), - _next_recv_buff_index(0), _next_send_buff_index(0) + const std::string& addr, const std::string& port, const device_addr_t& hints) + : _recv_frame_size( + size_t(hints.cast<double>("recv_frame_size", DEFAULT_FRAME_SIZE))) + , _num_recv_frames( + size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))) + , _send_frame_size( + size_t(hints.cast<double>("send_frame_size", DEFAULT_FRAME_SIZE))) + , _num_send_frames( + size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))) + , _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)) + , _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)) + , _next_recv_buff_index(0) + , _next_send_buff_index(0) { - UHD_LOGGER_TRACE("TCP") << boost::format("Creating tcp transport for %s %s") % addr % port ; + UHD_LOGGER_TRACE("TCP") + << boost::format("Creating tcp transport for %s %s") % addr % port; - //resolve the address + // resolve the address asio::ip::tcp::resolver resolver(_io_service); asio::ip::tcp::resolver::query query(asio::ip::tcp::v4(), addr, port); asio::ip::tcp::endpoint receiver_endpoint = *resolver.resolve(query); - //create, open, and connect the socket + // create, open, and connect the socket _socket.reset(new asio::ip::tcp::socket(_io_service)); _socket->connect(receiver_endpoint); _sock_fd = _socket->native_handle(); - //packets go out ASAP + // packets go out ASAP asio::ip::tcp::no_delay option(true); _socket->set_option(option); - //allocate re-usable managed receive buffers - for (size_t i = 0; i < get_num_recv_frames(); i++){ + // allocate re-usable managed receive buffers + for (size_t i = 0; i < get_num_recv_frames(); i++) { _mrb_pool.push_back(boost::make_shared<tcp_zero_copy_asio_mrb>( - _recv_buffer_pool->at(i), _sock_fd, get_recv_frame_size() - )); + _recv_buffer_pool->at(i), _sock_fd, get_recv_frame_size())); } - //allocate re-usable managed send buffers - for (size_t i = 0; i < get_num_send_frames(); i++){ + // allocate re-usable managed send buffers + for (size_t i = 0; i < get_num_send_frames(); i++) { _msb_pool.push_back(boost::make_shared<tcp_zero_copy_asio_msb>( - _send_buffer_pool->at(i), _sock_fd, get_send_frame_size() - )); + _send_buffer_pool->at(i), _sock_fd, get_send_frame_size())); } } @@ -169,51 +183,66 @@ public: * Receive implementation: * Block on the managed buffer's get call and advance the index. ******************************************************************/ - managed_recv_buffer::sptr get_recv_buff(double timeout){ - if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0; + managed_recv_buffer::sptr get_recv_buff(double timeout) + { + if (_next_recv_buff_index == _num_recv_frames) + _next_recv_buff_index = 0; return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index); } - size_t get_num_recv_frames(void) const {return _num_recv_frames;} - size_t get_recv_frame_size(void) const {return _recv_frame_size;} + size_t get_num_recv_frames(void) const + { + return _num_recv_frames; + } + size_t get_recv_frame_size(void) const + { + return _recv_frame_size; + } /******************************************************************* * Send implementation: * Block on the managed buffer's get call and advance the index. ******************************************************************/ - managed_send_buffer::sptr get_send_buff(double timeout){ - if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0; + managed_send_buffer::sptr get_send_buff(double timeout) + { + if (_next_send_buff_index == _num_send_frames) + _next_send_buff_index = 0; return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index); } - size_t get_num_send_frames(void) const {return _num_send_frames;} - size_t get_send_frame_size(void) const {return _send_frame_size;} + size_t get_num_send_frames(void) const + { + return _num_send_frames; + } + size_t get_send_frame_size(void) const + { + return _send_frame_size; + } private: - //memory management -> buffers and fifos + // memory management -> buffers and fifos const size_t _recv_frame_size, _num_recv_frames; const size_t _send_frame_size, _num_send_frames; buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; - std::vector<boost::shared_ptr<tcp_zero_copy_asio_msb> > _msb_pool; - std::vector<boost::shared_ptr<tcp_zero_copy_asio_mrb> > _mrb_pool; + std::vector<boost::shared_ptr<tcp_zero_copy_asio_msb>> _msb_pool; + std::vector<boost::shared_ptr<tcp_zero_copy_asio_mrb>> _mrb_pool; size_t _next_recv_buff_index, _next_send_buff_index; - //asio guts -> socket and service - asio::io_service _io_service; + // asio guts -> socket and service + asio::io_service _io_service; boost::shared_ptr<asio::ip::tcp::socket> _socket; - int _sock_fd; + int _sock_fd; }; /*********************************************************************** * TCP zero copy make function **********************************************************************/ zero_copy_if::sptr tcp_zero_copy::make( - const std::string &addr, - const std::string &port, - const device_addr_t &hints -){ + const std::string& addr, const std::string& port, const device_addr_t& hints) +{ zero_copy_if::sptr xport; xport.reset(new tcp_zero_copy_asio_impl(addr, port, hints)); - while (xport->get_recv_buff(0.0)){} //flush + while (xport->get_recv_buff(0.0)) { + } // flush return xport; } |