diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/RemoteControl.cpp | 3 | ||||
| -rw-r--r-- | src/RemoteControl.h | 3 | ||||
| -rw-r--r-- | src/dabInputZmq.cpp | 58 | ||||
| -rw-r--r-- | src/dabInputZmq.h | 31 | 
4 files changed, 76 insertions, 19 deletions
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index a075497..fb1aa7e 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -3,7 +3,8 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Written by Matthias P. Braendli, matthias.braendli@mpb.li, 2012 +   Copyright (C) 2014 +   Matthias P. Braendli, matthias.braendli@mpb.li   */  /*     This file is part of ODR-DabMux. diff --git a/src/RemoteControl.h b/src/RemoteControl.h index f8c14fd..a39af09 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -3,7 +3,8 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Written by Matthias P. Braendli, matthias.braendli@mpb.li, 2012 +   Copyright (C) 2014 +   Matthias P. Braendli, matthias.braendli@mpb.li     This module adds remote-control capability to some of the dabmod modules.     see testremotecontrol/test.cpp for an example of how to use this. diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index c883f35..387c8cc 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -2,7 +2,7 @@     Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications     Research Center Canada) -   Copyright (C) 2013 Matthias P. Braendli +   Copyright (C) 2013, 2014 Matthias P. Braendli     http://mpb.li     ZeroMQ input. see www.zeromq.org for more info @@ -40,6 +40,7 @@  #include "dabInputZmq.h"  #include "StatsServer.h"  #include "zmq.hpp" +#include "PcDebug.h"  #ifdef HAVE_CONFIG_H  #   include "config.h" @@ -47,10 +48,11 @@  #ifdef HAVE_INPUT_ZEROMQ -#include <stdio.h> +#include <cstdio> +#include <cstdlib>  #include <list>  #include <exception> -#include <string.h> +#include <cstring>  #include <string>  #include <sstream>  #include <limits.h> @@ -115,14 +117,14 @@ int DabInputZmqBase::readFrame(void* buffer, int size)      rc = readFromSocket(size);      /* Notify of a buffer overrun, and drop some frames */ -    if (m_frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) { +    if (m_frame_buffer.size() >= m_frame_buffer_limit) {          global_stats->notifyOverrun(m_name);          /* If the buffer is really too full, we drop as many frames as needed           * to get down to the prebuffering size. We would like to have our buffer           * filled to the prebuffering length.           */ -        if (m_frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) { +        if (m_frame_buffer.size() >= 1.5*m_frame_buffer_limit) {              size_t over_max = m_frame_buffer.size() - INPUT_ZMQ_PREBUFFERING;              while (over_max--) { @@ -217,19 +219,22 @@ int DabInputZmqMPEG::readFromSocket(int framesize)      if (msg.size() == framesize)      { -        if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { +        if (m_frame_buffer.size() > m_frame_buffer_limit) {              etiLog.level(warn) <<                  "inputZMQ " << m_name <<                  " buffer full (" << m_frame_buffer.size() << "),"                  " dropping incoming frame !";              messageReceived = 0;          } -        else { +        else if (m_enable_input) {              // copy the input frame blockwise into the frame_buffer              char* frame = new char[framesize];              memcpy(frame, data, framesize);              m_frame_buffer.push_back(frame);          } +        else { +            return 0; +        }      }      else {          etiLog.level(error) << @@ -273,14 +278,14 @@ int DabInputZmqAAC::readFromSocket(int framesize)       */      if (msg.size() == 5*framesize)      { -        if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { +        if (m_frame_buffer.size() > m_frame_buffer_limit) {              etiLog.level(warn) <<                  "inputZMQ " << m_name <<                  " buffer full (" << m_frame_buffer.size() << "),"                  " dropping incoming superframe !";              messageReceived = 0;          } -        else { +        else if (m_enable_input) {              // copy the input frame blockwise into the frame_buffer              for (char* framestart = data;                      framestart < &data[5*framesize]; @@ -290,6 +295,9 @@ int DabInputZmqAAC::readFromSocket(int framesize)                  m_frame_buffer.push_back(frame);              }          } +        else { +            return 0; +        }      }      else {          etiLog.level(error) << @@ -309,7 +317,29 @@ void DabInputZmqBase::set_parameter(string parameter, string value)      ss.exceptions ( stringstream::failbit | stringstream::badbit );      if (parameter == "buffer") { -        throw ParameterError("Parameter 'buffer' is read-only"); +        size_t new_limit = atol(value.c_str()); + +        if (new_limit < INPUT_ZMQ_MIN_BUFFER_SIZE) { +            throw ParameterError("Desired buffer size too small." +                   " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) ); +        } +        else if (new_limit > INPUT_ZMQ_MAX_BUFFER_SIZE) { +            throw ParameterError("Desired buffer size too large." +                   " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) ); +        } + +        m_frame_buffer_limit = new_limit; +    } +    else if (parameter == "enable") { +        if (value == "1") { +            m_enable_input = true; +        } +        else if (value == "0") { +            m_enable_input = false; +        } +        else { +            throw ParameterError("Value not understood, specify 0 or 1."); +        }      }      else {          stringstream ss; @@ -323,7 +353,13 @@ string DabInputZmqBase::get_parameter(string parameter)  {      stringstream ss;      if (parameter == "buffer") { -        ss << INPUT_ZMQ_MAX_BUFFER_SIZE; +        ss << m_frame_buffer_limit; +    } +    else if (parameter == "enable") { +        if (m_enable_input) +            ss << "true"; +        else +            ss << "false";      }      else {          ss << "Parameter '" << parameter << diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index cd8df6f..cb7fdd4 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -2,7 +2,7 @@     Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications     Research Center Canada) -   Copyright (C) 2013 Matthias P. Braendli +   Copyright (C) 2013, 2014 Matthias P. Braendli     http://mpb.li     ZeroMQ input. see www.zeromq.org for more info @@ -61,17 +61,28 @@  // Number of elements to prebuffer before starting the pipeline  #define INPUT_ZMQ_PREBUFFERING (5*4) // 480ms +// Default frame_buffer size in number of elements +#define INPUT_ZMQ_DEF_BUFFER_SIZE (5*8) // 960ms + +// Minimum frame_buffer size in number of elements +// This is one AAC superframe, but you probably don't want to +// go that low anyway. +#define INPUT_ZMQ_MIN_BUFFER_SIZE (5*1) // 120ms +  // Maximum frame_buffer size in number of elements -#define INPUT_ZMQ_MAX_BUFFER_SIZE (5*8) // 960ms +// One minute is clearly way over what everybody would +// want. +#define INPUT_ZMQ_MAX_BUFFER_SIZE (5*500) // 60s  class DabInputZmqBase : public DabInputBase, public RemoteControllable {      public:          DabInputZmqBase(const std::string name)              : RemoteControllable(name), -            m_name(name), m_zmq_context(1), +            m_zmq_context(1),              m_zmq_sock(m_zmq_context, ZMQ_SUB), -            m_bitrate(0), m_prebuffering(INPUT_ZMQ_PREBUFFERING) { -            } +            m_bitrate(0), m_prebuffering(INPUT_ZMQ_PREBUFFERING), +            m_enable_input(true), +            m_frame_buffer_limit(INPUT_ZMQ_DEF_BUFFER_SIZE) { }          virtual int open(const std::string inputUri);          virtual int readFrame(void* buffer, int size); @@ -87,11 +98,15 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable {      protected:          virtual int readFromSocket(int framesize) = 0; -        std::string m_name;          zmq::context_t m_zmq_context;          zmq::socket_t m_zmq_sock; // handle for the zmq socket          int m_bitrate;          int m_prebuffering; + +        /* set this to zero to empty the input buffer */ +        bool m_enable_input; + +        size_t m_frame_buffer_limit;          std::list<char*> m_frame_buffer; //stores elements of type char[<framesize>]  }; @@ -101,6 +116,8 @@ class DabInputZmqMPEG : public DabInputZmqBase {              : DabInputZmqBase(name) {                  RC_ADD_PARAMETER(buffer,                          "Size of the input buffer [mpeg frames]"); +                RC_ADD_PARAMETER(enable, +                        "If the input is enabled. Set to zero to empty the buffer.");              }      private: @@ -113,6 +130,8 @@ class DabInputZmqAAC : public DabInputZmqBase {              : DabInputZmqBase(name) {                  RC_ADD_PARAMETER(buffer,                          "Size of the input buffer [aac superframes]"); +                RC_ADD_PARAMETER(enable, +                        "If the input is enabled. Set to zero to empty the buffer.");              }      private:  | 
