aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/usrp/common/recv_packet_demuxer.cpp
blob: c5ed1563e2f6b6809ade25a393f61f4100963341 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
//
// Copyright 2011,2014 Ettus Research LLC
// Copyright 2018 Ettus Research, a National Instruments Company
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/utils/log.hpp>
#include <uhdlib/usrp/common/recv_packet_demuxer.hpp>
#include <boost/thread/mutex.hpp>
#include <deque>
#include <queue>
#include <vector>

using namespace uhd;
using namespace uhd::usrp;
using namespace uhd::transport;

/*!
 * A managed receive buffer that carries its own small, inline storage.
 *
 * release() destroys the object itself, so instances must be created with
 * `new` and must not be touched after release() has run.
 */
struct recv_pkt_demux_mrb : public managed_recv_buffer
{
    //! Inline backing storage: room for ten 32-bit words.
    uint32_t buff[10];

    //! Nothing to set up; the storage is filled in by whoever builds the packet.
    recv_pkt_demux_mrb(void) {}

    //! Release hook: the buffer owns itself, so releasing it frees it.
    void release(void)
    {
        delete this;
    }
};

/*!
 * Read the stream ID (SID) out of a received packet.
 *
 * The SID is taken from the second 32-bit word of the buffer and converted
 * with uhd::wtohx(). No length check is performed here.
 *
 * \param buff the received packet buffer (must be non-null)
 * \return the stream ID after the uhd::wtohx() conversion
 */
static UHD_INLINE uint32_t extract_sid(managed_recv_buffer::sptr& buff)
{
    // ASSUMPTION: the packet words are stored in little endian format
    const uint32_t* const words = buff->cast<const uint32_t*>();
    const uint32_t raw_sid      = words[1];
    return uhd::wtohx(raw_sid);
}

// Out-of-line definition of the interface destructor; it has nothing to clean up.
recv_packet_demuxer::~recv_packet_demuxer(void) {}

class recv_packet_demuxer_impl : public uhd::usrp::recv_packet_demuxer
{
public:
    recv_packet_demuxer_impl(transport::zero_copy_if::sptr transport,
        const size_t size,
        const uint32_t sid_base)
        : _transport(transport), _sid_base(sid_base), _queues(size)
    {
        /* NOP */
    }

    managed_recv_buffer::sptr get_recv_buff(const size_t index, const double timeout)
    {
        boost::mutex::scoped_lock lock(_mutex);
        managed_recv_buffer::sptr buff;

        // there is already an entry in the queue, so pop that
        if (not _queues[index].wrapper.empty()) {
            std::swap(buff, _queues[index].wrapper.front());
            _queues[index].wrapper.pop();
            return buff;
        }

        while (true) {
            // otherwise call into the transport
            buff = _transport->get_recv_buff(timeout);
            if (buff.get() == NULL)
                return buff; // timeout

            // check the stream id to know which channel
            const size_t rx_index = extract_sid(buff) - _sid_base;
            if (rx_index == index)
                return buff; // got expected message

            // otherwise queue and try again
            if (rx_index < _queues.size())
                _queues[rx_index].wrapper.push(buff);
            else {
                UHD_LOGGER_ERROR("STREAMER")
                    << "Got a data packet with unknown SID " << extract_sid(buff);
                recv_pkt_demux_mrb* mrb = new recv_pkt_demux_mrb();
                vrt::if_packet_info_t info;
                info.packet_type         = vrt::if_packet_info_t::PACKET_TYPE_DATA;
                info.num_payload_words32 = 1;
                info.num_payload_bytes   = info.num_payload_words32 * sizeof(uint32_t);
                info.has_sid             = true;
                info.sid                 = _sid_base + index;
                vrt::if_hdr_pack_le(mrb->buff, info);
                mrb->buff[info.num_header_words32] = rx_metadata_t::ERROR_CODE_OVERFLOW;
                return mrb->make(
                    mrb, mrb->buff, info.num_packet_words32 * sizeof(uint32_t));
            }
        }
    }

private:
    transport::zero_copy_if::sptr _transport;
    const uint32_t _sid_base;
    boost::mutex _mutex;
    struct channel_guts_type
    {
        channel_guts_type(void) : wrapper(container) {}
        std::deque<managed_recv_buffer::sptr> container;
        std::queue<managed_recv_buffer::sptr> wrapper;
    };
    std::vector<channel_guts_type> _queues;
};

/*!
 * Factory for the packet demuxer.
 *
 * \param transport the shared transport to receive packets from
 * \param size the number of channels to demultiplex into
 * \param sid_base the stream ID that corresponds to channel index 0
 * \return a shared pointer to a new demuxer implementation
 */
recv_packet_demuxer::sptr recv_packet_demuxer::make(
    transport::zero_copy_if::sptr transport, const size_t size, const uint32_t sid_base)
{
    recv_packet_demuxer_impl* const demuxer =
        new recv_packet_demuxer_impl(transport, size, sid_base);
    return sptr(demuxer);
}