From 5c3d2648abaedc18e36f2ba99bd70aec0df3b1be Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 10 Apr 2015 11:58:37 +0200 Subject: ZMQ output: Add REP socket type --- src/DabMod.cpp | 21 ++++++++++++++++++--- src/OutputZeroMQ.cpp | 28 +++++++++++++++++++++++----- src/OutputZeroMQ.h | 5 +++-- 3 files changed, 44 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 75e76e0..304d252 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -47,6 +47,7 @@ #include "RemoteControl.h" #include +#include #include #include #include @@ -119,6 +120,7 @@ int main(int argc, char* argv[]) std::string outputName; int useZeroMQOutput = 0; + std::string zmqOutputSocketType = ""; int useFileOutput = 0; std::string fileOutputFormat = "complexf"; int useUHDOutput = 0; @@ -563,6 +565,7 @@ int main(int argc, char* argv[]) #if defined(HAVE_ZEROMQ) else if (output_selected == "zmq") { outputName = pt.get("zmqoutput.listen"); + zmqOutputSocketType = pt.get("zmqoutput.socket_type"); useZeroMQOutput = 1; } #endif @@ -676,8 +679,10 @@ int main(int argc, char* argv[]) #endif else if (useZeroMQOutput) { fprintf(stderr, " ZeroMQ\n" - " Listening on: %s\n", - outputName.c_str()); + " Listening on: %s\n" + " Socket type : %s\n", + outputName.c_str(), + zmqOutputSocketType.c_str()); } fprintf(stderr, " Sampling rate: "); @@ -744,7 +749,17 @@ int main(int argc, char* argv[]) else if (useZeroMQOutput) { /* We normalise the same way as for the UHD output */ normalise = 1.0f / normalise_factor; - output = shared_ptr(new OutputZeroMQ(outputName)); + if (zmqOutputSocketType == "pub") { + output = make_shared(outputName, ZMQ_PUB); + } + else if (zmqOutputSocketType == "rep") { + output = make_shared(outputName, ZMQ_REP); + } + else { + std::stringstream ss; + ss << "ZeroMQ output socket type " << zmqOutputSocketType << " invalid"; + throw std::invalid_argument(ss.str()); + } } #endif diff --git a/src/OutputZeroMQ.cpp b/src/OutputZeroMQ.cpp index 793e473..da4473e 100644 --- a/src/OutputZeroMQ.cpp +++ b/src/OutputZeroMQ.cpp @@ -32,19 +32,31 @@ #if defined(HAVE_ZEROMQ) -OutputZeroMQ::OutputZeroMQ(std::string endpoint, Buffer* dataOut) +OutputZeroMQ::OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut) : ModOutput(ModFormat(1), ModFormat(0)), + m_type(type), m_zmq_context(1), - m_zmq_pub_sock(m_zmq_context, ZMQ_PUB), + m_zmq_sock(m_zmq_context, type), m_endpoint(endpoint) { PDEBUG("OutputZeroMQ::OutputZeroMQ(%p) @ %p\n", dataOut, this); std::stringstream ss; - ss << "OutputZeroMQ(" << m_endpoint << ")"; + ss << "OutputZeroMQ(" << m_endpoint << " "; + + if (type == ZMQ_PUB) { + ss << "ZMQ_PUB"; + } + else if (type == ZMQ_REP) { + ss << "ZMQ_REP"; + } + else { + throw std::invalid_argument("ZMQ socket type unknown"); + } + ss << ")"; m_name = ss.str(); - m_zmq_pub_sock.bind(m_endpoint.c_str()); + m_zmq_sock.bind(m_endpoint.c_str()); } OutputZeroMQ::~OutputZeroMQ() @@ -58,7 +70,13 @@ int OutputZeroMQ::process(Buffer* dataIn, Buffer* dataOut) "(dataIn: %p, dataOut: %p)\n", dataIn, dataOut); - m_zmq_pub_sock.send(dataIn->getData(), dataIn->getLength()); + if (m_type == ZMQ_REP) { + // A ZMQ_REP socket requires a request first + zmq::message_t msg; + m_zmq_sock.recv(&msg); + } + + m_zmq_sock.send(dataIn->getData(), dataIn->getLength()); return dataIn->getLength(); } diff --git a/src/OutputZeroMQ.h b/src/OutputZeroMQ.h index a80eab4..85f85a7 100644 --- a/src/OutputZeroMQ.h +++ b/src/OutputZeroMQ.h @@ -39,14 +39,15 @@ class OutputZeroMQ : public ModOutput { public: - OutputZeroMQ(std::string endpoint, Buffer* dataOut = NULL); + OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut = NULL); virtual ~OutputZeroMQ(); virtual int process(Buffer* dataIn, Buffer* dataOut); const char* name() { return m_name.c_str(); } protected: + int m_type; // zmq socket type zmq::context_t m_zmq_context; // handle for the zmq context - zmq::socket_t m_zmq_pub_sock; // handle for the zmq publisher socket + zmq::socket_t m_zmq_sock; // handle for the zmq publisher socket std::string m_endpoint; // On which port to listen: e.g. // tcp://*:58300 -- cgit v1.2.3