aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2014-03-26 18:22:56 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2014-03-26 18:22:56 +0100
commit77b7ee920e74d73178254a8f6d0954df6bd57ee9 (patch)
tree46872cefa78bc0ec21152e8fe6df0ecc6325c3d5
parent12f130a487a0119fa9f9fa2da33e0ab19b54cdda (diff)
downloaddabmux-77b7ee920e74d73178254a8f6d0954df6bd57ee9.tar.gz
dabmux-77b7ee920e74d73178254a8f6d0954df6bd57ee9.tar.bz2
dabmux-77b7ee920e74d73178254a8f6d0954df6bd57ee9.zip
Change ZMQ output message format
-rw-r--r--src/dabOutput/dabOutput.h60
-rw-r--r--src/dabOutput/dabOutputZMQ.cpp40
2 files changed, 88 insertions, 12 deletions
diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h
index 5d03184..1050de1 100644
--- a/src/dabOutput/dabOutput.h
+++ b/src/dabOutput/dabOutput.h
@@ -2,9 +2,11 @@
Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2013 Matthias P. Braendli
+ Copyright (C) 2013, 2014 Matthias P. Braendli
http://mpb.li
+ http://opendigitalradio.org
+
An object-oriented version of the output channels.
*/
/*
@@ -247,27 +249,57 @@ class DabOutputSimul : public DabOutput
};
#if defined(HAVE_OUTPUT_ZEROMQ)
+
+#define NUM_FRAMES_PER_ZMQ_MESSAGE 4
+/* A concatenation of four ETI frames,
+ * whose maximal size is 6144.
+ *
+ * If we transmit four frames in one zmq message,
+ * we do not risk breaking ETI vs. transmission frame
+ * phase.
+ *
+ * The frames are concatenated in buf, and
+ * their sizes is given in the buflen array.
+ *
+ * Most of the time, the buf will not be completely
+ * filled
+ */
+struct zmq_dab_message_t
+{
+ zmq_dab_message_t()
+ {
+ /* set buf lengths to invalid */
+ buflen[0] = -1;
+ buflen[1] = -1;
+ buflen[2] = -1;
+ buflen[3] = -1;
+
+ version = 1;
+ }
+ uint32_t version;
+ uint16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE];
+ uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144];
+};
+
// -------------- ZeroMQ message queue ------------------
class DabOutputZMQ : public DabOutput
{
public:
DabOutputZMQ() :
- zmq_proto_(""), zmq_context_(1), zmq_pub_sock_(zmq_context_, ZMQ_PUB)
+ zmq_proto_(""), zmq_context_(1),
+ zmq_pub_sock_(zmq_context_, ZMQ_PUB),
+ zmq_message_ix(0)
{
throw std::runtime_error("ERROR: No ZMQ protocol specified !");
}
DabOutputZMQ(std::string zmq_proto) :
- zmq_proto_(zmq_proto), zmq_context_(1), zmq_pub_sock_(zmq_context_, ZMQ_PUB)
+ zmq_proto_(zmq_proto), zmq_context_(1),
+ zmq_pub_sock_(zmq_context_, ZMQ_PUB),
+ zmq_message_ix(0)
{ }
- DabOutputZMQ(const DabOutputZMQ& other) :
- zmq_proto_(other.zmq_proto_), zmq_context_(1), zmq_pub_sock_(zmq_context_, ZMQ_PUB)
- {
- // cannot copy context
- }
-
~DabOutputZMQ()
{
zmq_pub_sock_.close();
@@ -277,9 +309,19 @@ class DabOutputZMQ : public DabOutput
int Write(void* buffer, int size);
int Close();
private:
+ DabOutputZMQ(const DabOutputZMQ& other) :
+ zmq_proto_(other.zmq_proto_), zmq_context_(1),
+ zmq_pub_sock_(zmq_context_, ZMQ_PUB)
+ {
+ /* Forbid copy constructor */
+ }
+
std::string zmq_proto_;
zmq::context_t zmq_context_; // handle for the zmq context
zmq::socket_t zmq_pub_sock_; // handle for the zmq publisher socket
+
+ zmq_dab_message_t zmq_message;
+ int zmq_message_ix;
};
#endif
diff --git a/src/dabOutput/dabOutputZMQ.cpp b/src/dabOutput/dabOutputZMQ.cpp
index cbb3445..ac26403 100644
--- a/src/dabOutput/dabOutputZMQ.cpp
+++ b/src/dabOutput/dabOutputZMQ.cpp
@@ -2,9 +2,11 @@
Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2013 Matthias P. Braendli
+ Copyright (C) 2013, 2014 Matthias P. Braendli
http://mpb.li
+ http://opendigitalradio.org
+
ZeroMQ output. see www.zeromq.org for more info
From the ZeroMQ manpage 'zmq':
@@ -66,9 +68,40 @@ int DabOutputZMQ::Open(const char* endpoint)
int DabOutputZMQ::Write(void* buffer, int size)
{
- int flags = 0;
+ int offset = 0;
+
+ for (int i = 0; i < zmq_message_ix; i++) {
+ offset += zmq_message.buflen[i];
+ }
+
+ if (offset + size > NUM_FRAMES_PER_ZMQ_MESSAGE*6144) {
+ throw std::runtime_error("FAULT: invalid ETI frame size!");
+ }
+ memcpy(zmq_message.buf + offset, buffer, size);
+ zmq_message.buflen[zmq_message_ix] = size;
+ zmq_message_ix++;
+
+ if (zmq_message_ix == NUM_FRAMES_PER_ZMQ_MESSAGE) {
+ int full_length = 0;
+
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ full_length += zmq_message.buflen[i];
+ }
- return zmq_send(zmq_pub_sock_, buffer, size, flags);
+ zmq_message_ix = 0;
+
+ const int flags = 0;
+ zmq_send(zmq_pub_sock_,
+ (uint8_t*)&zmq_message,
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*zmq_message.buflen) +
+ full_length, flags);
+
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ zmq_message.buflen[i] = -1;
+ }
+ }
+
+ return size;
}
@@ -78,3 +111,4 @@ int DabOutputZMQ::Close()
}
#endif
+