aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/super_recv_packet_handler.hpp
blob: 5080182d6a295bbd111a67426fd2d243ad68dffe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
//
// Copyright 2011-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
//

#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP
#define INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP

#include <uhd/config.hpp>
#include <uhd/exception.hpp>
#include <uhd/convert.hpp>
#include <uhd/stream.hpp>
#include <uhd/utils/msg.hpp>
#include <uhd/utils/tasks.hpp>
#include <uhd/utils/atomic.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/transport/zero_copy.hpp>
#include <boost/dynamic_bitset.hpp>
#include <boost/foreach.hpp>
#include <boost/function.hpp>
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread/barrier.hpp>
#include <iostream>
#include <vector>

namespace uhd{ namespace transport{ namespace sph{

UHD_INLINE boost::uint32_t get_context_code(
    const boost::uint32_t *vrt_hdr, const vrt::if_packet_info_t &if_packet_info
){
    //extract the context word (we dont know the endianness so mirror the bytes)
    boost::uint32_t word0 = vrt_hdr[if_packet_info.num_header_words32] |
              uhd::byteswap(vrt_hdr[if_packet_info.num_header_words32]);
    return word0 & 0xff;
}

typedef boost::function<void(void)> handle_overflow_type;
static inline void handle_overflow_nop(void){}

/***********************************************************************
 * Super receive packet handler
 *
 * A receive packet handler represents a group of channels.
 * The channel group shares a common sample rate.
 * All channels are received in unison in recv().
 **********************************************************************/
class recv_packet_handler{
public:
    typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type;
    typedef boost::function<void(const size_t)> handle_flowctrl_type;
    typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type;
    typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &);
    //typedef boost::function<void(const boost::uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type;

    /*!
     * Make a new packet handler for receive
     * \param size the number of transport channels
     */
    recv_packet_handler(const size_t size = 1):
        _queue_error_for_next_call(false),
        _buffers_infos_index(0)
    {
        this->resize(size);
        set_alignment_failure_threshold(1000);
    }

    ~recv_packet_handler(void){
        _task_barrier.interrupt();
        _task_handlers.clear();
    }

    //! Resize the number of transport channels
    void resize(const size_t size){
        if (this->size() == size) return;
        _task_handlers.clear();
        _props.resize(size);
        //re-initialize all buffers infos by re-creating the vector
        _buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size));
        _task_barrier.resize(size);
        _task_handlers.resize(size);
        for (size_t i = 1/*skip 0*/; i < size; i++){
            _task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i));
        };
    }

    //! Get the channel width of this handler
    size_t size(void) const{
        return _props.size();
    }

    //! Setup the vrt unpacker function and offset
    void set_vrt_unpacker(const vrt_unpacker_type &vrt_unpacker, const size_t header_offset_words32 = 0){
        _vrt_unpacker = vrt_unpacker;
        _header_offset_words32 = header_offset_words32;
    }

    /*!
     * Set the threshold for alignment failure.
     * How many packets throw out before giving up?
     * \param threshold number of packets per channel
     */
    void set_alignment_failure_threshold(const size_t threshold){
        _alignment_faulure_threshold = threshold*this->size();
    }

    //! Set the rate of ticks per second
    void set_tick_rate(const double rate){
        _tick_rate = rate;
    }

    //! Set the rate of samples per second
    void set_samp_rate(const double rate){
        _samp_rate = rate;
    }

    /*!
     * Set the function to get a managed buffer.
     * \param xport_chan which transport channel
     * \param get_buff the getter function
     */
    void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff, const bool flush = false){
        if (flush){
            while (get_buff(0.0));
        }
        _props.at(xport_chan).get_buff = get_buff;
    }

    /*!
     * Set the function to handle flow control
     * \param xport_chan which transport channel
     * \param handle_flowctrl the callback function
     */
    void set_xport_handle_flowctrl(const size_t xport_chan, const handle_flowctrl_type &handle_flowctrl, const size_t update_window, const bool do_init = false)
    {
        _props.at(xport_chan).handle_flowctrl = handle_flowctrl;
        //we need the window size to be within the 0xfff (max 12 bit seq)
        _props.at(xport_chan).fc_update_window = std::min<size_t>(update_window, 0xfff);
        if (do_init) handle_flowctrl(0);
    }

    //! Set the conversion routine for all channels
    void set_converter(const uhd::convert::id_type &id){
        _num_outputs = id.num_outputs;
        _converter = uhd::convert::get_converter(id)();
        this->set_scale_factor(1/32767.); //update after setting converter
        _bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.input_format);
        _bytes_per_cpu_item = uhd::convert::get_bytes_per_item(id.output_format);
    }

    //! Set the transport channel's overflow handler
    void set_overflow_handler(const size_t xport_chan, const handle_overflow_type &handle_overflow){
        _props.at(xport_chan).handle_overflow = handle_overflow;
    }

    //! Set the scale factor used in float conversion
    void set_scale_factor(const double scale_factor){
        _converter->set_scalar(scale_factor);
    }

    //! Set the callback to issue stream commands
    void set_issue_stream_cmd(const size_t xport_chan, const issue_stream_cmd_type &issue_stream_cmd)
    {
        _props.at(xport_chan).issue_stream_cmd = issue_stream_cmd;
    }

    //! Overload call to issue stream commands
    void issue_stream_cmd(const stream_cmd_t &stream_cmd)
    {
        for (size_t i = 0; i < _props.size(); i++)
        {
            if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd);
        }
    }

    /*******************************************************************
     * Receive:
     * The entry point for the fast-path receive calls.
     * Dispatch into combinations of single packet receive calls.
     ******************************************************************/
    UHD_INLINE size_t recv(
        const uhd::rx_streamer::buffs_type &buffs,
        const size_t nsamps_per_buff,
        uhd::rx_metadata_t &metadata,
        const double timeout,
        const bool one_packet
    ){
        //handle metadata queued from a previous receive
        if (_queue_error_for_next_call){
            _queue_error_for_next_call = false;
            metadata = _queue_metadata;
            //We want to allow a full buffer recv to be cut short by a timeout,
            //but do not want to generate an inline timeout message packet.
            if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) return 0;
        }

        size_t accum_num_samps = recv_one_packet(
            buffs, nsamps_per_buff, metadata, timeout
        );

        if (one_packet) return accum_num_samps;

        //first recv had an error code set, return immediately
        if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps;

        //loop until buffer is filled or error code
        while(accum_num_samps < nsamps_per_buff){
            size_t num_samps = recv_one_packet(
                buffs, nsamps_per_buff - accum_num_samps, _queue_metadata,
                timeout, accum_num_samps*_bytes_per_cpu_item
            );

            //metadata had an error code set, store for next call and return
            if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE){
                _queue_error_for_next_call = true;
                break;
            }
            accum_num_samps += num_samps;
        }
        return accum_num_samps;
    }

