diff options
Diffstat (limited to 'host/lib/include/uhdlib/transport/get_aligned_buffs.hpp')
-rw-r--r-- | host/lib/include/uhdlib/transport/get_aligned_buffs.hpp | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp new file mode 100644 index 000000000..662be6d2d --- /dev/null +++ b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp @@ -0,0 +1,175 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP +#define INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP + +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <boost/dynamic_bitset.hpp> +#include <boost/format.hpp> + +namespace uhd { namespace transport { + +// Number of iterations that get_aligned_buffs will attempt to time align +// packets before returning an alignment failure. get_aligned_buffs increments +// the iteration count when it finds a timestamp that is larger than the +// timestamps on channels it has already aligned and thus has to restart +// aligning timestamps on all channels to the new timestamp. +constexpr size_t ALIGNMENT_FAILURE_THRESHOLD = 1000; + +/*! + * Implementation of rx time alignment. This method reads packets from the + * transports for each channel and discards any packets whose tsf does not + * match those of other channels due to dropped packets. Packets that do not + * have a tsf are not checked for alignment and never dropped. + */ +template <typename transport_t> +class get_aligned_buffs +{ +public: + enum alignment_result_t { + SUCCESS, + TIMEOUT, + SEQUENCE_ERROR, + ALIGNMENT_FAILURE, + BAD_PACKET + }; + + get_aligned_buffs(std::vector<typename transport_t::uptr>& xports, + std::vector<typename transport_t::buff_t::uptr>& frame_buffs, + std::vector<typename transport_t::packet_info_t>& infos) + : _xports(xports) + , _frame_buffs(frame_buffs) + , _infos(infos) + , _prev_tsf(_xports.size(), 0) + , _channels_to_align(_xports.size()) + { + } + + alignment_result_t operator()(const int32_t timeout_ms) + { + // Clear state + _channels_to_align.set(); + bool time_valid = false; + uint64_t tsf = 0; + size_t iterations = 0; + + while (_channels_to_align.any()) { + const size_t chan = _channels_to_align.find_first(); + auto& xport = _xports[chan]; + auto& info = _infos[chan]; + auto& frame_buff = _frame_buffs[chan]; + bool seq_error = false; + + // Receive a data packet for the channel if we don't have one. A + // packet may already be there if the previous call was interrupted + // by an error. + if (!frame_buff) { + try { + std::tie(frame_buff, info, seq_error) = + xport->get_recv_buff(timeout_ms); + } catch (const uhd::value_error& e) { + // Bad packet + UHD_LOGGER_ERROR("STREAMER") + << boost::format( + "The receive transport caught a value exception.\n%s") + % e.what(); + return BAD_PACKET; + } + } + + if (!frame_buff) { + return TIMEOUT; + } + + if (info.has_tsf) { + const bool time_out_of_order = _prev_tsf[chan] > info.tsf; + _prev_tsf[chan] = info.tsf; + + // If the user changes the device time while streaming, we can + // receive a packet that comes before the previous packet in + // time. This would cause the alignment logic to discard future + // received packets. Therefore, when this occurs, we reset the + // info to restart the alignment. + if (time_out_of_order) { + time_valid = false; + } + + // Check if the time is larger than packets received for other + // channels, and if so, use this time to align all channels + if (!time_valid || info.tsf > tsf) { + // If we haven't found a set of aligned packets after many + // iterations, return an alignment failure + if (iterations++ > ALIGNMENT_FAILURE_THRESHOLD) { + UHD_LOGGER_ERROR("STREAMER") + << "The rx streamer failed to time-align packets."; + return ALIGNMENT_FAILURE; + } + + // Release buffers for channels aligned previously. Keep + // buffers that don't have a tsf since we don't need to + // align those. + for (size_t i = 0; i < _xports.size(); i++) { + if (!_channels_to_align.test(i) && _infos[i].has_tsf) { + _xports[i]->release_recv_buff(std::move(_frame_buffs[i])); + _frame_buffs[i] = nullptr; + } + } + + // Mark only this channel as aligned and save its tsf + _channels_to_align.set(); + _channels_to_align.reset(chan); + time_valid = true; + tsf = info.tsf; + } + + // Check if the time matches that of other aligned channels + else if (info.tsf == tsf) { + _channels_to_align.reset(chan); + } + + // Otherwise, time is smaller than other channels, release the buffer + else { + _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan])); + _frame_buffs[chan] = nullptr; + } + } else { + // Packet doesn't have a tsf, just mark it as aligned + _channels_to_align.reset(chan); + } + + // If this packet had a sequence error, stop to return the error. + // Keep the packet for the next call to get_aligned_buffs. + if (seq_error) { + return SEQUENCE_ERROR; + } + } + + // All channels aligned + return SUCCESS; + } + +private: + // Transports for each channel + std::vector<typename transport_t::uptr>& _xports; + + // Storage for buffers resulting from alignment + std::vector<typename transport_t::buff_t::uptr>& _frame_buffs; + + // Packet info corresponding to aligned buffers + std::vector<typename transport_t::packet_info_t>& _infos; + + // Time of previous packet for each channel + std::vector<uint64_t> _prev_tsf; + + // Keeps track of channels that are aligned + boost::dynamic_bitset<> _channels_to_align; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP */ |