From 77b7ee920e74d73178254a8f6d0954df6bd57ee9 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Wed, 26 Mar 2014 18:22:56 +0100 Subject: Change ZMQ output message format --- src/dabOutput/dabOutput.h | 60 +++++++++++++++++++++++++++++++++++------- src/dabOutput/dabOutputZMQ.cpp | 40 +++++++++++++++++++++++++--- 2 files changed, 88 insertions(+), 12 deletions(-) diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index 5d03184..1050de1 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -2,9 +2,11 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013 Matthias P. Braendli + Copyright (C) 2013, 2014 Matthias P. Braendli http://mpb.li + http://opendigitalradio.org + An object-oriented version of the output channels. */ /* @@ -247,27 +249,57 @@ class DabOutputSimul : public DabOutput }; #if defined(HAVE_OUTPUT_ZEROMQ) + +#define NUM_FRAMES_PER_ZMQ_MESSAGE 4 +/* A concatenation of four ETI frames, + * whose maximal size is 6144. + * + * If we transmit four frames in one zmq message, + * we do not risk breaking ETI vs. transmission frame + * phase. + * + * The frames are concatenated in buf, and + * their sizes is given in the buflen array. + * + * Most of the time, the buf will not be completely + * filled + */ +struct zmq_dab_message_t +{ + zmq_dab_message_t() + { + /* set buf lengths to invalid */ + buflen[0] = -1; + buflen[1] = -1; + buflen[2] = -1; + buflen[3] = -1; + + version = 1; + } + uint32_t version; + uint16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE]; + uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; +}; + // -------------- ZeroMQ message queue ------------------ class DabOutputZMQ : public DabOutput { public: DabOutputZMQ() : - zmq_proto_(""), zmq_context_(1), zmq_pub_sock_(zmq_context_, ZMQ_PUB) + zmq_proto_(""), zmq_context_(1), + zmq_pub_sock_(zmq_context_, ZMQ_PUB), + zmq_message_ix(0) { throw std::runtime_error("ERROR: No ZMQ protocol specified !"); } DabOutputZMQ(std::string zmq_proto) : - zmq_proto_(zmq_proto), zmq_context_(1), zmq_pub_sock_(zmq_context_, ZMQ_PUB) + zmq_proto_(zmq_proto), zmq_context_(1), + zmq_pub_sock_(zmq_context_, ZMQ_PUB), + zmq_message_ix(0) { } - DabOutputZMQ(const DabOutputZMQ& other) : - zmq_proto_(other.zmq_proto_), zmq_context_(1), zmq_pub_sock_(zmq_context_, ZMQ_PUB) - { - // cannot copy context - } - ~DabOutputZMQ() { zmq_pub_sock_.close(); @@ -277,9 +309,19 @@ class DabOutputZMQ : public DabOutput int Write(void* buffer, int size); int Close(); private: + DabOutputZMQ(const DabOutputZMQ& other) : + zmq_proto_(other.zmq_proto_), zmq_context_(1), + zmq_pub_sock_(zmq_context_, ZMQ_PUB) + { + /* Forbid copy constructor */ + } + std::string zmq_proto_; zmq::context_t zmq_context_; // handle for the zmq context zmq::socket_t zmq_pub_sock_; // handle for the zmq publisher socket + + zmq_dab_message_t zmq_message; + int zmq_message_ix; }; #endif diff --git a/src/dabOutput/dabOutputZMQ.cpp b/src/dabOutput/dabOutputZMQ.cpp index cbb3445..ac26403 100644 --- a/src/dabOutput/dabOutputZMQ.cpp +++ b/src/dabOutput/dabOutputZMQ.cpp @@ -2,9 +2,11 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013 Matthias P. Braendli + Copyright (C) 2013, 2014 Matthias P. Braendli http://mpb.li + http://opendigitalradio.org + ZeroMQ output. see www.zeromq.org for more info From the ZeroMQ manpage 'zmq': @@ -66,9 +68,40 @@ int DabOutputZMQ::Open(const char* endpoint) int DabOutputZMQ::Write(void* buffer, int size) { - int flags = 0; + int offset = 0; + + for (int i = 0; i < zmq_message_ix; i++) { + offset += zmq_message.buflen[i]; + } + + if (offset + size > NUM_FRAMES_PER_ZMQ_MESSAGE*6144) { + throw std::runtime_error("FAULT: invalid ETI frame size!"); + } + memcpy(zmq_message.buf + offset, buffer, size); + zmq_message.buflen[zmq_message_ix] = size; + zmq_message_ix++; + + if (zmq_message_ix == NUM_FRAMES_PER_ZMQ_MESSAGE) { + int full_length = 0; + + for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { + full_length += zmq_message.buflen[i]; + } - return zmq_send(zmq_pub_sock_, buffer, size, flags); + zmq_message_ix = 0; + + const int flags = 0; + zmq_send(zmq_pub_sock_, + (uint8_t*)&zmq_message, + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*zmq_message.buflen) + + full_length, flags); + + for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { + zmq_message.buflen[i] = -1; + } + } + + return size; } @@ -78,3 +111,4 @@ int DabOutputZMQ::Close() } #endif + -- cgit v1.2.3