// // Copyright 2017 Ettus Research // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see . // #include #include #include #include #include #include #include #include #include #include using namespace uhd; using namespace uhd::transport; typedef bounded_buffer bounded_buffer_t; class zero_copy_flow_ctrl_msb : public managed_send_buffer { public: zero_copy_flow_ctrl_msb( flow_ctrl_func flow_ctrl ) : _mb(nullptr), _flow_ctrl(flow_ctrl) { /* NOP */ } ~zero_copy_flow_ctrl_msb() { /* NOP */ } void release() { if (_mb) { _mb->commit(size()); while (_flow_ctrl and not _flow_ctrl(_mb)) {} _mb.reset(); } } UHD_INLINE sptr get(sptr &mb) { _mb = mb; return make(this, _mb->cast(), _mb->size()); } private: sptr _mb; flow_ctrl_func _flow_ctrl; }; class zero_copy_flow_ctrl_mrb : public managed_recv_buffer { public: zero_copy_flow_ctrl_mrb( flow_ctrl_func flow_ctrl ) : _mb(nullptr), _flow_ctrl(flow_ctrl) { /* NOP */ } ~zero_copy_flow_ctrl_mrb() { /* NOP */ } void release() { if (_mb) { _mb->commit(size()); while (_flow_ctrl and not _flow_ctrl(_mb)) {} _mb.reset(); } } UHD_INLINE sptr get(sptr &mb) { _mb = mb; return make(this, _mb->cast(), _mb->size()); } private: sptr _mb; flow_ctrl_func _flow_ctrl; }; /*********************************************************************** * Zero copy offload transport: * An intermediate transport that utilizes threading to free * the main thread from any receive work. **********************************************************************/ class zero_copy_flow_ctrl_impl : public zero_copy_flow_ctrl { public: typedef boost::shared_ptr sptr; zero_copy_flow_ctrl_impl(zero_copy_if::sptr transport, flow_ctrl_func send_flow_ctrl, flow_ctrl_func recv_flow_ctrl) : _transport(transport), _send_buffers(transport->get_num_send_frames()), _recv_buffers(transport->get_num_recv_frames()), _send_buff_index(0), _recv_buff_index(0), _send_flow_ctrl(send_flow_ctrl), _recv_flow_ctrl(recv_flow_ctrl) { UHD_LOG_TRACE("TRANSPORT", "Created zero_copy_flow_ctrl"); for (size_t i = 0; i < transport->get_num_send_frames(); i++) { _send_buffers[i] = boost::make_shared(_send_flow_ctrl); } for (size_t i = 0; i < transport->get_num_recv_frames(); i++) { _recv_buffers[i] = boost::make_shared(_recv_flow_ctrl); } } ~zero_copy_flow_ctrl_impl() { } /******************************************************************* * Receive implementation: * Pop the receive buffer pointer from the underlying transport ******************************************************************/ UHD_INLINE managed_recv_buffer::sptr get_recv_buff(double timeout) { managed_recv_buffer::sptr ptr; managed_recv_buffer::sptr buff = _transport->get_recv_buff(timeout); if (buff) { boost::shared_ptr mb = _recv_buffers[_recv_buff_index++]; _recv_buff_index %= _recv_buffers.size(); ptr = mb->get(buff); } return ptr; } UHD_INLINE size_t get_num_recv_frames() const { return _transport->get_num_recv_frames(); } UHD_INLINE size_t get_recv_frame_size() const { return _transport->get_recv_frame_size(); } /******************************************************************* * Send implementation: * Pass the send buffer pointer from the underlying transport ******************************************************************/ managed_send_buffer::sptr get_send_buff(double timeout) { managed_send_buffer::sptr ptr; managed_send_buffer::sptr buff = _transport->get_send_buff(timeout); if (buff) { boost::shared_ptr mb = _send_buffers[_send_buff_index++]; _send_buff_index %= _send_buffers.size(); ptr = mb->get(buff); } return ptr; } UHD_INLINE size_t get_num_send_frames() const { return _transport->get_num_send_frames(); } UHD_INLINE size_t get_send_frame_size() const { return _transport->get_send_frame_size(); } private: // The underlying transport zero_copy_if::sptr _transport; // buffers std::vector< boost::shared_ptr > _send_buffers; std::vector< boost::shared_ptr > _recv_buffers; size_t _send_buff_index; size_t _recv_buff_index; // Flow control functions flow_ctrl_func _send_flow_ctrl; flow_ctrl_func _recv_flow_ctrl; }; zero_copy_flow_ctrl::sptr zero_copy_flow_ctrl::make( zero_copy_if::sptr transport, flow_ctrl_func send_flow_ctrl, flow_ctrl_func recv_flow_ctrl ) { zero_copy_flow_ctrl_impl::sptr zero_copy_flow_ctrl( new zero_copy_flow_ctrl_impl(transport, send_flow_ctrl, recv_flow_ctrl) ); return zero_copy_flow_ctrl; }