summaryrefslogtreecommitdiffstats
path: root/src/OutputZeroMQ.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2015-04-10 11:58:37 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2015-04-10 11:58:37 +0200
commit5c3d2648abaedc18e36f2ba99bd70aec0df3b1be (patch)
treea9da05140e527f8ef7d1647e6b9a0e7d215057d4 /src/OutputZeroMQ.cpp
parentc126ec3bfc44ab62017e7a75a1a1f49855f46f9a (diff)
downloaddabmod-5c3d2648abaedc18e36f2ba99bd70aec0df3b1be.tar.gz
dabmod-5c3d2648abaedc18e36f2ba99bd70aec0df3b1be.tar.bz2
dabmod-5c3d2648abaedc18e36f2ba99bd70aec0df3b1be.zip
ZMQ output: Add REP socket type
Diffstat (limited to 'src/OutputZeroMQ.cpp')
-rw-r--r--src/OutputZeroMQ.cpp28
1 files changed, 23 insertions, 5 deletions
diff --git a/src/OutputZeroMQ.cpp b/src/OutputZeroMQ.cpp
index 793e473..da4473e 100644
--- a/src/OutputZeroMQ.cpp
+++ b/src/OutputZeroMQ.cpp
@@ -32,19 +32,31 @@
#if defined(HAVE_ZEROMQ)
-OutputZeroMQ::OutputZeroMQ(std::string endpoint, Buffer* dataOut)
+OutputZeroMQ::OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut)
: ModOutput(ModFormat(1), ModFormat(0)),
+ m_type(type),
m_zmq_context(1),
- m_zmq_pub_sock(m_zmq_context, ZMQ_PUB),
+ m_zmq_sock(m_zmq_context, type),
m_endpoint(endpoint)
{
PDEBUG("OutputZeroMQ::OutputZeroMQ(%p) @ %p\n", dataOut, this);
std::stringstream ss;
- ss << "OutputZeroMQ(" << m_endpoint << ")";
+ ss << "OutputZeroMQ(" << m_endpoint << " ";
+
+ if (type == ZMQ_PUB) {
+ ss << "ZMQ_PUB";
+ }
+ else if (type == ZMQ_REP) {
+ ss << "ZMQ_REP";
+ }
+ else {
+ throw std::invalid_argument("ZMQ socket type unknown");
+ }
+ ss << ")";
m_name = ss.str();
- m_zmq_pub_sock.bind(m_endpoint.c_str());
+ m_zmq_sock.bind(m_endpoint.c_str());
}
OutputZeroMQ::~OutputZeroMQ()
@@ -58,7 +70,13 @@ int OutputZeroMQ::process(Buffer* dataIn, Buffer* dataOut)
"(dataIn: %p, dataOut: %p)\n",
dataIn, dataOut);
- m_zmq_pub_sock.send(dataIn->getData(), dataIn->getLength());
+ if (m_type == ZMQ_REP) {
+ // A ZMQ_REP socket requires a request first
+ zmq::message_t msg;
+ m_zmq_sock.recv(&msg);
+ }
+
+ m_zmq_sock.send(dataIn->getData(), dataIn->getLength());
return dataIn->getLength();
}