diff options
Diffstat (limited to 'src/OutputUHD.cpp')
-rw-r--r-- | src/OutputUHD.cpp | 137 |
1 files changed, 6 insertions, 131 deletions
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index e3d2d77..4776965 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -45,9 +45,7 @@ typedef std::complex<float> complexf; OutputUHD::OutputUHD( OutputUHDConfig& config, - Logger& logger, - zmq::context_t *pContext, - const std::string &zmqCtrlEndpoint) : + Logger& logger) : ModOutput(ModFormat(1), ModFormat(0)), RemoteControllable("uhd"), myLogger(logger), @@ -56,7 +54,7 @@ OutputUHD::OutputUHD( // the buffers at object initialisation. first_run(true), activebuffer(1), - m_delayBuf(196608) + myDelayBuf(196608) { myMuting = 0; // is remote-controllable @@ -227,14 +225,6 @@ OutputUHD::OutputUHD( worker.start(&uwd); - m_pZmqRepThread = NULL; - if (!zmqCtrlEndpoint.empty()) - { - m_pContext = pContext; - m_zmqCtrlEndpoint = zmqCtrlEndpoint; - m_pZmqRepThread = new boost::thread(boost::bind(&OutputUHD::ZmqCtrl, this)); - } - MDEBUG("OutputUHD:UHD ready.\n"); } @@ -242,12 +232,6 @@ OutputUHD::OutputUHD( OutputUHD::~OutputUHD() { MDEBUG("OutputUHD::~OutputUHD() @ %p\n", this); - if (m_pZmqRepThread != NULL) - { - m_pZmqRepThread->interrupt(); - m_pZmqRepThread->join(); - } - worker.stop(); if (!first_run) { free(uwd.frame0.buf); @@ -315,13 +299,11 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) if (activebuffer == 0) { uint8_t *pTmp = (uint8_t*) uwd.frame0.buf; // copy remain from delaybuf - memcpy(pTmp, &m_delayBuf[0], noByteDelay); + memcpy(pTmp, &myDelayBuf[0], noByteDelay); // copy new data memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay); // copy remaining data to delay buf - memcpy(&m_delayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); - - //memcpy(uwd.frame0.buf, dataIn->getData(), uwd.bufsize); + memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); uwd.frame0.ts = ts; uwd.frame0.fct = myEtiReader->getFCT(); @@ -329,13 +311,11 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) else if (activebuffer == 1) { uint8_t *pTmp = (uint8_t*) uwd.frame1.buf; // copy remain from delaybuf - memcpy(pTmp, &m_delayBuf[0], noByteDelay); + memcpy(pTmp, &myDelayBuf[0], noByteDelay); // copy new data memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay); // copy remaining data to delay buf - memcpy(&m_delayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); - - //memcpy(uwd.frame1.buf, dataIn->getData(), uwd.bufsize); + memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); uwd.frame1.ts = ts; uwd.frame1.fct = myEtiReader->getFCT(); @@ -683,108 +663,3 @@ const string OutputUHD::get_parameter(const string& parameter) const return ss.str(); } -void OutputUHD::RecvAll(zmq::socket_t* pSocket, std::vector<std::string> &message) -{ - int more = -1; - size_t more_size = sizeof(more); - - while (more != 0) - { - zmq::message_t msg; - pSocket->recv(&msg); - message.push_back(std::string((char*)msg.data(), msg.size())); - pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size); - } -} - -void OutputUHD::SendOkReply(zmq::socket_t *pSocket) -{ - zmq::message_t msg(2); - char repCode[2] = {'o', 'k'}; - memcpy ((void*) msg.data(), repCode, 2); - pSocket->send(msg, 0); -} - -void OutputUHD::SendFailReply(zmq::socket_t *pSocket, const std::string &error) -{ - zmq::message_t msg1(4); - char repCode[4] = {'f', 'a', 'i', 'l'}; - memcpy ((void*) msg1.data(), repCode, 4); - pSocket->send(msg1, ZMQ_SNDMORE); - - zmq::message_t msg2(error.length()); - memcpy ((void*) msg2.data(), error.c_str(), error.length()); - pSocket->send(msg2, 0); -} - -//TODO: Should be implemented as an alternative to RemoteControllerTelnet and -//moved to the RemoteControl.h/cpp file instead. -void OutputUHD::ZmqCtrl() -{ - // create zmq reply socket for receiving ctrl parameters - zmq::socket_t repSocket(*m_pContext, ZMQ_REP); - std::cout << "Starting output UHD control thread" << std::endl; - try - { - // connect the socket - int hwm = 100; - int linger = 0; - repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); - repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); - repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); - repSocket.bind(m_zmqCtrlEndpoint.c_str()); - - // create pollitem that polls the ZMQ sockets - zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; - for(;;) - { - zmq::poll(pollItems, 1, 100); - std::vector<std::string> msg; - if (pollItems[0].revents & ZMQ_POLLIN) - { - RecvAll(&repSocket, msg); - std::string module((char*)msg[0].data(), msg[0].size()); - if (module == "uhd") - { - if (msg.size() < 2) - { - SendFailReply(&repSocket, "Wrong request format"); - continue; - } - - std::string param((char*) msg[1].data(), msg[1].size()); - if (msg.size() == 2 && param == "ping") - { - SendOkReply(&repSocket); - } - else if (msg.size() != 3) - { - SendFailReply(&repSocket, "Wrong request format"); - } - else - { - std::string value((char*) msg[2].data(), msg[2].size()); - try - { - set_parameter(param, value); - SendOkReply(&repSocket); - } - catch (ParameterError &err) - { - SendFailReply(&repSocket, err.what()); - } - } - } - } - - // check if thread is interrupted - boost::this_thread::interruption_point(); - } - } - catch (boost::thread_interrupted&) {} - catch (zmq::error_t &e) - { - std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl; - } - repSocket.close(); -} |