From 75a090543b8fb8e7c875387eee6d3fe7227e4450 Mon Sep 17 00:00:00 2001 From: Ciro Nishiguchi Date: Thu, 23 May 2019 20:38:07 -0500 Subject: rfnoc: add rx and tx transports, and amend rfnoc_graph transports: Transports build on I/O service and implements flow control and sequence number checking. The rx streamer subclass extends the streamer implementation to connect it to the rfnoc graph. It receives configuration values from property propagation and configures the streamer accordingly. It also implements the issue_stream_cmd rx_streamer API method. Add implementation of rx streamer creation and method to connect it to an rfnoc block. rfnoc_graph: Cache more connection info, clarify contract Summary of changes: - rfnoc_graph stores more information about static connections at the beginning. Some search algorithms are replaced by simpler lookups. - The contract for connect() was clarified. It is required to call connect, even for static connections. --- .../include/uhdlib/transport/get_aligned_buffs.hpp | 175 +++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 host/lib/include/uhdlib/transport/get_aligned_buffs.hpp (limited to 'host/lib/include/uhdlib/transport/get_aligned_buffs.hpp') diff --git a/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp new file mode 100644 index 000000000..662be6d2d --- /dev/null +++ b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp @@ -0,0 +1,175 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP +#define INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP + +#include +#include +#include +#include + +namespace uhd { namespace transport { + +// Number of iterations that get_aligned_buffs will attempt to time align +// packets before returning an alignment failure. get_aligned_buffs increments +// the iteration count when it finds a timestamp that is larger than the +// timestamps on channels it has already aligned and thus has to restart +// aligning timestamps on all channels to the new timestamp. +constexpr size_t ALIGNMENT_FAILURE_THRESHOLD = 1000; + +/*! + * Implementation of rx time alignment. This method reads packets from the + * transports for each channel and discards any packets whose tsf does not + * match those of other channels due to dropped packets. Packets that do not + * have a tsf are not checked for alignment and never dropped. + */ +template +class get_aligned_buffs +{ +public: + enum alignment_result_t { + SUCCESS, + TIMEOUT, + SEQUENCE_ERROR, + ALIGNMENT_FAILURE, + BAD_PACKET + }; + + get_aligned_buffs(std::vector& xports, + std::vector& frame_buffs, + std::vector& infos) + : _xports(xports) + , _frame_buffs(frame_buffs) + , _infos(infos) + , _prev_tsf(_xports.size(), 0) + , _channels_to_align(_xports.size()) + { + } + + alignment_result_t operator()(const int32_t timeout_ms) + { + // Clear state + _channels_to_align.set(); + bool time_valid = false; + uint64_t tsf = 0; + size_t iterations = 0; + + while (_channels_to_align.any()) { + const size_t chan = _channels_to_align.find_first(); + auto& xport = _xports[chan]; + auto& info = _infos[chan]; + auto& frame_buff = _frame_buffs[chan]; + bool seq_error = false; + + // Receive a data packet for the channel if we don't have one. A + // packet may already be there if the previous call was interrupted + // by an error. + if (!frame_buff) { + try { + std::tie(frame_buff, info, seq_error) = + xport->get_recv_buff(timeout_ms); + } catch (const uhd::value_error& e) { + // Bad packet + UHD_LOGGER_ERROR("STREAMER") + << boost::format( + "The receive transport caught a value exception.\n%s") + % e.what(); + return BAD_PACKET; + } + } + + if (!frame_buff) { + return TIMEOUT; + } + + if (info.has_tsf) { + const bool time_out_of_order = _prev_tsf[chan] > info.tsf; + _prev_tsf[chan] = info.tsf; + + // If the user changes the device time while streaming, we can + // receive a packet that comes before the previous packet in + // time. This would cause the alignment logic to discard future + // received packets. Therefore, when this occurs, we reset the + // info to restart the alignment. + if (time_out_of_order) { + time_valid = false; + } + + // Check if the time is larger than packets received for other + // channels, and if so, use this time to align all channels + if (!time_valid || info.tsf > tsf) { + // If we haven't found a set of aligned packets after many + // iterations, return an alignment failure + if (iterations++ > ALIGNMENT_FAILURE_THRESHOLD) { + UHD_LOGGER_ERROR("STREAMER") + << "The rx streamer failed to time-align packets."; + return ALIGNMENT_FAILURE; + } + + // Release buffers for channels aligned previously. Keep + // buffers that don't have a tsf since we don't need to + // align those. + for (size_t i = 0; i < _xports.size(); i++) { + if (!_channels_to_align.test(i) && _infos[i].has_tsf) { + _xports[i]->release_recv_buff(std::move(_frame_buffs[i])); + _frame_buffs[i] = nullptr; + } + } + + // Mark only this channel as aligned and save its tsf + _channels_to_align.set(); + _channels_to_align.reset(chan); + time_valid = true; + tsf = info.tsf; + } + + // Check if the time matches that of other aligned channels + else if (info.tsf == tsf) { + _channels_to_align.reset(chan); + } + + // Otherwise, time is smaller than other channels, release the buffer + else { + _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan])); + _frame_buffs[chan] = nullptr; + } + } else { + // Packet doesn't have a tsf, just mark it as aligned + _channels_to_align.reset(chan); + } + + // If this packet had a sequence error, stop to return the error. + // Keep the packet for the next call to get_aligned_buffs. + if (seq_error) { + return SEQUENCE_ERROR; + } + } + + // All channels aligned + return SUCCESS; + } + +private: + // Transports for each channel + std::vector& _xports; + + // Storage for buffers resulting from alignment + std::vector& _frame_buffs; + + // Packet info corresponding to aligned buffers + std::vector& _infos; + + // Time of previous packet for each channel + std::vector _prev_tsf; + + // Keeps track of channels that are aligned + boost::dynamic_bitset<> _channels_to_align; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP */ -- cgit v1.2.3