diff options
| -rw-r--r-- | src/ParserConfigfile.cpp | 26 | ||||
| -rw-r--r-- | src/dabInputZmq.cpp | 20 | ||||
| -rw-r--r-- | src/dabInputZmq.h | 55 | 
3 files changed, 71 insertions, 30 deletions
| diff --git a/src/ParserConfigfile.cpp b/src/ParserConfigfile.cpp index 6b261e5..c59f5e5 100644 --- a/src/ParserConfigfile.cpp +++ b/src/ParserConfigfile.cpp @@ -590,10 +590,10 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,                  (strcmp(subchan->inputProto, "ipc") == 0) ) {              input_is_old_style = false; -            int buffer_size; -            int prebuffering; +            dab_input_zmq_config_t zmqconfig; +              try { -                buffer_size = pt.get<int>("zmq-buffer"); +                zmqconfig.buffer_size = pt.get<int>("zmq-buffer");              }              catch (ptree_error &e) {                  stringstream ss; @@ -602,7 +602,7 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,                  throw runtime_error(ss.str());              }              try { -                prebuffering = pt.get<int>("zmq-prebuffering"); +                zmqconfig.prebuffering = pt.get<int>("zmq-prebuffering");              }              catch (ptree_error &e) {                  stringstream ss; @@ -610,8 +610,10 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,                      " has no zmq-buffer defined!";                  throw runtime_error(ss.str());              } +            zmqconfig.enable_encryption = false; +              DabInputZmqMPEG* inzmq = -                new DabInputZmqMPEG(subchanuid, buffer_size, prebuffering); +                new DabInputZmqMPEG(subchanuid, zmqconfig);              inzmq->enrol_at(*rc);              subchan->input     = inzmq;              subchan->inputName = full_inputName; @@ -663,10 +665,10 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,                  (strcmp(subchan->inputProto, "ipc") == 0) ) {              input_is_old_style = false; -            int buffer_size; -            int prebuffering; +            dab_input_zmq_config_t zmqconfig; +              try { -                buffer_size = pt.get<int>("zmq-buffer"); +                zmqconfig.buffer_size = pt.get<int>("zmq-buffer");              }              catch (ptree_error &e) {                  stringstream ss; @@ -675,7 +677,7 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,                  throw runtime_error(ss.str());              }              try { -                prebuffering = pt.get<int>("zmq-prebuffering"); +                zmqconfig.prebuffering = pt.get<int>("zmq-prebuffering");              }              catch (ptree_error &e) {                  stringstream ss; @@ -683,8 +685,12 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,                      " has no zmq-buffer defined!";                  throw runtime_error(ss.str());              } + +            zmqconfig.enable_encryption = false; +              DabInputZmqAAC* inzmq = -                new DabInputZmqAAC(subchanuid, buffer_size, prebuffering); +                new DabInputZmqAAC(subchanuid, zmqconfig); +              inzmq->enrol_at(*rc);              subchan->input     = inzmq;              subchan->inputName = full_inputName; diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index 368d646..753d6da 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -121,15 +121,15 @@ int DabInputZmqBase::readFrame(void* buffer, int size)      rc = readFromSocket(size);      /* Notify of a buffer overrun, and drop some frames */ -    if (m_frame_buffer.size() >= m_frame_buffer_limit) { +    if (m_frame_buffer.size() >= m_config.buffer_size) {          global_stats->notifyOverrun(m_name);          /* If the buffer is really too full, we drop as many frames as needed           * to get down to the prebuffering size. We would like to have our buffer           * filled to the prebuffering length.           */ -        if (m_frame_buffer.size() >= 1.5*m_frame_buffer_limit) { -            size_t over_max = m_frame_buffer.size() - m_prebuffering; +        if (m_frame_buffer.size() >= 1.5*m_config.buffer_size) { +            size_t over_max = m_frame_buffer.size() - m_config.prebuffering;              while (over_max--) {                  m_frame_buffer.pop_front(); @@ -178,7 +178,7 @@ int DabInputZmqBase::readFrame(void* buffer, int size)          etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n",                  m_name.c_str());          // reset prebuffering -        m_prebuf_current = m_prebuffering; +        m_prebuf_current = m_config.prebuffering;          /* We have no data to give, we give a zeroed frame */          global_stats->notifyUnderrun(m_name); @@ -222,7 +222,7 @@ int DabInputZmqMPEG::readFromSocket(size_t framesize)      if (msg.size() == framesize)      { -        if (m_frame_buffer.size() > m_frame_buffer_limit) { +        if (m_frame_buffer.size() > m_config.buffer_size) {              etiLog.level(warn) <<                  "inputZMQ " << m_name <<                  " buffer full (" << m_frame_buffer.size() << ")," @@ -280,7 +280,7 @@ int DabInputZmqAAC::readFromSocket(size_t framesize)       */      if (msg.size() == 5*framesize)      { -        if (m_frame_buffer.size() > m_frame_buffer_limit) { +        if (m_frame_buffer.size() > m_config.buffer_size) {              etiLog.level(warn) <<                  "inputZMQ " << m_name <<                  " buffer full (" << m_frame_buffer.size() << ")," @@ -331,7 +331,7 @@ void DabInputZmqBase::set_parameter(const string& parameter,                     " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) );          } -        m_frame_buffer_limit = new_limit; +        m_config.buffer_size = new_limit;      }      else if (parameter == "prebuffering") {          size_t new_prebuf = atol(value.c_str()); @@ -345,7 +345,7 @@ void DabInputZmqBase::set_parameter(const string& parameter,                     " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) );          } -        m_prebuffering = new_prebuf; +        m_config.prebuffering = new_prebuf;      }      else if (parameter == "enable") {          if (value == "1") { @@ -370,10 +370,10 @@ const string DabInputZmqBase::get_parameter(const string& parameter) const  {      stringstream ss;      if (parameter == "buffer") { -        ss << m_frame_buffer_limit; +        ss << m_config.buffer_size;      }      else if (parameter == "prebuffering") { -        ss << m_prebuffering; +        ss << m_config.prebuffering;      }      else if (parameter == "enable") {          if (m_enable_input) diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index f41defa..50357f5 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -76,16 +76,50 @@  // want.  #define INPUT_ZMQ_MAX_BUFFER_SIZE (5*500) // 60s +struct dab_input_zmq_config_t +{ +    /* The size of the internal buffer, measured in number +     * of elements. +     * +     * Each element corresponds to five frames, +     * or one AAC superframe. +     */ +    int buffer_size; + +    /* The amount of prebuffering to do before we start streaming +     * +     * Same units as buffer_size +     */ +    int prebuffering; + +    /* Whether to enforce encryption or not +     */ +    bool enable_encryption; + +    /* Full path to file containing public key. +     */ +    std::string curve_public_keyfile; + +    /* Full path to file containing secret key. +     */ +    std::string curve_secret_keyfile; + +    /* Full path to file containing encoder public key. +     */ +    std::string curve_encoder_keyfile; +}; +  class DabInputZmqBase : public DabInputBase, public RemoteControllable {      public:          DabInputZmqBase(const std::string name, -                int buffer_size, int prebuffering) +                dab_input_zmq_config_t config)              : RemoteControllable(name),              m_zmq_context(1),              m_zmq_sock(m_zmq_context, ZMQ_SUB), -            m_bitrate(0), m_prebuffering(prebuffering), +            m_bitrate(0),              m_enable_input(true), -            m_frame_buffer_limit(buffer_size) { +            m_config(config), +            m_prebuf_current(0) {                  RC_ADD_PARAMETER(enable,                          "If the input is enabled. Set to zero to empty the buffer.");              } @@ -108,13 +142,14 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable {          zmq::context_t m_zmq_context;          zmq::socket_t m_zmq_sock; // handle for the zmq socket          int m_bitrate; -        int m_prebuffering;          /* set this to zero to empty the input buffer */          bool m_enable_input; -        size_t m_frame_buffer_limit; -        std::list<char*> m_frame_buffer; //stores elements of type char[<framesize>] +        /* stores elements of type char[<superframesize>] */ +        std::list<char*> m_frame_buffer; + +        dab_input_zmq_config_t m_config;      private:          int m_prebuf_current; @@ -123,8 +158,8 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable {  class DabInputZmqMPEG : public DabInputZmqBase {      public:          DabInputZmqMPEG(const std::string name, -                int buffer_size, int prebuffering) -            : DabInputZmqBase(name, buffer_size, prebuffering) { +                dab_input_zmq_config_t config) +            : DabInputZmqBase(name, config) {                  RC_ADD_PARAMETER(buffer,                          "Size of the input buffer [mpeg frames]"); @@ -139,8 +174,8 @@ class DabInputZmqMPEG : public DabInputZmqBase {  class DabInputZmqAAC : public DabInputZmqBase {      public:          DabInputZmqAAC(const std::string name, -                int buffer_size, int prebuffering) -            : DabInputZmqBase(name, buffer_size, prebuffering) { +                dab_input_zmq_config_t config) +            : DabInputZmqBase(name, config) {                  RC_ADD_PARAMETER(buffer,                          "Size of the input buffer [aac superframes]"); | 
