diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-04-10 11:58:37 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-04-10 11:58:37 +0200 |
commit | 5c3d2648abaedc18e36f2ba99bd70aec0df3b1be (patch) | |
tree | a9da05140e527f8ef7d1647e6b9a0e7d215057d4 /src/OutputZeroMQ.cpp | |
parent | c126ec3bfc44ab62017e7a75a1a1f49855f46f9a (diff) | |
download | dabmod-5c3d2648abaedc18e36f2ba99bd70aec0df3b1be.tar.gz dabmod-5c3d2648abaedc18e36f2ba99bd70aec0df3b1be.tar.bz2 dabmod-5c3d2648abaedc18e36f2ba99bd70aec0df3b1be.zip |
ZMQ output: Add REP socket type
Diffstat (limited to 'src/OutputZeroMQ.cpp')
-rw-r--r-- | src/OutputZeroMQ.cpp | 28 |
1 files changed, 23 insertions, 5 deletions
diff --git a/src/OutputZeroMQ.cpp b/src/OutputZeroMQ.cpp index 793e473..da4473e 100644 --- a/src/OutputZeroMQ.cpp +++ b/src/OutputZeroMQ.cpp @@ -32,19 +32,31 @@ #if defined(HAVE_ZEROMQ) -OutputZeroMQ::OutputZeroMQ(std::string endpoint, Buffer* dataOut) +OutputZeroMQ::OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut) : ModOutput(ModFormat(1), ModFormat(0)), + m_type(type), m_zmq_context(1), - m_zmq_pub_sock(m_zmq_context, ZMQ_PUB), + m_zmq_sock(m_zmq_context, type), m_endpoint(endpoint) { PDEBUG("OutputZeroMQ::OutputZeroMQ(%p) @ %p\n", dataOut, this); std::stringstream ss; - ss << "OutputZeroMQ(" << m_endpoint << ")"; + ss << "OutputZeroMQ(" << m_endpoint << " "; + + if (type == ZMQ_PUB) { + ss << "ZMQ_PUB"; + } + else if (type == ZMQ_REP) { + ss << "ZMQ_REP"; + } + else { + throw std::invalid_argument("ZMQ socket type unknown"); + } + ss << ")"; m_name = ss.str(); - m_zmq_pub_sock.bind(m_endpoint.c_str()); + m_zmq_sock.bind(m_endpoint.c_str()); } OutputZeroMQ::~OutputZeroMQ() @@ -58,7 +70,13 @@ int OutputZeroMQ::process(Buffer* dataIn, Buffer* dataOut) "(dataIn: %p, dataOut: %p)\n", dataIn, dataOut); - m_zmq_pub_sock.send(dataIn->getData(), dataIn->getLength()); + if (m_type == ZMQ_REP) { + // A ZMQ_REP socket requires a request first + zmq::message_t msg; + m_zmq_sock.recv(&msg); + } + + m_zmq_sock.send(dataIn->getData(), dataIn->getLength()); return dataIn->getLength(); } |