diff options
-rw-r--r-- | doc/example.ini | 13 | ||||
-rw-r--r-- | src/DabMod.cpp | 21 | ||||
-rw-r--r-- | src/OutputZeroMQ.cpp | 28 | ||||
-rw-r--r-- | src/OutputZeroMQ.h | 5 |
4 files changed, 56 insertions, 11 deletions
diff --git a/doc/example.ini b/doc/example.ini index 3c51142..ee9d567 100644 --- a/doc/example.ini +++ b/doc/example.ini @@ -112,7 +112,7 @@ enabled=0 filtertapsfile=simple_taps.txt [output] -; choose output: possible values: uhd, file +; choose output: possible values: uhd, file, zmq output=uhd [fileoutput] @@ -193,6 +193,17 @@ pps_source=none ; possible values: ignore, crash behaviour_refclk_lock_lost=ignore +; section defining ZeroMQ output properties +[zmqoutput] + +; on which port to listen for connections +; please see the Transports section in man zmq +; for more informat io the syntax +listen=tcp://*:54001 + +; what ZMQ socket type to use. Valid values: PUB, REP +; Please see man zmq_socket for documentation +socket_type=pub ; Used for SFN with the UHD output [delaymanagement] diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 75e76e0..304d252 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -47,6 +47,7 @@ #include "RemoteControl.h" #include <boost/shared_ptr.hpp> +#include <boost/make_shared.hpp> #include <boost/property_tree/ptree.hpp> #include <boost/property_tree/ini_parser.hpp> #include <complex> @@ -119,6 +120,7 @@ int main(int argc, char* argv[]) std::string outputName; int useZeroMQOutput = 0; + std::string zmqOutputSocketType = ""; int useFileOutput = 0; std::string fileOutputFormat = "complexf"; int useUHDOutput = 0; @@ -563,6 +565,7 @@ int main(int argc, char* argv[]) #if defined(HAVE_ZEROMQ) else if (output_selected == "zmq") { outputName = pt.get<std::string>("zmqoutput.listen"); + zmqOutputSocketType = pt.get<std::string>("zmqoutput.socket_type"); useZeroMQOutput = 1; } #endif @@ -676,8 +679,10 @@ int main(int argc, char* argv[]) #endif else if (useZeroMQOutput) { fprintf(stderr, " ZeroMQ\n" - " Listening on: %s\n", - outputName.c_str()); + " Listening on: %s\n" + " Socket type : %s\n", + outputName.c_str(), + zmqOutputSocketType.c_str()); } fprintf(stderr, " Sampling rate: "); @@ -744,7 +749,17 @@ int main(int argc, char* argv[]) else if (useZeroMQOutput) { /* We normalise the same way as for the UHD output */ normalise = 1.0f / normalise_factor; - output = shared_ptr<OutputZeroMQ>(new OutputZeroMQ(outputName)); + if (zmqOutputSocketType == "pub") { + output = make_shared<OutputZeroMQ>(outputName, ZMQ_PUB); + } + else if (zmqOutputSocketType == "rep") { + output = make_shared<OutputZeroMQ>(outputName, ZMQ_REP); + } + else { + std::stringstream ss; + ss << "ZeroMQ output socket type " << zmqOutputSocketType << " invalid"; + throw std::invalid_argument(ss.str()); + } } #endif diff --git a/src/OutputZeroMQ.cpp b/src/OutputZeroMQ.cpp index 793e473..da4473e 100644 --- a/src/OutputZeroMQ.cpp +++ b/src/OutputZeroMQ.cpp @@ -32,19 +32,31 @@ #if defined(HAVE_ZEROMQ) -OutputZeroMQ::OutputZeroMQ(std::string endpoint, Buffer* dataOut) +OutputZeroMQ::OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut) : ModOutput(ModFormat(1), ModFormat(0)), + m_type(type), m_zmq_context(1), - m_zmq_pub_sock(m_zmq_context, ZMQ_PUB), + m_zmq_sock(m_zmq_context, type), m_endpoint(endpoint) { PDEBUG("OutputZeroMQ::OutputZeroMQ(%p) @ %p\n", dataOut, this); std::stringstream ss; - ss << "OutputZeroMQ(" << m_endpoint << ")"; + ss << "OutputZeroMQ(" << m_endpoint << " "; + + if (type == ZMQ_PUB) { + ss << "ZMQ_PUB"; + } + else if (type == ZMQ_REP) { + ss << "ZMQ_REP"; + } + else { + throw std::invalid_argument("ZMQ socket type unknown"); + } + ss << ")"; m_name = ss.str(); - m_zmq_pub_sock.bind(m_endpoint.c_str()); + m_zmq_sock.bind(m_endpoint.c_str()); } OutputZeroMQ::~OutputZeroMQ() @@ -58,7 +70,13 @@ int OutputZeroMQ::process(Buffer* dataIn, Buffer* dataOut) "(dataIn: %p, dataOut: %p)\n", dataIn, dataOut); - m_zmq_pub_sock.send(dataIn->getData(), dataIn->getLength()); + if (m_type == ZMQ_REP) { + // A ZMQ_REP socket requires a request first + zmq::message_t msg; + m_zmq_sock.recv(&msg); + } + + m_zmq_sock.send(dataIn->getData(), dataIn->getLength()); return dataIn->getLength(); } diff --git a/src/OutputZeroMQ.h b/src/OutputZeroMQ.h index a80eab4..85f85a7 100644 --- a/src/OutputZeroMQ.h +++ b/src/OutputZeroMQ.h @@ -39,14 +39,15 @@ class OutputZeroMQ : public ModOutput { public: - OutputZeroMQ(std::string endpoint, Buffer* dataOut = NULL); + OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut = NULL); virtual ~OutputZeroMQ(); virtual int process(Buffer* dataIn, Buffer* dataOut); const char* name() { return m_name.c_str(); } protected: + int m_type; // zmq socket type zmq::context_t m_zmq_context; // handle for the zmq context - zmq::socket_t m_zmq_pub_sock; // handle for the zmq publisher socket + zmq::socket_t m_zmq_sock; // handle for the zmq publisher socket std::string m_endpoint; // On which port to listen: e.g. // tcp://*:58300 |