diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-01-19 22:32:27 +0100 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-01-19 22:32:27 +0100 | 
| commit | ca4fb30104c5f883794c40f2516636447ea5dd0f (patch) | |
| tree | 052c4f300cb04908feac5812c5a9ef4b6f3571b7 /src | |
| parent | 6c482c8f1fdd74f6e7a8a9481b9f2211c559ebad (diff) | |
| download | dabmux-ca4fb30104c5f883794c40f2516636447ea5dd0f.tar.gz dabmux-ca4fb30104c5f883794c40f2516636447ea5dd0f.tar.bz2 dabmux-ca4fb30104c5f883794c40f2516636447ea5dd0f.zip | |
make DabInputZMQ a new-style input
Diffstat (limited to 'src')
| -rw-r--r-- | src/ParserConfigfile.cpp | 32 | ||||
| -rw-r--r-- | src/dabInput.h | 2 | ||||
| -rw-r--r-- | src/dabInputZmq.cpp | 189 | ||||
| -rw-r--r-- | src/dabInputZmq.h | 39 | 
4 files changed, 120 insertions, 142 deletions
| diff --git a/src/ParserConfigfile.cpp b/src/ParserConfigfile.cpp index 1ef2b28..14d9e6b 100644 --- a/src/ParserConfigfile.cpp +++ b/src/ParserConfigfile.cpp @@ -444,6 +444,11 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,      subchan->inputName = inputName; +    /* The input is of the old_style type, +     * with the struct of function pointers, +     * and needs to be a DabInputCompatible +     */ +    bool input_is_old_style = true;      dabInputOperations operations;      dabProtection* protection = &subchan->protection; @@ -462,6 +467,10 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,          char* proto; +        char* full_inputName = new char[256]; +        full_inputName[255] = '\0'; +        memcpy(full_inputName, inputName, 255); +          proto = strstr(inputName, "://");          if (proto == NULL) {              subchan->inputProto = "file"; @@ -479,17 +488,21 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,  #if defined(HAVE_INPUT_ZEROMQ)          }          else if (strcmp(subchan->inputProto, "tcp") == 0) { -            operations = dabInputZmqOperations; +            input_is_old_style = false; +            subchan->input = new DabInputZmq(subchanuid); +            subchan->inputName = full_inputName;          }          else if (strcmp(subchan->inputProto, "epmg") == 0) { -            etiLog.log(warn, -                    "Using untested epmg:// zeromq input\n"); -            operations = dabInputZmqOperations; +            etiLog.level(warn) << "Using untested epmg:// zeromq input"; +            input_is_old_style = false; +            subchan->input = new DabInputZmq(subchanuid); +            subchan->inputName = full_inputName;          }          else if (strcmp(subchan->inputProto, "ipc") == 0) { -            etiLog.log(warn, -                    "Using untested ipc:// zeromq input\n"); -            operations = dabInputZmqOperations; +            etiLog.level(warn) << "Using untested ipc:// zeromq input"; +            input_is_old_style = false; +            subchan->input = new DabInputZmq(subchanuid); +            subchan->inputName = full_inputName;  #endif // defined(HAVE_INPUT_ZEROMQ)          } else {              stringstream ss; @@ -731,5 +744,8 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,      catch (ptree_error &e) {}      /* Create object */ -    subchan->input = new DabInputCompatible(operations); +    if (input_is_old_style) { +        subchan->input = new DabInputCompatible(operations); +    } +    // else { it's already been created! }  } diff --git a/src/dabInput.h b/src/dabInput.h index 0fc04db..4ccbac9 100644 --- a/src/dabInput.h +++ b/src/dabInput.h @@ -49,11 +49,9 @@ struct dabInputOperations {  class DabInputBase {      public:          virtual int open(const std::string name) = 0; -        virtual int setbuf(int size) = 0;          virtual int readFrame(void* buffer, int size) = 0;          virtual int setBitrate(int bitrate) = 0;          virtual int close() = 0; -        virtual int rewind() = 0;          virtual ~DabInputBase() {};  }; diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index b560313..9b61033 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -34,14 +34,15 @@  #include "dabInput.h"  #include "dabInputZmq.h" -#include "dabInputFifo.h"  #include "StatsServer.h"  #include <stdio.h> -#include <zmq.h> +#include <zmq.hpp>  #include <list> +#include <exception>  #include <string.h>  #include <string> +#include <sstream>  #include <limits.h>  #ifdef __MINGW32__ @@ -52,170 +53,139 @@  extern StatsServer global_stats; -struct dabInputOperations dabInputZmqOperations = { -    dabInputZmqInit, -    dabInputZmqOpen, -    dabInputSetbuf, -    NULL, -    NULL, -    NULL, -    dabInputZmqReadFrame, -    dabInputSetbitrate, -    dabInputZmqClose, -    dabInputZmqClean, -    NULL -}; - - -int dabInputZmqInit(void** args) +int DabInputZmq::open(const std::string inputUri)  { -    dabInputZmqData* input = new dabInputZmqData; -    input->zmq_context = zmq_ctx_new(); -    if (input->zmq_context == NULL) { -        etiLog.log(error, "Failed to initialise ZeroMQ context: %s\n", zmq_strerror(errno)); -        return 1; +    // Prepare the ZMQ socket to accept connections +    try { +        m_zmq_sock.bind(inputUri.c_str());      } - -    input->zmq_sock = zmq_socket(input->zmq_context, ZMQ_SUB); -    if (input->zmq_sock == NULL) { -        etiLog.log(error, "Failed to initialise ZeroMQ socket: %s\n", zmq_strerror(errno)); -        return 1; +    catch (zmq::error_t& err) { +        std::ostringstream os; +        os << "ZMQ bind for input " << m_name << " failed"; +        throw std::runtime_error(os.str());      } -    input->prebuffering = INPUT_ZMQ_PREBUFFERING; - -    *args = input; - -    return 0; -} - - -int dabInputZmqOpen(void* args, const char* inputUri) -{ -    dabInputZmqData* input = (dabInputZmqData*)args; - -    std::string uri = "tcp://" + std::string(inputUri); -    int connect_error = zmq_bind(input->zmq_sock, uri.c_str()); - -    if (connect_error < 0) { -        etiLog.log(error,  "Failed to connect socket to uri '%s': %s\n", uri.c_str(), zmq_strerror(errno)); -        return 1; +    try { +        m_zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);      } - -    connect_error = zmq_setsockopt(input->zmq_sock, ZMQ_SUBSCRIBE, NULL, 0); -    if (connect_error < 0) { -        etiLog.log(error, "Failed to subscribe to zmq messages: %s\n", zmq_strerror(errno)); -        return 1; +    catch (zmq::error_t& err) { +        std::ostringstream os; +        os << "ZMQ set socket options for input " << m_name << " failed"; +        throw std::runtime_error(os.str());      } -    global_stats.registerInput(uri); +    // We want to appear in the statistics ! +    global_stats.registerInput(m_name); -    input->uri = uri;      return 0;  } -  // size corresponds to a frame size. It is constant for a given bitrate -int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int size) +int DabInputZmq::readFrame(void* buffer, int size)  {      int rc; -    dabInputZmqData* input = (dabInputZmqData*)args;      /* We must *always* read data from the ZMQ socket,       * to make sure that ZMQ internal buffers are emptied       * quickly. It's the only way to control the buffers       * of the whole path from encoder to our frame_buffer.       */ -    rc = dabInputZmqReadFromSocket(input, size); +    rc = readFromSocket(size);      /* Notify of a buffer overrun, and drop some frames */ -    if (input->frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) { -        global_stats.notifyOverrun(input->uri); +    if (m_frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) { +        global_stats.notifyOverrun(m_name);          /* If the buffer is really too full, we drop as many frames as needed           * to get down to the prebuffering size. We would like to have our buffer           * filled to the prebuffering length.           */ -        if (input->frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) { -            size_t over_max = input->frame_buffer.size() - INPUT_ZMQ_PREBUFFERING; +        if (m_frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) { +            size_t over_max = m_frame_buffer.size() - INPUT_ZMQ_PREBUFFERING;              while (over_max--) { -                input->frame_buffer.pop_front(); +                m_frame_buffer.pop_front();              }          }          else {              /* Our frame_buffer contains DAB logical frames. Five of these make one               * AAC superframe.               * -             * Dropping this superframe amounts to dropping 120ms of audio. */ -            input->frame_buffer.pop_front(); -            input->frame_buffer.pop_front(); -            input->frame_buffer.pop_front(); -            input->frame_buffer.pop_front(); -            input->frame_buffer.pop_front(); +             * Dropping this superframe amounts to dropping 120ms of audio. +             * +             * We're actually not sure to drop five DAB logical frames +             * beloning to the same AAC superframe. It is assumed that no +             * receiver will crash because of this. At least, the DAB logical frame +             * vs. AAC superframe alignment is preserved. +             * +             * TODO: of course this assumption probably doesn't hold. Fix this ! +             * */ +            m_frame_buffer.pop_front(); +            m_frame_buffer.pop_front(); +            m_frame_buffer.pop_front(); +            m_frame_buffer.pop_front(); +            m_frame_buffer.pop_front();          }      } -    if (input->prebuffering > 0) { +    if (m_prebuffering > 0) {          if (rc > 0) -            input->prebuffering--; -        if (input->prebuffering == 0) +            m_prebuffering--; +        if (m_prebuffering == 0)              etiLog.log(info, "inputZMQ %s input pre-buffering complete\n", -                input->uri.c_str()); +                m_name.c_str());          /* During prebuffering, give a zeroed frame to the mux */ -        global_stats.notifyUnderrun(input->uri); +        global_stats.notifyUnderrun(m_name);          memset(buffer, 0, size);          return size;      }      // Save stats data in bytes, not in frames -    global_stats.notifyBuffer(input->uri, input->frame_buffer.size() * size); +    global_stats.notifyBuffer(m_name, m_frame_buffer.size() * size); -    if (input->frame_buffer.empty()) { +    if (m_frame_buffer.empty()) {          etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", -                input->uri.c_str()); +                m_name.c_str());          // reset prebuffering -        input->prebuffering = INPUT_ZMQ_PREBUFFERING; +        m_prebuffering = INPUT_ZMQ_PREBUFFERING;          /* We have no data to give, we give a zeroed frame */ -        global_stats.notifyUnderrun(input->uri); +        global_stats.notifyUnderrun(m_name);          memset(buffer, 0, size);          return size;      }      else      {          /* Normal situation, give a frame from the frame_buffer */ - -        char* newframe = input->frame_buffer.front(); +        char* newframe = m_frame_buffer.front();          memcpy(buffer, newframe, size);          delete[] newframe; -        input->frame_buffer.pop_front(); +        m_frame_buffer.pop_front();          return size;      }  }  // Read a superframe from the socket, cut it into five frames, and push to list -int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize) +int DabInputZmq::readFromSocket(int framesize)  {      int rc; -    zmq_msg_t msg; -    rc = zmq_msg_init(&msg); -    if (rc == -1) { -        etiLog.log(error, "Failed to init zmq message: %s\n", zmq_strerror(errno)); -        return 0; -    } +    int nBytes; +    zmq::message_t msg; -    int nBytes = zmq_msg_recv(&msg, input->zmq_sock, ZMQ_DONTWAIT); -    if (nBytes == -1) { -        if (errno != EAGAIN) { -            etiLog.log(error, "Failed to receive zmq message: %s\n", zmq_strerror(errno)); +    try { +        nBytes = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); +        if (nBytes == 0) { +            return 0;          } -        zmq_msg_close(&msg); -        return 0; +    } +    catch (zmq::error_t& err) +    { +        etiLog.level(error) << "Failed to receive from zmq socket " << +                m_name << ": " << err.what();      } -    char* data = (char*)zmq_msg_data(&msg); +    char* data = (char*)msg.data();      /* TS 102 563, Section 6:       * Audio super frames are transported in five successive DAB logical frames @@ -223,10 +193,10 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize)       */      if (nBytes == 5*framesize)      { -        if (input->frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { +        if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) {              etiLog.level(warn) << -                "inputZMQ " << input->uri << -                " buffer full (" << input->frame_buffer.size() << ")," +                "inputZMQ " << m_name << +                " buffer full (" << m_frame_buffer.size() << "),"                  " dropping incoming superframe !";              nBytes = 0;          } @@ -237,40 +207,33 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize)                      framestart += framesize) {                  char* frame = new char[framesize];                  memcpy(frame, framestart, framesize); -                input->frame_buffer.push_back(frame); +                m_frame_buffer.push_back(frame);              }          }      }      else      {          etiLog.level(error) << -            "inputZMQ " << input->uri << -            " wrong data size: recv'd" << nBytes << +            "inputZMQ " << m_name << +            " wrong data size: recv'd " << nBytes <<              ", need " << 5*framesize << ".";          nBytes = 0;      } -    zmq_msg_close(&msg);      return nBytes;  } - -int dabInputZmqClose(void* args) +int DabInputZmq::close()  { -    dabInputZmqData* input = (dabInputZmqData*)args; -    zmq_close(input->zmq_sock); +    m_zmq_sock.close();      return 0;  } - -int dabInputZmqClean(void** args) +int DabInputZmq::setBitrate(int bitrate)  { -    dabInputZmqData* input = (dabInputZmqData*)(*args); -    zmq_ctx_term(input->zmq_context); -    delete input; -    return 0; +    m_bitrate = bitrate; +    return bitrate; // TODO do a nice check here  } -  #endif diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index 56708f9..3902963 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -42,11 +42,10 @@  #ifdef HAVE_CONFIG_H  #   include "config.h"  #endif -#include <zmq.h> +#include <zmq.hpp>  #include <list>  #include <string>  #include "dabInput.h" -#include "dabInputFifo.h"  #include "StatsServer.h"  /* The frame_buffer contains DAB logical frames as defined in @@ -61,26 +60,28 @@  #define INPUT_ZMQ_MAX_BUFFER_SIZE (5*8) // 960ms -extern struct dabInputOperations dabInputZmqOperations; - -struct dabInputZmqData { -    void* zmq_context; -    void* zmq_sock; -    std::list<char*> frame_buffer; //stores elements of type char[<framesize>] -    int prebuffering; -    std::string uri; -}; +class DabInputZmq : public DabInputBase { +    public: +        DabInputZmq(const std::string name) +            : m_name(name), m_zmq_context(1), +            m_zmq_sock(m_zmq_context, ZMQ_SUB), +            m_prebuffering(INPUT_ZMQ_PREBUFFERING) {} +        virtual int open(const std::string inputUri); +        virtual int readFrame(void* buffer, int size); +        virtual int setBitrate(int bitrate); +        virtual int close(); -int dabInputZmqInit(void** args); -int dabInputZmqOpen(void* args, const char* inputUri); -int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int size); -int dabInputZmqClose(void* args); -int dabInputZmqClean(void** args); - -// Get new message from ZeroMQ -int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize); +    private: +        int readFromSocket(int framesize); +        std::string m_name; +        zmq::context_t m_zmq_context; +        zmq::socket_t m_zmq_sock; // handle for the zmq socket +        int m_prebuffering; +        std::list<char*> m_frame_buffer; //stores elements of type char[<framesize>] +        int m_bitrate; +};  #endif // HAVE_INPUT_ZMQ | 
