diff options
author | Ciro Nishiguchi <ciro.nishiguchi@ni.com> | 2019-05-23 20:38:07 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 11:49:29 -0800 |
commit | 75a090543b8fb8e7c875387eee6d3fe7227e4450 (patch) | |
tree | 2904b48607cc07158aa6b068ada35ab56c4da516 /host/lib/include/uhdlib/transport/get_aligned_buffs.hpp | |
parent | d8e9705bc6c34b8d015b56a76955ee2f15426bd8 (diff) | |
download | uhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.tar.gz uhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.tar.bz2 uhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.zip |
rfnoc: add rx and tx transports, and amend rfnoc_graph
transports:
Transports build on I/O service and implements flow control and
sequence number checking.
The rx streamer subclass extends the streamer implementation to connect
it to the rfnoc graph. It receives configuration values from property
propagation and configures the streamer accordingly. It also implements
the issue_stream_cmd rx_streamer API method.
Add implementation of rx streamer creation and method to connect it to
an rfnoc block.
rfnoc_graph: Cache more connection info, clarify contract
Summary of changes:
- rfnoc_graph stores more information about static connections at the
beginning. Some search algorithms are replaced by simpler lookups.
- The contract for connect() was clarified. It is required to call
connect, even for static connections.
Diffstat (limited to 'host/lib/include/uhdlib/transport/get_aligned_buffs.hpp')
-rw-r--r-- | host/lib/include/uhdlib/transport/get_aligned_buffs.hpp | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp new file mode 100644 index 000000000..662be6d2d --- /dev/null +++ b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp @@ -0,0 +1,175 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP +#define INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP + +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <boost/dynamic_bitset.hpp> +#include <boost/format.hpp> + +namespace uhd { namespace transport { + +// Number of iterations that get_aligned_buffs will attempt to time align +// packets before returning an alignment failure. get_aligned_buffs increments +// the iteration count when it finds a timestamp that is larger than the +// timestamps on channels it has already aligned and thus has to restart +// aligning timestamps on all channels to the new timestamp. +constexpr size_t ALIGNMENT_FAILURE_THRESHOLD = 1000; + +/*! + * Implementation of rx time alignment. This method reads packets from the + * transports for each channel and discards any packets whose tsf does not + * match those of other channels due to dropped packets. Packets that do not + * have a tsf are not checked for alignment and never dropped. + */ +template <typename transport_t> +class get_aligned_buffs +{ +public: + enum alignment_result_t { + SUCCESS, + TIMEOUT, + SEQUENCE_ERROR, + ALIGNMENT_FAILURE, + BAD_PACKET + }; + + get_aligned_buffs(std::vector<typename transport_t::uptr>& xports, + std::vector<typename transport_t::buff_t::uptr>& frame_buffs, + std::vector<typename transport_t::packet_info_t>& infos) + : _xports(xports) + , _frame_buffs(frame_buffs) + , _infos(infos) + , _prev_tsf(_xports.size(), 0) + , _channels_to_align(_xports.size()) + { + } + + alignment_result_t operator()(const int32_t timeout_ms) + { + // Clear state + _channels_to_align.set(); + bool time_valid = false; + uint64_t tsf = 0; + size_t iterations = 0; + + while (_channels_to_align.any()) { + const size_t chan = _channels_to_align.find_first(); + auto& xport = _xports[chan]; + auto& info = _infos[chan]; + auto& frame_buff = _frame_buffs[chan]; + bool seq_error = false; + + // Receive a data packet for the channel if we don't have one. A + // packet may already be there if the previous call was interrupted + // by an error. + if (!frame_buff) { + try { + std::tie(frame_buff, info, seq_error) = + xport->get_recv_buff(timeout_ms); + } catch (const uhd::value_error& e) { + // Bad packet + UHD_LOGGER_ERROR("STREAMER") + << boost::format( + "The receive transport caught a value exception.\n%s") + % e.what(); + return BAD_PACKET; + } + } + + if (!frame_buff) { + return TIMEOUT; + } + + if (info.has_tsf) { + const bool time_out_of_order = _prev_tsf[chan] > info.tsf; + _prev_tsf[chan] = info.tsf; + + // If the user changes the device time while streaming, we can + // receive a packet that comes before the previous packet in + // time. This would cause the alignment logic to discard future + // received packets. Therefore, when this occurs, we reset the + // info to restart the alignment. + if (time_out_of_order) { + time_valid = false; + } + + // Check if the time is larger than packets received for other + // channels, and if so, use this time to align all channels + if (!time_valid || info.tsf > tsf) { + // If we haven't found a set of aligned packets after many + // iterations, return an alignment failure + if (iterations++ > ALIGNMENT_FAILURE_THRESHOLD) { + UHD_LOGGER_ERROR("STREAMER") + << "The rx streamer failed to time-align packets."; + return ALIGNMENT_FAILURE; + } + + // Release buffers for channels aligned previously. Keep + // buffers that don't have a tsf since we don't need to + // align those. + for (size_t i = 0; i < _xports.size(); i++) { + if (!_channels_to_align.test(i) && _infos[i].has_tsf) { + _xports[i]->release_recv_buff(std::move(_frame_buffs[i])); + _frame_buffs[i] = nullptr; + } + } + + // Mark only this channel as aligned and save its tsf + _channels_to_align.set(); + _channels_to_align.reset(chan); + time_valid = true; + tsf = info.tsf; + } + + // Check if the time matches that of other aligned channels + else if (info.tsf == tsf) { + _channels_to_align.reset(chan); + } + + // Otherwise, time is smaller than other channels, release the buffer + else { + _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan])); + _frame_buffs[chan] = nullptr; + } + } else { + // Packet doesn't have a tsf, just mark it as aligned + _channels_to_align.reset(chan); + } + + // If this packet had a sequence error, stop to return the error. + // Keep the packet for the next call to get_aligned_buffs. + if (seq_error) { + return SEQUENCE_ERROR; + } + } + + // All channels aligned + return SUCCESS; + } + +private: + // Transports for each channel + std::vector<typename transport_t::uptr>& _xports; + + // Storage for buffers resulting from alignment + std::vector<typename transport_t::buff_t::uptr>& _frame_buffs; + + // Packet info corresponding to aligned buffers + std::vector<typename transport_t::packet_info_t>& _infos; + + // Time of previous packet for each channel + std::vector<uint64_t> _prev_tsf; + + // Keeps track of channels that are aligned + boost::dynamic_bitset<> _channels_to_align; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP */ |