aboutsummaryrefslogtreecommitdiffstats
path: root/src/SampleQueue.h
blob: aeeb8d4ee127c16d7499d68f2ae8add818d56263 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
/*
 * Copyright (C) 2018 Matthias P. Braendli
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * Matthias P. Braendli, matthias.braendli@mpb.li
 */

/*!
 * \file SampleQueue.h
 * \brief An implementation for a threadsafe queue using the C++11 thread library
 * for audio samples.
 */

#ifndef _SAMPLE_QUEUE_H_
#define _SAMPLE_QUEUE_H_

#define DEBUG_SAMPLE_QUEUE 0

#include <algorithm>
#include <cassert>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <deque>
#include <mutex>
#include <queue>
#include <sstream>
#include <thread>

/*! This queue is meant to be used by two threads. One producer
 * that pushes elements into the queue, and one consumer that
 * retrieves the elements.
 *
 * This queue should contain audio sample data in interleaved L/R
 * form, 2 bytes per sample. Therefore, the push and pop functions
 * should always place or retrieve data in multiples of
 * bytes_per_sample * number_of_channels.
 *
 * The queue has a maximum size. If this size is reached, push()
 * either blocks or ignores new data, depending on drift_compensation.
 *
 * If pop() is called but there is not enough data in the queue,
 * the missing samples are replaced by zeros. pop() will always
 * write the requested length.
 */


/* The template is actually not really tested for anything else
 * than uint8_t
 */
template<typename T>
class SampleQueue
{
public:
    /*! Create an empty queue.
     *
     * \param bytes_per_sample size of one sample of one channel, in
     *        elements of T
     * \param channels number of interleaved channels
     * \param max_size fill level (in elements of T) above which push()
     *        stops accepting data
     * \param drift_compensation if true, push() never blocks and
     *        instead drops the data and counts an overrun when the
     *        queue is full. If false, push() blocks until there is
     *        room for all the data.
     */
    SampleQueue(unsigned int bytes_per_sample,
            unsigned int channels,
            size_t max_size,
            bool drift_compensation) :
        // Initialisers are listed in member declaration order
        m_channels(channels),
        m_bytes_per_sample(bytes_per_sample),
        m_max_size(max_size),
        m_push_block(!drift_compensation),
        m_overruns(0) {}


    /*! Push a bunch of samples into the buffer
     *
     * In blocking mode the call returns only once all len elements
     * have been queued; it waits for the consumer to make room when
     * the queue is full.
     *
     * In non-blocking (drift compensation) mode the whole chunk is
     * either accepted or dropped. It is accepted as long as the queue
     * currently holds fewer than max_size elements, which means the
     * queue may temporarily grow beyond max_size by less than len
     * elements. A dropped chunk increments the overrun counter.
     *
     * \param val pointer to len elements to enqueue
     * \param len number of elements, must be a multiple of
     *        channels * bytes_per_sample
     * \return size of the queue after the push
     */
    size_t push(const T *val, size_t len)
    {
        size_t new_size = 0;

        {
            std::unique_lock<std::mutex> lock(m_mutex);

            assert(len % (m_channels * m_bytes_per_sample) == 0);

#if DEBUG_SAMPLE_QUEUE
            fprintf(stdout, "######## push %s %zu, %zu >= %zu\n",
                    (m_queue.size() >= m_max_size) ? "overrun" : "ok",
                    len / 4,
                    m_queue.size() / 4,
                    m_max_size / 4);
#endif

            if (m_push_block) {
                while (len) {
                    // Guard the subtraction so that a queue that is
                    // over max_size can never yield a huge value.
                    const size_t fill = m_queue.size();
                    const size_t available =
                        (fill < m_max_size) ? (m_max_size - fill) : 0;
                    const size_t copy_len = std::min<size_t>(available, len);

                    if (copy_len > 0) {
                        m_queue.insert(m_queue.end(), val, val + copy_len);
                        len -= copy_len;
                        val += copy_len;
                    }
                    else {
                        // Queue is full: release the lock and wait for
                        // the consumer. The timeout makes us re-check
                        // even if a notification was missed.
                        const auto wait_timeout = std::chrono::milliseconds(100);
                        m_pop_notification.wait_for(lock, wait_timeout);
                    }
                }
            }
            else {
                if (m_queue.size() < m_max_size) {
                    m_queue.insert(m_queue.end(), val, val + len);
                }
                else {
                    m_overruns++;
                }
            }

            new_size = m_queue.size();
        }

        // Notify after releasing the lock so that the woken consumer
        // does not immediately block on the mutex.
        m_push_notification.notify_all();

        return new_size;
    }

