diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2016-05-20 15:05:02 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2016-05-20 15:05:02 +0200 |
commit | d9ef93e7e8cc94da627f39f3c443b9a845ccecf4 (patch) | |
tree | 08f0ed87e8b88d70bacad136b62949378d40afd5 /src | |
parent | 5d702c4fcfdfe56c0f33c10c76f35b1c36810096 (diff) | |
download | dabmod-d9ef93e7e8cc94da627f39f3c443b9a845ccecf4.tar.gz dabmod-d9ef93e7e8cc94da627f39f3c443b9a845ccecf4.tar.bz2 dabmod-d9ef93e7e8cc94da627f39f3c443b9a845ccecf4.zip |
Refactor OutputUHD, replace double buffering by queue
Diffstat (limited to 'src')
-rw-r--r-- | src/OutputUHD.cpp | 172 | ||||
-rw-r--r-- | src/OutputUHD.h | 33 |
2 files changed, 79 insertions, 126 deletions
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index fab1fd1..b9f800e 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -122,7 +122,6 @@ OutputUHD::OutputUHD( // the buffers at object initialisation. first_run(true), gps_fix_verified(false), - activebuffer(1), myDelayBuf(0) { @@ -237,8 +236,6 @@ OutputUHD::OutputUHD( uwd.myUsrp = myUsrp; #endif - uwd.frame0.ts.timestamp_valid = false; - uwd.frame1.ts.timestamp_valid = false; uwd.sampleRate = myConf.sampleRate; uwd.sourceContainsTimestamp = false; uwd.muteNoTimestamps = myConf.muteNoTimestamps; @@ -265,10 +262,6 @@ OutputUHD::OutputUHD( SetDelayBuffer(myConf.dabMode); - auto b = std::make_shared<boost::barrier>(2); - mySyncBarrier = b; - uwd.sync_barrier = b; - MDEBUG("OutputUHD:UHD ready.\n"); } @@ -277,10 +270,12 @@ OutputUHD::~OutputUHD() { MDEBUG("OutputUHD::~OutputUHD() @ %p\n", this); worker.stop(); - if (!first_run) { - free(uwd.frame0.buf); - free(uwd.frame1.buf); - } +} + + +void OutputUHD::setETIReader(EtiReader *etiReader) +{ + myEtiReader = etiReader; } int transmission_frame_duration_ms(unsigned int dabMode) @@ -311,15 +306,8 @@ void OutputUHD::SetDelayBuffer(unsigned int dabMode) int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) { - struct frame_timestamp ts; - uwd.muting = myConf.muting; - - // On the first call, we must do some allocation and we must fill - // the first buffer - // We will only wait on the barrier on the subsequent calls to - // OutputUHD::process if (not gps_fix_verified) { if (uwd.check_gpsfix) { initial_gps_check(); @@ -336,45 +324,34 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) myConf.muting = false; } } - else if (first_run) { - etiLog.level(debug) << "OutputUHD: UHD initialising..."; - - worker.start(&uwd); - - uwd.bufsize = dataIn->getLength(); - uwd.frame0.buf = malloc(uwd.bufsize); - uwd.frame1.buf = malloc(uwd.bufsize); - - uwd.sourceContainsTimestamp = myConf.enableSync && - myEtiReader->sourceContainsTimestamp(); - - // The worker begins by transmitting buf0 - memcpy(uwd.frame0.buf, dataIn->getData(), uwd.bufsize); + else { + if (first_run) { + etiLog.level(debug) << "OutputUHD: UHD initialising..."; + + uwd.sourceContainsTimestamp = myConf.enableSync && + myEtiReader->sourceContainsTimestamp(); + + switch (myEtiReader->getMode()) { + case 1: uwd.fct_increment = 4; break; + case 2: + case 3: uwd.fct_increment = 1; break; + case 4: uwd.fct_increment = 2; break; + default: break; + } - myEtiReader->calculateTimestamp(ts); - uwd.frame0.ts = ts; + // we only set the delay buffer from the dab mode signaled in ETI if the + // dab mode was not set in contructor + if (myTFDurationMs == 0) { + SetDelayBuffer(myEtiReader->getMode()); + } - switch (myEtiReader->getMode()) { - case 1: uwd.fct_increment = 4; break; - case 2: - case 3: uwd.fct_increment = 1; break; - case 4: uwd.fct_increment = 2; break; - default: break; - } + worker.start(&uwd); - // we only set the delay buffer from the dab mode signaled in ETI if the - // dab mode was not set in contructor - if (myTFDurationMs == 0) { - SetDelayBuffer(myEtiReader->getMode()); + lastLen = dataIn->getLength(); + first_run = false; + etiLog.level(debug) << "OutputUHD: UHD initialising complete"; } - activebuffer = 1; - - lastLen = uwd.bufsize; - first_run = false; - etiLog.level(debug) << "OutputUHD: UHD initialising complete"; - } - else { if (lastLen != dataIn->getLength()) { // I expect that this never happens. @@ -394,7 +371,31 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) } } - mySyncBarrier.get()->wait(); + // Prepare the frame for the worker + UHDWorkerFrameData frame; + frame.buf.resize(dataIn->getLength()); + + // calculate delay and fill buffer + uint32_t noSampleDelay = (myConf.staticDelayUs * (myConf.sampleRate / 1000)) / 1000; + uint32_t noByteDelay = noSampleDelay * sizeof(complexf); + + const uint8_t* pInData = (uint8_t*)dataIn->getData(); + + uint8_t *pTmp = &frame.buf[0]; + if (noByteDelay) { + // copy remain from delaybuf + memcpy(pTmp, &myDelayBuf[0], noByteDelay); + // copy new data + memcpy(&pTmp[noByteDelay], pInData, dataIn->getLength() - noByteDelay); + // copy remaining data to delay buf + memcpy(&myDelayBuf[0], &pInData[dataIn->getLength() - noByteDelay], noByteDelay); + } + else { + std::copy(pInData, pInData + dataIn->getLength(), + frame.buf.begin()); + } + + myEtiReader->calculateTimestamp(frame.ts); if (!uwd.running) { worker.stop(); @@ -409,45 +410,20 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) } } - // write into the our buffer while - // the worker sends the other. - - myEtiReader->calculateTimestamp(ts); uwd.sourceContainsTimestamp = myConf.enableSync && myEtiReader->sourceContainsTimestamp(); - // calculate delay - uint32_t noSampleDelay = (myConf.staticDelayUs * (myConf.sampleRate / 1000)) / 1000; - uint32_t noByteDelay = noSampleDelay * sizeof(complexf); - - uint8_t* pInData = (uint8_t*) dataIn->getData(); - if (activebuffer == 0) { - uint8_t *pTmp = (uint8_t*) uwd.frame0.buf; - // copy remain from delaybuf - memcpy(pTmp, &myDelayBuf[0], noByteDelay); - // copy new data - memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay); - // copy remaining data to delay buf - memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); - - uwd.frame0.ts = ts; - } - else if (activebuffer == 1) { - uint8_t *pTmp = (uint8_t*) uwd.frame1.buf; - // copy remain from delaybuf - memcpy(pTmp, &myDelayBuf[0], noByteDelay); - // copy new data - memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay); - // copy remaining data to delay buf - memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); + while (true) { + if (uwd.frames.size() > FRAMES_MAX_SIZE) { + usleep(10000); // 10ms + } - uwd.frame1.ts = ts; + uwd.frames.push(frame); + break; } - - activebuffer = (activebuffer + 1) % 2; } - return uwd.bufsize; + return dataIn->getLength(); } @@ -637,7 +613,6 @@ void UHDWorker::process_errhandler() } uwd->running = false; - uwd->sync_barrier.get()->wait(); etiLog.level(warn) << "UHD worker terminated"; } @@ -666,25 +641,10 @@ void UHDWorker::process() md.has_time_spec = false; md.time_spec = uhd::time_spec_t(0.0); - /* Wait for barrier */ - // this wait will hopefully always be the second one - // because modulation should be quicker than transmission - uwd->sync_barrier.get()->wait(); - - struct UHDWorkerFrameData* frame; - - if (workerbuffer == 0) { - frame = &(uwd->frame0); - } - else if (workerbuffer == 1) { - frame = &(uwd->frame1); - } - else { - throw std::runtime_error( - "UHDWorker.process: workerbuffer is neither 0 nor 1 !"); - } + struct UHDWorkerFrameData frame; + uwd->frames.wait_and_pop(frame); - handle_frame(frame); + handle_frame(&frame); // swap buffers workerbuffer = (workerbuffer + 1) % 2; @@ -820,8 +780,8 @@ void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame) void UHDWorker::tx_frame(const struct UHDWorkerFrameData *frame) { const double tx_timeout = 20.0; - const size_t sizeIn = uwd->bufsize / sizeof(complexf); - const complexf* in_data = reinterpret_cast<const complexf*>(frame->buf); + const size_t sizeIn = frame->buf.size() / sizeof(complexf); + const complexf* in_data = reinterpret_cast<const complexf*>(&frame->buf[0]); #if FAKE_UHD == 0 size_t usrp_max_num_samps = myTxStream->get_max_num_samps(); diff --git a/src/OutputUHD.h b/src/OutputUHD.h index d55d38e..8cbfb3d 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -47,9 +47,8 @@ DESCRIPTION: #include <uhd/utils/thread_priority.hpp> #include <uhd/utils/safe_main.hpp> #include <uhd/usrp/multi_usrp.hpp> -#include <boost/thread/thread.hpp> -#include <boost/thread/barrier.hpp> -#include <list> +#include <boost/thread.hpp> +#include <deque> #include <memory> #include <string> @@ -58,6 +57,7 @@ DESCRIPTION: #include "EtiReader.h" #include "TimestampDecoder.h" #include "RemoteControl.h" +#include "ThreadsafeQueue.h" #include <stdio.h> #include <sys/types.h> @@ -73,13 +73,19 @@ DESCRIPTION: // frames are too far in the future #define TIMESTAMP_MARGIN_FUTURE 0.5 +// Maximum number of frames that can wait in uwd.frames +#define FRAMES_MAX_SIZE 2 + typedef std::complex<float> complexf; +// Each frame contains one OFDM frame, and its +// associated timestamp struct UHDWorkerFrameData { // Buffer holding frame data - void* buf; + std::vector<uint8_t> buf; - // Full timestamp + // A full timestamp contains a TIST according to standard + // and time information within MNSC with tx_second. struct frame_timestamp ts; }; @@ -102,20 +108,13 @@ struct UHDWorkerData { #endif unsigned sampleRate; - // Double buffering between the two threads - // Each buffer contains one OFDM frame, and it's - // associated timestamp - // A full timestamp contains a TIST according to standard - // and time information within MNSC with tx_second. bool sourceContainsTimestamp; // When working with timestamps, mute the frames that // do not have a timestamp bool muteNoTimestamps; - struct UHDWorkerFrameData frame0; - struct UHDWorkerFrameData frame1; - size_t bufsize; // in bytes + ThreadsafeQueue<UHDWorkerFrameData> frames; // If we want to verify loss of refclk bool check_refclk_loss; @@ -128,9 +127,6 @@ struct UHDWorkerData { // muting set by remote control bool muting; - // A barrier to synchronise the two threads - std::shared_ptr<boost::barrier> sync_barrier; - // What to do when the reference clock PLL loses lock refclk_lock_loss_behaviour_t refclk_lock_loss_behaviour; @@ -234,9 +230,7 @@ class OutputUHD: public ModOutput, public RemoteControllable { const char* name() { return "OutputUHD"; } - void setETIReader(EtiReader *etiReader) { - myEtiReader = etiReader; - } + void setETIReader(EtiReader *etiReader); /*********** REMOTE CONTROL ***************/ /* virtual void enrol_at(BaseRemoteController& controller) @@ -264,7 +258,6 @@ class OutputUHD: public ModOutput, public RemoteControllable { bool first_run; bool gps_fix_verified; struct UHDWorkerData uwd; - int activebuffer; private: // Resize the internal delay buffer according to the dabMode and |