aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/ParserConfigfile.cpp26
-rw-r--r--src/dabInputZmq.cpp20
-rw-r--r--src/dabInputZmq.h55
3 files changed, 71 insertions, 30 deletions
diff --git a/src/ParserConfigfile.cpp b/src/ParserConfigfile.cpp
index 6b261e5..c59f5e5 100644
--- a/src/ParserConfigfile.cpp
+++ b/src/ParserConfigfile.cpp
@@ -590,10 +590,10 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
(strcmp(subchan->inputProto, "ipc") == 0) ) {
input_is_old_style = false;
- int buffer_size;
- int prebuffering;
+ dab_input_zmq_config_t zmqconfig;
+
try {
- buffer_size = pt.get<int>("zmq-buffer");
+ zmqconfig.buffer_size = pt.get<int>("zmq-buffer");
}
catch (ptree_error &e) {
stringstream ss;
@@ -602,7 +602,7 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
throw runtime_error(ss.str());
}
try {
- prebuffering = pt.get<int>("zmq-prebuffering");
+ zmqconfig.prebuffering = pt.get<int>("zmq-prebuffering");
}
catch (ptree_error &e) {
stringstream ss;
@@ -610,8 +610,10 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
" has no zmq-buffer defined!";
throw runtime_error(ss.str());
}
+ zmqconfig.enable_encryption = false;
+
DabInputZmqMPEG* inzmq =
- new DabInputZmqMPEG(subchanuid, buffer_size, prebuffering);
+ new DabInputZmqMPEG(subchanuid, zmqconfig);
inzmq->enrol_at(*rc);
subchan->input = inzmq;
subchan->inputName = full_inputName;
@@ -663,10 +665,10 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
(strcmp(subchan->inputProto, "ipc") == 0) ) {
input_is_old_style = false;
- int buffer_size;
- int prebuffering;
+ dab_input_zmq_config_t zmqconfig;
+
try {
- buffer_size = pt.get<int>("zmq-buffer");
+ zmqconfig.buffer_size = pt.get<int>("zmq-buffer");
}
catch (ptree_error &e) {
stringstream ss;
@@ -675,7 +677,7 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
throw runtime_error(ss.str());
}
try {
- prebuffering = pt.get<int>("zmq-prebuffering");
+ zmqconfig.prebuffering = pt.get<int>("zmq-prebuffering");
}
catch (ptree_error &e) {
stringstream ss;
@@ -683,8 +685,12 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
" has no zmq-buffer defined!";
throw runtime_error(ss.str());
}
+
+ zmqconfig.enable_encryption = false;
+
DabInputZmqAAC* inzmq =
- new DabInputZmqAAC(subchanuid, buffer_size, prebuffering);
+ new DabInputZmqAAC(subchanuid, zmqconfig);
+
inzmq->enrol_at(*rc);
subchan->input = inzmq;
subchan->inputName = full_inputName;
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp
index 368d646..753d6da 100644
--- a/src/dabInputZmq.cpp
+++ b/src/dabInputZmq.cpp
@@ -121,15 +121,15 @@ int DabInputZmqBase::readFrame(void* buffer, int size)
rc = readFromSocket(size);
/* Notify of a buffer overrun, and drop some frames */
- if (m_frame_buffer.size() >= m_frame_buffer_limit) {
+ if (m_frame_buffer.size() >= m_config.buffer_size) {
global_stats->notifyOverrun(m_name);
/* If the buffer is really too full, we drop as many frames as needed
* to get down to the prebuffering size. We would like to have our buffer
* filled to the prebuffering length.
*/
- if (m_frame_buffer.size() >= 1.5*m_frame_buffer_limit) {
- size_t over_max = m_frame_buffer.size() - m_prebuffering;
+ if (m_frame_buffer.size() >= 1.5*m_config.buffer_size) {
+ size_t over_max = m_frame_buffer.size() - m_config.prebuffering;
while (over_max--) {
m_frame_buffer.pop_front();
@@ -178,7 +178,7 @@ int DabInputZmqBase::readFrame(void* buffer, int size)
etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n",
m_name.c_str());
// reset prebuffering
- m_prebuf_current = m_prebuffering;
+ m_prebuf_current = m_config.prebuffering;
/* We have no data to give, we give a zeroed frame */
global_stats->notifyUnderrun(m_name);
@@ -222,7 +222,7 @@ int DabInputZmqMPEG::readFromSocket(size_t framesize)
if (msg.size() == framesize)
{
- if (m_frame_buffer.size() > m_frame_buffer_limit) {
+ if (m_frame_buffer.size() > m_config.buffer_size) {
etiLog.level(warn) <<
"inputZMQ " << m_name <<
" buffer full (" << m_frame_buffer.size() << "),"
@@ -280,7 +280,7 @@ int DabInputZmqAAC::readFromSocket(size_t framesize)
*/
if (msg.size() == 5*framesize)
{
- if (m_frame_buffer.size() > m_frame_buffer_limit) {
+ if (m_frame_buffer.size() > m_config.buffer_size) {
etiLog.level(warn) <<
"inputZMQ " << m_name <<
" buffer full (" << m_frame_buffer.size() << "),"
@@ -331,7 +331,7 @@ void DabInputZmqBase::set_parameter(const string& parameter,
" Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) );
}
- m_frame_buffer_limit = new_limit;
+ m_config.buffer_size = new_limit;
}
else if (parameter == "prebuffering") {
size_t new_prebuf = atol(value.c_str());
@@ -345,7 +345,7 @@ void DabInputZmqBase::set_parameter(const string& parameter,
" Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) );
}
- m_prebuffering = new_prebuf;
+ m_config.prebuffering = new_prebuf;
}
else if (parameter == "enable") {
if (value == "1") {
@@ -370,10 +370,10 @@ const string DabInputZmqBase::get_parameter(const string& parameter) const
{
stringstream ss;
if (parameter == "buffer") {
- ss << m_frame_buffer_limit;
+ ss << m_config.buffer_size;
}
else if (parameter == "prebuffering") {
- ss << m_prebuffering;
+ ss << m_config.prebuffering;
}
else if (parameter == "enable") {
if (m_enable_input)
diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h
index f41defa..50357f5 100644
--- a/src/dabInputZmq.h
+++ b/src/dabInputZmq.h
@@ -76,16 +76,50 @@
// want.
#define INPUT_ZMQ_MAX_BUFFER_SIZE (5*500) // 60s
+struct dab_input_zmq_config_t
+{
+ /* The size of the internal buffer, measured in number
+ * of elements.
+ *
+ * Each element corresponds to five frames,
+ * or one AAC superframe.
+ */
+ int buffer_size;
+
+ /* The amount of prebuffering to do before we start streaming
+ *
+ * Same units as buffer_size
+ */
+ int prebuffering;
+
+ /* Whether to enforce encryption or not
+ */
+ bool enable_encryption;
+
+ /* Full path to file containing public key.
+ */
+ std::string curve_public_keyfile;
+
+ /* Full path to file containing secret key.
+ */
+ std::string curve_secret_keyfile;
+
+ /* Full path to file containing encoder public key.
+ */
+ std::string curve_encoder_keyfile;
+};
+
class DabInputZmqBase : public DabInputBase, public RemoteControllable {
public:
DabInputZmqBase(const std::string name,
- int buffer_size, int prebuffering)
+ dab_input_zmq_config_t config)
: RemoteControllable(name),
m_zmq_context(1),
m_zmq_sock(m_zmq_context, ZMQ_SUB),
- m_bitrate(0), m_prebuffering(prebuffering),
+ m_bitrate(0),
m_enable_input(true),
- m_frame_buffer_limit(buffer_size) {
+ m_config(config),
+ m_prebuf_current(0) {
RC_ADD_PARAMETER(enable,
"If the input is enabled. Set to zero to empty the buffer.");
}
@@ -108,13 +142,14 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable {
zmq::context_t m_zmq_context;
zmq::socket_t m_zmq_sock; // handle for the zmq socket
int m_bitrate;
- int m_prebuffering;
/* set this to zero to empty the input buffer */
bool m_enable_input;
- size_t m_frame_buffer_limit;
- std::list<char*> m_frame_buffer; //stores elements of type char[<framesize>]
+ /* stores elements of type char[<superframesize>] */
+ std::list<char*> m_frame_buffer;
+
+ dab_input_zmq_config_t m_config;
private:
int m_prebuf_current;
@@ -123,8 +158,8 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable {
class DabInputZmqMPEG : public DabInputZmqBase {
public:
DabInputZmqMPEG(const std::string name,
- int buffer_size, int prebuffering)
- : DabInputZmqBase(name, buffer_size, prebuffering) {
+ dab_input_zmq_config_t config)
+ : DabInputZmqBase(name, config) {
RC_ADD_PARAMETER(buffer,
"Size of the input buffer [mpeg frames]");
@@ -139,8 +174,8 @@ class DabInputZmqMPEG : public DabInputZmqBase {
class DabInputZmqAAC : public DabInputZmqBase {
public:
DabInputZmqAAC(const std::string name,
- int buffer_size, int prebuffering)
- : DabInputZmqBase(name, buffer_size, prebuffering) {
+ dab_input_zmq_config_t config)
+ : DabInputZmqBase(name, config) {
RC_ADD_PARAMETER(buffer,
"Size of the input buffer [aac superframes]");