diff options
Diffstat (limited to 'host')
| -rw-r--r-- | host/lib/include/uhdlib/transport/inline_io_service.hpp | 13 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/transport/offload_io_service_client.hpp | 44 | ||||
| -rw-r--r-- | host/lib/transport/inline_io_service.cpp | 81 | ||||
| -rw-r--r-- | host/lib/transport/offload_io_service.cpp | 27 | 
4 files changed, 140 insertions, 25 deletions
| diff --git a/host/lib/include/uhdlib/transport/inline_io_service.hpp b/host/lib/include/uhdlib/transport/inline_io_service.hpp index fe41b96b6..d4a6dbbae 100644 --- a/host/lib/include/uhdlib/transport/inline_io_service.hpp +++ b/host/lib/include/uhdlib/transport/inline_io_service.hpp @@ -104,6 +104,19 @@ private:      frame_buff::uptr recv(          inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms); +    /* +     * Function to perform recv operations on a link, which is potentially +     * muxed. This function is only called from send_io::release_send_buff, and +     * always expects recv_io_cb to release its incoming buffer. Packets are +     * forwarded to the appropriate mux or callback. +     * +     * \param recv_io_cb the callback+interface initiating the operation +     * \param recv_link link to perform receive on +     * \param timeout_ms timeout to wait for a buffer on the link +     */ +    void recv_flow_ctrl( +        inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms); +      /* Track whether link is muxed and the callback */      std::unordered_map<recv_link_if*, std::tuple<inline_recv_mux*, inline_recv_cb*>>          _recv_tbl; diff --git a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp index 620e796ef..2f606878c 100644 --- a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp +++ b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp @@ -50,7 +50,7 @@ static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms  /*!   * Recv I/O client for offload I/O service   */ -template <typename io_service_t> +template <typename io_service_t, bool polling>  class offload_recv_io : public recv_io_if  {  public: @@ -75,13 +75,19 @@ public:      frame_buff::uptr get_recv_buff(int32_t timeout_ms)      { -        return detail::client_get_buff( -            [this]() { -                frame_buff* buff = _port->client_pop(); -                _num_frames_in_use += buff ? 1 : 0; -                return buff; -            }, -            timeout_ms); +        if (polling) { +            return detail::client_get_buff( +                [this]() { +                    frame_buff* buff = _port->client_pop(); +                    _num_frames_in_use += buff ? 1 : 0; +                    return buff; +                }, +                timeout_ms); +        } else { +            frame_buff* buff = _port->client_pop(timeout_ms); +            _num_frames_in_use += buff ? 1 : 0; +            return frame_buff::uptr(buff); +        }      }      void release_recv_buff(frame_buff::uptr buff) @@ -103,7 +109,7 @@ private:  /*!   * Send I/O client for offload I/O service   */ -template <typename io_service_t> +template <typename io_service_t, bool polling>  class offload_send_io : public send_io_if  {  public: @@ -128,13 +134,19 @@ public:      frame_buff::uptr get_send_buff(int32_t timeout_ms)      { -        return detail::client_get_buff( -            [this]() { -                frame_buff* buff = _port->client_pop(); -                _num_frames_in_use += buff ? 1 : 0; -                return buff; -            }, -            timeout_ms); +        if (polling) { +            return detail::client_get_buff( +                [this]() { +                    frame_buff* buff = _port->client_pop(); +                    _num_frames_in_use += buff ? 1 : 0; +                    return buff; +                }, +                timeout_ms); +        } else { +            frame_buff* buff = _port->client_pop(timeout_ms); +            _num_frames_in_use += buff ? 1 : 0; +            return frame_buff::uptr(buff); +        }      }      void release_send_buff(frame_buff::uptr buff) diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp index 93967e09a..6449bbda8 100644 --- a/host/lib/transport/inline_io_service.cpp +++ b/host/lib/transport/inline_io_service.cpp @@ -144,6 +144,41 @@ public:          }      } +    void recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms) +    { +        while (true) { +            frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); +            /* Process buffer */ +            if (buff) { +                bool rcvr_found = false; +                for (auto& rcvr : _callbacks) { +                    if (rcvr->callback(buff, recv_link)) { +                        rcvr_found = true; +                        if (rcvr == cb) { +                            assert(!buff); +                            return; +                        } else if (buff) { +                            /* NOTE: Should not overflow, by construction +                             * Every queue can hold link->get_num_recv_frames() +                             */ +                            _queues[rcvr]->push_back(buff.release()); +                        } else { +                            /* Continue looping if buffer was consumed and +                               receiver is not the requested one */ +                            break; +                        } +                    } +                } +                if (not rcvr_found) { +                    UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); +                    recv_link->release_recv_buff(std::move(buff)); +                } +            } else { /* Timeout */ +                return; +            } +        } +    } +  private:      recv_link_if* _link;      std::list<inline_recv_cb*> _callbacks; @@ -248,11 +283,18 @@ public:      void release_send_buff(frame_buff::uptr buff)      { -        while (buff) { /* TODO: Possibly don't loop indefinitely here */ +        while (buff) { +            // Try to send a packet +            _send_cb(buff, _send_link.get());              if (_recv_link) { -                _io_srv->recv(this, _recv_link.get(), 0); +                // If the buffer was not released, use a timeout to receive +                // the flow control packet, to avoid wasting CPU. +                if (!buff) { +                    _io_srv->recv_flow_ctrl(this, _recv_link.get(), 0); +                } else { +                    _io_srv->recv_flow_ctrl(this, _recv_link.get(), 100); +                }              } -            _send_cb(buff, _send_link.get());          }          _num_frames_in_use--;      } @@ -430,7 +472,38 @@ frame_buff::uptr inline_io_service::recv(              return frame_buff::uptr();          }      } -    return frame_buff::uptr(); +} + +void inline_io_service::recv_flow_ctrl( +    inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms) +{ +    inline_recv_mux* mux; +    inline_recv_cb* rcvr; +    std::tie(mux, rcvr) = _recv_tbl.at(recv_link); + +    if (mux) { +        /* Defer to mux's recv_flow_ctrl() if present */ +        mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms); +        return; +    } else { +        assert(recv_io_cb == rcvr); +    } + +    while (true) { +        frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); +        /* Process buffer */ +        if (buff) { +            if (rcvr->callback(buff, recv_link)) { +                assert(!buff); +                return; +            } else { +                UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); +                recv_link->release_recv_buff(std::move(buff)); +            } +        } else { /* Timeout */ +            return; +        } +    }  }  }} // namespace uhd::transport diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp index c9b9af344..53c8f017d 100644 --- a/host/lib/transport/offload_io_service.cpp +++ b/host/lib/transport/offload_io_service.cpp @@ -24,7 +24,7 @@ namespace uhd { namespace transport {  namespace { -constexpr int32_t blocking_timeout_ms = 100; +constexpr int32_t blocking_timeout_ms = 10;  // Fixed-size queue that supports blocking semantics  template <typename queue_item_t> @@ -108,6 +108,13 @@ public:          return queue_element.buff;      } +    frame_buff* client_pop(int32_t timeout_ms) +    { +        from_offload_thread_t queue_element; +        _from_offload_thread.pop(queue_element, timeout_ms); +        return queue_element.buff; +    } +      size_t client_read_available()      {          return _from_offload_thread.read_available(); @@ -474,8 +481,13 @@ recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr re      port->client_wait_until_connected();      // Return a new recv client to the caller that just operates on the queues -    return std::make_shared<offload_recv_io<offload_io_service_impl>>( -        shared_from_this(), num_recv_frames, num_send_frames, port); +    if (_offload_thread_params.wait_mode == POLL) { +        return std::make_shared<offload_recv_io<offload_io_service_impl, true>>( +            shared_from_this(), num_recv_frames, num_send_frames, port); +    } else { +        return std::make_shared<offload_recv_io<offload_io_service_impl, false>>( +            shared_from_this(), num_recv_frames, num_send_frames, port); +    }  }  send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr send_link, @@ -528,8 +540,13 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se      }      // Return a new recv client to the caller that just operates on the queues -    return std::make_shared<offload_send_io<offload_io_service_impl>>( -        shared_from_this(), num_recv_frames, num_send_frames, port); +    if (_offload_thread_params.wait_mode == POLL) { +        return std::make_shared<offload_send_io<offload_io_service_impl, true>>( +            shared_from_this(), num_recv_frames, num_send_frames, port); +    } else { +        return std::make_shared<offload_send_io<offload_io_service_impl, false>>( +            shared_from_this(), num_recv_frames, num_send_frames, port); +    }  }  void offload_io_service_impl::_queue_client_req(std::function<void()> fn) | 
