diff options
Diffstat (limited to 'host/lib/usrp/common/recv_packet_demuxer.cpp')
-rw-r--r-- | host/lib/usrp/common/recv_packet_demuxer.cpp | 88 |
1 files changed, 49 insertions, 39 deletions
diff --git a/host/lib/usrp/common/recv_packet_demuxer.cpp b/host/lib/usrp/common/recv_packet_demuxer.cpp index 96eafa4be..c5ed1563e 100644 --- a/host/lib/usrp/common/recv_packet_demuxer.cpp +++ b/host/lib/usrp/common/recv_packet_demuxer.cpp @@ -5,15 +5,14 @@ // SPDX-License-Identifier: GPL-3.0-or-later // -#include <uhdlib/usrp/common/recv_packet_demuxer.hpp> -#include <uhd/utils/log.hpp> -#include <uhd/utils/byteswap.hpp> #include <uhd/transport/vrt_if_packet.hpp> #include <uhd/types/metadata.hpp> - +#include <uhd/utils/byteswap.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/usrp/common/recv_packet_demuxer.hpp> #include <boost/thread/mutex.hpp> -#include <queue> #include <deque> +#include <queue> #include <vector> using namespace uhd; @@ -23,7 +22,9 @@ using namespace uhd::transport; struct recv_pkt_demux_mrb : public managed_recv_buffer { public: - recv_pkt_demux_mrb(void){/*NOP*/} + recv_pkt_demux_mrb(void) + { /*NOP*/ + } void release(void) { @@ -33,62 +34,68 @@ public: uint32_t buff[10]; }; -static UHD_INLINE uint32_t extract_sid(managed_recv_buffer::sptr &buff){ - //ASSUME that the data is in little endian format - return uhd::wtohx(buff->cast<const uint32_t *>()[1]); +static UHD_INLINE uint32_t extract_sid(managed_recv_buffer::sptr& buff) +{ + // ASSUME that the data is in little endian format + return uhd::wtohx(buff->cast<const uint32_t*>()[1]); } -recv_packet_demuxer::~recv_packet_demuxer(void){ +recv_packet_demuxer::~recv_packet_demuxer(void) +{ /* NOP */ } -class recv_packet_demuxer_impl : public uhd::usrp::recv_packet_demuxer{ +class recv_packet_demuxer_impl : public uhd::usrp::recv_packet_demuxer +{ public: - recv_packet_demuxer_impl( - transport::zero_copy_if::sptr transport, + recv_packet_demuxer_impl(transport::zero_copy_if::sptr transport, const size_t size, - const uint32_t sid_base - ): - _transport(transport), _sid_base(sid_base), _queues(size) + const uint32_t sid_base) + : _transport(transport), _sid_base(sid_base), _queues(size) { /* NOP */ } - managed_recv_buffer::sptr get_recv_buff(const size_t index, const double timeout){ + managed_recv_buffer::sptr get_recv_buff(const size_t index, const double timeout) + { boost::mutex::scoped_lock lock(_mutex); managed_recv_buffer::sptr buff; - //there is already an entry in the queue, so pop that - if (not _queues[index].wrapper.empty()){ + // there is already an entry in the queue, so pop that + if (not _queues[index].wrapper.empty()) { std::swap(buff, _queues[index].wrapper.front()); _queues[index].wrapper.pop(); return buff; } - while (true){ - //otherwise call into the transport + while (true) { + // otherwise call into the transport buff = _transport->get_recv_buff(timeout); - if (buff.get() == NULL) return buff; //timeout + if (buff.get() == NULL) + return buff; // timeout - //check the stream id to know which channel + // check the stream id to know which channel const size_t rx_index = extract_sid(buff) - _sid_base; - if (rx_index == index) return buff; //got expected message - - //otherwise queue and try again - if (rx_index < _queues.size()) _queues[rx_index].wrapper.push(buff); - else - { - UHD_LOGGER_ERROR("STREAMER") << "Got a data packet with unknown SID " << extract_sid(buff) ; - recv_pkt_demux_mrb *mrb = new recv_pkt_demux_mrb(); + if (rx_index == index) + return buff; // got expected message + + // otherwise queue and try again + if (rx_index < _queues.size()) + _queues[rx_index].wrapper.push(buff); + else { + UHD_LOGGER_ERROR("STREAMER") + << "Got a data packet with unknown SID " << extract_sid(buff); + recv_pkt_demux_mrb* mrb = new recv_pkt_demux_mrb(); vrt::if_packet_info_t info; - info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; + info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; info.num_payload_words32 = 1; - info.num_payload_bytes = info.num_payload_words32*sizeof(uint32_t); - info.has_sid = true; - info.sid = _sid_base + index; + info.num_payload_bytes = info.num_payload_words32 * sizeof(uint32_t); + info.has_sid = true; + info.sid = _sid_base + index; vrt::if_hdr_pack_le(mrb->buff, info); mrb->buff[info.num_header_words32] = rx_metadata_t::ERROR_CODE_OVERFLOW; - return mrb->make(mrb, mrb->buff, info.num_packet_words32*sizeof(uint32_t)); + return mrb->make( + mrb, mrb->buff, info.num_packet_words32 * sizeof(uint32_t)); } } } @@ -97,14 +104,17 @@ private: transport::zero_copy_if::sptr _transport; const uint32_t _sid_base; boost::mutex _mutex; - struct channel_guts_type{ - channel_guts_type(void): wrapper(container){} + struct channel_guts_type + { + channel_guts_type(void) : wrapper(container) {} std::deque<managed_recv_buffer::sptr> container; std::queue<managed_recv_buffer::sptr> wrapper; }; std::vector<channel_guts_type> _queues; }; -recv_packet_demuxer::sptr recv_packet_demuxer::make(transport::zero_copy_if::sptr transport, const size_t size, const uint32_t sid_base){ +recv_packet_demuxer::sptr recv_packet_demuxer::make( + transport::zero_copy_if::sptr transport, const size_t size, const uint32_t sid_base) +{ return sptr(new recv_packet_demuxer_impl(transport, size, sid_base)); } |