diff options
Diffstat (limited to 'CircBufferTimestamp.hpp')
-rw-r--r-- | CircBufferTimestamp.hpp | 172 |
1 files changed, 172 insertions, 0 deletions
diff --git a/CircBufferTimestamp.hpp b/CircBufferTimestamp.hpp new file mode 100644 index 0000000..b6e0751 --- /dev/null +++ b/CircBufferTimestamp.hpp @@ -0,0 +1,172 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2018 Matthias P. Braendli + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#pragma once + +#include <vector> +#include <mutex> +#include <condition_variable> +#include <cmath> + +template<class T> +class CircularBuffer +{ + public: + CircularBuffer(size_t max_bufsize) : + m_maxbufsize(max_bufsize) + { + } + + void set_ticks_per_sample(long long tps) + { + m_ticks_per_sample = tps; + } + + /* Write num_elems from buf to the internal buffer, at the position + * given by timestamp + */ + void write(const T *buf, size_t num_elems, long long timestamp) + { + std::unique_lock<std::mutex> lock(m_mutex); + + if (m_timestamp_buf == 0 or timestamp < m_timestamp_buf) { + // First time we write to our buf, or jump back in time, + // we reset our buffer + resetbuf(); + m_timestamp_buf = timestamp; + } + + size_t sample_offset = (timestamp - m_timestamp_buf) / + m_ticks_per_sample; + size_t minsize = sample_offset + num_elems; + + if (minsize > m_maxbufsize) { + truncatebuf(); + if (minsize > m_maxbufsize) { + resetbuf(); + m_timestamp_buf = timestamp; + sample_offset = 0; + minsize = num_elems; + } + } + + if (m_buf.size() < minsize) { + m_buf.resize(minsize); + } + std::copy(buf, buf + num_elems, m_buf.begin() + sample_offset); + + lock.unlock(); + m_cv.notify_all(); + } + + /* Read num_elems elements from the internal buffer, starting + * from the end of the last read. Returns number of elements written */ + size_t read(T *buf, size_t num_elems) + { + std::unique_lock<std::mutex> lock(m_mutex); + + while (m_read_offset + num_elems > m_buf.size()) { + m_cv.wait(lock); + } + + std::copy(m_buf.begin() + m_read_offset, + m_buf.begin() + m_read_offset + num_elems, + buf); + + m_read_offset += num_elems; + return num_elems; + } + + /* Read num_elems elements from the internal buffer at the + * position corresponding to the timestamp, which is also + * updated. + * Returns number of elements written. + */ + size_t read(T *buf, size_t num_elems, long long *timestamp) + { + std::unique_lock<std::mutex> lock(m_mutex); + + if (*timestamp < m_timestamp_buf) { + // Cannot give samples earlier than the timestamp + return 0; + } + + const size_t sample_offset = + std::llrint( + (double)(*timestamp - m_timestamp_buf) / + (double)m_ticks_per_sample); + const size_t minsize = sample_offset + num_elems; + + if (minsize > m_maxbufsize) { + // Too far in the future + return 0; + } + + while (minsize > m_buf.size()) { + m_cv.wait(lock); + } + + std::copy(m_buf.begin() + sample_offset, + m_buf.begin() + sample_offset + num_elems, + buf); + + *timestamp = m_timestamp_buf + sample_offset * m_ticks_per_sample; + m_read_offset = sample_offset + num_elems; + return num_elems; + } + + private: + void resetbuf() + { + m_buf.clear(); + m_timestamp_buf = 0; + m_read_offset = 0; + } + + void truncatebuf() + { + if (m_buf.size() < m_read_offset) { + throw std::logic_error("truncate " + + std::to_string(m_buf.size()) + " " + + std::to_string(m_read_offset)); + } + + for (size_t r = m_read_offset, w = 0; + r < m_buf.size(); + r++, w++) { + m_buf[w] = m_buf[r]; + } + m_read_offset = 0; + m_buf.resize(m_buf.size() - m_read_offset); + } + + std::condition_variable m_cv; + std::mutex m_mutex; + std::vector<T> m_buf; + long long m_timestamp_buf = 0; + long long m_ticks_per_sample = 0; + size_t m_maxbufsize = 0; + + size_t m_read_offset = 0; +}; |