diff options
| -rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 59 | 
1 files changed, 41 insertions, 18 deletions
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 48b0941eb..87c5ec823 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -187,13 +187,19 @@ public:          return get_buff_size<Opt>();      } -    UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){ -        _pending_recv_buffs.push_with_pop_on_full(mrb); -    } - -    managed_recv_buffer::sptr get_recv_buff(double timeout){ -        udp_zero_copy_asio_mrb *mrb; - +    /******************************************************************* +     * Receive implementation: +     * +     * Use select to perform a blocking receive with timeout. +     * Return the managed receive buffer with the new length. +     * When the caller is finished with the managed buffer, +     * the managed receive buffer is released back into the queue. +     * +     * Assumptions: +     *  - A managed buffer is always available. +     *  - The queue can never be over-filled. +     ******************************************************************/ +    UHD_INLINE bool is_recv_socket_ready(double timeout){          //setup timeval for timeout          timeval tv;          tv.tv_sec = 0; @@ -204,17 +210,22 @@ public:          FD_ZERO(&rset);          FD_SET(_sock_fd, &rset); -        //call select to perform timed wait and grab an available buffer now -        //if the condition is true, call receive and return the managed buffer -        if ( -            ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 -            and _pending_recv_buffs.pop_with_haste(mrb) -        ){ +        //call select with timeout on receive socket +        return ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0; +    } + +    managed_recv_buffer::sptr get_recv_buff(double timeout){ +        udp_zero_copy_asio_mrb *mrb = NULL; +        if (is_recv_socket_ready(timeout) and _pending_recv_buffs.pop_with_haste(mrb)){              return mrb->get_new(::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0));          }          return managed_recv_buffer::sptr();      } +    UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){ +        _pending_recv_buffs.push_with_pop_on_full(mrb); +    } +      void release(udp_zero_copy_asio_mrb *mrb){          handle_recv(mrb);      } @@ -222,18 +233,30 @@ public:      size_t get_num_recv_frames(void) const {return _num_recv_frames;}      size_t get_recv_frame_size(void) const {return _recv_frame_size;} -    UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){ -        _pending_send_buffs.push_with_pop_on_full(msb); -    } - +    /******************************************************************* +     * Send implementation: +     * +     * Get a managed receive buffer immediately with max length set. +     * The caller will fill the buffer and commit it when finished. +     * The commit routine will perform a blocking send operation, +     * and push the managed send buffer back into the queue. +     * +     * Assumptions: +     *  - A managed buffer is always available. +     *  - The queue can never be over-filled. +     ******************************************************************/      managed_send_buffer::sptr get_send_buff(double){ -        udp_zero_copy_asio_msb *msb; +        udp_zero_copy_asio_msb *msb = NULL;          if (_pending_send_buffs.pop_with_haste(msb)){              return msb->get_new(_send_frame_size);          }          return managed_send_buffer::sptr();      } +    UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){ +        _pending_send_buffs.push_with_pop_on_full(msb); +    } +      void commit(udp_zero_copy_asio_msb *msb, size_t len){          ::send(_sock_fd, msb->cast<const char *>(), len, 0);          handle_send(msb);  | 
