author     Aaron Rossetto <aaron.rossetto@ni.com>    2019-10-17 08:44:11 -0500
committer  Martin Braun <martin.braun@ettus.com>     2019-11-26 12:21:32 -0800
commit     0bd233e64210c6605e8a6ec1424fa81f9ea8a681 (patch)
tree       f97729a7bba21cdfc45ee756bee1ac0489358544 /host/lib/transport
parent     912ed28b3df13b9f9c33f2fa92867ec0ac7445fd (diff)
download   uhd-0bd233e64210c6605e8a6ec1424fa81f9ea8a681.tar.gz
           uhd-0bd233e64210c6605e8a6ec1424fa81f9ea8a681.tar.bz2
           uhd-0bd233e64210c6605e8a6ec1424fa81f9ea8a681.zip
uhd: Introduce I/O service manager
- Implement I/O service detach link methods
- The I/O service manager instantiates new I/O services or connects
links to existing I/O services based on options provided by the user
in stream_args.
- Add a streamer ID parameter to the methods that create transports so that
the I/O service manager can group transports appropriately when using
offload threads (an illustrative sketch of this grouping follows the
commit message).
- Change X300 and MPMD to use the I/O service manager to connect links to
I/O services.
- There is now a single I/O service manager per rfnoc_graph (and it is
also stored in the graph)
- The I/O service manager now also knows the device args for the
rfnoc_graph it was created with, and can make decisions based upon
those (e.g., use a specific I/O service for DPDK, share cores between
streamers, etc.)
- The I/O service manager does not gain any decision logic in this
commit, though
- The MB ifaces for mpmd and x300 now access this global I/O service
manager
- Add configuration of link parameters with overrides
Co-Authored-By: Martin Braun <martin.braun@ettus.com>
Co-Authored-By: Aaron Rossetto <aaron.rossetto@ni.com>
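
To make the transport-grouping behavior described above concrete, here is a minimal, self-contained C++ sketch (not UHD code) of a manager that hands out one I/O service per streamer ID, so that all transports created for the same streamer share a service and, by extension, an offload thread. The io_service stand-in type and the names io_service_mgr_sketch and get_io_srv are invented for illustration only; UHD's actual interface in its transport library differs.

#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>

// Stand-in for an I/O service handle (hypothetical; not UHD's type).
struct io_service
{
    using sptr = std::shared_ptr<io_service>;
};

// Hypothetical manager: one I/O service per streamer ID, so that all
// transports belonging to the same streamer end up on the same service.
class io_service_mgr_sketch
{
public:
    explicit io_service_mgr_sketch(std::function<io_service::sptr()> factory)
        : _factory(std::move(factory))
    {
    }

    // Return the I/O service for this streamer, creating it on first use.
    io_service::sptr get_io_srv(const std::string& streamer_id)
    {
        auto it = _io_srvs.find(streamer_id);
        if (it != _io_srvs.end()) {
            return it->second; // same streamer -> reuse the existing service
        }
        auto io_srv = _factory();
        _io_srvs.emplace(streamer_id, io_srv);
        return io_srv;
    }

private:
    std::function<io_service::sptr()> _factory;
    std::map<std::string, io_service::sptr> _io_srvs;
};

int main()
{
    io_service_mgr_sketch mgr([] { return std::make_shared<io_service>(); });

    auto a = mgr.get_io_srv("streamer0"); // first transport for streamer0
    auto b = mgr.get_io_srv("streamer0"); // reused for the same streamer
    auto c = mgr.get_io_srv("streamer1"); // different streamer, new service

    std::cout << std::boolalpha << (a == b) << " " << (a == c) << std::endl; // true false
    return 0;
}

In UHD itself the manager would additionally consult the device args (e.g. DPDK options, sharing cores between streamers) when deciding which service to create or reuse; as noted above, that decision logic is not part of this commit.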
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/inline_io_service.cpp | 30
-rw-r--r-- | host/lib/transport/offload_io_service.cpp | 73
2 files changed, 76 insertions, 27 deletions
diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp
index 9dd0814ca..93967e09a 100644
--- a/host/lib/transport/inline_io_service.cpp
+++ b/host/lib/transport/inline_io_service.cpp
@@ -272,10 +272,19 @@ void inline_io_service::attach_recv_link(recv_link_if::sptr link)
 {
     auto link_ptr = link.get();
     UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) == 0);
-    _recv_tbl[link_ptr] =
-        std::tuple<inline_recv_mux*, inline_recv_cb*>(nullptr, nullptr);
+    _recv_tbl[link_ptr] = std::tuple<inline_recv_mux*, inline_recv_cb*>(nullptr, nullptr);
     _recv_links.push_back(link);
-};
+}
+
+void inline_io_service::detach_recv_link(recv_link_if::sptr link)
+{
+    auto link_ptr = link.get();
+    UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0);
+    _recv_tbl.erase(link_ptr);
+
+    _recv_links.remove_if(
+        [link_ptr](recv_link_if::sptr& item) { return item.get() == link_ptr; });
+}
 
 recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_link,
     size_t num_recv_frames,
@@ -301,9 +310,17 @@ recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_lin
 
 void inline_io_service::attach_send_link(send_link_if::sptr link)
 {
-    UHD_ASSERT_THROW(std::find(_send_links.begin(), _send_links.end(), link) == _send_links.end());
+    UHD_ASSERT_THROW(
+        std::find(_send_links.begin(), _send_links.end(), link) == _send_links.end());
     _send_links.push_back(link);
-};
+}
+
+void inline_io_service::detach_send_link(send_link_if::sptr link)
+{
+    auto link_ptr = link.get();
+    _send_links.remove_if(
+        [link_ptr](send_link_if::sptr& item) { return item.get() == link_ptr; });
+}
 
 send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_link,
     size_t num_send_frames,
@@ -365,8 +382,7 @@ void inline_io_service::connect_receiver(
     _recv_tbl[link] = std::make_tuple(mux, rcvr);
 }
 
-void inline_io_service::disconnect_receiver(
-    recv_link_if* link, inline_recv_cb* cb)
+void inline_io_service::disconnect_receiver(recv_link_if* link, inline_recv_cb* cb)
 {
     inline_recv_mux* mux;
     inline_recv_cb* rcvr;
diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp
index ed28a93f9..012c86868 100644
--- a/host/lib/transport/offload_io_service.cpp
+++ b/host/lib/transport/offload_io_service.cpp
@@ -54,6 +54,20 @@ public:
         _send_tbl[send_link.get()] = 0;
     }
 
+    void unregister_link(const recv_link_if::sptr& recv_link)
+    {
+        auto link_ptr = recv_link.get();
+        UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0);
+        _recv_tbl.erase(link_ptr);
+    }
+
+    void unregister_link(const send_link_if::sptr& send_link)
+    {
+        auto link_ptr = send_link.get();
+        UHD_ASSERT_THROW(_send_tbl.count(link_ptr) != 0);
+        _send_tbl.erase(link_ptr);
+    }
+
     void reserve_frames(const frame_reservation_t& reservation)
     {
         if (reservation.recv_link) {
@@ -358,6 +372,9 @@ public:
     void attach_recv_link(recv_link_if::sptr link);
     void attach_send_link(send_link_if::sptr link);
 
+    void detach_recv_link(recv_link_if::sptr link);
+    void detach_send_link(send_link_if::sptr link);
+
     recv_io_if::sptr make_recv_client(recv_link_if::sptr recv_link,
         size_t num_recv_frames,
         recv_callback_t cb,
@@ -400,6 +417,7 @@ private:
         frame_reservation_t frames_reserved;
     };
 
+    void _queue_client_req(std::function<void()> fn);
     void _get_recv_buff(recv_client_info_t& info, int32_t timeout_ms);
     void _get_send_buff(send_client_info_t& info);
     void _release_recv_buff(recv_client_info_t& info, frame_buff* buff);
@@ -661,12 +679,7 @@ void offload_io_service_impl::attach_recv_link(recv_link_if::sptr link)
         _io_srv->attach_recv_link(link);
     };
 
-    client_req_t queue_element;
-    queue_element.req = {new std::function<void()>(req_fn)};
-    const bool success = _client_connect_queue.push(queue_element);
-    if (!success) {
-        throw uhd::runtime_error("Failed to push attach_recv_link request");
-    }
+    _queue_client_req(req_fn);
 }
 
 void offload_io_service_impl::attach_send_link(send_link_if::sptr link)
@@ -685,6 +698,28 @@ void offload_io_service_impl::attach_send_link(send_link_if::sptr link)
     }
 }
 
+void offload_io_service_impl::detach_recv_link(recv_link_if::sptr link)
+{
+    // Create a request to detach link in the offload thread
+    auto req_fn = [this, link]() {
+        _reservation_mgr.unregister_link(link);
+        _io_srv->detach_recv_link(link);
+    };
+
+    _queue_client_req(req_fn);
+}
+
+void offload_io_service_impl::detach_send_link(send_link_if::sptr link)
+{
+    // Create a request to detach link in the offload thread
+    auto req_fn = [this, link]() {
+        _reservation_mgr.unregister_link(link);
+        _io_srv->detach_send_link(link);
+    };
+
+    _queue_client_req(req_fn);
+}
+
 recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr recv_link,
     size_t num_recv_frames,
     recv_callback_t cb,
@@ -720,13 +755,7 @@ recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr re
         port->offload_thread_set_connected(true);
     };
 
-    client_req_t queue_element;
-    queue_element.req = {new std::function<void()>(req_fn)};
-    const bool success = _client_connect_queue.push(queue_element);
-    if (!success) {
-        throw uhd::runtime_error("Failed to push make_recv_client request");
-    }
-
+    _queue_client_req(req_fn);
     port->client_wait_until_connected();
 
     // Return a new recv client to the caller that just operates on the queues
@@ -775,13 +804,7 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
         port->offload_thread_set_connected(true);
     };
 
-    client_req_t queue_element;
-    queue_element.req = {new std::function<void()>(req_fn)};
-    const bool success = _client_connect_queue.push(queue_element);
-    if (!success) {
-        throw uhd::runtime_error("Failed to push make_send_client request");
-    }
-
+    _queue_client_req(req_fn);
    port->client_wait_until_connected();
 
     // Wait for buffer queue to be full
@@ -794,6 +817,16 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
         shared_from_this(), num_recv_frames, num_send_frames, port);
 }
 
+void offload_io_service_impl::_queue_client_req(std::function<void()> fn)
+{
+    client_req_t queue_element;
+    queue_element.req = {new std::function<void()>(fn)};
+    const bool success = _client_connect_queue.push(queue_element);
+    if (!success) {
+        throw uhd::runtime_error("Failed to queue client request");
+    }
+}
+
 // Get a single receive buffer if available and update client info
 void offload_io_service_impl::_get_recv_buff(recv_client_info_t& info, int32_t timeout_ms)
 {
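
The refactoring above replaces three copies of the queue-push boilerplate with a single _queue_client_req() helper: a client thread packages the attach/detach/connect work into a std::function and pushes it onto a queue that the offload thread drains and executes. The stand-alone sketch below illustrates that pattern with a mutex-protected std::queue; the actual offload_io_service uses its own queue type and heap-allocated function objects, as the diff shows, so treat this only as an analogy with invented names.

#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

// Toy request queue: clients enqueue std::function objects and a single
// worker ("offload") thread runs them in order.
class request_queue_sketch
{
public:
    void push(std::function<void()> fn)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _queue.push(std::move(fn));
        }
        _cv.notify_one();
    }

    // Worker loop: run requests until an empty function (shutdown marker).
    void run_worker()
    {
        while (true) {
            std::function<void()> fn;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _cv.wait(lock, [this] { return !_queue.empty(); });
                fn = std::move(_queue.front());
                _queue.pop();
            }
            if (!fn) {
                return;
            }
            fn();
        }
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::queue<std::function<void()>> _queue;
};

int main()
{
    request_queue_sketch queue;
    std::thread worker([&queue] { queue.run_worker(); });

    // Analogous in spirit to the attach/detach requests queued by clients.
    queue.push([] { std::cout << "attach link" << std::endl; });
    queue.push([] { std::cout << "detach link" << std::endl; });
    queue.push(nullptr); // shutdown marker

    worker.join();
    return 0;
}

Compared with this sketch, the real implementation must also hand results back to the caller, which is why make_recv_client() and make_send_client() wait on port->client_wait_until_connected() after queuing their request.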