aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp
blob: 662be6d2d89a7b200faa95aab03cdd749b1aba55 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
//
// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#ifndef INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP
#define INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP

#include <uhd/exception.hpp>
#include <uhd/utils/log.hpp>
#include <boost/dynamic_bitset.hpp>
#include <boost/format.hpp>
#include <cstddef>
#include <cstdint>
#include <tuple>
#include <utility>
#include <vector>

namespace uhd { namespace transport {

// Upper bound on how many times a single call to get_aligned_buffs may pick a
// new reference timestamp before it gives up and reports an alignment
// failure. The count is local to each call and goes up every time a packet is
// chosen as the alignment reference: once for the first timestamped packet
// seen in the call, and again whenever a later packet carries a larger
// timestamp than the channels aligned so far (or a channel's time runs
// backwards), which forces all channels to be re-aligned to the new timestamp.
constexpr size_t ALIGNMENT_FAILURE_THRESHOLD = 1000;

/*!
 * Implementation of rx time alignment. This method reads packets from the
 * transports for each channel and discards any packets whose tsf does not
 * match those of other channels due to dropped packets. Packets that do not
 * have a tsf are not checked for alignment and never dropped.
 */
template <typename transport_t>
class get_aligned_buffs
{
public:
    enum alignment_result_t {
        SUCCESS,
        TIMEOUT,
        SEQUENCE_ERROR,
        ALIGNMENT_FAILURE,
        BAD_PACKET
    };

    get_aligned_buffs(std::vector<typename transport_t::uptr>& xports,
        std::vector<typename transport_t::buff_t::uptr>& frame_buffs,
        std::vector<typename transport_t::packet_info_t>& infos)
        : _xports(xports)
        , _frame_buffs(frame_buffs)
        , _infos(infos)
        , _prev_tsf(_xports.size(), 0)
        , _channels_to_align(_xports.size())
    {
    }

    alignment_result_t operator()(const int32_t timeout_ms)
    {
        // Clear state
        _channels_to_align.set();
        bool time_valid   = false;
        uint64_t tsf      = 0;
        size_t iterations = 0;

        while (_channels_to_align.any()) {
            const size_t chan = _channels_to_align.find_first();
            auto& xport       = _xports[chan];
            auto& info        = _infos[chan];
            auto& frame_buff  = _frame_buffs[chan];
            bool seq_error    = false;

            // Receive a data packet for the channel if we don't have one. A
            // packet may already be there if the previous call was interrupted
            // by an error.
            if (!frame_buff) {
                try {
                    std::tie(frame_buff, info, seq_error) =
                        xport->get_recv_buff(timeout_ms);
                } catch (const uhd::value_error& e) {
                    // Bad packet
                    UHD_LOGGER_ERROR("STREAMER")
                        << boost::format(
                               "The receive transport caught a value exception.\n%s")
                               % e.what();
                    return BAD_PACKET;
                }
            }

            if (!frame_buff) {
                return TIMEOUT;
            }

            if (info.has_tsf) {
                const bool time_out_of_order = _prev_tsf[chan] > info.tsf;
                _prev_tsf[chan]              = info.tsf;

                // If the user changes the device time while streaming, we can
                // receive a packet that comes before the previous packet in
                // time. This would cause the alignment logic to discard future
                // received packets. Therefore, when this occurs, we reset the
                // info to restart the alignment.
                if (time_out_of_order) {
                    time_valid = false;
                }

                // Check if the time is larger than packets received for other
                // channels, and if so, use this time to align all channels
                if (!time_valid || info.tsf > tsf) {
                    // If we haven't found a set of aligned packets after many
                    // iterations, return an alignment failure
                    if (iterations++ > ALIGNMENT_FAILURE_THRESHOLD) {
                        UHD_LOGGER_ERROR("STREAMER")
                            << "The rx streamer failed to time-align packets.";
                        return ALIGNMENT_FAILURE;
                    }

                    // Release buffers for channels aligned previously. Keep
                    // buffers that don't have a tsf since we don't need to
                    // align those.
                    for (size_t i = 0; i < _xports.size(); i++) {
                        if (!_channels_to_align.test(i) && _infos[i].has_tsf) {
                            _xports[i]->release_recv_buff(std::move(_frame_buffs[i]));
                            _frame_buffs[i] = nullptr;
                        }
                    }

                    // Mark only this channel as aligned and save its tsf
                    _channels_to_align.set();
                    _channels_to_align.reset(chan);
                    time_valid = true;
                    tsf        = info.tsf;
                }

                // Check if the time matches that of other aligned channels
                else if (info.tsf == tsf) {
                    _channels_to_align.reset(chan);
                }

                // Otherwise, time is smaller than other channels, release the buffer
                else {
                    _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan]));
                    _frame_buffs[chan] = nullptr;
                }
            } else {
                // Packet doesn't have a tsf, just mark it as aligned
                _channels_to_align.reset(chan);
            }

            // If this packet had a sequence error, stop to return the error.
            // Keep the packet for the next call to get_aligned_buffs.
            if (seq_error) {
                return SEQUENCE_ERROR;
            }
        }

        // All channels aligned
        return SUCCESS;
    }

private:
    // Transports for each channel
    std::vector<typename transport_t::uptr>& _xports;

    // Storage for buffers resulting from alignment
    std::vector<typename transport_t::buff_t::uptr>& _frame_buffs;

    // Packet info corresponding to aligned buffers
    std::vector<typename transport_t::packet_info_t>& _infos;

    // Time of previous packet for each channel
    std::vector<uint64_t> _prev_tsf;

    // Keeps track of channels that are aligned
    boost::dynamic_bitset<> _channels_to_align;
};

}} // namespace uhd::transport

#endif /* INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP */