summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/example.ini13
-rw-r--r--src/DabMod.cpp14
-rw-r--r--src/OutputUHD.cpp56
-rw-r--r--src/OutputUHD.h8
-rw-r--r--src/RemoteControl.cpp197
-rw-r--r--src/RemoteControl.h64
6 files changed, 172 insertions, 180 deletions
diff --git a/doc/example.ini b/doc/example.ini
index 9a80eeb..ecb7440 100644
--- a/doc/example.ini
+++ b/doc/example.ini
@@ -9,14 +9,15 @@ telnet=1
telnetport=2121
; Enable zmq remote control.
-; The zmq remote control is intended for machine-to-machine
-; integration and requires that the odr-mod is build with zmq support.
-; The zmq remote control may run in parallell with Telnet.
+; The zmq remote control is intended for machine-to-machine
+; integration and requires that ODR-DabMod is built with zmq support.
+; The zmq remote control may run in parallel with Telnet.
+;
; Protocol:
-; The odr-dabmod binds a zmq rep socket so clients must connect
+; ODR-DabMod binds a zmq rep socket so clients must connect
; using either req or dealer socket.
; [] denotes message part as zmq multi-part message are used for delimitation.
-; All message parts are utf-8 encoded strings and matches the Telnet command set.
+; All message parts are utf-8 encoded strings and match the Telnet command set.
; Explicit codes are denoted with "".
; The following commands are supported:
; REQ: ["ping"]
@@ -163,7 +164,7 @@ channel=13C
; The reference clock to use.
; possible values : internal, external, MIMO
-refclk_source=external
+refclk_source=internal
; The reference one pulse-per second to use
; possible values : none, external, MIMO
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index dadade9..ea6334f 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -55,7 +55,6 @@
#include <sys/stat.h>
#include <stdexcept>
#include <signal.h>
-//#include <zmq.hpp>
#ifdef HAVE_NETINET_IN_H
# include <netinet/in.h>
@@ -190,9 +189,6 @@ int main(int argc, char* argv[])
OutputUHDConfig outputuhd_conf;
#endif
- //zmq::context_t zmqCtrlContext(1);
- //std::string zmqCtrlEndpoint = "";
-
// To handle the timestamp offset of the modulator
struct modulator_offset_config modconf;
modconf.use_offset_file = false;
@@ -204,7 +200,6 @@ int main(int argc, char* argv[])
InputMemory* input = NULL;
ModOutput* output = NULL;
- //BaseRemoteController* rc = NULL;
RemoteControllers rcs;
Logger logger;
@@ -371,9 +366,8 @@ int main(int argc, char* argv[])
#if defined(HAVE_INPUT_ZEROMQ)
if (pt.get("remotecontrol.zmqctrl", 0) == 1) {
try {
- std::string zmqCtrlEndpoint =
- pt.get("remotecontrol.zmqctrlendpoint", "");
- std::cout << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl;
+ std::string zmqCtrlEndpoint = pt.get("remotecontrol.zmqctrlendpoint", "");
+ std::cerr << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl;
RemoteControllerZmq* zmqrc = new RemoteControllerZmq(zmqCtrlEndpoint);
rcs.add_controller(zmqrc);
}
@@ -720,7 +714,7 @@ int main(int argc, char* argv[])
outputuhd_conf.sampleRate = outputRate;
try {
- output = new OutputUHD(outputuhd_conf, logger/*, &zmqCtrlContext, zmqCtrlEndpoint*/);
+ output = new OutputUHD(outputuhd_conf, logger);
((OutputUHD*)output)->enrol_at(rcs);
}
catch (std::exception& e) {
@@ -773,7 +767,7 @@ int main(int argc, char* argv[])
/* Check every once in a while if the remote control
* is still working */
if (rcs.get_no_controllers() > 0 && (frame % 250) == 0) {
- rcs.check_faults();
+ rcs.check_faults();
}
}
if (framesize == 0) {
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp
index 4776965..c7770fa 100644
--- a/src/OutputUHD.cpp
+++ b/src/OutputUHD.cpp
@@ -54,7 +54,7 @@ OutputUHD::OutputUHD(
// the buffers at object initialisation.
first_run(true),
activebuffer(1),
- myDelayBuf(196608)
+ myDelayBuf(196608)
{
myMuting = 0; // is remote-controllable
@@ -233,10 +233,10 @@ OutputUHD::~OutputUHD()
{
MDEBUG("OutputUHD::~OutputUHD() @ %p\n", this);
worker.stop();
- if (!first_run) {
- free(uwd.frame0.buf);
- free(uwd.frame1.buf);
- }
+ if (!first_run) {
+ free(uwd.frame0.buf);
+ free(uwd.frame1.buf);
+ }
}
int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
@@ -291,31 +291,31 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
uwd.sourceContainsTimestamp = myConf.enableSync &&
myEtiReader->sourceContainsTimestamp();
- // calculate delay
- uint32_t noSampleDelay = (myStaticDelay * 2048) / 1000;
- uint32_t noByteDelay = noSampleDelay * sizeof(complexf);
+ // calculate delay
+ uint32_t noSampleDelay = (myStaticDelay * 2048) / 1000;
+ uint32_t noByteDelay = noSampleDelay * sizeof(complexf);
- uint8_t* pInData = (uint8_t*) dataIn->getData();
+ uint8_t* pInData = (uint8_t*) dataIn->getData();
if (activebuffer == 0) {
- uint8_t *pTmp = (uint8_t*) uwd.frame0.buf;
- // copy remain from delaybuf
+ uint8_t *pTmp = (uint8_t*) uwd.frame0.buf;
+ // copy remain from delaybuf
memcpy(pTmp, &myDelayBuf[0], noByteDelay);
- // copy new data
+ // copy new data
memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay);
- // copy remaining data to delay buf
- memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);
+ // copy remaining data to delay buf
+ memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);
uwd.frame0.ts = ts;
uwd.frame0.fct = myEtiReader->getFCT();
}
else if (activebuffer == 1) {
- uint8_t *pTmp = (uint8_t*) uwd.frame1.buf;
- // copy remain from delaybuf
+ uint8_t *pTmp = (uint8_t*) uwd.frame1.buf;
+ // copy remain from delaybuf
memcpy(pTmp, &myDelayBuf[0], noByteDelay);
- // copy new data
+ // copy new data
memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay);
- // copy remaining data to delay buf
- memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);
+ // copy remaining data to delay buf
+ memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);
uwd.frame1.ts = ts;
uwd.frame1.fct = myEtiReader->getFCT();
@@ -617,15 +617,15 @@ void OutputUHD::set_parameter(const string& parameter, const string& value)
ss >> myMuting;
}
else if (parameter == "staticdelay") {
- int adjust;
- ss >> adjust;
- int newStaticDelay = myStaticDelay + adjust;
- if (newStaticDelay > 96000)
- myStaticDelay = newStaticDelay - 96000;
- else if (newStaticDelay < 0)
- myStaticDelay = newStaticDelay + 96000;
- else
- myStaticDelay = newStaticDelay;
+ int adjust;
+ ss >> adjust;
+ int newStaticDelay = myStaticDelay + adjust;
+ if (newStaticDelay > 96000)
+ myStaticDelay = newStaticDelay - 96000;
+ else if (newStaticDelay < 0)
+ myStaticDelay = newStaticDelay + 96000;
+ else
+ myStaticDelay = newStaticDelay;
}
else if (parameter == "iqbalance") {
ss >> myConf.frequency;
diff --git a/src/OutputUHD.h b/src/OutputUHD.h
index 60dfc65..90d9d1b 100644
--- a/src/OutputUHD.h
+++ b/src/OutputUHD.h
@@ -222,10 +222,10 @@ class OutputUHD: public ModOutput, public RemoteControllable {
// muting can only be changed using the remote control
bool myMuting;
- private:
- // data
- int myStaticDelay;
- std::vector<complexf> myDelayBuf;
+ private:
+ // data
+ int myStaticDelay;
+ std::vector<complexf> myDelayBuf;
size_t lastLen;
};
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
index c7c5914..6f538dc 100644
--- a/src/RemoteControl.cpp
+++ b/src/RemoteControl.cpp
@@ -271,120 +271,121 @@ void RemoteControllerZmq::restart_thread()
void RemoteControllerZmq::recv_all(zmq::socket_t* pSocket, std::vector<std::string> &message)
{
- int more = -1;
- size_t more_size = sizeof(more);
-
- while (more != 0)
- {
- zmq::message_t msg;
- pSocket->recv(&msg);
- message.push_back(std::string((char*)msg.data(), msg.size()));
- pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size);
- }
+ int more = -1;
+ size_t more_size = sizeof(more);
+
+ while (more != 0)
+ {
+ zmq::message_t msg;
+ pSocket->recv(&msg);
+ message.push_back(std::string((char*)msg.data(), msg.size()));
+ pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size);
+ }
}
void RemoteControllerZmq::send_ok_reply(zmq::socket_t *pSocket)
{
- zmq::message_t msg(2);
- char repCode[2] = {'o', 'k'};
- memcpy ((void*) msg.data(), repCode, 2);
- pSocket->send(msg, 0);
+ zmq::message_t msg(2);
+ char repCode[2] = {'o', 'k'};
+ memcpy ((void*) msg.data(), repCode, 2);
+ pSocket->send(msg, 0);
}
void RemoteControllerZmq::send_fail_reply(zmq::socket_t *pSocket, const std::string &error)
{
- zmq::message_t msg1(4);
- char repCode[4] = {'f', 'a', 'i', 'l'};
- memcpy ((void*) msg1.data(), repCode, 4);
- pSocket->send(msg1, ZMQ_SNDMORE);
-
- zmq::message_t msg2(error.length());
- memcpy ((void*) msg2.data(), error.c_str(), error.length());
- pSocket->send(msg2, 0);
+ zmq::message_t msg1(4);
+ char repCode[4] = {'f', 'a', 'i', 'l'};
+ memcpy ((void*) msg1.data(), repCode, 4);
+ pSocket->send(msg1, ZMQ_SNDMORE);
+
+ zmq::message_t msg2(error.length());
+ memcpy ((void*) msg2.data(), error.c_str(), error.length());
+ pSocket->send(msg2, 0);
}
void RemoteControllerZmq::process()
{
- // create zmq reply socket for receiving ctrl parameters
- zmq::socket_t repSocket(m_zmqContext, ZMQ_REP);
- std::cout << "Starting zmq remote control thread" << std::endl;
- try
- {
- // connect the socket
- int hwm = 100;
- int linger = 0;
- repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
- repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
- repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
- repSocket.bind(m_endpoint.c_str());
-
- // create pollitem that polls the ZMQ sockets
- zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} };
- for(;;)
- {
- zmq::poll(pollItems, 1, 100);
- std::vector<std::string> msg;
- if (pollItems[0].revents & ZMQ_POLLIN)
- {
- recv_all(&repSocket, msg);
- std::string command((char*)msg[0].data(), msg[0].size());
-
- if (msg.size() == 1 && command == "ping")
- {
- send_ok_reply(&repSocket);
- }
- else if (msg.size() == 3 && command == "get")
- {
- std::string module((char*) msg[1].data(), msg[1].size());
- std::string parameter((char*) msg[2].data(), msg[2].size());
-
- try
- {
- std::string value = get_param_(module, parameter);
- zmq::message_t *pMsg = new zmq::message_t(value.size());
- memcpy ((void*) pMsg->data(), value.data(), value.size());
- repSocket.send(*pMsg, 0);
- delete pMsg;
- }
- catch (ParameterError &err)
- {
- send_fail_reply(&repSocket, err.what());
- }
- }
- else if (msg.size() == 4 && command == "set")
- {
- std::string module((char*) msg[1].data(), msg[1].size());
- std::string parameter((char*) msg[2].data(), msg[2].size());
- std::string value((char*) msg[3].data(), msg[3].size());
-
- try
- {
- set_param_(module, parameter, value);
- send_ok_reply(&repSocket);
- }
- catch (ParameterError &err)
- {
- send_fail_reply(&repSocket, err.what());
- }
- }
- else
- send_fail_reply(&repSocket, "Unsupported command");
- }
-
- // check if thread is interrupted
- boost::this_thread::interruption_point();
- }
- }
- catch (boost::thread_interrupted&) {}
- catch (zmq::error_t &e)
- {
- std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl;
- }
+ // create zmq reply socket for receiving ctrl parameters
+ zmq::socket_t repSocket(m_zmqContext, ZMQ_REP);
+ std::cout << "Starting zmq remote control thread" << std::endl;
+ try
+ {
+ // connect the socket
+ int hwm = 100;
+ int linger = 0;
+ repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
+ repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
+ repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
+ repSocket.bind(m_endpoint.c_str());
+
+ // create pollitem that polls the ZMQ sockets
+ zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} };
+ for(;;)
+ {
+ zmq::poll(pollItems, 1, 100);
+ std::vector<std::string> msg;
+ if (pollItems[0].revents & ZMQ_POLLIN)
+ {
+ recv_all(&repSocket, msg);
+ std::string command((char*)msg[0].data(), msg[0].size());
+
+ if (msg.size() == 1 && command == "ping")
+ {
+ send_ok_reply(&repSocket);
+ }
+ else if (msg.size() == 3 && command == "get")
+ {
+ std::string module((char*) msg[1].data(), msg[1].size());
+ std::string parameter((char*) msg[2].data(), msg[2].size());
+
+ try
+ {
+ std::string value = get_param_(module, parameter);
+ zmq::message_t *pMsg = new zmq::message_t(value.size());
+ memcpy ((void*) pMsg->data(), value.data(), value.size());
+ repSocket.send(*pMsg, 0);
+ delete pMsg;
+ }
+ catch (ParameterError &err)
+ {
+ send_fail_reply(&repSocket, err.what());
+ }
+ }
+ else if (msg.size() == 4 && command == "set")
+ {
+ std::string module((char*) msg[1].data(), msg[1].size());
+ std::string parameter((char*) msg[2].data(), msg[2].size());
+ std::string value((char*) msg[3].data(), msg[3].size());
+
+ try
+ {
+ set_param_(module, parameter, value);
+ send_ok_reply(&repSocket);
+ }
+ catch (ParameterError &err)
+ {
+ send_fail_reply(&repSocket, err.what());
+ }
+ }
+ else
+ send_fail_reply(&repSocket, "Unsupported command");
+ }
+
+ // check if thread is interrupted
+ boost::this_thread::interruption_point();
+ }
+ }
+ catch (boost::thread_interrupted&) {}
+ catch (zmq::error_t &e)
+ {
+ std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl;
+ }
catch (std::exception& e)
{
std::cerr << "Remote control caught exception: " << e.what() << std::endl;
m_fault = true;
}
- repSocket.close();
+ repSocket.close();
}
#endif
+
diff --git a/src/RemoteControl.h b/src/RemoteControl.h
index 7c830b2..905e153 100644
--- a/src/RemoteControl.h
+++ b/src/RemoteControl.h
@@ -93,40 +93,37 @@ class BaseRemoteController {
virtual ~BaseRemoteController() {}
};
-class RemoteControllers {
/* Holds all our remote controllers, i.e. we may have more than
* one type of controller running.
-*/
+ */
+class RemoteControllers {
public:
- RemoteControllers() {}
- virtual ~RemoteControllers() {}
-
- void add_controller(BaseRemoteController *rc) {
- m_controllers.push_back(rc);
- }
+ void add_controller(BaseRemoteController *rc) {
+ m_controllers.push_back(rc);
+ }
- void add_controllable(RemoteControllable *rc) {
+ void add_controllable(RemoteControllable *rc) {
for (std::list<BaseRemoteController*>::iterator it = m_controllers.begin();
it != m_controllers.end(); ++it) {
- (*it)->enrol(rc);
- }
- }
-
+ (*it)->enrol(rc);
+ }
+ }
+
void check_faults() {
for (std::list<BaseRemoteController*>::iterator it = m_controllers.begin();
it != m_controllers.end(); ++it) {
- if ((*it)->fault_detected())
- {
- fprintf(stderr,
- "Detected Remote Control fault, restarting it\n");
- (*it)->restart();
- }
- }
- }
- size_t get_no_controllers() { return m_controllers.size(); }
-
- private:
- std::list<BaseRemoteController*> m_controllers;
+ if ((*it)->fault_detected())
+ {
+ fprintf(stderr,
+ "Detected Remote Control fault, restarting it\n");
+ (*it)->restart();
+ }
+ }
+ }
+ size_t get_no_controllers() { return m_controllers.size(); }
+
+ private:
+ std::list<BaseRemoteController*> m_controllers;
};
/* Objects that support remote control must implement the following class */
@@ -306,15 +303,14 @@ class RemoteControllerZmq : public BaseRemoteController {
public:
RemoteControllerZmq()
: m_running(false), m_fault(false),
- m_zmqContext(1),
+ m_zmqContext(1),
m_endpoint("") { }
RemoteControllerZmq(std::string endpoint)
: m_running(true), m_fault(false),
m_child_thread(&RemoteControllerZmq::process, this),
- m_zmqContext(1),
- m_endpoint(endpoint)
- { }
+ m_zmqContext(1),
+ m_endpoint(endpoint) { }
~RemoteControllerZmq() {
m_running = false;
@@ -336,9 +332,9 @@ class RemoteControllerZmq : public BaseRemoteController {
private:
void restart_thread();
- void recv_all(zmq::socket_t* pSocket, std::vector<std::string> &message);
- void send_ok_reply(zmq::socket_t *pSocket);
- void send_fail_reply(zmq::socket_t *pSocket, const std::string &error);
+ void recv_all(zmq::socket_t* pSocket, std::vector<std::string> &message);
+ void send_ok_reply(zmq::socket_t *pSocket);
+ void send_fail_reply(zmq::socket_t *pSocket, const std::string &error);
void process();
@@ -377,8 +373,8 @@ class RemoteControllerZmq : public BaseRemoteController {
/* This controller commands the controllables in the cohort */
std::list<RemoteControllable*> m_cohort;
- zmq::context_t m_zmqContext;
- std::string m_endpoint;
+ zmq::context_t m_zmqContext;
+ std::string m_endpoint;
};
#endif