From fa36e03255eb668195f043d39307f3dc2fa5e809 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 11 Apr 2014 11:55:43 +0200 Subject: Add zmq buffer options to config file --- src/ParserConfigfile.cpp | 102 ++++++++++++++++++++++++++++++----------------- src/dabInputZmq.h | 18 +++++---- 2 files changed, 77 insertions(+), 43 deletions(-) (limited to 'src') diff --git a/src/ParserConfigfile.cpp b/src/ParserConfigfile.cpp index f781b78..6b261e5 100644 --- a/src/ParserConfigfile.cpp +++ b/src/ParserConfigfile.cpp @@ -585,28 +585,43 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, #endif // defined(HAVE_INPUT_FILE) #if defined(HAVE_INPUT_ZEROMQ) } - else if (strcmp(subchan->inputProto, "tcp") == 0) { + else if ((strcmp(subchan->inputProto, "tcp") == 0) || + (strcmp(subchan->inputProto, "epmg") == 0) || + (strcmp(subchan->inputProto, "ipc") == 0) ) { input_is_old_style = false; - DabInputZmqMPEG* inzmq = new DabInputZmqMPEG(subchanuid); - inzmq->enrol_at(*rc); - subchan->input = inzmq; - subchan->inputName = full_inputName; - } - else if (strcmp(subchan->inputProto, "epmg") == 0) { - etiLog.level(warn) << "Using untested epmg:// zeromq input"; - input_is_old_style = false; - DabInputZmqMPEG* inzmq = new DabInputZmqMPEG(subchanuid); - inzmq->enrol_at(*rc); - subchan->input = inzmq; - subchan->inputName = full_inputName; - } - else if (strcmp(subchan->inputProto, "ipc") == 0) { - etiLog.level(warn) << "Using untested ipc:// zeromq input"; - input_is_old_style = false; - DabInputZmqMPEG* inzmq = new DabInputZmqMPEG(subchanuid); + + int buffer_size; + int prebuffering; + try { + buffer_size = pt.get("zmq-buffer"); + } + catch (ptree_error &e) { + stringstream ss; + ss << "ZMQ Subchannel with uid " << subchanuid << + " has no zmq-buffer defined!"; + throw runtime_error(ss.str()); + } + try { + prebuffering = pt.get("zmq-prebuffering"); + } + catch (ptree_error &e) { + stringstream ss; + ss << "ZMQ Subchannel with uid " << subchanuid << + " has no zmq-buffer defined!"; + throw runtime_error(ss.str()); + } + DabInputZmqMPEG* inzmq = + new DabInputZmqMPEG(subchanuid, buffer_size, prebuffering); inzmq->enrol_at(*rc); subchan->input = inzmq; subchan->inputName = full_inputName; + + if (strcmp(subchan->inputProto, "epmg") == 0) { + etiLog.level(warn) << "Using untested epmg:// zeromq input"; + } + else if (strcmp(subchan->inputProto, "ipc") == 0) { + etiLog.level(warn) << "Using untested ipc:// zeromq input"; + } #endif // defined(HAVE_INPUT_ZEROMQ) } else { stringstream ss; @@ -643,28 +658,43 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, #endif // defined(HAVE_INPUT_FILE) #if defined(HAVE_INPUT_ZEROMQ) } - else if (strcmp(subchan->inputProto, "tcp") == 0) { + else if ((strcmp(subchan->inputProto, "tcp") == 0) || + (strcmp(subchan->inputProto, "epmg") == 0) || + (strcmp(subchan->inputProto, "ipc") == 0) ) { input_is_old_style = false; - DabInputZmqAAC* inzmq = new DabInputZmqAAC(subchanuid); - inzmq->enrol_at(*rc); - subchan->input = inzmq; - subchan->inputName = full_inputName; - } - else if (strcmp(subchan->inputProto, "epmg") == 0) { - etiLog.level(warn) << "Using untested epmg:// zeromq input"; - input_is_old_style = false; - DabInputZmqAAC* inzmq = new DabInputZmqAAC(subchanuid); - inzmq->enrol_at(*rc); - subchan->input = inzmq; - subchan->inputName = full_inputName; - } - else if (strcmp(subchan->inputProto, "ipc") == 0) { - etiLog.level(warn) << "Using untested ipc:// zeromq input"; - input_is_old_style = false; - DabInputZmqAAC* inzmq = new DabInputZmqAAC(subchanuid); + + int buffer_size; + int prebuffering; + try { + buffer_size = pt.get("zmq-buffer"); + } + catch (ptree_error &e) { + stringstream ss; + ss << "ZMQ Subchannel with uid " << subchanuid << + " has no zmq-buffer defined!"; + throw runtime_error(ss.str()); + } + try { + prebuffering = pt.get("zmq-prebuffering"); + } + catch (ptree_error &e) { + stringstream ss; + ss << "ZMQ Subchannel with uid " << subchanuid << + " has no zmq-buffer defined!"; + throw runtime_error(ss.str()); + } + DabInputZmqAAC* inzmq = + new DabInputZmqAAC(subchanuid, buffer_size, prebuffering); inzmq->enrol_at(*rc); subchan->input = inzmq; subchan->inputName = full_inputName; + + if (strcmp(subchan->inputProto, "epmg") == 0) { + etiLog.level(warn) << "Using untested epmg:// zeromq input"; + } + else if (strcmp(subchan->inputProto, "ipc") == 0) { + etiLog.level(warn) << "Using untested ipc:// zeromq input"; + } #endif // defined(HAVE_INPUT_ZEROMQ) } else { stringstream ss; diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index 11fb49a..a052815 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -76,13 +76,14 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable { public: - DabInputZmqBase(const std::string name) + DabInputZmqBase(const std::string name, + int buffer_size, int prebuffering) : RemoteControllable(name), m_zmq_context(1), m_zmq_sock(m_zmq_context, ZMQ_SUB), - m_bitrate(0), m_prebuffering(INPUT_ZMQ_DEF_PREBUFFERING), + m_bitrate(0), m_prebuffering(prebuffering), m_enable_input(true), - m_frame_buffer_limit(INPUT_ZMQ_DEF_BUFFER_SIZE) { + m_frame_buffer_limit(buffer_size) { RC_ADD_PARAMETER(enable, "If the input is enabled. Set to zero to empty the buffer."); } @@ -119,8 +120,9 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable { class DabInputZmqMPEG : public DabInputZmqBase { public: - DabInputZmqMPEG(const std::string name) - : DabInputZmqBase(name) { + DabInputZmqMPEG(const std::string name, + int buffer_size, int prebuffering) + : DabInputZmqBase(name, buffer_size, prebuffering) { RC_ADD_PARAMETER(buffer, "Size of the input buffer [mpeg frames]"); @@ -134,8 +136,9 @@ class DabInputZmqMPEG : public DabInputZmqBase { class DabInputZmqAAC : public DabInputZmqBase { public: - DabInputZmqAAC(const std::string name) - : DabInputZmqBase(name) { + DabInputZmqAAC(const std::string name, + int buffer_size, int prebuffering) + : DabInputZmqBase(name, buffer_size, prebuffering) { RC_ADD_PARAMETER(buffer, "Size of the input buffer [aac superframes]"); @@ -150,3 +153,4 @@ class DabInputZmqAAC : public DabInputZmqBase { #endif // HAVE_INPUT_ZMQ #endif // DAB_INPUT_ZMQ_H + -- cgit v1.2.3