aboutsummaryrefslogtreecommitdiffstats
path: root/src/OutputUHD.cpp
diff options
context:
space:
mode:
authorJörgen Scott <jorgen.scott@gmail.com>2015-01-13 11:27:39 +0100
committerJörgen Scott <jorgen.scott@gmail.com>2015-01-13 11:27:39 +0100
commitba790cba2f7b48dd66f4418de0b7b366422926b0 (patch)
treea31f4fce6227b549561f53fb6f10e455070871f0 /src/OutputUHD.cpp
parent593c130b1e6848a08b30a84732ebd6862ef2e3b7 (diff)
downloaddabmod-ba790cba2f7b48dd66f4418de0b7b366422926b0.tar.gz
dabmod-ba790cba2f7b48dd66f4418de0b7b366422926b0.tar.bz2
dabmod-ba790cba2f7b48dd66f4418de0b7b366422926b0.zip
added zmq remote control
Diffstat (limited to 'src/OutputUHD.cpp')
-rw-r--r--src/OutputUHD.cpp137
1 files changed, 6 insertions, 131 deletions
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp
index e3d2d77..4776965 100644
--- a/src/OutputUHD.cpp
+++ b/src/OutputUHD.cpp
@@ -45,9 +45,7 @@ typedef std::complex<float> complexf;
OutputUHD::OutputUHD(
OutputUHDConfig& config,
- Logger& logger,
- zmq::context_t *pContext,
- const std::string &zmqCtrlEndpoint) :
+ Logger& logger) :
ModOutput(ModFormat(1), ModFormat(0)),
RemoteControllable("uhd"),
myLogger(logger),
@@ -56,7 +54,7 @@ OutputUHD::OutputUHD(
// the buffers at object initialisation.
first_run(true),
activebuffer(1),
- m_delayBuf(196608)
+ myDelayBuf(196608)
{
myMuting = 0; // is remote-controllable
@@ -227,14 +225,6 @@ OutputUHD::OutputUHD(
worker.start(&uwd);
- m_pZmqRepThread = NULL;
- if (!zmqCtrlEndpoint.empty())
- {
- m_pContext = pContext;
- m_zmqCtrlEndpoint = zmqCtrlEndpoint;
- m_pZmqRepThread = new boost::thread(boost::bind(&OutputUHD::ZmqCtrl, this));
- }
-
MDEBUG("OutputUHD:UHD ready.\n");
}
@@ -242,12 +232,6 @@ OutputUHD::OutputUHD(
OutputUHD::~OutputUHD()
{
MDEBUG("OutputUHD::~OutputUHD() @ %p\n", this);
- if (m_pZmqRepThread != NULL)
- {
- m_pZmqRepThread->interrupt();
- m_pZmqRepThread->join();
- }
-
worker.stop();
if (!first_run) {
free(uwd.frame0.buf);
@@ -315,13 +299,11 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
if (activebuffer == 0) {
uint8_t *pTmp = (uint8_t*) uwd.frame0.buf;
// copy remain from delaybuf
- memcpy(pTmp, &m_delayBuf[0], noByteDelay);
+ memcpy(pTmp, &myDelayBuf[0], noByteDelay);
// copy new data
memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay);
// copy remaining data to delay buf
- memcpy(&m_delayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);
-
- //memcpy(uwd.frame0.buf, dataIn->getData(), uwd.bufsize);
+ memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);
uwd.frame0.ts = ts;
uwd.frame0.fct = myEtiReader->getFCT();
@@ -329,13 +311,11 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
else if (activebuffer == 1) {
uint8_t *pTmp = (uint8_t*) uwd.frame1.buf;
// copy remain from delaybuf
- memcpy(pTmp, &m_delayBuf[0], noByteDelay);
+ memcpy(pTmp, &myDelayBuf[0], noByteDelay);
// copy new data
memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay);
// copy remaining data to delay buf
- memcpy(&m_delayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);
-
- //memcpy(uwd.frame1.buf, dataIn->getData(), uwd.bufsize);
+ memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);
uwd.frame1.ts = ts;
uwd.frame1.fct = myEtiReader->getFCT();
@@ -683,108 +663,3 @@ const string OutputUHD::get_parameter(const string& parameter) const
return ss.str();
}
-void OutputUHD::RecvAll(zmq::socket_t* pSocket, std::vector<std::string> &message)
-{
- int more = -1;
- size_t more_size = sizeof(more);
-
- while (more != 0)
- {
- zmq::message_t msg;
- pSocket->recv(&msg);
- message.push_back(std::string((char*)msg.data(), msg.size()));
- pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size);
- }
-}
-
-void OutputUHD::SendOkReply(zmq::socket_t *pSocket)
-{
- zmq::message_t msg(2);
- char repCode[2] = {'o', 'k'};
- memcpy ((void*) msg.data(), repCode, 2);
- pSocket->send(msg, 0);
-}
-
-void OutputUHD::SendFailReply(zmq::socket_t *pSocket, const std::string &error)
-{
- zmq::message_t msg1(4);
- char repCode[4] = {'f', 'a', 'i', 'l'};
- memcpy ((void*) msg1.data(), repCode, 4);
- pSocket->send(msg1, ZMQ_SNDMORE);
-
- zmq::message_t msg2(error.length());
- memcpy ((void*) msg2.data(), error.c_str(), error.length());
- pSocket->send(msg2, 0);
-}
-
-//TODO: Should be implemented as an alternative to RemoteControllerTelnet and
-//moved to the RemoteControl.h/cpp file instead.
-void OutputUHD::ZmqCtrl()
-{
- // create zmq reply socket for receiving ctrl parameters
- zmq::socket_t repSocket(*m_pContext, ZMQ_REP);
- std::cout << "Starting output UHD control thread" << std::endl;
- try
- {
- // connect the socket
- int hwm = 100;
- int linger = 0;
- repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
- repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
- repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
- repSocket.bind(m_zmqCtrlEndpoint.c_str());
-
- // create pollitem that polls the ZMQ sockets
- zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} };
- for(;;)
- {
- zmq::poll(pollItems, 1, 100);
- std::vector<std::string> msg;
- if (pollItems[0].revents & ZMQ_POLLIN)
- {
- RecvAll(&repSocket, msg);
- std::string module((char*)msg[0].data(), msg[0].size());
- if (module == "uhd")
- {
- if (msg.size() < 2)
- {
- SendFailReply(&repSocket, "Wrong request format");
- continue;
- }
-
- std::string param((char*) msg[1].data(), msg[1].size());
- if (msg.size() == 2 && param == "ping")
- {
- SendOkReply(&repSocket);
- }
- else if (msg.size() != 3)
- {
- SendFailReply(&repSocket, "Wrong request format");
- }
- else
- {
- std::string value((char*) msg[2].data(), msg[2].size());
- try
- {
- set_parameter(param, value);
- SendOkReply(&repSocket);
- }
- catch (ParameterError &err)
- {
- SendFailReply(&repSocket, err.what());
- }
- }
- }
- }
-
- // check if thread is interrupted
- boost::this_thread::interruption_point();
- }
- }
- catch (boost::thread_interrupted&) {}
- catch (zmq::error_t &e)
- {
- std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl;
- }
- repSocket.close();
-}