aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2015-04-10 11:58:37 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2015-04-10 11:58:37 +0200
commit5c3d2648abaedc18e36f2ba99bd70aec0df3b1be (patch)
treea9da05140e527f8ef7d1647e6b9a0e7d215057d4
parentc126ec3bfc44ab62017e7a75a1a1f49855f46f9a (diff)
downloaddabmod-5c3d2648abaedc18e36f2ba99bd70aec0df3b1be.tar.gz
dabmod-5c3d2648abaedc18e36f2ba99bd70aec0df3b1be.tar.bz2
dabmod-5c3d2648abaedc18e36f2ba99bd70aec0df3b1be.zip
ZMQ output: Add REP socket type
-rw-r--r--doc/example.ini13
-rw-r--r--src/DabMod.cpp21
-rw-r--r--src/OutputZeroMQ.cpp28
-rw-r--r--src/OutputZeroMQ.h5
4 files changed, 56 insertions, 11 deletions
diff --git a/doc/example.ini b/doc/example.ini
index 3c51142..ee9d567 100644
--- a/doc/example.ini
+++ b/doc/example.ini
@@ -112,7 +112,7 @@ enabled=0
filtertapsfile=simple_taps.txt
[output]
-; choose output: possible values: uhd, file
+; choose output: possible values: uhd, file, zmq
output=uhd
[fileoutput]
@@ -193,6 +193,17 @@ pps_source=none
; possible values: ignore, crash
behaviour_refclk_lock_lost=ignore
+; section defining ZeroMQ output properties
+[zmqoutput]
+
+; on which port to listen for connections
+; please see the Transports section in man zmq
+; for more informat io the syntax
+listen=tcp://*:54001
+
+; what ZMQ socket type to use. Valid values: PUB, REP
+; Please see man zmq_socket for documentation
+socket_type=pub
; Used for SFN with the UHD output
[delaymanagement]
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 75e76e0..304d252 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -47,6 +47,7 @@
#include "RemoteControl.h"
#include <boost/shared_ptr.hpp>
+#include <boost/make_shared.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/ini_parser.hpp>
#include <complex>
@@ -119,6 +120,7 @@ int main(int argc, char* argv[])
std::string outputName;
int useZeroMQOutput = 0;
+ std::string zmqOutputSocketType = "";
int useFileOutput = 0;
std::string fileOutputFormat = "complexf";
int useUHDOutput = 0;
@@ -563,6 +565,7 @@ int main(int argc, char* argv[])
#if defined(HAVE_ZEROMQ)
else if (output_selected == "zmq") {
outputName = pt.get<std::string>("zmqoutput.listen");
+ zmqOutputSocketType = pt.get<std::string>("zmqoutput.socket_type");
useZeroMQOutput = 1;
}
#endif
@@ -676,8 +679,10 @@ int main(int argc, char* argv[])
#endif
else if (useZeroMQOutput) {
fprintf(stderr, " ZeroMQ\n"
- " Listening on: %s\n",
- outputName.c_str());
+ " Listening on: %s\n"
+ " Socket type : %s\n",
+ outputName.c_str(),
+ zmqOutputSocketType.c_str());
}
fprintf(stderr, " Sampling rate: ");
@@ -744,7 +749,17 @@ int main(int argc, char* argv[])
else if (useZeroMQOutput) {
/* We normalise the same way as for the UHD output */
normalise = 1.0f / normalise_factor;
- output = shared_ptr<OutputZeroMQ>(new OutputZeroMQ(outputName));
+ if (zmqOutputSocketType == "pub") {
+ output = make_shared<OutputZeroMQ>(outputName, ZMQ_PUB);
+ }
+ else if (zmqOutputSocketType == "rep") {
+ output = make_shared<OutputZeroMQ>(outputName, ZMQ_REP);
+ }
+ else {
+ std::stringstream ss;
+ ss << "ZeroMQ output socket type " << zmqOutputSocketType << " invalid";
+ throw std::invalid_argument(ss.str());
+ }
}
#endif
diff --git a/src/OutputZeroMQ.cpp b/src/OutputZeroMQ.cpp
index 793e473..da4473e 100644
--- a/src/OutputZeroMQ.cpp
+++ b/src/OutputZeroMQ.cpp
@@ -32,19 +32,31 @@
#if defined(HAVE_ZEROMQ)
-OutputZeroMQ::OutputZeroMQ(std::string endpoint, Buffer* dataOut)
+OutputZeroMQ::OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut)
: ModOutput(ModFormat(1), ModFormat(0)),
+ m_type(type),
m_zmq_context(1),
- m_zmq_pub_sock(m_zmq_context, ZMQ_PUB),
+ m_zmq_sock(m_zmq_context, type),
m_endpoint(endpoint)
{
PDEBUG("OutputZeroMQ::OutputZeroMQ(%p) @ %p\n", dataOut, this);
std::stringstream ss;
- ss << "OutputZeroMQ(" << m_endpoint << ")";
+ ss << "OutputZeroMQ(" << m_endpoint << " ";
+
+ if (type == ZMQ_PUB) {
+ ss << "ZMQ_PUB";
+ }
+ else if (type == ZMQ_REP) {
+ ss << "ZMQ_REP";
+ }
+ else {
+ throw std::invalid_argument("ZMQ socket type unknown");
+ }
+ ss << ")";
m_name = ss.str();
- m_zmq_pub_sock.bind(m_endpoint.c_str());
+ m_zmq_sock.bind(m_endpoint.c_str());
}
OutputZeroMQ::~OutputZeroMQ()
@@ -58,7 +70,13 @@ int OutputZeroMQ::process(Buffer* dataIn, Buffer* dataOut)
"(dataIn: %p, dataOut: %p)\n",
dataIn, dataOut);
- m_zmq_pub_sock.send(dataIn->getData(), dataIn->getLength());
+ if (m_type == ZMQ_REP) {
+ // A ZMQ_REP socket requires a request first
+ zmq::message_t msg;
+ m_zmq_sock.recv(&msg);
+ }
+
+ m_zmq_sock.send(dataIn->getData(), dataIn->getLength());
return dataIn->getLength();
}
diff --git a/src/OutputZeroMQ.h b/src/OutputZeroMQ.h
index a80eab4..85f85a7 100644
--- a/src/OutputZeroMQ.h
+++ b/src/OutputZeroMQ.h
@@ -39,14 +39,15 @@
class OutputZeroMQ : public ModOutput
{
public:
- OutputZeroMQ(std::string endpoint, Buffer* dataOut = NULL);
+ OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut = NULL);
virtual ~OutputZeroMQ();
virtual int process(Buffer* dataIn, Buffer* dataOut);
const char* name() { return m_name.c_str(); }
protected:
+ int m_type; // zmq socket type
zmq::context_t m_zmq_context; // handle for the zmq context
- zmq::socket_t m_zmq_pub_sock; // handle for the zmq publisher socket
+ zmq::socket_t m_zmq_sock; // handle for the zmq publisher socket
std::string m_endpoint; // On which port to listen: e.g.
// tcp://*:58300