aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
authorAaron Rossetto <aaron.rossetto@ni.com>2019-10-17 08:44:11 -0500
committerMartin Braun <martin.braun@ettus.com>2019-11-26 12:21:32 -0800
commit0bd233e64210c6605e8a6ec1424fa81f9ea8a681 (patch)
treef97729a7bba21cdfc45ee756bee1ac0489358544 /host/lib/transport
parent912ed28b3df13b9f9c33f2fa92867ec0ac7445fd (diff)
downloaduhd-0bd233e64210c6605e8a6ec1424fa81f9ea8a681.tar.gz
uhd-0bd233e64210c6605e8a6ec1424fa81f9ea8a681.tar.bz2
uhd-0bd233e64210c6605e8a6ec1424fa81f9ea8a681.zip
uhd: Introduce I/O service manager
- Implement I/O service detach link methods - The I/O service manager instantiates new I/O services or connects links to existing I/O services based on options provided by the user in stream_args. - Add a streamer ID parameter to methods to create transports so that the I/O service manager can group transports appropriately when using offload threads. - Change X300 and MPMD to use I/O service manager to connect links to I/O services. - There is now a single I/O service manager per rfnoc_graph (and it is also stored in the graph) - The I/O service manager now also knows the device args for the rfnoc_graph it was created with, and can make decisions based upon those (e.g, use a specific I/O service for DPDK, share cores between streamers, etc.) - The I/O Service Manager does not get any decision logic with this commit, though - The MB ifaces for mpmd and x300 now access this global I/O service manager - Add configuration of link parameters with overrides Co-Authored-By: Martin Braun <martin.braun@ettus.com> Co-Authored-By: Aaron Rossetto <aaron.rossetto@ni.com>
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/inline_io_service.cpp30
-rw-r--r--host/lib/transport/offload_io_service.cpp73
2 files changed, 76 insertions, 27 deletions
diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp
index 9dd0814ca..93967e09a 100644
--- a/host/lib/transport/inline_io_service.cpp
+++ b/host/lib/transport/inline_io_service.cpp
@@ -272,10 +272,19 @@ void inline_io_service::attach_recv_link(recv_link_if::sptr link)
{
auto link_ptr = link.get();
UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) == 0);
- _recv_tbl[link_ptr] =
- std::tuple<inline_recv_mux*, inline_recv_cb*>(nullptr, nullptr);
+ _recv_tbl[link_ptr] = std::tuple<inline_recv_mux*, inline_recv_cb*>(nullptr, nullptr);
_recv_links.push_back(link);
-};
+}
+
+void inline_io_service::detach_recv_link(recv_link_if::sptr link)
+{
+ auto link_ptr = link.get();
+ UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0);
+ _recv_tbl.erase(link_ptr);
+
+ _recv_links.remove_if(
+ [link_ptr](recv_link_if::sptr& item) { return item.get() == link_ptr; });
+}
recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_link,
size_t num_recv_frames,
@@ -301,9 +310,17 @@ recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_lin
void inline_io_service::attach_send_link(send_link_if::sptr link)
{
- UHD_ASSERT_THROW(std::find(_send_links.begin(), _send_links.end(), link) == _send_links.end());
+ UHD_ASSERT_THROW(
+ std::find(_send_links.begin(), _send_links.end(), link) == _send_links.end());
_send_links.push_back(link);
-};
+}
+
+void inline_io_service::detach_send_link(send_link_if::sptr link)
+{
+ auto link_ptr = link.get();
+ _send_links.remove_if(
+ [link_ptr](send_link_if::sptr& item) { return item.get() == link_ptr; });
+}
send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_link,
size_t num_send_frames,
@@ -365,8 +382,7 @@ void inline_io_service::connect_receiver(
_recv_tbl[link] = std::make_tuple(mux, rcvr);
}
-void inline_io_service::disconnect_receiver(
- recv_link_if* link, inline_recv_cb* cb)
+void inline_io_service::disconnect_receiver(recv_link_if* link, inline_recv_cb* cb)
{
inline_recv_mux* mux;
inline_recv_cb* rcvr;
diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp
index ed28a93f9..012c86868 100644
--- a/host/lib/transport/offload_io_service.cpp
+++ b/host/lib/transport/offload_io_service.cpp
@@ -54,6 +54,20 @@ public:
_send_tbl[send_link.get()] = 0;
}
+ void unregister_link(const recv_link_if::sptr& recv_link)
+ {
+ auto link_ptr = recv_link.get();
+ UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0);
+ _recv_tbl.erase(link_ptr);
+ }
+
+ void unregister_link(const send_link_if::sptr& send_link)
+ {
+ auto link_ptr = send_link.get();
+ UHD_ASSERT_THROW(_send_tbl.count(link_ptr) != 0);
+ _send_tbl.erase(link_ptr);
+ }
+
void reserve_frames(const frame_reservation_t& reservation)
{
if (reservation.recv_link) {
@@ -358,6 +372,9 @@ public:
void attach_recv_link(recv_link_if::sptr link);
void attach_send_link(send_link_if::sptr link);
+ void detach_recv_link(recv_link_if::sptr link);
+ void detach_send_link(send_link_if::sptr link);
+
recv_io_if::sptr make_recv_client(recv_link_if::sptr recv_link,
size_t num_recv_frames,
recv_callback_t cb,
@@ -400,6 +417,7 @@ private:
frame_reservation_t frames_reserved;
};
+ void _queue_client_req(std::function<void()> fn);
void _get_recv_buff(recv_client_info_t& info, int32_t timeout_ms);
void _get_send_buff(send_client_info_t& info);
void _release_recv_buff(recv_client_info_t& info, frame_buff* buff);
@@ -661,12 +679,7 @@ void offload_io_service_impl::attach_recv_link(recv_link_if::sptr link)
_io_srv->attach_recv_link(link);
};
- client_req_t queue_element;
- queue_element.req = {new std::function<void()>(req_fn)};
- const bool success = _client_connect_queue.push(queue_element);
- if (!success) {
- throw uhd::runtime_error("Failed to push attach_recv_link request");
- }
+ _queue_client_req(req_fn);
}
void offload_io_service_impl::attach_send_link(send_link_if::sptr link)
@@ -685,6 +698,28 @@ void offload_io_service_impl::attach_send_link(send_link_if::sptr link)
}
}
+void offload_io_service_impl::detach_recv_link(recv_link_if::sptr link)
+{
+ // Create a request to detach link in the offload thread
+ auto req_fn = [this, link]() {
+ _reservation_mgr.unregister_link(link);
+ _io_srv->detach_recv_link(link);
+ };
+
+ _queue_client_req(req_fn);
+}
+
+void offload_io_service_impl::detach_send_link(send_link_if::sptr link)
+{
+ // Create a request to detach link in the offload thread
+ auto req_fn = [this, link]() {
+ _reservation_mgr.unregister_link(link);
+ _io_srv->detach_send_link(link);
+ };
+
+ _queue_client_req(req_fn);
+}
+
recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr recv_link,
size_t num_recv_frames,
recv_callback_t cb,
@@ -720,13 +755,7 @@ recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr re
port->offload_thread_set_connected(true);
};
- client_req_t queue_element;
- queue_element.req = {new std::function<void()>(req_fn)};
- const bool success = _client_connect_queue.push(queue_element);
- if (!success) {
- throw uhd::runtime_error("Failed to push make_recv_client request");
- }
-
+ _queue_client_req(req_fn);
port->client_wait_until_connected();
// Return a new recv client to the caller that just operates on the queues
@@ -775,13 +804,7 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
port->offload_thread_set_connected(true);
};
- client_req_t queue_element;
- queue_element.req = {new std::function<void()>(req_fn)};
- const bool success = _client_connect_queue.push(queue_element);
- if (!success) {
- throw uhd::runtime_error("Failed to push make_send_client request");
- }
-
+ _queue_client_req(req_fn);
port->client_wait_until_connected();
// Wait for buffer queue to be full
@@ -794,6 +817,16 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
shared_from_this(), num_recv_frames, num_send_frames, port);
}
+void offload_io_service_impl::_queue_client_req(std::function<void()> fn)
+{
+ client_req_t queue_element;
+ queue_element.req = {new std::function<void()>(fn)};
+ const bool success = _client_connect_queue.push(queue_element);
+ if (!success) {
+ throw uhd::runtime_error("Failed to queue client request");
+ }
+}
+
// Get a single receive buffer if available and update client info
void offload_io_service_impl::_get_recv_buff(recv_client_info_t& info, int32_t timeout_ms)
{