aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/rfnoc/chdr_ctrl_endpoint.cpp')
-rw-r--r--host/lib/rfnoc/chdr_ctrl_endpoint.cpp34
1 files changed, 24 insertions, 10 deletions
diff --git a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
index d1d1dccca..d3c7cd58f 100644
--- a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
+++ b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
@@ -27,7 +27,7 @@ chdr_ctrl_endpoint::~chdr_ctrl_endpoint() = default;
class chdr_ctrl_endpoint_impl : public chdr_ctrl_endpoint
{
public:
- chdr_ctrl_endpoint_impl(const chdr_ctrl_xport_t& xport,
+ chdr_ctrl_endpoint_impl(chdr_ctrl_xport::sptr xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_id_t my_epid)
: _my_epid(my_epid)
@@ -57,7 +57,14 @@ public:
// there are no timed blocks on the underlying.
_recv_thread.join();
// Flush base transport
- while (_xport.recv->get_recv_buff(0.0001)) /*NOP*/;
+ while (true) {
+ auto buff = _xport->get_recv_buff(100);
+ if (buff) {
+ _xport->release_recv_buff(std::move(buff));
+ } else {
+ break;
+ }
+ }
// Release child endpoints
_endpoint_map.clear(););
}
@@ -82,9 +89,10 @@ public:
header.set_dst_epid(dst_epid);
// Acquire send buffer and send the packet
std::lock_guard<std::mutex> lock(_send_mutex);
- auto send_buff = _xport.send->get_send_buff(timeout);
- _send_pkt->refresh(send_buff->cast<void*>(), header, payload);
- send_buff->commit(header.get_length());
+ auto send_buff = _xport->get_send_buff(timeout * 1000);
+ _send_pkt->refresh(send_buff->data(), header, payload);
+ send_buff->set_packet_size(header.get_length());
+ _xport->release_send_buff(std::move(send_buff));
};
if (_endpoint_map.find(key) == _endpoint_map.end()) {
@@ -118,11 +126,14 @@ private:
// - Route them based on the dst_port
// - Pass them to the ctrlport_endpoint for additional processing
while (not _stop_recv_thread) {
- auto buff = _xport.recv->get_recv_buff(0.0);
+ // FIXME Move lock back once have threaded_io_service
+ std::unique_lock<std::mutex> lock(_mutex);
+ auto buff = _xport->get_recv_buff(0);
if (buff) {
- std::lock_guard<std::mutex> lock(_mutex);
+ // FIXME Move lock back to here once have threaded_io_service
+ // std::lock_guard<std::mutex> lock(_mutex);
try {
- _recv_pkt->refresh(buff->cast<void*>());
+ _recv_pkt->refresh(buff->data());
const ctrl_payload payload = _recv_pkt->get_payload();
ep_map_key_t key{payload.src_epid, payload.dst_port};
if (_endpoint_map.find(key) != _endpoint_map.end()) {
@@ -131,7 +142,10 @@ private:
} catch (...) {
// Ignore all errors
}
+ _xport->release_recv_buff(std::move(buff));
} else {
+ // FIXME Move lock back to lock_guard once have threaded_io_service
+ lock.unlock();
// Be a good citizen and yield if no packet is processed
static const size_t MIN_DUR = 1;
boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR));
@@ -154,7 +168,7 @@ private:
// The endpoint ID of this software endpoint
const sep_id_t _my_epid;
// Send/recv transports
- const chdr_ctrl_xport_t _xport;
+ chdr_ctrl_xport::sptr _xport;
// The curent sequence number for a send packet
size_t _send_seqnum = 0;
// The number of packets dropped
@@ -173,7 +187,7 @@ private:
std::mutex _send_mutex;
};
-chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(const chdr_ctrl_xport_t& xport,
+chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(chdr_ctrl_xport::sptr xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_id_t my_epid)
{