summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJörgen Scott <jorgen.scott@gmail.com>2015-01-13 11:27:39 +0100
committerJörgen Scott <jorgen.scott@gmail.com>2015-01-13 11:27:39 +0100
commitba790cba2f7b48dd66f4418de0b7b366422926b0 (patch)
treea31f4fce6227b549561f53fb6f10e455070871f0
parent593c130b1e6848a08b30a84732ebd6862ef2e3b7 (diff)
downloaddabmod-ba790cba2f7b48dd66f4418de0b7b366422926b0.tar.gz
dabmod-ba790cba2f7b48dd66f4418de0b7b366422926b0.tar.bz2
dabmod-ba790cba2f7b48dd66f4418de0b7b366422926b0.zip
added zmq remote control
-rw-r--r--doc/example.ini22
-rw-r--r--src/DabMod.cpp45
-rw-r--r--src/DabModulator.cpp8
-rw-r--r--src/DabModulator.h4
-rw-r--r--src/OutputUHD.cpp137
-rw-r--r--src/OutputUHD.h18
-rw-r--r--src/RemoteControl.cpp142
-rw-r--r--src/RemoteControl.h131
8 files changed, 336 insertions, 171 deletions
diff --git a/doc/example.ini b/doc/example.ini
index 49f6eda..9a80eeb 100644
--- a/doc/example.ini
+++ b/doc/example.ini
@@ -8,6 +8,28 @@
telnet=1
telnetport=2121
+; Enable zmq remote control.
+; The zmq remote control is intended for machine-to-machine
+; integration and requires that the odr-mod is build with zmq support.
+; The zmq remote control may run in parallell with Telnet.
+; Protocol:
+; The odr-dabmod binds a zmq rep socket so clients must connect
+; using either req or dealer socket.
+; [] denotes message part as zmq multi-part message are used for delimitation.
+; All message parts are utf-8 encoded strings and matches the Telnet command set.
+; Explicit codes are denoted with "".
+; The following commands are supported:
+; REQ: ["ping"]
+; REP: ["ok"]
+;
+; REQ: ["get"][module name][parameter]
+; REP: [value] _OR_ ["fail"][error description]
+;
+; REQ: ["set"][module name][parameter][value]
+; REP: ["ok"] _OR_ ["fail"][error description]
+zmqctrl=1
+zmqctrlendpoint=tcp://127.0.0.1:9400
+
[log]
; Write to a logfile or to syslog.
; Setting filename to stderr is very useful during tests and development
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 4342522..dadade9 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -55,7 +55,7 @@
#include <sys/stat.h>
#include <stdexcept>
#include <signal.h>
-#include <zmq.hpp>
+//#include <zmq.hpp>
#ifdef HAVE_NETINET_IN_H
# include <netinet/in.h>
@@ -190,8 +190,8 @@ int main(int argc, char* argv[])
OutputUHDConfig outputuhd_conf;
#endif
- zmq::context_t zmqCtrlContext(1);
- std::string zmqCtrlEndpoint = "";
+ //zmq::context_t zmqCtrlContext(1);
+ //std::string zmqCtrlEndpoint = "";
// To handle the timestamp offset of the modulator
struct modulator_offset_config modconf;
@@ -204,7 +204,8 @@ int main(int argc, char* argv[])
InputMemory* input = NULL;
ModOutput* output = NULL;
- BaseRemoteController* rc = NULL;
+ //BaseRemoteController* rc = NULL;
+ RemoteControllers rcs;
Logger logger;
InputFileReader inputFileReader(logger);
@@ -358,7 +359,7 @@ int main(int argc, char* argv[])
try {
int telnetport = pt.get<int>("remotecontrol.telnetport");
RemoteControllerTelnet* telnetrc = new RemoteControllerTelnet(telnetport);
- rc = telnetrc;
+ rcs.add_controller(telnetrc);
}
catch (std::exception &e) {
std::cerr << "Error: " << e.what() << "\n";
@@ -367,8 +368,22 @@ int main(int argc, char* argv[])
}
}
- zmqCtrlEndpoint = pt.get("remotecontrol.zmqctrlendpoint", "");
- std::cout << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl;
+#if defined(HAVE_INPUT_ZEROMQ)
+ if (pt.get("remotecontrol.zmqctrl", 0) == 1) {
+ try {
+ std::string zmqCtrlEndpoint =
+ pt.get("remotecontrol.zmqctrlendpoint", "");
+ std::cout << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl;
+ RemoteControllerZmq* zmqrc = new RemoteControllerZmq(zmqCtrlEndpoint);
+ rcs.add_controller(zmqrc);
+ }
+ catch (std::exception &e) {
+ std::cerr << "Error: " << e.what() << "\n";
+ std::cerr << " zmq remote control enabled, but no endpoint defined.\n";
+ goto END_MAIN;
+ }
+ }
+#endif
// input params:
if (pt.get("input.loop", 0) == 1) {
@@ -570,9 +585,9 @@ int main(int argc, char* argv[])
outputuhd_conf.muteNoTimestamps = (pt.get("delaymanagement.mutenotimestamps", 0) == 1);
#endif
}
- if (!rc) {
+ if (rcs.get_no_controllers() == 0) {
logger.level(warn) << "No Remote-Control started";
- rc = new RemoteControllerDummy();
+ rcs.add_controller(new RemoteControllerDummy());
}
@@ -705,8 +720,8 @@ int main(int argc, char* argv[])
outputuhd_conf.sampleRate = outputRate;
try {
- output = new OutputUHD(outputuhd_conf, logger, &zmqCtrlContext, zmqCtrlEndpoint);
- ((OutputUHD*)output)->enrol_at(*rc);
+ output = new OutputUHD(outputuhd_conf, logger/*, &zmqCtrlContext, zmqCtrlEndpoint*/);
+ ((OutputUHD*)output)->enrol_at(rcs);
}
catch (std::exception& e) {
logger.level(error) << "UHD initialisation failed:" << e.what();
@@ -718,7 +733,7 @@ int main(int argc, char* argv[])
flowgraph = new Flowgraph();
data.setLength(6144);
input = new InputMemory(&data);
- modulator = new DabModulator(modconf, rc, logger, outputRate, clockRate,
+ modulator = new DabModulator(modconf, &rcs, logger, outputRate, clockRate,
dabMode, gainMode, digitalgain, normalise, filterTapsFilename);
flowgraph->connect(input, modulator);
flowgraph->connect(modulator, output);
@@ -757,10 +772,8 @@ int main(int argc, char* argv[])
/* Check every once in a while if the remote control
* is still working */
- if (rc && (frame % 250) == 0 && rc->fault_detected()) {
- fprintf(stderr,
- "Detected Remote Control fault, restarting it\n");
- rc->restart();
+ if (rcs.get_no_controllers() > 0 && (frame % 250) == 0) {
+ rcs.check_faults();
}
}
if (framesize == 0) {
diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp
index 7f246d8..2664a08 100644
--- a/src/DabModulator.cpp
+++ b/src/DabModulator.cpp
@@ -53,7 +53,7 @@
DabModulator::DabModulator(
struct modulator_offset_config& modconf,
- BaseRemoteController* rc,
+ RemoteControllers* rcs,
Logger& logger,
unsigned outputRate, unsigned clockRate,
unsigned dabMode, GainMode gainMode,
@@ -71,7 +71,7 @@ DabModulator::DabModulator(
myEtiReader(EtiReader(modconf, myLogger)),
myFlowgraph(NULL),
myFilterTapsFilename(filterTapsFilename),
- myRC(rc)
+ myRCs(rcs)
{
PDEBUG("DabModulator::DabModulator(%u, %u, %u, %u) @ %p\n",
outputRate, clockRate, dabMode, gainMode, this);
@@ -201,13 +201,13 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)
cifOfdm = new OfdmGenerator((1 + myNbSymbols), myNbCarriers, mySpacing);
cifGain = new GainControl(mySpacing, myGainMode, myDigGain, myNormalise);
- cifGain->enrol_at(*myRC);
+ cifGain->enrol_at(*myRCs);
cifGuard = new GuardIntervalInserter(myNbSymbols, mySpacing,
myNullSize, mySymSize);
if (myFilterTapsFilename != "") {
cifFilter = new FIRFilter(myFilterTapsFilename);
- cifFilter->enrol_at(*myRC);
+ cifFilter->enrol_at(*myRCs);
}
myOutput = new OutputMemory();
diff --git a/src/DabModulator.h b/src/DabModulator.h
index 21f9f61..84c9926 100644
--- a/src/DabModulator.h
+++ b/src/DabModulator.h
@@ -47,7 +47,7 @@ class DabModulator : public ModCodec
public:
DabModulator(
struct modulator_offset_config& modconf,
- BaseRemoteController* rc,
+ RemoteControllers* rcs,
Logger& logger,
unsigned outputRate = 2048000, unsigned clockRate = 0,
unsigned dabMode = 0, GainMode gainMode = GAIN_VAR,
@@ -77,7 +77,7 @@ protected:
Flowgraph* myFlowgraph;
OutputMemory* myOutput;
std::string myFilterTapsFilename;
- BaseRemoteController* myRC;
+ RemoteControllers* myRCs;
size_t myNbSymbols;
size_t myNbCarriers;
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp
index e3d2d77..4776965 100644
--- a/src/OutputUHD.cpp
+++ b/src/OutputUHD.cpp
@@ -45,9 +45,7 @@ typedef std::complex<float> complexf;
OutputUHD::OutputUHD(
OutputUHDConfig& config,
- Logger& logger,
- zmq::context_t *pContext,
- const std::string &zmqCtrlEndpoint) :
+ Logger& logger) :
ModOutput(ModFormat(1), ModFormat(0)),
RemoteControllable("uhd"),
myLogger(logger),
@@ -56,7 +54,7 @@ OutputUHD::OutputUHD(
// the buffers at object initialisation.
first_run(true),
activebuffer(1),
- m_delayBuf(196608)
+ myDelayBuf(196608)
{
myMuting = 0; // is remote-controllable
@@ -227,14 +225,6 @@ OutputUHD::OutputUHD(
worker.start(&uwd);
- m_pZmqRepThread = NULL;
- if (!zmqCtrlEndpoint.empty())
- {
- m_pContext = pContext;
- m_zmqCtrlEndpoint = zmqCtrlEndpoint;
- m_pZmqRepThread = new boost::thread(boost::bind(&OutputUHD::ZmqCtrl, this));
- }
-
MDEBUG("OutputUHD:UHD ready.\n");
}
@@ -242,12 +232,6 @@ OutputUHD::OutputUHD(
OutputUHD::~OutputUHD()
{
MDEBUG("OutputUHD::~OutputUHD() @ %p\n", this);
- if (m_pZmqRepThread != NULL)
- {
- m_pZmqRepThread->interrupt();
- m_pZmqRepThread->join();
- }
-
worker.stop();
if (!first_run) {
free(uwd.frame0.buf);
@@ -315,13 +299,11 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
if (activebuffer == 0) {
uint8_t *pTmp = (uint8_t*) uwd.frame0.buf;
// copy remain from delaybuf
- memcpy(pTmp, &m_delayBuf[0], noByteDelay);
+ memcpy(pTmp, &myDelayBuf[0], noByteDelay);
// copy new data
memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay);
// copy remaining data to delay buf
- memcpy(&m_delayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);
-
- //memcpy(uwd.frame0.buf, dataIn->getData(), uwd.bufsize);
+ memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);
uwd.frame0.ts = ts;
uwd.frame0.fct = myEtiReader->getFCT();
@@ -329,13 +311,11 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
else if (activebuffer == 1) {
uint8_t *pTmp = (uint8_t*) uwd.frame1.buf;
// copy remain from delaybuf
- memcpy(pTmp, &m_delayBuf[0], noByteDelay);
+ memcpy(pTmp, &myDelayBuf[0], noByteDelay);
// copy new data
memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay);
// copy remaining data to delay buf
- memcpy(&m_delayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);
-
- //memcpy(uwd.frame1.buf, dataIn->getData(), uwd.bufsize);
+ memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);
uwd.frame1.ts = ts;
uwd.frame1.fct = myEtiReader->getFCT();
@@ -683,108 +663,3 @@ const string OutputUHD::get_parameter(const string& parameter) const
return ss.str();
}
-void OutputUHD::RecvAll(zmq::socket_t* pSocket, std::vector<std::string> &message)
-{
- int more = -1;
- size_t more_size = sizeof(more);
-
- while (more != 0)
- {
- zmq::message_t msg;
- pSocket->recv(&msg);
- message.push_back(std::string((char*)msg.data(), msg.size()));
- pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size);
- }
-}
-
-void OutputUHD::SendOkReply(zmq::socket_t *pSocket)
-{
- zmq::message_t msg(2);
- char repCode[2] = {'o', 'k'};
- memcpy ((void*) msg.data(), repCode, 2);
- pSocket->send(msg, 0);
-}
-
-void OutputUHD::SendFailReply(zmq::socket_t *pSocket, const std::string &error)
-{
- zmq::message_t msg1(4);
- char repCode[4] = {'f', 'a', 'i', 'l'};
- memcpy ((void*) msg1.data(), repCode, 4);
- pSocket->send(msg1, ZMQ_SNDMORE);
-
- zmq::message_t msg2(error.length());
- memcpy ((void*) msg2.data(), error.c_str(), error.length());
- pSocket->send(msg2, 0);
-}
-
-//TODO: Should be implemented as an alternative to RemoteControllerTelnet and
-//moved to the RemoteControl.h/cpp file instead.
-void OutputUHD::ZmqCtrl()
-{
- // create zmq reply socket for receiving ctrl parameters
- zmq::socket_t repSocket(*m_pContext, ZMQ_REP);
- std::cout << "Starting output UHD control thread" << std::endl;
- try
- {
- // connect the socket
- int hwm = 100;
- int linger = 0;
- repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
- repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
- repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
- repSocket.bind(m_zmqCtrlEndpoint.c_str());
-
- // create pollitem that polls the ZMQ sockets
- zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} };
- for(;;)
- {
- zmq::poll(pollItems, 1, 100);
- std::vector<std::string> msg;
- if (pollItems[0].revents & ZMQ_POLLIN)
- {
- RecvAll(&repSocket, msg);
- std::string module((char*)msg[0].data(), msg[0].size());
- if (module == "uhd")
- {
- if (msg.size() < 2)
- {
- SendFailReply(&repSocket, "Wrong request format");
- continue;
- }
-
- std::string param((char*) msg[1].data(), msg[1].size());
- if (msg.size() == 2 && param == "ping")
- {
- SendOkReply(&repSocket);
- }
- else if (msg.size() != 3)
- {
- SendFailReply(&repSocket, "Wrong request format");
- }
- else
- {
- std::string value((char*) msg[2].data(), msg[2].size());
- try
- {
- set_parameter(param, value);
- SendOkReply(&repSocket);
- }
- catch (ParameterError &err)
- {
- SendFailReply(&repSocket, err.what());
- }
- }
- }
- }
-
- // check if thread is interrupted
- boost::this_thread::interruption_point();
- }
- }
- catch (boost::thread_interrupted&) {}
- catch (zmq::error_t &e)
- {
- std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl;
- }
- repSocket.close();
-}
diff --git a/src/OutputUHD.h b/src/OutputUHD.h
index 25f7476..60dfc65 100644
--- a/src/OutputUHD.h
+++ b/src/OutputUHD.h
@@ -48,7 +48,6 @@ DESCRIPTION:
#include <boost/shared_ptr.hpp>
#include <list>
#include <string>
-#include <zmq.hpp>
#include "Log.h"
#include "ModOutput.h"
@@ -184,9 +183,7 @@ class OutputUHD: public ModOutput, public RemoteControllable {
OutputUHD(
OutputUHDConfig& config,
- Logger& logger,
- zmq::context_t *pContext,
- const std::string &zmqCtrlEndpoint);
+ Logger& logger);
~OutputUHD();
int process(Buffer* dataIn, Buffer* dataOut);
@@ -226,20 +223,9 @@ class OutputUHD: public ModOutput, public RemoteControllable {
bool myMuting;
private:
- // zmq receiving method
- //TODO: Should be implemented as an alternative to RemoteControllerTelnet and
- //moved to the RemoteControl.h/cpp file instead.
- void ZmqCtrl(void);
- void RecvAll(zmq::socket_t* pSocket, std::vector<std::string> &message);
- void SendOkReply(zmq::socket_t *pSocket);
- void SendFailReply(zmq::socket_t *pSocket, const std::string &error);
-
// data
int myStaticDelay;
- std::vector<complexf> m_delayBuf;
- zmq::context_t *m_pContext;
- std::string m_zmqCtrlEndpoint;
- boost::thread *m_pZmqRepThread;
+ std::vector<complexf> myDelayBuf;
size_t lastLen;
};
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
index 5bbd2f8..c7c5914 100644
--- a/src/RemoteControl.cpp
+++ b/src/RemoteControl.cpp
@@ -246,3 +246,145 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message)
ignored_error);
}
+
+#if defined(HAVE_INPUT_ZEROMQ)
+
+void RemoteControllerZmq::restart()
+{
+ m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this);
+}
+
+// This runs in a separate thread, because
+// it would take too long to be done in the main loop
+// thread.
+void RemoteControllerZmq::restart_thread()
+{
+ m_running = false;
+
+ if (!m_endpoint.empty()) {
+ m_child_thread.interrupt();
+ m_child_thread.join();
+ }
+
+ m_child_thread = boost::thread(&RemoteControllerZmq::process, this);
+}
+
+void RemoteControllerZmq::recv_all(zmq::socket_t* pSocket, std::vector<std::string> &message)
+{
+ int more = -1;
+ size_t more_size = sizeof(more);
+
+ while (more != 0)
+ {
+ zmq::message_t msg;
+ pSocket->recv(&msg);
+ message.push_back(std::string((char*)msg.data(), msg.size()));
+ pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size);
+ }
+}
+
+void RemoteControllerZmq::send_ok_reply(zmq::socket_t *pSocket)
+{
+ zmq::message_t msg(2);
+ char repCode[2] = {'o', 'k'};
+ memcpy ((void*) msg.data(), repCode, 2);
+ pSocket->send(msg, 0);
+}
+
+void RemoteControllerZmq::send_fail_reply(zmq::socket_t *pSocket, const std::string &error)
+{
+ zmq::message_t msg1(4);
+ char repCode[4] = {'f', 'a', 'i', 'l'};
+ memcpy ((void*) msg1.data(), repCode, 4);
+ pSocket->send(msg1, ZMQ_SNDMORE);
+
+ zmq::message_t msg2(error.length());
+ memcpy ((void*) msg2.data(), error.c_str(), error.length());
+ pSocket->send(msg2, 0);
+}
+
+void RemoteControllerZmq::process()
+{
+ // create zmq reply socket for receiving ctrl parameters
+ zmq::socket_t repSocket(m_zmqContext, ZMQ_REP);
+ std::cout << "Starting zmq remote control thread" << std::endl;
+ try
+ {
+ // connect the socket
+ int hwm = 100;
+ int linger = 0;
+ repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
+ repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
+ repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
+ repSocket.bind(m_endpoint.c_str());
+
+ // create pollitem that polls the ZMQ sockets
+ zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} };
+ for(;;)
+ {
+ zmq::poll(pollItems, 1, 100);
+ std::vector<std::string> msg;
+ if (pollItems[0].revents & ZMQ_POLLIN)
+ {
+ recv_all(&repSocket, msg);
+ std::string command((char*)msg[0].data(), msg[0].size());
+
+ if (msg.size() == 1 && command == "ping")
+ {
+ send_ok_reply(&repSocket);
+ }
+ else if (msg.size() == 3 && command == "get")
+ {
+ std::string module((char*) msg[1].data(), msg[1].size());
+ std::string parameter((char*) msg[2].data(), msg[2].size());
+
+ try
+ {
+ std::string value = get_param_(module, parameter);
+ zmq::message_t *pMsg = new zmq::message_t(value.size());
+ memcpy ((void*) pMsg->data(), value.data(), value.size());
+ repSocket.send(*pMsg, 0);
+ delete pMsg;
+ }
+ catch (ParameterError &err)
+ {
+ send_fail_reply(&repSocket, err.what());
+ }
+ }
+ else if (msg.size() == 4 && command == "set")
+ {
+ std::string module((char*) msg[1].data(), msg[1].size());
+ std::string parameter((char*) msg[2].data(), msg[2].size());
+ std::string value((char*) msg[3].data(), msg[3].size());
+
+ try
+ {
+ set_param_(module, parameter, value);
+ send_ok_reply(&repSocket);
+ }
+ catch (ParameterError &err)
+ {
+ send_fail_reply(&repSocket, err.what());
+ }
+ }
+ else
+ send_fail_reply(&repSocket, "Unsupported command");
+ }
+
+ // check if thread is interrupted
+ boost::this_thread::interruption_point();
+ }
+ }
+ catch (boost::thread_interrupted&) {}
+ catch (zmq::error_t &e)
+ {
+ std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl;
+ }
+ catch (std::exception& e)
+ {
+ std::cerr << "Remote control caught exception: " << e.what() << std::endl;
+ m_fault = true;
+ }
+ repSocket.close();
+}
+#endif
diff --git a/src/RemoteControl.h b/src/RemoteControl.h
index 09e7492..7c830b2 100644
--- a/src/RemoteControl.h
+++ b/src/RemoteControl.h
@@ -29,6 +29,14 @@
#ifndef _REMOTECONTROL_H
#define _REMOTECONTROL_H
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#if defined(HAVE_INPUT_ZEROMQ)
+#include <zmq.hpp>
+#endif
+
#include <list>
#include <map>
#include <string>
@@ -85,6 +93,42 @@ class BaseRemoteController {
virtual ~BaseRemoteController() {}
};
+class RemoteControllers {
+/* Holds all our remote controllers, i.e. we may have more than
+ * one type of controller running.
+*/
+ public:
+ RemoteControllers() {}
+ virtual ~RemoteControllers() {}
+
+ void add_controller(BaseRemoteController *rc) {
+ m_controllers.push_back(rc);
+ }
+
+ void add_controllable(RemoteControllable *rc) {
+ for (std::list<BaseRemoteController*>::iterator it = m_controllers.begin();
+ it != m_controllers.end(); ++it) {
+ (*it)->enrol(rc);
+ }
+ }
+
+ void check_faults() {
+ for (std::list<BaseRemoteController*>::iterator it = m_controllers.begin();
+ it != m_controllers.end(); ++it) {
+ if ((*it)->fault_detected())
+ {
+ fprintf(stderr,
+ "Detected Remote Control fault, restarting it\n");
+ (*it)->restart();
+ }
+ }
+ }
+ size_t get_no_controllers() { return m_controllers.size(); }
+
+ private:
+ std::list<BaseRemoteController*> m_controllers;
+};
+
/* Objects that support remote control must implement the following class */
class RemoteControllable {
public:
@@ -100,8 +144,8 @@ class RemoteControllable {
virtual std::string get_rc_name() const { return m_name; }
/* Tell the controllable to enrol at the given controller */
- virtual void enrol_at(BaseRemoteController& controller) {
- controller.enrol(this);
+ virtual void enrol_at(RemoteControllers& controllers) {
+ controllers.add_controllable(this);
}
/* Return a list of possible parameters that can be set */
@@ -254,6 +298,89 @@ class RemoteControllerTelnet : public BaseRemoteController {
int m_port;
};
+#if defined(HAVE_INPUT_ZEROMQ)
+/* Implements a Remote controller using zmq transportlayer
+ * that listens on localhost
+ */
+class RemoteControllerZmq : public BaseRemoteController {
+ public:
+ RemoteControllerZmq()
+ : m_running(false), m_fault(false),
+ m_zmqContext(1),
+ m_endpoint("") { }
+
+ RemoteControllerZmq(std::string endpoint)
+ : m_running(true), m_fault(false),
+ m_child_thread(&RemoteControllerZmq::process, this),
+ m_zmqContext(1),
+ m_endpoint(endpoint)
+ { }
+
+ ~RemoteControllerZmq() {
+ m_running = false;
+ m_fault = false;
+ if (!m_endpoint.empty()) {
+ m_child_thread.interrupt();
+ m_child_thread.join();
+ }
+ }
+
+ void enrol(RemoteControllable* controllable) {
+ m_cohort.push_back(controllable);
+ }
+
+ virtual bool fault_detected() { return m_fault; }
+
+ virtual void restart();
+
+ private:
+ void restart_thread();
+
+ void recv_all(zmq::socket_t* pSocket, std::vector<std::string> &message);
+ void send_ok_reply(zmq::socket_t *pSocket);
+ void send_fail_reply(zmq::socket_t *pSocket, const std::string &error);
+ void process();
+
+
+ RemoteControllerZmq& operator=(const RemoteControllerZmq& other);
+ RemoteControllerZmq(const RemoteControllerZmq& other);
+
+ RemoteControllable* get_controllable_(std::string name) {
+ for (std::list<RemoteControllable*>::iterator it = m_cohort.begin();
+ it != m_cohort.end(); ++it) {
+ if ((*it)->get_rc_name() == name)
+ {
+ return *it;
+ }
+ }
+ throw ParameterError("Module name unknown");
+ }
+
+ std::string get_param_(std::string name, std::string param) {
+ RemoteControllable* controllable = get_controllable_(name);
+ return controllable->get_parameter(param);
+ }
+
+ void set_param_(std::string name, std::string param, std::string value) {
+ RemoteControllable* controllable = get_controllable_(name);
+ return controllable->set_parameter(param, value);
+ }
+
+ bool m_running;
+
+ /* This is set to true if a fault occurred */
+ bool m_fault;
+ boost::thread m_restarter_thread;
+
+ boost::thread m_child_thread;
+
+ /* This controller commands the controllables in the cohort */
+ std::list<RemoteControllable*> m_cohort;
+
+ zmq::context_t m_zmqContext;
+ std::string m_endpoint;
+};
+#endif
/* The Dummy remote controller does nothing, and never fails
*/