From d9ef93e7e8cc94da627f39f3c443b9a845ccecf4 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 20 May 2016 15:05:02 +0200 Subject: Refactor OutputUHD, replace double buffering by queue --- src/OutputUHD.cpp | 172 +++++++++++++++++++++--------------------------------- 1 file changed, 66 insertions(+), 106 deletions(-) (limited to 'src/OutputUHD.cpp') diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index fab1fd1..b9f800e 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -122,7 +122,6 @@ OutputUHD::OutputUHD( // the buffers at object initialisation. first_run(true), gps_fix_verified(false), - activebuffer(1), myDelayBuf(0) { @@ -237,8 +236,6 @@ OutputUHD::OutputUHD( uwd.myUsrp = myUsrp; #endif - uwd.frame0.ts.timestamp_valid = false; - uwd.frame1.ts.timestamp_valid = false; uwd.sampleRate = myConf.sampleRate; uwd.sourceContainsTimestamp = false; uwd.muteNoTimestamps = myConf.muteNoTimestamps; @@ -265,10 +262,6 @@ OutputUHD::OutputUHD( SetDelayBuffer(myConf.dabMode); - auto b = std::make_shared(2); - mySyncBarrier = b; - uwd.sync_barrier = b; - MDEBUG("OutputUHD:UHD ready.\n"); } @@ -277,10 +270,12 @@ OutputUHD::~OutputUHD() { MDEBUG("OutputUHD::~OutputUHD() @ %p\n", this); worker.stop(); - if (!first_run) { - free(uwd.frame0.buf); - free(uwd.frame1.buf); - } +} + + +void OutputUHD::setETIReader(EtiReader *etiReader) +{ + myEtiReader = etiReader; } int transmission_frame_duration_ms(unsigned int dabMode) @@ -311,15 +306,8 @@ void OutputUHD::SetDelayBuffer(unsigned int dabMode) int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) { - struct frame_timestamp ts; - uwd.muting = myConf.muting; - - // On the first call, we must do some allocation and we must fill - // the first buffer - // We will only wait on the barrier on the subsequent calls to - // OutputUHD::process if (not gps_fix_verified) { if (uwd.check_gpsfix) { initial_gps_check(); @@ -336,45 +324,34 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) myConf.muting = false; } } - else if (first_run) { - etiLog.level(debug) << "OutputUHD: UHD initialising..."; - - worker.start(&uwd); - - uwd.bufsize = dataIn->getLength(); - uwd.frame0.buf = malloc(uwd.bufsize); - uwd.frame1.buf = malloc(uwd.bufsize); - - uwd.sourceContainsTimestamp = myConf.enableSync && - myEtiReader->sourceContainsTimestamp(); - - // The worker begins by transmitting buf0 - memcpy(uwd.frame0.buf, dataIn->getData(), uwd.bufsize); + else { + if (first_run) { + etiLog.level(debug) << "OutputUHD: UHD initialising..."; + + uwd.sourceContainsTimestamp = myConf.enableSync && + myEtiReader->sourceContainsTimestamp(); + + switch (myEtiReader->getMode()) { + case 1: uwd.fct_increment = 4; break; + case 2: + case 3: uwd.fct_increment = 1; break; + case 4: uwd.fct_increment = 2; break; + default: break; + } - myEtiReader->calculateTimestamp(ts); - uwd.frame0.ts = ts; + // we only set the delay buffer from the dab mode signaled in ETI if the + // dab mode was not set in contructor + if (myTFDurationMs == 0) { + SetDelayBuffer(myEtiReader->getMode()); + } - switch (myEtiReader->getMode()) { - case 1: uwd.fct_increment = 4; break; - case 2: - case 3: uwd.fct_increment = 1; break; - case 4: uwd.fct_increment = 2; break; - default: break; - } + worker.start(&uwd); - // we only set the delay buffer from the dab mode signaled in ETI if the - // dab mode was not set in contructor - if (myTFDurationMs == 0) { - SetDelayBuffer(myEtiReader->getMode()); + lastLen = dataIn->getLength(); + first_run = false; + etiLog.level(debug) << "OutputUHD: UHD initialising complete"; } - activebuffer = 1; - - lastLen = uwd.bufsize; - first_run = false; - etiLog.level(debug) << "OutputUHD: UHD initialising complete"; - } - else { if (lastLen != dataIn->getLength()) { // I expect that this never happens. @@ -394,7 +371,31 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) } } - mySyncBarrier.get()->wait(); + // Prepare the frame for the worker + UHDWorkerFrameData frame; + frame.buf.resize(dataIn->getLength()); + + // calculate delay and fill buffer + uint32_t noSampleDelay = (myConf.staticDelayUs * (myConf.sampleRate / 1000)) / 1000; + uint32_t noByteDelay = noSampleDelay * sizeof(complexf); + + const uint8_t* pInData = (uint8_t*)dataIn->getData(); + + uint8_t *pTmp = &frame.buf[0]; + if (noByteDelay) { + // copy remain from delaybuf + memcpy(pTmp, &myDelayBuf[0], noByteDelay); + // copy new data + memcpy(&pTmp[noByteDelay], pInData, dataIn->getLength() - noByteDelay); + // copy remaining data to delay buf + memcpy(&myDelayBuf[0], &pInData[dataIn->getLength() - noByteDelay], noByteDelay); + } + else { + std::copy(pInData, pInData + dataIn->getLength(), + frame.buf.begin()); + } + + myEtiReader->calculateTimestamp(frame.ts); if (!uwd.running) { worker.stop(); @@ -409,45 +410,20 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) } } - // write into the our buffer while - // the worker sends the other. - - myEtiReader->calculateTimestamp(ts); uwd.sourceContainsTimestamp = myConf.enableSync && myEtiReader->sourceContainsTimestamp(); - // calculate delay - uint32_t noSampleDelay = (myConf.staticDelayUs * (myConf.sampleRate / 1000)) / 1000; - uint32_t noByteDelay = noSampleDelay * sizeof(complexf); - - uint8_t* pInData = (uint8_t*) dataIn->getData(); - if (activebuffer == 0) { - uint8_t *pTmp = (uint8_t*) uwd.frame0.buf; - // copy remain from delaybuf - memcpy(pTmp, &myDelayBuf[0], noByteDelay); - // copy new data - memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay); - // copy remaining data to delay buf - memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); - - uwd.frame0.ts = ts; - } - else if (activebuffer == 1) { - uint8_t *pTmp = (uint8_t*) uwd.frame1.buf; - // copy remain from delaybuf - memcpy(pTmp, &myDelayBuf[0], noByteDelay); - // copy new data - memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay); - // copy remaining data to delay buf - memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); + while (true) { + if (uwd.frames.size() > FRAMES_MAX_SIZE) { + usleep(10000); // 10ms + } - uwd.frame1.ts = ts; + uwd.frames.push(frame); + break; } - - activebuffer = (activebuffer + 1) % 2; } - return uwd.bufsize; + return dataIn->getLength(); } @@ -637,7 +613,6 @@ void UHDWorker::process_errhandler() } uwd->running = false; - uwd->sync_barrier.get()->wait(); etiLog.level(warn) << "UHD worker terminated"; } @@ -666,25 +641,10 @@ void UHDWorker::process() md.has_time_spec = false; md.time_spec = uhd::time_spec_t(0.0); - /* Wait for barrier */ - // this wait will hopefully always be the second one - // because modulation should be quicker than transmission - uwd->sync_barrier.get()->wait(); - - struct UHDWorkerFrameData* frame; - - if (workerbuffer == 0) { - frame = &(uwd->frame0); - } - else if (workerbuffer == 1) { - frame = &(uwd->frame1); - } - else { - throw std::runtime_error( - "UHDWorker.process: workerbuffer is neither 0 nor 1 !"); - } + struct UHDWorkerFrameData frame; + uwd->frames.wait_and_pop(frame); - handle_frame(frame); + handle_frame(&frame); // swap buffers workerbuffer = (workerbuffer + 1) % 2; @@ -820,8 +780,8 @@ void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame) void UHDWorker::tx_frame(const struct UHDWorkerFrameData *frame) { const double tx_timeout = 20.0; - const size_t sizeIn = uwd->bufsize / sizeof(complexf); - const complexf* in_data = reinterpret_cast(frame->buf); + const size_t sizeIn = frame->buf.size() / sizeof(complexf); + const complexf* in_data = reinterpret_cast(&frame->buf[0]); #if FAKE_UHD == 0 size_t usrp_max_num_samps = myTxStream->get_max_num_samps(); -- cgit v1.2.3