aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-01-24 09:46:38 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-01-24 09:46:38 +0100
commit69f65ccc49e4c61091554e2b9be7c01ff35186f2 (patch)
tree7e26704840ec0cf1ac946b8599c6c33e70c02b21
parent8c7c2e4eb49ce4bbd4e9a6b8b8fccf2905be1966 (diff)
downloadSoapyDummy-69f65ccc49e4c61091554e2b9be7c01ff35186f2.tar.gz
SoapyDummy-69f65ccc49e4c61091554e2b9be7c01ff35186f2.tar.bz2
SoapyDummy-69f65ccc49e4c61091554e2b9be7c01ff35186f2.zip
Add TX to RX feedback with timestamp support, only CF32
-rw-r--r--CircBufferTimestamp.cpp98
-rw-r--r--CircBufferTimestamp.hpp172
-rw-r--r--Dummy_Settings.cpp4
-rw-r--r--Dummy_Streaming.cpp83
-rw-r--r--SoapyDummy.hpp8
5 files changed, 324 insertions, 41 deletions
diff --git a/CircBufferTimestamp.cpp b/CircBufferTimestamp.cpp
new file mode 100644
index 0000000..ba66e68
--- /dev/null
+++ b/CircBufferTimestamp.cpp
@@ -0,0 +1,98 @@
+/*
+ * The MIT License (MIT)
+ *
+ * Copyright (c) 2018 Matthias P. Braendli
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+// Tests for CircBufferTimestamp.hpp
+
+#include "CircBufferTimestamp.hpp"
+#include <cstdint>
+#include <thread>
+#include <iostream>
+
+using namespace std;
+
+const long long ticks_per_sample = 12;
+const size_t N_ITER = 40;
+const size_t N_SAMPS_PER_BUF = 10;
+
+CircularBuffer<int> cb(1000000);
+
+mutex cerr_mutex;
+
+void writer()
+{
+ const long long ts_start = 200;
+
+ for (size_t iterations = 0; iterations < N_ITER; iterations++) {
+ vector<int> b;
+ for (size_t i = 0; i < N_SAMPS_PER_BUF; i++) {
+ b.push_back(N_SAMPS_PER_BUF*iterations + i);
+ }
+
+ const long long ts_b = ts_start + ticks_per_sample * iterations * b.size();
+
+ cb.write(b.data(), b.size(), ts_b);
+ {
+ unique_lock<mutex> lock(cerr_mutex);
+ cerr << "Wrote " << b.size() << " at t=" << ts_b << endl;
+ }
+
+ this_thread::sleep_for(chrono::milliseconds(10));
+ }
+}
+
+void reader()
+{
+ size_t totaldata = 0;
+ long long req_ts = 450;
+ while (totaldata < N_ITER * N_SAMPS_PER_BUF / 3) {
+ long long ts = req_ts;
+ vector<int> buf(N_SAMPS_PER_BUF/2);
+ const size_t read = cb.read(buf.data(), buf.size(), &ts);
+ buf.resize(read);
+ totaldata += read;
+
+ {
+ unique_lock<mutex> lock(cerr_mutex);
+ cerr << "Read " << totaldata << " so far, ts_req=" <<
+ req_ts << " ts=" << ts << " data:";
+ for (const int samp : buf) {
+ cerr << " " << samp;
+ }
+ cerr << endl;
+ }
+ req_ts += ticks_per_sample * (N_SAMPS_PER_BUF/2 + 1);
+ }
+}
+
+int main(int argc, char **argv)
+{
+ cb.set_ticks_per_sample(ticks_per_sample);
+
+ auto writeThread = thread(writer);
+ auto readThread = thread(reader);
+
+ writeThread.join();
+ readThread.join();
+ return 0;
+}
diff --git a/CircBufferTimestamp.hpp b/CircBufferTimestamp.hpp
new file mode 100644
index 0000000..b6e0751
--- /dev/null
+++ b/CircBufferTimestamp.hpp
@@ -0,0 +1,172 @@
+/*
+ * The MIT License (MIT)
+ *
+ * Copyright (c) 2018 Matthias P. Braendli
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#pragma once
+
+#include <vector>
+#include <mutex>
+#include <condition_variable>
+#include <cmath>
+
+template<class T>
+class CircularBuffer
+{
+ public:
+ CircularBuffer(size_t max_bufsize) :
+ m_maxbufsize(max_bufsize)
+ {
+ }
+
+ void set_ticks_per_sample(long long tps)
+ {
+ m_ticks_per_sample = tps;
+ }
+
+ /* Write num_elems from buf to the internal buffer, at the position
+ * given by timestamp
+ */
+ void write(const T *buf, size_t num_elems, long long timestamp)
+ {
+ std::unique_lock<std::mutex> lock(m_mutex);
+
+ if (m_timestamp_buf == 0 or timestamp < m_timestamp_buf) {
+ // First time we write to our buf, or jump back in time,
+ // we reset our buffer
+ resetbuf();
+ m_timestamp_buf = timestamp;
+ }
+
+ size_t sample_offset = (timestamp - m_timestamp_buf) /
+ m_ticks_per_sample;
+ size_t minsize = sample_offset + num_elems;
+
+ if (minsize > m_maxbufsize) {
+ truncatebuf();
+ if (minsize > m_maxbufsize) {
+ resetbuf();
+ m_timestamp_buf = timestamp;
+ sample_offset = 0;
+ minsize = num_elems;
+ }
+ }
+
+ if (m_buf.size() < minsize) {
+ m_buf.resize(minsize);
+ }
+ std::copy(buf, buf + num_elems, m_buf.begin() + sample_offset);
+
+ lock.unlock();
+ m_cv.notify_all();
+ }
+
+ /* Read num_elems elements from the internal buffer, starting
+ * from the end of the last read. Returns number of elements written */
+ size_t read(T *buf, size_t num_elems)
+ {
+ std::unique_lock<std::mutex> lock(m_mutex);
+
+ while (m_read_offset + num_elems > m_buf.size()) {
+ m_cv.wait(lock);
+ }
+
+ std::copy(m_buf.begin() + m_read_offset,
+ m_buf.begin() + m_read_offset + num_elems,
+ buf);
+
+ m_read_offset += num_elems;
+ return num_elems;
+ }
+
+ /* Read num_elems elements from the internal buffer at the
+ * position corresponding to the timestamp, which is also
+ * updated.
+ * Returns number of elements written.
+ */
+ size_t read(T *buf, size_t num_elems, long long *timestamp)
+ {
+ std::unique_lock<std::mutex> lock(m_mutex);
+
+ if (*timestamp < m_timestamp_buf) {
+ // Cannot give samples earlier than the timestamp
+ return 0;
+ }
+
+ const size_t sample_offset =
+ std::llrint(
+ (double)(*timestamp - m_timestamp_buf) /
+ (double)m_ticks_per_sample);
+ const size_t minsize = sample_offset + num_elems;
+
+ if (minsize > m_maxbufsize) {
+ // Too far in the future
+ return 0;
+ }
+
+ while (minsize > m_buf.size()) {
+ m_cv.wait(lock);
+ }
+
+ std::copy(m_buf.begin() + sample_offset,
+ m_buf.begin() + sample_offset + num_elems,
+ buf);
+
+ *timestamp = m_timestamp_buf + sample_offset * m_ticks_per_sample;
+ m_read_offset = sample_offset + num_elems;
+ return num_elems;
+ }
+
+ private:
+ void resetbuf()
+ {
+ m_buf.clear();
+ m_timestamp_buf = 0;
+ m_read_offset = 0;
+ }
+
+ void truncatebuf()
+ {
+ if (m_buf.size() < m_read_offset) {
+ throw std::logic_error("truncate " +
+ std::to_string(m_buf.size()) + " " +
+ std::to_string(m_read_offset));
+ }
+
+ for (size_t r = m_read_offset, w = 0;
+ r < m_buf.size();
+ r++, w++) {
+ m_buf[w] = m_buf[r];
+ }
+ m_read_offset = 0;
+ m_buf.resize(m_buf.size() - m_read_offset);
+ }
+
+ std::condition_variable m_cv;
+ std::mutex m_mutex;
+ std::vector<T> m_buf;
+ long long m_timestamp_buf = 0;
+ long long m_ticks_per_sample = 0;
+ size_t m_maxbufsize = 0;
+
+ size_t m_read_offset = 0;
+};
diff --git a/Dummy_Settings.cpp b/Dummy_Settings.cpp
index 85dc056..d449329 100644
--- a/Dummy_Settings.cpp
+++ b/Dummy_Settings.cpp
@@ -26,6 +26,7 @@
SoapyDummy::SoapyDummy( const SoapySDR::Kwargs &args )
+ : m_circ_buffer(1000000)
{
// On startup, initialise our epoch to current time, so that
// it appears the counter just started counting
@@ -272,6 +273,9 @@ SoapySDR::RangeList SoapyDummy::getFrequencyRange( const int direction, const si
void SoapyDummy::setSampleRate( const int direction, const size_t channel, const double rate )
{
m_samplerate = rate;
+
+ // ticks are in nanoseconds
+ m_circ_buffer.set_ticks_per_sample(std::lrint(1e9 / rate));
}
double SoapyDummy::getSampleRate( const int direction, const size_t channel ) const
diff --git a/Dummy_Streaming.cpp b/Dummy_Streaming.cpp
index 1ef1c72..a85986b 100644
--- a/Dummy_Streaming.cpp
+++ b/Dummy_Streaming.cpp
@@ -33,10 +33,7 @@ std::vector<std::string> SoapyDummy::getStreamFormats(const int direction, const
{
std::vector<std::string> formats;
- formats.push_back(SOAPY_SDR_CS8);
- formats.push_back(SOAPY_SDR_CS16);
formats.push_back(SOAPY_SDR_CF32);
- formats.push_back(SOAPY_SDR_CF64);
return formats;
}
@@ -69,40 +66,27 @@ SoapySDR::Stream* SoapyDummy::setupStream(
const std::vector<size_t> &channels,
const SoapySDR::Kwargs &args )
{
- if ( channels.size() > 1 or( channels.size() > 0 and channels.at( 0 ) != 0 ) ) {
+ if (channels.size() > 1 or (channels.size() > 0 and channels.at(0) != 0 )) {
throw std::runtime_error( "setupStream invalid channel selection" );
}
- if(direction==SOAPY_SDR_RX){
+ if (direction==SOAPY_SDR_RX) {
- if ( format == SOAPY_SDR_CS8 )
- {
- SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CS8." );
- }else if ( format == SOAPY_SDR_CS16 )
- {
- SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CS16." );
- }else if ( format == SOAPY_SDR_CF32 )
- {
+ if (format == SOAPY_SDR_CF32) {
SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CF32." );
- }else if(format==SOAPY_SDR_CF64){
- SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CF64." );
- }else throw std::runtime_error( "setupStream invalid format " + format );
+ }
+ else
+ throw std::runtime_error( "setupStream invalid format " + format );
return RX_STREAM;
- } else if(direction==SOAPY_SDR_TX){
-
- if ( format == SOAPY_SDR_CS8 )
- {
- SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CS8." );
- }else if ( format == SOAPY_SDR_CS16 )
- {
- SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CS16." );
- }else if ( format == SOAPY_SDR_CF32 )
- {
+ }
+ else if (direction==SOAPY_SDR_TX) {
+
+ if (format == SOAPY_SDR_CF32) {
SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CF32." );
- }else if(format==SOAPY_SDR_CF64){
- SoapySDR_log( SOAPY_SDR_DEBUG, "Using format CF64." );
- }else throw std::runtime_error( "setupStream invalid format " + format );
+ }
+ else
+ throw std::runtime_error( "setupStream invalid format " + format );
return TX_STREAM;
}
@@ -114,18 +98,21 @@ SoapySDR::Stream* SoapyDummy::setupStream(
void SoapyDummy::closeStream( SoapySDR::Stream *stream )
{
if (stream == RX_STREAM) {
- } else if (stream == TX_STREAM) {
+ }
+ else if (stream == TX_STREAM) {
}
}
size_t SoapyDummy::getStreamMTU( SoapySDR::Stream *stream ) const
{
- if(stream == RX_STREAM){
+ if (stream == RX_STREAM) {
return m_rxstream.mtu;
- } else if(stream == TX_STREAM){
+ }
+ else if (stream == TX_STREAM) {
return m_txstream.mtu;
- } else {
+ }
+ else {
throw std::runtime_error("Invalid stream");
}
}
@@ -138,9 +125,11 @@ int SoapyDummy::activateStream(
{
if (stream == RX_STREAM) {
SoapySDR_logf(SOAPY_SDR_DEBUG, "Start RX");
+ m_rxstream.active = true;
}
else if (stream == TX_STREAM) {
SoapySDR_logf(SOAPY_SDR_DEBUG, "Start TX");
+ m_txstream.active = true;
}
return 0;
@@ -152,10 +141,11 @@ int SoapyDummy::deactivateStream(
const int flags,
const long long timeNs )
{
-
if (stream == RX_STREAM) {
+ m_rxstream.active = false;
}
else if (stream == TX_STREAM) {
+ m_rxstream.active = false;
}
return 0;
}
@@ -173,8 +163,17 @@ int SoapyDummy::readStream(
return SOAPY_SDR_NOT_SUPPORTED;
}
- size_t returnedElems = std::min(numElems,this->getStreamMTU(stream));
- return returnedElems;
+ if (not m_rxstream.active) {
+ using namespace std;
+ this_thread::sleep_for(chrono::microseconds(timeoutUs));
+ return SOAPY_SDR_TIMEOUT;
+ }
+
+ std::complex<float> * const buf = static_cast<std::complex<float> * const>(buffs[0]);
+
+ size_t numToRead = std::min(numElems, this->getStreamMTU(stream));
+ const size_t numRead = m_circ_buffer.read(buf, numToRead);
+ return numRead;
}
int SoapyDummy::writeStream(
@@ -189,9 +188,17 @@ int SoapyDummy::writeStream(
return SOAPY_SDR_NOT_SUPPORTED;
}
- size_t returnedElems = std::min(numElems,this->getStreamMTU(stream));
+ if (not m_txstream.active) {
+ using namespace std;
+ this_thread::sleep_for(chrono::microseconds(timeoutUs));
+ return SOAPY_SDR_TIMEOUT;
+ }
+
+ const std::complex<float> * const buf = static_cast<const std::complex<float> * const>(buffs[0]);
- return returnedElems;
+ size_t numWritten = std::min(numElems, this->getStreamMTU(stream));
+ m_circ_buffer.write(buf, numWritten, timeNs);
+ return numWritten;
}
diff --git a/SoapyDummy.hpp b/SoapyDummy.hpp
index 7f08a89..1ae412e 100644
--- a/SoapyDummy.hpp
+++ b/SoapyDummy.hpp
@@ -29,6 +29,7 @@
#include <condition_variable>
#include <SoapySDR/Device.hpp>
#include <SoapySDR/Logger.hpp>
+#include "CircBufferTimestamp.hpp"
class SoapyDummySession
{
@@ -178,9 +179,8 @@ class SoapyDummy : public SoapySDR::Device
private:
struct DummyStream {
- size_t mtu = 1024;
- size_t buf_num = 1;
- std::complex<float> buf[1024];
+ const size_t mtu = 1024;
+ bool active = false;
};
DummyStream m_rxstream;
@@ -188,6 +188,8 @@ class SoapyDummy : public SoapySDR::Device
SoapySDR::Stream* const TX_STREAM = (SoapySDR::Stream*) 0x1;
SoapySDR::Stream* const RX_STREAM = (SoapySDR::Stream*) 0x2;
+ CircularBuffer<std::complex<float> > m_circ_buffer;
+
SoapyDummySession m_session;
double m_samplerate = 0;
double m_frequency = 0;