From f4e359f774eef5ec2a006a431a546e915b27f02b Mon Sep 17 00:00:00 2001 From: Jörgen Scott Date: Tue, 16 Dec 2014 09:23:31 +0100 Subject: Added static delay via telnet control --- src/OutputUHD.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src/OutputUHD.h') diff --git a/src/OutputUHD.h b/src/OutputUHD.h index 69e5b20..f50807d 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -48,6 +48,7 @@ DESCRIPTION: #include #include #include +#include #include "Log.h" #include "ModOutput.h" @@ -220,7 +221,9 @@ class OutputUHD: public ModOutput, public RemoteControllable { // muting can only be changed using the remote control bool myMuting; - + int myStaticDelay; + std::vector m_delayBuf; + size_t lastLen; }; -- cgit v1.2.3 From 7e3e3f290e9fbbd314919474ed7bc61c3ce43041 Mon Sep 17 00:00:00 2001 From: Jörgen Scott Date: Tue, 16 Dec 2014 14:49:57 +0100 Subject: added zmq controller to uhd --- src/.DabMod.cpp.un~ | Bin 30089 -> 0 bytes src/.OutputUHD.cpp.un~ | Bin 67312 -> 0 bytes src/.OutputUHD.h.un~ | Bin 13690 -> 0 bytes src/DabMod.cpp | 10 +++-- src/OutputUHD.cpp | 113 ++++++++++++++++++++++++++++++++++++++++++++++++- src/OutputUHD.h | 20 ++++++++- 6 files changed, 137 insertions(+), 6 deletions(-) delete mode 100644 src/.DabMod.cpp.un~ delete mode 100644 src/.OutputUHD.cpp.un~ delete mode 100644 src/.OutputUHD.h.un~ (limited to 'src/OutputUHD.h') diff --git a/src/.DabMod.cpp.un~ b/src/.DabMod.cpp.un~ deleted file mode 100644 index ebbb822..0000000 Binary files a/src/.DabMod.cpp.un~ and /dev/null differ diff --git a/src/.OutputUHD.cpp.un~ b/src/.OutputUHD.cpp.un~ deleted file mode 100644 index 96f080d..0000000 Binary files a/src/.OutputUHD.cpp.un~ and /dev/null differ diff --git a/src/.OutputUHD.h.un~ b/src/.OutputUHD.h.un~ deleted file mode 100644 index 5f3e54a..0000000 Binary files a/src/.OutputUHD.h.un~ and /dev/null differ diff --git a/src/DabMod.cpp b/src/DabMod.cpp index ee21ed4..4342522 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -55,6 +55,7 @@ #include #include #include +#include #ifdef HAVE_NETINET_IN_H # include @@ -189,6 +190,9 @@ int main(int argc, char* argv[]) OutputUHDConfig outputuhd_conf; #endif + zmq::context_t zmqCtrlContext(1); + std::string zmqCtrlEndpoint = ""; + // To handle the timestamp offset of the modulator struct modulator_offset_config modconf; modconf.use_offset_file = false; @@ -363,8 +367,8 @@ int main(int argc, char* argv[]) } } - //std::string zmqCtrlEndpoint = pt.get("remotecontrol.zmqctrlendpoint", ""); - //std::cout << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl; + zmqCtrlEndpoint = pt.get("remotecontrol.zmqctrlendpoint", ""); + std::cout << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl; // input params: if (pt.get("input.loop", 0) == 1) { @@ -701,7 +705,7 @@ int main(int argc, char* argv[]) outputuhd_conf.sampleRate = outputRate; try { - output = new OutputUHD(outputuhd_conf, logger); + output = new OutputUHD(outputuhd_conf, logger, &zmqCtrlContext, zmqCtrlEndpoint); ((OutputUHD*)output)->enrol_at(*rc); } catch (std::exception& e) { diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 8713042..6a4ccf4 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -45,7 +45,9 @@ typedef std::complex complexf; OutputUHD::OutputUHD( OutputUHDConfig& config, - Logger& logger) : + Logger& logger, + zmq::context_t *pContext, + const std::string &zmqCtrlEndpoint) : ModOutput(ModFormat(1), ModFormat(0)), RemoteControllable("uhd"), myLogger(logger), @@ -225,6 +227,14 @@ OutputUHD::OutputUHD( worker.start(&uwd); + m_pZmqRepThread = NULL; + if (!zmqCtrlEndpoint.empty()) + { + m_pContext = pContext; + m_zmqCtrlEndpoint = zmqCtrlEndpoint; + m_pZmqRepThread = new boost::thread(boost::bind(&OutputUHD::ZmqCtrl, this)); + } + MDEBUG("OutputUHD:UHD ready.\n"); } @@ -232,6 +242,12 @@ OutputUHD::OutputUHD( OutputUHD::~OutputUHD() { MDEBUG("OutputUHD::~OutputUHD() @ %p\n", this); + if (m_pZmqRepThread != NULL) + { + m_pZmqRepThread->interrupt(); + m_pZmqRepThread->join(); + } + worker.stop(); if (!first_run) { free(uwd.frame0.buf); @@ -667,3 +683,98 @@ const string OutputUHD::get_parameter(const string& parameter) const return ss.str(); } +void OutputUHD::RecvAll(zmq::socket_t* pSocket, std::vector &message) +{ + int more = -1; + size_t more_size = sizeof(more); + + while (more != 0) + { + zmq::message_t msg; + pSocket->recv(&msg); + message.push_back(std::string((char*)msg.data(), msg.size())); + pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size); + } +} + +void OutputUHD::SendOkReply(zmq::socket_t *pSocket) +{ + zmq::message_t msg(2); + char repCode[2] = {'o', 'k'}; + memcpy ((void*) msg.data(), repCode, 2); + pSocket->send(msg, 0); +} + +void OutputUHD::SendFailReply(zmq::socket_t *pSocket, const std::string &error) +{ + zmq::message_t msg1(4); + char repCode[4] = {'f', 'a', 'i', 'l'}; + memcpy ((void*) msg1.data(), repCode, 4); + pSocket->send(msg1, ZMQ_SNDMORE); + + zmq::message_t msg2(error.length()); + memcpy ((void*) msg2.data(), error.c_str(), error.length()); + pSocket->send(msg2, 0); +} + +//TODO: Should be implemented as an alternative to RemoteControllerTelnet and +//moved to the RemoteControl.h/cpp file instead. +void OutputUHD::ZmqCtrl() +{ + // create zmq reply socket for receiving ctrl parameters + zmq::socket_t repSocket(*m_pContext, ZMQ_REP); + std::cout << "Starting output UHD control thread" << std::endl; + try + { + // connect the socket + int hwm = 5; + int linger = 0; + repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); + repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); + repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + repSocket.connect(m_zmqCtrlEndpoint.c_str()); + + // create pollitem that polls the ZMQ sockets + zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; + for(;;) + { + zmq::poll(pollItems, 1, 100); + std::vector msg; + if (pollItems[0].revents & ZMQ_POLLIN) + { + RecvAll(&repSocket, msg); + std::string module((char*)msg[0].data(), msg[0].size()); + if (module == "uhd") + { + if (msg.size() != 3) + { + SendFailReply(&repSocket, "Wrong request format"); + continue; + } + + std::string param((char*) msg[1].data(), msg[1].size()); + std::string value((char*) msg[2].data(), msg[2].size()); + try + { + set_parameter(param, value); + } + catch (ParameterError &err) + { + SendFailReply(&repSocket, err.what()); + continue; + } + SendOkReply(&repSocket); + } + } + + // check if thread is interrupted + boost::this_thread::interruption_point(); + } + } + catch (boost::thread_interrupted&) {} + catch (zmq::error_t &e) + { + std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl; + } + repSocket.close(); +} diff --git a/src/OutputUHD.h b/src/OutputUHD.h index f50807d..25f7476 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -181,9 +181,12 @@ struct OutputUHDConfig { class OutputUHD: public ModOutput, public RemoteControllable { public: + OutputUHD( OutputUHDConfig& config, - Logger& logger); + Logger& logger, + zmq::context_t *pContext, + const std::string &zmqCtrlEndpoint); ~OutputUHD(); int process(Buffer* dataIn, Buffer* dataOut); @@ -221,9 +224,22 @@ class OutputUHD: public ModOutput, public RemoteControllable { // muting can only be changed using the remote control bool myMuting; + + private: + // zmq receiving method + //TODO: Should be implemented as an alternative to RemoteControllerTelnet and + //moved to the RemoteControl.h/cpp file instead. + void ZmqCtrl(void); + void RecvAll(zmq::socket_t* pSocket, std::vector &message); + void SendOkReply(zmq::socket_t *pSocket); + void SendFailReply(zmq::socket_t *pSocket, const std::string &error); + + // data int myStaticDelay; std::vector m_delayBuf; - + zmq::context_t *m_pContext; + std::string m_zmqCtrlEndpoint; + boost::thread *m_pZmqRepThread; size_t lastLen; }; -- cgit v1.2.3 From ba790cba2f7b48dd66f4418de0b7b366422926b0 Mon Sep 17 00:00:00 2001 From: Jörgen Scott Date: Tue, 13 Jan 2015 11:27:39 +0100 Subject: added zmq remote control --- doc/example.ini | 22 ++++++++ src/DabMod.cpp | 45 ++++++++++------ src/DabModulator.cpp | 8 +-- src/DabModulator.h | 4 +- src/OutputUHD.cpp | 137 +++--------------------------------------------- src/OutputUHD.h | 18 +------ src/RemoteControl.cpp | 142 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/RemoteControl.h | 131 +++++++++++++++++++++++++++++++++++++++++++++- 8 files changed, 336 insertions(+), 171 deletions(-) (limited to 'src/OutputUHD.h') diff --git a/doc/example.ini b/doc/example.ini index 49f6eda..9a80eeb 100644 --- a/doc/example.ini +++ b/doc/example.ini @@ -8,6 +8,28 @@ telnet=1 telnetport=2121 +; Enable zmq remote control. +; The zmq remote control is intended for machine-to-machine +; integration and requires that the odr-mod is build with zmq support. +; The zmq remote control may run in parallell with Telnet. +; Protocol: +; The odr-dabmod binds a zmq rep socket so clients must connect +; using either req or dealer socket. +; [] denotes message part as zmq multi-part message are used for delimitation. +; All message parts are utf-8 encoded strings and matches the Telnet command set. +; Explicit codes are denoted with "". +; The following commands are supported: +; REQ: ["ping"] +; REP: ["ok"] +; +; REQ: ["get"][module name][parameter] +; REP: [value] _OR_ ["fail"][error description] +; +; REQ: ["set"][module name][parameter][value] +; REP: ["ok"] _OR_ ["fail"][error description] +zmqctrl=1 +zmqctrlendpoint=tcp://127.0.0.1:9400 + [log] ; Write to a logfile or to syslog. ; Setting filename to stderr is very useful during tests and development diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 4342522..dadade9 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -55,7 +55,7 @@ #include #include #include -#include +//#include #ifdef HAVE_NETINET_IN_H # include @@ -190,8 +190,8 @@ int main(int argc, char* argv[]) OutputUHDConfig outputuhd_conf; #endif - zmq::context_t zmqCtrlContext(1); - std::string zmqCtrlEndpoint = ""; + //zmq::context_t zmqCtrlContext(1); + //std::string zmqCtrlEndpoint = ""; // To handle the timestamp offset of the modulator struct modulator_offset_config modconf; @@ -204,7 +204,8 @@ int main(int argc, char* argv[]) InputMemory* input = NULL; ModOutput* output = NULL; - BaseRemoteController* rc = NULL; + //BaseRemoteController* rc = NULL; + RemoteControllers rcs; Logger logger; InputFileReader inputFileReader(logger); @@ -358,7 +359,7 @@ int main(int argc, char* argv[]) try { int telnetport = pt.get("remotecontrol.telnetport"); RemoteControllerTelnet* telnetrc = new RemoteControllerTelnet(telnetport); - rc = telnetrc; + rcs.add_controller(telnetrc); } catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; @@ -367,8 +368,22 @@ int main(int argc, char* argv[]) } } - zmqCtrlEndpoint = pt.get("remotecontrol.zmqctrlendpoint", ""); - std::cout << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl; +#if defined(HAVE_INPUT_ZEROMQ) + if (pt.get("remotecontrol.zmqctrl", 0) == 1) { + try { + std::string zmqCtrlEndpoint = + pt.get("remotecontrol.zmqctrlendpoint", ""); + std::cout << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl; + RemoteControllerZmq* zmqrc = new RemoteControllerZmq(zmqCtrlEndpoint); + rcs.add_controller(zmqrc); + } + catch (std::exception &e) { + std::cerr << "Error: " << e.what() << "\n"; + std::cerr << " zmq remote control enabled, but no endpoint defined.\n"; + goto END_MAIN; + } + } +#endif // input params: if (pt.get("input.loop", 0) == 1) { @@ -570,9 +585,9 @@ int main(int argc, char* argv[]) outputuhd_conf.muteNoTimestamps = (pt.get("delaymanagement.mutenotimestamps", 0) == 1); #endif } - if (!rc) { + if (rcs.get_no_controllers() == 0) { logger.level(warn) << "No Remote-Control started"; - rc = new RemoteControllerDummy(); + rcs.add_controller(new RemoteControllerDummy()); } @@ -705,8 +720,8 @@ int main(int argc, char* argv[]) outputuhd_conf.sampleRate = outputRate; try { - output = new OutputUHD(outputuhd_conf, logger, &zmqCtrlContext, zmqCtrlEndpoint); - ((OutputUHD*)output)->enrol_at(*rc); + output = new OutputUHD(outputuhd_conf, logger/*, &zmqCtrlContext, zmqCtrlEndpoint*/); + ((OutputUHD*)output)->enrol_at(rcs); } catch (std::exception& e) { logger.level(error) << "UHD initialisation failed:" << e.what(); @@ -718,7 +733,7 @@ int main(int argc, char* argv[]) flowgraph = new Flowgraph(); data.setLength(6144); input = new InputMemory(&data); - modulator = new DabModulator(modconf, rc, logger, outputRate, clockRate, + modulator = new DabModulator(modconf, &rcs, logger, outputRate, clockRate, dabMode, gainMode, digitalgain, normalise, filterTapsFilename); flowgraph->connect(input, modulator); flowgraph->connect(modulator, output); @@ -757,10 +772,8 @@ int main(int argc, char* argv[]) /* Check every once in a while if the remote control * is still working */ - if (rc && (frame % 250) == 0 && rc->fault_detected()) { - fprintf(stderr, - "Detected Remote Control fault, restarting it\n"); - rc->restart(); + if (rcs.get_no_controllers() > 0 && (frame % 250) == 0) { + rcs.check_faults(); } } if (framesize == 0) { diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 7f246d8..2664a08 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -53,7 +53,7 @@ DabModulator::DabModulator( struct modulator_offset_config& modconf, - BaseRemoteController* rc, + RemoteControllers* rcs, Logger& logger, unsigned outputRate, unsigned clockRate, unsigned dabMode, GainMode gainMode, @@ -71,7 +71,7 @@ DabModulator::DabModulator( myEtiReader(EtiReader(modconf, myLogger)), myFlowgraph(NULL), myFilterTapsFilename(filterTapsFilename), - myRC(rc) + myRCs(rcs) { PDEBUG("DabModulator::DabModulator(%u, %u, %u, %u) @ %p\n", outputRate, clockRate, dabMode, gainMode, this); @@ -201,13 +201,13 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut) cifOfdm = new OfdmGenerator((1 + myNbSymbols), myNbCarriers, mySpacing); cifGain = new GainControl(mySpacing, myGainMode, myDigGain, myNormalise); - cifGain->enrol_at(*myRC); + cifGain->enrol_at(*myRCs); cifGuard = new GuardIntervalInserter(myNbSymbols, mySpacing, myNullSize, mySymSize); if (myFilterTapsFilename != "") { cifFilter = new FIRFilter(myFilterTapsFilename); - cifFilter->enrol_at(*myRC); + cifFilter->enrol_at(*myRCs); } myOutput = new OutputMemory(); diff --git a/src/DabModulator.h b/src/DabModulator.h index 21f9f61..84c9926 100644 --- a/src/DabModulator.h +++ b/src/DabModulator.h @@ -47,7 +47,7 @@ class DabModulator : public ModCodec public: DabModulator( struct modulator_offset_config& modconf, - BaseRemoteController* rc, + RemoteControllers* rcs, Logger& logger, unsigned outputRate = 2048000, unsigned clockRate = 0, unsigned dabMode = 0, GainMode gainMode = GAIN_VAR, @@ -77,7 +77,7 @@ protected: Flowgraph* myFlowgraph; OutputMemory* myOutput; std::string myFilterTapsFilename; - BaseRemoteController* myRC; + RemoteControllers* myRCs; size_t myNbSymbols; size_t myNbCarriers; diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index e3d2d77..4776965 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -45,9 +45,7 @@ typedef std::complex complexf; OutputUHD::OutputUHD( OutputUHDConfig& config, - Logger& logger, - zmq::context_t *pContext, - const std::string &zmqCtrlEndpoint) : + Logger& logger) : ModOutput(ModFormat(1), ModFormat(0)), RemoteControllable("uhd"), myLogger(logger), @@ -56,7 +54,7 @@ OutputUHD::OutputUHD( // the buffers at object initialisation. first_run(true), activebuffer(1), - m_delayBuf(196608) + myDelayBuf(196608) { myMuting = 0; // is remote-controllable @@ -227,14 +225,6 @@ OutputUHD::OutputUHD( worker.start(&uwd); - m_pZmqRepThread = NULL; - if (!zmqCtrlEndpoint.empty()) - { - m_pContext = pContext; - m_zmqCtrlEndpoint = zmqCtrlEndpoint; - m_pZmqRepThread = new boost::thread(boost::bind(&OutputUHD::ZmqCtrl, this)); - } - MDEBUG("OutputUHD:UHD ready.\n"); } @@ -242,12 +232,6 @@ OutputUHD::OutputUHD( OutputUHD::~OutputUHD() { MDEBUG("OutputUHD::~OutputUHD() @ %p\n", this); - if (m_pZmqRepThread != NULL) - { - m_pZmqRepThread->interrupt(); - m_pZmqRepThread->join(); - } - worker.stop(); if (!first_run) { free(uwd.frame0.buf); @@ -315,13 +299,11 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) if (activebuffer == 0) { uint8_t *pTmp = (uint8_t*) uwd.frame0.buf; // copy remain from delaybuf - memcpy(pTmp, &m_delayBuf[0], noByteDelay); + memcpy(pTmp, &myDelayBuf[0], noByteDelay); // copy new data memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay); // copy remaining data to delay buf - memcpy(&m_delayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); - - //memcpy(uwd.frame0.buf, dataIn->getData(), uwd.bufsize); + memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); uwd.frame0.ts = ts; uwd.frame0.fct = myEtiReader->getFCT(); @@ -329,13 +311,11 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) else if (activebuffer == 1) { uint8_t *pTmp = (uint8_t*) uwd.frame1.buf; // copy remain from delaybuf - memcpy(pTmp, &m_delayBuf[0], noByteDelay); + memcpy(pTmp, &myDelayBuf[0], noByteDelay); // copy new data memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay); // copy remaining data to delay buf - memcpy(&m_delayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); - - //memcpy(uwd.frame1.buf, dataIn->getData(), uwd.bufsize); + memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); uwd.frame1.ts = ts; uwd.frame1.fct = myEtiReader->getFCT(); @@ -683,108 +663,3 @@ const string OutputUHD::get_parameter(const string& parameter) const return ss.str(); } -void OutputUHD::RecvAll(zmq::socket_t* pSocket, std::vector &message) -{ - int more = -1; - size_t more_size = sizeof(more); - - while (more != 0) - { - zmq::message_t msg; - pSocket->recv(&msg); - message.push_back(std::string((char*)msg.data(), msg.size())); - pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size); - } -} - -void OutputUHD::SendOkReply(zmq::socket_t *pSocket) -{ - zmq::message_t msg(2); - char repCode[2] = {'o', 'k'}; - memcpy ((void*) msg.data(), repCode, 2); - pSocket->send(msg, 0); -} - -void OutputUHD::SendFailReply(zmq::socket_t *pSocket, const std::string &error) -{ - zmq::message_t msg1(4); - char repCode[4] = {'f', 'a', 'i', 'l'}; - memcpy ((void*) msg1.data(), repCode, 4); - pSocket->send(msg1, ZMQ_SNDMORE); - - zmq::message_t msg2(error.length()); - memcpy ((void*) msg2.data(), error.c_str(), error.length()); - pSocket->send(msg2, 0); -} - -//TODO: Should be implemented as an alternative to RemoteControllerTelnet and -//moved to the RemoteControl.h/cpp file instead. -void OutputUHD::ZmqCtrl() -{ - // create zmq reply socket for receiving ctrl parameters - zmq::socket_t repSocket(*m_pContext, ZMQ_REP); - std::cout << "Starting output UHD control thread" << std::endl; - try - { - // connect the socket - int hwm = 100; - int linger = 0; - repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); - repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); - repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); - repSocket.bind(m_zmqCtrlEndpoint.c_str()); - - // create pollitem that polls the ZMQ sockets - zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; - for(;;) - { - zmq::poll(pollItems, 1, 100); - std::vector msg; - if (pollItems[0].revents & ZMQ_POLLIN) - { - RecvAll(&repSocket, msg); - std::string module((char*)msg[0].data(), msg[0].size()); - if (module == "uhd") - { - if (msg.size() < 2) - { - SendFailReply(&repSocket, "Wrong request format"); - continue; - } - - std::string param((char*) msg[1].data(), msg[1].size()); - if (msg.size() == 2 && param == "ping") - { - SendOkReply(&repSocket); - } - else if (msg.size() != 3) - { - SendFailReply(&repSocket, "Wrong request format"); - } - else - { - std::string value((char*) msg[2].data(), msg[2].size()); - try - { - set_parameter(param, value); - SendOkReply(&repSocket); - } - catch (ParameterError &err) - { - SendFailReply(&repSocket, err.what()); - } - } - } - } - - // check if thread is interrupted - boost::this_thread::interruption_point(); - } - } - catch (boost::thread_interrupted&) {} - catch (zmq::error_t &e) - { - std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl; - } - repSocket.close(); -} diff --git a/src/OutputUHD.h b/src/OutputUHD.h index 25f7476..60dfc65 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -48,7 +48,6 @@ DESCRIPTION: #include #include #include -#include #include "Log.h" #include "ModOutput.h" @@ -184,9 +183,7 @@ class OutputUHD: public ModOutput, public RemoteControllable { OutputUHD( OutputUHDConfig& config, - Logger& logger, - zmq::context_t *pContext, - const std::string &zmqCtrlEndpoint); + Logger& logger); ~OutputUHD(); int process(Buffer* dataIn, Buffer* dataOut); @@ -226,20 +223,9 @@ class OutputUHD: public ModOutput, public RemoteControllable { bool myMuting; private: - // zmq receiving method - //TODO: Should be implemented as an alternative to RemoteControllerTelnet and - //moved to the RemoteControl.h/cpp file instead. - void ZmqCtrl(void); - void RecvAll(zmq::socket_t* pSocket, std::vector &message); - void SendOkReply(zmq::socket_t *pSocket); - void SendFailReply(zmq::socket_t *pSocket, const std::string &error); - // data int myStaticDelay; - std::vector m_delayBuf; - zmq::context_t *m_pContext; - std::string m_zmqCtrlEndpoint; - boost::thread *m_pZmqRepThread; + std::vector myDelayBuf; size_t lastLen; }; diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index 5bbd2f8..c7c5914 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -246,3 +246,145 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message) ignored_error); } + +#if defined(HAVE_INPUT_ZEROMQ) + +void RemoteControllerZmq::restart() +{ + m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this); +} + +// This runs in a separate thread, because +// it would take too long to be done in the main loop +// thread. +void RemoteControllerZmq::restart_thread() +{ + m_running = false; + + if (!m_endpoint.empty()) { + m_child_thread.interrupt(); + m_child_thread.join(); + } + + m_child_thread = boost::thread(&RemoteControllerZmq::process, this); +} + +void RemoteControllerZmq::recv_all(zmq::socket_t* pSocket, std::vector &message) +{ + int more = -1; + size_t more_size = sizeof(more); + + while (more != 0) + { + zmq::message_t msg; + pSocket->recv(&msg); + message.push_back(std::string((char*)msg.data(), msg.size())); + pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size); + } +} + +void RemoteControllerZmq::send_ok_reply(zmq::socket_t *pSocket) +{ + zmq::message_t msg(2); + char repCode[2] = {'o', 'k'}; + memcpy ((void*) msg.data(), repCode, 2); + pSocket->send(msg, 0); +} + +void RemoteControllerZmq::send_fail_reply(zmq::socket_t *pSocket, const std::string &error) +{ + zmq::message_t msg1(4); + char repCode[4] = {'f', 'a', 'i', 'l'}; + memcpy ((void*) msg1.data(), repCode, 4); + pSocket->send(msg1, ZMQ_SNDMORE); + + zmq::message_t msg2(error.length()); + memcpy ((void*) msg2.data(), error.c_str(), error.length()); + pSocket->send(msg2, 0); +} + +void RemoteControllerZmq::process() +{ + // create zmq reply socket for receiving ctrl parameters + zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); + std::cout << "Starting zmq remote control thread" << std::endl; + try + { + // connect the socket + int hwm = 100; + int linger = 0; + repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); + repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); + repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + repSocket.bind(m_endpoint.c_str()); + + // create pollitem that polls the ZMQ sockets + zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; + for(;;) + { + zmq::poll(pollItems, 1, 100); + std::vector msg; + if (pollItems[0].revents & ZMQ_POLLIN) + { + recv_all(&repSocket, msg); + std::string command((char*)msg[0].data(), msg[0].size()); + + if (msg.size() == 1 && command == "ping") + { + send_ok_reply(&repSocket); + } + else if (msg.size() == 3 && command == "get") + { + std::string module((char*) msg[1].data(), msg[1].size()); + std::string parameter((char*) msg[2].data(), msg[2].size()); + + try + { + std::string value = get_param_(module, parameter); + zmq::message_t *pMsg = new zmq::message_t(value.size()); + memcpy ((void*) pMsg->data(), value.data(), value.size()); + repSocket.send(*pMsg, 0); + delete pMsg; + } + catch (ParameterError &err) + { + send_fail_reply(&repSocket, err.what()); + } + } + else if (msg.size() == 4 && command == "set") + { + std::string module((char*) msg[1].data(), msg[1].size()); + std::string parameter((char*) msg[2].data(), msg[2].size()); + std::string value((char*) msg[3].data(), msg[3].size()); + + try + { + set_param_(module, parameter, value); + send_ok_reply(&repSocket); + } + catch (ParameterError &err) + { + send_fail_reply(&repSocket, err.what()); + } + } + else + send_fail_reply(&repSocket, "Unsupported command"); + } + + // check if thread is interrupted + boost::this_thread::interruption_point(); + } + } + catch (boost::thread_interrupted&) {} + catch (zmq::error_t &e) + { + std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl; + } + catch (std::exception& e) + { + std::cerr << "Remote control caught exception: " << e.what() << std::endl; + m_fault = true; + } + repSocket.close(); +} +#endif diff --git a/src/RemoteControl.h b/src/RemoteControl.h index 09e7492..7c830b2 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -29,6 +29,14 @@ #ifndef _REMOTECONTROL_H #define _REMOTECONTROL_H +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#if defined(HAVE_INPUT_ZEROMQ) +#include +#endif + #include #include #include @@ -85,6 +93,42 @@ class BaseRemoteController { virtual ~BaseRemoteController() {} }; +class RemoteControllers { +/* Holds all our remote controllers, i.e. we may have more than + * one type of controller running. +*/ + public: + RemoteControllers() {} + virtual ~RemoteControllers() {} + + void add_controller(BaseRemoteController *rc) { + m_controllers.push_back(rc); + } + + void add_controllable(RemoteControllable *rc) { + for (std::list::iterator it = m_controllers.begin(); + it != m_controllers.end(); ++it) { + (*it)->enrol(rc); + } + } + + void check_faults() { + for (std::list::iterator it = m_controllers.begin(); + it != m_controllers.end(); ++it) { + if ((*it)->fault_detected()) + { + fprintf(stderr, + "Detected Remote Control fault, restarting it\n"); + (*it)->restart(); + } + } + } + size_t get_no_controllers() { return m_controllers.size(); } + + private: + std::list m_controllers; +}; + /* Objects that support remote control must implement the following class */ class RemoteControllable { public: @@ -100,8 +144,8 @@ class RemoteControllable { virtual std::string get_rc_name() const { return m_name; } /* Tell the controllable to enrol at the given controller */ - virtual void enrol_at(BaseRemoteController& controller) { - controller.enrol(this); + virtual void enrol_at(RemoteControllers& controllers) { + controllers.add_controllable(this); } /* Return a list of possible parameters that can be set */ @@ -254,6 +298,89 @@ class RemoteControllerTelnet : public BaseRemoteController { int m_port; }; +#if defined(HAVE_INPUT_ZEROMQ) +/* Implements a Remote controller using zmq transportlayer + * that listens on localhost + */ +class RemoteControllerZmq : public BaseRemoteController { + public: + RemoteControllerZmq() + : m_running(false), m_fault(false), + m_zmqContext(1), + m_endpoint("") { } + + RemoteControllerZmq(std::string endpoint) + : m_running(true), m_fault(false), + m_child_thread(&RemoteControllerZmq::process, this), + m_zmqContext(1), + m_endpoint(endpoint) + { } + + ~RemoteControllerZmq() { + m_running = false; + m_fault = false; + if (!m_endpoint.empty()) { + m_child_thread.interrupt(); + m_child_thread.join(); + } + } + + void enrol(RemoteControllable* controllable) { + m_cohort.push_back(controllable); + } + + virtual bool fault_detected() { return m_fault; } + + virtual void restart(); + + private: + void restart_thread(); + + void recv_all(zmq::socket_t* pSocket, std::vector &message); + void send_ok_reply(zmq::socket_t *pSocket); + void send_fail_reply(zmq::socket_t *pSocket, const std::string &error); + void process(); + + + RemoteControllerZmq& operator=(const RemoteControllerZmq& other); + RemoteControllerZmq(const RemoteControllerZmq& other); + + RemoteControllable* get_controllable_(std::string name) { + for (std::list::iterator it = m_cohort.begin(); + it != m_cohort.end(); ++it) { + if ((*it)->get_rc_name() == name) + { + return *it; + } + } + throw ParameterError("Module name unknown"); + } + + std::string get_param_(std::string name, std::string param) { + RemoteControllable* controllable = get_controllable_(name); + return controllable->get_parameter(param); + } + + void set_param_(std::string name, std::string param, std::string value) { + RemoteControllable* controllable = get_controllable_(name); + return controllable->set_parameter(param, value); + } + + bool m_running; + + /* This is set to true if a fault occurred */ + bool m_fault; + boost::thread m_restarter_thread; + + boost::thread m_child_thread; + + /* This controller commands the controllables in the cohort */ + std::list m_cohort; + + zmq::context_t m_zmqContext; + std::string m_endpoint; +}; +#endif /* The Dummy remote controller does nothing, and never fails */ -- cgit v1.2.3 From d82422fbb3d9d34a0566197245376548ce3ef14e Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 23 Jan 2015 10:15:34 +0100 Subject: Code indentation, minor corrections --- doc/example.ini | 13 ++-- src/DabMod.cpp | 14 +--- src/OutputUHD.cpp | 56 +++++++------- src/OutputUHD.h | 8 +- src/RemoteControl.cpp | 197 +++++++++++++++++++++++++------------------------- src/RemoteControl.h | 64 ++++++++-------- 6 files changed, 172 insertions(+), 180 deletions(-) (limited to 'src/OutputUHD.h') diff --git a/doc/example.ini b/doc/example.ini index 9a80eeb..ecb7440 100644 --- a/doc/example.ini +++ b/doc/example.ini @@ -9,14 +9,15 @@ telnet=1 telnetport=2121 ; Enable zmq remote control. -; The zmq remote control is intended for machine-to-machine -; integration and requires that the odr-mod is build with zmq support. -; The zmq remote control may run in parallell with Telnet. +; The zmq remote control is intended for machine-to-machine +; integration and requires that ODR-DabMod is built with zmq support. +; The zmq remote control may run in parallel with Telnet. +; ; Protocol: -; The odr-dabmod binds a zmq rep socket so clients must connect +; ODR-DabMod binds a zmq rep socket so clients must connect ; using either req or dealer socket. ; [] denotes message part as zmq multi-part message are used for delimitation. -; All message parts are utf-8 encoded strings and matches the Telnet command set. +; All message parts are utf-8 encoded strings and match the Telnet command set. ; Explicit codes are denoted with "". ; The following commands are supported: ; REQ: ["ping"] @@ -163,7 +164,7 @@ channel=13C ; The reference clock to use. ; possible values : internal, external, MIMO -refclk_source=external +refclk_source=internal ; The reference one pulse-per second to use ; possible values : none, external, MIMO diff --git a/src/DabMod.cpp b/src/DabMod.cpp index dadade9..ea6334f 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -55,7 +55,6 @@ #include #include #include -//#include #ifdef HAVE_NETINET_IN_H # include @@ -190,9 +189,6 @@ int main(int argc, char* argv[]) OutputUHDConfig outputuhd_conf; #endif - //zmq::context_t zmqCtrlContext(1); - //std::string zmqCtrlEndpoint = ""; - // To handle the timestamp offset of the modulator struct modulator_offset_config modconf; modconf.use_offset_file = false; @@ -204,7 +200,6 @@ int main(int argc, char* argv[]) InputMemory* input = NULL; ModOutput* output = NULL; - //BaseRemoteController* rc = NULL; RemoteControllers rcs; Logger logger; @@ -371,9 +366,8 @@ int main(int argc, char* argv[]) #if defined(HAVE_INPUT_ZEROMQ) if (pt.get("remotecontrol.zmqctrl", 0) == 1) { try { - std::string zmqCtrlEndpoint = - pt.get("remotecontrol.zmqctrlendpoint", ""); - std::cout << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl; + std::string zmqCtrlEndpoint = pt.get("remotecontrol.zmqctrlendpoint", ""); + std::cerr << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl; RemoteControllerZmq* zmqrc = new RemoteControllerZmq(zmqCtrlEndpoint); rcs.add_controller(zmqrc); } @@ -720,7 +714,7 @@ int main(int argc, char* argv[]) outputuhd_conf.sampleRate = outputRate; try { - output = new OutputUHD(outputuhd_conf, logger/*, &zmqCtrlContext, zmqCtrlEndpoint*/); + output = new OutputUHD(outputuhd_conf, logger); ((OutputUHD*)output)->enrol_at(rcs); } catch (std::exception& e) { @@ -773,7 +767,7 @@ int main(int argc, char* argv[]) /* Check every once in a while if the remote control * is still working */ if (rcs.get_no_controllers() > 0 && (frame % 250) == 0) { - rcs.check_faults(); + rcs.check_faults(); } } if (framesize == 0) { diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 4776965..c7770fa 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -54,7 +54,7 @@ OutputUHD::OutputUHD( // the buffers at object initialisation. first_run(true), activebuffer(1), - myDelayBuf(196608) + myDelayBuf(196608) { myMuting = 0; // is remote-controllable @@ -233,10 +233,10 @@ OutputUHD::~OutputUHD() { MDEBUG("OutputUHD::~OutputUHD() @ %p\n", this); worker.stop(); - if (!first_run) { - free(uwd.frame0.buf); - free(uwd.frame1.buf); - } + if (!first_run) { + free(uwd.frame0.buf); + free(uwd.frame1.buf); + } } int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) @@ -291,31 +291,31 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) uwd.sourceContainsTimestamp = myConf.enableSync && myEtiReader->sourceContainsTimestamp(); - // calculate delay - uint32_t noSampleDelay = (myStaticDelay * 2048) / 1000; - uint32_t noByteDelay = noSampleDelay * sizeof(complexf); + // calculate delay + uint32_t noSampleDelay = (myStaticDelay * 2048) / 1000; + uint32_t noByteDelay = noSampleDelay * sizeof(complexf); - uint8_t* pInData = (uint8_t*) dataIn->getData(); + uint8_t* pInData = (uint8_t*) dataIn->getData(); if (activebuffer == 0) { - uint8_t *pTmp = (uint8_t*) uwd.frame0.buf; - // copy remain from delaybuf + uint8_t *pTmp = (uint8_t*) uwd.frame0.buf; + // copy remain from delaybuf memcpy(pTmp, &myDelayBuf[0], noByteDelay); - // copy new data + // copy new data memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay); - // copy remaining data to delay buf - memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); + // copy remaining data to delay buf + memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); uwd.frame0.ts = ts; uwd.frame0.fct = myEtiReader->getFCT(); } else if (activebuffer == 1) { - uint8_t *pTmp = (uint8_t*) uwd.frame1.buf; - // copy remain from delaybuf + uint8_t *pTmp = (uint8_t*) uwd.frame1.buf; + // copy remain from delaybuf memcpy(pTmp, &myDelayBuf[0], noByteDelay); - // copy new data + // copy new data memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay); - // copy remaining data to delay buf - memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); + // copy remaining data to delay buf + memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); uwd.frame1.ts = ts; uwd.frame1.fct = myEtiReader->getFCT(); @@ -617,15 +617,15 @@ void OutputUHD::set_parameter(const string& parameter, const string& value) ss >> myMuting; } else if (parameter == "staticdelay") { - int adjust; - ss >> adjust; - int newStaticDelay = myStaticDelay + adjust; - if (newStaticDelay > 96000) - myStaticDelay = newStaticDelay - 96000; - else if (newStaticDelay < 0) - myStaticDelay = newStaticDelay + 96000; - else - myStaticDelay = newStaticDelay; + int adjust; + ss >> adjust; + int newStaticDelay = myStaticDelay + adjust; + if (newStaticDelay > 96000) + myStaticDelay = newStaticDelay - 96000; + else if (newStaticDelay < 0) + myStaticDelay = newStaticDelay + 96000; + else + myStaticDelay = newStaticDelay; } else if (parameter == "iqbalance") { ss >> myConf.frequency; diff --git a/src/OutputUHD.h b/src/OutputUHD.h index 60dfc65..90d9d1b 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -222,10 +222,10 @@ class OutputUHD: public ModOutput, public RemoteControllable { // muting can only be changed using the remote control bool myMuting; - private: - // data - int myStaticDelay; - std::vector myDelayBuf; + private: + // data + int myStaticDelay; + std::vector myDelayBuf; size_t lastLen; }; diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index c7c5914..6f538dc 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -271,120 +271,121 @@ void RemoteControllerZmq::restart_thread() void RemoteControllerZmq::recv_all(zmq::socket_t* pSocket, std::vector &message) { - int more = -1; - size_t more_size = sizeof(more); - - while (more != 0) - { - zmq::message_t msg; - pSocket->recv(&msg); - message.push_back(std::string((char*)msg.data(), msg.size())); - pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size); - } + int more = -1; + size_t more_size = sizeof(more); + + while (more != 0) + { + zmq::message_t msg; + pSocket->recv(&msg); + message.push_back(std::string((char*)msg.data(), msg.size())); + pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size); + } } void RemoteControllerZmq::send_ok_reply(zmq::socket_t *pSocket) { - zmq::message_t msg(2); - char repCode[2] = {'o', 'k'}; - memcpy ((void*) msg.data(), repCode, 2); - pSocket->send(msg, 0); + zmq::message_t msg(2); + char repCode[2] = {'o', 'k'}; + memcpy ((void*) msg.data(), repCode, 2); + pSocket->send(msg, 0); } void RemoteControllerZmq::send_fail_reply(zmq::socket_t *pSocket, const std::string &error) { - zmq::message_t msg1(4); - char repCode[4] = {'f', 'a', 'i', 'l'}; - memcpy ((void*) msg1.data(), repCode, 4); - pSocket->send(msg1, ZMQ_SNDMORE); - - zmq::message_t msg2(error.length()); - memcpy ((void*) msg2.data(), error.c_str(), error.length()); - pSocket->send(msg2, 0); + zmq::message_t msg1(4); + char repCode[4] = {'f', 'a', 'i', 'l'}; + memcpy ((void*) msg1.data(), repCode, 4); + pSocket->send(msg1, ZMQ_SNDMORE); + + zmq::message_t msg2(error.length()); + memcpy ((void*) msg2.data(), error.c_str(), error.length()); + pSocket->send(msg2, 0); } void RemoteControllerZmq::process() { - // create zmq reply socket for receiving ctrl parameters - zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); - std::cout << "Starting zmq remote control thread" << std::endl; - try - { - // connect the socket - int hwm = 100; - int linger = 0; - repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); - repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); - repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); - repSocket.bind(m_endpoint.c_str()); - - // create pollitem that polls the ZMQ sockets - zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; - for(;;) - { - zmq::poll(pollItems, 1, 100); - std::vector msg; - if (pollItems[0].revents & ZMQ_POLLIN) - { - recv_all(&repSocket, msg); - std::string command((char*)msg[0].data(), msg[0].size()); - - if (msg.size() == 1 && command == "ping") - { - send_ok_reply(&repSocket); - } - else if (msg.size() == 3 && command == "get") - { - std::string module((char*) msg[1].data(), msg[1].size()); - std::string parameter((char*) msg[2].data(), msg[2].size()); - - try - { - std::string value = get_param_(module, parameter); - zmq::message_t *pMsg = new zmq::message_t(value.size()); - memcpy ((void*) pMsg->data(), value.data(), value.size()); - repSocket.send(*pMsg, 0); - delete pMsg; - } - catch (ParameterError &err) - { - send_fail_reply(&repSocket, err.what()); - } - } - else if (msg.size() == 4 && command == "set") - { - std::string module((char*) msg[1].data(), msg[1].size()); - std::string parameter((char*) msg[2].data(), msg[2].size()); - std::string value((char*) msg[3].data(), msg[3].size()); - - try - { - set_param_(module, parameter, value); - send_ok_reply(&repSocket); - } - catch (ParameterError &err) - { - send_fail_reply(&repSocket, err.what()); - } - } - else - send_fail_reply(&repSocket, "Unsupported command"); - } - - // check if thread is interrupted - boost::this_thread::interruption_point(); - } - } - catch (boost::thread_interrupted&) {} - catch (zmq::error_t &e) - { - std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl; - } + // create zmq reply socket for receiving ctrl parameters + zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); + std::cout << "Starting zmq remote control thread" << std::endl; + try + { + // connect the socket + int hwm = 100; + int linger = 0; + repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); + repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); + repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + repSocket.bind(m_endpoint.c_str()); + + // create pollitem that polls the ZMQ sockets + zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; + for(;;) + { + zmq::poll(pollItems, 1, 100); + std::vector msg; + if (pollItems[0].revents & ZMQ_POLLIN) + { + recv_all(&repSocket, msg); + std::string command((char*)msg[0].data(), msg[0].size()); + + if (msg.size() == 1 && command == "ping") + { + send_ok_reply(&repSocket); + } + else if (msg.size() == 3 && command == "get") + { + std::string module((char*) msg[1].data(), msg[1].size()); + std::string parameter((char*) msg[2].data(), msg[2].size()); + + try + { + std::string value = get_param_(module, parameter); + zmq::message_t *pMsg = new zmq::message_t(value.size()); + memcpy ((void*) pMsg->data(), value.data(), value.size()); + repSocket.send(*pMsg, 0); + delete pMsg; + } + catch (ParameterError &err) + { + send_fail_reply(&repSocket, err.what()); + } + } + else if (msg.size() == 4 && command == "set") + { + std::string module((char*) msg[1].data(), msg[1].size()); + std::string parameter((char*) msg[2].data(), msg[2].size()); + std::string value((char*) msg[3].data(), msg[3].size()); + + try + { + set_param_(module, parameter, value); + send_ok_reply(&repSocket); + } + catch (ParameterError &err) + { + send_fail_reply(&repSocket, err.what()); + } + } + else + send_fail_reply(&repSocket, "Unsupported command"); + } + + // check if thread is interrupted + boost::this_thread::interruption_point(); + } + } + catch (boost::thread_interrupted&) {} + catch (zmq::error_t &e) + { + std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl; + } catch (std::exception& e) { std::cerr << "Remote control caught exception: " << e.what() << std::endl; m_fault = true; } - repSocket.close(); + repSocket.close(); } #endif + diff --git a/src/RemoteControl.h b/src/RemoteControl.h index 7c830b2..905e153 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -93,40 +93,37 @@ class BaseRemoteController { virtual ~BaseRemoteController() {} }; -class RemoteControllers { /* Holds all our remote controllers, i.e. we may have more than * one type of controller running. -*/ + */ +class RemoteControllers { public: - RemoteControllers() {} - virtual ~RemoteControllers() {} - - void add_controller(BaseRemoteController *rc) { - m_controllers.push_back(rc); - } + void add_controller(BaseRemoteController *rc) { + m_controllers.push_back(rc); + } - void add_controllable(RemoteControllable *rc) { + void add_controllable(RemoteControllable *rc) { for (std::list::iterator it = m_controllers.begin(); it != m_controllers.end(); ++it) { - (*it)->enrol(rc); - } - } - + (*it)->enrol(rc); + } + } + void check_faults() { for (std::list::iterator it = m_controllers.begin(); it != m_controllers.end(); ++it) { - if ((*it)->fault_detected()) - { - fprintf(stderr, - "Detected Remote Control fault, restarting it\n"); - (*it)->restart(); - } - } - } - size_t get_no_controllers() { return m_controllers.size(); } - - private: - std::list m_controllers; + if ((*it)->fault_detected()) + { + fprintf(stderr, + "Detected Remote Control fault, restarting it\n"); + (*it)->restart(); + } + } + } + size_t get_no_controllers() { return m_controllers.size(); } + + private: + std::list m_controllers; }; /* Objects that support remote control must implement the following class */ @@ -306,15 +303,14 @@ class RemoteControllerZmq : public BaseRemoteController { public: RemoteControllerZmq() : m_running(false), m_fault(false), - m_zmqContext(1), + m_zmqContext(1), m_endpoint("") { } RemoteControllerZmq(std::string endpoint) : m_running(true), m_fault(false), m_child_thread(&RemoteControllerZmq::process, this), - m_zmqContext(1), - m_endpoint(endpoint) - { } + m_zmqContext(1), + m_endpoint(endpoint) { } ~RemoteControllerZmq() { m_running = false; @@ -336,9 +332,9 @@ class RemoteControllerZmq : public BaseRemoteController { private: void restart_thread(); - void recv_all(zmq::socket_t* pSocket, std::vector &message); - void send_ok_reply(zmq::socket_t *pSocket); - void send_fail_reply(zmq::socket_t *pSocket, const std::string &error); + void recv_all(zmq::socket_t* pSocket, std::vector &message); + void send_ok_reply(zmq::socket_t *pSocket); + void send_fail_reply(zmq::socket_t *pSocket, const std::string &error); void process(); @@ -377,8 +373,8 @@ class RemoteControllerZmq : public BaseRemoteController { /* This controller commands the controllables in the cohort */ std::list m_cohort; - zmq::context_t m_zmqContext; - std::string m_endpoint; + zmq::context_t m_zmqContext; + std::string m_endpoint; }; #endif -- cgit v1.2.3 From 1ab555f832c764bd10cebeeee51d9f7ad5c4b2c6 Mon Sep 17 00:00:00 2001 From: Jörgen Scott Date: Wed, 4 Feb 2015 12:19:55 +0100 Subject: Removed magick numbers and added support for static delay in all dab modes --- src/DabMod.cpp | 1 + src/OutputUHD.cpp | 54 ++++++++++++++++++++++++++++++++++++++++++++---------- src/OutputUHD.h | 7 ++++++- 3 files changed, 51 insertions(+), 11 deletions(-) (limited to 'src/OutputUHD.h') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 82f03e5..214231c 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -535,6 +535,7 @@ int main(int argc, char* argv[]) outputuhd_conf.txgain = pt.get("uhdoutput.txgain", 0.0); outputuhd_conf.frequency = pt.get("uhdoutput.frequency", 0); std::string chan = pt.get("uhdoutput.channel", ""); + outputuhd_conf.dabMode = dabMode; if (outputuhd_conf.frequency == 0 && chan == "") { std::cerr << " UHD output enabled, but neither frequency nor channel defined.\n"; diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 741731e..91c030f 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -57,11 +57,11 @@ OutputUHD::OutputUHD( // the buffers at object initialisation. first_run(true), activebuffer(1), - myDelayBuf(196608) + myDelayBuf(0) { myMuting = 0; // is remote-controllable - myStaticDelay = 0; // is remote-controllable + myStaticDelayUs = 0; // is remote-controllable #if FAKE_UHD MDEBUG("OutputUHD:Using fake UHD output"); @@ -225,6 +225,7 @@ OutputUHD::OutputUHD( uwd.check_refclk_loss = true; } + SetDelayBuffer(config.dabMode); shared_ptr b(new barrier(2)); mySyncBarrier = b; @@ -246,6 +247,35 @@ OutputUHD::~OutputUHD() } } +void OutputUHD::SetDelayBuffer(unsigned int dabMode) +{ + // find out the duration of the transmission frame (Table 2 in ETSI 300 401) + switch (dabMode) { + case 0: // could happen when called from constructor and we take the mode from ETI + myTFDurationMs = 0; + break; + case 1: + myTFDurationMs = 96; + break; + case 2: + myTFDurationMs = 24; + break; + case 3: + myTFDurationMs = 24; + break; + case 4: + myTFDurationMs = 48; + break; + default: + throw std::runtime_error("OutPutUHD: invalid DAB mode"); + } + fprintf(stderr, "DelayBuf = %d\n", myTFDurationMs * myConf.sampleRate / 1000); + // The buffer size equals the number of samples per transmission frame so + // we calculate it by multiplying the duration of the transmission frame + // with the samplerate. + myDelayBuf.resize(myTFDurationMs * myConf.sampleRate / 1000); +} + int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) { struct frame_timestamp ts; @@ -281,6 +311,10 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) default: break; } + // we only set the delay buffer from the dab mode signaled in ETI if the + // dab mode was not set in contructor + if (myTFDurationMs == 0) + SetDelayBuffer(myEtiReader->getMode()); activebuffer = 1; @@ -307,7 +341,7 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) myEtiReader->sourceContainsTimestamp(); // calculate delay - uint32_t noSampleDelay = (myStaticDelay * 2048) / 1000; + uint32_t noSampleDelay = (myStaticDelayUs * myConf.sampleRate / 1000) / 1000; uint32_t noByteDelay = noSampleDelay * sizeof(complexf); uint8_t* pInData = (uint8_t*) dataIn->getData(); @@ -655,13 +689,13 @@ void OutputUHD::set_parameter(const string& parameter, const string& value) else if (parameter == "staticdelay") { int adjust; ss >> adjust; - int newStaticDelay = myStaticDelay + adjust; - if (newStaticDelay > 96000) - myStaticDelay = newStaticDelay - 96000; - else if (newStaticDelay < 0) - myStaticDelay = newStaticDelay + 96000; + int newStaticDelayUs = myStaticDelayUs + adjust; + if (newStaticDelayUs > myTFDurationMs * 1000) + myStaticDelayUs = newStaticDelayUs - myTFDurationMs * 1000; + else if (newStaticDelayUs < 0) + myStaticDelayUs = newStaticDelayUs + myTFDurationMs * 1000; else - myStaticDelay = newStaticDelay; + myStaticDelayUs = newStaticDelayUs; } else if (parameter == "iqbalance") { ss >> myConf.frequency; @@ -689,7 +723,7 @@ const string OutputUHD::get_parameter(const string& parameter) const ss << myMuting; } else if (parameter == "staticdelay") { - ss << myStaticDelay; + ss << myStaticDelayUs; } else { ss << "Parameter '" << parameter << diff --git a/src/OutputUHD.h b/src/OutputUHD.h index 7eb6733..d002e98 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -171,6 +171,7 @@ struct OutputUHDConfig { double txgain; bool enableSync; bool muteNoTimestamps; + unsigned dabMode; /* allowed values : auto, int, sma, mimo */ std::string refclk_src; @@ -231,8 +232,12 @@ class OutputUHD: public ModOutput, public RemoteControllable { bool myMuting; private: + // methods + void SetDelayBuffer(unsigned int dabMode); + // data - int myStaticDelay; + int myStaticDelayUs; // static delay in microseconds + int myTFDurationMs; // TF duration in milliseconds std::vector myDelayBuf; size_t lastLen; }; -- cgit v1.2.3 From 277d29a529c37a8fe59883291e43db8ff8831b22 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Thu, 12 Feb 2015 10:20:54 +0100 Subject: indentation --- src/DabMod.cpp | 2 +- src/OutputUHD.cpp | 85 ++++++++++++++++++++++++++++--------------------------- src/OutputUHD.h | 9 +++--- 3 files changed, 49 insertions(+), 47 deletions(-) (limited to 'src/OutputUHD.h') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 214231c..77e5da4 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -535,7 +535,7 @@ int main(int argc, char* argv[]) outputuhd_conf.txgain = pt.get("uhdoutput.txgain", 0.0); outputuhd_conf.frequency = pt.get("uhdoutput.frequency", 0); std::string chan = pt.get("uhdoutput.channel", ""); - outputuhd_conf.dabMode = dabMode; + outputuhd_conf.dabMode = dabMode; if (outputuhd_conf.frequency == 0 && chan == "") { std::cerr << " UHD output enabled, but neither frequency nor channel defined.\n"; diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index bfd24a8..d033700 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -225,7 +225,7 @@ OutputUHD::OutputUHD( uwd.check_refclk_loss = true; } - SetDelayBuffer(config.dabMode); + SetDelayBuffer(config.dabMode); shared_ptr b(new barrier(2)); mySyncBarrier = b; @@ -249,30 +249,30 @@ OutputUHD::~OutputUHD() void OutputUHD::SetDelayBuffer(unsigned int dabMode) { - // find out the duration of the transmission frame (Table 2 in ETSI 300 401) + // find out the duration of the transmission frame (Table 2 in ETSI 300 401) switch (dabMode) { - case 0: // could happen when called from constructor and we take the mode from ETI - myTFDurationMs = 0; - break; - case 1: - myTFDurationMs = 96; - break; - case 2: - myTFDurationMs = 24; - break; - case 3: - myTFDurationMs = 24; - break; - case 4: - myTFDurationMs = 48; - break; - default: - throw std::runtime_error("OutPutUHD: invalid DAB mode"); + case 0: // could happen when called from constructor and we take the mode from ETI + myTFDurationMs = 0; + break; + case 1: + myTFDurationMs = 96; + break; + case 2: + myTFDurationMs = 24; + break; + case 3: + myTFDurationMs = 24; + break; + case 4: + myTFDurationMs = 48; + break; + default: + throw std::runtime_error("OutputUHD: invalid DAB mode"); } - // The buffer size equals the number of samples per transmission frame so - // we calculate it by multiplying the duration of the transmission frame - // with the samplerate. - myDelayBuf.resize(myTFDurationMs * myConf.sampleRate / 1000); + // The buffer size equals the number of samples per transmission frame so + // we calculate it by multiplying the duration of the transmission frame + // with the samplerate. + myDelayBuf.resize(myTFDurationMs * myConf.sampleRate / 1000); } int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) @@ -310,10 +310,11 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) default: break; } - // we only set the delay buffer from the dab mode signaled in ETI if the - // dab mode was not set in contructor - if (myTFDurationMs == 0) - SetDelayBuffer(myEtiReader->getMode()); + // we only set the delay buffer from the dab mode signaled in ETI if the + // dab mode was not set in contructor + if (myTFDurationMs == 0) { + SetDelayBuffer(myEtiReader->getMode()); + } activebuffer = 1; @@ -688,21 +689,21 @@ void OutputUHD::set_parameter(const string& parameter, const string& value) else if (parameter == "staticdelay") { int64_t adjust; ss >> adjust; - if (adjust > (myTFDurationMs * 1000)) - { // reset static delay for values outside range - myStaticDelayUs = 0; - } - else - { // the new adjust value is added to the existing delay and the result - // is wrapped around at TF duration - int newStaticDelayUs = myStaticDelayUs + adjust; - if (newStaticDelayUs > (myTFDurationMs * 1000)) - myStaticDelayUs = newStaticDelayUs - (myTFDurationMs * 1000); - else if (newStaticDelayUs < 0) - myStaticDelayUs = newStaticDelayUs + (myTFDurationMs * 1000); - else - myStaticDelayUs = newStaticDelayUs; - } + if (adjust > (myTFDurationMs * 1000)) + { // reset static delay for values outside range + myStaticDelayUs = 0; + } + else + { // the new adjust value is added to the existing delay and the result + // is wrapped around at TF duration + int newStaticDelayUs = myStaticDelayUs + adjust; + if (newStaticDelayUs > (myTFDurationMs * 1000)) + myStaticDelayUs = newStaticDelayUs - (myTFDurationMs * 1000); + else if (newStaticDelayUs < 0) + myStaticDelayUs = newStaticDelayUs + (myTFDurationMs * 1000); + else + myStaticDelayUs = newStaticDelayUs; + } } else if (parameter == "iqbalance") { ss >> myConf.frequency; diff --git a/src/OutputUHD.h b/src/OutputUHD.h index d002e98..c5d561b 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -171,7 +171,7 @@ struct OutputUHDConfig { double txgain; bool enableSync; bool muteNoTimestamps; - unsigned dabMode; + unsigned dabMode; /* allowed values : auto, int, sma, mimo */ std::string refclk_src; @@ -232,12 +232,13 @@ class OutputUHD: public ModOutput, public RemoteControllable { bool myMuting; private: - // methods - void SetDelayBuffer(unsigned int dabMode); + // Resize the internal delay buffer according to the dabMode and + // the sample rate. + void SetDelayBuffer(unsigned int dabMode); // data int myStaticDelayUs; // static delay in microseconds - int myTFDurationMs; // TF duration in milliseconds + int myTFDurationMs; // TF duration in milliseconds std::vector myDelayBuf; size_t lastLen; }; -- cgit v1.2.3 From b11eff2c8c913d470897e5395b240939ed46dc35 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 27 Feb 2015 13:52:54 +0100 Subject: Restart whole modulator on FCT discontinuity --- src/DabMod.cpp | 4 ++++ src/OutputUHD.cpp | 39 +++++++++++++++++++++++++++++++++------ src/OutputUHD.h | 27 ++++++++++++++++++--------- 3 files changed, 55 insertions(+), 15 deletions(-) (limited to 'src/OutputUHD.h') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 3548f9d..2489797 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -866,6 +866,10 @@ run_modulator_state run_modulator(modulator_data& m) running = 0; ret = MOD_NORMAL_END; } + } catch (fct_discontinuity_error& e) { + // The OutputUHD saw a FCT discontinuity + fprintf(stderr, "Stream discontinuity\n"); + ret = MOD_AGAIN; } catch (std::overflow_error& e) { // The ZeroMQ input has overflowed its buffer fprintf(stderr, "overflow error: %s\n", e.what()); diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 54acf40..a877161 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -230,8 +230,6 @@ OutputUHD::OutputUHD( mySyncBarrier = b; uwd.sync_barrier = b; - worker.start(&uwd); - MDEBUG("OutputUHD:UHD ready.\n"); } @@ -288,6 +286,8 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) if (first_run) { myLogger.level(debug) << "OutputUHD: UHD initialising..."; + worker.start(&uwd); + uwd.bufsize = dataIn->getLength(); uwd.frame0.buf = malloc(uwd.bufsize); uwd.frame1.buf = malloc(uwd.bufsize); @@ -332,6 +332,19 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) } mySyncBarrier.get()->wait(); + if (!uwd.running) { + worker.stop(); + first_run = true; + if (uwd.failed_due_to_fct) { + throw fct_discontinuity_error(); + } + else { + myLogger.level(error) << + "OutputUHD: Error, UHD worker failed"; + throw std::runtime_error("UHD worker failed"); + } + } + // write into the our buffer while // the worker sends the other. @@ -374,6 +387,21 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) } +void UHDWorker::process_errhandler() +{ + try { + process(); + } + catch (fct_discontinuity_error& e) { + uwd->logger->level(warn) << e.what(); + uwd->failed_due_to_fct = true; + } + + uwd->running = false; + uwd->sync_barrier.get()->wait(); + uwd->logger->level(warn) << "UHD worker terminated"; +} + void UHDWorker::process() { int workerbuffer = 0; @@ -409,7 +437,7 @@ void UHDWorker::process() int expected_next_fct = -1; - while (running) { + while (uwd->running) { bool fct_discontinuity = false; md.has_time_spec = false; md.time_spec = uhd::time_spec_t(0.0); @@ -449,6 +477,7 @@ void UHDWorker::process() "OutputUHD: Incorrect expect fct " << frame->ts.fct; fct_discontinuity = true; + throw fct_discontinuity_error(); } } @@ -553,7 +582,7 @@ void UHDWorker::process() PDEBUG("UHDWorker::process:max_num_samps: %zu.\n", usrp_max_num_samps); - while (running && !uwd->muting && (num_acc_samps < sizeIn)) { + while (uwd->running && !uwd->muting && (num_acc_samps < sizeIn)) { size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps); //ensure the the last packet has EOB set if the timestamps has been @@ -665,8 +694,6 @@ loopend: // swap buffers workerbuffer = (workerbuffer + 1) % 2; } - - uwd->logger->level(warn) << "UHD worker terminated"; } diff --git a/src/OutputUHD.h b/src/OutputUHD.h index c5d561b..d92c7a4 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -83,9 +83,20 @@ struct UHDWorkerFrameData { struct frame_timestamp ts; }; +struct fct_discontinuity_error : public std::exception +{ + const char* what () const throw () + { + return "FCT discontinuity detected"; + } +}; + enum refclk_lock_loss_behaviour_t { CRASH, IGNORE }; struct UHDWorkerData { + bool running; + bool failed_due_to_fct; + #if FAKE_UHD == 0 uhd::usrp::multi_usrp::sptr myUsrp; #endif @@ -130,28 +141,26 @@ struct UHDWorkerData { class UHDWorker { public: - UHDWorker () { - running = false; - } - void start(struct UHDWorkerData *uhdworkerdata) { - running = true; uwd = uhdworkerdata; - uhd_thread = boost::thread(&UHDWorker::process, this); + + uwd->running = true; + uwd->failed_due_to_fct = false; + uhd_thread = boost::thread(&UHDWorker::process_errhandler, this); } void stop() { - running = false; + uwd->running = false; uhd_thread.interrupt(); uhd_thread.join(); } + private: void process(); + void process_errhandler(); - private: struct UHDWorkerData *uwd; - bool running; boost::thread uhd_thread; uhd::tx_streamer::sptr myTxStream; -- cgit v1.2.3 From b042d58100282b480834b283fb40f3b5390327e8 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 10 Apr 2015 12:54:40 +0200 Subject: Use make_shared in DabMod.cpp --- src/DabMod.cpp | 11 +++++------ src/OutputUHD.cpp | 28 ++++++++++++++-------------- src/OutputUHD.h | 6 +++--- 3 files changed, 22 insertions(+), 23 deletions(-) (limited to 'src/OutputUHD.h') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 304d252..b713d30 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -726,22 +726,22 @@ int main(int argc, char* argv[]) if (useFileOutput) { if (fileOutputFormat == "complexf") { - output = shared_ptr(new OutputFile(outputName)); + output = make_shared(outputName); } else if (fileOutputFormat == "s8") { // We must normalise the samples to the interval [-127.0; 127.0] normalise = 127.0f / normalise_factor; - format_converter = shared_ptr(new FormatConverter()); + format_converter = make_shared(); - output = shared_ptr(new OutputFile(outputName)); + output = make_shared(outputName); } } #if defined(HAVE_OUTPUT_UHD) else if (useUHDOutput) { normalise = 1.0f / normalise_factor; outputuhd_conf.sampleRate = outputRate; - output = shared_ptr(new OutputUHD(outputuhd_conf, logger)); + output = make_shared(outputuhd_conf, &logger); ((OutputUHD*)output.get())->enrol_at(rcs); } #endif @@ -808,8 +808,7 @@ int main(int argc, char* argv[]) run_again = true; // Create a new input reader - inputZeroMQReader = shared_ptr( - new InputZeroMQReader(logger)); + inputZeroMQReader = make_shared(logger); inputZeroMQReader->Open(inputName, inputMaxFramesQueued); m.inputReader = inputZeroMQReader.get(); } diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index e8950a2..dbf8b9d 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -47,8 +47,8 @@ using namespace std; typedef std::complex complexf; OutputUHD::OutputUHD( - OutputUHDConfig& config, - Logger& logger) : + const OutputUHDConfig& config, + Logger *logger) : ModOutput(ModFormat(1), ModFormat(0)), RemoteControllable("uhd"), myLogger(logger), @@ -152,18 +152,18 @@ OutputUHD::OutputUHD( myConf.muteNoTimestamps ? "enabled" : "disabled"); if (myConf.enableSync && (myConf.pps_src == "none")) { - myLogger.level(warn) << + myLogger->level(warn) << "OutputUHD: WARNING:" " you are using synchronous transmission without PPS input!"; struct timespec now; if (clock_gettime(CLOCK_REALTIME, &now)) { perror("OutputUHD:Error: could not get time: "); - myLogger.level(error) << "OutputUHD: could not get time"; + myLogger->level(error) << "OutputUHD: could not get time"; } else { myUsrp->set_time_now(uhd::time_spec_t(now.tv_sec)); - myLogger.level(info) << "OutputUHD: Setting USRP time to " << + myLogger->level(info) << "OutputUHD: Setting USRP time to " << uhd::time_spec_t(now.tv_sec).get_real_secs(); } } @@ -174,7 +174,7 @@ OutputUHD::OutputUHD( struct timespec now; time_t seconds; if (clock_gettime(CLOCK_REALTIME, &now)) { - myLogger.level(error) << "OutputUHD: could not get time :" << + myLogger->level(error) << "OutputUHD: could not get time :" << strerror(errno); throw std::runtime_error("OutputUHD: could not get time."); } @@ -185,7 +185,7 @@ OutputUHD::OutputUHD( while (seconds + 1 > now.tv_sec) { usleep(1); if (clock_gettime(CLOCK_REALTIME, &now)) { - myLogger.level(error) << "OutputUHD: could not get time :" << + myLogger->level(error) << "OutputUHD: could not get time :" << strerror(errno); throw std::runtime_error("OutputUHD: could not get time."); } @@ -195,12 +195,12 @@ OutputUHD::OutputUHD( usleep(200000); // 200ms, we want the PPS to be later myUsrp->set_time_unknown_pps(uhd::time_spec_t(seconds + 2)); - myLogger.level(info) << "OutputUHD: Setting USRP time next pps to " << + myLogger->level(info) << "OutputUHD: Setting USRP time next pps to " << uhd::time_spec_t(seconds + 2).get_real_secs(); } usleep(1e6); - myLogger.log(info, "OutputUHD: USRP time %f\n", + myLogger->log(info, "OutputUHD: USRP time %f\n", myUsrp->get_time_now().get_real_secs()); } @@ -214,7 +214,7 @@ OutputUHD::OutputUHD( uwd.sampleRate = myConf.sampleRate; uwd.sourceContainsTimestamp = false; uwd.muteNoTimestamps = myConf.muteNoTimestamps; - uwd.logger = &myLogger; + uwd.logger = myLogger; uwd.refclk_lock_loss_behaviour = myConf.refclk_lock_loss_behaviour; if (myConf.refclk_src == "internal") { @@ -284,7 +284,7 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) // We will only wait on the barrier on the subsequent calls to // OutputUHD::process if (first_run) { - myLogger.level(debug) << "OutputUHD: UHD initialising..."; + myLogger->level(debug) << "OutputUHD: UHD initialising..."; worker.start(&uwd); @@ -319,13 +319,13 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) lastLen = uwd.bufsize; first_run = false; - myLogger.level(debug) << "OutputUHD: UHD initialising complete"; + myLogger->level(debug) << "OutputUHD: UHD initialising complete"; } else { if (lastLen != dataIn->getLength()) { // I expect that this never happens. - myLogger.level(emerg) << + myLogger->level(emerg) << "OutputUHD: Fatal error, input length changed from " << lastLen << " to " << dataIn->getLength(); throw std::runtime_error("Non-constant input length!"); @@ -339,7 +339,7 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) throw fct_discontinuity_error(); } else { - myLogger.level(error) << + myLogger->level(error) << "OutputUHD: Error, UHD worker failed"; throw std::runtime_error("UHD worker failed"); } diff --git a/src/OutputUHD.h b/src/OutputUHD.h index d92c7a4..aed80f6 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -200,8 +200,8 @@ class OutputUHD: public ModOutput, public RemoteControllable { public: OutputUHD( - OutputUHDConfig& config, - Logger& logger); + const OutputUHDConfig& config, + Logger *logger); ~OutputUHD(); int process(Buffer* dataIn, Buffer* dataOut); @@ -227,7 +227,7 @@ class OutputUHD: public ModOutput, public RemoteControllable { protected: - Logger& myLogger; + Logger *myLogger; EtiReader *myEtiReader; OutputUHDConfig myConf; uhd::usrp::multi_usrp::sptr myUsrp; -- cgit v1.2.3