diff options
-rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 59 |
1 files changed, 41 insertions, 18 deletions
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 48b0941eb..87c5ec823 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -187,13 +187,19 @@ public: return get_buff_size<Opt>(); } - UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){ - _pending_recv_buffs.push_with_pop_on_full(mrb); - } - - managed_recv_buffer::sptr get_recv_buff(double timeout){ - udp_zero_copy_asio_mrb *mrb; - + /******************************************************************* + * Receive implementation: + * + * Use select to perform a blocking receive with timeout. + * Return the managed receive buffer with the new length. + * When the caller is finished with the managed buffer, + * the managed receive buffer is released back into the queue. + * + * Assumptions: + * - A managed buffer is always available. + * - The queue can never be over-filled. + ******************************************************************/ + UHD_INLINE bool is_recv_socket_ready(double timeout){ //setup timeval for timeout timeval tv; tv.tv_sec = 0; @@ -204,17 +210,22 @@ public: FD_ZERO(&rset); FD_SET(_sock_fd, &rset); - //call select to perform timed wait and grab an available buffer now - //if the condition is true, call receive and return the managed buffer - if ( - ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 - and _pending_recv_buffs.pop_with_haste(mrb) - ){ + //call select with timeout on receive socket + return ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0; + } + + managed_recv_buffer::sptr get_recv_buff(double timeout){ + udp_zero_copy_asio_mrb *mrb = NULL; + if (is_recv_socket_ready(timeout) and _pending_recv_buffs.pop_with_haste(mrb)){ return mrb->get_new(::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0)); } return managed_recv_buffer::sptr(); } + UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){ + _pending_recv_buffs.push_with_pop_on_full(mrb); + } + void release(udp_zero_copy_asio_mrb *mrb){ handle_recv(mrb); } @@ -222,18 +233,30 @@ public: size_t get_num_recv_frames(void) const {return _num_recv_frames;} size_t get_recv_frame_size(void) const {return _recv_frame_size;} - UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){ - _pending_send_buffs.push_with_pop_on_full(msb); - } - + /******************************************************************* + * Send implementation: + * + * Get a managed receive buffer immediately with max length set. + * The caller will fill the buffer and commit it when finished. + * The commit routine will perform a blocking send operation, + * and push the managed send buffer back into the queue. + * + * Assumptions: + * - A managed buffer is always available. + * - The queue can never be over-filled. + ******************************************************************/ managed_send_buffer::sptr get_send_buff(double){ - udp_zero_copy_asio_msb *msb; + udp_zero_copy_asio_msb *msb = NULL; if (_pending_send_buffs.pop_with_haste(msb)){ return msb->get_new(_send_frame_size); } return managed_send_buffer::sptr(); } + UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){ + _pending_send_buffs.push_with_pop_on_full(msb); + } + void commit(udp_zero_copy_asio_msb *msb, size_t len){ ::send(_sock_fd, msb->cast<const char *>(), len, 0); handle_send(msb); |