diff options
-rw-r--r-- | doc/example.ini | 25 | ||||
-rw-r--r-- | src/DabMod.cpp | 34 | ||||
-rw-r--r-- | src/DabModulator.cpp | 8 | ||||
-rw-r--r-- | src/DabModulator.h | 4 | ||||
-rw-r--r-- | src/OutputUHD.cpp | 52 | ||||
-rw-r--r-- | src/OutputUHD.h | 5 | ||||
-rw-r--r-- | src/RemoteControl.cpp | 143 | ||||
-rw-r--r-- | src/RemoteControl.h | 127 |
8 files changed, 375 insertions, 23 deletions
diff --git a/doc/example.ini b/doc/example.ini index e1ce252..3c81c04 100644 --- a/doc/example.ini +++ b/doc/example.ini @@ -8,6 +8,29 @@ telnet=1 telnetport=2121 +; Enable zmq remote control. +; The zmq remote control is intended for machine-to-machine +; integration and requires that ODR-DabMod is built with zmq support. +; The zmq remote control may run in parallel with Telnet. +; +; Protocol: +; ODR-DabMod binds a zmq rep socket so clients must connect +; using either req or dealer socket. +; [] denotes message part as zmq multi-part message are used for delimitation. +; All message parts are utf-8 encoded strings and match the Telnet command set. +; Explicit codes are denoted with "". +; The following commands are supported: +; REQ: ["ping"] +; REP: ["ok"] +; +; REQ: ["get"][module name][parameter] +; REP: [value] _OR_ ["fail"][error description] +; +; REQ: ["set"][module name][parameter][value] +; REP: ["ok"] _OR_ ["fail"][error description] +zmqctrl=1 +zmqctrlendpoint=tcp://127.0.0.1:9400 + [log] ; Write to a logfile or to syslog. ; Setting filename to stderr is very useful during tests and development @@ -59,7 +82,7 @@ gainmode=2 ; and wide-band noise will be generated. ; ; Be aware that there is a dependency with resampling. -digital_gain=1.0 +digital_gain=0.8 ; Output sample rate. Values other than 2048000 enable ; resampling. diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 91c0b9d..4912bee 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -209,7 +209,7 @@ int main(int argc, char* argv[]) InputMemory* input = NULL; ModOutput* output = NULL; - BaseRemoteController* rc = NULL; + RemoteControllers rcs; Logger logger; InputFileReader inputFileReader(logger); @@ -399,7 +399,7 @@ int main(int argc, char* argv[]) try { int telnetport = pt.get<int>("remotecontrol.telnetport"); RemoteControllerTelnet* telnetrc = new RemoteControllerTelnet(telnetport); - rc = telnetrc; + rcs.add_controller(telnetrc); } catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; @@ -408,6 +408,22 @@ int main(int argc, char* argv[]) } } +#if defined(HAVE_INPUT_ZEROMQ) + if (pt.get("remotecontrol.zmqctrl", 0) == 1) { + try { + std::string zmqCtrlEndpoint = pt.get("remotecontrol.zmqctrlendpoint", ""); + std::cerr << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl; + RemoteControllerZmq* zmqrc = new RemoteControllerZmq(zmqCtrlEndpoint); + rcs.add_controller(zmqrc); + } + catch (std::exception &e) { + std::cerr << "Error: " << e.what() << "\n"; + std::cerr << " zmq remote control enabled, but no endpoint defined.\n"; + goto END_MAIN; + } + } +#endif + // input params: if (pt.get("input.loop", 0) == 1) { loop = true; @@ -615,9 +631,9 @@ int main(int argc, char* argv[]) #endif } - if (!rc) { + if (rcs.get_no_controllers() == 0) { logger.level(warn) << "No Remote-Control started"; - rc = new RemoteControllerDummy(); + rcs.add_controller(new RemoteControllerDummy()); } @@ -762,7 +778,7 @@ int main(int argc, char* argv[]) outputuhd_conf.sampleRate = outputRate; try { output = new OutputUHD(outputuhd_conf, logger); - ((OutputUHD*)output)->enrol_at(*rc); + ((OutputUHD*)output)->enrol_at(rcs); } catch (std::exception& e) { logger.level(error) << "UHD initialisation failed:" << e.what(); @@ -782,7 +798,7 @@ int main(int argc, char* argv[]) flowgraph = new Flowgraph(); data.setLength(6144); input = new InputMemory(&data); - modulator = new DabModulator(modconf, rc, logger, outputRate, clockRate, + modulator = new DabModulator(modconf, &rcs, logger, outputRate, clockRate, dabMode, gainMode, digitalgain, normalise, filterTapsFilename); flowgraph->connect(input, modulator); flowgraph->connect(modulator, output); @@ -821,10 +837,8 @@ int main(int argc, char* argv[]) /* Check every once in a while if the remote control * is still working */ - if (rc && (frame % 250) == 0 && rc->fault_detected()) { - fprintf(stderr, - "Detected Remote Control fault, restarting it\n"); - rc->restart(); + if (rcs.get_no_controllers() > 0 && (frame % 250) == 0) { + rcs.check_faults(); } } if (framesize == 0) { diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 7f246d8..2664a08 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -53,7 +53,7 @@ DabModulator::DabModulator( struct modulator_offset_config& modconf, - BaseRemoteController* rc, + RemoteControllers* rcs, Logger& logger, unsigned outputRate, unsigned clockRate, unsigned dabMode, GainMode gainMode, @@ -71,7 +71,7 @@ DabModulator::DabModulator( myEtiReader(EtiReader(modconf, myLogger)), myFlowgraph(NULL), myFilterTapsFilename(filterTapsFilename), - myRC(rc) + myRCs(rcs) { PDEBUG("DabModulator::DabModulator(%u, %u, %u, %u) @ %p\n", outputRate, clockRate, dabMode, gainMode, this); @@ -201,13 +201,13 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut) cifOfdm = new OfdmGenerator((1 + myNbSymbols), myNbCarriers, mySpacing); cifGain = new GainControl(mySpacing, myGainMode, myDigGain, myNormalise); - cifGain->enrol_at(*myRC); + cifGain->enrol_at(*myRCs); cifGuard = new GuardIntervalInserter(myNbSymbols, mySpacing, myNullSize, mySymSize); if (myFilterTapsFilename != "") { cifFilter = new FIRFilter(myFilterTapsFilename); - cifFilter->enrol_at(*myRC); + cifFilter->enrol_at(*myRCs); } myOutput = new OutputMemory(); diff --git a/src/DabModulator.h b/src/DabModulator.h index 21f9f61..84c9926 100644 --- a/src/DabModulator.h +++ b/src/DabModulator.h @@ -47,7 +47,7 @@ class DabModulator : public ModCodec public: DabModulator( struct modulator_offset_config& modconf, - BaseRemoteController* rc, + RemoteControllers* rcs, Logger& logger, unsigned outputRate = 2048000, unsigned clockRate = 0, unsigned dabMode = 0, GainMode gainMode = GAIN_VAR, @@ -77,7 +77,7 @@ protected: Flowgraph* myFlowgraph; OutputMemory* myOutput; std::string myFilterTapsFilename; - BaseRemoteController* myRC; + RemoteControllers* myRCs; size_t myNbSymbols; size_t myNbCarriers; diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 8063e75..741731e 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -56,10 +56,12 @@ OutputUHD::OutputUHD( // Since we don't know the buffer size, we cannot initialise // the buffers at object initialisation. first_run(true), - activebuffer(1) + activebuffer(1), + myDelayBuf(196608) { myMuting = 0; // is remote-controllable + myStaticDelay = 0; // is remote-controllable #if FAKE_UHD MDEBUG("OutputUHD:Using fake UHD output"); @@ -87,7 +89,9 @@ OutputUHD::OutputUHD( /* register the parameters that can be remote controlled */ RC_ADD_PARAMETER(txgain, "UHD analog daughterboard TX gain"); RC_ADD_PARAMETER(freq, "UHD transmission frequency"); - RC_ADD_PARAMETER(muting, "mute the output by stopping the transmitter"); + RC_ADD_PARAMETER(muting, "Mute the output by stopping the transmitter"); + RC_ADD_PARAMETER(staticdelay, "Set static delay (uS) between 0 and 96000"); + RC_ADD_PARAMETER(iqbalance, "Set I/Q balance between 0 and 1.0"); uhd::set_thread_priority_safe(); @@ -236,6 +240,10 @@ OutputUHD::~OutputUHD() { MDEBUG("OutputUHD::~OutputUHD() @ %p\n", this); worker.stop(); + if (!first_run) { + free(uwd.frame0.buf); + free(uwd.frame1.buf); + } } int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) @@ -298,13 +306,30 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) uwd.sourceContainsTimestamp = myConf.enableSync && myEtiReader->sourceContainsTimestamp(); + // calculate delay + uint32_t noSampleDelay = (myStaticDelay * 2048) / 1000; + uint32_t noByteDelay = noSampleDelay * sizeof(complexf); + + uint8_t* pInData = (uint8_t*) dataIn->getData(); if (activebuffer == 0) { - memcpy(uwd.frame0.buf, dataIn->getData(), uwd.bufsize); + uint8_t *pTmp = (uint8_t*) uwd.frame0.buf; + // copy remain from delaybuf + memcpy(pTmp, &myDelayBuf[0], noByteDelay); + // copy new data + memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay); + // copy remaining data to delay buf + memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); uwd.frame0.ts = ts; } else if (activebuffer == 1) { - memcpy(uwd.frame1.buf, dataIn->getData(), uwd.bufsize); + uint8_t *pTmp = (uint8_t*) uwd.frame1.buf; + // copy remain from delaybuf + memcpy(pTmp, &myDelayBuf[0], noByteDelay); + // copy new data + memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay); + // copy remaining data to delay buf + memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay); uwd.frame1.ts = ts; } @@ -627,6 +652,22 @@ void OutputUHD::set_parameter(const string& parameter, const string& value) else if (parameter == "muting") { ss >> myMuting; } + else if (parameter == "staticdelay") { + int adjust; + ss >> adjust; + int newStaticDelay = myStaticDelay + adjust; + if (newStaticDelay > 96000) + myStaticDelay = newStaticDelay - 96000; + else if (newStaticDelay < 0) + myStaticDelay = newStaticDelay + 96000; + else + myStaticDelay = newStaticDelay; + } + else if (parameter == "iqbalance") { + ss >> myConf.frequency; + myUsrp->set_tx_freq(myConf.frequency); + myConf.frequency = myUsrp->get_tx_freq(); + } else { stringstream ss; ss << "Parameter '" << parameter @@ -647,6 +688,9 @@ const string OutputUHD::get_parameter(const string& parameter) const else if (parameter == "muting") { ss << myMuting; } + else if (parameter == "staticdelay") { + ss << myStaticDelay; + } else { ss << "Parameter '" << parameter << "' is not exported by controllable " << get_rc_name(); diff --git a/src/OutputUHD.h b/src/OutputUHD.h index a2ffb7d..7eb6733 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -188,6 +188,7 @@ struct OutputUHDConfig { class OutputUHD: public ModOutput, public RemoteControllable { public: + OutputUHD( OutputUHDConfig& config, Logger& logger); @@ -229,6 +230,10 @@ class OutputUHD: public ModOutput, public RemoteControllable { // muting can only be changed using the remote control bool myMuting; + private: + // data + int myStaticDelay; + std::vector<complexf> myDelayBuf; size_t lastLen; }; diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index 5bbd2f8..6f538dc 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -246,3 +246,146 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message) ignored_error); } + +#if defined(HAVE_INPUT_ZEROMQ) + +void RemoteControllerZmq::restart() +{ + m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this); +} + +// This runs in a separate thread, because +// it would take too long to be done in the main loop +// thread. +void RemoteControllerZmq::restart_thread() +{ + m_running = false; + + if (!m_endpoint.empty()) { + m_child_thread.interrupt(); + m_child_thread.join(); + } + + m_child_thread = boost::thread(&RemoteControllerZmq::process, this); +} + +void RemoteControllerZmq::recv_all(zmq::socket_t* pSocket, std::vector<std::string> &message) +{ + int more = -1; + size_t more_size = sizeof(more); + + while (more != 0) + { + zmq::message_t msg; + pSocket->recv(&msg); + message.push_back(std::string((char*)msg.data(), msg.size())); + pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size); + } +} + +void RemoteControllerZmq::send_ok_reply(zmq::socket_t *pSocket) +{ + zmq::message_t msg(2); + char repCode[2] = {'o', 'k'}; + memcpy ((void*) msg.data(), repCode, 2); + pSocket->send(msg, 0); +} + +void RemoteControllerZmq::send_fail_reply(zmq::socket_t *pSocket, const std::string &error) +{ + zmq::message_t msg1(4); + char repCode[4] = {'f', 'a', 'i', 'l'}; + memcpy ((void*) msg1.data(), repCode, 4); + pSocket->send(msg1, ZMQ_SNDMORE); + + zmq::message_t msg2(error.length()); + memcpy ((void*) msg2.data(), error.c_str(), error.length()); + pSocket->send(msg2, 0); +} + +void RemoteControllerZmq::process() +{ + // create zmq reply socket for receiving ctrl parameters + zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); + std::cout << "Starting zmq remote control thread" << std::endl; + try + { + // connect the socket + int hwm = 100; + int linger = 0; + repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); + repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); + repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + repSocket.bind(m_endpoint.c_str()); + + // create pollitem that polls the ZMQ sockets + zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; + for(;;) + { + zmq::poll(pollItems, 1, 100); + std::vector<std::string> msg; + if (pollItems[0].revents & ZMQ_POLLIN) + { + recv_all(&repSocket, msg); + std::string command((char*)msg[0].data(), msg[0].size()); + + if (msg.size() == 1 && command == "ping") + { + send_ok_reply(&repSocket); + } + else if (msg.size() == 3 && command == "get") + { + std::string module((char*) msg[1].data(), msg[1].size()); + std::string parameter((char*) msg[2].data(), msg[2].size()); + + try + { + std::string value = get_param_(module, parameter); + zmq::message_t *pMsg = new zmq::message_t(value.size()); + memcpy ((void*) pMsg->data(), value.data(), value.size()); + repSocket.send(*pMsg, 0); + delete pMsg; + } + catch (ParameterError &err) + { + send_fail_reply(&repSocket, err.what()); + } + } + else if (msg.size() == 4 && command == "set") + { + std::string module((char*) msg[1].data(), msg[1].size()); + std::string parameter((char*) msg[2].data(), msg[2].size()); + std::string value((char*) msg[3].data(), msg[3].size()); + + try + { + set_param_(module, parameter, value); + send_ok_reply(&repSocket); + } + catch (ParameterError &err) + { + send_fail_reply(&repSocket, err.what()); + } + } + else + send_fail_reply(&repSocket, "Unsupported command"); + } + + // check if thread is interrupted + boost::this_thread::interruption_point(); + } + } + catch (boost::thread_interrupted&) {} + catch (zmq::error_t &e) + { + std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl; + } + catch (std::exception& e) + { + std::cerr << "Remote control caught exception: " << e.what() << std::endl; + m_fault = true; + } + repSocket.close(); +} +#endif + diff --git a/src/RemoteControl.h b/src/RemoteControl.h index 09e7492..905e153 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -29,6 +29,14 @@ #ifndef _REMOTECONTROL_H #define _REMOTECONTROL_H +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#if defined(HAVE_INPUT_ZEROMQ) +#include <zmq.hpp> +#endif + #include <list> #include <map> #include <string> @@ -85,6 +93,39 @@ class BaseRemoteController { virtual ~BaseRemoteController() {} }; +/* Holds all our remote controllers, i.e. we may have more than + * one type of controller running. + */ +class RemoteControllers { + public: + void add_controller(BaseRemoteController *rc) { + m_controllers.push_back(rc); + } + + void add_controllable(RemoteControllable *rc) { + for (std::list<BaseRemoteController*>::iterator it = m_controllers.begin(); + it != m_controllers.end(); ++it) { + (*it)->enrol(rc); + } + } + + void check_faults() { + for (std::list<BaseRemoteController*>::iterator it = m_controllers.begin(); + it != m_controllers.end(); ++it) { + if ((*it)->fault_detected()) + { + fprintf(stderr, + "Detected Remote Control fault, restarting it\n"); + (*it)->restart(); + } + } + } + size_t get_no_controllers() { return m_controllers.size(); } + + private: + std::list<BaseRemoteController*> m_controllers; +}; + /* Objects that support remote control must implement the following class */ class RemoteControllable { public: @@ -100,8 +141,8 @@ class RemoteControllable { virtual std::string get_rc_name() const { return m_name; } /* Tell the controllable to enrol at the given controller */ - virtual void enrol_at(BaseRemoteController& controller) { - controller.enrol(this); + virtual void enrol_at(RemoteControllers& controllers) { + controllers.add_controllable(this); } /* Return a list of possible parameters that can be set */ @@ -254,6 +295,88 @@ class RemoteControllerTelnet : public BaseRemoteController { int m_port; }; +#if defined(HAVE_INPUT_ZEROMQ) +/* Implements a Remote controller using zmq transportlayer + * that listens on localhost + */ +class RemoteControllerZmq : public BaseRemoteController { + public: + RemoteControllerZmq() + : m_running(false), m_fault(false), + m_zmqContext(1), + m_endpoint("") { } + + RemoteControllerZmq(std::string endpoint) + : m_running(true), m_fault(false), + m_child_thread(&RemoteControllerZmq::process, this), + m_zmqContext(1), + m_endpoint(endpoint) { } + + ~RemoteControllerZmq() { + m_running = false; + m_fault = false; + if (!m_endpoint.empty()) { + m_child_thread.interrupt(); + m_child_thread.join(); + } + } + + void enrol(RemoteControllable* controllable) { + m_cohort.push_back(controllable); + } + + virtual bool fault_detected() { return m_fault; } + + virtual void restart(); + + private: + void restart_thread(); + + void recv_all(zmq::socket_t* pSocket, std::vector<std::string> &message); + void send_ok_reply(zmq::socket_t *pSocket); + void send_fail_reply(zmq::socket_t *pSocket, const std::string &error); + void process(); + + + RemoteControllerZmq& operator=(const RemoteControllerZmq& other); + RemoteControllerZmq(const RemoteControllerZmq& other); + + RemoteControllable* get_controllable_(std::string name) { + for (std::list<RemoteControllable*>::iterator it = m_cohort.begin(); + it != m_cohort.end(); ++it) { + if ((*it)->get_rc_name() == name) + { + return *it; + } + } + throw ParameterError("Module name unknown"); + } + + std::string get_param_(std::string name, std::string param) { + RemoteControllable* controllable = get_controllable_(name); + return controllable->get_parameter(param); + } + + void set_param_(std::string name, std::string param, std::string value) { + RemoteControllable* controllable = get_controllable_(name); + return controllable->set_parameter(param, value); + } + + bool m_running; + + /* This is set to true if a fault occurred */ + bool m_fault; + boost::thread m_restarter_thread; + + boost::thread m_child_thread; + + /* This controller commands the controllables in the cohort */ + std::list<RemoteControllable*> m_cohort; + + zmq::context_t m_zmqContext; + std::string m_endpoint; +}; +#endif /* The Dummy remote controller does nothing, and never fails */ |