diff options
| -rw-r--r-- | src/dabOutput/dabOutput.h | 60 | ||||
| -rw-r--r-- | src/dabOutput/dabOutputZMQ.cpp | 40 | 
2 files changed, 88 insertions, 12 deletions
diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index 5d03184..1050de1 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -2,9 +2,11 @@     Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2013 Matthias P. Braendli +   Copyright (C) 2013, 2014 Matthias P. Braendli     http://mpb.li +    http://opendigitalradio.org +     An object-oriented version of the output channels.     */  /* @@ -247,27 +249,57 @@ class DabOutputSimul : public DabOutput  };  #if defined(HAVE_OUTPUT_ZEROMQ) + +#define NUM_FRAMES_PER_ZMQ_MESSAGE 4 +/* A concatenation of four ETI frames, + * whose maximal size is 6144. + * + * If we transmit four frames in one zmq message, + * we do not risk breaking ETI vs. transmission frame + * phase. + * + * The frames are concatenated in buf, and + * their sizes is given in the buflen array. + * + * Most of the time, the buf will not be completely + * filled + */ +struct zmq_dab_message_t +{ +    zmq_dab_message_t() +    { +        /* set buf lengths to invalid */ +        buflen[0] = -1; +        buflen[1] = -1; +        buflen[2] = -1; +        buflen[3] = -1; + +        version = 1; +    } +    uint32_t version; +    uint16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE]; +    uint8_t  buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; +}; +  // -------------- ZeroMQ message queue ------------------  class DabOutputZMQ : public DabOutput  {      public:          DabOutputZMQ() : -            zmq_proto_(""), zmq_context_(1), zmq_pub_sock_(zmq_context_, ZMQ_PUB) +            zmq_proto_(""), zmq_context_(1), +            zmq_pub_sock_(zmq_context_, ZMQ_PUB), +            zmq_message_ix(0)          {              throw std::runtime_error("ERROR: No ZMQ protocol specified !");          }          DabOutputZMQ(std::string zmq_proto) : -            zmq_proto_(zmq_proto), zmq_context_(1), zmq_pub_sock_(zmq_context_, ZMQ_PUB) +            zmq_proto_(zmq_proto), zmq_context_(1), +            zmq_pub_sock_(zmq_context_, ZMQ_PUB), +            zmq_message_ix(0)          { } -        DabOutputZMQ(const DabOutputZMQ& other) : -            zmq_proto_(other.zmq_proto_), zmq_context_(1), zmq_pub_sock_(zmq_context_, ZMQ_PUB) -        { -            // cannot copy context -        } -          ~DabOutputZMQ()          {              zmq_pub_sock_.close(); @@ -277,9 +309,19 @@ class DabOutputZMQ : public DabOutput          int Write(void* buffer, int size);          int Close();      private: +        DabOutputZMQ(const DabOutputZMQ& other) : +            zmq_proto_(other.zmq_proto_), zmq_context_(1), +            zmq_pub_sock_(zmq_context_, ZMQ_PUB) +        { +            /* Forbid copy constructor */ +        } +          std::string zmq_proto_;          zmq::context_t zmq_context_; // handle for the zmq context          zmq::socket_t zmq_pub_sock_; // handle for the zmq publisher socket + +        zmq_dab_message_t zmq_message; +        int zmq_message_ix;  };  #endif diff --git a/src/dabOutput/dabOutputZMQ.cpp b/src/dabOutput/dabOutputZMQ.cpp index cbb3445..ac26403 100644 --- a/src/dabOutput/dabOutputZMQ.cpp +++ b/src/dabOutput/dabOutputZMQ.cpp @@ -2,9 +2,11 @@     Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2013 Matthias P. Braendli +   Copyright (C) 2013, 2014 Matthias P. Braendli     http://mpb.li +    http://opendigitalradio.org +     ZeroMQ output. see www.zeromq.org for more info     From the ZeroMQ manpage 'zmq': @@ -66,9 +68,40 @@ int DabOutputZMQ::Open(const char* endpoint)  int DabOutputZMQ::Write(void* buffer, int size)  { -    int flags = 0; +    int offset = 0; + +    for (int i = 0; i < zmq_message_ix; i++) { +        offset += zmq_message.buflen[i]; +    } + +    if (offset + size > NUM_FRAMES_PER_ZMQ_MESSAGE*6144) { +        throw std::runtime_error("FAULT: invalid ETI frame size!"); +    } +    memcpy(zmq_message.buf + offset, buffer, size); +    zmq_message.buflen[zmq_message_ix] = size; +    zmq_message_ix++; + +    if (zmq_message_ix == NUM_FRAMES_PER_ZMQ_MESSAGE) { +        int full_length = 0; + +        for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { +            full_length += zmq_message.buflen[i]; +        } -    return zmq_send(zmq_pub_sock_, buffer, size, flags); +        zmq_message_ix = 0; + +        const int flags = 0; +        zmq_send(zmq_pub_sock_, +                (uint8_t*)&zmq_message, +                NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*zmq_message.buflen) + +                full_length, flags); + +        for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { +            zmq_message.buflen[i] = -1; +        } +    } + +    return size;  } @@ -78,3 +111,4 @@ int DabOutputZMQ::Close()  }  #endif +  | 
