aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2014-01-19 22:32:27 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2014-01-19 22:32:27 +0100
commitca4fb30104c5f883794c40f2516636447ea5dd0f (patch)
tree052c4f300cb04908feac5812c5a9ef4b6f3571b7
parent6c482c8f1fdd74f6e7a8a9481b9f2211c559ebad (diff)
downloaddabmux-ca4fb30104c5f883794c40f2516636447ea5dd0f.tar.gz
dabmux-ca4fb30104c5f883794c40f2516636447ea5dd0f.tar.bz2
dabmux-ca4fb30104c5f883794c40f2516636447ea5dd0f.zip
make DabInputZMQ a new-style input
-rw-r--r--src/ParserConfigfile.cpp32
-rw-r--r--src/dabInput.h2
-rw-r--r--src/dabInputZmq.cpp189
-rw-r--r--src/dabInputZmq.h39
4 files changed, 120 insertions, 142 deletions
diff --git a/src/ParserConfigfile.cpp b/src/ParserConfigfile.cpp
index 1ef2b28..14d9e6b 100644
--- a/src/ParserConfigfile.cpp
+++ b/src/ParserConfigfile.cpp
@@ -444,6 +444,11 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
subchan->inputName = inputName;
+ /* The input is of the old_style type,
+ * with the struct of function pointers,
+ * and needs to be a DabInputCompatible
+ */
+ bool input_is_old_style = true;
dabInputOperations operations;
dabProtection* protection = &subchan->protection;
@@ -462,6 +467,10 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
char* proto;
+ char* full_inputName = new char[256];
+ full_inputName[255] = '\0';
+ memcpy(full_inputName, inputName, 255);
+
proto = strstr(inputName, "://");
if (proto == NULL) {
subchan->inputProto = "file";
@@ -479,17 +488,21 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
#if defined(HAVE_INPUT_ZEROMQ)
}
else if (strcmp(subchan->inputProto, "tcp") == 0) {
- operations = dabInputZmqOperations;
+ input_is_old_style = false;
+ subchan->input = new DabInputZmq(subchanuid);
+ subchan->inputName = full_inputName;
}
else if (strcmp(subchan->inputProto, "epmg") == 0) {
- etiLog.log(warn,
- "Using untested epmg:// zeromq input\n");
- operations = dabInputZmqOperations;
+ etiLog.level(warn) << "Using untested epmg:// zeromq input";
+ input_is_old_style = false;
+ subchan->input = new DabInputZmq(subchanuid);
+ subchan->inputName = full_inputName;
}
else if (strcmp(subchan->inputProto, "ipc") == 0) {
- etiLog.log(warn,
- "Using untested ipc:// zeromq input\n");
- operations = dabInputZmqOperations;
+ etiLog.level(warn) << "Using untested ipc:// zeromq input";
+ input_is_old_style = false;
+ subchan->input = new DabInputZmq(subchanuid);
+ subchan->inputName = full_inputName;
#endif // defined(HAVE_INPUT_ZEROMQ)
} else {
stringstream ss;
@@ -731,5 +744,8 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
catch (ptree_error &e) {}
/* Create object */
- subchan->input = new DabInputCompatible(operations);
+ if (input_is_old_style) {
+ subchan->input = new DabInputCompatible(operations);
+ }
+ // else { it's already been created! }
}
diff --git a/src/dabInput.h b/src/dabInput.h
index 0fc04db..4ccbac9 100644
--- a/src/dabInput.h
+++ b/src/dabInput.h
@@ -49,11 +49,9 @@ struct dabInputOperations {
class DabInputBase {
public:
virtual int open(const std::string name) = 0;
- virtual int setbuf(int size) = 0;
virtual int readFrame(void* buffer, int size) = 0;
virtual int setBitrate(int bitrate) = 0;
virtual int close() = 0;
- virtual int rewind() = 0;
virtual ~DabInputBase() {};
};
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp
index b560313..9b61033 100644
--- a/src/dabInputZmq.cpp
+++ b/src/dabInputZmq.cpp
@@ -34,14 +34,15 @@
#include "dabInput.h"
#include "dabInputZmq.h"
-#include "dabInputFifo.h"
#include "StatsServer.h"
#include <stdio.h>
-#include <zmq.h>
+#include <zmq.hpp>
#include <list>
+#include <exception>
#include <string.h>
#include <string>
+#include <sstream>
#include <limits.h>
#ifdef __MINGW32__
@@ -52,170 +53,139 @@
extern StatsServer global_stats;
-struct dabInputOperations dabInputZmqOperations = {
- dabInputZmqInit,
- dabInputZmqOpen,
- dabInputSetbuf,
- NULL,
- NULL,
- NULL,
- dabInputZmqReadFrame,
- dabInputSetbitrate,
- dabInputZmqClose,
- dabInputZmqClean,
- NULL
-};
-
-
-int dabInputZmqInit(void** args)
+int DabInputZmq::open(const std::string inputUri)
{
- dabInputZmqData* input = new dabInputZmqData;
- input->zmq_context = zmq_ctx_new();
- if (input->zmq_context == NULL) {
- etiLog.log(error, "Failed to initialise ZeroMQ context: %s\n", zmq_strerror(errno));
- return 1;
+ // Prepare the ZMQ socket to accept connections
+ try {
+ m_zmq_sock.bind(inputUri.c_str());
}
-
- input->zmq_sock = zmq_socket(input->zmq_context, ZMQ_SUB);
- if (input->zmq_sock == NULL) {
- etiLog.log(error, "Failed to initialise ZeroMQ socket: %s\n", zmq_strerror(errno));
- return 1;
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ bind for input " << m_name << " failed";
+ throw std::runtime_error(os.str());
}
- input->prebuffering = INPUT_ZMQ_PREBUFFERING;
-
- *args = input;
-
- return 0;
-}
-
-
-int dabInputZmqOpen(void* args, const char* inputUri)
-{
- dabInputZmqData* input = (dabInputZmqData*)args;
-
- std::string uri = "tcp://" + std::string(inputUri);
- int connect_error = zmq_bind(input->zmq_sock, uri.c_str());
-
- if (connect_error < 0) {
- etiLog.log(error, "Failed to connect socket to uri '%s': %s\n", uri.c_str(), zmq_strerror(errno));
- return 1;
+ try {
+ m_zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
}
-
- connect_error = zmq_setsockopt(input->zmq_sock, ZMQ_SUBSCRIBE, NULL, 0);
- if (connect_error < 0) {
- etiLog.log(error, "Failed to subscribe to zmq messages: %s\n", zmq_strerror(errno));
- return 1;
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ set socket options for input " << m_name << " failed";
+ throw std::runtime_error(os.str());
}
- global_stats.registerInput(uri);
+ // We want to appear in the statistics !
+ global_stats.registerInput(m_name);
- input->uri = uri;
return 0;
}
-
// size corresponds to a frame size. It is constant for a given bitrate
-int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int size)
+int DabInputZmq::readFrame(void* buffer, int size)
{
int rc;
- dabInputZmqData* input = (dabInputZmqData*)args;
/* We must *always* read data from the ZMQ socket,
* to make sure that ZMQ internal buffers are emptied
* quickly. It's the only way to control the buffers
* of the whole path from encoder to our frame_buffer.
*/
- rc = dabInputZmqReadFromSocket(input, size);
+ rc = readFromSocket(size);
/* Notify of a buffer overrun, and drop some frames */
- if (input->frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) {
- global_stats.notifyOverrun(input->uri);
+ if (m_frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ global_stats.notifyOverrun(m_name);
/* If the buffer is really too full, we drop as many frames as needed
* to get down to the prebuffering size. We would like to have our buffer
* filled to the prebuffering length.
*/
- if (input->frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) {
- size_t over_max = input->frame_buffer.size() - INPUT_ZMQ_PREBUFFERING;
+ if (m_frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ size_t over_max = m_frame_buffer.size() - INPUT_ZMQ_PREBUFFERING;
while (over_max--) {
- input->frame_buffer.pop_front();
+ m_frame_buffer.pop_front();
}
}
else {
/* Our frame_buffer contains DAB logical frames. Five of these make one
* AAC superframe.
*
- * Dropping this superframe amounts to dropping 120ms of audio. */
- input->frame_buffer.pop_front();
- input->frame_buffer.pop_front();
- input->frame_buffer.pop_front();
- input->frame_buffer.pop_front();
- input->frame_buffer.pop_front();
+ * Dropping this superframe amounts to dropping 120ms of audio.
+ *
+ * We're actually not sure to drop five DAB logical frames
+ * beloning to the same AAC superframe. It is assumed that no
+ * receiver will crash because of this. At least, the DAB logical frame
+ * vs. AAC superframe alignment is preserved.
+ *
+ * TODO: of course this assumption probably doesn't hold. Fix this !
+ * */
+ m_frame_buffer.pop_front();
+ m_frame_buffer.pop_front();
+ m_frame_buffer.pop_front();
+ m_frame_buffer.pop_front();
+ m_frame_buffer.pop_front();
}
}
- if (input->prebuffering > 0) {
+ if (m_prebuffering > 0) {
if (rc > 0)
- input->prebuffering--;
- if (input->prebuffering == 0)
+ m_prebuffering--;
+ if (m_prebuffering == 0)
etiLog.log(info, "inputZMQ %s input pre-buffering complete\n",
- input->uri.c_str());
+ m_name.c_str());
/* During prebuffering, give a zeroed frame to the mux */
- global_stats.notifyUnderrun(input->uri);
+ global_stats.notifyUnderrun(m_name);
memset(buffer, 0, size);
return size;
}
// Save stats data in bytes, not in frames
- global_stats.notifyBuffer(input->uri, input->frame_buffer.size() * size);
+ global_stats.notifyBuffer(m_name, m_frame_buffer.size() * size);
- if (input->frame_buffer.empty()) {
+ if (m_frame_buffer.empty()) {
etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n",
- input->uri.c_str());
+ m_name.c_str());
// reset prebuffering
- input->prebuffering = INPUT_ZMQ_PREBUFFERING;
+ m_prebuffering = INPUT_ZMQ_PREBUFFERING;
/* We have no data to give, we give a zeroed frame */
- global_stats.notifyUnderrun(input->uri);
+ global_stats.notifyUnderrun(m_name);
memset(buffer, 0, size);
return size;
}
else
{
/* Normal situation, give a frame from the frame_buffer */
-
- char* newframe = input->frame_buffer.front();
+ char* newframe = m_frame_buffer.front();
memcpy(buffer, newframe, size);
delete[] newframe;
- input->frame_buffer.pop_front();
+ m_frame_buffer.pop_front();
return size;
}
}
// Read a superframe from the socket, cut it into five frames, and push to list
-int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize)
+int DabInputZmq::readFromSocket(int framesize)
{
int rc;
- zmq_msg_t msg;
- rc = zmq_msg_init(&msg);
- if (rc == -1) {
- etiLog.log(error, "Failed to init zmq message: %s\n", zmq_strerror(errno));
- return 0;
- }
+ int nBytes;
+ zmq::message_t msg;
- int nBytes = zmq_msg_recv(&msg, input->zmq_sock, ZMQ_DONTWAIT);
- if (nBytes == -1) {
- if (errno != EAGAIN) {
- etiLog.log(error, "Failed to receive zmq message: %s\n", zmq_strerror(errno));
+ try {
+ nBytes = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT);
+ if (nBytes == 0) {
+ return 0;
}
- zmq_msg_close(&msg);
- return 0;
+ }
+ catch (zmq::error_t& err)
+ {
+ etiLog.level(error) << "Failed to receive from zmq socket " <<
+ m_name << ": " << err.what();
}
- char* data = (char*)zmq_msg_data(&msg);
+ char* data = (char*)msg.data();
/* TS 102 563, Section 6:
* Audio super frames are transported in five successive DAB logical frames
@@ -223,10 +193,10 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize)
*/
if (nBytes == 5*framesize)
{
- if (input->frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) {
etiLog.level(warn) <<
- "inputZMQ " << input->uri <<
- " buffer full (" << input->frame_buffer.size() << "),"
+ "inputZMQ " << m_name <<
+ " buffer full (" << m_frame_buffer.size() << "),"
" dropping incoming superframe !";
nBytes = 0;
}
@@ -237,40 +207,33 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize)
framestart += framesize) {
char* frame = new char[framesize];
memcpy(frame, framestart, framesize);
- input->frame_buffer.push_back(frame);
+ m_frame_buffer.push_back(frame);
}
}
}
else
{
etiLog.level(error) <<
- "inputZMQ " << input->uri <<
- " wrong data size: recv'd" << nBytes <<
+ "inputZMQ " << m_name <<
+ " wrong data size: recv'd " << nBytes <<
", need " << 5*framesize << ".";
nBytes = 0;
}
- zmq_msg_close(&msg);
return nBytes;
}
-
-int dabInputZmqClose(void* args)
+int DabInputZmq::close()
{
- dabInputZmqData* input = (dabInputZmqData*)args;
- zmq_close(input->zmq_sock);
+ m_zmq_sock.close();
return 0;
}
-
-int dabInputZmqClean(void** args)
+int DabInputZmq::setBitrate(int bitrate)
{
- dabInputZmqData* input = (dabInputZmqData*)(*args);
- zmq_ctx_term(input->zmq_context);
- delete input;
- return 0;
+ m_bitrate = bitrate;
+ return bitrate; // TODO do a nice check here
}
-
#endif
diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h
index 56708f9..3902963 100644
--- a/src/dabInputZmq.h
+++ b/src/dabInputZmq.h
@@ -42,11 +42,10 @@
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
-#include <zmq.h>
+#include <zmq.hpp>
#include <list>
#include <string>
#include "dabInput.h"
-#include "dabInputFifo.h"
#include "StatsServer.h"
/* The frame_buffer contains DAB logical frames as defined in
@@ -61,26 +60,28 @@
#define INPUT_ZMQ_MAX_BUFFER_SIZE (5*8) // 960ms
-extern struct dabInputOperations dabInputZmqOperations;
-
-struct dabInputZmqData {
- void* zmq_context;
- void* zmq_sock;
- std::list<char*> frame_buffer; //stores elements of type char[<framesize>]
- int prebuffering;
- std::string uri;
-};
+class DabInputZmq : public DabInputBase {
+ public:
+ DabInputZmq(const std::string name)
+ : m_name(name), m_zmq_context(1),
+ m_zmq_sock(m_zmq_context, ZMQ_SUB),
+ m_prebuffering(INPUT_ZMQ_PREBUFFERING) {}
+ virtual int open(const std::string inputUri);
+ virtual int readFrame(void* buffer, int size);
+ virtual int setBitrate(int bitrate);
+ virtual int close();
-int dabInputZmqInit(void** args);
-int dabInputZmqOpen(void* args, const char* inputUri);
-int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int size);
-int dabInputZmqClose(void* args);
-int dabInputZmqClean(void** args);
-
-// Get new message from ZeroMQ
-int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize);
+ private:
+ int readFromSocket(int framesize);
+ std::string m_name;
+ zmq::context_t m_zmq_context;
+ zmq::socket_t m_zmq_sock; // handle for the zmq socket
+ int m_prebuffering;
+ std::list<char*> m_frame_buffer; //stores elements of type char[<framesize>]
+ int m_bitrate;
+};
#endif // HAVE_INPUT_ZMQ