aboutsummaryrefslogtreecommitdiffstats
path: root/src/SampleQueue.h
blob: eabc301b6b369ff882cad40db5158696a755ad2b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/*
   Copyright (C) 2013, 2014
   Matthias P. Braendli, matthias.braendli@mpb.li

   An implementation for a threadsafe queue using boost thread library
   for audio samples.
*/

#ifndef _SAMPLE_QUEUE_H_
#define _SAMPLE_QUEUE_H_

#include <boost/thread.hpp>
#include <queue>

#include <stdio.h>

/* This queue is meant to be used by two threads. One producer
 * that pushes elements into the queue, and one consumer that
 * retrieves the elements.
 *
 * The queue can make the consumer block until enough elements
 * are available.
 */

template<typename T>
class SampleQueue
{
public:
    SampleQueue(unsigned int bytes_per_sample,
            unsigned int channels,
            size_t max_size) :
        m_bytes_per_sample(bytes_per_sample),
        m_channels(channels), m_max_size(max_size) {}


    /* Push a bunch of samples into the buffer */
    size_t push(const T *val, size_t len)
    {
        boost::mutex::scoped_lock lock(m_mutex);

        if (m_queue.size() >= m_max_size) {
            /*fprintf(stderr, "######## push overrun %zu, %zu\n",
                    len,
                    m_queue.size()); // */
            return 0;
        }

        for (size_t i = 0; i < len; i++) {
            m_queue.push_back(val[i]);
        }

        size_t new_size = m_queue.size();
        lock.unlock();

        //m_condition_variable.notify_one();

        return new_size;
    }

    size_t size() const
    {
        boost::mutex::scoped_lock lock(m_mutex);
        return m_queue.size();
    }

    /* Get len elements, place them into the buf array
     * Returns the number of elements it was able to take
     * from the queue
     */
    size_t pop(T* buf, size_t len)
    {
        boost::mutex::scoped_lock lock(m_mutex);
        fprintf(stderr, "######## pop %zu (%zu)\n",
                len,
                m_queue.size());

        size_t ret = 0;

        if (m_queue.size() < len) {
            size_t i;
            for (i = 0; i < m_queue.size(); i++) {
                buf[i] = m_queue[i];
            }

            ret = i;

            for (; i < len; i++) {
                buf[i] = 0;
            }

            m_queue.resize(0);
        }
        else {
            for (size_t i = 0; i < len; i++) {
                buf[i] = m_queue[i];
            }

            ret = len;

            m_queue.erase(m_queue.begin(), m_queue.begin() + len);
        }

        return ret;
    }

    /*
    void wait_and_pop(T& popped_value)
    {
        boost::mutex::scoped_lock lock(m_mutex);
        while(m_queue.size() < m_required_size)
        {
            m_condition_variable.wait(lock);
        }

        popped_value = m_queue.front();
        m_queue.pop_front();
    }
    */

private:
    std::deque<T> m_queue;
    mutable boost::mutex m_mutex;
    //boost::condition_variable m_condition_variable;

    unsigned int m_channels;
    unsigned int m_bytes_per_sample;
    size_t m_max_size;
};

#endif