From 7e3e3f290e9fbbd314919474ed7bc61c3ce43041 Mon Sep 17 00:00:00 2001 From: Jörgen Scott Date: Tue, 16 Dec 2014 14:49:57 +0100 Subject: added zmq controller to uhd --- src/.DabMod.cpp.un~ | Bin 30089 -> 0 bytes src/.OutputUHD.cpp.un~ | Bin 67312 -> 0 bytes src/.OutputUHD.h.un~ | Bin 13690 -> 0 bytes src/DabMod.cpp | 10 +++-- src/OutputUHD.cpp | 113 ++++++++++++++++++++++++++++++++++++++++++++++++- src/OutputUHD.h | 20 ++++++++- 6 files changed, 137 insertions(+), 6 deletions(-) delete mode 100644 src/.DabMod.cpp.un~ delete mode 100644 src/.OutputUHD.cpp.un~ delete mode 100644 src/.OutputUHD.h.un~ diff --git a/src/.DabMod.cpp.un~ b/src/.DabMod.cpp.un~ deleted file mode 100644 index ebbb822..0000000 Binary files a/src/.DabMod.cpp.un~ and /dev/null differ diff --git a/src/.OutputUHD.cpp.un~ b/src/.OutputUHD.cpp.un~ deleted file mode 100644 index 96f080d..0000000 Binary files a/src/.OutputUHD.cpp.un~ and /dev/null differ diff --git a/src/.OutputUHD.h.un~ b/src/.OutputUHD.h.un~ deleted file mode 100644 index 5f3e54a..0000000 Binary files a/src/.OutputUHD.h.un~ and /dev/null differ diff --git a/src/DabMod.cpp b/src/DabMod.cpp index ee21ed4..4342522 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -55,6 +55,7 @@ #include #include #include +#include #ifdef HAVE_NETINET_IN_H # include @@ -189,6 +190,9 @@ int main(int argc, char* argv[]) OutputUHDConfig outputuhd_conf; #endif + zmq::context_t zmqCtrlContext(1); + std::string zmqCtrlEndpoint = ""; + // To handle the timestamp offset of the modulator struct modulator_offset_config modconf; modconf.use_offset_file = false; @@ -363,8 +367,8 @@ int main(int argc, char* argv[]) } } - //std::string zmqCtrlEndpoint = pt.get("remotecontrol.zmqctrlendpoint", ""); - //std::cout << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl; + zmqCtrlEndpoint = pt.get("remotecontrol.zmqctrlendpoint", ""); + std::cout << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl; // input params: if (pt.get("input.loop", 0) == 1) { @@ -701,7 +705,7 @@ int main(int argc, char* argv[]) outputuhd_conf.sampleRate = outputRate; try { - output = new OutputUHD(outputuhd_conf, logger); + output = new OutputUHD(outputuhd_conf, logger, &zmqCtrlContext, zmqCtrlEndpoint); ((OutputUHD*)output)->enrol_at(*rc); } catch (std::exception& e) { diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 8713042..6a4ccf4 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -45,7 +45,9 @@ typedef std::complex complexf; OutputUHD::OutputUHD( OutputUHDConfig& config, - Logger& logger) : + Logger& logger, + zmq::context_t *pContext, + const std::string &zmqCtrlEndpoint) : ModOutput(ModFormat(1), ModFormat(0)), RemoteControllable("uhd"), myLogger(logger), @@ -225,6 +227,14 @@ OutputUHD::OutputUHD( worker.start(&uwd); + m_pZmqRepThread = NULL; + if (!zmqCtrlEndpoint.empty()) + { + m_pContext = pContext; + m_zmqCtrlEndpoint = zmqCtrlEndpoint; + m_pZmqRepThread = new boost::thread(boost::bind(&OutputUHD::ZmqCtrl, this)); + } + MDEBUG("OutputUHD:UHD ready.\n"); } @@ -232,6 +242,12 @@ OutputUHD::OutputUHD( OutputUHD::~OutputUHD() { MDEBUG("OutputUHD::~OutputUHD() @ %p\n", this); + if (m_pZmqRepThread != NULL) + { + m_pZmqRepThread->interrupt(); + m_pZmqRepThread->join(); + } + worker.stop(); if (!first_run) { free(uwd.frame0.buf); @@ -667,3 +683,98 @@ const string OutputUHD::get_parameter(const string& parameter) const return ss.str(); } +void OutputUHD::RecvAll(zmq::socket_t* pSocket, std::vector &message) +{ + int more = -1; + size_t more_size = sizeof(more); + + while (more != 0) + { + zmq::message_t msg; + pSocket->recv(&msg); + message.push_back(std::string((char*)msg.data(), msg.size())); + pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size); + } +} + +void OutputUHD::SendOkReply(zmq::socket_t *pSocket) +{ + zmq::message_t msg(2); + char repCode[2] = {'o', 'k'}; + memcpy ((void*) msg.data(), repCode, 2); + pSocket->send(msg, 0); +} + +void OutputUHD::SendFailReply(zmq::socket_t *pSocket, const std::string &error) +{ + zmq::message_t msg1(4); + char repCode[4] = {'f', 'a', 'i', 'l'}; + memcpy ((void*) msg1.data(), repCode, 4); + pSocket->send(msg1, ZMQ_SNDMORE); + + zmq::message_t msg2(error.length()); + memcpy ((void*) msg2.data(), error.c_str(), error.length()); + pSocket->send(msg2, 0); +} + +//TODO: Should be implemented as an alternative to RemoteControllerTelnet and +//moved to the RemoteControl.h/cpp file instead. +void OutputUHD::ZmqCtrl() +{ + // create zmq reply socket for receiving ctrl parameters + zmq::socket_t repSocket(*m_pContext, ZMQ_REP); + std::cout << "Starting output UHD control thread" << std::endl; + try + { + // connect the socket + int hwm = 5; + int linger = 0; + repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); + repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); + repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + repSocket.connect(m_zmqCtrlEndpoint.c_str()); + + // create pollitem that polls the ZMQ sockets + zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; + for(;;) + { + zmq::poll(pollItems, 1, 100); + std::vector msg; + if (pollItems[0].revents & ZMQ_POLLIN) + { + RecvAll(&repSocket, msg); + std::string module((char*)msg[0].data(), msg[0].size()); + if (module == "uhd") + { + if (msg.size() != 3) + { + SendFailReply(&repSocket, "Wrong request format"); + continue; + } + + std::string param((char*) msg[1].data(), msg[1].size()); + std::string value((char*) msg[2].data(), msg[2].size()); + try + { + set_parameter(param, value); + } + catch (ParameterError &err) + { + SendFailReply(&repSocket, err.what()); + continue; + } + SendOkReply(&repSocket); + } + } + + // check if thread is interrupted + boost::this_thread::interruption_point(); + } + } + catch (boost::thread_interrupted&) {} + catch (zmq::error_t &e) + { + std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl; + } + repSocket.close(); +} diff --git a/src/OutputUHD.h b/src/OutputUHD.h index f50807d..25f7476 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -181,9 +181,12 @@ struct OutputUHDConfig { class OutputUHD: public ModOutput, public RemoteControllable { public: + OutputUHD( OutputUHDConfig& config, - Logger& logger); + Logger& logger, + zmq::context_t *pContext, + const std::string &zmqCtrlEndpoint); ~OutputUHD(); int process(Buffer* dataIn, Buffer* dataOut); @@ -221,9 +224,22 @@ class OutputUHD: public ModOutput, public RemoteControllable { // muting can only be changed using the remote control bool myMuting; + + private: + // zmq receiving method + //TODO: Should be implemented as an alternative to RemoteControllerTelnet and + //moved to the RemoteControl.h/cpp file instead. + void ZmqCtrl(void); + void RecvAll(zmq::socket_t* pSocket, std::vector &message); + void SendOkReply(zmq::socket_t *pSocket); + void SendFailReply(zmq::socket_t *pSocket, const std::string &error); + + // data int myStaticDelay; std::vector m_delayBuf; - + zmq::context_t *m_pContext; + std::string m_zmqCtrlEndpoint; + boost::thread *m_pZmqRepThread; size_t lastLen; }; -- cgit v1.2.3