    /*! \return the number of elements currently in the queue */
    size_t size() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_queue.size();
    }

    /*! Wait until len elements in the queue are available,
     * and then fill the buf. If the timeout_ms (expressed in
     * milliseconds) expires, fill the available number of elements.
     * If enough elements are already queued, the call returns
     * without waiting. Unlike pop(), the remainder of buf is not
     * zero-filled.
     *
     * Also update the overrun variable with the information
     * of how many overruns we saw since the last pop.
     *
     * \param buf destination, must have room for len elements
     * \param len number of elements requested, must be a multiple of
     *        channels * bytes_per_sample
     * \param timeout_ms maximum time to wait; zero or negative values
     *        make the call return immediately with what is available
     * \param overruns if not null, receives the overrun count, which
     *        is then reset
     * \return the number of elements written into buf
     */
    size_t pop_wait(T* buf, size_t len, int timeout_ms, size_t* overruns = nullptr)
    {
        assert(len % (m_channels * m_bytes_per_sample) == 0);

#if DEBUG_SAMPLE_QUEUE
        fprintf(stdout, "######## pop_wait %zu\n", len);
#endif
        std::unique_lock<std::mutex> lock(m_mutex);

        if (overruns) {
            *overruns = m_overruns;
            m_overruns = 0;
        }

        typedef std::chrono::steady_clock clock_type;
        const clock_type::time_point deadline =
            clock_type::now() + std::chrono::milliseconds(timeout_ms);

        // Poll at a short interval instead of relying only on the
        // notification: a blocking push() fills the queue without
        // notifying until it returns.
        const clock_type::duration poll_interval =
            std::chrono::milliseconds(10);

        while (m_queue.size() < len) {
            const clock_type::time_point now = clock_type::now();
            if (now >= deadline) {
#if DEBUG_SAMPLE_QUEUE
                fprintf(stdout, "######## pop_wait timeout\n");
#endif
                break;
            }

            // Never sleep past the deadline
            m_push_notification.wait_for(lock,
                    std::min(poll_interval, deadline - now));

#if DEBUG_SAMPLE_QUEUE
                fprintf(stdout, "######## pop_wait %zu need %zu\n",
                        m_queue.size(), len);
#endif
        }

        const size_t num_to_copy = std::min<size_t>(m_queue.size(), len);

        std::copy(
                m_queue.begin(),
                m_queue.begin() + num_to_copy,
                buf);

        m_queue.erase(m_queue.begin(), m_queue.begin() + num_to_copy);

        lock.unlock();

#if DEBUG_SAMPLE_QUEUE
        fprintf(stdout, "######## pop_wait returns %zu\n", num_to_copy);
#endif

        m_pop_notification.notify_all();
        return num_to_copy;
    }

    /*! Get up to len elements, place them into the buf array.
     * If fewer than len elements are queued, the rest of buf is
     * filled with zeros. The overrun counter is reset and discarded.
     *
     * \return the number of elements it was able to take
     * from the queue
     */
    size_t pop(T* buf, size_t len)
    {
        size_t ovr = 0;
        return pop(buf, len, &ovr);
    }

    /*! Get up to len elements, place them into the buf array.
     * If fewer than len elements are queued, the rest of buf is
     * filled with zeros, so buf is always written in full.
     * Also update the overrun variable with the information
     * of how many overruns we saw since the last pop.
     *
     * \param buf destination, must have room for len elements
     * \param len number of elements requested, must be a multiple of
     *        channels * bytes_per_sample
     * \param overruns if not null, receives the overrun count. The
     *        counter is reset in any case.
     * \return the number of elements it was able to take
     * from the queue
     */
    size_t pop(T* buf, size_t len, size_t* overruns)
    {
        size_t ret = 0;

        {
            std::lock_guard<std::mutex> lock(m_mutex);

            assert(len % (m_channels * m_bytes_per_sample) == 0);

#if DEBUG_SAMPLE_QUEUE
            fprintf(stdout, "######## pop %zu (%zu), %zu overruns: ",
                    len / 4,
                    m_queue.size() / 4,
                    m_overruns);
#endif
            if (overruns) {
                *overruns = m_overruns;
            }
            m_overruns = 0;

            // Take what is there, up to len elements
            ret = std::min<size_t>(m_queue.size(), len);

            std::copy(m_queue.begin(), m_queue.begin() + ret, buf);

            // Not enough data in queue: pad with zeros
            std::fill(buf + ret, buf + len, static_cast<T>(0));

            m_queue.erase(m_queue.begin(), m_queue.begin() + ret);

#if DEBUG_SAMPLE_QUEUE
            if (ret < len) {
                fprintf(stdout, "after short pop %zu (%zu)\n",
                    len / 4,
                    m_queue.size() / 4);
            }
            else {
                fprintf(stdout, "after ok pop %zu (%zu)\n",
                    len / 4,
                    m_queue.size() / 4);
            }
#endif
        }

        // Wake up a producer that may be blocked waiting for room
        m_pop_notification.notify_all();
        return ret;
    }

private:
    std::deque<T> m_queue;

    /*! Protects m_queue and m_overruns. Mutable so that size()
     * can be const.
     */
    mutable std::mutex m_mutex;

    /*! Signalled by push() after new data was queued */
    std::condition_variable m_push_notification;

    /*! Signalled by the pop functions after data was removed */
    std::condition_variable m_pop_notification;

    unsigned int m_channels;
    unsigned int m_bytes_per_sample;
    size_t m_max_size;

    /*! true if push() blocks when the queue is full, false if it
     * drops the data and counts an overrun instead
     */
    bool m_push_block;

    /*! Counter to keep track of number of overruns between calls
     * to pop()
     */
    size_t m_overruns;
};

#endif