diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-01-19 22:32:27 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-01-19 22:32:27 +0100 |
commit | ca4fb30104c5f883794c40f2516636447ea5dd0f (patch) | |
tree | 052c4f300cb04908feac5812c5a9ef4b6f3571b7 /src/dabInputZmq.cpp | |
parent | 6c482c8f1fdd74f6e7a8a9481b9f2211c559ebad (diff) | |
download | dabmux-ca4fb30104c5f883794c40f2516636447ea5dd0f.tar.gz dabmux-ca4fb30104c5f883794c40f2516636447ea5dd0f.tar.bz2 dabmux-ca4fb30104c5f883794c40f2516636447ea5dd0f.zip |
make DabInputZMQ a new-style input
Diffstat (limited to 'src/dabInputZmq.cpp')
-rw-r--r-- | src/dabInputZmq.cpp | 189 |
1 files changed, 76 insertions, 113 deletions
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index b560313..9b61033 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -34,14 +34,15 @@ #include "dabInput.h" #include "dabInputZmq.h" -#include "dabInputFifo.h" #include "StatsServer.h" #include <stdio.h> -#include <zmq.h> +#include <zmq.hpp> #include <list> +#include <exception> #include <string.h> #include <string> +#include <sstream> #include <limits.h> #ifdef __MINGW32__ @@ -52,170 +53,139 @@ extern StatsServer global_stats; -struct dabInputOperations dabInputZmqOperations = { - dabInputZmqInit, - dabInputZmqOpen, - dabInputSetbuf, - NULL, - NULL, - NULL, - dabInputZmqReadFrame, - dabInputSetbitrate, - dabInputZmqClose, - dabInputZmqClean, - NULL -}; - - -int dabInputZmqInit(void** args) +int DabInputZmq::open(const std::string inputUri) { - dabInputZmqData* input = new dabInputZmqData; - input->zmq_context = zmq_ctx_new(); - if (input->zmq_context == NULL) { - etiLog.log(error, "Failed to initialise ZeroMQ context: %s\n", zmq_strerror(errno)); - return 1; + // Prepare the ZMQ socket to accept connections + try { + m_zmq_sock.bind(inputUri.c_str()); } - - input->zmq_sock = zmq_socket(input->zmq_context, ZMQ_SUB); - if (input->zmq_sock == NULL) { - etiLog.log(error, "Failed to initialise ZeroMQ socket: %s\n", zmq_strerror(errno)); - return 1; + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ bind for input " << m_name << " failed"; + throw std::runtime_error(os.str()); } - input->prebuffering = INPUT_ZMQ_PREBUFFERING; - - *args = input; - - return 0; -} - - -int dabInputZmqOpen(void* args, const char* inputUri) -{ - dabInputZmqData* input = (dabInputZmqData*)args; - - std::string uri = "tcp://" + std::string(inputUri); - int connect_error = zmq_bind(input->zmq_sock, uri.c_str()); - - if (connect_error < 0) { - etiLog.log(error, "Failed to connect socket to uri '%s': %s\n", uri.c_str(), zmq_strerror(errno)); - return 1; + try { + m_zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); } - - connect_error = zmq_setsockopt(input->zmq_sock, ZMQ_SUBSCRIBE, NULL, 0); - if (connect_error < 0) { - etiLog.log(error, "Failed to subscribe to zmq messages: %s\n", zmq_strerror(errno)); - return 1; + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set socket options for input " << m_name << " failed"; + throw std::runtime_error(os.str()); } - global_stats.registerInput(uri); + // We want to appear in the statistics ! + global_stats.registerInput(m_name); - input->uri = uri; return 0; } - // size corresponds to a frame size. It is constant for a given bitrate -int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int size) +int DabInputZmq::readFrame(void* buffer, int size) { int rc; - dabInputZmqData* input = (dabInputZmqData*)args; /* We must *always* read data from the ZMQ socket, * to make sure that ZMQ internal buffers are emptied * quickly. It's the only way to control the buffers * of the whole path from encoder to our frame_buffer. */ - rc = dabInputZmqReadFromSocket(input, size); + rc = readFromSocket(size); /* Notify of a buffer overrun, and drop some frames */ - if (input->frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) { - global_stats.notifyOverrun(input->uri); + if (m_frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) { + global_stats.notifyOverrun(m_name); /* If the buffer is really too full, we drop as many frames as needed * to get down to the prebuffering size. We would like to have our buffer * filled to the prebuffering length. */ - if (input->frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) { - size_t over_max = input->frame_buffer.size() - INPUT_ZMQ_PREBUFFERING; + if (m_frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) { + size_t over_max = m_frame_buffer.size() - INPUT_ZMQ_PREBUFFERING; while (over_max--) { - input->frame_buffer.pop_front(); + m_frame_buffer.pop_front(); } } else { /* Our frame_buffer contains DAB logical frames. Five of these make one * AAC superframe. * - * Dropping this superframe amounts to dropping 120ms of audio. */ - input->frame_buffer.pop_front(); - input->frame_buffer.pop_front(); - input->frame_buffer.pop_front(); - input->frame_buffer.pop_front(); - input->frame_buffer.pop_front(); + * Dropping this superframe amounts to dropping 120ms of audio. + * + * We're actually not sure to drop five DAB logical frames + * beloning to the same AAC superframe. It is assumed that no + * receiver will crash because of this. At least, the DAB logical frame + * vs. AAC superframe alignment is preserved. + * + * TODO: of course this assumption probably doesn't hold. Fix this ! + * */ + m_frame_buffer.pop_front(); + m_frame_buffer.pop_front(); + m_frame_buffer.pop_front(); + m_frame_buffer.pop_front(); + m_frame_buffer.pop_front(); } } - if (input->prebuffering > 0) { + if (m_prebuffering > 0) { if (rc > 0) - input->prebuffering--; - if (input->prebuffering == 0) + m_prebuffering--; + if (m_prebuffering == 0) etiLog.log(info, "inputZMQ %s input pre-buffering complete\n", - input->uri.c_str()); + m_name.c_str()); /* During prebuffering, give a zeroed frame to the mux */ - global_stats.notifyUnderrun(input->uri); + global_stats.notifyUnderrun(m_name); memset(buffer, 0, size); return size; } // Save stats data in bytes, not in frames - global_stats.notifyBuffer(input->uri, input->frame_buffer.size() * size); + global_stats.notifyBuffer(m_name, m_frame_buffer.size() * size); - if (input->frame_buffer.empty()) { + if (m_frame_buffer.empty()) { etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", - input->uri.c_str()); + m_name.c_str()); // reset prebuffering - input->prebuffering = INPUT_ZMQ_PREBUFFERING; + m_prebuffering = INPUT_ZMQ_PREBUFFERING; /* We have no data to give, we give a zeroed frame */ - global_stats.notifyUnderrun(input->uri); + global_stats.notifyUnderrun(m_name); memset(buffer, 0, size); return size; } else { /* Normal situation, give a frame from the frame_buffer */ - - char* newframe = input->frame_buffer.front(); + char* newframe = m_frame_buffer.front(); memcpy(buffer, newframe, size); delete[] newframe; - input->frame_buffer.pop_front(); + m_frame_buffer.pop_front(); return size; } } // Read a superframe from the socket, cut it into five frames, and push to list -int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize) +int DabInputZmq::readFromSocket(int framesize) { int rc; - zmq_msg_t msg; - rc = zmq_msg_init(&msg); - if (rc == -1) { - etiLog.log(error, "Failed to init zmq message: %s\n", zmq_strerror(errno)); - return 0; - } + int nBytes; + zmq::message_t msg; - int nBytes = zmq_msg_recv(&msg, input->zmq_sock, ZMQ_DONTWAIT); - if (nBytes == -1) { - if (errno != EAGAIN) { - etiLog.log(error, "Failed to receive zmq message: %s\n", zmq_strerror(errno)); + try { + nBytes = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + if (nBytes == 0) { + return 0; } - zmq_msg_close(&msg); - return 0; + } + catch (zmq::error_t& err) + { + etiLog.level(error) << "Failed to receive from zmq socket " << + m_name << ": " << err.what(); } - char* data = (char*)zmq_msg_data(&msg); + char* data = (char*)msg.data(); /* TS 102 563, Section 6: * Audio super frames are transported in five successive DAB logical frames @@ -223,10 +193,10 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize) */ if (nBytes == 5*framesize) { - if (input->frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { + if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { etiLog.level(warn) << - "inputZMQ " << input->uri << - " buffer full (" << input->frame_buffer.size() << ")," + "inputZMQ " << m_name << + " buffer full (" << m_frame_buffer.size() << ")," " dropping incoming superframe !"; nBytes = 0; } @@ -237,40 +207,33 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize) framestart += framesize) { char* frame = new char[framesize]; memcpy(frame, framestart, framesize); - input->frame_buffer.push_back(frame); + m_frame_buffer.push_back(frame); } } } else { etiLog.level(error) << - "inputZMQ " << input->uri << - " wrong data size: recv'd" << nBytes << + "inputZMQ " << m_name << + " wrong data size: recv'd " << nBytes << ", need " << 5*framesize << "."; nBytes = 0; } - zmq_msg_close(&msg); return nBytes; } - -int dabInputZmqClose(void* args) +int DabInputZmq::close() { - dabInputZmqData* input = (dabInputZmqData*)args; - zmq_close(input->zmq_sock); + m_zmq_sock.close(); return 0; } - -int dabInputZmqClean(void** args) +int DabInputZmq::setBitrate(int bitrate) { - dabInputZmqData* input = (dabInputZmqData*)(*args); - zmq_ctx_term(input->zmq_context); - delete input; - return 0; + m_bitrate = bitrate; + return bitrate; // TODO do a nice check here } - #endif |