aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/lib/transport/muxed_zero_copy_if.cpp68
1 files changed, 59 insertions, 9 deletions
diff --git a/host/lib/transport/muxed_zero_copy_if.cpp b/host/lib/transport/muxed_zero_copy_if.cpp
index 996db3c98..7a2b76165 100644
--- a/host/lib/transport/muxed_zero_copy_if.cpp
+++ b/host/lib/transport/muxed_zero_copy_if.cpp
@@ -74,7 +74,12 @@ public:
if (_streams.size() >= _max_num_streams) {
throw uhd::runtime_error("muxed_zero_copy_if: stream capacity exceeded. cannot create more streams.");
}
- stream_impl::sptr stream = boost::make_shared<stream_impl>(this->shared_from_this(), stream_num);
+ // Only allocate a portion of the base transport's frames to each stream
+ // to prevent all streams from attempting to use all the frames.
+ stream_impl::sptr stream = boost::make_shared<stream_impl>(
+ this->shared_from_this(), stream_num,
+ _base_xport->get_num_send_frames() / _max_num_streams,
+ _base_xport->get_num_recv_frames() / _max_num_streams);
_streams[stream_num] = stream;
return stream;
}
@@ -91,16 +96,55 @@ public:
}
private:
+ /*
+ * @class stream_mrb is used to copy the data and release the original
+ * managed receive buffer back to the base transport.
+ */
+ class stream_mrb : public managed_recv_buffer
+ {
+ public:
+ stream_mrb(size_t size) : _buff(new char[size]) {}
+
+ ~stream_mrb() {
+ delete _buff;
+ }
+
+ void release() {}
+
+ UHD_INLINE sptr get_new(char *buff, size_t len)
+ {
+ memcpy(_buff, buff, len);
+ return make(this, _buff, len);
+ }
+
+ private:
+ char *_buff;
+ };
+
class stream_impl : public zero_copy_if
{
public:
typedef boost::shared_ptr<stream_impl> sptr;
typedef boost::weak_ptr<stream_impl> wptr;
- stream_impl(muxed_zero_copy_if_impl::sptr muxed_xport, const uint32_t stream_num):
+ stream_impl(
+ muxed_zero_copy_if_impl::sptr muxed_xport,
+ const uint32_t stream_num,
+ const size_t num_send_frames,
+ const size_t num_recv_frames
+ ) :
_stream_num(stream_num), _muxed_xport(muxed_xport),
- _buff_queue(muxed_xport->base_xport()->get_num_recv_frames())
+ _num_send_frames(num_send_frames),
+ _send_frame_size(_muxed_xport->base_xport()->get_send_frame_size()),
+ _num_recv_frames(num_recv_frames),
+ _recv_frame_size(_muxed_xport->base_xport()->get_recv_frame_size()),
+ _buff_queue(num_recv_frames),
+ _buffers(num_recv_frames),
+ _buffer_index(0)
{
+ for (size_t i = 0; i < num_recv_frames; i++) {
+ _buffers[i] = boost::make_shared<stream_mrb>(_recv_frame_size);
+ }
}
~stream_impl(void)
@@ -116,11 +160,11 @@ private:
}
size_t get_num_recv_frames(void) const {
- return _muxed_xport->base_xport()->get_num_recv_frames();
+ return _num_recv_frames;
}
size_t get_recv_frame_size(void) const {
- return _muxed_xport->base_xport()->get_recv_frame_size();
+ return _recv_frame_size;
}
managed_recv_buffer::sptr get_recv_buff(double timeout) {
@@ -130,19 +174,19 @@ private:
} else {
return managed_recv_buffer::sptr();
}
-
}
void push_recv_buff(managed_recv_buffer::sptr buff) {
- _buff_queue.push_with_wait(buff);
+ _buff_queue.push_with_wait(_buffers.at(_buffer_index++)->get_new(buff->cast<char*>(), buff->size()));
+ _buffer_index %= _buffers.size();
}
size_t get_num_send_frames(void) const {
- return _muxed_xport->base_xport()->get_num_send_frames();
+ return _num_send_frames;
}
size_t get_send_frame_size(void) const {
- return _muxed_xport->base_xport()->get_send_frame_size();
+ return _send_frame_size;
}
managed_send_buffer::sptr get_send_buff(double timeout)
@@ -153,7 +197,13 @@ private:
private:
const uint32_t _stream_num;
muxed_zero_copy_if_impl::sptr _muxed_xport;
+ const size_t _num_send_frames;
+ const size_t _send_frame_size;
+ const size_t _num_recv_frames;
+ const size_t _recv_frame_size;
bounded_buffer<managed_recv_buffer::sptr> _buff_queue;
+ std::vector< boost::shared_ptr<stream_mrb> > _buffers;
+ size_t _buffer_index;
};
inline zero_copy_if::sptr& base_xport() { return _base_xport; }