diff options
-rw-r--r-- | host/lib/transport/muxed_zero_copy_if.cpp | 68 |
1 files changed, 59 insertions, 9 deletions
diff --git a/host/lib/transport/muxed_zero_copy_if.cpp b/host/lib/transport/muxed_zero_copy_if.cpp index 996db3c98..7a2b76165 100644 --- a/host/lib/transport/muxed_zero_copy_if.cpp +++ b/host/lib/transport/muxed_zero_copy_if.cpp @@ -74,7 +74,12 @@ public: if (_streams.size() >= _max_num_streams) { throw uhd::runtime_error("muxed_zero_copy_if: stream capacity exceeded. cannot create more streams."); } - stream_impl::sptr stream = boost::make_shared<stream_impl>(this->shared_from_this(), stream_num); + // Only allocate a portion of the base transport's frames to each stream + // to prevent all streams from attempting to use all the frames. + stream_impl::sptr stream = boost::make_shared<stream_impl>( + this->shared_from_this(), stream_num, + _base_xport->get_num_send_frames() / _max_num_streams, + _base_xport->get_num_recv_frames() / _max_num_streams); _streams[stream_num] = stream; return stream; } @@ -91,16 +96,55 @@ public: } private: + /* + * @class stream_mrb is used to copy the data and release the original + * managed receive buffer back to the base transport. + */ + class stream_mrb : public managed_recv_buffer + { + public: + stream_mrb(size_t size) : _buff(new char[size]) {} + + ~stream_mrb() { + delete _buff; + } + + void release() {} + + UHD_INLINE sptr get_new(char *buff, size_t len) + { + memcpy(_buff, buff, len); + return make(this, _buff, len); + } + + private: + char *_buff; + }; + class stream_impl : public zero_copy_if { public: typedef boost::shared_ptr<stream_impl> sptr; typedef boost::weak_ptr<stream_impl> wptr; - stream_impl(muxed_zero_copy_if_impl::sptr muxed_xport, const uint32_t stream_num): + stream_impl( + muxed_zero_copy_if_impl::sptr muxed_xport, + const uint32_t stream_num, + const size_t num_send_frames, + const size_t num_recv_frames + ) : _stream_num(stream_num), _muxed_xport(muxed_xport), - _buff_queue(muxed_xport->base_xport()->get_num_recv_frames()) + _num_send_frames(num_send_frames), + _send_frame_size(_muxed_xport->base_xport()->get_send_frame_size()), + _num_recv_frames(num_recv_frames), + _recv_frame_size(_muxed_xport->base_xport()->get_recv_frame_size()), + _buff_queue(num_recv_frames), + _buffers(num_recv_frames), + _buffer_index(0) { + for (size_t i = 0; i < num_recv_frames; i++) { + _buffers[i] = boost::make_shared<stream_mrb>(_recv_frame_size); + } } ~stream_impl(void) @@ -116,11 +160,11 @@ private: } size_t get_num_recv_frames(void) const { - return _muxed_xport->base_xport()->get_num_recv_frames(); + return _num_recv_frames; } size_t get_recv_frame_size(void) const { - return _muxed_xport->base_xport()->get_recv_frame_size(); + return _recv_frame_size; } managed_recv_buffer::sptr get_recv_buff(double timeout) { @@ -130,19 +174,19 @@ private: } else { return managed_recv_buffer::sptr(); } - } void push_recv_buff(managed_recv_buffer::sptr buff) { - _buff_queue.push_with_wait(buff); + _buff_queue.push_with_wait(_buffers.at(_buffer_index++)->get_new(buff->cast<char*>(), buff->size())); + _buffer_index %= _buffers.size(); } size_t get_num_send_frames(void) const { - return _muxed_xport->base_xport()->get_num_send_frames(); + return _num_send_frames; } size_t get_send_frame_size(void) const { - return _muxed_xport->base_xport()->get_send_frame_size(); + return _send_frame_size; } managed_send_buffer::sptr get_send_buff(double timeout) @@ -153,7 +197,13 @@ private: private: const uint32_t _stream_num; muxed_zero_copy_if_impl::sptr _muxed_xport; + const size_t _num_send_frames; + const size_t _send_frame_size; + const size_t _num_recv_frames; + const size_t _recv_frame_size; bounded_buffer<managed_recv_buffer::sptr> _buff_queue; + std::vector< boost::shared_ptr<stream_mrb> > _buffers; + size_t _buffer_index; }; inline zero_copy_if::sptr& base_xport() { return _base_xport; } |