diff options
-rw-r--r-- | src/ParserConfigfile.cpp | 32 | ||||
-rw-r--r-- | src/dabInput.h | 2 | ||||
-rw-r--r-- | src/dabInputZmq.cpp | 189 | ||||
-rw-r--r-- | src/dabInputZmq.h | 39 |
4 files changed, 120 insertions, 142 deletions
diff --git a/src/ParserConfigfile.cpp b/src/ParserConfigfile.cpp index 1ef2b28..14d9e6b 100644 --- a/src/ParserConfigfile.cpp +++ b/src/ParserConfigfile.cpp @@ -444,6 +444,11 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, subchan->inputName = inputName; + /* The input is of the old_style type, + * with the struct of function pointers, + * and needs to be a DabInputCompatible + */ + bool input_is_old_style = true; dabInputOperations operations; dabProtection* protection = &subchan->protection; @@ -462,6 +467,10 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, char* proto; + char* full_inputName = new char[256]; + full_inputName[255] = '\0'; + memcpy(full_inputName, inputName, 255); + proto = strstr(inputName, "://"); if (proto == NULL) { subchan->inputProto = "file"; @@ -479,17 +488,21 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, #if defined(HAVE_INPUT_ZEROMQ) } else if (strcmp(subchan->inputProto, "tcp") == 0) { - operations = dabInputZmqOperations; + input_is_old_style = false; + subchan->input = new DabInputZmq(subchanuid); + subchan->inputName = full_inputName; } else if (strcmp(subchan->inputProto, "epmg") == 0) { - etiLog.log(warn, - "Using untested epmg:// zeromq input\n"); - operations = dabInputZmqOperations; + etiLog.level(warn) << "Using untested epmg:// zeromq input"; + input_is_old_style = false; + subchan->input = new DabInputZmq(subchanuid); + subchan->inputName = full_inputName; } else if (strcmp(subchan->inputProto, "ipc") == 0) { - etiLog.log(warn, - "Using untested ipc:// zeromq input\n"); - operations = dabInputZmqOperations; + etiLog.level(warn) << "Using untested ipc:// zeromq input"; + input_is_old_style = false; + subchan->input = new DabInputZmq(subchanuid); + subchan->inputName = full_inputName; #endif // defined(HAVE_INPUT_ZEROMQ) } else { stringstream ss; @@ -731,5 +744,8 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, catch (ptree_error &e) {} /* Create object */ - subchan->input = new DabInputCompatible(operations); + if (input_is_old_style) { + subchan->input = new DabInputCompatible(operations); + } + // else { it's already been created! } } diff --git a/src/dabInput.h b/src/dabInput.h index 0fc04db..4ccbac9 100644 --- a/src/dabInput.h +++ b/src/dabInput.h @@ -49,11 +49,9 @@ struct dabInputOperations { class DabInputBase { public: virtual int open(const std::string name) = 0; - virtual int setbuf(int size) = 0; virtual int readFrame(void* buffer, int size) = 0; virtual int setBitrate(int bitrate) = 0; virtual int close() = 0; - virtual int rewind() = 0; virtual ~DabInputBase() {}; }; diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index b560313..9b61033 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -34,14 +34,15 @@ #include "dabInput.h" #include "dabInputZmq.h" -#include "dabInputFifo.h" #include "StatsServer.h" #include <stdio.h> -#include <zmq.h> +#include <zmq.hpp> #include <list> +#include <exception> #include <string.h> #include <string> +#include <sstream> #include <limits.h> #ifdef __MINGW32__ @@ -52,170 +53,139 @@ extern StatsServer global_stats; -struct dabInputOperations dabInputZmqOperations = { - dabInputZmqInit, - dabInputZmqOpen, - dabInputSetbuf, - NULL, - NULL, - NULL, - dabInputZmqReadFrame, - dabInputSetbitrate, - dabInputZmqClose, - dabInputZmqClean, - NULL -}; - - -int dabInputZmqInit(void** args) +int DabInputZmq::open(const std::string inputUri) { - dabInputZmqData* input = new dabInputZmqData; - input->zmq_context = zmq_ctx_new(); - if (input->zmq_context == NULL) { - etiLog.log(error, "Failed to initialise ZeroMQ context: %s\n", zmq_strerror(errno)); - return 1; + // Prepare the ZMQ socket to accept connections + try { + m_zmq_sock.bind(inputUri.c_str()); } - - input->zmq_sock = zmq_socket(input->zmq_context, ZMQ_SUB); - if (input->zmq_sock == NULL) { - etiLog.log(error, "Failed to initialise ZeroMQ socket: %s\n", zmq_strerror(errno)); - return 1; + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ bind for input " << m_name << " failed"; + throw std::runtime_error(os.str()); } - input->prebuffering = INPUT_ZMQ_PREBUFFERING; - - *args = input; - - return 0; -} - - -int dabInputZmqOpen(void* args, const char* inputUri) -{ - dabInputZmqData* input = (dabInputZmqData*)args; - - std::string uri = "tcp://" + std::string(inputUri); - int connect_error = zmq_bind(input->zmq_sock, uri.c_str()); - - if (connect_error < 0) { - etiLog.log(error, "Failed to connect socket to uri '%s': %s\n", uri.c_str(), zmq_strerror(errno)); - return 1; + try { + m_zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); } - - connect_error = zmq_setsockopt(input->zmq_sock, ZMQ_SUBSCRIBE, NULL, 0); - if (connect_error < 0) { - etiLog.log(error, "Failed to subscribe to zmq messages: %s\n", zmq_strerror(errno)); - return 1; + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set socket options for input " << m_name << " failed"; + throw std::runtime_error(os.str()); } - global_stats.registerInput(uri); + // We want to appear in the statistics ! + global_stats.registerInput(m_name); - input->uri = uri; return 0; } - // size corresponds to a frame size. It is constant for a given bitrate -int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int size) +int DabInputZmq::readFrame(void* buffer, int size) { int rc; - dabInputZmqData* input = (dabInputZmqData*)args; /* We must *always* read data from the ZMQ socket, * to make sure that ZMQ internal buffers are emptied * quickly. It's the only way to control the buffers * of the whole path from encoder to our frame_buffer. */ - rc = dabInputZmqReadFromSocket(input, size); + rc = readFromSocket(size); /* Notify of a buffer overrun, and drop some frames */ - if (input->frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) { - global_stats.notifyOverrun(input->uri); + if (m_frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) { + global_stats.notifyOverrun(m_name); /* If the buffer is really too full, we drop as many frames as needed * to get down to the prebuffering size. We would like to have our buffer * filled to the prebuffering length. */ - if (input->frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) { - size_t over_max = input->frame_buffer.size() - INPUT_ZMQ_PREBUFFERING; + if (m_frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) { + size_t over_max = m_frame_buffer.size() - INPUT_ZMQ_PREBUFFERING; while (over_max--) { - input->frame_buffer.pop_front(); + m_frame_buffer.pop_front(); } } else { /* Our frame_buffer contains DAB logical frames. Five of these make one * AAC superframe. * - * Dropping this superframe amounts to dropping 120ms of audio. */ - input->frame_buffer.pop_front(); - input->frame_buffer.pop_front(); - input->frame_buffer.pop_front(); - input->frame_buffer.pop_front(); - input->frame_buffer.pop_front(); + * Dropping this superframe amounts to dropping 120ms of audio. + * + * We're actually not sure to drop five DAB logical frames + * beloning to the same AAC superframe. It is assumed that no + * receiver will crash because of this. At least, the DAB logical frame + * vs. AAC superframe alignment is preserved. + * + * TODO: of course this assumption probably doesn't hold. Fix this ! + * */ + m_frame_buffer.pop_front(); + m_frame_buffer.pop_front(); + m_frame_buffer.pop_front(); + m_frame_buffer.pop_front(); + m_frame_buffer.pop_front(); } } - if (input->prebuffering > 0) { + if (m_prebuffering > 0) { if (rc > 0) - input->prebuffering--; - if (input->prebuffering == 0) + m_prebuffering--; + if (m_prebuffering == 0) etiLog.log(info, "inputZMQ %s input pre-buffering complete\n", - input->uri.c_str()); + m_name.c_str()); /* During prebuffering, give a zeroed frame to the mux */ - global_stats.notifyUnderrun(input->uri); + global_stats.notifyUnderrun(m_name); memset(buffer, 0, size); return size; } // Save stats data in bytes, not in frames - global_stats.notifyBuffer(input->uri, input->frame_buffer.size() * size); + global_stats.notifyBuffer(m_name, m_frame_buffer.size() * size); - if (input->frame_buffer.empty()) { + if (m_frame_buffer.empty()) { etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", - input->uri.c_str()); + m_name.c_str()); // reset prebuffering - input->prebuffering = INPUT_ZMQ_PREBUFFERING; + m_prebuffering = INPUT_ZMQ_PREBUFFERING; /* We have no data to give, we give a zeroed frame */ - global_stats.notifyUnderrun(input->uri); + global_stats.notifyUnderrun(m_name); memset(buffer, 0, size); return size; } else { /* Normal situation, give a frame from the frame_buffer */ - - char* newframe = input->frame_buffer.front(); + char* newframe = m_frame_buffer.front(); memcpy(buffer, newframe, size); delete[] newframe; - input->frame_buffer.pop_front(); + m_frame_buffer.pop_front(); return size; } } // Read a superframe from the socket, cut it into five frames, and push to list -int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize) +int DabInputZmq::readFromSocket(int framesize) { int rc; - zmq_msg_t msg; - rc = zmq_msg_init(&msg); - if (rc == -1) { - etiLog.log(error, "Failed to init zmq message: %s\n", zmq_strerror(errno)); - return 0; - } + int nBytes; + zmq::message_t msg; - int nBytes = zmq_msg_recv(&msg, input->zmq_sock, ZMQ_DONTWAIT); - if (nBytes == -1) { - if (errno != EAGAIN) { - etiLog.log(error, "Failed to receive zmq message: %s\n", zmq_strerror(errno)); + try { + nBytes = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + if (nBytes == 0) { + return 0; } - zmq_msg_close(&msg); - return 0; + } + catch (zmq::error_t& err) + { + etiLog.level(error) << "Failed to receive from zmq socket " << + m_name << ": " << err.what(); } - char* data = (char*)zmq_msg_data(&msg); + char* data = (char*)msg.data(); /* TS 102 563, Section 6: * Audio super frames are transported in five successive DAB logical frames @@ -223,10 +193,10 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize) */ if (nBytes == 5*framesize) { - if (input->frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { + if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { etiLog.level(warn) << - "inputZMQ " << input->uri << - " buffer full (" << input->frame_buffer.size() << ")," + "inputZMQ " << m_name << + " buffer full (" << m_frame_buffer.size() << ")," " dropping incoming superframe !"; nBytes = 0; } @@ -237,40 +207,33 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize) framestart += framesize) { char* frame = new char[framesize]; memcpy(frame, framestart, framesize); - input->frame_buffer.push_back(frame); + m_frame_buffer.push_back(frame); } } } else { etiLog.level(error) << - "inputZMQ " << input->uri << - " wrong data size: recv'd" << nBytes << + "inputZMQ " << m_name << + " wrong data size: recv'd " << nBytes << ", need " << 5*framesize << "."; nBytes = 0; } - zmq_msg_close(&msg); return nBytes; } - -int dabInputZmqClose(void* args) +int DabInputZmq::close() { - dabInputZmqData* input = (dabInputZmqData*)args; - zmq_close(input->zmq_sock); + m_zmq_sock.close(); return 0; } - -int dabInputZmqClean(void** args) +int DabInputZmq::setBitrate(int bitrate) { - dabInputZmqData* input = (dabInputZmqData*)(*args); - zmq_ctx_term(input->zmq_context); - delete input; - return 0; + m_bitrate = bitrate; + return bitrate; // TODO do a nice check here } - #endif diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index 56708f9..3902963 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -42,11 +42,10 @@ #ifdef HAVE_CONFIG_H # include "config.h" #endif -#include <zmq.h> +#include <zmq.hpp> #include <list> #include <string> #include "dabInput.h" -#include "dabInputFifo.h" #include "StatsServer.h" /* The frame_buffer contains DAB logical frames as defined in @@ -61,26 +60,28 @@ #define INPUT_ZMQ_MAX_BUFFER_SIZE (5*8) // 960ms -extern struct dabInputOperations dabInputZmqOperations; - -struct dabInputZmqData { - void* zmq_context; - void* zmq_sock; - std::list<char*> frame_buffer; //stores elements of type char[<framesize>] - int prebuffering; - std::string uri; -}; +class DabInputZmq : public DabInputBase { + public: + DabInputZmq(const std::string name) + : m_name(name), m_zmq_context(1), + m_zmq_sock(m_zmq_context, ZMQ_SUB), + m_prebuffering(INPUT_ZMQ_PREBUFFERING) {} + virtual int open(const std::string inputUri); + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); + virtual int close(); -int dabInputZmqInit(void** args); -int dabInputZmqOpen(void* args, const char* inputUri); -int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int size); -int dabInputZmqClose(void* args); -int dabInputZmqClean(void** args); - -// Get new message from ZeroMQ -int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize); + private: + int readFromSocket(int framesize); + std::string m_name; + zmq::context_t m_zmq_context; + zmq::socket_t m_zmq_sock; // handle for the zmq socket + int m_prebuffering; + std::list<char*> m_frame_buffer; //stores elements of type char[<framesize>] + int m_bitrate; +}; #endif // HAVE_INPUT_ZMQ |