diff options
Diffstat (limited to 'src/dabInputZmq.cpp')
-rw-r--r-- | src/dabInputZmq.cpp | 58 |
1 files changed, 47 insertions, 11 deletions
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index c883f35..387c8cc 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013 Matthias P. Braendli + Copyright (C) 2013, 2014 Matthias P. Braendli http://mpb.li ZeroMQ input. see www.zeromq.org for more info @@ -40,6 +40,7 @@ #include "dabInputZmq.h" #include "StatsServer.h" #include "zmq.hpp" +#include "PcDebug.h" #ifdef HAVE_CONFIG_H # include "config.h" @@ -47,10 +48,11 @@ #ifdef HAVE_INPUT_ZEROMQ -#include <stdio.h> +#include <cstdio> +#include <cstdlib> #include <list> #include <exception> -#include <string.h> +#include <cstring> #include <string> #include <sstream> #include <limits.h> @@ -115,14 +117,14 @@ int DabInputZmqBase::readFrame(void* buffer, int size) rc = readFromSocket(size); /* Notify of a buffer overrun, and drop some frames */ - if (m_frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) { + if (m_frame_buffer.size() >= m_frame_buffer_limit) { global_stats->notifyOverrun(m_name); /* If the buffer is really too full, we drop as many frames as needed * to get down to the prebuffering size. We would like to have our buffer * filled to the prebuffering length. */ - if (m_frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) { + if (m_frame_buffer.size() >= 1.5*m_frame_buffer_limit) { size_t over_max = m_frame_buffer.size() - INPUT_ZMQ_PREBUFFERING; while (over_max--) { @@ -217,19 +219,22 @@ int DabInputZmqMPEG::readFromSocket(int framesize) if (msg.size() == framesize) { - if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { + if (m_frame_buffer.size() > m_frame_buffer_limit) { etiLog.level(warn) << "inputZMQ " << m_name << " buffer full (" << m_frame_buffer.size() << ")," " dropping incoming frame !"; messageReceived = 0; } - else { + else if (m_enable_input) { // copy the input frame blockwise into the frame_buffer char* frame = new char[framesize]; memcpy(frame, data, framesize); m_frame_buffer.push_back(frame); } + else { + return 0; + } } else { etiLog.level(error) << @@ -273,14 +278,14 @@ int DabInputZmqAAC::readFromSocket(int framesize) */ if (msg.size() == 5*framesize) { - if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { + if (m_frame_buffer.size() > m_frame_buffer_limit) { etiLog.level(warn) << "inputZMQ " << m_name << " buffer full (" << m_frame_buffer.size() << ")," " dropping incoming superframe !"; messageReceived = 0; } - else { + else if (m_enable_input) { // copy the input frame blockwise into the frame_buffer for (char* framestart = data; framestart < &data[5*framesize]; @@ -290,6 +295,9 @@ int DabInputZmqAAC::readFromSocket(int framesize) m_frame_buffer.push_back(frame); } } + else { + return 0; + } } else { etiLog.level(error) << @@ -309,7 +317,29 @@ void DabInputZmqBase::set_parameter(string parameter, string value) ss.exceptions ( stringstream::failbit | stringstream::badbit ); if (parameter == "buffer") { - throw ParameterError("Parameter 'buffer' is read-only"); + size_t new_limit = atol(value.c_str()); + + if (new_limit < INPUT_ZMQ_MIN_BUFFER_SIZE) { + throw ParameterError("Desired buffer size too small." + " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) ); + } + else if (new_limit > INPUT_ZMQ_MAX_BUFFER_SIZE) { + throw ParameterError("Desired buffer size too large." + " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) ); + } + + m_frame_buffer_limit = new_limit; + } + else if (parameter == "enable") { + if (value == "1") { + m_enable_input = true; + } + else if (value == "0") { + m_enable_input = false; + } + else { + throw ParameterError("Value not understood, specify 0 or 1."); + } } else { stringstream ss; @@ -323,7 +353,13 @@ string DabInputZmqBase::get_parameter(string parameter) { stringstream ss; if (parameter == "buffer") { - ss << INPUT_ZMQ_MAX_BUFFER_SIZE; + ss << m_frame_buffer_limit; + } + else if (parameter == "enable") { + if (m_enable_input) + ss << "true"; + else + ss << "false"; } else { ss << "Parameter '" << parameter << |