From d6d8d67b1c921d8fca7257291c15c07bdea8d14a Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 14 Feb 2014 18:09:29 +0100 Subject: Add more ZMQ RC parameters --- src/RemoteControl.cpp | 3 ++- src/RemoteControl.h | 3 ++- src/dabInputZmq.cpp | 58 +++++++++++++++++++++++++++++++++++++++++---------- src/dabInputZmq.h | 31 +++++++++++++++++++++------ 4 files changed, 76 insertions(+), 19 deletions(-) (limited to 'src') diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index a075497..fb1aa7e 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -3,7 +3,8 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Written by Matthias P. Braendli, matthias.braendli@mpb.li, 2012 + Copyright (C) 2014 + Matthias P. Braendli, matthias.braendli@mpb.li */ /* This file is part of ODR-DabMux. diff --git a/src/RemoteControl.h b/src/RemoteControl.h index f8c14fd..a39af09 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -3,7 +3,8 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Written by Matthias P. Braendli, matthias.braendli@mpb.li, 2012 + Copyright (C) 2014 + Matthias P. Braendli, matthias.braendli@mpb.li This module adds remote-control capability to some of the dabmod modules. see testremotecontrol/test.cpp for an example of how to use this. diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index c883f35..387c8cc 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013 Matthias P. Braendli + Copyright (C) 2013, 2014 Matthias P. Braendli http://mpb.li ZeroMQ input. see www.zeromq.org for more info @@ -40,6 +40,7 @@ #include "dabInputZmq.h" #include "StatsServer.h" #include "zmq.hpp" +#include "PcDebug.h" #ifdef HAVE_CONFIG_H # include "config.h" @@ -47,10 +48,11 @@ #ifdef HAVE_INPUT_ZEROMQ -#include +#include +#include #include #include -#include +#include #include #include #include @@ -115,14 +117,14 @@ int DabInputZmqBase::readFrame(void* buffer, int size) rc = readFromSocket(size); /* Notify of a buffer overrun, and drop some frames */ - if (m_frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) { + if (m_frame_buffer.size() >= m_frame_buffer_limit) { global_stats->notifyOverrun(m_name); /* If the buffer is really too full, we drop as many frames as needed * to get down to the prebuffering size. We would like to have our buffer * filled to the prebuffering length. */ - if (m_frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) { + if (m_frame_buffer.size() >= 1.5*m_frame_buffer_limit) { size_t over_max = m_frame_buffer.size() - INPUT_ZMQ_PREBUFFERING; while (over_max--) { @@ -217,19 +219,22 @@ int DabInputZmqMPEG::readFromSocket(int framesize) if (msg.size() == framesize) { - if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { + if (m_frame_buffer.size() > m_frame_buffer_limit) { etiLog.level(warn) << "inputZMQ " << m_name << " buffer full (" << m_frame_buffer.size() << ")," " dropping incoming frame !"; messageReceived = 0; } - else { + else if (m_enable_input) { // copy the input frame blockwise into the frame_buffer char* frame = new char[framesize]; memcpy(frame, data, framesize); m_frame_buffer.push_back(frame); } + else { + return 0; + } } else { etiLog.level(error) << @@ -273,14 +278,14 @@ int DabInputZmqAAC::readFromSocket(int framesize) */ if (msg.size() == 5*framesize) { - if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { + if (m_frame_buffer.size() > m_frame_buffer_limit) { etiLog.level(warn) << "inputZMQ " << m_name << " buffer full (" << m_frame_buffer.size() << ")," " dropping incoming superframe !"; messageReceived = 0; } - else { + else if (m_enable_input) { // copy the input frame blockwise into the frame_buffer for (char* framestart = data; framestart < &data[5*framesize]; @@ -290,6 +295,9 @@ int DabInputZmqAAC::readFromSocket(int framesize) m_frame_buffer.push_back(frame); } } + else { + return 0; + } } else { etiLog.level(error) << @@ -309,7 +317,29 @@ void DabInputZmqBase::set_parameter(string parameter, string value) ss.exceptions ( stringstream::failbit | stringstream::badbit ); if (parameter == "buffer") { - throw ParameterError("Parameter 'buffer' is read-only"); + size_t new_limit = atol(value.c_str()); + + if (new_limit < INPUT_ZMQ_MIN_BUFFER_SIZE) { + throw ParameterError("Desired buffer size too small." + " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) ); + } + else if (new_limit > INPUT_ZMQ_MAX_BUFFER_SIZE) { + throw ParameterError("Desired buffer size too large." + " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) ); + } + + m_frame_buffer_limit = new_limit; + } + else if (parameter == "enable") { + if (value == "1") { + m_enable_input = true; + } + else if (value == "0") { + m_enable_input = false; + } + else { + throw ParameterError("Value not understood, specify 0 or 1."); + } } else { stringstream ss; @@ -323,7 +353,13 @@ string DabInputZmqBase::get_parameter(string parameter) { stringstream ss; if (parameter == "buffer") { - ss << INPUT_ZMQ_MAX_BUFFER_SIZE; + ss << m_frame_buffer_limit; + } + else if (parameter == "enable") { + if (m_enable_input) + ss << "true"; + else + ss << "false"; } else { ss << "Parameter '" << parameter << diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index cd8df6f..cb7fdd4 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013 Matthias P. Braendli + Copyright (C) 2013, 2014 Matthias P. Braendli http://mpb.li ZeroMQ input. see www.zeromq.org for more info @@ -61,17 +61,28 @@ // Number of elements to prebuffer before starting the pipeline #define INPUT_ZMQ_PREBUFFERING (5*4) // 480ms +// Default frame_buffer size in number of elements +#define INPUT_ZMQ_DEF_BUFFER_SIZE (5*8) // 960ms + +// Minimum frame_buffer size in number of elements +// This is one AAC superframe, but you probably don't want to +// go that low anyway. +#define INPUT_ZMQ_MIN_BUFFER_SIZE (5*1) // 120ms + // Maximum frame_buffer size in number of elements -#define INPUT_ZMQ_MAX_BUFFER_SIZE (5*8) // 960ms +// One minute is clearly way over what everybody would +// want. +#define INPUT_ZMQ_MAX_BUFFER_SIZE (5*500) // 60s class DabInputZmqBase : public DabInputBase, public RemoteControllable { public: DabInputZmqBase(const std::string name) : RemoteControllable(name), - m_name(name), m_zmq_context(1), + m_zmq_context(1), m_zmq_sock(m_zmq_context, ZMQ_SUB), - m_bitrate(0), m_prebuffering(INPUT_ZMQ_PREBUFFERING) { - } + m_bitrate(0), m_prebuffering(INPUT_ZMQ_PREBUFFERING), + m_enable_input(true), + m_frame_buffer_limit(INPUT_ZMQ_DEF_BUFFER_SIZE) { } virtual int open(const std::string inputUri); virtual int readFrame(void* buffer, int size); @@ -87,11 +98,15 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable { protected: virtual int readFromSocket(int framesize) = 0; - std::string m_name; zmq::context_t m_zmq_context; zmq::socket_t m_zmq_sock; // handle for the zmq socket int m_bitrate; int m_prebuffering; + + /* set this to zero to empty the input buffer */ + bool m_enable_input; + + size_t m_frame_buffer_limit; std::list m_frame_buffer; //stores elements of type char[] }; @@ -101,6 +116,8 @@ class DabInputZmqMPEG : public DabInputZmqBase { : DabInputZmqBase(name) { RC_ADD_PARAMETER(buffer, "Size of the input buffer [mpeg frames]"); + RC_ADD_PARAMETER(enable, + "If the input is enabled. Set to zero to empty the buffer."); } private: @@ -113,6 +130,8 @@ class DabInputZmqAAC : public DabInputZmqBase { : DabInputZmqBase(name) { RC_ADD_PARAMETER(buffer, "Size of the input buffer [aac superframes]"); + RC_ADD_PARAMETER(enable, + "If the input is enabled. Set to zero to empty the buffer."); } private: -- cgit v1.2.3