From 69f65ccc49e4c61091554e2b9be7c01ff35186f2 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Wed, 24 Jan 2018 09:46:38 +0100 Subject: Add TX to RX feedback with timestamp support, only CF32 --- CircBufferTimestamp.cpp | 98 +++++++++++++++++++++++++++ CircBufferTimestamp.hpp | 172 ++++++++++++++++++++++++++++++++++++++++++++++++ Dummy_Settings.cpp | 4 ++ Dummy_Streaming.cpp | 83 ++++++++++++----------- SoapyDummy.hpp | 8 ++- 5 files changed, 324 insertions(+), 41 deletions(-) create mode 100644 CircBufferTimestamp.cpp create mode 100644 CircBufferTimestamp.hpp diff --git a/CircBufferTimestamp.cpp b/CircBufferTimestamp.cpp new file mode 100644 index 0000000..ba66e68 --- /dev/null +++ b/CircBufferTimestamp.cpp @@ -0,0 +1,98 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2018 Matthias P. Braendli + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +// Tests for CircBufferTimestamp.hpp + +#include "CircBufferTimestamp.hpp" +#include +#include +#include + +using namespace std; + +const long long ticks_per_sample = 12; +const size_t N_ITER = 40; +const size_t N_SAMPS_PER_BUF = 10; + +CircularBuffer cb(1000000); + +mutex cerr_mutex; + +void writer() +{ + const long long ts_start = 200; + + for (size_t iterations = 0; iterations < N_ITER; iterations++) { + vector b; + for (size_t i = 0; i < N_SAMPS_PER_BUF; i++) { + b.push_back(N_SAMPS_PER_BUF*iterations + i); + } + + const long long ts_b = ts_start + ticks_per_sample * iterations * b.size(); + + cb.write(b.data(), b.size(), ts_b); + { + unique_lock lock(cerr_mutex); + cerr << "Wrote " << b.size() << " at t=" << ts_b << endl; + } + + this_thread::sleep_for(chrono::milliseconds(10)); + } +} + +void reader() +{ + size_t totaldata = 0; + long long req_ts = 450; + while (totaldata < N_ITER * N_SAMPS_PER_BUF / 3) { + long long ts = req_ts; + vector buf(N_SAMPS_PER_BUF/2); + const size_t read = cb.read(buf.data(), buf.size(), &ts); + buf.resize(read); + totaldata += read; + + { + unique_lock lock(cerr_mutex); + cerr << "Read " << totaldata << " so far, ts_req=" << + req_ts << " ts=" << ts << " data:"; + for (const int samp : buf) { + cerr << " " << samp; + } + cerr << endl; + } + req_ts += ticks_per_sample * (N_SAMPS_PER_BUF/2 + 1); + } +} + +int main(int argc, char **argv) +{ + cb.set_ticks_per_sample(ticks_per_sample); + + auto writeThread = thread(writer); + auto readThread = thread(reader); + + writeThread.join(); + readThread.join(); + return 0; +} diff --git a/CircBufferTimestamp.hpp b/CircBufferTimestamp.hpp new file mode 100644 index 0000000..b6e0751 --- /dev/null +++ b/CircBufferTimestamp.hpp @@ -0,0 +1,172 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2018 Matthias P. Braendli + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#pragma once + +#include +#include +#include +#include + +template +class CircularBuffer +{ + public: + CircularBuffer(size_t max_bufsize) : + m_maxbufsize(max_bufsize) + { + } + + void set_ticks_per_sample(long long tps) + { + m_ticks_per_sample = tps; + } + + /* Write num_elems from buf to the internal buffer, at the position + * given by timestamp + */ + void write(const T *buf, size_t num_elems, long long timestamp) + { + std::unique_lock lock(m_mutex); + + if (m_timestamp_buf == 0 or timestamp < m_timestamp_buf) { + // First time we write to our buf, or jump back in time, + // we reset our buffer + resetbuf(); + m_timestamp_buf = timestamp; + } + + size_t sample_offset = (timestamp - m_timestamp_buf) / + m_ticks_per_sample; + size_t minsize = sample_offset + num_elems; + + if (minsize > m_maxbufsize) { + truncatebuf(); + if (minsize > m_maxbufsize) { + resetbuf(); + m_timestamp_buf = timestamp; + sample_offset = 0; + minsize = num_elems; + } + } + + if (m_buf.size() < minsize) { + m_buf.resize(minsize); + } + std::copy(buf, buf + num_elems, m_buf.begin() + sample_offset); + + lock.unlock(); + m_cv.notify_all(); + } + + /* Read num_elems elements from the internal buffer, starting + * from the end of the last read. Returns number of elements written */ + size_t read(T *buf, size_t num_elems) + { + std::unique_lock lock(m_mutex); + + while (m_read_offset + num_elems > m_buf.size()) { + m_cv.wait(lock); + } + + std::copy(m_buf.begin() + m_read_offset, + m_buf.begin() + m_read_offset + num_elems, + buf); + + m_read_offset += num_elems; + return num_elems; + } + + /* Read num_elems elements from the internal buffer at the + * position corresponding to the timestamp, which is also + * updated. + * Returns number of elements written. + */ + size_t read(T *buf, size_t num_elems, long long *timestamp) + { + std::unique_lock lock(m_mutex); + + if (*timestamp < m_timestamp_buf) { + // Cannot give samples earlier than the timestamp + return 0; + } + + const size_t sample_offset = + std::llrint( + (double)(*timestamp - m_timestamp_buf) / + (double)m_ticks_per_sample); + const size_t minsize = sample_offset + num_elems; + + if (minsize > m_maxbufsize) { + // Too far in the future + return 0; + } + + while (minsize > m_buf.size()) { + m_cv.wait(lock); + } + + std::copy(m_buf.begin() + sample_offset, + m_buf.begin() + sample_offset + num_elems, + buf); + + *timestamp = m_timestamp_buf + sample_offset * m_ticks_per_sample; + m_read_offset = sample_offset + num_elems; + return num_elems; + } + + private: + void resetbuf() + { + m_buf.clear(); + m_timestamp_buf = 0; + m_read_offset = 0; + } + + void truncatebuf() + { + if (m_buf.size() < m_read_offset) { + throw std::logic_error("truncate " + + std::to_string(m_buf.size()) + " " + + std::to_string(m_read_offset)); + } + + for (size_t r = m_read_offset, w = 0; + r < m_buf.size(); + r++, w++) { + m_buf[w] = m_buf[r]; + } + m_read_offset = 0; + m_buf.resize(m_buf.size() - m_read_offset); + } + + std::condition_variable m_cv; + std::mutex m_mutex; + std::vector m_buf; + long long m_timestamp_buf = 0; + long long m_ticks_per_sample = 0; + size_t m_maxbufsize = 0; + + size_t m_read_offset = 0; +}; diff --git a/Dummy_Settings.cpp b/Dummy_Settings.cpp index 85dc056..d449329 100644 --- a/Dummy_Settings.cpp +++ b/Dummy_Settings.cpp @@ -26,6 +26,7 @@ SoapyDummy::SoapyDummy( const SoapySDR::Kwargs &args ) + : m_circ_buffer(1000000) { // On startup, initialise our epoch to current time, so that // it appears the counter just started counting @@ -272,6 +273,9 @@ SoapySDR::RangeList SoapyDummy::getFrequencyRange( const int direction, const si void SoapyDummy::setSampleRate( const int direction, const size_t channel, const double rate ) { m_samplerate = rate; + + // ticks are in nanoseconds + m_circ_buffer.set_ticks_per_sample(std::lrint(1e9 / rate)); } double SoapyDummy::getSampleRate( const int direction, const size_t channel ) const diff --git a/Dummy_Streaming.cpp b/Dummy_Streaming.cpp index 1ef1c72..a85986b 100644 --- a/Dummy_Streaming.cpp +++ b/Dummy_Streaming.cpp @@ -33,10 +33,7 @@ std::vector SoapyDummy::getStreamFormats(const int direction, const { std::vector formats; - formats.push_back(SOAPY_SDR_CS8); - formats.push_back(SOAPY_SDR_CS16); formats.push_back(SOAPY_SDR_CF32); - formats.push_back(SOAPY_SDR_CF64); return formats; } @@ -69,40 +66,27 @@ SoapySDR::Stream* SoapyDummy::setupStream( const std::vector &channels, const SoapySDR::Kwargs &args ) { - if ( channels.size() > 1 or( channels.size() > 0 and channels.at( 0 ) != 0 ) ) { + if (channels.size() > 1 or (channels.size() > 0 and channels.at(0) != 0 )) { throw std::runtime_error( "setupStream invalid channel selection" ); } - if(direction==SOAPY_SDR_RX){ + if (direction==SOAPY_SDR_RX) { - if ( format == SOAPY_SDR_CS8 ) - { - SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CS8." ); - }else if ( format == SOAPY_SDR_CS16 ) - { - SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CS16." ); - }else if ( format == SOAPY_SDR_CF32 ) - { + if (format == SOAPY_SDR_CF32) { SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CF32." ); - }else if(format==SOAPY_SDR_CF64){ - SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CF64." ); - }else throw std::runtime_error( "setupStream invalid format " + format ); + } + else + throw std::runtime_error( "setupStream invalid format " + format ); return RX_STREAM; - } else if(direction==SOAPY_SDR_TX){ - - if ( format == SOAPY_SDR_CS8 ) - { - SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CS8." ); - }else if ( format == SOAPY_SDR_CS16 ) - { - SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CS16." ); - }else if ( format == SOAPY_SDR_CF32 ) - { + } + else if (direction==SOAPY_SDR_TX) { + + if (format == SOAPY_SDR_CF32) { SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CF32." ); - }else if(format==SOAPY_SDR_CF64){ - SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CF64." ); - }else throw std::runtime_error( "setupStream invalid format " + format ); + } + else + throw std::runtime_error( "setupStream invalid format " + format ); return TX_STREAM; } @@ -114,18 +98,21 @@ SoapySDR::Stream* SoapyDummy::setupStream( void SoapyDummy::closeStream( SoapySDR::Stream *stream ) { if (stream == RX_STREAM) { - } else if (stream == TX_STREAM) { + } + else if (stream == TX_STREAM) { } } size_t SoapyDummy::getStreamMTU( SoapySDR::Stream *stream ) const { - if(stream == RX_STREAM){ + if (stream == RX_STREAM) { return m_rxstream.mtu; - } else if(stream == TX_STREAM){ + } + else if (stream == TX_STREAM) { return m_txstream.mtu; - } else { + } + else { throw std::runtime_error("Invalid stream"); } } @@ -138,9 +125,11 @@ int SoapyDummy::activateStream( { if (stream == RX_STREAM) { SoapySDR_logf(SOAPY_SDR_DEBUG, "Start RX"); + m_rxstream.active = true; } else if (stream == TX_STREAM) { SoapySDR_logf(SOAPY_SDR_DEBUG, "Start TX"); + m_txstream.active = true; } return 0; @@ -152,10 +141,11 @@ int SoapyDummy::deactivateStream( const int flags, const long long timeNs ) { - if (stream == RX_STREAM) { + m_rxstream.active = false; } else if (stream == TX_STREAM) { + m_rxstream.active = false; } return 0; } @@ -173,8 +163,17 @@ int SoapyDummy::readStream( return SOAPY_SDR_NOT_SUPPORTED; } - size_t returnedElems = std::min(numElems,this->getStreamMTU(stream)); - return returnedElems; + if (not m_rxstream.active) { + using namespace std; + this_thread::sleep_for(chrono::microseconds(timeoutUs)); + return SOAPY_SDR_TIMEOUT; + } + + std::complex * const buf = static_cast * const>(buffs[0]); + + size_t numToRead = std::min(numElems, this->getStreamMTU(stream)); + const size_t numRead = m_circ_buffer.read(buf, numToRead); + return numRead; } int SoapyDummy::writeStream( @@ -189,9 +188,17 @@ int SoapyDummy::writeStream( return SOAPY_SDR_NOT_SUPPORTED; } - size_t returnedElems = std::min(numElems,this->getStreamMTU(stream)); + if (not m_txstream.active) { + using namespace std; + this_thread::sleep_for(chrono::microseconds(timeoutUs)); + return SOAPY_SDR_TIMEOUT; + } + + const std::complex * const buf = static_cast * const>(buffs[0]); - return returnedElems; + size_t numWritten = std::min(numElems, this->getStreamMTU(stream)); + m_circ_buffer.write(buf, numWritten, timeNs); + return numWritten; } diff --git a/SoapyDummy.hpp b/SoapyDummy.hpp index 7f08a89..1ae412e 100644 --- a/SoapyDummy.hpp +++ b/SoapyDummy.hpp @@ -29,6 +29,7 @@ #include #include #include +#include "CircBufferTimestamp.hpp" class SoapyDummySession { @@ -178,9 +179,8 @@ class SoapyDummy : public SoapySDR::Device private: struct DummyStream { - size_t mtu = 1024; - size_t buf_num = 1; - std::complex buf[1024]; + const size_t mtu = 1024; + bool active = false; }; DummyStream m_rxstream; @@ -188,6 +188,8 @@ class SoapyDummy : public SoapySDR::Device SoapySDR::Stream* const TX_STREAM = (SoapySDR::Stream*) 0x1; SoapySDR::Stream* const RX_STREAM = (SoapySDR::Stream*) 0x2; + CircularBuffer > m_circ_buffer; + SoapyDummySession m_session; double m_samplerate = 0; double m_frequency = 0; -- cgit v1.2.3