diff options
Diffstat (limited to 'host/lib/rfnoc/chdr_ctrl_endpoint.cpp')
-rw-r--r-- | host/lib/rfnoc/chdr_ctrl_endpoint.cpp | 34 |
1 files changed, 24 insertions, 10 deletions
diff --git a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp index d1d1dccca..d3c7cd58f 100644 --- a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp +++ b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp @@ -27,7 +27,7 @@ chdr_ctrl_endpoint::~chdr_ctrl_endpoint() = default; class chdr_ctrl_endpoint_impl : public chdr_ctrl_endpoint { public: - chdr_ctrl_endpoint_impl(const chdr_ctrl_xport_t& xport, + chdr_ctrl_endpoint_impl(chdr_ctrl_xport::sptr xport, const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid) : _my_epid(my_epid) @@ -57,7 +57,14 @@ public: // there are no timed blocks on the underlying. _recv_thread.join(); // Flush base transport - while (_xport.recv->get_recv_buff(0.0001)) /*NOP*/; + while (true) { + auto buff = _xport->get_recv_buff(100); + if (buff) { + _xport->release_recv_buff(std::move(buff)); + } else { + break; + } + } // Release child endpoints _endpoint_map.clear();); } @@ -82,9 +89,10 @@ public: header.set_dst_epid(dst_epid); // Acquire send buffer and send the packet std::lock_guard<std::mutex> lock(_send_mutex); - auto send_buff = _xport.send->get_send_buff(timeout); - _send_pkt->refresh(send_buff->cast<void*>(), header, payload); - send_buff->commit(header.get_length()); + auto send_buff = _xport->get_send_buff(timeout * 1000); + _send_pkt->refresh(send_buff->data(), header, payload); + send_buff->set_packet_size(header.get_length()); + _xport->release_send_buff(std::move(send_buff)); }; if (_endpoint_map.find(key) == _endpoint_map.end()) { @@ -118,11 +126,14 @@ private: // - Route them based on the dst_port // - Pass them to the ctrlport_endpoint for additional processing while (not _stop_recv_thread) { - auto buff = _xport.recv->get_recv_buff(0.0); + // FIXME Move lock back once have threaded_io_service + std::unique_lock<std::mutex> lock(_mutex); + auto buff = _xport->get_recv_buff(0); if (buff) { - std::lock_guard<std::mutex> lock(_mutex); + // FIXME Move lock back to here once have threaded_io_service + // std::lock_guard<std::mutex> lock(_mutex); try { - _recv_pkt->refresh(buff->cast<void*>()); + _recv_pkt->refresh(buff->data()); const ctrl_payload payload = _recv_pkt->get_payload(); ep_map_key_t key{payload.src_epid, payload.dst_port}; if (_endpoint_map.find(key) != _endpoint_map.end()) { @@ -131,7 +142,10 @@ private: } catch (...) { // Ignore all errors } + _xport->release_recv_buff(std::move(buff)); } else { + // FIXME Move lock back to lock_guard once have threaded_io_service + lock.unlock(); // Be a good citizen and yield if no packet is processed static const size_t MIN_DUR = 1; boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR)); @@ -154,7 +168,7 @@ private: // The endpoint ID of this software endpoint const sep_id_t _my_epid; // Send/recv transports - const chdr_ctrl_xport_t _xport; + chdr_ctrl_xport::sptr _xport; // The curent sequence number for a send packet size_t _send_seqnum = 0; // The number of packets dropped @@ -173,7 +187,7 @@ private: std::mutex _send_mutex; }; -chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(const chdr_ctrl_xport_t& xport, +chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(chdr_ctrl_xport::sptr xport, const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid) { |