diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ParserConfigfile.cpp | 26 | ||||
-rw-r--r-- | src/dabInputZmq.cpp | 20 | ||||
-rw-r--r-- | src/dabInputZmq.h | 55 |
3 files changed, 71 insertions, 30 deletions
diff --git a/src/ParserConfigfile.cpp b/src/ParserConfigfile.cpp index 6b261e5..c59f5e5 100644 --- a/src/ParserConfigfile.cpp +++ b/src/ParserConfigfile.cpp @@ -590,10 +590,10 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, (strcmp(subchan->inputProto, "ipc") == 0) ) { input_is_old_style = false; - int buffer_size; - int prebuffering; + dab_input_zmq_config_t zmqconfig; + try { - buffer_size = pt.get<int>("zmq-buffer"); + zmqconfig.buffer_size = pt.get<int>("zmq-buffer"); } catch (ptree_error &e) { stringstream ss; @@ -602,7 +602,7 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, throw runtime_error(ss.str()); } try { - prebuffering = pt.get<int>("zmq-prebuffering"); + zmqconfig.prebuffering = pt.get<int>("zmq-prebuffering"); } catch (ptree_error &e) { stringstream ss; @@ -610,8 +610,10 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, " has no zmq-buffer defined!"; throw runtime_error(ss.str()); } + zmqconfig.enable_encryption = false; + DabInputZmqMPEG* inzmq = - new DabInputZmqMPEG(subchanuid, buffer_size, prebuffering); + new DabInputZmqMPEG(subchanuid, zmqconfig); inzmq->enrol_at(*rc); subchan->input = inzmq; subchan->inputName = full_inputName; @@ -663,10 +665,10 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, (strcmp(subchan->inputProto, "ipc") == 0) ) { input_is_old_style = false; - int buffer_size; - int prebuffering; + dab_input_zmq_config_t zmqconfig; + try { - buffer_size = pt.get<int>("zmq-buffer"); + zmqconfig.buffer_size = pt.get<int>("zmq-buffer"); } catch (ptree_error &e) { stringstream ss; @@ -675,7 +677,7 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, throw runtime_error(ss.str()); } try { - prebuffering = pt.get<int>("zmq-prebuffering"); + zmqconfig.prebuffering = pt.get<int>("zmq-prebuffering"); } catch (ptree_error &e) { stringstream ss; @@ -683,8 +685,12 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, " has no zmq-buffer defined!"; throw runtime_error(ss.str()); } + + zmqconfig.enable_encryption = false; + DabInputZmqAAC* inzmq = - new DabInputZmqAAC(subchanuid, buffer_size, prebuffering); + new DabInputZmqAAC(subchanuid, zmqconfig); + inzmq->enrol_at(*rc); subchan->input = inzmq; subchan->inputName = full_inputName; diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index 368d646..753d6da 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -121,15 +121,15 @@ int DabInputZmqBase::readFrame(void* buffer, int size) rc = readFromSocket(size); /* Notify of a buffer overrun, and drop some frames */ - if (m_frame_buffer.size() >= m_frame_buffer_limit) { + if (m_frame_buffer.size() >= m_config.buffer_size) { global_stats->notifyOverrun(m_name); /* If the buffer is really too full, we drop as many frames as needed * to get down to the prebuffering size. We would like to have our buffer * filled to the prebuffering length. */ - if (m_frame_buffer.size() >= 1.5*m_frame_buffer_limit) { - size_t over_max = m_frame_buffer.size() - m_prebuffering; + if (m_frame_buffer.size() >= 1.5*m_config.buffer_size) { + size_t over_max = m_frame_buffer.size() - m_config.prebuffering; while (over_max--) { m_frame_buffer.pop_front(); @@ -178,7 +178,7 @@ int DabInputZmqBase::readFrame(void* buffer, int size) etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", m_name.c_str()); // reset prebuffering - m_prebuf_current = m_prebuffering; + m_prebuf_current = m_config.prebuffering; /* We have no data to give, we give a zeroed frame */ global_stats->notifyUnderrun(m_name); @@ -222,7 +222,7 @@ int DabInputZmqMPEG::readFromSocket(size_t framesize) if (msg.size() == framesize) { - if (m_frame_buffer.size() > m_frame_buffer_limit) { + if (m_frame_buffer.size() > m_config.buffer_size) { etiLog.level(warn) << "inputZMQ " << m_name << " buffer full (" << m_frame_buffer.size() << ")," @@ -280,7 +280,7 @@ int DabInputZmqAAC::readFromSocket(size_t framesize) */ if (msg.size() == 5*framesize) { - if (m_frame_buffer.size() > m_frame_buffer_limit) { + if (m_frame_buffer.size() > m_config.buffer_size) { etiLog.level(warn) << "inputZMQ " << m_name << " buffer full (" << m_frame_buffer.size() << ")," @@ -331,7 +331,7 @@ void DabInputZmqBase::set_parameter(const string& parameter, " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) ); } - m_frame_buffer_limit = new_limit; + m_config.buffer_size = new_limit; } else if (parameter == "prebuffering") { size_t new_prebuf = atol(value.c_str()); @@ -345,7 +345,7 @@ void DabInputZmqBase::set_parameter(const string& parameter, " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) ); } - m_prebuffering = new_prebuf; + m_config.prebuffering = new_prebuf; } else if (parameter == "enable") { if (value == "1") { @@ -370,10 +370,10 @@ const string DabInputZmqBase::get_parameter(const string& parameter) const { stringstream ss; if (parameter == "buffer") { - ss << m_frame_buffer_limit; + ss << m_config.buffer_size; } else if (parameter == "prebuffering") { - ss << m_prebuffering; + ss << m_config.prebuffering; } else if (parameter == "enable") { if (m_enable_input) diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index f41defa..50357f5 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -76,16 +76,50 @@ // want. #define INPUT_ZMQ_MAX_BUFFER_SIZE (5*500) // 60s +struct dab_input_zmq_config_t +{ + /* The size of the internal buffer, measured in number + * of elements. + * + * Each element corresponds to five frames, + * or one AAC superframe. + */ + int buffer_size; + + /* The amount of prebuffering to do before we start streaming + * + * Same units as buffer_size + */ + int prebuffering; + + /* Whether to enforce encryption or not + */ + bool enable_encryption; + + /* Full path to file containing public key. + */ + std::string curve_public_keyfile; + + /* Full path to file containing secret key. + */ + std::string curve_secret_keyfile; + + /* Full path to file containing encoder public key. + */ + std::string curve_encoder_keyfile; +}; + class DabInputZmqBase : public DabInputBase, public RemoteControllable { public: DabInputZmqBase(const std::string name, - int buffer_size, int prebuffering) + dab_input_zmq_config_t config) : RemoteControllable(name), m_zmq_context(1), m_zmq_sock(m_zmq_context, ZMQ_SUB), - m_bitrate(0), m_prebuffering(prebuffering), + m_bitrate(0), m_enable_input(true), - m_frame_buffer_limit(buffer_size) { + m_config(config), + m_prebuf_current(0) { RC_ADD_PARAMETER(enable, "If the input is enabled. Set to zero to empty the buffer."); } @@ -108,13 +142,14 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable { zmq::context_t m_zmq_context; zmq::socket_t m_zmq_sock; // handle for the zmq socket int m_bitrate; - int m_prebuffering; /* set this to zero to empty the input buffer */ bool m_enable_input; - size_t m_frame_buffer_limit; - std::list<char*> m_frame_buffer; //stores elements of type char[<framesize>] + /* stores elements of type char[<superframesize>] */ + std::list<char*> m_frame_buffer; + + dab_input_zmq_config_t m_config; private: int m_prebuf_current; @@ -123,8 +158,8 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable { class DabInputZmqMPEG : public DabInputZmqBase { public: DabInputZmqMPEG(const std::string name, - int buffer_size, int prebuffering) - : DabInputZmqBase(name, buffer_size, prebuffering) { + dab_input_zmq_config_t config) + : DabInputZmqBase(name, config) { RC_ADD_PARAMETER(buffer, "Size of the input buffer [mpeg frames]"); @@ -139,8 +174,8 @@ class DabInputZmqMPEG : public DabInputZmqBase { class DabInputZmqAAC : public DabInputZmqBase { public: DabInputZmqAAC(const std::string name, - int buffer_size, int prebuffering) - : DabInputZmqBase(name, buffer_size, prebuffering) { + dab_input_zmq_config_t config) + : DabInputZmqBase(name, config) { RC_ADD_PARAMETER(buffer, "Size of the input buffer [aac superframes]"); |