aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/muxed_zero_copy_if.cpp
blob: 37bea73557110a7f9249f2392e506a3183bdbb6a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
//
// Copyright 2016 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
//

#include <uhd/transport/muxed_zero_copy_if.hpp>
#include <uhd/transport/bounded_buffer.hpp>
#include <uhd/exception.hpp>
#include <uhd/utils/safe_call.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <boost/thread/locks.hpp>
#include <map>

using namespace uhd;
using namespace uhd::transport;

class muxed_zero_copy_if_impl : public muxed_zero_copy_if,
                                public boost::enable_shared_from_this<muxed_zero_copy_if_impl>
{
public:
    typedef boost::shared_ptr<muxed_zero_copy_if_impl> sptr;

    muxed_zero_copy_if_impl(
        zero_copy_if::sptr base_xport,
        stream_classifier_fn classify_fn,
        size_t max_streams
    ):
        _base_xport(base_xport), _classify(classify_fn),
        _max_num_streams(max_streams), _num_dropped_frames(0)
    {
        //Create the receive thread to poll the underlying transport
        //and classify packets into queues
        _recv_thread = boost::thread(
            boost::bind(&muxed_zero_copy_if_impl::_update_queues, this));
    }

    virtual ~muxed_zero_copy_if_impl()
    {
        UHD_SAFE_CALL(
            //Interrupt buffer updater loop
            _recv_thread.interrupt();
            //Wait for loop to finish
            //No timeout on join. The recv loop is guaranteed
            //to terminate in a reasonable amount of time because
            //there are no timed blocks on the underlying.
            _recv_thread.join();
            //Flush base transport
            while (_base_xport->get_recv_buff(0.0001)) /*NOP*/;
            //Release child streams
            //Note that this will not delete or flush the child streams
            //until the owners of the streams have released the respective
            //shared pointers. This ensures that packets are not dropped.
            _streams.clear();
        );
    }

    virtual zero_copy_if::sptr make_stream(const uint32_t stream_num)
    {
        boost::lock_guard<boost::mutex> lock(_mutex);
        if (_streams.size() >= _max_num_streams) {
            throw uhd::runtime_error("muxed_zero_copy_if: stream capacity exceeded. cannot create more streams.");
        }
        // Only allocate a portion of the base transport's frames to each stream
        // to prevent all streams from attempting to use all the frames.
        stream_impl::sptr stream = boost::make_shared<stream_impl>(
            this->shared_from_this(), stream_num,
            _base_xport->get_num_send_frames() / _max_num_streams,
            _base_xport->get_num_recv_frames() / _max_num_streams);
        _streams[stream_num] = stream;
        return stream;
    }

    virtual size_t get_num_dropped_frames() const
    {
        return _num_dropped_frames;
    }

    void remove_stream(const uint32_t stream_num)
    {
        boost::lock_guard<boost::mutex> lock(_mutex);
        _streams.erase(stream_num);
    }

private:
    /*
     * @class stream_mrb is used to copy the data and release the original
     * managed receive buffer back to the base transport.
     */
    class stream_mrb : public managed_recv_buffer
    {
    public:
        stream_mrb(size_t size) : _buff(new char[size]) {}

        ~stream_mrb() {
            delete[] _buff;
        }

        void release() {}

        UHD_INLINE sptr get_new(char *buff, size_t len)
        {
            memcpy(_buff, buff, len);
            return make(this, _buff, len);
        }

    private:
        char *_buff;
    };

    class stream_impl : public zero_copy_if
    {
    public:
        typedef boost::shared_ptr<stream_impl> sptr;
        typedef boost::weak_ptr<stream_impl> wptr;

        stream_impl(
            muxed_zero_copy_if_impl::sptr muxed_xport,
            const uint32_t stream_num,
            const size_t num_send_frames,
            const size_t num_recv_frames
            ) :
            _stream_num(stream_num), _muxed_xport(muxed_xport),
            _num_send_frames(num_send_frames),
            _send_frame_size(_muxed_xport->base_xport()->get_send_frame_size()),
            _num_recv_frames(num_recv_frames),
            _recv_frame_size(_muxed_xport->base_xport()->get_recv_frame_size()),
            _buff_queue(num_recv_frames),
            _buffers(num_recv_frames),
            _buffer_index(0)
        {
            for (size_t i = 0; i < num_recv_frames; i++) {
                _buffers[i] = boost::make_shared<stream_mrb>(_recv_frame_size);
            }
        }

        ~stream_impl(void)
        {
            //First remove the stream from muxed transport
            //so no more frames are pushed in
            _muxed_xport->remove_stream(_stream_num);
            //Flush the transport
            managed_recv_buffer::sptr buff;
            while (_buff_queue.pop_with_haste(buff)) {
                //NOP
            }
        }

        size_t get_num_recv_frames(void) const {
            return _num_recv_frames;
        }

        size_t get_recv_frame_size(void) const {
            return _recv_frame_size;
        }

        managed_recv_buffer::sptr get_recv_buff(double timeout) {
            managed_recv_buffer::sptr buff;
            if (_buff_queue.pop_with_timed_wait(buff, timeout)) {
                return buff;
            } else {
                return managed_recv_buffer::sptr();
            }
        }

        void push_recv_buff(managed_recv_buffer::sptr buff) {
            _buff_queue.push_with_wait(_buffers.at(_buffer_index++)->get_new(buff->cast<char*>(), buff->size()));
            _buffer_index %= _buffers.size();
        }

        size_t get_num_send_frames(void) const {
            return _num_send_frames;
        }

        size_t get_send_frame_size(void) const {
            return _send_frame_size;
        }

        managed_send_buffer::sptr get_send_buff(double timeout)
        {
            return _muxed_xport->base_xport()->get_send_buff(timeout);
        }

    private:
        const uint32_t                              _stream_num;
        muxed_zero_copy_if_impl::sptr               _muxed_xport;
        const size_t                                _num_send_frames;
        const size_t                                _send_frame_size;
        const size_t                                _num_recv_frames;
        const size_t                                _recv_frame_size;
        bounded_buffer<managed_recv_buffer::sptr>   _buff_queue;
        std::vector< boost::shared_ptr<stream_mrb> >    _buffers;
        size_t                                      _buffer_index;
    };

    inline zero_copy_if::sptr& base_xport() { return _base_xport; }

    void _update_queues()
    {
        //Run forever:
        // - Pull packets from the base transport
        // - Classify them
        // - Push them to the appropriate receive queue
        while (true) {
            {   //Uninterruptable block of code
                boost::this_thread::disable_interruption interrupt_disabler;
                if (not _process_next_buffer()) {
                    //Be a good citizen and yield if no packet is processed
                    static const size_t MIN_DUR = 1;
                    boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR));
                    //We call sleep(MIN_DUR) above instead of yield() to ensure that we
                    //relinquish the current scheduler time slot.
                    //yield() is a hint to the scheduler to end the time
                    //slice early and schedule in another thread that is ready to run.
                    //However in most situations, there will be no other thread and
                    //this thread will continue to run which will rail a CPU core.
                    //We call sleep(MIN_DUR=1) instead which will sleep for a minimum time.
                    //Ideally we would like to use boost::chrono::.*seconds::min() but that
                    //is bound to 0, which causes the sleep_for call to be a no-op and
                    //thus useless to actually force a sleep.

                    //****************************************************************
                    //NOTE: This behavior makes this transport a poor choice for
                    //      low latency communication.
                    //****************************************************************
                }
            }
            //Check if the master thread has requested a shutdown
            if (boost::this_thread::interruption_requested()) break;
        }
    }

    bool _process_next_buffer()
    {
        managed_recv_buffer::sptr buff = _base_xport->get_recv_buff(0.0);
        if (buff) {
            stream_impl::sptr stream;
            try {
                const uint32_t stream_num = _classify(buff->cast<void*>(), _base_xport->get_recv_frame_size());
                {
                    //Hold the stream mutex long enough to pull a bounded buffer
                    //and lock it (increment its ref count).
                    boost::lock_guard<boost::mutex> lock(_mutex);
                    stream_map_t::iterator str_iter = _streams.find(stream_num);
                    if (str_iter != _streams.end()) {
                        stream = (*str_iter).second.lock();
                    }
                }
            } catch (std::exception&) {
                //If _classify throws we simply drop the frame
            }
            //Once a bounded buffer is acquired, we can rely on its
            //thread safety to serialize with the consumer.
            if (stream.get()) {
                stream->push_recv_buff(buff);
            } else {
                boost::lock_guard<boost::mutex> lock(_mutex);
                _num_dropped_frames++;
            }
            //We processed a packet, and there could be more coming
            //Don't yield in the next iteration.
            return true;
        } else {
            //The base transport is idle. Return false to let the
            //thread yield.
            return false;
        }
    }

    typedef std::map<uint32_t, stream_impl::wptr> stream_map_t;

    zero_copy_if::sptr      _base_xport;
    stream_classifier_fn    _classify;
    stream_map_t            _streams;
    const size_t            _max_num_streams;
    size_t                  _num_dropped_frames;
    boost::thread           _recv_thread;
    boost::mutex            _mutex;
};

muxed_zero_copy_if::sptr muxed_zero_copy_if::make(
    zero_copy_if::sptr base_xport,
    muxed_zero_copy_if::stream_classifier_fn classify_fn,
    size_t max_streams
) {
    return boost::make_shared<muxed_zero_copy_if_impl>(base_xport, classify_fn, max_streams);
}