/*
 * The MIT License (MIT)
 *
 * Copyright (c) 2018 Matthias P. Braendli
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#pragma once

#include <algorithm>
#include <cmath>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <stdexcept>
#include <string>
#include <vector>

/* Sample buffer indexed by timestamp.
 *
 * Element 0 of the internal vector corresponds to m_timestamp_buf, and
 * element i to m_timestamp_buf + i * ticks_per_sample. Every public member
 * function takes the internal mutex; the read functions block on a condition
 * variable that write() signals after each call.
 *
 * set_ticks_per_sample() must be called with a positive value before
 * write() or the timestamped read() are used, otherwise these throw
 * std::logic_error.
 *
 * Note: a buffer start timestamp of 0 is interpreted as "nothing written
 * yet", so a write() that finds the start timestamp at 0 always resets
 * the buffer first.
 */
template <typename T>
class CircularBuffer {
    public:
        /* max_bufsize is the number of elements above which write()
         * discards already-read samples or resets the buffer. */
        CircularBuffer(size_t max_bufsize) :
            m_maxbufsize(max_bufsize) { }

        /* Set the number of timestamp ticks that separate two
         * consecutive samples. */
        void set_ticks_per_sample(long long tps)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_ticks_per_sample = tps;
        }

        /* Write num_elems from buf to the internal buffer, at the position
         * given by timestamp.
         *
         * If the write would extend past max_bufsize, the samples that
         * were already read are dropped first. If that is not enough, the
         * buffer is reset and restarts at timestamp; in that case the
         * buffer holds num_elems elements even if this exceeds
         * max_bufsize.
         *
         * Throws std::logic_error if ticks per sample is not positive. */
        void write(const T *buf, size_t num_elems, long long timestamp)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            check_ticks_per_sample();

            if (m_timestamp_buf == 0 || timestamp < m_timestamp_buf) {
                // First time we write to our buf, or jump back in time,
                // we reset our buffer
                resetbuf();
                m_timestamp_buf = timestamp;
            }

            size_t sample_offset = offset_for_write(timestamp);
            size_t minsize = sample_offset + num_elems;

            if (minsize > m_maxbufsize) {
                truncatebuf();

                // truncatebuf() moved the buffer start forward, so the
                // position of this write has to be recomputed before
                // deciding whether it fits now.
                bool fits = false;
                if (timestamp >= m_timestamp_buf) {
                    sample_offset = offset_for_write(timestamp);
                    minsize = sample_offset + num_elems;
                    fits = (minsize <= m_maxbufsize);
                }

                if (!fits) {
                    resetbuf();
                    m_timestamp_buf = timestamp;
                    sample_offset = 0;
                    minsize = num_elems;
                }
            }

            if (m_buf.size() < minsize) {
                m_buf.resize(minsize);
            }

            std::copy(buf, buf + num_elems, m_buf.begin() + sample_offset);

            // Release the mutex before waking the readers so that they
            // can acquire it immediately.
            lock.unlock();
            m_cv.notify_all();
        }

        /* Read num_elems elements from the internal buffer, starting
         * from the end of the last read. Blocks until enough elements
         * are available. Returns number of elements written to buf. */
        size_t read(T *buf, size_t num_elems)
        {
            std::unique_lock<std::mutex> lock(m_mutex);

            while (m_read_offset + num_elems > m_buf.size()) {
                m_cv.wait(lock);
            }

            std::copy(m_buf.begin() + m_read_offset,
                    m_buf.begin() + m_read_offset + num_elems,
                    buf);
            m_read_offset += num_elems;
            return num_elems;
        }

        /* Read num_elems elements from the internal buffer at the
         * position corresponding to the timestamp. The timestamp is
         * rounded to the nearest sample and updated to the timestamp of
         * the first element returned.
         *
         * Returns 0 without blocking if the timestamp lies before the
         * start of the buffer, or if the request would end beyond
         * max_bufsize. Otherwise blocks until the requested range has
         * been written, and returns num_elems.
         *
         * Throws std::logic_error if ticks per sample is not positive. */
        size_t read(T *buf, size_t num_elems, long long *timestamp)
        {
            std::unique_lock<std::mutex> lock(m_mutex);

            // The position is re-evaluated after every wake-up because a
            // write() may have reset or truncated the buffer in the
            // meantime, which changes m_timestamp_buf.
            while (true) {
                check_ticks_per_sample();

                if (*timestamp < m_timestamp_buf) {
                    // Cannot give samples earlier than the timestamp
                    return 0;
                }

                const size_t sample_offset = static_cast<size_t>(std::llrint(
                        static_cast<double>(*timestamp - m_timestamp_buf) /
                        static_cast<double>(m_ticks_per_sample)));
                const size_t minsize = sample_offset + num_elems;

                if (minsize > m_maxbufsize) {
                    // Too far in the future
                    return 0;
                }

                if (minsize <= m_buf.size()) {
                    std::copy(m_buf.begin() + sample_offset,
                            m_buf.begin() + sample_offset + num_elems,
                            buf);
                    *timestamp = m_timestamp_buf +
                        static_cast<long long>(sample_offset) * m_ticks_per_sample;
                    m_read_offset = sample_offset + num_elems;
                    return num_elems;
                }

                m_cv.wait(lock);
            }
        }

    private:
        /* Throw if the sample duration was never configured, because
         * it is used as a divisor. Caller must hold m_mutex. */
        void check_ticks_per_sample() const
        {
            if (m_ticks_per_sample <= 0) {
                throw std::logic_error("ticks per sample must be positive");
            }
        }

        /* Index of the sample at timestamp, which must not be smaller
         * than m_timestamp_buf. Caller must hold m_mutex. */
        size_t offset_for_write(long long timestamp) const
        {
            return static_cast<size_t>(
                    (timestamp - m_timestamp_buf) / m_ticks_per_sample);
        }

        /* Drop all samples and forget the buffer start timestamp. */
        void resetbuf()
        {
            m_buf.clear();
            m_timestamp_buf = 0;
            m_read_offset = 0;
        }

        /* Drop the samples in front of the read position, and move the
         * buffer start timestamp forward by the same amount so that the
         * remaining samples keep their timestamps. */
        void truncatebuf()
        {
            if (m_buf.size() < m_read_offset) {
                throw std::logic_error("truncate " +
                        std::to_string(m_buf.size()) + " " +
                        std::to_string(m_read_offset));
            }

            m_buf.erase(m_buf.begin(), m_buf.begin() + m_read_offset);
            m_timestamp_buf +=
                static_cast<long long>(m_read_offset) * m_ticks_per_sample;
            m_read_offset = 0;
        }

        std::condition_variable m_cv;
        std::mutex m_mutex;

        std::vector<T> m_buf;

        // Timestamp of m_buf[0]; 0 means nothing was written yet
        long long m_timestamp_buf = 0;
        long long m_ticks_per_sample = 0;
        size_t m_maxbufsize = 0;

        // Index in m_buf following the last element handed to a reader
        size_t m_read_offset = 0;
};