summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2016-05-20 15:05:02 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2016-05-20 15:05:02 +0200
commitd9ef93e7e8cc94da627f39f3c443b9a845ccecf4 (patch)
tree08f0ed87e8b88d70bacad136b62949378d40afd5
parent5d702c4fcfdfe56c0f33c10c76f35b1c36810096 (diff)
downloaddabmod-d9ef93e7e8cc94da627f39f3c443b9a845ccecf4.tar.gz
dabmod-d9ef93e7e8cc94da627f39f3c443b9a845ccecf4.tar.bz2
dabmod-d9ef93e7e8cc94da627f39f3c443b9a845ccecf4.zip
Refactor OutputUHD, replace double buffering by queue
-rw-r--r--src/OutputUHD.cpp172
-rw-r--r--src/OutputUHD.h33
2 files changed, 79 insertions, 126 deletions
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp
index fab1fd1..b9f800e 100644
--- a/src/OutputUHD.cpp
+++ b/src/OutputUHD.cpp
@@ -122,7 +122,6 @@ OutputUHD::OutputUHD(
// the buffers at object initialisation.
first_run(true),
gps_fix_verified(false),
- activebuffer(1),
myDelayBuf(0)
{
@@ -237,8 +236,6 @@ OutputUHD::OutputUHD(
uwd.myUsrp = myUsrp;
#endif
- uwd.frame0.ts.timestamp_valid = false;
- uwd.frame1.ts.timestamp_valid = false;
uwd.sampleRate = myConf.sampleRate;
uwd.sourceContainsTimestamp = false;
uwd.muteNoTimestamps = myConf.muteNoTimestamps;
@@ -265,10 +262,6 @@ OutputUHD::OutputUHD(
SetDelayBuffer(myConf.dabMode);
- auto b = std::make_shared<boost::barrier>(2);
- mySyncBarrier = b;
- uwd.sync_barrier = b;
-
MDEBUG("OutputUHD:UHD ready.\n");
}
@@ -277,10 +270,12 @@ OutputUHD::~OutputUHD()
{
MDEBUG("OutputUHD::~OutputUHD() @ %p\n", this);
worker.stop();
- if (!first_run) {
- free(uwd.frame0.buf);
- free(uwd.frame1.buf);
- }
+}
+
+
+void OutputUHD::setETIReader(EtiReader *etiReader)
+{
+ myEtiReader = etiReader;
}
int transmission_frame_duration_ms(unsigned int dabMode)
@@ -311,15 +306,8 @@ void OutputUHD::SetDelayBuffer(unsigned int dabMode)
int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
{
- struct frame_timestamp ts;
-
uwd.muting = myConf.muting;
-
- // On the first call, we must do some allocation and we must fill
- // the first buffer
- // We will only wait on the barrier on the subsequent calls to
- // OutputUHD::process
if (not gps_fix_verified) {
if (uwd.check_gpsfix) {
initial_gps_check();
@@ -336,45 +324,34 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
myConf.muting = false;
}
}
- else if (first_run) {
- etiLog.level(debug) << "OutputUHD: UHD initialising...";
-
- worker.start(&uwd);
-
- uwd.bufsize = dataIn->getLength();
- uwd.frame0.buf = malloc(uwd.bufsize);
- uwd.frame1.buf = malloc(uwd.bufsize);
-
- uwd.sourceContainsTimestamp = myConf.enableSync &&
- myEtiReader->sourceContainsTimestamp();
-
- // The worker begins by transmitting buf0
- memcpy(uwd.frame0.buf, dataIn->getData(), uwd.bufsize);
+ else {
+ if (first_run) {
+ etiLog.level(debug) << "OutputUHD: UHD initialising...";
+
+ uwd.sourceContainsTimestamp = myConf.enableSync &&
+ myEtiReader->sourceContainsTimestamp();
+
+ switch (myEtiReader->getMode()) {
+ case 1: uwd.fct_increment = 4; break;
+ case 2:
+ case 3: uwd.fct_increment = 1; break;
+ case 4: uwd.fct_increment = 2; break;
+ default: break;
+ }
- myEtiReader->calculateTimestamp(ts);
- uwd.frame0.ts = ts;
+ // we only set the delay buffer from the dab mode signaled in ETI if the
+ // dab mode was not set in contructor
+ if (myTFDurationMs == 0) {
+ SetDelayBuffer(myEtiReader->getMode());
+ }
- switch (myEtiReader->getMode()) {
- case 1: uwd.fct_increment = 4; break;
- case 2:
- case 3: uwd.fct_increment = 1; break;
- case 4: uwd.fct_increment = 2; break;
- default: break;
- }
+ worker.start(&uwd);
- // we only set the delay buffer from the dab mode signaled in ETI if the
- // dab mode was not set in contructor
- if (myTFDurationMs == 0) {
- SetDelayBuffer(myEtiReader->getMode());
+ lastLen = dataIn->getLength();
+ first_run = false;
+ etiLog.level(debug) << "OutputUHD: UHD initialising complete";
}
- activebuffer = 1;
-
- lastLen = uwd.bufsize;
- first_run = false;
- etiLog.level(debug) << "OutputUHD: UHD initialising complete";
- }
- else {
if (lastLen != dataIn->getLength()) {
// I expect that this never happens.
@@ -394,7 +371,31 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
}
}
- mySyncBarrier.get()->wait();
+ // Prepare the frame for the worker
+ UHDWorkerFrameData frame;
+ frame.buf.resize(dataIn->getLength());
+
+ // calculate delay and fill buffer
+ uint32_t noSampleDelay = (myConf.staticDelayUs * (myConf.sampleRate / 1000)) / 1000;
+ uint32_t noByteDelay = noSampleDelay * sizeof(complexf);
+
+ const uint8_t* pInData = (uint8_t*)dataIn->getData();
+
+ uint8_t *pTmp = &frame.buf[0];
+ if (noByteDelay) {
+ // copy remain from delaybuf
+ memcpy(pTmp, &myDelayBuf[0], noByteDelay);
+ // copy new data
+ memcpy(&pTmp[noByteDelay], pInData, dataIn->getLength() - noByteDelay);
+ // copy remaining data to delay buf
+ memcpy(&myDelayBuf[0], &pInData[dataIn->getLength() - noByteDelay], noByteDelay);
+ }
+ else {
+ std::copy(pInData, pInData + dataIn->getLength(),
+ frame.buf.begin());
+ }
+
+ myEtiReader->calculateTimestamp(frame.ts);
if (!uwd.running) {
worker.stop();
@@ -409,45 +410,20 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
}
}
- // write into the our buffer while
- // the worker sends the other.
-
- myEtiReader->calculateTimestamp(ts);
uwd.sourceContainsTimestamp = myConf.enableSync &&
myEtiReader->sourceContainsTimestamp();
- // calculate delay
- uint32_t noSampleDelay = (myConf.staticDelayUs * (myConf.sampleRate / 1000)) / 1000;
- uint32_t noByteDelay = noSampleDelay * sizeof(complexf);
-
- uint8_t* pInData = (uint8_t*) dataIn->getData();
- if (activebuffer == 0) {
- uint8_t *pTmp = (uint8_t*) uwd.frame0.buf;
- // copy remain from delaybuf
- memcpy(pTmp, &myDelayBuf[0], noByteDelay);
- // copy new data
- memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay);
- // copy remaining data to delay buf
- memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);
-
- uwd.frame0.ts = ts;
- }
- else if (activebuffer == 1) {
- uint8_t *pTmp = (uint8_t*) uwd.frame1.buf;
- // copy remain from delaybuf
- memcpy(pTmp, &myDelayBuf[0], noByteDelay);
- // copy new data
- memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay);
- // copy remaining data to delay buf
- memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);
+ while (true) {
+ if (uwd.frames.size() > FRAMES_MAX_SIZE) {
+ usleep(10000); // 10ms
+ }
- uwd.frame1.ts = ts;
+ uwd.frames.push(frame);
+ break;
}
-
- activebuffer = (activebuffer + 1) % 2;
}
- return uwd.bufsize;
+ return dataIn->getLength();
}
@@ -637,7 +613,6 @@ void UHDWorker::process_errhandler()
}
uwd->running = false;
- uwd->sync_barrier.get()->wait();
etiLog.level(warn) << "UHD worker terminated";
}
@@ -666,25 +641,10 @@ void UHDWorker::process()
md.has_time_spec = false;
md.time_spec = uhd::time_spec_t(0.0);
- /* Wait for barrier */
- // this wait will hopefully always be the second one
- // because modulation should be quicker than transmission
- uwd->sync_barrier.get()->wait();
-
- struct UHDWorkerFrameData* frame;
-
- if (workerbuffer == 0) {
- frame = &(uwd->frame0);
- }
- else if (workerbuffer == 1) {
- frame = &(uwd->frame1);
- }
- else {
- throw std::runtime_error(
- "UHDWorker.process: workerbuffer is neither 0 nor 1 !");
- }
+ struct UHDWorkerFrameData frame;
+ uwd->frames.wait_and_pop(frame);
- handle_frame(frame);
+ handle_frame(&frame);
// swap buffers
workerbuffer = (workerbuffer + 1) % 2;
@@ -820,8 +780,8 @@ void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame)
void UHDWorker::tx_frame(const struct UHDWorkerFrameData *frame)
{
const double tx_timeout = 20.0;
- const size_t sizeIn = uwd->bufsize / sizeof(complexf);
- const complexf* in_data = reinterpret_cast<const complexf*>(frame->buf);
+ const size_t sizeIn = frame->buf.size() / sizeof(complexf);
+ const complexf* in_data = reinterpret_cast<const complexf*>(&frame->buf[0]);
#if FAKE_UHD == 0
size_t usrp_max_num_samps = myTxStream->get_max_num_samps();
diff --git a/src/OutputUHD.h b/src/OutputUHD.h
index d55d38e..8cbfb3d 100644
--- a/src/OutputUHD.h
+++ b/src/OutputUHD.h
@@ -47,9 +47,8 @@ DESCRIPTION:
#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/safe_main.hpp>
#include <uhd/usrp/multi_usrp.hpp>
-#include <boost/thread/thread.hpp>
-#include <boost/thread/barrier.hpp>
-#include <list>
+#include <boost/thread.hpp>
+#include <deque>
#include <memory>
#include <string>
@@ -58,6 +57,7 @@ DESCRIPTION:
#include "EtiReader.h"
#include "TimestampDecoder.h"
#include "RemoteControl.h"
+#include "ThreadsafeQueue.h"
#include <stdio.h>
#include <sys/types.h>
@@ -73,13 +73,19 @@ DESCRIPTION:
// frames are too far in the future
#define TIMESTAMP_MARGIN_FUTURE 0.5
+// Maximum number of frames that can wait in uwd.frames
+#define FRAMES_MAX_SIZE 2
+
typedef std::complex<float> complexf;
+// Each frame contains one OFDM frame, and its
+// associated timestamp
struct UHDWorkerFrameData {
// Buffer holding frame data
- void* buf;
+ std::vector<uint8_t> buf;
- // Full timestamp
+ // A full timestamp contains a TIST according to standard
+ // and time information within MNSC with tx_second.
struct frame_timestamp ts;
};
@@ -102,20 +108,13 @@ struct UHDWorkerData {
#endif
unsigned sampleRate;
- // Double buffering between the two threads
- // Each buffer contains one OFDM frame, and it's
- // associated timestamp
- // A full timestamp contains a TIST according to standard
- // and time information within MNSC with tx_second.
bool sourceContainsTimestamp;
// When working with timestamps, mute the frames that
// do not have a timestamp
bool muteNoTimestamps;
- struct UHDWorkerFrameData frame0;
- struct UHDWorkerFrameData frame1;
- size_t bufsize; // in bytes
+ ThreadsafeQueue<UHDWorkerFrameData> frames;
// If we want to verify loss of refclk
bool check_refclk_loss;
@@ -128,9 +127,6 @@ struct UHDWorkerData {
// muting set by remote control
bool muting;
- // A barrier to synchronise the two threads
- std::shared_ptr<boost::barrier> sync_barrier;
-
// What to do when the reference clock PLL loses lock
refclk_lock_loss_behaviour_t refclk_lock_loss_behaviour;
@@ -234,9 +230,7 @@ class OutputUHD: public ModOutput, public RemoteControllable {
const char* name() { return "OutputUHD"; }
- void setETIReader(EtiReader *etiReader) {
- myEtiReader = etiReader;
- }
+ void setETIReader(EtiReader *etiReader);
/*********** REMOTE CONTROL ***************/
/* virtual void enrol_at(BaseRemoteController& controller)
@@ -264,7 +258,6 @@ class OutputUHD: public ModOutput, public RemoteControllable {
bool first_run;
bool gps_fix_verified;
struct UHDWorkerData uwd;
- int activebuffer;
private:
// Resize the internal delay buffer according to the dabMode and