private:

    vrt_unpacker_type _vrt_unpacker;
    size_t _header_offset_words32;
    double _tick_rate, _samp_rate;
    bool _queue_error_for_next_call;
    size_t _alignment_faulure_threshold;
    rx_metadata_t _queue_metadata;
    struct xport_chan_props_type{
        xport_chan_props_type(void):
            packet_count(0),
            handle_overflow(&handle_overflow_nop),
            fc_update_window(0)
        {}
        get_buff_type get_buff;
        issue_stream_cmd_type issue_stream_cmd;
        size_t packet_count;
        handle_overflow_type handle_overflow;
        handle_flowctrl_type handle_flowctrl;
        size_t fc_update_window;
    };
    std::vector<xport_chan_props_type> _props;
    size_t _num_outputs;
    size_t _bytes_per_otw_item; //used in conversion
    size_t _bytes_per_cpu_item; //used in conversion
    uhd::convert::converter::sptr _converter; //used in conversion

    //! information stored for a received buffer
    struct per_buffer_info_type{
        managed_recv_buffer::sptr buff;
        const boost::uint32_t *vrt_hdr;
        vrt::if_packet_info_t ifpi;
        time_spec_t time;
        const char *copy_buff;
    };

    //!information stored for a set of aligned buffers
    struct buffers_info_type : std::vector<per_buffer_info_type> {
        buffers_info_type(const size_t size):
            std::vector<per_buffer_info_type>(size),
            indexes_todo(size, true),
            alignment_time_valid(false),
            data_bytes_to_copy(0),
            fragment_offset_in_samps(0)
        {/* NOP */}
        boost::dynamic_bitset<> indexes_todo; //used in alignment logic
        time_spec_t alignment_time; //used in alignment logic
        bool alignment_time_valid; //used in alignment logic
        size_t data_bytes_to_copy; //keeps track of state
        size_t fragment_offset_in_samps; //keeps track of state
        rx_metadata_t metadata; //packet description
    };

    //! a circular queue of buffer infos
    std::vector<buffers_info_type> _buffers_infos;
    size_t _buffers_infos_index;
    buffers_info_type &get_curr_buffer_info(void){return _buffers_infos[_buffers_infos_index];}
    buffers_info_type &get_prev_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 3)%4];}
    buffers_info_type &get_next_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 1)%4];}
    void increment_buffer_info(void){_buffers_infos_index = (_buffers_infos_index + 1)%4;}

    //! possible return options for the packet receiver
    enum packet_type{
        PACKET_IF_DATA,
        PACKET_TIMESTAMP_ERROR,
        PACKET_INLINE_MESSAGE,
        PACKET_TIMEOUT_ERROR,
        PACKET_SEQUENCE_ERROR
    };

    /*******************************************************************
     * Get and process a single packet from the transport:
     * Receive a single packet at the given index.
     * Extract all the relevant info and store.
     * Check the info to determine the return code.
     ******************************************************************/
    UHD_INLINE packet_type get_and_process_single_packet(
        const size_t index,
        buffers_info_type &prev_buffer_info,
        buffers_info_type &curr_buffer_info,
        double timeout
    ){
        //get a single packet from the transport layer
        managed_recv_buffer::sptr &buff = curr_buffer_info[index].buff;
        buff = _props[index].get_buff(timeout);
        if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR;

        //bounds check before extract
        size_t num_packet_words32 = buff->size()/sizeof(boost::uint32_t);
        if (num_packet_words32 <= _header_offset_words32){
            throw std::runtime_error("recv buffer smaller than vrt packet offset");
        }

        //extract packet info
        per_buffer_info_type &info = curr_buffer_info[index];
        info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;
        info.vrt_hdr = buff->cast<const boost::uint32_t *>() + _header_offset_words32;
        _vrt_unpacker(info.vrt_hdr, info.ifpi);
        info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true
        info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);

        //handle flow control
        if (_props[index].handle_flowctrl)
        {
            if ((info.ifpi.packet_count % _props[index].fc_update_window/2) == 0)
            {
                _props[index].handle_flowctrl(info.ifpi.packet_count);
            }
        }

        //--------------------------------------------------------------
        //-- Determine return conditions:
        //-- The order of these checks is HOLY.
        //--------------------------------------------------------------

        //1) check for inline IF message packets
        if (info.ifpi.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){
            return PACKET_INLINE_MESSAGE;
        }

        //2) check for sequence errors
        #ifndef SRPH_DONT_CHECK_SEQUENCE
        const size_t seq_mask = (info.ifpi.link_type == vrt::if_packet_info_t::LINK_TYPE_NONE)? 0xf : 0xfff;
        const size_t expected_packet_count = _props[index].packet_count;
        _props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask;
        if (expected_packet_count != info.ifpi.packet_count){
            return PACKET_SEQUENCE_ERROR;
        }
        #endif

        //3) check for out of order timestamps
        if (info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){
            return PACKET_TIMESTAMP_ERROR;
        }

        //4) otherwise the packet is normal!
        return PACKET_IF_DATA;
    }

    /*******************************************************************
     * Alignment check:
     * Check the received packet for alignment and mark accordingly.
     ******************************************************************/
    UHD_INLINE void alignment_check(
        const size_t index, buffers_info_type &info
    ){
        //if alignment time was not valid or if the sequence id is newer:
        //  use this index's time as the alignment time
        //  reset the indexes list and remove this index
        if (not info.alignment_time_valid or info[index].time > info.alignment_time){
            info.alignment_time_valid = true;
            info.alignment_time = info[index].time;
            info.indexes_todo.set();
            info.indexes_todo.reset(index);
            info.data_bytes_to_copy = info[index].ifpi.num_payload_bytes;
        }

        //if the sequence id matches:
        //  remove this index from the list and continue
        else if (info[index].time == info.alignment_time){
            info.indexes_todo.reset(index);
        }

        //if the sequence id is older:
        //  continue with the same index to try again
        //else if (info[index].time < info.alignment_time)...
    }

    /*******************************************************************
     * Get aligned buffers:
     * Iterate through each index and try to accumulate aligned buffers.
     * Handle all of the edge cases like inline messages and errors.
     * The logic will throw out older packets until it finds a match.
     ******************************************************************/
    UHD_INLINE void get_aligned_buffs(double timeout){

        increment_buffer_info(); //increment to next buffer
        buffers_info_type &prev_info = get_prev_buffer_info();
        buffers_info_type &curr_info = get_curr_buffer_info();
        buffers_info_type &next_info = get_next_buffer_info();

        //Loop until we get a message of an aligned set of buffers:
        // - Receive a single packet and extract its info.
        // - Handle the packet type yielded by the receive.
        // - Check the timestamps for alignment conditions.
        size_t iterations = 0;
        while (curr_info.indexes_todo.any()){

            //get the index to process for this iteration
            const size_t index = curr_info.indexes_todo.find_first();
            packet_type packet;

            //receive a single packet from the transport
            try{
                packet = get_and_process_single_packet(
                    index, prev_info, curr_info, timeout
                );
            }

            //handle the case when the get packet throws
            catch(const std::exception &e){
                UHD_MSG(error) << boost::format(
                    "The receive packet handler caught an exception.\n%s"
                ) % e.what() << std::endl;
                std::swap(curr_info, next_info); //save progress from curr -> next
                curr_info.metadata.has_time_spec = false;
                curr_info.metadata.time_spec = time_spec_t(0.0);
                curr_info.metadata.more_fragments = false;
                curr_info.metadata.fragment_offset = 0;
                curr_info.metadata.start_of_burst = false;
                curr_info.metadata.end_of_burst = false;
                curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
                return;
            }

            switch(packet){
            case PACKET_IF_DATA:
                alignment_check(index, curr_info);
                break;

            case PACKET_TIMESTAMP_ERROR:
                //If the user changes the device time while streaming or without flushing,
                //we can receive a packet that comes before the previous packet in time.
                //This could cause the alignment logic to discard future received packets.
                //Therefore, when this occurs, we reset the info to restart from scratch.
                if (curr_info.alignment_time_valid and curr_info.alignment_time != curr_info[index].time){
                    curr_info.alignment_time_valid = false;
                }
                alignment_check(index, curr_info);
                break;

            case PACKET_INLINE_MESSAGE:
                std::swap(curr_info, next_info); //save progress from curr -> next
                curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsf;
                curr_info.metadata.time_spec = next_info[index].time;
                curr_info.metadata.more_fragments = false;
                curr_info.metadata.fragment_offset = 0;
                curr_info.metadata.start_of_burst = false;
                curr_info.metadata.end_of_burst = false;
                curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi));
                if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){
                    _props[index].handle_overflow();
                    UHD_MSG(fastpath) << "O";
                }
                return;

            case PACKET_TIMEOUT_ERROR:
                std::swap(curr_info, next_info); //save progress from curr -> next
                curr_info.metadata.has_time_spec = false;
                curr_info.metadata.time_spec = time_spec_t(0.0);
                curr_info.metadata.more_fragments = false;
                curr_info.metadata.fragment_offset = 0;
                curr_info.metadata.start_of_burst = false;
                curr_info.metadata.end_of_burst = false;
                curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
                return;

            case PACKET_SEQUENCE_ERROR:
                alignment_check(index, curr_info);
                std::swap(curr_info, next_info); //save progress from curr -> next
                curr_info.metadata.has_time_spec = prev_info.metadata.has_time_spec;
                curr_info.metadata.time_spec = prev_info.metadata.time_spec + time_spec_t::from_ticks(
                    prev_info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t)/_bytes_per_otw_item, _samp_rate);
                curr_info.metadata.more_fragments = false;
                curr_info.metadata.fragment_offset = 0;
                curr_info.metadata.start_of_burst = false;
                curr_info.metadata.end_of_burst = false;
                curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
                UHD_MSG(fastpath) << "D";
                return;

            }

            //too many iterations: detect alignment failure
            if (iterations++ > _alignment_faulure_threshold){
                UHD_MSG(error) << boost::format(
                    "The receive packet handler failed to time-align packets.\n"
                    "%u received packets were processed by the handler.\n"
                    "However, a timestamp match could not be determined.\n"
                ) % iterations << std::endl;
                std::swap(curr_info, next_info); //save progress from curr -> next
                curr_info.metadata.has_time_spec = false;
                curr_info.metadata.time_spec = time_spec_t(0.0);
                curr_info.metadata.more_fragments = false;
                curr_info.metadata.fragment_offset = 0;
                curr_info.metadata.start_of_burst = false;
                curr_info.metadata.end_of_burst = false;
                curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
                _props[index].handle_overflow();
                return;
            }

        }

        //set the metadata from the buffer information at index zero
        curr_info.metadata.has_time_spec = curr_info[0].ifpi.has_tsf;
        curr_info.metadata.time_spec = curr_info[0].time;
        curr_info.metadata.more_fragments = false;
        curr_info.metadata.fragment_offset = 0;
        curr_info.metadata.start_of_burst = curr_info[0].ifpi.sob;
        curr_info.metadata.end_of_burst = curr_info[0].ifpi.eob;
        curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_NONE;

    }

    /*******************************************************************
     * Receive a single packet:
     * Handles fragmentation, messages, errors, and copy-conversion.
     * When no fragments are available, call the get aligned buffers.
     * Then copy-convert available data into the user's IO buffers.
     ******************************************************************/
    UHD_INLINE size_t recv_one_packet(
        const uhd::rx_streamer::buffs_type &buffs,
        const size_t nsamps_per_buff,
        uhd::rx_metadata_t &metadata,
        const double timeout,
        const size_t buffer_offset_bytes = 0
    ){
        //get the next buffer if the current one has expired
        if (get_curr_buffer_info().data_bytes_to_copy == 0){

            //reset current buffer info members for reuse
            get_curr_buffer_info().fragment_offset_in_samps = 0;
            get_curr_buffer_info().alignment_time_valid = false;
            get_curr_buffer_info().indexes_todo.set();

            //perform receive with alignment logic
            get_aligned_buffs(timeout);
        }

        buffers_info_type &info = get_curr_buffer_info();
        metadata = info.metadata;

        //interpolate the time spec (useful when this is a fragment)
        metadata.time_spec += time_spec_t::from_ticks(info.fragment_offset_in_samps, _samp_rate);

        //extract the number of samples available to copy
        const size_t nsamps_available = info.data_bytes_to_copy/_bytes_per_otw_item;
        const size_t nsamps_to_copy = std::min(nsamps_per_buff*_num_outputs, nsamps_available);
        const size_t bytes_to_copy = nsamps_to_copy*_bytes_per_otw_item;
        const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_num_outputs;

        //setup the data to share with converter threads
        _convert_nsamps = nsamps_to_copy_per_io_buff;
        _convert_buffs = &buffs;
        _convert_buffer_offset_bytes = buffer_offset_bytes;
        _convert_bytes_to_copy = bytes_to_copy;

        //perform N channels of conversion
        converter_thread_task(0);

        //update the copy buffer's availability
        info.data_bytes_to_copy -= bytes_to_copy;

        //setup the fragment flags and offset
        metadata.more_fragments = info.data_bytes_to_copy != 0;
        metadata.fragment_offset = info.fragment_offset_in_samps;
        info.fragment_offset_in_samps += nsamps_to_copy; //set for next call

        return nsamps_to_copy_per_io_buff;
    }

    /*******************************************************************
     * Perform one thread's work of the conversion task.
     * The entry and exit use a dual synchronization barrier,
     * to wait for data to become ready and block until completion.
     ******************************************************************/
    UHD_INLINE void converter_thread_task(const size_t index)
    {
        _task_barrier.wait();

        //shortcut references to local data structures
        buffers_info_type &buff_info = get_curr_buffer_info();
        per_buffer_info_type &info = buff_info[index];
        const rx_streamer::buffs_type &buffs = *_convert_buffs;

        //fill IO buffs with pointers into the output buffer
        void *io_buffs[4/*max interleave*/];
        for (size_t i = 0; i < _num_outputs; i++){
            char *b = reinterpret_cast<char *>(buffs[index*_num_outputs + i]);
            io_buffs[i] = b + _convert_buffer_offset_bytes;
        }
        const ref_vector<void *> out_buffs(io_buffs, _num_outputs);

        //perform the conversion operation
        _converter->conv(info.copy_buff, out_buffs, _convert_nsamps);

        //advance the pointer for the source buffer
        info.copy_buff += _convert_bytes_to_copy;

        //release the buffer if fully consumed
        if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy){
            info.buff.reset(); //effectively a release
        }

        if (index == 0) _task_barrier.wait_others();
    }

    //! Shared variables for the worker threads
    reusable_barrier _task_barrier;
    std::vector<task::sptr> _task_handlers;
    size_t _convert_nsamps;
    const rx_streamer::buffs_type *_convert_buffs;
    size_t _convert_buffer_offset_bytes;
    size_t _convert_bytes_to_copy;

};

class recv_packet_streamer : public recv_packet_handler, public rx_streamer{
public:
    recv_packet_streamer(const size_t max_num_samps){
        _max_num_samps = max_num_samps;
    }

    size_t get_num_channels(void) const{
        return this->size();
    }

    size_t get_max_num_samps(void) const{
        return _max_num_samps;
    }

    size_t recv(
        const rx_streamer::buffs_type &buffs,
        const size_t nsamps_per_buff,
        uhd::rx_metadata_t &metadata,
        const double timeout,
        const bool one_packet
    ){
        return recv_packet_handler::recv(buffs, nsamps_per_buff, metadata, timeout, one_packet);
    }

    void issue_stream_cmd(const stream_cmd_t &stream_cmd)
    {
        return recv_packet_handler::issue_stream_cmd(stream_cmd);
    }

private:
    size_t _max_num_samps;
};

}}} //namespace

#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP */