From 794a74cced070402f0e03d9978e8e735b3eb4324 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Tue, 14 Jun 2011 09:19:30 -0700 Subject: udp: replaced callbacks in zero copy interface by giving direct queue access The managed receive buffer knows how to restore itself into the queue when released. The managed send buffer knows how to ::send itself and restore when commited. --- host/lib/transport/udp_zero_copy.cpp | 57 ++++++++++++------------------------ 1 file changed, 19 insertions(+), 38 deletions(-) (limited to 'host/lib') diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp index 8c97e1e99..bc73b96a8 100644 --- a/host/lib/transport/udp_zero_copy.cpp +++ b/host/lib/transport/udp_zero_copy.cpp @@ -39,14 +39,12 @@ static const size_t DEFAULT_NUM_FRAMES = 32; **********************************************************************/ class udp_zero_copy_asio_mrb : public managed_recv_buffer{ public: - typedef boost::function release_cb_type; - - udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb): - _mem(mem), _len(0), _release_cb(release_cb){/* NOP */} + udp_zero_copy_asio_mrb(void *mem, bounded_buffer &pending): + _mem(mem), _len(0), _pending(pending){/* NOP */} void release(void){ if (_len == 0) return; - this->_release_cb(this); + _pending.push_with_haste(this); _len = 0; } @@ -63,7 +61,7 @@ private: void *_mem; size_t _len; - release_cb_type _release_cb; + bounded_buffer &_pending; }; /*********************************************************************** @@ -73,14 +71,13 @@ private: **********************************************************************/ class udp_zero_copy_asio_msb : public managed_send_buffer{ public: - typedef boost::function commit_cb_type; - - udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb): - _mem(mem), _len(0), _commit_cb(commit_cb){/* NOP */} + udp_zero_copy_asio_msb(void *mem, bounded_buffer &pending, int sock_fd): + _mem(mem), _len(0), _pending(pending), _sock_fd(sock_fd){/* NOP */} void commit(size_t len){ if (_len == 0) return; - this->_commit_cb(this, len); + ::send(_sock_fd, this->cast(), len, 0); + _pending.push_with_haste(this); _len = 0; } @@ -95,7 +92,8 @@ private: void *_mem; size_t _len; - commit_cb_type _commit_cb; + bounded_buffer &_pending; + int _sock_fd; }; /*********************************************************************** @@ -138,18 +136,18 @@ public: //allocate re-usable managed receive buffers for (size_t i = 0; i < get_num_recv_frames(); i++){ - _mrb_pool.push_back(udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), - boost::bind(&udp_zero_copy_asio_impl::release, this, _1)) - ); - handle_recv(&_mrb_pool.back()); + _mrb_pool.push_back(udp_zero_copy_asio_mrb( + _recv_buffer_pool->at(i), _pending_recv_buffs + )); + _pending_recv_buffs.push_with_haste(&_mrb_pool.back()); } //allocate re-usable managed send buffers for (size_t i = 0; i < get_num_send_frames(); i++){ - _msb_pool.push_back(udp_zero_copy_asio_msb(_send_buffer_pool->at(i), - boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2)) - ); - handle_send(&_msb_pool.back()); + _msb_pool.push_back(udp_zero_copy_asio_msb( + _send_buffer_pool->at(i), _pending_send_buffs, _sock_fd + )); + _pending_send_buffs.push_with_haste(&_msb_pool.back()); } } @@ -189,19 +187,11 @@ public: ::recv(_sock_fd, mrb->cast(), _recv_frame_size, 0) ); - this->handle_recv(mrb); //timeout: return the managed buffer to the queue + _pending_recv_buffs.push_with_haste(mrb); //timeout: return the managed buffer to the queue } return managed_recv_buffer::sptr(); } - UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){ - _pending_recv_buffs.push_with_haste(mrb); - } - - void release(udp_zero_copy_asio_mrb *mrb){ - handle_recv(mrb); - } - size_t get_num_recv_frames(void) const {return _num_recv_frames;} size_t get_recv_frame_size(void) const {return _recv_frame_size;} @@ -221,15 +211,6 @@ public: return managed_send_buffer::sptr(); } - UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){ - _pending_send_buffs.push_with_haste(msb); - } - - void commit(udp_zero_copy_asio_msb *msb, size_t len){ - ::send(_sock_fd, msb->cast(), len, 0); - handle_send(msb); - } - size_t get_num_send_frames(void) const {return _num_send_frames;} size_t get_send_frame_size(void) const {return _send_frame_size;} -- cgit v1.2.3